Skip to content

graflo.architecture

Graph database architecture components.

This package defines the core architecture components for graph database operations, including schema management, resource handling, and data transformations.

Key Components
  • Schema: Graph database schema definition and management
  • Resource: Data resource management and processing
  • Transform: Data transformation and standardization
  • Vertex: Vertex collection configuration
  • Edge: Edge collection configuration
Example

>>> from graflo.architecture import Schema, Resource
>>> schema = Schema(
...     general={"name": "my_graph", "version": "1.0"},
...     vertex_config=vertex_config,
...     edge_config=edge_config,
... )
>>> resource = Resource(resource_name="users", pipeline=[{"vertex": "user"}])

Edge

Bases: EdgeBase

Represents an edge in the graph database.

An edge connects two vertices and can have various configurations for indexing, weights, and relationship types.

Attributes:

Name Type Description
source str

Source vertex name

target str

Target vertex name

indexes list[Index]

List of indexes for the edge

weights WeightConfig | None

Optional weight configuration

relation str | None

Optional relation name (for Neo4j)

purpose str | None

Optional purpose for utility collections

match_source str | None

Optional source discriminant field

match_target str | None

Optional target discriminant field

type EdgeType

Edge type (DIRECT or INDIRECT)

aux bool

Whether this is an auxiliary edge

by str | None

Optional vertex name for indirect edges

graph_name str | None

Optional graph name (ArangoDB only, set in finish_init)

database_name str | None

Optional database-specific edge identifier (ArangoDB only, set in finish_init). For ArangoDB, this corresponds to the edge collection name.

Source code in graflo/architecture/edge.py
class Edge(EdgeBase):
    """Represents an edge in the graph database.

    An edge connects two vertices and can have various configurations for
    indexing, weights, and relationship types.

    Several attributes used below (``source``, ``target``, ``relation``,
    ``relation_field``, ``relation_from_key``, ``match_source``,
    ``match_target``) are not declared in this class body; presumably they
    are inherited from ``EdgeBase`` (verify there).

    Attributes:
        source: Source vertex name
        target: Target vertex name
        indexes: List of indexes for the edge (accepted under the alias ``index``)
        weights: Optional weight configuration
        relation: Optional relation name. On TigerGraph a default relation is
            assigned in finish_init when none was given.
        purpose: Optional purpose for utility collections
        match_source: Optional source discriminant field
        match_target: Optional target discriminant field
        type: Edge type (DIRECT or INDIRECT)
        aux: Whether this is an auxiliary edge
        by: Optional vertex name for indirect edges; replaced by the result of
            ``vertex_config.vertex_dbname`` in finish_init
        graph_name: Optional graph name (ArangoDB only, set in finish_init)
        database_name: Optional database-specific edge identifier (ArangoDB only, set in finish_init).
                       For ArangoDB, this corresponds to the edge collection name.
    """

    indexes: list[Index] = PydanticField(
        default_factory=list,
        alias="index",
        description="List of index definitions for this edge. Alias: index.",
    )
    weights: WeightConfig | None = PydanticField(
        default=None,
        description="Optional edge weight/attribute configuration (direct fields and vertex-based weights).",
    )

    # Explicit database-side relation name; when unset, `relation_dbname`
    # falls back to `self.relation`.
    _relation_dbname: str | None = PrivateAttr(default=None)

    purpose: str | None = PydanticField(
        default=None,
        description="Optional purpose label for utility edge collections between same vertex types.",
    )

    type: EdgeType = PydanticField(
        default=EdgeType.DIRECT,
        description="Edge type: DIRECT (created during ingestion) or INDIRECT (pre-existing collection).",
    )

    aux: bool = PydanticField(
        default=False,
        description="If True, edge is initialized in DB but not used by graflo ingestion.",
    )

    by: str | None = PydanticField(
        default=None,
        description="For INDIRECT edges: vertex type name used to define the edge (set to dbname in finish_init).",
    )
    graph_name: str | None = PydanticField(
        default=None,
        description="ArangoDB graph name (set in finish_init).",
    )
    database_name: str | None = PydanticField(
        default=None,
        description="ArangoDB edge collection name (set in finish_init).",
    )

    # Results of vertex_config.vertex_dbname() for source/target, filled in
    # by finish_init.
    _source: str | None = PrivateAttr(default=None)
    _target: str | None = PrivateAttr(default=None)

    @property
    def relation_dbname(self) -> str | None:
        """Relation name for the database: the explicit override if truthy, else ``relation``."""
        return self._relation_dbname or self.relation

    @relation_dbname.setter
    def relation_dbname(self, value: str | None):
        """Store an explicit database-side relation name."""
        self._relation_dbname = value

    def finish_init(self, vertex_config: VertexConfig) -> None:
        """Complete edge initialization with vertex configuration.

        Resolves the vertex names through ``vertex_config.vertex_dbname``,
        sets ArangoDB graph/collection names, applies TigerGraph relation
        defaults (including a relation weight field and index), and finally
        initializes the edge indices.

        Args:
            vertex_config: Configuration for vertices; its ``db_flavor``
                selects the database-specific steps.
        """
        # NOTE(review): `by` is overwritten with the resolved name, so a second
        # finish_init call passes an already-resolved value to vertex_dbname —
        # confirm that vertex_dbname tolerates this.
        if self.type == EdgeType.INDIRECT and self.by is not None:
            self.by = vertex_config.vertex_dbname(self.by)

        self._source = vertex_config.vertex_dbname(self.source)
        self._target = vertex_config.vertex_dbname(self.target)

        # ArangoDB-specific: set graph_name and database_name only for ArangoDB.
        # Both names share the prefix <source>_<target>[_<purpose>].
        if vertex_config.db_flavor == DBType.ARANGO:
            graph_name = [
                vertex_config.vertex_dbname(self.source),
                vertex_config.vertex_dbname(self.target),
            ]
            if self.purpose is not None:
                graph_name += [self.purpose]
            self.graph_name = "_".join(graph_name + ["graph"])
            self.database_name = "_".join(graph_name + ["edges"])

        # TigerGraph requires named edge types (relations), so assign default if missing
        if vertex_config.db_flavor == DBType.TIGERGRAPH and self.relation is None:
            # Use default relation name for TigerGraph
            # TigerGraph requires all edges to have a named type (relation)
            self.relation = DEFAULT_TIGERGRAPH_RELATION
            # NOTE(review): relation_dbname already falls back to self.relation,
            # which was assigned on the previous line, so this guard looks
            # redundant unless the default constant is None — confirm.
            if self.relation_dbname is None:
                self.relation_dbname = self.relation

        # TigerGraph: add relation field to weights if relation_field or relation_from_key is set
        # This ensures the relation value is included as a typed property in the edge schema
        if vertex_config.db_flavor == DBType.TIGERGRAPH:
            if self.relation_field is None and self.relation_from_key:
                # relation_from_key is True but relation_field not set, default to standard name
                self.relation_field = DEFAULT_TIGERGRAPH_RELATION_WEIGHTNAME

            if self.relation_field is not None:
                # Initialize weights if not already present
                if self.weights is None:
                    self.weights = WeightConfig()
                # Narrowing for type checkers only; the assignment above
                # already guarantees a WeightConfig instance.
                assert self.weights is not None, "weights should be initialized"
                # Check if the field already exists in direct weights
                if self.relation_field not in self.weights.direct_names:
                    # Add the relation field with STRING type for TigerGraph
                    self.weights.direct.append(
                        Field(name=self.relation_field, type=FieldType.STRING)
                    )

                # Add an index on the relation field unless some existing
                # index already lists it among its fields.
                has_index = any(
                    self.relation_field in idx.fields for idx in self.indexes
                )
                if not has_index:
                    # Index is created with the Index defaults
                    self.indexes.append(Index(fields=[self.relation_field]))

        self._init_indices(vertex_config)

    def _init_indices(self, vc: VertexConfig) -> None:
        """Initialize every index of the edge via ``_init_index``.

        Args:
            vc: Vertex configuration
        """
        self.indexes = [self._init_index(index, vc) for index in self.indexes]

    def _init_index(self, index: Index, vc: VertexConfig) -> Index:
        """Initialize a single index for the edge.

        The index is modified in place and also returned.

        Args:
            index: Index to initialize
            vc: Vertex configuration

        Returns:
            Index: Initialized index

        Note:
            Default behavior for edge indices: adds ["_from", "_to"] for uniqueness
            in ArangoDB.
        """
        index_fields = []

        # "@" is reserved : quick hack - do not reinit the index twice
        # (fields of named indexes are rewritten to "<name>@<field>" below).
        if any("@" in f for f in index.fields):
            return index
        if index.name is None:
            index_fields += index.fields
        else:
            # add index over a vertex of index.name; when the index lists no
            # fields, use the fields of vc.index(index.name)
            if index.fields:
                fields = index.fields
            else:
                fields = vc.index(index.name).fields
            index_fields += [f"{index.name}@{x}" for x in fields]

        # The endpoints are prepended only when neither "_from" nor "_to" is
        # already present in the field list.
        if not index.exclude_edge_endpoints and vc.db_flavor == DBType.ARANGO:
            if all([item not in index_fields for item in ["_from", "_to"]]):
                index_fields = ["_from", "_to"] + index_fields

        index.fields = index_fields
        return index

    @property
    def edge_name_dyad(self) -> tuple[str, str]:
        """Get the edge name as a dyad (source, target).

        Returns:
            tuple[str, str]: Source and target vertex names
        """
        return self.source, self.target

    @property
    def edge_id(self) -> EdgeId:
        """Get the edge ID.

        Returns:
            EdgeId: Tuple of (source, target, purpose)
        """
        return self.source, self.target, self.purpose

edge_id property

Get the edge ID.

Returns:

Name Type Description
EdgeId EdgeId

Tuple of (source, target, purpose)

edge_name_dyad property

Get the edge name as a dyad (source, target).

Returns:

Type Description

tuple[str, str]: Source and target vertex names

finish_init(vertex_config)

Complete edge initialization with vertex configuration.

Sets up edge collections, graph names, and initializes indices based on the vertex configuration.

Parameters:

Name Type Description Default
vertex_config VertexConfig

Configuration for vertices

required
Source code in graflo/architecture/edge.py
def finish_init(self, vertex_config: VertexConfig):
    """Bind this edge to a vertex configuration and finalize its settings.

    Vertex names are resolved through ``vertex_config.vertex_dbname``; the
    remaining steps depend on ``vertex_config.db_flavor``:

    * ArangoDB: derive ``graph_name`` and ``database_name``.
    * TigerGraph: assign a default relation when none is set and, if a
      relation field applies, register it as a STRING weight and index it.

    Index initialization always runs last.

    Args:
        vertex_config: Configuration for vertices
    """
    resolve = vertex_config.vertex_dbname

    # Indirect edges store the resolved name of their intermediate vertex.
    if self.by is not None and self.type == EdgeType.INDIRECT:
        self.by = resolve(self.by)

    self._source = resolve(self.source)
    self._target = resolve(self.target)

    if vertex_config.db_flavor == DBType.ARANGO:
        # Graph and edge collection share the prefix <source>_<target>[_<purpose>].
        name_parts = [resolve(self.source), resolve(self.target)]
        if self.purpose is not None:
            name_parts.append(self.purpose)
        self.graph_name = "_".join([*name_parts, "graph"])
        self.database_name = "_".join([*name_parts, "edges"])

    if vertex_config.db_flavor == DBType.TIGERGRAPH:
        # TigerGraph needs every edge to carry a named relation.
        if self.relation is None:
            self.relation = DEFAULT_TIGERGRAPH_RELATION
            if self.relation_dbname is None:
                self.relation_dbname = self.relation

        # relation_from_key without an explicit field falls back to the
        # standard weight name.
        if self.relation_field is None and self.relation_from_key:
            self.relation_field = DEFAULT_TIGERGRAPH_RELATION_WEIGHTNAME

        if self.relation_field is not None:
            if self.weights is None:
                self.weights = WeightConfig()
            # Narrowing for type checkers; weights was just initialized.
            assert self.weights is not None, "weights should be initialized"

            # Expose the relation value as a typed edge property.
            if self.relation_field not in self.weights.direct_names:
                self.weights.direct.append(
                    Field(name=self.relation_field, type=FieldType.STRING)
                )

            # Index the relation field unless an existing index covers it.
            already_indexed = any(
                self.relation_field in idx.fields for idx in self.indexes
            )
            if not already_indexed:
                self.indexes.append(Index(fields=[self.relation_field]))

    self._init_indices(vertex_config)

EdgeConfig

Bases: ConfigBaseModel

Configuration for managing collections of edges.

This class manages a collection of edges, providing methods for accessing and manipulating edge configurations.

Attributes:

Name Type Description
edges list[Edge]

List of edge configurations

Source code in graflo/architecture/edge.py
class EdgeConfig(ConfigBaseModel):
    """Container for the edge definitions of a graph.

    Keeps the declared ``edges`` list alongside a private lookup table keyed
    by ``Edge.edge_id``, and offers helpers to iterate, test membership and
    register or update edges.

    Attributes:
        edges: List of edge configurations
    """

    edges: list[Edge] = PydanticField(
        default_factory=list,
        description="List of edge definitions (source, target, weights, indexes, relation, etc.).",
    )
    _edges_map: dict[EdgeId, Edge] = PrivateAttr()

    @model_validator(mode="after")
    def _build_edges_map(self) -> EdgeConfig:
        """Populate the edge_id -> edge lookup table from ``edges``."""
        lookup = {}
        for edge in self.edges:
            # Later entries overwrite earlier ones that share an edge_id.
            lookup[edge.edge_id] = edge
        object.__setattr__(self, "_edges_map", lookup)
        return self

    def finish_init(self, vc: VertexConfig):
        """Run ``finish_init`` on every edge in ``edges``.

        Args:
            vc: Vertex configuration
        """
        for edge in self.edges:
            edge.finish_init(vc)

    def edges_list(self, include_aux=False):
        """Iterate over the registered edges.

        Args:
            include_aux: Whether to include auxiliary edges

        Returns:
            generator: Generator yielding edge configurations
        """
        registered = self._edges_map.values()
        if include_aux:
            return (edge for edge in registered)
        return (edge for edge in registered if not edge.aux)

    def edges_items(self, include_aux=False):
        """Iterate over the registered edges together with their IDs.

        Args:
            include_aux: Whether to include auxiliary edges

        Returns:
            generator: Generator yielding (edge_id, edge) tuples
        """
        entries = self._edges_map.items()
        if include_aux:
            return ((eid, edge) for eid, edge in entries)
        return ((eid, edge) for eid, edge in entries if not edge.aux)

    def __contains__(self, item: EdgeId | Edge):
        """Report whether an edge is registered.

        Args:
            item: Edge ID or Edge instance to check

        Returns:
            bool: True if edge exists, False otherwise
        """
        key = item.edge_id if isinstance(item, Edge) else item
        return key in self._edges_map

    def update_edges(self, edge: Edge, vertex_config: VertexConfig):
        """Register a new edge or merge it into the one already registered.

        Only the internal lookup table is modified; the ``edges`` list is not
        touched by this method. The stored edge is (re)initialized afterwards.

        Args:
            edge: Edge configuration to update
            vertex_config: Vertex configuration
        """
        eid = edge.edge_id
        registry = self._edges_map
        if eid in registry:
            registry[eid].update(edge)
        else:
            registry[eid] = edge
        registry[eid].finish_init(vertex_config=vertex_config)

    @property
    def vertices(self):
        """Collect the vertex names used as source or target in ``edges``.

        Returns:
            set[str]: Set of vertex names
        """
        names = set()
        for edge in self.edges:
            names.add(edge.source)
            names.add(edge.target)
        return names

vertices property

Get set of vertex names involved in edges.

Returns:

Type Description

set[str]: Set of vertex names

__contains__(item)

Check if edge exists in configuration.

Parameters:

Name Type Description Default
item EdgeId | Edge

Edge ID or Edge instance to check

required

Returns:

Name Type Description
bool

True if edge exists, False otherwise

Source code in graflo/architecture/edge.py
def __contains__(self, item: EdgeId | Edge):
    """Report whether an edge is registered in this configuration.

    Args:
        item: Edge ID or Edge instance to check; an Edge is looked up by
            its ``edge_id``.

    Returns:
        bool: True if edge exists, False otherwise
    """
    key = item.edge_id if isinstance(item, Edge) else item
    return key in self._edges_map

edges_items(include_aux=False)

Get items of edges.

Parameters:

Name Type Description Default
include_aux

Whether to include auxiliary edges

False

Returns:

Name Type Description
generator

Generator yielding (edge_id, edge) tuples

Source code in graflo/architecture/edge.py
def edges_items(self, include_aux=False):
    """Iterate over registered edges together with their IDs.

    Args:
        include_aux: Whether to include auxiliary edges

    Returns:
        generator: Generator yielding (edge_id, edge) tuples
    """
    entries = self._edges_map.items()
    if include_aux:
        return ((eid, edge) for eid, edge in entries)
    # Auxiliary edges are filtered out unless explicitly requested.
    return ((eid, edge) for eid, edge in entries if not edge.aux)

edges_list(include_aux=False)

Get list of edges.

Parameters:

Name Type Description Default
include_aux

Whether to include auxiliary edges

False

Returns:

Name Type Description
generator

Generator yielding edge configurations

Source code in graflo/architecture/edge.py
def edges_list(self, include_aux=False):
    """Iterate over the registered edges.

    Args:
        include_aux: Whether to include auxiliary edges

    Returns:
        generator: Generator yielding edge configurations
    """
    registered = self._edges_map.values()
    if include_aux:
        return (edge for edge in registered)
    # Auxiliary edges are filtered out unless explicitly requested.
    return (edge for edge in registered if not edge.aux)

finish_init(vc)

Complete initialization of all edges with vertex configuration.

Parameters:

Name Type Description Default
vc VertexConfig

Vertex configuration

required
Source code in graflo/architecture/edge.py
def finish_init(self, vc: VertexConfig):
    """Run ``finish_init`` on every edge in ``edges``, in list order.

    Args:
        vc: Vertex configuration
    """
    for edge in self.edges:
        edge.finish_init(vc)

update_edges(edge, vertex_config)

Update edge configuration.

Parameters:

Name Type Description Default
edge Edge

Edge configuration to update

required
vertex_config VertexConfig

Vertex configuration

required
Source code in graflo/architecture/edge.py
def update_edges(self, edge: Edge, vertex_config: VertexConfig):
    """Register a new edge or merge it into the one already registered.

    Only the internal lookup table is modified; the ``edges`` list is not
    touched by this method. The stored edge is (re)initialized afterwards.

    Args:
        edge: Edge configuration to update
        vertex_config: Vertex configuration
    """
    eid = edge.edge_id
    registry = self._edges_map
    if eid in registry:
        # Known edge: merge the incoming definition into the stored one.
        registry[eid].update(edge)
    else:
        registry[eid] = edge
    registry[eid].finish_init(vertex_config=vertex_config)

FieldType

Bases: BaseEnum

Supported field types for graph databases.

These types are primarily used for TigerGraph, which requires explicit field types. Other databases (ArangoDB, Neo4j) may use different type systems or not require types.

Attributes:

Name Type Description
INT

Integer type

UINT

Unsigned integer type

FLOAT

Floating point type

DOUBLE

Double precision floating point type

BOOL

Boolean type

STRING

String type

DATETIME

DateTime type

Source code in graflo/architecture/vertex.py
class FieldType(BaseEnum):
    """Supported field types for graph databases.

    These types are primarily used for TigerGraph, which requires explicit field types.
    Other databases (ArangoDB, Neo4j) may use different type systems or not require types.

    Each member's value is the upper-case string equal to its own name.

    Attributes:
        INT: Integer type
        UINT: Unsigned integer type
        FLOAT: Floating point type
        DOUBLE: Double precision floating point type
        BOOL: Boolean type
        STRING: String type
        DATETIME: DateTime type
    """

    # Numeric types
    INT = "INT"
    UINT = "UINT"
    FLOAT = "FLOAT"
    DOUBLE = "DOUBLE"
    # Non-numeric types
    BOOL = "BOOL"
    STRING = "STRING"
    DATETIME = "DATETIME"

Index

Bases: ConfigBaseModel

Configuration for database indexes.

Attributes:

Name Type Description
name str | None

Optional name of the index

fields list[str]

List of fields to index

unique bool

Whether the index enforces uniqueness

type IndexType

Type of index to create

deduplicate bool

Whether to deduplicate index entries

sparse bool

Whether to create a sparse index

exclude_edge_endpoints bool

Whether to exclude edge endpoints from index

Source code in graflo/architecture/onto.py
class Index(ConfigBaseModel):
    """Configuration for database indexes.

    Iterating over an Index yields its field names.

    Attributes:
        name: Optional name of the index
        fields: List of fields to index
        unique: Whether the index enforces uniqueness (default True)
        type: Type of index to create (default IndexType.PERSISTENT)
        deduplicate: Whether to deduplicate index entries (default True)
        sparse: Whether to create a sparse index (default False)
        exclude_edge_endpoints: Whether to exclude edge endpoints from index
            (default False)
    """

    name: str | None = Field(
        default=None,
        description="Optional index name. For edges, can reference a vertex name for composite fields.",
    )
    fields: list[str] = Field(
        default_factory=list,
        description="List of field names included in this index.",
    )
    unique: bool = Field(
        default=True,
        description="If True, index enforces uniqueness on the field combination.",
    )
    type: IndexType = Field(
        default=IndexType.PERSISTENT,
        description="Index type (PERSISTENT, HASH, SKIPLIST, FULLTEXT).",
    )
    deduplicate: bool = Field(
        default=True,
        description="Whether to deduplicate index entries (e.g. ArangoDB).",
    )
    sparse: bool = Field(
        default=False,
        description="If True, create a sparse index (exclude null/missing values).",
    )
    exclude_edge_endpoints: bool = Field(
        default=False,
        description="If True, do not add _from/_to to edge index (e.g. ArangoDB).",
    )

    def __iter__(self):
        """Iterate over the indexed fields."""
        return iter(self.fields)

    def db_form(self, db_type: DBType) -> dict:
        """Convert index configuration to database-specific format.

        Starts from a new dict built from ``self.to_dict()``. For ArangoDB
        the ``name`` and ``exclude_edge_endpoints`` keys are removed; for any
        other database type the dict is returned as is. This method itself
        does not raise for unrecognized database types.

        Args:
            db_type: Type of database

        Returns:
            Dictionary of index configuration in database-specific format
        """
        # dict(...) gives a new top-level dict, so the pops below act on it
        # rather than on the object returned by to_dict().
        r = dict(self.to_dict())
        if db_type == DBType.ARANGO:
            r.pop("name", None)
            r.pop("exclude_edge_endpoints", None)
        return r

__iter__()

Iterate over the indexed fields.

Source code in graflo/architecture/onto.py
def __iter__(self):
    """Return an iterator over the field names stored in ``fields``."""
    indexed_fields = self.fields
    return iter(indexed_fields)

db_form(db_type)

Convert index configuration to database-specific format.

Parameters:

Name Type Description Default
db_type DBType

Type of database (ARANGO or NEO4J)

required

Returns:

Type Description
dict

Dictionary of index configuration in database-specific format

Raises:

Type Description
ValueError

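If db_type is not supported (note: the implementation listed below never raises; for any type other than ARANGO it returns the dictionary unmodified)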

Source code in graflo/architecture/onto.py
def db_form(self, db_type: DBType) -> dict:
    """Convert index configuration to database-specific format.

    Starts from a new dict built from ``self.to_dict()``. For ArangoDB the
    ``name`` and ``exclude_edge_endpoints`` keys are dropped; for any other
    database type the dict is returned as is. This method itself does not
    raise for unrecognized database types.

    Args:
        db_type: Type of database

    Returns:
        Dictionary of index configuration in database-specific format
    """
    payload = dict(self.to_dict())
    if db_type == DBType.ARANGO:
        # These keys are stripped from the ArangoDB form of the index.
        for key in ("name", "exclude_edge_endpoints"):
            payload.pop(key, None)
    return payload

Resource

Bases: ConfigBaseModel

Resource configuration and processing.

Represents a data resource that can be processed and transformed into graph structures. Manages the processing pipeline through actors and handles data encoding, transformation, and mapping. Suitable for LLM-generated schema constituents.

Source code in graflo/architecture/resource.py
class Resource(ConfigBaseModel):
    """Resource configuration and processing.

    Represents a data resource that can be processed and transformed into graph
    structures. Manages the processing pipeline through actors and handles data
    encoding, transformation, and mapping. Suitable for LLM-generated schema
    constituents.

    Unknown fields are rejected (``extra = "forbid"``). ``resource_name`` and
    ``pipeline`` are required; ``pipeline`` may also be supplied under the
    key ``apply``.

    Calling an instance with a document runs it through the root actor and
    returns the normalized result.
    """

    model_config = {"extra": "forbid"}

    resource_name: str = PydanticField(
        ...,
        description="Name of the resource (e.g. table or file identifier).",
    )
    pipeline: list[dict[str, Any]] = PydanticField(
        ...,
        description="Pipeline of actor steps to apply in sequence (vertex, edge, transform, descend). "
        'Each step is a dict, e.g. {"vertex": "user"} or {"edge": {"from": "a", "to": "b"}}.',
        validation_alias=AliasChoices("pipeline", "apply"),
    )
    encoding: EncodingType = PydanticField(
        default=EncodingType.UTF_8,
        description="Character encoding for input/output (e.g. utf-8, ISO-8859-1).",
    )
    merge_collections: list[str] = PydanticField(
        default_factory=list,
        description="List of collection names to merge when writing to the graph.",
    )
    extra_weights: list[Edge] = PydanticField(
        default_factory=list,
        description="Additional edge weight configurations for this resource.",
    )
    types: dict[str, str] = PydanticField(
        default_factory=dict,
        description='Field name to Python type expression for casting (e.g. {"amount": "float"}).',
    )
    edge_greedy: bool = PydanticField(
        default=True,
        description="If True, emit edges as soon as source/target vertices exist; if False, wait for explicit targets.",
    )

    # Root of the actor tree built from `pipeline`.
    _root: ActorWrapper = PrivateAttr()
    # Evaluated counterparts of the `types` expressions, keyed by field name.
    _types: dict[str, Callable[..., Any]] = PrivateAttr(default_factory=dict)
    # Placeholders after validation; replaced in finish_init.
    _vertex_config: VertexConfig = PrivateAttr()
    _edge_config: EdgeConfig = PrivateAttr()

    @model_validator(mode="after")
    def _build_root_and_types(self) -> Resource:
        """Build root ActorWrapper from pipeline and evaluate type expressions.

        Expressions in ``types`` that fail to evaluate are logged and skipped,
        so the corresponding field gets no entry in ``_types``.
        """
        object.__setattr__(self, "_root", ActorWrapper(*self.pipeline))
        object.__setattr__(self, "_types", {})
        for k, v in self.types.items():
            try:
                # NOTE(security): eval() executes whatever expression the
                # schema supplies in `types`. This is only safe when the
                # schema comes from a trusted source; review before loading
                # untrusted or machine-generated schemas.
                self._types[k] = eval(v)
            except Exception as ex:
                logger.error(
                    "For resource %s for field %s failed to cast type %s : %s",
                    self.name,
                    k,
                    v,
                    ex,
                )
        # Placeholders until finish_init is called by Schema
        object.__setattr__(
            self,
            "_vertex_config",
            VertexConfig(vertices=[]),
        )
        object.__setattr__(self, "_edge_config", EdgeConfig())
        return self

    @property
    def vertex_config(self) -> VertexConfig:
        """Vertex configuration (set by Schema.finish_init)."""
        return self._vertex_config

    @property
    def edge_config(self) -> EdgeConfig:
        """Edge configuration (set by Schema.finish_init)."""
        return self._edge_config

    @property
    def root(self) -> ActorWrapper:
        """Root actor wrapper for the processing pipeline."""
        return self._root

    @property
    def name(self) -> str:
        """Resource name (alias for resource_name)."""
        return self.resource_name

    def finish_init(
        self,
        vertex_config: VertexConfig,
        edge_config: EdgeConfig,
        transforms: dict[str, ProtoTransform],
    ) -> None:
        """Complete resource initialization.

        Stores the vertex and edge configurations, finishes initialization of
        the root actor, and then initializes each edge in ``extra_weights``.
        Called by Schema after load.

        Args:
            vertex_config: Configuration for vertices
            edge_config: Configuration for edges
            transforms: Dictionary of available transforms
        """
        # Replace the placeholder configurations created during validation.
        object.__setattr__(self, "_vertex_config", vertex_config)
        object.__setattr__(self, "_edge_config", edge_config)

        logger.debug("total resource actor count : %s", self.root.count())
        self.root.finish_init(
            vertex_config=vertex_config,
            transforms=transforms,
            edge_config=edge_config,
            edge_greedy=self.edge_greedy,
        )

        # The count is logged before and after root initialization.
        logger.debug(
            "total resource actor count (after 2 finit): %s", self.root.count()
        )

        for e in self.extra_weights:
            e.finish_init(vertex_config)

    def __call__(self, doc: dict) -> defaultdict[GraphEntity, list]:
        """Process a document through the resource pipeline.

        Args:
            doc: Document to process

        Returns:
            defaultdict[GraphEntity, list]: Processed graph entities
        """
        # A fresh context is created for every document.
        ctx = ActionContext()
        ctx = self.root(ctx, doc=doc)
        acc = self.root.normalize_ctx(ctx)
        return acc

    def count(self) -> int:
        """Total number of actors in the resource pipeline."""
        return self.root.count()

edge_config property

Edge configuration (set by Schema.finish_init).

name property

Resource name (alias for resource_name).

root property

Root actor wrapper for the processing pipeline.

vertex_config property

Vertex configuration (set by Schema.finish_init).

__call__(doc)

Process a document through the resource pipeline.

Parameters:

Name Type Description Default
doc dict

Document to process

required

Returns:

Type Description
defaultdict[GraphEntity, list]

defaultdict[GraphEntity, list]: Processed graph entities

Source code in graflo/architecture/resource.py
def __call__(self, doc: dict) -> defaultdict[GraphEntity, list]:
    """Run one document through the resource pipeline.

    A fresh ``ActionContext`` is passed through the root actor together with
    the document, and the resulting context is normalized by the root actor.

    Args:
        doc: Document to process

    Returns:
        defaultdict[GraphEntity, list]: Processed graph entities
    """
    root = self.root
    populated = root(ActionContext(), doc=doc)
    return root.normalize_ctx(populated)

count()

Total number of actors in the resource pipeline.

Source code in graflo/architecture/resource.py
def count(self) -> int:
    """Return the actor count reported by the root of the pipeline."""
    root = self.root
    return root.count()

finish_init(vertex_config, edge_config, transforms)

Complete resource initialization.

Initializes the resource with vertex and edge configurations, and sets up the processing pipeline. Called by Schema after load.

Parameters:

Name Type Description Default
vertex_config VertexConfig

Configuration for vertices

required
edge_config EdgeConfig

Configuration for edges

required
transforms dict[str, ProtoTransform]

Dictionary of available transforms

required
Source code in graflo/architecture/resource.py
def finish_init(
    self,
    vertex_config: VertexConfig,
    edge_config: EdgeConfig,
    transforms: dict[str, ProtoTransform],
) -> None:
    """Finish wiring this resource to the schema-level configuration.

    Stores the vertex and edge configurations on the instance, forwards
    them (plus the transforms and ``self.edge_greedy``) to the root actor's
    ``finish_init``, and finally calls ``finish_init(vertex_config)`` on
    every entry of ``self.extra_weights``. The actor count is logged at
    debug level before and after the root actor is initialized.

    Args:
        vertex_config: Configuration for vertices
        edge_config: Configuration for edges
        transforms: Dictionary of available transforms
    """
    # Written via object.__setattr__, i.e. without going through the
    # instance's own __setattr__.
    for private_name, config in (
        ("_vertex_config", vertex_config),
        ("_edge_config", edge_config),
    ):
        object.__setattr__(self, private_name, config)

    logger.debug("total resource actor count : %s", self.root.count())

    root_init_kwargs = {
        "vertex_config": vertex_config,
        "transforms": transforms,
        "edge_config": edge_config,
        "edge_greedy": self.edge_greedy,
    }
    self.root.finish_init(**root_init_kwargs)

    logger.debug(
        "total resource actor count (after 2 finit): %s", self.root.count()
    )

    for extra_weight in self.extra_weights:
        extra_weight.finish_init(vertex_config)

Schema

Bases: ConfigBaseModel

Graph database schema configuration.

Represents the complete schema configuration for a graph database. Manages resources, vertex configurations, edge configurations, and transforms. Suitable for LLM-generated schema constituents.

Source code in graflo/architecture/schema.py
class Schema(ConfigBaseModel):
    """Graph database schema configuration.

    Represents the complete schema configuration for a graph database.
    Manages resources, vertex configurations, edge configurations, and transforms.
    Suitable for LLM-generated schema constituents.

    Attributes:
        general: Schema metadata and versioning (name, version).
        vertex_config: Configuration for vertex collections.
        edge_config: Configuration for edge collections.
        resources: Resource definitions; names must be unique.
        transforms: Named transforms available to resources.
    """

    general: SchemaMetadata = PydanticField(
        ...,
        description="Schema metadata and versioning (name, version).",
    )
    vertex_config: VertexConfig = PydanticField(
        ...,
        description="Configuration for vertex collections (vertices, fields, indexes).",
    )
    edge_config: EdgeConfig = PydanticField(
        ...,
        description="Configuration for edge collections (edges, weights).",
    )
    resources: list[Resource] = PydanticField(
        default_factory=list,
        description="List of resource definitions (data pipelines mapping to vertices/edges).",
    )
    transforms: dict[str, ProtoTransform] = PydanticField(
        default_factory=dict,
        description="Dictionary of named transforms available to resources (name -> ProtoTransform).",
    )

    # Name -> Resource index; (re)built by finish_init().
    _resources: dict[str, Resource] = PrivateAttr()

    @field_validator("resources", mode="before")
    @classmethod
    def _coerce_resources_list(cls, v: Any) -> Any:
        """Accept empty dict as empty list for backward compatibility."""
        if isinstance(v, dict) and len(v) == 0:
            return []
        return v

    @model_validator(mode="after")
    def _init_schema(self) -> Schema:
        """Set transform names, finish edge/resource init, and build resource name map."""
        self.finish_init()
        return self

    def finish_init(self) -> None:
        """Complete schema initialization after construction or resource updates.

        Sets transform names, initializes edge configuration with vertex config,
        calls finish_init on each resource, validates unique resource names,
        and builds the internal _resources name-to-Resource mapping.

        Call this after assigning to resources (e.g. when inferring resources
        from a database) so that _resources and resource pipelines are correct.

        Raises:
            ValueError: If duplicate resource names are found.
        """
        # Each transform learns the key it is registered under.
        for name, t in self.transforms.items():
            t.name = name

        self.edge_config.finish_init(self.vertex_config)

        for r in self.resources:
            r.finish_init(
                vertex_config=self.vertex_config,
                edge_config=self.edge_config,
                transforms=self.transforms,
            )

        # Uniqueness is checked after the resources have been initialized.
        name_counts = Counter(r.name for r in self.resources)
        for k, v in name_counts.items():
            if v > 1:
                raise ValueError(f"resource name {k} used {v} times")
        object.__setattr__(self, "_resources", {r.name: r for r in self.resources})

    def fetch_resource(self, name: str | None = None) -> Resource:
        """Fetch a resource by name or get the first available resource.

        Both the named lookup and the default come from the internal
        ``_resources`` mapping, so the emptiness check and the returned
        object always refer to the same container.

        Args:
            name: Optional name of the resource to fetch

        Returns:
            Resource: The requested resource, or the first entry of the
                name-to-resource mapping when ``name`` is None

        Raises:
            ValueError: If the requested resource is not found or if no resources exist
        """
        if name is not None:
            try:
                return self._resources[name]
            except KeyError:
                raise ValueError(f"Resource {name} not found") from None

        if not self._resources:
            raise ValueError("Empty resource container 😕")
        # The mapping is built from ``resources`` in list order and dicts keep
        # insertion order, so this is the first resource as of the last
        # finish_init().
        return next(iter(self._resources.values()))

    def remove_disconnected_vertices(self) -> None:
        """Remove vertices that do not take part in any relation (disconnected).

        Takes ``edge_config.vertices`` as the connected set and removes every
        other vertex from the VertexConfig. A resource whose root actor itself
        references a disconnected vertex is removed from the schema. For the
        remaining resources, descendant actors that reference disconnected
        vertices (found via find_descendants) are removed from the actor tree.

        Mutates this schema in place.
        """
        connected = self.edge_config.vertices
        disconnected = self.vertex_config.vertex_set - connected
        if not disconnected:
            return

        self.vertex_config.remove_vertices(disconnected)

        def mentions_disconnected(wrapper):
            """Return True when the wrapped actor references a disconnected vertex."""
            actor = wrapper.actor
            if isinstance(actor, VertexActor):
                return actor.name in disconnected
            if isinstance(actor, TransformActor):
                return actor.vertex is not None and actor.vertex in disconnected
            if isinstance(actor, EdgeActor):
                return (
                    actor.edge.source in disconnected
                    or actor.edge.target in disconnected
                )
            return False

        to_drop: list[Resource] = []
        for resource in self.resources:
            root = resource.root
            # Decide first whether the whole resource goes away; only
            # surviving resources need their descendants scanned.
            if mentions_disconnected(root):
                to_drop.append(resource)
                continue
            to_remove = set(
                root.find_descendants(actor_type=VertexActor, name=disconnected)
                + root.find_descendants(actor_type=TransformActor, vertex=disconnected)
                + root.find_descendants(
                    predicate=lambda w: isinstance(w.actor, EdgeActor)
                    and mentions_disconnected(w),
                )
            )
            root.remove_descendants_if(lambda w: w in to_remove)

        # Dropped after the loop so self.resources is not mutated while iterated.
        for r in to_drop:
            self.resources.remove(r)
            self._resources.pop(r.name, None)

fetch_resource(name=None)

Fetch a resource by name or get the first available resource.

Parameters:

Name Type Description Default
name str | None

Optional name of the resource to fetch

None

Returns:

Name Type Description
Resource Resource

The requested resource

Raises:

Type Description
ValueError

If the requested resource is not found or if no resources exist

Source code in graflo/architecture/schema.py
def fetch_resource(self, name: str | None = None) -> Resource:
    """Fetch a resource by name or get the first available resource.

    Both the named lookup and the default come from the internal
    ``_resources`` mapping, so the emptiness check and the returned object
    always refer to the same container.

    Args:
        name: Optional name of the resource to fetch

    Returns:
        Resource: The requested resource, or the first entry of the
            name-to-resource mapping when ``name`` is None

    Raises:
        ValueError: If the requested resource is not found or if no resources exist
    """
    if name is not None:
        try:
            return self._resources[name]
        except KeyError:
            raise ValueError(f"Resource {name} not found") from None

    if not self._resources:
        raise ValueError("Empty resource container 😕")
    # dicts keep insertion order, so this is the first resource that was
    # registered in the mapping.
    return next(iter(self._resources.values()))

finish_init()

Complete schema initialization after construction or resource updates.

Sets transform names, initializes edge configuration with vertex config, calls finish_init on each resource, validates unique resource names, and builds the internal _resources name-to-Resource mapping.

Call this after assigning to resources (e.g. when inferring resources from a database) so that _resources and resource pipelines are correct.

Raises:

Type Description
ValueError

If duplicate resource names are found.

Source code in graflo/architecture/schema.py
def finish_init(self) -> None:
    """Finalize the schema after construction or after resources changed.

    Steps, in order: give every transform the name of its dictionary key,
    run ``edge_config.finish_init`` with the vertex config, run
    ``finish_init`` on every resource, reject duplicate resource names, and
    rebuild the private ``_resources`` name-to-Resource mapping.

    Call this again after assigning to ``resources`` so the mapping and the
    resource pipelines reflect the new list.

    Raises:
        ValueError: If duplicate resource names are found.
    """
    for transform_name, transform in self.transforms.items():
        transform.name = transform_name

    self.edge_config.finish_init(self.vertex_config)

    for resource in self.resources:
        resource.finish_init(
            vertex_config=self.vertex_config,
            edge_config=self.edge_config,
            transforms=self.transforms,
        )

    # Counter iterates in first-seen order, so the first duplicated name
    # (by position in the list) is the one reported.
    name_counts = Counter([resource.name for resource in self.resources])
    first_duplicate = next(
        ((k, v) for k, v in name_counts.items() if v > 1),
        None,
    )
    if first_duplicate is not None:
        k, v = first_duplicate
        raise ValueError(f"resource name {k} used {v} times")

    resources_by_name = {resource.name: resource for resource in self.resources}
    object.__setattr__(self, "_resources", resources_by_name)

remove_disconnected_vertices()

Remove vertices that do not take part in any relation (disconnected).

Builds the set of vertex names that appear as source or target of any edge, then removes all other vertices from VertexConfig. For each resource, finds actors that reference disconnected vertices (via find_descendants) and removes them from the actor tree. A resource whose root actor itself references a disconnected vertex is removed entirely.

Mutates this schema in place.

Source code in graflo/architecture/schema.py
def remove_disconnected_vertices(self) -> None:
    """Remove vertices that do not take part in any relation (disconnected).

    Takes ``edge_config.vertices`` as the connected set and removes every
    other vertex from the VertexConfig. A resource whose root actor itself
    references a disconnected vertex is removed from the schema. For the
    remaining resources, descendant actors that reference disconnected
    vertices (found via find_descendants) are removed from the actor tree.

    Mutates this schema in place.
    """
    connected = self.edge_config.vertices
    disconnected = self.vertex_config.vertex_set - connected
    if not disconnected:
        return

    self.vertex_config.remove_vertices(disconnected)

    def mentions_disconnected(wrapper):
        """Return True when the wrapped actor references a disconnected vertex."""
        actor = wrapper.actor
        if isinstance(actor, VertexActor):
            return actor.name in disconnected
        if isinstance(actor, TransformActor):
            return actor.vertex is not None and actor.vertex in disconnected
        if isinstance(actor, EdgeActor):
            return (
                actor.edge.source in disconnected
                or actor.edge.target in disconnected
            )
        return False

    to_drop: list[Resource] = []
    for resource in self.resources:
        root = resource.root
        # Decide first whether the whole resource goes away; only surviving
        # resources need their descendants scanned.
        if mentions_disconnected(root):
            to_drop.append(resource)
            continue
        to_remove = set(
            root.find_descendants(actor_type=VertexActor, name=disconnected)
            + root.find_descendants(actor_type=TransformActor, vertex=disconnected)
            + root.find_descendants(
                predicate=lambda w: isinstance(w.actor, EdgeActor)
                and mentions_disconnected(w),
            )
        )
        root.remove_descendants_if(lambda w: w in to_remove)

    # Dropped after the loop so self.resources is not mutated while iterated.
    for r in to_drop:
        self.resources.remove(r)
        self._resources.pop(r.name, None)

Vertex

Bases: ConfigBaseModel

Represents a vertex in the graph database.

A vertex is a fundamental unit in the graph that can have fields, indexes, and filters. Fields can be specified as strings, Field objects, or dicts. Internally, fields are stored as Field objects but behave like strings for backward compatibility.

Attributes:

Name Type Description
name str

Name of the vertex

fields list[Field]

List of field names (str), Field objects, or dicts. Will be normalized to Field objects by the validator.

indexes list[Index]

List of indexes for the vertex

filters list[FilterExpression]

List of filter expressions

dbname str | None

Optional database name (defaults to vertex name)

Examples:

>>> # Backward compatible: list of strings
>>> v1 = Vertex(name="user", fields=["id", "name"])
>>> # Typed fields: list of Field objects
>>> v2 = Vertex(name="user", fields=[
...     Field(name="id", type="INT"),
...     Field(name="name", type="STRING")
... ])
>>> # From dicts (e.g., from YAML/JSON)
>>> v3 = Vertex(name="user", fields=[
...     {"name": "id", "type": "INT"},
...     {"name": "name"}  # defaults to None type
... ])
Source code in graflo/architecture/vertex.py
class Vertex(ConfigBaseModel):
    """Represents a vertex in the graph database.

    A vertex has a name, fields, indexes and filters. Fields may be given as
    strings, Field objects, or dicts; a validator normalizes every item
    before the model stores it.

    Attributes:
        name: Name of the vertex
        fields: List of field names (str), Field objects, or dicts.
               Will be normalized to Field objects by the validator.
        indexes: List of indexes for the vertex
        filters: List of filter expressions
        dbname: Optional database name (defaults to vertex name)

    Examples:
        >>> # Backward compatible: list of strings
        >>> v1 = Vertex(name="user", fields=["id", "name"])

        >>> # Typed fields: list of Field objects
        >>> v2 = Vertex(name="user", fields=[
        ...     Field(name="id", type="INT"),
        ...     Field(name="name", type="STRING")
        ... ])

        >>> # From dicts (e.g., from YAML/JSON)
        >>> v3 = Vertex(name="user", fields=[
        ...     {"name": "id", "type": "INT"},
        ...     {"name": "name"}  # defaults to None type
        ... ])
    """

    # Allow extra keys when loading from YAML (e.g. transforms, other runtime keys)
    model_config = ConfigDict(extra="ignore")

    name: str = PydanticField(
        ...,
        description="Name of the vertex type (e.g. user, post, company).",
    )
    fields: list[Field] = PydanticField(
        default_factory=list,
        description="List of fields (names, Field objects, or dicts). Normalized to Field objects.",
    )
    indexes: list[Index] = PydanticField(
        default_factory=list,
        description="List of index definitions for this vertex. Defaults to primary index on all fields if empty.",
    )
    filters: list[FilterExpression] = PydanticField(
        default_factory=list,
        description="Filter expressions (logical formulae) applied when querying this vertex.",
    )
    dbname: str | None = PydanticField(
        default=None,
        description="Optional database collection/table name. Defaults to vertex name if not set.",
    )

    @field_validator("fields", mode="before")
    @classmethod
    def convert_to_fields(cls, v: Any) -> Any:
        """Normalize each raw ``fields`` item; reject anything that is not a list.

        Raises:
            ValueError: If ``v`` is not a list.
        """
        if isinstance(v, list):
            return list(map(_normalize_fields_item, v))
        raise ValueError("fields must be a list")

    @field_validator("indexes", mode="before")
    @classmethod
    def convert_to_indexes(cls, v: Any) -> Any:
        """Turn dict items into Index objects; other items and non-lists pass through."""
        if not isinstance(v, list):
            return v
        return [
            Index.model_validate(entry) if isinstance(entry, dict) else entry
            for entry in v
        ]

    @field_validator("filters", mode="before")
    @classmethod
    def convert_to_expressions(cls, v: Any) -> Any:
        """Parse dict/list items into FilterExpression; non-lists pass through.

        Raises:
            ValueError: If an item is neither a FilterExpression nor a dict/list.
        """
        if not isinstance(v, list):
            return v
        expressions: list[FilterExpression] = []
        for entry in v:
            if isinstance(entry, FilterExpression):
                expressions.append(entry)
                continue
            if not isinstance(entry, (dict, list)):
                raise ValueError(
                    "each filter must be a FilterExpression instance or a dict/list (parsed as FilterExpression)"
                )
            expressions.append(FilterExpression.from_dict(entry))
        return expressions

    @model_validator(mode="after")
    def set_dbname_and_indexes(self) -> "Vertex":
        """Apply defaults for ``dbname`` and ``indexes`` and reconcile fields.

        ``dbname`` falls back to ``name``. Without any index, a single index
        over all field names is created. With indexes, every indexed field
        name that is not yet a field is appended as an untyped Field.
        """
        if self.dbname is None:
            object.__setattr__(self, "dbname", self.name)

        declared_indexes = list(self.indexes)
        if declared_indexes:
            known_names = {f.name for f in self.fields}
            merged_fields = list(self.fields)
            for declared in declared_indexes:
                for indexed_name in declared.fields:
                    if indexed_name in known_names:
                        continue
                    merged_fields.append(Field(name=indexed_name, type=None))
                    known_names.add(indexed_name)
            object.__setattr__(self, "fields", merged_fields)
        else:
            default_index = Index(fields=[f.name for f in self.fields])
            object.__setattr__(self, "indexes", [default_index])
        return self

    @property
    def field_names(self) -> list[str]:
        """Names of all fields, in field order."""
        names = []
        for item in self.fields:
            names.append(item.name)
        return names

    def get_fields(self) -> list[Field]:
        """Return the list stored in ``self.fields``."""
        return self.fields

    def finish_init(self, db_flavor: DBType):
        """Complete initialization of vertex with database-specific field types.

        For TigerGraph, fields without a type are replaced by STRING-typed
        fields of the same name; every other field is kept as is.

        Args:
            db_flavor: Database flavor to use for initialization
        """
        resolved = []
        for item in self.fields:
            if item.type is None and db_flavor == DBType.TIGERGRAPH:
                resolved.append(Field(name=item.name, type=FieldType.STRING))
            else:
                resolved.append(item)
        self.fields = resolved

field_names property

Get list of field names (as strings).

finish_init(db_flavor)

Complete initialization of vertex with database-specific field types.

Parameters:

Name Type Description Default
db_flavor DBType

Database flavor to use for initialization

required
Source code in graflo/architecture/vertex.py
def finish_init(self, db_flavor: DBType):
    """Complete initialization of vertex with database-specific field types.

    For TigerGraph, fields without a type are replaced by STRING-typed
    fields of the same name; every other field is kept as is. A new list is
    assigned to ``self.fields`` in all cases.

    Args:
        db_flavor: Database flavor to use for initialization
    """
    resolved = []
    for item in self.fields:
        if item.type is None and db_flavor == DBType.TIGERGRAPH:
            resolved.append(Field(name=item.name, type=FieldType.STRING))
        else:
            resolved.append(item)
    self.fields = resolved

VertexConfig

Bases: ConfigBaseModel

Configuration for managing vertices.

This class manages vertices, providing methods for accessing and manipulating vertex configurations.

Attributes:

Name Type Description
vertices list[Vertex]

List of vertex configurations

blank_vertices list[str]

List of blank vertex names

force_types dict[str, list]

Dictionary mapping vertex names to type lists

db_flavor DBType

Database flavor (ARANGO, NEO4J, or TIGERGRAPH)

Source code in graflo/architecture/vertex.py
class VertexConfig(ConfigBaseModel):
    """Configuration for managing vertices.

    Keeps the declared vertices together with a private name -> Vertex map
    and offers lookup and mutation helpers on top of that map.

    Attributes:
        vertices: List of vertex configurations
        blank_vertices: List of blank vertex names
        force_types: Dictionary mapping vertex names to type lists
        db_flavor: Database flavor (ARANGO, NEO4J or TIGERGRAPH)
    """

    # Allow extra keys when loading from YAML (e.g. vertex_config wrapper key)
    model_config = ConfigDict(extra="ignore")

    vertices: list[Vertex] = PydanticField(
        ...,
        description="List of vertex type definitions (name, fields, indexes, filters).",
    )
    blank_vertices: list[str] = PydanticField(
        default_factory=list,
        description="Vertex names that may be created without explicit data (e.g. placeholders).",
    )
    force_types: dict[str, list] = PydanticField(
        default_factory=dict,
        description="Override mapping: vertex name -> list of field type names for type inference.",
    )
    db_flavor: DBType = PydanticField(
        default=DBType.ARANGO,
        description="Database flavor (ARANGO, NEO4J, TIGERGRAPH) for schema and index generation.",
    )

    # Both maps are filled in by the "after" model validator below.
    _vertices_map: dict[str, Vertex] | None = PrivateAttr(default=None)
    _vertex_numeric_fields_map: dict[str, object] | None = PrivateAttr(default=None)

    @model_validator(mode="after")
    def build_vertices_map_and_validate_blank(self) -> "VertexConfig":
        """Build the private lookup maps and validate ``blank_vertices``.

        Raises:
            ValueError: If a blank vertex name is not among the declared vertices.
        """
        by_name = {vertex.name: vertex for vertex in self.vertices}
        object.__setattr__(self, "_vertices_map", by_name)
        object.__setattr__(self, "_vertex_numeric_fields_map", {})

        unknown_blanks = set(self.blank_vertices).difference(self.vertex_set)
        if unknown_blanks:
            raise ValueError(
                f" Blank vertices {self.blank_vertices} are not defined as vertices"
            )
        return self

    def _get_vertices_map(self) -> dict[str, Vertex]:
        """Return the vertices map (set by model validator)."""
        vertices_by_name = self._vertices_map
        assert vertices_by_name is not None, "VertexConfig not fully initialized"
        return vertices_by_name

    @property
    def vertex_set(self):
        """Get set of vertex names.

        Returns:
            set[str]: Set of vertex names
        """
        return set(self._get_vertices_map())

    @property
    def vertex_list(self):
        """Get list of vertex configurations.

        Returns:
            list[Vertex]: List of vertex configurations
        """
        return [*self._get_vertices_map().values()]

    def _get_vertex_by_name_or_dbname(self, identifier: str) -> Vertex:
        """Get vertex by name or dbname.

        The name is tried first; only when no vertex has that name are the
        vertices scanned for a matching ``dbname``.

        Args:
            identifier: Vertex name or dbname

        Returns:
            Vertex: The vertex object

        Raises:
            KeyError: If vertex is not found by name or dbname
        """
        vertices_by_name = self._get_vertices_map()
        try:
            return vertices_by_name[identifier]
        except KeyError:
            pass

        by_dbname = next(
            (
                candidate
                for candidate in vertices_by_name.values()
                if candidate.dbname == identifier
            ),
            None,
        )
        if by_dbname is not None:
            return by_dbname

        available_names = list(vertices_by_name)
        available_dbnames = [
            candidate.dbname for candidate in vertices_by_name.values()
        ]
        raise KeyError(
            f"Vertex '{identifier}' not found by name or dbname. "
            f"Available names: {available_names}, "
            f"Available dbnames: {available_dbnames}"
        )

    def vertex_dbname(self, vertex_name):
        """Get database name for a vertex.

        Args:
            vertex_name: Name of the vertex

        Returns:
            str: Database name for the vertex

        Raises:
            KeyError: If vertex is not found (logged at error level first)
        """
        m = self._get_vertices_map()
        try:
            return m[vertex_name].dbname
        except KeyError:
            logger.error(
                f"Available vertices : {m.keys()}; vertex requested : {vertex_name}"
            )
            raise

    def index(self, vertex_name) -> Index:
        """Get primary index for a vertex.

        Args:
            vertex_name: Name of the vertex

        Returns:
            Index: First entry of the vertex's index list
        """
        all_indexes = self._get_vertices_map()[vertex_name].indexes
        return all_indexes[0]

    def indexes(self, vertex_name) -> list[Index]:
        """Get all indexes for a vertex.

        Args:
            vertex_name: Name of the vertex

        Returns:
            list[Index]: List of indexes for the vertex
        """
        vertex = self._get_vertices_map()[vertex_name]
        return vertex.indexes

    def fields(self, vertex_name: str) -> list[Field]:
        """Get fields for a vertex.

        Args:
            vertex_name: Name of the vertex or dbname

        Returns:
            list[Field]: List of Field objects
        """
        return self._get_vertex_by_name_or_dbname(vertex_name).fields

    def fields_names(
        self,
        vertex_name: str,
    ) -> list[str]:
        """Get field names for a vertex as strings.

        Args:
            vertex_name: Name of the vertex or dbname

        Returns:
            list[str]: List of field names as strings
        """
        return self._get_vertex_by_name_or_dbname(vertex_name).field_names

    def numeric_fields_list(self, vertex_name):
        """Get list of numeric fields for a vertex.

        Args:
            vertex_name: Name of the vertex

        Returns:
            The entry stored for the vertex in the numeric-fields map, or an
            empty tuple when there is none

        Raises:
            ValueError: If vertex is not defined in config
        """
        if vertex_name not in self.vertex_set:
            raise ValueError(
                " Accessing vertex numeric fields: vertex"
                f" {vertex_name} was not defined in config"
            )
        numeric_map = self._vertex_numeric_fields_map
        if numeric_map is None or vertex_name not in numeric_map:
            return ()
        return numeric_map[vertex_name]

    def filters(self, vertex_name) -> list[FilterExpression]:
        """Get filter clauses for a vertex.

        Args:
            vertex_name: Name of the vertex

        Returns:
            list[FilterExpression]: The vertex's filters, or an empty list
                for an unknown vertex name
        """
        vertices_by_name = self._get_vertices_map()
        if vertex_name not in vertices_by_name:
            return []
        return vertices_by_name[vertex_name].filters

    def remove_vertices(self, names: set[str]) -> None:
        """Remove vertices by name.

        Removes vertices from the vertex list, from the name map and from
        blank_vertices when present. Mutates the instance in place.

        Args:
            names: Set of vertex names to remove
        """
        if not names:
            return
        # Slice assignment keeps the existing list objects.
        surviving = [vertex for vertex in self.vertices if vertex.name not in names]
        self.vertices[:] = surviving

        vertices_by_name = self._get_vertices_map()
        for stale_name in names:
            vertices_by_name.pop(stale_name, None)

        kept_blanks = [blank for blank in self.blank_vertices if blank not in names]
        self.blank_vertices[:] = kept_blanks

    def update_vertex(self, v: Vertex):
        """Update vertex configuration.

        Args:
            v: Vertex configuration to update
        """
        # NOTE(review): only the name map is written here; ``self.vertices``
        # is left untouched - confirm this is intended.
        vertices_by_name = self._get_vertices_map()
        vertices_by_name[v.name] = v

    def __getitem__(self, key: str):
        """Get vertex configuration by name.

        Args:
            key: Vertex name

        Returns:
            Vertex: Vertex configuration

        Raises:
            KeyError: If vertex is not found
        """
        vertices_by_name = self._get_vertices_map()
        if key not in vertices_by_name:
            raise KeyError(f"Vertex {key} absent")
        return vertices_by_name[key]

    def __setitem__(self, key: str, value: Vertex):
        """Set vertex configuration by name.

        Args:
            key: Vertex name
            value: Vertex configuration
        """
        # NOTE(review): only the name map is written here; ``self.vertices``
        # is left untouched - confirm this is intended.
        vertices_by_name = self._get_vertices_map()
        vertices_by_name[key] = value

    def finish_init(self):
        """Complete initialization of all vertices with database-specific field types.

        Calls ``finish_init(self.db_flavor)`` on every entry of ``self.vertices``.
        """
        for vertex in self.vertices:
            vertex.finish_init(self.db_flavor)

vertex_list property

Get list of vertex configurations.

Returns:

Type Description

list[Vertex]: List of vertex configurations

vertex_set property

Get set of vertex names.

Returns:

Type Description

set[str]: Set of vertex names

__getitem__(key)

Get vertex configuration by name.

Parameters:

Name Type Description Default
key str

Vertex name

required

Returns:

Name Type Description
Vertex

Vertex configuration

Raises:

Type Description
KeyError

If vertex is not found

Source code in graflo/architecture/vertex.py
def __getitem__(self, key: str):
    """Look up a vertex configuration by its name.

    Args:
        key: Vertex name

    Returns:
        Vertex: The entry stored under ``key`` in the vertices map

    Raises:
        KeyError: If vertex is not found
    """
    vertices_by_name = self._get_vertices_map()
    if key not in vertices_by_name:
        raise KeyError(f"Vertex {key} absent")
    return vertices_by_name[key]

__setitem__(key, value)

Set vertex configuration by name.

Parameters:

Name Type Description Default
key str

Vertex name

required
value Vertex

Vertex configuration

required
Source code in graflo/architecture/vertex.py
def __setitem__(self, key: str, value: Vertex):
    """Store a vertex configuration under the given name in the vertices map.

    Args:
        key: Vertex name
        value: Vertex configuration
    """
    vertices_by_name = self._get_vertices_map()
    vertices_by_name[key] = value

fields(vertex_name)

Get fields for a vertex.

Parameters:

Name Type Description Default
vertex_name str

Name of the vertex or dbname

required

Returns:

Type Description
list[Field]

list[Field]: List of Field objects

Source code in graflo/architecture/vertex.py
def fields(self, vertex_name: str) -> list[Field]:
    """Return the Field objects of a vertex.

    The vertex is resolved through ``_get_vertex_by_name_or_dbname``, so
    either its name or its dbname may be passed.

    Args:
        vertex_name: Name of the vertex or dbname

    Returns:
        list[Field]: List of Field objects
    """
    return self._get_vertex_by_name_or_dbname(vertex_name).fields

fields_names(vertex_name)

Get field names for a vertex as strings.

Parameters:

Name Type Description Default
vertex_name str

Name of the vertex or dbname

required

Returns:

Type Description
list[str]

list[str]: List of field names as strings

Source code in graflo/architecture/vertex.py
def fields_names(
    self,
    vertex_name: str,
) -> list[str]:
    """Return the field names of a vertex as strings.

    The vertex is resolved through ``_get_vertex_by_name_or_dbname``, so
    either its name or its dbname may be passed.

    Args:
        vertex_name: Name of the vertex or dbname

    Returns:
        list[str]: List of field names as strings
    """
    return self._get_vertex_by_name_or_dbname(vertex_name).field_names

filters(vertex_name)

Get filter clauses for a vertex.

Parameters:

Name Type Description Default
vertex_name

Name of the vertex

required

Returns:

Type Description
list[FilterExpression]

list[FilterExpression]: List of filter expressions

Source code in graflo/architecture/vertex.py
def filters(self, vertex_name) -> list[FilterExpression]:
    """Return the filter expressions of a vertex.

    Args:
        vertex_name: Name of the vertex

    Returns:
        list[FilterExpression]: The vertex's filters, or an empty list when
            no vertex with that name is in the vertices map
    """
    vertices_by_name = self._get_vertices_map()
    if vertex_name not in vertices_by_name:
        return []
    return vertices_by_name[vertex_name].filters

finish_init()

Complete initialization of all vertices with database-specific field types.

Uses self.db_flavor to determine database-specific initialization behavior.

Source code in graflo/architecture/vertex.py
def finish_init(self):
    """Finalize every vertex for the configured database flavor.

    Calls ``finish_init`` on each entry of ``self.vertices``, passing
    ``self.db_flavor`` so each vertex can apply database-specific setup.
    """
    for vertex in self.vertices:
        vertex.finish_init(self.db_flavor)

index(vertex_name)

Get primary index for a vertex.

Parameters:

Name Type Description Default
vertex_name

Name of the vertex

required

Returns:

Name Type Description
Index Index

Primary index for the vertex

Source code in graflo/architecture/vertex.py
def index(self, vertex_name) -> Index:
    """Return the primary index of a vertex.

    The primary index is the first entry of the vertex's ``indexes``.

    Args:
        vertex_name: Name of the vertex (key of the vertices map)

    Returns:
        Index: First index registered on the vertex
    """
    vertex = self._get_vertices_map()[vertex_name]
    vertex_indexes = vertex.indexes
    return vertex_indexes[0]

indexes(vertex_name)

Get all indexes for a vertex.

Parameters:

Name Type Description Default
vertex_name

Name of the vertex

required

Returns:

Type Description
list[Index]

list[Index]: List of indexes for the vertex

Source code in graflo/architecture/vertex.py
def indexes(self, vertex_name) -> list[Index]:
    """Return every index defined on a vertex.

    Args:
        vertex_name: Name of the vertex (key of the vertices map)

    Returns:
        list[Index]: The vertex's ``indexes`` attribute
    """
    vertex = self._get_vertices_map()[vertex_name]
    return vertex.indexes

numeric_fields_list(vertex_name)

Get list of numeric fields for a vertex.

Parameters:

Name Type Description Default
vertex_name

Name of the vertex

required

Returns:

Name Type Description
tuple

Tuple of numeric field names

Raises:

Type Description
ValueError

If vertex is not defined in config

Source code in graflo/architecture/vertex.py
def numeric_fields_list(self, vertex_name):
    """Return the numeric fields registered for a vertex.

    Args:
        vertex_name: Name of the vertex

    Returns:
        The entry stored for the vertex in the numeric-fields map, or an
        empty tuple when the map is unset or has no entry for the vertex.

    Raises:
        ValueError: If the vertex is not part of ``self.vertex_set``
    """
    # Reject unknown vertices first; everything below assumes a known name.
    if vertex_name not in self.vertex_set:
        raise ValueError(
            " Accessing vertex numeric fields: vertex"
            f" {vertex_name} was not defined in config"
        )

    numeric_map = self._vertex_numeric_fields_map
    if numeric_map is None or vertex_name not in numeric_map:
        return ()
    return numeric_map[vertex_name]

remove_vertices(names)

Remove vertices by name.

Removes vertices from the configuration and from blank_vertices when present. Mutates the instance in place.

Parameters:

Name Type Description Default
names set[str]

Set of vertex names to remove

required
Source code in graflo/architecture/vertex.py
def remove_vertices(self, names: set[str]) -> None:
    """Drop the named vertices from this configuration, in place.

    The vertices list, the vertices map and ``blank_vertices`` are all
    pruned; names that are not present are ignored.

    Args:
        names: Set of vertex names to remove; an empty set is a no-op
    """
    if names:
        # Slice assignment keeps the same list objects alive for any holder.
        self.vertices[:] = [
            vertex for vertex in self.vertices if vertex.name not in names
        ]
        vertices_by_name = self._get_vertices_map()
        for name in names:
            vertices_by_name.pop(name, None)
        self.blank_vertices[:] = [
            blank for blank in self.blank_vertices if blank not in names
        ]

update_vertex(v)

Update vertex configuration.

Parameters:

Name Type Description Default
v Vertex

Vertex configuration to update

required
Source code in graflo/architecture/vertex.py
def update_vertex(self, v: Vertex):
    """Register or replace a vertex in the vertices map.

    Args:
        v: Vertex configuration; stored under its ``name``, overwriting any
            existing entry with the same name
    """
    vertices_by_name = self._get_vertices_map()
    vertices_by_name[v.name] = v

vertex_dbname(vertex_name)

Get database name for a vertex.

Parameters:

Name Type Description Default
vertex_name

Name of the vertex

required

Returns:

Name Type Description
str

Database name for the vertex

Raises:

Type Description
KeyError

If vertex is not found

Source code in graflo/architecture/vertex.py
def vertex_dbname(self, vertex_name):
    """Look up the database name of a vertex.

    Args:
        vertex_name: Name of the vertex (key of the vertices map)

    Returns:
        str: The vertex's ``dbname``

    Raises:
        KeyError: If the lookup fails; the available and requested names
            are logged before the error is re-raised
    """
    m = self._get_vertices_map()
    try:
        return m[vertex_name].dbname
    except KeyError:
        # Log what was available so a misspelled name is easy to diagnose.
        logger.error(
            f"Available vertices : {m.keys()}; vertex requested : {vertex_name}"
        )
        raise