Skip to content

graflo.architecture.schema

CoreSchema

Bases: ConfigBaseModel

Logical graph model (A): vertices and edges.

Source code in graflo/architecture/schema/core.py
class CoreSchema(ConfigBaseModel):
    """Logical graph model (A): vertices and edges.

    Once pydantic has validated the fields, `finish_init` is run: it
    finalises the vertex config, verifies that every vertex named by the
    edge config is declared, and then finalises the edge config.
    """

    vertex_config: VertexConfig = PydanticField(
        ...,
        description="Configuration for vertex collections (vertices, identities, fields).",
    )
    edge_config: EdgeConfig = PydanticField(
        ...,
        description="Configuration for edge collections (edges, weights).",
    )

    @model_validator(mode="after")
    def _init_graph(self) -> CoreSchema:
        """Post-validation hook: finish initialisation and hand the model back."""
        self.finish_init()
        return self

    def finish_init(self) -> None:
        """Finalise both configs, validating edge endpoints in between.

        Raises:
            ValueError: if the edge config references an undeclared vertex.
        """
        vertex_config = self.vertex_config
        vertex_config.finish_init()
        # Checked before the edge config is finalised against the vertices.
        self._validate_edge_vertices_defined()
        self.edge_config.finish_init(vertex_config)

    def _validate_edge_vertices_defined(self) -> None:
        """Raise ``ValueError`` if the edge config names an undeclared vertex."""
        declared = self.vertex_config.vertex_set
        missing = self.edge_config.vertices - declared
        if not missing:
            return
        # Sorted so the message is deterministic regardless of set ordering.
        raise ValueError(
            f"edge_config references undefined vertices: {sorted(missing)}. "
            f"Declared vertices: {sorted(declared)}"
        )

    def remove_disconnected_vertices(self) -> set[str]:
        """Remove declared vertices absent from ``edge_config.vertices``.

        Returns:
            The names that were removed (an empty set when there were none).
        """
        referenced = self.edge_config.vertices
        orphaned = self.vertex_config.vertex_set - referenced
        if not orphaned:
            return orphaned
        self.vertex_config.remove_vertices(orphaned)
        return orphaned

remove_disconnected_vertices()

Remove disconnected vertices and return removed names.

Source code in graflo/architecture/schema/core.py
def remove_disconnected_vertices(self) -> set[str]:
    """Remove declared vertices absent from ``edge_config.vertices``.

    Returns:
        The names that were removed (an empty set when there were none).
    """
    referenced = self.edge_config.vertices
    orphaned = self.vertex_config.vertex_set - referenced
    if not orphaned:
        # Nothing to remove: skip the call into the vertex config entirely.
        return orphaned
    self.vertex_config.remove_vertices(orphaned)
    return orphaned

EdgeConfigDBAware

DB-aware projection wrapper for EdgeConfig.

