Skip to content

graflo.architecture.contract.declarations

Ingestion declarations: resources, transforms, and ingestion model.

DressConfig

Bases: ConfigBaseModel

Output dressing specification for pivoted transforms.

When a transform function returns a single scalar (e.g. round_str returns 6.43), DressConfig describes how to package that scalar together with the input field name into a dict.

Attributes:

Name Type Description
key str

Output field that receives the input field name (e.g. "Open").

value str

Output field that receives the function result (e.g. 6.43).

Source code in graflo/architecture/contract/declarations/transform.py
class DressConfig(ConfigBaseModel):
    """Output dressing specification for pivoted transforms.

    When a transform function returns a single scalar (e.g. ``round_str``
    returns ``6.43``), DressConfig describes how to package that scalar together
    with the input field name into a dict.

    Attributes:
        key: Output field that receives the **input field name** (e.g. "Open").
        value: Output field that receives the **function result** (e.g. 6.43).
    """

    # Example: dress={key: "name", value: "value"} with input=("Open",) and a
    # function result of 6.43 yields {"name": "Open", "value": 6.43}.
    key: str = Field(description="Output field name for the input key.")
    value: str = Field(description="Output field name for the function result.")

EdgeInferSpec

Bases: ConfigBaseModel

Selector for controlling inferred edge emission.

Source code in graflo/architecture/contract/declarations/resource.py
class EdgeInferSpec(ConfigBaseModel):
    """Selector for controlling inferred edge emission.

    A selector names a (source, target) vertex pair plus an optional relation
    discriminator; leaving the relation unset makes the selector apply to
    every relation between that pair.
    """

    source: str = PydanticField(..., description="Edge source vertex name.")
    target: str = PydanticField(..., description="Edge target vertex name.")
    relation: str | None = PydanticField(
        default=None,
        description=(
            "Optional relation discriminator. If omitted, selector applies to all relations "
            "for (source, target)."
        ),
    )

    @property
    def edge_id(self) -> EdgeId:
        """This selector expressed as a (source, target, relation) edge id."""
        return (self.source, self.target, self.relation)

    def matches(self, edge_id: EdgeId) -> bool:
        """Return True when this selector applies to ``edge_id``."""
        source, target, relation = edge_id
        if self.source != source or self.target != target:
            return False
        # A selector without a relation matches any relation for the pair.
        return self.relation is None or self.relation == relation

IngestionModel

Bases: ConfigBaseModel

Ingestion model (C): resources and transform registry.

Source code in graflo/architecture/contract/declarations/ingestion_model/model.py
class IngestionModel(ConfigBaseModel):
    """Ingestion model (C): resources and transform registry."""

    resources: list[Resource] = PydanticField(
        default_factory=list,
        description="List of resource definitions (data pipelines mapping to vertices/edges).",
    )
    transforms: list[ProtoTransform] = PydanticField(
        default_factory=list,
        description="List of named transforms available to resources.",
    )

    # Name -> object lookup maps derived from the public lists; rebuilt by
    # _rebuild_runtime_state() so per-name lookups stay O(1).
    _resources: dict[str, Resource] = PrivateAttr()
    _transforms: dict[str, ProtoTransform] = PrivateAttr(default_factory=dict)

    @model_validator(mode="after")
    def _init_model(self) -> IngestionModel:
        """Build transform and resource lookup maps."""
        self._rebuild_runtime_state()
        return self

    def _rebuild_resource_map(self) -> None:
        """Validate resource name uniqueness and refresh lookup map.

        Raises:
            ValueError: If any resource name is declared more than once.
        """
        names = [r.name for r in self.resources]
        c = Counter(names)
        for k, v in c.items():
            if v > 1:
                raise ValueError(f"resource name {k} used {v} times")
        # object.__setattr__ bypasses pydantic's __setattr__ when refreshing
        # private runtime state.
        object.__setattr__(self, "_resources", {r.name: r for r in self.resources})

    def _rebuild_transform_map(self) -> None:
        """Validate transform names and refresh name lookup map.

        Raises:
            ValueError: If any transform lacks a name or a name is duplicated.
        """
        missing_names = [idx for idx, t in enumerate(self.transforms) if not t.name]
        if missing_names:
            raise ValueError(
                "All ingestion transforms must define a non-empty name. "
                f"Missing at indexes: {missing_names}"
            )

        transform_names = [t.name for t in self.transforms if t.name is not None]
        name_counts = Counter(transform_names)
        duplicates = sorted([name for name, count in name_counts.items() if count > 1])
        if duplicates:
            raise ValueError(f"Duplicate ingestion transform names found: {duplicates}")

        object.__setattr__(
            self,
            "_transforms",
            {t.name: t for t in self.transforms if t.name is not None},
        )

    def finish_init(
        self,
        core_schema: CoreSchema,
        *,
        strict_references: bool = False,
        dynamic_edge_feedback: bool = False,
    ) -> None:
        """Initialize resources against graph model and transform library.

        Args:
            core_schema: Graph model supplying vertex and edge configurations.
            strict_references: Forwarded to each resource's finish_init.
            dynamic_edge_feedback: Forwarded to each resource's finish_init.
        """
        self._rebuild_runtime_state()
        for r in self.resources:
            r.finish_init(
                vertex_config=core_schema.vertex_config,
                edge_config=core_schema.edge_config,
                transforms=self._transforms,
                strict_references=strict_references,
                dynamic_edge_feedback=dynamic_edge_feedback,
            )

    def _rebuild_runtime_state(self) -> None:
        """Rebuild transform and resource lookup maps."""
        self._rebuild_transform_map()
        self._rebuild_resource_map()

    def fetch_resource(self, name: str | None = None) -> Resource:
        """Fetch a resource by name or get the first available resource.

        Args:
            name: Optional name of the resource to fetch

        Returns:
            Resource: The requested resource

        Raises:
            ValueError: If the requested resource is not found or if no resources exist
        """
        _current_resource = None

        if name is not None:
            if name in self._resources:
                _current_resource = self._resources[name]
            else:
                raise ValueError(f"Resource {name} not found")
        else:
            # No name given: default to the first resource in declaration order.
            if self._resources:
                _current_resource = self.resources[0]
            else:
                raise ValueError("Empty resource container :(")
        return _current_resource

    def prune_to_graph(
        self, core_schema: CoreSchema, disconnected: set[str] | None = None
    ) -> None:
        """Drop resource actors that reference disconnected vertices.

        Args:
            core_schema: Graph model used to compute the default disconnected set.
            disconnected: Vertex names to prune against; defaults to vertices
                declared in vertex_config that appear in no edge.
        """
        if disconnected is None:
            # Vertices present in vertex_config but absent from every edge.
            disconnected = (
                core_schema.vertex_config.vertex_set - core_schema.edge_config.vertices
            )
        if not disconnected:
            return

        def _mentions_disconnected(wrapper) -> bool:
            # True when the wrapped actor references any disconnected vertex.
            return bool(wrapper.actor.references_vertices() & disconnected)

        to_drop: list[Resource] = []
        for resource in self.resources:
            root = resource.root
            if _mentions_disconnected(root):
                to_drop.append(resource)
                continue
            root.remove_descendants_if(_mentions_disconnected)
            # After pruning descendants, a resource that references no vertex
            # at all is dropped entirely.
            if not any(a.references_vertices() for a in root.collect_actors()):
                to_drop.append(resource)

        # Keep list and name-lookup map in sync while removing.
        for r in to_drop:
            self.resources.remove(r)
            self._resources.pop(r.name, None)

