Skip to content

graflo.architecture.pipeline.runtime.actor.config

Actor configuration models and parsing.

DescendActorConfig

Bases: ConfigBaseModel

Configuration for a DescendActor.

Source code in graflo/architecture/pipeline/runtime/actor/config/models.py
class DescendActorConfig(ConfigBaseModel):
    """Configuration for a DescendActor."""

    type: Literal["descend"] = PydanticField(
        default="descend",
        description="Actor type discriminator",
    )
    key: str | None = PydanticField(
        default=None,
        description="Key to descend into",
    )
    any_key: bool = PydanticField(
        default=False,
        description="Process all keys",
    )
    # Nested steps; the field is also addressable through its alias "apply".
    pipeline: list["ActorConfig"] = PydanticField(
        default_factory=list,
        alias="apply",
        description="Pipeline of actors to apply to nested data",
    )

    @model_validator(mode="before")
    @classmethod
    def set_type_and_normalize(cls, data: Any) -> Any:
        """Pass raw dict input through ``normalize_actor_step`` before validation.

        The normalized dict is used only when it resolves to a ``descend``
        step; every other input (including non-dict values) is handed back
        untouched.
        """
        if isinstance(data, dict):
            candidate = normalize_actor_step(cast(dict[str, Any], data))
            if candidate.get("type") == "descend":
                return candidate
        return data

    @model_validator(mode="after")
    def validate_explicit_vertex_requirements(self) -> DescendActorConfig:
        """Post-validation hook; it currently performs no checks."""
        return self

EdgeActorConfig

Bases: EdgeBase

Configuration for an EdgeActor.

Source code in graflo/architecture/pipeline/runtime/actor/config/models.py
class EdgeActorConfig(EdgeBase):
    """Configuration for an EdgeActor."""

    type: Literal["edge"] = PydanticField(
        default="edge",
        description="Actor type discriminator",
    )
    # Both endpoints are required and are also addressable as "from" / "to".
    source: str = PydanticField(
        ...,
        alias="from",
        description="Source vertex type name",
    )
    target: str = PydanticField(
        ...,
        alias="to",
        description="Target vertex type name",
    )
    weights: dict[str, list[str]] | None = PydanticField(
        default=None,
        description="Weight configuration",
    )

    @model_validator(mode="before")
    @classmethod
    def set_type_and_flatten(cls, data: Any) -> Any:
        """Pass raw dict input through ``normalize_actor_step`` before validation.

        The normalized dict replaces the input only when it resolves to an
        ``edge`` step; anything else is returned unchanged.
        """
        if isinstance(data, dict):
            candidate = normalize_actor_step(cast(dict[str, Any], data))
            if candidate.get("type") == "edge":
                return candidate
        return data

EdgeRouterActorConfig

Bases: ConfigBaseModel

Configuration for an EdgeRouterActor.

