Skip to content

graflo.architecture.evolution.apply

Apply manifest evolution operations to a copy of a :class:`~graflo.architecture.contract.manifest.GraphManifest`.

apply_add_edge_properties(manifest, op)

Append new edge properties to existing relations.

Source code in graflo/architecture/evolution/apply.py
def apply_add_edge_properties(manifest: GraphManifest, op: AddEdgePropertiesOp) -> None:
    """Add the properties listed in ``op.additions`` to matching edges.

    ``op.additions`` is looked up by each edge's ``relation``; edges without
    a relation, or with no requested additions, are left untouched. A name
    that an edge already carries is skipped, so each property is added at
    most once. New properties are created as ``Field(name=..., type=None)``.
    The schema's ``finish_init`` is invoked once all edges are processed.

    Raises:
        ValueError: If ``manifest.graph_schema`` is ``None``.
    """
    graph_schema = manifest.graph_schema
    if graph_schema is None:
        raise ValueError("add_edge_properties requires graph_schema")

    for edge in graph_schema.core_schema.edge_config.edges:
        relation = edge.relation
        if relation is None:
            continue
        requested = op.additions.get(relation, [])
        if not requested:
            continue
        # Track names as we go so duplicates inside ``requested`` are skipped too.
        known = {prop.name for prop in edge.properties}
        for prop_name in requested:
            if prop_name not in known:
                edge.properties.append(Field(name=prop_name, type=None))
                known.add(prop_name)

    graph_schema.finish_init()

apply_add_vertex_properties(manifest, op)

Append new vertex properties to existing vertices.

Source code in graflo/architecture/evolution/apply.py
def apply_add_vertex_properties(
    manifest: GraphManifest, op: AddVertexPropertiesOp
) -> None:
    """Add the properties listed in ``op.additions`` to matching vertices.

    ``op.additions`` is keyed by vertex name. Every key must be a member of
    the vertex config's ``vertex_set``. Names a vertex already carries are
    skipped, so each property is added at most once. New properties are
    created as ``Field(name=..., type=None)``, and the schema's
    ``finish_init`` is invoked at the end.

    Raises:
        ValueError: If ``manifest.graph_schema`` is ``None`` or if
            ``op.additions`` names a vertex that is not in the schema.
    """
    graph_schema = manifest.graph_schema
    if graph_schema is None:
        raise ValueError("add_vertex_properties requires graph_schema")

    vertex_config = graph_schema.core_schema.vertex_config
    unknown = set(op.additions) - vertex_config.vertex_set
    if unknown:
        raise ValueError(f"add_vertex_properties: unknown vertices: {sorted(unknown)}")

    for vertex in vertex_config.vertices:
        requested = op.additions.get(vertex.name, [])
        if not requested:
            continue
        # Track names as we go so duplicates inside ``requested`` are skipped too.
        known = {prop.name for prop in vertex.properties}
        for prop_name in requested:
            if prop_name not in known:
                vertex.properties.append(Field(name=prop_name, type=None))
                known.add(prop_name)

    graph_schema.finish_init()

apply_evolution(manifest, ops, *, bump_version='minor', finish_init=True, strict_references=False, dynamic_edge_feedback=False)

Return a deep copy of manifest with ops applied and optionally re-initialized.

Compare before/after contract identity with :func:`graflo.migrate.io.manifest_hash` (a stable hash over the schema, ingestion_model, and bindings blocks).