Source code in graflo/architecture/schema/db_aware.py
class EdgeConfigDBAware:
    """DB-aware projection wrapper for `EdgeConfig`.

    Holds the logical edge config, the DB-aware vertex wrapper and the
    database profile, and derives flavor-specific values from them.
    Collection-style accessors delegate directly to the logical config.
    """

    def __init__(
        self,
        logical: EdgeConfig,
        vertex_config: VertexConfigDBAware,
        database_features: DatabaseProfile,
    ):
        self.logical = logical
        self.vertex_config = vertex_config
        self.db_profile = database_features

    @property
    def edges(self) -> list[Edge]:
        """Edges of the wrapped logical config."""
        return self.logical.edges

    def __iter__(self) -> Iterator[Edge]:
        """Iterate over edges; equivalent to `values()`."""
        return self.values()

    def values(self) -> Iterator[Edge]:
        """Delegate to the logical config's ``values()``."""
        return self.logical.values()

    def items(self) -> Iterator[tuple[EdgeId, Edge]]:
        """Delegate to the logical config's ``items()``."""
        return self.logical.items()

    @property
    def vertices(self):
        """Vertices reported by the logical edge config."""
        return self.logical.vertices

    def _effective_relation_field(self, edge: Edge) -> str | None:
        """Resolve the field name that carries an extracted relation.

        An explicit ``edge.relation_field`` always wins. Otherwise, on
        TigerGraph only, an edge with ``relation_from_key`` set falls back to
        ``DEFAULT_TIGERGRAPH_RELATION_WEIGHTNAME``. In every other case the
        result is ``None``.
        """
        if edge.relation_field is not None:
            return edge.relation_field
        if (
            self.db_profile.db_flavor == DBType.TIGERGRAPH
            and edge.relation_from_key
        ):
            return DEFAULT_TIGERGRAPH_RELATION_WEIGHTNAME
        return None

    def relation_dbname(self, edge: Edge) -> str | None:
        """Return the relation name the DB profile assigns to ``edge``.

        On TigerGraph an edge without a relation is given
        ``DEFAULT_TIGERGRAPH_RELATION`` as the default passed to the profile.
        """
        relation = edge.relation
        if self.db_profile.db_flavor == DBType.TIGERGRAPH and relation is None:
            relation = DEFAULT_TIGERGRAPH_RELATION
        return self.db_profile.edge_relation_name(
            edge.edge_id,
            default_relation=relation,
        )

    def effective_weights(self, edge: Edge) -> WeightConfig | None:
        """Return the weight config to use for ``edge`` on this DB flavor.

        For non-TigerGraph flavors, or when no relation field applies, this is
        ``edge.weights`` unchanged. On TigerGraph with a relation field, a deep
        copy (or a fresh ``WeightConfig`` when the edge has none) is returned
        with a STRING field of that name appended if not already present, so
        the logical edge is never modified.
        """
        if self.db_profile.db_flavor != DBType.TIGERGRAPH:
            return edge.weights

        relation_field = self._effective_relation_field(edge)
        if relation_field is None:
            return edge.weights

        base = (
            edge.weights.model_copy(deep=True)
            if edge.weights is not None
            else WeightConfig()
        )
        if relation_field not in base.direct_names:
            base.direct.append(Field(name=relation_field, type=FieldType.STRING))
        return base

    def runtime(self, edge: Edge) -> EdgeRuntime:
        """Build the resolved `EdgeRuntime` for ``edge``."""
        return EdgeRuntime(
            edge=edge,
            source_storage=self.vertex_config.vertex_dbname(edge.source),
            target_storage=self.vertex_config.vertex_dbname(edge.target),
            relation_name=self.relation_dbname(edge),
            store_extracted_relation_as_weight=(
                self.db_profile.db_flavor == DBType.TIGERGRAPH
            ),
            effective_relation_field=self._effective_relation_field(edge),
            db_profile=self.db_profile,
        )

    def relationship_merge_property_names(self, edge: Edge) -> list[str]:
        """Relationship properties that distinguish parallel edges (Cypher MERGE, etc.).

        Uses the first logical ``identities`` key when present (endpoints omitted —
        they are already matched on nodes). If that key yields no relationship
        fields, or ``identities`` is empty, falls back to all direct weight names.
        """
        db_flavor = self.db_profile.db_flavor
        if edge.identities:
            props = self._identity_tokens_to_relationship_properties(
                edge.identities[0], db_flavor
            )
            if props:
                return props
        if edge.weights is not None and edge.weights.direct_names:
            return list(edge.weights.direct_names)
        return []

    @staticmethod
    def _identity_tokens_to_relationship_properties(
        identity_key: list[str], db_flavor: DBType
    ) -> list[str]:
        """Map identity tokens to relationship property names.

        ``source`` and ``target`` are dropped, ``relation`` is dropped on
        TigerGraph only, and every other token is kept. Duplicates are
        removed while keeping first-seen order.
        """
        fields: list[str] = []
        for token in identity_key:
            if token in ("source", "target"):
                continue
            if token == "relation" and db_flavor == DBType.TIGERGRAPH:
                continue
            fields.append(token)
        # dict preserves insertion order, giving an O(n) order-stable de-dup.
        return list(dict.fromkeys(fields))

    def compile_identity_indexes(self) -> None:
        """Register a unique index on the DB profile for every edge identity key.

        Identity keys that translate to no index fields are skipped.
        """
        # NOTE(review): each call registers the indexes again; whether
        # `add_edge_index` de-duplicates is not visible here — confirm.
        db_flavor = self.db_profile.db_flavor
        for edge in self.logical.edges:
            for identity_key in edge.identities:
                identity_fields = self._identity_key_index_fields(
                    identity_key, db_flavor
                )
                if not identity_fields:
                    continue
                self.db_profile.add_edge_index(
                    edge.edge_id,
                    Index(fields=identity_fields, unique=True),
                    purpose=None,
                )

    def _identity_key_index_fields(
        self, identity_key: list[str], db_flavor: DBType
    ) -> list[str]:
        """Map identity tokens to index field names for ``db_flavor``.

        ``source``/``target`` become ``_from``/``_to`` on Arango and are
        dropped elsewhere; ``relation`` is dropped on TigerGraph only; any
        other token is used verbatim. Duplicates are removed while keeping
        first-seen order.
        """
        fields: list[str] = []
        for token in identity_key:
            if token == "source":
                if db_flavor == DBType.ARANGO:
                    fields.append("_from")
            elif token == "target":
                if db_flavor == DBType.ARANGO:
                    fields.append("_to")
            elif token == "relation":
                if db_flavor != DBType.TIGERGRAPH:
                    fields.append("relation")
            else:
                fields.append(token)
        # dict preserves insertion order, giving an O(n) order-stable de-dup.
        return list(dict.fromkeys(fields))

