Expansion

Overview

This example demonstrates how to implement expansion (1:N) transformations with Metaxy. In an expansion relationship, a single parent sample maps to multiple child samples.

These relationships can be modeled with the LineageRelationship.expansion lineage type.

We will use a hypothetical video chunking pipeline as an example.

The Pipeline

We are going to define a typical video processing pipeline with three features:

---
title: Feature Graph
---
flowchart LR
    %% Snapshot version: none
    %%{init: {'flowchart': {'htmlLabels': true, 'curve': 'basis'}, 'themeVariables': {'fontSize': '14px'}}}%%
    video_raw["<div style="text-align:left"><b>video/raw</b><br/>d842b740<br/><font color="#999">---</font><br/>- audio (7132721c)<br/>- frames (3f6f401c)</div>"]
    video_chunk["<div style="text-align:left"><b>video/chunk</b><br/>7dc3712a<br/><font color="#999">---</font><br/>- audio (1549b1fa)<br/>- frames (df8943d3)</div>"]
    video_faces["<div style="text-align:left"><b>video/faces</b><br/>52f055e3<br/><font color="#999">---</font><br/>- faces (0561bd8c)</div>"]
    video_raw --> video_chunk
    video_chunk --> video_faces

Defining features: "video/raw"

Each video-like feature in our pipeline is going to have two fields: audio and frames.

Let's set the code version of audio to "1" so that we can bump it later. The frames field will use the default version.

src/example_expansion/features.py
import metaxy as mx


class Video(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key="video/raw",
        id_columns=["video_id"],
        fields=[
            mx.FieldSpec(key="audio", code_version="1"),
            "frames",
        ],
    ),
):
    video_id: str
    path: str  # where the video is stored

Defining features: "video/chunk"

"video/chunk" represents a piece of the upstream "video/raw" feature. Since each "video/raw" sample can be split into multiple chunks, we need to tell Metaxy how to map each chunk to its parent video.

src/example_expansion/features.py
class VideoChunk(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key=["video", "chunk"],
        id_columns=["video_chunk_id"],
        deps=[
            mx.FeatureDep(
                feature=Video,
                lineage=mx.LineageRelationship.expansion(on=["video_id"]),
            )
        ],
        fields=["audio", "frames"],
    ),
):
    video_id: str  # points to the parent video
    video_chunk_id: str
    path: str  # where the video chunk is stored

We do not specify custom code versions on its fields. Metaxy automatically assigns field-level lineage by matching field names: "video/chunk:frames" depends on "video/raw:frames", and "video/chunk:audio" depends on "video/raw:audio".

flowchart LR
    subgraph video_raw["video/raw"]
        video_raw_audio["audio"]
        video_raw_frames["frames"]
    end
    subgraph video_chunk["video/chunk"]
        video_chunk_audio["audio"]
        video_chunk_frames["frames"]
    end
    video_raw_audio --> video_chunk_audio
    video_raw_frames --> video_chunk_frames
    style video_raw stroke:#4C78A8,stroke-width:2px
    style video_chunk stroke:#4C78A8,stroke-width:2px
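
For illustration, this automatic matching is roughly equivalent to declaring the field-level lineage by hand. The sketch below is hypothetical (the example itself relies on name-based matching); the explicit FieldDep form mirrors the "video/faces" feature defined next:

import metaxy as mx

from example_expansion.features import Video


# Hypothetical: the same "video/chunk" spec with field-level lineage
# declared explicitly instead of relying on name-based matching
class VideoChunkExplicit(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key=["video", "chunk"],
        id_columns=["video_chunk_id"],
        deps=[
            mx.FeatureDep(
                feature=Video,
                lineage=mx.LineageRelationship.expansion(on=["video_id"]),
            )
        ],
        fields=[
            mx.FieldSpec(
                key="audio", deps=[mx.FieldDep(feature=Video, fields=["audio"])]
            ),
            mx.FieldSpec(
                key="frames", deps=[mx.FieldDep(feature=Video, fields=["frames"])]
            ),
        ],
    ),
):
    video_id: str
    video_chunk_id: str
    path: str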

Defining features: "video/faces"

"video/faces" processes video chunks and only depends on the frames field. This can be expressed with a FieldDep.

src/example_expansion/features.py
class FaceRecognition(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key=["video", "faces"],
        id_columns=["video_chunk_id"],
        deps=[VideoChunk],
        fields=[
            mx.FieldSpec(
                key="faces", deps=[mx.FieldDep(feature=VideoChunk, fields=["frames"])]
            )
        ],
    ),
):
    video_chunk_id: str
    num_faces: int  # number of faces detected

flowchart LR
    subgraph video_chunk["video/chunk"]
        video_chunk_audio["audio"]
        video_chunk_frames["frames"]
    end
    subgraph video_faces["video/faces"]
        video_faces_faces["faces"]
    end
    video_chunk_frames --> video_faces_faces
    style video_chunk stroke:#4C78A8,stroke-width:2px
    style video_faces stroke:#4C78A8,stroke-width:2px
This completes the feature definitions. Let's proceed to running the pipeline.

Getting Started

Install the example's dependencies:

uv sync

Walkthrough

Here is a toy pipeline for computing the feature graph described above:

pipeline.py
import os
import random

import metaxy as mx
import narwhals as nw
import polars as pl

from example_expansion.features import FaceRecognition, Video, VideoChunk
from example_expansion.utils import split_video_into_chunks


def main():
    # Set random seed from environment if provided (for deterministic testing)
    if seed_str := os.environ.get("RANDOM_SEED"):
        random.seed(int(seed_str))
    cfg = mx.init()
    store = cfg.get_store("dev")

    # let's pretend somebody has already created the videos for us
    samples = pl.DataFrame(
        {
            "video_id": [1, 2, 3],
            "path": ["video1.mp4", "video2.mp4", "video3.mp4"],
            "metaxy_provenance_by_field": [
                {"audio": "v1", "frames": "v1"},
                {"audio": "v2", "frames": "v2"},
                {"audio": "v3", "frames": "v3"},
            ],
        }
    )

    with store:
        # showcase: resolve incremental update for a root feature
        increment = store.resolve_update(Video, samples=nw.from_native(samples))
        if len(increment.new) > 0:
            print(f"Found {len(increment.new)} new videos")
            store.write(Video, increment.new)

    # Resolve videos that need to be split into chunks
    with store:
        increment = store.resolve_update(VideoChunk)
        # the DataFrame's dimensions match Video's (with the ID column renamed)

        print(
            f"Found {len(increment.new)} new videos and {len(increment.stale)} stale videos that need chunking"
        )

        for row_dict in pl.concat(
            [increment.new.to_polars(), increment.stale.to_polars()]
        ).iter_rows(named=True):
            print(f"Processing video: {row_dict}")
            # let's split each video into 3-5 chunks randomly

            video_id = row_dict["video_id"]
            path = row_dict["path"]

            provenance_by_field = row_dict["metaxy_provenance_by_field"]
            provenance = row_dict["metaxy_provenance"]

            # pretend we split the video into chunks
            chunk_paths = split_video_into_chunks(path)

            # Generate chunk IDs based on the parent video ID
            chunk_ids = [f"{video_id}_{i}" for i in range(len(chunk_paths))]

            # write the chunks to the store
            # CRUCIAL: all the chunks **must share the same provenance values**
            chunk_df = pl.DataFrame(
                {
                    "video_id": [video_id] * len(chunk_paths),
                    "video_chunk_id": chunk_ids,
                    "path": chunk_paths,
                    "metaxy_provenance_by_field": [provenance_by_field]
                    * len(chunk_paths),
                    "metaxy_provenance": [provenance] * len(chunk_paths),
                }
            )
            print(f"Writing {len(chunk_paths)} chunks for video {video_id}")
            store.write(VideoChunk, nw.from_native(chunk_df))

    # Process face recognition on video chunks
    with store:
        increment = store.resolve_update(FaceRecognition)
        print(
            f"Found {len(increment.new)} new video chunks and {len(increment.stale)} stale video chunks that need face recognition"
        )

        if len(increment.new) > 0 or len(increment.stale) > 0:
            # simulate face detection on each chunk
            face_data = []
            for row_dict in pl.concat(
                [increment.new.to_polars(), increment.stale.to_polars()]
            ).iter_rows(named=True):
                video_chunk_id = row_dict["video_chunk_id"]
                provenance_by_field = row_dict["metaxy_provenance_by_field"]
                provenance = row_dict["metaxy_provenance"]

                # simulate detecting random number of faces
                num_faces = random.randint(0, 10)

                face_data.append(
                    {
                        "video_chunk_id": video_chunk_id,
                        "num_faces": num_faces,
                        "metaxy_provenance_by_field": provenance_by_field,
                        "metaxy_provenance": provenance,
                    }
                )

            face_df = pl.DataFrame(face_data)
            print(f"Writing face recognition results for {len(face_data)} chunks")
            store.write(FaceRecognition, nw.from_native(face_df))


if __name__ == "__main__":
    main()
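
The helper split_video_into_chunks is imported from example_expansion.utils and is not shown above. Here is a plausible stand-in, hypothetical and for illustration only, matching the "3-5 chunks" behavior described in the pipeline's comments:

import random


def split_video_into_chunks(path: str) -> list[str]:
    # hypothetical stand-in: pretend to split the video into 3-5 chunks
    # and return the paths of the resulting chunk files
    num_chunks = random.randint(3, 5)
    stem = path.rsplit(".", 1)[0]
    return [f"{stem}_chunk{i}.mp4" for i in range(num_chunks)]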

Step 1: Launch Initial Run

Run the pipeline to create videos, chunks, and face recognition results:

$ python pipeline.py
Found 3 new videos
Found 3 new videos and 0 stale videos that need chunking
Processing video: {'video_id': 1, 'path': 'video1.mp4', 'metaxy_deleted_at': None, 'metaxy_provenance_by_field': {'audio': '23124187', 'frames': '24116928'}, 'metaxy_provenance': '40433844', 'metaxy_data_version': '40433844', 'metaxy_data_version_by_field': {'audio': '23124187', 'frames': '24116928'}}
Writing 5 chunks for video 1
Processing video: {'video_id': 2, 'path': 'video2.mp4', 'metaxy_deleted_at': None, 'metaxy_provenance_by_field': {'audio': '18611808', 'frames': '18345318'}, 'metaxy_provenance': '12032307', 'metaxy_data_version': '12032307', 'metaxy_data_version_by_field': {'audio': '18611808', 'frames': '18345318'}}
Writing 3 chunks for video 2
Processing video: {'video_id': 3, 'path': 'video3.mp4', 'metaxy_deleted_at': None, 'metaxy_provenance_by_field': {'audio': '24638180', 'frames': '16645902'}, 'metaxy_provenance': '21310928', 'metaxy_data_version': '21310928', 'metaxy_data_version_by_field': {'audio': '24638180', 'frames': '16645902'}}
Writing 3 chunks for video 3
Found 11 new video chunks and 0 stale video chunks that need face recognition
Writing face recognition results for 11 chunks

All three features have been materialized. Note that the "video/chunk" feature may dynamically create as many samples as needed: Metaxy doesn't need to know anything about this in advance, except the relationship type.

Step 2: Verify Idempotency

Run the pipeline again without any changes:

$ python pipeline.py
Found 0 new videos and 0 stale videos that need chunking
Found 0 new video chunks and 0 stale video chunks that need face recognition

Nothing needs recomputation: the system correctly detects that no changes have occurred.
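
To assert idempotency programmatically, for example in a test, the same resolve_update API can be reused. A minimal sketch using only the calls shown above:

import metaxy as mx

from example_expansion.features import VideoChunk

cfg = mx.init()
store = cfg.get_store("dev")

with store:
    # after an unchanged re-run, nothing should be new or stale
    increment = store.resolve_update(VideoChunk)
    assert len(increment.new) == 0
    assert len(increment.stale) == 0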

Step 3: Change Audio Code Version

Now let's bump the code version on the audio field of "video/raw" feature:

patches/01_update_video_code_version.patch
--- a/src/example_expansion/features.py
+++ b/src/example_expansion/features.py
@@ -9,6 +9,6 @@ class Video(
         id_columns=["video_id"],
         fields=[
-            mx.FieldSpec(key="audio", code_version="1"),
+            mx.FieldSpec(key="audio", code_version="2"),
             "frames",
         ],
     ),
---
title: Feature Graph Changes
---
flowchart TB
    %% Snapshot version: none
    %%{init: {'flowchart': {'htmlLabels': true, 'curve': 'basis'}, 'themeVariables': {'fontSize': '14px'}}}%%
    video_raw["<div style="text-align:left"><b>video/raw</b><br/><font color="#FF0000">d842b740</font> → <font color="#00FF00">6e5a4ab8</font><br/><font color="#999">---</font><br/>- frames (3f6f401c)<br/>- <font color="#FFAA00">audio</font> (<font color="#FF0000">7132721c</font> → <font color="#00FF00">f8e35296</font>)</div>"]
    video_chunk["<div style="text-align:left"><b>video/chunk</b><br/><font color="#FF0000">7dc3712a</font> → <font color="#00FF00">504507a9</font><br/><font color="#999">---</font><br/>- frames (df8943d3)<br/>- <font color="#FFAA00">audio</font> (<font color="#FF0000">1549b1fa</font> → <font color="#00FF00">7948d163</font>)</div>"]
    video_faces["<div style="text-align:left"><b>video/faces</b><br/>52f055e3<br/><font color="#999">---</font><br/>- faces (0561bd8c)</div>"]
    video_raw --> video_chunk
    video_chunk --> video_faces


    style video_chunk stroke:#FFAA00,stroke-width:2px
    style video_faces stroke:#808080
    style video_raw stroke:#FFAA00,stroke-width:2px

This represents updating the audio processing algorithm, and therefore the audio data, while the frame data stays the same.

Step 4: Observe Field-Level Tracking

Run the pipeline again after the code change:

$ python pipeline.py
Found 3 new videos
Found 3 new videos and 0 stale videos that need chunking
Processing video: {'video_id': 1, 'path': 'video1.mp4', 'metaxy_deleted_at': None, 'metaxy_provenance_by_field': {'audio': '23124187', 'frames': '24116928'}, 'metaxy_provenance': '40433844', 'metaxy_data_version': '40433844', 'metaxy_data_version_by_field': {'audio': '23124187', 'frames': '24116928'}}
Writing 5 chunks for video 1
Processing video: {'video_id': 2, 'path': 'video2.mp4', 'metaxy_deleted_at': None, 'metaxy_provenance_by_field': {'audio': '18611808', 'frames': '18345318'}, 'metaxy_provenance': '12032307', 'metaxy_data_version': '12032307', 'metaxy_data_version_by_field': {'audio': '18611808', 'frames': '18345318'}}
Writing 3 chunks for video 2
Processing video: {'video_id': 3, 'path': 'video3.mp4', 'metaxy_deleted_at': None, 'metaxy_provenance_by_field': {'audio': '24638180', 'frames': '16645902'}, 'metaxy_provenance': '21310928', 'metaxy_data_version': '21310928', 'metaxy_data_version_by_field': {'audio': '24638180', 'frames': '16645902'}}
Writing 3 chunks for video 3
Found 0 new video chunks and 0 stale video chunks that need face recognition

Key observations:

  • "video/chunk" was recomputed because its audio field was affected by the upstream change
  • "video/faces" did not require a recompute, because it only depends on the frames field, which did not change (field-level staleness is sketched below)

Conclusion

Metaxy provides a convenient API for modeling expansion relationships: LineageRelationship.expansion. Other Metaxy features such as field-level versioning continue to work seamlessly when declaring expansion relationships.

Learn more about: