Expansion¶
Overview¶
This example demonstrates how to implement expansion (1:N) transformations with Metaxy.
In such relationships, a single parent sample maps to multiple child samples.
These relationships can be modeled with the LineageRelationship.expansion lineage type.
We will use a hypothetical video chunking pipeline as an example.
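For intuition, here is what an expansion looks like at the data level: one parent row fans out into several child rows, each carrying the parent's ID. This is an illustrative sketch only; the actual schema and IDs are defined below.

import polars as pl

# one parent sample...
video = pl.DataFrame({"video_id": [1], "path": ["video1.mp4"]})

# ...expands into multiple child samples, each pointing back to its parent
chunks = pl.DataFrame(
    {
        "video_id": [1, 1, 1],  # parent key, repeated for every chunk
        "video_chunk_id": ["1_0", "1_1", "1_2"],
        "path": ["video1_chunk0.mp4", "video1_chunk1.mp4", "video1_chunk2.mp4"],
    }
)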
The Pipeline¶
We are going to define a typical video processing pipeline with three features:
---
title: Feature Graph
---
flowchart LR
    %% Snapshot version: none
    %%{init: {'flowchart': {'htmlLabels': true, 'curve': 'basis'}, 'themeVariables': {'fontSize': '14px'}}}%%
    video_raw["<div style='text-align:left'><b>video/raw</b><br/>d842b740<br/><font color='#999'>---</font><br/>- audio (7132721c)<br/>- frames (3f6f401c)</div>"]
    video_chunk["<div style='text-align:left'><b>video/chunk</b><br/>7dc3712a<br/><font color='#999'>---</font><br/>- audio (1549b1fa)<br/>- frames (df8943d3)</div>"]
    video_faces["<div style='text-align:left'><b>video/faces</b><br/>52f055e3<br/><font color='#999'>---</font><br/>- faces (0561bd8c)</div>"]
    video_raw --> video_chunk
    video_chunk --> video_faces
Defining features: "video/raw"¶
Each video-like feature in our pipeline is going to have two fields: audio and frames.
Let's set the code version of audio to "1" so that we can bump it later in this example.
The frames field will use the default code version.
import metaxy as mx


class Video(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key="video/raw",
        id_columns=["video_id"],
        fields=[
            mx.FieldSpec(key="audio", code_version="1"),
            "frames",
        ],
    ),
):
    video_id: str
    path: str  # where the video is stored
Defining features: "video/chunk"¶
"video/chunk" represents a piece of the upstream "video/raw" feature. Since each "video/raw" sample can be split into multiple chunks, we need to tell Metaxy how to map each chunk to its parent video.
class VideoChunk(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key=["video", "chunk"],
        id_columns=["video_chunk_id"],
        deps=[
            mx.FeatureDep(
                feature=Video,
                lineage=mx.LineageRelationship.expansion(on=["video_id"]),
            )
        ],
        fields=["audio", "frames"],
    ),
):
    video_id: str  # points to the parent video
    video_chunk_id: str
    path: str  # where the video chunk is stored
The on=["video_id"] argument tells Metaxy that each chunk row joins to its parent video on the video_id column. We do not specify custom versions on its fields: Metaxy will automatically assign field-level lineage by matching on field names, so "video/chunk:frames" depends on "video/raw:frames" and "video/chunk:audio" depends on "video/raw:audio".
flowchart LR
    subgraph video_raw["video/raw"]
        video_raw_audio["audio"]
        video_raw_frames["frames"]
    end
    subgraph video_chunk["video/chunk"]
        video_chunk_audio["audio"]
        video_chunk_frames["frames"]
    end
    video_raw_audio --> video_chunk_audio
    video_raw_frames --> video_chunk_frames
    style video_raw stroke:#4C78A8,stroke-width:2px
    style video_chunk stroke:#4C78A8,stroke-width:2px
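For reference, the automatic name-based matching should be roughly equivalent to spelling the field dependencies out by hand with FieldSpec and FieldDep. The following is a sketch under that assumption (the "video/chunk_explicit" key is hypothetical, and you do not need to write this yourself):

class VideoChunkExplicit(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key=["video", "chunk_explicit"],  # hypothetical key, for illustration only
        id_columns=["video_chunk_id"],
        deps=[
            mx.FeatureDep(
                feature=Video,
                lineage=mx.LineageRelationship.expansion(on=["video_id"]),
            )
        ],
        fields=[
            # explicit per-field lineage that name matching infers automatically
            mx.FieldSpec(
                key="audio",
                deps=[mx.FieldDep(feature=Video, fields=["audio"])],
            ),
            mx.FieldSpec(
                key="frames",
                deps=[mx.FieldDep(feature=Video, fields=["frames"])],
            ),
        ],
    ),
):
    video_id: str
    video_chunk_id: str
    path: str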
Defining features: "video/faces"¶
"video/faces" processes video chunks and only depends on the frames field. This can be expressed with a FieldDep.
class FaceRecognition(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key=["video", "faces"],
        id_columns=["video_chunk_id"],
        deps=[VideoChunk],
        fields=[
            mx.FieldSpec(
                key="faces", deps=[mx.FieldDep(feature=VideoChunk, fields=["frames"])]
            )
        ],
    ),
):
    video_chunk_id: str
    num_faces: int  # number of faces detected
flowchart LR
    subgraph video_chunk["video/chunk"]
        video_chunk_audio["audio"]
        video_chunk_frames["frames"]
    end
    subgraph video_faces["video/faces"]
        video_faces_faces["faces"]
    end
    video_chunk_frames --> video_faces_faces
    style video_chunk stroke:#4C78A8,stroke-width:2px
    style video_faces stroke:#4C78A8,stroke-width:2px
This completes the feature definitions. Let's proceed to running the pipeline.
Getting Started¶
Install the example's dependencies:
Walkthrough¶
Here is a toy pipeline for computing the feature graph described above:
pipeline.py
import os
import random

import metaxy as mx
import narwhals as nw
import polars as pl

from example_expansion.features import FaceRecognition, Video, VideoChunk
from example_expansion.utils import split_video_into_chunks


def main():
    # Set random seed from environment if provided (for deterministic testing)
    if seed_str := os.environ.get("RANDOM_SEED"):
        random.seed(int(seed_str))

    cfg = mx.init()
    store = cfg.get_store("dev")

    # let's pretend somebody has already created the videos for us
    samples = pl.DataFrame(
        {
            "video_id": [1, 2, 3],
            "path": ["video1.mp4", "video2.mp4", "video3.mp4"],
            "metaxy_provenance_by_field": [
                {"audio": "v1", "frames": "v1"},
                {"audio": "v2", "frames": "v2"},
                {"audio": "v3", "frames": "v3"},
            ],
        }
    )

    with store:
        # showcase: resolve incremental update for a root feature
        increment = store.resolve_update(Video, samples=nw.from_native(samples))
        if len(increment.new) > 0:
            print(f"Found {len(increment.new)} new videos")
            store.write(Video, increment.new)

    # Resolve videos that need to be split into chunks
    with store:
        increment = store.resolve_update(VideoChunk)
        # the DataFrame dimensions match Video (with the ID column renamed)
        print(
            f"Found {len(increment.new)} new videos and {len(increment.stale)} stale videos that need chunking"
        )
        for row_dict in pl.concat(
            [increment.new.to_polars(), increment.stale.to_polars()]
        ).iter_rows(named=True):
            print(f"Processing video: {row_dict}")
            # let's split each video into 3-5 chunks randomly
            video_id = row_dict["video_id"]
            path = row_dict["path"]
            provenance_by_field = row_dict["metaxy_provenance_by_field"]
            provenance = row_dict["metaxy_provenance"]

            # pretend we split the video into chunks
            chunk_paths = split_video_into_chunks(path)

            # Generate chunk IDs based on the parent video ID
            chunk_ids = [f"{video_id}_{i}" for i in range(len(chunk_paths))]

            # write the chunks to the store
            # CRUCIAL: all the chunks **must share the same provenance values**
            chunk_df = pl.DataFrame(
                {
                    "video_id": [video_id] * len(chunk_paths),
                    "video_chunk_id": chunk_ids,
                    "path": chunk_paths,
                    "metaxy_provenance_by_field": [provenance_by_field]
                    * len(chunk_paths),
                    "metaxy_provenance": [provenance] * len(chunk_paths),
                }
            )

            print(f"Writing {len(chunk_paths)} chunks for video {video_id}")
            store.write(VideoChunk, nw.from_native(chunk_df))

    # Process face recognition on video chunks
    with store:
        increment = store.resolve_update(FaceRecognition)
        print(
            f"Found {len(increment.new)} new video chunks and {len(increment.stale)} stale video chunks that need face recognition"
        )
        if len(increment.new) > 0 or len(increment.stale) > 0:
            # simulate face detection on each chunk
            face_data = []
            for row_dict in pl.concat(
                [increment.new.to_polars(), increment.stale.to_polars()]
            ).iter_rows(named=True):
                video_chunk_id = row_dict["video_chunk_id"]
                provenance_by_field = row_dict["metaxy_provenance_by_field"]
                provenance = row_dict["metaxy_provenance"]

                # simulate detecting a random number of faces
                num_faces = random.randint(0, 10)

                face_data.append(
                    {
                        "video_chunk_id": video_chunk_id,
                        "num_faces": num_faces,
                        "metaxy_provenance_by_field": provenance_by_field,
                        "metaxy_provenance": provenance,
                    }
                )

            face_df = pl.DataFrame(face_data)
            print(f"Writing face recognition results for {len(face_data)} chunks")
            store.write(FaceRecognition, nw.from_native(face_df))


if __name__ == "__main__":
    main()
Step 1: Launch Initial Run¶
Run the pipeline to create videos, chunks, and face recognition results:
Found 3 new videos
Found 3 new videos and 0 stale videos that need chunking
Processing video: {'video_id': 1, 'path': 'video1.mp4', 'metaxy_deleted_at': None, 'metaxy_provenance_by_field': {'audio': '23124187', 'frames': '24116928'}, 'metaxy_provenance': '40433844', 'metaxy_data_version': '40433844', 'metaxy_data_version_by_field': {'audio': '23124187', 'frames': '24116928'}}
Writing 5 chunks for video 1
Processing video: {'video_id': 2, 'path': 'video2.mp4', 'metaxy_deleted_at': None, 'metaxy_provenance_by_field': {'audio': '18611808', 'frames': '18345318'}, 'metaxy_provenance': '12032307', 'metaxy_data_version': '12032307', 'metaxy_data_version_by_field': {'audio': '18611808', 'frames': '18345318'}}
Writing 3 chunks for video 2
Processing video: {'video_id': 3, 'path': 'video3.mp4', 'metaxy_deleted_at': None, 'metaxy_provenance_by_field': {'audio': '24638180', 'frames': '16645902'}, 'metaxy_provenance': '21310928', 'metaxy_data_version': '21310928', 'metaxy_data_version_by_field': {'audio': '24638180', 'frames': '16645902'}}
Writing 3 chunks for video 3
Found 11 new video chunks and 0 stale video chunks that need face recognition
Writing face recognition results for 11 chunks
All three features have been materialized. Note that the "video/chunk" feature may dynamically create as many samples as needed: Metaxy doesn't need to know the number of children in advance, only the relationship type.
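The number of chunks per video is decided entirely at runtime by split_video_into_chunks. The real example_expansion.utils implementation is not shown here, but a plausible stand-in, consistent with the "3-5 chunks" comment in pipeline.py, could look like this:

import random


def split_video_into_chunks(path: str) -> list[str]:
    # Pretend to split the video into 3-5 chunks; return one path per chunk.
    num_chunks = random.randint(3, 5)
    stem = path.rsplit(".", 1)[0]
    return [f"{stem}_chunk{i}.mp4" for i in range(num_chunks)]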
Step 2: Verify Idempotency¶
Run the pipeline again without any changes:
Found 0 new videos and 0 stale videos that need chunking
Found 0 new video chunks and 0 stale video chunks that need face recognition
Nothing needs recomputation: the system correctly detects that nothing has changed.
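If a step should explicitly skip expensive work when the increment is empty, a minimal guard can be built from the same resolve_update API used above (a sketch; chunking_is_needed is a hypothetical helper, not part of the example):

from example_expansion.features import VideoChunk


def chunking_is_needed(store) -> bool:
    # Re-resolve the increment and check whether any work remains.
    with store:
        increment = store.resolve_update(VideoChunk)
        return len(increment.new) > 0 or len(increment.stale) > 0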
Step 3: Change Audio Code Version¶
Now let's bump the code version on the audio field of the "video/raw" feature:
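In example_expansion/features.py this is a one-line change to the FieldSpec we defined earlier; everything else stays the same:

class Video(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key="video/raw",
        id_columns=["video_id"],
        fields=[
            mx.FieldSpec(key="audio", code_version="2"),  # bumped from "1"
            "frames",
        ],
    ),
):
    video_id: str
    path: str  # where the video is stored

The feature and field versions change accordingly throughout the graph: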
---
title: Feature Graph Changes
---
flowchart TB
    %% Snapshot version: none
    %%{init: {'flowchart': {'htmlLabels': true, 'curve': 'basis'}, 'themeVariables': {'fontSize': '14px'}}}%%
    video_raw["<div style='text-align:left'><b>video/raw</b><br/><font color='#FF0000'>d842b740</font> → <font color='#00FF00'>6e5a4ab8</font><br/><font color='#999'>---</font><br/>- frames (3f6f401c)<br/>- <font color='#FFAA00'>audio</font> (<font color='#FF0000'>7132721c</font> → <font color='#00FF00'>f8e35296</font>)</div>"]
    video_chunk["<div style='text-align:left'><b>video/chunk</b><br/><font color='#FF0000'>7dc3712a</font> → <font color='#00FF00'>504507a9</font><br/><font color='#999'>---</font><br/>- frames (df8943d3)<br/>- <font color='#FFAA00'>audio</font> (<font color='#FF0000'>1549b1fa</font> → <font color='#00FF00'>7948d163</font>)</div>"]
    video_faces["<div style='text-align:left'><b>video/faces</b><br/>52f055e3<br/><font color='#999'>---</font><br/>- faces (0561bd8c)</div>"]
    video_raw --> video_chunk
    video_chunk --> video_faces
    style video_chunk stroke:#FFAA00,stroke-width:2px
    style video_faces stroke:#808080
    style video_raw stroke:#FFAA00,stroke-width:2px
This represents updating the audio processing algorithm, and therefore the audio data, while the frame data stays the same.
Step 4: Observe Field-Level Tracking¶
Run the pipeline again after the code change:
Found 3 new videos
Found 3 new videos and 0 stale videos that need chunking
Processing video: {'video_id': 1, 'path': 'video1.mp4', 'metaxy_deleted_at': None, 'metaxy_provenance_by_field': {'audio': '23124187', 'frames': '24116928'}, 'metaxy_provenance': '40433844', 'metaxy_data_version': '40433844', 'metaxy_data_version_by_field': {'audio': '23124187', 'frames': '24116928'}}
Writing 5 chunks for video 1
Processing video: {'video_id': 2, 'path': 'video2.mp4', 'metaxy_deleted_at': None, 'metaxy_provenance_by_field': {'audio': '18611808', 'frames': '18345318'}, 'metaxy_provenance': '12032307', 'metaxy_data_version': '12032307', 'metaxy_data_version_by_field': {'audio': '18611808', 'frames': '18345318'}}
Writing 3 chunks for video 2
Processing video: {'video_id': 3, 'path': 'video3.mp4', 'metaxy_deleted_at': None, 'metaxy_provenance_by_field': {'audio': '24638180', 'frames': '16645902'}, 'metaxy_provenance': '21310928', 'metaxy_data_version': '21310928', 'metaxy_data_version_by_field': {'audio': '24638180', 'frames': '16645902'}}
Writing 3 chunks for video 3
Found 0 new video chunks and 0 stale video chunks that need face recognition
Key observations:
- "video/chunk" has been recomputed, since its audio field was affected by the upstream change
- "video/faces" did not require a recompute, because it only depends on the frames field (which did not change)
Conclusion¶
Metaxy provides a convenient API for modeling expansion relationships: LineageRelationship.expansion. Other Metaxy features such as field-level versioning continue to work seamlessly when declaring expansion relationships.