Source code in graflo/architecture/pipeline/runtime/actor/config/models.py
class EdgeRouterActorConfig(ConfigBaseModel):
    """Configuration for an EdgeRouterActor."""

    type: Literal["edge_router"] = PydanticField(
        default="edge_router",
        description="Actor type discriminator",
    )

    # --- vertex type per side: document field or static name ---
    source_type_field: str | None = PydanticField(
        default=None,
        description=(
            "Document field whose value determines the source vertex type. "
            "Provide this or source."
        ),
    )
    target_type_field: str | None = PydanticField(
        default=None,
        description=(
            "Document field whose value determines the target vertex type. "
            "Provide this or target."
        ),
    )
    source: str | None = PydanticField(
        default=None,
        description=(
            "Static source vertex type name. Provide this or source_type_field."
        ),
    )
    target: str | None = PydanticField(
        default=None,
        description=(
            "Static target vertex type name. Provide this or target_type_field."
        ),
    )

    # --- identity projections ---
    source_fields: dict[str, str] | None = PydanticField(
        default=None,
        description="Projection for source vertex identity.",
    )
    target_fields: dict[str, str] | None = PydanticField(
        default=None,
        description="Projection for target vertex identity.",
    )

    # --- relation ---
    relation_field: str | None = PydanticField(
        default=None,
        description="Document field whose value determines the relation type per row.",
    )
    relation: str | None = PydanticField(
        default=None,
        description="Static relation type when relation_field is not used.",
    )

    # --- value maps ---
    type_map: dict[str, str] | None = PydanticField(
        default=None,
        description="Shared map: raw type value -> vertex type name.",
    )
    source_type_map: dict[str, str] | None = PydanticField(
        default=None,
        description="Override type_map for source side only.",
    )
    target_type_map: dict[str, str] | None = PydanticField(
        default=None,
        description="Override type_map for target side only.",
    )
    relation_map: dict[str, str] | None = PydanticField(
        default=None,
        description="Map raw relation values to canonical names.",
    )

    @model_validator(mode="before")
    @classmethod
    def set_type(cls, data: Any) -> Any:
        """Tag untyped dict input as ``edge_router`` when it names either side.

        A copy with ``type`` added is returned when the dict has no ``type``
        and contains at least one of ``source_type_field``,
        ``target_type_field``, ``source`` or ``target``. All other input is
        returned unchanged.
        """
        if not isinstance(data, dict):
            return data
        side_keys = ("source_type_field", "target_type_field", "source", "target")
        if not any(name in data for name in side_keys):
            return data
        if "type" in data:
            return data
        tagged = dict(data)
        tagged["type"] = "edge_router"
        return tagged

    @model_validator(mode="after")
    def validate_side_type_sources(self) -> "EdgeRouterActorConfig":
        """Check that both sides are resolvable and at least one is dynamic.

        Raises:
            ValueError: If a side has neither a static name nor a type field,
                or if neither ``source_type_field`` nor ``target_type_field``
                is set.
        """
        dynamic_source = self.source_type_field is not None
        dynamic_target = self.target_type_field is not None

        if self.source is None and not dynamic_source:
            raise ValueError(
                "edge_router requires source or source_type_field to be provided."
            )
        if self.target is None and not dynamic_target:
            raise ValueError(
                "edge_router requires target or target_type_field to be provided."
            )
        if not (dynamic_source or dynamic_target):
            raise ValueError(
                "edge_router requires at least one of "
                "source_type_field or target_type_field."
            )
        return self

TransformActorConfig

Bases: ConfigBaseModel

Configuration for a TransformActor.