fetch_resource(name=None)

Fetch a resource by name or get the first available resource.

Parameters:

Name Type Description Default
name str | None

Optional name of the resource to fetch

None

Returns:

Name Type Description
Resource Resource

The requested resource

Raises:

Type Description
ValueError

If the requested resource is not found or if no resources exist

Source code in graflo/architecture/contract/declarations/ingestion_model/model.py
def fetch_resource(self, name: str | None = None) -> Resource:
    """Fetch a resource by name or get the first available resource.

    Args:
        name: Optional name of the resource to fetch

    Returns:
        Resource: The requested resource

    Raises:
        ValueError: If the requested resource is not found or if no resources exist
    """
    _current_resource = None

    if name is not None:
        if name in self._resources:
            _current_resource = self._resources[name]
        else:
            raise ValueError(f"Resource {name} not found")
    else:
        # No name given: default to the first resource in declaration order.
        if self._resources:
            _current_resource = self.resources[0]
        else:
            raise ValueError("Empty resource container :(")
    return _current_resource

finish_init(core_schema, *, strict_references=False, dynamic_edge_feedback=False)

Initialize resources against graph model and transform library.

Source code in graflo/architecture/contract/declarations/ingestion_model/model.py
def finish_init(
    self,
    core_schema: CoreSchema,
    *,
    strict_references: bool = False,
    dynamic_edge_feedback: bool = False,
) -> None:
    """Initialize resources against graph model and transform library.

    Args:
        core_schema: Graph model supplying vertex and edge configurations.
        strict_references: Forwarded to each resource's finish_init.
        dynamic_edge_feedback: Forwarded to each resource's finish_init.
    """
    self._rebuild_runtime_state()
    for r in self.resources:
        r.finish_init(
            vertex_config=core_schema.vertex_config,
            edge_config=core_schema.edge_config,
            transforms=self._transforms,
            strict_references=strict_references,
            dynamic_edge_feedback=dynamic_edge_feedback,
        )

prune_to_graph(core_schema, disconnected=None)

Drop resource actors that reference disconnected vertices.

Source code in graflo/architecture/contract/declarations/ingestion_model/model.py
def prune_to_graph(
    self, core_schema: CoreSchema, disconnected: set[str] | None = None
) -> None:
    """Drop resource actors that reference disconnected vertices.

    Args:
        core_schema: Graph model used to compute the default disconnected set.
        disconnected: Vertex names to prune against; defaults to vertices
            declared in vertex_config that appear in no edge.
    """
    if disconnected is None:
        # Vertices present in vertex_config but absent from every edge.
        disconnected = (
            core_schema.vertex_config.vertex_set - core_schema.edge_config.vertices
        )
    if not disconnected:
        return

    def _mentions_disconnected(wrapper) -> bool:
        # True when the wrapped actor references any disconnected vertex.
        return bool(wrapper.actor.references_vertices() & disconnected)

    to_drop: list[Resource] = []
    for resource in self.resources:
        root = resource.root
        if _mentions_disconnected(root):
            to_drop.append(resource)
            continue
        root.remove_descendants_if(_mentions_disconnected)
        # After pruning descendants, a resource that references no vertex
        # at all is dropped entirely.
        if not any(a.references_vertices() for a in root.collect_actors()):
            to_drop.append(resource)

    # Keep list and name-lookup map in sync while removing.
    for r in to_drop:
        self.resources.remove(r)
        self._resources.pop(r.name, None)

KeySelectionConfig

Bases: ConfigBaseModel

Selection of document keys for key-target transforms.

Source code in graflo/architecture/contract/declarations/transform.py
class KeySelectionConfig(ConfigBaseModel):
    """Selection of document keys for key-target transforms.

    Determines which document keys a ``target='keys'`` transform operates on:
    every key, an include-list, or an exclude-list.
    """

    mode: Literal["all", "include", "exclude"] = Field(
        default="all",
        description=(
            "How keys are selected for target='keys': all=all keys, "
            "include=only specified keys, exclude=all except specified keys."
        ),
    )
    names: tuple[str, ...] = Field(
        default_factory=tuple,
        description="Keys used by include/exclude modes.",
    )

    @model_validator(mode="before")
    @classmethod
    def _normalize_names(cls, data: Any) -> Any:
        """Coerce ``names`` supplied as str / list / None into a tuple."""
        if not isinstance(data, dict):
            return data
        data = dict(data)
        raw = data.get("names")
        if raw is None:
            data["names"] = ()
        elif isinstance(raw, str):
            # A bare string becomes a single-element tuple.
            data["names"] = (raw,)
        elif isinstance(raw, list):
            data["names"] = tuple(raw)
        return data

    @model_validator(mode="after")
    def _validate_mode_names(self) -> Self:
        """Enforce that ``names`` matches the selected ``mode``."""
        has_names = bool(self.names)
        if self.mode == "all":
            if has_names:
                raise ValueError("keys.names must be empty when keys.mode='all'.")
        elif not has_names:
            # mode is include/exclude here (the only remaining literals).
            raise ValueError(
                "keys.names must be provided when keys.mode is include/exclude."
            )
        return self

ProtoTransform

Bases: ConfigBaseModel

Base class for transform definitions.

This class provides the foundation for data transformations, supporting both functional transformations and declarative mappings.

Attributes:

Name Type Description
name str | None

Optional name of the transform

module str | None

Optional module containing the transform function

params dict[str, Any]

Dictionary of transform parameters

foo str | None

Optional name of the transform function

input tuple[str, ...]

Tuple of input field names

output tuple[str, ...]

Tuple of output field names

dress DressConfig | None

Optional pivot dressing for scalar functional results

target Literal['values', 'keys']

Whether to transform field values or document keys

keys KeySelectionConfig

Key selection when target is keys

_foo Any

Internal reference to the transform function

