Skip to content

graflo.architecture.pipeline.runtime

Pipeline runtime: actors, assembly, and executor.

Actor

Bases: ABC

Abstract base class for all actors in the system.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
class Actor(ABC):
    """Abstract base class for all actors in the system.

    Concrete actors implement ``__call__``; the remaining hooks
    (``finish_init``, ``init_transforms``, ``count``,
    ``references_vertices``) provide no-op / trivial defaults.
    """

    @abstractmethod
    def __call__(
        self, ctx: ExtractionContext, lindex: LocationIndex, *nargs, **kwargs
    ) -> ExtractionContext:
        """Execute the actor's main processing logic."""
        pass

    def fetch_important_items(self) -> dict[str, object]:
        """Get a dictionary of important items for string representation."""
        return {}

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        """Complete initialization of the actor. No-op by default."""
        pass

    def init_transforms(self, init_ctx: ActorInitContext) -> None:
        """Initialize transformations for the actor. No-op by default."""
        pass

    def count(self) -> int:
        """Get the count of items processed by this actor.

        Leaf actors count as 1; composite actors override this.
        """
        return 1

    def references_vertices(self) -> set[str]:
        """Return vertex names this actor references."""
        return set()

    def _filter_items(self, items: dict[str, object]) -> dict[str, object]:
        """Filter out None and empty items."""
        # None is falsy, so a plain truthiness test covers both cases;
        # the previous `v is not None and v` was redundant.
        return {k: v for k, v in items.items() if v}

    def _stringify_items(self, items: dict[str, object]) -> dict[str, str]:
        """Convert items to string representation."""
        # Stringify sequence members explicitly so that join() also works
        # for tuples/lists containing non-string values.
        return {
            k: ", ".join(str(x) for x in v) if isinstance(v, (tuple, list)) else str(v)
            for k, v in items.items()
        }

    def _fetch_items_from_dict(self, keys: tuple[str, ...]) -> dict[str, object]:
        """Helper method to extract items from instance dict for string representation."""
        return {k: self.__dict__[k] for k in keys if k in self.__dict__}

    def __str__(self) -> str:
        """Get string representation: class name plus sorted `key: value` lines."""
        d = self.fetch_important_items()
        d = self._filter_items(d)
        d = self._stringify_items(d)
        # Sort keys for a deterministic, readable multi-line output.
        lines = [type(self).__name__] + [f"{k}: {d[k]}" for k in sorted(d)]
        return "\n".join(lines)

    __repr__ = __str__

    def fetch_actors(self, level: int, edges: list) -> tuple[int, type, str, list]:
        """Fetch actor information for tree representation."""
        return level, type(self), str(self), edges

__call__(ctx, lindex, *nargs, **kwargs) abstractmethod

Execute the actor's main processing logic.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
@abstractmethod
def __call__(
    self, ctx: ExtractionContext, lindex: LocationIndex, *nargs, **kwargs
) -> ExtractionContext:
    """Execute the actor's main processing logic.

    Takes the current extraction context and a location index and
    returns the (possibly updated) extraction context.
    """
    pass

__str__()

Get string representation of the actor.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
def __str__(self) -> str:
    """Get string representation of the actor."""
    # Collect notable attributes, drop empty ones, then stringify values.
    d = self.fetch_important_items()
    d = self._filter_items(d)
    d = self._stringify_items(d)
    # Sort keys for deterministic output: class name first, then one
    # "key: value" line per item.
    d_list = [[k, d[k]] for k in sorted(d)]
    d_list_b = [type(self).__name__] + [": ".join(x) for x in d_list]
    return "\n".join(d_list_b)

count()

Get the count of items processed by this actor.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
def count(self) -> int:
    """Get the count of items processed by this actor.

    Defaults to 1 for leaf actors; composite actors (e.g. DescendActor)
    override this to aggregate over their descendants.
    """
    return 1

fetch_actors(level, edges)

Fetch actor information for tree representation.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
def fetch_actors(self, level: int, edges: list) -> tuple[int, type, str, list]:
    """Fetch actor information for tree representation.

    Returns the tree depth, the actor's class, its string form, and the
    accumulated edge list unchanged.
    """
    return level, type(self), str(self), edges

fetch_important_items()

Get a dictionary of important items for string representation.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
def fetch_important_items(self) -> dict[str, object]:
    """Get a dictionary of important items for string representation.

    Base implementation exposes nothing; subclasses override.
    """
    return {}

finish_init(init_ctx)

Complete initialization of the actor.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
def finish_init(self, init_ctx: ActorInitContext) -> None:
    """Complete initialization of the actor.

    No-op by default; subclasses hook post-construction wiring in here.
    """
    pass

init_transforms(init_ctx)

Initialize transformations for the actor.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
def init_transforms(self, init_ctx: ActorInitContext) -> None:
    """Initialize transformations for the actor.

    No-op by default; transform-aware subclasses override.
    """
    pass

references_vertices()

Return vertex names this actor references.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
def references_vertices(self) -> set[str]:
    """Return vertex names this actor references.

    Base implementation references no vertices.
    """
    return set()

ActorConstants

Constants used throughout the actor system.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
class ActorConstants:
    """Constants used throughout the actor system."""

    # NOTE(review): presumably the key name under which descend steps record
    # the matched document key — confirm against usages.
    DESCEND_KEY: str = "key"
    # NOTE(review): appears to be the sentinel key under which a dressing
    # transform stores its transformed value — confirm against usages.
    DRESSING_TRANSFORMED_VALUE_KEY: str = "__value__"

ActorExecutor

Owns runtime extraction and assembly orchestration for an ActorWrapper.

Source code in graflo/architecture/pipeline/runtime/executor.py
class ActorExecutor:
    """Owns runtime extraction and assembly orchestration for an ActorWrapper."""

    def __init__(self, root: ActorWrapper):
        # Root of the actor tree this executor drives.
        self.root = root

    def extract(self, doc: dict) -> ExtractionContext:
        """Run the actor tree over *doc* and return the populated extraction context."""
        extraction_ctx = ExtractionContext()
        return self.root(extraction_ctx, doc=doc)

    def assemble(
        self, extraction_ctx: ExtractionContext
    ) -> defaultdict[GraphEntity, list]:
        """Assemble graph entities from a previously produced extraction context."""
        assembly_ctx = AssemblyContext.from_extraction(extraction_ctx)
        return self.root.assemble(assembly_ctx)

    def assemble_result(self, extraction_ctx: ExtractionContext) -> GraphAssemblyResult:
        """Like :meth:`assemble`, but wrapped in a ``GraphAssemblyResult``."""
        return GraphAssemblyResult(entities=self.assemble(extraction_ctx))

ActorInitContext dataclass

Typed initialization state shared across actor tree.

