Skip to content

graflo.architecture.contract

Declarative contracts: manifest, bindings, ingestion models, resources, transforms.

APIConnector

Bases: ResourceConnector

Connector for REST API endpoints.

Declares the non-secret access pattern (path, method, pagination). Runtime base_url and credentials are supplied via connector_connection -> conn_proxy -> :class:~graflo.hq.connection_provider.ApiGeneralizedConnConfig.

Attributes:

Name Type Description
path str

Relative endpoint path (e.g. /api/users).

method str

HTTP method (default GET).

params dict[str, Any]

Static query parameters.

pagination PaginationConfig | None

Pagination strategy and response path configuration.

row_annotations dict[str, Any]

Constant fields merged into every fetched row (doc wins).

headers dict[str, str]

Non-secret HTTP headers.

timeout float | None

Request timeout in seconds.

retries int

Number of retry attempts.

retry_backoff_factor float

Backoff factor for retries.

retry_status_forcelist list[int]

HTTP status codes to retry on.

verify bool

Verify SSL certificates.

Source code in graflo/architecture/contract/bindings/connectors.py
class APIConnector(ResourceConnector):
    """Connector for REST API endpoints.

    Declares the non-secret access pattern (path, method, pagination). Runtime
    ``base_url`` and credentials are supplied via ``connector_connection`` ->
    ``conn_proxy`` -> :class:`~graflo.hq.connection_provider.ApiGeneralizedConnConfig`.

    Attributes:
        path: Relative endpoint path (e.g. ``/api/users``).
        method: HTTP method (default ``GET``).
        params: Static query parameters.
        pagination: Pagination strategy and response path configuration.
        row_annotations: Constant fields merged into every fetched row (doc wins).
        headers: Non-secret HTTP headers.
        timeout: Request timeout in seconds.
        retries: Number of retry attempts.
        retry_backoff_factor: Backoff factor for retries.
        retry_status_forcelist: HTTP status codes to retry on.
        verify: Verify SSL certificates.
    """

    path: str = Field(..., description="Relative API endpoint path")
    method: str = "GET"
    params: dict[str, Any] = Field(default_factory=dict)
    pagination: PaginationConfig | None = None
    headers: dict[str, str] = Field(default_factory=dict)
    timeout: float | None = None
    retries: int = 0
    retry_backoff_factor: float = 0.1
    retry_status_forcelist: list[int] = Field(
        default_factory=lambda: [500, 502, 503, 504]
    )
    verify: bool = True

    @staticmethod
    def _join_url(base_url: str, path: str) -> str:
        return f"{base_url.rstrip('/')}/{path.lstrip('/')}"

    def matches(self, resource_identifier: str) -> bool:
        """Match resource name, connector name, or path tail."""
        if self.name is not None and resource_identifier == self.name:
            return True
        if self.resource_name is not None and resource_identifier == self.resource_name:
            return True
        path_tail = self.path.rstrip("/").rsplit("/", 1)[-1]
        return resource_identifier in {self.path, path_tail}

    def bound_source_kind(self) -> BoundSourceKind:
        return BoundSourceKind.API

    def build_api_config(
        self,
        *,
        base_url: str,
        auth: "ApiAuth | None" = None,
        default_headers: dict[str, str] | None = None,
        page_size_override: int | None = None,
    ) -> "APIConfig":
        """Merge contract fields with runtime connection config into ``APIConfig``."""
        from graflo.data_source.api import APIConfig

        headers = dict(default_headers or {})
        headers.update(self.headers)

        pagination = self.pagination
        if pagination is not None and page_size_override is not None:
            pagination = pagination.model_copy(
                update={
                    "request": pagination.request.model_copy(
                        update={"page_size": page_size_override}
                    )
                }
            )

        return APIConfig(
            url=self._join_url(base_url, self.path),
            method=self.method,
            headers=headers,
            auth=auth,
            params=dict(self.params),
            timeout=self.timeout,
            retries=self.retries,
            retry_backoff_factor=self.retry_backoff_factor,
            retry_status_forcelist=list(self.retry_status_forcelist),
            verify=self.verify,
            pagination=pagination,
            row_annotations=dict(self.row_annotations),
        )

build_api_config(*, base_url, auth=None, default_headers=None, page_size_override=None)

Merge contract fields with runtime connection config into APIConfig.

Source code in graflo/architecture/contract/bindings/connectors.py
def build_api_config(
    self,
    *,
    base_url: str,
    auth: "ApiAuth | None" = None,
    default_headers: dict[str, str] | None = None,
    page_size_override: int | None = None,
) -> "APIConfig":
    """Merge contract fields with runtime connection config into ``APIConfig``."""
    from graflo.data_source.api import APIConfig

    headers = dict(default_headers or {})
    headers.update(self.headers)

    pagination = self.pagination
    if pagination is not None and page_size_override is not None:
        pagination = pagination.model_copy(
            update={
                "request": pagination.request.model_copy(
                    update={"page_size": page_size_override}
                )
            }
        )

    return APIConfig(
        url=self._join_url(base_url, self.path),
        method=self.method,
        headers=headers,
        auth=auth,
        params=dict(self.params),
        timeout=self.timeout,
        retries=self.retries,
        retry_backoff_factor=self.retry_backoff_factor,
        retry_status_forcelist=list(self.retry_status_forcelist),
        verify=self.verify,
        pagination=pagination,
        row_annotations=dict(self.row_annotations),
    )

matches(resource_identifier)

Match resource name, connector name, or path tail.

Source code in graflo/architecture/contract/bindings/connectors.py
def matches(self, resource_identifier: str) -> bool:
    """Match resource name, connector name, or path tail."""
    if self.name is not None and resource_identifier == self.name:
        return True
    if self.resource_name is not None and resource_identifier == self.resource_name:
        return True
    path_tail = self.path.rstrip("/").rsplit("/", 1)[-1]
    return resource_identifier in {self.path, path_tail}

ApiResponseStructure

Bases: ConfigBaseModel

Maps JSON response envelope fields to extraction and pagination signals.

Source code in graflo/architecture/contract/bindings/connectors.py
class ApiResponseStructure(ConfigBaseModel):
    """Maps JSON response envelope fields to extraction and pagination signals."""

    records_path: str | None = Field(
        default=None,
        description="Dot path to the record list (e.g. ``results``).",
    )
    total_count_path: str | None = Field(
        default=None,
        description="Dot path to total item count across all pages (e.g. ``count``).",
    )
    offset_path: str | None = Field(
        default=None,
        description="Dot path to echoed page start index (e.g. ``offset``).",
    )
    next_offset_path: str | None = Field(
        default=None,
        description=(
            "Dot path to server-provided next offset for the following request "
            "(e.g. ``next_offset``)."
        ),
    )
    has_more_path: str | None = Field(
        default=None,
        description="Dot path to a boolean more-pages flag (e.g. ``has_more``).",
    )
    cursor_path: str | None = Field(
        default=None,
        description="Dot path to the next opaque cursor token.",
    )
    batch_metadata_paths: dict[str, str] = Field(
        default_factory=dict,
        description=(
            "Map row annotation keys to response dot paths "
            "(e.g. ``_batch_id: result_id``)."
        ),
    )
    auto_detect: bool = Field(
        default=False,
        description=(
            "When true, infer unset response paths from the first response body."
        ),
    )

BoundSourceKind

Bases: BaseEnum

Physical source modality for a bound connector (how rows are retrieved).

This describes the connector-backed access pattern, not the abstract ingestion resource. File format (CSV, JSON, etc.) is chosen by the loader from file extensions.

Attributes:

Name Type Description
FILE

File-based connector (directory + pattern or paths).

SQL_TABLE

SQL table / database-backed connector.

SPARQL

SPARQL / RDF connector (endpoint or local RDF via rdflib).

API

REST API connector (path + pagination on a runtime base URL).

Source code in graflo/architecture/contract/bindings/connectors.py
class BoundSourceKind(BaseEnum):
    """Physical source modality for a bound connector (how rows are retrieved).

    This describes the connector-backed access pattern, not the abstract
    ingestion resource. File format (CSV, JSON, etc.) is chosen by the loader
    from file extensions.

    Attributes:
        FILE: File-based connector (directory + pattern or paths).
        SQL_TABLE: SQL table / database-backed connector.
        SPARQL: SPARQL / RDF connector (endpoint or local RDF via rdflib).
        API: REST API connector (path + pagination on a runtime base URL).
    """

    FILE = "file"
    SQL_TABLE = "sql_table"
    SPARQL = "sparql"
    API = "api"

ColumnTimeFilter

Bases: ConfigBaseModel

Predicate on a single date/time column (SQL-friendly via :class:FilterExpression).

Source code in graflo/architecture/contract/bindings/column_time_filter.py
class ColumnTimeFilter(ConfigBaseModel):
    """Predicate on a single date/time column (SQL-friendly via :class:`FilterExpression`)."""

    column: str = Field(..., description="Column name for the time predicate.")
    start: str | None = Field(
        default=None,
        description="Lower bound (ISO date or datetime). Interpreted with start_inclusive.",
    )
    end: str | None = Field(
        default=None,
        description="Upper bound (ISO date or datetime). Interpreted with end_inclusive.",
    )
    interval: str | None = Field(
        default=None,
        description='Pandas timedelta string (e.g. "7D", "2H"); requires start; '
        "defines [start, start + interval). Mutually exclusive with end.",
    )
    not_equals: str | None = Field(
        default=None,
        description="If set, render column != value. Mutually exclusive with start/end/interval.",
    )
    start_inclusive: bool = Field(
        default=True,
        description="If True (default), lower bound uses >= when start is set.",
    )
    end_inclusive: bool = Field(
        default=False,
        description="If False (default), upper bound uses < when end is set.",
    )

    @model_validator(mode="after")
    def _check_shape(self) -> Self:
        has_pred = any(
            v is not None
            for v in (self.start, self.end, self.interval, self.not_equals)
        )
        if not has_pred:
            # Column-only hint (e.g. introspection sets datetime column without a WHERE).
            return self
        if self.not_equals is not None:
            if any(v is not None for v in (self.start, self.end, self.interval)):
                raise ValueError(
                    "not_equals cannot be combined with start, end, or interval"
                )
            return self
        if self.interval is not None:
            if self.start is None:
                raise ValueError("interval requires start")
            if self.end is not None:
                raise ValueError(
                    "interval cannot be combined with end; use start + interval"
                )
            self._validated_timedelta()
            return self
        if self.start is None and self.end is None:
            raise ValueError(
                "ColumnTimeFilter requires at least one of: start, end, interval, not_equals"
            )
        return self

    def _validated_timedelta(self) -> pd.Timedelta:
        assert self.interval is not None
        try:
            td = pd.Timedelta(self.interval)
        except (ValueError, TypeError) as e:
            raise ValueError(
                f"Invalid pandas timedelta string for interval: {self.interval!r}"
            ) from e
        if pd.isna(td):
            raise ValueError(
                f"Invalid pandas timedelta string for interval: {self.interval!r}"
            )
        return td

    def _interval_upper_literal(self) -> str:
        assert self.start is not None and self.interval is not None
        start_dt, date_only = parse_iso_date_or_datetime(self.start)
        delta = self._validated_timedelta().to_pytimedelta()
        end_dt = start_dt + delta
        upper_date_only = date_only and (end_dt.time() == time.min)
        return format_sql_literal(end_dt, upper_date_only)

    def _lower_literal(self) -> str:
        assert self.start is not None
        dt, date_only = parse_iso_date_or_datetime(self.start)
        return format_sql_literal(dt, date_only)

    def _upper_literal(self) -> str:
        assert self.end is not None
        dt, date_only = parse_iso_date_or_datetime(self.end)
        return format_sql_literal(dt, date_only)

    def as_filter_expression(self) -> FilterExpression | None:
        """Return a single composite AND of leaves, or None if misconfigured."""
        from graflo.filter.onto import (
            ComparisonOperator,
            FilterExpression,
            LogicalOperator,
        )

        if self.not_equals is not None:
            return FilterExpression(
                kind="leaf",
                field=self.column,
                cmp_operator=ComparisonOperator.NEQ,
                value=[self.not_equals],
            )

        leaves: list[FilterExpression] = []

        if self.interval is not None:
            assert self.start is not None
            # Half-open window [start, start + interval).
            leaves.append(
                FilterExpression(
                    kind="leaf",
                    field=self.column,
                    cmp_operator=ComparisonOperator.GE,
                    value=[self._lower_literal()],
                )
            )
            leaves.append(
                FilterExpression(
                    kind="leaf",
                    field=self.column,
                    cmp_operator=ComparisonOperator.LT,
                    value=[self._interval_upper_literal()],
                )
            )
        else:
            if self.start is not None:
                lower_op = (
                    ComparisonOperator.GE
                    if self.start_inclusive
                    else ComparisonOperator.GT
                )
                leaves.append(
                    FilterExpression(
                        kind="leaf",
                        field=self.column,
                        cmp_operator=lower_op,
                        value=[self._lower_literal()],
                    )
                )
            if self.end is not None:
                upper_op = (
                    ComparisonOperator.LE
                    if self.end_inclusive
                    else ComparisonOperator.LT
                )
                leaves.append(
                    FilterExpression(
                        kind="leaf",
                        field=self.column,
                        cmp_operator=upper_op,
                        value=[self._upper_literal()],
                    )
                )

        if not leaves:
            return None
        if len(leaves) == 1:
            return leaves[0]
        return FilterExpression(
            kind="composite",
            operator=LogicalOperator.AND,
            deps=leaves,
        )