Source code in graflo/architecture/evolution/apply.py
def apply_evolution(
    manifest: GraphManifest,
    ops: Sequence[
        RemoveVerticesOp
        | MergeVerticesOp
        | RenameVertexPropertiesOp
        | RemoveVertexPropertiesOp
        | AddVertexPropertiesOp
        | RenameVerticesOp
        | RenameRelationsOp
        | RenameResourcesOp
        | RemoveEdgesOp
        | MergeEdgesOp
        | RenameEdgePropertiesOp
        | RemoveEdgePropertiesOp
        | AddEdgePropertiesOp
        | SanitizeOp
    ],
    *,
    bump_version: bool | Literal["minor"] = "minor",
    finish_init: bool = True,
    strict_references: bool = False,
    dynamic_edge_feedback: bool = False,
) -> GraphManifest:
    """Apply *ops* to a deep copy of *manifest* and return the copy.

    The input manifest is never mutated: it is duplicated with
    ``model_copy(deep=True)`` and every op is dispatched, in order, against
    the duplicate. Afterwards ``_bump_schema_version`` is called with
    *bump_version*, and, when *finish_init* is true, the copy's
    ``finish_init`` is run with *strict_references* and
    *dynamic_edge_feedback* forwarded as keyword arguments.

    Compare before/after contract identity with :func:`graflo.migrate.io.manifest_hash`
    (stable hash over schema, ingestion_model, and bindings blocks).
    """
    evolved = manifest.model_copy(deep=True)

    for operation in ops:
        _dispatch_op(evolved, operation)

    _bump_schema_version(evolved, bump_version)

    if not finish_init:
        return evolved

    evolved.finish_init(
        strict_references=strict_references,
        dynamic_edge_feedback=dynamic_edge_feedback,
    )
    return evolved

apply_manifest_ops_inplace(manifest, ops)

Apply each evolution op to manifest in place.

Does not copy the manifest, bump the schema version, or call :meth:`GraphManifest.finish_init`. Callers that need re-validation after mutation should invoke ``finish_init`` themselves.

Source code in graflo/architecture/evolution/apply.py
def apply_manifest_ops_inplace(
    manifest: GraphManifest,
    ops: Sequence[
        RemoveVerticesOp
        | MergeVerticesOp
        | RenameVertexPropertiesOp
        | RemoveVertexPropertiesOp
        | AddVertexPropertiesOp
        | RenameVerticesOp
        | RenameRelationsOp
        | RenameResourcesOp
        | RemoveEdgesOp
        | MergeEdgesOp
        | RenameEdgePropertiesOp
        | RemoveEdgePropertiesOp
        | AddEdgePropertiesOp
        | SanitizeOp
    ],
) -> None:
    """Dispatch every op in *ops*, in order, directly against *manifest*.

    Unlike :func:`apply_evolution`, this function makes no copy, performs no
    schema version bump, and does not call :meth:`GraphManifest.finish_init`
    itself. Callers that need the manifest re-validated after the mutation
    must call ``finish_init`` on their own.
    """
    for operation in ops:
        _dispatch_op(manifest, operation)

apply_merge_edges(manifest, op)

Merge edge relation names into one canonical relation.

Source code in graflo/architecture/evolution/apply.py
def apply_merge_edges(manifest: GraphManifest, op: MergeEdgesOp) -> None:
    """Collapse the relations in ``op.sources`` into the relation ``op.into``.

    Every source relation is first renamed to ``op.into`` through
    :func:`apply_rename_relations`. If the manifest has a graph schema, its
    edges are then passed to ``remap_relation_and_merge_edges`` and the core
    schema is rebuilt around the result, followed by ``finish_init``.
    Without a graph schema only the rename step runs.

    Raises:
        ValueError: If ``op.into`` is also listed in ``op.sources``.
    """
    if op.into in set(op.sources):
        raise ValueError("merge_edges: `sources` must not include `into`")

    # Every source relation maps onto the single canonical target.
    target_by_source = dict.fromkeys(op.sources, op.into)
    apply_rename_relations(manifest, RenameRelationsOp(relations=target_by_source))

    graph_schema = manifest.graph_schema
    if graph_schema is None:
        return

    core = graph_schema.core_schema
    collapsed_edges = remap_relation_and_merge_edges(
        core.edge_config.edges, target_by_source
    )
    graph_schema.core_schema = CoreSchema(
        vertex_config=core.vertex_config,
        edge_config=edge_config_from_edges(collapsed_edges),
    )
    graph_schema.finish_init()

apply_merge_vertices(manifest, op)

Mutate manifest in place: merge the source vertices into the ``into`` vertex.

