
Ray

Metaxy provides a basic integration with Ray to assist with setting up Ray Data jobs.

Ray Environment Setup

It is critical that Metaxy resolves the correct configuration and feature graph on each Ray worker:

  • ensure the METAXY_CONFIG environment variable points to the correct Metaxy config file

  • configure the worker_process_setup_hook parameter of RuntimeEnv to run metaxy.init before anything else on each Ray worker

Per-task setup

Additionally, RAY_USER_SETUP_FUNCTION can be configured to execute a Python function on every Ray task startup.
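A minimal sketch of the worker setup described above; the config path and the hook body are assumptions for illustration, not part of Metaxy's documented API beyond metaxy.init:

```python
def _metaxy_setup() -> None:
    # Runs once per worker process, before any task code executes
    import metaxy as mx

    mx.init()  # resolves config via METAXY_CONFIG on the worker


# runtime_env to pass to ray.init(); the config path is hypothetical
runtime_env = {
    "env_vars": {"METAXY_CONFIG": "/path/to/metaxy.toml"},
    "worker_process_setup_hook": _metaxy_setup,
}
# ray.init(runtime_env=runtime_env)
```

Passing the hook as a callable keeps the setup in one place; the same effect can be achieved by setting METAXY_CONFIG in the driver environment before Ray workers are spawned.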

metaxy.ext.ray

Classes

metaxy.ext.ray.MetaxyDatasink

MetaxyDatasink(
    feature: CoercibleToFeatureKey,
    store: MetadataStore,
    config: MetaxyConfig | None = None,
)

Bases: Datasink[_WriteTaskResult]

A Ray Data Datasink for writing to a Metaxy metadata store.

Example

import metaxy as mx
import ray

cfg = mx.init()
dataset = ...  # a ray.data.Dataset

datasink = MetaxyDatasink(
    feature="my/feature",
    store=cfg.get_store(),
    config=cfg,
)
dataset.write_datasink(datasink)

print(f"Wrote {datasink.result.rows_written} rows, {datasink.result.rows_failed} failed")

Note

In the future this Datasink will support writing multiple features at once.

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to write metadata for.

  • store (MetadataStore) –

    Metadata store to write to.

  • config (MetaxyConfig | None, default: None ) –

    Metaxy configuration. Will be auto-discovered by the worker if not provided.

    Warning

    Ensure the Ray environment is set up properly when not passing config explicitly. This can be achieved by setting METAXY_CONFIG and other METAXY_ environment variables. The best practice is to pass config explicitly to avoid surprises.

Source code in src/metaxy/ext/ray/datasink.py
def __init__(
    self,
    feature: mx.CoercibleToFeatureKey,
    store: mx.MetadataStore,
    config: mx.MetaxyConfig | None = None,
):
    self.config = mx.init(config)

    self.store = store

    self._feature_key = mx.coerce_to_feature_key(feature)

    # Populated after write completes
    self._result: MetaxyWriteResult | None = None
Attributes
result property
result: MetaxyWriteResult

Result of the write operation.

Raises:

  • RuntimeError –

    If accessed before the write operation completes.

Functions
write
write(
    blocks: Iterable[Block], ctx: TaskContext
) -> _WriteTaskResult

Write blocks of metadata to the store.

Source code in src/metaxy/ext/ray/datasink.py
def write(
    self,
    blocks: Iterable[Block],
    ctx: TaskContext,
) -> _WriteTaskResult:
    """Write blocks of metadata to the store."""
    # Initialize metaxy on the worker - config and features are needed for write
    config = mx.init(self.config)
    if config.sync:
        mx.sync_external_features(self.store)

    rows_written = 0
    rows_failed = 0

    for i, block in enumerate(blocks):
        block_accessor = BlockAccessor.for_block(block)
        num_rows = block_accessor.num_rows()

        try:
            with self.store.open("w"):
                self.store.write(self._feature_key, block)
            rows_written += num_rows
        except Exception:
            logger.exception(
                f"Failed to write {num_rows} metadata rows for feature {self._feature_key.to_string()} block {i} of task {ctx.task_idx} ({ctx.op_name})"
            )
            rows_failed += num_rows

    return _WriteTaskResult(rows_written=rows_written, rows_failed=rows_failed)
on_write_complete
on_write_complete(
    write_result: WriteResult[_WriteTaskResult],
) -> None

Aggregate write statistics from all tasks.

Source code in src/metaxy/ext/ray/datasink.py
def on_write_complete(self, write_result: WriteResult[_WriteTaskResult]) -> None:
    """Aggregate write statistics from all tasks."""
    rows_written = 0
    rows_failed = 0

    for task_result in write_result.write_returns:
        rows_written += task_result.rows_written
        rows_failed += task_result.rows_failed

    self._result = MetaxyWriteResult(rows_written=rows_written, rows_failed=rows_failed)

    logger.info(
        f"MetaxyDatasink write complete for {self._feature_key.to_string()}: "
        f"{rows_written} rows written, {rows_failed} rows failed"
    )

metaxy.ext.ray.MetaxyDatasource

MetaxyDatasource(
    feature: CoercibleToFeatureKey,
    store: MetadataStore,
    config: MetaxyConfig | None = None,
    *,
    incremental: bool = False,
    feature_version: str | None = None,
    filters: Sequence[Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    with_feature_history: bool = False,
    with_sample_history: bool = False,
    include_soft_deleted: bool = False,
)

Bases: Datasource

A Ray Data Datasource for reading from a Metaxy metadata store.

This datasource reads metadata entries from a Metaxy metadata store as Ray Data blocks, associated with a specific feature key.

Example

import metaxy as mx
import ray

cfg = mx.init()

ds = ray.data.read_datasource(
    MetaxyDatasource(
        feature="my/feature",
        store=cfg.get_store(),
        config=cfg,
    )
)

With filters and column selection

import narwhals as nw

ds = ray.data.read_datasource(
    MetaxyDatasource(
        feature="my/feature",
        store=cfg.get_store(),
        config=cfg,
        filters=[nw.col("value") > 10],
        columns=["sample_uid", "value"],
    )
)

Incremental mode

# Read only samples that need processing
ds = ray.data.read_datasource(
    MetaxyDatasource(
        feature="my/feature",
        store=cfg.get_store(),
        config=cfg,
        incremental=True,
    )
)
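Downstream stages can then route rows on the metaxy_status column; a minimal sketch of the status semantics (the rows are hypothetical; in Ray Data this would typically be a ds.filter call):

```python
# Hypothetical rows as returned in incremental mode
rows = [
    {"sample_uid": "a", "metaxy_status": "new"},    # never processed
    {"sample_uid": "b", "metaxy_status": "stale"},  # processed, needs reprocessing
    {"sample_uid": "c", "metaxy_status": "new"},
]

# Equivalent of ds.filter(lambda r: r["metaxy_status"] == "stale")
stale = [r for r in rows if r["metaxy_status"] == "stale"]
```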

Parameters:

  • feature (CoercibleToFeatureKey) –

    Feature to read metadata for.

  • store (MetadataStore) –

    Metadata store to read from.

  • config (MetaxyConfig | None, default: None ) –

    Metaxy configuration. Will be auto-discovered by the worker if not provided.

    Warning

    Ensure the Ray environment is set up properly when not passing config explicitly. This can be achieved by setting METAXY_CONFIG and other METAXY_ environment variables. The best practice is to pass config explicitly to avoid any surprises.

  • incremental (bool, default: False ) –

    If True, return only samples that need processing (new and stale). Adds a metaxy_status column with values:

    • "new": samples that have not been processed yet

    • "stale": samples that have been processed but have to be reprocessed

  • filters (Sequence[Expr] | None, default: None ) –

    Sequence of Narwhals filter expressions to apply.

  • columns (Sequence[str] | None, default: None ) –

    Subset of columns to include. Metaxy's system columns are always included.

  • allow_fallback (bool, default: True ) –

    If True, check fallback stores on main store miss.

  • with_feature_history (bool, default: False ) –

    If True, only return rows with current feature version.

  • feature_version (str | None, default: None ) –

    Explicit feature version to filter by (mutually exclusive with with_feature_history=False).

  • with_sample_history (bool, default: False ) –

    Whether to deduplicate samples within id_columns groups ordered by metaxy_created_at.

  • include_soft_deleted (bool, default: False ) –

    If True, include soft-deleted rows in the result.

Source code in src/metaxy/ext/ray/datasource.py
def __init__(
    self,
    feature: mx.CoercibleToFeatureKey,
    store: mx.MetadataStore,
    config: mx.MetaxyConfig | None = None,
    *,
    incremental: bool = False,
    feature_version: str | None = None,
    filters: Sequence[nw.Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    with_feature_history: bool = False,
    with_sample_history: bool = False,
    include_soft_deleted: bool = False,
):
    self.config = mx.init(config)
    self.store = store
    self.incremental = incremental
    self.feature_version = feature_version
    self.filters = list(filters) if filters else None
    self.columns = list(columns) if columns else None
    self.allow_fallback = allow_fallback
    self.with_feature_history = with_feature_history
    self.with_sample_history = with_sample_history
    self.include_soft_deleted = include_soft_deleted

    self._feature_key = mx.coerce_to_feature_key(feature)
Functions
get_read_tasks
get_read_tasks(
    parallelism: int, per_task_row_limit: int | None = None
) -> list[ReadTask]

Return read tasks for the feature metadata.

Parameters:

  • parallelism (int) –

    Requested parallelism level (currently ignored, returns single task).

  • per_task_row_limit (int | None, default: None ) –

    Maximum rows per returned block. If set, the data will be split into multiple blocks of at most this size.

Returns:

  • list[ReadTask] –

    List containing a single ReadTask that may return multiple blocks.

Source code in src/metaxy/ext/ray/datasource.py
def get_read_tasks(self, parallelism: int, per_task_row_limit: int | None = None) -> list[ReadTask]:
    """Return read tasks for the feature metadata.

    Args:
        parallelism: Requested parallelism level (currently ignored, returns single task).
        per_task_row_limit: Maximum rows per returned block. If set, the data will be
            split into multiple blocks of at most this size.

    Returns:
        List containing a single ReadTask that may return multiple blocks.
    """
    num_rows = self._get_row_count()

    # Capture self for the closure
    datasource = self
    row_limit = per_task_row_limit

    def read_fn() -> list[pa.Table]:
        mx.init(datasource.config)

        with datasource.store:
            lf = datasource._read_lazy()
            table = lf.collect(backend="pyarrow").to_arrow()
            batches = table.to_batches(max_chunksize=row_limit)
            return [pa.Table.from_batches([b]) for b in batches]

    metadata = BlockMetadata(
        num_rows=num_rows,
        size_bytes=None,
        input_files=None,
        exec_stats=None,
    )

    return [ReadTask(read_fn, metadata)]
estimate_inmemory_data_size
estimate_inmemory_data_size() -> int | None

Return an estimate of in-memory data size, or None if unknown.

Source code in src/metaxy/ext/ray/datasource.py
def estimate_inmemory_data_size(self) -> int | None:
    """Return an estimate of in-memory data size, or None if unknown."""
    return None
