
Metadata Stores

Metaxy abstracts interactions with metadata behind an interface called MetadataStore.

Users can extend this class to implement support for arbitrary metadata storage such as databases, lakehouse formats, or really any kind of external system.

Learn how to use metadata stores here.
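
Example

A minimal usage sketch, assuming the built-in DuckDB store (shown later on this page) and a registered feature class MyFeature with a metadata dataframe df already in hand:

from metaxy.ext.metadata_stores.duckdb import DuckDBMetadataStore

store = DuckDBMetadataStore(database="metadata.db")

# Writing requires explicitly opening the store in write mode
with store.open(mode="w"):
    store.write(MyFeature, df)

# Reading works with the default (read-only) context manager
with store:
    metadata = store.read(MyFeature).collect()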

Here are some of the built-in metadata store types; the full list can be found here:

  • Databases

  • Storage


Metadata Store Interface

metaxy.MetadataStore

MetadataStore(
    *,
    name: str | None = None,
    hash_algorithm: HashAlgorithm | None = None,
    versioning_engine: VersioningEngineOptions = "auto",
    fallback_stores: Sequence[MetadataStore | str]
    | None = None,
    auto_create_tables: bool | None = None,
    materialization_id: str | None = None,
)

Bases: ABC

Abstract base class for metadata storage backends.

Parameters:

  • name (str | None, default: None ) –

    Optional name for this store, used for representation purposes only. It is typically included in the .display() output.

  • hash_algorithm (HashAlgorithm | None, default: None ) –

    Hash algorithm to use for the versioning engine.

  • versioning_engine (VersioningEngineOptions, default: 'auto' ) –

    Which versioning engine to use.

    • "auto": Prefer the store's native engine and fall back to Polars if needed

    • "native": Always use the store's native engine, raise VersioningEngineMismatchError if at some point the metadata store has to switch to Polars

    • "polars": Always use the Polars engine

  • fallback_stores (Sequence[MetadataStore | str] | None, default: None ) –

    Ordered list of read-only fallback stores. Used when upstream features are not in this store. VersioningEngineMismatchError is not raised when reading from fallback stores.

  • auto_create_tables (bool | None, default: None ) –

    Whether to automatically create tables when writing new features. If not provided, the global Metaxy configuration auto_create_tables option will be used (which can be set via METAXY_AUTO_CREATE_TABLES env var).

    Warning

    This is intended for development/testing purposes. Use proper database migration tools like Alembic to manage table infrastructure in production.

  • materialization_id (str | None, default: None ) –

    Optional external orchestration ID. If provided, all metadata writes will include this ID in the metaxy_materialization_id column. Can be overridden per MetadataStore.write call.

Raises:

  • ValueError –

    If fallback stores use different hash algorithms or truncation lengths

  • VersioningEngineMismatchError –

    If an attempt is made to switch the versioning engine to Polars while versioning_engine is set to native.
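
For illustration, here is a store configured with the parameters above. The DuckDB store class and the "prod" fallback name are assumptions; the keyword arguments are the ones documented here:

from metaxy.ext.metadata_stores.duckdb import DuckDBMetadataStore

store = DuckDBMetadataStore(
    database="metadata.db",  # backend-specific argument
    name="dev",  # representation only, shown by .display()
    versioning_engine="auto",  # prefer the native engine, fall back to Polars
    fallback_stores=["prod"],  # ordered, read-only fallbacks resolved by name
    auto_create_tables=True,  # development/testing only
    materialization_id="run-2024-001",  # stamped into metaxy_materialization_id
)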

Source code in src/metaxy/metadata_store/base.py
def __init__(
    self,
    *,
    name: str | None = None,
    hash_algorithm: HashAlgorithm | None = None,
    versioning_engine: VersioningEngineOptions = "auto",
    fallback_stores: Sequence[MetadataStore | str] | None = None,
    auto_create_tables: bool | None = None,
    materialization_id: str | None = None,
):
    """
    Initialize the metadata store.

    Args:
        name: Optional name for this store. For representation purposes only.
            It is typically included in `.display()` output.
        hash_algorithm: Hash algorithm to use for the versioning engine.
        versioning_engine: Which versioning engine to use.

            - "auto": Prefer the store's native engine and fall back to Polars if needed

            - "native": Always use the store's native engine, raise `VersioningEngineMismatchError`
                if at some point the metadata store has to switch to Polars

            - "polars": Always use the Polars engine

        fallback_stores: Ordered list of read-only fallback stores.
            Used when upstream features are not in this store.
            `VersioningEngineMismatchError` is not raised when reading from fallback stores.
        auto_create_tables: Whether to automatically create tables when writing new features.
            If not provided, the global Metaxy configuration `auto_create_tables` option will
            be used (which can be set via `METAXY_AUTO_CREATE_TABLES` env var).

            !!! warning
                This is intended for development/testing purposes.
                Use proper database migration tools like Alembic to manage table infrastructure in production.

        materialization_id: Optional external orchestration ID.
            If provided, all metadata writes will include this ID in the `metaxy_materialization_id` column.
            Can be overridden per [`MetadataStore.write`][metaxy.MetadataStore.write] call.

    Raises:
        ValueError: If fallback stores use different hash algorithms or truncation lengths
        VersioningEngineMismatchError: If an attempt is made to switch the versioning engine
            to Polars while `versioning_engine` is set to `native`.
    """
    self._name = name
    self._is_open = False
    self._context_depth = 0
    self._versioning_engine = versioning_engine
    self._materialization_id = materialization_id
    self._open_cm: AbstractContextManager[Self] | None = None
    self._transaction_timestamp: datetime | None = None
    self._soft_delete_in_progress: bool = False

    if auto_create_tables is None:
        from metaxy.config import MetaxyConfig

        self.auto_create_tables = MetaxyConfig.get().auto_create_tables
    else:
        self.auto_create_tables = auto_create_tables

    if hash_algorithm is None:
        hash_algorithm = self._get_default_hash_algorithm()
    self.hash_algorithm = hash_algorithm

    from metaxy.metadata_store.fallback import FallbackStoreList

    self.fallback_stores: FallbackStoreList = (
        fallback_stores
        if isinstance(fallback_stores, FallbackStoreList)
        else FallbackStoreList(fallback_stores or [])
    )

Attributes

metaxy.MetadataStore.materialization_id property

materialization_id: str | None

The external orchestration ID for this store instance.

If set, all metadata writes include this ID in the metaxy_materialization_id column, allowing filtering of rows written during a specific materialization run.
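
For example, a sketch of filtering rows written by a specific run; the store class and MyFeature are placeholders, while the metaxy_materialization_id column is documented above:

import narwhals as nw

from metaxy.ext.metadata_stores.duckdb import DuckDBMetadataStore

store = DuckDBMetadataStore(database="metadata.db", materialization_id="run-42")

with store:
    rows = store.read(
        MyFeature,
        filters=[nw.col("metaxy_materialization_id") == "run-42"],
    ).collect()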

metaxy.MetadataStore.name property

name: str | None

The configured name of this store, if any.

metaxy.MetadataStore.qualified_class_name property

qualified_class_name: str

The fully qualified class name (module.classname).

Functions

metaxy.MetadataStore.resolve_update

resolve_update(
    feature: CoercibleToFeatureKey,
    *,
    samples: IntoFrame | Frame | None = None,
    filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]]
    | None = None,
    global_filters: Sequence[Expr] | None = None,
    target_filters: Sequence[Expr] | None = None,
    lazy: Literal[False] = False,
    versioning_engine: Literal["auto", "native", "polars"]
    | None = None,
    skip_comparison: bool = False,
    **kwargs: Any,
) -> Increment
resolve_update(
    feature: CoercibleToFeatureKey,
    *,
    samples: IntoFrame | Frame | None = None,
    filters: Mapping[CoercibleToFeatureKey, Sequence[Expr]]
    | None = None,
    global_filters: Sequence[Expr] | None = None,
    target_filters: Sequence[Expr] | None = None,
    lazy: Literal[True],
    versioning_engine: Literal["auto", "native", "polars"]
    | None = None,
    skip_comparison: bool = False,
    **kwargs: Any,
) -> LazyIncrement

Calculate an incremental update for a feature.

This is the main workhorse in Metaxy.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature class to resolve updates for

  • samples (IntoFrame | Frame | None, default: None ) –

    A dataframe with joined upstream metadata and the "metaxy_provenance_by_field" column set. When provided, MetadataStore skips loading upstream feature metadata and provenance calculations.

    Required for root features

    Metaxy doesn't know how to populate input metadata for root features, so the samples argument must be provided for them.

    Tip

    For non-root features, use samples to customize the automatic upstream loading and field provenance calculation. For example, it can be used to request processing only for specific sample IDs.

    Setting this parameter during normal operations is not required.

  • filters (Mapping[CoercibleToFeatureKey, Sequence[Expr]] | None, default: None ) –

    A mapping from feature keys to lists of Narwhals filter expressions. Keys can be feature classes, FeatureKey objects, or string paths. Applied at read-time. May filter the current feature, in this case it will also be applied to samples (if provided). Example: {UpstreamFeature: [nw.col("x") > 10], ...}

  • global_filters (Sequence[Expr] | None, default: None ) –

    A list of Narwhals filter expressions applied to all features (both upstream and target). These filters are combined with any feature-specific filters from filters. Must reference columns that exist in ALL features. Useful for filtering by common columns like sample_uid across all features. Example: [nw.col("sample_uid").is_in(["s1", "s2"])]

  • target_filters (Sequence[Expr] | None, default: None ) –

    A list of Narwhals filter expressions applied ONLY to the target feature (not to upstream features). Use this when filtering by columns that only exist in the target feature. Example: [nw.col("height").is_null()]

  • lazy (bool, default: False ) –

    Whether to return a LazyIncrement or an Increment.

  • versioning_engine (Literal['auto', 'native', 'polars'] | None, default: None ) –

    Override the store's versioning engine for this operation.

  • skip_comparison (bool, default: False ) –

    If True, skip the increment comparison logic and return all upstream samples in increment.new. The changed and removed frames will be empty.

Raises:

  • ValueError –

    If no samples dataframe has been provided when resolving an update for a root feature.

  • VersioningEngineMismatchError –

    If versioning_engine has been set to "native" and a dataframe of a different implementation has been encountered during resolve_update.

Note

This method automatically loads feature definitions from the metadata store before computing the update. This ensures that any external feature dependencies are resolved with their actual definitions from the store, preventing incorrect version calculations from stale external feature definitions.

With a root feature

import narwhals as nw
import polars as pl

samples = pl.DataFrame(
    {
        "id": ["x", "y", "z"],
        "metaxy_provenance_by_field": [
            {"part_1": "h1", "part_2": "h2"},
            {"part_1": "h3", "part_2": "h4"},
            {"part_1": "h5", "part_2": "h6"},
        ],
    }
)
with store.open(mode="w"):
    result = store.resolve_update(MyFeature, samples=nw.from_native(samples))
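
With a non-root feature (a sketch: UpstreamFeature and the filter columns are placeholders)

import narwhals as nw

with store.open(mode="w"):
    increment = store.resolve_update(
        MyFeature,
        filters={UpstreamFeature: [nw.col("x") > 10]},
        global_filters=[nw.col("sample_uid").is_in(["s1", "s2"])],
        lazy=True,  # return a LazyIncrement instead of an eager Increment
    )
    new_samples = increment.new.collect()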
Source code in src/metaxy/metadata_store/base.py
def resolve_update(
    self,
    feature: CoercibleToFeatureKey,
    *,
    samples: IntoFrame | Frame | None = None,
    filters: Mapping[CoercibleToFeatureKey, Sequence[nw.Expr]] | None = None,
    global_filters: Sequence[nw.Expr] | None = None,
    target_filters: Sequence[nw.Expr] | None = None,
    lazy: bool = False,
    versioning_engine: Literal["auto", "native", "polars"] | None = None,
    skip_comparison: bool = False,
    **kwargs: Any,
) -> Increment | LazyIncrement:
    """Calculate an incremental update for a feature.

    This is the main workhorse in Metaxy.

    Args:
        feature: Feature class to resolve updates for
        samples: A dataframe with joined upstream metadata and `"metaxy_provenance_by_field"` column set.
            When provided, `MetadataStore` skips loading upstream feature metadata and provenance calculations.

            !!! info "Required for root features"
                Metaxy doesn't know how to populate input metadata for root features,
                so the `samples` argument **must** be provided for them.

            !!! tip
                For non-root features, use `samples` to customize the automatic upstream loading and field provenance calculation.
                For example, it can be used to request processing only for specific sample IDs.

            Setting this parameter during normal operations is not required.

        filters: A mapping from feature keys to lists of Narwhals filter expressions.
            Keys can be feature classes, FeatureKey objects, or string paths.
            Applied at read-time. May filter the current feature,
            in this case it will also be applied to `samples` (if provided).
            Example: `{UpstreamFeature: [nw.col("x") > 10], ...}`
        global_filters: A list of Narwhals filter expressions applied to all features
            (both upstream and target). These filters are combined with any feature-specific
            filters from `filters`. Must reference columns that exist in ALL features.
            Useful for filtering by common columns like `sample_uid` across all features.
            Example: `[nw.col("sample_uid").is_in(["s1", "s2"])]`
        target_filters: A list of Narwhals filter expressions applied ONLY to the target
            feature (not to upstream features). Use this when filtering by columns that
            only exist in the target feature.
            Example: `[nw.col("height").is_null()]`
        lazy: Whether to return a [`LazyIncrement`][metaxy.versioning.types.LazyIncrement] or an [`Increment`][metaxy.versioning.types.Increment].
        versioning_engine: Override the store's versioning engine for this operation.
        skip_comparison: If True, skip the increment comparison logic and return all
            upstream samples in `increment.new`. The `changed` and `removed` frames will
            be empty.

    Raises:
        ValueError: If no `samples` dataframe has been provided when resolving an update for a root feature.
        VersioningEngineMismatchError: If `versioning_engine` has been set to `"native"`
            and a dataframe of a different implementation has been encountered during `resolve_update`.

    !!! note
        This method automatically loads feature definitions from the metadata store
        before computing the update. This ensures that any external feature dependencies
        are resolved with their actual definitions from the store, preventing incorrect
        version calculations from stale external feature definitions.

    !!! example "With a root feature"

        ```py
        import narwhals as nw
        import polars as pl

        samples = pl.DataFrame(
            {
                "id": ["x", "y", "z"],
                "metaxy_provenance_by_field": [
                    {"part_1": "h1", "part_2": "h2"},
                    {"part_1": "h3", "part_2": "h4"},
                    {"part_1": "h5", "part_2": "h6"},
                ],
            }
        )
        with store.open(mode="w"):
            result = store.resolve_update(MyFeature, samples=nw.from_native(samples))
        ```
    """
    import narwhals as nw

    import metaxy as mx
    from metaxy.config import MetaxyConfig

    # Sync external feature definitions from the store to replace any external feature placeholders.
    # This ensures version hashes are computed correctly against actual stored definitions.
    # it is acceptable to call this here automatically for three reasons:
    # 1. `resolve_update` is typically only called once at the start of the workflow
    # 2. `resolve_update` is already doing heavy computations so an extra little call won't hurt performance
    # 3. it is extremely important to get the result right
    if MetaxyConfig.get(_allow_default_config=True).sync:
        mx.sync_external_features(self)

    # Convert samples to Narwhals frame if not already
    samples_nw: nw.DataFrame[Any] | nw.LazyFrame[Any] | None = None
    if samples is not None:
        if isinstance(samples, (nw.DataFrame, nw.LazyFrame)):
            samples_nw = samples
        else:
            samples_nw = nw.from_native(samples)  # ty: ignore[invalid-assignment]

    # Normalize filter keys to FeatureKey
    normalized_filters: dict[FeatureKey, list[nw.Expr]] = {}
    if filters:
        for key, exprs in filters.items():
            feature_key = self._resolve_feature_key(key)
            normalized_filters[feature_key] = list(exprs)

    # Convert global_filters and target_filters to lists for easy concatenation
    global_filter_list = list(global_filters) if global_filters else []
    target_filter_list = list(target_filters) if target_filters else []

    feature_key = self._resolve_feature_key(feature)
    if self._is_system_table(feature_key):
        raise NotImplementedError("Delete operations are not yet supported for system tables.")
    graph = current_graph()
    plan = graph.get_feature_plan(feature_key)

    # Root features without samples: error (samples required)
    if not plan.deps and samples_nw is None:
        raise ValueError(
            f"Feature {feature_key} has no upstream dependencies (root feature). "
            f"Must provide 'samples' parameter with sample_uid and {METAXY_PROVENANCE_BY_FIELD} columns. "
            f"Root features require manual {METAXY_PROVENANCE_BY_FIELD} computation."
        )

    # Combine feature-specific filters, global filters, and target filters for current feature
    # target_filters are ONLY applied to the current feature, not to upstream features
    current_feature_filters = [
        *normalized_filters.get(feature_key, []),
        *global_filter_list,
        *target_filter_list,
    ]

    # Read current metadata with deduplication (with_sample_history=False by default)
    # Use allow_fallback=False since we only want metadata from THIS store
    # to determine what needs to be updated locally
    try:
        current_metadata: nw.LazyFrame[Any] | None = self.read(
            feature_key,
            filters=current_feature_filters if current_feature_filters else None,
            allow_fallback=False,
            with_feature_history=False,  # filters by current feature_version
            with_sample_history=False,  # deduplicates by id_columns, keeping latest
        )
    except FeatureNotFoundError:
        current_metadata = None

    upstream_by_key: dict[FeatureKey, nw.LazyFrame[Any]] = {}
    filters_by_key: dict[FeatureKey, list[nw.Expr]] = {}

    # if samples are provided, use them as source of truth for upstream data
    if samples_nw is not None:
        # Apply filters to samples if any
        filtered_samples = samples_nw
        if current_feature_filters:
            filtered_samples = samples_nw.filter(current_feature_filters)

        # fill in METAXY_PROVENANCE column if it's missing (e.g. for root features)
        samples_nw = self.hash_struct_version_column(
            plan,
            df=filtered_samples,
            struct_column=METAXY_PROVENANCE_BY_FIELD,
            hash_column=METAXY_PROVENANCE,
        )

        # For root features, add data_version columns if they don't exist
        # (root features have no computation, so data_version equals provenance)
        # Use collect_schema().names() to avoid PerformanceWarning on lazy frames
        if METAXY_DATA_VERSION_BY_FIELD not in samples_nw.collect_schema().names():
            samples_nw = samples_nw.with_columns(
                nw.col(METAXY_PROVENANCE_BY_FIELD).alias(METAXY_DATA_VERSION_BY_FIELD),
                nw.col(METAXY_PROVENANCE).alias(METAXY_DATA_VERSION),
            )
    else:
        for upstream_spec in plan.deps or []:
            # Combine feature-specific filters with global filters for upstream
            upstream_filters = [
                *normalized_filters.get(upstream_spec.key, []),
                *global_filter_list,
            ]
            upstream_feature_metadata = self.read(
                upstream_spec.key,
                filters=upstream_filters if upstream_filters else None,
            )
            if upstream_feature_metadata is not None:
                upstream_by_key[upstream_spec.key] = upstream_feature_metadata

    # determine which implementation to use for resolving the increment
    # consider (1) whether all upstream metadata has been loaded with the native implementation
    # (2) if samples have native implementation

    # Use parameter if provided, otherwise use store default
    engine_mode = versioning_engine if versioning_engine is not None else self._versioning_engine

    # If "polars" mode, force Polars immediately
    if engine_mode == "polars":
        implementation = nw.Implementation.POLARS
        switched_to_polars = True
    else:
        implementation = self.native_implementation()
        switched_to_polars = False

        for upstream_key, df in upstream_by_key.items():
            if df.implementation != implementation:
                switched_to_polars = True
                # Only raise error in "native" mode if no fallback stores configured.
                # If fallback stores exist, the implementation mismatch indicates data came
                # from fallback (different implementation), which is legitimate fallback access.
                # If data were local, it would have the native implementation.
                if engine_mode == "native" and not self.fallback_stores:
                    raise VersioningEngineMismatchError(
                        f"versioning_engine='native' but upstream feature `{upstream_key.to_string()}` "
                        f"has implementation {df.implementation}, expected {self.native_implementation()}"
                    )
                elif engine_mode == "auto" or (engine_mode == "native" and self.fallback_stores):
                    PolarsMaterializationWarning.warn_on_implementation_mismatch(
                        expected=self.native_implementation(),
                        actual=df.implementation,
                        message=f"Using Polars for resolving the increment instead. This was caused by upstream feature `{upstream_key.to_string()}`.",
                    )
                implementation = nw.Implementation.POLARS
                break

        if samples_nw is not None and samples_nw.implementation != self.native_implementation():
            if not switched_to_polars:
                if engine_mode == "native":
                    # Always raise error for samples with wrong implementation, regardless
                    # of fallback stores, because samples come from user argument, not from fallback
                    raise VersioningEngineMismatchError(
                        f"versioning_engine='native' but provided `samples` have implementation {samples_nw.implementation}, "
                        f"expected {self.native_implementation()}"
                    )
                elif engine_mode == "auto":
                    PolarsMaterializationWarning.warn_on_implementation_mismatch(
                        expected=self.native_implementation(),
                        actual=samples_nw.implementation,
                        message=f"Provided `samples` have implementation {samples_nw.implementation}. Using Polars for resolving the increment instead.",
                    )
            implementation = nw.Implementation.POLARS
            switched_to_polars = True

    if switched_to_polars:
        if current_metadata:
            current_metadata = switch_implementation_to_polars(current_metadata)
        if samples_nw:
            samples_nw = switch_implementation_to_polars(samples_nw)
        for upstream_key, df in upstream_by_key.items():
            upstream_by_key[upstream_key] = switch_implementation_to_polars(df)

    with self.create_versioning_engine(plan=plan, implementation=implementation) as engine:
        if skip_comparison:
            # Skip comparison: return all upstream samples as added
            if samples_nw is not None:
                # Root features or user-provided samples: use samples directly
                # Note: samples already has metaxy_provenance computed
                added = samples_nw.lazy()
                input_df = None  # Root features have no upstream input
            else:
                # Non-root features: load all upstream with provenance
                added = engine.load_upstream_with_provenance(
                    upstream=upstream_by_key,
                    hash_algo=self.hash_algorithm,
                    filters=filters_by_key,
                )
                input_df = added  # Input is the same as added when skipping comparison
            changed = None
            removed = None
        else:
            added, changed, removed, input_df = engine.resolve_increment_with_provenance(
                current=current_metadata,
                upstream=upstream_by_key,
                hash_algorithm=self.hash_algorithm,
                filters=filters_by_key,
                sample=samples_nw.lazy() if samples_nw is not None else None,
            )

    # Convert None to empty DataFrames
    if changed is None:
        changed = empty_frame_like(added)
    if removed is None:
        removed = empty_frame_like(added)

    if lazy:
        return LazyIncrement(
            new=added if isinstance(added, nw.LazyFrame) else nw.from_native(added),
            stale=changed if isinstance(changed, nw.LazyFrame) else nw.from_native(changed),
            orphaned=removed if isinstance(removed, nw.LazyFrame) else nw.from_native(removed),
            input=input_df if input_df is None or isinstance(input_df, nw.LazyFrame) else nw.from_native(input_df),
        )
    else:
        return Increment(
            new=added.collect() if isinstance(added, nw.LazyFrame) else added,
            stale=changed.collect() if isinstance(changed, nw.LazyFrame) else changed,
            orphaned=removed.collect() if isinstance(removed, nw.LazyFrame) else removed,
        )

metaxy.MetadataStore.compute_provenance

compute_provenance(
    feature: CoercibleToFeatureKey, df: FrameT
) -> FrameT

Compute provenance columns for a DataFrame with pre-joined upstream data.

Note

This method may be useful in very rare cases. Rely on MetadataStore.resolve_update instead.

Use this method when you perform custom joins outside of Metaxy's auto-join system but still want Metaxy to compute provenance. The method computes metaxy_provenance_by_field, metaxy_provenance, metaxy_data_version_by_field, and metaxy_data_version columns based on the upstream metadata.

Info

The input DataFrame must contain the renamed metaxy_data_version_by_field columns from each upstream feature. The naming convention follows the pattern metaxy_data_version_by_field__<feature_key.to_column_suffix()>. For example, for an upstream feature with key ["video", "raw"], the column should be named metaxy_data_version_by_field__video_raw.

Parameters:

  • feature (CoercibleToFeatureKey) –

    The feature to compute provenance for.

  • df (FrameT) –

    A DataFrame containing pre-joined upstream data with renamed metaxy_data_version_by_field columns from each upstream feature.

Returns:

  • FrameT –

    The input DataFrame with provenance columns added. Returns the same frame type as the input, either an eager DataFrame or a LazyFrame.

Raises:

  • StoreNotOpenError –

    If the store is not open.

  • ValueError –

    If required upstream metaxy_data_version_by_field columns are missing from the DataFrame.

Example
    # Read upstream metadata
    video_df = store.read(VideoFeature).collect()
    audio_df = store.read(AudioFeature).collect()

    # Rename data_version_by_field columns to the expected convention
    video_df = video_df.rename({
        "metaxy_data_version_by_field": "metaxy_data_version_by_field__video_raw"
    })
    audio_df = audio_df.rename({
        "metaxy_data_version_by_field": "metaxy_data_version_by_field__audio_raw"
    })

    # Perform custom join
    joined = video_df.join(audio_df, on="sample_uid", how="inner")

    # Compute provenance
    with_provenance = store.compute_provenance(MyFeature, joined)

    # Pass to resolve_update
    increment = store.resolve_update(MyFeature, samples=with_provenance)
Source code in src/metaxy/metadata_store/base.py
def compute_provenance(
    self,
    feature: CoercibleToFeatureKey,
    df: FrameT,
) -> FrameT:
    """Compute provenance columns for a DataFrame with pre-joined upstream data.

    !!! note
        This method may be useful in very rare cases.
        Rely on [`MetadataStore.resolve_update`][metaxy.metadata_store.base.MetadataStore.resolve_update] instead.

    Use this method when you perform custom joins outside of Metaxy's auto-join
    system but still want Metaxy to compute provenance. The method computes
    metaxy_provenance_by_field, metaxy_provenance, metaxy_data_version_by_field,
    and metaxy_data_version columns based on the upstream metadata.

    !!! info
        The input DataFrame must contain the renamed metaxy_data_version_by_field
        columns from each upstream feature. The naming convention follows the pattern
        `metaxy_data_version_by_field__<feature_key.to_column_suffix()>`. For example, for an
        upstream feature with key `["video", "raw"]`, the column should be named
        `metaxy_data_version_by_field__video_raw`.

    Args:
        feature: The feature to compute provenance for.
        df: A DataFrame containing pre-joined upstream data with renamed
            metaxy_data_version_by_field columns from each upstream feature.

    Returns:
        The input DataFrame with provenance columns added. Returns the same
        frame type as the input, either an eager DataFrame or a LazyFrame.

    Raises:
        StoreNotOpenError: If the store is not open.
        ValueError: If required upstream `metaxy_data_version_by_field` columns
            are missing from the DataFrame.

    Example:
        <!-- skip next -->
        ```py

            # Read upstream metadata
            video_df = store.read(VideoFeature).collect()
            audio_df = store.read(AudioFeature).collect()

            # Rename data_version_by_field columns to the expected convention
            video_df = video_df.rename({
                "metaxy_data_version_by_field": "metaxy_data_version_by_field__video_raw"
            })
            audio_df = audio_df.rename({
                "metaxy_data_version_by_field": "metaxy_data_version_by_field__audio_raw"
            })

            # Perform custom join
            joined = video_df.join(audio_df, on="sample_uid", how="inner")

            # Compute provenance
            with_provenance = store.compute_provenance(MyFeature, joined)

            # Pass to resolve_update
            increment = store.resolve_update(MyFeature, samples=with_provenance)
        ```
    """
    self._check_open()

    feature_key = self._resolve_feature_key(feature)
    graph = current_graph()
    plan = graph.get_feature_plan(feature_key)

    # Use native implementation if DataFrame matches, otherwise fall back to Polars
    implementation = self.native_implementation()
    if df.implementation != implementation:
        implementation = nw.Implementation.POLARS
        df = switch_implementation_to_polars(df)  # ty: ignore[no-matching-overload]

    with self.create_versioning_engine(plan=plan, implementation=implementation) as engine:
        # Validate required upstream columns exist
        expected_columns = {
            dep.feature: engine.get_renamed_data_version_by_field_col(dep.feature)
            for dep in (plan.feature_deps or [])
        }

        df_columns = set(df.collect_schema().names())  # ty: ignore[invalid-argument-type]
        missing_columns = [
            f"{col} (from upstream feature {key.to_string()})"
            for key, col in expected_columns.items()
            if col not in df_columns
        ]

        if missing_columns:
            raise ValueError(
                f"DataFrame is missing required upstream columns for computing "
                f"provenance of feature {feature_key.to_string()}. "
                f"Missing columns: {missing_columns}. "
                f"Make sure to rename metaxy_data_version_by_field columns from "
                f"each upstream feature using the pattern "
                f"metaxy_data_version_by_field__<feature_key.table_name>."
            )

        return engine.compute_provenance_columns(df, hash_algo=self.hash_algorithm)  # ty: ignore[invalid-argument-type]

metaxy.MetadataStore.read

read(
    feature: CoercibleToFeatureKey,
    *,
    feature_version: str | None = None,
    filters: Sequence[Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    with_feature_history: bool = False,
    with_sample_history: bool = False,
    include_soft_deleted: bool = False,
    with_store_info: Literal[False] = False,
) -> LazyFrame[Any]
read(
    feature: CoercibleToFeatureKey,
    *,
    feature_version: str | None = None,
    filters: Sequence[Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    with_feature_history: bool = False,
    with_sample_history: bool = False,
    include_soft_deleted: bool = False,
    with_store_info: Literal[True],
) -> tuple[LazyFrame[Any], MetadataStore]

Read metadata with optional fallback to upstream stores.

By default, does not include:

  • rows from historical feature versions (configured via with_feature_history=False)

  • rows that have been overwritten by subsequent writes with the same feature version (configured via with_sample_history=False)

  • soft-deleted rows with metaxy_deleted_at set to a non-null value (configured via include_soft_deleted=False)

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to read metadata for

  • feature_version (str | None, default: None ) –

    Explicit feature_version to filter by (requires with_feature_history=True)

  • filters (Sequence[Expr] | None, default: None ) –

    Sequence of Narwhals filter expressions to apply to this feature. Example: [nw.col("x") > 10, nw.col("y") < 5]

  • columns (Sequence[str] | None, default: None ) –

    Subset of columns to include. Metaxy's system columns are always included.

  • allow_fallback (bool, default: True ) –

    Whether to allow fallback to upstream stores if the requested feature is not found in the current store.

  • with_feature_history (bool, default: False ) –

    If True, include rows from all historical feature versions. By default (False), only returns rows with the currently registered feature version.

  • with_sample_history (bool, default: False ) –

    If True, include all historical materializations per sample. By default (False), deduplicates samples within id_columns groups ordered by metaxy_created_at.

  • include_soft_deleted (bool, default: False ) –

    If True, include soft-deleted rows in the result. Previous historical materializations of the same feature version will be effectively removed from the output otherwise.

  • with_store_info (bool, default: False ) –

    If True, return a tuple of (LazyFrame, MetadataStore) where the MetadataStore is the store that actually contained the feature (which may be a fallback store if allow_fallback=True).

Returns:

  • LazyFrame[Any] | tuple[LazyFrame[Any], MetadataStore] –

    Narwhals LazyFrame with metadata, or, if with_store_info=True, a tuple of (LazyFrame, MetadataStore) containing the metadata store which has been actually used to retrieve the feature (may be a fallback store).

Raises:

  • FeatureNotFoundError –

    If feature not found in any store

  • SystemDataNotFoundError –

    When attempting to read non-existent Metaxy system data

  • ValueError –

    If both feature_version and with_feature_history=False are provided

Info

When this method is called with default arguments, it will return the latest (by metaxy_created_at) metadata for the current feature version excluding soft-deleted rows. Therefore, it's perfectly suitable for most use cases.

Warning

The order of rows is not guaranteed.
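
Example (a sketch; MyFeature and the filter column are placeholders):

import narwhals as nw

with store:
    # Default read: latest rows for the current feature version,
    # soft-deleted rows excluded
    df = store.read(MyFeature, filters=[nw.col("x") > 10]).collect()

    # Full history across feature versions and materializations,
    # without consulting fallback stores
    history = store.read(
        MyFeature,
        with_feature_history=True,
        with_sample_history=True,
        allow_fallback=False,
    ).collect()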

Source code in src/metaxy/metadata_store/base.py
def read(
    self,
    feature: CoercibleToFeatureKey,
    *,
    feature_version: str | None = None,
    filters: Sequence[nw.Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    with_feature_history: bool = False,
    with_sample_history: bool = False,
    include_soft_deleted: bool = False,
    with_store_info: bool = False,
) -> nw.LazyFrame[Any] | tuple[nw.LazyFrame[Any], MetadataStore]:
    """
    Read metadata with optional fallback to upstream stores.

    By default, does not include:
       - rows from historical feature versions (configured via `with_feature_history=False`)
       - rows that have been overwritten by subsequent writes with the same feature version (configured via `with_sample_history=False`)
       - soft-deleted with `metaxy_deleted_at` set to a non-null value (configured via `include_soft_deleted=False`)

    Args:
        feature: Feature to read metadata for
        feature_version: Explicit feature_version to filter by (requires with_feature_history=True)
        filters: Sequence of Narwhals filter expressions to apply to this feature.
            Example: `[nw.col("x") > 10, nw.col("y") < 5]`
        columns: Subset of columns to include. Metaxy's system columns are always included.
        allow_fallback: Whether to allow fallback to upstream stores if the requested feature is not found in the current store.
        with_feature_history: If True, include rows from all historical feature versions.
            By default (False), only returns rows with the currently registered feature version.
        with_sample_history: If True, include all historical materializations per sample.
            By default (False), deduplicates samples within `id_columns` groups ordered by `metaxy_created_at`.
        include_soft_deleted: If `True`, include soft-deleted rows in the result. Previous historical materializations of the same feature version will be effectively removed from the output otherwise.
        with_store_info: If `True`, return a tuple of (LazyFrame, MetadataStore) where
            the MetadataStore is the store that actually contained the feature (which
            may be a fallback store if allow_fallback=True).

    Returns:
        Narwhals `LazyFrame` with metadata, or, if `with_store_info=True`, a tuple of (`LazyFrame`, `MetadataStore`) containing the metadata store which has been actually used to retrieve the feature (may be a fallback store).

    Raises:
        FeatureNotFoundError: If feature not found in any store
        SystemDataNotFoundError: When attempting to read non-existent Metaxy system data
        ValueError: If both feature_version and with_feature_history=False are provided

    !!! info
        When this method is called with default arguments, it will return the latest (by `metaxy_created_at`)
        metadata for the current feature version excluding soft-deleted rows. Therefore, it's perfectly suitable
        for most use cases.

    !!! warning
        The order of rows is not guaranteed.
    """
    import metaxy as mx
    from metaxy.config import MetaxyConfig

    self._check_open()

    filters = filters or []
    columns = columns or []

    feature_key = self._resolve_feature_key(feature)
    is_system_table = self._is_system_table(feature_key)

    # Sync external features if auto-sync is enabled (default)
    # This call is a no-op most of the time and is very lightweight when it's not
    # Skip for system tables to avoid infinite recursion (sync_external_features reads system tables)
    if not is_system_table and MetaxyConfig.get(_allow_default_config=True).sync:
        mx.sync_external_features(self)

    # If caller wants soft-deleted records, do not filter them out later
    filter_deleted = not include_soft_deleted and not is_system_table

    # Validate mutually exclusive parameters
    if feature_version is not None and not with_feature_history:
        raise ValueError(
            "Cannot specify both feature_version and with_feature_history=False. "
            "Use with_feature_history=True with feature_version parameter."
        )

    # Separate system filters (applied before dedup) from user filters (applied after dedup)
    # System filters like feature_version need to be applied early to reduce data volume
    # User filters should see the deduplicated view of the data
    system_filters: list[nw.Expr] = []
    user_filters = list(filters) if filters else []

    # Add feature_version filter only when needed (this is a system filter)
    if not with_feature_history or feature_version is not None and not is_system_table:
        version_filter = nw.col(METAXY_FEATURE_VERSION) == (
            current_graph().get_feature_version(feature_key) if not with_feature_history else feature_version
        )
        system_filters.append(version_filter)

    # If user filters are provided, we need to read all columns since filters may
    # reference columns not in the requested columns list. Column selection happens
    # after filtering
    if user_filters:
        read_columns = None
    elif columns and not is_system_table:
        # Add only system columns that aren't already in the user's columns list
        columns_set = set(columns)
        missing_system_cols = [c for c in ALL_SYSTEM_COLUMNS if c not in columns_set]
        read_columns = [*columns, *missing_system_cols]
    else:
        read_columns = None

    lazy_frame = None
    try:
        # Only pass system filters to _read_feature
        # User filters will be applied after deduplication
        lazy_frame = self._read_feature(
            feature, filters=system_filters if system_filters else None, columns=read_columns
        )
    except FeatureNotFoundError as e:
        # do not read system features from fallback stores
        if is_system_table:
            raise SystemDataNotFoundError(
                f"System Metaxy data with key {feature_key} is missing in {self}. Invoke `metaxy push` before attempting to read system data."
            ) from e

    # Handle case where _read_feature returns None (no exception raised)
    if lazy_frame is None and is_system_table:
        raise SystemDataNotFoundError(
            f"System Metaxy data with key {feature_key} is missing in {self}. Invoke `metaxy push` before attempting to read system data."
        )

    if lazy_frame is not None and not is_system_table:
        # Deduplicate first, then filter soft-deleted rows
        if not with_sample_history:
            id_cols = list(self._resolve_feature_plan(feature_key).feature.id_columns)
            # Treat soft-deletes like hard deletes by ordering on the
            # most recent lifecycle timestamp.
            lazy_frame = self.versioning_engine_cls.keep_latest_by_group(
                df=lazy_frame,
                group_columns=id_cols,
                timestamp_columns=[METAXY_DELETED_AT, METAXY_UPDATED_AT],
            )

        if filter_deleted:
            lazy_frame = lazy_frame.filter(nw.col(METAXY_DELETED_AT).is_null())

        # Apply user filters AFTER deduplication so they see the latest version of each row
        for user_filter in user_filters:
            lazy_frame = lazy_frame.filter(user_filter)

    # For system tables, apply user filters directly (no dedup needed)
    if lazy_frame is not None and is_system_table:
        for user_filter in user_filters:
            lazy_frame = lazy_frame.filter(user_filter)

    if lazy_frame is not None:
        # After dedup and user filters, filter to requested columns if specified
        if columns:
            lazy_frame = lazy_frame.select(columns)

        if with_store_info:
            return lazy_frame, self
        return lazy_frame

    # Try fallback stores (opened on demand)
    if allow_fallback:
        for store in self.fallback_stores:
            try:
                # Open fallback store on demand for reading
                with store:
                    # Use full read to handle nested fallback chains
                    if with_store_info:
                        return store.read(
                            feature,
                            feature_version=feature_version,
                            filters=filters,
                            columns=columns,
                            allow_fallback=True,
                            with_feature_history=with_feature_history,
                            with_sample_history=with_sample_history,
                            include_soft_deleted=include_soft_deleted,
                            with_store_info=True,
                        )
                    return store.read(
                        feature,
                        feature_version=feature_version,
                        filters=filters,
                        columns=columns,
                        allow_fallback=True,
                        with_feature_history=with_feature_history,
                        with_sample_history=with_sample_history,
                        include_soft_deleted=include_soft_deleted,
                    )
            except FeatureNotFoundError:
                # Try next fallback store
                continue

    # Not found anywhere
    raise FeatureNotFoundError(
        f"Feature {feature_key.to_string()} not found in store" + (" or fallback stores" if allow_fallback else "")
    )

metaxy.MetadataStore.write

write(
    feature: CoercibleToFeatureKey,
    df: IntoFrame,
    materialization_id: str | None = None,
) -> None

Write metadata for a feature (append-only by design).

Automatically adds the Metaxy system columns, unless they already exist in the DataFrame.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to write metadata for

  • df (IntoFrame) –

    Metadata DataFrame of any type supported by Narwhals. Must have metaxy_provenance_by_field column of type Struct with fields matching feature's fields. Optionally, may also contain metaxy_data_version_by_field.

  • materialization_id (str | None, default: None ) –

    Optional external orchestration ID for this write. Overrides the store's default materialization_id if provided. Useful for tracking which orchestration run produced this metadata.

Raises:

  • MetadataSchemaError –

    If DataFrame schema is invalid

  • StoreNotOpenError –

    If store is not open

Note

  • Must be called within a MetadataStore.open(mode="w") context manager.

  • Metaxy always performs an "append" operation. Metadata is never deleted or mutated.

  • Fallback stores are never used for writes.
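
Example

A minimal write sketch, assuming a feature whose only field is field_a and whose id column is sample_uid (both placeholders):

import polars as pl

with store.open(mode="w"):
    store.write(
        MyFeature,
        pl.DataFrame(
            {
                "sample_uid": ["s1", "s2"],
                "metaxy_provenance_by_field": [
                    {"field_a": "h1"},
                    {"field_a": "h2"},
                ],
            }
        ),
        materialization_id="run-42",
    )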
Source code in src/metaxy/metadata_store/base.py
def write(
    self,
    feature: CoercibleToFeatureKey,
    df: IntoFrame,
    materialization_id: str | None = None,
) -> None:
    """
    Write metadata for a feature (append-only by design).

    Automatically adds the Metaxy system columns, unless they already exist in the DataFrame.

    Args:
        feature: Feature to write metadata for
        df: Metadata DataFrame of any type supported by [Narwhals](https://narwhals-dev.github.io/narwhals/).
            Must have `metaxy_provenance_by_field` column of type Struct with fields matching feature's fields.
            Optionally, may also contain `metaxy_data_version_by_field`.
        materialization_id: Optional external orchestration ID for this write.
            Overrides the store's default `materialization_id` if provided.
            Useful for tracking which orchestration run produced this metadata.

    Raises:
        MetadataSchemaError: If DataFrame schema is invalid
        StoreNotOpenError: If store is not open
    Note:
        - Must be called within a `MetadataStore.open(mode="w")` context manager.

        - Metaxy always performs an "append" operation. Metadata is never deleted or mutated.

        - Fallback stores are never used for writes.

    """
    self._check_open()

    feature_key = self._resolve_feature_key(feature)
    is_system_table = self._is_system_table(feature_key)

    # Convert the input dataframe to a Narwhals frame if needed
    df_nw = nw.from_native(df)

    assert isinstance(df_nw, (nw.DataFrame, nw.LazyFrame)), f"df must be a Narwhals DataFrame, got {type(df_nw)}"

    # For system tables, write directly without feature_version tracking
    if is_system_table:
        self._validate_schema_system_table(df_nw)
        self._write_feature(feature_key, df_nw)
        return

    # Use collect_schema().names() to avoid PerformanceWarning on lazy frames
    if METAXY_PROVENANCE_BY_FIELD not in df_nw.collect_schema().names():
        from metaxy.metadata_store.exceptions import MetadataSchemaError

        raise MetadataSchemaError(f"DataFrame must have '{METAXY_PROVENANCE_BY_FIELD}' column")

    # Add all required system columns
    # warning: for dataframes that do not match the native MetadataStore implementation
    # and are missing the METAXY_DATA_VERSION column, this call will lead to materializing the equivalent Polars DataFrame
    # while calculating the missing METAXY_DATA_VERSION column
    df_nw = self._add_system_columns(df_nw, feature, materialization_id=materialization_id)

    self._validate_schema(df_nw)
    self._write_feature(feature_key, df_nw)

metaxy.MetadataStore.write_multi

write_multi(
    metadata: Mapping[Any, IntoFrame],
    materialization_id: str | None = None,
) -> None

Write metadata for multiple features in reverse topological order.

Processes features so that dependents are written before their dependencies. This ordering ensures that downstream features are written first, which can be useful for certain data consistency requirements or when features need to be processed in a specific order.

Parameters:

  • metadata (Mapping[Any, IntoFrame]) –

    Mapping from feature keys to metadata DataFrames. Keys can be any type coercible to FeatureKey (string, sequence, FeatureKey, or BaseFeature class). Values must be DataFrames compatible with Narwhals, containing required system columns.

  • materialization_id (str | None, default: None ) –

    Optional external orchestration ID for all writes. Overrides the store's default materialization_id if provided. Applied to all feature writes in this batch.

Raises:

  • MetadataSchemaError –

    If any DataFrame schema is invalid

  • StoreNotOpenError –

    If store is not open

  • ValueError –

    If writing to a feature from a different project than expected

Note
  • Must be called within a MetadataStore.open(mode="w") context manager.
  • Empty mappings are handled gracefully (no-op).
  • Each feature's metadata is written via write, so all validation and system column handling from that method applies.
Example
with store.open(mode="w"):
    store.write_multi(
        {
            ChildFeature: child_df,
            ParentFeature: parent_df,
        }
    )
# Features are written in reverse topological order:
# ChildFeature first, then ParentFeature
Source code in src/metaxy/metadata_store/base.py
def write_multi(
    self,
    metadata: Mapping[Any, IntoFrame],
    materialization_id: str | None = None,
) -> None:
    """
    Write metadata for multiple features in reverse topological order.

    Processes features so that dependents are written before their dependencies.
    This ordering ensures that downstream features are written first, which can
    be useful for certain data consistency requirements or when features need
    to be processed in a specific order.

    Args:
        metadata: Mapping from feature keys to metadata DataFrames.
            Keys can be any type coercible to FeatureKey (string, sequence,
            FeatureKey, or BaseFeature class). Values must be DataFrames
            compatible with Narwhals, containing required system columns.
        materialization_id: Optional external orchestration ID for all writes.
            Overrides the store's default `materialization_id` if provided.
            Applied to all feature writes in this batch.

    Raises:
        MetadataSchemaError: If any DataFrame schema is invalid
        StoreNotOpenError: If store is not open
        ValueError: If writing to a feature from a different project than expected

    Note:
        - Must be called within a `MetadataStore.open(mode="w")` context manager.
        - Empty mappings are handled gracefully (no-op).
        - Each feature's metadata is written via `write`, so all
          validation and system column handling from that method applies.

    Example:
        <!-- skip next -->
        ```py
        with store.open(mode="w"):
            store.write_multi(
                {
                    ChildFeature: child_df,
                    ParentFeature: parent_df,
                }
            )
        # Features are written in reverse topological order:
        # ChildFeature first, then ParentFeature
        ```
    """
    if not metadata:
        return

    # Build mapping from resolved keys to dataframes in one pass
    resolved_metadata = {self._resolve_feature_key(key): df for key, df in metadata.items()}

    # Get reverse topological order (dependents first)
    graph = current_graph()
    sorted_keys = graph.topological_sort_features(list(resolved_metadata.keys()), descending=True)

    # Write metadata in reverse topological order
    for feature_key in sorted_keys:
        self.write(
            feature_key,
            resolved_metadata[feature_key],
            materialization_id=materialization_id,
        )

metaxy.MetadataStore.config_model abstractmethod classmethod

config_model() -> type[MetadataStoreConfig]

Return the configuration model class for this store type.

Subclasses must override this to return their specific config class.

Returns:

  • type[MetadataStoreConfig] –

    The config class type (e.g., DuckDBMetadataStoreConfig)

Note

Subclasses override this with a more specific return type. Type checkers may show a warning about incompatible override, but this is intentional - each store returns its own config type.
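
For example, a subclass override might look like this sketch (MyStore and MyStoreConfig are placeholders):

class MyStore(MetadataStore):
    @classmethod
    def config_model(cls) -> type[MyStoreConfig]:
        return MyStoreConfig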

Source code in src/metaxy/metadata_store/base.py
@classmethod
@abstractmethod
def config_model(cls) -> type[MetadataStoreConfig]:
    """Return the configuration model class for this store type.

    Subclasses must override this to return their specific config class.

    Returns:
        The config class type (e.g., DuckDBMetadataStoreConfig)

    Note:
        Subclasses override this with a more specific return type.
        Type checkers may show a warning about incompatible override,
        but this is intentional - each store returns its own config type.
    """
    ...

metaxy.MetadataStore.from_config classmethod

from_config(
    config: MetadataStoreConfig, **kwargs: Any
) -> Self

Create a store instance from a configuration object.

This method creates a store by:

  1. Converting the config to a dict
  2. Resolving fallback store names to actual store instances
  3. Calling the store's __init__ with the config parameters

Parameters:

  • config (MetadataStoreConfig) –

    Configuration object (should be the type returned by config_model())

  • **kwargs (Any, default: {} ) –

    Additional arguments passed directly to the store constructor (e.g., materialization_id for runtime parameters not in config)

Returns:

  • Self –

    A new store instance configured according to the config object

Example
from metaxy.ext.metadata_stores.duckdb import (
    DuckDBMetadataStore,
    DuckDBMetadataStoreConfig,
)

config = DuckDBMetadataStoreConfig(
    database="metadata.db",
    fallback_stores=["prod"],
)

store = DuckDBMetadataStore.from_config(config)
Source code in src/metaxy/metadata_store/base.py
@classmethod
def from_config(cls, config: MetadataStoreConfig, **kwargs: Any) -> Self:
    """Create a store instance from a configuration object.

    This method creates a store by:
    1. Converting the config to a dict
    2. Resolving fallback store names to actual store instances
    3. Calling the store's __init__ with the config parameters

    Args:
        config: Configuration object (should be the type returned by config_model())
        **kwargs: Additional arguments passed directly to the store constructor
            (e.g., materialization_id for runtime parameters not in config)

    Returns:
        A new store instance configured according to the config object

    Example:
        <!-- skip next -->
        ```python
        from metaxy.ext.metadata_stores.duckdb import (
            DuckDBMetadataStore,
            DuckDBMetadataStoreConfig,
        )

        config = DuckDBMetadataStoreConfig(
            database="metadata.db",
            fallback_stores=["prod"],
        )

        store = DuckDBMetadataStore.from_config(config)
        ```
    """
    from metaxy.metadata_store.fallback import FallbackStoreList

    config_dict = config.model_dump(exclude_unset=True)
    fallback_store_names = config_dict.pop("fallback_stores", [])
    store = cls(**config_dict, **kwargs)
    if fallback_store_names:
        store.fallback_stores = FallbackStoreList(
            fallback_store_names,
            config=MetaxyConfig.get(),
            parent_hash_algorithm=store.hash_algorithm,
        )
    return store

metaxy.MetadataStore.native_implementation

native_implementation() -> Implementation

Get the native Narwhals implementation for this store's backend.

Source code in src/metaxy/metadata_store/base.py
def native_implementation(self) -> nw.Implementation:
    """Get the native Narwhals implementation for this store's backend."""
    return self.versioning_engine_cls.implementation()

metaxy.MetadataStore.create_versioning_engine

create_versioning_engine(
    plan: FeaturePlan, implementation: Implementation
) -> Iterator[VersioningEngine | PolarsVersioningEngine]

Creates an appropriate provenance engine.

Falls back to the Polars implementation if the required implementation differs from the store's native implementation.

Parameters:

  • plan (FeaturePlan) –

    The feature plan.

  • implementation (Implementation) –

    The desired engine implementation.

Returns:

  • Iterator[VersioningEngine | PolarsVersioningEngine] –

    An appropriate provenance engine.

Source code in src/metaxy/metadata_store/base.py
@contextmanager
def create_versioning_engine(
    self, plan: FeaturePlan, implementation: nw.Implementation
) -> Iterator[VersioningEngine | PolarsVersioningEngine]:
    """
    Creates an appropriate provenance engine.

    Falls back to Polars implementation if the required implementation differs from the store's native implementation.

    Args:
        plan: The feature plan.
        implementation: The desired engine implementation.

    Returns:
        An appropriate provenance engine.
    """

    if implementation == nw.Implementation.POLARS:
        cm = self._create_polars_versioning_engine(plan)
    elif implementation == self.native_implementation():
        cm = self._create_versioning_engine(plan)
    else:
        cm = self._create_polars_versioning_engine(plan)

    with cm as engine:
        yield engine
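
As a rough usage sketch (store and plan are assumed to already exist), requesting any implementation other than the store's native one yields the Polars engine:

```python
import narwhals as nw

# The engine is provided as a context manager.
with store.create_versioning_engine(plan, nw.Implementation.POLARS) as engine:
    ...  # use the engine while the context is open
```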

metaxy.MetadataStore.open abstractmethod

open(mode: AccessMode = 'r') -> Iterator[Self]

Open/initialize the store for operations.

Context manager that opens the store with specified access mode. Called internally by __enter__. Child classes should implement backend-specific connection setup/teardown here.

Parameters:

  • mode (AccessMode, default: 'r' ) –

    Access mode for this connection session.

Yields:

  • Self ( Self ) –

    The store instance with connection open

Note

Users should prefer using the `with store:` pattern except when write access mode is needed.

Source code in src/metaxy/metadata_store/base.py
@abstractmethod
@contextmanager
def open(self, mode: AccessMode = "r") -> Iterator[Self]:
    """Open/initialize the store for operations.

    Context manager that opens the store with specified access mode.
    Called internally by `__enter__`.
    Child classes should implement backend-specific connection setup/teardown here.

    Args:
        mode: Access mode for this connection session.

    Yields:
        Self: The store instance with connection open

    Note:
        Users should prefer using `with store:` pattern except when write access mode is needed.
    """
    ...
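
A short sketch of both access patterns (store and MyFeature are assumed to exist):

```python
# Read-only session via the context-manager protocol (see __enter__ below):
with store:
    assert store.has_feature(MyFeature)

# Explicit write session when mutations are needed:
with store.open(mode="w"):
    store.drop_feature_metadata(MyFeature)
```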

metaxy.MetadataStore.__enter__

__enter__() -> Self

Enter context manager - opens store in READ mode by default.

Use MetadataStore.open for write access mode instead.

Returns:

  • Self ( Self ) –

    The opened store instance

Source code in src/metaxy/metadata_store/base.py
def __enter__(self) -> Self:
    """Enter context manager - opens store in READ mode by default.

    Use [`MetadataStore.open`][metaxy.metadata_store.base.MetadataStore.open] for write access mode instead.

    Returns:
        Self: The opened store instance
    """
    # Determine mode based on auto_create_tables
    mode = "w" if self.auto_create_tables else "r"

    # Open the store (open() manages _context_depth internally)
    self._open_cm = self.open(mode)  # ty: ignore[invalid-assignment]
    self._open_cm.__enter__()  # ty: ignore[possibly-missing-attribute]

    return self

metaxy.MetadataStore.validate_hash_algorithm

validate_hash_algorithm(
    check_fallback_stores: bool = True,
) -> None

Validate that hash algorithm is supported by this store's components.

Public method - can be called to verify hash compatibility.

Parameters:

  • check_fallback_stores (bool, default: True ) –

    If True, also validate hash is supported by fallback stores (ensures compatibility for future cross-store operations)

Raises:

  • ValueError –

    If hash algorithm not supported by components or fallback stores

Source code in src/metaxy/metadata_store/base.py
def validate_hash_algorithm(
    self,
    check_fallback_stores: bool = True,
) -> None:
    """Validate that hash algorithm is supported by this store's components.

    Public method - can be called to verify hash compatibility.

    Args:
        check_fallback_stores: If True, also validate hash is supported by
            fallback stores (ensures compatibility for future cross-store operations)

    Raises:
        ValueError: If hash algorithm not supported by components or fallback stores
    """
    # Validate hash algorithm support without creating a full engine
    # (engine creation requires a graph which isn't available during store init)
    self._validate_hash_algorithm_support()

    # Check fallback stores
    if check_fallback_stores:
        for fallback in self.fallback_stores:
            fallback.validate_hash_algorithm(check_fallback_stores=False)

metaxy.MetadataStore.drop_feature_metadata

drop_feature_metadata(
    feature: CoercibleToFeatureKey,
) -> None

Drop all metadata for a feature.

This removes all stored metadata for the specified feature from the store. Useful for cleanup in tests or when re-computing feature metadata from scratch.

Warning

This operation is irreversible and will permanently delete all metadata for the specified feature.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature class or key to drop metadata for

Example

```python
with store_with_data.open(mode="w"):
    assert store_with_data.has_feature(MyFeature)
    store_with_data.drop_feature_metadata(MyFeature)
```

Source code in src/metaxy/metadata_store/base.py
def drop_feature_metadata(self, feature: CoercibleToFeatureKey) -> None:
    """Drop all metadata for a feature.

    This removes all stored metadata for the specified feature from the store.
    Useful for cleanup in tests or when re-computing feature metadata from scratch.

    Warning:
        This operation is irreversible and will **permanently delete all metadata** for the specified feature.

    Args:
        feature: Feature class or key to drop metadata for

    Example:
        ```py
        with store_with_data.open(mode="w"):
            assert store_with_data.has_feature(MyFeature)
            store_with_data.drop_feature_metadata(MyFeature)
        ```
    """
    self._check_open()
    feature_key = self._resolve_feature_key(feature)
    if self._is_system_table(feature_key):
        raise NotImplementedError(f"{self.__class__.__name__} does not support deletes for system tables")
    self._drop_feature(feature_key)

metaxy.MetadataStore.delete

delete(
    feature: CoercibleToFeatureKey,
    filters: Sequence[Expr] | Expr | None,
    *,
    soft: bool = True,
    with_feature_history: bool = False,
    with_sample_history: bool = False,
) -> None

Delete records matching provided filters.

Performs a soft delete by default. This is achieved by setting metaxy_deleted_at to the current timestamp. Subsequent MetadataStore.read calls ignore these records by default.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to delete from.

  • filters (Sequence[Expr] | Expr | None) –

    One or more Narwhals expressions or a sequence of expressions that determine which records to delete. If None, deletes all records (subject to with_feature_history and with_sample_history constraints).

  • soft (bool, default: True ) –

    Whether to perform a soft delete.

  • with_feature_history (bool, default: False ) –

    If True, delete across all historical feature versions. If False (default), only current version.

  • with_sample_history (bool, default: False ) –

    If True, include all historical materializations. If False (default), deduplicate to latest rows.

Critical

By default, deletions target historical records. Even when with_feature_history is False, records with the same feature version but an older metaxy_created_at would be targeted as well. Consider adding additional conditions to filters if you want to avoid that.

Source code in src/metaxy/metadata_store/base.py
def delete(
    self,
    feature: CoercibleToFeatureKey,
    filters: Sequence[nw.Expr] | nw.Expr | None,
    *,
    soft: bool = True,
    with_feature_history: bool = False,
    with_sample_history: bool = False,
) -> None:
    """Delete records matching provided filters.

    Performs a soft delete by default. This is achieved by setting `metaxy_deleted_at` to the current timestamp.
    Subsequent [`MetadataStore.read`][metaxy.metadata_store.base.MetadataStore.read] calls ignore these records by default.

    Args:
        feature: Feature to delete from.
        filters: One or more Narwhals expressions or a sequence of expressions that determine which records to delete.
            If `None`, deletes all records (subject to `with_feature_history` and `with_sample_history` constraints).
        soft: Whether to perform a soft delete.
        with_feature_history: If True, delete across all historical feature versions. If False (default), only current version.
        with_sample_history: If True, include all historical materializations. If False (default), deduplicate to latest rows.

    !!! critical
        By default, deletions target historical records. Even when `with_feature_history` is `False`,
        records with the same feature version but an older `metaxy_created_at` would be targeted as
        well. Consider adding additional conditions to `filters` if you want to avoid that.
    """
    self._check_open()
    feature_key = self._resolve_feature_key(feature)

    # Normalize filters to list
    if filters is None:
        filter_list: list[nw.Expr] = []
    elif isinstance(filters, nw.Expr):
        filter_list = [filters]
    else:
        filter_list = list(filters)

    if soft:
        # Soft delete: mark records with deletion timestamp, preserving original updated_at
        lazy = self.read(
            feature_key,
            filters=filter_list,
            include_soft_deleted=False,
            with_feature_history=with_feature_history,
            with_sample_history=with_sample_history,
            allow_fallback=True,
        )
        with self._shared_transaction_timestamp(soft_delete=True) as ts:
            soft_deletion_marked = lazy.with_columns(
                nw.lit(ts).alias(METAXY_DELETED_AT),
            )
            self.write(feature_key, soft_deletion_marked.to_native())
    else:
        # Hard delete: add version filter if needed, then delegate to backend
        if not with_feature_history and not self._is_system_table(feature_key):
            version_filter = nw.col(METAXY_FEATURE_VERSION) == current_graph().get_feature_version(feature_key)
            filter_list = [version_filter, *filter_list]

        self._delete_feature(feature_key, filter_list, with_feature_history=with_feature_history)
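
For illustration, a hedged sketch of a filtered soft delete (store, MyFeature, and the sample_uid column are assumptions):

```python
import narwhals as nw

with store.open(mode="w"):
    # Soft delete: matching rows get metaxy_deleted_at set rather than removed.
    store.delete(
        MyFeature,
        filters=nw.col("sample_uid").is_in(["s1", "s2"]),
        soft=True,
    )
```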

metaxy.MetadataStore.read_feature_schema_from_store

read_feature_schema_from_store(
    feature: CoercibleToFeatureKey,
) -> Schema

Read the schema for a feature from the store.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to read schema for

Returns:

  • Schema –

    Narwhals schema for the feature

Raises:

  • FeatureNotFoundError –

    If feature not found in the store

Source code in src/metaxy/metadata_store/base.py
def read_feature_schema_from_store(
    self,
    feature: CoercibleToFeatureKey,
) -> nw.Schema:
    """Read the schema for a feature from the store.

    Args:
        feature: Feature to read schema for

    Returns:
        Narwhals schema for the feature

    Raises:
        FeatureNotFoundError: If feature not found in the store
    """
    lazy = self.read(
        feature,
        allow_fallback=False,
    )
    return lazy.collect_schema()
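
A minimal sketch (store and MyFeature assumed):

```python
with store:
    schema = store.read_feature_schema_from_store(MyFeature)
    print(schema)  # a Narwhals Schema; raises FeatureNotFoundError if absent
```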

metaxy.MetadataStore.has_feature

has_feature(
    feature: CoercibleToFeatureKey,
    *,
    check_fallback: bool = False,
) -> bool

Check if feature exists in store.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to check

  • check_fallback (bool, default: False ) –

    If True, also check fallback stores

Returns:

  • bool –

    True if feature exists, False otherwise

Source code in src/metaxy/metadata_store/base.py
def has_feature(
    self,
    feature: CoercibleToFeatureKey,
    *,
    check_fallback: bool = False,
) -> bool:
    """
    Check if feature exists in store.

    Args:
        feature: Feature to check
        check_fallback: If True, also check fallback stores

    Returns:
        True if feature exists, False otherwise
    """
    self._check_open()

    if self._read_feature(feature) is not None:
        return True

    # Check fallback stores
    if not check_fallback:
        return self._has_feature_impl(feature)
    else:
        for store in self.fallback_stores:
            if store.has_feature(feature, check_fallback=True):
                return True

    return False
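
A quick sketch contrasting the two lookup scopes (store and MyFeature assumed):

```python
with store:
    in_this_store = store.has_feature(MyFeature)
    in_any_store = store.has_feature(MyFeature, check_fallback=True)
```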

metaxy.MetadataStore.display abstractmethod

display() -> str

Return a human-readable display string for this store.

Used in warnings, logs, and CLI output to identify the store.

Returns:

  • str –

    Display string (e.g., "DuckDBMetadataStore(database=/path/to/db.duckdb)")

Source code in src/metaxy/metadata_store/base.py
@abstractmethod
def display(self) -> str:
    """Return a human-readable display string for this store.

    Used in warnings, logs, and CLI output to identify the store.

    Returns:
        Display string (e.g., "DuckDBMetadataStore(database=/path/to/db.duckdb)")
    """
    pass

metaxy.MetadataStore.__repr__

__repr__() -> str

Return the display string with optional name prefix.

Source code in src/metaxy/metadata_store/base.py
def __repr__(self) -> str:
    """Return the display string with optional name prefix."""
    return self._format_display_with_name(self.display())

metaxy.MetadataStore.find_store_for_feature

find_store_for_feature(
    feature_key: CoercibleToFeatureKey,
    *,
    check_fallback: bool = True,
) -> MetadataStore | None

Find the store that contains the given feature.

Parameters:

  • feature_key (CoercibleToFeatureKey) –

    Feature to find

  • check_fallback (bool, default: True ) –

    Whether to check fallback stores when the feature is not found in the current store

Returns:

  • MetadataStore | None –

    The store containing the feature, or None if not found

Source code in src/metaxy/metadata_store/base.py
def find_store_for_feature(
    self,
    feature_key: CoercibleToFeatureKey,
    *,
    check_fallback: bool = True,
) -> MetadataStore | None:
    """Find the store that contains the given feature.

    Args:
        feature_key: Feature to find
        check_fallback: Whether to check fallback stores when the feature
            is not found in the current store

    Returns:
        The store containing the feature, or None if not found
    """
    self._check_open()

    # Check if feature exists in this store
    if self.has_feature(feature_key):
        return self

    # Try fallback stores if enabled (opened on demand)
    if check_fallback:
        for store in self.fallback_stores:
            with store:
                found = store.find_store_for_feature(feature_key, check_fallback=True)
                if found is not None:
                    return found

    return None
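
For example (a hedged sketch; store and MyFeature assumed), resolving which store actually holds a feature:

```python
with store:
    owner = store.find_store_for_feature(MyFeature)
    if owner is None:
        print("feature not found in this store or its fallbacks")
    else:
        print(f"found in: {owner.display()}")
```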

metaxy.MetadataStore.get_store_metadata

get_store_metadata(
    feature_key: CoercibleToFeatureKey,
    *,
    check_fallback: bool = True,
) -> dict[str, Any]

Arbitrary key-value pairs with useful metadata for logging purposes (like a path in storage).

This method should not expose sensitive information.

Parameters:

  • feature_key (CoercibleToFeatureKey) –

    Feature to get metadata for

  • check_fallback (bool, default: True ) –

    Whether to check fallback stores when the feature is not found in the current store

Returns:

  • dict[str, Any] –

    Dictionary with store-specific metadata. Contains:

    • name, display: The queried store (self)

    • resolved_from: Where the feature was actually found (may be a fallback store), includes name, display, and store-specific fields like table_name or uri
Source code in src/metaxy/metadata_store/base.py
def get_store_metadata(
    self,
    feature_key: CoercibleToFeatureKey,
    *,
    check_fallback: bool = True,
) -> dict[str, Any]:
    """Arbitrary key-value pairs with useful metadata for logging purposes (like a path in storage).

    This method should not expose sensitive information.

    Args:
        feature_key: Feature to get metadata for
        check_fallback: Whether to check fallback stores when the feature
            is not found in the current store

    Returns:
        Dictionary with store-specific metadata. Contains:
        - `name`, `display`: The queried store (self)
        - `resolved_from`: Where the feature was actually found (may be a fallback store),
            includes `name`, `display`, and store-specific fields like `table_name` or `uri`
    """
    resolved_store = self.find_store_for_feature(feature_key, check_fallback=check_fallback)
    result: dict[str, Any] = {
        "name": self.name,
        "display": self.display(),
    }
    if resolved_store is not None:
        result["resolved_from"] = resolved_store.get_store_info(feature_key)
    return result
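
A sketch of the returned shape (all values below are illustrative, not real output):

```python
with store:
    meta = store.get_store_metadata(MyFeature)

# Illustrative shape only:
# {
#     "name": "dev",
#     "display": "DuckDBMetadataStore(database=metadata.db)",
#     "resolved_from": {"name": "prod", "type": "...", "display": "..."},
# }
```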

metaxy.MetadataStore.get_store_info

get_store_info(
    feature_key: CoercibleToFeatureKey,
) -> dict[str, Any]

Build a dictionary with store identification and feature-specific metadata.

Parameters:

  • feature_key (CoercibleToFeatureKey) –

    Feature to get metadata for

Returns:

  • dict[str, Any] –

    Dictionary containing:

    • name: The store name

    • type: The fully qualified class name (module.classname)

    • display: Human-readable store description

    • Store-specific metadata from _get_store_metadata_impl (e.g., table_name, uri)
Source code in src/metaxy/metadata_store/base.py
def get_store_info(self, feature_key: CoercibleToFeatureKey) -> dict[str, Any]:
    """Build a dictionary with store identification and feature-specific metadata.

    Args:
        feature_key: Feature to get metadata for

    Returns:
        Dictionary containing:
        - `name`: The store name
        - `type`: The fully qualified class name (module.classname)
        - `display`: Human-readable store description
        - Store-specific metadata from `_get_store_metadata_impl` (e.g., `table_name`, `uri`)
    """
    return {
        "name": self.name,
        "type": self.qualified_class_name,
        "display": self.display(),
        **self._get_store_metadata_impl(feature_key),
    }

metaxy.MetadataStore.calculate_input_progress

calculate_input_progress(
    lazy_increment: LazyIncrement,
    feature_key: CoercibleToFeatureKey,
) -> float | None

Calculate progress percentage from lazy increment.

Uses the input field from LazyIncrement to count total input units and compares with added to determine how many are missing.

Progress represents the percentage of input units that have been processed at least once. Stale samples (in changed) are counted as processed since they have existing metadata, even though they may need re-processing due to upstream changes.

Parameters:

  • lazy_increment (LazyIncrement) –

    The lazy increment containing input and added dataframes.

  • feature_key (CoercibleToFeatureKey) –

    The feature key to look up lineage information.

Returns:

  • float | None –

    Progress percentage (0-100), or None if input is not available.

Source code in src/metaxy/metadata_store/base.py
def calculate_input_progress(
    self,
    lazy_increment: LazyIncrement,
    feature_key: CoercibleToFeatureKey,
) -> float | None:
    """Calculate progress percentage from lazy increment.

    Uses the `input` field from LazyIncrement to count total input units
    and compares with `added` to determine how many are missing.

    Progress represents the percentage of input units that have been processed
    at least once. Stale samples (in `changed`) are counted as processed since
    they have existing metadata, even though they may need re-processing due to
    upstream changes.

    Args:
        lazy_increment: The lazy increment containing input and added dataframes.
        feature_key: The feature key to look up lineage information.

    Returns:
        Progress percentage (0-100), or None if input is not available.
    """
    if lazy_increment.input is None:
        return None

    key = self._resolve_feature_key(feature_key)
    graph = current_graph()
    plan = graph.get_feature_plan(key)

    # Get the columns that define input units from the feature plan
    input_id_columns = plan.input_id_columns

    # Count distinct input units using two separate queries
    # We can't use concat because input and added may have different schemas
    # (e.g., nullable vs non-nullable columns)
    total_units: int = lazy_increment.input.select(input_id_columns).unique().select(nw.len()).collect().item()

    if total_units == 0:
        return None  # No input available from upstream

    missing_units: int = lazy_increment.new.select(input_id_columns).unique().select(nw.len()).collect().item()

    processed_units = total_units - missing_units
    return (processed_units / total_units) * 100
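
The percentage is simply processed_units / total_units * 100 over distinct input units. A hedged sketch (lazy_increment is assumed to come from the store's increment-planning machinery):

```python
progress = store.calculate_input_progress(lazy_increment, MyFeature)
if progress is not None:
    print(f"{progress:.1f}% of input units processed at least once")
```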

metaxy.MetadataStore.copy_metadata

copy_metadata(
    from_store: MetadataStore,
    features: Sequence[CoercibleToFeatureKey] | None = None,
    *,
    filters: Mapping[str, Sequence[Expr]] | None = None,
    global_filters: Sequence[Expr] | None = None,
    with_feature_history: bool = True,
    with_sample_history: bool = False,
) -> dict[str, int]

Copy metadata from another store.

Parameters:

  • from_store (MetadataStore) –

    Source metadata store to copy from (must be opened for reading)

  • features (Sequence[CoercibleToFeatureKey] | None, default: None ) –

    Features to copy. Can be:

    • None: copies all features from the active graph

    • Sequence of specific features to copy

  • filters (Mapping[str, Sequence[Expr]] | None, default: None ) –

    Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions. These filters are applied when reading from the source store. Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}

  • global_filters (Sequence[Expr] | None, default: None ) –

    Sequence of Narwhals filter expressions applied to all features. These filters are combined with any feature-specific filters from filters. Example: [nw.col("sample_uid").is_in(["s1", "s2"])]

  • with_feature_history (bool, default: True ) –

    If True (default), include rows from all historical feature versions. If False, only copy rows with the current feature_version.

  • with_sample_history (bool, default: False ) –

    If True, include all historical materializations per sample. If False (default), deduplicate within id_columns groups by keeping latest.

Returns:

  • dict[str, int] –

    Dict with statistics: {"features_copied": int, "rows_copied": int}

Raises:

  • ValueError –

    If source or destination store is not open

  • FeatureNotFoundError –

    If a specified feature doesn't exist in source store

Examples:

```python
# Copy all features
with source_store, dest_store.open("w"):
    stats = dest_store.copy_metadata(from_store=source_store)
```

```python
# Copy specific features
with source_store, dest_store.open("w"):
    stats = dest_store.copy_metadata(
        from_store=source_store,
        features=[mx.FeatureKey("my_feature")],
    )
```

```python
# Copy with global filters applied to all features
with source_store, dest_store.open("w"):
    stats = dest_store.copy_metadata(
        from_store=source_store,
        global_filters=[nw.col("id").is_in(["a", "b"])],
    )
```

```python
# Copy specific features with per-feature filters
with source_store, dest_store.open("w"):
    stats = dest_store.copy_metadata(
        from_store=source_store,
        features=[
            mx.FeatureKey("feature_a"),
            mx.FeatureKey("feature_b"),
        ],
        filters={
            "feature_a": [nw.col("field_a") > 10],
            "feature_b": [nw.col("field_b") < 30],
        },
    )
```

Source code in src/metaxy/metadata_store/base.py
def copy_metadata(
    self,
    from_store: MetadataStore,
    features: Sequence[CoercibleToFeatureKey] | None = None,
    *,
    filters: Mapping[str, Sequence[nw.Expr]] | None = None,
    global_filters: Sequence[nw.Expr] | None = None,
    with_feature_history: bool = True,
    with_sample_history: bool = False,
) -> dict[str, int]:
    """Copy metadata from another store.

    Args:
        from_store: Source metadata store to copy from (must be opened for reading)
        features: Features to copy. Can be:

            - `None`: copies all features from the active graph

            - Sequence of specific features to copy

        filters: Dict mapping feature keys (as strings) to sequences of Narwhals filter expressions.
            These filters are applied when reading from the source store.
            Example: {"feature/key": [nw.col("x") > 10], "other/feature": [...]}
        global_filters: Sequence of Narwhals filter expressions applied to all features.
            These filters are combined with any feature-specific filters from `filters`.
            Example: [nw.col("sample_uid").is_in(["s1", "s2"])]
        with_feature_history: If True (default), include rows from all historical feature versions.
            If False, only copy rows with the current feature_version.
        with_sample_history: If True, include all historical materializations per sample.
            If False (default), deduplicate within `id_columns` groups by keeping latest.

    Returns:
        Dict with statistics: {"features_copied": int, "rows_copied": int}

    Raises:
        ValueError: If source or destination store is not open
        FeatureNotFoundError: If a specified feature doesn't exist in source store

    Examples:
        <!-- skip next -->
        ```py
        # Copy all features
        with source_store, dest_store.open("w"):
            stats = dest_store.copy_metadata(from_store=source_store)
        ```

        <!-- skip next -->
        ```py
        # Copy specific features
        with source_store, dest_store.open("w"):
            stats = dest_store.copy_metadata(
                from_store=source_store,
                features=[mx.FeatureKey("my_feature")],
            )
        ```

        <!-- skip next -->
        ```py
        # Copy with global filters applied to all features
        with source_store, dest_store.open("w"):
            stats = dest_store.copy_metadata(
                from_store=source_store,
                global_filters=[nw.col("id").is_in(["a", "b"])],
            )
        ```

        <!-- skip next -->
        ```py
        # Copy specific features with per-feature filters
        with source_store, dest_store.open("w"):
            stats = dest_store.copy_metadata(
                from_store=source_store,
                features=[
                    mx.FeatureKey("feature_a"),
                    mx.FeatureKey("feature_b"),
                ],
                filters={
                    "feature_a": [nw.col("field_a") > 10],
                    "feature_b": [nw.col("field_b") < 30],
                },
            )
        ```
    """
    import logging

    logger = logging.getLogger(__name__)

    # Validate both stores are open
    if not self._is_open:
        raise ValueError('Destination store must be opened with store.open("w") before use')
    if not from_store._is_open:
        raise ValueError("Source store must be opened with store before use")

    return self._copy_metadata_impl(
        from_store=from_store,
        features=features,
        filters=filters,
        global_filters=global_filters,
        with_feature_history=with_feature_history,
        with_sample_history=with_sample_history,
        logger=logger,
    )

metaxy.metadata_store.types.AccessMode module-attribute

AccessMode = Literal['r', 'w']

metaxy.metadata_store.base.VersioningEngineOptions module-attribute

VersioningEngineOptions: TypeAlias = Literal[
    "auto", "native", "polars"
]

Base Configuration Class

The following base configuration class is typically used by child metadata stores:

metaxy.metadata_store.base.MetadataStoreConfig

Bases: BaseSettings

Base configuration class for metadata stores.

This class defines common configuration fields shared by all metadata store types. Store-specific config classes should inherit from this and add their own fields.

Example

```python
from metaxy.ext.metadata_stores.duckdb import (
    DuckDBMetadataStore,
    DuckDBMetadataStoreConfig,
)
from metaxy.versioning.types import HashAlgorithm

config = DuckDBMetadataStoreConfig(
    database="metadata.db",
    hash_algorithm=HashAlgorithm.MD5,
)

store = DuckDBMetadataStore.from_config(config)
```

Configuration

The base MetadataStoreConfig class injects the following configuration options:

fallback_stores

List of fallback store names to search when features are not found in the current store.

Type: list[str]

```toml
[stores.dev.config]
# Optional
# fallback_stores = []
```

```toml
[tool.metaxy.stores.dev.config]
# Optional
# fallback_stores = []
```

```bash
export METAXY_STORES__DEV__CONFIG__FALLBACK_STORES=...
```

hash_algorithm

Hash algorithm for versioning. If None, uses store's default.

Type: metaxy.versioning.types.HashAlgorithm | None

```toml
[stores.dev.config]
# Optional
# hash_algorithm = null
```

```toml
[tool.metaxy.stores.dev.config]
# Optional
# hash_algorithm = null
```

```bash
export METAXY_STORES__DEV__CONFIG__HASH_ALGORITHM=...
```

versioning_engine

Which versioning engine to use: 'auto' (prefer native), 'native', or 'polars'.

Type: Literal['auto', 'native', 'polars']

Default: "auto"

```toml
[stores.dev.config]
versioning_engine = "auto"
```

```toml
[tool.metaxy.stores.dev.config]
versioning_engine = "auto"
```

```bash
export METAXY_STORES__DEV__CONFIG__VERSIONING_ENGINE=auto
```