Skip to content

graflo.architecture.pipeline.runtime.actor.config

Actor configuration models and parsing.

DescendActorConfig

Bases: ConfigBaseModel

Configuration for a DescendActor.

Source code in graflo/architecture/pipeline/runtime/actor/config/models.py
class DescendActorConfig(ConfigBaseModel):
    """Configuration for a DescendActor."""

    type: Literal["descend"] = PydanticField(
        default="descend", description="Actor type discriminator"
    )
    key: str | None = PydanticField(default=None, description="Key to descend into")
    any_key: bool = PydanticField(default=False, description="Process all keys")
    pipeline: list["ActorConfig"] = PydanticField(
        default_factory=list,
        alias="apply",
        description="Pipeline of actors to apply to nested data",
    )

    @model_validator(mode="before")
    @classmethod
    def set_type_and_normalize(cls, data: Any) -> Any:
        """Normalize raw dict input before field validation.

        Non-dict input is passed through untouched. Dict input is run through
        ``normalize_actor_step``; the normalized form is used only when it
        resolves to ``type == "descend"``, otherwise the original input is
        returned unchanged.
        """
        if not isinstance(data, dict):
            return data
        normalized = normalize_actor_step(cast(dict[str, Any], data))
        return normalized if normalized.get("type") == "descend" else data

    @model_validator(mode="after")
    def validate_explicit_vertex_requirements(self) -> "DescendActorConfig":
        """Post-validation hook; currently performs no checks and returns ``self``."""
        # The return annotation is quoted (like the sibling models) so the class
        # body does not depend on postponed annotation evaluation to reference
        # its own, not-yet-bound name.
        return self

EdgeActorConfig

Bases: ConfigBaseModel

Configuration for an EdgeActor (logical edge + ingestion derivation; flat YAML).

Single-intent mode (default): declare source/target via from/to (static vertex type names) or source_role/target_role (slot-based dynamic resolution; source_type_field/target_type_field remain accepted aliases). One edge intent is emitted per row.

Multi-link mode (links list): declare a list of EdgeLinkConfig items. Each item emits one edge intent per row, allowing a single pipeline step to produce multiple relationship types from one flat row. Mutually exclusive with all top-level source/target fields.

