class Bindings(ConfigBaseModel):
"""Named resource connectors with explicit resource linkage."""
connectors: list[FileConnector | TableConnector | SparqlConnector] = Field(
default_factory=list
)
# Accept dict entries at init-time (see validators below).
# Internally and at runtime, Graflo uses typed lists derived from these.
resource_connector: list[ResourceConnectorBinding | dict[str, str]] = Field(
default_factory=list
)
# Connector -> runtime endpoint config indirection (proxy by name).
connector_connection: list[ConnectorConnectionBinding | dict[str, str]] = Field(
default_factory=list
)
_resource_connector_typed: list[ResourceConnectorBinding] = PrivateAttr(
default_factory=list
)
_connector_connection_typed: list[ConnectorConnectionBinding] = PrivateAttr(
default_factory=list
)
_connectors_index: dict[str, ResourceConnector] = PrivateAttr(default_factory=dict)
_connectors_name_index: dict[str, str] = PrivateAttr(default_factory=dict)
_resource_to_connector_hash: dict[str, str] = PrivateAttr(default_factory=dict)
_connector_to_conn_proxy: dict[str, str] = PrivateAttr(default_factory=dict)
@property
def connector_connection_bindings(
self,
) -> list[ConnectorConnectionBinding]:
# Expose typed entries for downstream components (type-checker friendly).
return self._connector_connection_typed
def _rebuild_indexes(self) -> None:
self._connectors_index = {}
self._connectors_name_index = {}
for connector in self.connectors:
existing = self._connectors_index.get(connector.hash)
if existing is not None:
raise ValueError(
"Connector hash collision detected for connectors "
f"'{type(existing).__name__}' and '{type(connector).__name__}' "
f"(hash='{connector.hash}')."
)
self._connectors_index[connector.hash] = connector
if connector.name:
existing_hash = self._connectors_name_index.get(connector.name)
if existing_hash is not None and existing_hash != connector.hash:
raise ValueError(
"Connector names must be unique when provided. "
f"Duplicate connector name '{connector.name}'."
)
self._connectors_name_index[connector.name] = connector.hash
@field_validator("resource_connector", mode="before")
@classmethod
def _coerce_resource_connector_entries(
cls, v: Any
) -> list[ResourceConnectorBinding]:
if v is None:
return []
if not isinstance(v, list):
raise ValueError(
"resource_connector must be a list of {resource, connector} entries"
)
coerced: list[ResourceConnectorBinding] = []
for i, item in enumerate(v):
if isinstance(item, ResourceConnectorBinding):
coerced.append(item)
continue
if isinstance(item, dict):
missing = [k for k in ("resource", "connector") if k not in item]
if missing:
raise ValueError(
f"Invalid resource_connector entry at index {i}: missing required keys {missing}. "
"Expected keys: ['resource', 'connector']."
)
try:
coerced.append(ResourceConnectorBinding.model_validate(item))
except Exception as e: # noqa: BLE001
# Keep the message concise and contextual; nested pydantic
# errors can be noisy for config authors.
raise ValueError(
f"Invalid resource_connector entry at index {i}: {item!r}."
) from e
continue
raise ValueError(
f"Invalid resource_connector entry at index {i}: expected dict or "
f"ResourceConnectorBinding, got {type(item).__name__}."
)
return coerced
@field_validator("connector_connection", mode="before")
@classmethod
def _coerce_connector_connection_entries(
cls, v: Any
) -> list[ConnectorConnectionBinding]:
if v is None:
return []
if not isinstance(v, list):
raise ValueError(
"connector_connection must be a list of {connector, conn_proxy} entries"
)
coerced: list[ConnectorConnectionBinding] = []
for i, item in enumerate(v):
if isinstance(item, ConnectorConnectionBinding):
coerced.append(item)
continue
if isinstance(item, dict):
missing = [k for k in ("connector", "conn_proxy") if k not in item]
if missing:
raise ValueError(
f"Invalid connector_connection entry at index {i}: missing required keys {missing}. "
"Expected keys: ['connector', 'conn_proxy']."
)
try:
coerced.append(ConnectorConnectionBinding.model_validate(item))
except Exception as e: # noqa: BLE001
raise ValueError(
f"Invalid connector_connection entry at index {i}: {item!r}."
) from e
continue
raise ValueError(
f"Invalid connector_connection entry at index {i}: expected dict or "
f"ConnectorConnectionBinding, got {type(item).__name__}."
)
return coerced
@staticmethod
def default_connector_name(connector: ResourceConnector) -> str:
if connector.name:
return connector.name
if isinstance(connector, FileConnector):
return connector.regex or str(connector.sub_path)
if isinstance(connector, TableConnector):
return connector.table_name
if isinstance(connector, SparqlConnector):
return connector.rdf_class
raise TypeError(f"Unsupported connector type: {type(connector)!r}")
@model_validator(mode="after")
def _populate_resource_connector(self) -> Self:
self._rebuild_indexes()
self._resource_to_connector_hash = {}
# Create typed views so internal code never has to handle dicts.
self._resource_connector_typed = [
ResourceConnectorBinding.model_validate(m) if isinstance(m, dict) else m
for m in self.resource_connector
]
self._connector_connection_typed = [
ConnectorConnectionBinding.model_validate(m) if isinstance(m, dict) else m
for m in self.connector_connection
]
for connector in self.connectors:
if connector.resource_name is None:
continue
existing_hash = self._resource_to_connector_hash.get(
connector.resource_name
)
if existing_hash is not None and existing_hash != connector.hash:
raise ValueError(
"Conflicting resource binding for resource "
f"'{connector.resource_name}'."
)
self._resource_to_connector_hash[connector.resource_name] = connector.hash
for mapping in self._resource_connector_typed:
connector_hash = self._connectors_name_index.get(mapping.connector)
if connector_hash is None:
raise ValueError(
f"resource_connector references unknown connector '{mapping.connector}' "
f"for resource '{mapping.resource}'."
)
existing_hash = self._resource_to_connector_hash.get(mapping.resource)
if existing_hash is not None and existing_hash != connector_hash:
raise ValueError(
f"Conflicting resource binding for resource '{mapping.resource}'."
)
self._resource_to_connector_hash[mapping.resource] = connector_hash
self._rebuild_connector_to_conn_proxy()
return self
def _resolve_connector_ref_to_hash(self, connector_ref: str) -> str:
"""Resolve a connector reference to its canonical connector hash.
The contract allows referencing either:
- ``connector.hash`` (canonical internal id), or
- ``connector.name`` (when a name is provided / auto-filled).
- ``resource_name`` (alias when ``connector.name`` is omitted in manifests).
"""
if connector_ref in self._connectors_index:
return connector_ref
resolved_hash = self._connectors_name_index.get(connector_ref)
if resolved_hash is None:
resolved_hash = self._resource_to_connector_hash.get(connector_ref)
if resolved_hash is None:
raise ValueError(f"Unknown connector reference '{connector_ref}'")
return resolved_hash
def _rebuild_connector_to_conn_proxy(self) -> None:
self._connector_to_conn_proxy = {}
for mapping in self._connector_connection_typed:
connector_hash = self._resolve_connector_ref_to_hash(mapping.connector)
existing = self._connector_to_conn_proxy.get(connector_hash)
if existing is not None and existing != mapping.conn_proxy:
raise ValueError(
"Conflicting conn_proxy mapping for connector "
f"'{connector_hash}' (existing='{existing}', new='{mapping.conn_proxy}')."
)
self._connector_to_conn_proxy[connector_hash] = mapping.conn_proxy
def get_conn_proxy_for_connector(
self, connector: TableConnector | FileConnector | SparqlConnector
) -> str | None:
"""Return the mapped runtime proxy name for a given connector."""
return self._connector_to_conn_proxy.get(connector.hash)
def bind_connector_to_conn_proxy(
self,
connector: TableConnector | FileConnector | SparqlConnector,
conn_proxy: str,
) -> None:
"""Bind a connector to a non-secret runtime proxy name.
Uses ``connector.name`` when available, falling back to ``connector.hash``.
"""
# Ensure indexes include the connector and that a default name is set.
if connector.hash not in self._connectors_index:
self.add_connector(connector)
# Pick a contract reference string that's stable and user-friendly.
connector_ref = connector.name or connector.hash
# Ensure uniqueness by connector.hash (not by ref-string).
connector_hash = connector.hash
existing_idx: int | None = None
for i, m in enumerate(self._connector_connection_typed):
try:
if self._resolve_connector_ref_to_hash(m.connector) == connector_hash:
existing_idx = i
break
except ValueError:
continue
if existing_idx is None:
self._connector_connection_typed.append(
ConnectorConnectionBinding(
connector=connector_ref, conn_proxy=conn_proxy
)
)
else:
self._connector_connection_typed[existing_idx] = ConnectorConnectionBinding(
connector=connector_ref, conn_proxy=conn_proxy
)
# Keep the public contract field in sync for serialization / downstream.
self.connector_connection = list(self._connector_connection_typed)
self._rebuild_connector_to_conn_proxy()
@classmethod
def from_dict(cls, data: dict[str, Any] | list[Any]) -> Self:
if isinstance(data, list):
raise ValueError(
"Bindings.from_dict expects a mapping with 'connectors' and optional "
"'resource_connector'. List-style connector payloads are not supported."
)
legacy_keys = {
"postgres_connections",
"table_connectors",
"file_connectors",
"sparql_connectors",
}
found_legacy = sorted(k for k in legacy_keys if k in data)
if found_legacy:
raise ValueError(
"Legacy Bindings init keys are not supported. "
f"Unsupported keys: {', '.join(found_legacy)}."
)
return cls.model_validate(data)
def add_connector(
self,
connector: TableConnector | FileConnector | SparqlConnector,
) -> None:
if connector.name is None:
object.__setattr__(
connector, "name", self.default_connector_name(connector)
)
existing_name_hash = None
if connector.name:
existing_name_hash = self._connectors_name_index.get(connector.name)
if (
connector.name
and existing_name_hash is not None
and existing_name_hash != connector.hash
):
raise ValueError(
"Connector names must be unique when provided. "
f"Duplicate connector name '{connector.name}'."
)
if connector.hash in self._connectors_index:
old_connector = self._connectors_index[connector.hash]
for idx, existing in enumerate(self.connectors):
if existing is old_connector:
self.connectors[idx] = connector
break
else:
self.connectors.append(connector)
self._rebuild_indexes()
if connector.resource_name is not None:
existing_hash = self._resource_to_connector_hash.get(
connector.resource_name
)
if existing_hash is not None and existing_hash != connector.hash:
raise ValueError(
"Conflicting resource binding for resource "
f"'{connector.resource_name}'."
)
self._resource_to_connector_hash[connector.resource_name] = connector.hash
def bind_resource(
self,
resource_name: str,
connector: TableConnector | FileConnector | SparqlConnector,
) -> None:
if connector.hash not in self._connectors_index:
raise KeyError(f"Connector not found for hash='{connector.hash}'")
self._resource_to_connector_hash[resource_name] = connector.hash
connector_name = connector.name or self.default_connector_name(connector)
mapping_idx = None
for idx, mapping in enumerate(self._resource_connector_typed):
if mapping.resource == resource_name:
mapping_idx = idx
break
new_mapping = ResourceConnectorBinding(
resource=resource_name,
connector=connector_name,
)
if mapping_idx is None:
self._resource_connector_typed.append(new_mapping)
else:
self._resource_connector_typed[mapping_idx] = new_mapping
# Keep the public contract field in sync for serialization / downstream.
self.resource_connector = list(self._resource_connector_typed)
def get_connector_for_resource(
self, resource_name: str
) -> TableConnector | FileConnector | SparqlConnector | None:
connector_hash = self._resource_to_connector_hash.get(resource_name)
if connector_hash is None:
return None
connector = self._connectors_index.get(connector_hash)
if isinstance(connector, (TableConnector, FileConnector, SparqlConnector)):
return connector
return None
def get_resource_type(self, resource_name: str) -> ResourceType | None:
connector = self.get_connector_for_resource(resource_name)
if connector is None:
return None
return connector.get_resource_type()
def get_table_info(self, resource_name: str) -> tuple[str, str | None] | None:
connector = self.get_connector_for_resource(resource_name)
if isinstance(connector, TableConnector):
return (connector.table_name, connector.schema_name)
return None