Source code in graflo/architecture/contract/declarations/transform.py
class ProtoTransform(ConfigBaseModel):
    """Base class for transform definitions.

    This class provides the foundation for data transformations, supporting both
    functional transformations and declarative mappings.

    Attributes:
        name: Optional name of the transform
        module: Optional module containing the transform function
        params: Dictionary of transform parameters
        foo: Optional name of the transform function
        input: Tuple of input field names
        output: Tuple of output field names
        dress: Optional pivot dressing for scalar functional results
        target: Whether to transform field values or document keys
        keys: Key selection when target is keys
        _foo: Internal reference to the transform function
    """

    name: str | None = Field(
        default=None,
        description="Optional name for this transform (e.g. for reference in ingestion_model.transforms).",
    )
    module: str | None = Field(
        default=None,
        description="Python module path containing the transform function (e.g. my_package.transforms).",
    )
    params: dict[str, Any] = Field(
        default_factory=dict,
        description="Extra parameters passed to the transform function at runtime.",
    )
    foo: str | None = Field(
        default=None,
        description="Name of the callable in module to use as the transform function.",
    )
    input: tuple[str, ...] = Field(
        default_factory=tuple,
        description="Input field names passed to the transform function.",
    )
    output: tuple[str, ...] = Field(
        default_factory=tuple,
        description="Output field names produced by the transform (defaults to input if unset).",
    )
    input_groups: tuple[tuple[str, ...], ...] = Field(
        default_factory=tuple,
        description=(
            "Explicit groups of input fields for repeated tuple-style value calls."
        ),
    )
    output_groups: tuple[tuple[str, ...], ...] = Field(
        default_factory=tuple,
        description=(
            "Explicit output field groups aligned with input_groups for grouped value calls."
        ),
    )
    dress: DressConfig | None = Field(
        default=None,
        description=(
            "Dressing spec for pivoted output. Applies to ingestion_model.transforms "
            "entries and to inline transform steps. "
            "dress.key receives the input field name, dress.value receives the "
            "function result. E.g. dress={key: name, value: value} with "
            "input=(Open,) produces {name: 'Open', value: <result>}."
        ),
    )
    target: Literal["values", "keys"] = Field(
        default="values",
        description=(
            "Transform target. values=apply function to input values; "
            "keys=apply function to selected document keys."
        ),
    )
    keys: KeySelectionConfig = Field(
        default_factory=KeySelectionConfig,
        description="Key selection for key-target transforms.",
    )

    # Resolved transform callable (set from module/foo in the after-validator).
    _foo: Any = PrivateAttr(default=None)

    @model_validator(mode="before")
    @classmethod
    def _normalize_input_output(cls, data: Any) -> Any:
        """Normalize input/output declarations into tuples before validation."""
        if not isinstance(data, dict):
            return data
        data = dict(data)
        if "dress" in data and isinstance(data["dress"], (list, tuple)):
            raise ValueError(
                "List-style `dress` is no longer supported. "
                "Use a dict: dress={key: ..., value: ...}."
            )
        for key in ("input", "output"):
            if key in data:
                if data[key] is not None:
                    data[key] = _tuple_it(data[key])
                else:
                    data[key] = ()
        for key in ("input_groups", "output_groups"):
            if key in data:
                if data[key] is None:
                    data[key] = ()
                else:
                    data[key] = _tuple_groups_it(data[key])
        _normalize_keys_in_dict(data)
        return data

    @model_validator(mode="after")
    def _init_foo_and_output(self) -> Self:
        """Resolve the transform callable and derive the output fields."""
        if self.module is not None and self.foo is not None:
            try:
                _module = importlib.import_module(self.module)
            except Exception as e:
                # Chain the cause so the original import traceback is kept.
                raise TypeError(f"Provided module {self.module} is not valid: {e}") from e
            try:
                object.__setattr__(self, "_foo", getattr(_module, self.foo))
            except Exception as e:
                # Chain the cause so the attribute-lookup failure is kept.
                raise ValueError(
                    f"Could not instantiate transform function. Exception: {e}"
                ) from e
        if self.dress is not None:
            if self.target == "keys":
                raise ValueError("target='keys' is not compatible with dress.")
            # Dressed output is always (key field, value field).
            object.__setattr__(self, "output", (self.dress.key, self.dress.value))
        elif not self.output and self.input:
            # Default: outputs mirror inputs when not specified.
            object.__setattr__(self, "output", self.input)
        return self

    @classmethod
    def get_fields_members(cls) -> list[str]:
        """Get list of field members (public model fields)."""
        return list(cls.model_fields.keys())

    def apply(self, *args: Any, **kwargs: Any) -> Any:
        """Apply the raw transform function to the given arguments.

        This is the core function invocation without any input extraction or
        output dressing — purely ``self._foo(*args, **kwargs, **self.params)``.

        Raises:
            TransformException: If no transform function has been set.
        """
        if self._foo is None:
            raise TransformException("No transform function set")
        return self._foo(*args, **kwargs, **self.params)

    def __lt__(self, other: object) -> bool:
        """Compare transforms for ordering.

        Args:
            other: Other transform to compare with

        Returns:
            bool: True if this transform should be ordered before other
        """
        if not isinstance(other, ProtoTransform):
            return NotImplemented
        # Transforms without a bound function sort before those with one.
        if self._foo is None and other._foo is not None:
            return True
        return False

__lt__(other)

Compare transforms for ordering.

Parameters:

Name Type Description Default
other object

Other transform to compare with

required

Returns:

Name Type Description
bool bool

True if this transform should be ordered before other

Source code in graflo/architecture/contract/declarations/transform.py
def __lt__(self, other: object) -> bool:
    """Compare transforms for ordering.

    Args:
        other: Other transform to compare with

    Returns:
        bool: True if this transform should be ordered before other
    """
    if not isinstance(other, ProtoTransform):
        return NotImplemented
    # Transforms without a bound function sort before those with one.
    if self._foo is None and other._foo is not None:
        return True
    return False

apply(*args, **kwargs)

Apply the raw transform function to the given arguments.

This is the core function invocation without any input extraction or output dressing — purely `self._foo(*args, **kwargs, **self.params)`.

Raises:

Type Description
TransformException

If no transform function has been set.

Source code in graflo/architecture/contract/declarations/transform.py
def apply(self, *args: Any, **kwargs: Any) -> Any:
    """Apply the raw transform function to the given arguments.

    This is the core function invocation without any input extraction or
    output dressing — purely ``self._foo(*args, **kwargs, **self.params)``.

    Raises:
        TransformException: If no transform function has been set.
    """
    if self._foo is None:
        raise TransformException("No transform function set")
    # self.params supplies the configured keyword arguments on every call.
    return self._foo(*args, **kwargs, **self.params)

get_fields_members() classmethod

Get list of field members (public model fields).

Source code in graflo/architecture/contract/declarations/transform.py
@classmethod
def get_fields_members(cls) -> list[str]:
    """Get list of field members (public model fields)."""
    # cls.model_fields holds the declared public pydantic fields.
    return list(cls.model_fields.keys())

Resource

Bases: ConfigBaseModel

Resource configuration and processing.

Represents a data resource that can be processed and transformed into graph structures. Manages the processing pipeline through actors and handles data encoding, transformation, and mapping. Suitable for LLM-generated schema constituents.

Dynamic vertex-type routing is handled by vertex_router steps in the pipeline (see :class:~graflo.architecture.pipeline.runtime.actor.VertexRouterActor).