Source code in graflo/architecture/pipeline/runtime/actor/config/models.py
class EdgeActorConfig(ConfigBaseModel):
    """Configuration for an EdgeActor (logical edge + ingestion derivation; flat YAML).

    **Single-intent mode** (default): declare source/target via ``from``/``to`` (static
    vertex type names) or ``source_role``/``target_role`` (slot-based dynamic
    resolution; ``source_type_field``/``target_type_field`` remain accepted aliases).
    One edge intent is emitted per row.

    **Multi-link mode** (``links`` list): declare a list of :class:`EdgeLinkConfig` items.
    Each item emits one edge intent per row, allowing a single pipeline step to produce
    multiple relationship types from one flat row.  Mutually exclusive with all top-level
    source/target fields.
    """

    type: Literal["edge"] = PydanticField(
        default="edge", description="Actor type discriminator"
    )
    source: str | None = PydanticField(
        default=None,
        alias="from",
        description="Source vertex type name (optional if source_type_field/source_role is set).",
    )
    target: str | None = PydanticField(
        default=None,
        alias="to",
        description="Target vertex type name (optional if target_type_field/target_role is set).",
    )
    source_type_field: str | None = PydanticField(
        default=None,
        description=(
            "Accumulator slot segment for the source vertex (same name as the upstream "
            "VertexRouterActor role, inferred from type_field when role is omitted). EdgeActor scans "
            "acc_vertex for data at lindex.extend((source_type_field, 0)) to resolve the "
            "source type dynamically. Legacy alias for source_role."
        ),
    )
    target_type_field: str | None = PydanticField(
        default=None,
        description=(
            "Accumulator slot segment for the target vertex (same name as upstream "
            "VertexRouterActor role, inferred from type_field when role is omitted). "
            "Legacy alias for target_role."
        ),
    )
    source_role: str | None = PydanticField(
        default=None,
        description=(
            "Role slot name for the source vertex — role-first alias for source_type_field. "
            "When both are set, values must match."
        ),
    )
    target_role: str | None = PydanticField(
        default=None,
        description=(
            "Role slot name for the target vertex — role-first alias for target_type_field. "
            "When both are set, values must match."
        ),
    )
    links: list[EdgeLinkConfig] | None = PydanticField(
        default=None,
        description=(
            "Multi-intent list. When set, each item emits one edge intent per row. "
            "Mutually exclusive with all top-level source/target/role fields. "
            "Use when a single flat row encodes multiple relationships."
        ),
    )
    relation_map: dict[str, str] | None = PydanticField(
        default=None,
        description="Map raw relation values to canonical relation names.",
    )
    strict_edge_types: bool = PydanticField(
        default=False,
        description=(
            "When True, skip rows whose resolved (source_type, target_type) pair "
            "is not pre-declared in the resource edge_config at init. "
            "When False (default), dynamic pairs are registered at runtime."
        ),
    )
    relation: str | None = PydanticField(
        default=None,
        description="Optional fixed logical relation / edge type name.",
    )
    relation_from_key: bool = PydanticField(
        default=False,
        description="Ingestion: derive per-row relation label from the location key during assembly.",
    )
    description: str | None = PydanticField(
        default=None,
        description="Optional semantic description (merged into schema Edge).",
    )
    relation_field: str | None = PydanticField(
        default=None,
        description="Ingestion: document field name for per-row relationship type.",
    )
    match_source: str | None = PydanticField(
        default=None,
        description="Ingestion: require this path segment in source locations.",
    )
    match_target: str | None = PydanticField(
        default=None,
        description="Ingestion: require this path segment in target locations.",
    )
    exclude_source: str | None = PydanticField(
        default=None,
        description="Ingestion: exclude source locations containing this segment.",
    )
    exclude_target: str | None = PydanticField(
        default=None,
        description="Ingestion: exclude target locations containing this segment.",
    )
    match: str | None = PydanticField(
        default=None,
        description="Ingestion: require this segment on both source and target locations.",
    )
    properties: list[Any] = PydanticField(
        default_factory=list,
        description="Edge properties merged into schema Edge (same forms as Edge.properties).",
    )
    vertex_weights: list[Any] = PydanticField(
        default_factory=list,
        description="Vertex-derived weight rules registered in EdgeDerivationRegistry.",
    )

    @staticmethod
    def _canonicalize_slot_key(
        role: str | None,
        legacy_type_field: str | None,
        *,
        role_name: str,
        type_field_name: str,
        context: str,
    ) -> str | None:
        """Canonicalize legacy slot-name fields to role-first semantics.

        Returns ``role`` when it is set, otherwise ``legacy_type_field`` (which
        may itself be ``None``).

        Raises:
            ValueError: if both values are set and differ. ``role_name``,
                ``type_field_name`` and ``context`` are only used to build the
                error message.
        """
        if (
            role is not None
            and legacy_type_field is not None
            and role != legacy_type_field
        ):
            raise ValueError(
                f"{role_name} and {type_field_name} must match when both are set in {context}."
            )
        return role if role is not None else legacy_type_field

    @model_validator(mode="after")
    def validate_type_sources(self) -> "EdgeActorConfig":
        """Enforce the source/target declaration rules for both modes.

        Multi-link mode (``links`` is not ``None``): none of the top-level
        source/target/role fields may carry a truthy value.

        Single-intent mode: the legacy ``*_type_field`` values are folded into
        ``source_role`` / ``target_role`` (the legacy attributes are reset to
        ``None``), then each side must declare either a static type
        (``from`` / ``to``) or a slot name, but not both.

        Raises:
            ValueError: on any violation of the rules above.
        """
        if self.links is not None:
            # Multi-link mode: top-level source/target fields must all be absent.
            has_single = any(
                (
                    self.source,
                    self.target,
                    self.source_type_field,
                    self.target_type_field,
                    self.source_role,
                    self.target_role,
                )
            )
            if has_single:
                raise ValueError(
                    "edge 'links' is mutually exclusive with top-level "
                    "from/to/source_type_field/target_type_field/source_role/target_role."
                )
            return self

        # Single-intent mode: canonicalize to role-first slot names.
        # Use object.__setattr__ to bypass validate_assignment re-triggering this validator.
        source_role = self._canonicalize_slot_key(
            self.source_role,
            self.source_type_field,
            role_name="source_role",
            type_field_name="source_type_field",
            context="an edge step",
        )
        target_role = self._canonicalize_slot_key(
            self.target_role,
            self.target_type_field,
            role_name="target_role",
            type_field_name="target_type_field",
            context="an edge step",
        )
        object.__setattr__(self, "source_role", source_role)
        object.__setattr__(self, "target_role", target_role)
        object.__setattr__(self, "source_type_field", None)
        object.__setattr__(self, "target_type_field", None)

        # Each side needs exactly one of: static type or dynamic slot.
        if self.source is None and self.source_role is None:
            raise ValueError(
                "edge step requires 'from' (source), source_role, or source_type_field."
            )
        if self.target is None and self.target_role is None:
            raise ValueError(
                "edge step requires 'to' (target), target_role, or target_type_field."
            )
        # The slot name may have come from either spelling, so the message names
        # both; this matches the wording used for edge links.
        if self.source is not None and self.source_role is not None:
            raise ValueError(
                "'from' and source_type_field/source_role are mutually exclusive."
            )
        if self.target is not None and self.target_role is not None:
            raise ValueError(
                "'to' and target_type_field/target_role are mutually exclusive."
            )
        # Mixed mode (one static + one dynamic) is valid; both-static is pure static mode.
        return self

    @property
    def derivation(self) -> EdgeDerivation:
        """Normalized ingestion-only fields for assembly/render."""
        return EdgeDerivation(
            match_source=self.match_source,
            match_target=self.match_target,
            exclude_source=self.exclude_source,
            exclude_target=self.exclude_target,
            match=self.match,
            relation_field=self.relation_field,
            relation_from_key=self.relation_from_key,
        )

    @model_validator(mode="before")
    @classmethod
    def set_type_and_flatten(cls, data: Any) -> Any:
        """Normalize raw dict input before field validation.

        Non-dict input is passed through untouched. Dict input is run through
        ``normalize_actor_step``; the normalized (flattened) form is used only
        when it resolves to ``type == "edge"``, otherwise the original input is
        returned unchanged.
        """
        if not isinstance(data, dict):
            return data
        normalized = normalize_actor_step(cast(dict[str, Any], data))
        return normalized if normalized.get("type") == "edge" else data