Source code in graflo/architecture/pipeline/runtime/actor/base.py
@dataclass(slots=True)
class ActorInitContext:
    """Typed initialization state shared across actor tree."""

    # Vertex schema configuration shared by all actors in the tree.
    vertex_config: VertexConfig
    # Edge schema configuration; actors register their edges into it
    # during finish_init.
    edge_config: EdgeConfig
    # Named transforms available to transform actors.
    transforms: dict[str, ProtoTransform]
    # Whether edges should be inferred automatically during assembly.
    infer_edges: bool = True
    # Edge-id restriction / exclusion sets passed through to edge assembly.
    infer_edge_only: set[EdgeId] = field(default_factory=set)
    infer_edge_except: set[EdgeId] = field(default_factory=set)
    # NOTE(review): not used in the code visible here — presumably toggles
    # strict validation of vertex references; confirm against callers.
    strict_references: bool = False

ActorWrapper

Wrapper class for managing actor instances.

Source code in graflo/architecture/pipeline/runtime/actor/wrapper.py
class ActorWrapper:
    """Wrapper class for managing actor instances.

    Holds a concrete :class:`Actor` plus the shared ``ActorInitContext``
    and exposes tree-level operations: construction from configs,
    initialization, extraction (``__call__``), assembly, and traversal
    helpers.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Parse raw step definitions into a typed config, then delegate to
        # from_config and adopt the resulting wrapper's state.
        config = parse_root_config(*args, **kwargs)
        w = ActorWrapper.from_config(config)
        self.actor = w.actor
        self.init_ctx = w.init_ctx

    @property
    def vertex_config(self) -> VertexConfig:
        """Vertex configuration from the shared init context."""
        return self.init_ctx.vertex_config

    @property
    def edge_config(self) -> EdgeConfig:
        """Edge configuration from the shared init context."""
        return self.init_ctx.edge_config

    @property
    def infer_edges(self) -> bool:
        """Whether automatic edge inference is enabled."""
        return self.init_ctx.infer_edges

    @property
    def infer_edge_only(self) -> set[EdgeId]:
        """Edge ids inference is restricted to (empty = no restriction)."""
        return self.init_ctx.infer_edge_only

    @property
    def infer_edge_except(self) -> set[EdgeId]:
        """Edge ids excluded from inference."""
        return self.init_ctx.infer_edge_except

    def init_transforms(self, init_ctx: ActorInitContext) -> None:
        """Adopt *init_ctx* and initialize the wrapped actor's transforms."""
        self.init_ctx = init_ctx
        self.actor.init_transforms(init_ctx)

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        """Adopt *init_ctx* and fully initialize the wrapped actor."""
        self.init_ctx = init_ctx
        self.actor.init_transforms(init_ctx)
        self.actor.finish_init(init_ctx)

    def count(self) -> int:
        """Number of items processed by the wrapped actor."""
        return self.actor.count()

    @classmethod
    def from_config(cls, config: ActorConfig) -> ActorWrapper:
        """Build a wrapper from a typed actor config.

        Raises:
            ValueError: if *config* is not one of the known config types.
        """
        if isinstance(config, VertexActorConfig):
            actor = VertexActor.from_config(config)
        elif isinstance(config, TransformActorConfig):
            actor = TransformActor.from_config(config)
        elif isinstance(config, EdgeActorConfig):
            actor = EdgeActor.from_config(config)
        elif isinstance(config, DescendActorConfig):
            actor = DescendActor.from_config(config)
        elif isinstance(config, VertexRouterActorConfig):
            actor = VertexRouterActor.from_config(config)
        elif isinstance(config, EdgeRouterActorConfig):
            actor = EdgeRouterActor.from_config(config)
        else:
            raise ValueError(
                f"Expected VertexActorConfig, TransformActorConfig, EdgeActorConfig, "
                f"DescendActorConfig, VertexRouterActorConfig, or EdgeRouterActorConfig, "
                f"got {type(config)}"
            )
        # Bypass __init__ (which expects raw step data) and wire the wrapper
        # directly with a blank init context; finish_init replaces it later.
        wrapper = cls.__new__(cls)
        wrapper.actor = actor
        wrapper.init_ctx = ActorInitContext(
            vertex_config=VertexConfig(vertices=[]),
            edge_config=EdgeConfig(),
            transforms={},
            infer_edges=True,
            infer_edge_only=set(),
            infer_edge_except=set(),
        )
        return wrapper

    @classmethod
    def _from_step(cls, step: dict[str, Any]) -> ActorWrapper:
        """Build a wrapper from a raw (un-normalized) step mapping."""
        config = validate_actor_step(normalize_actor_step(step))
        return cls.from_config(config)

    def __call__(
        self,
        ctx: ExtractionContext,
        lindex: LocationIndex | None = None,
        *nargs: Any,
        **kwargs: Any,
    ) -> ExtractionContext:
        """Run the wrapped actor on *ctx* at location *lindex*.

        A fresh ``LocationIndex`` is created per call when *lindex* is
        omitted; the previous ``lindex=LocationIndex()`` default was a
        single instance created at function-definition time and shared
        across all calls.
        """
        if lindex is None:
            lindex = LocationIndex()
        ctx = self.actor(ctx, lindex, *nargs, **kwargs)
        return ctx

    def assemble(
        self, ctx: ExtractionContext | AssemblyContext | ActionContext
    ) -> defaultdict[GraphEntity, list]:
        """Assemble collected vertices and edges into the global accumulator.

        Accepts a ready ``AssemblyContext`` or an extraction-style context
        that is converted first; ``ActionContext`` callers additionally get
        the result mirrored onto the context.
        """
        if isinstance(ctx, AssemblyContext):
            assembly_ctx = ctx
        else:
            assembly_ctx = AssemblyContext.from_extraction(ctx)
        assemble_edges(
            ctx=assembly_ctx,
            vertex_config=self.vertex_config,
            edge_config=self.edge_config,
            infer_edges=self.infer_edges,
            infer_edge_only=self.infer_edge_only,
            infer_edge_except=self.infer_edge_except,
        )

        # Merge per-location vertex lists on their identity fields and
        # deduplicate before promoting them to the global accumulator.
        for vertex_name, dd in assembly_ctx.acc_vertex.items():
            for lindex, vertex_list in dd.items():
                vertex_list = [x.vertex for x in vertex_list]
                vertex_list_updated = merge_doc_basis(
                    vertex_list,
                    tuple(self.vertex_config.identity_fields(vertex_name)),
                )
                vertex_list_updated = pick_unique_dict(vertex_list_updated)
                assembly_ctx.acc_global[vertex_name] += vertex_list_updated

        assembly_ctx = add_blank_collections(assembly_ctx, self.vertex_config)

        if isinstance(ctx, ActionContext):
            ctx.acc_global = assembly_ctx.acc_global
            return ctx.acc_global
        return assembly_ctx.acc_global

    @classmethod
    def from_dict(cls, data: dict | list) -> ActorWrapper:
        """Build a wrapper from a list (positional steps) or mapping (kwargs)."""
        if isinstance(data, list):
            return cls(*data)
        return cls(**data)

    def assemble_tree(self, fig_path: Path | None = None):
        """Render the actor tree as a networkx graph.

        Returns the graph, or None when drawing to *fig_path* or when
        networkx is unavailable.
        """
        import logging

        logger = logging.getLogger(__name__)
        _, _, _, edges = self.fetch_actors(0, [])
        logger.info("%s", len(edges))
        try:
            import networkx as nx
        except ImportError as e:
            # Fixed typo in the original message ("networks" -> "networkx").
            logger.error("not able to import networkx %s", e)
            return None
        nodes = {}
        g = nx.MultiDiGraph()
        for ha, hb, pa, pb in edges:
            nodes[ha] = pa
            nodes[hb] = pb
        from graflo.plot.plotter import fillcolor_palette

        # Color nodes by actor class for readability of the rendered graph.
        map_class2color = {
            DescendActor: fillcolor_palette["green"],
            VertexActor: "orange",
            VertexRouterActor: fillcolor_palette["peach"],
            EdgeRouterActor: fillcolor_palette["red"],
            EdgeActor: fillcolor_palette["violet"],
            TransformActor: fillcolor_palette["blue"],
        }

        for n, props in nodes.items():
            nodes[n]["fillcolor"] = map_class2color[props["class"]]
            nodes[n]["style"] = "filled"
            nodes[n]["color"] = "brown"

        edges = [(ha, hb) for ha, hb, _, _ in edges]
        g.add_edges_from(edges)
        g.add_nodes_from(nodes.items())

        if fig_path is not None:
            ag = nx.nx_agraph.to_agraph(g)
            ag.draw(fig_path, "pdf", prog="dot")
            return None
        return g

    def fetch_actors(self, level: int, edges: list) -> tuple[int, type, str, list]:
        """Delegate tree traversal to the wrapped actor."""
        return self.actor.fetch_actors(level, edges)

    def collect_actors(self) -> list[Actor]:
        """Return the wrapped actor plus, recursively, all descendant actors."""
        actors = [self.actor]
        if isinstance(self.actor, DescendActor):
            for descendant in self.actor.descendants:
                actors.extend(descendant.collect_actors())
        return actors

    def find_descendants(
        self,
        predicate: Callable[[ActorWrapper], bool] | None = None,
        *,
        actor_type: type[Actor] | None = None,
        **attr_in: Any,
    ) -> list[ActorWrapper]:
        """Collect descendant wrappers matching *predicate*.

        When *predicate* is omitted, one is built from *actor_type* (an
        isinstance filter) and *attr_in* (attribute-membership filters);
        a filter whose allowed collection is None is skipped.
        """
        if predicate is None:

            def _predicate(w: ActorWrapper) -> bool:
                if actor_type is not None and not isinstance(w.actor, actor_type):
                    return False
                for attr, allowed in attr_in.items():
                    if allowed is None:
                        continue
                    val = getattr(w.actor, attr, None)
                    if val not in allowed:
                        return False
                return True

            predicate = _predicate

        result: list[ActorWrapper] = []
        if isinstance(self.actor, DescendActor):
            for d in self.actor.descendants:
                if predicate(d):
                    result.append(d)
                result.extend(d.find_descendants(predicate=predicate))
        return result

    def remove_descendants_if(self, predicate: Callable[[ActorWrapper], bool]) -> None:
        """Recursively prune descendants matching *predicate*.

        Also removes DescendActor children left empty by the pruning.
        """
        if isinstance(self.actor, DescendActor):
            for d in list(self.actor.descendants):
                d.remove_descendants_if(predicate=predicate)
            self.actor._descendants[:] = [
                d
                for d in self.actor.descendants
                if not predicate(d)
                and not (isinstance(d.actor, DescendActor) and d.count() == 0)
            ]