as_filter_expression()

Return a single composite AND of leaves, or None if misconfigured.

Source code in graflo/architecture/contract/bindings/column_time_filter.py
def as_filter_expression(self) -> FilterExpression | None:
    """Return a single composite AND of leaves, or None if misconfigured."""
    from graflo.filter.onto import (
        ComparisonOperator,
        FilterExpression,
        LogicalOperator,
    )

    if self.not_equals is not None:
        return FilterExpression(
            kind="leaf",
            field=self.column,
            cmp_operator=ComparisonOperator.NEQ,
            value=[self.not_equals],
        )

    leaves: list[FilterExpression] = []

    if self.interval is not None:
        assert self.start is not None
        # Half-open window [start, start + interval).
        leaves.append(
            FilterExpression(
                kind="leaf",
                field=self.column,
                cmp_operator=ComparisonOperator.GE,
                value=[self._lower_literal()],
            )
        )
        leaves.append(
            FilterExpression(
                kind="leaf",
                field=self.column,
                cmp_operator=ComparisonOperator.LT,
                value=[self._interval_upper_literal()],
            )
        )
    else:
        if self.start is not None:
            lower_op = (
                ComparisonOperator.GE
                if self.start_inclusive
                else ComparisonOperator.GT
            )
            leaves.append(
                FilterExpression(
                    kind="leaf",
                    field=self.column,
                    cmp_operator=lower_op,
                    value=[self._lower_literal()],
                )
            )
        if self.end is not None:
            upper_op = (
                ComparisonOperator.LE
                if self.end_inclusive
                else ComparisonOperator.LT
            )
            leaves.append(
                FilterExpression(
                    kind="leaf",
                    field=self.column,
                    cmp_operator=upper_op,
                    value=[self._upper_literal()],
                )
            )

    if not leaves:
        return None
    if len(leaves) == 1:
        return leaves[0]
    return FilterExpression(
        kind="composite",
        operator=LogicalOperator.AND,
        deps=leaves,
    )

FileConnector

Bases: ResourceConnector

Connector for matching files.

Attributes:

Name Type Description
regex str | None

Regular expression pattern for matching filenames

sub_path Path

Path to search for matching files (default: "./")

time_filter ColumnTimeFilter | None

Optional structured filter on a date/time column (shared with :class:TableConnector), using :class:~graflo.architecture.contract.bindings.column_time_filter.ColumnTimeFilter.

Source code in graflo/architecture/contract/bindings/connectors.py
class FileConnector(ResourceConnector):
    """Connector for matching files.

    Attributes:
        regex: Regular expression pattern for matching filenames
        sub_path: Path to search for matching files (default: "./")
        time_filter: Optional structured filter on a date/time column (shared with
            :class:`TableConnector`), using :class:`~graflo.architecture.contract.bindings.column_time_filter.ColumnTimeFilter`.
    """

    regex: str | None = None
    sub_path: pathlib.Path = Field(default_factory=lambda: pathlib.Path("./"))
    time_filter: ColumnTimeFilter | None = None

    @model_validator(mode="after")
    def _validate_file_connector(self) -> Self:
        """Ensure sub_path is a Path."""
        if not isinstance(self.sub_path, pathlib.Path):
            object.__setattr__(self, "sub_path", pathlib.Path(self.sub_path))
        if self.row_annotations:
            raise ValueError("row_annotations is not implemented for FileConnector")
        return self

    @property
    def date_field(self) -> str | None:
        """Column used for time filtering, if any (compat alias for ``time_filter.column``)."""
        return self.time_filter.column if self.time_filter else None

    def matches(self, resource_identifier: str) -> bool:
        """Check if connector matches a filename.

        Args:
            resource_identifier: Filename to match

        Returns:
            bool: True if connector matches
        """
        if self.regex is None:
            return False
        return bool(re.match(self.regex, resource_identifier))

    def bound_source_kind(self) -> BoundSourceKind:
        """File connector always uses ``BoundSourceKind.FILE``."""
        return BoundSourceKind.FILE

date_field property

Column used for time filtering, if any (compat alias for time_filter.column).

bound_source_kind()

File connector always uses BoundSourceKind.FILE.

Source code in graflo/architecture/contract/bindings/connectors.py
def bound_source_kind(self) -> BoundSourceKind:
    """File connector always uses ``BoundSourceKind.FILE``."""
    return BoundSourceKind.FILE

matches(resource_identifier)

Check if connector matches a filename.

Parameters:

Name Type Description Default
resource_identifier str

Filename to match

required

Returns:

Name Type Description
bool bool

True if connector matches

Source code in graflo/architecture/contract/bindings/connectors.py
def matches(self, resource_identifier: str) -> bool:
    """Check if connector matches a filename.

    Args:
        resource_identifier: Filename to match

    Returns:
        bool: True if connector matches
    """
    if self.regex is None:
        return False
    return bool(re.match(self.regex, resource_identifier))

GraphManifest

Bases: ConfigBaseModel

Canonical config contract for graph schema, ingestion, and bindings.

Source code in graflo/architecture/contract/manifest.py
class GraphManifest(ConfigBaseModel):
    """Canonical config contract for graph schema, ingestion, and bindings."""

    model_config = ConfigDict(populate_by_name=True)

    graph_schema: Schema | None = PydanticField(
        default=None,
        description="Logical graph schema contract.",
        validation_alias=AliasChoices("schema", "graph_schema"),
        serialization_alias="schema",
    )
    ingestion_model: IngestionModel | None = PydanticField(
        default=None,
        description="Ingestion resources and transforms.",
    )
    bindings: Bindings | None = PydanticField(
        default=None,
        description="Bindings mapping resources to concrete data sources.",
    )

    @classmethod
    def from_config(cls, data: dict[str, Any]) -> "GraphManifest":
        """Build a manifest from a Python mapping payload."""
        return cls.from_dict(data)

    @model_validator(mode="after")
    def _validate_manifest(self) -> "GraphManifest":
        if (
            self.graph_schema is None
            and self.ingestion_model is None
            and self.bindings is None
        ):
            raise ValueError(
                "GraphManifest requires at least one block: "
                "schema, ingestion_model, or bindings."
            )
        return self

    def finish_init(
        self,
        *,
        strict_references: bool = False,
        dynamic_edge_feedback: bool = False,
    ) -> None:
        """Initialize model internals and cross-block runtime links."""
        if self.graph_schema is not None:
            self.graph_schema.finish_init()
        if self.graph_schema is not None and self.ingestion_model is not None:
            self.ingestion_model.finish_init(
                self.graph_schema.core_schema,
                strict_references=strict_references,
                dynamic_edge_feedback=dynamic_edge_feedback,
                target_db_flavor=self.graph_schema.db_profile.db_flavor,
            )

    def require_schema(self) -> Schema:
        if self.graph_schema is None:
            raise ValueError("GraphManifest is missing required 'schema' block.")
        return self.graph_schema

    def require_ingestion_model(self) -> IngestionModel:
        if self.ingestion_model is None:
            raise ValueError(
                "GraphManifest is missing required 'ingestion_model' block."
            )
        return self.ingestion_model

    def require_bindings(self) -> Bindings:
        if self.bindings is None:
            raise ValueError("GraphManifest is missing required 'bindings' block.")
        return self.bindings

finish_init(*, strict_references=False, dynamic_edge_feedback=False)

Initialize model internals and cross-block runtime links.

Source code in graflo/architecture/contract/manifest.py
def finish_init(
    self,
    *,
    strict_references: bool = False,
    dynamic_edge_feedback: bool = False,
) -> None:
    """Initialize model internals and cross-block runtime links."""
    if self.graph_schema is not None:
        self.graph_schema.finish_init()
    if self.graph_schema is not None and self.ingestion_model is not None:
        self.ingestion_model.finish_init(
            self.graph_schema.core_schema,
            strict_references=strict_references,
            dynamic_edge_feedback=dynamic_edge_feedback,
            target_db_flavor=self.graph_schema.db_profile.db_flavor,
        )

from_config(data) classmethod

Build a manifest from a Python mapping payload.

Source code in graflo/architecture/contract/manifest.py
@classmethod
def from_config(cls, data: dict[str, Any]) -> "GraphManifest":
    """Build a manifest from a Python mapping payload."""
    return cls.from_dict(data)

IngestionModel

Bases: ConfigBaseModel

Ingestion model (C): resources and transform registry.

Source code in graflo/architecture/contract/ingestion/model.py
class IngestionModel(ConfigBaseModel):
    """Ingestion model (C): resources and transform registry."""

    edges_on_duplicate: Literal["ignore", "upsert"] = PydanticField(
        default="ignore",
        description=(
            "How batch edge writes tolerate an already-matching edge. Passed through to "
            ":meth:`~graflo.db.conn.Connection.insert_edges_batch` where the target backend "
            "supports it."
        ),
    )
    resources: list[ResourceConfig] = PydanticField(
        default_factory=list,
        description="List of resource definitions (data pipelines mapping to vertices/edges).",
    )
    transforms: list[ProtoTransform] = PydanticField(
        default_factory=list,
        description="List of named transforms available to resources.",
    )

    _resources: dict[str, ResourceConfig] = PrivateAttr()
    _runtimes: dict[str, ResourceRuntime] = PrivateAttr(default_factory=dict)
    _transforms: dict[str, ProtoTransform] = PrivateAttr(default_factory=dict)
    _combined_edge_derivation: EdgeDerivationRegistry = PrivateAttr(
        default_factory=EdgeDerivationRegistry
    )

    @model_validator(mode="after")
    def _init_model(self) -> IngestionModel:
        """Build transform and resource lookup maps."""
        self._rebuild_config_state()
        return self

    def _rebuild_resource_map(self) -> None:
        """Validate resource name uniqueness and refresh lookup map."""
        names = [r.name for r in self.resources]
        c = Counter(names)
        for k, v in c.items():
            if v > 1:
                raise ValueError(f"resource name {k} used {v} times")
        object.__setattr__(self, "_resources", {r.name: r for r in self.resources})

    def _rebuild_transform_map(self) -> None:
        """Validate transform names and refresh name lookup map."""
        missing_names = [idx for idx, t in enumerate(self.transforms) if not t.name]
        if missing_names:
            raise ValueError(
                "All ingestion transforms must define a non-empty name. "
                f"Missing at indexes: {missing_names}"
            )

        transform_names = [t.name for t in self.transforms if t.name is not None]
        name_counts = Counter(transform_names)
        duplicates = sorted([name for name, count in name_counts.items() if count > 1])
        if duplicates:
            raise ValueError(f"Duplicate ingestion transform names found: {duplicates}")

        object.__setattr__(
            self,
            "_transforms",
            {t.name: t for t in self.transforms if t.name is not None},
        )

    def finish_init(
        self,
        core_schema: CoreSchema,
        *,
        strict_references: bool = False,
        dynamic_edge_feedback: bool = False,
        allowed_vertex_names: set[str] | None = None,
        target_db_flavor: DBType | None = None,
    ) -> None:
        """Build per-resource runtimes against graph model and transform library."""
        self._rebuild_config_state()
        runtimes: dict[str, ResourceRuntime] = {}
        for config in self.resources:
            runtimes[config.name] = ResourceRuntime(
                config,
                vertex_config=core_schema.vertex_config,
                edge_config=core_schema.edge_config,
                transforms=self._transforms,
                strict_references=strict_references,
                dynamic_edge_feedback=dynamic_edge_feedback,
                allowed_vertex_names=allowed_vertex_names,
                target_db_flavor=target_db_flavor,
            )
        object.__setattr__(self, "_runtimes", runtimes)

    def _rebuild_config_state(self) -> None:
        """Rebuild transform and resource lookup maps."""
        self._rebuild_transform_map()
        self._rebuild_resource_map()

    def fetch_resource(self, name: str | None = None) -> ResourceRuntime:
        """Fetch an initialized runtime resource by name."""
        if name is not None:
            runtime = self._runtimes.get(name)
            if runtime is None:
                raise ValueError(f"Resource {name} not found")
            return runtime
        if self._runtimes:
            return next(iter(self._runtimes.values()))
        if self.resources:
            raise RuntimeError(
                "IngestionModel resources exist but runtimes were not built; "
                "call finish_init() first."
            )
        raise ValueError("Empty resource container :(")

    def fetch_resource_config(self, name: str) -> ResourceConfig:
        """Fetch declarative resource config by name."""
        config = self._resources.get(name)
        if config is None:
            raise ValueError(f"Resource {name} not found")
        return config

    def prune_to_graph(
        self, core_schema: CoreSchema, disconnected: set[str] | None = None
    ) -> None:
        """Drop resource actors that reference disconnected vertices."""
        if disconnected is None:
            disconnected = (
                core_schema.vertex_config.vertex_set - core_schema.edge_config.vertices
            )
        if not disconnected:
            return

        def _mentions_disconnected(wrapper: ActorWrapper) -> bool:
            return bool(wrapper.actor.references_vertices() & disconnected)

        to_drop: list[ResourceConfig] = []
        for resource_config in self.resources:
            root = ActorWrapper(*resource_config.pipeline)
            if _mentions_disconnected(root):
                to_drop.append(resource_config)
                continue
            root.remove_descendants_if(_mentions_disconnected)
            if not any(a.references_vertices() for a in root.collect_actors()):
                to_drop.append(resource_config)

        for dropped in to_drop:
            self.resources.remove(dropped)
            self._resources.pop(dropped.name, None)
            self._runtimes.pop(dropped.name, None)
        if to_drop:
            self._rebuild_config_state()

