Dagster Integration API Reference

Integration docs


metaxy.ext.dagster

Decorators

metaxy.ext.dagster.metaxify.metaxify

metaxify(
    _asset: _T | None = None,
    *,
    key: CoercibleToAssetKey | None = None,
    key_prefix: CoercibleToAssetKeyPrefix | None = None,
    inject_metaxy_kind: bool = True,
    inject_code_version: bool = True,
    set_description: bool = True,
    inject_column_schema: bool = True,
    inject_column_lineage: bool = True,
)

Inject Metaxy metadata into a Dagster AssetsDefinition or AssetSpec.

Affects assets with metaxy/feature metadata set.

Learn more about @metaxify and see example screenshots here.

Parameters:

  • key (CoercibleToAssetKey | None, default: None ) –

    Explicit asset key that overrides all other key resolution logic. Cannot be used with key_prefix or with multi-asset definitions that produce multiple outputs.

  • key_prefix (CoercibleToAssetKeyPrefix | None, default: None ) –

    Prefix to prepend to the resolved asset key. Also applied to upstream dependency keys. Cannot be used with key.

  • inject_metaxy_kind (bool, default: True ) –

    Whether to inject "metaxy" kind into asset kinds.

  • inject_code_version (bool, default: True ) –

    Whether to inject the Metaxy feature code version into the asset's code version. The version is appended in the format metaxy:<version>.

  • set_description (bool, default: True ) –

    Whether to set the asset description from the feature class docstring if the asset doesn't already have a description.

  • inject_column_schema (bool, default: True ) –

    Whether to inject Pydantic field definitions as Dagster column schema. Field types are converted to strings, and field descriptions are used as column descriptions.

  • inject_column_lineage (bool, default: True ) –

    Whether to inject column-level lineage into the asset metadata under dagster/column_lineage. Uses Pydantic model fields to track column provenance via FeatureDep.rename, FeatureDep.lineage, and direct pass-through.
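
The interaction between key and key_prefix described above can be sketched in plain Python. This is only an illustration under assumptions: resolve_asset_key is a hypothetical helper, asset keys are modelled as lists of strings, and raising ValueError for the conflicting combination is a guess rather than the documented behaviour of @metaxify.

```python
def resolve_asset_key(resolved, *, key=None, key_prefix=None):
    """Hypothetical helper mirroring the documented key/key_prefix rules."""
    if key is not None and key_prefix is not None:
        # Assumption: the real decorator may signal this conflict differently.
        raise ValueError("key and key_prefix cannot be used together")
    if key is not None:
        # An explicit key overrides any other resolution logic.
        return list(key)
    if key_prefix is not None:
        # The prefix is prepended to the resolved key.
        return [*key_prefix, *resolved]
    return list(resolved)


prefixed = resolve_asset_key(["my_asset"], key_prefix=["team"])
overridden = resolve_asset_key(["my_asset"], key=["custom", "name"])

try:
    resolve_asset_key(["my_asset"], key=["custom"], key_prefix=["team"])
    conflict_rejected = False
except ValueError:
    conflict_rejected = True
```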

Tip

Multiple Dagster assets can contribute to the same Metaxy feature. This is a valid setup because Metaxy writes are append-only. To do this, set the following metadata keys on each asset:

  • "metaxy/feature", pointing to the same Metaxy feature key

  • "metaxy/partition", a dictionary mapping column names to the values produced by that specific Dagster asset

"metaxy/partition" is used to build Narwhals filter expressions that identify the specific feature slice produced by the Dagster asset.

Example

import dagster as dg
import metaxy as mx
import metaxy.ext.dagster as mxd


@mxd.metaxify()
@dg.asset(
    metadata={"metaxy/feature": "my/feature/key"},
)
def my_asset(store: mx.MetadataStore):
    with store:
        increment = store.resolve_update("my/feature/key")
    ...
With @multi_asset

Multiple Metaxy features can be produced by the same @multi_asset. Typically, they are produced independently of each other.
@mxd.metaxify()
@dg.multi_asset(
    specs=[
        dg.AssetSpec("output_a", metadata={"metaxy/feature": "feature/a"}),
        dg.AssetSpec("output_b", metadata={"metaxy/feature": "feature/b"}),
    ]
)
def my_multi_asset(): ...
With dagster.AssetSpec
asset_spec = dg.AssetSpec(
    key="my_asset",
    metadata={"metaxy/feature": "my/feature/key"},
)
asset_spec = mxd.metaxify()(asset_spec)
Multiple Dagster assets contributing to the same Metaxy feature
@dg.asset(
    metadata={
        "metaxy/feature": "my/feature/key",
        "metaxy/partition": {"dataset": "a"},
    },
)
def my_feature_dataset_a(): ...


@dg.asset(
    metadata={
        "metaxy/feature": "my/feature/key",
        "metaxy/partition": {"dataset": "b"},
    },
)
def my_feature_dataset_b(): ...
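
The slicing implied by the "metaxy/partition" values in the example above can be pictured with plain Python. This is a sketch under assumptions, not Metaxy's implementation: matches_partition is a hypothetical helper, rows are plain dicts rather than a Narwhals frame, and multiple entries in the mapping are assumed to be combined with AND.

```python
def matches_partition(row: dict, partition: dict) -> bool:
    """Hypothetical helper: True when the row carries every value in the mapping."""
    return all(row.get(column) == value for column, value in partition.items())


# Rows of one feature, written by two different Dagster assets.
rows = [
    {"id": 1, "dataset": "a"},
    {"id": 2, "dataset": "b"},
    {"id": 3, "dataset": "a"},
]

# Each asset only "sees" the slice matching its own "metaxy/partition" mapping.
slice_a = [row for row in rows if matches_partition(row, {"dataset": "a"})]
slice_b = [row for row in rows if matches_partition(row, {"dataset": "b"})]
```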
Source code in src/metaxy/ext/dagster/metaxify.py
def __init__(
    self,
    _asset: "_T | None" = None,
    *,
    key: CoercibleToAssetKey | None = None,
    key_prefix: CoercibleToAssetKeyPrefix | None = None,
    inject_metaxy_kind: bool = True,
    inject_code_version: bool = True,
    set_description: bool = True,
    inject_column_schema: bool = True,
    inject_column_lineage: bool = True,
) -> None:
    # Actual initialization happens in __new__, but we set defaults here for type checkers
    self.key = dg.AssetKey.from_coercible(key) if key is not None else None
    self.key_prefix = dg.AssetKey.from_coercible(key_prefix) if key_prefix is not None else None
    self.inject_metaxy_kind = inject_metaxy_kind
    self.inject_code_version = inject_code_version
    self.set_description = set_description
    self.inject_column_schema = inject_column_schema
    self.inject_column_lineage = inject_column_lineage

metaxy.ext.dagster.observable.observable_metaxy_asset

observable_metaxy_asset(
    feature: CoercibleToFeatureKey,
    *,
    store_resource_key: str = "store",
    inject_metaxy_kind: bool = True,
    inject_code_version: bool = True,
    set_description: bool = True,
    **observable_kwargs: Any,
)

Decorator to create an observable source asset for a Metaxy feature.

The observation reads the feature's metadata from the store, counts rows, and uses mean(metaxy_created_at) as the data version to track changes. Using mean ensures that both additions and deletions are detected.
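
To see why a mean is used rather than, say, the latest timestamp, the sketch below compares both on a toy list of creation times. It is an illustration only: mean_created_at and day are hypothetical helpers, and timestamps are averaged as epoch seconds, which may differ from how Metaxy computes the value.

```python
from datetime import datetime, timezone
from statistics import fmean


def mean_created_at(timestamps):
    """Hypothetical stand-in for mean(metaxy_created_at), in epoch seconds."""
    return fmean(ts.timestamp() for ts in timestamps)


def day(n):
    """Build a UTC timestamp for a day in January 2024."""
    return datetime(2024, 1, n, tzinfo=timezone.utc)


original = [day(1), day(2), day(3)]
with_addition = original + [day(4)]  # a new row is appended
with_deletion = original[1:]  # the oldest row is removed

# The mean moves in both cases, so the data version changes.
addition_detected = mean_created_at(with_addition) != mean_created_at(original)
deletion_detected = mean_created_at(with_deletion) != mean_created_at(original)

# The latest timestamp alone would not notice the removed row.
max_misses_deletion = max(with_deletion) == max(original)
```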

The decorated function receives (context, store, lazy_df) and can return a dict of additional metadata to include in the observation.

Parameters:

  • feature (CoercibleToFeatureKey) –

    The Metaxy feature to observe.

  • store_resource_key (str, default: 'store' ) –

    Resource key for the MetadataStore (default: "store").

  • inject_metaxy_kind (bool, default: True ) –

    Whether to inject "metaxy" kind into asset kinds.

  • inject_code_version (bool, default: True ) –

    Whether to inject the Metaxy feature code version.

  • set_description (bool, default: True ) –

    Whether to set description from feature class docstring.

  • **observable_kwargs (Any, default: {} ) –

    Passed to @observable_source_asset (key, group_name, tags, metadata, description, partitions_def, etc.)

Example
import metaxy.ext.dagster as mxd
import narwhals as nw
from myproject.features import ExternalFeature


@mxd.observable_metaxy_asset(feature=ExternalFeature)
def external_data(context, store, lazy_df):
    pass


# With custom metadata - return a dict
@mxd.observable_metaxy_asset(feature=ExternalFeature)
def external_data_with_metrics(context, store, lazy_df):
    # Run aggregations in the database
    total = lazy_df.select(nw.col("value").sum()).collect().item(0, 0)
    return {"custom/total": total}
Note

observable_source_asset does not support deps. Upstream Metaxy feature dependencies from the feature spec are not propagated to the SourceAsset.

Source code in src/metaxy/ext/dagster/observable.py
@public
def observable_metaxy_asset(
    feature: mx.CoercibleToFeatureKey,
    *,
    store_resource_key: str = "store",
    # metaxify kwargs
    inject_metaxy_kind: bool = True,
    inject_code_version: bool = True,
    set_description: bool = True,
    # observable_source_asset kwargs
    **observable_kwargs: Any,
):
    """Decorator to create an observable source asset for a Metaxy feature.

    The observation reads the feature's metadata from the store, counts rows,
    and uses `mean(metaxy_created_at)` as the data version to track changes.
    Using mean ensures that both additions and deletions are detected.

    The decorated function receives `(context, store, lazy_df)` and can return
    a dict of additional metadata to include in the observation.

    Args:
        feature: The Metaxy feature to observe.
        store_resource_key: Resource key for the MetadataStore (default: `"store"`).
        inject_metaxy_kind: Whether to inject `"metaxy"` kind into asset kinds.
        inject_code_version: Whether to inject the Metaxy feature code version.
        set_description: Whether to set description from feature class docstring.
        **observable_kwargs: Passed to `@observable_source_asset`
            (key, group_name, tags, metadata, description, partitions_def, etc.)

    Example:
        ```python
        import metaxy.ext.dagster as mxd
        from myproject.features import ExternalFeature


        @mxd.observable_metaxy_asset(feature=ExternalFeature)
        def external_data(context, store, lazy_df):
            pass


        # With custom metadata - return a dict
        @mxd.observable_metaxy_asset(feature=ExternalFeature)
        def external_data_with_metrics(context, store, lazy_df):
            # Run aggregations in the database
            total = lazy_df.select(nw.col("value").sum()).collect().item(0, 0)
            return {"custom/total": total}
        ```

    Note:
        `observable_source_asset` does not support `deps`. Upstream Metaxy feature
        dependencies from the feature spec are not propagated to the SourceAsset.
    """
    feature_key = mx.coerce_to_feature_key(feature)

    def decorator(fn: Callable[..., Any]) -> dg.SourceAsset:
        # Build an AssetSpec from kwargs and enrich with metaxify
        # Merge user metadata with metaxy/feature
        user_metadata = observable_kwargs.pop("metadata", None) or {}
        spec = dg.AssetSpec(
            key=observable_kwargs.pop("key", None) or fn.__name__,  # ty: ignore[unresolved-attribute]
            group_name=observable_kwargs.pop("group_name", None),
            tags=observable_kwargs.pop("tags", None),
            metadata={
                **user_metadata,
                DAGSTER_METAXY_FEATURE_METADATA_KEY: feature_key.to_string(),
            },
            description=observable_kwargs.pop("description", None),
        )
        enriched = metaxify(
            inject_metaxy_kind=inject_metaxy_kind,
            inject_code_version=inject_code_version,
            set_description=set_description,
        )(spec)

        def _observe(context: dg.AssetExecutionContext) -> dg.ObserveResult:
            store: mx.MetadataStore = getattr(context.resources, store_resource_key)

            # Check for metaxy/partition metadata to apply filtering
            metaxy_partition = enriched.metadata.get(DAGSTER_METAXY_PARTITION_METADATA_KEY)
            filters = build_metaxy_partition_filter(metaxy_partition)

            with store:
                lazy_df = store.read(feature_key, filters=filters)
                stats = compute_stats_from_lazy_frame(lazy_df)

                # Call the user's function - it can return additional metadata
                extra_metadata = fn(context, store, lazy_df) or {}

            metadata: dict[str, Any] = {"dagster/row_count": stats.row_count}
            metadata.update(extra_metadata)

            return dg.ObserveResult(
                data_version=stats.data_version,
                metadata=metadata,
                tags=build_feature_event_tags(feature_key),
            )

        # Apply observable_source_asset decorator
        return dg.observable_source_asset(
            key=enriched.key,
            description=enriched.description,
            group_name=enriched.group_name,
            tags=dict(enriched.tags) if enriched.tags else None,
            metadata=dict(enriched.metadata) if enriched.metadata else None,
            required_resource_keys={store_resource_key},
            **observable_kwargs,
        )(_observe)

    return decorator

IO Manager

metaxy.ext.dagster.MetaxyIOManager

Bases: ConfigurableIOManager

MetaxyIOManager is a Dagster IOManager that reads and writes data to/from Metaxy's MetadataStore.

It automatically attaches Metaxy feature and store metadata to Dagster materialization events and handles partitioned assets.

Always set "metaxy/feature" Dagster metadata

This IOManager uses the "metaxy/feature" Dagster metadata key to map Dagster assets to Metaxy features. It expects the key to be set on the assets being loaded or materialized.

Example
import dagster as dg


@dg.asset(
    metadata={
        "metaxy/feature": "my/feature/key",
    }
)
def my_asset(): ...

Defining Partitioned Assets

To tell Metaxy which column to use when filtering partitioned assets, set the "partition_by" Dagster metadata key.

Example
import dagster as dg


@dg.asset(
    metadata={
        "metaxy/feature": "my/feature/key",
        "partition_by": "date",
    }
)
def my_partitioned_asset(): ...

This key is commonly used to configure partitioning behavior by various Dagster IO managers.
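
As a rough picture of how "partition_by" narrows a read, the sketch below filters plain dict rows to the requested Dagster partition keys. filter_to_partitions is a hypothetical helper and the membership test is an assumption; the IO manager itself builds its filters from the Dagster input context.

```python
def filter_to_partitions(rows, partition_by, partition_keys):
    """Hypothetical helper: keep rows whose partition column matches a requested key."""
    wanted = set(partition_keys)
    return [row for row in rows if row[partition_by] in wanted]


rows = [
    {"id": 1, "date": "2024-01-01"},
    {"id": 2, "date": "2024-01-02"},
    {"id": 3, "date": "2024-01-02"},
]

# Loading the "2024-01-02" partition of an asset declared with "partition_by": "date".
current = filter_to_partitions(rows, "date", ["2024-01-02"])
```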

Functions

metaxy.ext.dagster.MetaxyIOManager.load_input
load_input(context: InputContext) -> LazyFrame[Any]

Load feature metadata from MetadataStore.

Reads metadata for the feature specified in the asset's "metaxy/feature" metadata. For partitioned assets, filters to the current partition using the column specified in "partition_by" metadata.

Parameters:

  • context (InputContext) –

    Dagster input context containing asset metadata.

Returns:

  • LazyFrame[Any]

    A narwhals LazyFrame with the feature metadata.
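
In the source listing for this method, store information is attached as input metadata only when the input is unpartitioned or resolves to exactly one partition key. The condition can be restated as a small predicate; should_attach_input_metadata is a hypothetical name used for illustration and takes plain values instead of a Dagster context.

```python
def should_attach_input_metadata(has_partitions: bool, partition_keys: list) -> bool:
    """Restate the guard used before calling add_input_metadata."""
    has_single_partition = has_partitions and len(partition_keys) == 1
    return (not has_partitions) or has_single_partition


unpartitioned = should_attach_input_metadata(False, [])
single = should_attach_input_metadata(True, ["2024-01-01"])
multiple = should_attach_input_metadata(True, ["2024-01-01", "2024-01-02"])
```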

Source code in src/metaxy/ext/dagster/io_manager.py
def load_input(self, context: "dg.InputContext") -> nw.LazyFrame[Any]:
    """Load feature metadata from [`MetadataStore`][metaxy.MetadataStore].

    Reads metadata for the feature specified in the asset's `"metaxy/feature"` metadata.
    For partitioned assets, filters to the current partition using the column specified
    in `"partition_by"` metadata.

    Args:
        context: Dagster input context containing asset metadata.

    Returns:
        A narwhals LazyFrame with the feature metadata.
    """
    with self.metadata_store:
        feature_key = self._feature_key_from_context(context)

        # Build partition filters from context (handles partition_by and metaxy/partition)
        filters = build_partition_filter_from_input_context(context)

        # Read metadata with store info in a single call (avoids extra network round-trip)
        lazy_frame, resolved_store = self.metadata_store.read(
            feature=feature_key,
            filters=filters,
            with_store_info=True,
        )

        # Build input metadata from resolved store
        # metaxy/store shows where data was actually found (may be a fallback store)
        resolved_from = resolved_store.get_store_info(feature_key)
        input_metadata: dict[str, Any] = {
            "name": self.metadata_store.name,
            "metaxy/store": resolved_store.display(),
            "resolved_from": resolved_from,
        }

        # Map resolved store metadata to dagster standard keys
        if "table_name" in resolved_from:
            input_metadata["dagster/table_name"] = resolved_from["table_name"]
        if "uri" in resolved_from:
            input_metadata["dagster/uri"] = dg.MetadataValue.path(resolved_from["uri"])

        # Only add input metadata if we have exactly one partition key
        # (add_input_metadata internally uses asset_partition_key which fails with multiple)
        has_single_partition = context.has_asset_partitions and len(list(context.asset_partition_keys)) == 1
        if input_metadata and (not context.has_asset_partitions or has_single_partition):
            context.add_input_metadata(input_metadata, description="Metadata Store Info")

        return lazy_frame
metaxy.ext.dagster.MetaxyIOManager.handle_output
handle_output(
    context: OutputContext, obj: MetaxyOutput
) -> None

Write feature metadata to MetadataStore.

Writes the output dataframe to the metadata store for the feature specified in the asset's "metaxy/feature" metadata. Also logs metadata about the feature and store to Dagster's materialization events.

If obj is None, only metadata logging is performed (no data is written).

Parameters:

  • context (OutputContext) –

    Dagster output context containing asset metadata.

  • obj (MetaxyOutput) –

    A narwhals-compatible dataframe to write, or None to skip writing.

Source code in src/metaxy/ext/dagster/io_manager.py
def handle_output(self, context: "dg.OutputContext", obj: MetaxyOutput) -> None:
    """Write feature metadata to [`MetadataStore`][metaxy.MetadataStore].

    Writes the output dataframe to the metadata store for the feature specified
    in the asset's `"metaxy/feature"` metadata. Also logs metadata about the
    feature and store to Dagster's materialization events.

    If `obj` is `None`, only metadata logging is performed (no data is written).

    Args:
        context: Dagster output context containing asset metadata.
        obj: A narwhals-compatible dataframe to write, or None to skip writing.
    """
    assert DAGSTER_METAXY_FEATURE_METADATA_KEY in context.definition_metadata, (
        f'Missing `"{DAGSTER_METAXY_FEATURE_METADATA_KEY}"` key in asset metadata'
    )
    key = self._feature_key_from_context(context)
    feature = mx.get_feature_by_key(key)

    if obj is not None:
        context.log.debug(f'Writing metadata for Metaxy feature "{key.to_string()}" into {self.metadata_store}')
        with self.metadata_store.open("w"):
            self.metadata_store.write(feature=feature, df=obj)
        context.log.debug(f'Metadata written for Metaxy feature "{key.to_string()}" into {self.metadata_store}')
    else:
        context.log.debug(
            f'The output corresponds to Metaxy feature "{key.to_string()}" stored in {self.metadata_store}'
        )

    self._log_output_metadata(context)

Dagster Types

metaxy.ext.dagster.dagster_type.feature_to_dagster_type

feature_to_dagster_type(
    feature: CoercibleToFeatureKey,
    *,
    name: str | None = None,
    description: str | None = None,
    inject_column_schema: bool = True,
    inject_column_lineage: bool = True,
    metadata: Mapping[str, Any] | None = None,
) -> DagsterType

Build a Dagster type from a Metaxy feature.

Creates a dagster.DagsterType that validates outputs are MetaxyOutput (i.e., narwhals-compatible dataframes or None) and includes metadata derived from the feature's Pydantic model fields.

Parameters:

  • feature (CoercibleToFeatureKey) –

    The Metaxy feature to create a type for. Can be a feature class, feature key, or string that can be coerced to a feature key.

  • name (str | None, default: None ) –

    Optional custom name for the DagsterType. Defaults to the feature's table name (e.g., "project__feature_name").

  • description (str | None, default: None ) –

    Optional custom description. Defaults to the feature class docstring or a generated description.

  • inject_column_schema (bool, default: True ) –

    Whether to inject the column schema as metadata. The schema is derived from Pydantic model fields.

  • inject_column_lineage (bool, default: True ) –

    Whether to inject column lineage as metadata. The lineage is derived from feature dependencies.

  • metadata (Mapping[str, Any] | None, default: None ) –

    Optional custom metadata to inject into the DagsterType.

Returns:

  • DagsterType

    A DagsterType configured for the Metaxy feature with appropriate type checking and metadata.
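
The description fallback used by this function (explicit argument first, then the description stored in the feature schema, then a generated sentence) can be sketched as follows. resolve_description is a hypothetical helper that takes plain strings; it restates the order visible in the source listing for this function rather than calling Metaxy.

```python
def resolve_description(description, schema_description, feature_key):
    """Pick a description using the same precedence as the source listing."""
    if description is not None:
        return description
    if schema_description:
        return schema_description
    return f"Metaxy feature '{feature_key}'."


explicit = resolve_description("Custom text", "From docstring", "my/feature/key")
from_schema = resolve_description(None, "From docstring", "my/feature/key")
generated = resolve_description(None, None, "my/feature/key")
```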

Tip

This is automatically injected by @metaxify.

Example
import dagster as dg
import polars as pl
import metaxy.ext.dagster as mxd
from myproject.features import MyFeature  # Your Metaxy feature class


@mxd.metaxify()
@dg.asset(
    # Assumption: "my/feature/key" is the key of MyFeature
    metadata={"metaxy/feature": "my/feature/key"},
    dagster_type=mxd.feature_to_dagster_type(MyFeature),
)
def my_asset():
    return pl.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})

See also

  • metaxify: Decorator for injecting Metaxy metadata into Dagster assets.
  • MetaxyOutput: The type alias for valid Metaxy outputs.
Source code in src/metaxy/ext/dagster/dagster_type.py
@public
def feature_to_dagster_type(
    feature: mx.CoercibleToFeatureKey,
    *,
    name: str | None = None,
    description: str | None = None,
    inject_column_schema: bool = True,
    inject_column_lineage: bool = True,
    metadata: Mapping[str, Any] | None = None,
) -> dg.DagsterType:
    """Build a Dagster type from a Metaxy feature.

    Creates a `dagster.DagsterType` that validates outputs are
    [`MetaxyOutput`][metaxy.ext.dagster.MetaxyOutput] (i.e., narwhals-compatible
    dataframes or `None`) and includes metadata derived from the feature's Pydantic
    model fields.

    Args:
        feature: The Metaxy feature to create a type for. Can be a feature class,
            feature key, or string that can be coerced to a feature key.
        name: Optional custom name for the DagsterType. Defaults to the feature's
            table name (e.g., "project__feature_name").
        description: Optional custom description. Defaults to the feature class
            docstring or a generated description.
        inject_column_schema: Whether to inject the column schema as metadata.
            The schema is derived from Pydantic model fields.
        inject_column_lineage: Whether to inject column lineage as metadata.
            The lineage is derived from feature dependencies.
        metadata: Optional custom metadata to inject into the DagsterType.

    Returns:
        A DagsterType configured for the Metaxy feature with appropriate
        type checking and metadata.

    !!! tip
        This is automatically injected by [`@metaxify`][metaxy.ext.dagster.metaxify.metaxify]

    Example:
        ```python
        import dagster as dg
        import polars as pl
        import metaxy.ext.dagster as mxd
        from myproject.features import MyFeature  # Your Metaxy feature class


        @mxd.metaxify(feature=MyFeature)
        @dg.asset(dagster_type=mxd.feature_to_dagster_type(MyFeature))
        def my_asset():
            return pl.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
        ```

    !!! info "See also"
        - [`metaxify`][metaxy.ext.dagster.metaxify.metaxify]: Decorator for injecting
          Metaxy metadata into Dagster assets.
        - [`MetaxyOutput`][metaxy.ext.dagster.MetaxyOutput]: The type alias for valid
          Metaxy outputs.
    """
    from metaxy.ext.dagster.io_manager import MetaxyOutput

    feature_key = mx.coerce_to_feature_key(feature)
    feature_def = mx.get_feature_by_key(feature_key)

    # For build_column_schema, prefer the original class if provided
    # (handles cases where class is defined inside a function and can't be imported)
    feature_for_schema: mx.FeatureDefinition | type[mx.BaseFeature]
    if isinstance(feature, type) and issubclass(feature, mx.BaseFeature):
        feature_for_schema = feature
    else:
        feature_for_schema = feature_def

    # Determine name
    type_name = name or feature_key.table_name

    # Determine description - use schema description if available, else default
    if description is None:
        schema_desc = feature_def.feature_schema.get("description")
        if schema_desc:
            description = schema_desc
        else:
            description = f"Metaxy feature '{feature_key.to_string()}'."

    # Build metadata - start with custom metadata if provided
    final_metadata: dict[str, Any] = dict(metadata) if metadata else {}
    final_metadata[DAGSTER_METAXY_INFO_METADATA_KEY] = build_feature_info_metadata(feature_key)
    # Skip column schema for external features (no Python class to extract schema from)
    if inject_column_schema and not feature_def.is_external:
        column_schema = build_column_schema(feature_for_schema)
        if column_schema is not None:
            final_metadata[DAGSTER_COLUMN_SCHEMA_METADATA_KEY] = column_schema

    # Skip column lineage for external features (no Python class to extract columns from)
    if inject_column_lineage and not feature_def.is_external:
        column_lineage = build_column_lineage(feature_for_schema)
        if column_lineage is not None:
            final_metadata[DAGSTER_COLUMN_LINEAGE_METADATA_KEY] = column_lineage

    dagster_type = dg.DagsterType(
        type_check_fn=_create_type_check_fn(feature_key),
        name=type_name,
        description=description,
        typing_type=MetaxyOutput,
        metadata=final_metadata,
    )

    return dagster_type

Dagster Event Generators

metaxy.ext.dagster.utils.generate_materialize_results

generate_materialize_results(
    context: AssetExecutionContext | OpExecutionContext,
    store: MetadataStore | MetaxyStoreFromConfigResource,
    specs: Iterable[AssetSpec] | None = None,
) -> Iterator[MaterializeResult[None]]

Generate dagster.MaterializeResult events for assets in topological order.

Yields a MaterializeResult for each asset spec, sorted by their associated Metaxy features in topological order (dependencies before dependents). Each result includes the row count as "dagster/row_count" metadata.

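Parameters:

  • context (AssetExecutionContext | OpExecutionContext) –

    The Dagster execution context.

  • store (MetadataStore | MetaxyStoreFromConfigResource) –

    The Metaxy metadata store to read from.

  • specs (Iterable[AssetSpec] | None, default: None ) –

    Concrete Dagster asset specs. Required when using OpExecutionContext. Optional for AssetExecutionContext (defaults to context.assets_def.specs).
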
Yields:

  • MaterializeResult[None]

    Materialization result for each asset in topological order.
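
The ordering guarantee (dependencies before dependents) can be illustrated with the standard library's graphlib. This is a sketch under assumptions: the feature graph and asset names are made up, and Metaxy uses its own FeatureGraph rather than TopologicalSorter.

```python
from graphlib import TopologicalSorter

# Hypothetical feature graph: each feature key maps to the keys it depends on.
feature_deps = {
    "my/feature/a": set(),
    "my/feature/b": {"my/feature/a"},
    "my/feature/c": {"my/feature/a", "my/feature/b"},
}

# Asset names keyed by feature key, deliberately listed out of order.
asset_by_feature = {
    "my/feature/c": "output_c",
    "my/feature/a": "output_a",
    "my/feature/b": "output_b",
}


def assets_in_topological_order(deps, assets):
    """Yield asset names so that dependencies come before dependents."""
    for feature_key in TopologicalSorter(deps).static_order():
        if feature_key in assets:
            yield assets[feature_key]


ordered = list(assets_in_topological_order(feature_deps, asset_by_feature))
```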

Example

Using with @multi_asset:

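import dagster as dg
import metaxy as mx
from metaxy.ext.dagster import metaxify
from metaxy.ext.dagster.utils import generate_materialize_results

specs = [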
    dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
    dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
]


@metaxify
@dg.multi_asset(specs=specs)
def my_multi_asset(context: dg.AssetExecutionContext, store: mx.MetadataStore):
    # ... compute and write data ...
    yield from generate_materialize_results(context, store)

Using with @op:

specs = [
    dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
]


@dg.op
def my_op(context: dg.OpExecutionContext, store: mx.MetadataStore):
    # ... compute and write data ...
    yield from generate_materialize_results(context, store, specs=specs)

Source code in src/metaxy/ext/dagster/utils.py
@public
def generate_materialize_results(
    context: dg.AssetExecutionContext | dg.OpExecutionContext,
    store: mx.MetadataStore | MetaxyStoreFromConfigResource,
    specs: Iterable[dg.AssetSpec] | None = None,
) -> Iterator[dg.MaterializeResult[None]]:
    """Generate `dagster.MaterializeResult` events for assets in topological order.

    Yields a `MaterializeResult` for each asset spec, sorted by their associated
    Metaxy features in topological order (dependencies before dependents).
    Each result includes the row count as `"dagster/row_count"` metadata.

    Args:
        context: The Dagster execution context.
        store: The Metaxy metadata store to read from.
        specs: Concrete Dagster asset specs. Required when using `OpExecutionContext`.
            Optional for `AssetExecutionContext` (defaults to `context.assets_def.specs`).

    Yields:
        Materialization result for each asset in topological order.

    Example:
        Using with `@multi_asset`:
        ```python
        specs = [
            dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
            dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
        ]


        @metaxify
        @dg.multi_asset(specs=specs)
        def my_multi_asset(context: dg.AssetExecutionContext, store: mx.MetadataStore):
            # ... compute and write data ...
            yield from generate_materialize_results(context, store)
        ```

        Using with `@op`:
        ```python
        specs = [
            dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
        ]


        @dg.op
        def my_op(context: dg.OpExecutionContext, store: mx.MetadataStore):
            # ... compute and write data ...
            yield from generate_materialize_results(context, store, specs=specs)
        ```
    """
    # Build mapping from feature key to asset spec
    spec_by_feature_key: dict[mx.FeatureKey, dg.AssetSpec] = {}
    if specs is None:
        if not isinstance(context, dg.AssetExecutionContext):
            raise ValueError("specs must be provided when using OpExecutionContext")
        specs = context.assets_def.specs
    for spec in specs:
        if feature_key_raw := spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY):
            feature_key = mx.coerce_to_feature_key(feature_key_raw)
            spec_by_feature_key[feature_key] = spec

    # Sort by topological order of feature keys
    graph = mx.FeatureGraph.get_active()
    sorted_keys = graph.topological_sort_features(list(spec_by_feature_key.keys()))

    for key in sorted_keys:
        asset_spec = spec_by_feature_key[key]
        partition_col = asset_spec.metadata.get(DAGSTER_METAXY_PARTITION_KEY)
        metaxy_partition = asset_spec.metadata.get(DAGSTER_METAXY_PARTITION_METADATA_KEY)

        with store:  # ty: ignore[invalid-context-manager]
            try:
                # Build runtime metadata (handles reading, filtering, and stats internally)
                metadata, stats = build_runtime_feature_metadata(
                    key,
                    store,
                    context,
                    partition_col=partition_col,
                    metaxy_partition=metaxy_partition,
                )
            except FeatureNotFoundError:
                context.log.exception(f"Feature {key.to_string()} not found in store, skipping materialization result")
                continue

            # Get materialized-in-run count if materialization_id is set
            if store.materialization_id is not None:  # ty: ignore[unresolved-attribute]
                mat_df = store.read(  # ty: ignore[unresolved-attribute]
                    key,
                    filters=[
                        nw.col(METAXY_MATERIALIZATION_ID) == store.materialization_id  # ty: ignore[unresolved-attribute]
                    ],
                )
                metadata["metaxy/materialized_in_run"] = mat_df.select(nw.len()).collect().item(0, 0)

        yield dg.MaterializeResult(
            value=None,
            asset_key=asset_spec.key,
            metadata=metadata,
            data_version=stats.data_version,
            tags=build_feature_event_tags(key),
        )

metaxy.ext.dagster.utils.generate_observe_results

generate_observe_results(
    context: AssetExecutionContext | OpExecutionContext,
    store: MetadataStore | MetaxyStoreFromConfigResource,
    specs: Iterable[AssetSpec] | None = None,
) -> Iterator[ObserveResult]

Generate dagster.ObserveResult events for assets in topological order.

Yields an ObserveResult for each asset spec that has "metaxy/feature" metadata key set, sorted by their associated Metaxy features in topological order. Each result includes the row count as "dagster/row_count" metadata.

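Parameters:

  • context (AssetExecutionContext | OpExecutionContext) –

    The Dagster execution context.

  • store (MetadataStore | MetaxyStoreFromConfigResource) –

    The Metaxy metadata store to read from.

  • specs (Iterable[AssetSpec] | None, default: None ) –

    Concrete Dagster asset specs. Required when using OpExecutionContext. Optional for AssetExecutionContext (defaults to context.assets_def.specs).
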
Yields:

  • ObserveResult

    Observation result for each asset in topological order.
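
How specs are selected before sorting can be sketched without Dagster. In the source listing, specs lacking the "metaxy/feature" key are skipped and the remaining specs are stored in a dict keyed by feature key, so a later spec with the same feature key replaces an earlier one in that mapping. Spec and specs_by_feature below are hypothetical stand-ins used only for illustration.

```python
from dataclasses import dataclass, field

FEATURE_KEY = "metaxy/feature"


@dataclass
class Spec:
    """Hypothetical stand-in for dagster.AssetSpec (key and metadata only)."""

    key: str
    metadata: dict = field(default_factory=dict)


def specs_by_feature(specs):
    """Map feature key to spec, skipping specs without the metadata key."""
    mapping = {}
    for spec in specs:
        if feature_key := spec.metadata.get(FEATURE_KEY):
            mapping[feature_key] = spec
    return mapping


specs = [
    Spec("output_a", {FEATURE_KEY: "my/feature/a"}),
    Spec("plain_asset"),  # no "metaxy/feature" key, so it is ignored
    Spec("output_a_again", {FEATURE_KEY: "my/feature/a"}),
]
selected = specs_by_feature(specs)
```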

Example

Using with @multi_observable_source_asset:

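import dagster as dg
import metaxy as mx
from metaxy.ext.dagster import metaxify
from metaxy.ext.dagster.utils import generate_observe_results

specs = [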
    dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
    dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
]


@metaxify
@dg.multi_observable_source_asset(specs=specs)
def my_observable_assets(context: dg.AssetExecutionContext, store: mx.MetadataStore):
    yield from generate_observe_results(context, store)

Using with @op:

specs = [
    dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
]


@dg.op
def my_op(context: dg.OpExecutionContext, store: mx.MetadataStore):
    yield from generate_observe_results(context, store, specs=specs)

Source code in src/metaxy/ext/dagster/utils.py
@public
def generate_observe_results(
    context: dg.AssetExecutionContext | dg.OpExecutionContext,
    store: mx.MetadataStore | MetaxyStoreFromConfigResource,
    specs: Iterable[dg.AssetSpec] | None = None,
) -> Iterator[dg.ObserveResult]:
    """Generate `dagster.ObserveResult` events for assets in topological order.

    Yields an `ObserveResult` for each asset spec that has `"metaxy/feature"` metadata key set, sorted by their associated
    Metaxy features in topological order.
    Each result includes the row count as `"dagster/row_count"` metadata.

    Args:
        context: The Dagster execution context.
        store: The Metaxy metadata store to read from.
        specs: Concrete Dagster asset specs. Required when using `OpExecutionContext`.
            Optional for `AssetExecutionContext` (defaults to `context.assets_def.specs`).

    Yields:
        Observation result for each asset in topological order.

    Example:
        Using with `@multi_observable_source_asset`:
        ```python
        specs = [
            dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
            dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
        ]


        @metaxify
        @dg.multi_observable_source_asset(specs=specs)
        def my_observable_assets(context: dg.AssetExecutionContext, store: mx.MetadataStore):
            yield from generate_observe_results(context, store)
        ```

        Using with `@op`:
        ```python
        specs = [
            dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
        ]


        @dg.op
        def my_op(context: dg.OpExecutionContext, store: mx.MetadataStore):
            yield from generate_observe_results(context, store, specs=specs)
        ```
    """
    # Build mapping from feature key to asset spec
    spec_by_feature_key: dict[mx.FeatureKey, dg.AssetSpec] = {}
    if specs is None:
        if not isinstance(context, dg.AssetExecutionContext):
            raise ValueError("specs must be provided when using OpExecutionContext")
        specs = context.assets_def.specs

    for spec in specs:
        if feature_key_raw := spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY):
            feature_key = mx.coerce_to_feature_key(feature_key_raw)
            spec_by_feature_key[feature_key] = spec

    # Sort by topological order of feature keys
    graph = mx.FeatureGraph.get_active()
    sorted_keys = graph.topological_sort_features(list(spec_by_feature_key.keys()))

    for key in sorted_keys:
        asset_spec = spec_by_feature_key[key]
        partition_col = asset_spec.metadata.get(DAGSTER_METAXY_PARTITION_KEY)
        metaxy_partition = asset_spec.metadata.get(DAGSTER_METAXY_PARTITION_METADATA_KEY)

        with store:  # ty: ignore[invalid-context-manager]
            try:
                # Build runtime metadata (handles reading, filtering, and stats internally)
                # For observers with no metaxy_partition, this reads all data
                metadata, stats = build_runtime_feature_metadata(
                    key,
                    store,
                    context,
                    partition_col=partition_col,
                    metaxy_partition=metaxy_partition,
                )
            except FeatureNotFoundError:
                context.log.exception(f"Feature {key.to_string()} not found in store, skipping observation result")
                continue

        yield dg.ObserveResult(
            asset_key=asset_spec.key,
            metadata=metadata,
            data_version=stats.data_version,
            tags=build_feature_event_tags(key),
        )

metaxy.ext.dagster.utils.build_feature_info_metadata

build_feature_info_metadata(
    feature: CoercibleToFeatureKey,
) -> dict[str, Any]

Build feature info metadata dict for Dagster assets.

Creates a dictionary with information about the Metaxy feature that can be used as Dagster asset metadata under the "metaxy/feature_info" key.

Parameters:

  • feature (CoercibleToFeatureKey) –

    The Metaxy feature (class, key, or string).

Returns:

  • dict[str, Any]

    A nested dictionary containing:

    • feature: Feature information
      • project: The project name
      • spec: The full feature spec as a dict (via model_dump())
      • version: The feature version string
      • type: The feature class module path
    • metaxy: Metaxy library information
      • version: The metaxy library version
      • plugins: The plugins configured in the active MetaxyConfig

Tip

This is automatically injected by @metaxify

Example
from metaxy.ext.dagster.utils import build_feature_info_metadata

info = build_feature_info_metadata(MyFeature)
# {
#     "feature": {
#         "project": "my_project",
#         "spec": {...},  # Full FeatureSpec model_dump()
#         "version": "my__feature@abc123",
#         "type": "myproject.features",
#     },
#     "metaxy": {
#         "version": "0.1.0",
#     },
# }
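
Consumers of this metadata usually only need a few fields from the nested structure. The sketch below works on a literal dict that mirrors the documented shape, with placeholder values taken from the example above; the `summarize` helper is hypothetical and not part of Metaxy.

```python
import json

# Literal copy of the documented shape; all values are placeholders.
info = {
    "feature": {
        "project": "my_project",
        "spec": {},  # stands in for the full FeatureSpec dump
        "version": "my__feature@abc123",
        "type": "myproject.features",
    },
    "metaxy": {"version": "0.1.0"},
}


def summarize(info: dict) -> str:
    """Build a one-line summary from the nested feature info dict."""
    feature = info["feature"]
    return f"{feature['project']}: {feature['version']} (metaxy {info['metaxy']['version']})"


summary = summarize(info)

# The placeholder dict contains only JSON-compatible values, so it round-trips.
round_tripped = json.loads(json.dumps(info))
print(summary)
```
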
Source code in src/metaxy/ext/dagster/utils.py
@public
def build_feature_info_metadata(
    feature: mx.CoercibleToFeatureKey,
) -> dict[str, Any]:
    """Build feature info metadata dict for Dagster assets.

    Creates a dictionary with information about the Metaxy feature that can be
    used as Dagster asset metadata under the `"metaxy/feature_info"` key.

    Args:
        feature: The Metaxy feature (class, key, or string).

    Returns:
        A nested dictionary containing:

        - `feature`: Feature information
            - `project`: The project name
            - `spec`: The full feature spec as a dict (via `model_dump()`)
            - `version`: The feature version string
            - `type`: The feature class module path
        - `metaxy`: Metaxy library information
            - `version`: The metaxy library version

    !!! tip
        This is automatically injected by [`@metaxify`][metaxy.ext.dagster.metaxify.metaxify]

    Example:
        ```python
        from metaxy.ext.dagster.utils import build_feature_info_metadata

        info = build_feature_info_metadata(MyFeature)
        # {
        #     "feature": {
        #         "project": "my_project",
        #         "spec": {...},  # Full FeatureSpec model_dump()
        #         "version": "my__feature@abc123",
        #         "type": "myproject.features",
        #     },
        #     "metaxy": {
        #         "version": "0.1.0",
        #     },
        # }
        ```
    """
    feature_key = mx.coerce_to_feature_key(feature)
    feature_def = mx.get_feature_by_key(feature_key)
    feature_version = mx.current_graph().get_feature_version(feature_key)

    return {
        "feature": {
            "project": feature_def.project,
            "spec": feature_def.spec.model_dump(mode="json"),
            "version": feature_version,
            "type": feature_def.feature_class_path,
        },
        "metaxy": {
            "version": mx.__version__,
            "plugins": mx.MetaxyConfig.get().plugins,
        },
    }

Observation Jobs

metaxy.ext.dagster.observation_job.build_metaxy_multi_observation_job

build_metaxy_multi_observation_job(
    name: str,
    *,
    asset_selection: AssetSelection | None = None,
    defs: Definitions | None = None,
    assets: Sequence[
        AssetSpec | AssetsDefinition | SourceAsset
    ]
    | None = None,
    store_resource_key: str = "store",
    tags: Mapping[str, str] | None = None,
    **kwargs: Any,
) -> JobDefinition

Build a dynamic Dagster job that observes multiple Metaxy feature assets.

Creates a job that dynamically spawns one op per asset, yielding AssetObservation events. Uses Dagster's dynamic orchestration to process multiple assets in parallel.

Tip

This is a very powerful way to observe all your Metaxy features at once. Use it in combination with a Dagster schedule to run it periodically.

Provide either:

  • asset_selection and defs: Select assets from a Definitions object
  • assets: Direct list of assets to observe

Note

All selected assets must share the same partitioning (if any).
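
The two rules above (exactly one input mode, and one shared partitioning) can be sketched as plain Python. This is a simplified illustration that mirrors the checks in the source listing for this function; the helper names `resolve_input_mode` and `shared_partitioning` are hypothetical, and plain strings stand in for Dagster objects.

```python
def resolve_input_mode(asset_selection=None, defs=None, assets=None) -> str:
    """Return "selection" or "assets", or raise ValueError for invalid combinations."""
    has_selection = asset_selection is not None or defs is not None
    has_assets = assets is not None
    if has_selection and has_assets:
        raise ValueError("Use either asset_selection + defs, or assets alone.")
    if not has_selection and not has_assets:
        raise ValueError("Must provide either 'asset_selection' + 'defs', or 'assets'.")
    if has_selection and (asset_selection is None or defs is None):
        raise ValueError("'asset_selection' and 'defs' must be provided together.")
    return "selection" if has_selection else "assets"


def shared_partitioning(partitions_defs: list):
    """Return the single partitioning shared by all entries, or raise ValueError."""
    first = partitions_defs[0]
    for i, pdef in enumerate(partitions_defs[1:], start=1):
        if pdef != first:
            raise ValueError(f"Asset 0 has {first}, but asset {i} has {pdef}.")
    return first


print(resolve_input_mode(assets=["feature_a"]))
print(shared_partitioning(["daily", "daily"]))
```

Unpartitioned assets are represented by `None`, so a mix of partitioned and unpartitioned assets fails the same check.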

Parameters:

  • name (str) –

    Name for the job.

  • asset_selection (AssetSelection | None, default: None ) –

    An AssetSelection specifying which assets to observe. Must be used together with defs.

  • defs (Definitions | None, default: None ) –

    The Definitions object to resolve the selection against. Must be used together with asset_selection.

  • assets (Sequence[AssetSpec | AssetsDefinition | SourceAsset] | None, default: None ) –

    Direct sequence of assets to observe. Each item can be an AssetSpec, AssetsDefinition, or SourceAsset. Cannot be used together with asset_selection/defs.

  • store_resource_key (str, default: 'store' ) –

    Resource key for the MetadataStore (default: "store").

  • tags (Mapping[str, str] | None, default: None ) –

    Optional tags to apply to the job.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments passed to the @job decorator.

Returns:

  • JobDefinition

    A Dagster job definition that observes all matching Metaxy assets.

Raises:

  • ValueError

    If no specs have metaxy/feature metadata, if assets have inconsistent partitions_def, or if invalid argument combinations are provided.

Example
import dagster as dg
import metaxy.ext.dagster as mxd


@mxd.metaxify()
@dg.asset(metadata={"metaxy/feature": "my/feature_a"})
def feature_a(): ...


@mxd.metaxify()
@dg.asset(metadata={"metaxy/feature": "my/feature_b"})
def feature_b(): ...


# Option 1: Using asset_selection + defs
my_defs = dg.Definitions(assets=[feature_a, feature_b])
observation_job = mxd.build_metaxy_multi_observation_job(
    name="observe_my_features",
    asset_selection=dg.AssetSelection.kind("metaxy"),
    defs=my_defs,
)

# Option 2: Using direct assets list
observation_job = mxd.build_metaxy_multi_observation_job(
    name="observe_my_features",
    assets=[feature_a, feature_b],
)
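
According to the source listing for this function, the fan-out op is named `<name>_fanout` and accepts an `asset_keys` config entry that defaults to every asset in the job, so a run can be narrowed to a subset. The sketch below builds such a run config as a plain dict, assuming Dagster's usual `ops.<op_name>.config` nesting; `observation_run_config` and `mapping_key` are hypothetical helpers, not part of Metaxy.

```python
def observation_run_config(job_name: str, asset_keys: list[str]) -> dict:
    """Build a run config that limits the observation job to the given asset keys."""
    return {"ops": {f"{job_name}_fanout": {"config": {"asset_keys": asset_keys}}}}


def mapping_key(asset_key: str) -> str:
    """Mirror how the fan-out op derives a Dagster-safe mapping key."""
    return asset_key.replace("/", "__")


run_config = observation_run_config("observe_my_features", ["feature_a"])
print(run_config)
print(mapping_key("my/feature_a"))
```

Asset keys are compared in their `to_user_string()` form, and requesting a key that is not part of the job raises a `ValueError` at run time.
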
Source code in src/metaxy/ext/dagster/observation_job.py
@public
def build_metaxy_multi_observation_job(
    name: str,
    *,
    asset_selection: dg.AssetSelection | None = None,
    defs: dg.Definitions | None = None,
    assets: Sequence[dg.AssetSpec | dg.AssetsDefinition | dg.SourceAsset] | None = None,
    store_resource_key: str = "store",
    tags: Mapping[str, str] | None = None,
    **kwargs: Any,
) -> dg.JobDefinition:
    """Build a dynamic Dagster job that observes multiple Metaxy feature assets.

    Creates a job that dynamically spawns one op per asset, yielding
    [`AssetObservation`](https://docs.dagster.io/api/python-api/ops#dagster.AssetObservation) events.
    Uses Dagster's dynamic orchestration to process multiple assets in parallel.

    !!! tip
        This is a very powerful way to observe all your Metaxy features at once.
        Use it in combination with a [Dagster schedule](https://docs.dagster.io/concepts/schedules)
        to run it periodically.

    Provide either:
    - `asset_selection` and `defs`: Select assets from a
      [`Definitions`](https://docs.dagster.io/api/python-api/definitions#dagster.Definitions) object

    - `assets`: Direct list of assets to observe

    !!! note
        All selected assets must share the same partitioning (if any).

    Args:
        name: Name for the job.
        asset_selection: An `AssetSelection` specifying which assets to observe.
            Must be used together with `defs`.
        defs: The `Definitions` object to resolve the selection against.
            Must be used together with `asset_selection`.
        assets: Direct sequence of assets to observe. Each item can be an
            `AssetSpec`, `AssetsDefinition`, or `SourceAsset`.
            Cannot be used together with `asset_selection`/`defs`.
        store_resource_key: Resource key for the MetadataStore (default: `"store"`).
        tags: Optional tags to apply to the job.
        **kwargs: Additional keyword arguments passed to the
            [`@job`](https://docs.dagster.io/api/python-api/jobs#dagster.job) decorator.

    Returns:
        A Dagster job definition that observes all matching Metaxy assets.

    Raises:
        ValueError: If no specs have `metaxy/feature` metadata, if assets have
            inconsistent `partitions_def`, or if invalid argument combinations
            are provided.

    Example:
        ```python
        import dagster as dg
        import metaxy.ext.dagster as mxd


        @mxd.metaxify()
        @dg.asset(metadata={"metaxy/feature": "my/feature_a"})
        def feature_a(): ...


        @mxd.metaxify()
        @dg.asset(metadata={"metaxy/feature": "my/feature_b"})
        def feature_b(): ...


        # Option 1: Using asset_selection + defs
        my_defs = dg.Definitions(assets=[feature_a, feature_b])
        observation_job = mxd.build_metaxy_multi_observation_job(
            name="observe_my_features",
            asset_selection=dg.AssetSelection.kind("metaxy"),
            defs=my_defs,
        )

        # Option 2: Using direct assets list
        observation_job = mxd.build_metaxy_multi_observation_job(
            name="observe_my_features",
            assets=[feature_a, feature_b],
        )
        ```
    """
    tags = tags or {}

    # Validate argument combinations
    has_selection = asset_selection is not None or defs is not None
    has_assets = assets is not None

    if has_selection and has_assets:
        raise ValueError(
            "Cannot provide both 'assets' and 'asset_selection'/'defs'. "
            "Use either asset_selection + defs, or assets alone."
        )

    if not has_selection and not has_assets:
        raise ValueError("Must provide either 'asset_selection' + 'defs', or 'assets'.")

    if has_selection:
        if asset_selection is None:
            raise ValueError("'defs' requires 'asset_selection' to be provided.")
        if defs is None:
            raise ValueError("'asset_selection' requires 'defs' to be provided.")

        # Resolve selection using defs
        all_assets_defs = list(defs.resolve_asset_graph().assets_defs)
        selected_keys = asset_selection.resolve(all_assets_defs)

        # Get specs for selected keys, with partitions_def
        metaxy_specs: list[dg.AssetSpec] = []
        partitions_defs: list[dg.PartitionsDefinition | None] = []

        for asset_def in all_assets_defs:
            for spec in asset_def.specs:
                if spec.key in selected_keys:
                    if spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY) is not None:
                        metaxy_specs.append(spec)
                        partitions_defs.append(asset_def.partitions_def)
    else:
        # Direct assets list
        assert assets is not None
        metaxy_specs = []
        partitions_defs = []

        for asset in assets:
            if isinstance(asset, dg.AssetSpec):
                if asset.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY) is not None:
                    metaxy_specs.append(asset)
                    partitions_defs.append(asset.partitions_def)
            elif isinstance(asset, dg.AssetsDefinition):
                for spec in asset.specs:
                    if spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY) is not None:
                        metaxy_specs.append(spec)
                        partitions_defs.append(asset.partitions_def)
            elif isinstance(asset, dg.SourceAsset):
                # SourceAsset doesn't have metaxy/feature metadata typically
                pass
            else:
                raise TypeError(f"Expected AssetSpec, AssetsDefinition, or SourceAsset, got {type(asset).__name__}")

    if not metaxy_specs:
        raise ValueError(
            "No assets have specs with 'metaxy/feature' metadata. "
            "Ensure your assets have metadata={'metaxy/feature': 'feature/key'}."
        )

    # Validate all specs have the same partitions_def
    first_partitions_def = partitions_defs[0]
    for i, pdef in enumerate(partitions_defs[1:], start=1):
        if pdef != first_partitions_def:
            raise ValueError(
                f"All assets must have the same partitions_def. "
                f"Asset 0 has {first_partitions_def}, but asset {i} has {pdef}."
            )
    partitions_def = first_partitions_def

    # Build feature keys for description (may have duplicates when multiple assets share a feature)
    feature_keys = [
        mx.coerce_to_feature_key(spec.metadata[DAGSTER_METAXY_FEATURE_METADATA_KEY]) for spec in metaxy_specs
    ]

    # Build a mapping of asset key -> spec for the dynamic op
    # This ensures each asset gets its own op, even if multiple assets share the same feature
    spec_by_asset_key = {spec.key.to_user_string(): spec for spec in metaxy_specs}
    all_asset_keys = list(spec_by_asset_key.keys())

    # Config class for runtime filtering of assets to observe
    class _ObserveAssetsConfig(dg.Config):
        asset_keys: list[str] = all_asset_keys

    # Op that emits dynamic outputs for each asset, optionally filtered by config
    @dg.op(
        name=f"{name}_fanout",
        out=dg.DynamicOut(str),
        config_schema=_ObserveAssetsConfig.to_config_schema(),
    )
    def fanout_assets(context: dg.OpExecutionContext) -> Any:
        config = _ObserveAssetsConfig.model_validate(context.op_config)

        # Validate that requested asset keys exist
        requested_keys = set(config.asset_keys)
        available_keys = set(spec_by_asset_key.keys())
        invalid_keys = requested_keys - available_keys
        if invalid_keys:
            raise ValueError(
                f"Requested asset keys not found in job: {sorted(invalid_keys)}. "
                f"Available keys: {sorted(available_keys)}"
            )
        asset_keys_to_observe = [k for k in spec_by_asset_key if k in requested_keys]

        for asset_key_str in asset_keys_to_observe:
            # Use asset key (with / replaced by __) as mapping key for Dagster identifiers
            safe_mapping_key = asset_key_str.replace("/", "__")
            yield dg.DynamicOutput(asset_key_str, mapping_key=safe_mapping_key)

    # Build the shared observation op
    observe_op = _build_observation_op_for_specs(
        name=f"{name}_observe",
        spec_by_asset_key=spec_by_asset_key,
        store_resource_key=store_resource_key,
    )

    # Build job metadata with asset references
    job_metadata: dict[str, Any] = {
        "metaxy/features": [fk.to_string() for fk in feature_keys],
    }
    for spec in metaxy_specs:
        job_metadata[f"metaxy/asset/{spec.key.to_user_string()}"] = dg.MetadataValue.asset(spec.key)

    # Build description as markdown list showing both assets and features
    asset_list = "\n".join(
        f"- `{spec.key.to_user_string()}` → `{spec.metadata[DAGSTER_METAXY_FEATURE_METADATA_KEY]}`"
        for spec in metaxy_specs
    )
    description = f"Observe {len(metaxy_specs)} Metaxy assets:\n\n{asset_list}"

    @dg.job(
        name=name,
        partitions_def=partitions_def,
        tags=tags,
        description=description,
        metadata=job_metadata,
        **kwargs,
    )
    def observation_job() -> None:
        asset_keys_dynamic = fanout_assets()
        asset_keys_dynamic.map(observe_op)

    return observation_job

metaxy.ext.dagster.observation_job.build_metaxy_observation_job

build_metaxy_observation_job(
    asset: AssetSpec | AssetsDefinition,
    *,
    store_resource_key: str = "store",
    tags: dict[str, str] | None = None,
) -> list[JobDefinition]

Build Dagster job(s) that observe Metaxy feature asset(s).

Creates job(s) that yield AssetObservation events for the given asset. The job can be run independently from asset materialization, e.g., on a schedule.

Returns one job per metaxy/feature spec found in the asset.

Jobs are constructed with matching partitions definitions. Job names are always derived as observe_<FeatureKey.table_name()>.

Parameters:

  • asset (AssetSpec | AssetsDefinition) –

    Asset spec or asset definition to observe. Must have metaxy/feature metadata on at least one spec.

  • store_resource_key (str, default: 'store' ) –

    Resource key for the MetadataStore (default: "store").

  • tags (dict[str, str] | None, default: None ) –

    Optional tags to apply to the job(s).

Returns:

  • list[JobDefinition]

    List of Dagster job definitions, one per metaxy/feature spec.

Raises:

  • ValueError

    If no specs have metaxy/feature metadata.

Example
import dagster as dg
import metaxy.ext.dagster as mxd


@mxd.metaxify()
@dg.asset(metadata={"metaxy/feature": "my/feature"})
def my_asset(): ...


# Build the observation jobs (a list, one per metaxy/feature spec);
# partitions_def is extracted automatically
observation_jobs = mxd.build_metaxy_observation_job(my_asset)

# Include in your Definitions
defs = dg.Definitions(
    jobs=observation_jobs,
    resources={"store": my_store_resource},
)
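
Because the function returns a list, building jobs for several assets produces a list of lists that has to be flattened before it is passed on. The sketch below shows the flattening with a hypothetical stand-in builder (`build_jobs_stub`) that returns placeholder strings instead of real job definitions; the asset names and feature keys are made up for illustration.

```python
from itertools import chain


def build_jobs_stub(feature_keys: list[str]) -> list[str]:
    """Hypothetical stand-in for the real builder: one placeholder job per feature."""
    return [f"job:{key}" for key in feature_keys]


# Hypothetical assets, each carrying one or more metaxy/feature specs.
features_per_asset = {
    "asset_one": ["my/feature_a"],
    "asset_two": ["my/feature_b", "my/feature_c"],
}

# Flatten the per-asset lists into a single list of jobs.
all_jobs = list(
    chain.from_iterable(build_jobs_stub(keys) for keys in features_per_asset.values())
)
print(all_jobs)
```
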
Source code in src/metaxy/ext/dagster/observation_job.py
@public
def build_metaxy_observation_job(
    asset: dg.AssetSpec | dg.AssetsDefinition,
    *,
    store_resource_key: str = "store",
    tags: dict[str, str] | None = None,
) -> list[dg.JobDefinition]:
    """Build Dagster job(s) that observe Metaxy feature asset(s).

    Creates job(s) that yield `AssetObservation` events for the given asset.
    The job can be run independently from asset materialization, e.g., on a schedule.

    Returns one job per `metaxy/feature` spec found in the asset.

    Jobs are constructed with matching partitions definitions.
    Job names are always derived as `observe_<FeatureKey.table_name()>`.

    Args:
        asset: Asset spec or asset definition to observe. Must have `metaxy/feature`
            metadata on at least one spec.
        store_resource_key: Resource key for the MetadataStore (default: `"store"`).
        tags: Optional tags to apply to the job(s).

    Returns:
        List of Dagster job definitions, one per `metaxy/feature` spec.

    Raises:
        ValueError: If no specs have `metaxy/feature` metadata.

    Example:
        ```python
        import dagster as dg
        import metaxy.ext.dagster as mxd


        @mxd.metaxify()
        @dg.asset(metadata={"metaxy/feature": "my/feature"})
        def my_asset(): ...


        # Build the observation jobs (a list, one per metaxy/feature spec);
        # partitions_def is extracted automatically
        observation_jobs = mxd.build_metaxy_observation_job(my_asset)

        # Include in your Definitions
        defs = dg.Definitions(
            jobs=observation_jobs,
            resources={"store": my_store_resource},
        )
        ```
    """
    # Extract specs and partitions_def from asset
    if isinstance(asset, dg.AssetSpec):
        specs = [asset]
        partitions_def = None
    elif isinstance(asset, dg.AssetsDefinition):
        specs = list(asset.specs)
        partitions_def = asset.partitions_def
    else:
        raise TypeError(f"Expected AssetSpec or AssetsDefinition, got {type(asset).__name__}")

    # Filter to specs with metaxy/feature metadata
    metaxy_specs = [spec for spec in specs if spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY) is not None]

    if not metaxy_specs:
        raise ValueError(
            "Asset has no specs with 'metaxy/feature' metadata. "
            "Ensure your asset has metadata={'metaxy/feature': 'feature/key'}."
        )

    # Build jobs for each metaxy spec
    jobs = [
        _build_observation_job_for_spec(
            spec,
            partitions_def=partitions_def,
            store_resource_key=store_resource_key,
            tags=tags,
        )
        for spec in metaxy_specs
    ]

    return jobs

Resources

metaxy.ext.dagster.MetaxyStoreFromConfigResource pydantic-model

Bases: ConfigurableResource[MetadataStore]

This resource creates a metaxy.MetadataStore based on the current Metaxy configuration (metaxy.toml).

If name is not provided, the default store will be used. The default store name can be set with store = "my_name" in metaxy.toml or with the $METAXY_STORE environment variable.

Show JSON schema:
{
  "description": "This resource creates a [`metaxy.MetadataStore`](https://docs.metaxy.io/latest/guide/concepts/metadata-stores/) based on the current Metaxy configuration (`metaxy.toml`).\n\nIf `name` is not provided, the default store will be used.\nThe default store name can be set with `store = \"my_name\"` in `metaxy.toml` or with` $METAXY_STORE` environment variable.",
  "properties": {
    "name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Name"
    }
  },
  "title": "MetaxyStoreFromConfigResource",
  "type": "object"
}

Functions

metaxy.ext.dagster.MetaxyStoreFromConfigResource.create_resource
create_resource(
    context: InitResourceContext,
) -> MetadataStore

Create a MetadataStore from the Metaxy configuration.

Parameters:

  • context (InitResourceContext) –

    Dagster resource initialization context.

Returns:

  • MetadataStore

    A MetadataStore configured with the Dagster run ID as the materialization ID.

Source code in src/metaxy/ext/dagster/resources.py
def create_resource(self, context: dg.InitResourceContext) -> mx.MetadataStore:
    """Create a MetadataStore from the Metaxy configuration.

    Args:
        context: Dagster resource initialization context.

    Returns:
        A MetadataStore configured with the Dagster run ID as the materialization ID.
    """
    assert context.run is not None
    return mx.MetaxyConfig.get().get_store(self.name, materialization_id=context.run.run_id)

Helpers

metaxy.ext.dagster.utils.FeatureStats

Bases: NamedTuple

Statistics about a feature's metadata for Dagster events.

metaxy.ext.dagster.selection.select_metaxy_assets

select_metaxy_assets(
    *,
    project: str | None = None,
    feature: CoercibleToFeatureKey | None = None,
) -> AssetSelection

Select Metaxy assets by project and/or feature.

This helper creates an AssetSelection that filters assets tagged by @metaxify.

Parameters:

  • project (str | None, default: None ) –

    Filter by project name. If None, uses MetaxyConfig.get().project.

  • feature (CoercibleToFeatureKey | None, default: None ) –

    Filter by specific feature key. If provided, further narrows the selection.

Returns:

  • AssetSelection

    An AssetSelection that can be used with dg.define_asset_job, dg.materialize, or AssetSelection operations like | and &.

Select all Metaxy assets in current project
import metaxy.ext.dagster as mxd

all_metaxy = mxd.select_metaxy_assets()
Select assets for a specific project
prod_assets = mxd.select_metaxy_assets(project="production")
Select a specific feature's assets
feature_assets = mxd.select_metaxy_assets(feature="my/feature/key")
Use with asset jobs
metaxy_job = dg.define_asset_job(
    name="materialize_metaxy",
    selection=mxd.select_metaxy_assets(),
)
Combine with other selections
# All metaxy assets plus some other assets
combined = mxd.select_metaxy_assets() | dg.AssetSelection.keys("other_asset")

# Metaxy assets that are also in a specific group
filtered = mxd.select_metaxy_assets() & dg.AssetSelection.groups("my_group")
Source code in src/metaxy/ext/dagster/selection.py
@public
def select_metaxy_assets(
    *,
    project: str | None = None,
    feature: mx.CoercibleToFeatureKey | None = None,
) -> dg.AssetSelection:
    """Select Metaxy assets by project and/or feature.

    This helper creates an `AssetSelection` that filters assets tagged by `@metaxify`.

    Args:
        project: Filter by project name. If None, uses `MetaxyConfig.get().project`.
        feature: Filter by specific feature key. If provided, further narrows the selection.

    Returns:
        An `AssetSelection` that can be used with `dg.define_asset_job`,
        `dg.materialize`, or `AssetSelection` operations like `|` and `&`.

    Example: Select all Metaxy assets in current project
        ```python
        import metaxy.ext.dagster as mxd

        all_metaxy = mxd.select_metaxy_assets()
        ```

    Example: Select assets for a specific project
        ```python
        prod_assets = mxd.select_metaxy_assets(project="production")
        ```

    Example: Select a specific feature's assets
        ```python
        feature_assets = mxd.select_metaxy_assets(feature="my/feature/key")
        ```

    Example: Use with asset jobs
        ```python
        metaxy_job = dg.define_asset_job(
            name="materialize_metaxy",
            selection=mxd.select_metaxy_assets(),
        )
        ```

    Example: Combine with other selections
        ```python
        # All metaxy assets plus some other assets
        combined = mxd.select_metaxy_assets() | dg.AssetSelection.keys("other_asset")

        # Metaxy assets that are also in a specific group
        filtered = mxd.select_metaxy_assets() & dg.AssetSelection.groups("my_group")
        ```
    """
    resolved_project = project if project is not None else mx.MetaxyConfig.get().project
    if resolved_project is None:
        raise ValueError("project must be specified or configured in MetaxyConfig")

    selection = dg.AssetSelection.tag(DAGSTER_METAXY_PROJECT_TAG_KEY, resolved_project)

    if feature is not None:
        feature_key = mx.coerce_to_feature_key(feature)
        selection = selection & dg.AssetSelection.tag(DAGSTER_METAXY_FEATURE_METADATA_KEY, str(feature_key))

    return selection

Types

metaxy.ext.dagster.MetaxyOutput module-attribute

MetaxyOutput = IntoFrame | None

Constants

metaxy.ext.dagster.DAGSTER_METAXY_FEATURE_METADATA_KEY module-attribute

DAGSTER_METAXY_FEATURE_METADATA_KEY = 'metaxy/feature'

metaxy.ext.dagster.DAGSTER_METAXY_KIND module-attribute

DAGSTER_METAXY_KIND = 'metaxy'

metaxy.ext.dagster.DAGSTER_METAXY_INFO_METADATA_KEY module-attribute

DAGSTER_METAXY_INFO_METADATA_KEY = 'metaxy/info'

metaxy.ext.dagster.DAGSTER_METAXY_PARTITION_KEY module-attribute

DAGSTER_METAXY_PARTITION_KEY = 'partition_by'