Skip to content

graflo.migrate.io

I/O utilities for migration workflows.

full_hash(schema, ingestion_model, bindings)

Stable hash over the composed deployment object (schema, ingestion model, and bindings).

Source code in graflo/migrate/io.py
def full_hash(schema: Schema, ingestion_model: IngestionModel, bindings: Any) -> str:
    """Hash the combined schema, ingestion model, and bindings.

    Args:
        schema: Object whose ``to_minimal_canonical_dict()`` output is stored
            under the ``"schema"`` key of the hashed payload.
        ingestion_model: Object whose ``to_minimal_canonical_dict()`` output
            is stored under the ``"ingestion"`` key.
        bindings: Stored under the ``"bindings"`` key. Converted with
            ``to_minimal_canonical_dict()`` when available, otherwise with
            ``to_dict()``, otherwise used unchanged.

    Returns:
        The value produced by ``_stable_hash`` for the assembled payload.
    """
    schema_part = schema.to_minimal_canonical_dict()
    ingestion_part = ingestion_model.to_minimal_canonical_dict()

    # Prefer the canonical form, then the generic dict form, and finally
    # fall back to the raw object as given.
    if hasattr(bindings, "to_minimal_canonical_dict"):
        bindings_part = bindings.to_minimal_canonical_dict()
    elif hasattr(bindings, "to_dict"):
        bindings_part = bindings.to_dict()
    else:
        bindings_part = bindings

    return _stable_hash(
        {
            "schema": schema_part,
            "ingestion": ingestion_part,
            "bindings": bindings_part,
        }
    )

graph_hash(schema)

Stable hash over logical graph model only.

Source code in graflo/migrate/io.py
def graph_hash(schema: Schema) -> str:
    """Hash only the logical graph model of *schema*.

    Args:
        schema: Object exposing ``core_schema``; only that attribute's
            ``to_minimal_canonical_dict()`` output is hashed.

    Returns:
        The value produced by ``_stable_hash`` for the canonical dict.
    """
    canonical_graph = schema.core_schema.to_minimal_canonical_dict()
    return _stable_hash(canonical_graph)

ingestion_hash(ingestion_model)

Stable hash over ingestion model (resources + transforms).

Source code in graflo/migrate/io.py
def ingestion_hash(ingestion_model: IngestionModel) -> str:
    """Hash the ingestion model (resources + transforms).

    Args:
        ingestion_model: Object whose ``to_minimal_canonical_dict()`` output
            is hashed.

    Returns:
        The value produced by ``_stable_hash`` for the canonical dict.
    """
    canonical_ingestion = ingestion_model.to_minimal_canonical_dict()
    return _stable_hash(canonical_ingestion)

load_ingestion_model(path, schema=None)

Load ingestion block from a manifest path.

Source code in graflo/migrate/io.py
def load_ingestion_model(
    path: str | Path, schema: Schema | None = None
) -> IngestionModel:
    """Load the ingestion block of the manifest found at *path*.

    Args:
        path: Location of the manifest, forwarded to ``load_manifest``.
        schema: Optional schema. When given, the ingestion model's
            ``finish_init`` is called with ``schema.core_schema`` before
            the model is returned.

    Returns:
        The ingestion model obtained from the manifest's
        ``require_ingestion_model()``.
    """
    model = load_manifest(path).require_ingestion_model()
    if schema is None:
        return model

    model.finish_init(schema.core_schema)
    return model

load_manifest(path)

Load and initialize a graph manifest from a YAML path.

Source code in graflo/migrate/io.py
def load_manifest(path: str | Path) -> GraphManifest:
    """Read the manifest file at *path* and return it initialized.

    The file is read with ``FileHandle.load`` (described as YAML by the
    original documentation), turned into a ``GraphManifest`` via
    ``from_config``, and finalized with ``finish_init()``.

    Args:
        path: Location of the manifest file.

    Returns:
        The manifest after ``finish_init()`` has been called on it.
    """
    raw_config = FileHandle.load(path)
    loaded = GraphManifest.from_config(raw_config)
    loaded.finish_init()
    return loaded

load_schema(path)

Load schema block from a manifest path.

Source code in graflo/migrate/io.py
def load_schema(path: str | Path) -> Schema:
    """Load the schema block of the manifest found at *path*.

    Args:
        path: Location of the manifest, forwarded to ``load_manifest``.

    Returns:
        The result of the loaded manifest's ``require_schema()``.
    """
    manifest = load_manifest(path)
    return manifest.require_schema()

manifest_hash(manifest)

Stable hash over manifest blocks.

Source code in graflo/migrate/io.py
def manifest_hash(manifest: GraphManifest) -> str:
    """Hash the schema, ingestion model, and bindings blocks of *manifest*.

    Each block contributes its ``to_minimal_canonical_dict()`` output; a
    block that is ``None`` contributes ``None`` instead.

    Args:
        manifest: Object exposing ``graph_schema``, ``ingestion_model`` and
            ``bindings`` attributes.

    Returns:
        The value produced by ``_stable_hash`` for the assembled payload.
    """
    # Blocks are resolved in the same order as the payload keys below.
    schema_part = None
    if manifest.graph_schema is not None:
        schema_part = manifest.graph_schema.to_minimal_canonical_dict()

    ingestion_part = None
    if manifest.ingestion_model is not None:
        ingestion_part = manifest.ingestion_model.to_minimal_canonical_dict()

    bindings_part = None
    if manifest.bindings is not None:
        bindings_part = manifest.bindings.to_minimal_canonical_dict()

    return _stable_hash(
        {
            "schema": schema_part,
            "ingestion_model": ingestion_part,
            "bindings": bindings_part,
        }
    )

plan_to_json_serializable(plan)

Convert a pydantic-style plan object to a JSON-ready dict payload.

Source code in graflo/migrate/io.py
def plan_to_json_serializable(plan: Any) -> dict[str, Any]:
    """Turn a plan-like object into a dict payload.

    The object's ``model_dump()`` is used when present; otherwise its
    ``to_dict()`` is used.

    Args:
        plan: Any object exposing ``model_dump`` or ``to_dict``.

    Returns:
        Whatever the first available conversion method returns.

    Raises:
        TypeError: If *plan* has neither ``model_dump`` nor ``to_dict``.
    """
    # NOTE(review): the converter is called with no arguments, so whether the
    # result is strictly JSON-serializable depends on the plan's field
    # types — confirm against callers.
    for method_name in ("model_dump", "to_dict"):
        if hasattr(plan, method_name):
            return getattr(plan, method_name)()
    raise TypeError(f"Unsupported plan object type: {type(plan)}")

schema_hash(schema)

Stable hash over schema deployment contract (graph + DB profile).

Source code in graflo/migrate/io.py
def schema_hash(schema: Schema) -> str:
    """Hash the schema deployment contract (graph + DB profile).

    Args:
        schema: Object exposing ``core_schema`` and ``db_profile``; each
            contributes its ``to_minimal_canonical_dict()`` output.

    Returns:
        The value produced by ``_stable_hash`` for the two-part payload.
    """
    core_part = schema.core_schema.to_minimal_canonical_dict()
    profile_part = schema.db_profile.to_minimal_canonical_dict()
    return _stable_hash({"core_schema": core_part, "db_profile": profile_part})