Skip to content

graflo.architecture.contract.bindings.core

Named connectors and resource-to-connector wiring.

BindingsConfig

Bases: ConfigBaseModel

Declarative bindings contract (connectors and resource wiring).

Source code in graflo/architecture/contract/bindings/core.py
class BindingsConfig(ConfigBaseModel):
    """Declarative bindings contract (connectors and resource wiring).

    Public fields carry the contract as authored; list entries may be given as
    plain dicts and are coerced to typed binding models by the ``before``
    field validators. Private attributes hold typed views and lookup indexes
    that the ``after`` model validator derives from the public fields.
    """

    connector_templates: list[ConnectorTemplate] = Field(
        default_factory=list,
        description=(
            "Named connector defaults for ``base:`` expansion on connector entries."
        ),
    )
    conn_proxy: str | None = Field(
        default=None,
        description=(
            "Default runtime connection proxy for connectors without an explicit "
            "``connector_connection`` entry."
        ),
    )
    connectors: list[AnyConnector] = Field(default_factory=list)
    # Accept dict entries at init-time (see validators below).
    # Internally and at runtime, Graflo uses typed lists derived from these.
    resource_connector: list[ResourceConnectorBinding | dict[str, str]] = Field(
        default_factory=list
    )
    # Connector -> runtime endpoint config indirection (proxy by name).
    connector_connection: list[ConnectorConnectionBinding | dict[str, str]] = Field(
        default_factory=list
    )
    # Typed mirrors of ``resource_connector`` / ``connector_connection``.
    _resource_connector_typed: list[ResourceConnectorBinding] = PrivateAttr(
        default_factory=list
    )
    _connector_connection_typed: list[ConnectorConnectionBinding] = PrivateAttr(
        default_factory=list
    )
    # connector.hash -> connector object.
    _connectors_index: dict[str, ResourceConnector] = PrivateAttr(default_factory=dict)
    # connector.name -> connector.hash (only connectors with a truthy name).
    _connectors_name_index: dict[str, str] = PrivateAttr(default_factory=dict)
    # resource name -> ordered, de-duplicated connector hashes.
    _resource_to_connector_hashes: dict[str, list[str]] = PrivateAttr(
        default_factory=dict
    )
    # connector.hash -> conn_proxy name.
    _connector_to_conn_proxy: dict[str, str] = PrivateAttr(default_factory=dict)
    staging_proxy: list[StagingProxyBinding | dict[str, str]] = Field(
        default_factory=list,
        description="Optional named staging endpoints (S3) -> conn_proxy wiring.",
    )
    # Typed mirror of ``staging_proxy`` and its name -> conn_proxy index.
    _staging_proxy_typed: list[StagingProxyBinding] = PrivateAttr(default_factory=list)
    _staging_name_to_conn_proxy: dict[str, str] = PrivateAttr(default_factory=dict)

    @property
    def connector_connection_bindings(
        self,
    ) -> list[ConnectorConnectionBinding]:
        """Return the typed ``connector_connection`` entries."""
        # Expose typed entries for downstream components (type-checker friendly).
        return self._connector_connection_typed

    @model_validator(mode="before")
    @classmethod
    def _expand_connector_templates(cls, data: Any) -> Any:
        """Run template expansion on dict input before field validation.

        Non-dict input is returned untouched. Dict input is shallow-copied and
        handed to ``_expand_connectors_from_templates``.
        """
        if not isinstance(data, dict):
            return data
        return _expand_connectors_from_templates(dict(data))

    @field_validator("connector_templates", mode="before")
    @classmethod
    def _coerce_connector_templates(cls, v: Any) -> list[ConnectorTemplate]:
        """Coerce ``connector_templates`` entries to ``ConnectorTemplate``.

        ``None`` becomes an empty list.

        Raises:
            ValueError: If the value is not a list, or an entry is neither a
                dict nor a ``ConnectorTemplate``, or a dict entry fails
                validation.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError("connector_templates must be a list")
        coerced: list[ConnectorTemplate] = []
        for i, item in enumerate(v):
            if isinstance(item, ConnectorTemplate):
                coerced.append(item)
                continue
            if isinstance(item, dict):
                try:
                    coerced.append(ConnectorTemplate.model_validate(item))
                except Exception as e:  # noqa: BLE001
                    raise ValueError(
                        f"Invalid connector_templates entry at index {i}: {item!r}."
                    ) from e
                continue
            raise ValueError(
                f"Invalid connector_templates entry at index {i}: expected dict or "
                f"ConnectorTemplate, got {type(item).__name__}."
            )
        return coerced

    @field_validator("staging_proxy", mode="before")
    @classmethod
    def _coerce_staging_proxy_entries(
        cls, v: Any
    ) -> list[StagingProxyBinding | dict[str, str]]:
        """Coerce ``staging_proxy`` entries to ``StagingProxyBinding``.

        ``None`` becomes an empty list.

        Raises:
            ValueError: If the value is not a list, an entry has the wrong
                type, a dict entry lacks ``name``/``conn_proxy``, or a dict
                entry fails validation.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(
                "staging_proxy must be a list of {name, conn_proxy} entries"
            )
        coerced: list[StagingProxyBinding | dict[str, str]] = []
        for i, item in enumerate(v):
            if isinstance(item, StagingProxyBinding):
                coerced.append(item)
                continue
            if isinstance(item, dict):
                missing = [k for k in ("name", "conn_proxy") if k not in item]
                if missing:
                    raise ValueError(
                        f"Invalid staging_proxy entry at index {i}: missing {missing}."
                    )
                # Same treatment as the other binding coercers: report the
                # offending entry by index instead of leaking the nested error.
                try:
                    coerced.append(StagingProxyBinding.model_validate(item))
                except Exception as e:  # noqa: BLE001
                    raise ValueError(
                        f"Invalid staging_proxy entry at index {i}: {item!r}."
                    ) from e
                continue
            raise ValueError(
                f"Invalid staging_proxy entry at index {i}: got {type(item).__name__}."
            )
        return coerced

    def _rebuild_staging_proxy_index(self) -> None:
        """Rebuild the staging name -> conn_proxy index from the typed entries.

        Repeating a name with the same ``conn_proxy`` is tolerated.

        Raises:
            ValueError: If one name is declared with two different proxies.
        """
        self._staging_name_to_conn_proxy = {}
        for m in self._staging_proxy_typed:
            existing = self._staging_name_to_conn_proxy.get(m.name)
            if existing is not None and existing != m.conn_proxy:
                raise ValueError(
                    f"Duplicate staging_proxy name '{m.name}' with conflicting conn_proxy."
                )
            self._staging_name_to_conn_proxy[m.name] = m.conn_proxy

    def get_staging_conn_proxy(self, name: str) -> str | None:
        """Return ``conn_proxy`` for a staging profile name, if declared."""
        return self._staging_name_to_conn_proxy.get(name)

    def _rebuild_indexes(self) -> None:
        """Rebuild the hash and name indexes from ``self.connectors``.

        Raises:
            ValueError: If two connectors share a hash, or two connectors with
                different hashes share a name.
        """
        self._connectors_index = {}
        self._connectors_name_index = {}
        for connector in self.connectors:
            existing = self._connectors_index.get(connector.hash)
            if existing is not None:
                raise ValueError(
                    "Connector hash collision detected for connectors "
                    f"'{type(existing).__name__}' and '{type(connector).__name__}' "
                    f"(hash='{connector.hash}')."
                )
            self._connectors_index[connector.hash] = connector

            # Names are optional; only truthy names are indexed.
            if connector.name:
                existing_hash = self._connectors_name_index.get(connector.name)
                if existing_hash is not None and existing_hash != connector.hash:
                    raise ValueError(
                        "Connector names must be unique when provided. "
                        f"Duplicate connector name '{connector.name}'."
                    )
                self._connectors_name_index[connector.name] = connector.hash

    def _append_resource_connector_hash(
        self, resource_name: str, connector_hash: str
    ) -> None:
        """Append *connector_hash* for *resource_name* if not already present (order kept)."""
        bucket = self._resource_to_connector_hashes.setdefault(resource_name, [])
        if connector_hash not in bucket:
            bucket.append(connector_hash)

    @field_validator("resource_connector", mode="before")
    @classmethod
    def _coerce_resource_connector_entries(
        cls, v: Any
    ) -> list[ResourceConnectorBinding]:
        """Coerce ``resource_connector`` entries to ``ResourceConnectorBinding``.

        ``None`` becomes an empty list.

        Raises:
            ValueError: If the value is not a list, an entry has the wrong
                type, a dict entry lacks ``resource``/``connector``, or a dict
                entry fails validation.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(
                "resource_connector must be a list of {resource, connector} entries"
            )

        coerced: list[ResourceConnectorBinding] = []
        for i, item in enumerate(v):
            if isinstance(item, ResourceConnectorBinding):
                coerced.append(item)
                continue

            if isinstance(item, dict):
                missing = [k for k in ("resource", "connector") if k not in item]
                if missing:
                    raise ValueError(
                        f"Invalid resource_connector entry at index {i}: missing required keys {missing}. "
                        "Expected keys: ['resource', 'connector']."
                    )

                try:
                    coerced.append(ResourceConnectorBinding.model_validate(item))
                except Exception as e:  # noqa: BLE001
                    # Keep the message concise and contextual; nested pydantic
                    # errors can be noisy for config authors.
                    raise ValueError(
                        f"Invalid resource_connector entry at index {i}: {item!r}."
                    ) from e
                continue

            raise ValueError(
                f"Invalid resource_connector entry at index {i}: expected dict or "
                f"ResourceConnectorBinding, got {type(item).__name__}."
            )

        return coerced

    @field_validator("connector_connection", mode="before")
    @classmethod
    def _coerce_connector_connection_entries(
        cls, v: Any
    ) -> list[ConnectorConnectionBinding]:
        """Coerce ``connector_connection`` entries to ``ConnectorConnectionBinding``.

        ``None`` becomes an empty list.

        Raises:
            ValueError: If the value is not a list, an entry has the wrong
                type, a dict entry lacks ``connector``/``conn_proxy``, or a
                dict entry fails validation.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(
                "connector_connection must be a list of {connector, conn_proxy} entries"
            )

        coerced: list[ConnectorConnectionBinding] = []
        for i, item in enumerate(v):
            if isinstance(item, ConnectorConnectionBinding):
                coerced.append(item)
                continue

            if isinstance(item, dict):
                missing = [k for k in ("connector", "conn_proxy") if k not in item]
                if missing:
                    raise ValueError(
                        f"Invalid connector_connection entry at index {i}: missing required keys {missing}. "
                        "Expected keys: ['connector', 'conn_proxy']."
                    )
                try:
                    coerced.append(ConnectorConnectionBinding.model_validate(item))
                except Exception as e:  # noqa: BLE001
                    raise ValueError(
                        f"Invalid connector_connection entry at index {i}: {item!r}."
                    ) from e
                continue

            raise ValueError(
                f"Invalid connector_connection entry at index {i}: expected dict or "
                f"ConnectorConnectionBinding, got {type(item).__name__}."
            )

        return coerced

    @staticmethod
    def default_connector_name(connector: ResourceConnector) -> str:
        """Return ``connector.name`` or a name derived from the connector type.

        The fallback is ``regex`` (else ``str(sub_path)``) for file connectors,
        ``table_name`` for table connectors, ``rdf_class`` for SPARQL
        connectors and ``path`` for API connectors.

        Raises:
            TypeError: If the connector has no name and is of another type.
        """
        if connector.name:
            return connector.name
        if isinstance(connector, FileConnector):
            return connector.regex or str(connector.sub_path)
        if isinstance(connector, TableConnector):
            return connector.table_name
        if isinstance(connector, SparqlConnector):
            return connector.rdf_class
        if isinstance(connector, APIConnector):
            return connector.path
        raise TypeError(f"Unsupported connector type: {type(connector)!r}")

    @model_validator(mode="after")
    def _populate_resource_connector(self) -> Self:
        """Derive every private index and typed view from the public fields.

        Resource wiring is collected first from each connector's own
        ``resource_name`` and then from the ``resource_connector`` entries,
        whose ``connector`` is looked up by name first and by hash second.

        Raises:
            ValueError: If a ``resource_connector`` entry references an
                unknown connector, or any of the index rebuilds fails.
        """
        self._rebuild_indexes()
        self._resource_to_connector_hashes = {}

        # Create typed views so internal code never has to handle dicts.
        self._resource_connector_typed = [
            ResourceConnectorBinding.model_validate(m) if isinstance(m, dict) else m
            for m in self.resource_connector
        ]
        self._connector_connection_typed = [
            ConnectorConnectionBinding.model_validate(m) if isinstance(m, dict) else m
            for m in self.connector_connection
        ]
        self._staging_proxy_typed = [
            StagingProxyBinding.model_validate(m) if isinstance(m, dict) else m
            for m in self.staging_proxy
        ]
        self._rebuild_staging_proxy_index()

        for connector in self.connectors:
            if connector.resource_name is None:
                continue
            self._append_resource_connector_hash(
                connector.resource_name, connector.hash
            )

        for mapping in self._resource_connector_typed:
            connector_hash = self._connectors_name_index.get(mapping.connector)
            if connector_hash is None:
                # Not a known name: accept the reference if it is a known hash.
                if mapping.connector in self._connectors_index:
                    connector_hash = mapping.connector
                else:
                    raise ValueError(
                        f"resource_connector references unknown connector '{mapping.connector}' "
                        f"for resource '{mapping.resource}'."
                    )
            self._append_resource_connector_hash(mapping.resource, connector_hash)
        self._rebuild_connector_to_conn_proxy()
        return self

    def _resolve_connector_ref_to_hash(self, connector_ref: str) -> str:
        """Resolve a connector reference to its canonical connector hash.

        Allowed references:
        - ``connector.hash`` (canonical internal id), or
        - ``connector.name`` (when a name is provided / auto-filled).

        Ingestion resource names are not valid connector references (a resource
        may map to multiple connectors).

        Raises:
            ValueError: If the reference is neither a known hash nor a known name.
        """
        if connector_ref in self._connectors_index:
            return connector_ref
        resolved_hash = self._connectors_name_index.get(connector_ref)
        if resolved_hash is None:
            raise ValueError(f"Unknown connector reference '{connector_ref}'")
        return resolved_hash

    def _rebuild_connector_to_conn_proxy(self) -> None:
        """Rebuild the connector-hash -> conn_proxy map.

        Explicit ``connector_connection`` entries are applied first; the
        top-level ``conn_proxy`` default then fills in connectors that have no
        explicit entry.

        Raises:
            ValueError: If an entry references an unknown connector, or one
                connector is mapped to two different proxies.
        """
        self._connector_to_conn_proxy = {}
        for mapping in self._connector_connection_typed:
            connector_hash = self._resolve_connector_ref_to_hash(mapping.connector)
            existing = self._connector_to_conn_proxy.get(connector_hash)
            if existing is not None and existing != mapping.conn_proxy:
                raise ValueError(
                    "Conflicting conn_proxy mapping for connector "
                    f"'{connector_hash}' (existing='{existing}', new='{mapping.conn_proxy}')."
                )
            self._connector_to_conn_proxy[connector_hash] = mapping.conn_proxy

        if self.conn_proxy is not None:
            for connector in self.connectors:
                if connector.hash not in self._connector_to_conn_proxy:
                    self._connector_to_conn_proxy[connector.hash] = self.conn_proxy

    def get_conn_proxy_for_connector(self, connector: AnyConnector) -> str | None:
        """Return the mapped runtime proxy name for a given connector."""
        return self._connector_to_conn_proxy.get(connector.hash)

    def resolve_connector_refs_to_hashes(self, connector_refs: list[str]) -> set[str]:
        """Resolve connector names or hashes to canonical connector hashes."""
        return {self._resolve_connector_ref_to_hash(ref) for ref in connector_refs}

    def get_connectors_for_resource(self, resource_name: str) -> list[AnyConnector]:
        """Return connectors bound to *resource_name*, in binding order (unique by hash)."""
        result: list[AnyConnector] = []
        for h in self._resource_to_connector_hashes.get(resource_name, []):
            c = self._connectors_index.get(h)
            # Hashes missing from the index, and connectors of other types,
            # are skipped.
            if isinstance(
                c, (TableConnector, FileConnector, SparqlConnector, APIConnector)
            ):
                result.append(c)
        return result

    @classmethod
    def from_dict(cls, data: dict[str, Any] | list[Any]) -> Self:
        """Validate *data* into an instance.

        Raises:
            ValueError: If *data* is a list, or contains any of the legacy
                keys ``postgres_connections``, ``table_connectors``,
                ``file_connectors`` or ``sparql_connectors``.
        """
        if isinstance(data, list):
            raise ValueError(
                "Bindings.from_dict expects a mapping with 'connectors' and optional "
                "'resource_connector'. List-style connector payloads are not supported."
            )
        legacy_keys = {
            "postgres_connections",
            "table_connectors",
            "file_connectors",
            "sparql_connectors",
        }
        found_legacy = sorted(k for k in legacy_keys if k in data)
        if found_legacy:
            raise ValueError(
                "Legacy Bindings init keys are not supported. "
                f"Unsupported keys: {', '.join(found_legacy)}."
            )
        return cls.model_validate(data)

