Skip to content

graflo.architecture.pipeline.runtime.actor.config.parse

Parsing and validation of actor configuration.

parse_root_config(*args, **kwargs)

Parse root input into a single ActorConfig (single step or descend pipeline).

Source code in graflo/architecture/pipeline/runtime/actor/config/parse.py
def parse_root_config(
    *args: Any,
    **kwargs: Any,
) -> ActorConfig:
    """Turn root-level input into one ActorConfig.

    Accepted input shapes, checked in this order:

    * keyword ``pipeline`` or ``apply``: the first truthy of the two values
      (``pipeline`` wins) is used as the step list; a non-list value is
      wrapped into a one-element list.
    * one positional list: every item must be a dict; each is a step.
    * one positional dict: a single step.
    * several positional dicts: each is a step.
    * anything else: the keyword arguments themselves form a single step.

    A step list is validated step by step and wrapped in a
    ``DescendActorConfig`` (``type="descend"``, ``key=None``,
    ``any_key=False``). A single step has the keys listed in
    ``_STEP_STRIP_KEYS`` removed before normalization and validation.

    Raises:
        ValueError: a single positional list contains a non-dict item.

    Step validation failures are handed to ``_raise_step_validation_error``.
    """
    steps: list[dict[str, Any]] | None = None
    lone_step: dict[str, Any] | None = None

    if "apply" in kwargs or "pipeline" in kwargs:
        # A falsy "pipeline" value (None, empty list, ...) defers to "apply".
        candidate = kwargs.get("pipeline") or kwargs.get("apply")
        if candidate is not None:
            wrapped = list(candidate) if isinstance(candidate, list) else [candidate]
            steps = cast(list[dict[str, Any]], wrapped)
    elif len(args) == 1 and isinstance(args[0], list):
        entries = args[0]
        if any(not isinstance(entry, dict) for entry in entries):
            raise ValueError("pipeline must be a list of dict actor steps")
        steps = [dict(entry) for entry in entries]
    elif len(args) == 1 and isinstance(args[0], dict):
        lone_step = dict(args[0])
    elif args and all(isinstance(arg, dict) for arg in args):
        steps = [dict(arg) for arg in args]
    # NOTE(review): positional arguments matching none of the shapes above are
    # ignored, and the keyword arguments are validated as a single step.

    if steps is None:
        # Single-step path: the positional dict if given, else the kwargs.
        source = kwargs if lone_step is None else lone_step
        trimmed = {
            name: value
            for name, value in source.items()
            if name not in _STEP_STRIP_KEYS
        }
        return validate_actor_step(normalize_actor_step(trimmed))

    validated = []
    for raw_step in steps:
        prepared = normalize_actor_step(raw_step)
        try:
            validated.append(_actor_config_adapter.validate_python(prepared))
        except ValidationError as err:
            _raise_step_validation_error(prepared, err)

    descend_payload = {
        "type": "descend",
        "key": None,
        "any_key": False,
        "pipeline": validated,
    }
    return DescendActorConfig.model_validate(descend_payload)

validate_actor_step(data)

Validate a normalized step dict as ActorConfig (discriminated union).

Source code in graflo/architecture/pipeline/runtime/actor/config/parse.py
def validate_actor_step(
    data: dict[str, Any],
) -> ActorConfig:
    """Validate an already-normalized step dict and return its ActorConfig.

    Validation runs through ``_actor_config_adapter``; on a
    ``ValidationError`` the step and the error are forwarded to
    ``_raise_step_validation_error``.
    """
    try:
        config = _actor_config_adapter.validate_python(data)
    except ValidationError as exc:
        _raise_step_validation_error(data, exc)
    else:
        return config