relationship_merge_property_names(edge)

Relationship properties that distinguish parallel edges (Cypher MERGE, etc.).

Uses the first logical identities key when present (endpoints omitted — they are already matched on nodes). If that key yields no relationship fields, or identities is empty, falls back to all direct weight names.

Source code in graflo/architecture/schema/db_aware.py
def relationship_merge_property_names(self, edge: Edge) -> list[str]:
    """Pick the relationship properties that tell parallel edges apart.

    Only the first entry of ``edge.identities`` is consulted; it is passed
    through ``_identity_tokens_to_relationship_properties`` together with the
    profile's DB flavor. When that produces nothing (or there are no
    identities), the edge's direct weight names are returned instead, and an
    empty list when those are missing too.
    """
    flavor = self.db_profile.db_flavor
    identity_keys = edge.identities
    if identity_keys:
        from_identity = self._identity_tokens_to_relationship_properties(
            identity_keys[0], flavor
        )
        if from_identity:
            return from_identity

    # Fallback: every direct weight name, copied so callers get their own list.
    weights = edge.weights
    if weights is None or not weights.direct_names:
        return []
    return list(weights.direct_names)

EdgeRuntime dataclass

Resolved DB-facing runtime data for one logical edge.

Source code in graflo/architecture/schema/db_aware.py
@dataclass(frozen=True)
class EdgeRuntime:
    """Resolved DB-facing runtime data for one logical edge.

    A frozen record pairing the logical edge with its resolved endpoint
    storage names and relation settings. Name lookups are forwarded to the
    attached database profile.
    """

    edge: Edge
    source_storage: str
    target_storage: str
    relation_name: str | None
    store_extracted_relation_as_weight: bool
    effective_relation_field: str | None
    db_profile: DatabaseProfile

    @property
    def edge_id(self) -> EdgeId:
        """Identifier of the wrapped logical edge."""
        return self.edge.edge_id

    def _endpoint_storages(self) -> dict[str, str]:
        """Keyword arguments naming both endpoint storages for profile calls."""
        return {
            "source_storage": self.source_storage,
            "target_storage": self.target_storage,
        }

    def storage_name(self, *, purpose: str | None = None) -> str | None:
        """Ask the profile for this edge's storage name for ``purpose``."""
        lookup = self.db_profile.edge_storage_name
        return lookup(self.edge_id, purpose=purpose, **self._endpoint_storages())

    def graph_name(self, *, purpose: str | None = None) -> str | None:
        """Ask the profile for this edge's graph name for ``purpose``."""
        lookup = self.db_profile.edge_graph_name
        return lookup(self.edge_id, purpose=purpose, **self._endpoint_storages())

    def physical_variants(self) -> list[dict[str, str | None | list[Index]]]:
        """Ask the profile for the physical variants of this edge."""
        lookup = self.db_profile.edge_physical_variants
        return lookup(self.edge_id, **self._endpoint_storages())