Source code in graflo/architecture/pipeline/runtime/actor/config/models.py
class TransformActorConfig(ConfigBaseModel):
    """Configuration for a TransformActor."""

    type: Literal["transform"] = PydanticField(
        default="transform",
        description="Actor type discriminator",
    )
    rename: dict[str, str] | None = PydanticField(
        default=None,
        description="Rename mapping in explicit DSL form: transform.rename.",
    )
    call: TransformCallConfig | None = PydanticField(
        default=None,
        description="Function-call configuration in explicit DSL form: transform.call.",
    )

    @model_validator(mode="before")
    @classmethod
    def set_type_and_flatten(cls, data: Any) -> Any:
        """Normalize raw dict input and coerce shorthand values inside ``call``.

        Input that is not a dict, or that does not normalize to a
        ``transform`` step, is returned unchanged. Otherwise, when ``call`` is
        a dict, a copy of it is adjusted as follows:

        * ``input`` / ``output``: a string becomes a one-element list and a
          tuple becomes a list.
        * ``keys``: a string, tuple or list becomes
          ``{"mode": "include", "names": [...]}``; a dict is copied and its
          ``names`` entry gets the same string/tuple coercion.
        """
        if not isinstance(data, dict):
            return data
        step = normalize_actor_step(cast(dict[str, Any], data))
        if step.get("type") != "transform":
            return data

        step = dict(step)
        raw_call = step.get("call")
        if not isinstance(raw_call, dict):
            return step

        call_cfg = dict(raw_call)
        for field_name in ("input", "output"):
            field_value = call_cfg.get(field_name)
            if isinstance(field_value, str):
                call_cfg[field_name] = [field_value]
            elif isinstance(field_value, tuple):
                call_cfg[field_name] = list(field_value)

        key_spec = call_cfg.get("keys")
        if isinstance(key_spec, str):
            call_cfg["keys"] = {"mode": "include", "names": [key_spec]}
        elif isinstance(key_spec, (tuple, list)):
            # A list is passed through as the same object; only tuples are converted.
            name_list = key_spec if isinstance(key_spec, list) else list(key_spec)
            call_cfg["keys"] = {"mode": "include", "names": name_list}
        elif isinstance(key_spec, dict):
            key_spec = dict(key_spec)
            raw_names = key_spec.get("names")
            if isinstance(raw_names, str):
                key_spec["names"] = [raw_names]
            elif isinstance(raw_names, tuple):
                key_spec["names"] = list(raw_names)
            call_cfg["keys"] = key_spec

        step["call"] = call_cfg
        return step

    @model_validator(mode="after")
    def validate_mode(self) -> "TransformActorConfig":
        """Require exactly one of ``rename`` and ``call``.

        Raises:
            ValueError: If both are set or both are missing.
        """
        has_rename = self.rename is not None
        has_call = self.call is not None
        # Equal flags mean "neither" or "both" - either way not exactly one.
        if has_rename == has_call:
            raise ValueError(
                "Transform step must define exactly one of rename or call."
            )
        return self

VertexActorConfig

Bases: ConfigBaseModel

Configuration for a VertexActor.

Source code in graflo/architecture/pipeline/runtime/actor/config/models.py
class VertexActorConfig(ConfigBaseModel):
    """Configuration for a VertexActor."""

    type: Literal["vertex"] = PydanticField(
        default="vertex",
        description="Actor type discriminator",
    )
    vertex: str = PydanticField(
        ...,
        description="Name of the vertex type to create",
    )
    # Also addressable through its alias "from".
    from_doc: dict[str, str] | None = PydanticField(
        default=None,
        alias="from",
        description="Projection: {vertex_field: doc_field}.",
    )
    keep_fields: list[str] | None = PydanticField(
        default=None,
        description="Optional list of fields to keep",
    )

    @model_validator(mode="before")
    @classmethod
    def set_type(cls, data: Any) -> Any:
        """Tag untyped dict input that has a ``vertex`` key as a ``vertex`` step.

        A copy with ``type`` added is returned in that case; all other input
        is returned unchanged.
        """
        if not isinstance(data, dict):
            return data
        if "vertex" not in data or "type" in data:
            return data
        tagged = dict(data)
        tagged["type"] = "vertex"
        return tagged

VertexRouterActorConfig

Bases: ConfigBaseModel

Configuration for a VertexRouterActor.

Source code in graflo/architecture/pipeline/runtime/actor/config/models.py
class VertexRouterActorConfig(ConfigBaseModel):
    """Configuration for a VertexRouterActor."""

    type: Literal["vertex_router"] = PydanticField(
        default="vertex_router",
        description="Actor type discriminator",
    )
    type_field: str = PydanticField(
        ...,
        description="Document field whose value determines the target vertex type name.",
    )
    prefix: str | None = PydanticField(
        default=None,
        description="Optional prefix to strip from document field keys.",
    )
    field_map: dict[str, str] | None = PydanticField(
        default=None,
        description="Optional explicit rename map.",
    )
    type_map: dict[str, str] | None = PydanticField(
        default=None,
        description="Map raw document values to vertex type names.",
    )
    vertex_from_map: dict[str, dict[str, str]] | None = PydanticField(
        default=None,
        description="Per-vertex-type field projection.",
    )

    @model_validator(mode="before")
    @classmethod
    def set_type(cls, data: Any) -> Any:
        """Tag untyped dict input that has ``type_field`` as a ``vertex_router`` step.

        A copy with ``type`` added is returned in that case; all other input
        is returned unchanged.
        """
        if not isinstance(data, dict):
            return data
        if "type_field" not in data or "type" in data:
            return data
        tagged = dict(data)
        tagged["type"] = "vertex_router"
        return tagged