derivation property

Normalized ingestion-only fields for assembly/render.

EdgeLinkConfig

Bases: ConfigBaseModel

One intent in a multi-link edge step.

Each item in an EdgeActorConfig.links list describes one source→target→relation binding to emit per row. Equivalent to a single-intent edge step without the links field itself.

Slot resolution uses role-first semantics (source_role / target_role). Legacy aliases (source_type_field / target_type_field) are accepted and canonicalized to their role counterparts. The slot name is the accumulator segment populated by an upstream vertex step with a matching role, or by vertex_router.role (which defaults to type_field when omitted).

Source code in graflo/architecture/pipeline/runtime/actor/config/models.py
class EdgeLinkConfig(ConfigBaseModel):
    """One intent in a multi-link edge step.

    Each item in an ``EdgeActorConfig.links`` list describes one source→target→relation
    binding to emit per row. Equivalent to a single-intent ``edge`` step without the
    ``links`` field itself.

    Slot resolution uses role-first semantics (``source_role`` / ``target_role``).
    Legacy aliases (``source_type_field`` / ``target_type_field``) are accepted and
    canonicalized to their role counterparts. The slot name is the accumulator segment
    populated by an upstream ``vertex`` step with a matching ``role``, or by
    ``vertex_router.role`` (which defaults to ``type_field`` when omitted).
    """

    model_config = {"extra": "forbid", "populate_by_name": True}

    source: str | None = PydanticField(
        default=None,
        alias="from",
        description="Static source vertex type name. Exclusive with source_type_field / source_role.",
    )
    target: str | None = PydanticField(
        default=None,
        alias="to",
        description="Static target vertex type name. Exclusive with target_type_field / target_role.",
    )
    source_type_field: str | None = PydanticField(
        default=None,
        description=(
            "Accumulator slot segment for the source vertex (same name as upstream "
            "vertex/vertex_router role). Exclusive with 'from' and source_role."
        ),
    )
    target_type_field: str | None = PydanticField(
        default=None,
        description=(
            "Accumulator slot segment for the target vertex (same name as upstream "
            "vertex/vertex_router role). Exclusive with 'to' and target_role."
        ),
    )
    source_role: str | None = PydanticField(
        default=None,
        description=(
            "Role-first alias for source_type_field (same accumulator segment name). "
            "When both are set, values must match."
        ),
    )
    target_role: str | None = PydanticField(
        default=None,
        description=(
            "Role-first alias for target_type_field (same accumulator segment name). "
            "When both are set, values must match."
        ),
    )
    relation: str | None = PydanticField(
        default=None,
        description="Fixed relation / edge type name for this link.",
    )
    relation_field: str | None = PydanticField(
        default=None,
        description="Document field name for per-row relationship type.",
    )
    match_source: str | None = PydanticField(
        default=None,
        description="Require this path segment in source vertex locations.",
    )
    match_target: str | None = PydanticField(
        default=None,
        description="Require this path segment in target vertex locations.",
    )

    @staticmethod
    def _canonicalize_slot_key(
        role: str | None,
        legacy_type_field: str | None,
        *,
        role_name: str,
        type_field_name: str,
        context: str,
    ) -> str | None:
        """Pick the slot name, preferring the role spelling over the legacy one.

        Raises ``ValueError`` when both spellings are given with different
        values; the keyword arguments only feed the error message.
        """
        both_given = role is not None and legacy_type_field is not None
        if both_given and role != legacy_type_field:
            raise ValueError(
                f"{role_name} and {type_field_name} must match when both are set in {context}."
            )
        if role is not None:
            return role
        return legacy_type_field

    @model_validator(mode="after")
    def resolve_and_validate(self) -> "EdgeLinkConfig":
        """Fold legacy slot fields into the role fields, then check each side.

        After this runs, ``source_type_field`` / ``target_type_field`` are
        ``None`` and any slot name lives in ``source_role`` / ``target_role``.
        Each side must then carry either a static type or a slot name, never
        both and never neither; violations raise ``ValueError``.
        """
        # (attribute prefix, YAML alias of the static-type field) for each side.
        sides = (("source", "from"), ("target", "to"))

        # Resolve both sides first so a role/legacy mismatch is reported before
        # any attribute is touched.
        resolved_roles = [
            self._canonicalize_slot_key(
                getattr(self, f"{side}_role"),
                getattr(self, f"{side}_type_field"),
                role_name=f"{side}_role",
                type_field_name=f"{side}_type_field",
                context="an edge link",
            )
            for side, _ in sides
        ]

        # Use object.__setattr__ to bypass validate_assignment re-triggering this validator.
        assignments = {
            "source_role": resolved_roles[0],
            "target_role": resolved_roles[1],
            "source_type_field": None,
            "target_type_field": None,
        }
        for attr_name, attr_value in assignments.items():
            object.__setattr__(self, attr_name, attr_value)

        # Presence checks for both sides come before the exclusivity checks.
        for side, alias in sides:
            static_type = getattr(self, side)
            slot_name = getattr(self, f"{side}_role")
            if static_type is None and slot_name is None:
                raise ValueError(
                    f"edge link requires '{alias}' ({side}), {side}_role, or {side}_type_field."
                )
        for side, alias in sides:
            static_type = getattr(self, side)
            slot_name = getattr(self, f"{side}_role")
            if static_type is not None and slot_name is not None:
                raise ValueError(
                    f"'{alias}' and {side}_type_field/{side}_role are mutually exclusive."
                )
        return self

