Skip to content

graflo.architecture.contract.bindings

Resource connectors and named binding collections.

APIConnector

Bases: ResourceConnector

Connector for REST API endpoints.

Declares the non-secret access pattern (path, method, pagination). The runtime `base_url` and credentials are supplied via `connector_connection` → `conn_proxy` → `graflo.hq.connection_provider.ApiGeneralizedConnConfig`.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `path` | `str` | Relative endpoint path (e.g. `/api/users`). |
| `method` | `str` | HTTP method (default `GET`). |
| `params` | `dict[str, Any]` | Static query parameters. |
| `pagination` | `PaginationConfig \| None` | Pagination strategy and response path configuration. |
| `row_annotations` | `dict[str, Any]` | Constant fields merged into every fetched row (doc wins). |
| `headers` | `dict[str, str]` | Non-secret HTTP headers. |
| `timeout` | `float \| None` | Request timeout in seconds. |
| `retries` | `int` | Number of retry attempts. |
| `retry_backoff_factor` | `float` | Backoff factor for retries. |
| `retry_status_forcelist` | `list[int]` | HTTP status codes to retry on. |
| `verify` | `bool` | Verify SSL certificates. |

Source code in graflo/architecture/contract/bindings/connectors.py
class APIConnector(ResourceConnector):
    """Connector describing a single REST API endpoint.

    Only the non-secret shape of the request is declared here (path, method,
    pagination, headers, retry policy). The runtime ``base_url`` and any
    credentials are resolved separately via ``connector_connection`` ->
    ``conn_proxy`` -> :class:`~graflo.hq.connection_provider.ApiGeneralizedConnConfig`.

    Attributes:
        path: Relative endpoint path (e.g. ``/api/users``).
        method: HTTP method (default ``GET``).
        params: Static query parameters.
        pagination: Pagination strategy and response path configuration.
        row_annotations: Constant fields merged into every fetched row (doc wins).
            Not declared in this class body; presumably inherited from
            ``ResourceConnector``.
        headers: Non-secret HTTP headers; these override runtime default headers.
        timeout: Request timeout in seconds.
        retries: Number of retry attempts.
        retry_backoff_factor: Backoff factor for retries.
        retry_status_forcelist: HTTP status codes to retry on.
        verify: Verify SSL certificates.
    """

    path: str = Field(..., description="Relative API endpoint path")
    method: str = "GET"
    params: dict[str, Any] = Field(default_factory=dict)
    pagination: PaginationConfig | None = None
    headers: dict[str, str] = Field(default_factory=dict)
    timeout: float | None = None
    retries: int = 0
    retry_backoff_factor: float = 0.1
    retry_status_forcelist: list[int] = Field(
        default_factory=lambda: [500, 502, 503, 504]
    )
    verify: bool = True

    @staticmethod
    def _join_url(base_url: str, path: str) -> str:
        """Join *base_url* and *path* with exactly one ``/`` between them."""
        stem = base_url.rstrip("/")
        tail = path.lstrip("/")
        return stem + "/" + tail

    def matches(self, resource_identifier: str) -> bool:
        """Match resource name, connector name, or path tail.

        Returns True when *resource_identifier* equals the connector ``name``,
        the bound ``resource_name``, the full ``path``, or the last ``/``
        separated segment of ``path`` (trailing slashes ignored).
        """
        explicit_ids = (self.name, self.resource_name)
        if any(
            ident is not None and ident == resource_identifier
            for ident in explicit_ids
        ):
            return True
        _, _, last_segment = self.path.rstrip("/").rpartition("/")
        return resource_identifier in (self.path, last_segment)

    def bound_source_kind(self) -> BoundSourceKind:
        """Report this connector as an API-backed source."""
        return BoundSourceKind.API

    def build_api_config(
        self,
        *,
        base_url: str,
        auth: "ApiAuth | None" = None,
        default_headers: dict[str, str] | None = None,
        page_size_override: int | None = None,
    ) -> "APIConfig":
        """Merge contract fields with runtime connection config into ``APIConfig``.

        Args:
            base_url: Runtime base URL; ``path`` is appended to it.
            auth: Optional runtime authentication passed through unchanged.
            default_headers: Runtime headers; keys also present in
                ``self.headers`` are overridden by the contract values.
            page_size_override: When given and pagination is configured,
                replaces ``pagination.request.page_size`` on a copy.

        Returns:
            A new ``APIConfig``; mutable contract fields are shallow-copied.
        """
        from graflo.data_source.api import APIConfig

        merged_headers = {**(default_headers or {}), **self.headers}

        effective_pagination = self.pagination
        if effective_pagination is not None and page_size_override is not None:
            # model_copy leaves self.pagination untouched.
            resized_request = effective_pagination.request.model_copy(
                update={"page_size": page_size_override}
            )
            effective_pagination = effective_pagination.model_copy(
                update={"request": resized_request}
            )

        full_url = self._join_url(base_url, self.path)
        return APIConfig(
            url=full_url,
            method=self.method,
            headers=merged_headers,
            auth=auth,
            params=dict(self.params),
            timeout=self.timeout,
            retries=self.retries,
            retry_backoff_factor=self.retry_backoff_factor,
            retry_status_forcelist=list(self.retry_status_forcelist),
            verify=self.verify,
            pagination=effective_pagination,
            row_annotations=dict(self.row_annotations),
        )

build_api_config(*, base_url, auth=None, default_headers=None, page_size_override=None)

Merge contract fields with runtime connection config into APIConfig.

Source code in graflo/architecture/contract/bindings/connectors.py
def build_api_config(
    self,
    *,
    base_url: str,
    auth: "ApiAuth | None" = None,
    default_headers: dict[str, str] | None = None,
    page_size_override: int | None = None,
) -> "APIConfig":
    """Merge contract fields with runtime connection config into ``APIConfig``.

    Contract ``headers`` override same-named ``default_headers``. When both
    pagination and *page_size_override* are present, a copy of the pagination
    config with the overridden ``request.page_size`` is used.
    """
    from graflo.data_source.api import APIConfig

    merged_headers = {**(default_headers or {}), **self.headers}

    effective_pagination = self.pagination
    if effective_pagination is not None and page_size_override is not None:
        resized_request = effective_pagination.request.model_copy(
            update={"page_size": page_size_override}
        )
        effective_pagination = effective_pagination.model_copy(
            update={"request": resized_request}
        )

    full_url = self._join_url(base_url, self.path)
    return APIConfig(
        url=full_url,
        method=self.method,
        headers=merged_headers,
        auth=auth,
        params=dict(self.params),
        timeout=self.timeout,
        retries=self.retries,
        retry_backoff_factor=self.retry_backoff_factor,
        retry_status_forcelist=list(self.retry_status_forcelist),
        verify=self.verify,
        pagination=effective_pagination,
        row_annotations=dict(self.row_annotations),
    )

matches(resource_identifier)

Match resource name, connector name, or path tail.

Source code in graflo/architecture/contract/bindings/connectors.py
def matches(self, resource_identifier: str) -> bool:
    """Match resource name, connector name, or path tail.

    True when *resource_identifier* equals ``name``, ``resource_name``, the
    full ``path``, or the final ``/``-separated segment of ``path``.
    """
    explicit_ids = (self.name, self.resource_name)
    if any(
        ident is not None and ident == resource_identifier for ident in explicit_ids
    ):
        return True
    _, _, last_segment = self.path.rstrip("/").rpartition("/")
    return resource_identifier in (self.path, last_segment)

ApiResponseStructure

Bases: ConfigBaseModel

Maps JSON response envelope fields to extraction and pagination signals.

Source code in graflo/architecture/contract/bindings/connectors.py
class ApiResponseStructure(ConfigBaseModel):
    """Maps JSON response envelope fields to extraction and pagination signals.

    Every ``*_path`` field is a dot path into the decoded response body. Paths
    left as ``None`` are unset; with ``auto_detect`` enabled they may be
    inferred from the first response.
    """

    # Where the list of records lives in the envelope.
    records_path: str | None = Field(
        None, description="Dot path to the record list (e.g. ``results``)."
    )
    # Offset-style pagination signals.
    total_count_path: str | None = Field(
        None,
        description="Dot path to total item count across all pages (e.g. ``count``).",
    )
    offset_path: str | None = Field(
        None, description="Dot path to echoed page start index (e.g. ``offset``)."
    )
    next_offset_path: str | None = Field(
        None,
        description=(
            "Dot path to server-provided next offset for the following "
            "request (e.g. ``next_offset``)."
        ),
    )
    has_more_path: str | None = Field(
        None,
        description="Dot path to a boolean more-pages flag (e.g. ``has_more``).",
    )
    # Cursor-style pagination signal.
    cursor_path: str | None = Field(
        None, description="Dot path to the next opaque cursor token."
    )
    # Row annotation key -> response dot path.
    batch_metadata_paths: dict[str, str] = Field(
        default_factory=dict,
        description=(
            "Map row annotation keys to response dot paths "
            "(e.g. ``_batch_id: result_id``)."
        ),
    )
    auto_detect: bool = Field(
        False,
        description=(
            "When true, infer unset response paths from the first response body."
        ),
    )

BindingsConfig

Bases: ConfigBaseModel

Declarative bindings contract (connectors and resource wiring).

Source code in graflo/architecture/contract/bindings/core.py
class BindingsConfig(ConfigBaseModel):
    """Declarative bindings contract (connectors and resource wiring).

    Declares connectors plus the wiring tables ``resource_connector``
    (ingestion resource -> connector), ``connector_connection`` (connector ->
    runtime ``conn_proxy`` name) and ``staging_proxy`` (staging profile name ->
    ``conn_proxy``). After validation, private indexes map connector hashes to
    connectors, connector names to hashes, resources to connector hashes,
    connector hashes to proxy names, and staging names to proxy names.
    """

    connector_templates: list[ConnectorTemplate] = Field(
        default_factory=list,
        description=(
            "Named connector defaults for ``base:`` expansion on connector entries."
        ),
    )
    conn_proxy: str | None = Field(
        default=None,
        description=(
            "Default runtime connection proxy for connectors without an explicit "
            "``connector_connection`` entry."
        ),
    )
    connectors: list[AnyConnector] = Field(default_factory=list)
    # Accept dict entries at init-time (see validators below).
    # Internally and at runtime, Graflo uses typed lists derived from these.
    resource_connector: list[ResourceConnectorBinding | dict[str, str]] = Field(
        default_factory=list
    )
    # Connector -> runtime endpoint config indirection (proxy by name).
    connector_connection: list[ConnectorConnectionBinding | dict[str, str]] = Field(
        default_factory=list
    )
    # Typed mirrors of the public lists above (filled in the "after" validator).
    _resource_connector_typed: list[ResourceConnectorBinding] = PrivateAttr(
        default_factory=list
    )
    _connector_connection_typed: list[ConnectorConnectionBinding] = PrivateAttr(
        default_factory=list
    )
    # connector.hash -> connector
    _connectors_index: dict[str, ResourceConnector] = PrivateAttr(default_factory=dict)
    # connector.name -> connector.hash (only for connectors with a truthy name)
    _connectors_name_index: dict[str, str] = PrivateAttr(default_factory=dict)
    # resource name -> ordered, de-duplicated connector hashes
    _resource_to_connector_hashes: dict[str, list[str]] = PrivateAttr(
        default_factory=dict
    )
    # connector.hash -> conn_proxy name (explicit entries, then the default)
    _connector_to_conn_proxy: dict[str, str] = PrivateAttr(default_factory=dict)
    staging_proxy: list[StagingProxyBinding | dict[str, str]] = Field(
        default_factory=list,
        description="Optional named staging endpoints (S3) -> conn_proxy wiring.",
    )
    _staging_proxy_typed: list[StagingProxyBinding] = PrivateAttr(default_factory=list)
    # staging profile name -> conn_proxy name
    _staging_name_to_conn_proxy: dict[str, str] = PrivateAttr(default_factory=dict)

    @property
    def connector_connection_bindings(
        self,
    ) -> list[ConnectorConnectionBinding]:
        """Typed ``connector_connection`` entries (the live internal list, not a copy)."""
        # Expose typed entries for downstream components (type-checker friendly).
        return self._connector_connection_typed

    @model_validator(mode="before")
    @classmethod
    def _expand_connector_templates(cls, data: Any) -> Any:
        """Pre-process raw input via ``_expand_connectors_from_templates``.

        Non-dict input is returned unchanged. A shallow copy of the dict is
        passed on so the caller's mapping is not handed over directly.
        NOTE(review): presumably this resolves ``base:`` references against
        ``connector_templates`` (per that field's description); confirm in the
        helper.
        """
        if not isinstance(data, dict):
            return data
        return _expand_connectors_from_templates(dict(data))

    @field_validator("connector_templates", mode="before")
    @classmethod
    def _coerce_connector_templates(cls, v: Any) -> list[ConnectorTemplate]:
        """Coerce ``connector_templates`` to a list of ``ConnectorTemplate``.

        ``None`` becomes ``[]``. Dict items are validated. Any invalid item
        raises ``ValueError`` naming its index.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError("connector_templates must be a list")
        coerced: list[ConnectorTemplate] = []
        for i, item in enumerate(v):
            if isinstance(item, ConnectorTemplate):
                coerced.append(item)
                continue
            if isinstance(item, dict):
                try:
                    coerced.append(ConnectorTemplate.model_validate(item))
                except Exception as e:  # noqa: BLE001
                    raise ValueError(
                        f"Invalid connector_templates entry at index {i}: {item!r}."
                    ) from e
                continue
            raise ValueError(
                f"Invalid connector_templates entry at index {i}: expected dict or "
                f"ConnectorTemplate, got {type(item).__name__}."
            )
        return coerced

    @field_validator("staging_proxy", mode="before")
    @classmethod
    def _coerce_staging_proxy_entries(
        cls, v: Any
    ) -> list[StagingProxyBinding | dict[str, str]]:
        """Coerce ``staging_proxy`` entries to ``StagingProxyBinding`` objects.

        ``None`` becomes ``[]``. Dict items must contain ``name`` and
        ``conn_proxy``.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(
                "staging_proxy must be a list of {name, conn_proxy} entries"
            )
        coerced: list[StagingProxyBinding | dict[str, str]] = []
        for i, item in enumerate(v):
            if isinstance(item, StagingProxyBinding):
                coerced.append(item)
                continue
            if isinstance(item, dict):
                missing = [k for k in ("name", "conn_proxy") if k not in item]
                if missing:
                    raise ValueError(
                        f"Invalid staging_proxy entry at index {i}: missing {missing}."
                    )
                # NOTE(review): unlike the sibling coercers, validation errors
                # here are not wrapped with the entry index.
                coerced.append(StagingProxyBinding.model_validate(item))
                continue
            raise ValueError(
                f"Invalid staging_proxy entry at index {i}: got {type(item).__name__}."
            )
        return coerced

    def _rebuild_staging_proxy_index(self) -> None:
        """Rebuild the staging name -> conn_proxy index from the typed entries.

        Repeating a name with the same proxy is allowed. Repeating it with a
        different proxy raises ``ValueError``.
        """
        self._staging_name_to_conn_proxy = {}
        for m in self._staging_proxy_typed:
            existing = self._staging_name_to_conn_proxy.get(m.name)
            if existing is not None and existing != m.conn_proxy:
                raise ValueError(
                    f"Duplicate staging_proxy name '{m.name}' with conflicting conn_proxy."
                )
            self._staging_name_to_conn_proxy[m.name] = m.conn_proxy

    def get_staging_conn_proxy(self, name: str) -> str | None:
        """Return ``conn_proxy`` for a staging profile name, if declared."""
        return self._staging_name_to_conn_proxy.get(name)

    def _rebuild_indexes(self) -> None:
        """Rebuild the hash -> connector and name -> hash indexes from ``connectors``.

        Raises:
            ValueError: If two connectors share a hash, or two different
                connectors share a (truthy) name.
        """
        self._connectors_index = {}
        self._connectors_name_index = {}
        for connector in self.connectors:
            existing = self._connectors_index.get(connector.hash)
            if existing is not None:
                raise ValueError(
                    "Connector hash collision detected for connectors "
                    f"'{type(existing).__name__}' and '{type(connector).__name__}' "
                    f"(hash='{connector.hash}')."
                )
            self._connectors_index[connector.hash] = connector

            # Unnamed connectors are only addressable by hash.
            if connector.name:
                existing_hash = self._connectors_name_index.get(connector.name)
                if existing_hash is not None and existing_hash != connector.hash:
                    raise ValueError(
                        "Connector names must be unique when provided. "
                        f"Duplicate connector name '{connector.name}'."
                    )
                self._connectors_name_index[connector.name] = connector.hash

    def _append_resource_connector_hash(
        self, resource_name: str, connector_hash: str
    ) -> None:
        """Append *connector_hash* for *resource_name* if not already present (order kept)."""
        bucket = self._resource_to_connector_hashes.setdefault(resource_name, [])
        if connector_hash not in bucket:
            bucket.append(connector_hash)

    @field_validator("resource_connector", mode="before")
    @classmethod
    def _coerce_resource_connector_entries(
        cls, v: Any
    ) -> list[ResourceConnectorBinding]:
        """Coerce ``resource_connector`` entries to ``ResourceConnectorBinding``.

        ``None`` becomes ``[]``. Dict items must contain ``resource`` and
        ``connector``. Validation failures are re-raised as a concise
        ``ValueError`` naming the entry index.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(
                "resource_connector must be a list of {resource, connector} entries"
            )

        coerced: list[ResourceConnectorBinding] = []
        for i, item in enumerate(v):
            if isinstance(item, ResourceConnectorBinding):
                coerced.append(item)
                continue

            if isinstance(item, dict):
                missing = [k for k in ("resource", "connector") if k not in item]
                if missing:
                    raise ValueError(
                        f"Invalid resource_connector entry at index {i}: missing required keys {missing}. "
                        "Expected keys: ['resource', 'connector']."
                    )

                try:
                    coerced.append(ResourceConnectorBinding.model_validate(item))
                except Exception as e:  # noqa: BLE001
                    # Keep the message concise and contextual; nested pydantic
                    # errors can be noisy for config authors.
                    raise ValueError(
                        f"Invalid resource_connector entry at index {i}: {item!r}."
                    ) from e
                continue

            raise ValueError(
                f"Invalid resource_connector entry at index {i}: expected dict or "
                f"ResourceConnectorBinding, got {type(item).__name__}."
            )

        return coerced

    @field_validator("connector_connection", mode="before")
    @classmethod
    def _coerce_connector_connection_entries(
        cls, v: Any
    ) -> list[ConnectorConnectionBinding]:
        """Coerce ``connector_connection`` entries to ``ConnectorConnectionBinding``.

        ``None`` becomes ``[]``. Dict items must contain ``connector`` and
        ``conn_proxy``. Validation failures are re-raised as ``ValueError``
        naming the entry index.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(
                "connector_connection must be a list of {connector, conn_proxy} entries"
            )

        coerced: list[ConnectorConnectionBinding] = []
        for i, item in enumerate(v):
            if isinstance(item, ConnectorConnectionBinding):
                coerced.append(item)
                continue

            if isinstance(item, dict):
                missing = [k for k in ("connector", "conn_proxy") if k not in item]
                if missing:
                    raise ValueError(
                        f"Invalid connector_connection entry at index {i}: missing required keys {missing}. "
                        "Expected keys: ['connector', 'conn_proxy']."
                    )
                try:
                    coerced.append(ConnectorConnectionBinding.model_validate(item))
                except Exception as e:  # noqa: BLE001
                    raise ValueError(
                        f"Invalid connector_connection entry at index {i}: {item!r}."
                    ) from e
                continue

            raise ValueError(
                f"Invalid connector_connection entry at index {i}: expected dict or "
                f"ConnectorConnectionBinding, got {type(item).__name__}."
            )

        return coerced

    @staticmethod
    def default_connector_name(connector: ResourceConnector) -> str:
        """Return a fallback reference name for *connector*.

        Uses ``connector.name`` when truthy. Otherwise falls back to a
        type-specific field:
        - file: ``regex``, or ``sub_path`` as a string;
        - table: ``table_name``;
        - SPARQL: ``rdf_class``;
        - API: ``path``.

        Raises:
            TypeError: For any other connector type.
        """
        if connector.name:
            return connector.name
        if isinstance(connector, FileConnector):
            return connector.regex or str(connector.sub_path)
        if isinstance(connector, TableConnector):
            return connector.table_name
        if isinstance(connector, SparqlConnector):
            return connector.rdf_class
        if isinstance(connector, APIConnector):
            return connector.path
        raise TypeError(f"Unsupported connector type: {type(connector)!r}")

    @model_validator(mode="after")
    def _populate_resource_connector(self) -> Self:
        """Build all private indexes once field validation has finished.

        Steps:
        1. Index connectors by hash and name.
        2. Build typed views of the wiring lists and index staging proxies.
        3. Bind each connector's own ``resource_name``.
        4. Add the explicit ``resource_connector`` entries.
        5. Resolve connector -> conn_proxy.

        Raises:
            ValueError: On unknown connector references, collisions or
                conflicting proxy mappings.
        """
        self._rebuild_indexes()
        self._resource_to_connector_hashes = {}

        # Create typed views so internal code never has to handle dicts.
        self._resource_connector_typed = [
            ResourceConnectorBinding.model_validate(m) if isinstance(m, dict) else m
            for m in self.resource_connector
        ]
        self._connector_connection_typed = [
            ConnectorConnectionBinding.model_validate(m) if isinstance(m, dict) else m
            for m in self.connector_connection
        ]
        self._staging_proxy_typed = [
            StagingProxyBinding.model_validate(m) if isinstance(m, dict) else m
            for m in self.staging_proxy
        ]
        self._rebuild_staging_proxy_index()

        # Connectors that declare their own resource_name are bound implicitly,
        # ahead of explicit resource_connector entries.
        for connector in self.connectors:
            if connector.resource_name is None:
                continue
            self._append_resource_connector_hash(
                connector.resource_name, connector.hash
            )

        # NOTE(review): this resolves name-first, whereas
        # _resolve_connector_ref_to_hash resolves hash-first; the results only
        # differ if a connector name equals another connector's hash.
        for mapping in self._resource_connector_typed:
            connector_hash = self._connectors_name_index.get(mapping.connector)
            if connector_hash is None:
                if mapping.connector in self._connectors_index:
                    connector_hash = mapping.connector
                else:
                    raise ValueError(
                        f"resource_connector references unknown connector '{mapping.connector}' "
                        f"for resource '{mapping.resource}'."
                    )
            self._append_resource_connector_hash(mapping.resource, connector_hash)
        self._rebuild_connector_to_conn_proxy()
        return self

    def _resolve_connector_ref_to_hash(self, connector_ref: str) -> str:
        """Resolve a connector reference to its canonical connector hash.

        Allowed references:
        - ``connector.hash`` (canonical internal id), or
        - ``connector.name`` (when a name is provided / auto-filled).

        Ingestion resource names are not valid connector references (a resource
        may map to multiple connectors).

        Raises:
            ValueError: If *connector_ref* matches neither a hash nor a name.
        """
        if connector_ref in self._connectors_index:
            return connector_ref
        resolved_hash = self._connectors_name_index.get(connector_ref)
        if resolved_hash is None:
            raise ValueError(f"Unknown connector reference '{connector_ref}'")
        return resolved_hash

    def _rebuild_connector_to_conn_proxy(self) -> None:
        """Rebuild connector hash -> conn_proxy from explicit entries plus the default.

        Explicit ``connector_connection`` entries win. The class-level
        ``conn_proxy`` (if set) fills in every connector left unmapped.

        Raises:
            ValueError: On an unknown connector reference, or when the same
                connector is mapped to two different proxies.
        """
        self._connector_to_conn_proxy = {}
        for mapping in self._connector_connection_typed:
            connector_hash = self._resolve_connector_ref_to_hash(mapping.connector)
            existing = self._connector_to_conn_proxy.get(connector_hash)
            if existing is not None and existing != mapping.conn_proxy:
                raise ValueError(
                    "Conflicting conn_proxy mapping for connector "
                    f"'{connector_hash}' (existing='{existing}', new='{mapping.conn_proxy}')."
                )
            self._connector_to_conn_proxy[connector_hash] = mapping.conn_proxy

        if self.conn_proxy is not None:
            for connector in self.connectors:
                if connector.hash not in self._connector_to_conn_proxy:
                    self._connector_to_conn_proxy[connector.hash] = self.conn_proxy

    def get_conn_proxy_for_connector(self, connector: AnyConnector) -> str | None:
        """Return the mapped runtime proxy name for a given connector."""
        return self._connector_to_conn_proxy.get(connector.hash)

    def resolve_connector_refs_to_hashes(self, connector_refs: list[str]) -> set[str]:
        """Resolve connector names or hashes to canonical connector hashes.

        Raises:
            ValueError: On the first reference that cannot be resolved.
        """
        return {self._resolve_connector_ref_to_hash(ref) for ref in connector_refs}

    def get_connectors_for_resource(self, resource_name: str) -> list[AnyConnector]:
        """Return connectors bound to *resource_name*, in binding order (unique by hash)."""
        result: list[AnyConnector] = []
        for h in self._resource_to_connector_hashes.get(resource_name, []):
            c = self._connectors_index.get(h)
            # Filters out missing hashes (None) and any non-concrete connector types.
            if isinstance(
                c, (TableConnector, FileConnector, SparqlConnector, APIConnector)
            ):
                result.append(c)
        return result

    @classmethod
    def from_dict(cls, data: dict[str, Any] | list[Any]) -> Self:
        """Validate a mapping into bindings, rejecting legacy/list-style payloads.

        Raises:
            ValueError: If *data* is a list or contains any legacy top-level key.
        """
        if isinstance(data, list):
            # NOTE(review): message says "Bindings.from_dict" although the class is
            # BindingsConfig; left unchanged here because it is runtime text.
            raise ValueError(
                "Bindings.from_dict expects a mapping with 'connectors' and optional "
                "'resource_connector'. List-style connector payloads are not supported."
            )
        legacy_keys = {
            "postgres_connections",
            "table_connectors",
            "file_connectors",
            "sparql_connectors",
        }
        found_legacy = sorted(k for k in legacy_keys if k in data)
        if found_legacy:
            raise ValueError(
                "Legacy Bindings init keys are not supported. "
                f"Unsupported keys: {', '.join(found_legacy)}."
            )
        return cls.model_validate(data)

get_conn_proxy_for_connector(connector)

Return the mapped runtime proxy name for a given connector.

Source code in graflo/architecture/contract/bindings/core.py
def get_conn_proxy_for_connector(self, connector: AnyConnector) -> str | None:
    """Look up the runtime proxy name bound to *connector*, or ``None`` if unbound."""
    proxy_by_hash = self._connector_to_conn_proxy
    return proxy_by_hash.get(connector.hash)

get_connectors_for_resource(resource_name)

Return connectors bound to resource_name, in binding order (unique by hash).

Source code in graflo/architecture/contract/bindings/core.py
def get_connectors_for_resource(self, resource_name: str) -> list[AnyConnector]:
    """Return connectors bound to *resource_name*, in binding order (unique by hash).

    Hashes without a registered connector, or that map to an unsupported
    connector type, are skipped.
    """
    bound_hashes = self._resource_to_connector_hashes.get(resource_name, [])
    candidates = (self._connectors_index.get(h) for h in bound_hashes)
    return [
        c
        for c in candidates
        if isinstance(
            c, (TableConnector, FileConnector, SparqlConnector, APIConnector)
        )
    ]

get_staging_conn_proxy(name)

Return conn_proxy for a staging profile name, if declared.

Source code in graflo/architecture/contract/bindings/core.py
def get_staging_conn_proxy(self, name: str) -> str | None:
    """Return ``conn_proxy`` for a staging profile name, if declared."""
    return self._staging_name_to_conn_proxy.get(name)

resolve_connector_refs_to_hashes(connector_refs)

Resolve connector names or hashes to canonical connector hashes.

Source code in graflo/architecture/contract/bindings/core.py
def resolve_connector_refs_to_hashes(self, connector_refs: list[str]) -> set[str]:
    """Map each connector name or hash in *connector_refs* to its canonical hash.

    Duplicates collapse in the returned set. The first unresolvable reference
    propagates the resolver's error.
    """
    return set(map(self._resolve_connector_ref_to_hash, connector_refs))

BindingsRegistry

Bases: BindingsConfig

Mutable bindings registry for programmatic connector updates.

Source code in graflo/architecture/contract/bindings/core.py
class BindingsRegistry(BindingsConfig):
    """Mutable bindings registry for programmatic connector updates.

    Adds methods that register, replace, patch and wire connectors after
    construction. Each mutator keeps the private indexes inherited from
    :class:`BindingsConfig` and the public contract lists
    (``resource_connector`` / ``connector_connection``) consistent, so the
    registry can be serialized and validated again.
    """

    def bind_connector_to_conn_proxy(
        self,
        connector: AnyConnector,
        conn_proxy: str,
    ) -> None:
        """Bind a connector to a non-secret runtime proxy name.

        Uses ``connector.name`` when available, falling back to ``connector.hash``.
        Connectors that are not yet registered are added first via
        :meth:`add_connector`. An existing ``connector_connection`` entry that
        resolves to the same connector hash is replaced, not duplicated.

        Raises:
            ValueError: Propagated from :meth:`add_connector` or from
                rebuilding the connector -> proxy mapping.
        """
        # Ensure indexes include the connector and that a default name is set.
        if connector.hash not in self._connectors_index:
            self.add_connector(connector)
        # Pick a contract reference string that's stable and user-friendly.
        connector_ref = connector.name or connector.hash

        # Ensure uniqueness by connector.hash (not by ref-string).
        connector_hash = connector.hash
        existing_idx: int | None = None
        for i, m in enumerate(self._connector_connection_typed):
            try:
                if self._resolve_connector_ref_to_hash(m.connector) == connector_hash:
                    existing_idx = i
                    break
            except ValueError:
                continue

        if existing_idx is None:
            self._connector_connection_typed.append(
                ConnectorConnectionBinding(
                    connector=connector_ref, conn_proxy=conn_proxy
                )
            )
        else:
            self._connector_connection_typed[existing_idx] = ConnectorConnectionBinding(
                connector=connector_ref, conn_proxy=conn_proxy
            )
        # Keep the public contract field in sync for serialization / downstream.
        self.connector_connection = list(self._connector_connection_typed)

        self._rebuild_connector_to_conn_proxy()

    def apply_connector_update(self, update: ConnectorUpdate) -> None:
        """Patch a connector in-place in this binding (re-hashes and reindexes).

        Uses ``model_validate`` on merged data so connector validators (including
        hash recomputation) run; ``model_copy(update=...)`` would not re-run them.
        An empty patch is a no-op.
        """
        connector_hash = self._resolve_connector_ref_to_hash(update.connector)
        old = self._connectors_index[connector_hash]
        patch = update.as_patch()
        if not patch:
            return
        merged = old.model_dump(mode="python")
        merged.update(patch)
        new = cast(
            AnyConnector,
            old.__class__.model_validate(merged),
        )
        self.replace_connector(old, new)

    def replace_connector(
        self,
        old: ResourceConnector | str,
        new: AnyConnector,
    ) -> None:
        """Swap *old* for *new*, preserving resource wiring and conn_proxy by hash.

        Args:
            old: The registered connector, or a name/hash reference to it.
            new: Replacement connector. If it has no ``name``, it inherits the
                name of the connector it replaces.

        Contract entries (``resource_connector`` / ``connector_connection``)
        that referenced *old* by its hash, or by a name *new* no longer
        carries, are re-pointed at *new*. Otherwise they would no longer
        resolve.

        Raises:
            KeyError: If *old* is not registered.
            ValueError: If *old* is an unknown reference, or *new* collides by
                hash or name with another registered connector. Collisions are
                detected before any state is modified.
        """
        old_hash = (
            old.hash
            if isinstance(old, ResourceConnector)
            else self._resolve_connector_ref_to_hash(old)
        )
        if old_hash not in self._connectors_index:
            raise KeyError(f"Connector not found for hash={old_hash!r}")

        old_connector = self._connectors_index[old_hash]
        if new.name is None and old_connector.name is not None:
            object.__setattr__(new, "name", old_connector.name)
        new_hash = new.hash

        # Reject collisions up front. _rebuild_indexes would also reject them,
        # but only after self.connectors had already been mutated.
        if new_hash != old_hash and new_hash in self._connectors_index:
            other = self._connectors_index[new_hash]
            raise ValueError(
                "Connector hash collision detected for connectors "
                f"'{type(other).__name__}' and '{type(new).__name__}' "
                f"(hash='{new_hash}')."
            )
        if new.name:
            name_owner = self._connectors_name_index.get(new.name)
            if name_owner is not None and name_owner != old_hash:
                raise ValueError(
                    "Connector names must be unique when provided. "
                    f"Duplicate connector name '{new.name}'."
                )

        for idx, c in enumerate(self.connectors):
            if c.hash == old_hash:
                self.connectors[idx] = new
                break
        else:
            raise KeyError(
                f"Connector with hash={old_hash!r} not found in bindings.connectors list"
            )

        for hashes in self._resource_to_connector_hashes.values():
            for i, h in enumerate(hashes):
                if h == old_hash:
                    hashes[i] = new_hash

        old_proxy = self._connector_to_conn_proxy.pop(old_hash, None)
        self._rebuild_indexes()
        if old_proxy is not None:
            self._connector_to_conn_proxy[new_hash] = old_proxy

        # References that pointed at the old connector and will not resolve any more.
        stale_refs: set[str] = set()
        if new_hash != old_hash:
            stale_refs.add(old_hash)
        if old_connector.name and old_connector.name != new.name:
            stale_refs.add(old_connector.name)
        if stale_refs:
            self._retarget_connector_refs(stale_refs, new.name or new_hash)

    def _retarget_connector_refs(self, stale_refs: set[str], new_ref: str) -> None:
        """Point contract entries whose ``connector`` is in *stale_refs* at *new_ref*.

        Updates the typed views and the public contract fields, but only for
        the lists in which an entry actually changed.
        """
        resource_entries, resource_changed = self._retarget_entries(
            self._resource_connector_typed, stale_refs, new_ref
        )
        connection_entries, connection_changed = self._retarget_entries(
            self._connector_connection_typed, stale_refs, new_ref
        )
        if resource_changed:
            self._resource_connector_typed = resource_entries
            self.resource_connector = list(resource_entries)
        if connection_changed:
            self._connector_connection_typed = connection_entries
            self.connector_connection = list(connection_entries)

    @staticmethod
    def _retarget_entries(
        entries: list[Any], stale_refs: set[str], new_ref: str
    ) -> tuple[list[Any], bool]:
        """Return (*entries* with stale ``connector`` refs replaced, whether any changed)."""
        changed = False
        retargeted: list[Any] = []
        for entry in entries:
            if entry.connector in stale_refs:
                entry = entry.model_copy(update={"connector": new_ref})
                changed = True
            retargeted.append(entry)
        return retargeted, changed

    def add_connector(
        self,
        connector: AnyConnector,
    ) -> None:
        """Register *connector*, or replace the registered one with the same hash.

        An unnamed connector gets ``default_connector_name(connector)`` as its
        name. This mutates the passed object, even if the duplicate-name check
        below then raises. If the connector declares ``resource_name``, it is
        bound to that resource.

        Raises:
            ValueError: If the (possibly defaulted) name already belongs to a
                different connector, or on index rebuild collisions.
        """
        if connector.name is None:
            object.__setattr__(
                connector, "name", self.default_connector_name(connector)
            )
        existing_name_hash = None
        if connector.name:
            existing_name_hash = self._connectors_name_index.get(connector.name)
        if (
            connector.name
            and existing_name_hash is not None
            and existing_name_hash != connector.hash
        ):
            raise ValueError(
                "Connector names must be unique when provided. "
                f"Duplicate connector name '{connector.name}'."
            )

        if connector.hash in self._connectors_index:
            # Same hash already registered: swap the object in place (by identity).
            old_connector = self._connectors_index[connector.hash]
            for idx, existing in enumerate(self.connectors):
                if existing is old_connector:
                    self.connectors[idx] = connector
                    break
        else:
            self.connectors.append(connector)
        self._rebuild_indexes()
        if connector.resource_name is not None:
            self._append_resource_connector_hash(
                connector.resource_name, connector.hash
            )

    def bind_resource(
        self,
        resource_name: str,
        connector: AnyConnector,
    ) -> None:
        """Bind *resource_name* to an already-registered *connector*.

        The new ``resource_connector`` entry references the registered
        connector by its name, or by its hash when it has no name. Either value
        is guaranteed to resolve when the bindings are validated again. Binding
        the same resource/connector pair twice is a no-op.

        Raises:
            KeyError: If no connector with ``connector.hash`` is registered.
        """
        registered = self._connectors_index.get(connector.hash)
        if registered is None:
            raise KeyError(f"Connector not found for hash='{connector.hash}'")
        connector_hash = connector.hash
        self._append_resource_connector_hash(resource_name, connector_hash)

        # Avoid duplicate contract entries for the same (resource, connector).
        for existing in self._resource_connector_typed:
            if existing.resource != resource_name:
                continue
            try:
                if self._resolve_connector_ref_to_hash(existing.connector) == connector_hash:
                    return
            except ValueError:
                continue

        # Only indexed names (or hashes) resolve on reload. A name derived via
        # default_connector_name is not indexed and could even match another
        # connector's name.
        connector_ref = registered.name or connector_hash
        self._resource_connector_typed.append(
            ResourceConnectorBinding(
                resource=resource_name,
                connector=connector_ref,
            )
        )
        # Keep the public contract field in sync for serialization / downstream.
        self.resource_connector = list(self._resource_connector_typed)

apply_connector_update(update)

Patch a connector in-place in this binding (re-hashes and reindexes).

Uses model_validate on merged data so connector validators (including hash recomputation) run; model_copy(update=...) would not re-run them.

Source code in graflo/architecture/contract/bindings/core.py
def apply_connector_update(self, update: ConnectorUpdate) -> None:
    """Apply ``update`` to the connector it references, replacing it in place.

    The patch is laid over the current connector's ``model_dump`` and the
    merged data is re-validated with ``model_validate``. This makes every
    connector validator (hash recomputation included) run again;
    ``model_copy(update=...)`` would skip them. The connector reference is
    always resolved first. An update carrying no extra keys then does nothing.
    """
    target_hash = self._resolve_connector_ref_to_hash(update.connector)
    current = self._connectors_index[target_hash]
    changes = update.as_patch()
    if changes:
        payload = {**current.model_dump(mode="python"), **changes}
        rebuilt = cast(AnyConnector, type(current).model_validate(payload))
        self.replace_connector(current, rebuilt)

bind_connector_to_conn_proxy(connector, conn_proxy)

Bind a connector to a non-secret runtime proxy name.

Uses connector.name when available, falling back to connector.hash.

Source code in graflo/architecture/contract/bindings/core.py
def bind_connector_to_conn_proxy(
    self,
    connector: AnyConnector,
    conn_proxy: str,
) -> None:
    """Associate ``connector`` with the runtime proxy name ``conn_proxy``.

    If these bindings do not know the connector yet, it is registered first.
    The stored reference is ``connector.name`` when set, otherwise
    ``connector.hash``. At most one ``connector_connection`` entry is kept per
    connector hash: an existing entry for the same hash is overwritten in
    place, otherwise a new entry is appended. Afterwards the public
    ``connector_connection`` field is re-synced and
    ``_rebuild_connector_to_conn_proxy`` is called.
    """
    if connector.hash not in self._connectors_index:
        self.add_connector(connector)

    # Computed after registration so any name assigned by add_connector is used.
    ref = connector.name or connector.hash
    target_hash = connector.hash

    match_idx: int | None = None
    for idx, binding in enumerate(self._connector_connection_typed):
        try:
            resolved = self._resolve_connector_ref_to_hash(binding.connector)
        except ValueError:
            # Entries whose reference cannot be resolved are simply skipped.
            continue
        if resolved == target_hash:
            match_idx = idx
            break

    entry = ConnectorConnectionBinding(connector=ref, conn_proxy=conn_proxy)
    if match_idx is not None:
        self._connector_connection_typed[match_idx] = entry
    else:
        self._connector_connection_typed.append(entry)
    # Keep the public contract field in sync for serialization / downstream.
    self.connector_connection = list(self._connector_connection_typed)

    self._rebuild_connector_to_conn_proxy()

replace_connector(old, new)

Swap old for new, preserving resource wiring and conn_proxy by hash.

Source code in graflo/architecture/contract/bindings/core.py
def replace_connector(
    self,
    old: ResourceConnector | str,
    new: AnyConnector,
) -> None:
    """Swap *old* for *new*, preserving resource wiring and conn_proxy by hash.

    Args:
        old: Connector instance to replace, or its name/hash reference.
        new: Replacement connector. When it has no ``name``, it inherits the
            name of the connector being replaced.

    Raises:
        KeyError: If *old* does not resolve to a connector in these bindings.
        ValueError: If *new* carries a name already used by a different
            connector (the same uniqueness rule ``add_connector`` applies).

    Besides the private hash indexes, the string references held in the typed
    ``resource_connector`` / ``connector_connection`` entries are retargeted:

    * references to the old hash are pointed at the new hash;
    * references to the old name are pointed at the new name when the name
      changes.

    Without this, hash-based references (for example those written by
    ``bind_connector_to_conn_proxy`` for unnamed connectors) would dangle
    after any hash-changing update, and would be serialized stale.
    """
    old_hash = (
        old.hash
        if isinstance(old, ResourceConnector)
        else self._resolve_connector_ref_to_hash(old)
    )
    if old_hash not in self._connectors_index:
        raise KeyError(f"Connector not found for hash={old_hash!r}")

    old_connector = self._connectors_index[old_hash]
    old_name = old_connector.name
    if new.name is None and old_name is not None:
        # ``name`` is excluded from the hash payload, so this keeps new.hash valid.
        object.__setattr__(new, "name", old_name)

    new_hash = new.hash
    if new.name is not None:
        # Mirror add_connector: provided names must stay unique across
        # connectors. Check before mutating any state.
        owner_hash = self._connectors_name_index.get(new.name)
        if owner_hash is not None and owner_hash not in (old_hash, new_hash):
            raise ValueError(
                "Connector names must be unique when provided. "
                f"Duplicate connector name '{new.name}'."
            )

    for idx, c in enumerate(self.connectors):
        if c.hash == old_hash:
            self.connectors[idx] = new
            break
    else:
        raise KeyError(
            f"Connector with hash={old_hash!r} not found in bindings.connectors list"
        )

    for hashes in self._resource_to_connector_hashes.values():
        for i, h in enumerate(hashes):
            if h == old_hash:
                hashes[i] = new_hash

    # Retarget string references that would otherwise point at a connector
    # identity (hash or name) that no longer exists after the swap.
    ref_map: dict[str, str] = {}
    if new_hash != old_hash:
        ref_map[old_hash] = new_hash
    if old_name is not None and new.name is not None and new.name != old_name:
        ref_map[old_name] = new.name
    if ref_map:
        for typed in (
            self._resource_connector_typed,
            self._connector_connection_typed,
        ):
            for i, entry in enumerate(typed):
                target = ref_map.get(entry.connector)
                if target is not None:
                    typed[i] = entry.model_copy(update={"connector": target})
        # Keep the public contract fields in sync for serialization / downstream.
        self.resource_connector = list(self._resource_connector_typed)
        self.connector_connection = list(self._connector_connection_typed)

    old_proxy = self._connector_to_conn_proxy.pop(old_hash, None)
    self._rebuild_indexes()
    if old_proxy is not None:
        self._connector_to_conn_proxy[new_hash] = old_proxy
BoundSourceKind

Bases: BaseEnum

Physical source modality for a bound connector (how rows are retrieved).

This describes the connector-backed access pattern, not the abstract ingestion resource. File format (CSV, JSON, etc.) is chosen by the loader from file extensions.

Attributes:

Name Type Description
FILE

File-based connector (directory + pattern or paths).

SQL_TABLE

SQL table / database-backed connector.

SPARQL

SPARQL / RDF connector (endpoint or local RDF via rdflib).

API

REST API connector (path + pagination on a runtime base URL).

Source code in graflo/architecture/contract/bindings/connectors.py
class BoundSourceKind(BaseEnum):
    """How a bound connector physically retrieves rows.

    The value names the connector-backed access pattern, not the abstract
    ingestion resource. For file connectors the concrete format (CSV, JSON,
    etc.) is not encoded here; the loader picks it from file extensions.

    Attributes:
        FILE: Files located by directory + pattern, or by explicit paths.
        SQL_TABLE: Rows read from a SQL table / database-backed connector.
        SPARQL: RDF data from a SPARQL endpoint or a local file via rdflib.
        API: REST endpoint (path + pagination on a runtime base URL).
    """

    FILE = "file"
    SQL_TABLE = "sql_table"
    SPARQL = "sparql"
    API = "api"

ColumnTimeFilter

Bases: ConfigBaseModel

Predicate on a single date/time column (SQL-friendly via `FilterExpression`).

Source code in graflo/architecture/contract/bindings/column_time_filter.py
class ColumnTimeFilter(ConfigBaseModel):
    """Predicate on a single date/time column (SQL-friendly via :class:`FilterExpression`).

    Accepted shapes (enforced by ``_check_shape``):

    * ``column`` alone: a hint only; ``as_filter_expression`` returns ``None``.
    * ``not_equals``: ``column != not_equals``; cannot be combined with bounds.
    * ``start`` + ``interval``: half-open window ``[start, start + interval)``.
      ``interval`` must be a strictly positive pandas timedelta, and it
      excludes ``end``.
    * ``start`` and/or ``end``: bounds whose inclusivity follows
      ``start_inclusive`` / ``end_inclusive``.
    """

    column: str = Field(..., description="Column name for the time predicate.")
    start: str | None = Field(
        default=None,
        description="Lower bound (ISO date or datetime). Interpreted with start_inclusive.",
    )
    end: str | None = Field(
        default=None,
        description="Upper bound (ISO date or datetime). Interpreted with end_inclusive.",
    )
    interval: str | None = Field(
        default=None,
        description='Pandas timedelta string (e.g. "7D", "2H"); requires start; '
        "defines [start, start + interval). Mutually exclusive with end.",
    )
    not_equals: str | None = Field(
        default=None,
        description="If set, render column != value. Mutually exclusive with start/end/interval.",
    )
    start_inclusive: bool = Field(
        default=True,
        description="If True (default), lower bound uses >= when start is set.",
    )
    end_inclusive: bool = Field(
        default=False,
        description="If False (default), upper bound uses < when end is set.",
    )

    @model_validator(mode="after")
    def _check_shape(self) -> Self:
        """Validate that the configured fields form one of the accepted shapes."""
        has_pred = any(
            v is not None
            for v in (self.start, self.end, self.interval, self.not_equals)
        )
        if not has_pred:
            # Column-only hint (e.g. introspection sets datetime column without a WHERE).
            return self
        if self.not_equals is not None:
            if any(v is not None for v in (self.start, self.end, self.interval)):
                raise ValueError(
                    "not_equals cannot be combined with start, end, or interval"
                )
            return self
        if self.interval is not None:
            if self.start is None:
                raise ValueError("interval requires start")
            if self.end is not None:
                raise ValueError(
                    "interval cannot be combined with end; use start + interval"
                )
            # Parse eagerly so invalid or non-positive intervals fail at load time.
            self._validated_timedelta()
            return self
        if self.start is None and self.end is None:
            # Defensive only: the branches above guarantee start or end is set here.
            raise ValueError(
                "ColumnTimeFilter requires at least one of: start, end, interval, not_equals"
            )
        return self

    def _validated_timedelta(self) -> pd.Timedelta:
        """Parse ``interval`` with pandas and require a strictly positive duration.

        Raises:
            ValueError: If ``interval`` is not a valid timedelta string, parses
                to ``NaT``, or is zero/negative. A zero or negative interval
                would make the window ``[start, start + interval)`` empty.
        """
        assert self.interval is not None
        try:
            td = pd.Timedelta(self.interval)
        except (ValueError, TypeError) as e:
            raise ValueError(
                f"Invalid pandas timedelta string for interval: {self.interval!r}"
            ) from e
        if pd.isna(td):
            raise ValueError(
                f"Invalid pandas timedelta string for interval: {self.interval!r}"
            )
        if td <= pd.Timedelta(0):
            raise ValueError(
                f"interval must be a positive duration, got {self.interval!r}"
            )
        return td

    def _interval_upper_literal(self) -> str:
        """Render ``start + interval`` as a SQL literal.

        The date-only form is kept only when ``start`` parsed as date-only and
        the computed upper bound falls exactly on midnight.
        """
        assert self.start is not None and self.interval is not None
        start_dt, date_only = parse_iso_date_or_datetime(self.start)
        delta = self._validated_timedelta().to_pytimedelta()
        end_dt = start_dt + delta
        upper_date_only = date_only and (end_dt.time() == time.min)
        return format_sql_literal(end_dt, upper_date_only)

    def _lower_literal(self) -> str:
        """Render ``start`` as a SQL literal."""
        assert self.start is not None
        dt, date_only = parse_iso_date_or_datetime(self.start)
        return format_sql_literal(dt, date_only)

    def _upper_literal(self) -> str:
        """Render ``end`` as a SQL literal."""
        assert self.end is not None
        dt, date_only = parse_iso_date_or_datetime(self.end)
        return format_sql_literal(dt, date_only)

    def as_filter_expression(self) -> FilterExpression | None:
        """Translate this predicate into a ``FilterExpression``.

        Returns:
            One of the following:

            * ``None`` when no bound is configured (column-only hint).
            * A single leaf when exactly one comparison applies
              (``not_equals``, or only one of ``start`` / ``end``).
            * Otherwise, a composite ``AND`` of the lower- and upper-bound
              leaves.
        """
        from graflo.filter.onto import (
            ComparisonOperator,
            FilterExpression,
            LogicalOperator,
        )

        if self.not_equals is not None:
            return FilterExpression(
                kind="leaf",
                field=self.column,
                cmp_operator=ComparisonOperator.NEQ,
                value=[self.not_equals],
            )

        leaves: list[FilterExpression] = []

        if self.interval is not None:
            assert self.start is not None
            # Half-open window [start, start + interval); inclusivity flags are ignored.
            leaves.append(
                FilterExpression(
                    kind="leaf",
                    field=self.column,
                    cmp_operator=ComparisonOperator.GE,
                    value=[self._lower_literal()],
                )
            )
            leaves.append(
                FilterExpression(
                    kind="leaf",
                    field=self.column,
                    cmp_operator=ComparisonOperator.LT,
                    value=[self._interval_upper_literal()],
                )
            )
        else:
            if self.start is not None:
                lower_op = (
                    ComparisonOperator.GE
                    if self.start_inclusive
                    else ComparisonOperator.GT
                )
                leaves.append(
                    FilterExpression(
                        kind="leaf",
                        field=self.column,
                        cmp_operator=lower_op,
                        value=[self._lower_literal()],
                    )
                )
            if self.end is not None:
                upper_op = (
                    ComparisonOperator.LE
                    if self.end_inclusive
                    else ComparisonOperator.LT
                )
                leaves.append(
                    FilterExpression(
                        kind="leaf",
                        field=self.column,
                        cmp_operator=upper_op,
                        value=[self._upper_literal()],
                    )
                )

        if not leaves:
            return None
        if len(leaves) == 1:
            return leaves[0]
        return FilterExpression(
            kind="composite",
            operator=LogicalOperator.AND,
            deps=leaves,
        )

as_filter_expression()

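Return a FilterExpression for this predicate: a single leaf when only one comparison applies, a composite AND of the bound leaves otherwise, or None when no bound is configured.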

Source code in graflo/architecture/contract/bindings/column_time_filter.py
def as_filter_expression(self) -> FilterExpression | None:
    """Translate this time predicate into a ``FilterExpression``.

    Returns:
        One of the following:

        * ``None`` when no bound is configured (column-only hint).
        * A single leaf when exactly one comparison applies (``not_equals``,
          or only one of ``start`` / ``end``).
        * Otherwise, a composite ``AND`` of the lower- and upper-bound leaves.
    """
    from graflo.filter.onto import (
        ComparisonOperator,
        FilterExpression,
        LogicalOperator,
    )

    def _leaf(op: ComparisonOperator, literal: str) -> FilterExpression:
        # Every comparison targets this column with a one-element value list.
        return FilterExpression(
            kind="leaf", field=self.column, cmp_operator=op, value=[literal]
        )

    if self.not_equals is not None:
        return _leaf(ComparisonOperator.NEQ, self.not_equals)

    leaves: list[FilterExpression] = []
    if self.interval is not None:
        assert self.start is not None
        # Half-open window [start, start + interval); inclusivity flags are ignored.
        leaves.append(_leaf(ComparisonOperator.GE, self._lower_literal()))
        leaves.append(_leaf(ComparisonOperator.LT, self._interval_upper_literal()))
    else:
        if self.start is not None:
            low_op = (
                ComparisonOperator.GE
                if self.start_inclusive
                else ComparisonOperator.GT
            )
            leaves.append(_leaf(low_op, self._lower_literal()))
        if self.end is not None:
            high_op = (
                ComparisonOperator.LE
                if self.end_inclusive
                else ComparisonOperator.LT
            )
            leaves.append(_leaf(high_op, self._upper_literal()))

    if not leaves:
        return None
    if len(leaves) == 1:
        return leaves[0]
    return FilterExpression(
        kind="composite", operator=LogicalOperator.AND, deps=leaves
    )

ConnectorTemplate

Bases: ConfigBaseModel

Named connector defaults referenced by base on connector entries.

Template-only metadata (not copied onto connectors): name, conn_proxy. All other fields are merged into expanded connectors (dict fields such as params are deep-merged; scalars and blocks like pagination are replaced when the connector entry provides them explicitly).

Source code in graflo/architecture/contract/bindings/core.py
class ConnectorTemplate(ConfigBaseModel):
    """Reusable connector defaults, looked up by connector entries via ``base``.

    ``name`` and ``conn_proxy`` describe the template itself and are never
    copied onto expanded connectors. Every other field (extra keys are kept
    because of ``extra="allow"``) is merged into each expanded connector:

    * dict-valued fields such as ``params`` are deep-merged;
    * scalars and whole blocks such as ``pagination`` are replaced whenever the
      connector entry sets them explicitly.
    """

    model_config = ConfigDict(extra="allow")

    name: str = Field(
        ...,
        description="Template name referenced by connector ``base``.",
    )
    conn_proxy: str | None = Field(
        default=None,
        description=(
            "Template metadata: auto-wires ``connector_connection`` "
            "for expanded connectors that declare ``name``."
        ),
    )

ConnectorUpdate

Bases: ConfigBaseModel

Patch payload for an existing connector (applied after manifest load).

Only connector is a fixed field; any other keys are captured as extras and merged into the resolved connector via model_validate (so validators, including hash recomputation, run). New connector types and fields do not require changes to this model. Not part of the stored GraphManifest; load from a separate file or build in code, then call Bindings.apply_connector_update.

Attributes:

Name Type Description
connector str

Connector name or hash (same resolution as bindings).

Source code in graflo/architecture/contract/bindings/connectors.py
class ConnectorUpdate(ConfigBaseModel):
    """Patch payload for an existing connector (applied after manifest load).

    ``connector`` is the only declared field. Every other key is kept as a
    model extra and later merged into the resolved connector via
    ``model_validate``, so connector validators (hash recomputation included)
    run again. New connector types and fields therefore need no change here.
    This model is not stored in ``GraphManifest``: load it from a separate
    file or build it in code, then pass it to
    ``Bindings.apply_connector_update``.

    Attributes:
        connector: Connector ``name`` or ``hash`` (same resolution as bindings).
    """

    model_config = ConfigDict(extra="allow")

    connector: str = Field(
        ...,
        description="Connector reference: name or hash of the connector to patch.",
    )

    def as_patch(self) -> dict[str, Any]:
        """Return the extra (non-``connector``) keys as a fresh patch dict."""
        extra = self.model_extra
        return dict(extra) if extra else {}

as_patch()

Return extra keys as a patch mapping (excludes connector).

Source code in graflo/architecture/contract/bindings/connectors.py
def as_patch(self) -> dict[str, Any]:
    """Return the extra (non-``connector``) keys as a fresh patch dict.

    An absent or empty ``model_extra`` yields an empty dict. Otherwise a
    shallow copy is returned, so callers may mutate it freely.
    """
    extra = self.model_extra
    return dict(extra) if extra else {}

FileConnector

Bases: ResourceConnector

Connector for matching files.

Attributes:

Name Type Description
regex str | None

Regular expression pattern for matching filenames

sub_path Path

Path to search for matching files (default: "./")

time_filter ColumnTimeFilter | None

Optional structured filter on a date/time column (shared with `TableConnector`), using `graflo.architecture.contract.bindings.column_time_filter.ColumnTimeFilter`.

Source code in graflo/architecture/contract/bindings/connectors.py
class FileConnector(ResourceConnector):
    """Connector that selects files under ``sub_path`` by filename regex.

    Attributes:
        regex: Pattern tested against filenames with ``re.match`` (anchored at
            the start of the name only). ``None`` matches nothing.
        sub_path: Directory searched for matching files (default: "./").
        time_filter: Optional structured filter on a date/time column (shared with
            :class:`TableConnector`), using :class:`~graflo.architecture.contract.bindings.column_time_filter.ColumnTimeFilter`.
    """

    regex: str | None = None
    sub_path: pathlib.Path = Field(default_factory=lambda: pathlib.Path("./"))
    time_filter: ColumnTimeFilter | None = None

    @model_validator(mode="after")
    def _validate_file_connector(self) -> Self:
        """Coerce ``sub_path`` to ``pathlib.Path`` and reject ``row_annotations``."""
        path = self.sub_path
        if not isinstance(path, pathlib.Path):
            object.__setattr__(self, "sub_path", pathlib.Path(path))
        if self.row_annotations:
            raise ValueError("row_annotations is not implemented for FileConnector")
        return self

    @property
    def date_field(self) -> str | None:
        """Column used for time filtering, if any (compat alias for ``time_filter.column``)."""
        if not self.time_filter:
            return None
        return self.time_filter.column

    def matches(self, resource_identifier: str) -> bool:
        """Report whether ``regex`` matches the start of a filename.

        Args:
            resource_identifier: Filename to test.

        Returns:
            bool: ``False`` when no regex is configured; otherwise whether
            ``re.match`` succeeds.
        """
        pattern = self.regex
        return pattern is not None and re.match(pattern, resource_identifier) is not None

    def bound_source_kind(self) -> BoundSourceKind:
        """File connectors always report ``BoundSourceKind.FILE``."""
        kind = BoundSourceKind.FILE
        return kind

date_field property

Column used for time filtering, if any (compat alias for time_filter.column).

bound_source_kind()

File connector always uses BoundSourceKind.FILE.

Source code in graflo/architecture/contract/bindings/connectors.py
def bound_source_kind(self) -> BoundSourceKind:
    """File connectors always report ``BoundSourceKind.FILE``."""
    kind = BoundSourceKind.FILE
    return kind

matches(resource_identifier)

Check if connector matches a filename.

Parameters:

Name Type Description Default
resource_identifier str

Filename to match

required

Returns:

Name Type Description
bool bool

True if connector matches

Source code in graflo/architecture/contract/bindings/connectors.py
def matches(self, resource_identifier: str) -> bool:
    """Report whether ``regex`` matches the start of a filename.

    Args:
        resource_identifier: Filename to test.

    Returns:
        bool: ``False`` when no regex is configured; otherwise whether
        ``re.match`` succeeds (the match is anchored at the start only).
    """
    pattern = self.regex
    return pattern is not None and re.match(pattern, resource_identifier) is not None

JoinClause

Bases: ConfigBaseModel

Specification for a SQL JOIN operation.

Used by TableConnector to describe multi-table queries. Each JoinClause adds one JOIN to the generated SQL. The base table is referenced via TableConnector.base_alias (default: base) rather than a hard-coded name.

Attributes:

Name Type Description
table str

Table name to join (e.g. "all_classes").

schema_name str | None

Optional schema override for the joined table.

alias str | None

SQL alias for the joined table (e.g. "s", "t"). Required when the same table is joined more than once.

on_self str

Column on the base (left) table used in the ON condition.

on_other str

Column on the joined (right) table used in the ON condition.

join_type str

Type of join -- LEFT, INNER, etc. Defaults to LEFT.

select_fields list[str] | None

Explicit list of columns to SELECT from this join. When None every column of the joined table is included (aliased with the join alias prefix).

Source code in graflo/architecture/contract/bindings/connectors.py
class JoinClause(ConfigBaseModel):
    """One JOIN in a multi-table query generated for a ``TableConnector``.

    Each clause contributes exactly one JOIN to the generated SQL. The base
    table is referenced through ``TableConnector.base_alias`` (default
    ``base``) rather than a hard-coded name.

    Attributes:
        table: Table name to join (e.g. "all_classes").
        schema_name: Optional schema override for the joined table.
        alias: SQL alias for the joined table (e.g. "s", "t"). Required when
            the same table is joined more than once.
        on_self: Column on the base (left) table used in the ON condition.
        on_other: Column on the joined (right) table used in the ON condition.
        join_type: Join kind such as LEFT or INNER; LEFT by default.
        select_fields: Columns to SELECT from this join. ``None`` selects
            every column of the joined table (prefixed with the join alias).
    """

    table: str = Field(..., description="Table name to join.")
    schema_name: str | None = Field(
        default=None,
        description="Schema override for the joined table.",
    )
    alias: str | None = Field(
        default=None,
        description="SQL alias for the joined table.",
    )
    on_self: str = Field(
        ...,
        description="Column on the base table for the ON condition.",
    )
    on_other: str = Field(
        ...,
        description="Column on the joined table for the ON condition.",
    )
    join_type: str = Field(
        default="LEFT",
        description="JOIN type (LEFT, INNER, etc.).",
    )
    select_fields: list[str] | None = Field(
        default=None,
        description="Columns to SELECT from this join (None = all columns).",
    )

PaginationConfig

Bases: ConfigBaseModel

Configuration for API pagination (contract-level, secret-free).

Combines request construction (request) with response envelope parsing (response).

Source code in graflo/architecture/contract/bindings/connectors.py
class PaginationConfig(ConfigBaseModel):
    """Secret-free, contract-level API pagination settings.

    Pairs how paginated requests are built (``request``) with how the
    response envelope is parsed (``response``). Both parts default to a fresh
    instance of their own config model.
    """

    request: PaginationRequestConfig = Field(
        default_factory=PaginationRequestConfig,
    )
    response: ApiResponseStructure = Field(
        default_factory=ApiResponseStructure,
    )

PaginationRequestConfig

Bases: ConfigBaseModel

Configuration for building paginated HTTP requests.

Source code in graflo/architecture/contract/bindings/connectors.py
class PaginationRequestConfig(ConfigBaseModel):
    """Configuration for building paginated HTTP requests.

    ``strategy`` selects which parameters are used:

    * ``offset``: ``offset_param`` (starting at ``initial_offset``) plus
      ``limit_param`` carrying ``page_size``.
    * ``page``: ``page_param`` (starting at ``initial_page``) plus
      ``per_page_param`` carrying ``page_size``.
    * ``cursor``: ``cursor_param`` (starting at ``initial_cursor``).

    ``page_size`` must be strictly positive. A zero or negative page size
    cannot make progress through an offset-paginated collection.
    """

    strategy: Literal["offset", "page", "cursor"] = "offset"
    offset_param: str = "offset"
    limit_param: str = Field(
        default="limit",
        description=(
            "Query parameter name for page size (offset strategy only). "
            "The value sent is ``page_size``, not a total item cap."
        ),
    )
    cursor_param: str = "cursor"
    page_param: str = "page"
    per_page_param: str = Field(
        default="per_page",
        description=(
            "Query parameter name for page size (page strategy only). "
            "The value sent is ``page_size``, not a total item cap."
        ),
    )
    initial_offset: int = 0
    initial_page: int = 1
    initial_cursor: str | None = None
    page_size: int = Field(
        default=100,
        gt=0,
        description=(
            "Records requested per HTTP page. Sent as the value of "
            "``limit_param`` (offset) or ``per_page_param`` (page)."
        ),
    )

ResourceConnector

Bases: ConfigBaseModel, ABC

Abstract base class for resource connectors (files, SQL tables, SPARQL/RDF sources, and REST APIs).

Provides a common API for connector matching and resource identification. All concrete connector types inherit from this class.

Connectors only describe source-side matching/query behavior. Resource-to-connector linkage is handled by Bindings.

Source code in graflo/architecture/contract/bindings/connectors.py
class ResourceConnector(ConfigBaseModel, abc.ABC):
    """Abstract base for every resource connector (file, table, SPARQL, API).

    Subclasses describe only source-side matching and query behavior; which
    resource uses which connector is decided by ``Bindings``.

    After validation, ``hash`` holds a SHA-256 over the connector's defining
    fields plus its concrete class name. ``hash``, ``name`` and
    ``resource_name`` are left out of that digest, so renaming or rebinding a
    connector does not change its identity.
    """

    name: str | None = Field(
        default=None,
        description="Optional connector name used by top-level resource_connector mapping.",
    )
    resource_name: str | None = Field(
        default=None,
        description="Optional direct resource binding declared on the connector itself.",
    )
    hash: str = Field(
        default="",
        exclude=True,
        description="Deterministic internal connector id derived from defining fields.",
    )
    row_annotations: dict[str, Any] = Field(
        default_factory=dict,
        description=(
            "Constant fields merged into every fetched row as defaults (response "
            "fields take priority). Only implemented for :class:`APIConnector`; "
            "other connector types reject non-empty values."
        ),
    )

    def _hash_payload(self) -> dict[str, Any]:
        """JSON-mode dump of the defining fields, tagged with the class name."""
        excluded = {"hash", "name", "resource_name"}
        data = self.model_dump(mode="json", by_alias=True, exclude=excluded)
        data["_connector_type"] = type(self).__name__
        return data

    @model_validator(mode="after")
    def _compute_hash(self) -> Self:
        """Store the SHA-256 of the canonical (sorted, compact) JSON payload."""
        payload = self._hash_payload()
        canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
        digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
        object.__setattr__(self, "hash", digest)
        return self

    @abc.abstractmethod
    def matches(self, resource_identifier: str) -> bool:
        """Report whether this connector applies to ``resource_identifier``.

        Args:
            resource_identifier: Identifier to test (filename or table name).

        Returns:
            bool: True if the connector matches.
        """

    @abc.abstractmethod
    def bound_source_kind(self) -> BoundSourceKind:
        """Return the physical source kind this connector reads from."""

bound_source_kind() abstractmethod

Return the physical source kind for this connector.

Source code in graflo/architecture/contract/bindings/connectors.py
@abc.abstractmethod
def bound_source_kind(self) -> BoundSourceKind:
    """Return the physical source kind this connector reads from.

    Concrete connectors must override this; the abstract body returns None.
    """

matches(resource_identifier) abstractmethod

Check if connector matches a resource identifier.

Parameters:

Name Type Description Default
resource_identifier str

Identifier to match (filename or table name)

required

Returns:

Name Type Description
bool bool

True if connector matches

Source code in graflo/architecture/contract/bindings/connectors.py
@abc.abstractmethod
def matches(self, resource_identifier: str) -> bool:
    """Report whether this connector applies to ``resource_identifier``.

    Concrete connectors must override this; the abstract body returns None.

    Args:
        resource_identifier: Identifier to test (filename or table name).

    Returns:
        bool: True if the connector matches.
    """

ResourceConnectorBinding

Bases: ConfigBaseModel

Top-level resource -> connector-name mapping entry.

Source code in graflo/architecture/contract/bindings/core.py
class ResourceConnectorBinding(ConfigBaseModel):
    """One top-level ``resource_connector`` entry (resource -> connector name).

    Attributes:
        resource: Name of the ingestion resource being bound.
        connector: Name of the connector that serves that resource.
    """

    resource: str
    connector: str

SparqlConnector

Bases: ResourceConnector

Connector for matching SPARQL / RDF data sources.

Each SparqlConnector targets instances of a single rdf:Class. It can be backed either by a remote SPARQL endpoint (Fuseki, Blazegraph, ...) or by a local RDF file parsed with rdflib.

Attributes:

Name Type Description
rdf_class str

Full URI of the rdf:Class whose instances this connector fetches (e.g. "http://example.org/Person").

endpoint_url str | None

SPARQL query endpoint URL. When set, instances are fetched via HTTP. When None the connector is for local file mode.

graph_uri str | None

Named-graph URI to restrict the query to (optional).

sparql_query str | None

Custom SPARQL SELECT query override. When provided the auto-generated per-class query is skipped.

rdf_file Path | None

Path to a local RDF file (.ttl, .rdf, .n3, .jsonld). Mutually exclusive with endpoint_url.

Source code in graflo/architecture/contract/bindings/connectors.py
class SparqlConnector(ResourceConnector):
    """Connector for matching SPARQL / RDF data sources.

    Each ``SparqlConnector`` targets instances of a single ``rdf:Class``.
    It can be backed either by a remote SPARQL endpoint (Fuseki, Blazegraph, ...)
    or by a local RDF file parsed with *rdflib*.

    Attributes:
        rdf_class: Full URI of the ``rdf:Class`` whose instances this connector
            fetches (e.g. ``"http://example.org/Person"``).
        endpoint_url: SPARQL query endpoint URL.  When set, instances are
            fetched via HTTP.  When ``None`` the connector is for local file mode.
        graph_uri: Named-graph URI to restrict the query to (optional).
        sparql_query: Custom SPARQL ``SELECT`` query override.  When provided
            the auto-generated per-class query is skipped.
        rdf_file: Path to a local RDF file (``.ttl``, ``.rdf``, ``.n3``,
            ``.jsonld``).  Mutually exclusive with *endpoint_url*; setting both
            is rejected during model validation.

    Raises:
        ValueError: Raised from model validation when ``row_annotations`` is
            non-empty, or when both ``endpoint_url`` and ``rdf_file`` are set.
    """

    rdf_class: str = Field(
        ..., description="URI of the rdf:Class to fetch instances of"
    )
    endpoint_url: str | None = Field(
        default=None, description="SPARQL query endpoint URL"
    )
    graph_uri: str | None = Field(
        default=None, description="Named graph URI (optional)"
    )
    sparql_query: str | None = Field(
        default=None, description="Custom SPARQL query override"
    )
    rdf_file: pathlib.Path | None = Field(
        default=None, description="Path to a local RDF file"
    )

    @model_validator(mode="after")
    def _reject_row_annotations(self) -> Self:
        # TODO: implement row_annotations for SparqlConnector row payloads.
        if self.row_annotations:
            raise ValueError("row_annotations is not implemented for SparqlConnector")
        return self

    @model_validator(mode="after")
    def _reject_ambiguous_source(self) -> Self:
        """Enforce the documented exclusivity of ``endpoint_url`` and ``rdf_file``.

        Without this check a connector configured with both would silently
        favour one of them, depending on the consumer.
        """
        if self.endpoint_url and self.rdf_file is not None:
            raise ValueError(
                "endpoint_url and rdf_file are mutually exclusive for SparqlConnector"
            )
        return self

    def matches(self, resource_identifier: str) -> bool:
        """Match by the local name (fragment) of the rdf:Class URI.

        The local name is the part after the last ``#``, then after the last
        ``/`` of that remainder (``Person`` for both
        ``http://example.org/Person`` and ``http://example.org/onto#Person``).

        Args:
            resource_identifier: Identifier to match against

        Returns:
            True when *resource_identifier* equals the class local name
        """
        local_name = self.rdf_class.rsplit("#", 1)[-1].rsplit("/", 1)[-1]
        return resource_identifier == local_name

    def bound_source_kind(self) -> BoundSourceKind:
        """Return ``BoundSourceKind.SPARQL``."""
        return BoundSourceKind.SPARQL

    def build_select_query(self) -> str:
        """Build a SPARQL SELECT query for instances of ``rdf_class``.

        If *sparql_query* is set (non-empty) it is returned as-is.  Otherwise a
        simple per-class query is generated, wrapped in a ``GRAPH <graph_uri>``
        block when *graph_uri* is set::

            SELECT ?s ?p ?o WHERE {
              ?s a <rdf_class> .
              ?s ?p ?o .
            }

        Returns:
            SPARQL query string
        """
        if self.sparql_query:
            return self.sparql_query

        graph_open = f"GRAPH <{self.graph_uri}> {{" if self.graph_uri else ""
        graph_close = "}" if self.graph_uri else ""

        return (
            "SELECT ?s ?p ?o WHERE { "
            f"{graph_open} "
            f"?s a <{self.rdf_class}> . "
            "?s ?p ?o . "
            f"{graph_close} "
            "}"
        )

bound_source_kind()

Return BoundSourceKind.SPARQL.

Source code in graflo/architecture/contract/bindings/connectors.py
def bound_source_kind(self) -> BoundSourceKind:
    """Identify this connector as SPARQL-backed (``BoundSourceKind.SPARQL``)."""
    kind = BoundSourceKind.SPARQL
    return kind

build_select_query()

Build a SPARQL SELECT query for instances of rdf_class.

If sparql_query is set, it is returned as-is. Otherwise, a simple per-class query is generated:

SELECT ?s ?p ?o WHERE {
  ?s a <rdf_class> .
  ?s ?p ?o .
}

Returns:

Type Description
str

SPARQL query string

Source code in graflo/architecture/contract/bindings/connectors.py
def build_select_query(self) -> str:
    """Produce the SPARQL SELECT used to fetch instances of ``rdf_class``.

    A non-empty *sparql_query* short-circuits everything and is returned
    unchanged.  Otherwise the per-class query below is generated, wrapped
    in ``GRAPH <graph_uri> { ... }`` when *graph_uri* is set::

        SELECT ?s ?p ?o WHERE {
          ?s a <rdf_class> .
          ?s ?p ?o .
        }

    Returns:
        SPARQL query string
    """
    custom_query = self.sparql_query
    if custom_query:
        return custom_query

    if self.graph_uri:
        opener, closer = f"GRAPH <{self.graph_uri}> {{", "}"
    else:
        opener = closer = ""

    # Joining with single spaces reproduces the exact spacing of the
    # concatenated form (empty opener/closer leave double spaces).
    pieces = [
        "SELECT ?s ?p ?o WHERE {",
        opener,
        f"?s a <{self.rdf_class}> .",
        "?s ?p ?o .",
        closer,
        "}",
    ]
    return " ".join(pieces)

matches(resource_identifier)

Match by the local name (fragment) of the rdf:Class URI.

Parameters:

Name Type Description Default
resource_identifier str

Identifier to match against

required

Returns:

Type Description
bool

True when resource_identifier equals the class local name

Source code in graflo/architecture/contract/bindings/connectors.py
def matches(self, resource_identifier: str) -> bool:
    """Compare *resource_identifier* with the local name of ``rdf_class``.

    The local name is what follows the last ``#`` of the class URI, then
    what follows the last ``/`` of that remainder, e.g. ``Person`` for both
    ``http://example.org/Person`` and ``http://example.org/onto#Person``.

    Args:
        resource_identifier: Identifier to match against

    Returns:
        True when *resource_identifier* equals the class local name
    """
    after_hash = self.rdf_class.rpartition("#")[2]
    local_name = after_hash.rpartition("/")[2]
    return local_name == resource_identifier

StagingProxyBinding

Bases: ConfigBaseModel

Named staging profile -> runtime connection-proxy (e.g. S3 credentials).

Used by TigerGraph bulk ingest to resolve S3GeneralizedConnConfig without putting secrets in the manifest.

Source code in graflo/architecture/contract/bindings/core.py
class StagingProxyBinding(ConfigBaseModel):
    """Named staging profile -> runtime connection-proxy (e.g. S3 credentials).

    Used by TigerGraph bulk ingest to resolve ``S3GeneralizedConnConfig`` without
    putting secrets in the manifest.
    """

    # Name of the staging profile.
    name: str
    # Identifier of the runtime connection proxy resolved for this profile;
    # only the reference lives here, not the secrets themselves.
    conn_proxy: str

TableConnector

Bases: ResourceConnector

Connector for matching database tables.

Supports simple single-table queries as well as multi-table JOINs and pushdown filters via FilterExpression.

Attributes:

Name Type Description
table_name str

Exact table name or regex pattern

schema_name str | None

Schema name (optional, defaults to public)

database str | None

Database name (optional)

time_filter ColumnTimeFilter | None

Optional structured filter on a date/time column, rendered to SQL via graflo.filter.onto.FilterExpression.

filters list[Any]

General-purpose pushdown filters rendered as SQL WHERE fragments.

joins list[JoinClause]

Multi-table JOIN specifications (auto-generated or explicit).

base_alias str

SQL alias for the base table when joins is non-empty.

select_columns list[str] | None

Explicit SELECT column list. None means * for the base table (plus aliased columns from joins).

Source code in graflo/architecture/contract/bindings/connectors.py
class TableConnector(ResourceConnector):
    """Connector for matching database tables.

    Supports simple single-table queries as well as multi-table JOINs and
    pushdown filters via ``FilterExpression``.

    Attributes:
        table_name: Exact table name, or a regex pattern when the value starts
            with ``^`` or ends with ``$``.
        schema_name: Schema name (optional; :meth:`build_query` falls back to
            ``public``).
        database: Database name (optional)
        time_filter: Optional structured filter on a date/time column, rendered
            via :class:`~graflo.filter.onto.FilterExpression` in SQL.
        filters: General-purpose pushdown filters rendered as SQL WHERE fragments.
        joins: Multi-table JOIN specifications (auto-generated or explicit).
        base_alias: SQL alias for the base table when ``joins`` is non-empty.
        select_columns: Explicit SELECT column list. None means ``*`` for the
            base table (plus aliased columns from joins).
        view: Optional ``SelectSpec`` (a dict is coerced into one) used instead
            of table+joins+filters; when it is a ``SelectSpec``,
            :meth:`build_query` delegates to ``view.build_sql()``.
    """

    table_name: str = Field(
        default="", validation_alias=AliasChoices("table_name", "table")
    )
    schema_name: str | None = Field(
        default=None, validation_alias=AliasChoices("schema_name", "schema")
    )
    database: str | None = None
    time_filter: ColumnTimeFilter | None = None
    filters: list[Any] = Field(
        default_factory=list,
        description="Pushdown FilterExpression filters (rendered to SQL WHERE).",
    )
    joins: list[JoinClause] = Field(
        default_factory=list,
        description="JOIN clauses for multi-table queries.",
    )
    base_alias: str = Field(
        default="base",
        description="SQL alias for the base table row when joins are present.",
    )
    select_columns: list[str] | None = Field(
        default=None,
        description="Explicit SELECT columns. None = SELECT * (plus join aliases).",
    )
    view: Any = Field(
        default=None,
        description="SelectSpec or dict for declarative view (alternative to table+joins+filters).",
    )

    @field_validator("filters", mode="before")
    @classmethod
    def _coerce_filters(cls, v: Any) -> list[Any]:
        """Parse every raw filter item into a FilterExpression, reporting its index on failure."""
        from graflo.filter.onto import parse_filter_expression

        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError("filters must be a list")
        result: list[Any] = []
        for i, item in enumerate(v):
            try:
                result.append(parse_filter_expression(item))
            except (ValueError, ValidationError) as e:
                raise ValueError(f"filters[{i}]: {e}") from e
        return result

    @field_validator("view", mode="before")
    @classmethod
    def _coerce_view(cls, v: Any) -> Any:
        """Turn a dict view into a ``SelectSpec``; pass other values through."""
        if v is None:
            return None
        if isinstance(v, dict):
            from graflo.filter.select import SelectSpec

            return SelectSpec.from_dict(v)
        return v

    @model_validator(mode="after")
    def _validate_table_connector(self) -> Self:
        """Validate table_name and join wiring."""
        if not self.table_name:
            raise ValueError("table_name is required for TableConnector")
        if not _BASE_TABLE_ALIAS_IDENT.match(self.base_alias):
            raise ValueError(
                "base_alias must be a simple SQL identifier "
                "(ASCII letter, digit, underscore)"
            )
        join_aliases = {jc.alias or jc.table for jc in self.joins}
        if self.base_alias in join_aliases:
            raise ValueError(
                f"base_alias {self.base_alias!r} conflicts with a join alias "
                f"{sorted(join_aliases)}"
            )
        if self.row_annotations:
            raise ValueError("row_annotations is not implemented for TableConnector")
        return self

    @property
    def date_field(self) -> str | None:
        """Column used for time filtering, if any (compat alias for ``time_filter.column``)."""
        return self.time_filter.column if self.time_filter else None

    def matches(self, resource_identifier: str) -> bool:
        """Check if connector matches a table name.

        ``table_name`` is used as a regex when it starts with ``^`` or ends
        with ``$``; otherwise it must match exactly. When ``schema_name`` is
        set, the identifier is also tried with that schema prepended (for a
        schema-qualified ``table_name``) and, if the identifier is already
        qualified with that schema, with the prefix stripped (for a bare
        ``table_name``).

        Args:
            resource_identifier: Table name to match (format: schema.table or just table)

        Returns:
            bool: True if connector matches
        """
        if not self.table_name:
            return False

        if self.table_name.startswith("^") or self.table_name.endswith("$"):
            # Already a regex expression
            compiled_regex = re.compile(self.table_name)
        else:
            # Exact match expression
            compiled_regex = re.compile(f"^{re.escape(self.table_name)}$")

        candidates = [resource_identifier]
        if self.schema_name:
            schema_prefix = f"{self.schema_name}."
            # table_name may itself be schema-qualified while the identifier is bare.
            candidates.append(f"{schema_prefix}{resource_identifier}")
            # The identifier may be schema-qualified (documented format) while
            # table_name is bare; prepending the schema again would never match.
            if resource_identifier.startswith(schema_prefix):
                candidates.append(resource_identifier.removeprefix(schema_prefix))

        return any(compiled_regex.match(name) for name in candidates)

    def bound_source_kind(self) -> BoundSourceKind:
        """Return ``BoundSourceKind.SQL_TABLE``."""
        return BoundSourceKind.SQL_TABLE

    def build_where_clause(self, base_alias: str | None = None) -> str:
        """Build SQL WHERE clause from time filter **and** general filters.

        Args:
            base_alias: When given, filter fields without a ``.`` qualifier are
                prefixed with this alias before rendering.

        Returns:
            WHERE clause string (without the WHERE keyword) or empty string if no filters
        """
        from graflo.onto import ExpressionFlavor

        # Time filter first, then the general-purpose filters, in order.
        raw_filters: list[Any] = []
        if self.time_filter is not None:
            time_expr = self.time_filter.as_filter_expression()
            if time_expr is not None:
                raw_filters.append(time_expr)
        raw_filters.extend(self.filters)

        conditions: list[str] = []
        for raw in raw_filters:
            filt_expr = self._coerce_filter_expression(raw, base_alias)
            if filt_expr is None:
                continue
            rendered = filt_expr(kind=ExpressionFlavor.SQL)
            if rendered:
                conditions.append(str(rendered))

        return " AND ".join(conditions)

    def build_query(self, effective_schema: str | None = None) -> str:
        """Build a complete SQL SELECT query.

        When ``view`` is a ``SelectSpec``, delegates to ``view.build_sql()`` and
        appends this connector's WHERE conditions. Otherwise incorporates the
        base table, any JoinClauses, explicit select_columns, time_filter, and
        FilterExpression filters. A ``view`` of any other type is ignored.

        Args:
            effective_schema: Schema to use if ``self.schema_name`` is None.
                Falls back to ``"public"`` when both are unset.

        Returns:
            Complete SQL query string.
        """
        schema = self.schema_name or effective_schema or "public"
        if self.view is not None:
            from graflo.filter.select import SelectSpec

            if isinstance(self.view, SelectSpec):
                query = self.view.build_sql(schema=schema, base_table=self.table_name)
                where = self.build_where_clause(base_alias=self.view.base_alias)
                if where:
                    return self._append_where_condition(query, where)
                return query

        # The base table is only aliased when joins need to disambiguate columns.
        base_alias = self.base_alias if self.joins else None
        base_ref = f'"{schema}"."{self.table_name}"'
        base_ref_aliased = f"{base_ref} {base_alias}" if base_alias else base_ref

        # --- SELECT ---
        if self.select_columns is not None:
            select_parts = list(self.select_columns)
        elif self.joins:
            select_parts = [f"{base_alias}.*"]
            for jc in self.joins:
                alias = jc.alias or jc.table
                if jc.select_fields is not None:
                    # Prefix joined columns with the alias to avoid name clashes.
                    select_parts.extend(
                        f'{alias}."{col}" AS "{alias}__{col}"'
                        for col in jc.select_fields
                    )
                else:
                    select_parts.append(f"{alias}.*")
        else:
            select_parts = ["*"]

        select_clause = ", ".join(select_parts)

        # --- FROM + JOINs ---
        from_clause = base_ref_aliased
        for jc in self.joins:
            jc_schema = jc.schema_name or schema
            alias = jc.alias or jc.table
            join_ref = f'"{jc_schema}"."{jc.table}"'
            left_col = (
                f'{base_alias}."{jc.on_self}"' if base_alias else f'"{jc.on_self}"'
            )
            right_col = f'{alias}."{jc.on_other}"'
            from_clause += (
                f" {jc.join_type} JOIN {join_ref} {alias} ON {left_col} = {right_col}"
            )

        query = f"SELECT {select_clause} FROM {from_clause}"

        # --- WHERE ---
        where = self.build_where_clause(base_alias=base_alias)
        if where:
            query += f" WHERE {where}"

        return query

    @staticmethod
    def _append_where_condition(query: str, condition: str) -> str:
        """Append a SQL condition to *query* preserving an existing WHERE clause."""
        # NOTE(review): this appends at the very end of *query*. If the view SQL
        # has a top-level OR in its WHERE, or trailing ORDER BY / GROUP BY /
        # LIMIT clauses, the result changes meaning or becomes invalid —
        # confirm the shape SelectSpec.build_sql() emits.
        if re.search(r"\bWHERE\b", query, flags=re.IGNORECASE):
            return f"{query} AND {condition}"
        return f"{query} WHERE {condition}"

    @staticmethod
    def _qualified_column_ref(column: str, base_alias: str | None) -> str:
        """Quote *column*, prefixing it with *base_alias* when one is given."""
        if base_alias:
            return f'{base_alias}."{column}"'
        return f'"{column}"'

    @classmethod
    def _qualify_filter_payload(
        cls, payload: dict[str, Any], base_alias: str | None
    ) -> dict[str, Any]:
        """Return a copy of a dumped filter with unqualified leaf fields prefixed by *base_alias*.

        Recurses into ``deps`` for composite filters; fields already containing
        a ``.`` are left untouched.
        """
        qualified = dict(payload)
        if base_alias is None:
            return qualified
        if qualified.get("kind") == "leaf":
            field = qualified.get("field")
            if isinstance(field, str) and "." not in field:
                qualified["field"] = f"{base_alias}.{field}"
            return qualified
        deps = qualified.get("deps")
        if isinstance(deps, list):
            qualified["deps"] = [
                cls._qualify_filter_payload(dep, base_alias)
                if isinstance(dep, dict)
                else dep
                for dep in deps
            ]
        return qualified

    @classmethod
    def _coerce_filter_expression(
        cls, raw_filter: Any, base_alias: str | None
    ) -> FilterExpression | None:
        """Parse *raw_filter* and, when *base_alias* is set, qualify its fields with it."""
        from graflo.filter.onto import parse_filter_expression

        if raw_filter is None:
            return None
        expr = parse_filter_expression(raw_filter)
        if base_alias is None:
            return expr
        payload = expr.model_dump(mode="python")
        return parse_filter_expression(cls._qualify_filter_payload(payload, base_alias))

date_field property

Column used for time filtering, if any (compat alias for time_filter.column).

build_query(effective_schema=None)

Build a complete SQL SELECT query.

When view is set, delegates to view.build_sql(). Otherwise incorporates the base table, any JoinClauses, explicit select_columns, time_filter, and FilterExpression filters.

Parameters:

Name Type Description Default
effective_schema str | None

Schema to use if self.schema_name is None.

None

Returns:

Type Description
str

Complete SQL query string.

Source code in graflo/architecture/contract/bindings/connectors.py
def build_query(self, effective_schema: str | None = None) -> str:
    """Build a complete SQL SELECT query.

    When ``view`` is set, delegates to ``view.build_sql()``. Otherwise
    incorporates the base table, any JoinClauses, explicit select_columns,
    time_filter, and FilterExpression filters.

    Args:
        effective_schema: Schema to use if ``self.schema_name`` is None.

    Returns:
        Complete SQL query string.
    """
    schema = self.schema_name or effective_schema or "public"
    if self.view is not None:
        from graflo.filter.select import SelectSpec

        if isinstance(self.view, SelectSpec):
            query = self.view.build_sql(schema=schema, base_table=self.table_name)
            where = self.build_where_clause(base_alias=self.view.base_alias)
            if where:
                return self._append_where_condition(query, where)
            return query
    base_alias = self.base_alias if self.joins else None
    base_ref = f'"{schema}"."{self.table_name}"'
    if base_alias:
        base_ref_aliased = f"{base_ref} {base_alias}"
    else:
        base_ref_aliased = base_ref

    # --- SELECT ---
    select_parts: list[str] = []
    if self.select_columns is not None:
        select_parts = list(self.select_columns)
    elif self.joins:
        select_parts.append(f"{base_alias}.*")
        for jc in self.joins:
            alias = jc.alias or jc.table
            jc_schema = jc.schema_name or schema
            if jc.select_fields is not None:
                for col in jc.select_fields:
                    select_parts.append(f'{alias}."{col}" AS "{alias}__{col}"')
            else:
                select_parts.append(f"{alias}.*")
    else:
        select_parts.append("*")

    select_clause = ", ".join(select_parts)

    # --- FROM + JOINs ---
    from_clause = base_ref_aliased
    for jc in self.joins:
        jc_schema = jc.schema_name or schema
        alias = jc.alias or jc.table
        join_ref = f'"{jc_schema}"."{jc.table}"'
        left_col = (
            f'{base_alias}."{jc.on_self}"' if base_alias else f'"{jc.on_self}"'
        )
        right_col = f'{alias}."{jc.on_other}"'
        from_clause += (
            f" {jc.join_type} JOIN {join_ref} {alias} ON {left_col} = {right_col}"
        )

    query = f"SELECT {select_clause} FROM {from_clause}"

    # --- WHERE ---
    where = self.build_where_clause(base_alias=base_alias)
    if where:
        query += f" WHERE {where}"

    return query

build_where_clause(base_alias=None)

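Build the SQL WHERE clause from the time filter and the general filters. When base_alias is given, filter fields without a table qualifier are prefixed with that alias.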

Returns:

Type Description
str

WHERE clause string (without the WHERE keyword) or empty string if no filters

Source code in graflo/architecture/contract/bindings/connectors.py
def build_where_clause(self, base_alias: str | None = None) -> str:
    """Combine the time filter and the general filters into one SQL predicate.

    Args:
        base_alias: When given, filter fields without a ``.`` qualifier are
            prefixed with this alias before rendering.

    Returns:
        The conditions joined with ``AND`` (no leading ``WHERE`` keyword), or
        an empty string when nothing renders.
    """
    from graflo.onto import ExpressionFlavor

    # Gather raw filters in rendering order: time filter first, then the rest.
    pending: list = []
    if self.time_filter is not None:
        time_expr = self.time_filter.as_filter_expression()
        if time_expr is not None:
            pending.append(time_expr)
    pending.extend(self.filters)

    clauses: list[str] = []
    for raw in pending:
        coerced = self._coerce_filter_expression(raw, base_alias)
        if coerced is None:
            continue
        sql_text = coerced(kind=ExpressionFlavor.SQL)
        if sql_text:
            clauses.append(str(sql_text))

    return " AND ".join(clauses)

matches(resource_identifier)

Check if connector matches a table name.

Parameters:

Name Type Description Default
resource_identifier str

Table name to match (format: schema.table or just table)

required

Returns:

Name Type Description
bool bool

True if connector matches

Source code in graflo/architecture/contract/bindings/connectors.py
def matches(self, resource_identifier: str) -> bool:
    """Check if connector matches a table name.

    ``table_name`` is used as a regex when it starts with ``^`` or ends with
    ``$``; otherwise it must match exactly. When ``schema_name`` is set, the
    identifier is also tried with that schema prepended (for a
    schema-qualified ``table_name``) and, if the identifier is already
    qualified with that schema, with the prefix stripped (for a bare
    ``table_name``).

    Args:
        resource_identifier: Table name to match (format: schema.table or just table)

    Returns:
        bool: True if connector matches
    """
    if not self.table_name:
        return False

    if self.table_name.startswith("^") or self.table_name.endswith("$"):
        # Already a regex expression
        compiled_regex = re.compile(self.table_name)
    else:
        # Exact match expression
        compiled_regex = re.compile(f"^{re.escape(self.table_name)}$")

    candidates = [resource_identifier]
    if self.schema_name:
        schema_prefix = f"{self.schema_name}."
        # table_name may itself be schema-qualified while the identifier is bare.
        candidates.append(f"{schema_prefix}{resource_identifier}")
        # The identifier may be schema-qualified (documented format) while
        # table_name is bare; prepending the schema again would never match.
        if resource_identifier.startswith(schema_prefix):
            candidates.append(resource_identifier.removeprefix(schema_prefix))

    return any(compiled_regex.match(name) for name in candidates)