# Ray

Metaxy can integrate with Ray for distributed computing workloads.

## `metaxy.ext.ray`

### Classes

#### `metaxy.ext.ray.MetaxyDatasink`
```python
MetaxyDatasink(
    feature: CoercibleToFeatureKey,
    store: MetadataStore,
    config: MetaxyConfig | None = None,
)
```

Bases: `Datasink[_WriteTaskResult]`

A Ray Data `Datasink` for writing to a Metaxy metadata store.

Note

In the future this Datasink will support writing multiple features at once.
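A minimal usage sketch. The feature key `"my/feature"`, the store construction, and the row schema are hypothetical placeholders; `Dataset.write_datasink` is the standard Ray Data write entry point:

```python
import ray
import metaxy as mx
from metaxy.ext.ray import MetaxyDatasink

# Hypothetical: obtain a configured MetadataStore from your project setup.
store: mx.MetadataStore = ...

# "my/feature" is a placeholder feature key. With config=None, the
# configuration is auto-discovered on the worker (see the warning below).
sink = MetaxyDatasink("my/feature", store, config=None)

ds = ray.data.from_items([{"id": 1, "path": "a.bin"}, {"id": 2, "path": "b.bin"}])
ds.write_datasink(sink)

# After the write completes, aggregated statistics are available:
print(sink.result)
```

Accessing `sink.result` before the write completes raises `RuntimeError` (see the `result` property below).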
Parameters:

- `feature` (`CoercibleToFeatureKey`) – Feature to write metadata for.
- `store` (`MetadataStore`) – Metadata store to write to.
- `config` (`MetaxyConfig | None`, default: `None`) – Metaxy configuration. Will be auto-discovered by the worker if not provided.

Warning

Ensure the Ray environment is set up properly when not passing `config` explicitly. This can be achieved by setting `METAXY_CONFIG` and other `METAXY_` environment variables. The best practice is to pass `config` explicitly to avoid surprises.
Source code in `src/metaxy/ext/ray/datasink.py`:

```python
def __init__(
    self,
    feature: mx.CoercibleToFeatureKey,
    store: mx.MetadataStore,
    config: mx.MetaxyConfig | None = None,
):
    self.config = mx.init_metaxy(config)
    self.store = store
    self._feature_key = mx.coerce_to_feature_key(feature)
    # Populated after write completes
    self._result: MetaxyWriteResult | None = None
```
Attributes¶
result
property
¶
Result of the write operation.
Raises:
-
RuntimeError–If accessed before the write operation completes.
##### Functions

###### `write`

Write blocks of metadata to the store.

Source code in `src/metaxy/ext/ray/datasink.py`:

```python
def write(
    self,
    blocks: Iterable[Block],
    ctx: TaskContext,
) -> _WriteTaskResult:
    """Write blocks of metadata to the store."""
    # Initialize metaxy on the worker - config and features are needed for write_metadata
    config = mx.init_metaxy(self.config)
    if config.sync:
        mx.sync_external_features(self.store)

    rows_written = 0
    rows_failed = 0
    for i, block in enumerate(blocks):
        block_accessor = BlockAccessor.for_block(block)
        num_rows = block_accessor.num_rows()
        try:
            with self.store.open("write"):
                self.store.write_metadata(self._feature_key, block)
            rows_written += num_rows
        except Exception:
            logger.exception(
                f"Failed to write {num_rows} metadata rows for feature "
                f"{self._feature_key.to_string()} block {i} of task "
                f"{ctx.task_idx} ({ctx.op_name})"
            )
            rows_failed += num_rows
    return _WriteTaskResult(rows_written=rows_written, rows_failed=rows_failed)
```
###### `on_write_complete`

```python
on_write_complete(
    write_result: WriteResult[_WriteTaskResult],
) -> None
```

Aggregate write statistics from all tasks.

Source code in `src/metaxy/ext/ray/datasink.py`:

```python
def on_write_complete(self, write_result: WriteResult[_WriteTaskResult]) -> None:
    """Aggregate write statistics from all tasks."""
    rows_written = 0
    rows_failed = 0
    for task_result in write_result.write_returns:
        rows_written += task_result.rows_written
        rows_failed += task_result.rows_failed
    self._result = MetaxyWriteResult(rows_written=rows_written, rows_failed=rows_failed)
    logger.info(
        f"MetaxyDatasink write complete for {self._feature_key.to_string()}: "
        f"{rows_written} rows written, {rows_failed} rows failed"
    )
```
#### `metaxy.ext.ray.MetaxyDatasource`

```python
MetaxyDatasource(
    feature: CoercibleToFeatureKey,
    store: MetadataStore,
    config: MetaxyConfig | None = None,
    *,
    incremental: bool = False,
    feature_version: str | None = None,
    filters: Sequence[Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    current_only: bool = True,
    latest_only: bool = True,
    include_soft_deleted: bool = False,
)
```

Bases: `Datasource`

A Ray Data `Datasource` for reading from a Metaxy metadata store.

This datasource reads metadata entries from a Metaxy metadata store as Ray Data blocks, associated with a specific feature key.
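A usage sketch in three variants, assuming a configured store and a registered feature key (the key `"my/feature"` and the column names `width` and `path` are hypothetical); `ray.data.read_datasource` and `nw.col` are the standard Ray Data and Narwhals APIs:

```python
import ray
import narwhals as nw
import metaxy as mx
from metaxy.ext.ray import MetaxyDatasource

# Hypothetical: obtain a configured MetadataStore from your project setup.
store: mx.MetadataStore = ...

# Basic read of all current metadata rows for a feature.
ds = ray.data.read_datasource(MetaxyDatasource("my/feature", store))

# With filters and column selection.
ds = ray.data.read_datasource(
    MetaxyDatasource(
        "my/feature",
        store,
        filters=[nw.col("width") > 512],
        columns=["path"],
    )
)

# Incremental mode: only new and stale samples, with a metaxy_status column.
ds = ray.data.read_datasource(MetaxyDatasource("my/feature", store, incremental=True))
new_only = ds.filter(lambda row: row["metaxy_status"] == "new")
```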
Parameters:

- `feature` (`CoercibleToFeatureKey`) – Feature to read metadata for.
- `store` (`MetadataStore`) – Metadata store to read from.
- `config` (`MetaxyConfig | None`, default: `None`) – Metaxy configuration. Will be auto-discovered by the worker if not provided.

  Warning

  Ensure the Ray environment is set up properly when not passing `config` explicitly. This can be achieved by setting `METAXY_CONFIG` and other `METAXY_` environment variables. The best practice is to pass `config` explicitly to avoid any surprises.

- `incremental` (`bool`, default: `False`) – If `True`, return only samples that need processing (new and stale). Adds a `metaxy_status` column with values:
  - `"new"`: samples that have not been processed yet
  - `"stale"`: samples that have been processed but have to be reprocessed
- `filters` (`Sequence[Expr] | None`, default: `None`) – Sequence of Narwhals filter expressions to apply.
- `columns` (`Sequence[str] | None`, default: `None`) – Subset of columns to include. Metaxy's system columns are always included.
- `allow_fallback` (`bool`, default: `True`) – If `True`, check fallback stores on main store miss.
- `current_only` (`bool`, default: `True`) – If `True`, only return rows with the current feature version.
- `feature_version` (`str | None`, default: `None`) – Explicit feature version to filter by (mutually exclusive with `current_only=True`).
- `latest_only` (`bool`, default: `True`) – Whether to deduplicate samples within `id_columns` groups ordered by `metaxy_created_at`.
- `include_soft_deleted` (`bool`, default: `False`) – If `True`, include soft-deleted rows in the result.
Source code in `src/metaxy/ext/ray/datasource.py`:

```python
def __init__(
    self,
    feature: mx.CoercibleToFeatureKey,
    store: mx.MetadataStore,
    config: mx.MetaxyConfig | None = None,
    *,
    incremental: bool = False,
    feature_version: str | None = None,
    filters: Sequence[nw.Expr] | None = None,
    columns: Sequence[str] | None = None,
    allow_fallback: bool = True,
    current_only: bool = True,
    latest_only: bool = True,
    include_soft_deleted: bool = False,
):
    self.config = mx.init_metaxy(config)
    self.store = store
    self.incremental = incremental
    self.feature_version = feature_version
    self.filters = list(filters) if filters else None
    self.columns = list(columns) if columns else None
    self.allow_fallback = allow_fallback
    self.current_only = current_only
    self.latest_only = latest_only
    self.include_soft_deleted = include_soft_deleted
    self._feature_key = mx.coerce_to_feature_key(feature)
```
##### Functions

###### `get_read_tasks`

Return read tasks for the feature metadata.

Parameters:

- `parallelism` (`int`) – Requested parallelism level (currently ignored; a single task is returned).
- `per_task_row_limit` (`int | None`, default: `None`) – Maximum rows per returned block. If set, the data will be split into multiple blocks of at most this size.

Returns:

- `list[ReadTask]` – List containing a single ReadTask that may return multiple blocks.
Source code in `src/metaxy/ext/ray/datasource.py`:

```python
def get_read_tasks(self, parallelism: int, per_task_row_limit: int | None = None) -> list[ReadTask]:
    """Return read tasks for the feature metadata.

    Args:
        parallelism: Requested parallelism level (currently ignored, returns single task).
        per_task_row_limit: Maximum rows per returned block. If set, the data will be
            split into multiple blocks of at most this size.

    Returns:
        List containing a single ReadTask that may return multiple blocks.
    """
    num_rows = self._get_row_count()

    # Capture self for the closure
    datasource = self
    row_limit = per_task_row_limit

    def read_fn() -> list[pa.Table]:
        mx.init_metaxy(datasource.config)
        with datasource.store.open("read"):
            lf = datasource._read_metadata_lazy()
            table = lf.collect(backend="pyarrow").to_arrow()
            batches = table.to_batches(max_chunksize=row_limit)
            return [pa.Table.from_batches([b]) for b in batches]

    metadata = BlockMetadata(
        num_rows=num_rows,
        size_bytes=None,
        input_files=None,
        exec_stats=None,
    )
    return [ReadTask(read_fn, metadata)]
```