Skip to content

Ray

Metaxy can integrate with Ray for distributed computing workloads.

metaxy.ext.ray

Classes

metaxy.ext.ray.MetaxyDatasink

MetaxyDatasink(
    feature: CoercibleToFeatureKey,
    store: MetadataStore,
    config: MetaxyConfig | None = None,
)

Bases: Datasink[_WriteTaskResult]

A Ray Data Datasink for writing to a Metaxy metadata store.

Example

import metaxy as mx
import ray

cfg = mx.init_metaxy()
dataset = ...  # a ray.data.Dataset

datasink = MetaxyDatasink(
    feature="my/feature",
    store=cfg.get_store(),
    config=cfg,
)
dataset.write_datasink(datasink)

print(f"Wrote {datasink.result.rows_written} rows, {datasink.result.rows_failed} failed")

Note

In the future this Datasink will support writing multiple features at once.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to write metadata for.

  • store (MetadataStore) –

    Metadata store to write to.

  • config (MetaxyConfig | None, default: None ) –

    Metaxy configuration. Will be auto-discovered by the worker if not provided.

    Warning

    Ensure the Ray environment is set up properly when not passing config explicitly. This can be achieved by setting METAXY_CONFIG and other METAXY_ environment variables. The best practice is to pass config explicitly to avoid surprises.

Source code in src/metaxy/ext/ray/datasink.py
def __init__(
    self,
    feature: mx.CoercibleToFeatureKey,
    store: mx.MetadataStore,
    config: mx.MetaxyConfig | None = None,
):
    """Create a datasink that writes metadata for ``feature`` into ``store``.

    Args:
        feature: Feature to write metadata for.
        store: Metadata store to write to.
        config: Metaxy configuration. Auto-discovered by the worker when
            not provided.
    """
    # Keep the *resolved* config (matches MetaxyDatasource.__init__).
    # Previously this was immediately overwritten with the raw ``config``
    # argument, silently discarding the init_metaxy() result.
    self.config = mx.init_metaxy(config)

    self.store = store

    self._feature_key = mx.coerce_to_feature_key(feature)

    # Populated by on_write_complete after the write finishes.
    self._result: MetaxyWriteResult | None = None
Attributes
result property
result: MetaxyWriteResult

Result of the write operation.

Raises:

  • RuntimeError

    If accessed before the write operation completes.

Functions
write
write(
    blocks: Iterable[Block], ctx: TaskContext
) -> _WriteTaskResult

Write blocks of metadata to the store.

Source code in src/metaxy/ext/ray/datasink.py
def write(
    self,
    blocks: Iterable[Block],
    ctx: TaskContext,
) -> _WriteTaskResult:
    """Write blocks of metadata to the store."""
    # Runs on the worker: config and feature definitions must be
    # initialized there before write_metadata can be called.
    worker_config = mx.init_metaxy(self.config)
    if worker_config.sync:
        mx.sync_external_features(self.store)

    written = 0
    failed = 0

    for i, block in enumerate(blocks):
        num_rows = BlockAccessor.for_block(block).num_rows()
        try:
            with self.store.open("write"):
                self.store.write_metadata(self._feature_key, block)
        except Exception:
            # A failed block is counted and logged, but does not abort
            # the remaining blocks of this task.
            logger.exception(
                f"Failed to write {num_rows} metadata rows for feature {self._feature_key.to_string()} block {i} of task {ctx.task_idx} ({ctx.op_name})"
            )
            failed += num_rows
        else:
            written += num_rows

    return _WriteTaskResult(rows_written=written, rows_failed=failed)
on_write_complete
on_write_complete(
    write_result: WriteResult[_WriteTaskResult],
) -> None

Aggregate write statistics from all tasks.

Source code in src/metaxy/ext/ray/datasink.py
def on_write_complete(self, write_result: WriteResult[_WriteTaskResult]) -> None:
    """Aggregate write statistics from all tasks."""
    # Fold the per-task counters into a single result.
    rows_written = sum(task.rows_written for task in write_result.write_returns)
    rows_failed = sum(task.rows_failed for task in write_result.write_returns)

    self._result = MetaxyWriteResult(rows_written=rows_written, rows_failed=rows_failed)

    logger.info(
        f"MetaxyDatasink write complete for {self._feature_key.to_string()}: "
        f"{rows_written} rows written, {rows_failed} rows failed"
    )

metaxy.ext.ray.MetaxyDatasource