DescendActor

Bases: Actor

Actor for processing hierarchical data structures.

Source code in graflo/architecture/pipeline/runtime/actor/descend.py
class DescendActor(Actor):
    """Actor for processing hierarchical data structures.

    Expands a document by ``key`` (or every key when ``any_key`` is set,
    or the document itself otherwise) and runs its descendant actors on
    each expanded item.
    """

    def __init__(
        self,
        key: str | None,
        any_key: bool = False,
        *,
        _descendants: list[ActorWrapper] | None = None,
    ):
        self.key = key
        self.any_key = any_key
        self._descendants: list[ActorWrapper] = (
            list(_descendants) if _descendants else []
        )
        # Descendants are kept ordered by actor-type priority; the flag lets
        # the `descendants` property re-sort lazily after add_descendant.
        self._descendants_sorted = True
        self._descendants.sort(key=lambda x: _NodeTypePriority[type(x.actor)])

    def fetch_important_items(self) -> dict[str, Any]:
        """Expose key / any_key for string representation."""
        items = self._fetch_items_from_dict(("key",))
        if self.any_key:
            items["any_key"] = True
        return items

    def add_descendant(self, d: ActorWrapper) -> None:
        """Append a descendant wrapper; ordering is re-established lazily."""
        self._descendants.append(d)
        self._descendants_sorted = False

    def count(self) -> int:
        """Aggregate count over all descendants."""
        return sum(d.count() for d in self.descendants)

    @property
    def descendants(self) -> list[ActorWrapper]:
        """Descendant wrappers, sorted by actor-type priority."""
        if not self._descendants_sorted:
            self._descendants.sort(key=lambda x: _NodeTypePriority[type(x.actor)])
            self._descendants_sorted = True
        return self._descendants

    @classmethod
    def from_config(cls, config: DescendActorConfig) -> DescendActor:
        """Build a DescendActor (and its sub-pipeline) from config."""
        from .wrapper import ActorWrapper

        wrappers = [ActorWrapper.from_config(c) for c in config.pipeline]
        return cls(key=config.key, any_key=config.any_key, _descendants=wrappers)

    def _infer_vertex_descendants_from_transforms(
        self, init_ctx: ActorInitContext
    ) -> None:
        """Add implicit VertexActors for vertices fully covered by transform outputs.

        Only applies when no explicit VertexActor descendant exists; a vertex
        qualifies when all of its identity fields appear among the rename
        outputs of the transform descendants and it is not already targeted.
        """
        from .transform import TransformActor
        from .vertex import VertexActor

        if any(isinstance(an.actor, VertexActor) for an in self.descendants):
            return

        transform_output_fields: set[str] = set()
        for an in self.descendants:
            if isinstance(an.actor, TransformActor):
                transform_output_fields.update(str(k) for k in an.actor.t.rename.keys())

        if not transform_output_fields:
            return

        inferred_vertices: list[str] = []
        for vertex_name in sorted(init_ctx.vertex_config.vertex_set):
            identity_fields = {
                f for f in init_ctx.vertex_config.identity_fields(vertex_name)
            }
            if identity_fields and identity_fields.issubset(transform_output_fields):
                inferred_vertices.append(vertex_name)

        if not inferred_vertices:
            return

        existing_targets: set[str] = set()
        for an in self.descendants:
            existing_targets.update(
                str(v) for v in an.actor.references_vertices() if v is not None
            )
        for vertex_name in inferred_vertices:
            if vertex_name in existing_targets:
                continue
            from .wrapper import ActorWrapper

            self.add_descendant(
                ActorWrapper.from_config(VertexActorConfig(vertex=vertex_name))
            )
            logger.debug(
                "DescendActor: inferred implicit VertexActor(%s) from untargeted transform fields %s",
                vertex_name,
                sorted(transform_output_fields),
            )

    def init_transforms(self, init_ctx: ActorInitContext) -> None:
        """Propagate transform initialization to all descendants."""
        for an in self.descendants:
            an.init_transforms(init_ctx)

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        """Infer implicit vertex descendants, then finish-init all descendants."""
        self.vertex_config = init_ctx.vertex_config
        self._infer_vertex_descendants_from_transforms(init_ctx)
        for an in self.descendants:
            an.finish_init(init_ctx)

    def _expand_document(self, doc: dict | list) -> list[tuple[str | None, Any]]:
        """Expand *doc* into (key, item) pairs according to key/any_key mode."""
        if self.key is not None:
            # Keyed descent: take doc[key], treating non-lists as singletons.
            if isinstance(doc, dict) and self.key in doc:
                items = doc[self.key]
                aux = items if isinstance(items, list) else [items]
                return [(self.key, item) for item in aux]
            return []
        elif self.any_key:
            # Wildcard descent: expand every key of a dict.
            if isinstance(doc, dict):
                result = []
                for key, items in doc.items():
                    aux = items if isinstance(items, list) else [items]
                    result.extend([(key, item) for item in aux])
                return result
            return []
        else:
            # No key: iterate a list, or treat the doc itself as one item.
            if isinstance(doc, list):
                return [(None, item) for item in doc]
            return [(None, doc)]

    def __call__(self, ctx: Any, lindex: Any, *nargs: Any, **kwargs: Any) -> Any:
        """Expand the document and run every descendant on each sub-item.

        Raises:
            ValueError: if no ``doc`` keyword was provided.
        """
        # Pop with a default so a missing "doc" raises the intended
        # ValueError below instead of an opaque KeyError from pop().
        doc: Any = kwargs.pop("doc", None)
        if doc is None:
            raise ValueError(f"{type(self).__name__}: doc should be provided")
        if not doc:
            return ctx

        doc_expanded = self._expand_document(doc)
        if not doc_expanded:
            return ctx

        logger.debug("Expanding %s items", len(doc_expanded))

        for idoc, (key, sub_doc) in enumerate(doc_expanded):
            logger.debug("Processing item %s/%s", idoc + 1, len(doc_expanded))
            # Dicts travel as a "doc" kwarg; scalars travel positionally.
            if isinstance(sub_doc, dict):
                nargs_tuple: tuple[Any, ...] = ()
                child_kwargs = {**kwargs, "doc": sub_doc}
            else:
                nargs_tuple = (sub_doc,)
                child_kwargs = kwargs

            extra_step = (idoc,) if key is None else (key, idoc)
            for j, anw in enumerate(self.descendants):
                logger.debug(
                    "%s: %s/%s",
                    type(anw.actor).__name__,
                    j + 1,
                    len(self.descendants),
                )
                ctx = anw(ctx, lindex.extend(extra_step), *nargs_tuple, **child_kwargs)
        return ctx

    def fetch_actors(self, level: int, edges: list) -> tuple[int, type, str, list]:
        """Recursively collect (hash, hash, props, props) edges for the tree plot."""
        label_current = str(self)
        cname_current = type(self)
        hash_current = hash((level, cname_current, label_current))
        logger.info("%s, %s", hash_current, (level, cname_current, label_current))
        props_current = {"label": label_current, "class": cname_current, "level": level}
        for d in self.descendants:
            level_a, cname, label_a, edges_a = d.fetch_actors(level + 1, edges)
            hash_a = hash((level_a, cname, label_a))
            props_a = {"label": label_a, "class": cname, "level": level_a}
            edges = [(hash_current, hash_a, props_current, props_a)] + edges_a
        return level, type(self), str(self), edges