GraphMetadata

Bases: ConfigBaseModel

Schema metadata and versioning information.

Holds metadata about the schema, including its name, version, and description. Used for schema identification and versioning. Suitable for LLM-generated schema constituents.

Source code in graflo/architecture/schema/metadata.py
class GraphMetadata(ConfigBaseModel):
    """Identification and versioning metadata for a schema.

    Carries the required schema ``name`` together with an optional semantic
    ``version`` and an optional free-text ``description``.  Suitable for
    LLM-generated schema constituents.
    """

    name: str = PydanticField(
        ...,
        description="Name of the schema (e.g. graph or database identifier).",
    )
    version: str | None = PydanticField(
        default=None,
        description="Semantic version of the schema (e.g. '1.0.0', '2.1.3-beta+build.42').",
    )
    description: str | None = PydanticField(
        default=None,
        description="Optional human-readable description of the schema.",
    )

    @field_validator("version")
    @classmethod
    def _validate_semver(cls, v: str | None) -> str | None:
        """Accept ``None`` or a string matching ``_SEMVER_RE``; reject anything else.

        Raises:
            ValueError: if ``v`` is given but does not match the pattern.
        """
        # NOTE(review): `match` anchors only at the start of the string, so
        # rejection of trailing text depends on `_SEMVER_RE` being
        # end-anchored — confirm against its definition.
        if v is None or _SEMVER_RE.match(v):
            return v
        raise ValueError(
            f"version '{v}' is not a valid semantic version "
            f"(expected MAJOR.MINOR.PATCH[-prerelease][+build])"
        )

Schema

Bases: ConfigBaseModel

Graph schema (A+B): metadata, core schema, and DB profile.

Source code in graflo/architecture/schema/document.py
class Schema(ConfigBaseModel):
    """Graph schema (A+B): metadata, core schema, and DB profile.

    ``core_schema`` may also be supplied under the input key ``graph``.
    """

    metadata: GraphMetadata = PydanticField(
        ...,
        description="Schema metadata and versioning (name, version).",
    )
    core_schema: CoreSchema = PydanticField(
        ...,
        description="Core schema model (vertices + edges).",
        validation_alias=AliasChoices("core_schema", "graph"),
    )
    db_profile: DatabaseProfile = PydanticField(
        default_factory=DatabaseProfile,
        description="Database-specific physical profile (secondary indexes, naming, etc.).",
    )

    @model_validator(mode="after")
    def _init_schema(self) -> Schema:
        """Post-validation hook: finish initialisation and hand the model back."""
        self.finish_init()
        return self

    def finish_init(self) -> None:
        """Finish initialisation of the core schema."""
        self.core_schema.finish_init()

    def remove_disconnected_vertices(self) -> set[str]:
        """Forward to the core schema; returns the removed vertex names."""
        return self.core_schema.remove_disconnected_vertices()

    def resolve_db_aware(self, db_flavor: DBType | None = None) -> SchemaDBAware:
        """Build DB-aware runtime wrappers around the logical schema.

        The logical ``core_schema`` is only wrapped, not modified. The
        ``db_profile`` is shared with the wrappers and IS updated in place:
        a non-``None`` ``db_flavor`` overwrites ``db_profile.db_flavor``, and
        edge identity indexes are compiled into the profile.
        """
        # Imported lazily, inside the method, as in the original layout.
        from .db_aware import (
            EdgeConfigDBAware,
            SchemaDBAware,
            VertexConfigDBAware,
        )

        profile = self.db_profile
        if db_flavor is not None:
            profile.db_flavor = db_flavor

        core = self.core_schema
        vertices = VertexConfigDBAware(core.vertex_config, profile)
        edges = EdgeConfigDBAware(core.edge_config, vertices, profile)
        edges.compile_identity_indexes()
        return SchemaDBAware(
            vertex_config=vertices,
            edge_config=edges,
            db_profile=profile,
        )

    @staticmethod
    def _slug_filename_token(token: str) -> str:
        """Turn ``token`` into a filename-safe slug.

        Runs of characters outside ``A-Za-z0-9._-`` collapse to a single
        ``-``; leading/trailing dashes are trimmed; an empty result becomes
        ``"schema"``.
        """
        slug = re.sub(r"[^A-Za-z0-9._-]+", "-", token.strip()).strip("-")
        return slug if slug else "schema"

    def default_dump_filename(self) -> str:
        """Return default schema dump filename: <name>-<version>.yaml.

        A missing version is rendered as ``unversioned``.
        """
        raw_version = self.metadata.version
        if raw_version is None:
            raw_version = "unversioned"
        name_part = self._slug_filename_token(self.metadata.name)
        version_part = self._slug_filename_token(raw_version)
        return f"{name_part}-{version_part}.yaml"

    def dump(
        self,
        path: str | pathlib.Path | None = None,
        *,
        exclude_defaults: bool = True,
    ) -> pathlib.Path:
        """Write the schema as YAML and return the path written.

        Args:
            path: Destination file, or an existing directory in which the
                default filename is used. When omitted, the file goes into the
                current working directory as `<schema_name>-<version>.yaml`.
            exclude_defaults: When true (the default) the minimal canonical
                dict is dumped; otherwise the full dict including defaults.
        """
        destination = pathlib.Path.cwd() if path is None else pathlib.Path(path)
        # No path, or a directory path: append the default filename.
        if path is None or destination.is_dir():
            destination = destination / self.default_dump_filename()

        payload = (
            self.to_minimal_canonical_dict()
            if exclude_defaults
            else self.to_dict(skip_defaults=False)
        )
        destination.parent.mkdir(parents=True, exist_ok=True)
        # sort_keys=False keeps the payload's own key order in the output.
        rendered = yaml.safe_dump(
            payload,
            default_flow_style=False,
            sort_keys=False,
        )
        destination.write_text(rendered, encoding="utf-8")
        return destination

