
Utils

metaxy.utils

Utility modules for Metaxy.

Classes

metaxy.utils.BufferedMetadataWriter

BufferedMetadataWriter(
    store: MetadataStore,
    flush_batch_size: int | None = None,
    flush_interval: float = 2.0,
    max_queue_size: int = 0,
)

Buffered metadata writer with background flush thread.

Queues data and writes to a MetadataStore in batches either when:

  • The batch reaches flush_batch_size rows (if set)
  • flush_interval seconds have passed since last flush

The writer runs a background thread that handles flushing, allowing the main thread to continue processing data without blocking on writes.
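
The batching mechanism described above is independent of Metaxy and can be sketched with the standard library alone. The names below (`MiniBufferedWriter`, the `flush` callable) are illustrative, not Metaxy internals:

```python
import queue
import threading
import time

class MiniBufferedWriter:
    """Illustrative sketch: queue items and flush them in batches on a background thread."""

    def __init__(self, flush, flush_batch_size=None, flush_interval=2.0):
        self._flush = flush                # called with a list of buffered items
        self._batch_size = flush_batch_size
        self._interval = flush_interval
        self._queue = queue.Queue()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def put(self, item):
        self._queue.put(item)

    def stop(self):
        self._stop.set()
        self._thread.join()

    def _run(self):
        buffer = []
        deadline = time.monotonic() + self._interval
        while True:
            try:
                timeout = max(0.0, deadline - time.monotonic())
                buffer.append(self._queue.get(timeout=timeout))
            except queue.Empty:
                pass
            size_hit = self._batch_size is not None and len(buffer) >= self._batch_size
            if size_hit or time.monotonic() >= deadline:
                if buffer:
                    self._flush(buffer)
                    buffer = []
                deadline = time.monotonic() + self._interval  # timer resets after each flush
            if self._stop.is_set() and self._queue.empty():
                if buffer:
                    self._flush(buffer)   # final flush on stop
                return

flushed = []
writer = MiniBufferedWriter(lambda batch: flushed.append(list(batch)),
                            flush_batch_size=2, flush_interval=0.1)
writer.start()
for i in range(5):
    writer.put(i)
writer.stop()
```

Because `put()` only enqueues, the producer never waits on a write; the background thread flushes whenever the batch-size or interval condition is met, and `stop()` drains whatever remains.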

Example
import metaxy as mx
import polars as pl

with mx.BufferedMetadataWriter(store) as writer:
    batch = {
        "my/feature": pl.DataFrame(
            {
                "id": ["x"],
                "metaxy_provenance_by_field": [{"part_1": "h1", "part_2": "h2"}],
            }
        )
    }
    writer.put(batch)

with store:
    assert len(store.read(MyFeature).collect()) == 1
Manual lifecycle management

writer = mx.BufferedMetadataWriter(store)
writer.start()
try:
    for batch_dict in data_stream:
        writer.put(batch_dict)
finally:
    rows_written = writer.stop()

Parameters:

  • store (MetadataStore) –

    The MetadataStore to write to. Must be opened before use.

  • flush_batch_size (int | None, default: None ) –

    Number of rows to accumulate before flushing. If not set, flushes are only triggered by flush_interval or when stopping the writer.

    Note

    Setting this triggers row counting, which materializes lazy frames.

  • flush_interval (float, default: 2.0 ) –

    Maximum seconds between flushes. The timer resets after the end of each flush.

  • max_queue_size (int, default: 0 ) –

    Maximum number of pending items in the queue. When the queue is full, put() blocks until the background thread consumes an item. Defaults to an unlimited queue (the right choice in most cases).
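
The backpressure behavior of a bounded queue can be seen with the standard library alone (no Metaxy involved); `max_queue_size` maps directly onto `queue.Queue(maxsize=...)`:

```python
import queue

q = queue.Queue(maxsize=2)   # analogous to max_queue_size=2
q.put("batch-1")
q.put("batch-2")

# The queue is now full. put_nowait() raises immediately; a blocking
# put() would instead wait until the consumer takes an item.
try:
    q.put_nowait("batch-3")
    overflowed = False
except queue.Full:
    overflowed = True
```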

Raises:

  • RuntimeError –

    If the background thread encounters an error during flush.

Source code in src/metaxy/utils/batched_writer.py
def __init__(
    self,
    store: MetadataStore,
    flush_batch_size: int | None = None,
    flush_interval: float = 2.0,
    max_queue_size: int = 0,
) -> None:
    self._store = store
    self._flush_batch_size = flush_batch_size
    self._flush_interval = flush_interval

    self._queue: queue.Queue[QueueItem | _FlushSignal] = queue.Queue(maxsize=max_queue_size)
    self._should_stop = threading.Event()
    self._stopped = threading.Event()
    self._num_written: dict[FeatureKey, int] = {}
    self._lock = threading.Lock()
    self._error: BaseException | None = None

    self._thread: threading.Thread | None = None
    self._started = False

    # Capture context at construction time to propagate to background thread
    # This is necessary because ContextVars don't propagate to child threads
    self._graph = FeatureGraph.get_active()
    self._config = MetaxyConfig.get()
Attributes
num_written property
num_written: dict[FeatureKey, int]

Number of rows written so far per feature.

This property is thread-safe and can be called while the writer is still running to check progress.

Returns:

  • dict[FeatureKey, int] –

    Dict mapping feature keys to number of rows successfully flushed to the store.
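
The lock-and-copy pattern that makes such a property safe to read mid-run can be sketched generically (names here are illustrative, not Metaxy internals):

```python
import threading

class Progress:
    """Illustrative: per-key counters updated by worker threads, read safely elsewhere."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counts = {}

    def record(self, key, n):
        with self._lock:
            self._counts[key] = self._counts.get(key, 0) + n

    @property
    def snapshot(self):
        with self._lock:
            # Copy under the lock so readers never observe a half-updated dict.
            return dict(self._counts)

progress = Progress()
workers = [
    threading.Thread(target=lambda: [progress.record("my/feature", 1) for _ in range(100)])
    for _ in range(4)
]
for t in workers:
    t.start()
for t in workers:
    t.join()
```

Returning a copy rather than the internal dict is what allows callers to poll progress without holding up the writer.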

has_error property
has_error: bool

Check if the writer has encountered an error.

Returns:

  • bool –

    True if the background thread encountered an error, False otherwise.

Functions
start
start() -> None

Start the background flush thread.

This method must be called before putting data. When using the writer as a context manager, this is called automatically on entry.

Raises:

  • RuntimeError –

    If the writer has already been started.

Source code in src/metaxy/utils/batched_writer.py
def start(self) -> None:
    """Start the background flush thread.

    This method must be called before putting data. When using the writer
    as a context manager, this is called automatically on entry.

    Raises:
        RuntimeError: If the writer has already been started.
    """
    if self._started:
        raise RuntimeError("Writer has already been started")

    self._thread = threading.Thread(target=self._run, daemon=True)
    self._thread.start()
    self._started = True
put
put(
    batches: Mapping[CoercibleToFeatureKey, IntoFrame],
) -> None

Queue batches for writing.

The batches are accumulated per-feature and written together using MetadataStore.write_multi.

Parameters:

  • batches (Mapping[CoercibleToFeatureKey, IntoFrame]) –

    Mapping from feature keys to dataframes. Dataframes can be of any type supported by Narwhals.

Raises:

  • RuntimeError –

    If the writer has not been started, has been stopped, or encountered an error.

Source code in src/metaxy/utils/batched_writer.py
def put(self, batches: Mapping[CoercibleToFeatureKey, IntoFrame]) -> None:
    """Queue batches for writing.

    The batches are accumulated per-feature and written together using
    `[MetadataStore.write_multi][]`.

    Args:
        batches: Mapping from feature keys to dataframes.
            Dataframes can be of any type supported by [Narwhals](https://narwhals-dev.github.io/narwhals/).

    Raises:
        RuntimeError: If the writer has not been started, has been stopped,
            or encountered an error.
    """
    self._check_can_put()

    # Convert all keys and values
    converted: dict[FeatureKey, Frame] = {}
    for key, batch in batches.items():
        feature_key = ValidatedFeatureKeyAdapter.validate_python(key)
        batch_nw = self._to_narwhals(batch)
        converted[feature_key] = batch_nw

    self._queue.put(converted)
flush
flush(timeout: float = 30.0) -> None

Flush all pending data to the store.

Blocks until the background thread has flushed all currently accumulated data, or until the timeout is reached.

Parameters:

  • timeout (float, default: 30.0 ) –

    Maximum seconds to wait for the flush to complete.

Raises:

  • RuntimeError –

    If the writer is not started, has stopped, has errored, the flush does not complete within the timeout, or the flush itself encounters an error.

Source code in src/metaxy/utils/batched_writer.py
def flush(self, timeout: float = 30.0) -> None:
    """Flush all pending data to the store.

    Blocks until the background thread has flushed all currently
    accumulated data, or until the timeout is reached.

    Args:
        timeout: Maximum seconds to wait for the flush to complete.

    Raises:
        RuntimeError: If the writer is not started, has stopped, has errored,
            the flush does not complete within the timeout, or the flush itself
            encounters an error.
    """
    self._check_can_put()
    signal = _FlushSignal()
    self._queue.put(signal)
    if not signal.done.wait(timeout):
        raise RuntimeError(f"Flush did not complete within {timeout:.1f}s")
    if signal.error is not None:
        raise RuntimeError("Flush failed") from signal.error
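
The synchronization seen above (a signal object enqueued behind the pending data, completed by the worker once everything before it is consumed) is a common pattern; a generic standard-library sketch:

```python
import queue
import threading

class FlushSignal:
    """Marker the producer enqueues; the worker completes it in FIFO order."""
    def __init__(self):
        self.done = threading.Event()

q = queue.Queue()
processed = []

def worker():
    while True:
        item = q.get()
        if item is None:                 # shutdown sentinel
            return
        if isinstance(item, FlushSignal):
            item.done.set()              # everything enqueued before it is done
            continue
        processed.append(item)           # stand-in for writing a batch

thread = threading.Thread(target=worker, daemon=True)
thread.start()

q.put("batch-1")
signal = FlushSignal()
q.put(signal)
completed = signal.done.wait(timeout=5.0)  # like flush(timeout=5.0)
q.put(None)
thread.join()
```

Because the queue is FIFO and consumed by a single worker, the signal cannot complete until every batch enqueued before it has been handled.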
stop
stop(timeout: float = 30.0) -> dict[FeatureKey, int]

Signal stop and wait for flush to complete.

This method signals the background thread to stop, waits for it to finish flushing any remaining data, and returns the number of rows written per feature.

Parameters:

  • timeout (float, default: 30.0 ) –

    Maximum seconds to wait for the background thread. Defaults to 30.0.

Returns:

  • dict[FeatureKey, int] –

    Dict mapping feature keys to number of rows written.

Raises:

  • RuntimeError –

    If the background thread encountered an error during flush, or did not stop within the timeout.

Source code in src/metaxy/utils/batched_writer.py
def stop(self, timeout: float = 30.0) -> dict[FeatureKey, int]:
    """Signal stop and wait for flush to complete.

    This method signals the background thread to stop, waits for it to
    finish flushing any remaining data, and returns the number of rows
    written per feature.

    Args:
        timeout: Maximum seconds to wait for the background thread.
            Defaults to 30.0.

    Returns:
        Dict mapping feature keys to number of rows written.

    Raises:
        RuntimeError: If the background thread encountered an error during flush,
            or did not stop within the timeout.
    """
    if not self._started or self._thread is None:
        return {}

    self._should_stop.set()
    self._thread.join(timeout=timeout)

    if self._thread.is_alive():
        raise RuntimeError(f"BufferedMetadataWriter background thread did not stop within {timeout:.1f}s")

    if self._error is not None:
        raise RuntimeError(f"Writer encountered an error: {self._error}") from self._error

    with self._lock:
        return dict(self._num_written)
__enter__
__enter__() -> BufferedMetadataWriter

Enter context manager, starting the background thread.

Source code in src/metaxy/utils/batched_writer.py
def __enter__(self) -> BufferedMetadataWriter:
    """Enter context manager, starting the background thread."""
    self.start()
    return self
__exit__
__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit context manager, stopping the writer.

Source code in src/metaxy/utils/batched_writer.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit context manager, stopping the writer."""
    try:
        self.stop()
    except RuntimeError:
        if exc_type is None:
            raise
        logger.exception(
            "Error during BufferedMetadataWriter shutdown (suppressed to avoid masking original exception)"
        )

Functions

metaxy.utils.collect_to_arrow

collect_to_arrow(frame: PolarsCompatibleFrame) -> Table

Convert a frame into a PyArrow Table.

If the frame is a Polars dataframe, polars_map.Map extension columns are converted to native Arrow MapArray when MetaxyConfig.enable_map_datatype is set.

Source code in src/metaxy/utils/dataframes.py
@public
def collect_to_arrow(frame: PolarsCompatibleFrame) -> pa.Table:
    """Convert a frame into a PyArrow Table.

    If the dataframe is a Polars dataframe, it converts `polars_map.Map`
        extension columns to native Arrow `MapArray` when `MetaxyConfig.enable_map_datatype` is set.
    """
    nw_frame = nw.from_native(frame) if isinstance(frame, (pl.DataFrame, pl.LazyFrame)) else frame
    collected = nw_frame.collect() if isinstance(nw_frame, nw.LazyFrame) else nw_frame
    table: pa.Table = collected.to_arrow()

    if collected.implementation == nw.Implementation.POLARS:
        from metaxy.utils._arrow_map import convert_extension_maps_to_native, has_extension_map_columns

        if has_extension_map_columns(table):
            table = convert_extension_maps_to_native(table)

    return table

metaxy.utils.collect_to_polars

collect_to_polars(
    frame: PolarsCompatibleFrame,
) -> DataFrame

Helper to convert a frame into an eager Polars DataFrame.

Preserves Map columns as polars_map.Map when MetaxyConfig.enable_map_datatype is set.

Parameters:

  • frame (PolarsCompatibleFrame) –

    The Narwhals frame to convert.

Returns:

  • DataFrame –

    The materialized eager Polars DataFrame.

Source code in src/metaxy/utils/dataframes.py
@public
def collect_to_polars(frame: PolarsCompatibleFrame) -> pl.DataFrame:
    """Helper to convert a frame into an eager Polars DataFrame.

    Preserves `Map` columns as `polars_map.Map` when `MetaxyConfig.enable_map_datatype` is set.

    Args:
        frame: The Narwhals frame to convert.

    Returns:
        The materialized eager Polars DataFrame.
    """
    if isinstance(frame, pl.DataFrame):
        return frame
    if isinstance(frame, pl.LazyFrame):
        return cast(pl.DataFrame, frame.collect())

    if isinstance(frame, (nw.DataFrame, nw.LazyFrame)):
        if frame.implementation == nw.Implementation.POLARS:
            native = frame.to_native()
            return cast(pl.DataFrame, native.collect() if isinstance(native, pl.LazyFrame) else native)

        map_cols = find_map_columns(frame)
        result = frame.collect().to_polars() if isinstance(frame, nw.LazyFrame) else frame.to_polars()
        if map_cols:
            from metaxy.utils._arrow_map import convert_maps_to_polars_map

            result = convert_maps_to_polars_map(result, columns=map_cols)
        return result

    collected = frame.lazy().collect()
    if isinstance(collected, pl.DataFrame):
        return collected
    return collected.to_polars()