EdgeActor

Bases: Actor

Actor for processing edge data.

Source code in graflo/architecture/pipeline/runtime/actor/edge.py
class EdgeActor(Actor):
    """Actor for processing edge data."""

    def __init__(self, config: EdgeActorConfig):
        # Build the Edge from the config dump, dropping the type discriminator.
        payload = config.model_dump(by_alias=False, exclude_none=True)
        payload.pop("type", None)
        self.edge = Edge.from_dict(payload)
        self.vertex_config: Any = None

    @classmethod
    def from_config(cls, config: EdgeActorConfig) -> EdgeActor:
        """Alternate constructor matching the other actors' factory API."""
        return cls(config)

    def fetch_important_items(self) -> dict[str, Any]:
        """Expose edge endpoints and match fields for string representation."""
        wanted = ("source", "target", "match_source", "match_target")
        state = self.edge.__dict__
        return {name: state[name] for name in wanted if name in state}

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        """Adopt the vertex config and register this actor's edge."""
        self.vertex_config = init_ctx.vertex_config
        if self.vertex_config is None:
            return
        init_ctx.edge_config.update_edges(
            self.edge, vertex_config=self.vertex_config
        )

    def __call__(
        self, ctx: ExtractionContext, lindex: LocationIndex, *nargs: Any, **kwargs: Any
    ) -> ExtractionContext:
        """Record an edge request (and intent) at the current location."""
        ctx.edge_requests.append((self.edge, lindex))
        ctx.record_edge_intent(edge=self.edge, location=lindex)
        return ctx

    def references_vertices(self) -> set[str]:
        """The two vertex names this edge connects."""
        return {self.edge.source, self.edge.target}

EdgeRouterActor

Bases: Actor

Routes documents to dynamically created edges based on type fields.