fetch_resource(name=None)

Fetch an initialized runtime resource by name.

Source code in graflo/architecture/contract/ingestion/model.py
def fetch_resource(self, name: str | None = None) -> ResourceRuntime:
    """Fetch an initialized runtime resource by name."""
    if name is not None:
        runtime = self._runtimes.get(name)
        if runtime is None:
            raise ValueError(f"Resource {name} not found")
        return runtime
    if self._runtimes:
        return next(iter(self._runtimes.values()))
    if self.resources:
        raise RuntimeError(
            "IngestionModel resources exist but runtimes were not built; "
            "call finish_init() first."
        )
    raise ValueError("Empty resource container :(")

fetch_resource_config(name)

Fetch declarative resource config by name.

Source code in graflo/architecture/contract/ingestion/model.py
def fetch_resource_config(self, name: str) -> ResourceConfig:
    """Fetch declarative resource config by name."""
    config = self._resources.get(name)
    if config is None:
        raise ValueError(f"Resource {name} not found")
    return config

finish_init(core_schema, *, strict_references=False, dynamic_edge_feedback=False, allowed_vertex_names=None, target_db_flavor=None)

Build per-resource runtimes against graph model and transform library.

Source code in graflo/architecture/contract/ingestion/model.py
def finish_init(
    self,
    core_schema: CoreSchema,
    *,
    strict_references: bool = False,
    dynamic_edge_feedback: bool = False,
    allowed_vertex_names: set[str] | None = None,
    target_db_flavor: DBType | None = None,
) -> None:
    """Build per-resource runtimes against graph model and transform library."""
    self._rebuild_config_state()
    runtimes: dict[str, ResourceRuntime] = {}
    for config in self.resources:
        runtimes[config.name] = ResourceRuntime(
            config,
            vertex_config=core_schema.vertex_config,
            edge_config=core_schema.edge_config,
            transforms=self._transforms,
            strict_references=strict_references,
            dynamic_edge_feedback=dynamic_edge_feedback,
            allowed_vertex_names=allowed_vertex_names,
            target_db_flavor=target_db_flavor,
        )
    object.__setattr__(self, "_runtimes", runtimes)

prune_to_graph(core_schema, disconnected=None)

Drop resource actors that reference disconnected vertices.

Source code in graflo/architecture/contract/ingestion/model.py
def prune_to_graph(
    self, core_schema: CoreSchema, disconnected: set[str] | None = None
) -> None:
    """Drop resource actors that reference disconnected vertices."""
    if disconnected is None:
        disconnected = (
            core_schema.vertex_config.vertex_set - core_schema.edge_config.vertices
        )
    if not disconnected:
        return

    def _mentions_disconnected(wrapper: ActorWrapper) -> bool:
        return bool(wrapper.actor.references_vertices() & disconnected)

    to_drop: list[ResourceConfig] = []
    for resource_config in self.resources:
        root = ActorWrapper(*resource_config.pipeline)
        if _mentions_disconnected(root):
            to_drop.append(resource_config)
            continue
        root.remove_descendants_if(_mentions_disconnected)
        if not any(a.references_vertices() for a in root.collect_actors()):
            to_drop.append(resource_config)

    for dropped in to_drop:
        self.resources.remove(dropped)
        self._resources.pop(dropped.name, None)
        self._runtimes.pop(dropped.name, None)
    if to_drop:
        self._rebuild_config_state()

JoinClause

Bases: ConfigBaseModel

Specification for a SQL JOIN operation.

Used by TableConnector to describe multi-table queries. Each JoinClause adds one JOIN to the generated SQL. The base row uses TableConnector.base_alias (default base), not a hard-coded name.

Attributes:

Name Type Description
table str

Table name to join (e.g. "all_classes").

schema_name str | None

Optional schema override for the joined table.

alias str | None

SQL alias for the joined table (e.g. "s", "t"). Required when the same table is joined more than once.

on_self str

Column on the base (left) table used in the ON condition.

on_other str

Column on the joined (right) table used in the ON condition.

join_type str

Type of join -- LEFT, INNER, etc. Defaults to LEFT.

select_fields list[str] | None

Explicit list of columns to SELECT from this join. When None every column of the joined table is included (aliased with the join alias prefix).

Source code in graflo/architecture/contract/bindings/connectors.py
class JoinClause(ConfigBaseModel):
    """Specification for a SQL JOIN operation.

    Used by TableConnector to describe multi-table queries. Each JoinClause
    adds one JOIN to the generated SQL. The base row uses ``TableConnector.base_alias``
    (default ``base``), not a hard-coded name.

    Attributes:
        table: Table name to join (e.g. "all_classes").
        schema_name: Optional schema override for the joined table.
        alias: SQL alias for the joined table (e.g. "s", "t"). Required when
            the same table is joined more than once.
        on_self: Column on the base (left) table used in the ON condition.
        on_other: Column on the joined (right) table used in the ON condition.
        join_type: Type of join -- LEFT, INNER, etc. Defaults to LEFT.
        select_fields: Explicit list of columns to SELECT from this join.
            When None every column of the joined table is included (aliased
            with the join alias prefix).
    """

    table: str = Field(..., description="Table name to join.")
    schema_name: str | None = Field(
        default=None, description="Schema override for the joined table."
    )
    alias: str | None = Field(
        default=None, description="SQL alias for the joined table."
    )
    on_self: str = Field(
        ..., description="Column on the base table for the ON condition."
    )
    on_other: str = Field(
        ..., description="Column on the joined table for the ON condition."
    )
    join_type: str = Field(default="LEFT", description="JOIN type (LEFT, INNER, etc.).")
    select_fields: list[str] | None = Field(
        default=None,
        description="Columns to SELECT from this join (None = all columns).",
    )

PaginationConfig

Bases: ConfigBaseModel

Configuration for API pagination (contract-level, secret-free).

Combines request construction (request) with response envelope parsing (response).

Source code in graflo/architecture/contract/bindings/connectors.py
class PaginationConfig(ConfigBaseModel):
    """Configuration for API pagination (contract-level, secret-free).

    Combines request construction (``request``) with response envelope parsing
    (``response``).
    """

    request: PaginationRequestConfig = Field(default_factory=PaginationRequestConfig)
    response: ApiResponseStructure = Field(default_factory=ApiResponseStructure)

PaginationRequestConfig

Bases: ConfigBaseModel

Configuration for building paginated HTTP requests.

Source code in graflo/architecture/contract/bindings/connectors.py
class PaginationRequestConfig(ConfigBaseModel):
    """Configuration for building paginated HTTP requests."""

    strategy: Literal["offset", "page", "cursor"] = "offset"
    offset_param: str = "offset"
    limit_param: str = Field(
        default="limit",
        description=(
            "Query parameter name for page size (offset strategy only). "
            "The value sent is ``page_size``, not a total item cap."
        ),
    )
    cursor_param: str = "cursor"
    page_param: str = "page"
    per_page_param: str = Field(
        default="per_page",
        description=(
            "Query parameter name for page size (page strategy only). "
            "The value sent is ``page_size``, not a total item cap."
        ),
    )
    initial_offset: int = 0
    initial_page: int = 1
    initial_cursor: str | None = None
    page_size: int = Field(
        default=100,
        description=(
            "Records requested per HTTP page. Sent as the value of "
            "``limit_param`` (offset) or ``per_page_param`` (page)."
        ),
    )

ProtoTransform

Bases: ConfigBaseModel

Base class for transform definitions.

This class provides the foundation for data transformations, supporting both functional transformations and declarative mappings.

Attributes:

Name Type Description
name str | None

Optional name of the transform

module str | None

Optional module containing the transform function

params dict[str, Any]

Dictionary of transform parameters

foo str | None

Optional name of the transform function

input tuple[str, ...]

Tuple of input field names

output tuple[str, ...]

Tuple of output field names

dress DressConfig | None

Optional pivot dressing for scalar functional results

target Literal['values', 'keys']

Whether to transform field values or document keys

keys KeySelectionConfig

Key selection when target is keys

_foo Any

Internal reference to the transform function

Source code in graflo/architecture/contract/ingestion/transform.py
class ProtoTransform(ConfigBaseModel):
    """Base class for transform definitions.

    This class provides the foundation for data transformations, supporting both
    functional transformations and declarative mappings.

    Attributes:
        name: Optional name of the transform
        module: Optional module containing the transform function
        params: Dictionary of transform parameters
        foo: Optional name of the transform function
        input: Tuple of input field names
        output: Tuple of output field names
        dress: Optional pivot dressing for scalar functional results
        target: Whether to transform field values or document keys
        keys: Key selection when target is keys
        _foo: Internal reference to the transform function
    """

    name: str | None = Field(
        default=None,
        description="Optional name for this transform (e.g. for reference in ingestion_model.transforms).",
    )
    module: str | None = Field(
        default=None,
        description="Python module path containing the transform function (e.g. my_package.transforms).",
    )
    params: dict[str, Any] = Field(
        default_factory=dict,
        description="Extra parameters passed to the transform function at runtime.",
    )
    foo: str | None = Field(
        default=None,
        description="Name of the callable in module to use as the transform function.",
    )
    input: tuple[str, ...] = Field(
        default_factory=tuple,
        description="Input field names passed to the transform function.",
    )
    output: tuple[str, ...] = Field(
        default_factory=tuple,
        description="Output field names produced by the transform (defaults to input if unset).",
    )
    input_groups: tuple[tuple[str, ...], ...] = Field(
        default_factory=tuple,
        description=(
            "Explicit groups of input fields for repeated tuple-style value calls."
        ),
    )
    output_groups: tuple[tuple[str, ...], ...] = Field(
        default_factory=tuple,
        description=(
            "Explicit output field groups aligned with input_groups for grouped value calls."
        ),
    )
    dress: DressConfig | None = Field(
        default=None,
        description=(
            "Dressing spec for pivoted output. Applies to ingestion_model.transforms "
            "entries and to inline transform steps. "
            "dress.key receives the input field name, dress.value receives the "
            "function result. E.g. dress={key: name, value: value} with "
            "input=(Open,) produces {name: 'Open', value: <result>}."
        ),
    )
    target: Literal["values", "keys"] = Field(
        default="values",
        description=(
            "Transform target. values=apply function to input values; "
            "keys=apply function to selected document keys."
        ),
    )
    keys: KeySelectionConfig = Field(
        default_factory=KeySelectionConfig,
        description="Key selection for key-target transforms.",
    )

    _foo: Any = PrivateAttr(default=None)

    @model_validator(mode="before")
    @classmethod
    def _normalize_input_output(cls, data: Any) -> Any:
        if not isinstance(data, dict):
            return data
        data = dict(data)
        if "dress" in data and isinstance(data["dress"], (list, tuple)):
            raise ValueError(
                "List-style `dress` is no longer supported. "
                "Use a dict: dress={key: ..., value: ...}."
            )
        for key in ("input", "output"):
            if key in data:
                if data[key] is not None:
                    data[key] = _tuple_it(data[key])
                else:
                    data[key] = ()
        for key in ("input_groups", "output_groups"):
            if key in data:
                if data[key] is None:
                    data[key] = ()
                else:
                    data[key] = _tuple_groups_it(data[key])
        _normalize_keys_in_dict(data)
        return data

    @model_validator(mode="after")
    def _init_foo_and_output(self) -> Self:
        if self.module is not None and self.foo is not None:
            try:
                _module = importlib.import_module(self.module)
            except Exception as e:
                raise TypeError(f"Provided module {self.module} is not valid: {e}")
            try:
                object.__setattr__(self, "_foo", getattr(_module, self.foo))
            except Exception as e:
                raise ValueError(
                    f"Could not instantiate transform function. Exception: {e}"
                )
        if self.dress is not None:
            if self.target == "keys":
                raise ValueError("target='keys' is not compatible with dress.")
            object.__setattr__(self, "output", (self.dress.key, self.dress.value))
        elif not self.output and self.input:
            object.__setattr__(self, "output", self.input)
        return self

    @classmethod
    def get_fields_members(cls) -> list[str]:
        """Get list of field members (public model fields)."""
        return list(cls.model_fields.keys())

    def apply(self, *args: Any, **kwargs: Any) -> Any:
        """Apply the raw transform function to the given arguments.

        This is the core function invocation without any input extraction or
        output dressing — purely ``self._foo(*args, **kwargs, **self.params)``.

        Raises:
            TransformException: If no transform function has been set.
        """
        if self._foo is None:
            raise TransformException("No transform function set")
        return self._foo(*args, **kwargs, **self.params)

    def __lt__(self, other: object) -> bool:
        """Compare transforms for ordering.

        Args:
            other: Other transform to compare with

        Returns:
            bool: True if this transform should be ordered before other
        """
        if not isinstance(other, ProtoTransform):
            return NotImplemented
        if self._foo is None and other._foo is not None:
            return True
        return False