Source code in graflo/architecture/contract/declarations/resource.py
class Resource(ConfigBaseModel):
    """Resource configuration and processing.

    Represents a data resource that can be processed and transformed into graph
    structures. Manages the processing pipeline through actors and handles data
    encoding, transformation, and mapping. Suitable for LLM-generated schema
    constituents.

    Dynamic vertex-type routing is handled by ``vertex_router`` steps in the
    pipeline (see :class:`~graflo.architecture.pipeline.runtime.actor.VertexRouterActor`).
    """

    model_config = {"extra": "forbid"}

    name: str = PydanticField(
        ...,
        description="Name of the resource (e.g. table or file identifier).",
    )
    pipeline: list[dict[str, Any]] = PydanticField(
        ...,
        description="Pipeline of actor steps to apply in sequence (vertex, edge, transform, descend). "
        'Each step is a dict, e.g. {"vertex": "user"} or {"edge": {"from": "a", "to": "b"}}.',
        validation_alias=AliasChoices("pipeline", "apply"),
    )
    encoding: EncodingType = PydanticField(
        default=EncodingType.UTF_8,
        description="Character encoding for input/output (e.g. utf-8, ISO-8859-1).",
    )
    merge_collections: list[str] = PydanticField(
        default_factory=list,
        description="List of collection names to merge when writing to the graph.",
    )
    extra_weights: list[Edge] = PydanticField(
        default_factory=list,
        description="Additional edge weight configurations for this resource.",
    )
    types: dict[str, str] = PydanticField(
        default_factory=dict,
        description='Field name to Python type expression for casting (e.g. {"amount": "float"}).',
    )
    infer_edges: bool = PydanticField(
        default=True,
        description=(
            "If True, infer edges from current vertex population. "
            "If False, emit only edges explicitly declared as edge actors in the pipeline."
        ),
    )
    infer_edge_only: list[EdgeInferSpec] = PydanticField(
        default_factory=list,
        description=(
            "Optional allow-list for inferred edges. Applies only to inferred (greedy) edges, "
            "not explicit edge actors."
        ),
    )
    infer_edge_except: list[EdgeInferSpec] = PydanticField(
        default_factory=list,
        description=(
            "Optional deny-list for inferred edges. Applies only to inferred (greedy) edges, "
            "not explicit edge actors."
        ),
    )

    # Private runtime state: actor tree, resolved cast callables, bound graph
    # configs, executor, and an initialization guard checked by __call__.
    _root: ActorWrapper = PrivateAttr()
    _types: dict[str, Callable[..., Any]] = PrivateAttr(default_factory=dict)
    _vertex_config: VertexConfig = PrivateAttr()
    _edge_config: EdgeConfig = PrivateAttr()
    _executor: ActorExecutor = PrivateAttr()
    _initialized: bool = PrivateAttr(default=False)

    @model_validator(mode="after")
    def _build_root_and_types(self) -> Resource:
        """Build root ActorWrapper and resolve safe cast functions."""
        # Imported locally, presumably to avoid a circular import — TODO confirm.
        from graflo.architecture.pipeline.runtime.actor import ActorWrapper
        from graflo.architecture.pipeline.runtime.executor import ActorExecutor

        object.__setattr__(self, "_root", ActorWrapper(*self.pipeline))
        object.__setattr__(self, "_executor", ActorExecutor(self._root))
        object.__setattr__(self, "_types", {})
        for k, v in self.types.items():
            caster = _resolve_type_caster(v)
            if caster is not None:
                self._types[k] = caster
            else:
                # Unresolvable cast expressions are logged, not fatal.
                logger.error(
                    "For resource %s for field %s failed to resolve cast type %s",
                    self.name,
                    k,
                    v,
                )
        # Placeholders until schema binds real configs.
        object.__setattr__(self, "_vertex_config", VertexConfig(vertices=[]))
        object.__setattr__(self, "_edge_config", EdgeConfig())
        object.__setattr__(self, "_initialized", False)
        self._validate_infer_edge_spec_policy()
        return self

    def _validate_infer_edge_spec_policy(self) -> None:
        # Allow-list and deny-list cannot be combined.
        if self.infer_edge_only and self.infer_edge_except:
            raise ValueError(
                "Resource infer_edge_only and infer_edge_except are mutually exclusive."
            )

    def _validate_infer_edge_spec_targets(self, edge_config: EdgeConfig) -> None:
        # Every selector must match at least one known edge id.
        known_edge_ids = {edge_id for edge_id, _ in edge_config.items()}

        def _validate_list(field_name: str, specs: list[EdgeInferSpec]) -> None:
            unknown: list[EdgeId] = []
            for spec in specs:
                if not any(spec.matches(edge_id) for edge_id in known_edge_ids):
                    unknown.append(spec.edge_id)
            if unknown:
                raise ValueError(
                    f"Resource {field_name} contains unknown edge selectors: {unknown}"
                )

        _validate_list("infer_edge_only", self.infer_edge_only)
        _validate_list("infer_edge_except", self.infer_edge_except)

    @property
    def vertex_config(self) -> VertexConfig:
        """Vertex configuration (set by Schema.finish_init)."""
        return self._vertex_config

    @property
    def edge_config(self) -> EdgeConfig:
        """Edge configuration (set by Schema.finish_init)."""
        return self._edge_config

    @property
    def root(self) -> ActorWrapper:
        """Root actor wrapper for the processing pipeline."""
        return self._root

    def finish_init(
        self,
        vertex_config: VertexConfig,
        edge_config: EdgeConfig,
        transforms: dict[str, ProtoTransform],
        *,
        strict_references: bool = False,
        dynamic_edge_feedback: bool = False,
    ) -> None:
        """Complete resource initialization.

        Initializes the resource with vertex and edge configurations,
        and sets up the processing pipeline. Called by Schema after load.

        Args:
            vertex_config: Configuration for vertices
            edge_config: Configuration for edges
            transforms: Dictionary of available transforms
            strict_references: Forwarded to the runtime rebuild.
            dynamic_edge_feedback: If True, propagate newly discovered edges
                back to the shared edge_config (see _rebuild_runtime).
        """
        self._rebuild_runtime(
            vertex_config=vertex_config,
            edge_config=edge_config,
            transforms=transforms,
            strict_references=strict_references,
            dynamic_edge_feedback=dynamic_edge_feedback,
        )

    def _edge_ids_from_edge_actors(self) -> set[EdgeId]:
        """Collect (source, target, None) for every EdgeActor in this resource's pipeline.

        Used to auto-add to infer_edge_except so inferred edges do not duplicate
        edges produced by explicit edge actors.
        """
        from graflo.architecture.pipeline.runtime.actor import EdgeActor

        edge_actors = [
            a for a in self.root.collect_actors() if isinstance(a, EdgeActor)
        ]
        return {(ea.edge.source, ea.edge.target, None) for ea in edge_actors}

    def _validate_dynamic_edge_vertices_exist(
        self, vertex_config: VertexConfig
    ) -> None:
        """Ensure all vertices implied by dynamic edge controls are declared."""
        known_vertices = set(vertex_config.vertex_set)
        referenced_vertices: set[str] = set()

        for spec in self.infer_edge_only:
            referenced_vertices.add(spec.source)
            referenced_vertices.add(spec.target)

        for spec in self.infer_edge_except:
            referenced_vertices.add(spec.source)
            referenced_vertices.add(spec.target)

        for source, target, _ in self._edge_ids_from_edge_actors():
            referenced_vertices.add(source)
            referenced_vertices.add(target)

        missing_vertices = sorted(referenced_vertices - known_vertices)
        if missing_vertices:
            raise ValueError(
                "Resource dynamic edge references undefined vertices: "
                f"{missing_vertices}. "
                "Declare these vertices in vertex_config before using dynamic/inferred edges."
            )

    def _rebuild_runtime(
        self,
        *,
        vertex_config: VertexConfig,
        edge_config: EdgeConfig,
        transforms: dict[str, ProtoTransform],
        strict_references: bool = False,
        dynamic_edge_feedback: bool = False,
    ) -> None:
        """Rebuild runtime actor initialization state from typed context."""
        object.__setattr__(self, "_vertex_config", vertex_config)
        # Runtime actors may register dynamic edges; keep per-resource edge state.
        local_edge_config = EdgeConfig.model_validate(
            edge_config.to_dict(skip_defaults=False)
        )
        object.__setattr__(self, "_edge_config", local_edge_config)
        self._validate_dynamic_edge_vertices_exist(vertex_config)
        self._validate_infer_edge_spec_targets(self._edge_config)

        # Snapshot of shared edge ids, used below to detect newly registered edges.
        baseline_edge_ids = {edge_id for edge_id, _ in edge_config.items()}
        infer_edge_except = {spec.edge_id for spec in self.infer_edge_except}
        # When not using infer_edge_only, auto-add (s,t,None) to infer_edge_except
        # for any edge type handled by explicit EdgeActors in this resource.
        if not self.infer_edge_only:
            infer_edge_except |= self._edge_ids_from_edge_actors()

        from graflo.architecture.pipeline.runtime.actor import ActorInitContext

        logger.debug("total resource actor count : %s", self.root.count())
        init_ctx = ActorInitContext(
            vertex_config=vertex_config,
            edge_config=self._edge_config,
            transforms=transforms,
            infer_edges=self.infer_edges,
            infer_edge_only={spec.edge_id for spec in self.infer_edge_only},
            infer_edge_except=infer_edge_except,
            strict_references=strict_references,
        )
        self.root.finish_init(init_ctx=init_ctx)
        object.__setattr__(self, "_initialized", True)

        if dynamic_edge_feedback:
            # Edge actors register static edge definitions into the resource-local edge
            # config during finish_init(). Optionally propagate newly discovered edges
            # to the shared schema-level edge_config so schema definition and DB
            # writers can see them.
            for edge_id, edge in self._edge_config.items():
                if edge_id in baseline_edge_ids:
                    continue
                edge_config.update_edges(
                    edge.model_copy(deep=True), vertex_config=vertex_config
                )

        logger.debug("total resource actor count (after finit): %s", self.root.count())

        for e in self.extra_weights:
            e.finish_init(vertex_config)

    def __call__(self, doc: dict) -> defaultdict[GraphEntity, list]:
        """Process a document through the resource pipeline.

        Args:
            doc: Document to process

        Returns:
            defaultdict[GraphEntity, list]: Processed graph entities

        Raises:
            RuntimeError: If the resource has not been initialized yet.
        """
        # Guard: pipeline actors are only usable after finish_init() bound configs.
        if not self._initialized:
            raise RuntimeError(
                f"Resource '{self.name}' must be initialized via finish_init() before use."
            )
        extraction_ctx = self._executor.extract(doc)
        result = self._executor.assemble_result(extraction_ctx)
        return result.entities

    def count(self) -> int:
        """Total number of actors in the resource pipeline."""
        return self.root.count()