get_conn_proxy_for_connector(connector)

Return the mapped runtime proxy name for a given connector.

Source code in graflo/architecture/contract/bindings/core.py
def get_conn_proxy_for_connector(self, connector: AnyConnector) -> str | None:
    """Return the mapped runtime proxy name for a given connector."""
    return self._connector_to_conn_proxy.get(connector.hash)

get_connectors_for_resource(resource_name)

Return connectors bound to resource_name, in binding order (unique by hash).

Source code in graflo/architecture/contract/bindings/core.py
def get_connectors_for_resource(self, resource_name: str) -> list[AnyConnector]:
    """List the connectors wired to *resource_name*, preserving binding order.

    Hashes that are missing from the connector index, or that map to an
    object which is not one of the four supported connector types, are left
    out of the result.
    """
    supported = (TableConnector, FileConnector, SparqlConnector, APIConnector)
    bound_hashes = self._resource_to_connector_hashes.get(resource_name, [])
    candidates = (self._connectors_index.get(key) for key in bound_hashes)
    return [found for found in candidates if isinstance(found, supported)]

get_staging_conn_proxy(name)

Return conn_proxy for a staging profile name, if declared.

Source code in graflo/architecture/contract/bindings/core.py
def get_staging_conn_proxy(self, name: str) -> str | None:
    """Return ``conn_proxy`` for a staging profile name, if declared."""
    return self._staging_name_to_conn_proxy.get(name)