MetaxyDatasource(
    feature: CoercibleToFeatureKey,
    store: MetadataStore,
    config: MetaxyConfig | None = None,
    *,
    incremental: bool = False,
    feature_version: str | None = None,
    filters: Sequence[Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    current_only: bool = True,
    latest_only: bool = True,
    include_soft_deleted: bool = False,
)

Bases: Datasource

A Ray Data Datasource for reading from a Metaxy metadata store.

This datasource reads metadata entries from a Metaxy metadata store as Ray Data blocks, associated with a specific feature key.

Example

import metaxy as mx
import ray

cfg = mx.init_metaxy()

ds = ray.data.read_datasource(
    MetaxyDatasource(
        feature="my/feature",
        store=cfg.get_store(),
        config=cfg,
    )
)

with filters and column selection

import narwhals as nw

ds = ray.data.read_datasource(
    MetaxyDatasource(
        feature="my/feature",
        store=cfg.get_store(),
        config=cfg,
        filters=[nw.col("value") > 10],
        columns=["sample_uid", "value"],
    )
)

incremental mode

# Read only samples that need processing
ds = ray.data.read_datasource(
    MetaxyDatasource(
        feature="my/feature",
        store=cfg.get_store(),
        config=cfg,
        incremental=True,
    )
)

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to read metadata for.

  • store (MetadataStore) –

    Metadata store to read from.

  • config (MetaxyConfig | None, default: None ) –

    Metaxy configuration. Will be auto-discovered by the worker if not provided.

    Warning

    Ensure the Ray environment is set up properly when not passing config explicitly. This can be achieved by setting METAXY_CONFIG and other METAXY_ environment variables. The best practice is to pass config explicitly to avoid surprises.

  • incremental (bool, default: False ) –

    If True, return only samples that need processing (new and stale). Adds a metaxy_status column with values:

    • "new": samples that have not been processed yet

    • "stale": samples that have been processed but need to be reprocessed

  • filters (Sequence[Expr] | None, default: None ) –

    Sequence of Narwhals filter expressions to apply.

  • columns (Sequence[str] | None, default: None ) –

    Subset of columns to include. Metaxy's system columns are always included.

  • allow_fallback (bool, default: True ) –

    If True, check fallback stores on main store miss.

  • current_only (bool, default: True ) –

    If True, only return rows with current feature version.

  • feature_version (str | None, default: None ) –

    Explicit feature version to filter by (mutually exclusive with current_only=True).

  • latest_only (bool, default: True ) –

    Whether to deduplicate samples within id_columns groups ordered by metaxy_created_at.

  • include_soft_deleted (bool, default: False ) –

    If True, include soft-deleted rows in the result.

Source code in src/metaxy/ext/ray/datasource.py
def __init__(
    self,
    feature: mx.CoercibleToFeatureKey,
    store: mx.MetadataStore,
    config: mx.MetaxyConfig | None = None,
    *,
    incremental: bool = False,
    feature_version: str | None = None,
    filters: Sequence[nw.Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    current_only: bool = True,
    latest_only: bool = True,
    include_soft_deleted: bool = False,
):
    """Create a datasource reading metadata for ``feature`` from ``store``."""
    self.config = mx.init_metaxy(config)
    self.store = store
    self._feature_key = mx.coerce_to_feature_key(feature)

    # Read options (defensively copied where mutable).
    self.incremental = incremental
    self.feature_version = feature_version
    self.filters = None if not filters else list(filters)
    self.columns = None if not columns else list(columns)
    self.allow_fallback = allow_fallback
    self.current_only = current_only
    self.latest_only = latest_only
    self.include_soft_deleted = include_soft_deleted
Functions
get_read_tasks
get_read_tasks(
    parallelism: int, per_task_row_limit: int | None = None
) -> list[ReadTask]

Return read tasks for the feature metadata.

Parameters:

  • parallelism (int) –

    Requested parallelism level (currently ignored, returns single task).

  • per_task_row_limit (int | None, default: None ) –

    Maximum rows per returned block. If set, the data will be split into multiple blocks of at most this size.

Returns:

  • list[ReadTask]

    List containing a single ReadTask that may return multiple blocks.

Source code in src/metaxy/ext/ray/datasource.py
def get_read_tasks(self, parallelism: int, per_task_row_limit: int | None = None) -> list[ReadTask]:
    """Return read tasks for the feature metadata.

    Args:
        parallelism: Requested parallelism level (currently ignored, returns single task).
        per_task_row_limit: Maximum rows per returned block. If set, the data will be
            split into multiple blocks of at most this size.

    Returns:
        List containing a single ReadTask that may return multiple blocks.
    """
    total_rows = self._get_row_count()

    # Bind what the remote function needs into locals so the closure
    # (rather than a bound method) is serialized to the worker.
    datasource = self
    max_chunk = per_task_row_limit

    def read_fn() -> list[pa.Table]:
        mx.init_metaxy(datasource.config)

        with datasource.store.open("read"):
            frame = datasource._read_metadata_lazy()
            arrow_table = frame.collect(backend="pyarrow").to_arrow()
            # One single-batch table per chunk of at most max_chunk rows.
            return [
                pa.Table.from_batches([batch])
                for batch in arrow_table.to_batches(max_chunksize=max_chunk)
            ]

    block_meta = BlockMetadata(
        num_rows=total_rows,
        size_bytes=None,
        input_files=None,
        exec_stats=None,
    )

    return [ReadTask(read_fn, block_meta)]
estimate_inmemory_data_size
estimate_inmemory_data_size() -> int | None

Return an estimate of in-memory data size, or None if unknown.

Source code in src/metaxy/ext/ray/datasource.py
def estimate_inmemory_data_size(self) -> int | None:
    """Return an estimate of in-memory data size, or None if unknown."""
    return None

members: true