Metaxy + Apache Iceberg
Apache Iceberg is an open table format for large analytic datasets that supports ACID transactions and schema evolution. Use IcebergMetadataStore to read and write Metaxy metadata in Iceberg tables. The store is built on PyIceberg and performs versioning computations with the in-memory Polars versioning engine.
Note
By default, the store uses a SQLite-backed SQL catalog, which suits local development. Any PyIceberg-supported catalog (REST, Glue, Hive) can be configured via catalog_properties.
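As a rough illustration of what catalog_properties might contain for each catalog type named above, the sketch below collects one mapping per type. The `type` and `uri` keys follow PyIceberg's catalog configuration; the hosts, ports, and paths are placeholders, and `store_kwargs` is a hypothetical helper, not part of Metaxy.

```python
# Illustrative catalog_properties mappings, one per catalog type.
# All hostnames, ports, and paths are placeholders, not recommended values.
CATALOG_EXAMPLES: dict[str, dict[str, str]] = {
    "sql": {
        "type": "sql",
        "uri": "sqlite:////tmp/warehouse/catalog.db",
        "warehouse": "file:///tmp/warehouse",
    },
    "rest": {"type": "rest", "uri": "http://localhost:8181"},
    "glue": {"type": "glue"},
    "hive": {"type": "hive", "uri": "thrift://localhost:9083"},
}


def store_kwargs(kind: str, warehouse: str) -> dict[str, object]:
    """Hypothetical helper: build keyword arguments for IcebergMetadataStore."""
    return {
        "warehouse": warehouse,
        # Copy so callers can modify the result without touching the examples.
        "catalog_properties": dict(CATALOG_EXAMPLES[kind]),
    }


kwargs = store_kwargs("rest", "s3://my-bucket/warehouse")
print(kwargs)
# With Metaxy installed, these could then be passed on as:
#   IcebergMetadataStore(**kwargs)
```

Real deployments usually need further properties (credentials, regions, and so on) that depend on the catalog and are not shown here.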
Recommended: enable Map datatype
Apache Iceberg supports the Map type natively. Enabling enable_map_datatype preserves Map columns across read and write operations.
Installation
API Reference
metaxy.ext.polars.handlers.iceberg
Apache Iceberg metadata store implemented with PyIceberg.
metaxy.ext.polars.IcebergMetadataStore
IcebergMetadataStore(
    warehouse: str | Path,
    *,
    namespace: str = "metaxy",
    catalog_name: str = "metaxy",
    catalog_properties: dict[str, str] | None = None,
    auto_create_namespace: bool = True,
    fallback_stores: list[MetadataStore] | None = None,
    **kwargs: Any,
)
Bases: MetadataStore
Apache Iceberg metadata store backed by PyIceberg.
Stores feature metadata in Iceberg tables managed by a PyIceberg catalog. It uses the Polars versioning engine for provenance calculations.
Tip
If Polars 1.39 or later is installed, lazy Polars frames are written via LazyFrame.sink_iceberg, avoiding unnecessary materialization.
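How Metaxy detects the Polars version is not shown on this page. As a minimal sketch of one way such a gate could work, the hypothetical helper below compares the leading `major.minor` components numerically, which avoids the string-comparison trap where "1.4" sorts after "1.39".

```python
import re

# Threshold taken from the tip above.
MIN_SINK_VERSION = (1, 39)


def supports_sink_iceberg(polars_version: str) -> bool:
    """Hypothetical check: is this Polars version at least 1.39?"""
    match = re.match(r"(\d+)\.(\d+)", polars_version)
    if match is None:
        # Unparseable version strings are treated as unsupported.
        return False
    major_minor = (int(match.group(1)), int(match.group(2)))
    return major_minor >= MIN_SINK_VERSION


for version in ("1.39.0", "1.4.2", "2.0.0"):
    print(version, supports_sink_iceberg(version))
```

In practice the version string would come from `polars.__version__` or `importlib.metadata.version("polars")`.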
Example:
```py
from metaxy.ext.polars.handlers.iceberg import IcebergMetadataStore

store = IcebergMetadataStore(
    warehouse="s3://my-bucket/warehouse",
    namespace="ml_features",
    catalog_properties={"type": "glue"},
)
```
Parameters:
- warehouse (str | Path) – Warehouse directory or URI for Iceberg data files.
- namespace (str, default: 'metaxy') – Iceberg namespace for tables (Glue Database, SQL schema, etc.).
- catalog_name (str, default: 'metaxy') – Local identifier for the PyIceberg catalog instance.
- catalog_properties (dict[str, str] | None, default: None) – Properties for pyiceberg.catalog.load_catalog.
- auto_create_namespace (bool, default: True) – Create the namespace on first write if it does not exist.
- fallback_stores (list[MetadataStore] | None, default: None) – Ordered list of read-only fallback stores.
- **kwargs (Any, default: {}) – Forwarded to metaxy.metadata_store.base.MetadataStore.
Source code in src/metaxy/ext/polars/handlers/iceberg.py
def __init__(
    self,
    warehouse: str | Path,
    *,
    namespace: str = "metaxy",
    catalog_name: str = "metaxy",
    catalog_properties: dict[str, str] | None = None,
    auto_create_namespace: bool = True,
    fallback_stores: list[MetadataStore] | None = None,
    **kwargs: Any,
) -> None:
    """Initialize Apache Iceberg metadata store.

    Args:
        warehouse: Warehouse directory or URI for Iceberg data files.
        namespace: Iceberg namespace for tables (Glue Database, SQL schema, etc.).
        catalog_name: Local identifier for the PyIceberg catalog instance.
        catalog_properties: Properties for [`pyiceberg.catalog.load_catalog`][pyiceberg.catalog.load_catalog].
        auto_create_namespace: Create the namespace on first write if it does not exist.
        fallback_stores: Ordered list of read-only fallback stores.
        **kwargs: Forwarded to [metaxy.metadata_store.base.MetadataStore][metaxy.metadata_store.base.MetadataStore].
    """
    self.namespace = namespace
    self.catalog_name = catalog_name
    self.auto_create_namespace = auto_create_namespace
    self._catalog: Catalog | None = None

    warehouse_str = str(warehouse)
    self._is_remote = not is_local_path(warehouse_str)

    if self._is_remote:
        self._warehouse_uri = warehouse_str.rstrip("/")
    else:
        if warehouse_str.startswith("file://"):
            warehouse_str = warehouse_str[7:]
        elif warehouse_str.startswith("local://"):
            warehouse_str = warehouse_str[8:]
        self._warehouse_uri = Path(warehouse_str).expanduser().resolve().as_posix()

    self._catalog_properties = catalog_properties or {
        "type": "sql",
        "uri": f"sqlite:///{self._warehouse_uri}/catalog.db",
        "warehouse": "file://" + self._warehouse_uri,
    }

    super().__init__(
        fallback_stores=fallback_stores,
        versioning_engine="polars",
        **kwargs,
    )
Configuration
Configuration for IcebergMetadataStore.
Example
Show JSON schema:
{
  "$defs": {
    "HashAlgorithm": {
      "description": "Supported hash algorithms for field provenance calculation.\n\nThese algorithms are chosen for:\n\n- Speed (non-cryptographic hashes preferred)\n\n- Cross-database availability\n\n- Good collision resistance for field provenance calculation",
      "enum": [
        "xxhash64",
        "xxhash32",
        "wyhash",
        "sha256",
        "md5",
        "farmhash"
      ],
      "title": "HashAlgorithm",
      "type": "string"
    }
  },
  "additionalProperties": false,
  "description": "Configuration for IcebergMetadataStore.\n\nExample:\n ```toml title=\"metaxy.toml\"\n [stores.dev]\n type = \"metaxy.ext.polars.IcebergMetadataStore\"\n\n [stores.dev.config]\n warehouse = \"/path/to/warehouse\"\n namespace = \"metaxy\"\n\n [stores.dev.config.catalog_properties]\n type = \"sql\"\n ```",
  "properties": {
    "fallback_stores": {
      "description": "List of fallback store names to search when features are not found in the current store.",
      "items": {
        "type": "string"
      },
      "title": "Fallback Stores",
      "type": "array"
    },
    "hash_algorithm": {
      "anyOf": [
        {
          "$ref": "#/$defs/HashAlgorithm"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Hash algorithm for versioning. If None, uses store's default."
    },
    "versioning_engine": {
      "default": "auto",
      "description": "Which versioning engine to use: 'auto' (prefer native), 'native', or 'polars'.",
      "enum": [
        "auto",
        "native",
        "polars"
      ],
      "title": "Versioning Engine",
      "type": "string"
    },
    "warehouse": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        }
      ],
      "description": "Warehouse directory or URI where Iceberg tables are stored.",
      "title": "Warehouse"
    },
    "namespace": {
      "default": "metaxy",
      "description": "Iceberg namespace for feature tables.",
      "title": "Namespace",
      "type": "string"
    },
    "catalog_name": {
      "default": "metaxy",
      "description": "Name of the Iceberg catalog.",
      "title": "Catalog Name",
      "type": "string"
    },
    "catalog_properties": {
      "anyOf": [
        {
          "additionalProperties": {
            "type": "string"
          },
          "type": "object"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Properties passed to pyiceberg.catalog.load_catalog.",
      "title": "Catalog Properties"
    },
    "auto_create_namespace": {
      "default": true,
      "description": "Automatically create the namespace on first write if it does not exist.",
      "title": "Auto Create Namespace",
      "type": "boolean"
    }
  },
  "required": [
    "warehouse"
  ],
  "title": "IcebergMetadataStoreConfig",
  "type": "object"
}
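To make the schema's constraints concrete, here is a minimal sketch that checks a candidate config mapping against three of them: no unknown keys (additionalProperties is false), warehouse is required, and versioning_engine must be one of the listed values. It then fills in the defaults the schema declares. `check_config` is a hypothetical helper for illustration only; Metaxy's own validation is presumably handled by the IcebergMetadataStoreConfig model.

```python
from typing import Any

# Property names and defaults copied from the JSON schema above.
ALLOWED_KEYS = {
    "fallback_stores", "hash_algorithm", "versioning_engine", "warehouse",
    "namespace", "catalog_name", "catalog_properties", "auto_create_namespace",
}
DEFAULTS: dict[str, Any] = {
    "hash_algorithm": None,
    "versioning_engine": "auto",
    "namespace": "metaxy",
    "catalog_name": "metaxy",
    "catalog_properties": None,
    "auto_create_namespace": True,
}
VERSIONING_ENGINES = {"auto", "native", "polars"}


def check_config(config: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical check of a config mapping; returns it with defaults applied."""
    unknown = set(config) - ALLOWED_KEYS
    if unknown:
        raise ValueError(f"unknown keys: {sorted(unknown)}")
    if "warehouse" not in config:
        raise ValueError("'warehouse' is required")
    merged = {**DEFAULTS, **config}
    if merged["versioning_engine"] not in VERSIONING_ENGINES:
        raise ValueError(f"invalid versioning_engine: {merged['versioning_engine']!r}")
    return merged


print(check_config({"warehouse": "/path/to/warehouse"}))
```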
fallback_stores
pydantic-field
List of fallback store names to search when features are not found in the current store.
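The lookup order this implies can be pictured with plain dictionaries standing in for stores. This is a conceptual sketch only, not Metaxy's implementation: `find_feature`, the store names, and the feature keys are all hypothetical.

```python
from typing import Any

Store = dict[str, Any]  # feature key -> metadata, standing in for a real store


def find_feature(
    feature_key: str,
    stores: dict[str, Store],
    current: str,
    fallback_stores: list[str],
) -> tuple[str, Any]:
    """Search the current store first, then each fallback in the order given."""
    for name in [current, *fallback_stores]:
        if feature_key in stores[name]:
            return name, stores[name][feature_key]
    raise KeyError(f"{feature_key!r} not found in {current!r} or its fallbacks")


stores = {
    "dev": {"video/frames": "dev-metadata"},
    "staging": {"video/audio": "staging-metadata"},
    "prod": {"video/audio": "prod-metadata", "video/text": "prod-metadata"},
}

print(find_feature("video/frames", stores, "dev", ["staging", "prod"]))
print(find_feature("video/audio", stores, "dev", ["staging", "prod"]))
print(find_feature("video/text", stores, "dev", ["staging", "prod"]))
```

Because the list is ordered, a feature present in several fallbacks is taken from the first one that has it.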
hash_algorithm
pydantic-field
hash_algorithm: HashAlgorithm | None = None
Hash algorithm for versioning. If None, the store's default is used.
versioning_engine
pydantic-field
versioning_engine: Literal["auto", "native", "polars"] = "auto"
Which versioning engine to use: 'auto' (prefer native), 'native', or 'polars'.
warehouse
pydantic-field
Warehouse directory or URI where Iceberg tables are stored.
catalog_properties
pydantic-field
Properties passed to pyiceberg.catalog.load_catalog.