TransformActorConfig

Bases: ConfigBaseModel

Configuration for a TransformActor.

Source code in graflo/architecture/pipeline/runtime/actor/config/models.py
class TransformActorConfig(ConfigBaseModel):
    """Configuration for a TransformActor."""

    type: Literal["transform"] = PydanticField(
        default="transform", description="Actor type discriminator"
    )
    rename: dict[str, str] | None = PydanticField(
        default=None,
        description="Rename mapping in explicit DSL form: transform.rename.",
    )
    call: TransformCallConfig | None = PydanticField(
        default=None,
        description="Function-call configuration in explicit DSL form: transform.call.",
    )

    @staticmethod
    def _listify(value: Any) -> Any:
        """Wrap a lone string in a list, turn a tuple into a list, pass the rest through."""
        if isinstance(value, str):
            return [value]
        if isinstance(value, tuple):
            return list(value)
        return value

    @model_validator(mode="before")
    @classmethod
    def set_type_and_flatten(cls, data: Any) -> Any:
        """Normalize raw dict input and coerce shorthand forms inside ``call``.

        Non-dict input, and dict input that does not normalize to
        ``type == "transform"``, is returned unchanged. Otherwise, when
        ``call`` is a dict, its ``input`` / ``output`` values are coerced to
        lists (from a string or tuple) and ``keys`` is expanded: a string,
        tuple or list becomes ``{"mode": "include", "names": [...]}``, while a
        dict keeps its entries and only has ``names`` coerced to a list.
        """
        if not isinstance(data, dict):
            return data
        step = normalize_actor_step(cast(dict[str, Any], data))
        if step.get("type") != "transform":
            return data

        # Work on copies so neither the step nor its nested call dict is
        # modified in place.
        step = dict(step)
        raw_call = step.get("call")
        if not isinstance(raw_call, dict):
            return step
        call_spec = dict(raw_call)

        for port in ("input", "output"):
            if port in call_spec:
                call_spec[port] = cls._listify(call_spec[port])

        selector = call_spec.get("keys")
        if isinstance(selector, dict):
            selector = dict(selector)
            if "names" in selector:
                selector["names"] = cls._listify(selector["names"])
            call_spec["keys"] = selector
        elif isinstance(selector, (str, tuple, list)):
            # Bare names are shorthand for an include-list selector.
            call_spec["keys"] = {"mode": "include", "names": cls._listify(selector)}

        step["call"] = call_spec
        return step

    @model_validator(mode="after")
    def validate_mode(self) -> "TransformActorConfig":
        """Require that exactly one of ``rename`` and ``call`` is configured."""
        has_rename = self.rename is not None
        has_call = self.call is not None
        if has_rename == has_call:
            # Both set or both missing.
            raise ValueError(
                "Transform step must define exactly one of rename or call."
            )
        return self