Source code in graflo/architecture/evolution/apply.py
def apply_merge_vertices(manifest: GraphManifest, op: MergeVerticesOp) -> None:
    """Merge the vertices named in ``op.sources`` into ``op.into``, in place.

    Steps, in order:

    1. Build a merged vertex config via ``_build_merged_vertex_config``.
    2. Redirect edges through a ``source -> into`` map and merge them via
       ``redirect_and_merge_edges``.
    3. Replace the schema's core schema with the merged vertices and edges.
    4. Apply the merge to the database profile and re-validate it.
    5. If an ingestion model exists, rewrite it for the merge and re-validate
       it by round-tripping through ``to_dict`` / ``model_validate``.

    Raises:
        ValueError: If ``manifest.graph_schema`` is ``None`` or if
            ``op.into`` also appears in ``op.sources``.
    """
    graph_schema = manifest.graph_schema
    if graph_schema is None:
        raise ValueError("merge_vertices requires graph_schema")

    source_names = list(op.sources)
    target = op.into
    source_set = set(source_names)
    if target in source_set:
        raise ValueError("merge_vertices: `into` must not appear in `sources`")

    core = graph_schema.core_schema
    merged_vertex_config = _build_merged_vertex_config(
        core.vertex_config, source_names, target
    )
    redirect = dict.fromkeys(source_names, target)
    merged_edges = redirect_and_merge_edges(core.edge_config.edges, redirect)

    graph_schema.core_schema = CoreSchema(
        vertex_config=merged_vertex_config,
        edge_config=edge_config_from_edges(merged_edges),
    )
    apply_vertex_merge_to_db_profile(graph_schema.db_profile, source_set, target)
    graph_schema.db_profile = _revalidate_db_profile(graph_schema.db_profile)

    if manifest.ingestion_model is None:
        return

    _rewrite_ingestion_for_merge(manifest.ingestion_model, redirect)
    # Round-trip through a plain dict so the rewritten model is validated again.
    manifest.ingestion_model = IngestionModel.model_validate(
        manifest.ingestion_model.to_dict(skip_defaults=False)
    )

apply_remove_edge_properties(manifest, op)

Remove edge properties by relation and clean references.

Source code in graflo/architecture/evolution/apply.py
def apply_remove_edge_properties(
    manifest: GraphManifest, op: RemoveEdgePropertiesOp
) -> None:
    """Remove edge properties by relation and clean references.

    ``op.removals`` maps a relation name to the property names to drop;
    non-string entries in those collections are ignored. For every edge whose
    relation has removals, the named properties are removed from
    ``edge.properties``. The database profile is then updated and
    re-validated, ``finish_init`` is run on the schema, and ingestion
    pipelines are rewritten through ``rewrite_edge_properties_in_pipeline``.

    All affected edges are validated *before* any edge is modified, so a
    rejected operation leaves the manifest untouched instead of partially
    mutated. This matters for in-place callers that do not work on a copy.

    Raises:
        ValueError: If ``manifest.graph_schema`` is ``None``, or if a
            requested removal is part of an edge identity (the
            ``source`` / ``target`` / ``relation`` tokens are not treated as
            property names for this check).
    """
    schema = manifest.graph_schema
    if schema is None:
        raise ValueError("remove_edge_properties requires graph_schema")

    removals = {
        relation: {field for field in fields if isinstance(field, str)}
        for relation, fields in op.removals.items()
    }
    # Identity tokens that refer to edge endpoints/relation, not to properties.
    reserved_tokens = {"source", "target", "relation"}

    # Pass 1: validate every affected edge without touching anything.
    pending = []
    for edge in schema.core_schema.edge_config.edges:
        if edge.relation is None:
            continue
        remove_fields = removals.get(edge.relation)
        if not remove_fields:
            continue
        identity_fields = {
            token for identity in edge.identities for token in identity
        } - reserved_tokens
        overlap = sorted(identity_fields & remove_fields)
        if overlap:
            raise ValueError(
                "remove_edge_properties cannot remove identity fields "
                f"for relation {edge.relation}: {overlap}"
            )
        pending.append((edge, remove_fields))

    # Pass 2: all checks passed, so the mutation cannot fail half-way.
    for edge, remove_fields in pending:
        edge.properties = [
            field for field in edge.properties if field.name not in remove_fields
        ]

    apply_edge_property_removal_to_db_profile(schema.db_profile, removals)
    schema.db_profile = _revalidate_db_profile(schema.db_profile)
    schema.finish_init()
    _rebuild_ingestion_with_pipeline_rewrite(
        manifest,
        lambda pipeline: rewrite_edge_properties_in_pipeline(
            pipeline, removals_by_relation=removals
        ),
    )

