metaxy.utils

Utility modules for Metaxy.

Classes

metaxy.utils.BatchedMetadataWriter
BatchedMetadataWriter(
    store: MetadataStore,
    flush_batch_size: int | None = None,
    flush_interval: float = 2.0,
)
Batched metadata writer with background flush thread.
Queues data and writes to a MetadataStore in batches when either:

- The batch reaches `flush_batch_size` rows (if set), or
- `flush_interval` seconds have passed since the last flush
The writer runs a background thread that handles flushing, allowing the main thread to continue processing data without blocking on writes.
Example: Manual lifecycle management
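The example code did not survive extraction here. As a stand-in, the manual start/put/stop lifecycle can be illustrated with a self-contained sketch of the same pattern; `ListStore` and `MiniWriter` below are hypothetical stdlib-only analogs, not metaxy code:

```python
import queue
import threading


class ListStore:
    """Hypothetical in-memory stand-in for a MetadataStore."""

    def __init__(self):
        self.rows = []

    def write(self, batch):
        self.rows.extend(batch)


class MiniWriter:
    """Simplified analog of BatchedMetadataWriter: a queue drained by a background thread."""

    def __init__(self, store):
        self._store = store
        self._queue = queue.Queue()
        self._should_stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def put(self, batch):
        self._queue.put(batch)

    def stop(self, timeout=5.0):
        self._should_stop.set()
        self._thread.join(timeout=timeout)
        return len(self._store.rows)

    def _run(self):
        # Drain the queue until stop is signalled, then flush the remainder.
        while not self._should_stop.is_set() or not self._queue.empty():
            try:
                batch = self._queue.get(timeout=0.05)
            except queue.Empty:
                continue
            self._store.write(batch)


store = ListStore()
writer = MiniWriter(store)
writer.start()                      # must be called before put()
writer.put([{"id": 1}, {"id": 2}])
writer.put([{"id": 3}])
written = writer.stop()             # signals stop and waits for the final flush
print(written)  # 3
```

The real writer additionally counts rows per feature key and honors `flush_batch_size`/`flush_interval`, but the thread-plus-queue shape is the same.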
Parameters:

- `store` (MetadataStore) – The MetadataStore to write to. Must be opened before use.
- `flush_batch_size` (int | None, default: None) – Number of rows to accumulate before flushing. If not set, flushes are only triggered by `flush_interval` or when stopping the writer. Note: setting this triggers row counting, which materializes lazy frames.
- `flush_interval` (float, default: 2.0) – Maximum seconds between flushes. The timer resets after the end of each flush.
Raises:

- RuntimeError – If the background thread encounters an error during flush.
Source code in src/metaxy/utils/batched_writer.py

def __init__(
    self,
    store: MetadataStore,
    flush_batch_size: int | None = None,
    flush_interval: float = 2.0,
) -> None:
    self._store = store
    self._flush_batch_size = flush_batch_size
    self._flush_interval = flush_interval
    self._queue: queue.Queue[QueueItem] = queue.Queue()
    self._should_stop = threading.Event()
    self._stopped = threading.Event()
    self._num_written: dict[FeatureKey, int] = {}
    self._lock = threading.Lock()
    self._error: BaseException | None = None
    self._thread: threading.Thread | None = None
    self._started = False
    # Capture context at construction time to propagate to background thread.
    # This is necessary because ContextVars don't propagate to child threads.
    self._graph = FeatureGraph.get_active()
    self._config = MetaxyConfig.get()
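The closing comment is worth illustrating: a `ContextVar` set in the main thread is not visible inside a newly started thread, so the value must be captured up front and passed along explicitly, which is what `__init__` does. A minimal stdlib sketch of the problem and the fix (unrelated to metaxy's actual FeatureGraph/MetaxyConfig types):

```python
import contextvars
import threading

active_graph: contextvars.ContextVar[str] = contextvars.ContextVar(
    "active_graph", default="<unset>"
)
active_graph.set("graph-A")

seen = {}


def worker_without_capture():
    # A new thread starts from a fresh context: the main thread's set() is invisible.
    seen["uncaptured"] = active_graph.get()


def worker_with_capture(captured):
    # Passing the value captured at construction time restores it explicitly.
    seen["captured"] = captured


captured = active_graph.get()  # capture in the main thread, like __init__ does

t1 = threading.Thread(target=worker_without_capture)
t2 = threading.Thread(target=worker_with_capture, args=(captured,))
t1.start(); t2.start()
t1.join(); t2.join()

print(seen["uncaptured"])  # <unset>
print(seen["captured"])    # graph-A
```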
Attributes

num_written (property)

num_written: dict[FeatureKey, int]
Number of rows written so far per feature.
This property is thread-safe and can be called while the writer is still running to check progress.
Returns:

- dict[FeatureKey, int] – Dict mapping feature keys to number of rows successfully flushed to the store.
has_error (property)

has_error: bool
Check if the writer has encountered an error.
Returns:

- bool – True if the background thread encountered an error, False otherwise.
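The pattern behind `has_error` is a common one: an exception raised inside a background thread does not propagate to the caller, so the thread records it and the main thread checks the flag or re-raises later. A self-contained sketch of that shape (names are illustrative, not metaxy's):

```python
import threading


class BackgroundTask:
    """Runs a callable in a daemon thread and records any exception it raises."""

    def __init__(self, fn):
        self._fn = fn
        self._error = None
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        try:
            self._fn()
        except BaseException as exc:
            # Store the exception; it would otherwise vanish with the thread.
            self._error = exc

    @property
    def has_error(self):
        return self._error is not None

    def start(self):
        self._thread.start()

    def join_and_raise(self):
        # Re-raise in the caller's thread, chaining the original exception.
        self._thread.join()
        if self._error is not None:
            raise RuntimeError(f"Background task failed: {self._error}") from self._error


def boom():
    raise ValueError("flush failed")


task = BackgroundTask(boom)
task.start()
task._thread.join()
print(task.has_error)  # True
```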
Functions

start
Start the background flush thread.
This method must be called before putting data. When using the writer as a context manager, this is called automatically on entry.
Raises:

- RuntimeError – If the writer has already been started.
Source code in src/metaxy/utils/batched_writer.py

def start(self) -> None:
    """Start the background flush thread.

    This method must be called before putting data. When using the writer
    as a context manager, this is called automatically on entry.

    Raises:
        RuntimeError: If the writer has already been started.
    """
    if self._started:
        raise RuntimeError("Writer has already been started")
    self._thread = threading.Thread(target=self._run, daemon=True)
    self._thread.start()
    self._started = True
put

put(
    batches: Mapping[CoercibleToFeatureKey, IntoFrame],
) -> None
Queue batches for writing.
MetadataStore.write_metadata_multi.
Parameters:

- `batches` (Mapping[CoercibleToFeatureKey, IntoFrame]) – Mapping from feature keys to dataframes. Dataframes can be of any type supported by Narwhals.
Raises:

- RuntimeError – If the writer has not been started, has been stopped, or encountered an error.
Source code in src/metaxy/utils/batched_writer.py

def put(self, batches: Mapping[CoercibleToFeatureKey, IntoFrame]) -> None:
    """Queue batches for writing.

    The batches are accumulated per-feature and written together using
    [MetadataStore.write_metadata_multi][].

    Args:
        batches: Mapping from feature keys to dataframes.
            Dataframes can be of any type supported by [Narwhals](https://narwhals-dev.github.io/narwhals/).

    Raises:
        RuntimeError: If the writer has not been started, has been stopped,
            or encountered an error.
    """
    self._check_can_put()
    # Convert all keys and values
    converted: dict[FeatureKey, Frame] = {}
    for key, batch in batches.items():
        feature_key = ValidatedFeatureKeyAdapter.validate_python(key)
        batch_nw = self._to_narwhals(batch)
        converted[feature_key] = batch_nw
    self._queue.put(converted)
stop
stop(timeout: float = 30.0) -> dict[FeatureKey, int]
Signal stop and wait for flush to complete.
This method signals the background thread to stop, waits for it to finish flushing any remaining data, and returns the number of rows written per feature.
Parameters:

- `timeout` (float, default: 30.0) – Maximum seconds to wait for the background thread.
Returns:

- dict[FeatureKey, int] – Dict mapping feature keys to number of rows written.
Raises:

- RuntimeError – If the background thread encountered an error during flush.
Source code in src/metaxy/utils/batched_writer.py

def stop(self, timeout: float = 30.0) -> dict[FeatureKey, int]:
    """Signal stop and wait for flush to complete.

    This method signals the background thread to stop, waits for it to
    finish flushing any remaining data, and returns the number of rows
    written per feature.

    Args:
        timeout: Maximum seconds to wait for the background thread.
            Defaults to 30.0.

    Returns:
        Dict mapping feature keys to number of rows written.

    Raises:
        RuntimeError: If the background thread encountered an error during flush.
    """
    if not self._started or self._thread is None:
        return {}
    self._should_stop.set()
    self._thread.join(timeout=timeout)
    if self._thread.is_alive():
        logger.warning("BatchedMetadataWriter did not stop within %.1fs", timeout)
    if self._error is not None:
        raise RuntimeError(f"Writer encountered an error: {self._error}") from self._error
    with self._lock:
        return dict(self._num_written)
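One detail in `stop` deserves a note: `Thread.join(timeout=...)` returns silently when the timeout elapses, even if the thread is still running, which is why the code checks `is_alive()` afterwards to log the warning. A minimal stdlib sketch of that behavior:

```python
import threading

done = threading.Event()


def slow_worker():
    # Simulates a flush that outlives the join timeout.
    done.wait(timeout=10)


t = threading.Thread(target=slow_worker, daemon=True)
t.start()

t.join(timeout=0.1)        # returns after ~0.1 s; raises nothing on timeout
timed_out = t.is_alive()   # True: the only way to detect that join timed out
print(timed_out)           # True

done.set()                 # let the worker finish cleanly
t.join()
print(t.is_alive())        # False
```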
__enter__

__enter__() -> BatchedMetadataWriter
__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None
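As the docstrings above note, the context-manager protocol wraps `start()` on entry and the stop path on exit, tying the background thread's lifecycle to a `with` block. A self-contained sketch of that shape (illustrative names, not metaxy's implementation):

```python
import threading


class ManagedWorker:
    """Starts a background thread on __enter__ and stops it on __exit__."""

    def __init__(self):
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._stop.wait, daemon=True)
        self.events = []

    def __enter__(self):
        self.events.append("start")
        self._thread.start()
        return self            # the object bound by `with ... as w`

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.events.append("stop")
        self._stop.set()       # runs even if the with-body raised
        self._thread.join()
        return None            # falsy: do not swallow exceptions from the body


with ManagedWorker() as w:
    w.events.append("work")

print(w.events)  # ['start', 'work', 'stop']
```

Returning a falsy value from `__exit__` matters here: an exception raised inside the `with` body still propagates after the thread has been shut down.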