VertexActorConfig

Bases: VertexExtractionOptionsConfig

Configuration for a VertexActor.

Source code in graflo/architecture/pipeline/runtime/actor/config/models.py
class VertexActorConfig(VertexExtractionOptionsConfig):
    """Configuration for a VertexActor."""

    type: Literal["vertex"] = PydanticField(
        default="vertex", description="Actor type discriminator"
    )
    vertex: str = PydanticField(..., description="Name of the vertex type to create")

    @model_validator(mode="before")
    @classmethod
    def set_type(cls, data: Any) -> Any:
        """Add ``type="vertex"`` to dict input that has ``vertex`` but no ``type``.

        The input is never modified; a new dict is returned when the
        discriminator is added, and the input itself otherwise.
        """
        if not isinstance(data, dict):
            return data
        if "type" in data or "vertex" not in data:
            return data
        return {**data, "type": "vertex"}

VertexRouterActorConfig

Bases: VertexExtractionOptionsConfig

Configuration for a VertexRouterActor.

Field handling matches VertexActorConfig: optional router-level from / from_doc (and per-type vertex_from_map) and optional keep_fields; the same merged observation dict is passed to the lazily created VertexActor (no separate slice / rename layer).

Source code in graflo/architecture/pipeline/runtime/actor/config/models.py
class VertexRouterActorConfig(VertexExtractionOptionsConfig):
    """Configuration for a VertexRouterActor.

    Field handling matches :class:`VertexActorConfig`: optional router-level ``from`` /
    ``from_doc`` (and per-type ``vertex_from_map``), optional ``keep_fields``, and the
    same merged observation dict is passed to the lazily created :class:`VertexActor`
    (no separate slice / rename layer).
    """

    type: Literal["vertex_router"] = PydanticField(
        default="vertex_router", description="Actor type discriminator"
    )
    type_field: str = PydanticField(
        ...,
        description=(
            "Key on the merged observation (document + same-location transform buffer) "
            "whose value determines the target vertex type (after type_map). "
            "This is a discriminator field, not the internal slot key. Use the "
            "actual column name (e.g. ``s__class_name`` or ``p_kind``)."
        ),
    )
    type_map: dict[str, str] | None = PydanticField(
        default=None,
        description="Map raw document values to vertex type names.",
    )
    vertex_from_map: dict[str, dict[str, str]] | None = PydanticField(
        default=None,
        description="Per-vertex-type field projection.",
    )

    @model_validator(mode="before")
    @classmethod
    def set_type(cls, data: Any) -> Any:
        """Add ``type="vertex_router"`` to dict input that has ``type_field`` but no ``type``.

        The input is never modified; a new dict is returned when the
        discriminator is added, and the input itself otherwise.
        """
        if not isinstance(data, dict):
            return data
        if "type" in data or "type_field" not in data:
            return data
        return {**data, "type": "vertex_router"}

    @model_validator(mode="after")
    def normalize_role(self) -> "VertexRouterActorConfig":
        """Default ``role`` to ``type_field`` when no role was given."""
        if self.role is not None:
            return self
        # Written with object.__setattr__, i.e. without going through the
        # model's own attribute-assignment handling.
        object.__setattr__(self, "role", self.type_field)
        return self