apply_remove_edges(manifest, op)

Remove edges by relation name and prune related references.

Source code in graflo/architecture/evolution/apply.py
def apply_remove_edges(manifest: GraphManifest, op: RemoveEdgesOp) -> None:
    """Drop every edge whose relation is in ``op.relations`` and prune references.

    Does nothing when ``op.relations`` is empty. Otherwise the core schema is
    rebuilt without the dropped edges, the database profile is updated and
    re-validated, and ``finish_init`` is run on the schema. If an ingestion
    model exists, each resource is rebuilt from its dict form with:

    - its ``pipeline`` (when it is a list) rewritten by
      ``rewrite_remove_relations_in_pipeline``;
    - ``infer_edge_only`` / ``infer_edge_except`` entries that are dicts
      naming a dropped relation filtered out;
    - ``extra_weights`` entries whose ``edge`` dict names a dropped relation
      filtered out.

    The ingestion model is then re-validated as a whole.

    Raises:
        ValueError: If relations were requested but ``manifest.graph_schema``
            is ``None``.
    """
    dropped = set(op.relations)
    if not dropped:
        return

    graph_schema = manifest.graph_schema
    if graph_schema is None:
        raise ValueError("remove_edges requires graph_schema")

    core = graph_schema.core_schema
    kept_edges = [edge for edge in core.edge_config.edges if edge.relation not in dropped]
    graph_schema.core_schema = CoreSchema(
        vertex_config=core.vertex_config,
        edge_config=EdgeConfig(edges=kept_edges),
    )
    apply_relation_removal_to_db_profile(graph_schema.db_profile, dropped)
    graph_schema.db_profile = _revalidate_db_profile(graph_schema.db_profile)
    graph_schema.finish_init()

    ingestion = manifest.ingestion_model
    if ingestion is None:
        return

    from graflo.architecture.contract.declarations.resource import Resource

    def _spec_names_dropped(spec) -> bool:
        # Only dict-shaped specs can carry a "relation" key.
        return isinstance(spec, dict) and spec.get("relation") in dropped

    def _weight_names_dropped(entry) -> bool:
        # The relation sits on the nested "edge" dict of the weight entry.
        return (
            isinstance(entry, dict)
            and isinstance(entry.get("edge"), dict)
            and entry["edge"].get("relation") in dropped
        )

    rebuilt: list[Resource] = []
    for resource in ingestion.resources:
        payload = resource.to_dict(skip_defaults=False)

        steps = payload.get("pipeline")
        if isinstance(steps, list):
            payload["pipeline"] = rewrite_remove_relations_in_pipeline(steps, dropped)

        for key in ("infer_edge_only", "infer_edge_except"):
            specs = payload.get(key)
            if isinstance(specs, list):
                payload[key] = [spec for spec in specs if not _spec_names_dropped(spec)]

        weights = payload.get("extra_weights")
        if isinstance(weights, list):
            payload["extra_weights"] = [
                entry for entry in weights if not _weight_names_dropped(entry)
            ]

        rebuilt.append(Resource.model_validate(payload))

    ingestion.resources = rebuilt
    manifest.ingestion_model = IngestionModel.model_validate(
        ingestion.to_dict(skip_defaults=False)
    )

apply_remove_vertex_properties(manifest, op)

Remove vertex properties and clean up ingestion/db profile references.