__lt__(other)

Compare transforms for ordering.

Parameters:

Name Type Description Default
other object

Other transform to compare with

required

Returns:

Name Type Description
bool bool

True if this transform should be ordered before other

Source code in graflo/architecture/contract/ingestion/transform.py
def __lt__(self, other: object) -> bool:
    """Compare transforms for ordering.

    Args:
        other: Other transform to compare with

    Returns:
        bool: True if this transform should be ordered before other
    """
    if not isinstance(other, ProtoTransform):
        return NotImplemented
    if self._foo is None and other._foo is not None:
        return True
    return False

apply(*args, **kwargs)

Apply the raw transform function to the given arguments.

This is the core function invocation without any input extraction or output dressing — purely self._foo(*args, **kwargs, **self.params).

Raises:

Type Description
TransformException

If no transform function has been set.

Source code in graflo/architecture/contract/ingestion/transform.py
def apply(self, *args: Any, **kwargs: Any) -> Any:
    """Apply the raw transform function to the given arguments.

    This is the core function invocation without any input extraction or
    output dressing — purely ``self._foo(*args, **kwargs, **self.params)``.

    Raises:
        TransformException: If no transform function has been set.
    """
    if self._foo is None:
        raise TransformException("No transform function set")
    return self._foo(*args, **kwargs, **self.params)

get_fields_members() classmethod

Get list of field members (public model fields).

Source code in graflo/architecture/contract/ingestion/transform.py
@classmethod
def get_fields_members(cls) -> list[str]:
    """Get list of field members (public model fields)."""
    return list(cls.model_fields.keys())

ResourceConfig

Bases: ConfigBaseModel

Declarative resource definition (serializable contract).

Source code in graflo/architecture/contract/ingestion/resource.py
class ResourceConfig(ConfigBaseModel):
    """Declarative resource definition (serializable contract)."""

    model_config = {"extra": "forbid"}

    name: str = PydanticField(
        ...,
        description="Name of the resource (e.g. table or file identifier).",
    )
    pipeline: list[dict[str, Any]] = PydanticField(
        ...,
        description="Pipeline of actor steps to apply in sequence (vertex, edge, transform, descend). "
        'Each step is a dict, e.g. {"vertex": "user"} or {"edge": {"from": "a", "to": "b"}}.',
        validation_alias=AliasChoices("pipeline", "apply"),
    )
    encoding: EncodingType = PydanticField(
        default=EncodingType.UTF_8,
        description="Character encoding for input/output (e.g. utf-8, ISO-8859-1).",
    )
    merge_collections: list[str] = PydanticField(
        default_factory=list,
        description="List of collection names to merge when writing to the graph.",
    )
    extra_weights: list[ResourceExtraWeightEntry] = PydanticField(
        default_factory=list,
        description="Additional edge attribute / vertex-weight enrichment for this resource.",
    )
    types: dict[str, str] = PydanticField(
        default_factory=dict,
        description='Field name to Python type expression for casting (e.g. {"amount": "float"}).',
    )
    infer_edges: bool = PydanticField(
        default=True,
        description=(
            "If True, infer edges from current vertex population. "
            "If False, emit only edges explicitly declared as edge actors in the pipeline."
        ),
    )
    infer_edge_only: list[EdgeInferSpec] = PydanticField(
        default_factory=list,
        description=(
            "Optional allow-list for inferred edges. Applies only to inferred (greedy) edges, "
            "not explicit edge actors."
        ),
    )
    infer_edge_except: list[EdgeInferSpec] = PydanticField(
        default_factory=list,
        description=(
            "Optional deny-list for inferred edges. Applies only to inferred (greedy) edges, "
            "not explicit edge actors."
        ),
    )
    drop_trivial_input_fields: bool = PydanticField(
        default=False,
        description=(
            "If True, remove top-level input keys whose value is None or the empty string before "
            "the actor pipeline runs."
        ),
    )
    fail_fast: bool = PydanticField(
        default=False,
        description=(
            "If True, a transform step fails when required input keys are missing in the "
            "current document (rename: all source keys must be present; call: all input keys). "
            "If False (default), rename applies only to keys present in the document and "
            "functional transforms skip the step when inputs are missing."
        ),
    )
    tolerate_transform_errors: bool = PydanticField(
        default=True,
        description=(
            "If True, a failing transform step sets its declared output fields to None, "
            "records the error, and continues the pipeline."
        ),
    )

    @model_validator(mode="after")
    def _validate_policy(self) -> ResourceConfig:
        if self.infer_edge_only and self.infer_edge_except:
            raise ValueError(
                "Resource infer_edge_only and infer_edge_except are mutually exclusive."
            )
        return self

    def collect_vertex_names(self) -> set[str]:
        """Vertex types referenced by this resource (pipeline and related config)."""
        names = collect_vertex_names_from_pipeline(self.pipeline)
        names.update(self.merge_collections)
        for spec in self.infer_edge_only:
            names.add(spec.source)
            names.add(spec.target)
        for spec in self.infer_edge_except:
            names.add(spec.source)
            names.add(spec.target)
        for entry in self.extra_weights:
            names.add(entry.edge.source)
            names.add(entry.edge.target)
            for weight in entry.vertex_weights:
                if weight.name is not None:
                    names.add(weight.name)
        return names

    def pipeline_actor_count(self) -> int:
        """Count actors in the pipeline without binding schema context."""
        from graflo.architecture.pipeline.runtime.actor import ActorWrapper

        return ActorWrapper(*self.pipeline).count()

collect_vertex_names()

Vertex types referenced by this resource (pipeline and related config).

Source code in graflo/architecture/contract/ingestion/resource.py
def collect_vertex_names(self) -> set[str]:
    """Vertex types referenced by this resource (pipeline and related config)."""
    names = collect_vertex_names_from_pipeline(self.pipeline)
    names.update(self.merge_collections)
    for spec in self.infer_edge_only:
        names.add(spec.source)
        names.add(spec.target)
    for spec in self.infer_edge_except:
        names.add(spec.source)
        names.add(spec.target)
    for entry in self.extra_weights:
        names.add(entry.edge.source)
        names.add(entry.edge.target)
        for weight in entry.vertex_weights:
            if weight.name is not None:
                names.add(weight.name)
    return names

pipeline_actor_count()

Count actors in the pipeline without binding schema context.

Source code in graflo/architecture/contract/ingestion/resource.py
def pipeline_actor_count(self) -> int:
    """Count actors in the pipeline without binding schema context."""
    from graflo.architecture.pipeline.runtime.actor import ActorWrapper

    return ActorWrapper(*self.pipeline).count()

ResourceConnector

Bases: ConfigBaseModel, ABC

Abstract base class for resource connectors (files or tables).

Provides common API for connector matching and resource identification. All concrete connector types inherit from this class.

Connectors only describe source-side matching/query behavior. Resource-to- connector linkage is handled by Bindings.

Source code in graflo/architecture/contract/bindings/connectors.py
class ResourceConnector(ConfigBaseModel, abc.ABC):
    """Abstract base class for resource connectors (files or tables).

    Provides common API for connector matching and resource identification.
    All concrete connector types inherit from this class.

    Connectors only describe source-side matching/query behavior. Resource-to-
    connector linkage is handled by ``Bindings``.
    """

    name: str | None = Field(
        default=None,
        description="Optional connector name used by top-level resource_connector mapping.",
    )
    resource_name: str | None = Field(
        default=None,
        description="Optional direct resource binding declared on the connector itself.",
    )
    hash: str = Field(
        default="",
        exclude=True,
        description="Deterministic internal connector id derived from defining fields.",
    )
    row_annotations: dict[str, Any] = Field(
        default_factory=dict,
        description=(
            "Constant fields merged into every fetched row as defaults (response "
            "fields take priority). Only implemented for :class:`APIConnector`; "
            "other connector types reject non-empty values."
        ),
    )

    def _hash_payload(self) -> dict[str, Any]:
        payload = self.model_dump(
            mode="json",
            by_alias=True,
            exclude={"hash", "name", "resource_name"},
        )
        payload["_connector_type"] = type(self).__name__
        return payload

    @model_validator(mode="after")
    def _compute_hash(self) -> Self:
        canonical = json.dumps(
            self._hash_payload(), sort_keys=True, separators=(",", ":")
        )
        object.__setattr__(
            self,
            "hash",
            hashlib.sha256(canonical.encode("utf-8")).hexdigest(),
        )
        return self

    @abc.abstractmethod
    def matches(self, resource_identifier: str) -> bool:
        """Check if connector matches a resource identifier.

        Args:
            resource_identifier: Identifier to match (filename or table name)

        Returns:
            bool: True if connector matches
        """
        pass

    @abc.abstractmethod
    def bound_source_kind(self) -> BoundSourceKind:
        """Return the physical source kind for this connector."""
        pass

bound_source_kind() abstractmethod

Return the physical source kind for this connector.

Source code in graflo/architecture/contract/bindings/connectors.py
@abc.abstractmethod
def bound_source_kind(self) -> BoundSourceKind:
    """Return the physical source kind for this connector."""
    pass

matches(resource_identifier) abstractmethod

Check if connector matches a resource identifier.

Parameters:

Name Type Description Default
resource_identifier str

Identifier to match (filename or table name)

required

Returns:

Name Type Description
bool bool

True if connector matches

Source code in graflo/architecture/contract/bindings/connectors.py
@abc.abstractmethod
def matches(self, resource_identifier: str) -> bool:
    """Check if connector matches a resource identifier.

    Args:
        resource_identifier: Identifier to match (filename or table name)

    Returns:
        bool: True if connector matches
    """
    pass

ResourceRuntime

Fully initialized resource executor for document casting.

