
Utils

metaxy.utils

Utility modules for Metaxy.

Classes

metaxy.utils.BatchedMetadataWriter

BatchedMetadataWriter(
    store: MetadataStore,
    flush_batch_size: int | None = None,
    flush_interval: float = 2.0,
)

Batched metadata writer with background flush thread.

Queues data and writes it to a MetadataStore in batches. A flush happens when either:

  • The accumulated batch reaches flush_batch_size rows (if set)
  • flush_interval seconds have passed since the last flush

The writer runs a background thread that handles flushing, allowing the main thread to continue processing data without blocking on writes.

Example
import metaxy as mx
import polars as pl

with mx.BatchedMetadataWriter(store) as writer:
    batch = {
        MyFeature: pl.DataFrame(
            {
                "id": ["x"],
                "metaxy_provenance_by_field": [{"part_1": "h1", "part_2": "h2"}],
            }
        )
    }
    writer.put(batch)

with store:
    assert len(store.read_metadata(MyFeature).collect()) == 1
Manual lifecycle management

writer = mx.BatchedMetadataWriter(store)
writer.start()
try:
    for batch_dict in data_stream:  # each batch maps feature keys to dataframes
        writer.put(batch_dict)
finally:
    rows_written = writer.stop()

Parameters:

  • store (MetadataStore) –

    The MetadataStore to write to. Must be opened before use.

  • flush_batch_size (int | None, default: None ) –

    Number of rows to accumulate before flushing. If not set, flushes are only triggered by flush_interval or when stopping the writer.

    Note

Setting this triggers row counting, which materializes lazy frames.

  • flush_interval (float, default: 2.0 ) –

    Maximum seconds between flushes. The timer resets after the end of each flush.

Raises:

  • RuntimeError

    If the background thread encounters an error during flush.
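For example, to flush whenever 500 rows have accumulated, or after at most one second otherwise (a minimal sketch; store is assumed to be an opened MetadataStore as in the example above, and the numbers are arbitrary):

import metaxy as mx

writer = mx.BatchedMetadataWriter(
    store,                 # an already-opened MetadataStore (assumed)
    flush_batch_size=500,  # enables row counting, which materializes lazy frames
    flush_interval=1.0,    # flush at least once per second
)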

Source code in src/metaxy/utils/batched_writer.py
def __init__(
    self,
    store: MetadataStore,
    flush_batch_size: int | None = None,
    flush_interval: float = 2.0,
) -> None:
    self._store = store
    self._flush_batch_size = flush_batch_size
    self._flush_interval = flush_interval

    self._queue: queue.Queue[QueueItem] = queue.Queue()
    self._should_stop = threading.Event()
    self._stopped = threading.Event()
    self._num_written: dict[FeatureKey, int] = {}
    self._lock = threading.Lock()
    self._error: BaseException | None = None

    self._thread: threading.Thread | None = None
    self._started = False

    # Capture context at construction time to propagate to background thread
    # This is necessary because ContextVars don't propagate to child threads
    self._graph = FeatureGraph.get_active()
    self._config = MetaxyConfig.get()
Attributes
num_written property
num_written: dict[FeatureKey, int]

Number of rows written so far per feature.

This property is thread-safe and can be called while the writer is still running to check progress.

Returns:

  • dict[FeatureKey, int]

    Dict mapping feature keys to number of rows successfully flushed to the store.
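For example, progress can be polled from the producing thread between puts (a sketch; store and data_stream are assumptions, as in the examples above):

import metaxy as mx

with mx.BatchedMetadataWriter(store) as writer:
    for i, batch_dict in enumerate(data_stream):
        writer.put(batch_dict)
        if i % 100 == 0:
            # Thread-safe; counts only rows already flushed to the store.
            total = sum(writer.num_written.values())
            print(f"rows flushed so far: {total}")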

has_error property
has_error: bool

Check if the writer has encountered an error.

Returns:

  • bool

    True if the background thread encountered an error, False otherwise.
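A producer can poll this flag to stop queueing early instead of waiting for the next put to raise (a sketch; store and data_stream are assumptions):

import metaxy as mx

writer = mx.BatchedMetadataWriter(store)
writer.start()
try:
    for batch_dict in data_stream:
        if writer.has_error:
            # The background thread failed; stop() below re-raises the error.
            break
        writer.put(batch_dict)
finally:
    writer.stop()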

Functions
start
start() -> None

Start the background flush thread.

This method must be called before putting data. When using the writer as a context manager, this is called automatically on entry.

Raises:

  • RuntimeError

    If the writer has already been started.

Source code in src/metaxy/utils/batched_writer.py
def start(self) -> None:
    """Start the background flush thread.

    This method must be called before putting data. When using the writer
    as a context manager, this is called automatically on entry.

    Raises:
        RuntimeError: If the writer has already been started.
    """
    if self._started:
        raise RuntimeError("Writer has already been started")

    self._thread = threading.Thread(target=self._run, daemon=True)
    self._thread.start()
    self._started = True
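As the source shows, a second call to start is rejected; a minimal sketch (store is an assumption):

import metaxy as mx

writer = mx.BatchedMetadataWriter(store)
writer.start()
try:
    writer.start()  # raises RuntimeError("Writer has already been started")
except RuntimeError:
    pass
finally:
    writer.stop()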
put
put(
    batches: Mapping[CoercibleToFeatureKey, IntoFrame],
) -> None

Queue batches for writing.

The batches are accumulated per-feature and written together using MetadataStore.write_metadata_multi.

Parameters:

  • batches (Mapping[CoercibleToFeatureKey, IntoFrame]) –

    Mapping from feature keys to dataframes. Dataframes can be of any type supported by Narwhals.

Raises:

  • RuntimeError

    If the writer has not been started, has been stopped, or encountered an error.

Source code in src/metaxy/utils/batched_writer.py
def put(self, batches: Mapping[CoercibleToFeatureKey, IntoFrame]) -> None:
    """Queue batches for writing.

    The batches are accumulated per-feature and written together using
    `[MetadataStore.write_metadata_multi][]`.

    Args:
        batches: Mapping from feature keys to dataframes.
            Dataframes can be of any type supported by [Narwhals](https://narwhals-dev.github.io/narwhals/).

    Raises:
        RuntimeError: If the writer has not been started, has been stopped,
            or encountered an error.
    """
    self._check_can_put()

    # Convert all keys and values
    converted: dict[FeatureKey, Frame] = {}
    for key, batch in batches.items():
        feature_key = ValidatedFeatureKeyAdapter.validate_python(key)
        batch_nw = self._to_narwhals(batch)
        converted[feature_key] = batch_nw

    self._queue.put(converted)
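Because put takes a mapping, a single call can queue rows for several features, and eager and lazy frames can be mixed (a sketch; MyFeature, OtherFeature, and the column values are illustrative assumptions):

import metaxy as mx
import polars as pl

with mx.BatchedMetadataWriter(store) as writer:
    writer.put(
        {
            MyFeature: pl.DataFrame(
                {
                    "id": ["a", "b"],
                    "metaxy_provenance_by_field": [
                        {"part_1": "h1", "part_2": "h2"},
                        {"part_1": "h3", "part_2": "h4"},
                    ],
                }
            ),
            # A lazy frame for a second feature; note that setting flush_batch_size
            # triggers row counting, which materializes lazy frames.
            OtherFeature: pl.LazyFrame(
                {
                    "id": ["c"],
                    "metaxy_provenance_by_field": [{"part_1": "h5", "part_2": "h6"}],
                }
            ),
        }
    )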
stop
stop(timeout: float = 30.0) -> dict[FeatureKey, int]

Signal stop and wait for flush to complete.

This method signals the background thread to stop, waits for it to finish flushing any remaining data, and returns the number of rows written per feature.

Parameters:

  • timeout (float, default: 30.0 ) –

    Maximum seconds to wait for the background thread. Defaults to 30.0.

Returns:

  • dict[FeatureKey, int]

    Dict mapping feature keys to number of rows written.

Raises:

  • RuntimeError

    If the background thread encountered an error during flush.

Source code in src/metaxy/utils/batched_writer.py
def stop(self, timeout: float = 30.0) -> dict[FeatureKey, int]:
    """Signal stop and wait for flush to complete.

    This method signals the background thread to stop, waits for it to
    finish flushing any remaining data, and returns the number of rows
    written per feature.

    Args:
        timeout: Maximum seconds to wait for the background thread.
            Defaults to 30.0.

    Returns:
        Dict mapping feature keys to number of rows written.

    Raises:
        RuntimeError: If the background thread encountered an error during flush.
    """
    if not self._started or self._thread is None:
        return {}

    self._should_stop.set()
    self._thread.join(timeout=timeout)

    if self._thread.is_alive():
        logger.warning("BatchedMetadataWriter did not stop within %.1fs", timeout)

    if self._error is not None:
        raise RuntimeError(f"Writer encountered an error: {self._error}") from self._error

    with self._lock:
        return dict(self._num_written)
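The returned per-feature counts can be inspected after shutdown (a sketch; store and data_stream are assumptions):

import metaxy as mx

writer = mx.BatchedMetadataWriter(store)
writer.start()
try:
    for batch_dict in data_stream:
        writer.put(batch_dict)
finally:
    # Wait up to 60 seconds for the final flush; re-raises if the thread errored.
    rows_written = writer.stop(timeout=60.0)
    for feature_key, n_rows in rows_written.items():
        print(f"{feature_key}: {n_rows} rows written")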
__enter__
__enter__() -> BatchedMetadataWriter

Enter context manager, starting the background thread.

Source code in src/metaxy/utils/batched_writer.py
def __enter__(self) -> BatchedMetadataWriter:
    """Enter context manager, starting the background thread."""
    self.start()
    return self
__exit__
__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit context manager, stopping the writer.

Source code in src/metaxy/utils/batched_writer.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit context manager, stopping the writer."""
    self.stop()