normalize_actor_step(data)

Normalize a raw step dict so it has 'type' and flat structure for validation.

Source code in graflo/architecture/pipeline/runtime/actor/config/normalize.py
def normalize_actor_step(data: dict[str, Any]) -> dict[str, Any]:
    """Normalize a raw step dict so it has 'type' and flat structure for validation.

    The input is never modified: the top-level dict is copied, and a nested
    ``descend`` mapping is copied before its ``apply``/``pipeline`` entry is
    rewritten.

    A step that already carries ``type`` is returned as a shallow copy with no
    further changes. Otherwise the first matching rule decides the type:

    1. ``vertex`` key -> ``"vertex"``.
    2. ``edge`` wrapper -> ``"edge"`` (a dict value is merged into the step).
    3. ``source``/``from`` together with ``target``/``to`` -> ``"edge"``.
    4. ``create_edge`` wrapper -> ``"edge"``.
    5. ``descend`` wrapper -> ``"descend"``; nested steps are normalized
       recursively and stored under ``pipeline``.
    6. ``vertex_router`` wrapper -> ``"vertex_router"``.
    7. ``edge_router`` wrapper -> ``"edge_router"``.
    8. ``transform`` wrapper -> ``"transform"`` (value must be a dict).
    9. Bare ``apply`` or ``pipeline`` -> ``"descend"``.
    10. Bare ``rename`` or ``call`` -> ``"transform"``.

    A dict matching none of the rules is returned as an untyped copy.

    Args:
        data: Raw step mapping. A non-dict value is returned unchanged.

    Returns:
        A new dict with the wrapper key (if any) flattened and ``type`` set
        when a rule matched.

    Raises:
        ValueError: If the ``transform`` wrapper value is not a dict.
    """
    if not isinstance(data, dict):
        return data
    data = dict(data)
    if "type" in data:
        return data

    def normalize_steps(raw_steps: Any) -> list[Any]:
        # Recursively normalize every nested step of a pipeline.
        return [normalize_actor_step(step) for step in _steps_list(raw_steps)]

    def unwrap(wrapper_key: str, step_type: str) -> dict[str, Any]:
        # Merge a dict stored under `wrapper_key` into the step, then tag it.
        inner = data.pop(wrapper_key)
        if isinstance(inner, dict):
            data.update(inner)
        data["type"] = step_type
        return data

    if "vertex" in data:
        data["type"] = "vertex"
        return data

    if "edge" in data:
        return unwrap("edge", "edge")
    if ("source" in data or "from" in data) and ("target" in data or "to" in data):
        data["type"] = "edge"
        return data
    if "create_edge" in data:
        return unwrap("create_edge", "edge")

    if "descend" in data:
        inner = data.pop("descend")
        if isinstance(inner, dict):
            # `data` is only a shallow copy, so `inner` is still the caller's
            # nested dict; copy it before rewriting its keys.
            inner = dict(inner)
            if "pipeline" in inner:
                inner["pipeline"] = normalize_steps(inner["pipeline"])
            elif "apply" in inner:
                inner["pipeline"] = normalize_steps(inner.pop("apply"))
            data.update(inner)
        data["type"] = "descend"
        # Steps may also sit next to the wrapper key, e.g. {"descend": ..., "apply": [...]}.
        if "pipeline" not in data and "apply" in data:
            data["pipeline"] = normalize_steps(data.pop("apply"))
        return data

    if "vertex_router" in data:
        return unwrap("vertex_router", "vertex_router")

    if "edge_router" in data:
        return unwrap("edge_router", "edge_router")

    if "transform" in data:
        inner = data.pop("transform")
        if not isinstance(inner, dict):
            raise ValueError("transform step must be an object with rename or call.")
        data.update(inner)
        data["type"] = "transform"
        return data

    if "apply" in data:
        data["type"] = "descend"
        data["pipeline"] = normalize_steps(data.pop("apply"))
        return data
    if "pipeline" in data:
        data["type"] = "descend"
        data["pipeline"] = normalize_steps(data["pipeline"])
        return data

    if "rename" in data or "call" in data:
        data["type"] = "transform"
        return data

    return data