Source code in graflo/architecture/evolution/apply.py
def apply_remove_vertex_properties(
    manifest: GraphManifest, op: RemoveVertexPropertiesOp
) -> None:
    """Remove vertex properties and clean up ingestion/db profile references.

    ``op.removals`` maps a vertex name to the property names to drop;
    non-string entries in those collections are ignored. Does nothing when
    ``op.removals`` is empty. Otherwise, in order:

    1. The named properties are removed from each affected vertex.
    2. Removed fields are stripped from ``db_profile.vertex_indexes`` of the
       affected vertices; an index left without fields is dropped.
    3. Edge-spec indexes are rebuilt without fields removed from either the
       spec's source or target vertex; an index without fields is dropped.
    4. The database profile is re-validated and ``finish_init`` is run.
    5. If an ingestion model exists, pipelines are rewritten through
       ``rewrite_remove_vertex_properties_in_pipeline`` and the removed
       fields are stripped from ``fields`` / ``map`` / ``filter`` of the
       vertex weights in each resource's ``extra_weights``.

    All affected vertices are validated *before* any vertex is modified, so a
    rejected operation leaves the manifest untouched instead of partially
    mutated. This matters for in-place callers that do not work on a copy.

    Raises:
        ValueError: If ``manifest.graph_schema`` is ``None``, if
            ``op.removals`` names an unknown vertex, or if a requested
            removal is one of a vertex's identity fields.
    """
    if not op.removals:
        return
    schema = manifest.graph_schema
    if schema is None:
        raise ValueError("remove_vertex_properties requires graph_schema")

    vertex_config = schema.core_schema.vertex_config
    unknown_vertices = sorted(set(op.removals) - vertex_config.vertex_set)
    if unknown_vertices:
        raise ValueError(
            f"remove_vertex_properties: unknown vertices in removals: {unknown_vertices}"
        )

    # Same keys as ``op.removals`` (non-empty here), so this is never empty.
    removals = {
        vertex_name: {field for field in fields if isinstance(field, str)}
        for vertex_name, fields in op.removals.items()
    }

    # Pass 1: validate every affected vertex without touching anything.
    pending = []
    for vertex in vertex_config.vertices:
        remove_fields = removals.get(vertex.name)
        if not remove_fields:
            continue
        identity_overlap = sorted(set(vertex.identity) & remove_fields)
        if identity_overlap:
            raise ValueError(
                "remove_vertex_properties cannot remove identity fields "
                f"for vertex {vertex.name}: {identity_overlap}"
            )
        pending.append((vertex, remove_fields))

    # Pass 2: all checks passed, so the mutation cannot fail half-way.
    for vertex, remove_fields in pending:
        vertex.properties = [
            field for field in vertex.properties if field.name not in remove_fields
        ]

    for vertex_name, indexes in list(schema.db_profile.vertex_indexes.items()):
        remove_fields = removals.get(vertex_name)
        if not remove_fields:
            continue
        updated_indexes = []
        for index in indexes:
            fields = [field for field in index.fields if field not in remove_fields]
            if fields:
                updated_indexes.append(index.model_copy(update={"fields": fields}))
        schema.db_profile.vertex_indexes[vertex_name] = updated_indexes

    no_removals: set[str] = set()
    for edge_spec in schema.db_profile.edge_specs:
        # Invariant for all indexes of this spec: fields removed from either
        # endpoint vertex are stripped.
        endpoint_removals = removals.get(edge_spec.source, no_removals) | removals.get(
            edge_spec.target, no_removals
        )
        updated_indexes = []
        for index in edge_spec.indexes:
            fields = [field for field in index.fields if field not in endpoint_removals]
            if fields:
                updated_indexes.append(index.model_copy(update={"fields": fields}))
        edge_spec.indexes = updated_indexes

    schema.db_profile = _revalidate_db_profile(schema.db_profile)
    schema.finish_init()

    if manifest.ingestion_model is None:
        return

    _rebuild_ingestion_with_pipeline_rewrite(
        manifest,
        lambda pipeline: rewrite_remove_vertex_properties_in_pipeline(
            pipeline, removals
        ),
    )
    # Read ``manifest.ingestion_model`` again after the rebuild above.
    for resource in manifest.ingestion_model.resources:
        for entry in resource.extra_weights or ():
            for weight in entry.vertex_weights:
                if not isinstance(weight.name, str):
                    continue
                remove_fields = removals.get(weight.name)
                if not remove_fields:
                    continue
                weight.fields = [
                    field for field in weight.fields if field not in remove_fields
                ]
                weight.map = {
                    key: value
                    for key, value in weight.map.items()
                    if key not in remove_fields
                }
                weight.filter = {
                    key: value
                    for key, value in weight.filter.items()
                    if key not in remove_fields
                }

