Skip to content

graflo.architecture.contract.declarations.resource

Resource management and processing for graph databases.

This module provides the core resource handling functionality for graph databases. It defines how data resources are processed, transformed, and mapped to graph structures through a system of actors and transformations.

Key Components
  • Resource: Main class for resource processing and transformation
  • ActorWrapper: Wrapper for processing actors
  • ActionContext: Context for processing actions
The resource system allows for
  • Data encoding and transformation
  • Vertex and edge creation
  • Weight management
  • Collection merging
  • Type casting and validation
  • Dynamic vertex-type routing via VertexRouterActor in the pipeline
Example

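resource = Resource(
    name="users",
    pipeline=[{"vertex": "user"}, {"edge": {"from": "user", "to": "user"}}],
    encoding=EncodingType.UTF_8,
)
# finish_init() must run before the resource can process documents;
# calling the resource before that raises RuntimeError.
resource.finish_init(vertex_config, edge_config, transforms)
result = resource(doc)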

EdgeInferSpec

Bases: ConfigBaseModel

Selector for controlling inferred edge emission.

Source code in graflo/architecture/contract/declarations/resource.py
class EdgeInferSpec(ConfigBaseModel):
    """Selector for controlling inferred edge emission.

    A selector names a ``(source, target)`` vertex pair and, optionally, a
    relation. When ``relation`` is left unset the selector acts as a wildcard
    over every relation between that pair.
    """

    source: str = PydanticField(..., description="Edge source vertex name.")
    target: str = PydanticField(..., description="Edge target vertex name.")
    relation: str | None = PydanticField(
        default=None,
        description=(
            "Optional relation discriminator. If omitted, selector applies to all relations "
            "for (source, target)."
        ),
    )

    @property
    def edge_id(self) -> EdgeId:
        """Return this selector as a ``(source, target, relation)`` tuple."""
        return (self.source, self.target, self.relation)

    def matches(self, edge_id: EdgeId) -> bool:
        """Tell whether ``edge_id`` is covered by this selector.

        Args:
            edge_id: A ``(source, target, relation)`` triple to test.

        Returns:
            True when both endpoints are equal and the selector's relation is
            either unset (wildcard) or equal to the triple's relation.
        """
        other_source, other_target, other_relation = edge_id
        # Endpoints must agree before the relation is even considered.
        if self.source != other_source or self.target != other_target:
            return False
        # An unset relation matches any relation on the same endpoints.
        if self.relation is None:
            return True
        return self.relation == other_relation

Resource

Bases: ConfigBaseModel

Resource configuration and processing.

Represents a data resource that can be processed and transformed into graph structures. Manages the processing pipeline through actors and handles data encoding, transformation, and mapping. Suitable for LLM-generated schema constituents.

Dynamic vertex-type routing is handled by vertex_router steps in the pipeline (see graflo.architecture.pipeline.runtime.actor.VertexRouterActor).

Source code in graflo/architecture/contract/declarations/resource.py
class Resource(ConfigBaseModel):
    """Resource configuration and processing.

    Represents a data resource that can be processed and transformed into graph
    structures. Manages the processing pipeline through actors and handles data
    encoding, transformation, and mapping. Suitable for LLM-generated schema
    constituents.

    Dynamic vertex-type routing is handled by ``vertex_router`` steps in the
    pipeline (see :class:`~graflo.architecture.pipeline.runtime.actor.VertexRouterActor`).

    Lifecycle, as implemented in this class:

    1. Model validation builds the root ``ActorWrapper`` and the
       ``ActorExecutor`` from ``pipeline``, resolves the ``types`` casters and
       installs empty placeholder vertex/edge configs.
    2. ``finish_init`` binds the real vertex/edge configs and initializes the
       actors.
    3. Only after that may the instance be called with a document; calling it
       earlier raises ``RuntimeError``.
    """

    # Pydantic setting: unknown keys in a resource declaration are rejected.
    model_config = {"extra": "forbid"}

    name: str = PydanticField(
        ...,
        description="Name of the resource (e.g. table or file identifier).",
    )
    # Accepted under either the "pipeline" or the "apply" key on input.
    pipeline: list[dict[str, Any]] = PydanticField(
        ...,
        description="Pipeline of actor steps to apply in sequence (vertex, edge, transform, descend). "
        'Each step is a dict, e.g. {"vertex": "user"} or {"edge": {"from": "a", "to": "b"}}.',
        validation_alias=AliasChoices("pipeline", "apply"),
    )
    encoding: EncodingType = PydanticField(
        default=EncodingType.UTF_8,
        description="Character encoding for input/output (e.g. utf-8, ISO-8859-1).",
    )
    merge_collections: list[str] = PydanticField(
        default_factory=list,
        description="List of collection names to merge when writing to the graph.",
    )
    extra_weights: list[Edge] = PydanticField(
        default_factory=list,
        description="Additional edge weight configurations for this resource.",
    )
    types: dict[str, str] = PydanticField(
        default_factory=dict,
        description='Field name to Python type expression for casting (e.g. {"amount": "float"}).',
    )
    infer_edges: bool = PydanticField(
        default=True,
        description=(
            "If True, infer edges from current vertex population. "
            "If False, emit only edges explicitly declared as edge actors in the pipeline."
        ),
    )
    # infer_edge_only and infer_edge_except are mutually exclusive; this is
    # enforced by _validate_infer_edge_spec_policy during model validation.
    infer_edge_only: list[EdgeInferSpec] = PydanticField(
        default_factory=list,
        description=(
            "Optional allow-list for inferred edges. Applies only to inferred (greedy) edges, "
            "not explicit edge actors."
        ),
    )
    infer_edge_except: list[EdgeInferSpec] = PydanticField(
        default_factory=list,
        description=(
            "Optional deny-list for inferred edges. Applies only to inferred (greedy) edges, "
            "not explicit edge actors."
        ),
    )

    # Runtime state, not part of the declared schema. All of it is (re)assigned
    # in _build_root_and_types; the configs and the initialized flag are
    # assigned again in _rebuild_runtime.
    _root: ActorWrapper = PrivateAttr()
    _types: dict[str, Callable[..., Any]] = PrivateAttr(default_factory=dict)
    _vertex_config: VertexConfig = PrivateAttr()
    _edge_config: EdgeConfig = PrivateAttr()
    _executor: ActorExecutor = PrivateAttr()
    _initialized: bool = PrivateAttr(default=False)

    @model_validator(mode="after")
    def _build_root_and_types(self) -> Resource:
        """Build root ActorWrapper and resolve safe cast functions.

        Runs after field validation. Creates the actor tree and executor from
        ``pipeline``, fills ``_types`` with one caster per entry of ``types``
        that can be resolved (unresolvable entries are logged and skipped),
        installs empty placeholder configs, marks the resource as not yet
        initialized, and finally enforces the allow-list/deny-list exclusivity.

        Returns:
            The validated instance itself.

        Raises:
            ValueError: If both ``infer_edge_only`` and ``infer_edge_except``
                are non-empty.
        """
        # Imported locally rather than at module level.
        # NOTE(review): presumably to avoid an import cycle with the runtime
        # package - confirm.
        from graflo.architecture.pipeline.runtime.actor import ActorWrapper
        from graflo.architecture.pipeline.runtime.executor import ActorExecutor

        # Private attributes are written with object.__setattr__ throughout.
        # NOTE(review): presumably because pydantic's own private-attribute
        # storage is not usable yet inside an "after" validator - confirm.
        object.__setattr__(self, "_root", ActorWrapper(*self.pipeline))
        object.__setattr__(self, "_executor", ActorExecutor(self._root))
        object.__setattr__(self, "_types", {})
        for k, v in self.types.items():
            caster = _resolve_type_caster(v)
            if caster is not None:
                self._types[k] = caster
            else:
                # An unresolved type is reported but does not fail validation;
                # the field simply gets no caster.
                logger.error(
                    "For resource %s for field %s failed to resolve cast type %s",
                    self.name,
                    k,
                    v,
                )
        # Placeholders until schema binds real configs.
        object.__setattr__(self, "_vertex_config", VertexConfig(vertices=[]))
        object.__setattr__(self, "_edge_config", EdgeConfig())
        object.__setattr__(self, "_initialized", False)
        self._validate_infer_edge_spec_policy()
        return self

    def _validate_infer_edge_spec_policy(self) -> None:
        """Reject declarations that set both the allow-list and the deny-list.

        Raises:
            ValueError: If ``infer_edge_only`` and ``infer_edge_except`` are
                both non-empty.
        """
        if self.infer_edge_only and self.infer_edge_except:
            raise ValueError(
                "Resource infer_edge_only and infer_edge_except are mutually exclusive."
            )

    def _validate_infer_edge_spec_targets(self, edge_config: EdgeConfig) -> None:
        """Check that every edge selector matches at least one known edge.

        Args:
            edge_config: Edge configuration whose edge ids define what counts
                as "known".

        Raises:
            ValueError: If any selector in ``infer_edge_only`` or
                ``infer_edge_except`` matches none of the edge ids in
                ``edge_config``. ``infer_edge_only`` is checked first.
        """
        known_edge_ids = {edge_id for edge_id, _ in edge_config.items()}

        def _validate_list(field_name: str, specs: list[EdgeInferSpec]) -> None:
            """Raise ValueError listing every selector in ``specs`` with no match."""
            unknown: list[EdgeId] = []
            for spec in specs:
                if not any(spec.matches(edge_id) for edge_id in known_edge_ids):
                    unknown.append(spec.edge_id)
            if unknown:
                raise ValueError(
                    f"Resource {field_name} contains unknown edge selectors: {unknown}"
                )

        _validate_list("infer_edge_only", self.infer_edge_only)
        _validate_list("infer_edge_except", self.infer_edge_except)

    @property
    def vertex_config(self) -> VertexConfig:
        """Vertex configuration (set by Schema.finish_init).

        Before ``finish_init`` runs this is the empty placeholder created
        during model validation.
        """
        return self._vertex_config

    @property
    def edge_config(self) -> EdgeConfig:
        """Edge configuration (set by Schema.finish_init).

        After ``finish_init`` this is a resource-local copy of the edge
        configuration that was passed in; before that it is an empty
        placeholder.
        """
        return self._edge_config

    @property
    def root(self) -> ActorWrapper:
        """Root actor wrapper for the processing pipeline."""
        return self._root

    def finish_init(
        self,
        vertex_config: VertexConfig,
        edge_config: EdgeConfig,
        transforms: dict[str, ProtoTransform],
        *,
        strict_references: bool = False,
        dynamic_edge_feedback: bool = False,
    ) -> None:
        """Complete resource initialization.

        Initializes the resource with vertex and edge configurations,
        and sets up the processing pipeline. Called by Schema after load.
        All work is delegated to ``_rebuild_runtime``.

        Args:
            vertex_config: Configuration for vertices
            edge_config: Configuration for edges
            transforms: Dictionary of available transforms
            strict_references: Forwarded unchanged to the actor init context;
                its effect is defined by the actors, not by this class.
            dynamic_edge_feedback: If True, edges that appear in the
                resource-local edge config during actor initialization are
                copied back into ``edge_config``.

        Raises:
            ValueError: If edge selectors or explicit edge actors reference
                undeclared vertices, or an edge selector matches no known edge.
        """
        self._rebuild_runtime(
            vertex_config=vertex_config,
            edge_config=edge_config,
            transforms=transforms,
            strict_references=strict_references,
            dynamic_edge_feedback=dynamic_edge_feedback,
        )

    def _edge_ids_from_edge_actors(self) -> set[EdgeId]:
        """Collect (source, target, None) for every EdgeActor in this resource's pipeline.

        Used to auto-add to infer_edge_except so inferred edges do not duplicate
        edges produced by explicit edge actors.

        Returns:
            A set of ``(source, target, None)`` triples; the relation slot is
            always ``None``.
        """
        from graflo.architecture.pipeline.runtime.actor import EdgeActor

        edge_actors = [
            a for a in self.root.collect_actors() if isinstance(a, EdgeActor)
        ]
        return {(ea.edge.source, ea.edge.target, None) for ea in edge_actors}

    def _validate_dynamic_edge_vertices_exist(
        self, vertex_config: VertexConfig
    ) -> None:
        """Ensure all vertices implied by dynamic edge controls are declared.

        The checked vertex names come from ``infer_edge_only``,
        ``infer_edge_except`` and the explicit edge actors in the pipeline.

        Args:
            vertex_config: Source of the declared vertex names
                (``vertex_config.vertex_set``).

        Raises:
            ValueError: If any referenced vertex name is not declared; the
                message lists the missing names in sorted order.
        """
        known_vertices = set(vertex_config.vertex_set)
        referenced_vertices: set[str] = set()

        for spec in self.infer_edge_only:
            referenced_vertices.add(spec.source)
            referenced_vertices.add(spec.target)

        for spec in self.infer_edge_except:
            referenced_vertices.add(spec.source)
            referenced_vertices.add(spec.target)

        for source, target, _ in self._edge_ids_from_edge_actors():
            referenced_vertices.add(source)
            referenced_vertices.add(target)

        missing_vertices = sorted(referenced_vertices - known_vertices)
        if missing_vertices:
            raise ValueError(
                "Resource dynamic edge references undefined vertices: "
                f"{missing_vertices}. "
                "Declare these vertices in vertex_config before using dynamic/inferred edges."
            )

    def _rebuild_runtime(
        self,
        *,
        vertex_config: VertexConfig,
        edge_config: EdgeConfig,
        transforms: dict[str, ProtoTransform],
        strict_references: bool = False,
        dynamic_edge_feedback: bool = False,
    ) -> None:
        """Rebuild runtime actor initialization state from typed context.

        Steps, in order: bind ``vertex_config``; bind a resource-local copy of
        ``edge_config``; validate referenced vertices and edge selectors;
        initialize the actor tree; mark the resource initialized; optionally
        feed newly registered edges back into ``edge_config``; finish
        initialization of ``extra_weights``.

        Args:
            vertex_config: Vertex configuration to bind.
            edge_config: Shared edge configuration. It is copied for local
                use and only written to when ``dynamic_edge_feedback`` is True.
            transforms: Available transforms, passed to the actor init context.
            strict_references: Passed through to the actor init context.
            dynamic_edge_feedback: Whether to propagate edges added during
                actor initialization back to ``edge_config``.

        Raises:
            ValueError: From the vertex/selector validation steps.
        """
        object.__setattr__(self, "_vertex_config", vertex_config)
        # Runtime actors may register dynamic edges; keep per-resource edge state.
        local_edge_config = EdgeConfig.model_validate(
            edge_config.to_dict(skip_defaults=False)
        )
        object.__setattr__(self, "_edge_config", local_edge_config)
        self._validate_dynamic_edge_vertices_exist(vertex_config)
        self._validate_infer_edge_spec_targets(self._edge_config)

        # Snapshot of the edges the caller already knew about, taken before
        # actor initialization; used below to detect newly registered edges.
        baseline_edge_ids = {edge_id for edge_id, _ in edge_config.items()}
        infer_edge_except = {spec.edge_id for spec in self.infer_edge_except}
        # When not using infer_edge_only, auto-add (s,t,None) to infer_edge_except
        # for any edge type handled by explicit EdgeActors in this resource.
        if not self.infer_edge_only:
            infer_edge_except |= self._edge_ids_from_edge_actors()

        from graflo.architecture.pipeline.runtime.actor import ActorInitContext

        logger.debug("total resource actor count : %s", self.root.count())
        init_ctx = ActorInitContext(
            vertex_config=vertex_config,
            edge_config=self._edge_config,
            transforms=transforms,
            infer_edges=self.infer_edges,
            infer_edge_only={spec.edge_id for spec in self.infer_edge_only},
            infer_edge_except=infer_edge_except,
            strict_references=strict_references,
        )
        self.root.finish_init(init_ctx=init_ctx)
        # From here on __call__ accepts documents. Note that this flag is set
        # before the feedback and extra_weights steps below have run.
        object.__setattr__(self, "_initialized", True)

        if dynamic_edge_feedback:
            # Edge actors register static edge definitions into the resource-local edge
            # config during finish_init(). Optionally propagate newly discovered edges
            # to the shared schema-level edge_config so schema definition and DB
            # writers can see them.
            for edge_id, edge in self._edge_config.items():
                if edge_id in baseline_edge_ids:
                    continue
                # A deep copy is handed over, so the shared config does not
                # hold the resource-local edge object itself.
                edge_config.update_edges(
                    edge.model_copy(deep=True), vertex_config=vertex_config
                )

        logger.debug("total resource actor count (after finit): %s", self.root.count())

        for e in self.extra_weights:
            e.finish_init(vertex_config)

    def __call__(self, doc: dict) -> defaultdict[GraphEntity, list]:
        """Process a document through the resource pipeline.

        Args:
            doc: Document to process

        Returns:
            defaultdict[GraphEntity, list]: Processed graph entities

        Raises:
            RuntimeError: If ``finish_init`` has not completed actor
                initialization for this resource.
        """
        if not self._initialized:
            raise RuntimeError(
                f"Resource '{self.name}' must be initialized via finish_init() before use."
            )
        # Two-phase execution: extract into a context, then assemble the result.
        extraction_ctx = self._executor.extract(doc)
        result = self._executor.assemble_result(extraction_ctx)
        return result.entities

    def count(self) -> int:
        """Total number of actors in the resource pipeline."""
        return self.root.count()

edge_config property

Edge configuration (set by Schema.finish_init).

root property

Root actor wrapper for the processing pipeline.

vertex_config property

Vertex configuration (set by Schema.finish_init).

__call__(doc)

Process a document through the resource pipeline.

Parameters:

Name Type Description Default
doc dict

Document to process

required

Returns:

Type Description
defaultdict[GraphEntity, list]

defaultdict[GraphEntity, list]: Processed graph entities

Source code in graflo/architecture/contract/declarations/resource.py
def __call__(self, doc: dict) -> defaultdict[GraphEntity, list]:
    """Run one document through the pipeline and return its graph entities.

    Args:
        doc: Document to process

    Returns:
        defaultdict[GraphEntity, list]: the ``entities`` of the result that
        the executor assembles from the extraction context.

    Raises:
        RuntimeError: If the resource has not been initialized yet.
    """
    if self._initialized:
        executor = self._executor
        # Extraction first, then assembly of the extracted context.
        assembled = executor.assemble_result(executor.extract(doc))
        return assembled.entities
    raise RuntimeError(
        f"Resource '{self.name}' must be initialized via finish_init() before use."
    )

count()

Total number of actors in the resource pipeline.

Source code in graflo/architecture/contract/declarations/resource.py
def count(self) -> int:
    """Return the actor count reported by the pipeline's root wrapper."""
    actor_root = self.root
    return actor_root.count()

finish_init(vertex_config, edge_config, transforms, *, strict_references=False, dynamic_edge_feedback=False)

Complete resource initialization.

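Initializes the resource with vertex and edge configurations, and sets up the processing pipeline. Called by Schema after load. The keyword-only flags strict_references and dynamic_edge_feedback (both default to False) are forwarded to the runtime rebuild; when dynamic_edge_feedback is true, edges that were registered in the resource-local edge configuration during actor initialization are copied back into the supplied edge_config.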

Parameters:

Name Type Description Default
vertex_config VertexConfig

Configuration for vertices

required
edge_config EdgeConfig

Configuration for edges

required
transforms dict[str, ProtoTransform]

Dictionary of available transforms

required
Source code in graflo/architecture/contract/declarations/resource.py
def finish_init(
    self,
    vertex_config: VertexConfig,
    edge_config: EdgeConfig,
    transforms: dict[str, ProtoTransform],
    *,
    strict_references: bool = False,
    dynamic_edge_feedback: bool = False,
) -> None:
    """Bind schema-level configuration and initialize the pipeline.

    Public entry point for the second initialization phase; every argument
    is handed on to ``_rebuild_runtime`` by keyword. Called by Schema after
    load.

    Args:
        vertex_config: Configuration for vertices
        edge_config: Configuration for edges
        transforms: Dictionary of available transforms
        strict_references: Keyword-only flag forwarded as-is to the runtime
            rebuild.
        dynamic_edge_feedback: Keyword-only flag forwarded as-is to the
            runtime rebuild.
    """
    # Group the optional switches so the required arguments stand out below.
    runtime_flags = {
        "strict_references": strict_references,
        "dynamic_edge_feedback": dynamic_edge_feedback,
    }
    self._rebuild_runtime(
        vertex_config=vertex_config,
        edge_config=edge_config,
        transforms=transforms,
        **runtime_flags,
    )