Quickstart

First Metaxy Application

1. Install Metaxy

Let's choose a backend for our first MetadataStore. A good option for local development is Delta Lake, so let's install Metaxy with the delta extra:

pip install 'metaxy[delta]'

Now the metadata store can be created as:

from metaxy.ext.polars.handlers.delta import DeltaMetadataStore

store = DeltaMetadataStore("/tmp/quickstart.delta")

2. Define your first Feature

Any Metaxy project must have at least one root feature, that is, a feature without upstream dependencies.

features.py
from pydantic import Field

import metaxy as mx


class Video(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key="video",
        id_columns=["id"],
    ),
):
    raw_video_path: str = Field(description="Path to the raw video file")
    id: str = Field(description="Unique identifier for the video")
    path: str = Field(description="Path to the processed video file")

3. Resolve a root increment

Root features are a bit special: they are the entry points into the Metaxy world. Because of that, you have to provide a samples argument to MetadataStore.resolve_update, which is typically not required for non-root features.

Tip

The only requirements for this dataframe are a metaxy_provenance_by_field column and the appropriate ID columns.
pipeline.py
import polars as pl

from features import Video

# Prepare a DataFrame with incoming metadata
samples = pl.DataFrame(
    {
        "id": ["vid_001", "vid_002", "vid_003"],
        "raw_video_path": [
            "/data/raw/vid_001.mp4",
            "/data/raw/vid_002.mp4",
            "/data/raw/vid_003.mp4",
        ],
        "metaxy_provenance_by_field": [
            {"default": "a1n892ja"},  # can be a hash sum
            {"default": "2024-01-15T10:30:00Z"},  # or a modified_on timestamp
            {"default": "v1.2.3"},  # or just any string
        ],
    }
)

# `store` is the DeltaMetadataStore created in step 1
with store:
    increment = store.resolve_update(Video, samples=samples)
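The comments in the samples DataFrame mention that a provenance value can be a hash sum. A minimal sketch of computing one from each raw file's bytes with the standard library, assuming the single "default" field key used above; file_sha256 and build_sample_records are hypothetical helper names, not part of Metaxy:

```python
from __future__ import annotations

import hashlib
from pathlib import Path


def file_sha256(path: str | Path, chunk_size: int = 1 << 20) -> str:
    """Return the hex SHA-256 digest of a file, read in 1 MiB chunks by default."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def build_sample_records(raw_paths: dict[str, str]) -> list[dict]:
    """Hypothetical helper: one record per video, with the file hash stored
    under the "default" field key, matching the samples DataFrame above."""
    return [
        {
            "id": video_id,
            "raw_video_path": raw_path,
            "metaxy_provenance_by_field": {"default": file_sha256(raw_path)},
        }
        for video_id, raw_path in raw_paths.items()
    ]
```

The returned list of dicts can be turned into the samples frame with pl.DataFrame(records). Hashing contents means a sample is only reported as changed when its bytes change, at the cost of reading every file; a modification timestamp is cheaper but coarser.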

The increment object is an instance of Increment and contains three dataframes:

  • increment.new: new samples which were not previously recorded

  • increment.stale: samples which were previously recorded but have now changed

  • increment.orphaned: samples which were previously recorded but are no longer present in the input samples DataFrame
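To make the three categories concrete, here is a toy model that compares previously recorded provenance with incoming provenance by sample ID. It is only an illustration of the semantics described above, not Metaxy's implementation: it uses one provenance string per sample instead of per-field dicts, and plain Python sets instead of dataframes. ToyIncrement and classify are hypothetical names:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ToyIncrement:
    new: set[str]
    stale: set[str]
    orphaned: set[str]


def classify(recorded: dict[str, str], incoming: dict[str, str]) -> ToyIncrement:
    """Toy model: compare provenance keyed by sample ID (one string per sample)."""
    return ToyIncrement(
        # IDs seen for the first time
        new={i for i in incoming if i not in recorded},
        # IDs seen before whose provenance has changed
        stale={i for i in incoming if i in recorded and incoming[i] != recorded[i]},
        # IDs recorded earlier but missing from the incoming samples
        orphaned={i for i in recorded if i not in incoming},
    )


recorded = {"vid_001": "a1n892ja", "vid_002": "2024-01-15T10:30:00Z", "vid_004": "v0.9"}
incoming = {"vid_001": "a1n892ja", "vid_002": "2024-02-01T08:00:00Z", "vid_003": "v1.2.3"}

inc = classify(recorded, incoming)
# inc.new == {"vid_003"}, inc.stale == {"vid_002"}, inc.orphaned == {"vid_004"}

# Once the incoming provenance has been recorded, a re-run with the same input is empty
rerun = classify(incoming, incoming)
# rerun.new == rerun.stale == rerun.orphaned == set()
```

Note that vid_001 appears in none of the sets: its provenance is unchanged, so there is nothing to do for it.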

It's up to you how to handle these dataframes. Usually a processing step iterates over all rows in increment.new and increment.stale (possibly in parallel, using something like Ray), while increment.orphaned can be used to clean up data and metadata that are no longer needed.
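A minimal sketch of that processing step, using the standard library's ThreadPoolExecutor in place of Ray, and deleting outputs of orphaned samples. It assumes rows are plain dicts (as produced by iter_rows(named=True)) and the /data/processed/{id}/{metaxy_data_version}/video.mp4 layout used later in this guide; process_row, process_all and remove_orphans are hypothetical, and removing the matching metadata is left out because that depends on the store's API:

```python
from __future__ import annotations

import shutil
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def process_row(row: dict, root: Path) -> dict:
    """Hypothetical per-sample step: compute the versioned output path and produce the output.

    A real pipeline would transcode row["raw_video_path"] here; this sketch
    only writes an empty placeholder file so the example stays runnable.
    """
    out = root / row["id"] / str(row["metaxy_data_version"]) / "video.mp4"
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_bytes(b"")  # placeholder for the real processing output
    return {**row, "path": str(out)}


def process_all(rows: list[dict], root: Path, max_workers: int = 4) -> list[dict]:
    """Process rows concurrently; pool.map returns results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda r: process_row(r, root), rows))


def remove_orphans(orphaned_ids: list[str], root: Path) -> None:
    """Delete processed outputs for samples that are no longer present upstream."""
    for sample_id in orphaned_ids:
        shutil.rmtree(root / sample_id, ignore_errors=True)
```

Threads suit I/O-bound or subprocess-driven work (for example shelling out to a transcoder); CPU-bound Python code would need processes or a cluster framework such as Ray instead.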

These dataframes carry pre-computed provenance columns, which should not be modified and should eventually be written to the metadata store.

Tip

The dataframes include a metaxy_data_version column, which is the recommended value to use in storage paths:

pipeline.py
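# New and stale samples both need (re)processing
to_process = pl.concat([increment.new.to_polars(), increment.stale.to_polars()])

result = []
for row in to_process.iter_rows(named=True):
    # Include the data version in the path so each version gets its own location
    path = f"/data/processed/{row['id']}/{row['metaxy_data_version']}/video.mp4"
    # ... process row["raw_video_path"] and write the output to `path` here ...
    result.append({**row, "path": path})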

4. Record metadata for processed samples

Once done, write the metadata for the processed samples:

pipeline.py
if result:
    with store.open("w"):
        store.write(Video, pl.DataFrame(result))

Recorded samples will no longer be returned by MetadataStore.resolve_update during future pipeline runs, unless the incoming metaxy_provenance_by_field values are updated.

Flushing Metadata In The Background

It is usually desirable to write metadata to the metadata store as soon as it becomes available. This lets the pipeline resume processing after a failure without losing data. BufferedMetadataWriter can be used to achieve this: it writes metadata in real time from a background thread.
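The general pattern behind such a writer can be sketched with the standard library: the processing loop puts rows on a queue, and a background thread writes them out in batches, either when a batch is full or when a time interval has passed. This is an illustration of the pattern only, not BufferedMetadataWriter's actual interface; BackgroundFlusher, write_batch, batch_size and flush_interval are hypothetical names, so consult Metaxy's API reference for the real class:

```python
from __future__ import annotations

import queue
import threading
import time
from typing import Callable


class BackgroundFlusher:
    """Buffer rows from the processing loop and persist them in batches
    from a background thread. Illustrative pattern only; this is not
    Metaxy's BufferedMetadataWriter API."""

    def __init__(
        self,
        write_batch: Callable[[list[dict]], None],
        batch_size: int = 100,
        flush_interval: float = 1.0,
    ) -> None:
        self._write_batch = write_batch  # e.g. a call that writes a DataFrame to the store
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._queue: queue.Queue = queue.Queue()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def __enter__(self) -> BackgroundFlusher:
        self._thread.start()
        return self

    def __exit__(self, *exc_info) -> None:
        # Ask the thread to finish, then wait until every queued row is written.
        self._stop.set()
        self._thread.join()

    def put(self, row: dict) -> None:
        self._queue.put(row)

    def _run(self) -> None:
        buffer: list[dict] = []
        last_flush = time.monotonic()
        while not (self._stop.is_set() and self._queue.empty()):
            try:
                buffer.append(self._queue.get(timeout=self._flush_interval))
            except queue.Empty:
                pass  # nothing arrived; fall through to the time-based check
            overdue = time.monotonic() - last_flush >= self._flush_interval
            if buffer and (len(buffer) >= self._batch_size or overdue):
                self._write_batch(buffer)
                buffer = []
                last_flush = time.monotonic()
        if buffer:  # final partial batch after shutdown was requested
            self._write_batch(buffer)
```

In the pipeline, write_batch would wrap the store write from step 4; whether a given store can safely be written from a background thread is an assumption to verify. A production version would also need error handling, since an exception in write_batch ends the thread and silently drops the remaining rows.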

Feature Dependencies

Now let's add a downstream feature. We can do this with the deps field on FeatureSpec. We will make a simple feature that extracts the audio track from a video.

features.py
class Audio(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key="audio",
        deps=[Video],
        id_columns=["id"],
    ),
):
    id: str = Field(description="Unique identifier for the audio")
    path: str = Field(description="Path to the audio file")
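Because deps makes Audio depend on Video, a pipeline that handles several features naturally processes upstream features first, so that Video metadata is already recorded when Audio's increment is resolved. A minimal sketch of deriving that order with the standard library's graphlib (Python 3.9+), using plain feature-key strings as a stand-in for the feature classes; how Metaxy itself orders features internally is not shown here:

```python
from graphlib import TopologicalSorter

# Map each feature key to the keys it depends on, mirroring the deps above:
# "audio" depends on "video"; "video" is a root feature with no deps.
feature_deps: dict[str, set[str]] = {
    "video": set(),
    "audio": {"video"},
}

# static_order() yields every node after all of its dependencies
order = list(TopologicalSorter(feature_deps).static_order())
```

With only these two features the only valid order is video, then audio. For larger graphs, TopologicalSorter raises graphlib.CycleError if the dependencies contain a cycle.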

Then call the familiar resolve_update API. Since Audio is not a root feature, no samples argument is needed:

pipeline.py
with store:
    audio_increment = store.resolve_update(Audio)

That's all! The increment can be handled similarly to the "video" feature.

Advanced Feature Definitions

Learn how to mark dependencies as optional, specify field-level versions and dependencies, set lineage types, and use other advanced Metaxy features in the definitions docs.

What's Next?

Here are a few more useful links: