Skip to content

graflo.architecture.contract.bindings.core

Named connectors and resource-to-connector wiring.

Bindings

Bases: ConfigBaseModel

Named resource connectors with explicit resource linkage.

Source code in graflo/architecture/contract/bindings/core.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
class Bindings(ConfigBaseModel):
    """Named resource connectors with explicit resource linkage.

    Public (serialized) fields:

    - ``connectors``: connector definitions (file / table / SPARQL).
    - ``resource_connector``: ``{resource, connector}`` entries linking a
      resource name to a connector reference (connector name or hash).
    - ``connector_connection``: ``{connector, conn_proxy}`` entries mapping a
      connector reference to a runtime connection-proxy name.
    - ``staging_proxy``: ``{name, conn_proxy}`` entries mapping a staging
      profile name to a connection-proxy name.

    Private attributes hold typed copies of the binding lists plus lookup
    indexes. They are rebuilt by the ``mode="after"`` model validator and kept
    up to date by ``add_connector``, ``bind_resource`` and
    ``bind_connector_to_conn_proxy``.
    """

    connectors: list[FileConnector | TableConnector | SparqlConnector] = Field(
        default_factory=list
    )
    # Accept dict entries at init-time (see validators below).
    # Internally and at runtime, Graflo uses typed lists derived from these.
    resource_connector: list[ResourceConnectorBinding | dict[str, str]] = Field(
        default_factory=list
    )
    # Connector -> runtime endpoint config indirection (proxy by name).
    connector_connection: list[ConnectorConnectionBinding | dict[str, str]] = Field(
        default_factory=list
    )
    _resource_connector_typed: list[ResourceConnectorBinding] = PrivateAttr(
        default_factory=list
    )
    _connector_connection_typed: list[ConnectorConnectionBinding] = PrivateAttr(
        default_factory=list
    )
    # connector.hash -> connector
    _connectors_index: dict[str, ResourceConnector] = PrivateAttr(default_factory=dict)
    # connector.name -> connector.hash (only connectors with a truthy name)
    _connectors_name_index: dict[str, str] = PrivateAttr(default_factory=dict)
    # resource name -> ordered, de-duplicated list of connector hashes
    _resource_to_connector_hashes: dict[str, list[str]] = PrivateAttr(
        default_factory=dict
    )
    # connector.hash -> conn_proxy name
    _connector_to_conn_proxy: dict[str, str] = PrivateAttr(default_factory=dict)
    staging_proxy: list[StagingProxyBinding | dict[str, str]] = Field(
        default_factory=list,
        description="Optional named staging endpoints (S3) -> conn_proxy wiring.",
    )
    _staging_proxy_typed: list[StagingProxyBinding] = PrivateAttr(default_factory=list)
    # staging profile name -> conn_proxy name
    _staging_name_to_conn_proxy: dict[str, str] = PrivateAttr(default_factory=dict)

    @property
    def connector_connection_bindings(
        self,
    ) -> list[ConnectorConnectionBinding]:
        """Typed ``connector_connection`` entries (never raw dicts)."""
        # Expose typed entries for downstream components (type-checker friendly).
        return self._connector_connection_typed

    @field_validator("staging_proxy", mode="before")
    @classmethod
    def _coerce_staging_proxy_entries(
        cls, v: Any
    ) -> list[StagingProxyBinding | dict[str, str]]:
        """Normalize raw ``staging_proxy`` input into ``StagingProxyBinding`` items.

        ``None`` becomes an empty list. Dict entries must carry both ``name``
        and ``conn_proxy``. Errors mirror the other binding validators.

        Raises:
            ValueError: if *v* is not a list or an entry is malformed.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(
                "staging_proxy must be a list of {name, conn_proxy} entries"
            )
        coerced: list[StagingProxyBinding | dict[str, str]] = []
        for i, item in enumerate(v):
            if isinstance(item, StagingProxyBinding):
                coerced.append(item)
                continue
            if isinstance(item, dict):
                missing = [k for k in ("name", "conn_proxy") if k not in item]
                if missing:
                    raise ValueError(
                        f"Invalid staging_proxy entry at index {i}: missing required keys {missing}. "
                        "Expected keys: ['name', 'conn_proxy']."
                    )
                try:
                    coerced.append(StagingProxyBinding.model_validate(item))
                except Exception as e:  # noqa: BLE001
                    # Same concise, contextual error style as the sibling
                    # resource_connector / connector_connection validators.
                    raise ValueError(
                        f"Invalid staging_proxy entry at index {i}: {item!r}."
                    ) from e
                continue
            raise ValueError(
                f"Invalid staging_proxy entry at index {i}: expected dict or "
                f"StagingProxyBinding, got {type(item).__name__}."
            )
        return coerced

    def _rebuild_staging_proxy_index(self) -> None:
        """Rebuild the staging-name -> conn_proxy index from the typed list.

        Raises:
            ValueError: if one staging name maps to two different proxies.
        """
        self._staging_name_to_conn_proxy = {}
        for m in self._staging_proxy_typed:
            existing = self._staging_name_to_conn_proxy.get(m.name)
            if existing is not None and existing != m.conn_proxy:
                raise ValueError(
                    f"Duplicate staging_proxy name '{m.name}' with conflicting conn_proxy."
                )
            self._staging_name_to_conn_proxy[m.name] = m.conn_proxy

    def get_staging_conn_proxy(self, name: str) -> str | None:
        """Return ``conn_proxy`` for a staging profile name, if declared."""
        return self._staging_name_to_conn_proxy.get(name)

    def _rebuild_indexes(self) -> None:
        """Rebuild the hash and name indexes from ``connectors``.

        Raises:
            ValueError: on a connector hash collision, or when two connectors
                with different hashes share the same (truthy) name.
        """
        self._connectors_index = {}
        self._connectors_name_index = {}
        for connector in self.connectors:
            existing = self._connectors_index.get(connector.hash)
            if existing is not None:
                raise ValueError(
                    "Connector hash collision detected for connectors "
                    f"'{type(existing).__name__}' and '{type(connector).__name__}' "
                    f"(hash='{connector.hash}')."
                )
            self._connectors_index[connector.hash] = connector

            if connector.name:
                existing_hash = self._connectors_name_index.get(connector.name)
                if existing_hash is not None and existing_hash != connector.hash:
                    raise ValueError(
                        "Connector names must be unique when provided. "
                        f"Duplicate connector name '{connector.name}'."
                    )
                self._connectors_name_index[connector.name] = connector.hash

    def _append_resource_connector_hash(
        self, resource_name: str, connector_hash: str
    ) -> None:
        """Append *connector_hash* for *resource_name* if not already present (order kept)."""
        bucket = self._resource_to_connector_hashes.setdefault(resource_name, [])
        if connector_hash not in bucket:
            bucket.append(connector_hash)

    @field_validator("resource_connector", mode="before")
    @classmethod
    def _coerce_resource_connector_entries(
        cls, v: Any
    ) -> list[ResourceConnectorBinding]:
        """Normalize raw ``resource_connector`` input into typed bindings.

        ``None`` becomes an empty list. Dict entries must carry both
        ``resource`` and ``connector``.

        Raises:
            ValueError: if *v* is not a list or an entry is malformed.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(
                "resource_connector must be a list of {resource, connector} entries"
            )

        coerced: list[ResourceConnectorBinding] = []
        for i, item in enumerate(v):
            if isinstance(item, ResourceConnectorBinding):
                coerced.append(item)
                continue

            if isinstance(item, dict):
                missing = [k for k in ("resource", "connector") if k not in item]
                if missing:
                    raise ValueError(
                        f"Invalid resource_connector entry at index {i}: missing required keys {missing}. "
                        "Expected keys: ['resource', 'connector']."
                    )

                try:
                    coerced.append(ResourceConnectorBinding.model_validate(item))
                except Exception as e:  # noqa: BLE001
                    # Keep the message concise and contextual; nested pydantic
                    # errors can be noisy for config authors.
                    raise ValueError(
                        f"Invalid resource_connector entry at index {i}: {item!r}."
                    ) from e
                continue

            raise ValueError(
                f"Invalid resource_connector entry at index {i}: expected dict or "
                f"ResourceConnectorBinding, got {type(item).__name__}."
            )

        return coerced

    @field_validator("connector_connection", mode="before")
    @classmethod
    def _coerce_connector_connection_entries(
        cls, v: Any
    ) -> list[ConnectorConnectionBinding]:
        """Normalize raw ``connector_connection`` input into typed bindings.

        ``None`` becomes an empty list. Dict entries must carry both
        ``connector`` and ``conn_proxy``.

        Raises:
            ValueError: if *v* is not a list or an entry is malformed.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(
                "connector_connection must be a list of {connector, conn_proxy} entries"
            )

        coerced: list[ConnectorConnectionBinding] = []
        for i, item in enumerate(v):
            if isinstance(item, ConnectorConnectionBinding):
                coerced.append(item)
                continue

            if isinstance(item, dict):
                missing = [k for k in ("connector", "conn_proxy") if k not in item]
                if missing:
                    raise ValueError(
                        f"Invalid connector_connection entry at index {i}: missing required keys {missing}. "
                        "Expected keys: ['connector', 'conn_proxy']."
                    )
                try:
                    coerced.append(ConnectorConnectionBinding.model_validate(item))
                except Exception as e:  # noqa: BLE001
                    raise ValueError(
                        f"Invalid connector_connection entry at index {i}: {item!r}."
                    ) from e
                continue

            raise ValueError(
                f"Invalid connector_connection entry at index {i}: expected dict or "
                f"ConnectorConnectionBinding, got {type(item).__name__}."
            )

        return coerced

    @staticmethod
    def default_connector_name(connector: ResourceConnector) -> str:
        """Return ``connector.name`` or a name derived from the connector type.

        File connectors use ``regex`` (falling back to ``str(sub_path)``), table
        connectors use ``table_name`` and SPARQL connectors use ``rdf_class``.

        Raises:
            TypeError: for any other connector type.
        """
        if connector.name:
            return connector.name
        if isinstance(connector, FileConnector):
            return connector.regex or str(connector.sub_path)
        if isinstance(connector, TableConnector):
            return connector.table_name
        if isinstance(connector, SparqlConnector):
            return connector.rdf_class
        raise TypeError(f"Unsupported connector type: {type(connector)!r}")

    @model_validator(mode="after")
    def _populate_resource_connector(self) -> Self:
        """Derive typed views and all lookup indexes after field validation.

        Raises:
            ValueError: on index conflicts, or when a ``resource_connector``
                entry references a connector that is neither a known name nor
                a known hash.
        """
        self._rebuild_indexes()
        self._resource_to_connector_hashes = {}

        # Create typed views so internal code never has to handle dicts.
        self._resource_connector_typed = [
            ResourceConnectorBinding.model_validate(m) if isinstance(m, dict) else m
            for m in self.resource_connector
        ]
        self._connector_connection_typed = [
            ConnectorConnectionBinding.model_validate(m) if isinstance(m, dict) else m
            for m in self.connector_connection
        ]
        self._staging_proxy_typed = [
            StagingProxyBinding.model_validate(m) if isinstance(m, dict) else m
            for m in self.staging_proxy
        ]
        self._rebuild_staging_proxy_index()

        # Connectors that declare their own resource_name are bound implicitly.
        for connector in self.connectors:
            if connector.resource_name is None:
                continue
            self._append_resource_connector_hash(
                connector.resource_name, connector.hash
            )

        # Explicit bindings: resolve by connector name first, then by hash.
        for mapping in self._resource_connector_typed:
            connector_hash = self._connectors_name_index.get(mapping.connector)
            if connector_hash is None:
                if mapping.connector in self._connectors_index:
                    connector_hash = mapping.connector
                else:
                    raise ValueError(
                        f"resource_connector references unknown connector '{mapping.connector}' "
                        f"for resource '{mapping.resource}'."
                    )
            self._append_resource_connector_hash(mapping.resource, connector_hash)
        self._rebuild_connector_to_conn_proxy()
        return self

    def _resolve_connector_ref_to_hash(self, connector_ref: str) -> str:
        """Resolve a connector reference to its canonical connector hash.

        Allowed references:
        - ``connector.hash`` (canonical internal id), or
        - ``connector.name`` (when a name is provided / auto-filled).

        Ingestion resource names are not valid connector references (a resource
        may map to multiple connectors).

        Raises:
            ValueError: if *connector_ref* matches neither a hash nor a name.
        """
        if connector_ref in self._connectors_index:
            return connector_ref
        resolved_hash = self._connectors_name_index.get(connector_ref)
        if resolved_hash is None:
            raise ValueError(f"Unknown connector reference '{connector_ref}'")
        return resolved_hash

    def _rebuild_connector_to_conn_proxy(self) -> None:
        """Rebuild the connector-hash -> conn_proxy index.

        Raises:
            ValueError: if a reference is unknown, or one connector is mapped
                to two different proxies.
        """
        self._connector_to_conn_proxy = {}
        for mapping in self._connector_connection_typed:
            connector_hash = self._resolve_connector_ref_to_hash(mapping.connector)
            existing = self._connector_to_conn_proxy.get(connector_hash)
            if existing is not None and existing != mapping.conn_proxy:
                raise ValueError(
                    "Conflicting conn_proxy mapping for connector "
                    f"'{connector_hash}' (existing='{existing}', new='{mapping.conn_proxy}')."
                )
            self._connector_to_conn_proxy[connector_hash] = mapping.conn_proxy

    def get_conn_proxy_for_connector(
        self, connector: TableConnector | FileConnector | SparqlConnector
    ) -> str | None:
        """Return the mapped runtime proxy name for a given connector."""
        return self._connector_to_conn_proxy.get(connector.hash)

    def bind_connector_to_conn_proxy(
        self,
        connector: TableConnector | FileConnector | SparqlConnector,
        conn_proxy: str,
    ) -> None:
        """Bind a connector to a non-secret runtime proxy name.

        Uses ``connector.name`` when available, falling back to ``connector.hash``.
        An existing entry for the same connector (by hash) is replaced.
        """
        # Ensure indexes include the connector and that a default name is set.
        if connector.hash not in self._connectors_index:
            self.add_connector(connector)
        # Pick a contract reference string that's stable and user-friendly.
        connector_ref = connector.name or connector.hash

        # Ensure uniqueness by connector.hash (not by ref-string).
        connector_hash = connector.hash
        existing_idx: int | None = None
        for i, m in enumerate(self._connector_connection_typed):
            try:
                if self._resolve_connector_ref_to_hash(m.connector) == connector_hash:
                    existing_idx = i
                    break
            except ValueError:
                continue

        if existing_idx is None:
            self._connector_connection_typed.append(
                ConnectorConnectionBinding(
                    connector=connector_ref, conn_proxy=conn_proxy
                )
            )
        else:
            self._connector_connection_typed[existing_idx] = ConnectorConnectionBinding(
                connector=connector_ref, conn_proxy=conn_proxy
            )
        # Keep the public contract field in sync for serialization / downstream.
        self.connector_connection = list(self._connector_connection_typed)

        self._rebuild_connector_to_conn_proxy()

    @classmethod
    def from_dict(cls, data: dict[str, Any] | list[Any]) -> Self:
        """Build ``Bindings`` from a mapping payload.

        Raises:
            ValueError: if *data* is a list, or contains any legacy key
                (``postgres_connections``, ``table_connectors``,
                ``file_connectors``, ``sparql_connectors``).
        """
        if isinstance(data, list):
            raise ValueError(
                "Bindings.from_dict expects a mapping with 'connectors' and optional "
                "'resource_connector'. List-style connector payloads are not supported."
            )
        legacy_keys = {
            "postgres_connections",
            "table_connectors",
            "file_connectors",
            "sparql_connectors",
        }
        found_legacy = sorted(k for k in legacy_keys if k in data)
        if found_legacy:
            raise ValueError(
                "Legacy Bindings init keys are not supported. "
                f"Unsupported keys: {', '.join(found_legacy)}."
            )
        return cls.model_validate(data)

    def add_connector(
        self,
        connector: TableConnector | FileConnector | SparqlConnector,
    ) -> None:
        """Register *connector*, replacing any connector with the same hash.

        When ``connector.name`` is ``None`` it is filled in from
        :meth:`default_connector_name` via ``object.__setattr__``, which bypasses
        the model's normal attribute assignment. If the connector declares a
        ``resource_name``, it is also bound to that resource.

        Raises:
            ValueError: if the name is already used by a connector with a
                different hash.
        """
        if connector.name is None:
            object.__setattr__(
                connector, "name", self.default_connector_name(connector)
            )
        existing_name_hash = None
        if connector.name:
            existing_name_hash = self._connectors_name_index.get(connector.name)
        if (
            connector.name
            and existing_name_hash is not None
            and existing_name_hash != connector.hash
        ):
            raise ValueError(
                "Connector names must be unique when provided. "
                f"Duplicate connector name '{connector.name}'."
            )

        if connector.hash in self._connectors_index:
            old_connector = self._connectors_index[connector.hash]
            for idx, existing in enumerate(self.connectors):
                if existing is old_connector:
                    self.connectors[idx] = connector
                    break
        else:
            self.connectors.append(connector)
        self._rebuild_indexes()
        if connector.resource_name is not None:
            self._append_resource_connector_hash(
                connector.resource_name, connector.hash
            )

    def bind_resource(
        self,
        resource_name: str,
        connector: TableConnector | FileConnector | SparqlConnector,
    ) -> None:
        """Link *resource_name* to an already-registered *connector*.

        The link is recorded in the internal resource -> hash index and in the
        public ``resource_connector`` list. The stored reference is
        ``connector.name`` when set, otherwise ``connector.hash``. Both forms are
        resolvable when these bindings are re-validated. A derived default name
        (e.g. a table name) is not necessarily registered in the name index, and
        could even match a different connector. Binding the same pair twice does
        not add a duplicate public entry.

        Raises:
            KeyError: if *connector* has not been added to these bindings.
        """
        if connector.hash not in self._connectors_index:
            raise KeyError(f"Connector not found for hash='{connector.hash}'")
        connector_hash = connector.hash
        self._append_resource_connector_hash(resource_name, connector_hash)

        # Same name-or-hash convention as bind_connector_to_conn_proxy.
        connector_ref = connector.name or connector_hash

        def _same_binding(entry: ResourceConnectorBinding) -> bool:
            # Compare by resolved hash so name- and hash-style refs dedupe.
            if entry.resource != resource_name:
                return False
            try:
                return self._resolve_connector_ref_to_hash(entry.connector) == connector_hash
            except ValueError:
                return False

        if not any(_same_binding(m) for m in self._resource_connector_typed):
            self._resource_connector_typed.append(
                ResourceConnectorBinding(
                    resource=resource_name,
                    connector=connector_ref,
                )
            )
        # Keep the public contract field in sync for serialization / downstream.
        self.resource_connector = list(self._resource_connector_typed)

    def get_connectors_for_resource(
        self, resource_name: str
    ) -> list[TableConnector | FileConnector | SparqlConnector]:
        """Return connectors bound to *resource_name*, in binding order (unique by hash)."""
        result: list[TableConnector | FileConnector | SparqlConnector] = []
        for h in self._resource_to_connector_hashes.get(resource_name, []):
            c = self._connectors_index.get(h)
            if isinstance(c, (TableConnector, FileConnector, SparqlConnector)):
                result.append(c)
        return result

bind_connector_to_conn_proxy(connector, conn_proxy)

Bind a connector to a non-secret runtime proxy name.

Uses connector.name when available, falling back to connector.hash.

Source code in graflo/architecture/contract/bindings/core.py
def bind_connector_to_conn_proxy(
    self,
    connector: TableConnector | FileConnector | SparqlConnector,
    conn_proxy: str,
) -> None:
    """Attach *connector* to the runtime connection proxy named *conn_proxy*.

    An unregistered connector is first registered via ``add_connector``. The
    stored reference is ``connector.name`` if truthy, else ``connector.hash``.
    At most one entry per connector hash is kept: an existing entry that
    resolves to the same hash is overwritten in place, otherwise a new one is
    appended. The public ``connector_connection`` field and the hash -> proxy
    index are refreshed afterwards.
    """
    if connector.hash not in self._connectors_index:
        # Registers the connector and fills in a default name when missing.
        self.add_connector(connector)

    ref = connector.name or connector.hash
    target_hash = connector.hash

    def _points_at_target(entry) -> bool:
        # Entries whose reference cannot be resolved are simply not matches.
        try:
            return self._resolve_connector_ref_to_hash(entry.connector) == target_hash
        except ValueError:
            return False

    slot = next(
        (
            position
            for position, entry in enumerate(self._connector_connection_typed)
            if _points_at_target(entry)
        ),
        None,
    )

    replacement = ConnectorConnectionBinding(connector=ref, conn_proxy=conn_proxy)
    if slot is not None:
        self._connector_connection_typed[slot] = replacement
    else:
        self._connector_connection_typed.append(replacement)

    # Mirror the typed list into the serialized field, then refresh the index.
    self.connector_connection = list(self._connector_connection_typed)
    self._rebuild_connector_to_conn_proxy()

get_conn_proxy_for_connector(connector)

Return the mapped runtime proxy name for a given connector.

Source code in graflo/architecture/contract/bindings/core.py
def get_conn_proxy_for_connector(
    self, connector: TableConnector | FileConnector | SparqlConnector
) -> str | None:
    """Look up the runtime connection-proxy name bound to *connector*.

    The lookup is keyed by ``connector.hash``. ``None`` means nothing is
    mapped for that hash.
    """
    proxy_by_hash = self._connector_to_conn_proxy
    connector_key = connector.hash
    return proxy_by_hash.get(connector_key)

get_connectors_for_resource(resource_name)

Return connectors bound to resource_name, in binding order (unique by hash).

Source code in graflo/architecture/contract/bindings/core.py
def get_connectors_for_resource(
    self, resource_name: str
) -> list[TableConnector | FileConnector | SparqlConnector]:
    """List the connectors linked to *resource_name*.

    Order follows the per-resource hash list, so it reflects binding order.
    Hashes that do not resolve to a Table/File/Sparql connector in the index
    are skipped. An unknown resource yields an empty list.
    """
    supported_types = (TableConnector, FileConnector, SparqlConnector)
    lookup = self._connectors_index.get
    bound_hashes = self._resource_to_connector_hashes.get(resource_name, [])
    return [
        candidate
        for candidate in map(lookup, bound_hashes)
        if isinstance(candidate, supported_types)
    ]

get_staging_conn_proxy(name)

Return conn_proxy for a staging profile name, if declared.

Source code in graflo/architecture/contract/bindings/core.py
def get_staging_conn_proxy(self, name: str) -> str | None:
    """Return ``conn_proxy`` for a staging profile name, if declared."""
    return self._staging_name_to_conn_proxy.get(name)

ConnectorConnectionBinding

Bases: ConfigBaseModel

Connector -> runtime connection-proxy mapping entry.

This is a non-secret contract block: manifests store proxy names only. At runtime, the `graflo.hq.connection_provider.ConnectionProvider` class resolves each `conn_proxy` to a concrete generalized config holding credentials/secrets.

Source code in graflo/architecture/contract/bindings/core.py
class ConnectorConnectionBinding(ConfigBaseModel):
    """Connector -> runtime connection-proxy mapping entry.

    This is a non-secret contract block: manifests store proxy names only.
    At runtime, the :class:`~graflo.hq.connection_provider.ConnectionProvider`
    resolves each ``conn_proxy`` to a concrete generalized config holding
    credentials/secrets.
    """

    # Connector reference: ``Bindings`` resolves it as a connector hash first,
    # then as a connector name.
    connector: str
    # Name of the runtime connection proxy (a name only, never credentials).
    conn_proxy: str

ResourceConnectorBinding

Bases: ConfigBaseModel

Top-level resource -> connector-name mapping entry.

Source code in graflo/architecture/contract/bindings/core.py
class ResourceConnectorBinding(ConfigBaseModel):
    """Top-level resource -> connector-name mapping entry."""

    # Ingestion resource name to bind.
    resource: str
    # Connector reference: during ``Bindings`` validation it is looked up as a
    # connector name first, then as a connector hash.
    connector: str

StagingProxyBinding

Bases: ConfigBaseModel

Named staging profile -> runtime connection-proxy (e.g. S3 credentials).

Used by TigerGraph bulk ingest to resolve an `S3GeneralizedConnConfig` without putting secrets in the manifest.

Source code in graflo/architecture/contract/bindings/core.py
class StagingProxyBinding(ConfigBaseModel):
    """Named staging profile -> runtime connection-proxy (e.g. S3 credentials).

    Used by TigerGraph bulk ingest to resolve ``S3GeneralizedConnConfig`` without
    putting secrets in the manifest.
    """

    # Staging profile name; ``Bindings`` rejects the same name mapped to two
    # different proxies.
    name: str
    # Name of the runtime connection proxy (a name only, never credentials).
    conn_proxy: str