edge_config property

Edge configuration (set by Schema.finish_init).

root property

Root actor wrapper for the processing pipeline.

vertex_config property

Vertex configuration (set by Schema.finish_init).

__call__(doc)

Process a document through the resource pipeline.

Parameters:

Name Type Description Default
doc dict

Document to process

required

Returns:

Type Description
defaultdict[GraphEntity, list]

defaultdict[GraphEntity, list]: Processed graph entities

Source code in graflo/architecture/contract/declarations/resource.py
def __call__(self, doc: dict) -> defaultdict[GraphEntity, list]:
    """Process a document through the resource pipeline.

    Args:
        doc: Document to process

    Returns:
        defaultdict[GraphEntity, list]: Processed graph entities

    Raises:
        RuntimeError: If the resource has not been initialized via
            finish_init().
    """
    # Guard against use before initialization.
    if not self._initialized:
        raise RuntimeError(
            f"Resource '{self.name}' must be initialized via finish_init() before use."
        )
    extraction_ctx = self._executor.extract(doc)
    result = self._executor.assemble_result(extraction_ctx)
    return result.entities

count()

Total number of actors in the resource pipeline.

Source code in graflo/architecture/contract/declarations/resource.py
def count(self) -> int:
    """Total number of actors in the resource pipeline."""
    # Delegates to the root actor of the processing pipeline.
    return self.root.count()

finish_init(vertex_config, edge_config, transforms, *, strict_references=False, dynamic_edge_feedback=False)

Complete resource initialization.

Initializes the resource with vertex and edge configurations, and sets up the processing pipeline. Called by Schema after load.

Parameters:

- vertex_config (VertexConfig): Configuration for vertices. Required.
- edge_config (EdgeConfig): Configuration for edges. Required.
- transforms (dict[str, ProtoTransform]): Dictionary of available transforms. Required.
- strict_references (bool, keyword-only): Defaults to False.
- dynamic_edge_feedback (bool, keyword-only): Defaults to False.
Source code in graflo/architecture/contract/declarations/resource.py
def finish_init(
    self,
    vertex_config: VertexConfig,
    edge_config: EdgeConfig,
    transforms: dict[str, ProtoTransform],
    *,
    strict_references: bool = False,
    dynamic_edge_feedback: bool = False,
) -> None:
    """Complete resource initialization.

    Initializes the resource with vertex and edge configurations,
    and sets up the processing pipeline. Called by Schema after load.

    Args:
        vertex_config: Configuration for vertices
        edge_config: Configuration for edges
        transforms: Dictionary of available transforms
        strict_references: Keyword-only flag forwarded to the runtime
            rebuild; presumably enforces strict reference checking —
            TODO confirm against _rebuild_runtime.
        dynamic_edge_feedback: Keyword-only flag forwarded to the runtime
            rebuild; presumably controls propagation of dynamically
            discovered edges — TODO confirm against _rebuild_runtime.
    """
    # All heavy lifting is delegated to the runtime rebuild helper.
    self._rebuild_runtime(
        vertex_config=vertex_config,
        edge_config=edge_config,
        transforms=transforms,
        strict_references=strict_references,
        dynamic_edge_feedback=dynamic_edge_feedback,
    )

Transform

Bases: ProtoTransform

Concrete transform implementation.

Wraps a ProtoTransform with input extraction, output dressing, field mapping, and transform composition.

Attributes:

Name Type Description
fields tuple[str, ...]

Tuple of fields to transform

rename dict[str, str]

Dictionary mapping input fields to output fields

functional_transform bool

Whether this is a functional transform