Source code in graflo/architecture/contract/runtime/resource.py
class ResourceRuntime:
    """Fully initialized resource executor for document casting."""

    def __init__(
        self,
        config: ResourceConfig,
        vertex_config: VertexConfig,
        edge_config: EdgeConfig,
        transforms: dict[str, ProtoTransform],
        *,
        strict_references: bool = False,
        dynamic_edge_feedback: bool = False,
        allowed_vertex_names: set[str] | None = None,
        target_db_flavor: DBType | None = None,
    ) -> None:
        self.config = config
        self._type_casters = resolve_type_casters(config.types)
        self._root = ActorWrapper(*config.pipeline)
        self._executor = ActorExecutor(self._root)

        runtime_vertex_config, local_edge_config = self._filter_vertex_edge_configs(
            vertex_config,
            edge_config,
            allowed_vertex_names=allowed_vertex_names,
        )
        self._vertex_config = runtime_vertex_config
        self._edge_config = local_edge_config

        self._validate_vertex_references(vertex_config)
        self._validate_infer_edge_spec_targets(self._edge_config)

        edge_derivation_registry = EdgeDerivationRegistry()
        self._edge_derivation_registry = edge_derivation_registry

        infer_edge_except = self._build_infer_except()
        init_ctx = self._build_init_context(
            transforms=transforms,
            edge_derivation=edge_derivation_registry,
            infer_edge_except=infer_edge_except,
            strict_references=strict_references,
            allowed_vertex_names=allowed_vertex_names,
            target_db_flavor=target_db_flavor,
        )
        logger.debug("total resource actor count : %s", self._root.count())
        self._root.finish_init(init_ctx=init_ctx)

        if dynamic_edge_feedback:
            self._propagate_dynamic_edges(edge_config, vertex_config=vertex_config)

        logger.debug("total resource actor count (after init): %s", self._root.count())
        self._init_extra_weights(vertex_config)

    @property
    def name(self) -> str:
        return self.config.name

    @property
    def vertex_config(self) -> VertexConfig:
        return self._vertex_config

    @property
    def edge_config(self) -> EdgeConfig:
        return self._edge_config

    @property
    def root(self) -> ActorWrapper:
        return self._root

    @property
    def type_casters(self) -> dict[str, Callable[..., Any]]:
        return self._type_casters

    def collect_vertex_names(self) -> set[str]:
        return self.config.collect_vertex_names()

    def count(self) -> int:
        return self._root.count()

    @staticmethod
    def edge_ids_from_pipeline(pipeline: list[dict[str, Any]]) -> set[EdgeId]:
        """Collect (source, target, None) for every static EdgeActor in *pipeline*."""
        root = ActorWrapper(*pipeline)
        edge_actors = [a for a in root.collect_actors() if isinstance(a, EdgeActor)]
        return {
            (ea.edge.source, ea.edge.target, None)
            for ea in edge_actors
            if ea.edge is not None
        }

    def _filter_vertex_edge_configs(
        self,
        vertex_config: VertexConfig,
        edge_config: EdgeConfig,
        *,
        allowed_vertex_names: set[str] | None,
    ) -> tuple[VertexConfig, EdgeConfig]:
        runtime_vertex_config = filter_vertex_config_for_resource(
            vertex_config,
            resource_vertex_names=self.collect_vertex_names(),
            allowed_vertex_names=allowed_vertex_names,
        )
        local_edge_config = EdgeConfig.model_validate(
            edge_config.to_dict(skip_defaults=False)
        )
        return runtime_vertex_config, local_edge_config

    def _validate_vertex_references(self, vertex_config: VertexConfig) -> None:
        known_vertices = set(vertex_config.vertex_set)
        referenced_vertices: set[str] = set()

        for spec in self.config.infer_edge_only:
            referenced_vertices.add(spec.source)
            referenced_vertices.add(spec.target)
        for spec in self.config.infer_edge_except:
            referenced_vertices.add(spec.source)
            referenced_vertices.add(spec.target)
        for source, target, _ in self.edge_ids_from_pipeline(self.config.pipeline):
            referenced_vertices.add(source)
            referenced_vertices.add(target)

        missing_vertices = sorted(referenced_vertices - known_vertices)
        if missing_vertices:
            raise ValueError(
                "Resource dynamic edge references undefined vertices: "
                f"{missing_vertices}. "
                "Declare these vertices in vertex_config before using dynamic/inferred edges."
            )

    def _validate_infer_edge_spec_targets(self, edge_config: EdgeConfig) -> None:
        known_edge_ids = {edge_id for edge_id, _ in edge_config.items()}

        def _validate_list(field_name: str, specs: list[EdgeInferSpec]) -> None:
            unknown: list[EdgeId] = []
            for spec in specs:
                if not any(spec.matches(edge_id) for edge_id in known_edge_ids):
                    unknown.append(spec.edge_id)
            if unknown:
                raise ValueError(
                    f"Resource {field_name} contains unknown edge selectors: {unknown}"
                )

        _validate_list("infer_edge_only", self.config.infer_edge_only)
        _validate_list("infer_edge_except", self.config.infer_edge_except)

    def _build_infer_except(self) -> set[EdgeId]:
        infer_edge_except = {spec.edge_id for spec in self.config.infer_edge_except}
        if not self.config.infer_edge_only:
            infer_edge_except |= self.edge_ids_from_pipeline(self.config.pipeline)
        return infer_edge_except

    def _build_init_context(
        self,
        *,
        transforms: dict[str, ProtoTransform],
        edge_derivation: EdgeDerivationRegistry,
        infer_edge_except: set[EdgeId],
        strict_references: bool,
        allowed_vertex_names: set[str] | None,
        target_db_flavor: DBType | None,
    ) -> ActorInitContext:
        return ActorInitContext(
            vertex_config=self._vertex_config,
            edge_config=self._edge_config,
            transforms=transforms,
            edge_derivation=edge_derivation,
            allowed_vertex_names=allowed_vertex_names,
            infer_edges=self.config.infer_edges,
            infer_edge_only={spec.edge_id for spec in self.config.infer_edge_only},
            infer_edge_except=infer_edge_except,
            strict_references=strict_references,
            fail_fast=self.config.fail_fast,
            tolerate_transform_errors=self.config.tolerate_transform_errors,
            target_db_flavor=target_db_flavor,
        )

    def _propagate_dynamic_edges(
        self,
        edge_config: EdgeConfig,
        *,
        vertex_config: VertexConfig,
    ) -> None:
        baseline_edge_ids = {edge_id for edge_id, _ in edge_config.items()}
        for edge_id, edge in self._edge_config.items():
            if edge_id in baseline_edge_ids:
                continue
            edge_config.update_edges(
                edge.model_copy(deep=True), vertex_config=vertex_config
            )

    def _init_extra_weights(self, vertex_config: VertexConfig) -> None:
        reg = self._edge_derivation_registry
        for entry in self.config.extra_weights:
            entry.edge.finish_init(vertex_config)
            if reg is not None and entry.vertex_weights:
                reg.merge_vertex_weights(entry.edge.edge_id, entry.vertex_weights)

    def cast_document(self, doc: dict) -> ResourceCastResult:
        """Process a document and return entities plus any tolerated transform failures."""
        work_doc: dict[str, Any] = (
            strip_trivial_top_level_fields(doc)
            if self.config.drop_trivial_input_fields
            else dict(doc)
        )
        if self._type_casters:
            apply_type_casters(work_doc, self._type_casters)
        extraction_ctx = self._executor.extract(work_doc)
        result = self._executor.assemble_result(extraction_ctx)
        return ResourceCastResult(
            entities=result.entities,
            transform_failures=list(extraction_ctx.transform_failures),
        )

    def __call__(self, doc: dict) -> defaultdict[GraphEntity, list]:
        return self.cast_document(doc).entities

cast_document(doc)

Process a document and return entities plus any tolerated transform failures.

Source code in graflo/architecture/contract/runtime/resource.py
def cast_document(self, doc: dict) -> ResourceCastResult:
    """Process a document and return entities plus any tolerated transform failures."""
    work_doc: dict[str, Any] = (
        strip_trivial_top_level_fields(doc)
        if self.config.drop_trivial_input_fields
        else dict(doc)
    )
    if self._type_casters:
        apply_type_casters(work_doc, self._type_casters)
    extraction_ctx = self._executor.extract(work_doc)
    result = self._executor.assemble_result(extraction_ctx)
    return ResourceCastResult(
        entities=result.entities,
        transform_failures=list(extraction_ctx.transform_failures),
    )

edge_ids_from_pipeline(pipeline) staticmethod

Collect (source, target, None) for every static EdgeActor in pipeline.

Source code in graflo/architecture/contract/runtime/resource.py
@staticmethod
def edge_ids_from_pipeline(pipeline: list[dict[str, Any]]) -> set[EdgeId]:
    """Collect (source, target, None) for every static EdgeActor in *pipeline*."""
    root = ActorWrapper(*pipeline)
    edge_actors = [a for a in root.collect_actors() if isinstance(a, EdgeActor)]
    return {
        (ea.edge.source, ea.edge.target, None)
        for ea in edge_actors
        if ea.edge is not None
    }

SparqlConnector

Bases: ResourceConnector

Connector for matching SPARQL / RDF data sources.

Each SparqlConnector targets instances of a single rdf:Class. It can be backed either by a remote SPARQL endpoint (Fuseki, Blazegraph, ...) or by a local RDF file parsed with rdflib.

Attributes:

Name Type Description
rdf_class str

Full URI of the rdf:Class whose instances this connector fetches (e.g. "http://example.org/Person").

endpoint_url str | None

SPARQL query endpoint URL. When set, instances are fetched via HTTP. When None the connector is for local file mode.

graph_uri str | None

Named-graph URI to restrict the query to (optional).

sparql_query str | None

Custom SPARQL SELECT query override. When provided the auto-generated per-class query is skipped.

rdf_file Path | None

Path to a local RDF file (.ttl, .rdf, .n3, .jsonld). Mutually exclusive with endpoint_url.

Source code in graflo/architecture/contract/bindings/connectors.py
class SparqlConnector(ResourceConnector):
    """Connector for matching SPARQL / RDF data sources.

    Each ``SparqlConnector`` targets instances of a single ``rdf:Class``.
    It can be backed either by a remote SPARQL endpoint (Fuseki, Blazegraph, ...)
    or by a local RDF file parsed with *rdflib*.

    Attributes:
        rdf_class: Full URI of the ``rdf:Class`` whose instances this connector
            fetches (e.g. ``"http://example.org/Person"``).
        endpoint_url: SPARQL query endpoint URL.  When set, instances are
            fetched via HTTP.  When ``None`` the connector is for local file mode.
        graph_uri: Named-graph URI to restrict the query to (optional).
        sparql_query: Custom SPARQL ``SELECT`` query override.  When provided
            the auto-generated per-class query is skipped.
        rdf_file: Path to a local RDF file (``.ttl``, ``.rdf``, ``.n3``,
            ``.jsonld``).  Mutually exclusive with *endpoint_url*.
    """

    rdf_class: str = Field(
        ..., description="URI of the rdf:Class to fetch instances of"
    )
    endpoint_url: str | None = Field(
        default=None, description="SPARQL query endpoint URL"
    )
    graph_uri: str | None = Field(
        default=None, description="Named graph URI (optional)"
    )
    sparql_query: str | None = Field(
        default=None, description="Custom SPARQL query override"
    )
    rdf_file: pathlib.Path | None = Field(
        default=None, description="Path to a local RDF file"
    )

    @model_validator(mode="after")
    def _reject_row_annotations(self) -> Self:
        # TODO: implement row_annotations for SparqlConnector row payloads.
        if self.row_annotations:
            raise ValueError("row_annotations is not implemented for SparqlConnector")
        return self

    def matches(self, resource_identifier: str) -> bool:
        """Match by the local name (fragment) of the rdf:Class URI.

        Args:
            resource_identifier: Identifier to match against

        Returns:
            True when *resource_identifier* equals the class local name
        """
        local_name = self.rdf_class.rsplit("#", 1)[-1].rsplit("/", 1)[-1]
        return resource_identifier == local_name

    def bound_source_kind(self) -> BoundSourceKind:
        """Return ``BoundSourceKind.SPARQL``."""
        return BoundSourceKind.SPARQL

    def build_select_query(self) -> str:
        """Build a SPARQL SELECT query for instances of ``rdf_class``.

        If *sparql_query* is set it is returned as-is.  Otherwise a simple
        per-class query is generated::

            SELECT ?s ?p ?o WHERE {
              ?s a <rdf_class> .
              ?s ?p ?o .
            }

        Returns:
            SPARQL query string
        """
        if self.sparql_query:
            return self.sparql_query

        graph_open = f"GRAPH <{self.graph_uri}> {{" if self.graph_uri else ""
        graph_close = "}" if self.graph_uri else ""

        return (
            "SELECT ?s ?p ?o WHERE { "
            f"{graph_open} "
            f"?s a <{self.rdf_class}> . "
            f"?s ?p ?o . "
            f"{graph_close} "
            "}"
        )

bound_source_kind()

Return BoundSourceKind.SPARQL.

Source code in graflo/architecture/contract/bindings/connectors.py
def bound_source_kind(self) -> BoundSourceKind:
    """Return ``BoundSourceKind.SPARQL``."""
    return BoundSourceKind.SPARQL

build_select_query()

Build a SPARQL SELECT query for instances of rdf_class.

If sparql_query is set it is returned as-is. Otherwise a simple per-class query is generated::

SELECT ?s ?p ?o WHERE {
  ?s a <rdf_class> .
  ?s ?p ?o .
}

Returns:

Type Description
str

SPARQL query string

Source code in graflo/architecture/contract/bindings/connectors.py
def build_select_query(self) -> str:
    """Build a SPARQL SELECT query for instances of ``rdf_class``.

    If *sparql_query* is set it is returned as-is.  Otherwise a simple
    per-class query is generated::

        SELECT ?s ?p ?o WHERE {
          ?s a <rdf_class> .
          ?s ?p ?o .
        }

    Returns:
        SPARQL query string
    """
    if self.sparql_query:
        return self.sparql_query

    graph_open = f"GRAPH <{self.graph_uri}> {{" if self.graph_uri else ""
    graph_close = "}" if self.graph_uri else ""

    return (
        "SELECT ?s ?p ?o WHERE { "
        f"{graph_open} "
        f"?s a <{self.rdf_class}> . "
        f"?s ?p ?o . "
        f"{graph_close} "
        "}"
    )

