Skip to content

graflo.architecture.pipeline

Pipeline runtime (execution). Declarations live in graflo.architecture.contract.

Actor

Bases: ABC

Abstract base class for all actors in the system.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
class Actor(ABC):
    """Abstract base class for all actors in the system."""

    @abstractmethod
    def __call__(
        self, ctx: ExtractionContext, lindex: LocationIndex, *nargs, **kwargs
    ) -> ExtractionContext:
        """Execute the actor's main processing logic."""
        pass

    def fetch_important_items(self) -> dict[str, object]:
        """Get a dictionary of important items for string representation."""
        return {}

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        """Complete initialization of the actor."""
        pass

    def init_transforms(self, init_ctx: ActorInitContext) -> None:
        """Initialize transformations for the actor."""
        pass

    def count(self) -> int:
        """Get the count of items processed by this actor."""
        return 1

    def references_vertices(self) -> set[str]:
        """Return vertex names this actor references."""
        return set()

    def _filter_items(self, items: dict[str, object]) -> dict[str, object]:
        """Filter out None and empty items."""
        return {k: v for k, v in items.items() if v is not None and v}

    def _stringify_items(self, items: dict[str, object]) -> dict[str, str]:
        """Convert items to string representation."""
        return {
            k: ", ".join(list(v)) if isinstance(v, (tuple, list)) else str(v)
            for k, v in items.items()
        }

    def _fetch_items_from_dict(self, keys: tuple[str, ...]) -> dict[str, object]:
        """Helper method to extract items from instance dict for string representation."""
        return {k: self.__dict__[k] for k in keys if k in self.__dict__}

    def __str__(self) -> str:
        """Get string representation of the actor."""
        d = self.fetch_important_items()
        d = self._filter_items(d)
        d = self._stringify_items(d)
        d_list = [[k, d[k]] for k in sorted(d)]
        d_list_b = [type(self).__name__] + [": ".join(x) for x in d_list]
        return "\n".join(d_list_b)

    __repr__ = __str__

    def fetch_actors(self, level: int, edges: list) -> tuple[int, type, str, list]:
        """Fetch actor information for tree representation."""
        return level, type(self), str(self), edges

__call__(ctx, lindex, *nargs, **kwargs) abstractmethod

Execute the actor's main processing logic.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
@abstractmethod
def __call__(
    self, ctx: ExtractionContext, lindex: LocationIndex, *nargs, **kwargs
) -> ExtractionContext:
    """Execute the actor's main processing logic."""
    pass

__str__()

Get string representation of the actor.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
def __str__(self) -> str:
    """Get string representation of the actor."""
    d = self.fetch_important_items()
    d = self._filter_items(d)
    d = self._stringify_items(d)
    d_list = [[k, d[k]] for k in sorted(d)]
    d_list_b = [type(self).__name__] + [": ".join(x) for x in d_list]
    return "\n".join(d_list_b)

count()

Get the count of items processed by this actor.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
def count(self) -> int:
    """Get the count of items processed by this actor."""
    return 1

fetch_actors(level, edges)

Fetch actor information for tree representation.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
def fetch_actors(self, level: int, edges: list) -> tuple[int, type, str, list]:
    """Fetch actor information for tree representation."""
    return level, type(self), str(self), edges

fetch_important_items()

Get a dictionary of important items for string representation.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
def fetch_important_items(self) -> dict[str, object]:
    """Get a dictionary of important items for string representation."""
    return {}

finish_init(init_ctx)

Complete initialization of the actor.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
def finish_init(self, init_ctx: ActorInitContext) -> None:
    """Complete initialization of the actor."""
    pass

init_transforms(init_ctx)

Initialize transformations for the actor.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
def init_transforms(self, init_ctx: ActorInitContext) -> None:
    """Initialize transformations for the actor."""
    pass

references_vertices()

Return vertex names this actor references.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
def references_vertices(self) -> set[str]:
    """Return vertex names this actor references."""
    return set()

ActorConstants

Constants used throughout the actor system.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
class ActorConstants:
    """Constants used throughout the actor system."""

    DESCEND_KEY: str = "key"
    DRESSING_TRANSFORMED_VALUE_KEY: str = "__value__"

ActorExecutor

Owns runtime extraction and assembly orchestration for an ActorWrapper.

Source code in graflo/architecture/pipeline/runtime/executor.py
class ActorExecutor:
    """Owns runtime extraction and assembly orchestration for an ActorWrapper."""

    def __init__(self, root: ActorWrapper):
        self.root = root

    def extract(self, doc: dict) -> ExtractionContext:
        extraction_ctx = ExtractionContext()
        return self.root(extraction_ctx, doc=doc)

    def assemble(
        self, extraction_ctx: ExtractionContext
    ) -> defaultdict[GraphEntity, list]:
        assembly_ctx = AssemblyContext.from_extraction(extraction_ctx)
        return self.root.assemble(assembly_ctx)

    def assemble_result(self, extraction_ctx: ExtractionContext) -> GraphAssemblyResult:
        return GraphAssemblyResult(entities=self.assemble(extraction_ctx))

ActorInitContext dataclass

Typed initialization state shared across actor tree.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
@dataclass(slots=True)
class ActorInitContext:
    """Typed initialization state shared across actor tree."""

    vertex_config: VertexConfig
    edge_config: EdgeConfig
    transforms: dict[str, ProtoTransform]
    edge_derivation: EdgeDerivationRegistry = field(
        default_factory=EdgeDerivationRegistry
    )
    allowed_vertex_names: set[str] | None = None
    infer_edges: bool = True
    infer_edge_only: set[EdgeId] = field(default_factory=set)
    infer_edge_except: set[EdgeId] = field(default_factory=set)
    strict_references: bool = False
    skip_actors_on_missing_input_keys: bool = False
    target_db_flavor: DBType | None = None

ActorWrapper

Wrapper class for managing actor instances.

