Skip to content

graflo.architecture.pipeline.runtime.actor.edge

Edge actor for processing edge data.

EdgeActor

Bases: Actor

Actor for processing edge data.

Source code in graflo/architecture/pipeline/runtime/actor/edge.py
class EdgeActor(Actor):
    """Actor for processing edge data."""

    def __init__(self, config: EdgeActorConfig):
        kwargs = config.model_dump(by_alias=False, exclude_none=True)
        kwargs.pop("type", None)
        self.edge = Edge.from_dict(kwargs)
        self.vertex_config: Any = None

    @classmethod
    def from_config(cls, config: EdgeActorConfig) -> EdgeActor:
        return cls(config)

    def fetch_important_items(self) -> dict[str, Any]:
        return {
            k: self.edge.__dict__[k]
            for k in ["source", "target", "match_source", "match_target"]
            if k in self.edge.__dict__
        }

    def finish_init(self, init_ctx: ActorInitContext) -> None:
        self.vertex_config = init_ctx.vertex_config
        if self.vertex_config is not None:
            init_ctx.edge_config.update_edges(
                self.edge, vertex_config=self.vertex_config
            )

    def __call__(
        self, ctx: ExtractionContext, lindex: LocationIndex, *nargs: Any, **kwargs: Any
    ) -> ExtractionContext:
        ctx.edge_requests.append((self.edge, lindex))
        ctx.record_edge_intent(edge=self.edge, location=lindex)
        return ctx

    def references_vertices(self) -> set[str]:
        return {self.edge.source, self.edge.target}