default_dump_filename()

Return default schema dump filename: `<name>-<version>.yaml`.

Source code in graflo/architecture/schema/document.py
def default_dump_filename(self) -> str:
    """Return default schema dump filename: <name>-<version>.yaml.

    Both parts are passed through ``_slug_filename_token``; a missing
    version is rendered as ``unversioned``.
    """
    raw_version = self.metadata.version
    if raw_version is None:
        raw_version = "unversioned"
    name_part = self._slug_filename_token(self.metadata.name)
    version_part = self._slug_filename_token(raw_version)
    return f"{name_part}-{version_part}.yaml"

dump(path=None, *, exclude_defaults=True)

Dump schema YAML to path, excluding defaults by default.

If path is omitted, writes into current working directory using <schema_name>-<version>.yaml.

Source code in graflo/architecture/schema/document.py
def dump(
    self,
    path: str | pathlib.Path | None = None,
    *,
    exclude_defaults: bool = True,
) -> pathlib.Path:
    """Write the schema as YAML and return the path written.

    Args:
        path: Destination file, or an existing directory in which the
            default filename is used. When omitted, the file goes into the
            current working directory as `<schema_name>-<version>.yaml`.
        exclude_defaults: When true (the default) the minimal canonical
            dict is dumped; otherwise the full dict including defaults.
    """
    destination = pathlib.Path.cwd() if path is None else pathlib.Path(path)
    # No path, or a directory path: append the default filename.
    if path is None or destination.is_dir():
        destination = destination / self.default_dump_filename()

    payload = (
        self.to_minimal_canonical_dict()
        if exclude_defaults
        else self.to_dict(skip_defaults=False)
    )
    destination.parent.mkdir(parents=True, exist_ok=True)
    # sort_keys=False keeps the payload's own key order in the output.
    rendered = yaml.safe_dump(
        payload,
        default_flow_style=False,
        sort_keys=False,
    )
    destination.write_text(rendered, encoding="utf-8")
    return destination

resolve_db_aware(db_flavor=None)

Build DB-aware runtime wrappers without mutating logical schema.