resolve_connector_refs_to_hashes(connector_refs)

Resolve connector names or hashes to canonical connector hashes.

Source code in graflo/architecture/contract/bindings/core.py
def resolve_connector_refs_to_hashes(self, connector_refs: list[str]) -> set[str]:
    """Map each connector name or hash in *connector_refs* to its canonical hash.

    Duplicates collapse because the result is a set. Any error raised by the
    per-reference resolver propagates unchanged.
    """
    resolve = self._resolve_connector_ref_to_hash
    return set(map(resolve, connector_refs))

BindingsRegistry

Bases: BindingsConfig

Mutable bindings registry for programmatic connector updates.

Source code in graflo/architecture/contract/bindings/core.py
class BindingsRegistry(BindingsConfig):
    """Mutable bindings registry for programmatic connector updates."""

    def bind_connector_to_conn_proxy(
        self,
        connector: AnyConnector,
        conn_proxy: str,
    ) -> None:
        """Bind a connector to a non-secret runtime proxy name.

        Uses ``connector.name`` when available, falling back to ``connector.hash``.
        A connector that is not registered yet is added first. An existing
        binding for the same connector hash is replaced rather than duplicated.
        """
        # Ensure indexes include the connector and that a default name is set.
        if connector.hash not in self._connectors_index:
            self.add_connector(connector)
        # Pick a contract reference string that's stable and user-friendly.
        connector_ref = connector.name or connector.hash

        # Ensure uniqueness by connector.hash (not by ref-string).
        connector_hash = connector.hash
        existing_idx: int | None = None
        for i, m in enumerate(self._connector_connection_typed):
            try:
                if self._resolve_connector_ref_to_hash(m.connector) == connector_hash:
                    existing_idx = i
                    break
            except ValueError:
                # Entry references a connector that no longer resolves; skip it.
                continue

        if existing_idx is None:
            self._connector_connection_typed.append(
                ConnectorConnectionBinding(
                    connector=connector_ref, conn_proxy=conn_proxy
                )
            )
        else:
            self._connector_connection_typed[existing_idx] = ConnectorConnectionBinding(
                connector=connector_ref, conn_proxy=conn_proxy
            )
        # Keep the public contract field in sync for serialization / downstream.
        self.connector_connection = list(self._connector_connection_typed)

        self._rebuild_connector_to_conn_proxy()

    def apply_connector_update(self, update: ConnectorUpdate) -> None:
        """Patch a connector in-place in this binding (re-hashes and reindexes).

        Uses ``model_validate`` on merged data so connector validators (including
        hash recomputation) run; ``model_copy(update=...)`` would not re-run them.
        An empty patch is a no-op.
        """
        connector_hash = self._resolve_connector_ref_to_hash(update.connector)
        old = self._connectors_index[connector_hash]
        patch = update.as_patch()
        if not patch:
            return
        merged = old.model_dump(mode="python")
        merged.update(patch)
        new = cast(
            AnyConnector,
            old.__class__.model_validate(merged),
        )
        self.replace_connector(old, new)

    def replace_connector(
        self,
        old: ResourceConnector | str,
        new: AnyConnector,
    ) -> None:
        """Swap *old* for *new*, preserving resource wiring and conn_proxy by hash.

        *old* may be a connector object or a connector reference (name or
        hash). When *new* has no name it inherits the name of the connector it
        replaces. If reindexing with *new* fails, the connector list and
        indexes are restored before the error propagates.

        Raises:
            KeyError: If *old* is not registered.
            ValueError: If *old* is an unknown reference, or *new* collides
                with another connector's hash or name.
        """
        old_hash = (
            old.hash
            if isinstance(old, ResourceConnector)
            else self._resolve_connector_ref_to_hash(old)
        )
        if old_hash not in self._connectors_index:
            raise KeyError(f"Connector not found for hash={old_hash!r}")

        old_connector = self._connectors_index[old_hash]
        if new.name is None and old_connector.name is not None:
            object.__setattr__(new, "name", old_connector.name)

        slot: int | None = None
        for idx, c in enumerate(self.connectors):
            if c.hash == old_hash:
                slot = idx
                break
        if slot is None:
            raise KeyError(
                f"Connector with hash={old_hash!r} not found in bindings.connectors list"
            )

        # Reindex before touching any derived map: it is the only step that can
        # fail, and a failure must not leave the registry half-updated.
        previous = self.connectors[slot]
        self.connectors[slot] = new
        try:
            self._rebuild_indexes()
        except ValueError:
            self.connectors[slot] = previous
            self._rebuild_indexes()
            raise

        new_hash = new.hash
        for hashes in self._resource_to_connector_hashes.values():
            for i, h in enumerate(hashes):
                if h == old_hash:
                    hashes[i] = new_hash

        # NOTE(review): binding entries in ``_connector_connection_typed`` /
        # ``_resource_connector_typed`` that reference the old hash (rather than
        # a retained name) are not rewritten here -- confirm callers only rely
        # on name-based references.
        old_proxy = self._connector_to_conn_proxy.pop(old_hash, None)
        if old_proxy is not None:
            self._connector_to_conn_proxy[new_hash] = old_proxy

    def add_connector(
        self,
        connector: AnyConnector,
    ) -> None:
        """Register *connector*, or replace the registered one with the same hash.

        A connector without a name receives ``default_connector_name``. If the
        connector declares ``resource_name`` it is also wired to that resource.

        Raises:
            ValueError: If the connector's name is already used by a connector
                with a different hash.
        """
        if connector.name is None:
            object.__setattr__(
                connector, "name", self.default_connector_name(connector)
            )
        existing_name_hash = None
        if connector.name:
            existing_name_hash = self._connectors_name_index.get(connector.name)
        if (
            connector.name
            and existing_name_hash is not None
            and existing_name_hash != connector.hash
        ):
            raise ValueError(
                "Connector names must be unique when provided. "
                f"Duplicate connector name '{connector.name}'."
            )

        if connector.hash in self._connectors_index:
            # Same hash already registered: swap the object in place.
            old_connector = self._connectors_index[connector.hash]
            for idx, existing in enumerate(self.connectors):
                if existing is old_connector:
                    self.connectors[idx] = connector
                    break
        else:
            self.connectors.append(connector)
        self._rebuild_indexes()
        if connector.resource_name is not None:
            self._append_resource_connector_hash(
                connector.resource_name, connector.hash
            )

    def bind_resource(
        self,
        resource_name: str,
        connector: AnyConnector,
    ) -> None:
        """Wire *resource_name* to an already-registered *connector*.

        The call is idempotent: repeating it adds neither a second hash nor a
        second ``resource_connector`` entry. The recorded reference is the
        connector's name when that name resolves to this connector, otherwise
        its hash, so the entry can always be resolved again on validation.

        Raises:
            KeyError: If *connector* is not registered.
        """
        if connector.hash not in self._connectors_index:
            raise KeyError(f"Connector not found for hash='{connector.hash}'")
        self._append_resource_connector_hash(resource_name, connector.hash)
        connector_name = connector.name or self.default_connector_name(connector)
        # A derived default name is not necessarily in the name index; a
        # reference that does not resolve to this connector would be rejected
        # (or mis-resolved) when the contract is validated again.
        if self._connectors_name_index.get(connector_name) != connector.hash:
            connector_name = connector.hash
        already_bound = any(
            binding.resource == resource_name and binding.connector == connector_name
            for binding in self._resource_connector_typed
        )
        if not already_bound:
            self._resource_connector_typed.append(
                ResourceConnectorBinding(
                    resource=resource_name,
                    connector=connector_name,
                )
            )
        # Keep the public contract field in sync for serialization / downstream.
        self.resource_connector = list(self._resource_connector_typed)