normalize_actor_step(data)

Normalize a raw step dict so it has 'type' and flat structure for validation.

Source code in graflo/architecture/pipeline/runtime/actor/config/normalize.py
def normalize_actor_step(data: dict[str, Any]) -> dict[str, Any]:
    """Normalize a raw step dict so it has 'type' and flat structure for validation.

    The input is never modified: the function works on a shallow copy, and
    also copies a nested ``descend`` mapping before rewriting it.

    A step that already has ``type`` is returned as a copy without further
    changes. Otherwise the type is inferred, first match wins, in this order:

    1. ``vertex`` key -> ``"vertex"``.
    2. ``edge`` wrapper key (a dict value is merged into the step) -> ``"edge"``.
    3. A source key (``source`` / ``from``) together with a target key
       (``target`` / ``to``) -> ``"edge"``.
    4. ``create_edge`` wrapper key (merged like ``edge``) -> ``"edge"``.
    5. ``descend`` wrapper key -> ``"descend"``; nested steps under
       ``pipeline`` or ``apply`` are normalized recursively and stored under
       ``pipeline``.
    6. ``vertex_router`` wrapper key (merged) -> ``"vertex_router"``.
    7. ``transform`` wrapper key (must be a dict; merged) -> ``"transform"``.
    8. Top-level ``apply`` or ``pipeline`` -> ``"descend"``.
    9. Top-level ``rename`` or ``call`` -> ``"transform"``.

    Args:
        data: Raw step mapping. A non-dict value is returned unchanged.

    Returns:
        A new dict with ``type`` set when a rule matched; otherwise a plain
        copy of the input.

    Raises:
        ValueError: if the ``transform`` wrapper value is not a dict.
    """
    if not isinstance(data, dict):
        return data
    data = dict(data)
    if "type" in data:
        return data

    if "vertex" in data:
        data["type"] = "vertex"
        return data

    if "edge" in data:
        inner = data.pop("edge")
        if isinstance(inner, dict):
            data.update(inner)
        data["type"] = "edge"
        return data
    if ("source" in data or "from" in data) and ("target" in data or "to" in data):
        data["type"] = "edge"
        return data
    if "create_edge" in data:
        inner = data.pop("create_edge")
        if isinstance(inner, dict):
            data.update(inner)
        data["type"] = "edge"
        return data

    if "descend" in data:
        inner = data.pop("descend")
        if isinstance(inner, dict):
            # `data` is only a shallow copy, so `inner` is still the caller's
            # object; copy it before rewriting its pipeline/apply entries.
            inner = dict(inner)
            if "pipeline" in inner:
                inner["pipeline"] = [
                    normalize_actor_step(s) for s in _steps_list(inner["pipeline"])
                ]
            elif "apply" in inner:
                inner["pipeline"] = [
                    normalize_actor_step(s) for s in _steps_list(inner.pop("apply"))
                ]
            data.update(inner)
        data["type"] = "descend"
        # `apply` given next to the `descend` key rather than inside it.
        if "pipeline" not in data and "apply" in data:
            data["pipeline"] = [
                normalize_actor_step(s) for s in _steps_list(data["apply"])
            ]
            del data["apply"]
        return data

    if "vertex_router" in data:
        inner = data.pop("vertex_router")
        if isinstance(inner, dict):
            data.update(inner)
        data["type"] = "vertex_router"
        return data

    if "transform" in data:
        inner = data.pop("transform")
        if not isinstance(inner, dict):
            raise ValueError("transform step must be an object with rename or call.")
        data.update(inner)
        data["type"] = "transform"
        return data

    if "apply" in data:
        data["type"] = "descend"
        data["pipeline"] = [normalize_actor_step(s) for s in _steps_list(data["apply"])]
        del data["apply"]
        return data
    if "pipeline" in data:
        data["type"] = "descend"
        data["pipeline"] = [
            normalize_actor_step(s) for s in _steps_list(data["pipeline"])
        ]
        return data

    if "rename" in data or "call" in data:
        data["type"] = "transform"
        return data

    return data