Source code in graflo/architecture/pipeline/runtime/actor/edge_router.py
class EdgeRouterActor(Actor):
    """Routes documents to dynamically created edges based on type fields.

    Each processed document resolves a source and a target vertex type
    (from an explicit type or a type field), projects identity docs for
    both sides, and records a dynamically registered edge between them.
    """

    def __init__(self, config: EdgeRouterActorConfig):
        self.source_type_field = config.source_type_field
        self.target_type_field = config.target_type_field
        self.source = config.source
        self.target = config.target
        self.source_fields = config.source_fields
        self.target_fields = config.target_fields
        self.relation_field = config.relation_field
        self.relation = config.relation
        # Side-specific maps are layered over the shared type_map, so a
        # side-specific entry wins for the same raw type.
        self._source_type_map: dict[str, str] = {
            **(config.type_map or {}),
            **(config.source_type_map or {}),
        }
        self._target_type_map: dict[str, str] = {
            **(config.type_map or {}),
            **(config.target_type_map or {}),
        }
        self._relation_map: dict[str, str] = config.relation_map or {}
        # Memoizes dynamically created edges by (source, target, relation).
        self._edge_cache: dict[tuple[str, str, str | None], Edge] = {}
        self._init_ctx: ActorInitContext | None = None
        self.vertex_config: VertexConfig = VertexConfig(vertices=[])
        self.edge_config: EdgeConfig = EdgeConfig()

    @classmethod
    def from_config(cls, config: EdgeRouterActorConfig) -> EdgeRouterActor:
        """Alternate constructor matching the other actors' factory API."""
        return cls(config)

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        """Adopt configs from *init_ctx* and reset the dynamic edge cache."""
        self._init_ctx = init_ctx
        self.vertex_config = init_ctx.vertex_config
        self.edge_config = init_ctx.edge_config
        self._edge_cache.clear()

    def _resolve_type(self, raw: str, type_map: dict[str, str]) -> str | None:
        """Map *raw* through *type_map*; None if the result is not a known vertex."""
        resolved = type_map.get(raw, raw)
        if resolved not in self.vertex_config.vertex_set:
            logger.debug(
                "EdgeRouterActor: resolved type '%s' not in vertex_set, skipping",
                resolved,
            )
            return None
        return resolved

    def _resolve_relation(self, raw: str | None) -> str | None:
        """Map *raw* through the relation map (identity when unmapped)."""
        if raw is None:
            return None
        return self._relation_map.get(raw, raw)

    def _resolve_side_type(
        self,
        doc: dict[str, Any],
        *,
        explicit_type: str | None,
        type_field: str | None,
        type_map: dict[str, str],
        side_name: str,
    ) -> str | None:
        """Resolve one side's vertex type: explicit type wins over the doc field."""
        if explicit_type is not None:
            return self._resolve_type(explicit_type, type_map)

        if type_field is None:
            logger.debug(
                "EdgeRouterActor: no %s type source configured, skipping",
                side_name,
            )
            return None

        raw_type = doc.get(type_field)
        if raw_type is None:
            logger.debug(
                "EdgeRouterActor: missing %s type field '%s' in doc, skipping",
                side_name,
                type_field,
            )
            return None

        return self._resolve_type(raw_type, type_map)

    def _get_or_create_edge(
        self,
        source_name: str,
        target_name: str,
        relation: str | None,
    ) -> Edge:
        """Return the cached edge for this triple, creating/registering it once."""
        key = (source_name, target_name, relation)
        if key in self._edge_cache:
            return self._edge_cache[key]
        edge = Edge(source=source_name, target=target_name, relation=relation)
        edge.finish_init(vertex_config=self.vertex_config)
        self.edge_config.update_edges(edge, vertex_config=self.vertex_config)
        self._edge_cache[key] = edge
        logger.debug(
            "EdgeRouterActor: registered dynamic edge (%s, %s, %s)",
            source_name,
            target_name,
            relation,
        )
        return edge

    def _project_vertex_doc(
        self,
        doc: dict[str, Any],
        fields: dict[str, str] | None,
        vertex_name: str,
    ) -> dict[str, Any]:
        """Project *doc* onto vertex fields (explicit mapping, else identity fields)."""
        if fields is not None:
            return {vf: doc[df] for vf, df in fields.items() if df in doc}
        identity = self.vertex_config.identity_fields(vertex_name)
        return {f: doc[f] for f in identity if f in doc}

    def __call__(
        self,
        ctx: ExtractionContext,
        lindex: LocationIndex,
        *nargs: Any,
        **kwargs: Any,
    ) -> ExtractionContext:
        """Resolve types, project identity docs, and record the dynamic edge."""
        doc: dict[str, Any] = kwargs.get("doc", {})

        source_name = self._resolve_side_type(
            doc,
            explicit_type=self.source,
            type_field=self.source_type_field,
            type_map=self._source_type_map,
            side_name="source",
        )
        target_name = self._resolve_side_type(
            doc,
            explicit_type=self.target,
            type_field=self.target_type_field,
            type_map=self._target_type_map,
            side_name="target",
        )
        if source_name is None or target_name is None:
            return ctx

        # Relation comes from the doc when relation_field is set, otherwise
        # from the static config value.
        raw_relation = (
            doc.get(self.relation_field) if self.relation_field else self.relation
        )
        relation = self._resolve_relation(raw_relation)

        source_doc = self._project_vertex_doc(doc, self.source_fields, source_name)
        target_doc = self._project_vertex_doc(doc, self.target_fields, target_name)

        if not source_doc or not target_doc:
            logger.debug(
                "EdgeRouterActor: could not project identity docs for "
                "(%s, %s), skipping",
                source_name,
                target_name,
            )
            return ctx

        # Record both endpoint vertices at distinct sub-locations.
        source_lindex = lindex.extend(("src", 0))
        target_lindex = lindex.extend(("tgt", 0))
        ctx.acc_vertex[source_name][source_lindex].append(
            VertexRep(vertex=source_doc, ctx={})
        )
        ctx.acc_vertex[target_name][target_lindex].append(
            VertexRep(vertex=target_doc, ctx={})
        )

        edge = self._get_or_create_edge(source_name, target_name, relation)
        ctx.edge_requests.append((edge, lindex))
        ctx.record_edge_intent(edge=edge, location=lindex)
        return ctx

    def references_vertices(self) -> set[str]:
        """Vertex names observed so far, derived from the dynamic edge cache."""
        return {s for s, _, _ in self._edge_cache} | {t for _, t, _ in self._edge_cache}

    def fetch_important_items(self) -> dict[str, Any]:
        """Expose routing configuration and cached edges for string representation."""
        items: dict[str, Any] = {
            "source": self.source or "",
            "target": self.target or "",
            "source_type_field": self.source_type_field,
            "target_type_field": self.target_type_field,
            "relation_field": self.relation_field or "",
            "cached_edges": sorted(str(k) for k in self._edge_cache),
        }
        if self._relation_map:
            items["relation_map"] = self._relation_map
        return items

TransformActor

Bases: Actor

Actor for applying transformations to data.