matches(resource_identifier)

Match by the local name (fragment) of the rdf:Class URI.

Parameters:

Name Type Description Default
resource_identifier str

Identifier to match against

required

Returns:

Type Description
bool

True when resource_identifier equals the class local name

Source code in graflo/architecture/contract/bindings/connectors.py
def matches(self, resource_identifier: str) -> bool:
    """Match by the local name (fragment) of the rdf:Class URI.

    Args:
        resource_identifier: Identifier to match against

    Returns:
        True when *resource_identifier* equals the class local name
    """
    local_name = self.rdf_class.rsplit("#", 1)[-1].rsplit("/", 1)[-1]
    return resource_identifier == local_name

TableConnector

Bases: ResourceConnector

Connector for matching database tables.

Supports simple single-table queries as well as multi-table JOINs and pushdown filters via FilterExpression.

Attributes:

Name Type Description
table_name str

Exact table name or regex pattern

schema_name str | None

Schema name (optional, defaults to public)

database str | None

Database name (optional)

time_filter ColumnTimeFilter | None

Optional structured filter on a date/time column, rendered via :class:~graflo.filter.onto.FilterExpression in SQL.

filters list[Any]

General-purpose pushdown filters rendered as SQL WHERE fragments.

joins list[JoinClause]

Multi-table JOIN specifications (auto-generated or explicit).

base_alias str

SQL alias for the base table when joins is non-empty.

select_columns list[str] | None

Explicit SELECT column list. None means * for the base table (plus aliased columns from joins).

Source code in graflo/architecture/contract/bindings/connectors.py
class TableConnector(ResourceConnector):
    """Connector for matching database tables.

    Supports simple single-table queries as well as multi-table JOINs and
    pushdown filters via ``FilterExpression``.

    Attributes:
        table_name: Exact table name or regex pattern
        schema_name: Schema name (optional, defaults to public)
        database: Database name (optional)
        time_filter: Optional structured filter on a date/time column, rendered
            via :class:`~graflo.filter.onto.FilterExpression` in SQL.
        filters: General-purpose pushdown filters rendered as SQL WHERE fragments.
        joins: Multi-table JOIN specifications (auto-generated or explicit).
        base_alias: SQL alias for the base table when ``joins`` is non-empty.
        select_columns: Explicit SELECT column list. None means ``*`` for the
            base table (plus aliased columns from joins).
    """

    table_name: str = Field(
        default="", validation_alias=AliasChoices("table_name", "table")
    )
    schema_name: str | None = Field(
        default=None, validation_alias=AliasChoices("schema_name", "schema")
    )
    database: str | None = None
    time_filter: ColumnTimeFilter | None = None
    filters: list[Any] = Field(
        default_factory=list,
        description="Pushdown FilterExpression filters (rendered to SQL WHERE).",
    )
    joins: list[JoinClause] = Field(
        default_factory=list,
        description="JOIN clauses for multi-table queries.",
    )
    base_alias: str = Field(
        default="base",
        description="SQL alias for the base table row when joins are present.",
    )
    select_columns: list[str] | None = Field(
        default=None,
        description="Explicit SELECT columns. None = SELECT * (plus join aliases).",
    )
    view: Any = Field(
        default=None,
        description="SelectSpec or dict for declarative view (alternative to table+joins+filters).",
    )

    @field_validator("filters", mode="before")
    @classmethod
    def _coerce_filters(cls, v: Any) -> list[Any]:
        from graflo.filter.onto import parse_filter_expression

        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError("filters must be a list")
        result: list[Any] = []
        for i, item in enumerate(v):
            try:
                result.append(parse_filter_expression(item))
            except (ValueError, ValidationError) as e:
                raise ValueError(f"filters[{i}]: {e}") from e
        return result

    @field_validator("view", mode="before")
    @classmethod
    def _coerce_view(cls, v: Any) -> Any:
        if v is None:
            return None
        if isinstance(v, dict):
            from graflo.filter.select import SelectSpec

            return SelectSpec.from_dict(v)
        return v

    @model_validator(mode="after")
    def _validate_table_connector(self) -> Self:
        """Validate table_name and join wiring."""
        if not self.table_name:
            raise ValueError("table_name is required for TableConnector")
        if not _BASE_TABLE_ALIAS_IDENT.match(self.base_alias):
            raise ValueError(
                "base_alias must be a simple SQL identifier "
                "(ASCII letter, digit, underscore)"
            )
        join_aliases = {jc.alias or jc.table for jc in self.joins}
        if self.base_alias in join_aliases:
            raise ValueError(
                f"base_alias {self.base_alias!r} conflicts with a join alias "
                f"{sorted(join_aliases)}"
            )
        if self.row_annotations:
            raise ValueError("row_annotations is not implemented for TableConnector")
        return self

    @property
    def date_field(self) -> str | None:
        """Column used for time filtering, if any (compat alias for ``time_filter.column``)."""
        return self.time_filter.column if self.time_filter else None

    def matches(self, resource_identifier: str) -> bool:
        """Check if connector matches a table name.

        Args:
            resource_identifier: Table name to match (format: schema.table or just table)

        Returns:
            bool: True if connector matches
        """
        if not self.table_name:
            return False

        # Compile regex expression
        if self.table_name.startswith("^") or self.table_name.endswith("$"):
            # Already a regex expression
            compiled_regex = re.compile(self.table_name)
        else:
            # Exact match expression
            compiled_regex = re.compile(f"^{re.escape(self.table_name)}$")

        # Check if resource_identifier matches
        if compiled_regex.match(resource_identifier):
            return True

        # If schema_name is specified, also check schema.table format
        if self.schema_name:
            full_name = f"{self.schema_name}.{resource_identifier}"
            if compiled_regex.match(full_name):
                return True

        return False

    def bound_source_kind(self) -> BoundSourceKind:
        return BoundSourceKind.SQL_TABLE

    def build_where_clause(self, base_alias: str | None = None) -> str:
        """Build SQL WHERE clause from time filter **and** general filters.

        Returns:
            WHERE clause string (without the WHERE keyword) or empty string if no filters
        """
        from graflo.onto import ExpressionFlavor

        conditions: list[str] = []

        if self.time_filter is not None:
            expr = self.time_filter.as_filter_expression()
            if expr is not None:
                filt_expr = self._coerce_filter_expression(expr, base_alias)
                if filt_expr is not None:
                    rendered = filt_expr(kind=ExpressionFlavor.SQL)
                    if rendered:
                        conditions.append(str(rendered))

        # General-purpose FilterExpression filters
        for filt in self.filters:
            filt_expr = self._coerce_filter_expression(filt, base_alias)
            if filt_expr is not None:
                rendered = filt_expr(kind=ExpressionFlavor.SQL)
                if rendered:
                    conditions.append(str(rendered))

        if conditions:
            return " AND ".join(conditions)
        return ""

    def build_query(self, effective_schema: str | None = None) -> str:
        """Build a complete SQL SELECT query.

        When ``view`` is set, delegates to ``view.build_sql()``. Otherwise
        incorporates the base table, any JoinClauses, explicit select_columns,
        time_filter, and FilterExpression filters.

        Args:
            effective_schema: Schema to use if ``self.schema_name`` is None.

        Returns:
            Complete SQL query string.
        """
        schema = self.schema_name or effective_schema or "public"
        if self.view is not None:
            from graflo.filter.select import SelectSpec

            if isinstance(self.view, SelectSpec):
                query = self.view.build_sql(schema=schema, base_table=self.table_name)
                where = self.build_where_clause(base_alias=self.view.base_alias)
                if where:
                    return self._append_where_condition(query, where)
                return query
        base_alias = self.base_alias if self.joins else None
        base_ref = f'"{schema}"."{self.table_name}"'
        if base_alias:
            base_ref_aliased = f"{base_ref} {base_alias}"
        else:
            base_ref_aliased = base_ref

        # --- SELECT ---
        select_parts: list[str] = []
        if self.select_columns is not None:
            select_parts = list(self.select_columns)
        elif self.joins:
            select_parts.append(f"{base_alias}.*")
            for jc in self.joins:
                alias = jc.alias or jc.table
                jc_schema = jc.schema_name or schema
                if jc.select_fields is not None:
                    for col in jc.select_fields:
                        select_parts.append(f'{alias}."{col}" AS "{alias}__{col}"')
                else:
                    select_parts.append(f"{alias}.*")
        else:
            select_parts.append("*")

        select_clause = ", ".join(select_parts)

        # --- FROM + JOINs ---
        from_clause = base_ref_aliased
        for jc in self.joins:
            jc_schema = jc.schema_name or schema
            alias = jc.alias or jc.table
            join_ref = f'"{jc_schema}"."{jc.table}"'
            left_col = (
                f'{base_alias}."{jc.on_self}"' if base_alias else f'"{jc.on_self}"'
            )
            right_col = f'{alias}."{jc.on_other}"'
            from_clause += (
                f" {jc.join_type} JOIN {join_ref} {alias} ON {left_col} = {right_col}"
            )

        query = f"SELECT {select_clause} FROM {from_clause}"

        # --- WHERE ---
        where = self.build_where_clause(base_alias=base_alias)
        if where:
            query += f" WHERE {where}"

        return query

    @staticmethod
    def _append_where_condition(query: str, condition: str) -> str:
        """Append a SQL condition to *query* preserving an existing WHERE clause."""
        if re.search(r"\bWHERE\b", query, flags=re.IGNORECASE):
            return f"{query} AND {condition}"
        return f"{query} WHERE {condition}"

    @staticmethod
    def _qualified_column_ref(column: str, base_alias: str | None) -> str:
        if base_alias:
            return f'{base_alias}."{column}"'
        return f'"{column}"'

    @classmethod
    def _qualify_filter_payload(
        cls, payload: dict[str, Any], base_alias: str | None
    ) -> dict[str, Any]:
        qualified = dict(payload)
        if base_alias is None:
            return qualified
        if qualified.get("kind") == "leaf":
            field = qualified.get("field")
            if isinstance(field, str) and "." not in field:
                qualified["field"] = f"{base_alias}.{field}"
            return qualified
        deps = qualified.get("deps")
        if isinstance(deps, list):
            qualified["deps"] = [
                cls._qualify_filter_payload(dep, base_alias)
                if isinstance(dep, dict)
                else dep
                for dep in deps
            ]
        return qualified

    @classmethod
    def _coerce_filter_expression(
        cls, raw_filter: Any, base_alias: str | None
    ) -> FilterExpression | None:
        from graflo.filter.onto import parse_filter_expression

        if raw_filter is None:
            return None
        expr = parse_filter_expression(raw_filter)
        if base_alias is None:
            return expr
        payload = expr.model_dump(mode="python")
        return parse_filter_expression(cls._qualify_filter_payload(payload, base_alias))

date_field property

Column used for time filtering, if any (compat alias for time_filter.column).

build_query(effective_schema=None)

Build a complete SQL SELECT query.

When view is set, delegates to view.build_sql(). Otherwise incorporates the base table, any JoinClauses, explicit select_columns, time_filter, and FilterExpression filters.

Parameters:

Name Type Description Default
effective_schema str | None

Schema to use if self.schema_name is None.

None

Returns:

Type Description
str

Complete SQL query string.

Source code in graflo/architecture/contract/bindings/connectors.py
def build_query(self, effective_schema: str | None = None) -> str:
    """Build a complete SQL SELECT query.

    When ``view`` is set, delegates to ``view.build_sql()``. Otherwise
    incorporates the base table, any JoinClauses, explicit select_columns,
    time_filter, and FilterExpression filters.

    Args:
        effective_schema: Schema to use if ``self.schema_name`` is None.

    Returns:
        Complete SQL query string.
    """
    schema = self.schema_name or effective_schema or "public"
    if self.view is not None:
        from graflo.filter.select import SelectSpec

        if isinstance(self.view, SelectSpec):
            query = self.view.build_sql(schema=schema, base_table=self.table_name)
            where = self.build_where_clause(base_alias=self.view.base_alias)
            if where:
                return self._append_where_condition(query, where)
            return query
    base_alias = self.base_alias if self.joins else None
    base_ref = f'"{schema}"."{self.table_name}"'
    if base_alias:
        base_ref_aliased = f"{base_ref} {base_alias}"
    else:
        base_ref_aliased = base_ref

    # --- SELECT ---
    select_parts: list[str] = []
    if self.select_columns is not None:
        select_parts = list(self.select_columns)
    elif self.joins:
        select_parts.append(f"{base_alias}.*")
        for jc in self.joins:
            alias = jc.alias or jc.table
            jc_schema = jc.schema_name or schema
            if jc.select_fields is not None:
                for col in jc.select_fields:
                    select_parts.append(f'{alias}."{col}" AS "{alias}__{col}"')
            else:
                select_parts.append(f"{alias}.*")
    else:
        select_parts.append("*")

    select_clause = ", ".join(select_parts)

    # --- FROM + JOINs ---
    from_clause = base_ref_aliased
    for jc in self.joins:
        jc_schema = jc.schema_name or schema
        alias = jc.alias or jc.table
        join_ref = f'"{jc_schema}"."{jc.table}"'
        left_col = (
            f'{base_alias}."{jc.on_self}"' if base_alias else f'"{jc.on_self}"'
        )
        right_col = f'{alias}."{jc.on_other}"'
        from_clause += (
            f" {jc.join_type} JOIN {join_ref} {alias} ON {left_col} = {right_col}"
        )

    query = f"SELECT {select_clause} FROM {from_clause}"

    # --- WHERE ---
    where = self.build_where_clause(base_alias=base_alias)
    if where:
        query += f" WHERE {where}"

    return query