apply_connector_update(update)

Patch a connector in-place in this binding (re-hashes and reindexes).

Uses model_validate on merged data so connector validators (including hash recomputation) run; model_copy(update=...) would not re-run them.

Source code in graflo/architecture/contract/bindings/core.py
def apply_connector_update(self, update: ConnectorUpdate) -> None:
    """Apply *update* to the connector it references and swap the result in.

    The current connector is dumped, overlaid with the update's patch and
    re-validated through its own class, so connector validators (including
    hash recomputation) run; ``model_copy(update=...)`` would not re-run
    them. An empty patch leaves the registry untouched.
    """
    current = self._connectors_index[
        self._resolve_connector_ref_to_hash(update.connector)
    ]
    changes = update.as_patch()
    if changes:
        payload = current.model_dump(mode="python")
        payload |= changes
        replacement = cast(AnyConnector, current.__class__.model_validate(payload))
        self.replace_connector(current, replacement)

bind_connector_to_conn_proxy(connector, conn_proxy)

Bind a connector to a non-secret runtime proxy name.

Uses connector.name when available, falling back to connector.hash.

Source code in graflo/architecture/contract/bindings/core.py
def bind_connector_to_conn_proxy(
    self,
    connector: AnyConnector,
    conn_proxy: str,
) -> None:
    """Record that *connector* connects through the runtime proxy *conn_proxy*.

    The binding stores ``connector.name`` when it is set and ``connector.hash``
    otherwise. A connector that is not yet indexed is added first. If a binding
    for the same connector hash already exists it is overwritten in place;
    otherwise a new binding is appended.
    """
    # Register first so that the name / hash read below reflect the registry.
    if connector.hash not in self._connectors_index:
        self.add_connector(connector)
    ref = connector.name or connector.hash
    target_hash = connector.hash

    def _points_at_target(binding) -> bool:
        # Bindings whose reference no longer resolves never match.
        try:
            return self._resolve_connector_ref_to_hash(binding.connector) == target_hash
        except ValueError:
            return False

    # Match on the resolved hash, not on the reference string.
    slot = next(
        (
            pos
            for pos, binding in enumerate(self._connector_connection_typed)
            if _points_at_target(binding)
        ),
        None,
    )

    entry = ConnectorConnectionBinding(connector=ref, conn_proxy=conn_proxy)
    if slot is not None:
        self._connector_connection_typed[slot] = entry
    else:
        self._connector_connection_typed.append(entry)

    # Mirror the typed list into the public field, then refresh the proxy map.
    self.connector_connection = list(self._connector_connection_typed)
    self._rebuild_connector_to_conn_proxy()