parse_root_config(*args, **kwargs)

Parse root input into a single ActorConfig (single step or descend pipeline).

Source code in graflo/architecture/pipeline/runtime/actor/config/parse.py
def parse_root_config(
    *args: Any,
    **kwargs: Any,
) -> ActorConfig:
    """Parse root input into a single ActorConfig (single step or descend pipeline).

    Accepted call shapes, checked in this order:

    * ``apply=`` / ``pipeline=`` keyword: the first truthy of ``pipeline`` and
      ``apply`` is used as the step list (a non-list value is wrapped in a
      one-element list).
    * One positional list of dicts: treated as a pipeline.
    * One positional dict: treated as a single step.
    * Several positional dicts: treated as a pipeline.

    A pipeline is returned as a ``DescendActorConfig`` wrapping the validated
    steps. In every other case a single step is validated, built from the
    positional dict or else from ``kwargs``, with ``_STEP_STRIP_KEYS`` removed.

    Raises:
        ValueError: If the single positional list contains a non-dict item.
    """
    steps: list[dict[str, Any]] | None = None
    lone_step: dict[str, Any] | None = None

    if "apply" in kwargs or "pipeline" in kwargs:
        raw_steps = kwargs.get("pipeline") or kwargs.get("apply")
        if raw_steps is not None:
            steps = cast(
                list[dict[str, Any]],
                list(raw_steps) if isinstance(raw_steps, list) else [raw_steps],
            )
    elif len(args) == 1:
        (only,) = args
        if isinstance(only, list):
            if any(not isinstance(item, dict) for item in only):
                raise ValueError("pipeline must be a list of dict actor steps")
            steps = [dict(item) for item in only]
        elif isinstance(only, dict):
            lone_step = dict(only)
    elif args and all(isinstance(arg, dict) for arg in args):
        steps = [dict(arg) for arg in args]

    if steps is not None:
        validated = []
        for raw_step in steps:
            step = normalize_actor_step(raw_step)
            try:
                validated.append(_actor_config_adapter.validate_python(step))
            except ValidationError as err:
                _raise_step_validation_error(step, err)
        return DescendActorConfig.model_validate(
            {
                "type": "descend",
                "key": None,
                "any_key": False,
                "pipeline": validated,
            }
        )

    # Single-step path: prefer the positional dict, otherwise use the keywords.
    step_source = lone_step if lone_step is not None else kwargs
    stripped = {
        name: value
        for name, value in step_source.items()
        if name not in _STEP_STRIP_KEYS
    }
    return validate_actor_step(normalize_actor_step(stripped))

validate_actor_step(data)

Validate a normalized step dict as ActorConfig (discriminated union).

Source code in graflo/architecture/pipeline/runtime/actor/config/parse.py
def validate_actor_step(
    data: dict[str, Any],
) -> ActorConfig:
    """Validate a normalized step dict as ActorConfig (discriminated union).

    A ``ValidationError`` from the adapter is handed to
    ``_raise_step_validation_error`` together with the offending step.
    """
    try:
        config = _actor_config_adapter.validate_python(data)
    except ValidationError as err:
        _raise_step_validation_error(data, err)
    else:
        return config