build_where_clause(base_alias=None)

Build SQL WHERE clause from time filter and general filters.

Returns:

Type Description
str

WHERE clause string (without the WHERE keyword) or empty string if no filters

Source code in graflo/architecture/contract/bindings/connectors.py
def build_where_clause(self, base_alias: str | None = None) -> str:
    """Build SQL WHERE clause from time filter **and** general filters.

    Returns:
        WHERE clause string (without the WHERE keyword) or empty string if no filters
    """
    from graflo.onto import ExpressionFlavor

    conditions: list[str] = []

    if self.time_filter is not None:
        expr = self.time_filter.as_filter_expression()
        if expr is not None:
            filt_expr = self._coerce_filter_expression(expr, base_alias)
            if filt_expr is not None:
                rendered = filt_expr(kind=ExpressionFlavor.SQL)
                if rendered:
                    conditions.append(str(rendered))

    # General-purpose FilterExpression filters
    for filt in self.filters:
        filt_expr = self._coerce_filter_expression(filt, base_alias)
        if filt_expr is not None:
            rendered = filt_expr(kind=ExpressionFlavor.SQL)
            if rendered:
                conditions.append(str(rendered))

    if conditions:
        return " AND ".join(conditions)
    return ""

matches(resource_identifier)

Check if connector matches a table name.

Parameters:

Name Type Description Default
resource_identifier str

Table name to match (format: schema.table or just table)

required

Returns:

Name Type Description
bool bool

True if connector matches

Source code in graflo/architecture/contract/bindings/connectors.py
def matches(self, resource_identifier: str) -> bool:
    """Check if connector matches a table name.

    Args:
        resource_identifier: Table name to match (format: schema.table or just table)

    Returns:
        bool: True if connector matches
    """
    if not self.table_name:
        return False

    # Compile regex expression
    if self.table_name.startswith("^") or self.table_name.endswith("$"):
        # Already a regex expression
        compiled_regex = re.compile(self.table_name)
    else:
        # Exact match expression
        compiled_regex = re.compile(f"^{re.escape(self.table_name)}$")

    # Check if resource_identifier matches
    if compiled_regex.match(resource_identifier):
        return True

    # If schema_name is specified, also check schema.table format
    if self.schema_name:
        full_name = f"{self.schema_name}.{resource_identifier}"
        if compiled_regex.match(full_name):
            return True

    return False

Transform

Bases: ProtoTransform

Concrete transform implementation.

Wraps a ProtoTransform with input extraction, output dressing, field mapping, and transform composition.

Attributes:

Name Type Description
fields tuple[str, ...]

Tuple of fields to transform

rename dict[str, str]

Dictionary mapping input fields to output fields

functional_transform bool

Whether this is a functional transform