replace_connector(old, new)

Swap old for new, preserving resource wiring and conn_proxy by hash.

Source code in graflo/architecture/contract/bindings/core.py
def replace_connector(
    self,
    old: ResourceConnector | str,
    new: AnyConnector,
) -> None:
    """Swap *old* for *new*, preserving resource wiring and conn_proxy by hash."""
    old_hash = (
        old.hash
        if isinstance(old, ResourceConnector)
        else self._resolve_connector_ref_to_hash(old)
    )
    if old_hash not in self._connectors_index:
        raise KeyError(f"Connector not found for hash={old_hash!r}")

    old_connector = self._connectors_index[old_hash]
    if new.name is None and old_connector.name is not None:
        object.__setattr__(new, "name", old_connector.name)

    replaced = False
    for idx, c in enumerate(self.connectors):
        if c.hash == old_hash:
            self.connectors[idx] = new
            replaced = True
            break
    if not replaced:
        raise KeyError(
            f"Connector with hash={old_hash!r} not found in bindings.connectors list"
        )

    new_hash = new.hash
    for hashes in self._resource_to_connector_hashes.values():
        for i, h in enumerate(hashes):
            if h == old_hash:
                hashes[i] = new_hash

    old_proxy = self._connector_to_conn_proxy.pop(old_hash, None)
    self._rebuild_indexes()
    if old_proxy is not None:
        self._connector_to_conn_proxy[new_hash] = old_proxy

