
graflo.architecture.evolution.sanitize

Internal helpers for graflo.architecture.evolution.ops.SanitizeOp.

Hosts the analytic / planning side of sanitization: pure functions that compute which fields must be renamed and the per-vertex field rewrite map produced by TigerGraph's "consistent identity per relation" constraint.

The actual mutation lives in graflo.architecture.evolution.apply (apply_sanitize and apply_rename_vertex_properties), so the same code paths drive both SanitizeOp and the standalone RenameVertexPropertiesOp.

compute_vertex_field_renames(schema, reserved_words)

Compute per-vertex field rename map for a flavor's reserved-word set.

Pure: returns {vertex_name: {old_field: new_field}} without mutating schema. Vertices/fields whose names are not reserved are absent from the result.

Source code in graflo/architecture/evolution/sanitize.py
def compute_vertex_field_renames(
    schema: Schema,
    reserved_words: set[str],
) -> dict[str, dict[str, str]]:
    """Compute per-vertex field rename map for a flavor's reserved-word set.

    Pure: returns ``{vertex_name: {old_field: new_field}}`` without mutating
    ``schema``. Vertices/fields whose names are not reserved are absent from
    the result.
    """
    from graflo.db.util import sanitize_attribute_name

    renames: dict[str, dict[str, str]] = {}
    if not reserved_words:
        return renames

    for vertex in schema.core_schema.vertex_config.vertices:
        per_vertex: dict[str, str] = {}
        for field in vertex.properties:
            sanitized = sanitize_attribute_name(field.name, reserved_words)
            if sanitized != field.name:
                per_vertex[field.name] = sanitized
        if per_vertex:
            renames[vertex.name] = per_vertex
    return renames
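
The rename-map shape described above can be tried out without graflo installed. The following is a minimal sketch under stated assumptions: `Property`, `Vertex`, `sanitize_stub` and `field_renames` are hypothetical stand-ins introduced only for illustration, and the `_attr` suffix is an invented convention, not the behavior of the real `sanitize_attribute_name`.

```python
from dataclasses import dataclass, field


@dataclass
class Property:
    """Hypothetical stand-in for a vertex property; only the name matters here."""
    name: str


@dataclass
class Vertex:
    """Hypothetical stand-in for a vertex definition."""
    name: str
    properties: list[Property] = field(default_factory=list)


def sanitize_stub(name: str, reserved_words: set[str]) -> str:
    # ASSUMPTION: the real sanitize_attribute_name may rename differently;
    # appending "_attr" is purely illustrative.
    return f"{name}_attr" if name in reserved_words else name


def field_renames(
    vertices: list[Vertex], reserved_words: set[str]
) -> dict[str, dict[str, str]]:
    """Build {vertex_name: {old_field: new_field}} without mutating the input."""
    renames: dict[str, dict[str, str]] = {}
    if not reserved_words:
        return renames
    for vertex in vertices:
        per_vertex: dict[str, str] = {}
        for prop in vertex.properties:
            sanitized = sanitize_stub(prop.name, reserved_words)
            if sanitized != prop.name:
                per_vertex[prop.name] = sanitized
        # Vertices with nothing to rename are left out of the result.
        if per_vertex:
            renames[vertex.name] = per_vertex
    return renames


vertices = [
    Vertex("person", [Property("id"), Property("from"), Property("name")]),
    Vertex("city", [Property("title")]),
]
result = field_renames(vertices, {"from", "to"})
print(result)
```

Only `person` appears in the result, because `city` has no reserved field names; an empty reserved-word set short-circuits to an empty map.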

normalize_relation_identity(schema, db_flavor)

For TigerGraph: align identity fields across edges sharing a relation.

Returns a per-vertex {old_field: new_field} map describing the schema-side changes performed. The caller is responsible for propagating the same map to ingestion via rewrite_vertex_field_names_in_pipeline.

For non-TigerGraph flavors this is a no-op and returns an empty dict.

Source code in graflo/architecture/evolution/sanitize.py
def normalize_relation_identity(
    schema: Schema,
    db_flavor: DBType,
) -> dict[str, dict[str, str]]:
    """For TigerGraph: align identity fields across edges sharing a relation.

    Returns a per-vertex ``{old_field: new_field}`` map describing the
    schema-side changes performed. Caller is responsible for propagating the
    same map to ingestion via ``rewrite_vertex_field_names_in_pipeline``.

    For non-TigerGraph flavors this is a no-op and returns an empty dict.
    """
    field_renames: dict[str, dict[str, str]] = {}
    if db_flavor != DBType.TIGERGRAPH:
        return field_renames

    edges_by_relation: dict[str | None, list[Edge]] = {}
    for edge in schema.core_schema.edge_config.edges:
        relation = (
            schema.db_profile.edge_relation_name(
                edge.edge_id,
                default_relation=edge.relation,
            )
            or edge.relation
        )
        edges_by_relation.setdefault(relation, []).append(edge)

    for relation, relation_edges in edges_by_relation.items():
        if len(relation_edges) <= 1:
            continue

        source_indexes: list[tuple[str, tuple[str, ...]]] = []
        target_indexes: list[tuple[str, tuple[str, ...]]] = []
        for edge in relation_edges:
            source_indexes.append(
                (
                    edge.source,
                    tuple(
                        schema.core_schema.vertex_config.identity_fields(edge.source)
                    ),
                )
            )
            target_indexes.append(
                (
                    edge.target,
                    tuple(
                        schema.core_schema.vertex_config.identity_fields(edge.target)
                    ),
                )
            )

        _normalize_role_indexes(
            source_indexes,
            schema,
            field_renames,
            relation,
            role="source",
        )
        _normalize_role_indexes(
            target_indexes,
            schema,
            field_renames,
            relation,
            role="target",
        )

    return field_renames
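
The grouping step in the listing above (collect edges per relation, skip relations with a single edge, then gather each endpoint's identity fields per role) can be sketched in isolation. This is an illustration under assumptions: `Edge` here is a hypothetical three-field stand-in, `identity_fields` is a plain dict replacing the `vertex_config.identity_fields` lookup, and the relation-name resolution through `db_profile` is omitted. What `_normalize_role_indexes` does with these lists is not shown in this section and is not modeled.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Edge:
    """Hypothetical stand-in for an edge definition."""
    source: str
    target: str
    relation: Optional[str]


RoleIndex = list[tuple[str, tuple[str, ...]]]


def role_indexes_by_relation(
    edges: list[Edge],
    identity_fields: dict[str, list[str]],
) -> dict[Optional[str], dict[str, RoleIndex]]:
    """Collect (vertex, identity fields) per role for relations shared by 2+ edges."""
    edges_by_relation: dict[Optional[str], list[Edge]] = {}
    for edge in edges:
        edges_by_relation.setdefault(edge.relation, []).append(edge)

    indexes: dict[Optional[str], dict[str, RoleIndex]] = {}
    for relation, relation_edges in edges_by_relation.items():
        # A relation used by a single edge has nothing to align.
        if len(relation_edges) <= 1:
            continue
        indexes[relation] = {
            "source": [(e.source, tuple(identity_fields[e.source])) for e in relation_edges],
            "target": [(e.target, tuple(identity_fields[e.target])) for e in relation_edges],
        }
    return indexes


identity = {"person": ["id"], "company": ["tax_id"], "city": ["name"]}
edges = [
    Edge("person", "city", "located_in"),
    Edge("company", "city", "located_in"),
    Edge("person", "company", "works_at"),
]
shared = role_indexes_by_relation(edges, identity)
print(shared)
```

In this toy data only `located_in` is shared by more than one edge. Its source side pairs `person` with `("id",)` and `company` with `("tax_id",)`, which is the kind of mismatch the identity-alignment step would presumably have to reconcile.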