Source code in graflo/architecture/contract/ingestion/transform.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
class Transform(ProtoTransform):
    """Concrete transform implementation.

    Wraps a ProtoTransform with input extraction, output dressing, field
    mapping, and transform composition.

    Attributes:
        fields: Tuple of fields to transform
        rename: Dictionary mapping input fields to output fields
        functional_transform: Whether this is a functional transform
    """

    fields: tuple[str, ...] = Field(
        default_factory=tuple,
        description="Field names for declarative transform (used to derive input when input unset).",
    )
    rename: dict[str, str] = Field(
        default_factory=dict,
        description="Mapping of input_key -> output_key for pure field renaming (no function).",
    )
    strategy: Literal["single", "each", "all"] = Field(
        default="single",
        description=(
            "Functional call strategy. "
            "single: call function once with all input values. "
            "each: call function once per input field (unary). "
            "all: pass full document as a single argument."
        ),
    )
    passthrough_group_output: bool = Field(
        default=True,
        description=(
            "When grouped mode omits outputs, map function results back to input group keys."
        ),
    )

    functional_transform: bool = Field(
        default=False,
        description="True when a callable (module.foo) is set; False for pure map/dress transforms.",
    )

    @model_validator(mode="before")
    @classmethod
    def _normalize_fields(cls, data: Any) -> Any:
        if not isinstance(data, dict):
            return data
        data = dict(data)
        if "fields" in data and data["fields"] is not None:
            data["fields"] = _tuple_it(data["fields"])
        if "switch" in data:
            raise ValueError(
                "Legacy `switch` is no longer supported. Use `input` + `dress`."
            )
        return data

    @model_validator(mode="after")
    def _init_derived(self) -> Self:
        explicit_map = bool(self.rename)
        object.__setattr__(self, "functional_transform", self._foo is not None)
        next_input, next_output, _next_map = self._derive_effective_io_and_map()
        object.__setattr__(self, "input", next_input)
        object.__setattr__(self, "output", next_output)
        self._validate_configuration(explicit_map=explicit_map)
        return self

    def _derive_grouped_default_output(self) -> tuple[str, ...]:
        if not self.input_groups or self.output or self.output_groups:
            return self.output
        if not self.passthrough_group_output:
            return self.output
        scalar_names: list[str] = []
        for group in self.input_groups:
            if len(group) != 1:
                return self.output
            scalar_names.append(group[0])
        return tuple(scalar_names) if scalar_names else self.output

    def _derive_effective_io_and_map(
        self,
    ) -> tuple[tuple[str, ...], tuple[str, ...], dict[str, str]]:
        """Compute effective input/output/map once using explicit precedence."""
        next_input = self.input
        next_output = self._derive_grouped_default_output()
        next_map = dict(self.rename)

        if self.fields and not next_input:
            next_input = self.fields

        if next_map:
            if not next_input and not next_output:
                next_input = tuple(next_map.keys())
                next_output = tuple(next_map.values())
            elif not next_input:
                next_input = tuple(next_map.keys())
            elif not next_output:
                next_output = tuple(next_map.values())

        if self.dress is not None:
            next_output = (self.dress.key, self.dress.value)
        elif not next_output and next_input:
            next_output = next_input

        if (
            not next_map
            and next_input
            and next_output
            and len(next_input) == len(next_output)
        ):
            next_map = {src: dst for src, dst in zip(next_input, next_output)}

        return next_input, next_output, next_map

    def _init_io_from_map(self, force_init: bool = False) -> None:
        """Backwards-compatible shim; prefer sync_io_from_map()."""
        if not self.rename:
            return
        map_input = tuple(self.rename.keys())
        map_output = tuple(self.rename.values())
        if force_init or (not self.input and not self.output):
            object.__setattr__(self, "input", map_input)
            object.__setattr__(self, "output", map_output)
            return
        if not self.input:
            object.__setattr__(self, "input", map_input)
        elif not self.output:
            object.__setattr__(self, "output", map_output)

    def _validate_configuration(self, *, explicit_map: bool) -> None:
        """Validate that the transform has enough information to operate."""
        if self.target == "keys":
            if self.input_groups or self.output_groups:
                raise ValueError(
                    "target='keys' does not accept input_groups/output_groups."
                )
            if self._foo is None:
                raise ValueError("target='keys' requires a functional transform.")
            if self.rename:
                raise ValueError("target='keys' cannot be combined with map.")
            if self.input or self.output or self.fields:
                raise ValueError(
                    "target='keys' does not accept input/output/fields; use keys selector."
                )
            if self.dress is not None:
                raise ValueError("target='keys' is not compatible with dress.")
            if self.strategy != "single":
                raise ValueError(
                    "target='keys' uses implicit per-key execution and does not accept strategy."
                )
            return

        # Reject only user-specified map+function conflict. A derived map
        # (from input/output defaults) is valid for functional transforms.
        if explicit_map and self.rename and self._foo is not None:
            raise ValueError("map and functional transform cannot be used together.")
        if self.dress is not None:
            if len(self.input) != 1:
                raise ValueError("dress requires exactly one input field.")
        if self.strategy != "single" and self._foo is None:
            raise ValueError("strategy applies only to functional transforms.")
        if self.input_groups:
            if self._foo is None:
                raise ValueError(
                    "input_groups requires a functional transform (module + foo)."
                )
            if self.strategy != "single":
                raise ValueError(
                    "input_groups mode is explicit grouped execution and does not accept strategy."
                )
            if self.input or self.fields:
                raise ValueError("input_groups cannot be combined with input/fields.")
            if self.rename:
                raise ValueError("input_groups cannot be combined with map.")
            if self.dress is not None:
                raise ValueError("input_groups is not compatible with dress.")
            if self.output_groups and self.output:
                raise ValueError(
                    "Provide either output or output_groups for input_groups mode, not both."
                )
            if self.output_groups and len(self.output_groups) != len(self.input_groups):
                raise ValueError(
                    "output_groups must have same number of groups as input_groups."
                )
            if self.output and len(self.output) != len(self.input_groups):
                raise ValueError(
                    "When using input_groups with scalar outputs, output length must match number of input_groups."
                )
        elif self.output_groups:
            raise ValueError("output_groups requires input_groups.")
        if self._foo is not None and not self.input:
            if self.strategy != "all" and not self.input_groups:
                raise ValueError(
                    "Functional transforms require `input` (string or list of field names)."
                )
        if self.strategy == "all":
            if self.input or self.fields:
                raise ValueError("strategy='all' does not accept input/fields.")
            if self.dress is not None:
                raise ValueError("strategy='all' is not compatible with dress.")
        if self.strategy == "each":
            if not self.input:
                raise ValueError("strategy='each' requires one or more input fields.")
            if self.output and len(self.input) != len(self.output):
                raise ValueError(
                    "strategy='each' requires output length to match input length."
                )
        if (
            self._foo is None
            and self.dress is None
            and self.input
            and self.output
            and len(self.input) != len(self.output)
        ):
            raise ValueError(
                "Non-functional transforms require input/output to have the same length."
            )
        if (
            not self.input
            and not self.output
            and not self.input_groups
            and not self.output_groups
            and not self.name
            and not (self._foo is not None and self.strategy == "all")
        ):
            raise ValueError(
                "Either input/output, fields, map or name must be provided in "
                "Transform constructor."
            )

    def _refresh_derived(self) -> None:
        """Re-run derived input/output after mutating attributes (merge_from)."""
        if self.rename or not self.input or not self.output:
            return
        if len(self.input) != len(self.output):
            return

    def __call__(self, *nargs: Any, **kwargs: Any) -> dict[str, Any] | Any:
        """Execute the transform.

        Args:
            *nargs: Positional arguments for the transform
            **kwargs: Keyword arguments for the transform

        Returns:
            dict: Transformed data
        """
        if self.target == "keys":
            input_doc = nargs[0] if nargs and isinstance(nargs[0], dict) else None
            if input_doc is None:
                raise TransformException(
                    "target='keys' requires a document dictionary."
                )
            return self._transform_keys(input_doc, **kwargs)

        if self.input_groups:
            input_doc = nargs[0] if nargs and isinstance(nargs[0], dict) else None
            if input_doc is None:
                raise TransformException(
                    "input_groups transforms require a document dictionary."
                )
            return self._transform_input_groups(input_doc, **kwargs)

        if self.is_mapping:
            input_doc = nargs[0]
            if isinstance(input_doc, dict):
                if self.rename:
                    present = {
                        self.rename[src]: input_doc[src]
                        for src in self.rename
                        if src in input_doc
                    }
                    return present
                output_values = [input_doc[k] for k in self.input]
            else:
                output_values = list(nargs)
            if self.dress is not None and len(output_values) == 1:
                # Non-functional dress shorthand: keep scalar value.
                output_values = output_values[0]
        else:
            if self.strategy == "all":
                if nargs and isinstance(nargs[0], dict):
                    output_values = self.apply(nargs[0], **kwargs)
                else:
                    output_values = self.apply(*nargs, **kwargs)
            elif self.strategy == "each":
                if nargs and isinstance(input_doc := nargs[0], dict):
                    output_values = [
                        self.apply(input_doc[k], **kwargs) for k in self.input
                    ]
                else:
                    output_values = [self.apply(value, **kwargs) for value in nargs]
            else:
                if nargs and isinstance(input_doc := nargs[0], dict):
                    new_args = [input_doc[k] for k in self.input]
                    output_values = self.apply(*new_args, **kwargs)
                else:
                    output_values = self.apply(*nargs, **kwargs)

        if self.output:
            r = self._dress_as_dict(output_values)
        else:
            r = output_values
        return r

    def _apply_grouped_result(
        self,
        out: dict[str, Any],
        result: Any,
        input_group: tuple[str, ...],
        output_group: tuple[str, ...] | None,
        *,
        group_index: int,
    ) -> None:
        if output_group is not None:
            if isinstance(result, (list, tuple)):
                values = list(result)
            else:
                values = [result]
            if len(values) != len(output_group):
                raise TransformException(
                    f"input_groups[{group_index}] produced {len(values)} values, "
                    f"but output_groups[{group_index}] expects {len(output_group)}."
                )
            pairs = zip(output_group, values)
        elif self.output:
            pairs = ((self.output[group_index], result),)
        else:
            if isinstance(result, (list, tuple)):
                values = list(result)
                if len(values) != len(input_group):
                    raise TransformException(
                        f"input_groups[{group_index}] has {len(input_group)} fields, "
                        f"but transform returned {len(values)} values. "
                        "Provide output/output_groups explicitly to resolve mapping."
                    )
                pairs = zip(input_group, values)
            else:
                if len(input_group) != 1:
                    raise TransformException(
                        f"input_groups[{group_index}] has {len(input_group)} fields "
                        "but transform returned a scalar. "
                        "Provide output/output_groups explicitly for scalar group results."
                    )
                pairs = ((input_group[0], result),)
        for key, value in pairs:
            if key in out:
                raise TransformException(
                    f"Grouped transform produced duplicate output key '{key}'."
                )
            out[key] = value

    def _transform_input_groups(
        self, doc: dict[str, Any], **kwargs: Any
    ) -> dict[str, Any]:
        out: dict[str, Any] = {}
        for idx, input_group in enumerate(self.input_groups):
            values = [doc[k] for k in input_group]
            result = self.apply(*values, **kwargs)
            output_group = self.output_groups[idx] if self.output_groups else None
            self._apply_grouped_result(
                out,
                result,
                input_group,
                output_group,
                group_index=idx,
            )
        return out

    @property
    def is_mapping(self) -> bool:
        """True when the transform is pure mapping (no function)."""
        return self._foo is None

    def planned_output_field_names(
        self, doc: dict[str, Any] | None = None
    ) -> tuple[str, ...]:
        """Return output field names this transform would write on success."""
        if self.target == "keys":
            if doc is None:
                return ()
            return tuple(sorted(self._selected_keys(doc)))

        if self.input_groups:
            if self.output_groups:
                names: list[str] = []
                for group in self.output_groups:
                    names.extend(group)
                return tuple(dict.fromkeys(names))
            if self.output:
                return self.output
            scalar_names: list[str] = []
            for group in self.input_groups:
                if len(group) != 1:
                    return ()
                scalar_names.append(group[0])
            return tuple(scalar_names)

        if self.dress is not None:
            return (self.dress.key, self.dress.value)

        if self.rename:
            if doc is None:
                return tuple(self.rename.values())
            return tuple(self.rename[src] for src in self.rename if src in doc)

        if self.output:
            return self.output

        return ()

    def _dress_as_dict(self, transform_result: Any) -> dict[str, Any]:
        """Convert transform result to dictionary format.

        When ``dress`` is set the result is pivoted: the input field name is
        stored under ``dress.key`` and the function result under ``dress.value``.
        Otherwise the result is mapped positionally to ``output`` fields.
        """
        if self.dress is not None:
            return {
                self.dress.key: self.input[0],
                self.dress.value: transform_result,
            }
        elif isinstance(transform_result, (list, tuple)):
            return {k: v for k, v in zip(self.output, transform_result)}
        else:
            return {self.output[-1]: transform_result}

    def _selected_keys(self, doc: dict[str, Any]) -> set[str]:
        if self.keys.mode == "all":
            return set(doc.keys())
        selected = set(self.keys.names)
        if self.keys.mode == "include":
            return selected
        return {k for k in doc if k not in selected}

    def _transform_keys(self, doc: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
        selected = self._selected_keys(doc)
        out: dict[str, Any] = {}
        for key, value in doc.items():
            new_key = self.apply(key, **kwargs) if key in selected else key
            if not isinstance(new_key, str):
                raise TransformException(
                    "Key transform functions must return str values."
                )
            if new_key in out:
                raise TransformException(
                    f"Key transform collision detected for key '{new_key}'."
                )
            out[new_key] = value
        return out

    @property
    def is_dummy(self) -> bool:
        """Check if this is a dummy transform.

        Returns:
            bool: True if this is a dummy transform
        """
        return self.name is not None and not self.rename and self._foo is None

    def merge_from(self, t: Transform) -> Transform:
        """Merge another transform's configuration into a copy of it.

        Returns a new Transform with values from self overriding t where set.
        Does not override ConfigBaseModel.update (in-place); use this for
        copy-and-merge semantics.

        Args:
            t: Transform to merge from

        Returns:
            Transform: New transform with merged configuration
        """
        t_copy = deepcopy(t)
        if self.input:
            t_copy.input = self.input
        if self.output:
            t_copy.output = self.output
        if self.params:
            t_copy.params = {**t_copy.params, **self.params}
        t_copy._refresh_derived()
        return t_copy

    def get_barebone(
        self, other: Transform | None
    ) -> tuple[Transform | None, Transform | None]:
        """Get the barebone transform configuration.

        Args:
            other: Optional transform to use as base

        Returns:
            tuple[Transform | None, Transform | None]: Updated self transform
            and transform to store in library
        """
        self_param = self.to_dict(exclude_defaults=True)
        if self.foo is not None:
            # self will be the lib transform
            return None, self
        elif other is not None and other.foo is not None:
            # init self from other
            self_param.pop("foo", None)
            self_param.pop("module", None)
            other_param = other.to_dict(exclude_defaults=True)
            other_param.update(self_param)
            return Transform(**other_param), None
        else:
            return None, None

is_dummy property

Check if this is a dummy transform.

Returns:

Name Type Description
bool bool

True if this is a dummy transform

is_mapping property

True when the transform is pure mapping (no function).

__call__(*nargs, **kwargs)

Execute the transform.

Parameters:

Name Type Description Default
*nargs Any

Positional arguments for the transform

()
**kwargs Any

Keyword arguments for the transform

{}

Returns:

Name Type Description
dict dict[str, Any] | Any

Transformed data

Source code in graflo/architecture/contract/ingestion/transform.py
def __call__(self, *nargs: Any, **kwargs: Any) -> dict[str, Any] | Any:
    """Execute the transform.

    Args:
        *nargs: Positional arguments for the transform
        **kwargs: Keyword arguments for the transform

    Returns:
        dict: Transformed data
    """
    if self.target == "keys":
        input_doc = nargs[0] if nargs and isinstance(nargs[0], dict) else None
        if input_doc is None:
            raise TransformException(
                "target='keys' requires a document dictionary."
            )
        return self._transform_keys(input_doc, **kwargs)

    if self.input_groups:
        input_doc = nargs[0] if nargs and isinstance(nargs[0], dict) else None
        if input_doc is None:
            raise TransformException(
                "input_groups transforms require a document dictionary."
            )
        return self._transform_input_groups(input_doc, **kwargs)

    if self.is_mapping:
        input_doc = nargs[0]
        if isinstance(input_doc, dict):
            if self.rename:
                present = {
                    self.rename[src]: input_doc[src]
                    for src in self.rename
                    if src in input_doc
                }
                return present
            output_values = [input_doc[k] for k in self.input]
        else:
            output_values = list(nargs)
        if self.dress is not None and len(output_values) == 1:
            # Non-functional dress shorthand: keep scalar value.
            output_values = output_values[0]
    else:
        if self.strategy == "all":
            if nargs and isinstance(nargs[0], dict):
                output_values = self.apply(nargs[0], **kwargs)
            else:
                output_values = self.apply(*nargs, **kwargs)
        elif self.strategy == "each":
            if nargs and isinstance(input_doc := nargs[0], dict):
                output_values = [
                    self.apply(input_doc[k], **kwargs) for k in self.input
                ]
            else:
                output_values = [self.apply(value, **kwargs) for value in nargs]
        else:
            if nargs and isinstance(input_doc := nargs[0], dict):
                new_args = [input_doc[k] for k in self.input]
                output_values = self.apply(*new_args, **kwargs)
            else:
                output_values = self.apply(*nargs, **kwargs)

    if self.output:
        r = self._dress_as_dict(output_values)
    else:
        r = output_values
    return r

get_barebone(other)

Get the barebone transform configuration.

Parameters:

Name Type Description Default
other Transform | None

Optional transform to use as base

required

Returns:

Type Description
Transform | None

tuple[Transform | None, Transform | None]: Updated self transform

Transform | None

and transform to store in library

Source code in graflo/architecture/contract/ingestion/transform.py
def get_barebone(
    self, other: Transform | None
) -> tuple[Transform | None, Transform | None]:
    """Get the barebone transform configuration.

    Args:
        other: Optional transform to use as base

    Returns:
        tuple[Transform | None, Transform | None]: Updated self transform
        and transform to store in library
    """
    self_param = self.to_dict(exclude_defaults=True)
    if self.foo is not None:
        # self will be the lib transform
        return None, self
    elif other is not None and other.foo is not None:
        # init self from other
        self_param.pop("foo", None)
        self_param.pop("module", None)
        other_param = other.to_dict(exclude_defaults=True)
        other_param.update(self_param)
        return Transform(**other_param), None
    else:
        return None, None

merge_from(t)

Merge another transform's configuration into a copy of it.

Returns a new Transform with values from self overriding t where set. Does not override ConfigBaseModel.update (in-place); use this for copy-and-merge semantics.

Parameters:

Name Type Description Default
t Transform

Transform to merge from

required

Returns:

Name Type Description
Transform Transform

New transform with merged configuration

Source code in graflo/architecture/contract/ingestion/transform.py
def merge_from(self, t: Transform) -> Transform:
    """Merge another transform's configuration into a copy of it.

    Returns a new Transform with values from self overriding t where set.
    Does not override ConfigBaseModel.update (in-place); use this for
    copy-and-merge semantics.

    Args:
        t: Transform to merge from

    Returns:
        Transform: New transform with merged configuration
    """
    t_copy = deepcopy(t)
    if self.input:
        t_copy.input = self.input
    if self.output:
        t_copy.output = self.output
    if self.params:
        t_copy.params = {**t_copy.params, **self.params}
    t_copy._refresh_derived()
    return t_copy

planned_output_field_names(doc=None)

Return output field names this transform would write on success.

Source code in graflo/architecture/contract/ingestion/transform.py
def planned_output_field_names(
    self, doc: dict[str, Any] | None = None
) -> tuple[str, ...]:
    """Return output field names this transform would write on success."""
    if self.target == "keys":
        if doc is None:
            return ()
        return tuple(sorted(self._selected_keys(doc)))

    if self.input_groups:
        if self.output_groups:
            names: list[str] = []
            for group in self.output_groups:
                names.extend(group)
            return tuple(dict.fromkeys(names))
        if self.output:
            return self.output
        scalar_names: list[str] = []
        for group in self.input_groups:
            if len(group) != 1:
                return ()
            scalar_names.append(group[0])
        return tuple(scalar_names)

    if self.dress is not None:
        return (self.dress.key, self.dress.value)

    if self.rename:
        if doc is None:
            return tuple(self.rename.values())
        return tuple(self.rename[src] for src in self.rename if src in doc)

    if self.output:
        return self.output

    return ()

build_resource_runtime(config, vertex_config, edge_config, transforms=None, *, strict_references=False, dynamic_edge_feedback=False, allowed_vertex_names=None, target_db_flavor=None)

Construct a fully initialized :class:ResourceRuntime from declarative config.

Source code in graflo/architecture/contract/runtime/resource.py
def build_resource_runtime(
    config: ResourceConfig,
    vertex_config: VertexConfig,
    edge_config: EdgeConfig,
    transforms: dict[str, ProtoTransform] | None = None,
    *,
    strict_references: bool = False,
    dynamic_edge_feedback: bool = False,
    allowed_vertex_names: set[str] | None = None,
    target_db_flavor: DBType | None = None,
) -> ResourceRuntime:
    """Construct a fully initialized :class:`ResourceRuntime` from declarative config."""
    return ResourceRuntime(
        config,
        vertex_config,
        edge_config,
        transforms or {},
        strict_references=strict_references,
        dynamic_edge_feedback=dynamic_edge_feedback,
        allowed_vertex_names=allowed_vertex_names,
        target_db_flavor=target_db_flavor,
    )