Source code in graflo/architecture/contract/declarations/transform.py
(line-number gutter 314–781 from the rendered source listing omitted)
class Transform(ProtoTransform):
    """Concrete transform implementation.

    Wraps a ProtoTransform with input extraction, output dressing, field
    mapping, and transform composition.

    Attributes:
        fields: Tuple of fields to transform
        rename: Dictionary mapping input fields to output fields
        functional_transform: Whether this is a functional transform
    """

    fields: tuple[str, ...] = Field(
        default_factory=tuple,
        description="Field names for declarative transform (used to derive input when input unset).",
    )
    rename: dict[str, str] = Field(
        default_factory=dict,
        description="Mapping of input_key -> output_key for pure field renaming (no function).",
    )
    strategy: Literal["single", "each", "all"] = Field(
        default="single",
        description=(
            "Functional call strategy. "
            "single: call function once with all input values. "
            "each: call function once per input field (unary). "
            "all: pass full document as a single argument."
        ),
    )
    passthrough_group_output: bool = Field(
        default=True,
        description=(
            "When grouped mode omits outputs, map function results back to input group keys."
        ),
    )

    functional_transform: bool = Field(
        default=False,
        description="True when a callable (module.foo) is set; False for pure map/dress transforms.",
    )

    @model_validator(mode="before")
    @classmethod
    def _normalize_fields(cls, data: Any) -> Any:
        """Normalize raw constructor data before field validation.

        Coerces ``fields`` to a tuple and rejects the removed legacy
        ``switch`` option with an actionable message.
        """
        if not isinstance(data, dict):
            return data
        # Copy so the caller's dict is never mutated.
        data = dict(data)
        if "fields" in data and data["fields"] is not None:
            data["fields"] = _tuple_it(data["fields"])
        if "switch" in data:
            raise ValueError(
                "Legacy `switch` is no longer supported. Use `input` + `dress`."
            )
        return data

    @model_validator(mode="after")
    def _init_derived(self) -> Self:
        """Compute derived state (effective input/output/map), then validate."""
        # Remember whether the user supplied `rename` explicitly: a *derived*
        # map may coexist with a function, an explicit one may not.
        explicit_map = bool(self.rename)
        # NOTE(review): object.__setattr__ bypasses pydantic assignment hooks —
        # presumably the model is frozen or validates on assignment; confirm.
        object.__setattr__(self, "functional_transform", self._foo is not None)
        next_input, next_output, next_map = self._derive_effective_io_and_map()
        object.__setattr__(self, "input", next_input)
        object.__setattr__(self, "output", next_output)
        object.__setattr__(self, "map", next_map)
        self._validate_configuration(explicit_map=explicit_map)
        return self

    def _derive_grouped_default_output(self) -> tuple[str, ...]:
        """Default output for grouped mode: echo scalar group keys.

        Only applies when ``input_groups`` is set, no explicit output is
        given, and every group holds exactly one field.
        """
        if not self.input_groups or self.output or self.output_groups:
            return self.output
        if not self.passthrough_group_output:
            return self.output
        scalar_names: list[str] = []
        for group in self.input_groups:
            if len(group) != 1:
                # Multi-field groups cannot be mapped back automatically.
                return self.output
            scalar_names.append(group[0])
        return tuple(scalar_names) if scalar_names else self.output

    def _derive_effective_io_and_map(
        self,
    ) -> tuple[tuple[str, ...], tuple[str, ...], dict[str, str]]:
        """Compute effective input/output/map once using explicit precedence."""
        next_input = self.input
        next_output = self._derive_grouped_default_output()
        next_map = dict(self.rename)

        # `fields` is a declarative alias for `input` when input is unset.
        if self.fields and not next_input:
            next_input = self.fields

        # A rename map can supply input and/or output when either is missing.
        if next_map:
            if not next_input and not next_output:
                next_input = tuple(next_map.keys())
                next_output = tuple(next_map.values())
            elif not next_input:
                next_input = tuple(next_map.keys())
            elif not next_output:
                next_output = tuple(next_map.values())

        # dress pivots the result into the (key, value) pair of output fields.
        if self.dress is not None:
            next_output = (self.dress.key, self.dress.value)
        elif not next_output and next_input:
            # Default: write results back to the input field names.
            next_output = next_input

        # Derive a positional map when none was given and lengths line up.
        if (
            not next_map
            and next_input
            and next_output
            and len(next_input) == len(next_output)
        ):
            next_map = {src: dst for src, dst in zip(next_input, next_output)}

        return next_input, next_output, next_map

    def _init_io_from_map(self, force_init: bool = False) -> None:
        """Backwards-compatible shim; prefer sync_io_from_map()."""
        if not self.rename:
            return
        map_input = tuple(self.rename.keys())
        map_output = tuple(self.rename.values())
        if force_init or (not self.input and not self.output):
            object.__setattr__(self, "input", map_input)
            object.__setattr__(self, "output", map_output)
            return
        if not self.input:
            object.__setattr__(self, "input", map_input)
        elif not self.output:
            object.__setattr__(self, "output", map_output)

    def _validate_configuration(self, *, explicit_map: bool) -> None:
        """Validate that the transform has enough information to operate."""
        # --- target='keys' is a standalone mode with its own constraints ---
        if self.target == "keys":
            if self.input_groups or self.output_groups:
                raise ValueError(
                    "target='keys' does not accept input_groups/output_groups."
                )
            if self._foo is None:
                raise ValueError("target='keys' requires a functional transform.")
            if self.rename:
                raise ValueError("target='keys' cannot be combined with map.")
            if self.input or self.output or self.fields:
                raise ValueError(
                    "target='keys' does not accept input/output/fields; use keys selector."
                )
            if self.dress is not None:
                raise ValueError("target='keys' is not compatible with dress.")
            if self.strategy != "single":
                raise ValueError(
                    "target='keys' uses implicit per-key execution and does not accept strategy."
                )
            return

        # Reject only user-specified map+function conflict. A derived map
        # (from input/output defaults) is valid for functional transforms.
        if explicit_map and self.rename and self._foo is not None:
            raise ValueError("map and functional transform cannot be used together.")
        if self.dress is not None:
            if self._foo is None:
                raise ValueError(
                    "dress requires a functional transform (module + foo)."
                )
            if len(self.input) != 1:
                raise ValueError("dress requires exactly one input field.")
        if self.strategy != "single" and self._foo is None:
            raise ValueError("strategy applies only to functional transforms.")
        # --- grouped execution (input_groups/output_groups) constraints ---
        if self.input_groups:
            if self._foo is None:
                raise ValueError(
                    "input_groups requires a functional transform (module + foo)."
                )
            if self.strategy != "single":
                raise ValueError(
                    "input_groups mode is explicit grouped execution and does not accept strategy."
                )
            if self.input or self.fields:
                raise ValueError("input_groups cannot be combined with input/fields.")
            if self.rename:
                raise ValueError("input_groups cannot be combined with map.")
            if self.dress is not None:
                raise ValueError("input_groups is not compatible with dress.")
            if self.output_groups and self.output:
                raise ValueError(
                    "Provide either output or output_groups for input_groups mode, not both."
                )
            if self.output_groups and len(self.output_groups) != len(self.input_groups):
                raise ValueError(
                    "output_groups must have same number of groups as input_groups."
                )
            if self.output and len(self.output) != len(self.input_groups):
                raise ValueError(
                    "When using input_groups with scalar outputs, output length must match number of input_groups."
                )
        elif self.output_groups:
            raise ValueError("output_groups requires input_groups.")
        # --- general functional-transform requirements ---
        if self._foo is not None and not self.input:
            if self.strategy != "all" and not self.input_groups:
                raise ValueError(
                    "Functional transforms require `input` (string or list of field names)."
                )
        if self.strategy == "all":
            if self.input or self.fields:
                raise ValueError("strategy='all' does not accept input/fields.")
            if self.dress is not None:
                raise ValueError("strategy='all' is not compatible with dress.")
        if self.strategy == "each":
            if not self.input:
                raise ValueError("strategy='each' requires one or more input fields.")
            if self.output and len(self.input) != len(self.output):
                raise ValueError(
                    "strategy='each' requires output length to match input length."
                )
        if (
            self._foo is None
            and self.dress is None
            and self.input
            and self.output
            and len(self.input) != len(self.output)
        ):
            raise ValueError(
                "Non-functional transforms require input/output to have the same length."
            )
        # Finally, reject a fully-empty configuration.
        if (
            not self.input
            and not self.output
            and not self.input_groups
            and not self.output_groups
            and not self.name
            and not (self._foo is not None and self.strategy == "all")
        ):
            raise ValueError(
                "Either input/output, fields, map or name must be provided in "
                "Transform constructor."
            )

    def _refresh_derived(self) -> None:
        """Re-run derived state (e.g. map from input/output) after mutating attributes."""
        if self.rename or not self.input or not self.output:
            return
        if len(self.input) != len(self.output):
            return
        object.__setattr__(
            self, "map", {src: dst for src, dst in zip(self.input, self.output)}
        )

    def __call__(self, *nargs: Any, **kwargs: Any) -> dict[str, Any] | Any:
        """Execute the transform.

        Args:
            *nargs: Positional arguments for the transform
            **kwargs: Keyword arguments for the transform

        Returns:
            dict: Transformed data
        """
        # Key-renaming mode operates on the whole document's keys.
        if self.target == "keys":
            input_doc = nargs[0] if nargs and isinstance(nargs[0], dict) else None
            if input_doc is None:
                raise TransformException(
                    "target='keys' requires a document dictionary."
                )
            return self._transform_keys(input_doc, **kwargs)

        # Grouped mode: one function call per input group.
        if self.input_groups:
            input_doc = nargs[0] if nargs and isinstance(nargs[0], dict) else None
            if input_doc is None:
                raise TransformException(
                    "input_groups transforms require a document dictionary."
                )
            return self._transform_input_groups(input_doc, **kwargs)

        if self.is_mapping:
            # Pure mapping: select input values, no function applied.
            input_doc = nargs[0]
            if isinstance(input_doc, dict):
                output_values = [input_doc[k] for k in self.input]
            else:
                output_values = list(nargs)
        else:
            if self.strategy == "all":
                # Whole document (or raw args) passed as-is to the function.
                if nargs and isinstance(nargs[0], dict):
                    output_values = self.apply(nargs[0], **kwargs)
                else:
                    output_values = self.apply(*nargs, **kwargs)
            elif self.strategy == "each":
                # Unary function applied once per input field/value.
                if nargs and isinstance(input_doc := nargs[0], dict):
                    output_values = [
                        self.apply(input_doc[k], **kwargs) for k in self.input
                    ]
                else:
                    output_values = [self.apply(value, **kwargs) for value in nargs]
            else:
                # 'single': one call with all input values as positional args.
                if nargs and isinstance(input_doc := nargs[0], dict):
                    new_args = [input_doc[k] for k in self.input]
                    output_values = self.apply(*new_args, **kwargs)
                else:
                    output_values = self.apply(*nargs, **kwargs)

        if self.output:
            r = self._dress_as_dict(output_values)
        else:
            # No declared outputs: return the raw result.
            r = output_values
        return r

    def _apply_grouped_result(
        self,
        out: dict[str, Any],
        result: Any,
        input_group: tuple[str, ...],
        output_group: tuple[str, ...] | None,
        *,
        group_index: int,
    ) -> None:
        """Write one group's function result into ``out``.

        Resolution order: explicit ``output_group``, then the scalar
        ``output`` entry for this group, then passthrough back onto the
        input group's own keys. Raises TransformException on arity
        mismatches or duplicate output keys.
        """
        if output_group is not None:
            if isinstance(result, (list, tuple)):
                values = list(result)
            else:
                values = [result]
            if len(values) != len(output_group):
                raise TransformException(
                    f"input_groups[{group_index}] produced {len(values)} values, "
                    f"but output_groups[{group_index}] expects {len(output_group)}."
                )
            pairs = zip(output_group, values)
        elif self.output:
            pairs = ((self.output[group_index], result),)
        else:
            if isinstance(result, (list, tuple)):
                values = list(result)
                if len(values) != len(input_group):
                    raise TransformException(
                        f"input_groups[{group_index}] has {len(input_group)} fields, "
                        f"but transform returned {len(values)} values. "
                        "Provide output/output_groups explicitly to resolve mapping."
                    )
                pairs = zip(input_group, values)
            else:
                if len(input_group) != 1:
                    raise TransformException(
                        f"input_groups[{group_index}] has {len(input_group)} fields "
                        "but transform returned a scalar. "
                        "Provide output/output_groups explicitly for scalar group results."
                    )
                pairs = ((input_group[0], result),)
        for key, value in pairs:
            if key in out:
                raise TransformException(
                    f"Grouped transform produced duplicate output key '{key}'."
                )
            out[key] = value

    def _transform_input_groups(
        self, doc: dict[str, Any], **kwargs: Any
    ) -> dict[str, Any]:
        """Run the function once per input group and merge results into one dict."""
        out: dict[str, Any] = {}
        for idx, input_group in enumerate(self.input_groups):
            values = [doc[k] for k in input_group]
            result = self.apply(*values, **kwargs)
            output_group = self.output_groups[idx] if self.output_groups else None
            self._apply_grouped_result(
                out,
                result,
                input_group,
                output_group,
                group_index=idx,
            )
        return out

    @property
    def is_mapping(self) -> bool:
        """True when the transform is pure mapping (no function)."""
        return self._foo is None

    def _dress_as_dict(self, transform_result: Any) -> dict[str, Any]:
        """Convert transform result to dictionary format.

        When ``dress`` is set the result is pivoted: the input field name is
        stored under ``dress.key`` and the function result under ``dress.value``.
        Otherwise the result is mapped positionally to ``output`` fields.
        """
        if self.dress is not None:
            return {
                self.dress.key: self.input[0],
                self.dress.value: transform_result,
            }
        elif isinstance(transform_result, (list, tuple)):
            return {k: v for k, v in zip(self.output, transform_result)}
        else:
            # Scalar result: stored under the last declared output field.
            return {self.output[-1]: transform_result}

    def _selected_keys(self, doc: dict[str, Any]) -> set[str]:
        """Resolve the set of document keys targeted by the keys selector."""
        if self.keys.mode == "all":
            return set(doc.keys())
        selected = set(self.keys.names)
        if self.keys.mode == "include":
            return selected
        # mode is exclude: everything except the named keys.
        return {k for k in doc if k not in selected}

    def _transform_keys(self, doc: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
        """Rename selected document keys via the function; others pass through."""
        selected = self._selected_keys(doc)
        out: dict[str, Any] = {}
        for key, value in doc.items():
            new_key = self.apply(key, **kwargs) if key in selected else key
            if not isinstance(new_key, str):
                raise TransformException(
                    "Key transform functions must return str values."
                )
            if new_key in out:
                raise TransformException(
                    f"Key transform collision detected for key '{new_key}'."
                )
            out[new_key] = value
        return out

    @property
    def is_dummy(self) -> bool:
        """Check if this is a dummy transform.

        A dummy transform carries only a name: no rename map and no callable.

        Returns:
            bool: True if this is a dummy transform
        """
        return self.name is not None and not self.rename and self._foo is None

    def merge_from(self, t: Transform) -> Transform:
        """Merge another transform's configuration into a copy of it.

        Returns a new Transform with values from self overriding t where set.
        Does not override ConfigBaseModel.update (in-place); use this for
        copy-and-merge semantics.

        Args:
            t: Transform to merge from

        Returns:
            Transform: New transform with merged configuration
        """
        t_copy = deepcopy(t)
        if self.input:
            t_copy.input = self.input
        if self.output:
            t_copy.output = self.output
        if self.params:
            # Self's params win over t's on key conflicts.
            t_copy.params = {**t_copy.params, **self.params}
        t_copy._refresh_derived()
        return t_copy

    def get_barebone(
        self, other: Transform | None
    ) -> tuple[Transform | None, Transform | None]:
        """Get the barebone transform configuration.

        Args:
            other: Optional transform to use as base

        Returns:
            tuple[Transform | None, Transform | None]: Updated self transform
            and transform to store in library
        """
        # Serialize only explicitly-set fields so defaults don't leak in.
        self_param = self.to_dict(exclude_defaults=True)
        if self.foo is not None:
            # self will be the lib transform
            return None, self
        elif other is not None and other.foo is not None:
            # init self from other
            self_param.pop("foo", None)
            self_param.pop("module", None)
            other_param = other.to_dict(exclude_defaults=True)
            # Self's explicit settings win over other's.
            other_param.update(self_param)
            return Transform(**other_param), None
        else:
            # Neither side carries a callable: nothing to update or store.
            return None, None

is_dummy property

Check if this is a dummy transform.

Returns:

Name Type Description
bool bool

True if this is a dummy transform

is_mapping property

True when the transform is pure mapping (no function).

__call__(*nargs, **kwargs)

Execute the transform.

Parameters:

Name Type Description Default
*nargs Any

Positional arguments for the transform

()
**kwargs Any

Keyword arguments for the transform

{}

Returns:

Name Type Description
dict dict[str, Any] | Any

Transformed data

Source code in graflo/architecture/contract/declarations/transform.py
def __call__(self, *nargs: Any, **kwargs: Any) -> dict[str, Any] | Any:
    """Execute the transform.

    Args:
        *nargs: Positional arguments for the transform
        **kwargs: Keyword arguments for the transform

    Returns:
        dict: Transformed data
    """
    # Key-renaming mode operates on the whole document's keys.
    if self.target == "keys":
        input_doc = nargs[0] if nargs and isinstance(nargs[0], dict) else None
        if input_doc is None:
            raise TransformException(
                "target='keys' requires a document dictionary."
            )
        return self._transform_keys(input_doc, **kwargs)

    # Grouped mode: one function call per input group.
    if self.input_groups:
        input_doc = nargs[0] if nargs and isinstance(nargs[0], dict) else None
        if input_doc is None:
            raise TransformException(
                "input_groups transforms require a document dictionary."
            )
        return self._transform_input_groups(input_doc, **kwargs)

    if self.is_mapping:
        # Pure mapping: select input values, no function applied.
        input_doc = nargs[0]
        if isinstance(input_doc, dict):
            output_values = [input_doc[k] for k in self.input]
        else:
            output_values = list(nargs)
    else:
        if self.strategy == "all":
            # Whole document (or raw args) passed as-is to the function.
            if nargs and isinstance(nargs[0], dict):
                output_values = self.apply(nargs[0], **kwargs)
            else:
                output_values = self.apply(*nargs, **kwargs)
        elif self.strategy == "each":
            # Unary function applied once per input field/value.
            if nargs and isinstance(input_doc := nargs[0], dict):
                output_values = [
                    self.apply(input_doc[k], **kwargs) for k in self.input
                ]
            else:
                output_values = [self.apply(value, **kwargs) for value in nargs]
        else:
            # 'single': one call with all input values as positional args.
            if nargs and isinstance(input_doc := nargs[0], dict):
                new_args = [input_doc[k] for k in self.input]
                output_values = self.apply(*new_args, **kwargs)
            else:
                output_values = self.apply(*nargs, **kwargs)

    if self.output:
        r = self._dress_as_dict(output_values)
    else:
        # No declared outputs: return the raw result.
        r = output_values
    return r

get_barebone(other)

Get the barebone transform configuration.

Parameters:

Name Type Description Default
other Transform | None

Optional transform to use as base

required

Returns:

tuple[Transform | None, Transform | None]: Updated self transform and the
transform to store in the library.

Source code in graflo/architecture/contract/declarations/transform.py
def get_barebone(
    self, other: Transform | None
) -> tuple[Transform | None, Transform | None]:
    """Get the barebone transform configuration.

    Args:
        other: Optional transform to use as base

    Returns:
        tuple[Transform | None, Transform | None]: Updated self transform
        and transform to store in library
    """
    # Serialize only explicitly-set fields so defaults don't leak in.
    self_param = self.to_dict(exclude_defaults=True)
    if self.foo is not None:
        # self will be the lib transform
        return None, self
    elif other is not None and other.foo is not None:
        # init self from other
        self_param.pop("foo", None)
        self_param.pop("module", None)
        other_param = other.to_dict(exclude_defaults=True)
        # Self's explicit settings win over other's.
        other_param.update(self_param)
        return Transform(**other_param), None
    else:
        # Neither side carries a callable: nothing to update or store.
        return None, None

merge_from(t)

Merge another transform's configuration into a copy of it.

Returns a new Transform with values from self overriding t where set. Does not override ConfigBaseModel.update (in-place); use this for copy-and-merge semantics.

Parameters:

Name Type Description Default
t Transform

Transform to merge from

required

Returns:

Name Type Description
Transform Transform

New transform with merged configuration

Source code in graflo/architecture/contract/declarations/transform.py
def merge_from(self, t: Transform) -> Transform:
    """Merge another transform's configuration into a copy of it.

    Returns a new Transform with values from self overriding t where set.
    Does not override ConfigBaseModel.update (in-place); use this for
    copy-and-merge semantics.

    Args:
        t: Transform to merge from

    Returns:
        Transform: New transform with merged configuration
    """
    t_copy = deepcopy(t)
    if self.input:
        t_copy.input = self.input
    if self.output:
        t_copy.output = self.output
    if self.params:
        # Self's params win over t's on key conflicts.
        t_copy.params = {**t_copy.params, **self.params}
    t_copy._refresh_derived()
    return t_copy

TransformException

Bases: Exception

Base exception for transform-related errors.

Source code in graflo/architecture/contract/declarations/transform.py
class TransformException(Exception):
    """Raised for transform-related errors in the declarations layer."""