Source code in graflo/architecture/pipeline/runtime/actor/wrapper.py
class ActorWrapper:
    """Wrapper class for managing actor instances."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        config = parse_root_config(*args, **kwargs)
        w = ActorWrapper.from_config(config)
        self.actor = w.actor
        self.init_ctx = w.init_ctx

    @property
    def vertex_config(self) -> VertexConfig:
        return self.init_ctx.vertex_config

    @property
    def edge_config(self) -> EdgeConfig:
        return self.init_ctx.edge_config

    @property
    def infer_edges(self) -> bool:
        return self.init_ctx.infer_edges

    @property
    def infer_edge_only(self) -> set[EdgeId]:
        return self.init_ctx.infer_edge_only

    @property
    def infer_edge_except(self) -> set[EdgeId]:
        return self.init_ctx.infer_edge_except

    @property
    def target_db_flavor(self) -> DBType | None:
        return self.init_ctx.target_db_flavor

    def init_transforms(self, init_ctx: ActorInitContext) -> None:
        self.init_ctx = init_ctx
        self.actor.init_transforms(init_ctx)

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        self.init_ctx = init_ctx
        self.actor.init_transforms(init_ctx)
        self.actor.finish_init(init_ctx)

    def count(self) -> int:
        return self.actor.count()

    @classmethod
    def from_config(cls, config: ActorConfig) -> ActorWrapper:
        if isinstance(config, VertexActorConfig):
            actor = VertexActor.from_config(config)
        elif isinstance(config, TransformActorConfig):
            actor = TransformActor.from_config(config)
        elif isinstance(config, EdgeActorConfig):
            actor = EdgeActor.from_config(config)
        elif isinstance(config, DescendActorConfig):
            actor = DescendActor.from_config(config)
        elif isinstance(config, VertexRouterActorConfig):
            actor = VertexRouterActor.from_config(config)
        else:
            raise ValueError(
                f"Expected VertexActorConfig, TransformActorConfig, EdgeActorConfig, "
                f"DescendActorConfig, or VertexRouterActorConfig, got {type(config)}"
            )
        wrapper = cls.__new__(cls)
        wrapper.actor = actor
        wrapper.init_ctx = ActorInitContext(
            vertex_config=VertexConfig(vertices=[]),
            edge_config=EdgeConfig(),
            transforms={},
            allowed_vertex_names=None,
            infer_edges=True,
            infer_edge_only=set(),
            infer_edge_except=set(),
        )
        return wrapper

    @classmethod
    def _from_step(cls, step: dict[str, Any]) -> ActorWrapper:
        config = validate_actor_step(normalize_actor_step(step))
        return cls.from_config(config)

    def __call__(
        self,
        ctx: ExtractionContext,
        lindex: LocationIndex = LocationIndex(),
        *nargs: Any,
        **kwargs: Any,
    ) -> ExtractionContext:
        ctx = self.actor(ctx, lindex, *nargs, **kwargs)
        return ctx

    def assemble(
        self, ctx: ExtractionContext | AssemblyContext | ActionContext
    ) -> defaultdict[GraphEntity, list]:
        if isinstance(ctx, AssemblyContext):
            assembly_ctx = ctx
        else:
            assembly_ctx = AssemblyContext.from_extraction(ctx)
        assemble_edges(
            ctx=assembly_ctx,
            vertex_config=self.vertex_config,
            edge_config=self.edge_config,
            infer_edges=self.infer_edges,
            infer_edge_only=self.infer_edge_only,
            infer_edge_except=self.infer_edge_except,
            target_db_flavor=self.target_db_flavor,
            edge_derivation=self.init_ctx.edge_derivation,
        )

        for vertex_name, dd in assembly_ctx.acc_vertex.items():
            for lindex, vertex_list in dd.items():
                vertex_list = [x.vertex for x in vertex_list]
                vertex_list_updated = merge_doc_basis(
                    vertex_list,
                    tuple(self.vertex_config.identity_fields(vertex_name)),
                )
                vertex_list_updated = pick_unique_dict(vertex_list_updated)
                assembly_ctx.acc_global[vertex_name] += vertex_list_updated

        assembly_ctx = add_blank_collections(assembly_ctx, self.vertex_config)

        if isinstance(ctx, ActionContext):
            ctx.acc_global = assembly_ctx.acc_global
            return ctx.acc_global
        return assembly_ctx.acc_global

    @classmethod
    def from_dict(cls, data: dict | list) -> ActorWrapper:
        if isinstance(data, list):
            return cls(*data)
        return cls(**data)

    def assemble_tree(self, fig_path: Path | None = None):
        import logging

        logger = logging.getLogger(__name__)
        _, _, _, edges = self.fetch_actors(0, [])
        logger.info("%s", len(edges))
        try:
            import networkx as nx
        except ImportError as e:
            logger.error("not able to import networks %s", e)
            return None
        nodes = {}
        g = nx.MultiDiGraph()
        for ha, hb, pa, pb in edges:
            nodes[ha] = pa
            nodes[hb] = pb
        from graflo.plot.plotter import fillcolor_palette

        map_class2color = {
            DescendActor: fillcolor_palette["green"],
            VertexActor: "orange",
            VertexRouterActor: fillcolor_palette["peach"],
            EdgeActor: fillcolor_palette["violet"],
            TransformActor: fillcolor_palette["blue"],
        }

        for n, props in nodes.items():
            nodes[n]["fillcolor"] = map_class2color[props["class"]]
            nodes[n]["style"] = "filled"
            nodes[n]["color"] = "brown"

        edges = [(ha, hb) for ha, hb, _, _ in edges]
        g.add_edges_from(edges)
        g.add_nodes_from(nodes.items())

        if fig_path is not None:
            ag = nx.nx_agraph.to_agraph(g)
            ag.draw(fig_path, "pdf", prog="dot")
            return None
        return g

    def fetch_actors(self, level: int, edges: list) -> tuple[int, type, str, list]:
        return self.actor.fetch_actors(level, edges)

    def collect_actors(self) -> list[Actor]:
        actors = [self.actor]
        if isinstance(self.actor, DescendActor):
            for descendant in self.actor.descendants:
                actors.extend(descendant.collect_actors())
        return actors

    def find_descendants(
        self,
        predicate: Callable[[ActorWrapper], bool] | None = None,
        *,
        actor_type: type[Actor] | None = None,
        **attr_in: Any,
    ) -> list[ActorWrapper]:
        if predicate is None:

            def _predicate(w: ActorWrapper) -> bool:
                if actor_type is not None and not isinstance(w.actor, actor_type):
                    return False
                for attr, allowed in attr_in.items():
                    if allowed is None:
                        continue
                    val = getattr(w.actor, attr, None)
                    if val not in allowed:
                        return False
                return True

            predicate = _predicate

        result: list[ActorWrapper] = []
        if isinstance(self.actor, DescendActor):
            for d in self.actor.descendants:
                if predicate(d):
                    result.append(d)
                result.extend(d.find_descendants(predicate=predicate))
        return result

    def remove_descendants_if(self, predicate: Callable[[ActorWrapper], bool]) -> None:
        if isinstance(self.actor, DescendActor):
            for d in list(self.actor.descendants):
                d.remove_descendants_if(predicate=predicate)
            self.actor._descendants[:] = [
                d
                for d in self.actor.descendants
                if not predicate(d)
                and not (isinstance(d.actor, DescendActor) and d.count() == 0)
            ]

DescendActor

Bases: Actor

Actor for processing hierarchical data structures.

Source code in graflo/architecture/pipeline/runtime/actor/descend.py
class DescendActor(Actor):
    """Actor for processing hierarchical data structures."""

    def __init__(
        self,
        key: str | None,
        any_key: bool = False,
        *,
        _descendants: list[ActorWrapper] | None = None,
    ):
        self.key = key
        self.any_key = any_key
        self._descendants: list[ActorWrapper] = (
            list(_descendants) if _descendants else []
        )
        self._descendants_sorted = True
        self._descendants.sort(key=lambda x: _NodeTypePriority[type(x.actor)])

    def fetch_important_items(self) -> dict[str, Any]:
        items = self._fetch_items_from_dict(("key",))
        if self.any_key:
            items["any_key"] = True
        return items

    def add_descendant(self, d: ActorWrapper) -> None:
        self._descendants.append(d)
        self._descendants_sorted = False

    def count(self) -> int:
        return sum(d.count() for d in self.descendants)

    @property
    def descendants(self) -> list[ActorWrapper]:
        if not self._descendants_sorted:
            self._descendants.sort(key=lambda x: _NodeTypePriority[type(x.actor)])
            self._descendants_sorted = True
        return self._descendants

    @classmethod
    def from_config(cls, config: DescendActorConfig) -> DescendActor:
        from .wrapper import ActorWrapper

        wrappers = [ActorWrapper.from_config(c) for c in config.pipeline]
        return cls(key=config.key, any_key=config.any_key, _descendants=wrappers)

    def _infer_vertex_descendants_from_transforms(
        self, init_ctx: ActorInitContext
    ) -> None:
        from .transform import TransformActor
        from .vertex import VertexActor

        if any(isinstance(an.actor, VertexActor) for an in self.descendants):
            return

        transform_output_fields: set[str] = set()
        for an in self.descendants:
            if isinstance(an.actor, TransformActor):
                transform_output_fields.update(str(k) for k in an.actor.t.rename.keys())

        if not transform_output_fields:
            return

        inferred_vertices: list[str] = []
        for vertex_name in sorted(init_ctx.vertex_config.vertex_set):
            identity_fields = {
                f for f in init_ctx.vertex_config.identity_fields(vertex_name)
            }
            if identity_fields and identity_fields.issubset(transform_output_fields):
                inferred_vertices.append(vertex_name)

        if not inferred_vertices:
            return

        existing_targets: set[str] = set()
        for an in self.descendants:
            existing_targets.update(
                str(v) for v in an.actor.references_vertices() if v is not None
            )
        for vertex_name in inferred_vertices:
            if vertex_name in existing_targets:
                continue
            from .wrapper import ActorWrapper

            self.add_descendant(
                ActorWrapper.from_config(VertexActorConfig(vertex=vertex_name))
            )
            logger.debug(
                "DescendActor: inferred implicit VertexActor(%s) from untargeted transform fields %s",
                vertex_name,
                sorted(transform_output_fields),
            )

    def init_transforms(self, init_ctx: ActorInitContext) -> None:
        for an in self.descendants:
            an.init_transforms(init_ctx)

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        self.vertex_config = init_ctx.vertex_config
        self._infer_vertex_descendants_from_transforms(init_ctx)
        for an in self.descendants:
            an.finish_init(init_ctx)

    def _expand_document(self, doc: dict | list) -> list[tuple[str | None, Any]]:
        if self.key is not None:
            if isinstance(doc, dict) and self.key in doc:
                items = doc[self.key]
                aux = items if isinstance(items, list) else [items]
                return [(self.key, item) for item in aux]
            return []
        elif self.any_key:
            if isinstance(doc, dict):
                result = []
                for key, items in doc.items():
                    aux = items if isinstance(items, list) else [items]
                    result.extend([(key, item) for item in aux])
                return result
            return []
        else:
            if isinstance(doc, list):
                return [(None, item) for item in doc]
            return [(None, doc)]

    def __call__(self, ctx: Any, lindex: Any, *nargs: Any, **kwargs: Any) -> Any:
        doc: Any = kwargs.pop("doc")
        if doc is None:
            raise ValueError(f"{type(self).__name__}: doc should be provided")
        if not doc:
            return ctx

        doc_expanded = self._expand_document(doc)
        if not doc_expanded:
            return ctx

        logger.debug("Expanding %s items", len(doc_expanded))

        for idoc, (key, sub_doc) in enumerate(doc_expanded):
            logger.debug("Processing item %s/%s", idoc + 1, len(doc_expanded))
            extra_step = (idoc,) if key is None else (key, idoc)
            child_lindex = lindex.extend(extra_step)
            if isinstance(sub_doc, dict):
                nargs_tuple: tuple[Any, ...] = ()
            else:
                nargs_tuple = (sub_doc,)

            for j, anw in enumerate(self.descendants):
                logger.debug(
                    "%s: %s/%s",
                    type(anw.actor).__name__,
                    j + 1,
                    len(self.descendants),
                )
                if isinstance(sub_doc, dict) and isinstance(anw.actor, TransformActor):
                    buf = list(ctx.transform_buffer.get(child_lindex, []))
                    feed_doc = merge_observation_with_transform_buffer(sub_doc, buf)
                    child_kwargs = {**kwargs, "doc": feed_doc}
                elif isinstance(sub_doc, dict):
                    child_kwargs = {**kwargs, "doc": sub_doc}
                else:
                    child_kwargs = kwargs
                ctx = anw(ctx, child_lindex, *nargs_tuple, **child_kwargs)
        return ctx

    def fetch_actors(self, level: int, edges: list) -> tuple[int, type, str, list]:
        label_current = str(self)
        cname_current = type(self)
        hash_current = hash((level, cname_current, label_current))
        logger.info("%s, %s", hash_current, (level, cname_current, label_current))
        props_current = {"label": label_current, "class": cname_current, "level": level}
        for d in self.descendants:
            level_a, cname, label_a, edges_a = d.fetch_actors(level + 1, edges)
            hash_a = hash((level_a, cname, label_a))
            props_a = {"label": label_a, "class": cname, "level": level_a}
            edges = [(hash_current, hash_a, props_current, props_a)] + edges_a
        return level, type(self), str(self), edges

EdgeActor

Bases: Actor

Actor for processing edge data.

Operates in three modes determined by configuration:

Static mode (from/to set): both vertex types are declared at config time. The schema Edge is created during finish_init and the __call__ path is unchanged from the original implementation.

Dynamic mode (at least one of source_role/target_role set, with source_type_field/target_type_field accepted as legacy aliases): vertex types for the dynamic side(s) are resolved at extraction time by looking up accumulator slots populated by an upstream VertexRouterActor (slot segment = role or type_field) or a VertexActor with a matching role. The schema Edge is created—or retrieved from cache—per unique (source_type, target_type, relation) triple encountered.

Multi-link mode (links list set): each item in links becomes a dedicated sub-EdgeActor that runs in sequence per row, emitting one edge intent each. Use when one flat row encodes multiple distinct relationships.

Source code in graflo/architecture/pipeline/runtime/actor/edge.py
class EdgeActor(Actor):
    """Actor for processing edge data.

    Operates in three modes determined by configuration:

    **Static mode** (``from``/``to`` set): both vertex types are declared at config
    time.  The schema ``Edge`` is created during ``finish_init`` and the
    ``__call__`` path is unchanged from the original implementation.

    **Dynamic mode** (at least one of ``source_role``/``target_role`` set, with
    ``source_type_field``/``target_type_field`` accepted as legacy aliases):
    vertex types for the dynamic side(s) are
    resolved at extraction time by looking up accumulator slots populated by an
    upstream ``VertexRouterActor`` (slot segment = ``role`` or ``type_field``) or a
    ``VertexActor`` with a matching ``role``.
    The schema ``Edge`` is created—or retrieved from cache—per unique
    ``(source_type, target_type, relation)`` triple encountered.

    **Multi-link mode** (``links`` list set): each item in ``links`` becomes a
    dedicated sub-``EdgeActor`` that runs in sequence per row, emitting one edge
    intent each.  Use when one flat row encodes multiple distinct relationships.
    """

    def __init__(self, config: EdgeActorConfig):
        # Multi-link mode: delegate each link to its own EdgeActor.
        if config.links:
            self._link_actors: list[EdgeActor] = [
                EdgeActor(_link_to_edge_actor_config(lk)) for lk in config.links
            ]
            # Null-out all single-intent state so the dispatch is unambiguous.
            self._source_slot_key = None
            self._target_slot_key = None
            self._static_source = None
            self._static_target = None
            self._relation_map: dict[str, str] = {}
            self._strict_edge_types = False
            self._edge_cache: dict[tuple[str, str, str | None], Edge] = {}
            self._init_ctx: ActorInitContext | None = None
            self.derivation: EdgeDerivation = EdgeDerivation()
            self._pending_vertex_weights: list[Weight] = []
            self._static_relation = None
            self.edge: Edge | None = None
            self.vertex_config: VertexConfig | None = None
            self.edge_config: EdgeConfig | None = None
            self.allowed_vertex_names: set[str] | None = None
            return

        self._link_actors = []

        self._source_slot_key = config.source_role
        self._target_slot_key = config.target_role
        # Static fallback for whichever side is not dynamic.
        self._static_source = config.source
        self._static_target = config.target
        self._relation_map = config.relation_map or {}
        self._strict_edge_types = config.strict_edge_types
        self._edge_cache = {}
        self._init_ctx = None

        self.derivation = config.derivation
        self._pending_vertex_weights = []

        # In dynamic/mixed mode the static relation (if set) is used as a fallback
        # when relation_field yields nothing.
        self._static_relation = None

        # Dynamic mode: at least one side is resolved at extraction time.
        # Static mode: both sides are fixed at config time.
        is_dynamic = (
            self._source_slot_key is not None or self._target_slot_key is not None
        )
        if not is_dynamic:
            payload: dict[str, Any] = {
                "source": config.source,
                "target": config.target,
            }
            if config.relation is not None:
                payload["relation"] = config.relation
            if config.description is not None:
                payload["description"] = config.description
            if config.properties:
                payload["properties"] = config.properties
            for item in config.vertex_weights:
                self._pending_vertex_weights.append(Weight.model_validate(item))
            self.edge: Edge | None = Edge.from_dict(payload)
        else:
            self.edge = None
            self._static_relation = config.relation

        self.vertex_config: VertexConfig | None = None
        self.edge_config: EdgeConfig | None = None
        self.allowed_vertex_names: set[str] | None = None

    @property
    def relation_field(self) -> str | None:
        """Alias for tooling (e.g. plot labels)."""
        return self.derivation.relation_field

    @classmethod
    def from_config(cls, config: EdgeActorConfig) -> "EdgeActor":
        return cls(config)

    def fetch_important_items(self) -> dict[str, Any]:
        if self._link_actors:
            return {"links": str(len(self._link_actors))}
        items: dict[str, Any] = {}
        if self.edge is not None:
            items["source"] = self.edge.source
            items["target"] = self.edge.target
        else:
            if self._source_slot_key is not None:
                items["source_role"] = self._source_slot_key
            elif self._static_source is not None:
                items["source"] = self._static_source
            if self._target_slot_key is not None:
                items["target_role"] = self._target_slot_key
            elif self._static_target is not None:
                items["target"] = self._static_target
        for k in ("match_source", "match_target"):
            v = getattr(self.derivation, k)
            if v is not None:
                items[k] = v
        return items

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        self._init_ctx = init_ctx
        self.vertex_config = init_ctx.vertex_config
        self.edge_config = init_ctx.edge_config
        self.allowed_vertex_names = init_ctx.allowed_vertex_names

        if self._link_actors:
            # Multi-link mode: delegate finish_init to each sub-actor.
            for la in self._link_actors:
                la.finish_init(init_ctx)
            return

        if self.edge is not None:
            # Static mode: register schema Edge now.
            edge_id = self.edge.edge_id
            init_ctx.edge_config.update_edges(
                self.edge, vertex_config=self.vertex_config
            )
            if self.derivation.relation_from_key:
                init_ctx.edge_derivation.mark_relation_from_key(edge_id)
            if self._pending_vertex_weights:
                init_ctx.edge_derivation.merge_vertex_weights(
                    edge_id, self._pending_vertex_weights
                )
            self.edge = init_ctx.edge_config.edge_for(edge_id)
        else:
            # Dynamic mode: cache will be populated per-row.
            self._edge_cache.clear()

    # ------------------------------------------------------------------
    # Dynamic-mode helpers
    # ------------------------------------------------------------------

    def _get_or_create_edge(
        self, source: str, target: str, relation: str | None
    ) -> Edge | None:
        key = (source, target, relation)
        if key in self._edge_cache:
            return self._edge_cache[key]
        if self._strict_edge_types:
            # Skip if this (source, target, relation) was not pre-declared.
            if self.edge_config is not None and key not in self.edge_config:
                logger.debug(
                    "EdgeActor: strict_edge_types=True, skipping undeclared (%s, %s, %s)",
                    source,
                    target,
                    relation,
                )
                return None
        edge = Edge(source=source, target=target, relation=relation)
        if self.vertex_config is not None:
            edge.finish_init(vertex_config=self.vertex_config)
        if self.edge_config is not None and self.vertex_config is not None:
            self.edge_config.update_edges(edge, vertex_config=self.vertex_config)
        self._edge_cache[key] = edge
        logger.debug(
            "EdgeActor: registered dynamic edge (%s, %s, %s)", source, target, relation
        )
        return edge

    def _find_type_at_slot(
        self, ctx: ExtractionContext, slot_lindex: LocationIndex
    ) -> str | None:
        """Scan acc_vertex to find which vertex type has data at *slot_lindex*."""
        for vtype, by_loc in ctx.acc_vertex.items():
            if slot_lindex in by_loc and by_loc[slot_lindex]:
                return vtype
        return None

    # ------------------------------------------------------------------
    # Main dispatch
    # ------------------------------------------------------------------

    def __call__(
        self, ctx: ExtractionContext, lindex: LocationIndex, *nargs: Any, **kwargs: Any
    ) -> ExtractionContext:
        if self._link_actors:
            # Multi-link mode: run each sub-actor in sequence.
            for la in self._link_actors:
                ctx = la(ctx, lindex, *nargs, **kwargs)
            return ctx
        if self._source_slot_key is not None or self._target_slot_key is not None:
            return self._call_dynamic(ctx, lindex, **kwargs)
        return self._call_static(ctx, lindex, **kwargs)

    def _call_static(
        self, ctx: ExtractionContext, lindex: LocationIndex, **kwargs: Any
    ) -> ExtractionContext:
        """Static mode: unchanged behavior from original EdgeActor."""
        assert self.edge is not None
        if self.allowed_vertex_names is not None and (
            self.edge.source not in self.allowed_vertex_names
            or self.edge.target not in self.allowed_vertex_names
        ):
            return ctx
        if (
            self.allowed_vertex_names is None
            and self.vertex_config is not None
            and (
                self.edge.source not in self.vertex_config.vertex_set
                or self.edge.target not in self.vertex_config.vertex_set
            )
        ):
            return ctx

        der = None if self.derivation.is_empty() else self.derivation
        ctx.record_edge_intent(
            edge=self.edge,
            location=lindex,
            derivation=der,
        )
        return ctx

    def _call_dynamic(
        self, ctx: ExtractionContext, lindex: LocationIndex, **kwargs: Any
    ) -> ExtractionContext:
        """Dynamic / mixed mode: resolve dynamic side(s) from VRA accumulator slots.

        Source or target (but not both) may be statically declared; that side's
        type is taken directly from config rather than looked up in the accumulator.
        """
        raw_observation = kwargs.get("doc", {})
        if not isinstance(raw_observation, dict):
            logger.debug(
                "EdgeActor: expected dict observation, got %s, skipping",
                type(raw_observation).__name__,
            )
            return ctx

        buffer_items: list[Any] = list(ctx.transform_buffer.get(lindex, []))
        doc = merge_observation_with_transform_buffer(raw_observation, buffer_items)
        ctx.obs_buffer[lindex] = dict(doc)

        # --- source type ---
        if self._source_slot_key is not None:
            source_slot_lindex = lindex.extend((self._source_slot_key, 0))
            source_type = self._find_type_at_slot(ctx, source_slot_lindex)
            if source_type is None:
                logger.debug(
                    "EdgeActor: no vertex data at source slot '%s', skipping",
                    self._source_slot_key,
                )
                return ctx
        else:
            # Mixed mode: static source
            assert self._static_source is not None
            source_type = self._static_source

        if (
            self.vertex_config is not None
            and source_type not in self.vertex_config.vertex_set
        ):
            logger.debug(
                "EdgeActor: source type '%s' not in vertex_set, skipping", source_type
            )
            return ctx

        # --- target type ---
        if self._target_slot_key is not None:
            target_slot_lindex = lindex.extend((self._target_slot_key, 0))
            target_type = self._find_type_at_slot(ctx, target_slot_lindex)
            if target_type is None:
                logger.debug(
                    "EdgeActor: no vertex data at target slot '%s', skipping",
                    self._target_slot_key,
                )
                return ctx
        else:
            # Mixed mode: static target
            assert self._static_target is not None
            target_type = self._static_target

        if (
            self.vertex_config is not None
            and target_type not in self.vertex_config.vertex_set
        ):
            logger.debug(
                "EdgeActor: target type '%s' not in vertex_set, skipping", target_type
            )
            return ctx

        # allowed_vertex_names early-exit
        if self.allowed_vertex_names is not None and (
            source_type not in self.allowed_vertex_names
            or target_type not in self.allowed_vertex_names
        ):
            return ctx

        # --- relation ---
        raw_relation: str | None
        if self.derivation.relation_field:
            raw_relation = doc.get(self.derivation.relation_field)
        else:
            raw_relation = None

        if raw_relation is not None:
            relation: str | None = self._relation_map.get(raw_relation, raw_relation)
        else:
            relation = self._static_relation

        # Create / retrieve cached schema Edge.
        edge = self._get_or_create_edge(source_type, target_type, relation)
        if edge is None:
            return ctx

        # Build derivation: slot names for dynamic sides so render_edge can filter.
        derivation = EdgeDerivation(
            match_source=self._source_slot_key,
            match_target=self._target_slot_key,
        )
        ctx.record_edge_intent(edge=edge, location=lindex, derivation=derivation)
        return ctx

    def references_vertices(self) -> set[str]:
        if self._link_actors:
            result: set[str] = set()
            for la in self._link_actors:
                result |= la.references_vertices()
            return result
        if self.edge is not None:
            return {self.edge.source, self.edge.target}
        static: set[str] = set()
        if self._static_source:
            static.add(self._static_source)
        if self._static_target:
            static.add(self._static_target)
        return (
            static
            | {s for s, _, _ in self._edge_cache}
            | {t for _, t, _ in self._edge_cache}
        )

relation_field property

Alias for tooling (e.g. plot labels).

TransformActor

Bases: Actor

Actor for applying transformations to data.

Source code in graflo/architecture/pipeline/runtime/actor/transform.py
class TransformActor(Actor):
    """Actor for applying transformations to data."""

    def __init__(self, config: TransformActorConfig):
        self.transforms: dict[str, ProtoTransform] = {}
        self.call_use: str | None = None
        self._call_config = None
        self._skip_on_missing_input_keys = False
        self._required_doc_keys: frozenset[str] = frozenset()

        if config.rename is not None:
            self.t = Transform(rename=config.rename)
            return

        if config.call is None:
            raise ValueError(
                "TransformActorConfig requires call when rename is absent."
            )

        call = config.call
        self._call_config = call
        self.call_use = call.use
        inline_target = (
            call.target
            if call.target is not None
            else "values"
            if call.use is None
            else None
        )
        transform_kwargs: dict[str, Any] = {
            "name": call.use,
            "params": call.params,
            "module": call.module,
            "foo": call.foo,
            "input": tuple(call.input) if call.input else (),
            "output": tuple(call.output) if call.output else (),
            "input_groups": (
                tuple(tuple(group) for group in call.input_groups)
                if call.input_groups
                else ()
            ),
            "output_groups": (
                tuple(tuple(group) for group in call.output_groups)
                if call.output_groups
                else ()
            ),
            "dress": call.dress,
            "strategy": call.strategy or "single",
        }
        if inline_target is not None:
            transform_kwargs["target"] = inline_target
        if call.use is None:
            if call.keys is not None:
                transform_kwargs["keys"] = KeySelectionConfig.model_validate(
                    call.keys.model_dump()
                )
        # When call.use references ingestion_model.transforms, defer strict
        # transform validation until finish_init can hydrate module/foo.
        if call.use is not None and call.module is None and call.foo is None:
            self.t = Transform(name=call.use)
            return
        self.t = Transform(**transform_kwargs)

    def _refresh_missing_key_guard(self, init_ctx: ActorInitContext) -> None:
        self._skip_on_missing_input_keys = init_ctx.skip_actors_on_missing_input_keys
        if (
            not self._skip_on_missing_input_keys
            or self.t.target == "keys"
            or self.t.strategy == "all"
        ):
            self._required_doc_keys = frozenset()
            return
        required: set[str] = set(self.t.input)
        for group in self.t.input_groups:
            required.update(group)
        self._required_doc_keys = frozenset(required)

    def fetch_important_items(self) -> dict[str, Any]:
        items = self._fetch_items_from_dict(("transform",))
        items.update({"t.input": self.t.input, "t.output": self.t.output})
        return items

    @classmethod
    def from_config(cls, config: TransformActorConfig) -> TransformActor:
        return cls(config)

    def init_transforms(self, init_ctx: ActorInitContext) -> None:
        self.transforms = init_ctx.transforms

    def _merge_call_with_proto(self, call: Any, pt: ProtoTransform) -> dict[str, Any]:
        next_params = call.params if call.params else pt.params
        next_dress = call.dress if call.dress is not None else pt.dress
        next_target = call.target if call.target is not None else pt.target

        if next_target == "keys":
            if call.input or call.output or call.input_groups or call.output_groups:
                raise ValueError(
                    "call.input, call.output, call.input_groups, and call.output_groups "
                    "cannot be used when the effective transform target is keys "
                    "(from call.target or the named ingestion_model.transforms entry)."
                )
            if call.dress is not None:
                raise ValueError("call.dress is not supported when target='keys'.")
            if call.strategy is not None and call.strategy != "single":
                raise ValueError(
                    "call.strategy is not allowed when target='keys'; "
                    "key mode uses implicit per-key execution."
                )
            next_input: tuple[str, ...] = ()
            next_output: tuple[str, ...] = ()
            next_input_groups: tuple[tuple[str, ...], ...] = ()
            next_output_groups: tuple[tuple[str, ...], ...] = ()
        else:
            next_input_groups = (
                tuple(tuple(group) for group in call.input_groups)
                if call.input_groups
                else pt.input_groups
            )
            next_output_groups = (
                tuple(tuple(group) for group in call.output_groups)
                if call.output_groups
                else pt.output_groups
            )
            if next_input_groups:
                next_input = ()
                # Explicit grouped override should not inherit potentially
                # conflicting proto output/output_groups for a different shape.
                if call.input_groups:
                    next_output_groups = (
                        tuple(tuple(group) for group in call.output_groups)
                        if call.output_groups
                        else ()
                    )
                    next_output = tuple(call.output) if call.output else ()
                elif next_dress is not None:
                    next_output = (next_dress.key, next_dress.value)
                else:
                    next_output = tuple(call.output) if call.output else pt.output
            else:
                next_input = tuple(call.input) if call.input else pt.input
                if next_dress is not None:
                    next_output = (next_dress.key, next_dress.value)
                else:
                    next_output = tuple(call.output) if call.output else pt.output

        transform_kwargs: dict[str, Any] = {
            "dress": next_dress,
            "name": call.use,
            "module": pt.module,
            "foo": pt.foo,
            "params": next_params,
            "input": next_input,
            "output": next_output,
            "input_groups": next_input_groups,
            "output_groups": next_output_groups,
            "strategy": call.strategy or "single",
            "target": next_target,
        }
        if call.keys is not None:
            transform_kwargs["keys"] = KeySelectionConfig.model_validate(
                call.keys.model_dump()
            )
        else:
            transform_kwargs["keys"] = pt.keys.model_copy(deep=True)
        return transform_kwargs

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        self.transforms = init_ctx.transforms
        if self.call_use is None or self.t._foo is not None:
            self._refresh_missing_key_guard(init_ctx)
            return
        if self._call_config is None:
            self._refresh_missing_key_guard(init_ctx)
            return
        pt = self.transforms.get(self.call_use, None)
        if pt is None:
            if init_ctx.strict_references:
                raise ValueError(
                    f"Transform '{self.call_use}' referenced by transform.call.use "
                    "was not found in ingestion_model.transforms."
                )
            self._refresh_missing_key_guard(init_ctx)
            return
        call = self._call_config
        transform_kwargs = self._merge_call_with_proto(call, pt)
        self.t = Transform(**transform_kwargs)
        self._refresh_missing_key_guard(init_ctx)

    def _extract_doc(self, nargs: tuple[Any, ...], **kwargs: Any) -> dict[str, Any]:
        if kwargs:
            doc: dict[str, Any] | None = kwargs.get("doc")
        elif nargs:
            doc = nargs[0]
        else:
            raise ValueError(f"{type(self).__name__}: doc should be provided")
        if doc is None:
            raise ValueError(f"{type(self).__name__}: doc should be provided")
        return doc

    def _format_transform_result(self, result: Any) -> TransformPayload:
        return TransformPayload.from_result(result)

    def __call__(
        self, ctx: ExtractionContext, lindex: LocationIndex, *nargs: Any, **kwargs: Any
    ) -> ExtractionContext:
        logger.debug("transforms : %s %s", id(self.transforms), len(self.transforms))
        doc = self._extract_doc(nargs, **kwargs)
        if self._skip_on_missing_input_keys and self._required_doc_keys:
            if not self._required_doc_keys.issubset(doc):
                return ctx
        transform_result = self.t(doc)
        _update_doc = self._format_transform_result(transform_result)
        ctx.transform_buffer[lindex].append(_update_doc)
        ctx.record_transform_observation(location=lindex, payload=_update_doc)
        return ctx

    def references_vertices(self) -> set[str]:
        return set()

VertexActor

Bases: Actor

Actor for processing vertex data.

Source code in graflo/architecture/pipeline/runtime/actor/vertex.py
class VertexActor(Actor):
    """Actor for processing vertex data."""

    def __init__(self, config: VertexActorConfig):
        self.name = config.vertex
        self.from_doc: dict[str, str] | None = config.from_doc
        self.keep_fields: tuple[str, ...] | None = (
            tuple(config.keep_fields) if config.keep_fields else None
        )
        self.extraction_scope: Literal["full", "mapped_only"] = config.extraction_scope
        self.role: str | None = config.role
        self.vertex_config: VertexConfig
        self.allowed_vertex_names: set[str] | None = None

    @classmethod
    def from_config(cls, config: VertexActorConfig) -> VertexActor:
        return cls(config)

    def fetch_important_items(self) -> dict[str, Any]:
        return self._fetch_items_from_dict(
            ("name", "from_doc", "keep_fields", "extraction_scope", "role")
        )

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        self.vertex_config = init_ctx.vertex_config
        self.allowed_vertex_names = init_ctx.allowed_vertex_names

    def _filter_and_aggregate_vertex_docs(
        self, docs: list[dict[str, Any]], doc: dict[str, Any]
    ) -> list[dict[str, Any]]:
        filters = self.vertex_config.filters(self.name)
        return [
            _doc
            for _doc in docs
            if all(cfilter(kind=ExpressionFlavor.PYTHON, **_doc) for cfilter in filters)
        ]

    def _extract_vertex_doc_from_transformed_item(
        self,
        item: Any,
        vertex_keys: tuple[str, ...],
        index_keys: tuple[str, ...],
    ) -> dict[str, Any]:
        if isinstance(item, TransformPayload):
            doc: dict[str, Any] = {}
            consumed_named: set[str] = set()
            for k, v in item.named.items():
                if k in vertex_keys and v is not None:
                    doc[k] = v
                    consumed_named.add(k)
            for j, value in enumerate(item.positional):
                if j >= len(index_keys):
                    break
                doc[index_keys[j]] = value
            for key in consumed_named:
                item.named.pop(key, None)
            if item.positional:
                item.positional = ()
            return doc

        if isinstance(item, dict):
            doc = {}
            value_keys = sorted(
                (
                    k
                    for k in item
                    if k.startswith(ActorConstants.DRESSING_TRANSFORMED_VALUE_KEY)
                ),
                key=lambda x: int(x.rsplit("#", 1)[-1]),
            )
            for j, vkey in enumerate(value_keys):
                if j >= len(index_keys):
                    break
                doc[index_keys[j]] = item.pop(vkey)
            for vkey in vertex_keys:
                if vkey not in doc and vkey in item and item[vkey] is not None:
                    doc[vkey] = item.pop(vkey)
            return doc

        return {}

    def _process_transformed_items(
        self,
        ctx: ExtractionContext,
        lindex: LocationIndex,
        doc: dict[str, Any],
        vertex_keys: tuple[str, ...],
    ) -> list[dict[str, Any]]:
        index_keys = tuple(self.vertex_config.identity_fields(self.name))
        payloads = ctx.transform_buffer[lindex]
        extracted_docs = [
            self._extract_vertex_doc_from_transformed_item(
                item, vertex_keys, index_keys
            )
            for item in payloads
        ]
        ctx.transform_buffer[lindex] = [
            item
            for item in payloads
            if not (
                isinstance(item, TransformPayload)
                and not item.named
                and not item.positional
            )
            and not (isinstance(item, dict) and not item)
        ]
        return self._filter_and_aggregate_vertex_docs(extracted_docs, doc)

    def __call__(
        self, ctx: ExtractionContext, lindex: LocationIndex, *nargs: Any, **kwargs: Any
    ) -> ExtractionContext:
        doc: dict[str, Any] = kwargs.get("doc", {})
        buffer_items: list[Any] = list(ctx.transform_buffer.get(lindex, []))
        effective_doc = merge_observation_with_transform_buffer(doc, buffer_items)
        ctx.obs_buffer[lindex] = dict(effective_doc)

        # Early-exit for disallowed vertex types.
        # This must happen before any ctx.acc_vertex[...] access.
        if (
            self.allowed_vertex_names is not None
            and self.name not in self.allowed_vertex_names
        ):
            return ctx
        if (
            self.allowed_vertex_names is None
            and self.name not in self.vertex_config.vertex_set
        ):
            return ctx

        vertex_keys_list = self.vertex_config.property_names(self.name)
        vertex_keys: tuple[str, ...] = tuple(vertex_keys_list)

        # When a role is set the vertex is stored at a named sub-slot so that
        # multiple vertices of the same type in the same row (e.g. buyer/seller)
        # occupy distinct accumulator locations. Transforms are always read from
        # the bare row lindex; only storage moves to the role slot.
        effective_lindex = lindex.extend((self.role, 0)) if self.role else lindex

        agg = []
        if self.from_doc:
            projected = {
                v_f: effective_doc.get(d_f) for v_f, d_f in self.from_doc.items()
            }
            if any(v is not None for v in projected.values()):
                agg.append(projected)

        agg.extend(
            self._process_transformed_items(ctx, lindex, effective_doc, vertex_keys)
        )

        if self.extraction_scope == "full":
            remaining_keys = set(vertex_keys) - set().union(*[d.keys() for d in agg])
            # When keep_fields is set, restrict passthrough to only those declared fields.
            if self.keep_fields is not None:
                remaining_keys = remaining_keys & set(self.keep_fields)
            passthrough_doc = {
                k: effective_doc.get(k) for k in remaining_keys if k in effective_doc
            }
            if passthrough_doc:
                agg.append(passthrough_doc)

        merged = merge_doc_basis(
            agg, index_keys=tuple(self.vertex_config.identity_fields(self.name))
        )

        for m in merged:
            vertex_rep = VertexRep(vertex=m)
            ctx.acc_vertex[self.name][effective_lindex].append(vertex_rep)
            ctx.record_vertex_observation(
                vertex_name=self.name,
                location=effective_lindex,
                vertex=vertex_rep.vertex,
                ctx={},
            )
        return ctx

    def references_vertices(self) -> set[str]:
        return {self.name}

VertexRouterActor

Bases: Actor

Routes documents to the correct VertexActor based on a type field.

The merged observation (document + same-location transform buffer) is passed through to the selected :class:VertexActor unchanged. Projection uses the same from / vertex_from_map contract as a standalone vertex step.

Vertices are accumulated at lindex.extend((role, 0)). role is normalized by config validation (defaults to :attr:type_field when omitted), so runtime slot addressing uses a single internal key. A downstream dynamic EdgeActor references this slot via source_role / target_role (or source_type_field / target_type_field) using the same segment name.

Source code in graflo/architecture/pipeline/runtime/actor/vertex_router.py
class VertexRouterActor(Actor):
    """Routes documents to the correct VertexActor based on a type field.

    The merged observation (document + same-location transform buffer) is passed
    through to the selected :class:`VertexActor` unchanged. Projection uses the same
    ``from`` / ``vertex_from_map`` contract as a standalone vertex step.

    Vertices are accumulated at ``lindex.extend((role, 0))``. ``role`` is normalized
    by config validation (defaults to :attr:`type_field` when omitted), so runtime slot
    addressing uses a single internal key. A downstream dynamic ``EdgeActor`` references
    this slot via ``source_role`` / ``target_role`` (or ``source_type_field`` /
    ``target_type_field``) using the same segment name.
    """

    def __init__(self, config: VertexRouterActorConfig):
        self.type_field = config.type_field
        # Config normalization guarantees role is always present.
        self.role: str = config.role or config.type_field
        self.from_doc: dict[str, str] | None = config.from_doc
        self.keep_fields: tuple[str, ...] | None = (
            tuple(config.keep_fields) if config.keep_fields else None
        )
        self.extraction_scope: Literal["full", "mapped_only"] = config.extraction_scope
        self.type_map: dict[str, str] = config.type_map or {}
        self.vertex_from_map: dict[str, dict[str, str]] = config.vertex_from_map or {}
        self._vertex_actors: dict[str, ActorWrapper] = {}
        self._init_ctx: ActorInitContext | None = None
        self.vertex_config: VertexConfig = VertexConfig(vertices=[])

    @classmethod
    def from_config(cls, config: VertexRouterActorConfig) -> VertexRouterActor:
        return cls(config)

    def fetch_important_items(self) -> dict[str, Any]:
        items: dict[str, Any] = {"type_field": self.type_field, "role": self.role}
        if self.from_doc:
            items["from_doc"] = self.from_doc
        if self.keep_fields:
            items["keep_fields"] = list(self.keep_fields)
        items["extraction_scope"] = self.extraction_scope
        if self.type_map:
            items["type_map"] = self.type_map
        if self.vertex_from_map:
            items["vertex_from_map"] = self.vertex_from_map
        items["vertex_types"] = sorted(self._vertex_actors.keys())
        return items

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        self.vertex_config = init_ctx.vertex_config
        self._init_ctx = init_ctx
        self._vertex_actors.clear()

    def _get_or_create_wrapper(self, vertex_type: str) -> "ActorWrapper | None":
        from .wrapper import ActorWrapper

        if vertex_type not in self.vertex_config.vertex_set:
            return None
        wrapper = self._vertex_actors.get(vertex_type)
        if wrapper is not None:
            return wrapper
        if self._init_ctx is None:
            raise RuntimeError(
                "VertexRouterActor._get_or_create_wrapper called before finish_init"
            )

        if vertex_type in self.vertex_from_map:
            per_type_from = self.vertex_from_map[vertex_type]
        else:
            per_type_from = self.from_doc
        config = VertexActorConfig(
            vertex=vertex_type,
            from_doc=per_type_from,
            keep_fields=list(self.keep_fields) if self.keep_fields else None,
            extraction_scope=self.extraction_scope,
        )
        wrapper = ActorWrapper.from_config(config)
        wrapper.finish_init(self._init_ctx)
        self._vertex_actors[vertex_type] = wrapper
        logger.debug(
            "VertexRouterActor: lazily registered VertexActor(%s) for type_field=%s role=%s",
            vertex_type,
            self.type_field,
            self.role,
        )
        return wrapper

    def count(self) -> int:
        return 1 + sum(w.count() for w in self._vertex_actors.values())

    def __call__(
        self, ctx: ExtractionContext, lindex: LocationIndex, *nargs: Any, **kwargs: Any
    ) -> ExtractionContext:
        raw_observation = kwargs.get("doc", {})
        if not isinstance(raw_observation, dict):
            logger.debug(
                "VertexRouterActor: expected dict observation slice, got %s, skipping",
                type(raw_observation).__name__,
            )
            return ctx
        buffer_items: list[Any] = list(ctx.transform_buffer.get(lindex, []))
        doc = merge_observation_with_transform_buffer(raw_observation, buffer_items)
        ctx.obs_buffer[lindex] = dict(doc)
        raw_vtype = doc.get(self.type_field)
        if raw_vtype is None:
            logger.debug(
                "VertexRouterActor: type_field '%s' not in doc, skipping",
                self.type_field,
            )
            return ctx
        vtype = self.type_map.get(raw_vtype, raw_vtype)

        wrapper = self._get_or_create_wrapper(vtype)
        if wrapper is None:
            logger.debug(
                "VertexRouterActor: vertex type '%s' (from field '%s') "
                "not in VertexConfig, skipping",
                vtype,
                self.type_field,
            )
            return ctx

        effective_lindex = lindex.extend((self.role, 0))
        return wrapper(ctx, effective_lindex, doc=doc)