Source code in graflo/architecture/pipeline/runtime/actor/transform.py
class TransformActor(Actor):
    """Actor for applying transformations to data.

    Wraps a single ``Transform`` built either from a ``rename`` mapping or
    from a ``call`` specification.  When ``call.use`` names a transform in
    ``ingestion_model.transforms`` without inline ``module``/``foo``, full
    construction is deferred to :meth:`finish_init`, which merges the call
    overrides with the named proto-transform.
    """

    def __init__(self, config: TransformActorConfig):
        """Build the wrapped ``Transform`` from *config*.

        Raises:
            ValueError: if neither ``rename`` nor ``call`` is provided.
        """
        # Named proto-transforms; injected later via init_transforms/finish_init.
        self.transforms: dict[str, ProtoTransform] = {}
        self.call_use: str | None = None
        self._call_config = None

        # Pure rename transforms need no call specification at all.
        if config.rename is not None:
            self.t = Transform(rename=config.rename)
            return

        if config.call is None:
            raise ValueError(
                "TransformActorConfig requires call when rename is absent."
            )

        call = config.call
        self._call_config = call
        self.call_use = call.use
        # Inline calls (no `use`) with no explicit target default to "values";
        # named calls leave the target to the proto-transform (None here).
        inline_target = (
            call.target
            if call.target is not None
            else "values"
            if call.use is None
            else None
        )
        transform_kwargs: dict[str, Any] = {
            "name": call.use,
            "params": call.params,
            "module": call.module,
            "foo": call.foo,
            "input": tuple(call.input) if call.input else (),
            "output": tuple(call.output) if call.output else (),
            "input_groups": (
                tuple(tuple(group) for group in call.input_groups)
                if call.input_groups
                else ()
            ),
            "output_groups": (
                tuple(tuple(group) for group in call.output_groups)
                if call.output_groups
                else ()
            ),
            "dress": call.dress,
            "strategy": call.strategy or "single",
        }
        if inline_target is not None:
            transform_kwargs["target"] = inline_target
        # Key selection is only honored here for inline calls; named calls
        # handle keys during finish_init's merge with the proto-transform.
        if call.use is None:
            if call.keys is not None:
                transform_kwargs["keys"] = KeySelectionConfig.model_validate(
                    call.keys.model_dump()
                )
        # When call.use references ingestion_model.transforms, defer strict
        # transform validation until finish_init can hydrate module/foo.
        if call.use is not None and call.module is None and call.foo is None:
            self.t = Transform(name=call.use)
            return
        self.t = Transform(**transform_kwargs)

    def fetch_important_items(self) -> dict[str, Any]:
        """Expose the transform's input/output wiring for string representation."""
        # NOTE(review): no attribute named "transform" is ever assigned on this
        # class (only "transforms" and "t"), so this lookup always yields {} —
        # confirm whether "t" or "transforms" was intended here.
        items = self._fetch_items_from_dict(("transform",))
        items.update({"t.input": self.t.input, "t.output": self.t.output})
        return items

    @classmethod
    def from_config(cls, config: TransformActorConfig) -> TransformActor:
        """Alternate constructor mirroring the other actor classes."""
        return cls(config)

    def init_transforms(self, init_ctx: ActorInitContext) -> None:
        """Receive the named proto-transform registry."""
        self.transforms = init_ctx.transforms

    def _merge_call_with_proto(self, call: Any, pt: ProtoTransform) -> dict[str, Any]:
        """Merge call-site overrides with the named proto-transform *pt*.

        Precedence: call values win when explicitly set, otherwise the proto's
        values are inherited.  Returns kwargs ready for ``Transform(...)``.

        Raises:
            ValueError: when the effective target is "keys" but positional
                I/O, dress, or a non-default strategy was supplied.
        """
        # params falls back on emptiness; dress/target fall back only on None.
        next_params = call.params if call.params else pt.params
        next_dress = call.dress if call.dress is not None else pt.dress
        next_target = call.target if call.target is not None else pt.target

        if next_target == "keys":
            # Key mode uses implicit per-key execution, so explicit wiring,
            # dressing, or strategy selection is rejected outright.
            if call.input or call.output or call.input_groups or call.output_groups:
                raise ValueError(
                    "call.input, call.output, call.input_groups, and call.output_groups "
                    "cannot be used when the effective transform target is keys "
                    "(from call.target or the named ingestion_model.transforms entry)."
                )
            if call.dress is not None:
                raise ValueError("call.dress is not supported when target='keys'.")
            if call.strategy is not None and call.strategy != "single":
                raise ValueError(
                    "call.strategy is not allowed when target='keys'; "
                    "key mode uses implicit per-key execution."
                )
            next_input: tuple[str, ...] = ()
            next_output: tuple[str, ...] = ()
            next_input_groups: tuple[tuple[str, ...], ...] = ()
            next_output_groups: tuple[tuple[str, ...], ...] = ()
        else:
            next_input_groups = (
                tuple(tuple(group) for group in call.input_groups)
                if call.input_groups
                else pt.input_groups
            )
            next_output_groups = (
                tuple(tuple(group) for group in call.output_groups)
                if call.output_groups
                else pt.output_groups
            )
            if next_input_groups:
                # Grouped inputs replace the flat input list entirely.
                next_input = ()
                # Explicit grouped override should not inherit potentially
                # conflicting proto output/output_groups for a different shape.
                if call.input_groups:
                    next_output_groups = (
                        tuple(tuple(group) for group in call.output_groups)
                        if call.output_groups
                        else ()
                    )
                    next_output = tuple(call.output) if call.output else ()
                elif next_dress is not None:
                    # Dress implies a fixed (key, value) output pair.
                    next_output = (next_dress.key, next_dress.value)
                else:
                    next_output = tuple(call.output) if call.output else pt.output
            else:
                next_input = tuple(call.input) if call.input else pt.input
                if next_dress is not None:
                    next_output = (next_dress.key, next_dress.value)
                else:
                    next_output = tuple(call.output) if call.output else pt.output

        transform_kwargs: dict[str, Any] = {
            "dress": next_dress,
            "name": call.use,
            "module": pt.module,
            "foo": pt.foo,
            "params": next_params,
            "input": next_input,
            "output": next_output,
            "input_groups": next_input_groups,
            "output_groups": next_output_groups,
            "strategy": call.strategy or "single",
            "target": next_target,
        }
        # Key selection: the call's override wins; otherwise deep-copy the
        # proto's so later mutation cannot leak back into the registry entry.
        if call.keys is not None:
            transform_kwargs["keys"] = KeySelectionConfig.model_validate(
                call.keys.model_dump()
            )
        else:
            transform_kwargs["keys"] = pt.keys.model_copy(deep=True)
        return transform_kwargs

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        """Hydrate a deferred named transform from the registry.

        No-op when the transform was fully built in ``__init__`` (rename or
        inline module/foo), or when the referenced name is missing and strict
        reference checking is disabled.

        Raises:
            ValueError: if ``call.use`` is unknown and
                ``init_ctx.strict_references`` is set.
        """
        self.transforms = init_ctx.transforms
        # Nothing to resolve, or the transform already has a bound callable.
        if self.call_use is None or self.t._foo is not None:
            return
        if self._call_config is None:
            return
        pt = self.transforms.get(self.call_use, None)
        if pt is None:
            if init_ctx.strict_references:
                raise ValueError(
                    f"Transform '{self.call_use}' referenced by transform.call.use "
                    "was not found in ingestion_model.transforms."
                )
            return
        call = self._call_config
        transform_kwargs = self._merge_call_with_proto(call, pt)
        self.t = Transform(**transform_kwargs)

    def _extract_doc(self, nargs: tuple[Any, ...], **kwargs: Any) -> dict[str, Any]:
        """Fetch the document from kwargs (preferred) or the first positional arg.

        Raises:
            ValueError: when no document is supplied.  Note that non-empty
                kwargs lacking a "doc" key also raise — nargs is not consulted
                in that case.
        """
        if kwargs:
            doc: dict[str, Any] | None = kwargs.get("doc")
        elif nargs:
            doc = nargs[0]
        else:
            raise ValueError(f"{type(self).__name__}: doc should be provided")
        if doc is None:
            raise ValueError(f"{type(self).__name__}: doc should be provided")
        return doc

    def _format_transform_result(self, result: Any) -> TransformPayload:
        """Normalize a raw transform result into a ``TransformPayload``."""
        return TransformPayload.from_result(result)

    def __call__(
        self, ctx: ExtractionContext, lindex: LocationIndex, *nargs: Any, **kwargs: Any
    ) -> ExtractionContext:
        """Run the transform on the document and buffer its payload at *lindex*."""
        logger.debug("transforms : %s %s", id(self.transforms), len(self.transforms))
        doc = self._extract_doc(nargs, **kwargs)
        transform_result = self.t(doc)
        _update_doc = self._format_transform_result(transform_result)
        ctx.buffer_transforms[lindex].append(_update_doc)
        ctx.record_transform_observation(location=lindex, payload=_update_doc)
        return ctx

    def references_vertices(self) -> set[str]:
        """Transforms reference no vertices."""
        return set()