parse_root_config(*args, **kwargs)

Parse root input into a single ActorConfig (single step or descend pipeline).

Source code in graflo/architecture/pipeline/runtime/actor/config/parse.py
def parse_root_config(
    *args: Any,
    **kwargs: Any,
) -> ActorConfig:
    """Parse root input into a single ActorConfig (single step or descend pipeline).

    Accepted call shapes, checked in this order:

    * ``apply=`` / ``pipeline=`` keyword: the value (a list, or a single item
      that is wrapped in a list) is treated as a pipeline. ``pipeline`` wins
      over ``apply`` when it is truthy.
    * one positional list of dicts: a pipeline.
    * one positional dict: a single step.
    * several positional dicts: a pipeline.
    * anything else: the keyword arguments are treated as a single step.

    A pipeline is returned as a ``DescendActorConfig`` wrapping the validated
    steps; a single step is returned as whatever ``validate_actor_step``
    yields after keys listed in ``_STEP_STRIP_KEYS`` are dropped.

    Raises:
        ValueError: if a single positional list contains a non-dict item.
    """
    steps: list[dict[str, Any]] | None = None
    lone_step: dict[str, Any] | None = None

    if "apply" in kwargs or "pipeline" in kwargs:
        raw_steps = kwargs.get("pipeline") or kwargs.get("apply")
        if raw_steps is not None:
            wrapped = list(raw_steps) if isinstance(raw_steps, list) else [raw_steps]
            steps = cast(list[dict[str, Any]], wrapped)
    elif len(args) == 1:
        (sole,) = args
        if isinstance(sole, list):
            if any(not isinstance(item, dict) for item in sole):
                raise ValueError("pipeline must be a list of dict actor steps")
            steps = [dict(item) for item in sole]
        elif isinstance(sole, dict):
            lone_step = dict(sole)
    elif args and all(isinstance(arg, dict) for arg in args):
        steps = [dict(arg) for arg in args]

    if steps is not None:
        validated = []
        for raw_step in steps:
            step = normalize_actor_step(raw_step)
            try:
                validated.append(_actor_config_adapter.validate_python(step))
            except ValidationError as err:
                _raise_step_validation_error(step, err)
        descend_payload = {
            "type": "descend",
            "key": None,
            "any_key": False,
            "pipeline": validated,
        }
        return DescendActorConfig.model_validate(descend_payload)

    # Single-step path: an explicit positional dict, else the keyword arguments.
    step_source = lone_step if lone_step is not None else kwargs
    stripped = {
        name: value
        for name, value in step_source.items()
        if name not in _STEP_STRIP_KEYS
    }
    return validate_actor_step(normalize_actor_step(stripped))

validate_actor_step(data)

Validate a normalized step dict as ActorConfig (discriminated union).

Source code in graflo/architecture/pipeline/runtime/actor/config/parse.py
def validate_actor_step(
    data: dict[str, Any],
) -> ActorConfig:
    """Validate a normalized step dict as ActorConfig (discriminated union).

    A ``ValidationError`` from the adapter is handed, together with the
    offending step, to ``_raise_step_validation_error``.
    """
    try:
        config = _actor_config_adapter.validate_python(data)
    except ValidationError as err:
        _raise_step_validation_error(data, err)
    else:
        return config