Utils
metaxy.utils ¶
Utility modules for Metaxy.
Classes¶
metaxy.utils.BufferedMetadataWriter ¶
BufferedMetadataWriter(
store: MetadataStore,
flush_batch_size: int | None = None,
flush_interval: float = 2.0,
max_queue_size: int = 0,
)
Buffered metadata writer with background flush thread.
Queues data and writes to a MetadataStore in batches when either:
- The batch reaches flush_batch_size rows (if set), or
- flush_interval seconds have passed since the last flush
The writer runs a background thread that handles flushing, allowing the main thread to continue processing data without blocking on writes.
Example
Manual lifecycle management
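The example body appears to have been lost in extraction. As an illustrative sketch of the same start/put/stop lifecycle, here is a stdlib-only toy writer that mirrors the documented behavior (queue plus background flush thread); the class and names below are hypothetical, not the metaxy implementation:

```python
import queue
import threading


class MiniBufferedWriter:
    """Toy stand-in for BufferedMetadataWriter: queues rows and
    flushes them to a sink list from a background thread."""

    def __init__(self, flush_interval: float = 0.1) -> None:
        self.sink: list[str] = []  # stands in for the MetadataStore
        self._queue: queue.Queue[str] = queue.Queue()
        self._flush_interval = flush_interval
        self._should_stop = threading.Event()
        self._thread: threading.Thread | None = None

    def start(self) -> None:
        if self._thread is not None:
            raise RuntimeError("Writer has already been started")
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def put(self, row: str) -> None:
        self._queue.put(row)

    def stop(self) -> list[str]:
        self._should_stop.set()
        assert self._thread is not None
        self._thread.join()  # wait for the final flush
        return self.sink

    def _run(self) -> None:
        # Drain the queue until asked to stop, then flush the remainder.
        while not self._should_stop.is_set():
            try:
                self.sink.append(self._queue.get(timeout=self._flush_interval))
            except queue.Empty:
                pass
        while not self._queue.empty():
            self.sink.append(self._queue.get_nowait())


writer = MiniBufferedWriter()
writer.start()          # must be called before put()
writer.put("row-1")
writer.put("row-2")
written = writer.stop() # blocks until everything is flushed
```

The real writer follows the same shape but flushes narwhals frames to a MetadataStore and tracks per-feature row counts.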
Parameters:
- store (MetadataStore) – The MetadataStore to write to. Must be opened before use.
- flush_batch_size (int | None, default: None) – Number of rows to accumulate before flushing. If not set, flushes are only triggered by flush_interval or when stopping the writer. Note: setting this triggers row counting, which materializes lazy frames.
- flush_interval (float, default: 2.0) – Maximum seconds between flushes. The timer resets after the end of each flush.
- max_queue_size (int, default: 0) – Maximum number of pending items in the queue. When the queue is full, put() blocks until the background thread consumes an item. Defaults to an unlimited queue (the right choice in most cases).
Raises:
- RuntimeError – If the background thread encounters an error during flush.
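The backpressure behavior of max_queue_size can be seen with a plain bounded queue.Queue, which is what the writer uses internally (stdlib-only sketch, not metaxy code):

```python
import queue
import threading
import time

# A bounded queue behaves like max_queue_size=1: the producer blocks
# on put() until the consumer frees a slot.
q: queue.Queue[int] = queue.Queue(maxsize=1)


def slow_consumer() -> None:
    time.sleep(0.2)  # pretend the background thread is busy flushing
    q.get()          # frees a slot, unblocking the producer


t = threading.Thread(target=slow_consumer)
t.start()

q.put(1)                         # fills the queue immediately
start = time.monotonic()
q.put(2)                         # blocks until the consumer calls get()
blocked_for = time.monotonic() - start
t.join()
```

With the default max_queue_size=0 the queue is unbounded and put() never blocks this way.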
Source code in src/metaxy/utils/batched_writer.py
def __init__(
    self,
    store: MetadataStore,
    flush_batch_size: int | None = None,
    flush_interval: float = 2.0,
    max_queue_size: int = 0,
) -> None:
    self._store = store
    self._flush_batch_size = flush_batch_size
    self._flush_interval = flush_interval
    self._queue: queue.Queue[QueueItem | _FlushSignal] = queue.Queue(maxsize=max_queue_size)
    self._should_stop = threading.Event()
    self._stopped = threading.Event()
    self._num_written: dict[FeatureKey, int] = {}
    self._lock = threading.Lock()
    self._error: BaseException | None = None
    self._thread: threading.Thread | None = None
    self._started = False
    # Capture context at construction time to propagate to background thread
    # This is necessary because ContextVars don't propagate to child threads
    self._graph = FeatureGraph.get_active()
    self._config = MetaxyConfig.get()
Attributes¶
num_written property ¶
num_written: dict[FeatureKey, int]
Number of rows written so far per feature.
This property is thread-safe and can be called while the writer is still running to check progress.
Returns:
- dict[FeatureKey, int] – Dict mapping feature keys to the number of rows successfully flushed to the store.
has_error property ¶
has_error: bool
Check if the writer has encountered an error.
Returns:
- bool – True if the background thread encountered an error, False otherwise.
Functions¶
start ¶
Start the background flush thread.
This method must be called before putting data. When using the writer as a context manager, this is called automatically on entry.
Raises:
- RuntimeError – If the writer has already been started.
Source code in src/metaxy/utils/batched_writer.py
def start(self) -> None:
    """Start the background flush thread.

    This method must be called before putting data. When using the writer
    as a context manager, this is called automatically on entry.

    Raises:
        RuntimeError: If the writer has already been started.
    """
    if self._started:
        raise RuntimeError("Writer has already been started")
    self._thread = threading.Thread(target=self._run, daemon=True)
    self._thread.start()
    self._started = True
put ¶
put(
batches: Mapping[CoercibleToFeatureKey, IntoFrame],
) -> None
Queue batches for writing.
The batches are accumulated per-feature and written together using
MetadataStore.write_multi.
Parameters:
- batches (Mapping[CoercibleToFeatureKey, IntoFrame]) – Mapping from feature keys to dataframes. Dataframes can be of any type supported by Narwhals.
Raises:
- RuntimeError – If the writer has not been started, has been stopped, or encountered an error.
Source code in src/metaxy/utils/batched_writer.py
def put(self, batches: Mapping[CoercibleToFeatureKey, IntoFrame]) -> None:
    """Queue batches for writing.

    The batches are accumulated per-feature and written together using
    `[MetadataStore.write_multi][]`.

    Args:
        batches: Mapping from feature keys to dataframes.
            Dataframes can be of any type supported by [Narwhals](https://narwhals-dev.github.io/narwhals/).

    Raises:
        RuntimeError: If the writer has not been started, has been stopped,
            or encountered an error.
    """
    self._check_can_put()
    # Convert all keys and values
    converted: dict[FeatureKey, Frame] = {}
    for key, batch in batches.items():
        feature_key = ValidatedFeatureKeyAdapter.validate_python(key)
        batch_nw = self._to_narwhals(batch)
        converted[feature_key] = batch_nw
    self._queue.put(converted)
flush ¶
flush(timeout: float = 30.0) -> None
Flush all pending data to the store.
Blocks until the background thread has flushed all currently accumulated data, or until the timeout is reached.
Parameters:
- timeout (float, default: 30.0) – Maximum seconds to wait for the flush to complete.
Raises:
- RuntimeError – If the writer is not started, has stopped, has errored, the flush does not complete within the timeout, or the flush itself encounters an error.
Source code in src/metaxy/utils/batched_writer.py
def flush(self, timeout: float = 30.0) -> None:
    """Flush all pending data to the store.

    Blocks until the background thread has flushed all currently
    accumulated data, or until the timeout is reached.

    Args:
        timeout: Maximum seconds to wait for the flush to complete.

    Raises:
        RuntimeError: If the writer is not started, has stopped, has errored,
            the flush does not complete within the timeout, or the flush itself
            encounters an error.
    """
    self._check_can_put()
    signal = _FlushSignal()
    self._queue.put(signal)
    if not signal.done.wait(timeout):
        raise RuntimeError(f"Flush did not complete within {timeout:.1f}s")
    if signal.error is not None:
        raise RuntimeError("Flush failed") from signal.error
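The sentinel mechanism above — enqueue a signal object carrying a threading.Event, then wait on it — relies on FIFO queue ordering: by the time the worker reaches the sentinel, everything queued before it has been processed. A stdlib-only sketch of the pattern (class and variable names are illustrative, not the metaxy internals):

```python
import queue
import threading


class FlushSignal:
    """Sentinel queued among data items; the worker sets `done`
    once everything queued before it has been flushed."""

    def __init__(self) -> None:
        self.done = threading.Event()
        self.error: BaseException | None = None


q: queue.Queue[object] = queue.Queue()
flushed: list[object] = []


def worker() -> None:
    while True:
        item = q.get()
        if item is None:               # shutdown sentinel for this demo
            return
        if isinstance(item, FlushSignal):
            item.done.set()            # everything before it is flushed
            continue
        flushed.append(item)


t = threading.Thread(target=worker, daemon=True)
t.start()

q.put("a")
q.put("b")
signal = FlushSignal()
q.put(signal)                          # flush(): enqueue the sentinel...
if not signal.done.wait(timeout=5.0):  # ...then wait for the worker to reach it
    raise RuntimeError("Flush did not complete within 5.0s")
q.put(None)
t.join()
```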
stop ¶
stop(timeout: float = 30.0) -> dict[FeatureKey, int]
Signal stop and wait for flush to complete.
This method signals the background thread to stop, waits for it to finish flushing any remaining data, and returns the number of rows written per feature.
Parameters:
- timeout (float, default: 30.0) – Maximum seconds to wait for the background thread. Defaults to 30.0.
Returns:
- dict[FeatureKey, int] – Dict mapping feature keys to the number of rows written.
Raises:
- RuntimeError – If the background thread encountered an error during flush, or did not stop within the timeout.
Source code in src/metaxy/utils/batched_writer.py
def stop(self, timeout: float = 30.0) -> dict[FeatureKey, int]:
    """Signal stop and wait for flush to complete.

    This method signals the background thread to stop, waits for it to
    finish flushing any remaining data, and returns the number of rows
    written per feature.

    Args:
        timeout: Maximum seconds to wait for the background thread.
            Defaults to 30.0.

    Returns:
        Dict mapping feature keys to number of rows written.

    Raises:
        RuntimeError: If the background thread encountered an error during flush,
            or did not stop within the timeout.
    """
    if not self._started or self._thread is None:
        return {}
    self._should_stop.set()
    self._thread.join(timeout=timeout)
    if self._thread.is_alive():
        raise RuntimeError(f"BufferedMetadataWriter background thread did not stop within {timeout:.1f}s")
    if self._error is not None:
        raise RuntimeError(f"Writer encountered an error: {self._error}") from self._error
    with self._lock:
        return dict(self._num_written)
__enter__ ¶
__enter__() -> BufferedMetadataWriter
Enter the context manager, starting the writer.
__exit__ ¶
__exit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None
Exit context manager, stopping the writer.
Source code in src/metaxy/utils/batched_writer.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit context manager, stopping the writer."""
    try:
        self.stop()
    except RuntimeError:
        if exc_type is None:
            raise
        logger.exception(
            "Error during BufferedMetadataWriter shutdown (suppressed to avoid masking original exception)"
        )
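Note the exception-masking policy in __exit__: a shutdown error propagates only when the with-block exited cleanly; if the body already raised, the shutdown error is logged so the original exception survives. A self-contained sketch of that policy (the Guard class is hypothetical, standing in for the writer):

```python
import logging

logger = logging.getLogger("demo")


class Guard:
    """Context manager mirroring the __exit__ policy above: a shutdown
    error propagates only if the block exited cleanly."""

    def __init__(self, fail_on_stop: bool) -> None:
        self.fail_on_stop = fail_on_stop

    def stop(self) -> None:
        if self.fail_on_stop:
            raise RuntimeError("stop failed")

    def __enter__(self) -> "Guard":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        try:
            self.stop()
        except RuntimeError:
            if exc_type is None:
                raise  # clean exit: surface the shutdown error
            # body already raised: log instead of masking it
            logger.exception("shutdown error suppressed")


outcomes = []
try:
    with Guard(fail_on_stop=True):
        pass  # clean body -> the stop error propagates
except RuntimeError as e:
    outcomes.append(str(e))

try:
    with Guard(fail_on_stop=True):
        raise ValueError("body failed")  # body error wins; stop error is logged
except ValueError as e:
    outcomes.append(str(e))
```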