Skip to content

graflo.architecture.schema

CoreSchema

Bases: ConfigBaseModel

Logical graph model (A): vertices and edges.

Source code in graflo/architecture/schema/core.py
class CoreSchema(ConfigBaseModel):
    """Logical graph model (A): the vertex and edge configurations.

    After pydantic populates the fields, :meth:`finish_init` finalizes the
    vertex config, checks that every edge endpoint names a declared vertex,
    and then finalizes the edge config against the vertex config.
    """

    vertex_config: VertexConfig = PydanticField(
        ...,
        description="Configuration for vertex collections (vertices, identities, properties).",
    )
    edge_config: EdgeConfig = PydanticField(
        ...,
        description="Configuration for edge collections (edges, weights).",
    )

    @model_validator(mode="after")
    def _init_graph(self) -> CoreSchema:
        # "after" validator: derived state is built once all fields are parsed.
        self.finish_init()
        return self

    def finish_init(self) -> None:
        """Finalize vertices, validate edge endpoints, then finalize edges.

        Endpoint validation sits between the two finalize steps so unknown
        vertex references are reported before edges are finalized.
        """
        vertices = self.vertex_config
        vertices.finish_init()
        self._validate_edge_vertices_defined()
        self.edge_config.finish_init(vertices)

    def _validate_edge_vertices_defined(self) -> None:
        """Raise ``ValueError`` if any edge endpoint is not a declared vertex.

        The message lists the unknown names and the declared names, both sorted.
        """
        known = self.vertex_config.vertex_set
        unknown = self.edge_config.vertices - known
        if not unknown:
            return
        raise ValueError(
            "edge_config references undefined vertices: "
            f"{sorted(unknown)}. "
            f"Declared vertices: {sorted(known)}"
        )

    def remove_disconnected_vertices(self) -> set[str]:
        """Drop vertices that no edge references and return their names.

        Returns an empty set (and removes nothing) when every vertex is used.
        """
        linked = self.edge_config.vertices
        orphans = self.vertex_config.vertex_set - linked
        if orphans:
            self.vertex_config.remove_vertices(orphans)
        return orphans

remove_disconnected_vertices()

Remove disconnected vertices and return removed names.

Source code in graflo/architecture/schema/core.py
def remove_disconnected_vertices(self) -> set[str]:
    """Drop vertices that no edge references and return their names.

    ``remove_vertices`` is only called when at least one vertex is orphaned;
    otherwise the (empty) difference set is returned unchanged.
    """
    linked = self.edge_config.vertices
    orphans = self.vertex_config.vertex_set - linked
    if not orphans:
        return orphans
    self.vertex_config.remove_vertices(orphans)
    return orphans

EdgeConfigDBAware

DB-aware projection wrapper for EdgeConfig.

Source code in graflo/architecture/schema/db_aware.py
class EdgeConfigDBAware:
    """DB-aware projection wrapper for `EdgeConfig`.

    Read-only views (``edges``, ``vertices``, iteration) delegate to the
    wrapped logical ``EdgeConfig``. The remaining methods derive
    backend-specific relation names, weights, merge keys and identity
    indexes from ``db_profile``.
    """

    def __init__(
        self,
        logical: EdgeConfig,
        vertex_config: VertexConfigDBAware,
        database_features: DatabaseProfile,
        ingestion_overlay: EdgeIngestionOverlay | None = None,
    ):
        self.logical = logical
        self.vertex_config = vertex_config
        self.db_profile = database_features
        # Optional per-edge ingestion hints; see _uses_relation_from_key.
        self.ingestion_overlay = ingestion_overlay

    def _uses_relation_from_key(self, edge_id: EdgeId) -> bool:
        """Return ``ingestion_overlay.uses_relation_from_key(edge_id)``, or False without an overlay."""
        if self.ingestion_overlay is not None:
            return self.ingestion_overlay.uses_relation_from_key(edge_id)
        return False

    def _needs_tg_relation_attr(self, edge: Edge) -> bool:
        """Return True when ``edge`` must store its relation label as an attribute.

        Only TigerGraph edges qualify, and only when the edge has no fixed
        ``relation`` or the ingestion overlay takes the relation from a key.
        Shared by :meth:`effective_weights` and :meth:`runtime` so both make
        the same decision.
        """
        if self.db_profile.db_flavor != DBType.TIGERGRAPH:
            return False
        return edge.relation is None or self._uses_relation_from_key(edge.edge_id)

    @property
    def edges(self) -> list[Edge]:
        return self.logical.edges

    def __iter__(self) -> Iterator[Edge]:
        return self.values()

    def values(self) -> Iterator[Edge]:
        return self.logical.values()

    def items(self) -> Iterator[tuple[EdgeId, Edge]]:
        return self.logical.items()

    @property
    def vertices(self):
        return self.logical.vertices

    def relation_dbname(self, edge: Edge) -> str | None:
        """Return the backend relation name for ``edge`` via ``db_profile.edge_relation_name``.

        On TigerGraph, an edge without an explicit relation uses
        ``DEFAULT_TIGERGRAPH_RELATION`` as the default.
        """
        relation = edge.relation
        if self.db_profile.db_flavor == DBType.TIGERGRAPH and relation is None:
            relation = DEFAULT_TIGERGRAPH_RELATION
        return self.db_profile.edge_relation_name(
            edge.edge_id,
            default_relation=relation,
        )

    def effective_weights(self, edge: Edge) -> WeightConfig | None:
        """Return the weight config to declare for ``edge`` on this backend.

        Edge properties are deep-copied into ``WeightConfig.direct``. When the
        relation label must be stored per row (see
        :meth:`_needs_tg_relation_attr`), a STRING field named
        ``DEFAULT_TIGERGRAPH_RELATION_WEIGHTNAME`` is appended unless it is
        already declared. Returns ``None`` only when no relation attribute is
        needed and the edge has no properties.
        """

        def _as_weight_config() -> WeightConfig | None:
            if not edge.properties:
                return None
            return WeightConfig(
                direct=[f.model_copy(deep=True) for f in edge.properties],
            )

        if not self._needs_tg_relation_attr(edge):
            return _as_weight_config()

        # Typed TigerGraph edge: per-row relation label stored under a stable attribute.
        base = _as_weight_config() or WeightConfig()
        if DEFAULT_TIGERGRAPH_RELATION_WEIGHTNAME not in base.direct_names:
            base.direct.append(
                Field(
                    name=DEFAULT_TIGERGRAPH_RELATION_WEIGHTNAME, type=FieldType.STRING
                )
            )
        return base

    def runtime(self, edge: Edge) -> EdgeRuntime:
        """Resolve endpoint storage names and relation handling for ``edge``."""
        needs_tg_relation_attr = self._needs_tg_relation_attr(edge)
        return EdgeRuntime(
            edge=edge,
            source_storage=self.vertex_config.vertex_dbname(edge.source),
            target_storage=self.vertex_config.vertex_dbname(edge.target),
            relation_name=self.relation_dbname(edge),
            store_extracted_relation_as_weight=needs_tg_relation_attr,
            effective_relation_field=(
                DEFAULT_TIGERGRAPH_RELATION_WEIGHTNAME
                if needs_tg_relation_attr
                else None
            ),
            db_profile=self.db_profile,
        )

    def relationship_merge_property_names(self, edge: Edge) -> list[str]:
        """Relationship properties used for edge upsert/MERGE keys (per backend).

        Uniqueness is ``(source_id, *identity_fields, target_id)`` for the **first**
        logical ``identities`` key (endpoints are matched separately on vertices).
        Every ``identities`` key, including the first, is also compiled into an
        edge index by :meth:`compile_identity_indexes`; whether that index is
        declared unique depends on the backend (see
        :meth:`_normalize_edge_identity_index`). Additional keys do not change
        the writer merge key.

        If that key yields no relationship fields, or ``identities`` is empty,
        falls back to all declared edge attribute names; returns ``[]`` when
        the edge declares none.
        """
        db_flavor = self.db_profile.db_flavor
        if edge.identities:
            props = self._identity_tokens_to_relationship_properties(
                edge.identities[0], db_flavor
            )
            if props:
                return props
        if edge.property_names:
            return list(edge.property_names)
        return []

    @staticmethod
    def _identity_tokens_to_relationship_properties(
        identity_key: list[str], db_flavor: DBType
    ) -> list[str]:
        """Map identity tokens to relationship property names.

        ``source``/``target`` are dropped (endpoints are matched on vertices),
        ``relation`` is kept except on TigerGraph, and other tokens pass
        through. Duplicates are removed, keeping first-seen order.
        """
        fields: list[str] = []
        for token in identity_key:
            if token in ("source", "target"):
                continue
            if token == "relation" and db_flavor == DBType.TIGERGRAPH:
                continue
            fields.append(token)
        return list(dict.fromkeys(fields))

    def compile_identity_indexes(self) -> None:
        """Register an edge index on ``db_profile`` for each identity key of each edge.

        Keys that map to no physical fields for the current backend are
        skipped. NOTE(review): each call registers the indexes again; whether
        ``db_profile.add_edge_index`` de-duplicates is not visible here —
        confirm before calling this more than once on the same profile.
        """
        db_flavor = self.db_profile.db_flavor
        for edge in self.logical.edges:
            for identity_key in edge.identities:
                identity_fields = self._identity_key_index_fields(
                    identity_key, db_flavor
                )
                if not identity_fields:
                    continue
                fields, unique = self._normalize_edge_identity_index(
                    identity_fields, db_flavor
                )
                if not fields:
                    continue
                self.db_profile.add_edge_index(
                    edge.edge_id,
                    Index(fields=fields, unique=unique),
                    purpose=None,
                )

    def _identity_key_index_fields(
        self, identity_key: list[str], db_flavor: DBType
    ) -> list[str]:
        """Map identity tokens to physical edge index fields.

        ``source``/``target`` become ``_from``/``_to`` on ArangoDB and are
        dropped elsewhere; ``relation`` is kept except on TigerGraph; other
        tokens pass through. Duplicates are removed, keeping first-seen order.
        """
        fields: list[str] = []
        for token in identity_key:
            if token == "source":
                if db_flavor == DBType.ARANGO:
                    fields.append("_from")
            elif token == "target":
                if db_flavor == DBType.ARANGO:
                    fields.append("_to")
            elif token == "relation":
                if db_flavor != DBType.TIGERGRAPH:
                    fields.append("relation")
            else:
                fields.append(token)
        return list(dict.fromkeys(fields))

    @staticmethod
    def _normalize_edge_identity_index(
        fields: list[str], db_flavor: DBType
    ) -> tuple[list[str], bool]:
        """Map logical edge identity to physical index fields and DB uniqueness.

        Logical uniqueness is always ``(source, *relationship_fields, target)``.

        * **ArangoDB** — Edge documents carry ``_from`` / ``_to``. Unique persistent
          indexes must include them before other fields, even when the YAML
          ``identities`` entry lists only relationship tokens (e.g. ``_role``).
        * **Neo4j, FalkorDB, Memgraph, Nebula** — Indexed columns are relationship /
          edge-type properties only; they cannot express endpoint scope. We still
          register the property fields for lookups but set ``unique=False`` so the
          database is not asked to enforce a misleading global uniqueness on those
          properties alone. (Application MERGE / ingest semantics remain authoritative.)
        * **TigerGraph** — Edge secondary indexes are not applied by the driver today;
          fields are kept for profiling; uniqueness is preserved for consistency.

        Any other backend also falls through to ``(fields, True)``.
        """
        rest = [f for f in fields if f not in ("_from", "_to")]
        if db_flavor == DBType.ARANGO:
            return (["_from", "_to", *rest], True)
        if db_flavor in (
            DBType.NEO4J,
            DBType.FALKORDB,
            DBType.MEMGRAPH,
            DBType.NEBULA,
        ):
            return (fields, False)
        return (fields, True)

relationship_merge_property_names(edge)

Relationship properties used for edge upsert/MERGE keys (per backend).

Uniqueness is (source_id, *identity_fields, target_id) for the first logical identities key (endpoints are matched separately on vertices). Every identities key, including the first, is also compiled into an edge index via `compile_identity_indexes`; whether that index is declared unique depends on the backend (it is non-unique on Neo4j, FalkorDB, Memgraph and Nebula). Additional keys do not change the writer merge key.

If that key yields no relationship fields, or identities is empty, falls back to all declared edge attribute names.

Source code in graflo/architecture/schema/db_aware.py
def relationship_merge_property_names(self, edge: Edge) -> list[str]:
    """Relationship properties used for edge upsert/MERGE keys (per backend).

    Uniqueness is ``(source_id, *identity_fields, target_id)`` for the **first**
    logical ``identities`` key (endpoints are matched separately on vertices).
    Every ``identities`` key, including the first, is also compiled into an
    edge index by :meth:`compile_identity_indexes`; whether that index is
    declared unique depends on the backend (see
    :meth:`_normalize_edge_identity_index`). Additional keys do not change
    the writer merge key.

    If that key yields no relationship fields, or ``identities`` is empty,
    falls back to all declared edge attribute names; returns ``[]`` when
    the edge declares none.
    """
    db_flavor = self.db_profile.db_flavor
    if edge.identities:
        props = self._identity_tokens_to_relationship_properties(
            edge.identities[0], db_flavor
        )
        if props:
            return props
    if edge.property_names:
        return list(edge.property_names)
    return []

EdgeRuntime dataclass

Resolved DB-facing runtime data for one logical edge.

Source code in graflo/architecture/schema/db_aware.py
@dataclass(frozen=True)
class EdgeRuntime:
    """Per-edge values resolved against a database profile.

    Bundles a logical edge with its endpoint storage names and relation
    handling, and forwards naming queries to ``db_profile`` with both
    endpoint storage names filled in.
    """

    edge: Edge
    source_storage: str  # storage name of the source vertex
    target_storage: str  # storage name of the target vertex
    relation_name: str | None
    store_extracted_relation_as_weight: bool
    effective_relation_field: str | None
    db_profile: DatabaseProfile

    @property
    def edge_id(self) -> EdgeId:
        """Identifier of the wrapped logical edge."""
        return self.edge.edge_id

    def _endpoint_kwargs(self) -> dict[str, str]:
        """Keyword arguments naming both endpoint storages for profile lookups."""
        return {
            "source_storage": self.source_storage,
            "target_storage": self.target_storage,
        }

    def storage_name(self, *, purpose: str | None = None) -> str | None:
        """Ask ``db_profile`` for this edge's storage name (optionally per ``purpose``)."""
        endpoints = self._endpoint_kwargs()
        return self.db_profile.edge_storage_name(
            self.edge.edge_id, purpose=purpose, **endpoints
        )

    def graph_name(self, *, purpose: str | None = None) -> str | None:
        """Ask ``db_profile`` for this edge's graph name (optionally per ``purpose``)."""
        endpoints = self._endpoint_kwargs()
        return self.db_profile.edge_graph_name(
            self.edge.edge_id, purpose=purpose, **endpoints
        )

    def physical_variants(self) -> list[dict[str, str | None | list[Index]]]:
        """Ask ``db_profile`` for every physical variant of this edge."""
        endpoints = self._endpoint_kwargs()
        return self.db_profile.edge_physical_variants(self.edge.edge_id, **endpoints)

GraphMetadata

Bases: ConfigBaseModel

Schema metadata and versioning information.

Holds metadata about the schema, including its name, version, and description. Used for schema identification and versioning. Suitable for LLM-generated schema constituents.

Source code in graflo/architecture/schema/metadata.py
class GraphMetadata(ConfigBaseModel):
    """Descriptive metadata for a graph schema.

    Carries the schema's required name plus an optional semantic version
    and free-text description, used to identify and version the schema.
    Simple enough to be filled in by LLM-generated schema constituents.
    """

    name: str = PydanticField(
        ...,
        description="Name of the schema (e.g. graph or database identifier).",
    )
    version: str | None = PydanticField(
        default=None,
        description="Semantic version of the schema (e.g. '1.0.0', '2.1.3-beta+build.42').",
    )
    description: str | None = PydanticField(
        default=None,
        description="Optional human-readable description of the schema.",
    )

    @field_validator("version")
    @classmethod
    def _validate_semver(cls, v: str | None) -> str | None:
        # No version is always accepted; otherwise it must match _SEMVER_RE.
        if v is None or _SEMVER_RE.match(v):
            return v
        raise ValueError(
            f"version '{v}' is not a valid semantic version "
            f"(expected MAJOR.MINOR.PATCH[-prerelease][+build])"
        )

Schema

Bases: ConfigBaseModel

Graph schema (A+B): metadata, core schema, and DB profile.

Source code in graflo/architecture/schema/document.py
class Schema(ConfigBaseModel):
    """Graph schema (A+B): metadata, logical core schema, and DB profile.

    ``core_schema`` may also be supplied under the alias ``graph``.
    """

    metadata: GraphMetadata = PydanticField(
        ...,
        description="Schema metadata and versioning (name, version).",
    )
    core_schema: CoreSchema = PydanticField(
        ...,
        description="Core schema model (vertices + edges).",
        validation_alias=AliasChoices("core_schema", "graph"),
    )
    db_profile: DatabaseProfile = PydanticField(
        default_factory=DatabaseProfile,
        description=(
            "Database-specific physical profile (secondary indexes, naming, TigerGraph GSQL "
            "DEFAULT overrides via default_property_values, etc.)."
        ),
    )

    @model_validator(mode="after")
    def _init_schema(self) -> Schema:
        # NOTE(review): CoreSchema's own "after" validator already calls
        # finish_init while the nested model is validated, so this appears to
        # run it a second time — confirm the finish_init chain is idempotent.
        self.finish_init()
        return self

    def finish_init(self) -> None:
        """Finalize the nested core schema."""
        self.core_schema.finish_init()

    def remove_disconnected_vertices(self) -> set[str]:
        """Delegate to the core schema; return the names of removed vertices."""
        return self.core_schema.remove_disconnected_vertices()

    def resolve_db_aware(self, db_flavor: DBType | None = None) -> SchemaDBAware:
        """Build DB-aware runtime wrappers without mutating logical schema.

        The logical ``core_schema`` is left untouched, but ``self.db_profile``
        is updated in place: its ``db_flavor`` is overwritten when
        ``db_flavor`` is given, and edge identity indexes are registered on it
        via ``compile_identity_indexes``.
        """
        from .db_aware import (
            EdgeConfigDBAware,
            SchemaDBAware,
            VertexConfigDBAware,
        )

        profile = self.db_profile
        if db_flavor is not None:
            profile.db_flavor = db_flavor

        vertices = VertexConfigDBAware(self.core_schema.vertex_config, profile)
        edges = EdgeConfigDBAware(self.core_schema.edge_config, vertices, profile)
        edges.compile_identity_indexes()
        return SchemaDBAware(
            vertex_config=vertices,
            edge_config=edges,
            db_profile=profile,
        )

    @staticmethod
    def _slug_filename_token(token: str) -> str:
        """Collapse runs of characters outside ``[A-Za-z0-9._-]`` into ``-``.

        Surrounding whitespace and dashes are trimmed; an empty result
        becomes ``"schema"``.
        """
        slug = re.sub(r"[^A-Za-z0-9._-]+", "-", token.strip()).strip("-")
        return slug if slug else "schema"

    def default_dump_filename(self) -> str:
        """Return default schema dump filename: <name>-<version>.yaml.

        Both parts are slugified; a missing version becomes ``unversioned``.
        """
        name_part = self._slug_filename_token(self.metadata.name)
        raw_version = self.metadata.version
        version_part = self._slug_filename_token(
            "unversioned" if raw_version is None else raw_version
        )
        return f"{name_part}-{version_part}.yaml"

    def dump(
        self,
        path: str | pathlib.Path | None = None,
        *,
        exclude_defaults: bool = True,
    ) -> pathlib.Path:
        """Write the schema as YAML and return the path written.

        ``path`` may be a file path, an existing directory (the default
        filename is appended), or omitted (the default filename in the current
        working directory). With ``exclude_defaults`` the payload is
        ``to_minimal_canonical_dict()``; otherwise ``to_dict(skip_defaults=False)``.
        Parent directories are created as needed; output is UTF-8 with key
        order preserved.
        """
        if path is not None:
            target_path = pathlib.Path(path)
            if target_path.is_dir():
                target_path /= self.default_dump_filename()
        else:
            target_path = pathlib.Path.cwd() / self.default_dump_filename()

        payload = (
            self.to_minimal_canonical_dict()
            if exclude_defaults
            else self.to_dict(skip_defaults=False)
        )
        target_path.parent.mkdir(parents=True, exist_ok=True)
        text = yaml.safe_dump(payload, default_flow_style=False, sort_keys=False)
        target_path.write_text(text, encoding="utf-8")
        return target_path

default_dump_filename()

Return default schema dump filename: `<name>-<version>.yaml`.

Source code in graflo/architecture/schema/document.py
def default_dump_filename(self) -> str:
    """Return default schema dump filename: <name>-<version>.yaml.

    Both parts pass through ``_slug_filename_token``; a missing version is
    rendered as ``unversioned``.
    """
    name_part = self._slug_filename_token(self.metadata.name)
    raw_version = self.metadata.version
    version_part = self._slug_filename_token(
        "unversioned" if raw_version is None else raw_version
    )
    return f"{name_part}-{version_part}.yaml"

dump(path=None, *, exclude_defaults=True)

Dump schema YAML to path, excluding defaults by default.

If path is omitted, writes into current working directory using <schema_name>-<version>.yaml.

Source code in graflo/architecture/schema/document.py
def dump(
    self,
    path: str | pathlib.Path | None = None,
    *,
    exclude_defaults: bool = True,
) -> pathlib.Path:
    """Write the schema as YAML and return the path written.

    ``path`` may be a file path, an existing directory (the default filename
    is appended), or omitted (the default filename in the current working
    directory, i.e. ``<schema_name>-<version>.yaml``). With
    ``exclude_defaults`` the payload is ``to_minimal_canonical_dict()``;
    otherwise ``to_dict(skip_defaults=False)``. Parent directories are
    created as needed.
    """
    if path is not None:
        target_path = pathlib.Path(path)
        if target_path.is_dir():
            target_path /= self.default_dump_filename()
    else:
        target_path = pathlib.Path.cwd() / self.default_dump_filename()

    payload = (
        self.to_minimal_canonical_dict()
        if exclude_defaults
        else self.to_dict(skip_defaults=False)
    )
    target_path.parent.mkdir(parents=True, exist_ok=True)
    text = yaml.safe_dump(payload, default_flow_style=False, sort_keys=False)
    target_path.write_text(text, encoding="utf-8")
    return target_path

resolve_db_aware(db_flavor=None)

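Build DB-aware runtime wrappers without mutating the logical core schema. Note that the schema's db_profile is updated in place: its db_flavor is overwritten when db_flavor is given, and edge identity indexes are registered on it.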

Source code in graflo/architecture/schema/document.py
def resolve_db_aware(self, db_flavor: DBType | None = None) -> SchemaDBAware:
    """Build DB-aware runtime wrappers without mutating logical schema.

    The logical ``core_schema`` is left untouched, but ``self.db_profile`` is
    updated in place: its ``db_flavor`` is overwritten when ``db_flavor`` is
    given, and edge identity indexes are registered on it via
    ``compile_identity_indexes``.
    """
    from .db_aware import (
        EdgeConfigDBAware,
        SchemaDBAware,
        VertexConfigDBAware,
    )

    profile = self.db_profile
    if db_flavor is not None:
        profile.db_flavor = db_flavor

    vertices = VertexConfigDBAware(self.core_schema.vertex_config, profile)
    edges = EdgeConfigDBAware(self.core_schema.edge_config, vertices, profile)
    edges.compile_identity_indexes()
    return SchemaDBAware(
        vertex_config=vertices,
        edge_config=edges,
        db_profile=profile,
    )

SchemaDBAware dataclass

DB-aware schema runtime view.

Source code in graflo/architecture/schema/db_aware.py
@dataclass(frozen=True)
class SchemaDBAware:
    """DB-aware schema runtime view.

    Frozen bundle (fields cannot be reassigned after construction) of the
    DB-aware vertex and edge wrappers plus the ``DatabaseProfile`` for the
    target backend.
    """

    vertex_config: VertexConfigDBAware  # DB-aware view of the vertex config
    edge_config: EdgeConfigDBAware  # DB-aware view of the edge config
    db_profile: DatabaseProfile  # physical profile for the target backend

VertexConfigDBAware

DB-aware projection wrapper for VertexConfig.

Source code in graflo/architecture/schema/db_aware.py
class VertexConfigDBAware:
    """DB-aware projection wrapper for `VertexConfig`.

    Pass-through properties expose the logical config unchanged; the methods
    apply backend-specific storage naming and identity/property adjustments
    taken from ``db_profile``.
    """

    def __init__(self, logical: VertexConfig, database_features: DatabaseProfile):
        self.logical = logical
        self.db_profile = database_features

    @property
    def vertex_set(self):
        """Vertex names, straight from the logical config."""
        return self.logical.vertex_set

    @property
    def blank_vertices(self):
        """Blank vertices, straight from the logical config."""
        return self.logical.blank_vertices

    @property
    def vertices(self):
        """Vertex definitions, straight from the logical config."""
        return self.logical.vertices

    def vertex_dbname(self, vertex_name: str) -> str:
        """Storage name for ``vertex_name`` as resolved by ``db_profile``."""
        return self.db_profile.vertex_storage_name(vertex_name)

    def index(self, vertex_name: str) -> Index:
        """Get primary index for a vertex (DB layer needs Index for collection setup)."""
        fields = self.identity_fields(vertex_name)
        return Index(fields=fields)

    def identity_fields(self, vertex_name: str) -> list[str]:
        """Identity fields for ``vertex_name`` on this backend.

        Declared identity fields win. A blank vertex without declared identity
        falls back to ``["_key"]`` on ArangoDB and ``["id"]`` elsewhere; any
        other vertex without identity yields the (empty) declared value.
        """
        declared = self.logical.identity_fields(vertex_name)
        if declared or vertex_name not in self.logical.blank_vertices:
            return declared
        if self.db_profile.db_flavor == DBType.ARANGO:
            return ["_key"]
        return ["id"]

    def properties(self, vertex_name: str) -> list[Field]:
        """Vertex properties, adjusted for the backend.

        On TigerGraph every property gets an explicit type (untyped ones
        become STRING) and is rebuilt as a new ``Field`` with only ``name``
        and ``type``; other backends get the logical list as-is.
        """
        props = self.logical.properties(vertex_name)
        if self.db_profile.db_flavor == DBType.TIGERGRAPH:
            # TigerGraph schema definitions need a concrete attribute type.
            return [
                Field(
                    name=prop.name,
                    type=prop.type if prop.type is not None else FieldType.STRING,
                )
                for prop in props
            ]
        return props

    def property_names(self, vertex_name: str) -> list[str]:
        """Names of :meth:`properties` for ``vertex_name``, in order."""
        return [prop.name for prop in self.properties(vertex_name)]

index(vertex_name)

Get primary index for a vertex (DB layer needs Index for collection setup).

Source code in graflo/architecture/schema/db_aware.py
def index(self, vertex_name: str) -> Index:
    """Get primary index for a vertex (DB layer needs Index for collection setup).

    The index covers the backend-resolved identity fields of ``vertex_name``.
    """
    fields = self.identity_fields(vertex_name)
    return Index(fields=fields)