ConnectorConnectionBinding

Bases: ConfigBaseModel

Connector -> runtime connection-proxy mapping entry.

This is a non-secret contract block: manifests store proxy names only. At runtime, the `graflo.hq.connection_provider.ConnectionProvider` resolves each `conn_proxy` to a concrete generalized config holding credentials/secrets.

Source code in graflo/architecture/contract/bindings/core.py
class ConnectorConnectionBinding(ConfigBaseModel):
    """Connector -> runtime connection-proxy mapping entry.

    This is a non-secret contract block: manifests store proxy names only.
    At runtime, the :class:`~graflo.hq.connection_provider.ConnectionProvider`
    resolves each ``conn_proxy`` to a concrete generalized config holding
    credentials/secrets.
    """

    # Connector reference; ``BindingsConfig`` resolves it as a connector hash
    # or a connector name.
    connector: str
    # Name of the runtime connection proxy (never the credentials themselves).
    conn_proxy: str

ConnectorTemplate

Bases: ConfigBaseModel

Named connector defaults referenced by base on connector entries.

Template-only metadata (not copied onto connectors): name, conn_proxy. All other fields are merged into expanded connectors (dict fields such as params are deep-merged; scalars and blocks like pagination are replaced when the connector entry provides them explicitly).

Source code in graflo/architecture/contract/bindings/core.py
class ConnectorTemplate(ConfigBaseModel):
    """Named connector defaults referenced by ``base`` on connector entries.

    Template-only metadata (not copied onto connectors): ``name``, ``conn_proxy``.
    All other fields are merged into expanded connectors (dict fields such as
    ``params`` are deep-merged; scalars and blocks like ``pagination`` are replaced
    when the connector entry provides them explicitly).
    """

    # Keys beyond the two declared fields are accepted -- presumably so that a
    # template can carry arbitrary connector fields to merge.
    model_config = ConfigDict(extra="allow")

    # Required: the identifier that connector entries put in ``base``.
    name: str = Field(
        ..., description="Template name referenced by connector ``base``."
    )
    # Optional proxy name; template metadata only.
    conn_proxy: str | None = Field(
        default=None,
        description=(
            "Template metadata: auto-wires ``connector_connection`` for expanded "
            "connectors that declare ``name``."
        ),
    )

ResourceConnectorBinding

Bases: ConfigBaseModel

Top-level resource -> connector-name mapping entry.

Source code in graflo/architecture/contract/bindings/core.py
class ResourceConnectorBinding(ConfigBaseModel):
    """Top-level resource -> connector-name mapping entry."""

    # Name of the ingestion resource being wired.
    resource: str
    # Connector reference; ``BindingsConfig`` looks it up by connector name
    # first and by connector hash second.
    connector: str

StagingProxyBinding

Bases: ConfigBaseModel

Named staging profile -> runtime connection-proxy (e.g. S3 credentials).

Used by TigerGraph bulk ingest to resolve S3GeneralizedConnConfig without putting secrets in the manifest.

Source code in graflo/architecture/contract/bindings/core.py
class StagingProxyBinding(ConfigBaseModel):
    """Named staging profile -> runtime connection-proxy (e.g. S3 credentials).

    Used by TigerGraph bulk ingest to resolve ``S3GeneralizedConnConfig`` without
    putting secrets in the manifest.
    """

    # Staging profile name; the lookup key of
    # ``BindingsConfig.get_staging_conn_proxy``.
    name: str
    # Name of the runtime connection proxy for this staging profile.
    conn_proxy: str