graphcast.architecture

Graph database architecture components.

This package defines the core architecture components for graph database operations, including schema management, resource handling, and data transformations.

Key Components
  • Schema: Graph database schema definition and management
  • Resource: Data resource management and processing
  • Transform: Data transformation and standardization
  • Vertex: Vertex collection configuration
  • Edge: Edge collection configuration
Example

from graphcast.architecture import Schema, Resource

resource = Resource(resource_name="users", apply=actors)
schema = Schema(
    general={"name": "my_graph", "version": "1.0"},
    vertex_config=vertex_config,
    edge_config=edge_config,
    resources=[resource],
)

Edge dataclass

Bases: BaseDataclass

Represents an edge in the graph database.

An edge connects two vertices and can have various configurations for indexing, weights, and relationship types.

Attributes:

Name                   Type                    Description
source                 str                     Source vertex name
target                 str                     Target vertex name
indexes                list[Index]             List of indexes for the edge
weights                Optional[WeightConfig]  Optional weight configuration
non_exclusive          list[str]               List of non-exclusive fields
relation               Optional[str]           Optional relation name (for Neo4j)
purpose                Optional[str]           Optional purpose for utility collections
source_discriminant    Optional[str]           Optional source discriminant field
target_discriminant    Optional[str]           Optional target discriminant field
source_relation_field  Optional[str]           Optional source relation field
target_relation_field  Optional[str]           Optional target relation field
type                   EdgeType                Edge type (DIRECT or INDIRECT)
aux                    bool                    Whether this is an auxiliary edge
casting_type           EdgeCastingType         Type of edge casting
by                     Optional[str]           Optional vertex name for indirect edges
source_collection      Optional[str]           Optional source collection name
target_collection      Optional[str]           Optional target collection name
graph_name             Optional[str]           Optional graph name
collection_name        Optional[str]           Optional collection name
db_flavor              DBFlavor                Database flavor (ARANGO or NEO4J)

Source code in graphcast/architecture/edge.py
@dataclasses.dataclass
class Edge(BaseDataclass):
    """Represents an edge in the graph database.

    An edge connects two vertices and can have various configurations for
    indexing, weights, and relationship types.

    Attributes:
        source: Source vertex name
        target: Target vertex name
        indexes: List of indexes for the edge
        weights: Optional weight configuration
        non_exclusive: List of non-exclusive fields
        relation: Optional relation name (for Neo4j)
        purpose: Optional purpose for utility collections
        source_discriminant: Optional source discriminant field
        target_discriminant: Optional target discriminant field
        source_relation_field: Optional source relation field
        target_relation_field: Optional target relation field
        type: Edge type (DIRECT or INDIRECT)
        aux: Whether this is an auxiliary edge
        casting_type: Type of edge casting
        by: Optional vertex name for indirect edges
        source_collection: Optional source collection name
        target_collection: Optional target collection name
        graph_name: Optional graph name
        collection_name: Optional collection name
        db_flavor: Database flavor (ARANGO or NEO4J)
    """

    source: str
    target: str
    indexes: list[Index] = dataclasses.field(default_factory=list)
    weights: Optional[WeightConfig] = None

    non_exclusive: list[str] = dataclasses.field(default_factory=list)

    # used to specify an index (neo4j)
    relation: Optional[str] = None

    # used to create extra utility collections between the same type of vertices (A, B)
    purpose: Optional[str] = None

    source_discriminant: Optional[str] = None
    target_discriminant: Optional[str] = None

    source_relation_field: Optional[str] = None
    target_relation_field: Optional[str] = None

    type: EdgeType = EdgeType.DIRECT

    # aux=True edges are initialized in the db but not considered by graphcast
    aux: bool = False

    casting_type: EdgeCastingType = EdgeCastingType.PAIR_LIKE
    by: Optional[str] = None
    source_collection: Optional[str] = None
    target_collection: Optional[str] = None
    graph_name: Optional[str] = None
    collection_name: Optional[str] = None
    db_flavor: DBFlavor = DBFlavor.ARANGO

    def __post_init__(self):
        """Initialize the edge after dataclass initialization.

        Validates that source and target relation fields are not both set.

        Raises:
            ValueError: If both source and target relation fields are set
        """
        if (
            self.source_relation_field is not None
            and self.target_relation_field is not None
        ):
            raise ValueError(
                f"Both source_relation_field and target_relation_field are set for edge ({self.source}, {self.target})"
            )

    def finish_init(self, vertex_config: VertexConfig):
        """Complete edge initialization with vertex configuration.

        Sets up edge collections, graph names, and initializes indices based on
        the vertex configuration.

        Args:
            vertex_config: Configuration for vertices

        Note:
            Discriminant is used to pin documents among a collection of documents
            of the same vertex type.
        """
        if self.type == EdgeType.INDIRECT and self.by is not None:
            self.by = vertex_config.vertex_dbname(self.by)

        if self.source_discriminant is None and self.target_discriminant is None:
            self.casting_type = EdgeCastingType.PAIR_LIKE
        else:
            self.casting_type = EdgeCastingType.PRODUCT_LIKE

        if self.weights is not None:
            if self.weights.source_fields:
                vertex_config[self.source] = vertex_config[
                    self.source
                ].update_aux_fields(self.weights.source_fields)
            if self.weights.target_fields:
                vertex_config[self.target] = vertex_config[
                    self.target
                ].update_aux_fields(self.weights.target_fields)

        self.source_collection = vertex_config.vertex_dbname(self.source)
        self.target_collection = vertex_config.vertex_dbname(self.target)
        graph_name = [
            vertex_config.vertex_dbname(self.source),
            vertex_config.vertex_dbname(self.target),
        ]
        if self.purpose is not None:
            graph_name += [self.purpose]
        self.graph_name = "_".join(graph_name + ["graph"])
        self.collection_name = "_".join(graph_name + ["edges"])
        self.db_flavor = vertex_config.db_flavor
        self._init_indices(vertex_config)

    def _init_indices(self, vc: VertexConfig):
        """Initialize indices for the edge.

        Args:
            vc: Vertex configuration
        """
        self.indexes = [self._init_index(index, vc) for index in self.indexes]

    def _init_index(self, index: Index, vc: VertexConfig) -> Index:
        """Initialize a single index for the edge.

        Args:
            index: Index to initialize
            vc: Vertex configuration

        Returns:
            Index: Initialized index

        Note:
            Default behavior for edge indices: adds ["_from", "_to"] for uniqueness
            in ArangoDB.
        """
        index_fields = []

        # "@" is reserved : quick hack - do not reinit the index twice
        if any("@" in f for f in index.fields):
            return index
        if index.name is None:
            index_fields += index.fields
        else:
            # add index over a vertex of index.name
            if index.fields:
                fields = index.fields
            else:
                fields = vc.index(index.name).fields
            index_fields += [f"{index.name}@{x}" for x in fields]

        if not index.exclude_edge_endpoints and self.db_flavor == DBFlavor.ARANGO:
            if all([item not in index_fields for item in ["_from", "_to"]]):
                index_fields = ["_from", "_to"] + index_fields

        index.fields = index_fields
        return index

    @property
    def edge_name_dyad(self):
        """Get the edge name as a dyad (source, target).

        Returns:
            tuple[str, str]: Source and target vertex names
        """
        return self.source, self.target

    @property
    def edge_id(self) -> EdgeId:
        """Get the edge ID.

        Returns:
            EdgeId: Tuple of (source, target, purpose)
        """
        return self.source, self.target, self.purpose
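
The field resolution performed by _init_index in the listing above can be followed more easily in isolation. The sketch below is a minimal re-statement of that logic as a free function; resolve_index_fields and its parameters are hypothetical names, vertex_fields stands in for whatever vc.index(index.name).fields would return, and the arango flag replaces the DBFlavor comparison.

```python
from typing import Optional


def resolve_index_fields(
    fields: list[str],
    name: Optional[str] = None,
    vertex_fields: Optional[list[str]] = None,
    exclude_edge_endpoints: bool = False,
    arango: bool = True,
) -> list[str]:
    """Hypothetical stand-alone version of the field resolution in _init_index."""
    # Fields containing "@" were already qualified, so they are returned unchanged.
    if any("@" in f for f in fields):
        return list(fields)
    if name is None:
        resolved = list(fields)
    else:
        # Fall back to the vertex's own index fields when none are given.
        base = fields if fields else (vertex_fields or [])
        resolved = [f"{name}@{x}" for x in base]
    # For ArangoDB the edge endpoints are prepended unless the index opts out.
    if not exclude_edge_endpoints and arango:
        if all(item not in resolved for item in ("_from", "_to")):
            resolved = ["_from", "_to"] + resolved
    return resolved
```

With these assumptions, a plain field list gains the endpoint fields for ArangoDB, a named index is qualified as "name@field", and a second pass over an already qualified index changes nothing.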

edge_id property

Get the edge ID.

Returns:

Name Type Description
EdgeId EdgeId

Tuple of (source, target, purpose)
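
Because purpose is part of the identifier, two edges between the same pair of vertices can coexist under different keys. A minimal sketch, assuming EdgeId is a plain 3-tuple as the Returns note states; MiniEdge is a hypothetical stand-in that mirrors only the fields edge_id reads, not the graphcast class itself.

```python
import dataclasses
from typing import Optional

# Assumption: EdgeId is a (source, target, purpose) tuple.
EdgeId = tuple[str, str, Optional[str]]


@dataclasses.dataclass
class MiniEdge:
    """Hypothetical stand-in with only the fields used by edge_id."""

    source: str
    target: str
    purpose: Optional[str] = None

    @property
    def edge_id(self) -> EdgeId:
        return self.source, self.target, self.purpose


# Two edges between the same vertices, told apart by purpose.
plain = MiniEdge("user", "post")
likes = MiniEdge("user", "post", purpose="likes")
edges_map = {e.edge_id: e for e in (plain, likes)}
```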

edge_name_dyad property

Get the edge name as a dyad (source, target).

Returns:

Type             Description
tuple[str, str]  Source and target vertex names

__post_init__()

Initialize the edge after dataclass initialization.

Validates that source and target relation fields are not both set.

Raises:

Type Description
ValueError

If both source and target relation fields are set
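
The validation rule can be tried out without a database. The sketch below reproduces only the mutual-exclusion check; RelationFields and is_valid are hypothetical names introduced for illustration.

```python
import dataclasses
from typing import Optional


@dataclasses.dataclass
class RelationFields:
    """Hypothetical stand-in carrying only the two relation fields."""

    source_relation_field: Optional[str] = None
    target_relation_field: Optional[str] = None

    def __post_init__(self):
        # At most one side may name a relation field.
        if (
            self.source_relation_field is not None
            and self.target_relation_field is not None
        ):
            raise ValueError("both relation fields are set")


def is_valid(**kwargs) -> bool:
    """Return True when the given combination of fields passes validation."""
    try:
        RelationFields(**kwargs)
    except ValueError:
        return False
    return True
```

Setting neither field or exactly one of them is accepted; setting both is rejected at construction time, before any call to finish_init.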

Source code in graphcast/architecture/edge.py
def __post_init__(self):
    """Initialize the edge after dataclass initialization.

    Validates that source and target relation fields are not both set.

    Raises:
        ValueError: If both source and target relation fields are set
    """
    if (
        self.source_relation_field is not None
        and self.target_relation_field is not None
    ):
        raise ValueError(
            f"Both source_relation_field and target_relation_field are set for edge ({self.source}, {self.target})"
        )

finish_init(vertex_config)

Complete edge initialization with vertex configuration.

Sets up edge collections, graph names, and initializes indices based on the vertex configuration.

Parameters:

Name           Type          Description                 Default
vertex_config  VertexConfig  Configuration for vertices  required

Note

Discriminant is used to pin documents among a collection of documents of the same vertex type.
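
The naming scheme and the casting-type choice made by finish_init can be sketched as two small functions. This is an illustration only: derive_names and casting_type are hypothetical helpers, source_db and target_db stand for the strings that vertex_config.vertex_dbname() would return, and the casting types are shown as plain strings instead of EdgeCastingType members.

```python
from typing import Optional


def derive_names(source_db: str, target_db: str, purpose: Optional[str] = None):
    """Return (graph_name, collection_name) built the way finish_init builds them."""
    parts = [source_db, target_db]
    if purpose is not None:
        parts.append(purpose)
    return "_".join(parts + ["graph"]), "_".join(parts + ["edges"])


def casting_type(
    source_discriminant: Optional[str], target_discriminant: Optional[str]
) -> str:
    """Mirror the PAIR_LIKE / PRODUCT_LIKE choice using plain strings."""
    if source_discriminant is None and target_discriminant is None:
        return "PAIR_LIKE"
    return "PRODUCT_LIKE"
```

For example, an edge from users to posts without a purpose would map to the graph users_posts_graph and the collection users_posts_edges; adding purpose="follows" inserts that word before the suffix.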

Source code in graphcast/architecture/edge.py
def finish_init(self, vertex_config: VertexConfig):
    """Complete edge initialization with vertex configuration.

    Sets up edge collections, graph names, and initializes indices based on
    the vertex configuration.

    Args:
        vertex_config: Configuration for vertices

    Note:
        Discriminant is used to pin documents among a collection of documents
        of the same vertex type.
    """
    if self.type == EdgeType.INDIRECT and self.by is not None:
        self.by = vertex_config.vertex_dbname(self.by)

    if self.source_discriminant is None and self.target_discriminant is None:
        self.casting_type = EdgeCastingType.PAIR_LIKE
    else:
        self.casting_type = EdgeCastingType.PRODUCT_LIKE

    if self.weights is not None:
        if self.weights.source_fields:
            vertex_config[self.source] = vertex_config[
                self.source
            ].update_aux_fields(self.weights.source_fields)
        if self.weights.target_fields:
            vertex_config[self.target] = vertex_config[
                self.target
            ].update_aux_fields(self.weights.target_fields)

    self.source_collection = vertex_config.vertex_dbname(self.source)
    self.target_collection = vertex_config.vertex_dbname(self.target)
    graph_name = [
        vertex_config.vertex_dbname(self.source),
        vertex_config.vertex_dbname(self.target),
    ]
    if self.purpose is not None:
        graph_name += [self.purpose]
    self.graph_name = "_".join(graph_name + ["graph"])
    self.collection_name = "_".join(graph_name + ["edges"])
    self.db_flavor = vertex_config.db_flavor
    self._init_indices(vertex_config)

EdgeConfig dataclass

Bases: BaseDataclass

Configuration for managing collections of edges.

This class manages a collection of edges, providing methods for accessing and manipulating edge configurations.

Attributes:

Name   Type        Description
edges  list[Edge]  List of edge configurations

Source code in graphcast/architecture/edge.py
@dataclasses.dataclass
class EdgeConfig(BaseDataclass):
    """Configuration for managing collections of edges.

    This class manages a collection of edges, providing methods for accessing
    and manipulating edge configurations.

    Attributes:
        edges: List of edge configurations
    """

    edges: list[Edge] = dataclasses.field(default_factory=list)

    def __post_init__(self):
        """Initialize the edge configuration.

        Creates internal mapping of edge IDs to edge configurations.
        """
        self._edges_map: dict[EdgeId, Edge] = {e.edge_id: e for e in self.edges}

    def finish_init(self, vc: VertexConfig):
        """Complete initialization of all edges with vertex configuration.

        Args:
            vc: Vertex configuration
        """
        for k, e in self._edges_map.items():
            e.finish_init(vc)

    def _reset_edges(self):
        """Reset edges list from internal mapping."""
        self.edges = list(self._edges_map.values())

    def edges_list(self, include_aux=False):
        """Get list of edges.

        Args:
            include_aux: Whether to include auxiliary edges

        Returns:
            generator: Generator yielding edge configurations
        """
        return (e for e in self._edges_map.values() if include_aux or not e.aux)

    def edges_items(self, include_aux=False):
        """Get items of edges.

        Args:
            include_aux: Whether to include auxiliary edges

        Returns:
            generator: Generator yielding (edge_id, edge) tuples
        """
        return (
            (eid, e) for eid, e in self._edges_map.items() if include_aux or not e.aux
        )

    def __contains__(self, item: EdgeId | Edge):
        """Check if edge exists in configuration.

        Args:
            item: Edge ID or Edge instance to check

        Returns:
            bool: True if edge exists, False otherwise
        """
        if isinstance(item, Edge):
            eid = item.edge_id
        else:
            eid = item

        if eid in self._edges_map:
            return True
        else:
            return False

    def update_edges(self, edge: Edge, vertex_config: VertexConfig):
        """Update edge configuration.

        Args:
            edge: Edge configuration to update
            vertex_config: Vertex configuration
        """
        if edge.edge_id in self._edges_map:
            self._edges_map[edge.edge_id].update(edge)
        else:
            self._edges_map[edge.edge_id] = edge
        self._edges_map[edge.edge_id].finish_init(vertex_config=vertex_config)

    @property
    def vertices(self):
        """Get set of vertex names involved in edges.

        Returns:
            set[str]: Set of vertex names
        """
        return {e.source for e in self.edges} | {e.target for e in self.edges}

vertices property

Get set of vertex names involved in edges.

Returns:

Type      Description
set[str]  Set of vertex names

__contains__(item)

Check if edge exists in configuration.

Parameters:

Name  Type           Description                        Default
item  EdgeId | Edge  Edge ID or Edge instance to check  required

Returns:

Name Type Description
bool

True if edge exists, False otherwise
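
A membership test therefore works with either form of the key. The following sketch uses hypothetical stand-ins (MiniEdge, MiniEdgeConfig) that keep only the pieces needed for the lookup.

```python
import dataclasses
from typing import Optional


@dataclasses.dataclass
class MiniEdge:
    """Hypothetical stand-in with only the fields used by edge_id."""

    source: str
    target: str
    purpose: Optional[str] = None

    @property
    def edge_id(self):
        return self.source, self.target, self.purpose


class MiniEdgeConfig:
    """Hypothetical container keyed by edge_id, like the internal mapping."""

    def __init__(self, edges):
        self._edges_map = {e.edge_id: e for e in edges}

    def __contains__(self, item) -> bool:
        # Accept either an edge instance or a raw (source, target, purpose) tuple.
        eid = item.edge_id if isinstance(item, MiniEdge) else item
        return eid in self._edges_map


config = MiniEdgeConfig([MiniEdge("user", "post")])
```

Note that the lookup is direction-sensitive: ("post", "user", None) is a different key from ("user", "post", None).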

Source code in graphcast/architecture/edge.py
def __contains__(self, item: EdgeId | Edge):
    """Check if edge exists in configuration.

    Args:
        item: Edge ID or Edge instance to check

    Returns:
        bool: True if edge exists, False otherwise
    """
    if isinstance(item, Edge):
        eid = item.edge_id
    else:
        eid = item

    if eid in self._edges_map:
        return True
    else:
        return False

__post_init__()

Initialize the edge configuration.

Creates internal mapping of edge IDs to edge configurations.

Source code in graphcast/architecture/edge.py
def __post_init__(self):
    """Initialize the edge configuration.

    Creates internal mapping of edge IDs to edge configurations.
    """
    self._edges_map: dict[EdgeId, Edge] = {e.edge_id: e for e in self.edges}

edges_items(include_aux=False)

Get items of edges.

Parameters:

Name Type Description Default
include_aux

Whether to include auxiliary edges

False

Returns:

Name Type Description
generator

Generator yielding (edge_id, edge) tuples

Source code in graphcast/architecture/edge.py
def edges_items(self, include_aux=False):
    """Get items of edges.

    Args:
        include_aux: Whether to include auxiliary edges

    Returns:
        generator: Generator yielding (edge_id, edge) tuples
    """
    return (
        (eid, e) for eid, e in self._edges_map.items() if include_aux or not e.aux
    )

edges_list(include_aux=False)

Get list of edges.

Parameters:

Name Type Description Default
include_aux

Whether to include auxiliary edges

False

Returns:

Name Type Description
generator

Generator yielding edge configurations
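
Both edges_list and edges_items return generators, so the result has to be iterated or wrapped in list() before it can be counted or indexed. A minimal sketch of the filtering, using SimpleNamespace objects as hypothetical edges where only the aux flag matters:

```python
from types import SimpleNamespace

# Hypothetical edges: only the aux flag is relevant for this sketch.
edges_map = {
    ("user", "post", None): SimpleNamespace(aux=False),
    ("user", "tag", None): SimpleNamespace(aux=True),
}


def edges_list(include_aux: bool = False):
    # Generator expression: nothing is materialized until it is iterated.
    return (e for e in edges_map.values() if include_aux or not e.aux)


def edges_items(include_aux: bool = False):
    return ((eid, e) for eid, e in edges_map.items() if include_aux or not e.aux)


regular_only = list(edges_list())
everything = list(edges_list(include_aux=True))
regular_ids = [eid for eid, _ in edges_items()]
```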

Source code in graphcast/architecture/edge.py
def edges_list(self, include_aux=False):
    """Get list of edges.

    Args:
        include_aux: Whether to include auxiliary edges

    Returns:
        generator: Generator yielding edge configurations
    """
    return (e for e in self._edges_map.values() if include_aux or not e.aux)

finish_init(vc)

Complete initialization of all edges with vertex configuration.

Parameters:

Name  Type          Description           Default
vc    VertexConfig  Vertex configuration  required

Source code in graphcast/architecture/edge.py
def finish_init(self, vc: VertexConfig):
    """Complete initialization of all edges with vertex configuration.

    Args:
        vc: Vertex configuration
    """
    for k, e in self._edges_map.items():
        e.finish_init(vc)

update_edges(edge, vertex_config)

Update edge configuration.

Parameters:

Name           Type          Description                   Default
edge           Edge          Edge configuration to update  required
vertex_config  VertexConfig  Vertex configuration          required

Source code in graphcast/architecture/edge.py
def update_edges(self, edge: Edge, vertex_config: VertexConfig):
    """Update edge configuration.

    Args:
        edge: Edge configuration to update
        vertex_config: Vertex configuration
    """
    if edge.edge_id in self._edges_map:
        self._edges_map[edge.edge_id].update(edge)
    else:
        self._edges_map[edge.edge_id] = edge
    self._edges_map[edge.edge_id].finish_init(vertex_config=vertex_config)

Index dataclass

Bases: BaseDataclass

Configuration for database indexes.

Attributes:

Name                    Type        Description
name                    str | None  Optional name of the index
fields                  list[str]   List of fields to index
unique                  bool        Whether the index enforces uniqueness
type                    IndexType   Type of index to create
deduplicate             bool        Whether to deduplicate index entries
sparse                  bool        Whether to create a sparse index
exclude_edge_endpoints  bool        Whether to exclude edge endpoints from index

Source code in graphcast/architecture/onto.py
@dataclasses.dataclass
class Index(BaseDataclass):
    """Configuration for database indexes.

    Attributes:
        name: Optional name of the index
        fields: List of fields to index
        unique: Whether the index enforces uniqueness
        type: Type of index to create
        deduplicate: Whether to deduplicate index entries
        sparse: Whether to create a sparse index
        exclude_edge_endpoints: Whether to exclude edge endpoints from index
    """

    name: str | None = None
    fields: list[str] = dataclasses.field(default_factory=list)
    unique: bool = True
    type: IndexType = IndexType.PERSISTENT
    deduplicate: bool = True
    sparse: bool = False
    exclude_edge_endpoints: bool = False

    def __iter__(self):
        """Iterate over the indexed fields."""
        return iter(self.fields)

    def db_form(self, db_type: DBFlavor) -> dict:
        """Convert index configuration to database-specific format.

        Args:
            db_type: Type of database (ARANGO or NEO4J)

        Returns:
            Dictionary of index configuration in database-specific format

        Raises:
            ValueError: If db_type is not supported
        """
        r = self.to_dict()
        if db_type == DBFlavor.ARANGO:
            _ = r.pop("name")
            _ = r.pop("exclude_edge_endpoints")
        elif db_type == DBFlavor.NEO4J:
            pass
        else:
            raise ValueError(f"Unknown db_type {db_type}")

        return r

__iter__()

Iterate over the indexed fields.

Source code in graphcast/architecture/onto.py
def __iter__(self):
    """Iterate over the indexed fields."""
    return iter(self.fields)

db_form(db_type)

Convert index configuration to database-specific format.

Parameters:

Name     Type      Description                         Default
db_type  DBFlavor  Type of database (ARANGO or NEO4J)  required

Returns:

Type Description
dict

Dictionary of index configuration in database-specific format

Raises:

Type Description
ValueError

If db_type is not supported
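
The effect of db_form can be sketched with a reduced index class. Flavor and MiniIndex are hypothetical stand-ins for DBFlavor and Index, and dataclasses.asdict is assumed to behave like the to_dict() helper inherited from BaseDataclass.

```python
import dataclasses
import enum
from typing import Optional


class Flavor(enum.Enum):
    """Hypothetical stand-in for DBFlavor."""

    ARANGO = "arango"
    NEO4J = "neo4j"


@dataclasses.dataclass
class MiniIndex:
    """Hypothetical stand-in with a subset of the Index fields."""

    name: Optional[str] = None
    fields: list[str] = dataclasses.field(default_factory=list)
    unique: bool = True
    exclude_edge_endpoints: bool = False

    def db_form(self, db_type) -> dict:
        r = dataclasses.asdict(self)  # assumption: to_dict() behaves like asdict()
        if db_type == Flavor.ARANGO:
            # The ArangoDB form omits these two keys.
            r.pop("name")
            r.pop("exclude_edge_endpoints")
        elif db_type != Flavor.NEO4J:
            raise ValueError(f"Unknown db_type {db_type}")
        return r


index = MiniIndex(name="user", fields=["id"])
arango_form = index.db_form(Flavor.ARANGO)
neo4j_form = index.db_form(Flavor.NEO4J)
```

Under these assumptions the Neo4j form keeps every key, while the ArangoDB form contains only fields and unique.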

Source code in graphcast/architecture/onto.py
def db_form(self, db_type: DBFlavor) -> dict:
    """Convert index configuration to database-specific format.

    Args:
        db_type: Type of database (ARANGO or NEO4J)

    Returns:
        Dictionary of index configuration in database-specific format

    Raises:
        ValueError: If db_type is not supported
    """
    r = self.to_dict()
    if db_type == DBFlavor.ARANGO:
        _ = r.pop("name")
        _ = r.pop("exclude_edge_endpoints")
    elif db_type == DBFlavor.NEO4J:
        pass
    else:
        raise ValueError(f"Unknown db_type {db_type}")

    return r

Resource dataclass

Bases: BaseDataclass, JSONWizard

Resource configuration and processing.

This class represents a data resource that can be processed and transformed into graph structures. It manages the processing pipeline through actors and handles data encoding, transformation, and mapping.

Attributes:

Name               Type            Description
resource_name      str             Name of the resource
apply              list            List of actors to apply in sequence
encoding           EncodingType    Data encoding type (default: UTF_8)
merge_collections  list[str]       List of collections to merge
extra_weights      list[Edge]      List of additional edge weights
types              dict[str, str]  Dictionary of field type mappings
root               ActorWrapper    Root actor wrapper for processing
vertex_config      VertexConfig    Configuration for vertices
edge_config        EdgeConfig      Configuration for edges

Source code in graphcast/architecture/resource.py
@dataclasses.dataclass(kw_only=True)
class Resource(BaseDataclass, JSONWizard):
    """Resource configuration and processing.

    This class represents a data resource that can be processed and transformed
    into graph structures. It manages the processing pipeline through actors
    and handles data encoding, transformation, and mapping.

    Attributes:
        resource_name: Name of the resource
        apply: List of actors to apply in sequence
        encoding: Data encoding type (default: UTF_8)
        merge_collections: List of collections to merge
        extra_weights: List of additional edge weights
        types: Dictionary of field type mappings
        root: Root actor wrapper for processing
        vertex_config: Configuration for vertices
        edge_config: Configuration for edges
    """

    resource_name: str
    apply: list
    encoding: EncodingType = EncodingType.UTF_8
    merge_collections: list[str] = dataclasses.field(default_factory=list)
    extra_weights: list[Edge] = dataclasses.field(default_factory=list)
    types: dict[str, str] = dataclasses.field(default_factory=dict)

    def __post_init__(self):
        """Initialize the resource after dataclass initialization.

        Sets up the actor wrapper and type mappings. Evaluates type expressions
        for field type casting.

        Note:
            Type evaluation failures are logged and the field is skipped;
            no exception is raised.
        """
        self.root = ActorWrapper(*self.apply)
        self._types: dict[str, Callable] = dict()
        self.vertex_config: VertexConfig
        self.edge_config: EdgeConfig
        for k, v in self.types.items():
            try:
                self._types[k] = eval(v)
            except Exception as ex:
                logger.error(
                    f"For resource {self.name} for field {k} failed to cast type {v} : {ex}"
                )

    @property
    def name(self):
        """Get the resource name.

        Returns:
            str: Name of the resource
        """
        return self.resource_name

    def finish_init(
        self,
        vertex_config: VertexConfig,
        edge_config: EdgeConfig,
        transforms: dict[str, ProtoTransform],
    ):
        """Complete resource initialization.

        Initializes the resource with vertex and edge configurations,
        and sets up the processing pipeline.

        Args:
            vertex_config: Configuration for vertices
            edge_config: Configuration for edges
            transforms: Dictionary of available transforms
        """
        self.vertex_config = vertex_config
        self.edge_config = edge_config

        logger.debug(f"total resource actor count : {self.root.count()}")
        self.root.finish_init(
            vertex_config=vertex_config,
            transforms=transforms,
            edge_config=edge_config,
        )

        logger.debug(f"total resource actor count (after 2 finit): {self.root.count()}")

        for e in self.extra_weights:
            e.finish_init(vertex_config)

    def __call__(self, doc: dict) -> defaultdict[GraphEntity, list]:
        """Process a document through the resource pipeline.

        Args:
            doc: Document to process

        Returns:
            defaultdict[GraphEntity, list]: Processed graph entities
        """
        ctx = ActionContext()
        ctx = self.root(ctx, doc=doc)
        acc = self.root.normalize_ctx(ctx)
        return acc

    def count(self):
        """Get the total number of actors in the resource.

        Returns:
            int: Number of actors
        """
        return self.root.count()

name property

Get the resource name.

Returns:

Name Type Description
str

Name of the resource

__call__(doc)

Process a document through the resource pipeline.

Parameters:

Name  Type  Description          Default
doc   dict  Document to process  required

Returns:

Type                            Description
defaultdict[GraphEntity, list]  Processed graph entities

Source code in graphcast/architecture/resource.py
def __call__(self, doc: dict) -> defaultdict[GraphEntity, list]:
    """Process a document through the resource pipeline.

    Args:
        doc: Document to process

    Returns:
        defaultdict[GraphEntity, list]: Processed graph entities
    """
    ctx = ActionContext()
    ctx = self.root(ctx, doc=doc)
    acc = self.root.normalize_ctx(ctx)
    return acc

__post_init__()

Initialize the resource after dataclass initialization.

Sets up the actor wrapper and type mappings. Evaluates type expressions for field type casting.

Note

Type evaluation failures are logged and the field is skipped; no exception is raised.

Source code in graphcast/architecture/resource.py
def __post_init__(self):
    """Initialize the resource after dataclass initialization.

    Sets up the actor wrapper and type mappings. Evaluates type expressions
    for field type casting.

    Note:
        Type evaluation failures are logged and the field is skipped;
        no exception is raised.
    """
    self.root = ActorWrapper(*self.apply)
    self._types: dict[str, Callable] = dict()
    self.vertex_config: VertexConfig
    self.edge_config: EdgeConfig
    for k, v in self.types.items():
        try:
            self._types[k] = eval(v)
        except Exception as ex:
            logger.error(
                f"For resource {self.name} for field {k} failed to cast type {v} : {ex}"
            )
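
The type-mapping step in the listing above can be reproduced on its own. In this sketch, resolve_types is a hypothetical helper; it follows the method source in that a failed evaluation is logged and the field is left out instead of being propagated as an exception.

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def resolve_types(types: dict[str, str]) -> dict[str, Callable]:
    """Turn {"field": "int"} style mappings into {"field": int}."""
    resolved: dict[str, Callable] = {}
    for field, expr in types.items():
        try:
            # eval executes arbitrary code; use it only on trusted schema files.
            resolved[field] = eval(expr)
        except Exception as ex:
            # Failures are logged and the field is skipped.
            logger.error(f"field {field}: failed to cast type {expr} : {ex}")
    return resolved


casters = resolve_types({"age": "int", "score": "float", "broken": "no_such_type"})
```

Since the expressions are evaluated with eval, the types mapping should only come from configuration files that are under the author's control.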

count()

Get the total number of actors in the resource.

Returns:

Name Type Description
int

Number of actors

Source code in graphcast/architecture/resource.py
def count(self):
    """Get the total number of actors in the resource.

    Returns:
        int: Number of actors
    """
    return self.root.count()

finish_init(vertex_config, edge_config, transforms)

Complete resource initialization.

Initializes the resource with vertex and edge configurations, and sets up the processing pipeline.

Parameters:

Name           Type                       Description                         Default
vertex_config  VertexConfig               Configuration for vertices          required
edge_config    EdgeConfig                 Configuration for edges             required
transforms     dict[str, ProtoTransform]  Dictionary of available transforms  required

Source code in graphcast/architecture/resource.py
def finish_init(
    self,
    vertex_config: VertexConfig,
    edge_config: EdgeConfig,
    transforms: dict[str, ProtoTransform],
):
    """Complete resource initialization.

    Initializes the resource with vertex and edge configurations,
    and sets up the processing pipeline.

    Args:
        vertex_config: Configuration for vertices
        edge_config: Configuration for edges
        transforms: Dictionary of available transforms
    """
    self.vertex_config = vertex_config
    self.edge_config = edge_config

    logger.debug(f"total resource actor count : {self.root.count()}")
    self.root.finish_init(
        vertex_config=vertex_config,
        transforms=transforms,
        edge_config=edge_config,
    )

    logger.debug(f"total resource actor count (after 2 finit): {self.root.count()}")

    for e in self.extra_weights:
        e.finish_init(vertex_config)

Schema dataclass

Bases: BaseDataclass

Graph database schema configuration.

This class represents the complete schema configuration for a graph database. It manages resources, vertex configurations, edge configurations, and transforms.

Attributes:

Name           Type                       Description
general        SchemaMetadata             Schema metadata and versioning information
vertex_config  VertexConfig               Configuration for vertex collections
edge_config    EdgeConfig                 Configuration for edge collections
resources      list[Resource]             List of resource definitions
transforms     dict[str, ProtoTransform]  Dictionary of available transforms
_resources     dict[str, Resource]        Internal mapping of resource names to resources

Source code in graphcast/architecture/schema.py
@dataclasses.dataclass
class Schema(BaseDataclass):
    """Graph database schema configuration.

    This class represents the complete schema configuration for a graph database.
    It manages resources, vertex configurations, edge configurations, and transforms.

    Attributes:
        general: Schema metadata and versioning information
        vertex_config: Configuration for vertex collections
        edge_config: Configuration for edge collections
        resources: List of resource definitions
        transforms: Dictionary of available transforms
        _resources: Internal mapping of resource names to resources
    """

    general: SchemaMetadata
    vertex_config: VertexConfig
    edge_config: EdgeConfig
    resources: list[Resource]
    transforms: dict[str, ProtoTransform] = dataclasses.field(default_factory=dict)

    def __post_init__(self):
        """Initialize the schema after dataclass initialization.

        Sets up transforms, initializes edge configuration, and validates
        resource names for uniqueness.

        Raises:
            ValueError: If duplicate resource names are found
        """
        for name, t in self.transforms.items():
            t.name = name

        self.edge_config.finish_init(self.vertex_config)

        for r in self.resources:
            r.finish_init(
                vertex_config=self.vertex_config,
                edge_config=self.edge_config,
                transforms=self.transforms,
            )

        names = [r.name for r in self.resources]
        c = Counter(names)
        for k, v in c.items():
            if v > 1:
                raise ValueError(f"resource name {k} used {v} times")
        self._resources: dict[str, Resource] = {}
        for r in self.resources:
            self._resources[r.name] = r

    def fetch_resource(self, name: Optional[str] = None) -> Resource:
        """Fetch a resource by name or get the first available resource.

        Args:
            name: Optional name of the resource to fetch

        Returns:
            Resource: The requested resource

        Raises:
            ValueError: If the requested resource is not found or if no resources exist
        """
        _current_resource = None

        if name is not None:
            if name in self._resources:
                _current_resource = self._resources[name]
            else:
                raise ValueError(f"Resource {name} not found")
        else:
            if self._resources:
                _current_resource = self.resources[0]
            else:
                raise ValueError("Empty resource container 😕")
        return _current_resource

__post_init__()

Initialize the schema after dataclass initialization.

Sets up transforms, initializes edge configuration, and validates resource names for uniqueness.

Raises:

Type Description
ValueError

If duplicate resource names are found

Source code in graphcast/architecture/schema.py
def __post_init__(self):
    """Initialize the schema after dataclass initialization.

    Sets up transforms, initializes edge configuration, and validates
    resource names for uniqueness.

    Raises:
        ValueError: If duplicate resource names are found
    """
    for name, t in self.transforms.items():
        t.name = name

    self.edge_config.finish_init(self.vertex_config)

    for r in self.resources:
        r.finish_init(
            vertex_config=self.vertex_config,
            edge_config=self.edge_config,
            transforms=self.transforms,
        )

    names = [r.name for r in self.resources]
    c = Counter(names)
    for k, v in c.items():
        if v > 1:
            raise ValueError(f"resource name {k} used {v} times")
    self._resources: dict[str, Resource] = {}
    for r in self.resources:
        self._resources[r.name] = r
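The uniqueness check in `__post_init__` can be tried in isolation. The following is a minimal sketch, not part of graphcast: `find_duplicate_names` is a hypothetical helper that applies the same `Counter`-based test, but collects every duplicated name instead of raising on the first one.

```python
from collections import Counter


def find_duplicate_names(names: list[str]) -> dict[str, int]:
    """Hypothetical helper: map each repeated name to its number of occurrences."""
    counts = Counter(names)
    # Keep only the names that occur more than once, as the schema check does.
    return {name: n for name, n in counts.items() if n > 1}


duplicates = find_duplicate_names(["users", "orders", "users"])
if duplicates:
    # Schema.__post_init__ raises ValueError at this point; the sketch only reports.
    message = ", ".join(f"{k} used {v} times" for k, v in duplicates.items())
```

Collecting all duplicates before reporting is a design choice of the sketch; the listing above stops at the first repeated name.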

fetch_resource(name=None)

Fetch a resource by name or get the first available resource.

Parameters:

Name Type Description Default
name Optional[str]

Optional name of the resource to fetch

None

Returns:

Name Type Description
Resource Resource

The requested resource

Raises:

Type Description
ValueError

If the requested resource is not found or if no resources exist

Source code in graphcast/architecture/schema.py
def fetch_resource(self, name: Optional[str] = None) -> Resource:
    """Fetch a resource by name or get the first available resource.

    Args:
        name: Optional name of the resource to fetch

    Returns:
        Resource: The requested resource

    Raises:
        ValueError: If the requested resource is not found or if no resources exist
    """
    _current_resource = None

    if name is not None:
        if name in self._resources:
            _current_resource = self._resources[name]
        else:
            raise ValueError(f"Resource {name} not found")
    else:
        if self._resources:
            _current_resource = self.resources[0]
        else:
            raise ValueError("Empty resource container 😕")
    return _current_resource
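The lookup rules of `fetch_resource` can be illustrated without a full schema. This is a sketch under stated assumptions: `FakeResource` and `ResourceRegistry` are hypothetical stand-ins that reproduce only the name-to-resource mapping and the fallback to the first resource; they are not graphcast classes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeResource:
    """Hypothetical stand-in for Resource; only the name matters here."""

    name: str


class ResourceRegistry:
    """Hypothetical stand-in that mirrors the lookup logic of Schema.fetch_resource."""

    def __init__(self, resources: list[FakeResource]):
        self.resources = list(resources)
        self._resources = {r.name: r for r in self.resources}

    def fetch_resource(self, name: Optional[str] = None) -> FakeResource:
        if name is not None:
            # Explicit name: it must be a known resource.
            if name not in self._resources:
                raise ValueError(f"Resource {name} not found")
            return self._resources[name]
        # No name given: fall back to the first declared resource.
        if not self.resources:
            raise ValueError("Empty resource container")
        return self.resources[0]


registry = ResourceRegistry([FakeResource("users"), FakeResource("orders")])
first = registry.fetch_resource()          # first declared resource
orders = registry.fetch_resource("orders")  # lookup by name
```

Note that both failure modes (unknown name, no resources at all) surface as `ValueError`, so callers cannot tell them apart by exception type alone.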

Vertex dataclass

Bases: BaseDataclass

Represents a vertex in the graph database.

A vertex is a fundamental unit in the graph that can have fields, indexes, and filters.

Attributes:

Name Type Description
name str

Name of the vertex

fields list[str]

List of field names

fields_aux list[str]

List of auxiliary field names, held temporarily to pass weights to edges

indexes list[Index]

List of indexes for the vertex

filters list[Expression]

List of filter expressions

dbname Optional[str]

Optional database name (defaults to vertex name)

Source code in graphcast/architecture/vertex.py
@dataclasses.dataclass
class Vertex(BaseDataclass):
    """Represents a vertex in the graph database.

    A vertex is a fundamental unit in the graph that can have fields, indexes,
    and filters.

    Attributes:
        name: Name of the vertex
        fields: List of field names
        fields_aux: List of auxiliary field names for weight passing
        indexes: List of indexes for the vertex
        filters: List of filter expressions
        dbname: Optional database name (defaults to vertex name)
    """

    name: str
    fields: list[str]
    fields_aux: list[str] = dataclasses.field(
        default_factory=list
    )  # temporary field necessary to pass weights to edges
    indexes: list[Index] = dataclasses.field(default_factory=list)
    filters: list[Expression] = dataclasses.field(default_factory=list)
    dbname: Optional[str] = None

    @property
    def fields_all(self):
        """Get all fields including auxiliary fields.

        Returns:
            list[str]: Combined list of regular and auxiliary fields
        """
        return self.fields + self.fields_aux

    def __post_init__(self):
        """Initialize the vertex after dataclass initialization.

        Sets the database name if not provided and updates fields based on indexes.
        """
        if self.dbname is None:
            self.dbname = self.name
        union_fields = set(self.fields)
        for ei in self.indexes:
            union_fields |= set(ei.fields)
        self.fields = list(union_fields)

    def update_aux_fields(self, fields_aux: list):
        """Update auxiliary fields.

        Args:
            fields_aux: List of new auxiliary fields to add

        Returns:
            Vertex: Self for method chaining
        """
        self.fields_aux = list(set(self.fields_aux) | set(fields_aux))
        return self

fields_all property

Get all fields including auxiliary fields.

Returns:

Type Description

list[str]

Combined list of regular and auxiliary fields

__post_init__()

Initialize the vertex after dataclass initialization.

Sets the database name if not provided and updates fields based on indexes.

Source code in graphcast/architecture/vertex.py
def __post_init__(self):
    """Initialize the vertex after dataclass initialization.

    Sets the database name if not provided and updates fields based on indexes.
    """
    if self.dbname is None:
        self.dbname = self.name
    union_fields = set(self.fields)
    for ei in self.indexes:
        union_fields |= set(ei.fields)
    self.fields = list(union_fields)
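The effect of this initializer on `fields` may be easier to see with a small stand-in. The sketch below assumes hypothetical classes `SketchIndex` and `SketchVertex` that copy only the logic shown above; it is not the library's `Vertex`. Because the merge goes through a `set`, the resulting field order is not guaranteed, so the example compares sorted values.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SketchIndex:
    """Hypothetical stand-in for Index: just a list of field names."""

    fields: list[str]


@dataclass
class SketchVertex:
    """Hypothetical stand-in that mirrors Vertex.__post_init__."""

    name: str
    fields: list[str]
    indexes: list[SketchIndex] = field(default_factory=list)
    dbname: Optional[str] = None

    def __post_init__(self):
        # The database name defaults to the vertex name.
        if self.dbname is None:
            self.dbname = self.name
        # Fields used by any index are merged into the vertex fields.
        union_fields = set(self.fields)
        for index in self.indexes:
            union_fields |= set(index.fields)
        self.fields = list(union_fields)


person = SketchVertex("person", ["name", "age"], indexes=[SketchIndex(["id", "name"])])
merged = sorted(person.fields)  # sort because set-based merging loses the original order
```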

update_aux_fields(fields_aux)

Update auxiliary fields.

Parameters:

Name Type Description Default
fields_aux list

List of new auxiliary fields to add

required

Returns:

Name Type Description
Vertex

Self for method chaining

Source code in graphcast/architecture/vertex.py
def update_aux_fields(self, fields_aux: list):
    """Update auxiliary fields.

    Args:
        fields_aux: List of new auxiliary fields to add

    Returns:
        Vertex: Self for method chaining
    """
    self.fields_aux = list(set(self.fields_aux) | set(fields_aux))
    return self
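Because `update_aux_fields` returns `self` and merges through a set union, calls can be chained and repeated field names are stored once. A minimal sketch, assuming a hypothetical stand-in class `AuxVertex` that copies only the two members involved:

```python
from dataclasses import dataclass, field


@dataclass
class AuxVertex:
    """Hypothetical stand-in for the auxiliary-field handling of Vertex."""

    name: str
    fields: list[str]
    fields_aux: list[str] = field(default_factory=list)

    @property
    def fields_all(self) -> list[str]:
        # Regular fields first, auxiliary fields appended.
        return self.fields + self.fields_aux

    def update_aux_fields(self, fields_aux: list) -> "AuxVertex":
        # Set union removes duplicates; the order of the result is not guaranteed.
        self.fields_aux = list(set(self.fields_aux) | set(fields_aux))
        return self


vertex = (
    AuxVertex("person", ["name"])
    .update_aux_fields(["weight"])
    .update_aux_fields(["weight", "score"])  # "weight" is not duplicated
)
```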

VertexConfig dataclass

Bases: BaseDataclass

Configuration for managing collections of vertices.

This class manages a collection of vertices, providing methods for accessing and manipulating vertex configurations.

Attributes:

Name Type Description
vertices list[Vertex]

List of vertex configurations

blank_vertices list[str]

List of blank vertex names

force_types dict[str, list]

Dictionary mapping vertex names to type lists

db_flavor DBFlavor

Database flavor (ARANGO or NEO4J)

Source code in graphcast/architecture/vertex.py
@dataclasses.dataclass
class VertexConfig(BaseDataclass):
    """Configuration for managing collections of vertices.

    This class manages a collection of vertices, providing methods for accessing
    and manipulating vertex configurations.

    Attributes:
        vertices: List of vertex configurations
        blank_vertices: List of blank vertex names
        force_types: Dictionary mapping vertex names to type lists
        db_flavor: Database flavor (ARANGO or NEO4J)
    """

    vertices: list[Vertex]
    blank_vertices: list[str] = dataclasses.field(default_factory=list)
    force_types: dict[str, list] = dataclasses.field(default_factory=dict)
    db_flavor: DBFlavor = DBFlavor.ARANGO

    def __post_init__(self):
        """Initialize the vertex configuration.

        Creates internal mappings and validates blank vertices.

        Raises:
            ValueError: If blank vertices are not defined in the configuration
        """
        self._vertices_map: dict[str, Vertex] = {
            item.name: item for item in self.vertices
        }

        # TODO replace by types
        # vertex_collection_name -> [numeric fields]
        self._vcollection_numeric_fields_map = {}

        self.discriminant_chart: defaultdict[str, bool] = defaultdict(lambda: False)

        if set(self.blank_vertices) - set(self.vertex_set):
            raise ValueError(
                f" Blank collections {self.blank_vertices} are not defined"
                " as vertex collections"
            )

    @property
    def vertex_set(self):
        """Get set of vertex names.

        Returns:
            set[str]: Set of vertex names
        """
        return set(self._vertices_map.keys())

    @property
    def vertex_list(self):
        """Get list of vertex configurations.

        Returns:
            list[Vertex]: List of vertex configurations
        """
        return list(self._vertices_map.values())

    def vertex_dbname(self, vertex_name):
        """Get database name for a vertex.

        Args:
            vertex_name: Name of the vertex

        Returns:
            str: Database name for the vertex

        Raises:
            KeyError: If vertex is not found
        """
        try:
            value = self._vertices_map[vertex_name].dbname
        except KeyError as e:
            logger.error(
                "Available vertex collections :"
                f" {self._vertices_map.keys()}; vertex collection"
                f" requested : {vertex_name}"
            )
            raise e
        return value

    def index(self, vertex_name) -> Index:
        """Get primary index for a vertex.

        Args:
            vertex_name: Name of the vertex

        Returns:
            Index: Primary index for the vertex
        """
        return self._vertices_map[vertex_name].indexes[0]

    def indexes(self, vertex_name) -> list[Index]:
        """Get all indexes for a vertex.

        Args:
            vertex_name: Name of the vertex

        Returns:
            list[Index]: List of indexes for the vertex
        """
        return self._vertices_map[vertex_name].indexes

    def fields(self, vertex_name: str, with_aux=False):
        """Get fields for a vertex.

        Args:
            vertex_name: Name of the vertex
            with_aux: Whether to include auxiliary fields

        Returns:
            list[str]: List of fields
        """
        if with_aux:
            return self._vertices_map[vertex_name].fields_all
        else:
            return self._vertices_map[vertex_name].fields

    def numeric_fields_list(self, vertex_name):
        """Get list of numeric fields for a vertex.

        Args:
            vertex_name: Name of the vertex

        Returns:
            tuple: Tuple of numeric field names

        Raises:
            ValueError: If vertex is not defined in config
        """
        if vertex_name in self.vertex_set:
            if vertex_name in self._vcollection_numeric_fields_map:
                return self._vcollection_numeric_fields_map[vertex_name]
            else:
                return ()
        else:
            raise ValueError(
                " Accessing vertex collection numeric fields: vertex"
                f" collection {vertex_name} was not defined in config"
            )

    def filters(self, vertex_name) -> list[Expression]:
        """Get filter expressions for a vertex.

        Args:
            vertex_name: Name of the vertex

        Returns:
            list[Expression]: List of filter expressions
        """
        if vertex_name in self._vertices_map:
            return self._vertices_map[vertex_name].filters
        else:
            return []

    def update_vertex(self, v: Vertex):
        """Update vertex configuration.

        Args:
            v: Vertex configuration to update
        """
        self._vertices_map[v.name] = v

    def __getitem__(self, key: str):
        """Get vertex configuration by name.

        Args:
            key: Vertex name

        Returns:
            Vertex: Vertex configuration

        Raises:
            KeyError: If vertex is not found
        """
        if key in self._vertices_map:
            return self._vertices_map[key]
        else:
            raise KeyError(f"Vertex {key} absent")

    def __setitem__(self, key: str, value: Vertex):
        """Set vertex configuration by name.

        Args:
            key: Vertex name
            value: Vertex configuration
        """
        self._vertices_map[key] = value

vertex_list property

Get list of vertex configurations.

Returns:

Type Description

list[Vertex]

List of vertex configurations

vertex_set property

Get set of vertex names.

Returns:

Type Description

set[str]

Set of vertex names

__getitem__(key)

Get vertex configuration by name.

Parameters:

Name Type Description Default
key str

Vertex name

required

Returns:

Name Type Description
Vertex

Vertex configuration

Raises:

Type Description
KeyError

If vertex is not found

Source code in graphcast/architecture/vertex.py
def __getitem__(self, key: str):
    """Get vertex configuration by name.

    Args:
        key: Vertex name

    Returns:
        Vertex: Vertex configuration

    Raises:
        KeyError: If vertex is not found
    """
    if key in self._vertices_map:
        return self._vertices_map[key]
    else:
        raise KeyError(f"Vertex {key} absent")

__post_init__()

Initialize the vertex configuration.

Creates internal mappings and validates blank vertices.

Raises:

Type Description
ValueError

If blank vertices are not defined in the configuration

Source code in graphcast/architecture/vertex.py
def __post_init__(self):
    """Initialize the vertex configuration.

    Creates internal mappings and validates blank vertices.

    Raises:
        ValueError: If blank vertices are not defined in the configuration
    """
    self._vertices_map: dict[str, Vertex] = {
        item.name: item for item in self.vertices
    }

    # TODO replace by types
    # vertex_collection_name -> [numeric fields]
    self._vcollection_numeric_fields_map = {}

    self.discriminant_chart: defaultdict[str, bool] = defaultdict(lambda: False)

    if set(self.blank_vertices) - set(self.vertex_set):
        raise ValueError(
            f" Blank collections {self.blank_vertices} are not defined"
            " as vertex collections"
        )
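The blank-vertex validation is a plain set difference: any name listed in `blank_vertices` that is not a declared vertex makes the difference non-empty. A sketch with a hypothetical helper, `undefined_blank_vertices`, that returns the offending names rather than raising:

```python
def undefined_blank_vertices(
    blank_vertices: list[str], vertex_names: list[str]
) -> set[str]:
    """Hypothetical helper: blank vertex names that are not declared as vertices."""
    return set(blank_vertices) - set(vertex_names)


declared = ["person", "company"]
ok = undefined_blank_vertices(["person"], declared)            # nothing missing
missing = undefined_blank_vertices(["person", "ghost"], declared)  # "ghost" is undeclared
```

The error message in the listing above prints all of `blank_vertices`; reporting only the difference, as this sketch does, would point more directly at the undeclared names.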

__setitem__(key, value)

Set vertex configuration by name.

Parameters:

Name Type Description Default
key str

Vertex name

required
value Vertex

Vertex configuration

required

Source code in graphcast/architecture/vertex.py
def __setitem__(self, key: str, value: Vertex):
    """Set vertex configuration by name.

    Args:
        key: Vertex name
        value: Vertex configuration
    """
    self._vertices_map[key] = value

fields(vertex_name, with_aux=False)

Get fields for a vertex.

Parameters:

Name Type Description Default
vertex_name str

Name of the vertex

required
with_aux

Whether to include auxiliary fields

False

Returns:

Type Description

list[str]

List of fields

Source code in graphcast/architecture/vertex.py
def fields(self, vertex_name: str, with_aux=False):
    """Get fields for a vertex.

    Args:
        vertex_name: Name of the vertex
        with_aux: Whether to include auxiliary fields

    Returns:
        list[str]: List of fields
    """
    if with_aux:
        return self._vertices_map[vertex_name].fields_all
    else:
        return self._vertices_map[vertex_name].fields

filters(vertex_name)

Get filter expressions for a vertex.

Parameters:

Name Type Description Default
vertex_name

Name of the vertex

required

Returns:

Type Description
list[Expression]

List of filter expressions

Source code in graphcast/architecture/vertex.py
def filters(self, vertex_name) -> list[Expression]:
    """Get filter expressions for a vertex.

    Args:
        vertex_name: Name of the vertex

    Returns:
        list[Expression]: List of filter expressions
    """
    if vertex_name in self._vertices_map:
        return self._vertices_map[vertex_name].filters
    else:
        return []

index(vertex_name)

Get primary index for a vertex.

Parameters:

Name Type Description Default
vertex_name

Name of the vertex

required

Returns:

Name Type Description
Index Index

Primary index for the vertex

Source code in graphcast/architecture/vertex.py
def index(self, vertex_name) -> Index:
    """Get primary index for a vertex.

    Args:
        vertex_name: Name of the vertex

    Returns:
        Index: Primary index for the vertex
    """
    return self._vertices_map[vertex_name].indexes[0]
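Since `Vertex.indexes` defaults to an empty list, `indexes[0]` raises `IndexError` for a vertex declared without indexes. Callers that want to tolerate that case could guard the access. The helper below, `primary_or_none`, is hypothetical and not part of graphcast; it only sketches the guard.

```python
from typing import Optional, Sequence, TypeVar

T = TypeVar("T")


def primary_or_none(indexes: Sequence[T]) -> Optional[T]:
    """Hypothetical helper: the first index if one exists, otherwise None."""
    return indexes[0] if indexes else None


no_index = primary_or_none([])                     # vertex declared without indexes
primary = primary_or_none(["by_id", "by_email"])   # strings stand in for Index objects
```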

indexes(vertex_name)

Get all indexes for a vertex.

Parameters:

Name Type Description Default
vertex_name

Name of the vertex

required

Returns:

Type Description
list[Index]

List of indexes for the vertex

Source code in graphcast/architecture/vertex.py
def indexes(self, vertex_name) -> list[Index]:
    """Get all indexes for a vertex.

    Args:
        vertex_name: Name of the vertex

    Returns:
        list[Index]: List of indexes for the vertex
    """
    return self._vertices_map[vertex_name].indexes

numeric_fields_list(vertex_name)

Get list of numeric fields for a vertex.

Parameters:

Name Type Description Default
vertex_name

Name of the vertex

required

Returns:

Name Type Description
tuple

Tuple of numeric field names

Raises:

Type Description
ValueError

If vertex is not defined in config

Source code in graphcast/architecture/vertex.py
def numeric_fields_list(self, vertex_name):
    """Get list of numeric fields for a vertex.

    Args:
        vertex_name: Name of the vertex

    Returns:
        tuple: Tuple of numeric field names

    Raises:
        ValueError: If vertex is not defined in config
    """
    if vertex_name in self.vertex_set:
        if vertex_name in self._vcollection_numeric_fields_map:
            return self._vcollection_numeric_fields_map[vertex_name]
        else:
            return ()
    else:
        raise ValueError(
            " Accessing vertex collection numeric fields: vertex"
            f" collection {vertex_name} was not defined in config"
        )

update_vertex(v)

Update vertex configuration.

Parameters:

Name Type Description Default
v Vertex

Vertex configuration to update

required

Source code in graphcast/architecture/vertex.py
def update_vertex(self, v: Vertex):
    """Update vertex configuration.

    Args:
        v: Vertex configuration to update
    """
    self._vertices_map[v.name] = v
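Both `update_vertex` and `__setitem__` write to the internal `_vertices_map` only; in the listings above, the `vertices` list passed at construction is not modified. The sketch below uses a hypothetical stand-in, `SketchConfig`, with `(name, payload)` pairs in place of `Vertex` objects to show the consequence: `vertex_set` reflects the update while `vertices` does not.

```python
class SketchConfig:
    """Hypothetical stand-in for the map handling of VertexConfig."""

    def __init__(self, vertices: list[tuple[str, str]]):
        # (name, payload) pairs stand in for Vertex objects.
        self.vertices = list(vertices)
        self._vertices_map = {name: payload for name, payload in self.vertices}

    @property
    def vertex_set(self) -> set[str]:
        return set(self._vertices_map.keys())

    def __setitem__(self, key: str, value: str) -> None:
        # Only the internal map is updated, as in the listing above.
        self._vertices_map[key] = value


config = SketchConfig([("person", "v1")])
config["company"] = "v2"
declared_names = [name for name, _ in config.vertices]
```

Code that needs the current state should therefore read `vertex_set` or `vertex_list` rather than the original `vertices` attribute.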

vertex_dbname(vertex_name)

Get database name for a vertex.

Parameters:

Name Type Description Default
vertex_name

Name of the vertex

required

Returns:

Name Type Description
str

Database name for the vertex

Raises:

Type Description
KeyError

If vertex is not found

Source code in graphcast/architecture/vertex.py
def vertex_dbname(self, vertex_name):
    """Get database name for a vertex.

    Args:
        vertex_name: Name of the vertex

    Returns:
        str: Database name for the vertex

    Raises:
        KeyError: If vertex is not found
    """
    try:
        value = self._vertices_map[vertex_name].dbname
    except KeyError as e:
        logger.error(
            "Available vertex collections :"
            f" {self._vertices_map.keys()}; vertex collection"
            f" requested : {vertex_name}"
        )
        raise e
    return value
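The accessors on `VertexConfig` react differently to an unknown vertex name: `filters` returns an empty list, `__getitem__` and `vertex_dbname` raise `KeyError`, and `numeric_fields_list` raises `ValueError`. The following sketch reproduces three of these behaviors with a hypothetical stand-in, `LookupSketch`, that stores filter lists directly instead of `Vertex` objects.

```python
class LookupSketch:
    """Hypothetical stand-in that mirrors the unknown-name handling of VertexConfig."""

    def __init__(self, filters_by_vertex: dict[str, list[str]]):
        self._vertices_map = dict(filters_by_vertex)
        self._numeric_fields: dict[str, tuple] = {}

    def filters(self, vertex_name: str) -> list[str]:
        # Unknown vertex: quietly return an empty list.
        if vertex_name in self._vertices_map:
            return self._vertices_map[vertex_name]
        return []

    def __getitem__(self, key: str) -> list[str]:
        # Unknown vertex: KeyError.
        if key in self._vertices_map:
            return self._vertices_map[key]
        raise KeyError(f"Vertex {key} absent")

    def numeric_fields_list(self, vertex_name: str) -> tuple:
        # Known vertex without numeric fields: empty tuple. Unknown vertex: ValueError.
        if vertex_name in self._vertices_map:
            return self._numeric_fields.get(vertex_name, ())
        raise ValueError(f"vertex collection {vertex_name} was not defined in config")


config = LookupSketch({"person": ["age > 18"]})
outcomes = {}
for label, call in [
    ("filters", lambda: config.filters("ghost")),
    ("getitem", lambda: config["ghost"]),
    ("numeric", lambda: config.numeric_fields_list("ghost")),
]:
    try:
        outcomes[label] = call()
    except (KeyError, ValueError) as exc:
        outcomes[label] = type(exc).__name__
```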