apply_remove_vertices(manifest, op)

Mutate manifest in place: cascade-remove vertices (schema, ingestion, bindings).

Source code in graflo/architecture/evolution/apply.py
def apply_remove_vertices(manifest: GraphManifest, op: RemoveVerticesOp) -> None:
    """Cascade-remove the vertices in ``op.names`` from *manifest*, in place.

    The vertices are removed from the vertex config, every edge whose source
    or target is a removed vertex is dropped, and the core schema is rebuilt.
    The database profile is updated and re-validated. If an ingestion model
    exists it is pruned and re-validated, and bindings are filtered down to
    the resources that remain in it.

    Raises:
        ValueError: If ``manifest.graph_schema`` is ``None`` or if
            ``op.names`` contains a vertex that is not in the schema.
    """
    doomed = set(op.names)
    graph_schema = manifest.graph_schema
    if graph_schema is None:
        raise ValueError("remove_vertices requires graph_schema")

    core = graph_schema.core_schema
    unknown = doomed - core.vertex_config.vertex_set
    if unknown:
        raise ValueError(f"Unknown vertices to remove: {sorted(unknown)}")

    core.vertex_config.remove_vertices(doomed)
    # Keep an edge only when neither endpoint was removed.
    surviving_edges = [
        edge
        for edge in core.edge_config.edges
        if not (edge.source in doomed or edge.target in doomed)
    ]
    graph_schema.core_schema = CoreSchema(
        vertex_config=core.vertex_config,
        edge_config=EdgeConfig(edges=surviving_edges),
    )

    apply_vertex_removal_to_db_profile(graph_schema.db_profile, doomed)
    graph_schema.db_profile = _revalidate_db_profile(graph_schema.db_profile)

    if manifest.ingestion_model is None:
        return

    _prune_ingestion_for_removed_vertices(manifest.ingestion_model, doomed)
    # Round-trip through a plain dict so the pruned model is validated again.
    manifest.ingestion_model = IngestionModel.model_validate(
        manifest.ingestion_model.to_dict(skip_defaults=False)
    )
    remaining_resources = {
        resource.name for resource in manifest.ingestion_model.resources
    }
    _filter_bindings_for_resources(manifest, remaining_resources)

apply_rename_edge_properties(manifest, op)

Rename edge properties by relation and propagate references.

Source code in graflo/architecture/evolution/apply.py
def apply_rename_edge_properties(
    manifest: GraphManifest, op: RenameEdgePropertiesOp
) -> None:
    """Rename edge properties per relation and propagate the new names.

    ``op.renames`` maps a relation name to an ``old -> new`` property-name
    mapping. For each edge whose relation has renames:

    - properties are copied with their new name; when two properties end up
      with the same name only the first one encountered is kept;
    - identity tokens are renamed the same way, except for the
      ``source`` / ``target`` / ``relation`` tokens, which stay as they are.

    The database profile is then updated and re-validated, ``finish_init`` is
    run on the schema, and ingestion pipelines are rewritten through
    ``rewrite_edge_properties_in_pipeline``.

    Raises:
        ValueError: If ``manifest.graph_schema`` is ``None``.
    """
    graph_schema = manifest.graph_schema
    if graph_schema is None:
        raise ValueError("rename_edge_properties requires graph_schema")

    reserved_tokens = {"source", "target", "relation"}

    for edge in graph_schema.core_schema.edge_config.edges:
        if edge.relation is None:
            continue
        mapping = op.renames.get(edge.relation, {})
        if not mapping:
            continue

        # Insertion-ordered dict: the first property to claim a name wins.
        renamed: dict[str, Field] = {}
        for prop in edge.properties:
            target_name = mapping.get(prop.name, prop.name)
            if target_name not in renamed:
                renamed[target_name] = prop.model_copy(update={"name": target_name})
        edge.properties = list(renamed.values())

        edge.identities = [
            [
                token if token in reserved_tokens else mapping.get(token, token)
                for token in identity
            ]
            for identity in edge.identities
        ]

    apply_edge_property_rename_to_db_profile(graph_schema.db_profile, op.renames)
    graph_schema.db_profile = _revalidate_db_profile(graph_schema.db_profile)
    graph_schema.finish_init()

    def _rewrite(pipeline):
        return rewrite_edge_properties_in_pipeline(
            pipeline, renames_by_relation=op.renames
        )

    _rebuild_ingestion_with_pipeline_rewrite(manifest, _rewrite)

