Metadata Stores¶
Metaxy abstracts interactions with metadata behind an interface called MetadataStore.
Users can extend this class to implement support for arbitrary metadata storage such as databases, lakehouse formats, or really any kind of external system.
Learn how to use metadata stores here.
Here are some of the built-in metadata store types; the full list can be found here.
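For example, a built-in store can be instantiated and used directly. A minimal sketch; the DuckDB store and its `database` argument are borrowed from the `from_config` example further below, and `MyFeature` stands for any registered feature class:

```py
from metaxy.ext.metadata_stores.duckdb import DuckDBMetadataStore

# A local DuckDB-backed metadata store (path is illustrative)
store = DuckDBMetadataStore(database="metadata.db")

with store:  # opens in read mode by default
    lf = store.read(MyFeature)  # lazily read the feature's metadata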
Metadata Store Interface¶
metaxy.MetadataStore¶
MetadataStore(
*,
name: str | None = None,
hash_algorithm: HashAlgorithm | None = None,
versioning_engine: VersioningEngineOptions = "auto",
fallback_stores: Sequence[MetadataStore | str]
| None = None,
auto_create_tables: bool | None = None,
materialization_id: str | None = None,
)
Bases: ABC
Abstract base class for metadata storage backends.
Parameters:

- `name` (`str | None`, default: `None`) – Optional name for this store. For representation purposes only. Is typically included in `.display()`.
- `hash_algorithm` (`HashAlgorithm | None`, default: `None`) – Hash algorithm to use for the versioning engine.
- `versioning_engine` (`VersioningEngineOptions`, default: `'auto'`) – Which versioning engine to use.
    - `"auto"`: Prefer the store's native engine and fall back to Polars if needed
    - `"native"`: Always use the store's native engine, raise `VersioningEngineMismatchError` if at some point the metadata store has to switch to Polars
    - `"polars"`: Always use the Polars engine
- `fallback_stores` (`Sequence[MetadataStore | str] | None`, default: `None`) – Ordered list of read-only fallback stores. Used when upstream features are not in this store. `VersioningEngineMismatchError` is not raised when reading from fallback stores.
- `auto_create_tables` (`bool | None`, default: `None`) – Whether to automatically create tables when writing new features. If not provided, the global Metaxy configuration `auto_create_tables` option will be used (which can be set via the `METAXY_AUTO_CREATE_TABLES` env var).

    Warning: This is intended for development/testing purposes. Use proper database migration tools like Alembic to manage table infrastructure in production.

- `materialization_id` (`str | None`, default: `None`) – Optional external orchestration ID. If provided, all metadata writes will include this ID in the `metaxy_materialization_id` column. Can be overridden per `MetadataStore.write` call.

Raises:

- `ValueError` – If fallback stores use different hash algorithms or truncation lengths
- `VersioningEngineMismatchError` – If an attempt is made to switch the versioning engine to Polars while `versioning_engine` is set to `"native"`.
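A construction sketch; the DuckDB-backed subclass and the "prod" fallback name are illustrative assumptions:

```py
from metaxy.ext.metadata_stores.duckdb import DuckDBMetadataStore

store = DuckDBMetadataStore(
    database="metadata.db",
    name="dev",                # for representation purposes only
    versioning_engine="auto",  # prefer native, fall back to Polars
    fallback_stores=["prod"],  # read-only fallbacks, resolved by name
    auto_create_tables=True,   # development/testing only
)
```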
Source code in src/metaxy/metadata_store/base.py
def __init__(
self,
*,
name: str | None = None,
hash_algorithm: HashAlgorithm | None = None,
versioning_engine: VersioningEngineOptions = "auto",
fallback_stores: Sequence[MetadataStore | str] | None = None,
auto_create_tables: bool | None = None,
materialization_id: str | None = None,
):
"""
Initialize the metadata store.
Args:
name: Optional name for this store. For representation purposes only.
Is typically included into `.display()`.
hash_algorithm: Hash algorithm to use for the versioning engine.
versioning_engine: Which versioning engine to use.
- "auto": Prefer the store's native engine and fall back to Polars if needed
- "native": Always use the store's native engine, raise `VersioningEngineMismatchError`
if at some point the metadata store has to switch to Polars
- "polars": Always use the Polars engine
fallback_stores: Ordered list of read-only fallback stores.
Used when upstream features are not in this store.
`VersioningEngineMismatchError` is not raised when reading from fallback stores.
auto_create_tables: Whether to automatically create tables when writing new features.
If not provided, the global Metaxy configuration `auto_create_tables` option will
be used (which can be set via `METAXY_AUTO_CREATE_TABLES` env var).
!!! warning
This is intended for development/testing purposes.
Use proper database migration tools like Alembic to manage table infrastructure in production.
materialization_id: Optional external orchestration ID.
If provided, all metadata writes will include this ID in the `metaxy_materialization_id` column.
Can be overridden per [`MetadataStore.write`][metaxy.MetadataStore.write] call.
Raises:
ValueError: If fallback stores use different hash algorithms or truncation lengths
VersioningEngineMismatchError: If the versioning engine is attempted to be switched
to Polars and `versioning_engine` is set to `native`.
"""
self._name = name
self._is_open = False
self._context_depth = 0
self._versioning_engine = versioning_engine
self._materialization_id = materialization_id
self._open_cm: AbstractContextManager[Self] | None = None
self._transaction_timestamp: datetime | None = None
self._soft_delete_in_progress: bool = False
if auto_create_tables is None:
from metaxy.config import MetaxyConfig
self.auto_create_tables = MetaxyConfig.get().auto_create_tables
else:
self.auto_create_tables = auto_create_tables
if hash_algorithm is None:
hash_algorithm = self._get_default_hash_algorithm()
self.hash_algorithm = hash_algorithm
from metaxy.metadata_store.fallback import FallbackStoreList
self.fallback_stores: FallbackStoreList = (
fallback_stores
if isinstance(fallback_stores, FallbackStoreList)
else FallbackStoreList(fallback_stores or [])
)
Attributes¶
metaxy.MetadataStore.materialization_id property¶
materialization_id: str | None
The external orchestration ID for this store instance.
If set, all metadata writes include this ID in the metaxy_materialization_id column,
allowing filtering of rows written during a specific materialization run.
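For example, rows produced by a specific run can be isolated later. A sketch; the store class and ID value are illustrative:

```py
import narwhals as nw
from metaxy.ext.metadata_stores.duckdb import DuckDBMetadataStore

store = DuckDBMetadataStore(
    database="metadata.db",
    materialization_id="run-42",  # stamped onto every write
)

with store:
    # Select only rows written during the "run-42" materialization
    lf = store.read(MyFeature).filter(
        nw.col("metaxy_materialization_id") == "run-42"
    )
```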
metaxy.MetadataStore.qualified_class_name property¶
qualified_class_name: str
The fully qualified class name (module.classname).
Functions¶
metaxy.MetadataStore.resolve_update¶
resolve_update(
feature: CoercibleToFeatureKey,
*,
samples: IntoFrame | Frame | None = None,
filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]]
| None = None,
global_filters: Sequence[Expr] | None = None,
target_filters: Sequence[Expr] | None = None,
lazy: Literal[False] = False,
versioning_engine: Literal["auto", "native", "polars"]
| None = None,
skip_comparison: bool = False,
**kwargs: Any,
) -> Increment
resolve_update(
feature: CoercibleToFeatureKey,
*,
samples: IntoFrame | Frame | None = None,
filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]]
| None = None,
global_filters: Sequence[Expr] | None = None,
target_filters: Sequence[Expr] | None = None,
lazy: Literal[True],
versioning_engine: Literal["auto", "native", "polars"]
| None = None,
skip_comparison: bool = False,
**kwargs: Any,
) -> LazyIncrement
Calculate an incremental update for a feature.
This is the main workhorse in Metaxy.
Parameters:

- `feature` (`CoercibleToFeatureKey`) – Feature class to resolve updates for
- `samples` (`IntoFrame | Frame | None`, default: `None`) – A dataframe with joined upstream metadata and the `"metaxy_provenance_by_field"` column set. When provided, `MetadataStore` skips loading upstream feature metadata and provenance calculations.

    Required for root features: Metaxy doesn't know how to populate input metadata for root features, so the `samples` argument must be provided for them.

    Tip: For non-root features, use `samples` to customize the automatic upstream loading and field provenance calculation. For example, it can be used to request processing for specific sample IDs. Setting this parameter during normal operations is not required.

- `filters` (`Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None`, default: `None`) – A mapping from feature keys to lists of Narwhals filter expressions. Keys can be feature classes, FeatureKey objects, or string paths. Applied at read-time. May filter the current feature; in this case it will also be applied to `samples` (if provided). Example: `{UpstreamFeature: [nw.col("x") > 10], ...}`
- `global_filters` (`Sequence[Expr] | None`, default: `None`) – A list of Narwhals filter expressions applied to all features (both upstream and target). These filters are combined with any feature-specific filters from `filters`. Must reference columns that exist in ALL features. Useful for filtering by common columns like `sample_uid` across all features. Example: `[nw.col("sample_uid").is_in(["s1", "s2"])]`
- `target_filters` (`Sequence[Expr] | None`, default: `None`) – A list of Narwhals filter expressions applied ONLY to the target feature (not to upstream features). Use this when filtering by columns that only exist in the target feature. Example: `[nw.col("height").is_null()]`
- `lazy` (`bool`, default: `False`) – Whether to return a `LazyIncrement` or an `Increment`.
- `versioning_engine` (`Literal['auto', 'native', 'polars'] | None`, default: `None`) – Override the store's versioning engine for this operation.
- `skip_comparison` (`bool`, default: `False`) – If True, skip the increment comparison logic and return all upstream samples in `increment.new`. The `changed` and `removed` frames will be empty.

Raises:

- `ValueError` – If no `samples` dataframe has been provided when resolving an update for a root feature.
- `VersioningEngineMismatchError` – If `versioning_engine` has been set to `"native"` and a dataframe of a different implementation has been encountered during `resolve_update`.
Note
This method automatically loads feature definitions from the metadata store before computing the update. This ensures that any external feature dependencies are resolved with their actual definitions from the store, preventing incorrect version calculations from stale external feature definitions.
With a root feature:

import narwhals as nw
import polars as pl

samples = pl.DataFrame(
    {
        "id": ["x", "y", "z"],
        "metaxy_provenance_by_field": [
            {"part_1": "h1", "part_2": "h2"},
            {"part_1": "h3", "part_2": "h4"},
            {"part_1": "h5", "part_2": "h6"},
        ],
    }
)

with store.open(mode="w"):
    result = store.resolve_update(MyFeature, samples=nw.from_native(samples))
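For a non-root feature, `samples` can be omitted and upstream metadata is loaded automatically. A sketch with filters, assuming `ChildFeature` is a registered non-root feature:

```py
import narwhals as nw

with store.open(mode="w"):
    increment = store.resolve_update(
        ChildFeature,
        # Applied to upstream and target features alike
        global_filters=[nw.col("sample_uid").is_in(["s1", "s2"])],
        # Applied only to the target feature
        target_filters=[nw.col("height").is_null()],
    )
    # increment.new contains the samples that need (re)processing
```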
Source code in src/metaxy/metadata_store/base.py
def resolve_update(
self,
feature: CoercibleToFeatureKey,
*,
samples: IntoFrame | Frame | None = None,
filters: Mapping[CoercibleToFeatureKey, Sequence[nw.Expr]] | None = None,
global_filters: Sequence[nw.Expr] | None = None,
target_filters: Sequence[nw.Expr] | None = None,
lazy: bool = False,
versioning_engine: Literal["auto", "native", "polars"] | None = None,
skip_comparison: bool = False,
**kwargs: Any,
) -> Increment | LazyIncrement:
"""Calculate an incremental update for a feature.
This is the main workhorse in Metaxy.
Args:
feature: Feature class to resolve updates for
samples: A dataframe with joined upstream metadata and `"metaxy_provenance_by_field"` column set.
When provided, `MetadataStore` skips loading upstream feature metadata and provenance calculations.
!!! info "Required for root features"
Metaxy doesn't know how to populate input metadata for root features,
so the `samples` argument **must** be provided for them.
!!! tip
For non-root features, use `samples` to customize the automatic upstream loading and field provenance calculation.
For example, it can be used to request processing for specific sample IDs.
Setting this parameter during normal operations is not required.
filters: A mapping from feature keys to lists of Narwhals filter expressions.
Keys can be feature classes, FeatureKey objects, or string paths.
Applied at read-time. May filter the current feature,
in this case it will also be applied to `samples` (if provided).
Example: `{UpstreamFeature: [nw.col("x") > 10], ...}`
global_filters: A list of Narwhals filter expressions applied to all features
(both upstream and target). These filters are combined with any feature-specific
filters from `filters`. Must reference columns that exist in ALL features.
Useful for filtering by common columns like `sample_uid` across all features.
Example: `[nw.col("sample_uid").is_in(["s1", "s2"])]`
target_filters: A list of Narwhals filter expressions applied ONLY to the target
feature (not to upstream features). Use this when filtering by columns that
only exist in the target feature.
Example: `[nw.col("height").is_null()]`
lazy: Whether to return a [`LazyIncrement`][metaxy.versioning.types.LazyIncrement] or a [`Increment`][metaxy.versioning.types.Increment].
versioning_engine: Override the store's versioning engine for this operation.
skip_comparison: If True, skip the increment comparison logic and return all
upstream samples in `increment.new`. The `changed` and `removed` frames will
be empty.
Raises:
ValueError: If no `samples` dataframe has been provided when resolving an update for a root feature.
VersioningEngineMismatchError: If `versioning_engine` has been set to `"native"`
and a dataframe of a different implementation has been encountered during `resolve_update`.
!!! note
This method automatically loads feature definitions from the metadata store
before computing the update. This ensures that any external feature dependencies
are resolved with their actual definitions from the store, preventing incorrect
version calculations from stale external feature definitions.
!!! example "With a root feature"
```py
import narwhals as nw
import polars as pl
samples = pl.DataFrame(
{
"id": ["x", "y", "z"],
"metaxy_provenance_by_field": [
{"part_1": "h1", "part_2": "h2"},
{"part_1": "h3", "part_2": "h4"},
{"part_1": "h5", "part_2": "h6"},
],
}
)
with store.open(mode="w"):
result = store.resolve_update(MyFeature, samples=nw.from_native(samples))
```
"""
import narwhals as nw
import metaxy as mx
from metaxy.config import MetaxyConfig
# Sync external feature definitions from the store to replace any external feature placeholders.
# This ensures version hashes are computed correctly against actual stored definitions.
# it is acceptable to call this here automatically for three reasons:
# 1. `resolve_update` is typically only called once at the start of the workflow
# 2. `resolve_update` is already doing heavy computations so an extra little call won't hurt performance
# 3. it is extremely important to get the result right
if MetaxyConfig.get(_allow_default_config=True).sync:
mx.sync_external_features(self)
# Convert samples to Narwhals frame if not already
samples_nw: nw.DataFrame[Any] | nw.LazyFrame[Any] | None = None
if samples is not None:
if isinstance(samples, (nw.DataFrame, nw.LazyFrame)):
samples_nw = samples
else:
samples_nw = nw.from_native(samples) # ty: ignore[invalid-assignment]
# Normalize filter keys to FeatureKey
normalized_filters: dict[FeatureKey, list[nw.Expr]] = {}
if filters:
for key, exprs in filters.items():
feature_key = self._resolve_feature_key(key)
normalized_filters[feature_key] = list(exprs)
# Convert global_filters and target_filters to lists for easy concatenation
global_filter_list = list(global_filters) if global_filters else []
target_filter_list = list(target_filters) if target_filters else []
feature_key = self._resolve_feature_key(feature)
if self._is_system_table(feature_key):
raise NotImplementedError("Delete operations are not yet supported for system tables.")
graph = current_graph()
plan = graph.get_feature_plan(feature_key)
# Root features without samples: error (samples required)
if not plan.deps and samples_nw is None:
raise ValueError(
f"Feature {feature_key} has no upstream dependencies (root feature). "
f"Must provide 'samples' parameter with sample_uid and {METAXY_PROVENANCE_BY_FIELD} columns. "
f"Root features require manual {METAXY_PROVENANCE_BY_FIELD} computation."
)
# Combine feature-specific filters, global filters, and target filters for current feature
# target_filters are ONLY applied to the current feature, not to upstream features
current_feature_filters = [
*normalized_filters.get(feature_key, []),
*global_filter_list,
*target_filter_list,
]
# Read current metadata with deduplication (with_sample_history=False by default)
# Use allow_fallback=False since we only want metadata from THIS store
# to determine what needs to be updated locally
try:
current_metadata: nw.LazyFrame[Any] | None = self.read(
feature_key,
filters=current_feature_filters if current_feature_filters else None,
allow_fallback=False,
with_feature_history=False, # filters by current feature_version
with_sample_history=False, # deduplicates by id_columns, keeping latest
)
except FeatureNotFoundError:
current_metadata = None
upstream_by_key: dict[FeatureKey, nw.LazyFrame[Any]] = {}
filters_by_key: dict[FeatureKey, list[nw.Expr]] = {}
# if samples are provided, use them as source of truth for upstream data
if samples_nw is not None:
# Apply filters to samples if any
filtered_samples = samples_nw
if current_feature_filters:
filtered_samples = samples_nw.filter(current_feature_filters)
# fill in METAXY_PROVENANCE column if it's missing (e.g. for root features)
samples_nw = self.hash_struct_version_column(
plan,
df=filtered_samples,
struct_column=METAXY_PROVENANCE_BY_FIELD,
hash_column=METAXY_PROVENANCE,
)
# For root features, add data_version columns if they don't exist
# (root features have no computation, so data_version equals provenance)
# Use collect_schema().names() to avoid PerformanceWarning on lazy frames
if METAXY_DATA_VERSION_BY_FIELD not in samples_nw.collect_schema().names():
samples_nw = samples_nw.with_columns(
nw.col(METAXY_PROVENANCE_BY_FIELD).alias(METAXY_DATA_VERSION_BY_FIELD),
nw.col(METAXY_PROVENANCE).alias(METAXY_DATA_VERSION),
)
else:
for upstream_spec in plan.deps or []:
# Combine feature-specific filters with global filters for upstream
upstream_filters = [
*normalized_filters.get(upstream_spec.key, []),
*global_filter_list,
]
upstream_feature_metadata = self.read(
upstream_spec.key,
filters=upstream_filters if upstream_filters else None,
)
if upstream_feature_metadata is not None:
upstream_by_key[upstream_spec.key] = upstream_feature_metadata
# determine which implementation to use for resolving the increment
# consider (1) whether all upstream metadata has been loaded with the native implementation
# (2) if samples have native implementation
# Use parameter if provided, otherwise use store default
engine_mode = versioning_engine if versioning_engine is not None else self._versioning_engine
# If "polars" mode, force Polars immediately
if engine_mode == "polars":
implementation = nw.Implementation.POLARS
switched_to_polars = True
else:
implementation = self.native_implementation()
switched_to_polars = False
for upstream_key, df in upstream_by_key.items():
if df.implementation != implementation:
switched_to_polars = True
# Only raise error in "native" mode if no fallback stores configured.
# If fallback stores exist, the implementation mismatch indicates data came
# from fallback (different implementation), which is legitimate fallback access.
# If data were local, it would have the native implementation.
if engine_mode == "native" and not self.fallback_stores:
raise VersioningEngineMismatchError(
f"versioning_engine='native' but upstream feature `{upstream_key.to_string()}` "
f"has implementation {df.implementation}, expected {self.native_implementation()}"
)
elif engine_mode == "auto" or (engine_mode == "native" and self.fallback_stores):
PolarsMaterializationWarning.warn_on_implementation_mismatch(
expected=self.native_implementation(),
actual=df.implementation,
message=f"Using Polars for resolving the increment instead. This was caused by upstream feature `{upstream_key.to_string()}`.",
)
implementation = nw.Implementation.POLARS
break
if samples_nw is not None and samples_nw.implementation != self.native_implementation():
if not switched_to_polars:
if engine_mode == "native":
# Always raise error for samples with wrong implementation, regardless
# of fallback stores, because samples come from user argument, not from fallback
raise VersioningEngineMismatchError(
f"versioning_engine='native' but provided `samples` have implementation {samples_nw.implementation}, "
f"expected {self.native_implementation()}"
)
elif engine_mode == "auto":
PolarsMaterializationWarning.warn_on_implementation_mismatch(
expected=self.native_implementation(),
actual=samples_nw.implementation,
message=f"Provided `samples` have implementation {samples_nw.implementation}. Using Polars for resolving the increment instead.",
)
implementation = nw.Implementation.POLARS
switched_to_polars = True
if switched_to_polars:
if current_metadata:
current_metadata = switch_implementation_to_polars(current_metadata)
if samples_nw:
samples_nw = switch_implementation_to_polars(samples_nw)
for upstream_key, df in upstream_by_key.items():
upstream_by_key[upstream_key] = switch_implementation_to_polars(df)
with self.create_versioning_engine(plan=plan, implementation=implementation) as engine:
if skip_comparison:
# Skip comparison: return all upstream samples as added
if samples_nw is not None:
# Root features or user-provided samples: use samples directly
# Note: samples already has metaxy_provenance computed
added = samples_nw.lazy()
input_df = None # Root features have no upstream input
else:
# Non-root features: load all upstream with provenance
added = engine.load_upstream_with_provenance(
upstream=upstream_by_key,
hash_algo=self.hash_algorithm,
filters=filters_by_key,
)
input_df = added # Input is the same as added when skipping comparison
changed = None
removed = None
else:
added, changed, removed, input_df = engine.resolve_increment_with_provenance(
current=current_metadata,
upstream=upstream_by_key,
hash_algorithm=self.hash_algorithm,
filters=filters_by_key,
sample=samples_nw.lazy() if samples_nw is not None else None,
)
# Convert None to empty DataFrames
if changed is None:
changed = empty_frame_like(added)
if removed is None:
removed = empty_frame_like(added)
if lazy:
return LazyIncrement(
new=added if isinstance(added, nw.LazyFrame) else nw.from_native(added),
stale=changed if isinstance(changed, nw.LazyFrame) else nw.from_native(changed),
orphaned=removed if isinstance(removed, nw.LazyFrame) else nw.from_native(removed),
input=input_df if input_df is None or isinstance(input_df, nw.LazyFrame) else nw.from_native(input_df),
)
else:
return Increment(
new=added.collect() if isinstance(added, nw.LazyFrame) else added,
stale=changed.collect() if isinstance(changed, nw.LazyFrame) else changed,
orphaned=removed.collect() if isinstance(removed, nw.LazyFrame) else removed,
)
metaxy.MetadataStore.compute_provenance¶
compute_provenance(
feature: CoercibleToFeatureKey, df: FrameT
) -> FrameT
Compute provenance columns for a DataFrame with pre-joined upstream data.
Note
This method may be useful in very rare cases.
Rely on MetadataStore.resolve_update instead.
Use this method when you perform custom joins outside of Metaxy's auto-join system but still want Metaxy to compute provenance. The method computes metaxy_provenance_by_field, metaxy_provenance, metaxy_data_version_by_field, and metaxy_data_version columns based on the upstream metadata.
Info
The input DataFrame must contain the renamed metaxy_data_version_by_field
columns from each upstream feature. The naming convention follows the pattern
metaxy_data_version_by_field__<feature_key.to_column_suffix()>. For example, for an
upstream feature with key ["video", "raw"], the column should be named
metaxy_data_version_by_field__video_raw.
Parameters:

- `feature` (`CoercibleToFeatureKey`) – The feature to compute provenance for.
- `df` (`FrameT`) – A DataFrame containing pre-joined upstream data with renamed `metaxy_data_version_by_field` columns from each upstream feature.

Returns:

- `FrameT` – The input DataFrame with provenance columns added. Returns the same frame type as the input, either an eager DataFrame or a LazyFrame.

Raises:

- `StoreNotOpenError` – If the store is not open.
- `ValueError` – If required upstream `metaxy_data_version_by_field` columns are missing from the DataFrame.
Example

# Read upstream metadata
video_df = store.read(VideoFeature).collect()
audio_df = store.read(AudioFeature).collect()

# Rename data_version_by_field columns to the expected convention
video_df = video_df.rename({
    "metaxy_data_version_by_field": "metaxy_data_version_by_field__video_raw"
})
audio_df = audio_df.rename({
    "metaxy_data_version_by_field": "metaxy_data_version_by_field__audio_raw"
})

# Perform custom join
joined = video_df.join(audio_df, on="sample_uid", how="inner")

# Compute provenance
with_provenance = store.compute_provenance(MyFeature, joined)

# Pass to resolve_update
increment = store.resolve_update(MyFeature, samples=with_provenance)
Source code in src/metaxy/metadata_store/base.py
def compute_provenance(
self,
feature: CoercibleToFeatureKey,
df: FrameT,
) -> FrameT:
"""Compute provenance columns for a DataFrame with pre-joined upstream data.
!!! note
This method may be useful in very rare cases.
Rely on [`MetadataStore.resolve_update`][metaxy.metadata_store.base.MetadataStore.resolve_update] instead.
Use this method when you perform custom joins outside of Metaxy's auto-join
system but still want Metaxy to compute provenance. The method computes
metaxy_provenance_by_field, metaxy_provenance, metaxy_data_version_by_field,
and metaxy_data_version columns based on the upstream metadata.
!!! info
The input DataFrame must contain the renamed metaxy_data_version_by_field
columns from each upstream feature. The naming convention follows the pattern
`metaxy_data_version_by_field__<feature_key.to_column_suffix()>`. For example, for an
upstream feature with key `["video", "raw"]`, the column should be named
`metaxy_data_version_by_field__video_raw`.
Args:
feature: The feature to compute provenance for.
df: A DataFrame containing pre-joined upstream data with renamed
metaxy_data_version_by_field columns from each upstream feature.
Returns:
The input DataFrame with provenance columns added. Returns the same
frame type as the input, either an eager DataFrame or a LazyFrame.
Raises:
StoreNotOpenError: If the store is not open.
ValueError: If required upstream `metaxy_data_version_by_field` columns
are missing from the DataFrame.
Example:
<!-- skip next -->
```py
# Read upstream metadata
video_df = store.read(VideoFeature).collect()
audio_df = store.read(AudioFeature).collect()
# Rename data_version_by_field columns to the expected convention
video_df = video_df.rename({
"metaxy_data_version_by_field": "metaxy_data_version_by_field__video_raw"
})
audio_df = audio_df.rename({
"metaxy_data_version_by_field": "metaxy_data_version_by_field__audio_raw"
})
# Perform custom join
joined = video_df.join(audio_df, on="sample_uid", how="inner")
# Compute provenance
with_provenance = store.compute_provenance(MyFeature, joined)
# Pass to resolve_update
increment = store.resolve_update(MyFeature, samples=with_provenance)
```
"""
self._check_open()
feature_key = self._resolve_feature_key(feature)
graph = current_graph()
plan = graph.get_feature_plan(feature_key)
# Use native implementation if DataFrame matches, otherwise fall back to Polars
implementation = self.native_implementation()
if df.implementation != implementation:
implementation = nw.Implementation.POLARS
df = switch_implementation_to_polars(df) # ty: ignore[no-matching-overload]
with self.create_versioning_engine(plan=plan, implementation=implementation) as engine:
# Validate required upstream columns exist
expected_columns = {
dep.feature: engine.get_renamed_data_version_by_field_col(dep.feature)
for dep in (plan.feature_deps or [])
}
df_columns = set(df.collect_schema().names()) # ty: ignore[invalid-argument-type]
missing_columns = [
f"{col} (from upstream feature {key.to_string()})"
for key, col in expected_columns.items()
if col not in df_columns
]
if missing_columns:
raise ValueError(
f"DataFrame is missing required upstream columns for computing "
f"provenance of feature {feature_key.to_string()}. "
f"Missing columns: {missing_columns}. "
f"Make sure to rename metaxy_data_version_by_field columns from "
f"each upstream feature using the pattern "
f"metaxy_data_version_by_field__<feature_key.table_name>."
)
return engine.compute_provenance_columns(df, hash_algo=self.hash_algorithm) # ty: ignore[invalid-argument-type]
metaxy.MetadataStore.read¶
read(
feature: CoercibleToFeatureKey,
*,
feature_version: str | None = None,
filters: Sequence[Expr] | None = None,
columns: Sequence[str] | None = None,
allow_fallback: bool = True,
with_feature_history: bool = False,
with_sample_history: bool = False,
include_soft_deleted: bool = False,
with_store_info: Literal[False] = False,
) -> LazyFrame[Any]
read(
feature: CoercibleToFeatureKey,
*,
feature_version: str | None = None,
filters: Sequence[Expr] | None = None,
columns: Sequence[str] | None = None,
allow_fallback: bool = True,
with_feature_history: bool = False,
with_sample_history: bool = False,
include_soft_deleted: bool = False,
with_store_info: Literal[True],
) -> tuple[LazyFrame[Any], MetadataStore]
Read metadata with optional fallback to upstream stores.
By default, does not include:

- rows from historical feature versions (configured via `with_feature_history=False`)
- rows that have been overwritten by subsequent writes with the same feature version (configured via `with_sample_history=False`)
- soft-deleted rows with `metaxy_deleted_at` set to a non-null value (configured via `include_soft_deleted=False`)
Parameters:

- `feature` (`CoercibleToFeatureKey`) – Feature to read metadata for
- `feature_version` (`str | None`, default: `None`) – Explicit feature_version to filter by (requires `with_feature_history=True`)
- `filters` (`Sequence[Expr] | None`, default: `None`) – Sequence of Narwhals filter expressions to apply to this feature. Example: `[nw.col("x") > 10, nw.col("y") < 5]`
- `columns` (`Sequence[str] | None`, default: `None`) – Subset of columns to include. Metaxy's system columns are always included.
- `allow_fallback` (`bool`, default: `True`) – Whether to allow fallback to upstream stores if the requested feature is not found in the current store.
- `with_feature_history` (`bool`, default: `False`) – If True, include rows from all historical feature versions. By default (False), only returns rows with the currently registered feature version.
- `with_sample_history` (`bool`, default: `False`) – If True, include all historical materializations per sample. By default (False), deduplicates samples within `id_columns` groups ordered by `metaxy_created_at`.
- `include_soft_deleted` (`bool`, default: `False`) – If `True`, include soft-deleted rows in the result. Otherwise, previous historical materializations of the same feature version will be effectively removed from the output.
- `with_store_info` (`bool`, default: `False`) – If `True`, return a tuple of (LazyFrame, MetadataStore) where the MetadataStore is the store that actually contained the feature (which may be a fallback store if `allow_fallback=True`).

Returns:

- `LazyFrame[Any] | tuple[LazyFrame[Any], MetadataStore]` – Narwhals `LazyFrame` with metadata, or, if `with_store_info=True`, a tuple of (`LazyFrame`, `MetadataStore`) containing the metadata store which was actually used to retrieve the feature (may be a fallback store).

Raises:

- `FeatureNotFoundError` – If feature not found in any store
- `SystemDataNotFoundError` – When attempting to read non-existent Metaxy system data
- `ValueError` – If both `feature_version` and `with_feature_history=False` are provided
Info
When this method is called with default arguments, it will return the latest (by metaxy_created_at)
metadata for the current feature version excluding soft-deleted rows. Therefore, it's perfectly suitable
for most use cases.
Warning
The order of rows is not guaranteed.
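A usage sketch, assuming `MyFeature` is a registered feature and `store` has been opened:

```py
import narwhals as nw

with store:
    # Latest metadata for the current feature version; soft-deleted rows excluded
    df = store.read(MyFeature, filters=[nw.col("x") > 10]).collect()

    # Also learn which store actually served the read (may be a fallback store)
    lf, source_store = store.read(MyFeature, with_store_info=True)
```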
Source code in src/metaxy/metadata_store/base.py
def read(
self,
feature: CoercibleToFeatureKey,
*,
feature_version: str | None = None,
filters: Sequence[nw.Expr] | None = None,
columns: Sequence[str] | None = None,
allow_fallback: bool = True,
with_feature_history: bool = False,
with_sample_history: bool = False,
include_soft_deleted: bool = False,
with_store_info: bool = False,
) -> nw.LazyFrame[Any] | tuple[nw.LazyFrame[Any], MetadataStore]:
"""
Read metadata with optional fallback to upstream stores.
By default, does not include:
- rows from historical feature versions (configured via `with_feature_history=False`)
- rows that have been overwritten by subsequent writes with the same feature version (configured via `with_sample_history=False`)
- soft-deleted rows with `metaxy_deleted_at` set to a non-null value (configured via `include_soft_deleted=False`)
Args:
feature: Feature to read metadata for
feature_version: Explicit feature_version to filter by (requires with_feature_history=True)
filters: Sequence of Narwhals filter expressions to apply to this feature.
Example: `[nw.col("x") > 10, nw.col("y") < 5]`
columns: Subset of columns to include. Metaxy's system columns are always included.
allow_fallback: Whether to allow fallback to upstream stores if the requested feature is not found in the current store.
with_feature_history: If True, include rows from all historical feature versions.
By default (False), only returns rows with the currently registered feature version.
with_sample_history: If True, include all historical materializations per sample.
By default (False), deduplicates samples within `id_columns` groups ordered by `metaxy_created_at`.
include_soft_deleted: If `True`, include soft-deleted rows in the result. Previous historical materializations of the same feature version will be effectively removed from the output otherwise.
with_store_info: If `True`, return a tuple of (LazyFrame, MetadataStore) where
the MetadataStore is the store that actually contained the feature (which
may be a fallback store if allow_fallback=True).
Returns:
Narwhals `LazyFrame` with metadata, or, if `with_store_info=True`, a tuple of (`LazyFrame`, `MetadataStore`) containing the metadata store which has been actually used to retrieve the feature (may be a fallback store).
Raises:
FeatureNotFoundError: If feature not found in any store
SystemDataNotFoundError: When attempting to read non-existent Metaxy system data
ValueError: If both feature_version and with_feature_history=False are provided
!!! info
When this method is called with default arguments, it will return the latest (by `metaxy_created_at`)
metadata for the current feature version excluding soft-deleted rows. Therefore, it's perfectly suitable
for most use cases.
!!! warning
The order of rows is not guaranteed.
"""
import metaxy as mx
from metaxy.config import MetaxyConfig
self._check_open()
filters = filters or []
columns = columns or []
feature_key = self._resolve_feature_key(feature)
is_system_table = self._is_system_table(feature_key)
# Sync external features if auto-sync is enabled (default)
# This call is a no-op most of the time and is very lightweight when it's not
# Skip for system tables to avoid infinite recursion (sync_external_features reads system tables)
if not is_system_table and MetaxyConfig.get(_allow_default_config=True).sync:
mx.sync_external_features(self)
# If caller wants soft-deleted records, do not filter them out later
filter_deleted = not include_soft_deleted and not is_system_table
# Validate mutually exclusive parameters
if feature_version is not None and not with_feature_history:
raise ValueError(
"Cannot specify both feature_version and with_feature_history=False. "
"Use with_feature_history=True with feature_version parameter."
)
# Separate system filters (applied before dedup) from user filters (applied after dedup)
# System filters like feature_version need to be applied early to reduce data volume
# User filters should see the deduplicated view of the data
system_filters: list[nw.Expr] = []
user_filters = list(filters) if filters else []
# Add feature_version filter only when needed (this is a system filter)
if not with_feature_history or feature_version is not None and not is_system_table:
version_filter = nw.col(METAXY_FEATURE_VERSION) == (
current_graph().get_feature_version(feature_key) if not with_feature_history else feature_version
)
system_filters.append(version_filter)
# If user filters are provided, we need to read all columns since filters may
# reference columns not in the requested columns list. Column selection happens
# after filtering
if user_filters:
read_columns = None
elif columns and not is_system_table:
# Add only system columns that aren't already in the user's columns list
columns_set = set(columns)
missing_system_cols = [c for c in ALL_SYSTEM_COLUMNS if c not in columns_set]
read_columns = [*columns, *missing_system_cols]
else:
read_columns = None
lazy_frame = None
try:
# Only pass system filters to _read_feature
# User filters will be applied after deduplication
lazy_frame = self._read_feature(
feature, filters=system_filters if system_filters else None, columns=read_columns
)
except FeatureNotFoundError as e:
# do not read system features from fallback stores
if is_system_table:
raise SystemDataNotFoundError(
f"System Metaxy data with key {feature_key} is missing in {self}. Invoke `metaxy push` before attempting to read system data."
) from e
# Handle case where _read_feature returns None (no exception raised)
if lazy_frame is None and is_system_table:
raise SystemDataNotFoundError(
f"System Metaxy data with key {feature_key} is missing in {self}. Invoke `metaxy push` before attempting to read system data."
)
if lazy_frame is not None and not is_system_table:
# Deduplicate first, then filter soft-deleted rows
if not with_sample_history:
id_cols = list(self._resolve_feature_plan(feature_key).feature.id_columns)
# Treat soft-deletes like hard deletes by ordering on the
# most recent lifecycle timestamp.
lazy_frame = self.versioning_engine_cls.keep_latest_by_group(
df=lazy_frame,
group_columns=id_cols,
timestamp_columns=[METAXY_DELETED_AT, METAXY_UPDATED_AT],
)
if filter_deleted:
lazy_frame = lazy_frame.filter(nw.col(METAXY_DELETED_AT).is_null())
# Apply user filters AFTER deduplication so they see the latest version of each row
for user_filter in user_filters:
lazy_frame = lazy_frame.filter(user_filter)
# For system tables, apply user filters directly (no dedup needed)
if lazy_frame is not None and is_system_table:
for user_filter in user_filters:
lazy_frame = lazy_frame.filter(user_filter)
if lazy_frame is not None:
# After dedup and user filters, filter to requested columns if specified
if columns:
lazy_frame = lazy_frame.select(columns)
if with_store_info:
return lazy_frame, self
return lazy_frame
# Try fallback stores (opened on demand)
if allow_fallback:
for store in self.fallback_stores:
try:
# Open fallback store on demand for reading
with store:
# Use full read to handle nested fallback chains
if with_store_info:
return store.read(
feature,
feature_version=feature_version,
filters=filters,
columns=columns,
allow_fallback=True,
with_feature_history=with_feature_history,
with_sample_history=with_sample_history,
include_soft_deleted=include_soft_deleted,
with_store_info=True,
)
return store.read(
feature,
feature_version=feature_version,
filters=filters,
columns=columns,
allow_fallback=True,
with_feature_history=with_feature_history,
with_sample_history=with_sample_history,
include_soft_deleted=include_soft_deleted,
)
except FeatureNotFoundError:
# Try next fallback store
continue
# Not found anywhere
raise FeatureNotFoundError(
f"Feature {feature_key.to_string()} not found in store" + (" or fallback stores" if allow_fallback else "")
)
metaxy.MetadataStore.write¶
write(
feature: CoercibleToFeatureKey,
df: IntoFrame,
materialization_id: str | None = None,
) -> None
Write metadata for a feature (append-only by design).
Automatically adds the Metaxy system columns, unless they already exist in the DataFrame.
Parameters:

- `feature` (`CoercibleToFeatureKey`) – Feature to write metadata for
- `df` (`IntoFrame`) – Metadata DataFrame of any type supported by Narwhals. Must have a `metaxy_provenance_by_field` column of type Struct with fields matching the feature's fields. Optionally, may also contain `metaxy_data_version_by_field`.
- `materialization_id` (`str | None`, default: `None`) – Optional external orchestration ID for this write. Overrides the store's default `materialization_id` if provided. Useful for tracking which orchestration run produced this metadata.

Raises:

- `MetadataSchemaError` – If DataFrame schema is invalid
- `StoreNotOpenError` – If store is not open

Note:

- Must be called within a `MetadataStore.open(mode="w")` context manager.
- Metaxy always performs an "append" operation. Metadata is never deleted or mutated.
- Fallback stores are never used for writes.
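A minimal write sketch, assuming `MyFeature` declares a single field `part_1` and uses `sample_uid` as its ID column; the hash values are placeholders:

```py
import polars as pl

df = pl.DataFrame(
    {
        "sample_uid": ["s1", "s2"],
        "metaxy_provenance_by_field": [
            {"part_1": "h1"},
            {"part_1": "h2"},
        ],
    }
)

# Writes must happen inside a write-mode context
with store.open(mode="w"):
    store.write(MyFeature, df)
```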
Source code in src/metaxy/metadata_store/base.py
def write(
self,
feature: CoercibleToFeatureKey,
df: IntoFrame,
materialization_id: str | None = None,
) -> None:
"""
Write metadata for a feature (append-only by design).
Automatically adds the Metaxy system columns, unless they already exist in the DataFrame.
Args:
feature: Feature to write metadata for
df: Metadata DataFrame of any type supported by [Narwhals](https://narwhals-dev.github.io/narwhals/).
Must have `metaxy_provenance_by_field` column of type Struct with fields matching feature's fields.
Optionally, may also contain `metaxy_data_version_by_field`.
materialization_id: Optional external orchestration ID for this write.
Overrides the store's default `materialization_id` if provided.
Useful for tracking which orchestration run produced this metadata.
Raises:
MetadataSchemaError: If DataFrame schema is invalid
StoreNotOpenError: If store is not open
Note:
- Must be called within a `MetadataStore.open(mode="w")` context manager.
- Metaxy always performs an "append" operation. Metadata is never deleted or mutated.
- Fallback stores are never used for writes.
"""
self._check_open()
feature_key = self._resolve_feature_key(feature)
is_system_table = self._is_system_table(feature_key)
# Convert the input dataframe to a Narwhals frame
df_nw = nw.from_native(df)
assert isinstance(df_nw, (nw.DataFrame, nw.LazyFrame)), f"df must be a Narwhals DataFrame, got {type(df_nw)}"
# For system tables, write directly without feature_version tracking
if is_system_table:
self._validate_schema_system_table(df_nw)
self._write_feature(feature_key, df_nw)
return
# Use collect_schema().names() to avoid PerformanceWarning on lazy frames
if METAXY_PROVENANCE_BY_FIELD not in df_nw.collect_schema().names():
from metaxy.metadata_store.exceptions import MetadataSchemaError
raise MetadataSchemaError(f"DataFrame must have '{METAXY_PROVENANCE_BY_FIELD}' column")
# Add all required system columns
# warning: for dataframes that do not match the native MetadataStore implementation
# and are missing the METAXY_DATA_VERSION column, this call will lead to materializing the equivalent Polars DataFrame
# while calculating the missing METAXY_DATA_VERSION column
df_nw = self._add_system_columns(df_nw, feature, materialization_id=materialization_id)
self._validate_schema(df_nw)
self._write_feature(feature_key, df_nw)
metaxy.MetadataStore.write_multi¶
write_multi(
    metadata: Mapping[Any, IntoFrame],
    materialization_id: str | None = None,
) -> None
Write metadata for multiple features in reverse topological order.
Processes features so that dependents are written before their dependencies. This ordering ensures that downstream features are written first, which can be useful for certain data consistency requirements or when features need to be processed in a specific order.
Parameters:

- `metadata` (`Mapping[Any, IntoFrame]`) – Mapping from feature keys to metadata DataFrames. Keys can be any type coercible to FeatureKey (string, sequence, FeatureKey, or BaseFeature class). Values must be DataFrames compatible with Narwhals, containing required system columns.
- `materialization_id` (`str | None`, default: `None`) – Optional external orchestration ID for all writes. Overrides the store's default `materialization_id` if provided. Applied to all feature writes in this batch.

Raises:

- `MetadataSchemaError` – If any DataFrame schema is invalid
- `StoreNotOpenError` – If store is not open
- `ValueError` – If writing to a feature from a different project than expected

Note:

- Must be called within a `MetadataStore.open(mode="w")` context manager.
- Empty mappings are handled gracefully (no-op).
- Each feature's metadata is written via `write`, so all validation and system column handling from that method applies.
Example
Source code in src/metaxy/metadata_store/base.py
def write_multi(
self,
metadata: Mapping[Any, IntoFrame],
materialization_id: str | None = None,
) -> None:
"""
Write metadata for multiple features in reverse topological order.
Processes features so that dependents are written before their dependencies.
This ordering ensures that downstream features are written first, which can
be useful for certain data consistency requirements or when features need
to be processed in a specific order.
Args:
metadata: Mapping from feature keys to metadata DataFrames.
Keys can be any type coercible to FeatureKey (string, sequence,
FeatureKey, or BaseFeature class). Values must be DataFrames
compatible with Narwhals, containing required system columns.
materialization_id: Optional external orchestration ID for all writes.
Overrides the store's default `materialization_id` if provided.
Applied to all feature writes in this batch.
Raises:
MetadataSchemaError: If any DataFrame schema is invalid
StoreNotOpenError: If store is not open
ValueError: If writing to a feature from a different project than expected
Note:
- Must be called within a `MetadataStore.open(mode="w")` context manager.
- Empty mappings are handled gracefully (no-op).
- Each feature's metadata is written via `write`, so all
validation and system column handling from that method applies.
Example:
<!-- skip next -->
```py
with store.open(mode="w"):
store.write_multi(
{
ChildFeature: child_df,
ParentFeature: parent_df,
}
)
# Features are written in reverse topological order:
# ChildFeature first, then ParentFeature
```
"""
if not metadata:
return
# Build mapping from resolved keys to dataframes in one pass
resolved_metadata = {self._resolve_feature_key(key): df for key, df in metadata.items()}
# Get reverse topological order (dependents first)
graph = current_graph()
sorted_keys = graph.topological_sort_features(list(resolved_metadata.keys()), descending=True)
# Write metadata in reverse topological order
for feature_key in sorted_keys:
self.write(
feature_key,
resolved_metadata[feature_key],
materialization_id=materialization_id,
)
metaxy.MetadataStore.config_model abstractmethod classmethod¶
config_model() -> type[MetadataStoreConfig]
Return the configuration model class for this store type.
Subclasses must override this to return their specific config class.
Returns:
-
type[MetadataStoreConfig]βThe config class type (e.g., DuckDBMetadataStoreConfig)
Note
Subclasses override this with a more specific return type. Type checkers may show a warning about incompatible override, but this is intentional - each store returns its own config type.
Source code in src/metaxy/metadata_store/base.py
@classmethod
@abstractmethod
def config_model(cls) -> type[MetadataStoreConfig]:
"""Return the configuration model class for this store type.
Subclasses must override this to return their specific config class.
Returns:
The config class type (e.g., DuckDBMetadataStoreConfig)
Note:
Subclasses override this with a more specific return type.
Type checkers may show a warning about incompatible override,
but this is intentional - each store returns its own config type.
"""
...
metaxy.MetadataStore.from_config classmethod¶
from_config(
config: MetadataStoreConfig, **kwargs: Any
) -> Self
Create a store instance from a configuration object.
This method creates a store by:

1. Converting the config to a dict
2. Resolving fallback store names to actual store instances
3. Calling the store's __init__ with the config parameters
Parameters:

- `config` (`MetadataStoreConfig`) – Configuration object (should be the type returned by `config_model()`)
- `**kwargs` (`Any`, default: `{}`) – Additional arguments passed directly to the store constructor (e.g., `materialization_id` for runtime parameters not in config)

Returns:

- `Self` – A new store instance configured according to the config object
Example
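The example from the method's docstring:

```python
from metaxy.ext.metadata_stores.duckdb import (
    DuckDBMetadataStore,
    DuckDBMetadataStoreConfig,
)

config = DuckDBMetadataStoreConfig(
    database="metadata.db",
    fallback_stores=["prod"],
)
store = DuckDBMetadataStore.from_config(config)
```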
Source code in src/metaxy/metadata_store/base.py
@classmethod
def from_config(cls, config: MetadataStoreConfig, **kwargs: Any) -> Self:
"""Create a store instance from a configuration object.
This method creates a store by:
1. Converting the config to a dict
2. Resolving fallback store names to actual store instances
3. Calling the store's __init__ with the config parameters
Args:
config: Configuration object (should be the type returned by config_model())
**kwargs: Additional arguments passed directly to the store constructor
(e.g., materialization_id for runtime parameters not in config)
Returns:
A new store instance configured according to the config object
Example:
<!-- skip next -->
```python
from metaxy.ext.metadata_stores.duckdb import (
DuckDBMetadataStore,
DuckDBMetadataStoreConfig,
)
config = DuckDBMetadataStoreConfig(
database="metadata.db",
fallback_stores=["prod"],
)
store = DuckDBMetadataStore.from_config(config)
```
"""
from metaxy.metadata_store.fallback import FallbackStoreList
config_dict = config.model_dump(exclude_unset=True)
fallback_store_names = config_dict.pop("fallback_stores", [])
store = cls(**config_dict, **kwargs)
if fallback_store_names:
store.fallback_stores = FallbackStoreList(
fallback_store_names,
config=MetaxyConfig.get(),
parent_hash_algorithm=store.hash_algorithm,
)
return store
metaxy.MetadataStore.native_implementation¶
native_implementation() -> Implementation
metaxy.MetadataStore.create_versioning_engine¶
create_versioning_engine(
plan: FeaturePlan, implementation: Implementation
) -> Iterator[VersioningEngine | PolarsVersioningEngine]
Creates an appropriate provenance engine.
Falls back to Polars implementation if the required implementation differs from the store's native implementation.
Parameters:

- `plan` (`FeaturePlan`) – The feature plan.
- `implementation` (`Implementation`) – The desired engine implementation.

Returns:

- `Iterator[VersioningEngine | PolarsVersioningEngine]` – An appropriate provenance engine.
Source code in src/metaxy/metadata_store/base.py
@contextmanager
def create_versioning_engine(
self, plan: FeaturePlan, implementation: nw.Implementation
) -> Iterator[VersioningEngine | PolarsVersioningEngine]:
"""
Creates an appropriate provenance engine.
Falls back to Polars implementation if the required implementation differs from the store's native implementation.
Args:
plan: The feature plan.
implementation: The desired engine implementation.
Returns:
An appropriate provenance engine.
"""
if implementation == nw.Implementation.POLARS:
cm = self._create_polars_versioning_engine(plan)
elif implementation == self.native_implementation():
cm = self._create_versioning_engine(plan)
else:
cm = self._create_polars_versioning_engine(plan)
with cm as engine:
yield engine
metaxy.MetadataStore.open abstractmethod¶
open(mode: AccessMode = 'r') -> Iterator[Self]
Open/initialize the store for operations.
Context manager that opens the store with specified access mode.
Called internally by __enter__.
Child classes should implement backend-specific connection setup/teardown here.
Parameters:

- `mode` (`AccessMode`, default: `'r'`) – Access mode for this connection session.

Yields:

- `Self` (`Self`) – The store instance with connection open
Note
Users should prefer the `with store:` pattern except when write access mode is needed.
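A sketch of both patterns, assuming `MyFeature` and a dataframe `df` prepared for `write`:

```py
# Read-only session: __enter__ opens in read mode by default
# (write mode if auto_create_tables is enabled)
with store:
    lf = store.read(MyFeature)

# Explicit write session
with store.open(mode="w"):
    store.write(MyFeature, df)
```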
Source code in src/metaxy/metadata_store/base.py
@abstractmethod
@contextmanager
def open(self, mode: AccessMode = "r") -> Iterator[Self]:
"""Open/initialize the store for operations.
Context manager that opens the store with specified access mode.
Called internally by `__enter__`.
Child classes should implement backend-specific connection setup/teardown here.
Args:
mode: Access mode for this connection session.
Yields:
Self: The store instance with connection open
Note:
Users should prefer using `with store:` pattern except when write access mode is needed.
"""
...
metaxy.MetadataStore.__enter__¶
__enter__() -> Self
Enter context manager - opens store in READ mode by default.
Use MetadataStore.open for write access mode instead.
Returns:

- `Self` (`Self`) – The opened store instance
Source code in src/metaxy/metadata_store/base.py
def __enter__(self) -> Self:
"""Enter context manager - opens store in READ mode by default.
Use [`MetadataStore.open`][metaxy.metadata_store.base.MetadataStore.open] for write access mode instead.
Returns:
Self: The opened store instance
"""
# Determine mode based on auto_create_tables
mode = "w" if self.auto_create_tables else "r"
# Open the store (open() manages _context_depth internally)
self._open_cm = self.open(mode) # ty: ignore[invalid-assignment]
self._open_cm.__enter__() # ty: ignore[possibly-missing-attribute]
return self
metaxy.MetadataStore.validate_hash_algorithm¶
validate_hash_algorithm(
check_fallback_stores: bool = True,
) -> None
Validate that hash algorithm is supported by this store's components.
Public method - can be called to verify hash compatibility.
Parameters:

- `check_fallback_stores` (`bool`, default: `True`) – If True, also validate hash is supported by fallback stores (ensures compatibility for future cross-store operations)

Raises:

- `ValueError` – If hash algorithm not supported by components or fallback stores
Source code in src/metaxy/metadata_store/base.py
def validate_hash_algorithm(
self,
check_fallback_stores: bool = True,
) -> None:
"""Validate that hash algorithm is supported by this store's components.
Public method - can be called to verify hash compatibility.
Args:
check_fallback_stores: If True, also validate hash is supported by
fallback stores (ensures compatibility for future cross-store operations)
Raises:
ValueError: If hash algorithm not supported by components or fallback stores
"""
# Validate hash algorithm support without creating a full engine
# (engine creation requires a graph which isn't available during store init)
self._validate_hash_algorithm_support()
# Check fallback stores
if check_fallback_stores:
for fallback in self.fallback_stores:
fallback.validate_hash_algorithm(check_fallback_stores=False)
metaxy.MetadataStore.drop_feature_metadata¶
drop_feature_metadata(
feature: CoercibleToFeatureKey,
) -> None
Drop all metadata for a feature.
This removes all stored metadata for the specified feature from the store. Useful for cleanup in tests or when re-computing feature metadata from scratch.
Warning
This operation is irreversible and will permanently delete all metadata for the specified feature.
Parameters:

- `feature` (`CoercibleToFeatureKey`) – Feature class or key to drop metadata for
Example
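The example from the method's docstring:

```py
with store_with_data.open(mode="w"):
    assert store_with_data.has_feature(MyFeature)
    store_with_data.drop_feature_metadata(MyFeature)
```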
Source code in src/metaxy/metadata_store/base.py
def drop_feature_metadata(self, feature: CoercibleToFeatureKey) -> None:
"""Drop all metadata for a feature.
This removes all stored metadata for the specified feature from the store.
Useful for cleanup in tests or when re-computing feature metadata from scratch.
Warning:
This operation is irreversible and will **permanently delete all metadata** for the specified feature.
Args:
feature: Feature class or key to drop metadata for
Example:
```py
with store_with_data.open(mode="w"):
assert store_with_data.has_feature(MyFeature)
store_with_data.drop_feature_metadata(MyFeature)
```
"""
self._check_open()
feature_key = self._resolve_feature_key(feature)
if self._is_system_table(feature_key):
raise NotImplementedError(f"{self.__class__.__name__} does not support deletes for system tables")
self._drop_feature(feature_key)
metaxy.MetadataStore.delete¶
delete(
feature: CoercibleToFeatureKey,
filters: Sequence[Expr] | Expr | None,
*,
soft: bool = True,
with_feature_history: bool = False,
with_sample_history: bool = False,
) -> None
Delete records matching provided filters.
Performs a soft delete by default. This is achieved by setting `metaxy_deleted_at` to the current timestamp. Subsequent `MetadataStore.read` calls would ignore these records by default.
Parameters:

- `feature` (`CoercibleToFeatureKey`) – Feature to delete from.
- `filters` (`Sequence[Expr] | Expr | None`) – One or more Narwhals expressions, or a sequence of expressions, that determine which records to delete. If `None`, deletes all records (subject to `with_feature_history` and `with_sample_history` constraints).
- `soft` (`bool`, default: `True`) – Whether to perform a soft delete.
- `with_feature_history` (`bool`, default: `False`) – If True, delete across all historical feature versions. If False (default), only current version.
- `with_sample_history` (`bool`, default: `False`) – If True, include all historical materializations. If False (default), deduplicate to latest rows.
Critical
By default, deletions target historical records. Even when with_feature_history is False,
records with the same feature version but an older metaxy_created_at would be targeted as
well. Consider adding additional conditions to filters if you want to avoid that.
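A deletion sketch, assuming `MyFeature` uses `sample_uid` as an ID column:

```py
import narwhals as nw

with store.open(mode="w"):
    # Soft delete (default): stamps metaxy_deleted_at, keeps the rows
    store.delete(MyFeature, nw.col("sample_uid") == "s1")

    # Hard delete: physically removes matching rows from the backend
    store.delete(MyFeature, nw.col("sample_uid") == "s2", soft=False)
```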
Source code in src/metaxy/metadata_store/base.py
def delete(
self,
feature: CoercibleToFeatureKey,
filters: Sequence[nw.Expr] | nw.Expr | None,
*,
soft: bool = True,
with_feature_history: bool = False,
with_sample_history: bool = False,
) -> None:
"""Delete records matching provided filters.
Performs a soft delete by default. This is achieved by setting metaxy_deleted_at to the current timestamp.
Subsequent `MetadataStore.read` calls would ignore these records by default.
Args:
feature: Feature to delete from.
filters: One or more Narwhals expressions or a sequence of expressions that determine which records to delete.
If `None`, deletes all records (subject to `with_feature_history` and `with_sample_history` constraints).
soft: Whether to perform a soft delete.
with_feature_history: If True, delete across all historical feature versions. If False (default), only current version.
with_sample_history: If True, include all historical materializations. If False (default), deduplicate to latest rows.
!!! critical
By default, deletions target historical records. Even when `with_feature_history` is `False`,
records with the same feature version but an older `metaxy_created_at` would be targeted as
well. Consider adding additional conditions to `filters` if you want to avoid that.
"""
self._check_open()
feature_key = self._resolve_feature_key(feature)
# Normalize filters to list
if filters is None:
filter_list: list[nw.Expr] = []
elif isinstance(filters, nw.Expr):
filter_list = [filters]
else:
filter_list = list(filters)
if soft:
# Soft delete: mark records with deletion timestamp, preserving original updated_at
lazy = self.read(
feature_key,
filters=filter_list,
include_soft_deleted=False,
with_feature_history=with_feature_history,
with_sample_history=with_sample_history,
allow_fallback=True,
)
with self._shared_transaction_timestamp(soft_delete=True) as ts:
soft_deletion_marked = lazy.with_columns(
nw.lit(ts).alias(METAXY_DELETED_AT),
)
self.write(feature_key, soft_deletion_marked.to_native())
else:
# Hard delete: add version filter if needed, then delegate to backend
if not with_feature_history and not self._is_system_table(feature_key):
version_filter = nw.col(METAXY_FEATURE_VERSION) == current_graph().get_feature_version(feature_key)
filter_list = [version_filter, *filter_list]
self._delete_feature(feature_key, filter_list, with_feature_history=with_feature_history)
metaxy.MetadataStore.read_feature_schema_from_store
¶
read_feature_schema_from_store(
    feature: CoercibleToFeatureKey,
) -> Schema
Read the schema for a feature from the store.
Parameters:

- feature (CoercibleToFeatureKey) – Feature to read schema for

Returns:

- Schema – Narwhals schema for the feature

Raises:

- FeatureNotFoundError – If the feature is not found in the store
Source code in src/metaxy/metadata_store/base.py
def read_feature_schema_from_store(
    self,
    feature: CoercibleToFeatureKey,
) -> nw.Schema:
    """Read the schema for a feature from the store.

    Args:
        feature: Feature to read schema for

    Returns:
        Narwhals schema for the feature

    Raises:
        FeatureNotFoundError: If feature not found in the store
    """
    lazy = self.read(
        feature,
        allow_fallback=False,
    )
    return lazy.collect_schema()
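A short usage sketch, assuming `MyFeature` is a registered feature class (a hypothetical name):

```py
with store:
    schema = store.read_feature_schema_from_store(MyFeature)
    print(schema.names())  # column names as recorded in the store
```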
metaxy.MetadataStore.has_feature
¶
has_feature(
    feature: CoercibleToFeatureKey,
    *,
    check_fallback: bool = False,
) -> bool
Check if feature exists in store.
Parameters:

- feature (CoercibleToFeatureKey) – Feature to check
- check_fallback (bool, default: False) – If True, also check fallback stores

Returns:

- bool – True if the feature exists, False otherwise
Source code in src/metaxy/metadata_store/base.py
def has_feature(
    self,
    feature: CoercibleToFeatureKey,
    *,
    check_fallback: bool = False,
) -> bool:
    """
    Check if feature exists in store.

    Args:
        feature: Feature to check
        check_fallback: If True, also check fallback stores

    Returns:
        True if feature exists, False otherwise
    """
    self._check_open()
    if self._read_feature(feature) is not None:
        return True
    # Check fallback stores
    if not check_fallback:
        return self._has_feature_impl(feature)
    else:
        for store in self.fallback_stores:
            if store.has_feature(feature, check_fallback=True):
                return True
        return False
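For illustration, a minimal sketch (again with a hypothetical registered feature class `MyFeature`):

```py
with store:
    # Checks this store only
    local = store.has_feature(MyFeature)
    # Also consults the configured fallback stores
    anywhere = store.has_feature(MyFeature, check_fallback=True)
```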
metaxy.MetadataStore.display
abstractmethod
¶
display() -> str
Return a human-readable display string for this store.
Used in warnings, logs, and CLI output to identify the store.
Returns:

- str – Display string (e.g., "DuckDBMetadataStore(database=/path/to/db.duckdb)")
Source code in src/metaxy/metadata_store/base.py

@abstractmethod
def display(self) -> str:
    """Return a human-readable display string for this store.

    Used in warnings, logs, and CLI output to identify the store.

    Returns:
        Display string (e.g., "DuckDBMetadataStore(database=/path/to/db.duckdb)")
    """
    ...
metaxy.MetadataStore.find_store_for_feature
¶
find_store_for_feature(
    feature_key: CoercibleToFeatureKey,
    *,
    check_fallback: bool = True,
) -> MetadataStore | None
Find the store that contains the given feature.
Parameters:

- feature_key (CoercibleToFeatureKey) – Feature to find
- check_fallback (bool, default: True) – Whether to check fallback stores when the feature is not found in the current store

Returns:

- MetadataStore | None – The store containing the feature, or None if not found
Source code in src/metaxy/metadata_store/base.py
def find_store_for_feature(
    self,
    feature_key: CoercibleToFeatureKey,
    *,
    check_fallback: bool = True,
) -> MetadataStore | None:
    """Find the store that contains the given feature.

    Args:
        feature_key: Feature to find
        check_fallback: Whether to check fallback stores when the feature
            is not found in the current store

    Returns:
        The store containing the feature, or None if not found
    """
    self._check_open()
    # Check if feature exists in this store
    if self.has_feature(feature_key):
        return self
    # Try fallback stores if enabled (opened on demand)
    if check_fallback:
        for store in self.fallback_stores:
            with store:
                found = store.find_store_for_feature(feature_key, check_fallback=True)
                if found is not None:
                    return found
    return None
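A brief sketch of resolving which store actually holds a feature (`MyFeature` is again hypothetical):

```py
with store:
    owner = store.find_store_for_feature(MyFeature)
    if owner is not None:
        print(f"found in {owner.display()}")
```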
metaxy.MetadataStore.get_store_metadata
¶
get_store_metadata(
    feature_key: CoercibleToFeatureKey,
    *,
    check_fallback: bool = True,
) -> dict[str, Any]
Return arbitrary key-value pairs with useful metadata for logging purposes (such as a path in storage).
This method should not expose sensitive information.
Parameters:

- feature_key (CoercibleToFeatureKey) – Feature to get metadata for
- check_fallback (bool, default: True) – Whether to check fallback stores when the feature is not found in the current store

Returns:

- dict[str, Any] – Dictionary with store-specific metadata. Contains:
    - `name`, `display`: the queried store (self)
    - `resolved_from`: where the feature was actually found (may be a fallback store); includes `name`, `display`, and store-specific fields like `table_name` or `uri`
Source code in src/metaxy/metadata_store/base.py
def get_store_metadata(
    self,
    feature_key: CoercibleToFeatureKey,
    *,
    check_fallback: bool = True,
) -> dict[str, Any]:
    """Arbitrary key-value pairs with useful metadata for logging purposes (like a path in storage).

    This method should not expose sensitive information.

    Args:
        feature_key: Feature to get metadata for
        check_fallback: Whether to check fallback stores when the feature
            is not found in the current store

    Returns:
        Dictionary with store-specific metadata. Contains:

        - `name`, `display`: The queried store (self)
        - `resolved_from`: Where the feature was actually found (may be a fallback store),
          includes `name`, `display`, and store-specific fields like `table_name` or `uri`
    """
    resolved_store = self.find_store_for_feature(feature_key, check_fallback=check_fallback)
    result: dict[str, Any] = {
        "name": self.name,
        "display": self.display(),
    }
    if resolved_store is not None:
        result["resolved_from"] = resolved_store.get_store_info(feature_key)
    return result
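A usage sketch, handy when logging where metadata was resolved from (`MyFeature` is hypothetical):

```py
with store:
    info = store.get_store_metadata(MyFeature)
    print(info["name"], info["display"])
    # "resolved_from" is only present when the feature was found somewhere
    resolved = info.get("resolved_from")
    if resolved is not None:
        print(resolved["display"])
```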
metaxy.MetadataStore.get_store_info
¶
get_store_info(
    feature_key: CoercibleToFeatureKey,
) -> dict[str, Any]
Build a dictionary with store identification and feature-specific metadata.
Parameters:

- feature_key (CoercibleToFeatureKey) – Feature to get metadata for

Returns:

- dict[str, Any] – Dictionary containing:
    - `name`: the store name
    - `type`: the fully qualified class name (module.classname)
    - `display`: human-readable store description
    - store-specific metadata from `_get_store_metadata_impl` (e.g., `table_name`, `uri`)
Source code in src/metaxy/metadata_store/base.py
def get_store_info(self, feature_key: CoercibleToFeatureKey) -> dict[str, Any]:
    """Build a dictionary with store identification and feature-specific metadata.

    Args:
        feature_key: Feature to get metadata for

    Returns:
        Dictionary containing:

        - `name`: The store name
        - `type`: The fully qualified class name (module.classname)
        - `display`: Human-readable store description
        - Store-specific metadata from `_get_store_metadata_impl` (e.g., `table_name`, `uri`)
    """
    return {
        "name": self.name,
        "type": self.qualified_class_name,
        "display": self.display(),
        **self._get_store_metadata_impl(feature_key),
    }
metaxy.MetadataStore.calculate_input_progress
¶
calculate_input_progress(
    lazy_increment: LazyIncrement,
    feature_key: CoercibleToFeatureKey,
) -> float | None
Calculate the progress percentage from a lazy increment.

Uses the `input` field of the LazyIncrement to count total input units
and compares it with `added` to determine how many are missing.

Progress represents the percentage of input units that have been processed
at least once. Stale samples (in `changed`) are counted as processed since
they have existing metadata, even though they may need re-processing due to
upstream changes.
Parameters:

- lazy_increment (LazyIncrement) – The lazy increment containing the input and added dataframes.
- feature_key (CoercibleToFeatureKey) – The feature key to look up lineage information.

Returns:

- float | None – Progress percentage (0-100), or None if input is not available.
Source code in src/metaxy/metadata_store/base.py
def calculate_input_progress(
    self,
    lazy_increment: LazyIncrement,
    feature_key: CoercibleToFeatureKey,
) -> float | None:
    """Calculate progress percentage from lazy increment.

    Uses the `input` field from LazyIncrement to count total input units
    and compares with `added` to determine how many are missing.

    Progress represents the percentage of input units that have been processed
    at least once. Stale samples (in `changed`) are counted as processed since
    they have existing metadata, even though they may need re-processing due to
    upstream changes.

    Args:
        lazy_increment: The lazy increment containing input and added dataframes.
        feature_key: The feature key to look up lineage information.

    Returns:
        Progress percentage (0-100), or None if input is not available.
    """
    if lazy_increment.input is None:
        return None

    key = self._resolve_feature_key(feature_key)
    graph = current_graph()
    plan = graph.get_feature_plan(key)

    # Get the columns that define input units from the feature plan
    input_id_columns = plan.input_id_columns

    # Count distinct input units using two separate queries
    # We can't use concat because input and added may have different schemas
    # (e.g., nullable vs non-nullable columns)
    total_units: int = lazy_increment.input.select(input_id_columns).unique().select(nw.len()).collect().item()
    if total_units == 0:
        return None  # No input available from upstream

    missing_units: int = lazy_increment.new.select(input_id_columns).unique().select(nw.len()).collect().item()
    processed_units = total_units - missing_units
    return (processed_units / total_units) * 100
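As a sketch of how this might feed progress logging; how `lazy_increment` is obtained is pipeline-specific, and the variable names here are hypothetical:

```py
# lazy_increment is assumed to come from an earlier incremental-resolution step
progress = store.calculate_input_progress(lazy_increment, MyFeature)
if progress is not None:
    print(f"{progress:.1f}% of input units processed at least once")
```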
metaxy.MetadataStore.copy_metadata
¶
copy_metadata(
    from_store: MetadataStore,
    features: Sequence[CoercibleToFeatureKey] | None = None,
    *,
    filters: Mapping[str, Sequence[Expr]] | None = None,
    global_filters: Sequence[Expr] | None = None,
    with_feature_history: bool = True,
    with_sample_history: bool = False,
) -> dict[str, int]
Copy metadata from another store.
Parameters:

- from_store (MetadataStore) – Source metadata store to copy from (must be opened for reading)
- features (Sequence[CoercibleToFeatureKey] | None, default: None) – Features to copy. Can be:
    - `None`: copies all features from the active graph
    - a sequence of specific features to copy
- filters (Mapping[str, Sequence[Expr]] | None, default: None) – Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions. These filters are applied when reading from the source store. Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}
- global_filters (Sequence[Expr] | None, default: None) – Sequence of Narwhals filter expressions applied to all features. These filters are combined with any feature-specific filters from `filters`. Example: [nw.col("sample_uid").is_in(["s1", "s2"])]
- with_feature_history (bool, default: True) – If True (default), include rows from all historical feature versions. If False, only copy rows with the current feature_version.
- with_sample_history (bool, default: False) – If True, include all historical materializations per sample. If False (default), deduplicate within `id_columns` groups by keeping the latest.

Returns:

- dict[str, int] – Dict with statistics: {"features_copied": int, "rows_copied": int}

Raises:

- ValueError – If the source or destination store is not open
- FeatureNotFoundError – If a specified feature doesn't exist in the source store
Examples:

```py
# Copy all features
with source_store, dest_store.open("w"):
    stats = dest_store.copy_metadata(from_store=source_store)
```

```py
# Copy specific features
with source_store, dest_store.open("w"):
    stats = dest_store.copy_metadata(
        from_store=source_store,
        features=[mx.FeatureKey("my_feature")],
    )
```

```py
# Copy with global filters applied to all features
with source_store, dest_store.open("w"):
    stats = dest_store.copy_metadata(
        from_store=source_store,
        global_filters=[nw.col("id").is_in(["a", "b"])],
    )
```

```py
# Copy specific features with per-feature filters
with source_store, dest_store.open("w"):
    stats = dest_store.copy_metadata(
        from_store=source_store,
        features=[
            mx.FeatureKey("feature_a"),
            mx.FeatureKey("feature_b"),
        ],
        filters={
            "feature_a": [nw.col("field_a") > 10],
            "feature_b": [nw.col("field_b") < 30],
        },
    )
```
Source code in src/metaxy/metadata_store/base.py
def copy_metadata(
    self,
    from_store: MetadataStore,
    features: Sequence[CoercibleToFeatureKey] | None = None,
    *,
    filters: Mapping[str, Sequence[nw.Expr]] | None = None,
    global_filters: Sequence[nw.Expr] | None = None,
    with_feature_history: bool = True,
    with_sample_history: bool = False,
) -> dict[str, int]:
    """Copy metadata from another store.

    Args:
        from_store: Source metadata store to copy from (must be opened for reading)
        features: Features to copy. Can be:

            - `None`: copies all features from the active graph
            - Sequence of specific features to copy
        filters: Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions.
            These filters are applied when reading from the source store.
            Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}
        global_filters: Sequence of Narwhals filter expressions applied to all features.
            These filters are combined with any feature-specific filters from `filters`.
            Example: [nw.col("sample_uid").is_in(["s1", "s2"])]
        with_feature_history: If True (default), include rows from all historical feature versions.
            If False, only copy rows with the current feature_version.
        with_sample_history: If True, include all historical materializations per sample.
            If False (default), deduplicate within `id_columns` groups by keeping latest.

    Returns:
        Dict with statistics: {"features_copied": int, "rows_copied": int}

    Raises:
        ValueError: If source or destination store is not open
        FeatureNotFoundError: If a specified feature doesn't exist in source store

    Examples:
        <!-- skip next -->
        ```py
        # Copy all features
        with source_store, dest_store.open("w"):
            stats = dest_store.copy_metadata(from_store=source_store)
        ```

        <!-- skip next -->
        ```py
        # Copy specific features
        with source_store, dest_store.open("w"):
            stats = dest_store.copy_metadata(
                from_store=source_store,
                features=[mx.FeatureKey("my_feature")],
            )
        ```

        <!-- skip next -->
        ```py
        # Copy with global filters applied to all features
        with source_store, dest_store.open("w"):
            stats = dest_store.copy_metadata(
                from_store=source_store,
                global_filters=[nw.col("id").is_in(["a", "b"])],
            )
        ```

        <!-- skip next -->
        ```py
        # Copy specific features with per-feature filters
        with source_store, dest_store.open("w"):
            stats = dest_store.copy_metadata(
                from_store=source_store,
                features=[
                    mx.FeatureKey("feature_a"),
                    mx.FeatureKey("feature_b"),
                ],
                filters={
                    "feature_a": [nw.col("field_a") > 10],
                    "feature_b": [nw.col("field_b") < 30],
                },
            )
        ```
    """
    import logging

    logger = logging.getLogger(__name__)

    # Validate both stores are open
    if not self._is_open:
        raise ValueError('Destination store must be opened with store.open("w") before use')
    if not from_store._is_open:
        raise ValueError("Source store must be opened before use")

    return self._copy_metadata_impl(
        from_store=from_store,
        features=features,
        filters=filters,
        global_filters=global_filters,
        with_feature_history=with_feature_history,
        with_sample_history=with_sample_history,
        logger=logger,
    )
metaxy.metadata_store.base.VersioningEngineOptions
module-attribute
¶

VersioningEngineOptions = Literal["auto", "native", "polars"]

Type alias for the versioning engine options accepted by MetadataStore: 'auto' (prefer native), 'native', or 'polars'.
Base Configuration Class¶
Concrete metadata stores typically derive their configuration from the following base class:
metaxy.metadata_store.base.MetadataStoreConfig
¶
Bases: BaseSettings
Base configuration class for metadata stores.
This class defines common configuration fields shared by all metadata store types. Store-specific config classes should inherit from this and add their own fields.
Example
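A minimal sketch of a store-specific config inheriting the shared fields; the class name and `database` field are hypothetical:

```py
from metaxy.metadata_store.base import MetadataStoreConfig

class MyStoreConfig(MetadataStoreConfig):
    # Hypothetical store-specific field, added on top of the shared
    # options (fallback_stores, hash_algorithm, versioning_engine).
    database: str = "metadata.duckdb"
```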
Configuration¶
The base MetadataStoreConfig class provides the following configuration options:
fallback_stores¶
List of fallback store names to search when features are not found in the current store.
Type: list[str]
hash_algorithm¶
Hash algorithm for versioning. If None, uses store's default.
Type: metaxy.versioning.types.HashAlgorithm | None
versioning_engine¶
Which versioning engine to use: 'auto' (prefer native), 'native', or 'polars'.
Type: Literal['auto', 'native', 'polars']
Default: "auto"
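To make the option concrete, here is a sketch of pinning the engine at construction time; `DuckDBMetadataStore`, its import path, and its `database` argument are used illustratively:

```py
from metaxy.metadata_store.duckdb import DuckDBMetadataStore  # illustrative import path

# Force the Polars engine regardless of the store's native capabilities.
store = DuckDBMetadataStore(
    database="/path/to/db.duckdb",
    versioning_engine="polars",
)
```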