Skip to content

graflo.architecture.pipeline.runtime.actor.edge

Edge actor for processing edge data.

EdgeActor

Bases: Actor

Actor for processing edge data.

Operates in three modes determined by configuration:

Static mode (from/to set): both vertex types are declared at config time. The schema Edge is built when the actor is constructed and registered with the edge config during finish_init; each call then records one edge intent against that fixed Edge.

Dynamic mode (at least one of source_role/target_role set, with source_type_field/target_type_field accepted as legacy aliases): vertex types for the dynamic side(s) are resolved at extraction time by looking up accumulator slots populated by an upstream VertexRouterActor (slot segment = role or type_field) or a VertexActor with a matching role. The schema Edge is created—or retrieved from cache—per unique (source_type, target_type, relation) triple encountered. When strict_edge_types is enabled, triples not already declared in the edge config are skipped.

Multi-link mode (links list set): each item in links becomes a dedicated sub-EdgeActor that runs in sequence per row, emitting one edge intent each. Use when one flat row encodes multiple distinct relationships.

Source code in graflo/architecture/pipeline/runtime/actor/edge.py
class EdgeActor(Actor):
    """Actor that records edge intents for the observations it processes.

    Operates in three modes determined by configuration:

    **Static mode** (``from``/``to`` set): both vertex types are declared at config
    time.  The schema ``Edge`` is built when the actor is constructed, registered
    with the edge config during ``finish_init``, and every call records one edge
    intent against that fixed ``Edge``.

    **Dynamic mode** (at least one of ``source_role``/``target_role`` set, with
    ``source_type_field``/``target_type_field`` accepted as legacy aliases):
    vertex types for the dynamic side(s) are
    resolved at extraction time by looking up accumulator slots populated by an
    upstream ``VertexRouterActor`` (slot segment = ``role`` or ``type_field``) or a
    ``VertexActor`` with a matching ``role``.
    The schema ``Edge`` is created—or retrieved from cache—per unique
    ``(source_type, target_type, relation)`` triple encountered.  With
    ``strict_edge_types`` enabled, triples not already present in the edge
    config are skipped.

    **Multi-link mode** (``links`` list set): each item in ``links`` becomes a
    dedicated sub-``EdgeActor`` that runs in sequence per row, emitting one edge
    intent each.  Use when one flat row encodes multiple distinct relationships.
    """

    def __init__(self, config: EdgeActorConfig):
        """Build the actor from *config*; the mode is chosen from which fields are set."""
        # Multi-link mode: delegate each link to its own EdgeActor.
        if config.links:
            self._link_actors: list[EdgeActor] = [
                EdgeActor(_link_to_edge_actor_config(lk)) for lk in config.links
            ]
            # Null-out all single-intent state so the dispatch is unambiguous.
            self._source_slot_key = None
            self._target_slot_key = None
            self._static_source = None
            self._static_target = None
            self._relation_map: dict[str, str] = {}
            self._strict_edge_types = False
            self._edge_cache: dict[tuple[str, str, str | None], Edge] = {}
            self._init_ctx: ActorInitContext | None = None
            self.derivation: EdgeDerivation = EdgeDerivation()
            self._pending_vertex_weights: list[Weight] = []
            self._static_relation = None
            self.edge: Edge | None = None
            self.vertex_config: VertexConfig | None = None
            self.edge_config: EdgeConfig | None = None
            self.allowed_vertex_names: set[str] | None = None
            return

        self._link_actors = []

        # Slot keys (roles) for sides whose vertex type is resolved per row.
        self._source_slot_key = config.source_role
        self._target_slot_key = config.target_role
        # Static fallback for whichever side is not dynamic.
        self._static_source = config.source
        self._static_target = config.target
        self._relation_map = config.relation_map or {}
        self._strict_edge_types = config.strict_edge_types
        self._edge_cache = {}
        self._init_ctx = None

        self.derivation = config.derivation
        self._pending_vertex_weights = []

        # In dynamic/mixed mode the static relation (if set) is used as a fallback
        # when relation_field yields nothing.
        self._static_relation = None

        # Dynamic mode: at least one side is resolved at extraction time.
        # Static mode: both sides are fixed at config time.
        is_dynamic = (
            self._source_slot_key is not None or self._target_slot_key is not None
        )
        if not is_dynamic:
            payload: dict[str, Any] = {
                "source": config.source,
                "target": config.target,
            }
            if config.relation is not None:
                payload["relation"] = config.relation
            if config.description is not None:
                payload["description"] = config.description
            if config.properties:
                payload["properties"] = config.properties
            # Vertex weights are merged into the edge derivation in finish_init,
            # once the edge id is registered.
            for item in config.vertex_weights:
                self._pending_vertex_weights.append(Weight.model_validate(item))
            self.edge: Edge | None = Edge.from_dict(payload)
        else:
            self.edge = None
            self._static_relation = config.relation

        self.vertex_config: VertexConfig | None = None
        self.edge_config: EdgeConfig | None = None
        self.allowed_vertex_names: set[str] | None = None

    @property
    def relation_field(self) -> str | None:
        """Alias for ``derivation.relation_field``, used by tooling (e.g. plot labels)."""
        return self.derivation.relation_field

    @classmethod
    def from_config(cls, config: EdgeActorConfig) -> "EdgeActor":
        """Alternate constructor; equivalent to ``EdgeActor(config)``."""
        return cls(config)

    def fetch_important_items(self) -> dict[str, Any]:
        """Return the key configuration items of this actor.

        In multi-link mode this is only the number of links.  Otherwise it
        includes the endpoints: the concrete types for a static edge, or the
        role / static type per side in dynamic mode.  It also includes any
        ``match_source`` / ``match_target`` set on the configured derivation.
        """
        if self._link_actors:
            return {"links": str(len(self._link_actors))}
        items: dict[str, Any] = {}
        if self.edge is not None:
            items["source"] = self.edge.source
            items["target"] = self.edge.target
        else:
            if self._source_slot_key is not None:
                items["source_role"] = self._source_slot_key
            elif self._static_source is not None:
                items["source"] = self._static_source
            if self._target_slot_key is not None:
                items["target_role"] = self._target_slot_key
            elif self._static_target is not None:
                items["target"] = self._static_target
        for k in ("match_source", "match_target"):
            v = getattr(self.derivation, k)
            if v is not None:
                items[k] = v
        return items

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        """Bind schema configs from *init_ctx* and finish mode-specific setup.

        Static mode registers the schema edge with the edge config. It also
        records ``relation_from_key`` and pending vertex weights on the edge
        derivation, then adopts the registered edge instance.  Dynamic mode
        resets the per-triple edge cache.  Multi-link mode forwards to each
        sub-actor.
        """
        self._init_ctx = init_ctx
        self.vertex_config = init_ctx.vertex_config
        self.edge_config = init_ctx.edge_config
        self.allowed_vertex_names = init_ctx.allowed_vertex_names

        if self._link_actors:
            # Multi-link mode: delegate finish_init to each sub-actor.
            for la in self._link_actors:
                la.finish_init(init_ctx)
            return

        if self.edge is not None:
            # Static mode: register schema Edge now.
            edge_id = self.edge.edge_id
            init_ctx.edge_config.update_edges(
                self.edge, vertex_config=self.vertex_config
            )
            if self.derivation.relation_from_key:
                init_ctx.edge_derivation.mark_relation_from_key(edge_id)
            if self._pending_vertex_weights:
                init_ctx.edge_derivation.merge_vertex_weights(
                    edge_id, self._pending_vertex_weights
                )
            # Use the instance held by the edge config after registration.
            self.edge = init_ctx.edge_config.edge_for(edge_id)
        else:
            # Dynamic mode: cache will be populated per-row.
            self._edge_cache.clear()

    # ------------------------------------------------------------------
    # Dynamic-mode helpers
    # ------------------------------------------------------------------

    def _get_or_create_edge(
        self, source: str, target: str, relation: str | None
    ) -> Edge | None:
        """Return the schema ``Edge`` for ``(source, target, relation)``.

        Results are memoised in ``_edge_cache``.  When ``strict_edge_types`` is
        set and an edge config is bound, undeclared triples yield ``None``.
        These rejections are not cached, so they are re-checked on each call.
        Otherwise a new ``Edge`` is built and, once both configs are bound,
        registered with the edge config.  The instance the edge config holds
        after registration is the one cached and returned, which mirrors the
        static path in ``finish_init``.
        """
        key = (source, target, relation)
        if key in self._edge_cache:
            return self._edge_cache[key]
        if self._strict_edge_types:
            # Skip if this (source, target, relation) was not pre-declared.
            if self.edge_config is not None and key not in self.edge_config:
                logger.debug(
                    "EdgeActor: strict_edge_types=True, skipping undeclared (%s, %s, %s)",
                    source,
                    target,
                    relation,
                )
                return None
        edge = Edge(source=source, target=target, relation=relation)
        if self.vertex_config is not None:
            edge.finish_init(vertex_config=self.vertex_config)
        if self.edge_config is not None and self.vertex_config is not None:
            self.edge_config.update_edges(edge, vertex_config=self.vertex_config)
            # update_edges may merge into an existing declaration (e.g. one with
            # properties/weights); emit the registered instance so intents carry
            # the merged definition.  Keep the local edge if lookup yields nothing.
            registered = self.edge_config.edge_for(edge.edge_id)
            if registered is not None:
                edge = registered
        self._edge_cache[key] = edge
        logger.debug(
            "EdgeActor: registered dynamic edge (%s, %s, %s)", source, target, relation
        )
        return edge

    def _find_type_at_slot(
        self, ctx: ExtractionContext, slot_lindex: LocationIndex
    ) -> str | None:
        """Return the first vertex type in ``ctx.acc_vertex`` with non-empty data at *slot_lindex*.

        Returns ``None`` when no vertex type has data at that location.
        """
        for vtype, by_loc in ctx.acc_vertex.items():
            if slot_lindex in by_loc and by_loc[slot_lindex]:
                return vtype
        return None

    # ------------------------------------------------------------------
    # Main dispatch
    # ------------------------------------------------------------------

    def __call__(
        self, ctx: ExtractionContext, lindex: LocationIndex, *nargs: Any, **kwargs: Any
    ) -> ExtractionContext:
        """Record edge intent(s) for the observation at *lindex* and return *ctx*."""
        if self._link_actors:
            # Multi-link mode: run each sub-actor in sequence.
            for la in self._link_actors:
                ctx = la(ctx, lindex, *nargs, **kwargs)
            return ctx
        if self._source_slot_key is not None or self._target_slot_key is not None:
            return self._call_dynamic(ctx, lindex, **kwargs)
        return self._call_static(ctx, lindex, **kwargs)

    def _call_static(
        self, ctx: ExtractionContext, lindex: LocationIndex, **kwargs: Any
    ) -> ExtractionContext:
        """Static mode: record one intent for the fixed ``self.edge``.

        The row is skipped when either endpoint is outside
        ``allowed_vertex_names``. When no allow-list is set, it is skipped if
        either endpoint is outside the bound vertex config's ``vertex_set``.
        """
        assert self.edge is not None
        if self.allowed_vertex_names is not None and (
            self.edge.source not in self.allowed_vertex_names
            or self.edge.target not in self.allowed_vertex_names
        ):
            return ctx
        if (
            self.allowed_vertex_names is None
            and self.vertex_config is not None
            and (
                self.edge.source not in self.vertex_config.vertex_set
                or self.edge.target not in self.vertex_config.vertex_set
            )
        ):
            return ctx

        der = None if self.derivation.is_empty() else self.derivation
        ctx.record_edge_intent(
            edge=self.edge,
            location=lindex,
            derivation=der,
        )
        return ctx

    def _call_dynamic(
        self, ctx: ExtractionContext, lindex: LocationIndex, **kwargs: Any
    ) -> ExtractionContext:
        """Dynamic / mixed mode: resolve dynamic side(s) from VRA accumulator slots.

        Source or target (but not both) may be statically declared; that side's
        type is taken directly from config rather than looked up in the accumulator.
        The merged observation (``doc`` kwarg plus transform buffer) is stored in
        ``ctx.obs_buffer[lindex]`` before any skip checks.
        """
        raw_observation = kwargs.get("doc", {})
        if not isinstance(raw_observation, dict):
            logger.debug(
                "EdgeActor: expected dict observation, got %s, skipping",
                type(raw_observation).__name__,
            )
            return ctx

        buffer_items: list[Any] = list(ctx.transform_buffer.get(lindex, []))
        doc = merge_observation_with_transform_buffer(raw_observation, buffer_items)
        ctx.obs_buffer[lindex] = dict(doc)

        # --- source type ---
        if self._source_slot_key is not None:
            source_slot_lindex = lindex.extend((self._source_slot_key, 0))
            source_type = self._find_type_at_slot(ctx, source_slot_lindex)
            if source_type is None:
                logger.debug(
                    "EdgeActor: no vertex data at source slot '%s', skipping",
                    self._source_slot_key,
                )
                return ctx
        else:
            # Mixed mode: static source
            assert self._static_source is not None
            source_type = self._static_source

        if (
            self.vertex_config is not None
            and source_type not in self.vertex_config.vertex_set
        ):
            logger.debug(
                "EdgeActor: source type '%s' not in vertex_set, skipping", source_type
            )
            return ctx

        # --- target type ---
        if self._target_slot_key is not None:
            target_slot_lindex = lindex.extend((self._target_slot_key, 0))
            target_type = self._find_type_at_slot(ctx, target_slot_lindex)
            if target_type is None:
                logger.debug(
                    "EdgeActor: no vertex data at target slot '%s', skipping",
                    self._target_slot_key,
                )
                return ctx
        else:
            # Mixed mode: static target
            assert self._static_target is not None
            target_type = self._static_target

        if (
            self.vertex_config is not None
            and target_type not in self.vertex_config.vertex_set
        ):
            logger.debug(
                "EdgeActor: target type '%s' not in vertex_set, skipping", target_type
            )
            return ctx

        # allowed_vertex_names early-exit
        if self.allowed_vertex_names is not None and (
            source_type not in self.allowed_vertex_names
            or target_type not in self.allowed_vertex_names
        ):
            return ctx

        # --- relation ---
        raw_relation: str | None
        if self.derivation.relation_field:
            raw_relation = doc.get(self.derivation.relation_field)
        else:
            raw_relation = None

        if raw_relation is not None:
            relation: str | None = self._relation_map.get(raw_relation, raw_relation)
        else:
            relation = self._static_relation

        # Create / retrieve cached schema Edge.
        edge = self._get_or_create_edge(source_type, target_type, relation)
        if edge is None:
            return ctx

        # Build derivation: slot names for dynamic sides so render_edge can filter.
        # A statically declared side keeps any match filter from the configured
        # derivation, which would otherwise be lost in mixed mode.
        derivation = EdgeDerivation(
            match_source=(
                self._source_slot_key
                if self._source_slot_key is not None
                else self.derivation.match_source
            ),
            match_target=(
                self._target_slot_key
                if self._target_slot_key is not None
                else self.derivation.match_target
            ),
        )
        ctx.record_edge_intent(edge=edge, location=lindex, derivation=derivation)
        return ctx

    def references_vertices(self) -> set[str]:
        """Return the vertex type names this actor refers to.

        For a static edge these are its two endpoints.  In dynamic mode they
        are the statically declared side(s) plus every source/target type seen
        in the edge cache so far.  In multi-link mode they are the union over
        all sub-actors.
        """
        if self._link_actors:
            result: set[str] = set()
            for la in self._link_actors:
                result |= la.references_vertices()
            return result
        if self.edge is not None:
            return {self.edge.source, self.edge.target}
        static: set[str] = set()
        if self._static_source:
            static.add(self._static_source)
        if self._static_target:
            static.add(self._static_target)
        return (
            static
            | {s for s, _, _ in self._edge_cache}
            | {t for _, t, _ in self._edge_cache}
        )

relation_field (property)

Alias for derivation.relation_field, exposed for tooling (e.g. plot labels).