apply_rename_relations(manifest, op)

Rename logical relation names across schema/ingestion/db profile.

Source code in graflo/architecture/evolution/apply.py
def apply_rename_relations(manifest: GraphManifest, op: RenameRelationsOp) -> None:
    """Rename relations across the manifest using the ``op.relations`` mapping.

    The mapping is first handed to ``_apply_rename_entities`` as its
    ``edge_map``. When the manifest has a graph schema, the same mapping is
    then applied to the database profile, relation entries in the profile are
    merged, the profile is re-validated, and ``finish_init`` is run on the
    schema. Without a graph schema only the first step runs.
    """
    relation_map = op.relations
    _apply_rename_entities(manifest, edge_map=relation_map)

    graph_schema = manifest.graph_schema
    if graph_schema is not None:
        apply_relation_rename_to_db_profile(graph_schema.db_profile, relation_map)
        merge_relation_entries_in_db_profile(graph_schema.db_profile)
        graph_schema.db_profile = _revalidate_db_profile(graph_schema.db_profile)
        graph_schema.finish_init()

apply_rename_resources(manifest, op)

Rename ingestion resources and bindings references.

Source code in graflo/architecture/evolution/apply.py
def apply_rename_resources(manifest: GraphManifest, op: RenameResourcesOp) -> None:
    """Rename ingestion resources and bindings references.

    Thin wrapper: forwards ``op.resources`` to ``_apply_rename_entities`` as
    its ``resource_map`` argument. All renaming logic lives in that helper.

    NOTE(review): ``op.resources`` is presumably an ``old name -> new name``
    mapping; confirm against ``RenameResourcesOp``.
    """
    _apply_rename_entities(manifest, resource_map=op.resources)

apply_rename_vertex_properties(manifest, op)

Rename vertex properties (and their references) across the manifest.

Mutates manifest in place:

  • Rewrites schema Field.name and vertex.identity.
  • Rewrites :class:DatabaseProfile field references (vertex_indexes, edge_specs.indexes, default_property_values).
  • Rewrites resource pipelines so that VertexActor.from covers the rename and TransformActor.rename produces the renamed property (see :func:rewrite_vertex_field_names_in_pipeline).
  • Rewrites Resource.extra_weights / vertex_weights (and any vertex_weights embedded in edge pipeline steps).
Source code in graflo/architecture/evolution/apply.py
def apply_rename_vertex_properties(
    manifest: GraphManifest, op: RenameVertexPropertiesOp
) -> None:
    """Rename vertex properties, and everything that refers to them, in place.

    Does nothing when ``op.renames`` is empty. Otherwise *manifest* is
    updated as follows:

    - Schema ``Field.name`` and ``vertex.identity`` are rewritten.
    - :class:`DatabaseProfile` field references (vertex_indexes,
      edge_specs.indexes, default_property_values) are rewritten.
    - Resource pipelines are rewritten so that ``VertexActor.from`` covers
      the rename and ``TransformActor.rename`` produces the renamed property
      (see :func:`rewrite_vertex_field_names_in_pipeline`).
    - ``Resource.extra_weights`` / ``vertex_weights`` (and any
      ``vertex_weights`` embedded in ``edge`` pipeline steps) are rewritten.

    Raises:
        ValueError: If ``manifest.graph_schema`` is ``None`` or if
            ``op.renames`` names a vertex that is not in the schema.
    """
    renames = op.renames
    if not renames:
        return

    graph_schema = manifest.graph_schema
    if graph_schema is None:
        raise ValueError("rename_vertex_properties requires graph_schema")

    known_vertices = graph_schema.core_schema.vertex_config.vertex_set
    unknown = sorted(set(renames) - known_vertices)
    if unknown:
        raise ValueError(
            f"rename_vertex_properties: unknown vertices in renames: {unknown}"
        )

    _rename_fields_in_schema(graph_schema, renames)
    apply_field_rename_to_db_profile(graph_schema.db_profile, renames)
    graph_schema.db_profile = _revalidate_db_profile(graph_schema.db_profile)
    graph_schema.finish_init()

    def _rewrite(pipeline):
        return rewrite_vertex_field_names_in_pipeline(pipeline, renames)

    _rebuild_ingestion_with_pipeline_rewrite(
        manifest,
        _rewrite,
        vertex_field_renames=renames,
    )