VertexActor

Bases: Actor

Actor for processing vertex data.

Source code in graflo/architecture/pipeline/runtime/actor/vertex.py
class VertexActor(Actor):
    """Actor for processing vertex data.

    Collects vertex documents for ``self.name`` from three places: an optional
    ``from_doc`` projection of the incoming document, previously buffered
    transform payloads at the current location index, and vertex fields read
    (and removed) straight off the document.  The merged result is appended to
    ``ctx.acc_vertex``.
    """

    def __init__(self, config: VertexActorConfig):
        """Store the vertex name and projection settings from *config*."""
        self.name = config.vertex
        # Optional {vertex_field: doc_field} mapping projecting a vertex doc
        # directly from the incoming document.
        self.from_doc: dict[str, str] | None = config.from_doc
        self.keep_fields: tuple[str, ...] | None = (
            tuple(config.keep_fields) if config.keep_fields else None
        )
        # Injected by finish_init.
        self.vertex_config: VertexConfig

    @classmethod
    def from_config(cls, config: VertexActorConfig) -> VertexActor:
        """Alternate constructor mirroring the other actor classes."""
        return cls(config)

    def fetch_important_items(self) -> dict[str, Any]:
        """Expose name and projection settings for string representation."""
        return self._fetch_items_from_dict(("name", "from_doc", "keep_fields"))

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        """Bind the shared vertex configuration."""
        self.vertex_config = init_ctx.vertex_config

    def _filter_and_aggregate_vertex_docs(
        self, docs: list[dict[str, Any]], doc: dict[str, Any]
    ) -> list[dict[str, Any]]:
        """Keep only docs that pass every configured filter for this vertex.

        NOTE(review): *doc* is currently unused here — confirm whether it is
        kept for interface symmetry or should feed the filters.
        """
        filters = self.vertex_config.filters(self.name)
        return [
            _doc
            for _doc in docs
            if all(cfilter(kind=ExpressionFlavor.PYTHON, **_doc) for cfilter in filters)
        ]

    def _extract_vertex_doc_from_transformed_item(
        self,
        item: Any,
        vertex_keys: tuple[str, ...],
        index_keys: tuple[str, ...],
    ) -> dict[str, Any]:
        """Pull vertex fields out of one buffered transform result.

        Consumed fields are removed from *item* in place so later actors do
        not see them again.  Positional outputs are assigned onto the vertex
        identity fields (*index_keys*) in order.
        """
        if isinstance(item, TransformPayload):
            doc: dict[str, Any] = {}
            consumed_named: set[str] = set()
            # Named outputs matching known vertex fields are claimed.
            for k, v in item.named.items():
                if k in vertex_keys and v is not None:
                    doc[k] = v
                    consumed_named.add(k)
            # Positional outputs beyond the identity arity are ignored.
            for j, value in enumerate(item.positional):
                if j >= len(index_keys):
                    break
                doc[index_keys[j]] = value
            # Remove consumed entries only after both scans above.
            for key in consumed_named:
                item.named.pop(key, None)
            if item.positional:
                item.positional = ()
            return doc

        if isinstance(item, dict):
            doc = {}
            # Dressed values arrive under keys like
            # "<DRESSING_TRANSFORMED_VALUE_KEY>#<n>"; order them by the
            # numeric suffix before positional assignment.
            value_keys = sorted(
                (
                    k
                    for k in item
                    if k.startswith(ActorConstants.DRESSING_TRANSFORMED_VALUE_KEY)
                ),
                key=lambda x: int(x.rsplit("#", 1)[-1]),
            )
            for j, vkey in enumerate(value_keys):
                if j >= len(index_keys):
                    break
                doc[index_keys[j]] = item.pop(vkey)
            # Claim any remaining vertex fields present in the dict.
            for vkey in vertex_keys:
                if vkey not in doc and vkey in item and item[vkey] is not None:
                    doc[vkey] = item.pop(vkey)
            return doc

        # Unknown payload shape: nothing to extract.
        return {}

    def _process_transformed_items(
        self,
        ctx: ExtractionContext,
        lindex: LocationIndex,
        doc: dict[str, Any],
        vertex_keys: tuple[str, ...],
    ) -> list[dict[str, Any]]:
        """Extract vertex docs from buffered transforms and prune the buffer."""
        index_keys = tuple(self.vertex_config.identity_fields(self.name))
        payloads = ctx.buffer_transforms[lindex]
        extracted_docs = [
            self._extract_vertex_doc_from_transformed_item(
                item, vertex_keys, index_keys
            )
            for item in payloads
        ]
        # Drop payloads fully consumed by the extraction above; partially
        # consumed ones stay for downstream actors.
        ctx.buffer_transforms[lindex] = [
            item
            for item in payloads
            if not (
                isinstance(item, TransformPayload)
                and not item.named
                and not item.positional
            )
            and not (isinstance(item, dict) and not item)
        ]
        return self._filter_and_aggregate_vertex_docs(extracted_docs, doc)

    def __call__(
        self, ctx: ExtractionContext, lindex: LocationIndex, *nargs: Any, **kwargs: Any
    ) -> ExtractionContext:
        """Assemble vertex documents for this location and record them on *ctx*."""
        doc: dict[str, Any] = kwargs.get("doc", {})
        vertex_keys_list = self.vertex_config.fields_names(self.name)
        vertex_keys: tuple[str, ...] = tuple(vertex_keys_list)

        agg = []
        # 1) Direct projection from the incoming document, if configured.
        if self.from_doc:
            projected = {v_f: doc.get(d_f) for v_f, d_f in self.from_doc.items()}
            if any(v is not None for v in projected.values()):
                agg.append(projected)

        # 2) Fields claimed from buffered transform payloads.
        agg.extend(self._process_transformed_items(ctx, lindex, doc, vertex_keys))

        # 3) Vertex fields still unclaimed are taken (and removed) from the
        # document itself.
        remaining_keys = set(vertex_keys) - set().union(*[d.keys() for d in agg])
        passthrough_doc = {k: doc.pop(k) for k in remaining_keys if k in doc}
        if passthrough_doc:
            agg.append(passthrough_doc)

        merged = merge_doc_basis(
            agg, index_keys=tuple(self.vertex_config.identity_fields(self.name))
        )

        # Scalar leftovers from the doc travel along as observation context.
        obs_ctx = {q: w for q, w in doc.items() if not isinstance(w, (dict, list))}
        for m in merged:
            vertex_rep = VertexRep(vertex=m, ctx=obs_ctx)
            ctx.acc_vertex[self.name][lindex].append(vertex_rep)
            ctx.record_vertex_observation(
                vertex_name=self.name,
                location=lindex,
                vertex=vertex_rep.vertex,
                ctx=vertex_rep.ctx,
            )
        return ctx

    def references_vertices(self) -> set[str]:
        """This actor references exactly its own vertex name."""
        return {self.name}

