# Ray
Metaxy has basic integration with Ray to assist with setting up Ray Data jobs.
## Ray Environment Setup
It's critical that Metaxy resolves the correct configuration and feature graph on the Ray workers:

- ensure `METAXY_CONFIG` points to the correct Metaxy config file
- configure the `worker_process_setup_hook` parameter of `RuntimeEnv` to run `metaxy.init` before anything else on the Ray worker
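Both steps can be wired through Ray's `runtime_env`. A minimal sketch, assuming an illustrative config path and hook body (neither is part of Metaxy's API):

```python
# Sketch: wiring Metaxy setup into a Ray runtime environment.
# The config path and hook body below are illustrative, not Metaxy API.

def metaxy_setup_hook() -> None:
    """Runs once per Ray worker process, before any task executes."""
    import metaxy as mx
    mx.init()  # resolve the config and feature graph on the worker

runtime_env = {
    # Point workers at the right Metaxy config file
    "env_vars": {"METAXY_CONFIG": "/path/to/metaxy.toml"},
    # Run metaxy.init before anything else on the worker
    "worker_process_setup_hook": metaxy_setup_hook,
}

# ray.init(runtime_env=runtime_env)
```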
### Per-task setup

Additionally, the `RAY_USER_SETUP_FUNCTION` environment variable can be configured to execute a Python function on every Ray task startup.
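The variable takes an importable `module.function` path. For instance, assuming a hypothetical project module `my_project.ray_setup` exposing an `init_metaxy` function:

```python
import os

# RAY_USER_SETUP_FUNCTION expects an importable "module.function" path;
# "my_project.ray_setup.init_metaxy" is a hypothetical example name.
os.environ["RAY_USER_SETUP_FUNCTION"] = "my_project.ray_setup.init_metaxy"
```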
## metaxy.ext.ray

### Classes

#### metaxy.ext.ray.MetaxyDatasink
```python
MetaxyDatasink(
    feature: CoercibleToFeatureKey,
    store: MetadataStore,
    config: MetaxyConfig | None = None,
)
```
Bases: Datasink[_WriteTaskResult]
A Ray Data Datasink for writing to a Metaxy metadata store.
Example
Note
In the future this Datasink will support writing multiple features at once.
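A minimal usage sketch, assuming an already-constructed Ray Dataset `ds` and metadata store (`ray.data.Dataset.write_datasink` is standard Ray Data API; the helper name is illustrative):

```python
def write_feature_metadata(ds, feature, store, config=None):
    """Write a Ray Dataset's blocks to a Metaxy store (sketch)."""
    from metaxy.ext.ray import MetaxyDatasink

    sink = MetaxyDatasink(feature, store, config=config)
    ds.write_datasink(sink)  # Ray Data invokes MetaxyDatasink.write per task
    return sink.result       # aggregated write statistics
```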
Parameters:

- `feature` (`CoercibleToFeatureKey`) – Feature to write metadata for.
- `store` (`MetadataStore`) – Metadata store to write to.
- `config` (`MetaxyConfig | None`, default: `None`) – Metaxy configuration. Will be auto-discovered by the worker if not provided.
Warning

Ensure the Ray environment is set up properly when not passing `config` explicitly. This can be achieved by setting `METAXY_CONFIG` and other `METAXY_` environment variables. The best practice is to pass `config` explicitly to avoid surprises.
Source code in `src/metaxy/ext/ray/datasink.py`

```python
def __init__(
    self,
    feature: mx.CoercibleToFeatureKey,
    store: mx.MetadataStore,
    config: mx.MetaxyConfig | None = None,
):
    self.config = mx.init(config)
    self.store = store
    self._feature_key = mx.coerce_to_feature_key(feature)
    # Populated after write completes
    self._result: MetaxyWriteResult | None = None
```
##### Attributes

###### result (property)

Result of the write operation.
Raises:

- `RuntimeError` – If accessed before the write operation completes.
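Because `result` raises until the write finishes, callers that may run before `on_write_complete` can guard the access. A sketch (the `safe_result` helper is not part of the API):

```python
def safe_result(sink):
    """Return the sink's write result, or None if the write has not completed."""
    try:
        return sink.result  # raises RuntimeError before the write completes
    except RuntimeError:
        return None
```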
##### Functions

###### write
Write blocks of metadata to the store.
Source code in `src/metaxy/ext/ray/datasink.py`

```python
def write(
    self,
    blocks: Iterable[Block],
    ctx: TaskContext,
) -> _WriteTaskResult:
    """Write blocks of metadata to the store."""
    # Initialize metaxy on the worker - config and features are needed for write
    config = mx.init(self.config)
    if config.sync:
        mx.sync_external_features(self.store)
    rows_written = 0
    rows_failed = 0
    for i, block in enumerate(blocks):
        block_accessor = BlockAccessor.for_block(block)
        num_rows = block_accessor.num_rows()
        try:
            with self.store.open("w"):
                self.store.write(self._feature_key, block)
            rows_written += num_rows
        except Exception:
            logger.exception(
                f"Failed to write {num_rows} metadata rows for feature "
                f"{self._feature_key.to_string()} block {i} of task "
                f"{ctx.task_idx} ({ctx.op_name})"
            )
            rows_failed += num_rows
    return _WriteTaskResult(rows_written=rows_written, rows_failed=rows_failed)
```
###### on_write_complete

```python
on_write_complete(
    write_result: WriteResult[_WriteTaskResult],
) -> None
```

Aggregate write statistics from all tasks.
Source code in `src/metaxy/ext/ray/datasink.py`

```python
def on_write_complete(self, write_result: WriteResult[_WriteTaskResult]) -> None:
    """Aggregate write statistics from all tasks."""
    rows_written = 0
    rows_failed = 0
    for task_result in write_result.write_returns:
        rows_written += task_result.rows_written
        rows_failed += task_result.rows_failed
    self._result = MetaxyWriteResult(rows_written=rows_written, rows_failed=rows_failed)
    logger.info(
        f"MetaxyDatasink write complete for {self._feature_key.to_string()}: "
        f"{rows_written} rows written, {rows_failed} rows failed"
    )
```
#### metaxy.ext.ray.MetaxyDatasource
```python
MetaxyDatasource(
    feature: CoercibleToFeatureKey,
    store: MetadataStore,
    config: MetaxyConfig | None = None,
    *,
    incremental: bool = False,
    feature_version: str | None = None,
    filters: Sequence[Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    with_feature_history: bool = False,
    with_sample_history: bool = False,
    include_soft_deleted: bool = False,
)
```
Bases: Datasource
A Ray Data Datasource for reading from a Metaxy metadata store.
This datasource reads metadata entries from a Metaxy metadata store as Ray Data blocks, associated with a specific feature key.
Example
with filters and column selection
incremental mode
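A hedged sketch of incremental reading, assuming a configured store (`ray.data.read_datasource` is standard Ray Data API; the helper name is illustrative):

```python
def read_pending_samples(feature, store, config=None):
    """Read only new/stale samples for a feature as a Ray Dataset (sketch)."""
    import ray
    from metaxy.ext.ray import MetaxyDatasource

    source = MetaxyDatasource(
        feature,
        store,
        config=config,
        incremental=True,  # adds a "metaxy_status" column: "new" or "stale"
    )
    return ray.data.read_datasource(source)
```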
Parameters:

- `feature` (`CoercibleToFeatureKey`) – Feature to read metadata for.
- `store` (`MetadataStore`) – Metadata store to read from.
- `config` (`MetaxyConfig | None`, default: `None`) – Metaxy configuration. Will be auto-discovered by the worker if not provided.

    Warning

    Ensure the Ray environment is set up properly when not passing `config` explicitly. This can be achieved by setting `METAXY_CONFIG` and other `METAXY_` environment variables. The best practice is to pass `config` explicitly to avoid any surprises.

- `incremental` (`bool`, default: `False`) – If `True`, return only samples that need processing (new and stale). Adds a `metaxy_status` column with values:
    - `"new"`: samples that have not been processed yet
    - `"stale"`: samples that have been processed but have to be reprocessed
- `filters` (`Sequence[Expr] | None`, default: `None`) – Sequence of Narwhals filter expressions to apply.
- `columns` (`Sequence[str] | None`, default: `None`) – Subset of columns to include. Metaxy's system columns are always included.
- `allow_fallback` (`bool`, default: `True`) – If `True`, check fallback stores on main store miss.
- `with_feature_history` (`bool`, default: `False`) – If `True`, only return rows with the current feature version.
- `feature_version` (`str | None`, default: `None`) – Explicit feature version to filter by (mutually exclusive with `with_feature_history=False`).
- `with_sample_history` (`bool`, default: `False`) – Whether to deduplicate samples within `id_columns` groups ordered by `metaxy_created_at`.
- `include_soft_deleted` (`bool`, default: `False`) – If `True`, include soft-deleted rows in the result.
Source code in `src/metaxy/ext/ray/datasource.py`

```python
def __init__(
    self,
    feature: mx.CoercibleToFeatureKey,
    store: mx.MetadataStore,
    config: mx.MetaxyConfig | None = None,
    *,
    incremental: bool = False,
    feature_version: str | None = None,
    filters: Sequence[nw.Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    with_feature_history: bool = False,
    with_sample_history: bool = False,
    include_soft_deleted: bool = False,
):
    self.config = mx.init(config)
    self.store = store
    self.incremental = incremental
    self.feature_version = feature_version
    self.filters = list(filters) if filters else None
    self.columns = list(columns) if columns else None
    self.allow_fallback = allow_fallback
    self.with_feature_history = with_feature_history
    self.with_sample_history = with_sample_history
    self.include_soft_deleted = include_soft_deleted
    self._feature_key = mx.coerce_to_feature_key(feature)
```
##### Functions

###### get_read_tasks

Return read tasks for the feature metadata.
Parameters:

- `parallelism` (`int`) – Requested parallelism level (currently ignored; a single task is returned).
- `per_task_row_limit` (`int | None`, default: `None`) – Maximum rows per returned block. If set, the data will be split into multiple blocks of at most this size.

Returns:

- `list[ReadTask]` – List containing a single `ReadTask` that may return multiple blocks.
Source code in `src/metaxy/ext/ray/datasource.py`

```python
def get_read_tasks(self, parallelism: int, per_task_row_limit: int | None = None) -> list[ReadTask]:
    """Return read tasks for the feature metadata.

    Args:
        parallelism: Requested parallelism level (currently ignored, returns single task).
        per_task_row_limit: Maximum rows per returned block. If set, the data will be
            split into multiple blocks of at most this size.

    Returns:
        List containing a single ReadTask that may return multiple blocks.
    """
    num_rows = self._get_row_count()

    # Capture self for the closure
    datasource = self
    row_limit = per_task_row_limit

    def read_fn() -> list[pa.Table]:
        mx.init(datasource.config)
        with datasource.store:
            lf = datasource._read_lazy()
            table = lf.collect(backend="pyarrow").to_arrow()
            batches = table.to_batches(max_chunksize=row_limit)
            return [pa.Table.from_batches([b]) for b in batches]

    metadata = BlockMetadata(
        num_rows=num_rows,
        size_bytes=None,
        input_files=None,
        exec_stats=None,
    )
    return [ReadTask(read_fn, metadata)]
```