Source code in graflo/architecture/schema/document.py
def resolve_db_aware(self, db_flavor: DBType | None = None) -> SchemaDBAware:
    """Build DB-aware runtime wrappers around the logical schema.

    The logical ``core_schema`` is only wrapped, not modified. The
    ``db_profile`` is shared with the wrappers and IS updated in place:
    a non-``None`` ``db_flavor`` overwrites ``db_profile.db_flavor``, and
    edge identity indexes are compiled into the profile.
    """
    # Imported lazily, inside the function, as in the original layout.
    from .db_aware import (
        EdgeConfigDBAware,
        SchemaDBAware,
        VertexConfigDBAware,
    )

    profile = self.db_profile
    if db_flavor is not None:
        profile.db_flavor = db_flavor

    core = self.core_schema
    vertices = VertexConfigDBAware(core.vertex_config, profile)
    edges = EdgeConfigDBAware(core.edge_config, vertices, profile)
    edges.compile_identity_indexes()
    return SchemaDBAware(
        vertex_config=vertices,
        edge_config=edges,
        db_profile=profile,
    )

SchemaDBAware dataclass

DB-aware schema runtime view.

Source code in graflo/architecture/schema/db_aware.py
@dataclass(frozen=True)
class SchemaDBAware:
    """DB-aware schema runtime view.

    Frozen dataclass bundling the DB-aware vertex and edge wrappers with the
    database profile; the fields cannot be reassigned after construction.
    """

    # DB-aware wrapper around the logical vertex configuration.
    vertex_config: VertexConfigDBAware
    # DB-aware wrapper around the logical edge configuration.
    edge_config: EdgeConfigDBAware
    # Database profile carried alongside the two wrappers.
    db_profile: DatabaseProfile

VertexConfigDBAware

DB-aware projection wrapper for VertexConfig.

Source code in graflo/architecture/schema/db_aware.py
class VertexConfigDBAware:
    """DB-aware projection wrapper for `VertexConfig`.

    Read-only accessors forward to the logical config; name, identity and
    field lookups are adjusted using the database profile.
    """

    def __init__(self, logical: VertexConfig, database_features: DatabaseProfile):
        self.db_profile = database_features
        self.logical = logical

    @property
    def vertex_set(self):
        """``vertex_set`` of the logical config."""
        return self.logical.vertex_set

    @property
    def blank_vertices(self):
        """``blank_vertices`` of the logical config."""
        return self.logical.blank_vertices

    @property
    def vertices(self):
        """``vertices`` of the logical config."""
        return self.logical.vertices

    def vertex_dbname(self, vertex_name: str) -> str:
        """Storage name the DB profile assigns to ``vertex_name``."""
        return self.db_profile.vertex_storage_name(vertex_name)

    def index(self, vertex_name: str) -> Index:
        """Get primary index for a vertex, built from its identity fields."""
        identity = self.identity_fields(vertex_name)
        return Index(fields=identity)

    def identity_fields(self, vertex_name: str) -> list[str]:
        """Identity fields for ``vertex_name``.

        Declared identity fields win. A blank vertex without any gets
        ``["_key"]`` on Arango and ``["id"]`` on every other flavor; a
        non-blank vertex without any gets the (empty) logical result back.
        """
        declared = self.logical.identity_fields(vertex_name)
        if declared or vertex_name not in self.logical.blank_vertices:
            return declared
        if self.db_profile.db_flavor == DBType.ARANGO:
            return ["_key"]
        return ["id"]

    def fields(self, vertex_name: str) -> list[Field]:
        """Fields for ``vertex_name``, with untyped fields typed STRING on TigerGraph.

        Other flavors receive the logical field list itself.
        """
        logical_fields = self.logical.fields(vertex_name)
        if self.db_profile.db_flavor == DBType.TIGERGRAPH:
            # TigerGraph needs explicit scalar defaults for schema definition.
            return [
                Field(
                    name=item.name,
                    type=item.type if item.type is not None else FieldType.STRING,
                )
                for item in logical_fields
            ]
        return logical_fields

    def fields_names(self, vertex_name: str) -> list[str]:
        """Names of the fields returned by `fields` for ``vertex_name``."""
        return [item.name for item in self.fields(vertex_name)]

index(vertex_name)

Get primary index for a vertex (DB layer needs Index for collection setup).

Source code in graflo/architecture/schema/db_aware.py
def index(self, vertex_name: str) -> Index:
    """Get primary index for a vertex, built from its identity fields."""
    identity = self.identity_fields(vertex_name)
    return Index(fields=identity)