apply_rename_vertices(manifest, op)

Rename logical vertex names across schema/ingestion/bindings.

Source code in graflo/architecture/evolution/apply.py
def apply_rename_vertices(manifest: GraphManifest, op: RenameVerticesOp) -> None:
    """Rename logical vertex names across schema/ingestion/bindings.

    Thin wrapper: forwards ``op.vertices`` to ``_apply_rename_entities`` as
    its ``vertex_map`` argument. All renaming logic lives in that helper.

    NOTE(review): ``op.vertices`` is presumably an ``old name -> new name``
    mapping; confirm against ``RenameVerticesOp``.
    """
    _apply_rename_entities(manifest, vertex_map=op.vertices)

apply_sanitize(manifest, op)

Apply DB-flavor-specific sanitization to manifest in place.

Composes:

  1. Storage-name sanitization on :class:DatabaseProfile.
  2. Reserved-word vertex field renames (via apply_rename_vertex_properties).
  3. TigerGraph identity normalization (cross-relation), propagated to ingestion via the same field-rename code path.
Source code in graflo/architecture/evolution/apply.py
def apply_sanitize(manifest: GraphManifest, op: SanitizeOp) -> None:
    """Sanitize *manifest* in place for the database flavor in ``op.db_flavor``.

    Does nothing when the manifest has no graph schema. The reserved-word set
    is ``op.reserved_words`` upper-cased when provided, otherwise whatever
    ``load_reserved_words(op.db_flavor)`` returns. The function composes:

    1. Storage-name sanitization on :class:`DatabaseProfile` (only when the
       reserved-word set is non-empty).
    2. Reserved-word vertex field renames (via
       ``apply_rename_vertex_properties``), also only when the reserved-word
       set is non-empty.
    3. Identity normalization via ``normalize_relation_identity``, whose
       renames are propagated to the database profile and to ingestion
       through the same field-rename code path.
    """
    from graflo.db.util import load_reserved_words

    graph_schema = manifest.graph_schema
    if graph_schema is None:
        return

    if op.reserved_words is None:
        reserved_words = load_reserved_words(op.db_flavor)
    else:
        reserved_words = {word.upper() for word in op.reserved_words}

    if reserved_words:
        apply_storage_name_sanitization_to_db_profile(
            graph_schema.db_profile, graph_schema, reserved_words
        )
        graph_schema.db_profile = _revalidate_db_profile(graph_schema.db_profile)

        reserved_field_renames = compute_vertex_field_renames(
            graph_schema, reserved_words
        )
        if reserved_field_renames:
            rename_op = RenameVertexPropertiesOp(renames=reserved_field_renames)
            apply_rename_vertex_properties(manifest, rename_op)

    identity_renames = normalize_relation_identity(graph_schema, op.db_flavor)
    if not identity_renames:
        return

    apply_field_rename_to_db_profile(graph_schema.db_profile, identity_renames)
    graph_schema.db_profile = _revalidate_db_profile(graph_schema.db_profile)
    graph_schema.finish_init()

    def _rewrite(pipeline):
        return rewrite_vertex_field_names_in_pipeline(pipeline, identity_renames)

    _rebuild_ingestion_with_pipeline_rewrite(
        manifest,
        _rewrite,
        vertex_field_renames=identity_renames,
    )