VertexRouterActor

Bases: Actor

Routes documents to the correct VertexActor based on a type field.

Source code in graflo/architecture/pipeline/runtime/actor/vertex_router.py
class VertexRouterActor(Actor):
    """Routes documents to the correct VertexActor based on a type field.

    Wrappers for concrete vertex types are created lazily on first sight of
    each type and cached for the router's lifetime.
    """

    def __init__(self, config: VertexRouterActorConfig):
        """Capture routing settings from *config*; wrappers are built lazily."""
        self.type_field = config.type_field
        self.prefix = config.prefix
        self.field_map = config.field_map
        self.type_map: dict[str, str] = config.type_map or {}
        self.vertex_from_map: dict[str, dict[str, str]] = config.vertex_from_map or {}
        # Lazily populated per-type wrapper cache.
        self._vertex_actors: dict[str, ActorWrapper] = {}
        self._init_ctx: ActorInitContext | None = None
        self.vertex_config: VertexConfig = VertexConfig(vertices=[])

    @classmethod
    def from_config(cls, config: VertexRouterActorConfig) -> VertexRouterActor:
        """Alternate constructor mirroring the other actor classes."""
        return cls(config)

    def fetch_important_items(self) -> dict[str, Any]:
        """Collect routing settings for string representation."""
        summary: dict[str, Any] = {"type_field": self.type_field}
        for label, value in (
            ("prefix", self.prefix),
            ("field_map", self.field_map),
            ("type_map", self.type_map),
            ("vertex_from_map", self.vertex_from_map),
        ):
            if value:
                summary[label] = value
        summary["vertex_types"] = sorted(self._vertex_actors.keys())
        return summary

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        """Bind vertex configuration and reset the lazy wrapper cache."""
        self._vertex_actors.clear()
        self._init_ctx = init_ctx
        self.vertex_config = init_ctx.vertex_config

    def _get_or_create_wrapper(self, vertex_type: str) -> "ActorWrapper | None":
        """Return the cached wrapper for *vertex_type*, creating it lazily.

        Returns None when the type is not a configured vertex.
        """
        from .wrapper import ActorWrapper

        if vertex_type not in self.vertex_config.vertex_set:
            return None
        cached = self._vertex_actors.get(vertex_type)
        if cached is not None:
            return cached
        if self._init_ctx is None:
            raise RuntimeError(
                "VertexRouterActor._get_or_create_wrapper called before finish_init"
            )

        actor_config = VertexActorConfig(
            vertex=vertex_type, from_doc=self.vertex_from_map.get(vertex_type)
        )
        fresh = ActorWrapper.from_config(actor_config)
        fresh.finish_init(self._init_ctx)
        self._vertex_actors[vertex_type] = fresh
        logger.debug(
            "VertexRouterActor: lazily registered VertexActor(%s) for type_field=%s",
            vertex_type,
            self.type_field,
        )
        return fresh

    def count(self) -> int:
        """Count this router plus every lazily created vertex actor."""
        total = 1
        for wrapper in self._vertex_actors.values():
            total += wrapper.count()
        return total

    def _extract_sub_doc(self, doc: dict[str, Any]) -> dict[str, Any]:
        """Project the routed sub-document via prefix stripping or field mapping."""
        if self.prefix:
            width = len(self.prefix)
            return {
                key[width:]: value
                for key, value in doc.items()
                if key.startswith(self.prefix)
            }
        if self.field_map:
            projected: dict[str, Any] = {}
            for old_key, new_key in self.field_map.items():
                if old_key in doc:
                    projected[new_key] = doc[old_key]
            return projected
        # No projection configured: forward the document as-is.
        return doc

    def __call__(
        self, ctx: ExtractionContext, lindex: LocationIndex, *nargs: Any, **kwargs: Any
    ) -> ExtractionContext:
        """Dispatch *doc* to the VertexActor matching its type field."""
        doc: dict[str, Any] = kwargs.get("doc", {})
        raw_vtype = doc.get(self.type_field)
        if raw_vtype is None:
            logger.debug(
                "VertexRouterActor: type_field '%s' not in doc, skipping",
                self.type_field,
            )
            return ctx
        # Apply the optional alias map; unknown values pass through unchanged.
        vtype = self.type_map.get(raw_vtype, raw_vtype)

        wrapper = self._get_or_create_wrapper(vtype)
        if wrapper is None:
            logger.debug(
                "VertexRouterActor: vertex type '%s' (from field '%s') "
                "not in VertexConfig, skipping",
                vtype,
                self.type_field,
            )
            return ctx

        sub_doc = self._extract_sub_doc(doc)
        if not sub_doc:
            return ctx

        return wrapper(ctx, lindex, doc=sub_doc)