Skip to content

graflo.architecture.pipeline.runtime.actor.vertex_router

Vertex router actor for routing nested JSON observations to vertex actors.

VertexRouterActor

Bases: Actor

Routes documents to the correct VertexActor based on a type field.

The merged observation (document + same-location transform buffer) is passed through to the selected :class:VertexActor unchanged. Projection uses the same from / vertex_from_map contract as a standalone vertex step.

Vertices are accumulated at lindex.extend((role, 0)). role is normalized by config validation (defaults to :attr:type_field when omitted), so runtime slot addressing uses a single internal key. A downstream dynamic EdgeActor references this slot via source_role / target_role (or source_type_field / target_type_field) using the same segment name.

Source code in graflo/architecture/pipeline/runtime/actor/vertex_router.py
class VertexRouterActor(Actor):
    """Routes documents to the correct VertexActor based on a type field.

    The merged observation (document + same-location transform buffer) is passed
    through to the selected :class:`VertexActor` unchanged. Projection uses the same
    ``from`` / ``vertex_from_map`` contract as a standalone vertex step.

    Vertices are accumulated at ``lindex.extend((role, 0))``. ``role`` is normalized
    by config validation (defaults to :attr:`type_field` when omitted), so runtime slot
    addressing uses a single internal key. A downstream dynamic ``EdgeActor`` references
    this slot via ``source_role`` / ``target_role`` (or ``source_type_field`` /
    ``target_type_field``) using the same segment name.
    """

    def __init__(self, config: VertexRouterActorConfig):
        self.type_field = config.type_field
        # Config normalization guarantees role is always present.
        self.role: str = config.role or config.type_field
        self.from_doc: dict[str, str] | None = config.from_doc
        self.keep_fields: tuple[str, ...] | None = (
            tuple(config.keep_fields) if config.keep_fields else None
        )
        self.extraction_scope: Literal["full", "mapped_only"] = config.extraction_scope
        self.type_map: dict[str, str] = config.type_map or {}
        self.vertex_from_map: dict[str, dict[str, str]] = config.vertex_from_map or {}
        self._vertex_actors: dict[str, ActorWrapper] = {}
        self._init_ctx: ActorInitContext | None = None
        self.vertex_config: VertexConfig = VertexConfig(vertices=[])

    @classmethod
    def from_config(cls, config: VertexRouterActorConfig) -> VertexRouterActor:
        return cls(config)

    def fetch_important_items(self) -> dict[str, Any]:
        items: dict[str, Any] = {"type_field": self.type_field, "role": self.role}
        if self.from_doc:
            items["from_doc"] = self.from_doc
        if self.keep_fields:
            items["keep_fields"] = list(self.keep_fields)
        items["extraction_scope"] = self.extraction_scope
        if self.type_map:
            items["type_map"] = self.type_map
        if self.vertex_from_map:
            items["vertex_from_map"] = self.vertex_from_map
        items["vertex_types"] = sorted(self._vertex_actors.keys())
        return items

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        self.vertex_config = init_ctx.vertex_config
        self._init_ctx = init_ctx
        self._vertex_actors.clear()

    def _get_or_create_wrapper(self, vertex_type: str) -> "ActorWrapper | None":
        from .wrapper import ActorWrapper

        if vertex_type not in self.vertex_config.vertex_set:
            return None
        wrapper = self._vertex_actors.get(vertex_type)
        if wrapper is not None:
            return wrapper
        if self._init_ctx is None:
            raise RuntimeError(
                "VertexRouterActor._get_or_create_wrapper called before finish_init"
            )

        if vertex_type in self.vertex_from_map:
            per_type_from = self.vertex_from_map[vertex_type]
        else:
            per_type_from = self.from_doc
        config = VertexActorConfig(
            vertex=vertex_type,
            from_doc=per_type_from,
            keep_fields=list(self.keep_fields) if self.keep_fields else None,
            extraction_scope=self.extraction_scope,
        )
        wrapper = ActorWrapper.from_config(config)
        wrapper.finish_init(self._init_ctx)
        self._vertex_actors[vertex_type] = wrapper
        logger.debug(
            "VertexRouterActor: lazily registered VertexActor(%s) for type_field=%s role=%s",
            vertex_type,
            self.type_field,
            self.role,
        )
        return wrapper

    def count(self) -> int:
        return 1 + sum(w.count() for w in self._vertex_actors.values())

    def __call__(
        self, ctx: ExtractionContext, lindex: LocationIndex, *nargs: Any, **kwargs: Any
    ) -> ExtractionContext:
        raw_observation = kwargs.get("doc", {})
        if not isinstance(raw_observation, dict):
            logger.debug(
                "VertexRouterActor: expected dict observation slice, got %s, skipping",
                type(raw_observation).__name__,
            )
            return ctx
        buffer_items: list[Any] = list(ctx.transform_buffer.get(lindex, []))
        doc = merge_observation_with_transform_buffer(raw_observation, buffer_items)
        ctx.obs_buffer[lindex] = dict(doc)
        raw_vtype = doc.get(self.type_field)
        if raw_vtype is None:
            logger.debug(
                "VertexRouterActor: type_field '%s' not in doc, skipping",
                self.type_field,
            )
            return ctx
        vtype = self.type_map.get(raw_vtype, raw_vtype)

        wrapper = self._get_or_create_wrapper(vtype)
        if wrapper is None:
            logger.debug(
                "VertexRouterActor: vertex type '%s' (from field '%s') "
                "not in VertexConfig, skipping",
                vtype,
                self.type_field,
            )
            return ctx

        effective_lindex = lindex.extend((self.role, 0))
        return wrapper(ctx, effective_lindex, doc=doc)