Skip to content

graflo.architecture

Architecture façade.

For lighter imports, prefer:

  • graflo.architecture.schema — graph schema types
  • graflo.architecture.contract — manifest, bindings, resources, transforms
  • graflo.architecture.pipeline.runtime — actors and executor

See docs/importing.md in the package.

Bindings

Bases: ConfigBaseModel

Named resource connectors with explicit resource linkage.

Source code in graflo/architecture/contract/bindings/core.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
class Bindings(ConfigBaseModel):
    """Named resource connectors with explicit resource linkage.

    Public, serializable fields (``connectors``, ``resource_connector``,
    ``connector_connection``, ``staging_proxy``) accept raw dict entries at
    init-time; field validators coerce them, and the ``model_validator``
    builds typed mirrors plus hash/name lookup indexes in private attrs.
    """

    # All declared connectors; indexed by hash and (optional) name in
    # _rebuild_indexes().
    connectors: list[FileConnector | TableConnector | SparqlConnector] = Field(
        default_factory=list
    )
    # Accept dict entries at init-time (see validators below).
    # Internally and at runtime, Graflo uses typed lists derived from these.
    resource_connector: list[ResourceConnectorBinding | dict[str, str]] = Field(
        default_factory=list
    )
    # Connector -> runtime endpoint config indirection (proxy by name).
    connector_connection: list[ConnectorConnectionBinding | dict[str, str]] = Field(
        default_factory=list
    )
    # Typed mirror of resource_connector (never contains raw dicts).
    _resource_connector_typed: list[ResourceConnectorBinding] = PrivateAttr(
        default_factory=list
    )
    # Typed mirror of connector_connection (never contains raw dicts).
    _connector_connection_typed: list[ConnectorConnectionBinding] = PrivateAttr(
        default_factory=list
    )
    # connector.hash -> connector instance.
    _connectors_index: dict[str, ResourceConnector] = PrivateAttr(default_factory=dict)
    # connector.name -> connector.hash (names are optional but must be unique).
    _connectors_name_index: dict[str, str] = PrivateAttr(default_factory=dict)
    # resource name -> ordered, de-duplicated list of connector hashes.
    _resource_to_connector_hashes: dict[str, list[str]] = PrivateAttr(
        default_factory=dict
    )
    # connector.hash -> conn_proxy name, derived from connector_connection.
    _connector_to_conn_proxy: dict[str, str] = PrivateAttr(default_factory=dict)
    staging_proxy: list[StagingProxyBinding | dict[str, str]] = Field(
        default_factory=list,
        description="Optional named staging endpoints (S3) -> conn_proxy wiring.",
    )
    # Typed mirror of staging_proxy.
    _staging_proxy_typed: list[StagingProxyBinding] = PrivateAttr(default_factory=list)
    # staging profile name -> conn_proxy name.
    _staging_name_to_conn_proxy: dict[str, str] = PrivateAttr(default_factory=dict)

    @property
    def connector_connection_bindings(
        self,
    ) -> list[ConnectorConnectionBinding]:
        """Typed ``connector_connection`` entries (no raw dicts)."""
        # Expose typed entries for downstream components (type-checker friendly).
        return self._connector_connection_typed

    @field_validator("staging_proxy", mode="before")
    @classmethod
    def _coerce_staging_proxy_entries(
        cls, v: Any
    ) -> list[StagingProxyBinding | dict[str, str]]:
        """Coerce raw ``staging_proxy`` entries into typed bindings.

        Raises:
            ValueError: if *v* is not a list, or an entry is neither a
                ``StagingProxyBinding`` nor a dict with the required keys.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(
                "staging_proxy must be a list of {name, conn_proxy} entries"
            )
        coerced: list[StagingProxyBinding | dict[str, str]] = []
        for i, item in enumerate(v):
            if isinstance(item, StagingProxyBinding):
                coerced.append(item)
                continue
            if isinstance(item, dict):
                # Check required keys up-front for a clearer error than
                # pydantic's nested validation message.
                missing = [k for k in ("name", "conn_proxy") if k not in item]
                if missing:
                    raise ValueError(
                        f"Invalid staging_proxy entry at index {i}: missing {missing}."
                    )
                coerced.append(StagingProxyBinding.model_validate(item))
                continue
            raise ValueError(
                f"Invalid staging_proxy entry at index {i}: got {type(item).__name__}."
            )
        return coerced

    def _rebuild_staging_proxy_index(self) -> None:
        """Rebuild the staging-name -> conn_proxy map from typed entries.

        Raises:
            ValueError: if the same staging name maps to two different
                conn_proxy values (exact duplicates are tolerated).
        """
        self._staging_name_to_conn_proxy = {}
        for m in self._staging_proxy_typed:
            existing = self._staging_name_to_conn_proxy.get(m.name)
            if existing is not None and existing != m.conn_proxy:
                raise ValueError(
                    f"Duplicate staging_proxy name '{m.name}' with conflicting conn_proxy."
                )
            self._staging_name_to_conn_proxy[m.name] = m.conn_proxy

    def get_staging_conn_proxy(self, name: str) -> str | None:
        """Return ``conn_proxy`` for a staging profile name, if declared."""
        return self._staging_name_to_conn_proxy.get(name)

    def _rebuild_indexes(self) -> None:
        """Rebuild hash and name lookup indexes from ``self.connectors``.

        Raises:
            ValueError: on a connector hash collision, or when two different
                connectors share the same (non-empty) name.
        """
        self._connectors_index = {}
        self._connectors_name_index = {}
        for connector in self.connectors:
            existing = self._connectors_index.get(connector.hash)
            if existing is not None:
                raise ValueError(
                    "Connector hash collision detected for connectors "
                    f"'{type(existing).__name__}' and '{type(connector).__name__}' "
                    f"(hash='{connector.hash}')."
                )
            self._connectors_index[connector.hash] = connector

            if connector.name:
                # Names are optional; when present they must resolve to a
                # single hash.
                existing_hash = self._connectors_name_index.get(connector.name)
                if existing_hash is not None and existing_hash != connector.hash:
                    raise ValueError(
                        "Connector names must be unique when provided. "
                        f"Duplicate connector name '{connector.name}'."
                    )
                self._connectors_name_index[connector.name] = connector.hash

    def _append_resource_connector_hash(
        self, resource_name: str, connector_hash: str
    ) -> None:
        """Append *connector_hash* for *resource_name* if not already present (order kept)."""
        bucket = self._resource_to_connector_hashes.setdefault(resource_name, [])
        if connector_hash not in bucket:
            bucket.append(connector_hash)

    @field_validator("resource_connector", mode="before")
    @classmethod
    def _coerce_resource_connector_entries(
        cls, v: Any
    ) -> list[ResourceConnectorBinding]:
        """Coerce raw ``resource_connector`` entries into typed bindings.

        Raises:
            ValueError: if *v* is not a list, or an entry is neither a
                ``ResourceConnectorBinding`` nor a dict with 'resource' and
                'connector' keys.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(
                "resource_connector must be a list of {resource, connector} entries"
            )

        coerced: list[ResourceConnectorBinding] = []
        for i, item in enumerate(v):
            if isinstance(item, ResourceConnectorBinding):
                coerced.append(item)
                continue

            if isinstance(item, dict):
                missing = [k for k in ("resource", "connector") if k not in item]
                if missing:
                    raise ValueError(
                        f"Invalid resource_connector entry at index {i}: missing required keys {missing}. "
                        "Expected keys: ['resource', 'connector']."
                    )

                try:
                    coerced.append(ResourceConnectorBinding.model_validate(item))
                except Exception as e:  # noqa: BLE001
                    # Keep the message concise and contextual; nested pydantic
                    # errors can be noisy for config authors.
                    raise ValueError(
                        f"Invalid resource_connector entry at index {i}: {item!r}."
                    ) from e
                continue

            raise ValueError(
                f"Invalid resource_connector entry at index {i}: expected dict or "
                f"ResourceConnectorBinding, got {type(item).__name__}."
            )

        return coerced

    @field_validator("connector_connection", mode="before")
    @classmethod
    def _coerce_connector_connection_entries(
        cls, v: Any
    ) -> list[ConnectorConnectionBinding]:
        """Coerce raw ``connector_connection`` entries into typed bindings.

        Raises:
            ValueError: if *v* is not a list, or an entry is neither a
                ``ConnectorConnectionBinding`` nor a dict with 'connector'
                and 'conn_proxy' keys.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(
                "connector_connection must be a list of {connector, conn_proxy} entries"
            )

        coerced: list[ConnectorConnectionBinding] = []
        for i, item in enumerate(v):
            if isinstance(item, ConnectorConnectionBinding):
                coerced.append(item)
                continue

            if isinstance(item, dict):
                missing = [k for k in ("connector", "conn_proxy") if k not in item]
                if missing:
                    raise ValueError(
                        f"Invalid connector_connection entry at index {i}: missing required keys {missing}. "
                        "Expected keys: ['connector', 'conn_proxy']."
                    )
                try:
                    coerced.append(ConnectorConnectionBinding.model_validate(item))
                except Exception as e:  # noqa: BLE001
                    raise ValueError(
                        f"Invalid connector_connection entry at index {i}: {item!r}."
                    ) from e
                continue

            raise ValueError(
                f"Invalid connector_connection entry at index {i}: expected dict or "
                f"ConnectorConnectionBinding, got {type(item).__name__}."
            )

        return coerced

    @staticmethod
    def default_connector_name(connector: ResourceConnector) -> str:
        """Return ``connector.name`` or a type-specific fallback identifier.

        Raises:
            TypeError: for connector types with no known fallback.
        """
        if connector.name:
            return connector.name
        if isinstance(connector, FileConnector):
            return connector.regex or str(connector.sub_path)
        if isinstance(connector, TableConnector):
            return connector.table_name
        if isinstance(connector, SparqlConnector):
            return connector.rdf_class
        raise TypeError(f"Unsupported connector type: {type(connector)!r}")

    @model_validator(mode="after")
    def _populate_resource_connector(self) -> Self:
        """Post-init hook: build typed mirrors and all lookup indexes.

        Runs after field validation; rebuilds connector indexes, typed views
        of the public list fields, the staging index, resource->connector
        hash buckets, and the connector->conn_proxy map.
        """
        self._rebuild_indexes()
        self._resource_to_connector_hashes = {}

        # Create typed views so internal code never has to handle dicts.
        self._resource_connector_typed = [
            ResourceConnectorBinding.model_validate(m) if isinstance(m, dict) else m
            for m in self.resource_connector
        ]
        self._connector_connection_typed = [
            ConnectorConnectionBinding.model_validate(m) if isinstance(m, dict) else m
            for m in self.connector_connection
        ]
        self._staging_proxy_typed = [
            StagingProxyBinding.model_validate(m) if isinstance(m, dict) else m
            for m in self.staging_proxy
        ]
        self._rebuild_staging_proxy_index()

        # Connectors may carry an implicit resource binding of their own.
        for connector in self.connectors:
            if connector.resource_name is None:
                continue
            self._append_resource_connector_hash(
                connector.resource_name, connector.hash
            )

        # Explicit resource->connector mappings; references may be either a
        # connector name or a raw connector hash.
        for mapping in self._resource_connector_typed:
            connector_hash = self._connectors_name_index.get(mapping.connector)
            if connector_hash is None:
                if mapping.connector in self._connectors_index:
                    connector_hash = mapping.connector
                else:
                    raise ValueError(
                        f"resource_connector references unknown connector '{mapping.connector}' "
                        f"for resource '{mapping.resource}'."
                    )
            self._append_resource_connector_hash(mapping.resource, connector_hash)
        self._rebuild_connector_to_conn_proxy()
        return self

    def _resolve_connector_ref_to_hash(self, connector_ref: str) -> str:
        """Resolve a connector reference to its canonical connector hash.

        Allowed references:
        - ``connector.hash`` (canonical internal id), or
        - ``connector.name`` (when a name is provided / auto-filled).

        Ingestion resource names are not valid connector references (a resource
        may map to multiple connectors).

        Raises:
            ValueError: if the reference matches neither a hash nor a name.
        """
        if connector_ref in self._connectors_index:
            return connector_ref
        resolved_hash = self._connectors_name_index.get(connector_ref)
        if resolved_hash is None:
            raise ValueError(f"Unknown connector reference '{connector_ref}'")
        return resolved_hash

    def _rebuild_connector_to_conn_proxy(self) -> None:
        """Rebuild the connector.hash -> conn_proxy map from typed entries.

        Raises:
            ValueError: when one connector hash is mapped to two different
                conn_proxy values (exact duplicates are tolerated).
        """
        self._connector_to_conn_proxy = {}
        for mapping in self._connector_connection_typed:
            connector_hash = self._resolve_connector_ref_to_hash(mapping.connector)
            existing = self._connector_to_conn_proxy.get(connector_hash)
            if existing is not None and existing != mapping.conn_proxy:
                raise ValueError(
                    "Conflicting conn_proxy mapping for connector "
                    f"'{connector_hash}' (existing='{existing}', new='{mapping.conn_proxy}')."
                )
            self._connector_to_conn_proxy[connector_hash] = mapping.conn_proxy

    def get_conn_proxy_for_connector(
        self, connector: TableConnector | FileConnector | SparqlConnector
    ) -> str | None:
        """Return the mapped runtime proxy name for a given connector."""
        return self._connector_to_conn_proxy.get(connector.hash)

    def bind_connector_to_conn_proxy(
        self,
        connector: TableConnector | FileConnector | SparqlConnector,
        conn_proxy: str,
    ) -> None:
        """Bind a connector to a non-secret runtime proxy name.

        Uses ``connector.name`` when available, falling back to ``connector.hash``.
        """
        # Ensure indexes include the connector and that a default name is set.
        if connector.hash not in self._connectors_index:
            self.add_connector(connector)
        # Pick a contract reference string that's stable and user-friendly.
        connector_ref = connector.name or connector.hash

        # Ensure uniqueness by connector.hash (not by ref-string).
        connector_hash = connector.hash
        existing_idx: int | None = None
        for i, m in enumerate(self._connector_connection_typed):
            try:
                if self._resolve_connector_ref_to_hash(m.connector) == connector_hash:
                    existing_idx = i
                    break
            except ValueError:
                # Stale/unresolvable entry: skip rather than fail the bind.
                continue

        if existing_idx is None:
            self._connector_connection_typed.append(
                ConnectorConnectionBinding(
                    connector=connector_ref, conn_proxy=conn_proxy
                )
            )
        else:
            # Re-bind: replace the existing entry for this connector.
            self._connector_connection_typed[existing_idx] = ConnectorConnectionBinding(
                connector=connector_ref, conn_proxy=conn_proxy
            )
        # Keep the public contract field in sync for serialization / downstream.
        self.connector_connection = list(self._connector_connection_typed)

        self._rebuild_connector_to_conn_proxy()

    @classmethod
    def from_dict(cls, data: dict[str, Any] | list[Any]) -> Self:
        """Build a ``Bindings`` from a mapping, rejecting legacy layouts.

        Raises:
            ValueError: for list-style payloads or legacy per-kind keys.
        """
        if isinstance(data, list):
            raise ValueError(
                "Bindings.from_dict expects a mapping with 'connectors' and optional "
                "'resource_connector'. List-style connector payloads are not supported."
            )
        legacy_keys = {
            "postgres_connections",
            "table_connectors",
            "file_connectors",
            "sparql_connectors",
        }
        found_legacy = sorted(k for k in legacy_keys if k in data)
        if found_legacy:
            raise ValueError(
                "Legacy Bindings init keys are not supported. "
                f"Unsupported keys: {', '.join(found_legacy)}."
            )
        return cls.model_validate(data)

    def add_connector(
        self,
        connector: TableConnector | FileConnector | SparqlConnector,
    ) -> None:
        """Register *connector* (or replace an equal-hash one) and reindex.

        Raises:
            ValueError: when the connector's name collides with a different
                already-registered connector.
        """
        if connector.name is None:
            # Backfill a default name; object.__setattr__ bypasses the
            # model's __setattr__ (e.g. for frozen/validated models).
            object.__setattr__(
                connector, "name", self.default_connector_name(connector)
            )
        existing_name_hash = None
        if connector.name:
            existing_name_hash = self._connectors_name_index.get(connector.name)
        if (
            connector.name
            and existing_name_hash is not None
            and existing_name_hash != connector.hash
        ):
            raise ValueError(
                "Connector names must be unique when provided. "
                f"Duplicate connector name '{connector.name}'."
            )

        if connector.hash in self._connectors_index:
            # Same hash already registered: swap the stored instance in place
            # (preserves list order).
            old_connector = self._connectors_index[connector.hash]
            for idx, existing in enumerate(self.connectors):
                if existing is old_connector:
                    self.connectors[idx] = connector
                    break
        else:
            self.connectors.append(connector)
        self._rebuild_indexes()
        if connector.resource_name is not None:
            self._append_resource_connector_hash(
                connector.resource_name, connector.hash
            )

    def bind_resource(
        self,
        resource_name: str,
        connector: TableConnector | FileConnector | SparqlConnector,
    ) -> None:
        """Bind *resource_name* to an already-registered connector.

        Raises:
            KeyError: if the connector was never added via ``add_connector``
                / the ``connectors`` field.
        """
        if connector.hash not in self._connectors_index:
            raise KeyError(f"Connector not found for hash='{connector.hash}'")
        self._append_resource_connector_hash(resource_name, connector.hash)
        connector_name = connector.name or self.default_connector_name(connector)
        # NOTE(review): this append is unconditional, so repeated calls with
        # the same pair accumulate duplicate typed entries (the hash bucket
        # above is de-duplicated) — confirm whether that is intended.
        self._resource_connector_typed.append(
            ResourceConnectorBinding(
                resource=resource_name,
                connector=connector_name,
            )
        )
        # Keep the public contract field in sync for serialization / downstream.
        self.resource_connector = list(self._resource_connector_typed)

    def get_connectors_for_resource(
        self, resource_name: str
    ) -> list[TableConnector | FileConnector | SparqlConnector]:
        """Return connectors bound to *resource_name*, in binding order (unique by hash)."""
        result: list[TableConnector | FileConnector | SparqlConnector] = []
        for h in self._resource_to_connector_hashes.get(resource_name, []):
            c = self._connectors_index.get(h)
            # isinstance filter also drops hashes with no indexed connector.
            if isinstance(c, (TableConnector, FileConnector, SparqlConnector)):
                result.append(c)
        return result

bind_connector_to_conn_proxy(connector, conn_proxy)

Bind a connector to a non-secret runtime proxy name.

Uses connector.name when available, falling back to connector.hash.

Source code in graflo/architecture/contract/bindings/core.py
def bind_connector_to_conn_proxy(
    self,
    connector: TableConnector | FileConnector | SparqlConnector,
    conn_proxy: str,
) -> None:
    """Bind a connector to a non-secret runtime proxy name.

    Uses ``connector.name`` when available, falling back to ``connector.hash``.
    """
    # Register the connector first so name/hash indexes (and a default
    # name) are in place.
    if connector.hash not in self._connectors_index:
        self.add_connector(connector)

    # Stable, user-friendly reference string for the serialized contract.
    connector_ref = connector.name or connector.hash
    binding = ConnectorConnectionBinding(
        connector=connector_ref, conn_proxy=conn_proxy
    )

    def _same_connector(ref: str) -> bool:
        # Uniqueness is decided by connector.hash, not by the ref string;
        # unresolvable entries simply don't match.
        try:
            return self._resolve_connector_ref_to_hash(ref) == connector.hash
        except ValueError:
            return False

    match_idx = next(
        (
            i
            for i, entry in enumerate(self._connector_connection_typed)
            if _same_connector(entry.connector)
        ),
        None,
    )
    if match_idx is None:
        self._connector_connection_typed.append(binding)
    else:
        self._connector_connection_typed[match_idx] = binding

    # Mirror the typed list back onto the public contract field.
    self.connector_connection = list(self._connector_connection_typed)

    self._rebuild_connector_to_conn_proxy()

get_conn_proxy_for_connector(connector)

Return the mapped runtime proxy name for a given connector.

Source code in graflo/architecture/contract/bindings/core.py
def get_conn_proxy_for_connector(
    self, connector: TableConnector | FileConnector | SparqlConnector
) -> str | None:
    """Return the mapped runtime proxy name for a given connector."""
    # The map is keyed by the connector's canonical hash; absent -> None.
    try:
        return self._connector_to_conn_proxy[connector.hash]
    except KeyError:
        return None

get_connectors_for_resource(resource_name)

Return connectors bound to resource_name, in binding order (unique by hash).

Source code in graflo/architecture/contract/bindings/core.py
def get_connectors_for_resource(
    self, resource_name: str
) -> list[TableConnector | FileConnector | SparqlConnector]:
    """Return connectors bound to *resource_name*, in binding order (unique by hash)."""
    bound_hashes = self._resource_to_connector_hashes.get(resource_name, [])
    candidates = (self._connectors_index.get(h) for h in bound_hashes)
    # The isinstance filter also drops hashes with no indexed connector
    # (``.get`` yields None for those).
    return [
        c
        for c in candidates
        if isinstance(c, (TableConnector, FileConnector, SparqlConnector))
    ]

get_staging_conn_proxy(name)

Return conn_proxy for a staging profile name, if declared.

Source code in graflo/architecture/contract/bindings/core.py
def get_staging_conn_proxy(self, name: str) -> str | None:
    """Return ``conn_proxy`` for a staging profile name, if declared."""
    return self._staging_name_to_conn_proxy.get(name)

BoundSourceKind

Bases: BaseEnum

Physical source modality for a bound connector (how rows are retrieved).

This describes the connector-backed access pattern, not the abstract ingestion resource. File format (CSV, JSON, etc.) is chosen by the loader from file extensions.

Attributes:

Name Type Description
FILE

File-based connector (directory + pattern or paths).

SQL_TABLE

SQL table / database-backed connector.

SPARQL

SPARQL / RDF connector (endpoint or local RDF via rdflib).

Source code in graflo/architecture/contract/bindings/connectors.py
class BoundSourceKind(BaseEnum):
    """Physical source modality for a bound connector (how rows are retrieved).

    This describes the connector-backed access pattern, not the abstract
    ingestion resource. File format (CSV, JSON, etc.) is chosen by the loader
    from file extensions.

    Attributes:
        FILE: File-based connector (directory + pattern or paths).
        SQL_TABLE: SQL table / database-backed connector.
        SPARQL: SPARQL / RDF connector (endpoint or local RDF via rdflib).
    """

    # String values are part of the serialized contract — do not rename.
    FILE = "file"
    SQL_TABLE = "sql_table"
    SPARQL = "sparql"

CoreSchema

Bases: ConfigBaseModel

Logical graph model (A): vertices and edges.

Source code in graflo/architecture/schema/core.py
class CoreSchema(ConfigBaseModel):
    """Logical graph model (A): vertices and edges."""

    vertex_config: VertexConfig = PydanticField(
        ...,
        description="Configuration for vertex collections (vertices, identities, properties).",
    )
    edge_config: EdgeConfig = PydanticField(
        ...,
        description="Configuration for edge collections (edges, weights).",
    )

    @model_validator(mode="after")
    def _init_graph(self) -> CoreSchema:
        # Pydantic post-init hook: finalize nested configs once both
        # fields are populated.
        self.finish_init()
        return self

    def finish_init(self) -> None:
        """Finalize nested configs: vertices first, then edges.

        Edge-endpoint validation runs in between, so edge_config is only
        finalized against a verified vertex set.
        """
        self.vertex_config.finish_init()
        self._validate_edge_vertices_defined()
        self.edge_config.finish_init(self.vertex_config)

    def _validate_edge_vertices_defined(self) -> None:
        """Ensure all edge endpoints reference defined vertex names."""
        declared = self.vertex_config.vertex_set
        unknown = self.edge_config.vertices - declared
        if not unknown:
            return
        # Sorted lists keep the error message deterministic.
        raise ValueError(
            "edge_config references undefined vertices: "
            f"{sorted(unknown)}. "
            f"Declared vertices: {sorted(declared)}"
        )

    def remove_disconnected_vertices(self) -> set[str]:
        """Remove disconnected vertices and return removed names."""
        orphans = self.vertex_config.vertex_set - self.edge_config.vertices
        if orphans:
            self.vertex_config.remove_vertices(orphans)
        return orphans

remove_disconnected_vertices()

Remove disconnected vertices and return removed names.

Source code in graflo/architecture/schema/core.py
def remove_disconnected_vertices(self) -> set[str]:
    """Remove disconnected vertices and return removed names."""
    # Orphans = declared vertices that no edge endpoint references.
    orphans = self.vertex_config.vertex_set - self.edge_config.vertices
    if not orphans:
        return orphans
    self.vertex_config.remove_vertices(orphans)
    return orphans

DatabaseProfile

Bases: ConfigBaseModel

Container for DB-only physical features such as secondary indexes.

Source code in graflo/architecture/database_features.py
class DatabaseProfile(ConfigBaseModel):
    """Container for DB-only physical features such as secondary indexes."""

    db_flavor: DBType = PydanticField(
        default=DBType.ARANGO,
        description="Target DB flavor used for physical naming and defaults.",
    )
    target_namespace: str | None = PydanticField(
        default=None,
        description=(
            "Runtime target LPG namespace when the connection config leaves it unset: "
            "Arango/Neo4j/FalkorDB/Memgraph database, TigerGraph graph name, Nebula space. "
            "GraphEngine uses this before falling back to schema.metadata.name."
        ),
    )
    vertex_storage_names: dict[str, str] = PydanticField(
        default_factory=dict,
        description="Physical vertex collection/label names keyed by logical vertex name.",
    )
    vertex_indexes: dict[str, list[Index]] = PydanticField(
        default_factory=dict,
        description="Secondary indexes per vertex name (identity excluded).",
    )
    edge_specs: list[EdgeVariantSpec] = PydanticField(
        default_factory=list,
        description="Unified edge physical specs keyed by edge identity + purpose.",
    )
    default_property_values: DefaultPropertyValues | None = PydanticField(
        default=None,
        description=(
            "Optional per-attribute GSQL DEFAULT values for TigerGraph (and similar) DDL. "
            "Vertex keys are logical vertex names; edge entries match logical (source, target, relation). "
            "Does not change logical LPG types—only physical schema projection."
        ),
    )

    @model_validator(mode="after")
    def _normalize_edge_specs(self) -> "DatabaseProfile":
        """Merge duplicate edge specs into one per (source, target, relation, purpose).

        Relation names follow last-writer-wins; indexes are unioned and
        deduplicated by field tuple; index modes let an explicit value
        override while ``"inherit"`` never downgrades an explicit one.
        """
        # NOTE(review): the original also defined an unused `_variant_key`
        # helper here; it was dead code and has been removed.

        def _ensure_variant(
            merged: dict[tuple[str, str, str | None, str | None], EdgeVariantSpec],
            *,
            source: str,
            target: str,
            relation: str | None,
            purpose: str | None,
        ) -> EdgeVariantSpec:
            # Create-or-get the canonical spec for one variant key.
            key = (source, target, relation, purpose)
            if key not in merged:
                merged[key] = EdgeVariantSpec(
                    source=source,
                    target=target,
                    relation=relation,
                    purpose=purpose,
                )
            return merged[key]

        merged: dict[tuple[str, str, str | None, str | None], EdgeVariantSpec] = {}

        for item in self.edge_specs:
            variant = _ensure_variant(
                merged,
                source=item.source,
                target=item.target,
                relation=item.relation,
                purpose=item.purpose,
            )
            if item.relation_name is not None:
                variant.relation_name = item.relation_name
            # Union indexes, deduplicated by their field tuple.
            existing = {tuple(ix.fields) for ix in variant.indexes}
            for idx in item.indexes:
                if tuple(idx.fields) not in existing:
                    variant.indexes.append(idx)
            # Explicit mode always wins; "inherit" only applies while the
            # variant has not yet seen an explicit mode.
            if item.indexes_mode != "inherit" or variant.indexes_mode == "inherit":
                variant.indexes_mode = item.indexes_mode

        object.__setattr__(self, "edge_specs", list(merged.values()))
        return self

    def vertex_property_default(
        self, vertex_name: str, property_name: str
    ) -> Any | None:
        """Return declared default for a vertex property, or None if not specified."""
        dpv = self.default_property_values
        if dpv is None:
            return None
        per_vertex = dpv.vertices.get(vertex_name)
        if per_vertex is None:
            return None
        return per_vertex.get(property_name)

    def has_vertex_property_default(self, vertex_name: str, property_name: str) -> bool:
        """Return True when a default value is declared for the vertex property."""
        dpv = self.default_property_values
        if dpv is None:
            return False
        per_vertex = dpv.vertices.get(vertex_name)
        return per_vertex is not None and property_name in per_vertex

    def edge_property_default(self, edge_id: EdgeId, property_name: str) -> Any | None:
        """Return declared default for an edge attribute, or None if not specified."""
        dpv = self.default_property_values
        if dpv is None or not dpv.edges:
            return None
        source, target, relation = edge_id
        # Later declarations win: scan newest-first.
        for spec in reversed(dpv.edges):
            if spec.source != source or spec.target != target:
                continue
            if spec.relation != relation:
                continue
            if property_name not in spec.values:
                continue
            return spec.values[property_name]
        return None

    def has_edge_property_default(self, edge_id: EdgeId, property_name: str) -> bool:
        """Return True when a default value is declared for the edge attribute."""
        dpv = self.default_property_values
        if dpv is None or not dpv.edges:
            return False
        source, target, relation = edge_id
        for spec in reversed(dpv.edges):
            if spec.source != source or spec.target != target:
                continue
            if spec.relation != relation:
                continue
            if property_name in spec.values:
                return True
        return False

    def _edge_variant_spec(
        self,
        edge_id: EdgeId,
        purpose: str | None = None,
    ) -> EdgeVariantSpec | None:
        """Return the spec matching exactly (edge_id, purpose), or None."""
        for item in self.edge_specs:
            if item.edge_id != edge_id:
                continue
            if item.purpose != purpose:
                continue
            return item
        return None

    def edge_purposes(self, edge_id: EdgeId) -> list[str | None]:
        """Return declared physical purposes for an edge.

        The base variant (`None`) is always included; additional purposes are
        collected from matching edge variant specs.
        """
        purposes: list[str | None] = [None]
        seen: set[str | None] = {None}

        for item in self.edge_specs:
            if item.edge_id != edge_id:
                continue
            if item.purpose not in seen:
                seen.add(item.purpose)
                purposes.append(item.purpose)

        return purposes

    def edge_physical_variants(
        self,
        edge_id: EdgeId,
        *,
        source_storage: str,
        target_storage: str,
    ) -> list[dict[str, str | None | list[Index]]]:
        """Return resolved physical variants (base + purpose copies) for one edge."""
        variants: list[dict[str, str | None | list[Index]]] = []
        for purpose in self.edge_purposes(edge_id):
            variants.append(
                {
                    "purpose": purpose,
                    "storage_name": self.edge_storage_name(
                        edge_id,
                        source_storage=source_storage,
                        target_storage=target_storage,
                        purpose=purpose,
                    ),
                    "graph_name": self.edge_graph_name(
                        edge_id,
                        source_storage=source_storage,
                        target_storage=target_storage,
                        purpose=purpose,
                    ),
                    "indexes": self.edge_secondary_indexes(
                        edge_id,
                        purpose=purpose,
                    ),
                }
            )
        return variants

    def vertex_secondary_indexes(self, vertex_name: str) -> list[Index]:
        """Return a copy of the declared secondary indexes for a vertex."""
        return list(self.vertex_indexes.get(vertex_name, []))

    def vertex_storage_name(self, vertex_name: str) -> str:
        """Return the physical storage name for a vertex (logical name by default)."""
        return self.vertex_storage_names.get(vertex_name, vertex_name)

    def edge_secondary_indexes(
        self,
        edge_id: EdgeId,
        purpose: str | None = None,
    ) -> list[Index]:
        """Return effective indexes for a variant, applying its indexes_mode.

        A purpose variant may "replace" or "append" to the base variant's
        indexes; any other mode (including "inherit") yields the base set.
        The result is deduplicated by field tuple, first occurrence wins.
        """
        base_spec = self._edge_variant_spec(edge_id=edge_id, purpose=None)
        base_indexes = list(base_spec.indexes) if base_spec is not None else []

        if purpose is None:
            effective = base_indexes
        else:
            purpose_spec = self._edge_variant_spec(edge_id=edge_id, purpose=purpose)
            if purpose_spec is None:
                effective = base_indexes
            elif purpose_spec.indexes_mode == "replace":
                effective = list(purpose_spec.indexes)
            elif purpose_spec.indexes_mode == "append":
                effective = base_indexes + list(purpose_spec.indexes)
            else:
                effective = base_indexes

        deduped: list[Index] = []
        seen: set[tuple[str, ...]] = set()
        for idx in effective:
            key = tuple(idx.fields)
            if key in seen:
                continue
            seen.add(key)
            deduped.append(idx)
        return deduped

    def has_edge_index_spec(
        self,
        edge_id: EdgeId,
        *,
        purpose: str | None = None,
    ) -> bool:
        """Return True when an exact purpose edge index spec exists."""
        spec = self._edge_variant_spec(edge_id=edge_id, purpose=purpose)
        return spec is not None

    def has_explicit_edge_indexes(
        self,
        edge_id: EdgeId,
        *,
        purpose: str | None = None,
    ) -> bool:
        """Return True when the exact variant declares at least one index."""
        spec = self._edge_variant_spec(edge_id=edge_id, purpose=purpose)
        return spec is not None and len(spec.indexes) > 0

    def edge_index_spec(
        self,
        edge_id: EdgeId,
        purpose: str | None = None,
    ) -> EdgeVariantSpec | None:
        """Return the variant spec, falling back to the base (None) purpose."""
        spec = self._edge_variant_spec(edge_id=edge_id, purpose=purpose)
        if spec is None and purpose is not None:
            spec = self._edge_variant_spec(edge_id=edge_id, purpose=None)
        return spec

    def add_edge_index(
        self,
        edge_id: EdgeId,
        index: Index,
        *,
        purpose: str | None = None,
    ) -> None:
        """Add an index to a variant spec, creating the spec if missing.

        Duplicate indexes (same field tuple) are ignored.
        """
        spec = self._edge_variant_spec(edge_id=edge_id, purpose=purpose)
        if spec is None:
            source, target, relation = edge_id
            spec = EdgeVariantSpec(
                source=source,
                target=target,
                relation=relation,
                purpose=purpose,
            )
            # Auto-added indexes are additive by default.
            spec.indexes_mode = "append" if purpose is not None else "inherit"
            self.edge_specs.append(spec)
        existing = {tuple(ix.fields) for ix in spec.indexes}
        if tuple(index.fields) not in existing:
            spec.indexes.append(index)

    def edge_name_spec(
        self,
        edge_id: EdgeId,
        purpose: str | None = None,
    ) -> EdgeVariantSpec | None:
        """Return the naming spec, falling back to the base (None) purpose."""
        spec = self._edge_variant_spec(edge_id=edge_id, purpose=purpose)
        if spec is None and purpose is not None:
            spec = self._edge_variant_spec(edge_id=edge_id, purpose=None)
        return spec

    def set_edge_name_spec(
        self,
        edge_id: EdgeId,
        *,
        relation_name: str | None = None,
        purpose: str | None = None,
    ) -> None:
        """Set the physical relation name (and purpose) on a variant spec.

        Creates the spec when no exact (edge_id, purpose) match exists.
        """
        spec = self._edge_variant_spec(edge_id=edge_id, purpose=purpose)
        if spec is None:
            source, target, relation = edge_id
            spec = EdgeVariantSpec(
                source=source,
                target=target,
                relation=relation,
                purpose=purpose,
            )
            self.edge_specs.append(spec)
        if relation_name is not None:
            spec.relation_name = relation_name
        if purpose is not None:
            spec.purpose = purpose

    def edge_relation_name(
        self,
        edge_id: EdgeId,
        default_relation: str | None = None,
        purpose: str | None = None,
    ) -> str | None:
        """Return the declared physical relation name or ``default_relation``."""
        spec = self.edge_name_spec(edge_id, purpose=purpose)
        if spec is not None and spec.relation_name is not None:
            return spec.relation_name
        return default_relation

    def edge_storage_name(
        self,
        edge_id: EdgeId,
        *,
        source_storage: str,
        target_storage: str,
        purpose: str | None = None,
    ) -> str | None:
        """Return the ArangoDB edge collection name, or None for other flavors.

        The name is built as ``<source>_<target>[_<purpose>]_edges`` from the
        physical vertex storage names.
        """
        spec = self._edge_variant_spec(edge_id, purpose=purpose)
        if self.db_flavor != DBType.ARANGO:
            return None
        tokens = [source_storage, target_storage]
        purpose = spec.purpose if spec is not None else purpose
        if purpose is not None:
            tokens.append(purpose)
        return "_".join(tokens + ["edges"])

    def edge_graph_name(
        self,
        edge_id: EdgeId,
        *,
        source_storage: str,
        target_storage: str,
        purpose: str | None = None,
    ) -> str | None:
        """Alias of :meth:`edge_storage_name` for graph naming."""
        return self.edge_storage_name(
            edge_id,
            source_storage=source_storage,
            target_storage=target_storage,
            purpose=purpose,
        )

edge_physical_variants(edge_id, *, source_storage, target_storage)

Return resolved physical variants (base + purpose copies) for one edge.

Source code in graflo/architecture/database_features.py
def edge_physical_variants(
    self,
    edge_id: EdgeId,
    *,
    source_storage: str,
    target_storage: str,
) -> list[dict[str, str | None | list[Index]]]:
    """Return resolved physical variants (base + purpose copies) for one edge."""
    naming_kwargs = {
        "source_storage": source_storage,
        "target_storage": target_storage,
    }
    # One entry per declared purpose; the base (None) purpose always comes first.
    return [
        {
            "purpose": purpose,
            "storage_name": self.edge_storage_name(
                edge_id, purpose=purpose, **naming_kwargs
            ),
            "graph_name": self.edge_graph_name(
                edge_id, purpose=purpose, **naming_kwargs
            ),
            "indexes": self.edge_secondary_indexes(edge_id, purpose=purpose),
        }
        for purpose in self.edge_purposes(edge_id)
    ]

edge_property_default(edge_id, property_name)

Return declared default for an edge attribute, or None if not specified.

Source code in graflo/architecture/database_features.py
def edge_property_default(self, edge_id: EdgeId, property_name: str) -> Any | None:
    """Return declared default for an edge attribute, or None if not specified."""
    dpv = self.default_property_values
    if dpv is None or not dpv.edges:
        return None
    source, target, relation = edge_id
    # Later declarations win: scan newest-first and take the first match.
    matches = (
        spec.values[property_name]
        for spec in reversed(dpv.edges)
        if spec.source == source
        and spec.target == target
        and spec.relation == relation
        and property_name in spec.values
    )
    return next(matches, None)

edge_purposes(edge_id)

Return declared physical purposes for an edge.

The base variant (None) is always included; additional purposes are collected from matching edge variant specs.

Source code in graflo/architecture/database_features.py
def edge_purposes(self, edge_id: EdgeId) -> list[str | None]:
    """Return declared physical purposes for an edge.

    The base variant (``None``) always leads the list; further purposes are
    appended in declaration order with duplicates skipped.
    """
    result: list[str | None] = [None]
    for spec in self.edge_specs:
        if spec.edge_id == edge_id and spec.purpose not in result:
            result.append(spec.purpose)
    return result

has_edge_index_spec(edge_id, *, purpose=None)

Return True when an exact purpose edge index spec exists.

Source code in graflo/architecture/database_features.py
def has_edge_index_spec(
    self,
    edge_id: EdgeId,
    *,
    purpose: str | None = None,
) -> bool:
    """Return True when an exact purpose edge index spec exists."""
    # Exact match only — no fallback to the base (None) purpose here.
    return self._edge_variant_spec(edge_id=edge_id, purpose=purpose) is not None

vertex_property_default(vertex_name, property_name)

Return declared default for a vertex property, or None if not specified.

Source code in graflo/architecture/database_features.py
def vertex_property_default(
    self, vertex_name: str, property_name: str
) -> Any | None:
    """Return declared default for a vertex property, or None if not specified."""
    dpv = self.default_property_values
    if dpv is None:
        return None
    per_vertex = dpv.vertices.get(vertex_name)
    if per_vertex is None:
        return None
    return per_vertex.get(property_name)

Edge

Bases: ConfigBaseModel

Abstract graph edge kind (schema / edge_config only).

Ingestion-only behavior (location filters, relation column, relation from key, etc.) belongs on :class:~graflo.architecture.edge_derivation.EdgeDerivation in pipeline edge steps, not on this model.

Source code in graflo/architecture/schema/edge.py
class Edge(ConfigBaseModel):
    """Abstract graph edge kind (schema / ``edge_config`` only).

    Ingestion-only behavior (location filters, relation column, relation from
    key, etc.) belongs on :class:`~graflo.architecture.edge_derivation.EdgeDerivation`
    in pipeline edge steps, not on this model.
    """

    source: str = PydanticField(
        ...,
        description="Source vertex type name (e.g. user, company).",
    )
    target: str = PydanticField(
        ...,
        description="Target vertex type name (e.g. post, company).",
    )
    relation: str | None = PydanticField(
        default=None,
        description="Relation/edge type name (e.g. Neo4j relationship type). For ArangoDB used as weight.",
    )
    description: str | None = PydanticField(
        default=None,
        description="Optional semantic description of edge intent, direction semantics, and business meaning.",
    )

    identities: list[list[str]] = PydanticField(
        default_factory=list,
        description=(
            "Logical uniqueness keys for this edge: each key names fields that, "
            "together with the resolved source and target vertex ids, must be unique "
            "(``source`` / ``target`` tokens stand for endpoints; other tokens are edge "
            "attributes). Multiple keys define multiple uniqueness constraints. "
            "Non-endpoint tokens are merged into ``properties`` during "
            ":meth:`finish_init` if not already declared (same idea as vertex identity)."
        ),
    )
    properties: list[Field] = PydanticField(
        default_factory=list,
        description=(
            "Edge property names/types (relationship properties). "
            "Vertex-derived bindings belong in ingestion (:class:`~graflo.architecture.contract."
            "declarations.edge_derivation_registry.EdgeDerivationRegistry`)."
        ),
    )

    type: EdgeType = PydanticField(
        default=EdgeType.DIRECT,
        description="Edge type: DIRECT (created during ingestion) or INDIRECT (pre-existing collection).",
    )

    by: str | None = PydanticField(
        default=None,
        description="For INDIRECT edges: vertex type name used to define the edge.",
    )

    @field_validator("properties", mode="before")
    @classmethod
    def normalize_properties(cls, v: Any) -> Any:
        # Coerce shorthand property declarations into Field-compatible items;
        # non-list input passes through for pydantic to handle.
        if not isinstance(v, list):
            return v
        return [_normalize_direct_item(item) for item in v]

    @field_validator("identities", mode="before")
    @classmethod
    def normalize_identities(cls, v: Any) -> Any:
        # Accept a single flat key (list[str]) or a list of keys (with tuples
        # allowed); always normalize to list[list[str]].
        if v is None:
            return []
        if isinstance(v, list):
            # identities can be provided as [["source", "target"], ["source", "target", "pub_id"]]
            if all(isinstance(item, str) for item in v):
                return [list(v)]
            normalized: list[list[str]] = []
            for item in v:
                if isinstance(item, tuple):
                    item = list(item)
                if not isinstance(item, list) or not all(
                    isinstance(token, str) for token in item
                ):
                    raise ValueError("edge identities must be list[list[str]]")
                normalized.append(cast(list[str], item))
            return normalized
        raise ValueError("edge identities must be list[list[str]]")

    @model_validator(mode="after")
    def normalize_identity_keys(self) -> "Edge":
        # Order-preserving dedupe of tokens within each key and of whole keys;
        # empty keys are dropped.
        deduped_keys: list[list[str]] = []
        seen_keys: set[tuple[str, ...]] = set()
        for key in self.identities:
            deduped_tokens: list[str] = []
            for token in key:
                if token not in deduped_tokens:
                    deduped_tokens.append(token)
            key_tuple = tuple(deduped_tokens)
            if key_tuple and key_tuple not in seen_keys:
                seen_keys.add(key_tuple)
                deduped_keys.append(deduped_tokens)
        object.__setattr__(self, "identities", deduped_keys)
        return self

    def finish_init(self, vertex_config: VertexConfig) -> None:
        """Complete logical edge initialization with vertex configuration."""
        _ = vertex_config
        # Merge first so identity-derived tokens become declared properties
        # before validation checks them.
        self._merge_identity_fields_into_properties()
        self._validate_identity_tokens()

    def _merge_identity_fields_into_properties(self) -> None:
        """Append :class:`Field` entries for identity tokens not already declared.

        Endpoint tokens ``source`` and ``target`` are not edge properties; every
        other token (including ``relation``) is materialized like vertex identity.
        """
        endpoint_tokens = frozenset({"source", "target"})
        seen_names = {f.name for f in self.properties}
        augmented = list(self.properties)
        for key in self.identities:
            for token in key:
                if token in endpoint_tokens:
                    continue
                if token not in seen_names:
                    augmented.append(Field(name=token, type=None))
                    seen_names.add(token)
        object.__setattr__(self, "properties", augmented)

    def _validate_identity_tokens(self) -> None:
        """Validate edge identity keys against reserved tokens and declared edge fields."""
        reserved = {"source", "target", "relation"}
        direct_weight_fields = set(self.property_names)
        # Identity token "relation" maps to the default TigerGraph attribute name
        # when physical fields are declared (see EdgeConfigDBAware.effective_weights).
        logical_relation_attr = {DEFAULT_TIGERGRAPH_RELATION_WEIGHTNAME}
        allowed_fields = reserved | direct_weight_fields | logical_relation_attr
        unknown_by_key = [
            [token for token in key if token not in allowed_fields]
            for key in self.identities
        ]
        unknown_by_key = [u for u in unknown_by_key if u]
        if unknown_by_key:
            raise ValueError(
                "Edge identity key fields must use reserved tokens "
                "('source', 'target', 'relation') or declared edge property / relation fields. "
                f"Edge ({self.source}, {self.target}, {self.relation}) has unknown identity fields: {unknown_by_key}"
            )

    @property
    def edge_name_dyad(self) -> tuple[str, str]:
        """Get the edge name as a dyad (source, target).

        Returns:
            tuple[str, str]: Source and target vertex names
        """
        return self.source, self.target

    @property
    def edge_id(self) -> EdgeId:
        """Edge identity triple ``(source, target, relation)``."""
        return self.source, self.target, self.relation

    @property
    def property_names(self) -> list[str]:
        """Declared materialized edge property names."""
        return [f.name for f in self.properties]

edge_id property

The (source, target, relation) identifier triple for this edge.

edge_name_dyad property

Get the edge name as a dyad (source, target).

Returns:

Type Description

tuple[str, str]: Source and target vertex names

property_names property

Declared materialized edge property names.

finish_init(vertex_config)

Complete logical edge initialization with vertex configuration.

Source code in graflo/architecture/schema/edge.py
def finish_init(self, vertex_config: VertexConfig):
    """Finalize the logical edge once the vertex configuration is known."""
    # The vertex config is part of the call contract but not needed for
    # logical edges; discard it explicitly.
    del vertex_config
    # Merge first so validation sees identity-derived properties.
    self._merge_identity_fields_into_properties()
    self._validate_identity_tokens()

EdgeConfig

Bases: ConfigBaseModel

Configuration for managing collections of edges.

This class manages a collection of edges, providing methods for accessing and manipulating edge configurations.

Attributes:

Name Type Description
edges list[Edge]

List of edge configurations

Source code in graflo/architecture/schema/edge.py
class EdgeConfig(ConfigBaseModel):
    """Configuration for managing collections of edges.

    This class manages a collection of edges, providing methods for accessing
    and manipulating edge configurations.

    Attributes:
        edges: List of edge configurations
    """

    edges: list[Edge] = PydanticField(
        default_factory=list,
        description="List of edge definitions (source, target, identities, properties, relation, etc.).",
    )
    # Lookup table from edge identity triple to the owned Edge instance;
    # rebuilt by the post-validator below.
    _edges_map: dict[EdgeId, Edge] = PrivateAttr()

    @model_validator(mode="after")
    def _build_edges_map(self) -> EdgeConfig:
        """Build internal mapping of edge IDs to edge configurations."""
        object.__setattr__(self, "_edges_map", {e.edge_id: e for e in self.edges})
        return self

    @staticmethod
    def _map_key(edge: Edge) -> EdgeId:
        """Map key for an edge: its ``(source, target, relation)`` identity."""
        return edge.edge_id

    def finish_init(self, vc: VertexConfig) -> None:
        """Complete initialization of all logical edges."""
        for e in self.edges:
            e.finish_init(vertex_config=vc)

    def values(self) -> Iterator[Edge]:
        """Iterate over edge configurations."""
        return iter(self._edges_map.values())

    def items(self) -> Iterator[tuple[EdgeId, Edge]]:
        """Iterate over ``(edge_id, edge)`` pairs."""
        return iter(self._edges_map.items())

    def __contains__(self, item: EdgeId | Edge) -> bool:
        """Check if edge exists in configuration.

        Args:
            item: Edge ID or Edge instance to check

        Returns:
            bool: True if edge exists, False otherwise
        """
        if isinstance(item, Edge):
            return self._map_key(item) in self._edges_map
        if isinstance(item, tuple) and len(item) == 3:
            return item in self._edges_map
        return False

    def update_edges(
        self,
        edge: Edge,
        vertex_config: VertexConfig,
    ) -> None:
        """Update edge configuration.

        Args:
            edge: Edge configuration to update
            vertex_config: Vertex configuration
        """
        edge_key = self._map_key(edge)
        # Merge into the existing edge when known; otherwise register it in
        # both the map and the backing list.
        if edge_key in self._edges_map:
            self._edges_map[edge_key].update(edge)
        else:
            self._edges_map[edge_key] = edge
            self.edges.append(edge)

        # Re-finalize after the merge so derived fields stay consistent.
        self._edges_map[edge_key].finish_init(
            vertex_config=vertex_config,
        )

    def edge_for(self, edge_id: EdgeId) -> Edge:
        """Return the config-owned :class:`Edge` instance for ``edge_id`` after merges.

        Pipeline actors may construct a partial :class:`Edge` that is merged into the
        schema edge via :meth:`update_edges`. Callers that need properties, identities,
        etc. must use this object (same reference as in :meth:`items`), not the
        pre-merge actor copy.
        """
        return self._edges_map[edge_id]

    @property
    def vertices(self) -> set[str]:
        """Get set of vertex names involved in edges.

        Returns:
            set[str]: Set of vertex names
        """
        return {e.source for e in self.edges} | {e.target for e in self.edges}

vertices property

Get set of vertex names involved in edges.

Returns:

Type Description

set[str]: Set of vertex names

__contains__(item)

Check if edge exists in configuration.

Parameters:

Name Type Description Default
item EdgeId | Edge

Edge ID or Edge instance to check

required

Returns:

Name Type Description
bool

True if edge exists, False otherwise

Source code in graflo/architecture/schema/edge.py
def __contains__(self, item: EdgeId | Edge) -> bool:
    """Check if edge exists in configuration.

    The original annotation was ``EdgeId | EdgeId | Edge`` — the duplicated
    union member is removed here.

    Args:
        item: Edge ID triple or Edge instance to check

    Returns:
        bool: True if edge exists, False otherwise
    """
    if isinstance(item, Edge):
        return self._map_key(item) in self._edges_map
    if isinstance(item, tuple) and len(item) == 3:
        return item in self._edges_map
    return False

edge_for(edge_id)

Return the config-owned :class:Edge instance for edge_id after merges.

Pipeline actors may construct a partial :class:Edge that is merged into the schema edge via :meth:update_edges. Callers that need properties, identities, etc. must use this object (same reference as in :meth:items), not the pre-merge actor copy.

Source code in graflo/architecture/schema/edge.py
def edge_for(self, edge_id: EdgeId) -> Edge:
    """Look up the config-owned, post-merge :class:`Edge` for ``edge_id``.

    Actors may build partial :class:`Edge` copies that get merged into the
    schema edge by :meth:`update_edges`; anyone needing properties or
    identities must read this object (the same reference :meth:`items`
    yields), never a pre-merge copy.

    Raises:
        KeyError: if ``edge_id`` is not registered.
    """
    return self._edges_map[edge_id]

finish_init(vc)

Complete initialization of all logical edges.

Source code in graflo/architecture/schema/edge.py
def finish_init(self, vc: VertexConfig):
    """Finalize every logical edge against the vertex configuration."""
    for edge in self.edges:
        edge.finish_init(vertex_config=vc)

items()

Iterate over (edge_id, edge) pairs.

Source code in graflo/architecture/schema/edge.py
def items(self) -> Iterator[tuple[EdgeId, Edge]]:
    """Iterate over ``(edge_id, edge)`` pairs."""
    pairs = self._edges_map.items()
    return iter(pairs)

update_edges(edge, vertex_config)

Update edge configuration.

Parameters:

Name Type Description Default
edge Edge

Edge configuration to update

required
vertex_config VertexConfig

Vertex configuration

required
Source code in graflo/architecture/schema/edge.py
def update_edges(
    self,
    edge: Edge,
    vertex_config: VertexConfig,
):
    """Merge ``edge`` into the configuration and re-finalize it.

    Args:
        edge: Edge configuration to merge in
        vertex_config: Vertex configuration used for finalization
    """
    key = self._map_key(edge)
    known = self._edges_map.get(key)
    if known is None:
        # New edge: register it in both the map and the backing list.
        self._edges_map[key] = edge
        self.edges.append(edge)
    else:
        # Existing edge: merge the incoming fields into the owned instance.
        known.update(edge)

    # Re-finalize the owned instance so derived fields stay consistent.
    self._edges_map[key].finish_init(vertex_config=vertex_config)

values()

Iterate over edge configurations.

Source code in graflo/architecture/schema/edge.py
def values(self) -> Iterator[Edge]:
    """Iterate over edge configurations."""
    edge_map = self._edges_map
    return iter(edge_map.values())

EdgeConfigDBAware

DB-aware projection wrapper for EdgeConfig.

Source code in graflo/architecture/schema/db_aware.py
class EdgeConfigDBAware:
    """DB-aware projection wrapper for `EdgeConfig`.

    Wraps a logical :class:`EdgeConfig` together with a database profile and an
    optional ingestion overlay, answering backend-specific questions (storage
    names, merge keys, identity index compilation) without mutating the logical
    configuration itself.
    """

    def __init__(
        self,
        logical: EdgeConfig,
        vertex_config: VertexConfigDBAware,
        database_features: DatabaseProfile,
        ingestion_overlay: EdgeIngestionOverlay | None = None,
    ):
        # Logical (backend-agnostic) edge configuration being projected.
        self.logical = logical
        # DB-aware vertex wrapper used to resolve endpoint storage names.
        self.vertex_config = vertex_config
        self.db_profile = database_features
        # Optional overlay; when absent, relation-from-key lookups are False.
        self.ingestion_overlay = ingestion_overlay

    def _uses_relation_from_key(self, edge_id: EdgeId) -> bool:
        # Delegate to the overlay when present; no overlay means no edge
        # derives its relation label from a data key.
        if self.ingestion_overlay is not None:
            return self.ingestion_overlay.uses_relation_from_key(edge_id)
        return False

    @property
    def edges(self) -> list[Edge]:
        # Pass-through to the wrapped logical config's edge list.
        return self.logical.edges

    def __iter__(self) -> Iterator[Edge]:
        # Iterating the wrapper iterates edge configurations, like values().
        return self.values()

    def values(self) -> Iterator[Edge]:
        return self.logical.values()

    def items(self) -> Iterator[tuple[EdgeId, Edge]]:
        return self.logical.items()

    @property
    def vertices(self):
        return self.logical.vertices

    def relation_dbname(self, edge: Edge) -> str | None:
        """Resolve the backend relation name for ``edge``.

        TigerGraph requires a relation label, so a missing logical relation
        falls back to :data:`DEFAULT_TIGERGRAPH_RELATION` before asking the
        profile for the final name.
        """
        relation = edge.relation
        if self.db_profile.db_flavor == DBType.TIGERGRAPH and relation is None:
            relation = DEFAULT_TIGERGRAPH_RELATION
        return self.db_profile.edge_relation_name(
            edge.edge_id,
            default_relation=relation,
        )

    def effective_weights(self, edge: Edge) -> WeightConfig | None:
        """Project edge properties to a weight config, adding the TigerGraph
        relation attribute when the per-row relation must be stored."""

        def _as_weight_config() -> WeightConfig | None:
            # Deep-copy property fields so callers cannot mutate the schema.
            if not edge.properties:
                return None
            return WeightConfig(
                direct=[f.model_copy(deep=True) for f in edge.properties],
            )

        if self.db_profile.db_flavor != DBType.TIGERGRAPH:
            return _as_weight_config()

        # Typed TigerGraph edge: per-row relation label stored under a stable attribute.
        needs_relation_attr = edge.relation is None or self._uses_relation_from_key(
            edge.edge_id
        )
        if not needs_relation_attr:
            return _as_weight_config()

        # Ensure the reserved relation attribute exists exactly once.
        base = _as_weight_config() or WeightConfig()
        if DEFAULT_TIGERGRAPH_RELATION_WEIGHTNAME not in base.direct_names:
            base.direct.append(
                Field(
                    name=DEFAULT_TIGERGRAPH_RELATION_WEIGHTNAME, type=FieldType.STRING
                )
            )
        return base

    def runtime(self, edge: Edge) -> EdgeRuntime:
        """Build the :class:`EdgeRuntime` describing how ``edge`` is written."""
        # Same condition as in effective_weights: TigerGraph edges without a
        # static relation carry the extracted relation as a weight attribute.
        needs_tg_relation_attr = self.db_profile.db_flavor == DBType.TIGERGRAPH and (
            edge.relation is None or self._uses_relation_from_key(edge.edge_id)
        )
        runtime = EdgeRuntime(
            edge=edge,
            source_storage=self.vertex_config.vertex_dbname(edge.source),
            target_storage=self.vertex_config.vertex_dbname(edge.target),
            relation_name=self.relation_dbname(edge),
            store_extracted_relation_as_weight=needs_tg_relation_attr,
            effective_relation_field=(
                DEFAULT_TIGERGRAPH_RELATION_WEIGHTNAME
                if needs_tg_relation_attr
                else None
            ),
            db_profile=self.db_profile,
        )
        return runtime

    def relationship_merge_property_names(self, edge: Edge) -> list[str]:
        """Relationship properties used for edge upsert/MERGE keys (per backend).

        Uniqueness is ``(source_id, *identity_fields, target_id)`` for the **first**
        logical ``identities`` key (endpoints are matched separately on vertices).
        Additional ``identities`` keys are compiled into separate unique indexes
        via :meth:`compile_identity_indexes` but do not change the writer merge key.

        If that key yields no relationship fields, or ``identities`` is empty,
        falls back to all declared edge attribute names.
        """
        db_flavor = self.db_profile.db_flavor
        if edge.identities:
            props = self._identity_tokens_to_relationship_properties(
                edge.identities[0], db_flavor
            )
            if props:
                return props
        if edge.property_names:
            return list(edge.property_names)
        return []

    @staticmethod
    def _identity_tokens_to_relationship_properties(
        identity_key: list[str], db_flavor: DBType
    ) -> list[str]:
        # Endpoint tokens never become relationship properties; "relation"
        # becomes one everywhere except TigerGraph (where it is typed).
        fields: list[str] = []
        for token in identity_key:
            if token in ("source", "target"):
                continue
            if token == "relation":
                if db_flavor != DBType.TIGERGRAPH:
                    fields.append("relation")
                continue
            fields.append(token)
        # Preserve first-occurrence order while removing duplicates.
        deduped: list[str] = []
        for field in fields:
            if field not in deduped:
                deduped.append(field)
        return deduped

    def compile_identity_indexes(self) -> None:
        """Register one index per logical edge identity key on the profile."""
        db_flavor = self.db_profile.db_flavor
        for edge in self.logical.edges:
            for identity_key in edge.identities:
                identity_fields = self._identity_key_index_fields(
                    identity_key, db_flavor
                )
                if not identity_fields:
                    continue
                fields, unique = self._normalize_edge_identity_index(
                    identity_fields, db_flavor
                )
                if not fields:
                    continue
                self.db_profile.add_edge_index(
                    edge.edge_id,
                    Index(fields=fields, unique=unique),
                    purpose=None,
                )

    def _identity_key_index_fields(
        self, identity_key: list[str], db_flavor: DBType
    ) -> list[str]:
        # Map logical tokens to physical index columns: endpoints become
        # _from/_to only on ArangoDB; "relation" is skipped on TigerGraph.
        fields: list[str] = []
        for token in identity_key:
            if token == "source":
                if db_flavor == DBType.ARANGO:
                    fields.append("_from")
            elif token == "target":
                if db_flavor == DBType.ARANGO:
                    fields.append("_to")
            elif token == "relation":
                if db_flavor != DBType.TIGERGRAPH:
                    fields.append("relation")
            else:
                fields.append(token)
        # Preserve first-occurrence order while removing duplicates.
        deduped: list[str] = []
        for field in fields:
            if field not in deduped:
                deduped.append(field)
        return deduped

    @staticmethod
    def _normalize_edge_identity_index(
        fields: list[str], db_flavor: DBType
    ) -> tuple[list[str], bool]:
        """Map logical edge identity to physical index fields and DB uniqueness.

        Logical uniqueness is always ``(source, *relationship_fields, target)``.

        * **ArangoDB** — Edge documents carry ``_from`` / ``_to``. Unique persistent
          indexes must include them before other fields, even when the YAML
          ``identities`` entry lists only relationship tokens (e.g. ``_role``).
        * **Neo4j, FalkorDB, Memgraph, Nebula** — Indexed columns are relationship /
          edge-type properties only; they cannot express endpoint scope. We still
          register the property fields for lookups but set ``unique=False`` so the
          database is not asked to enforce a misleading global uniqueness on those
          properties alone. (Application MERGE / ingest semantics remain authoritative.)
        * **TigerGraph** — Edge secondary indexes are not applied by the driver today;
          fields are kept for profiling; uniqueness is preserved for consistency.
        """
        rest = [f for f in fields if f not in ("_from", "_to")]
        if db_flavor == DBType.ARANGO:
            return (["_from", "_to", *rest], True)
        if db_flavor in (
            DBType.NEO4J,
            DBType.FALKORDB,
            DBType.MEMGRAPH,
            DBType.NEBULA,
        ):
            return (fields, False)
        return (fields, True)

relationship_merge_property_names(edge)

Relationship properties used for edge upsert/MERGE keys (per backend).

Uniqueness is (source_id, *identity_fields, target_id) for the first logical identities key (endpoints are matched separately on vertices). Additional identities keys are compiled into separate unique indexes via :meth:compile_identity_indexes but do not change the writer merge key.

If that key yields no relationship fields, or identities is empty, falls back to all declared edge attribute names.

Source code in graflo/architecture/schema/db_aware.py
def relationship_merge_property_names(self, edge: Edge) -> list[str]:
    """Relationship properties used for edge upsert/MERGE keys (per backend).

    Uniqueness is ``(source_id, *identity_fields, target_id)`` for the **first**
    logical ``identities`` key (endpoints are matched separately on vertices).
    Additional ``identities`` keys are compiled into separate unique indexes
    via :meth:`compile_identity_indexes` but do not change the writer merge key.

    If that key yields no relationship fields, or ``identities`` is empty,
    falls back to all declared edge attribute names.
    """
    flavor = self.db_profile.db_flavor
    identity_keys = edge.identities
    if identity_keys:
        # Only the first identity key drives the writer merge key.
        first_key_props = self._identity_tokens_to_relationship_properties(
            identity_keys[0], flavor
        )
        if first_key_props:
            return first_key_props
    # Fallback: all declared edge attribute names, or nothing at all.
    declared = edge.property_names
    return list(declared) if declared else []

FieldType

Bases: BaseEnum

Supported field types for graph databases.

These types are primarily used for TigerGraph, which requires explicit field types. Other databases (ArangoDB, Neo4j) may use different type systems or not require types.

Attributes:

Name Type Description
INT

Integer type

UINT

Unsigned integer type

FLOAT

Floating point type

DOUBLE

Double precision floating point type

BOOL

Boolean type

STRING

String type

DATETIME

DateTime type

Source code in graflo/architecture/schema/vertex.py
class FieldType(BaseEnum):
    """Supported field types for graph databases.

    These types are primarily used for TigerGraph, which requires explicit field types.
    Other databases (ArangoDB, Neo4j) may use different type systems or not require types.

    Attributes:
        INT: Integer type
        UINT: Unsigned integer type
        FLOAT: Floating point type
        DOUBLE: Double precision floating point type
        BOOL: Boolean type
        STRING: String type
        DATETIME: DateTime type
    """

    # Values match TigerGraph attribute type keywords verbatim, so they can be
    # emitted directly into generated DDL without further mapping.
    INT = "INT"
    UINT = "UINT"
    FLOAT = "FLOAT"
    DOUBLE = "DOUBLE"
    BOOL = "BOOL"
    STRING = "STRING"
    DATETIME = "DATETIME"

FileConnector

Bases: ResourceConnector

Connector for matching files.

Attributes:

Name Type Description
regex str | None

Regular expression pattern for matching filenames

sub_path Path

Path to search for matching files (default: "./")

date_field str | None

Name of the date field to filter on (for date-based filtering)

date_filter str | None

SQL-style date filter condition (e.g., "> '2020-10-10'")

date_range_start str | None

Start date for range filtering (e.g., "2015-11-11")

date_range_days int | None

Number of days after start date (used with date_range_start)

Source code in graflo/architecture/contract/bindings/connectors.py
class FileConnector(ResourceConnector):
    """Connector for matching files.

    Attributes:
        regex: Regular expression pattern for matching filenames
        sub_path: Path to search for matching files (default: "./")
        date_field: Name of the date field to filter on (for date-based filtering)
        date_filter: SQL-style date filter condition (e.g., "> '2020-10-10'")
        date_range_start: Start date for range filtering (e.g., "2015-11-11")
        date_range_days: Number of days after start date (used with date_range_start)
    """

    regex: str | None = None
    sub_path: pathlib.Path = Field(default_factory=lambda: pathlib.Path("./"))
    date_field: str | None = None
    date_filter: str | None = None
    date_range_start: str | None = None
    date_range_days: int | None = None

    @model_validator(mode="after")
    def _validate_file_connector(self) -> Self:
        """Ensure sub_path is a Path and validate date filtering parameters.

        Raises:
            ValueError: If a date filter is given without ``date_field``, or
                ``date_range_days`` without ``date_range_start``.
        """
        # Defensive coercion in case sub_path arrived as a plain string;
        # object.__setattr__ bypasses model immutability/validation.
        if not isinstance(self.sub_path, pathlib.Path):
            object.__setattr__(self, "sub_path", pathlib.Path(self.sub_path))
        # Both date-filter styles need a field name to filter on.
        if (self.date_filter or self.date_range_start) and not self.date_field:
            raise ValueError(
                "date_field is required when using date_filter or date_range_start"
            )
        # A day count is meaningless without a range start.
        if self.date_range_days is not None and not self.date_range_start:
            raise ValueError("date_range_start is required when using date_range_days")
        return self

    def matches(self, resource_identifier: str) -> bool:
        """Check if connector matches a filename.

        Args:
            resource_identifier: Filename to match

        Returns:
            bool: True if connector matches
        """
        # No pattern configured means this connector matches nothing.
        if self.regex is None:
            return False
        # re.match anchors at the start of the string (not a full match).
        return bool(re.match(self.regex, resource_identifier))

    def bound_source_kind(self) -> BoundSourceKind:
        """File connector always uses ``BoundSourceKind.FILE``."""
        return BoundSourceKind.FILE

bound_source_kind()

File connector always uses BoundSourceKind.FILE.

Source code in graflo/architecture/contract/bindings/connectors.py
def bound_source_kind(self) -> BoundSourceKind:
    """File connector always uses ``BoundSourceKind.FILE``."""
    return BoundSourceKind.FILE

matches(resource_identifier)

Check if connector matches a filename.

Parameters:

Name Type Description Default
resource_identifier str

Filename to match

required

Returns:

Name Type Description
bool bool

True if connector matches

Source code in graflo/architecture/contract/bindings/connectors.py
def matches(self, resource_identifier: str) -> bool:
    """Check if connector matches a filename.

    Args:
        resource_identifier: Filename to match

    Returns:
        bool: True if connector matches
    """
    if self.regex is None:
        return False
    return bool(re.match(self.regex, resource_identifier))

GraphManifest

Bases: ConfigBaseModel

Canonical config contract for graph schema, ingestion, and bindings.

Source code in graflo/architecture/contract/manifest.py
class GraphManifest(ConfigBaseModel):
    """Canonical config contract for graph schema, ingestion, and bindings."""

    # Accept both the external alias "schema" and the attribute name.
    model_config = ConfigDict(populate_by_name=True)

    graph_schema: Schema | None = PydanticField(
        default=None,
        description="Logical graph schema contract.",
        validation_alias=AliasChoices("schema", "graph_schema"),
        serialization_alias="schema",
    )
    ingestion_model: IngestionModel | None = PydanticField(
        default=None,
        description="Ingestion resources and transforms.",
    )
    bindings: Bindings | None = PydanticField(
        default=None,
        description="Bindings mapping resources to concrete data sources.",
    )

    @classmethod
    def from_config(cls, data: dict[str, Any]) -> "GraphManifest":
        """Build a manifest from a Python mapping payload."""
        return cls.from_dict(data)

    @model_validator(mode="after")
    def _validate_manifest(self) -> "GraphManifest":
        """Reject an entirely empty manifest (all three blocks missing)."""
        if (
            self.graph_schema is None
            and self.ingestion_model is None
            and self.bindings is None
        ):
            raise ValueError(
                "GraphManifest requires at least one block: "
                "schema, ingestion_model, or bindings."
            )
        return self

    def finish_init(
        self,
        *,
        strict_references: bool = False,
        dynamic_edge_feedback: bool = False,
    ) -> None:
        """Initialize model internals and cross-block runtime links."""
        if self.graph_schema is not None:
            self.graph_schema.finish_init()
        # The ingestion model can only be wired when a schema is present.
        if self.graph_schema is not None and self.ingestion_model is not None:
            self.ingestion_model.finish_init(
                self.graph_schema.core_schema,
                strict_references=strict_references,
                dynamic_edge_feedback=dynamic_edge_feedback,
                target_db_flavor=self.graph_schema.db_profile.db_flavor,
            )

    def require_schema(self) -> Schema:
        """Return the schema block or raise ValueError if it is missing."""
        if self.graph_schema is None:
            raise ValueError("GraphManifest is missing required 'schema' block.")
        return self.graph_schema

    def require_ingestion_model(self) -> IngestionModel:
        """Return the ingestion model block or raise ValueError if missing."""
        if self.ingestion_model is None:
            raise ValueError(
                "GraphManifest is missing required 'ingestion_model' block."
            )
        return self.ingestion_model

    def require_bindings(self) -> Bindings:
        """Return the bindings block or raise ValueError if it is missing."""
        if self.bindings is None:
            raise ValueError("GraphManifest is missing required 'bindings' block.")
        return self.bindings

finish_init(*, strict_references=False, dynamic_edge_feedback=False)

Initialize model internals and cross-block runtime links.

Source code in graflo/architecture/contract/manifest.py
def finish_init(
    self,
    *,
    strict_references: bool = False,
    dynamic_edge_feedback: bool = False,
) -> None:
    """Initialize model internals and cross-block runtime links."""
    schema = self.graph_schema
    if schema is None:
        # Without a schema there is nothing to initialize or wire up.
        return
    schema.finish_init()
    ingestion = self.ingestion_model
    if ingestion is None:
        return
    # Wire the ingestion model against the freshly initialized schema.
    ingestion.finish_init(
        schema.core_schema,
        strict_references=strict_references,
        dynamic_edge_feedback=dynamic_edge_feedback,
        target_db_flavor=schema.db_profile.db_flavor,
    )

from_config(data) classmethod

Build a manifest from a Python mapping payload.

Source code in graflo/architecture/contract/manifest.py
@classmethod
def from_config(cls, data: dict[str, Any]) -> "GraphManifest":
    """Construct a manifest instance from a plain mapping payload."""
    # Delegates to the shared dict-based constructor on the model base.
    return cls.from_dict(data)

GraphMetadata

Bases: ConfigBaseModel

Schema metadata and versioning information.

Holds metadata about the schema, including its name, version, and description. Used for schema identification and versioning. Suitable for LLM-generated schema constituents.

Source code in graflo/architecture/schema/metadata.py
class GraphMetadata(ConfigBaseModel):
    """Schema metadata and versioning information.

    Holds metadata about the schema, including its name, version, and
    description.  Used for schema identification and versioning.
    Suitable for LLM-generated schema constituents.
    """

    name: str = PydanticField(
        ...,
        description="Name of the schema (e.g. graph or database identifier).",
    )
    version: str | None = PydanticField(
        default=None,
        description="Semantic version of the schema (e.g. '1.0.0', '2.1.3-beta+build.42').",
    )
    description: str | None = PydanticField(
        default=None,
        description="Optional human-readable description of the schema.",
    )

    @field_validator("version")
    @classmethod
    def _validate_semver(cls, v: str | None) -> str | None:
        """Reject non-semver version strings; ``None`` is always allowed.

        Raises:
            ValueError: If ``v`` does not match the module-level semver regex.
        """
        # _SEMVER_RE is a module-level compiled pattern (defined outside this
        # excerpt); presumably it implements the full SemVer 2.0 grammar.
        if v is not None and not _SEMVER_RE.match(v):
            raise ValueError(
                f"version '{v}' is not a valid semantic version "
                f"(expected MAJOR.MINOR.PATCH[-prerelease][+build])"
            )
        return v

Index

Bases: ConfigBaseModel

Configuration for database indexes.

Attributes:

Name Type Description
name str | None

Optional name of the index

fields list[str]

List of fields to index

unique bool

Whether the index enforces uniqueness

type IndexType

Type of index to create

deduplicate bool

Whether to deduplicate index entries

sparse bool

Whether to create a sparse index

exclude_edge_endpoints bool

Whether to exclude edge endpoints from index

Source code in graflo/architecture/graph_types.py
class Index(ConfigBaseModel):
    """Configuration for database indexes.

    Attributes:
        name: Optional name of the index
        fields: List of fields to index
        unique: Whether the index enforces uniqueness
        type: Type of index to create
        deduplicate: Whether to deduplicate index entries
        sparse: Whether to create a sparse index
        exclude_edge_endpoints: Whether to exclude edge endpoints from index
    """

    name: str | None = Field(
        default=None,
        description="Optional index name. For edges, can reference a vertex name for composite fields.",
    )
    fields: list[str] = Field(
        default_factory=list,
        description="List of field names included in this index.",
    )
    # Note: uniqueness defaults to True; non-unique indexes must opt out.
    unique: bool = Field(
        default=True,
        description="If True, index enforces uniqueness on the field combination.",
    )
    type: IndexType = Field(
        default=IndexType.PERSISTENT,
        description="Index type (PERSISTENT, HASH, SKIPLIST, FULLTEXT).",
    )
    deduplicate: bool = Field(
        default=True,
        description="Whether to deduplicate index entries (e.g. ArangoDB).",
    )
    sparse: bool = Field(
        default=False,
        description="If True, create a sparse index (exclude null/missing values).",
    )
    exclude_edge_endpoints: bool = Field(
        default=False,
        description="If True, do not add _from/_to to edge index (e.g. ArangoDB).",
    )

    def __iter__(self):
        """Iterate over the indexed fields."""
        return iter(self.fields)

    def db_form(self, db_type: DBType) -> dict:
        """Convert index configuration to database-specific format.

        Args:
            db_type: Target database type.

        Returns:
            Dictionary of index configuration in database-specific format.
            For ArangoDB, the ``name`` and ``exclude_edge_endpoints`` keys
            are dropped; all other backends receive the full field set
            unchanged (no exception is raised for any ``db_type``).
        """
        r = dict(self.to_dict())
        if db_type == DBType.ARANGO:
            # ArangoDB index payloads do not accept these two keys.
            r.pop("name", None)
            r.pop("exclude_edge_endpoints", None)
        return r

__iter__()

Iterate over the indexed fields.

Source code in graflo/architecture/graph_types.py
def __iter__(self):
    """Yield each field name covered by this index, in declared order."""
    field_names = self.fields
    return iter(field_names)

db_form(db_type)

Convert index configuration to database-specific format.

Parameters:

Name Type Description Default
db_type DBType

Type of database (ARANGO or NEO4J)

required

Returns:

Type Description
dict

Dictionary of index configuration in database-specific format

Raises:

Nothing. The current implementation does not raise for any db_type;
database types other than ARANGO simply receive the full configuration
dictionary unchanged.

Source code in graflo/architecture/graph_types.py
def db_form(self, db_type: DBType) -> dict:
    """Convert index configuration to database-specific format.

    Args:
        db_type: Target database type.

    Returns:
        Dictionary of index configuration in database-specific format.
        For ArangoDB, the ``name`` and ``exclude_edge_endpoints`` keys are
        dropped; all other backends receive the full field set unchanged
        (no exception is raised for any ``db_type``).
    """
    r = dict(self.to_dict())
    if db_type == DBType.ARANGO:
        # ArangoDB index payloads do not accept these two keys.
        r.pop("name", None)
        r.pop("exclude_edge_endpoints", None)
    return r

IngestionModel

Bases: ConfigBaseModel

Ingestion model (C): resources and transform registry.

Source code in graflo/architecture/contract/declarations/ingestion_model/model.py
class IngestionModel(ConfigBaseModel):
    """Ingestion model (C): resources and transform registry."""

    edges_on_duplicate: Literal["ignore", "upsert"] = PydanticField(
        default="ignore",
        description=(
            "How batch edge writes tolerate an already-matching edge. Passed through to "
            ":meth:`~graflo.db.conn.Connection.insert_edges_batch` where the target backend "
            "supports it. Today ArangoDB maps ``ignore`` to INSERT with ignoreErrors and "
            "``upsert`` to AQL UPSERT (with schema merge keys as ``uniq_weight_fields`` when "
            "present). Other databases may interpret the same values later."
        ),
    )
    resources: list[Resource] = PydanticField(
        default_factory=list,
        description="List of resource definitions (data pipelines mapping to vertices/edges).",
    )
    transforms: list[ProtoTransform] = PydanticField(
        default_factory=list,
        description="List of named transforms available to resources.",
    )

    # Name -> Resource lookup, rebuilt whenever runtime state refreshes.
    _resources: dict[str, Resource] = PrivateAttr()
    # Name -> ProtoTransform lookup, rebuilt alongside the resource map.
    _transforms: dict[str, ProtoTransform] = PrivateAttr(default_factory=dict)
    _combined_edge_derivation: EdgeDerivationRegistry = PrivateAttr(
        default_factory=EdgeDerivationRegistry
    )

    @model_validator(mode="after")
    def _init_model(self) -> IngestionModel:
        """Build transform and resource lookup maps."""
        self._rebuild_runtime_state()
        return self

    def _rebuild_resource_map(self) -> None:
        """Validate resource name uniqueness and refresh lookup map.

        Raises:
            ValueError: If any resource name occurs more than once.
        """
        names = [r.name for r in self.resources]
        c = Counter(names)
        for k, v in c.items():
            if v > 1:
                raise ValueError(f"resource name {k} used {v} times")
        # object.__setattr__ bypasses pydantic's attribute handling.
        object.__setattr__(self, "_resources", {r.name: r for r in self.resources})

    def _rebuild_transform_map(self) -> None:
        """Validate transform names and refresh name lookup map.

        Raises:
            ValueError: If any transform lacks a name or a name repeats.
        """
        missing_names = [idx for idx, t in enumerate(self.transforms) if not t.name]
        if missing_names:
            raise ValueError(
                "All ingestion transforms must define a non-empty name. "
                f"Missing at indexes: {missing_names}"
            )

        transform_names = [t.name for t in self.transforms if t.name is not None]
        name_counts = Counter(transform_names)
        duplicates = sorted([name for name, count in name_counts.items() if count > 1])
        if duplicates:
            raise ValueError(f"Duplicate ingestion transform names found: {duplicates}")

        object.__setattr__(
            self,
            "_transforms",
            {t.name: t for t in self.transforms if t.name is not None},
        )

    def finish_init(
        self,
        core_schema: CoreSchema,
        *,
        strict_references: bool = False,
        dynamic_edge_feedback: bool = False,
        allowed_vertex_names: set[str] | None = None,
        target_db_flavor: DBType | None = None,
    ) -> None:
        """Initialize resources against graph model and transform library."""
        self._rebuild_runtime_state()
        for r in self.resources:
            r.finish_init(
                vertex_config=core_schema.vertex_config,
                edge_config=core_schema.edge_config,
                transforms=self._transforms,
                strict_references=strict_references,
                dynamic_edge_feedback=dynamic_edge_feedback,
                allowed_vertex_names=allowed_vertex_names,
                target_db_flavor=target_db_flavor,
            )

    def _rebuild_runtime_state(self) -> None:
        """Rebuild transform and resource lookup maps."""
        # Transforms first: resource init consumes the transform map.
        self._rebuild_transform_map()
        self._rebuild_resource_map()

    def fetch_resource(self, name: str | None = None) -> Resource:
        """Fetch a resource by name or get the first available resource.

        Args:
            name: Optional name of the resource to fetch

        Returns:
            Resource: The requested resource

        Raises:
            ValueError: If the requested resource is not found or if no resources exist
        """
        _current_resource = None

        if name is not None:
            if name in self._resources:
                _current_resource = self._resources[name]
            else:
                raise ValueError(f"Resource {name} not found")
        else:
            # Unnamed lookup returns the first declared resource.
            if self._resources:
                _current_resource = self.resources[0]
            else:
                raise ValueError("Empty resource container :(")
        return _current_resource

    def prune_to_graph(
        self, core_schema: CoreSchema, disconnected: set[str] | None = None
    ) -> None:
        """Drop resource actors that reference disconnected vertices."""
        if disconnected is None:
            # Default: vertices declared in the schema but unused by any edge.
            disconnected = (
                core_schema.vertex_config.vertex_set - core_schema.edge_config.vertices
            )
        if not disconnected:
            return

        def _mentions_disconnected(wrapper) -> bool:
            return bool(wrapper.actor.references_vertices() & disconnected)

        to_drop: list[Resource] = []
        for resource in self.resources:
            root = resource.root
            if _mentions_disconnected(root):
                to_drop.append(resource)
                continue
            # Prune offending descendants; drop the resource if nothing
            # vertex-referencing remains afterward.
            root.remove_descendants_if(_mentions_disconnected)
            if not any(a.references_vertices() for a in root.collect_actors()):
                to_drop.append(resource)

        for r in to_drop:
            self.resources.remove(r)
            self._resources.pop(r.name, None)

fetch_resource(name=None)

Fetch a resource by name or get the first available resource.

Parameters:

Name Type Description Default
name str | None

Optional name of the resource to fetch

None

Returns:

Name Type Description
Resource Resource

The requested resource

Raises:

Type Description
ValueError

If the requested resource is not found or if no resources exist

Source code in graflo/architecture/contract/declarations/ingestion_model/model.py
def fetch_resource(self, name: str | None = None) -> Resource:
    """Fetch a resource by name or get the first available resource.

    Args:
        name: Optional name of the resource to fetch

    Returns:
        Resource: The requested resource

    Raises:
        ValueError: If the requested resource is not found or if no resources exist
    """
    if name is None:
        # Unnamed lookup: hand back the first declared resource, if any.
        if not self._resources:
            raise ValueError("Empty resource container :(")
        return self.resources[0]
    if name not in self._resources:
        raise ValueError(f"Resource {name} not found")
    return self._resources[name]

finish_init(core_schema, *, strict_references=False, dynamic_edge_feedback=False, allowed_vertex_names=None, target_db_flavor=None)

Initialize resources against graph model and transform library.

Source code in graflo/architecture/contract/declarations/ingestion_model/model.py
def finish_init(
    self,
    core_schema: CoreSchema,
    *,
    strict_references: bool = False,
    dynamic_edge_feedback: bool = False,
    allowed_vertex_names: set[str] | None = None,
    target_db_flavor: DBType | None = None,
) -> None:
    """Initialize resources against graph model and transform library."""
    # Refresh lookup maps before wiring resources to the schema.
    self._rebuild_runtime_state()
    shared_kwargs = dict(
        vertex_config=core_schema.vertex_config,
        edge_config=core_schema.edge_config,
        transforms=self._transforms,
        strict_references=strict_references,
        dynamic_edge_feedback=dynamic_edge_feedback,
        allowed_vertex_names=allowed_vertex_names,
        target_db_flavor=target_db_flavor,
    )
    for resource in self.resources:
        resource.finish_init(**shared_kwargs)

prune_to_graph(core_schema, disconnected=None)

Drop resource actors that reference disconnected vertices.

Source code in graflo/architecture/contract/declarations/ingestion_model/model.py
def prune_to_graph(
    self, core_schema: CoreSchema, disconnected: set[str] | None = None
) -> None:
    """Drop resource actors that reference disconnected vertices."""
    if disconnected is None:
        # Default: vertices declared in the schema but untouched by any edge.
        disconnected = (
            core_schema.vertex_config.vertex_set - core_schema.edge_config.vertices
        )
    if not disconnected:
        return

    def _touches_disconnected(wrapper) -> bool:
        # A wrapper is affected when its actor references any pruned vertex.
        return bool(wrapper.actor.references_vertices() & disconnected)

    doomed: list[Resource] = []
    for resource in self.resources:
        root = resource.root
        if _touches_disconnected(root):
            doomed.append(resource)
            continue
        root.remove_descendants_if(_touches_disconnected)
        # Drop the resource entirely when no surviving actor references vertices.
        if not any(
            actor.references_vertices() for actor in root.collect_actors()
        ):
            doomed.append(resource)

    for resource in doomed:
        self.resources.remove(resource)
        self._resources.pop(resource.name, None)

JoinClause

Bases: ConfigBaseModel

Specification for a SQL JOIN operation.

Used by TableConnector to describe multi-table queries. Each JoinClause adds one JOIN to the generated SQL. The base row uses TableConnector.base_alias (default base), not a hard-coded name.

Attributes:

Name Type Description
table str

Table name to join (e.g. "all_classes").

schema_name str | None

Optional schema override for the joined table.

alias str | None

SQL alias for the joined table (e.g. "s", "t"). Required when the same table is joined more than once.

on_self str

Column on the base (left) table used in the ON condition.

on_other str

Column on the joined (right) table used in the ON condition.

join_type str

Type of join -- LEFT, INNER, etc. Defaults to LEFT.

select_fields list[str] | None

Explicit list of columns to SELECT from this join. When None every column of the joined table is included (aliased with the join alias prefix).

Source code in graflo/architecture/contract/bindings/connectors.py
class JoinClause(ConfigBaseModel):
    """Specification for a SQL JOIN operation.

    Used by TableConnector to describe multi-table queries. Each JoinClause
    adds one JOIN to the generated SQL. The base row uses ``TableConnector.base_alias``
    (default ``base``), not a hard-coded name.

    Attributes:
        table: Table name to join (e.g. "all_classes").
        schema_name: Optional schema override for the joined table.
        alias: SQL alias for the joined table (e.g. "s", "t"). Required when
            the same table is joined more than once.
        on_self: Column on the base (left) table used in the ON condition.
        on_other: Column on the joined (right) table used in the ON condition.
        join_type: Type of join -- LEFT, INNER, etc. Defaults to LEFT.
        select_fields: Explicit list of columns to SELECT from this join.
            When None every column of the joined table is included (aliased
            with the join alias prefix).
    """

    table: str = Field(..., description="Table name to join.")
    # NOTE(review): presumably named schema_name (not schema) to avoid clashing
    # with pydantic's reserved BaseModel attribute — confirm before renaming.
    schema_name: str | None = Field(
        default=None, description="Schema override for the joined table."
    )
    alias: str | None = Field(
        default=None, description="SQL alias for the joined table."
    )
    on_self: str = Field(
        ..., description="Column on the base table for the ON condition."
    )
    on_other: str = Field(
        ..., description="Column on the joined table for the ON condition."
    )
    # NOTE(review): join_type is not validated against a closed set here —
    # TODO confirm the SQL generator sanitizes or restricts it.
    join_type: str = Field(default="LEFT", description="JOIN type (LEFT, INNER, etc.).")
    select_fields: list[str] | None = Field(
        default=None,
        description="Columns to SELECT from this join (None = all columns).",
    )

ProtoTransform

Bases: ConfigBaseModel

Base class for transform definitions.

This class provides the foundation for data transformations, supporting both functional transformations and declarative mappings.

Attributes:

Name Type Description
name str | None

Optional name of the transform

module str | None

Optional module containing the transform function

params dict[str, Any]

Dictionary of transform parameters

foo str | None

Optional name of the transform function

input tuple[str, ...]

Tuple of input field names

output tuple[str, ...]

Tuple of output field names

dress DressConfig | None

Optional pivot dressing for scalar functional results

target Literal['values', 'keys']

Whether to transform field values or document keys

keys KeySelectionConfig

Key selection when target is keys

_foo Any

Internal reference to the transform function

Source code in graflo/architecture/contract/declarations/transform.py
class ProtoTransform(ConfigBaseModel):
    """Base class for transform definitions.

    This class provides the foundation for data transformations, supporting both
    functional transformations and declarative mappings.

    Attributes:
        name: Optional name of the transform
        module: Optional module containing the transform function
        params: Dictionary of transform parameters
        foo: Optional name of the transform function
        input: Tuple of input field names
        output: Tuple of output field names
        input_groups: Explicit groups of input fields for repeated tuple-style
            value calls
        output_groups: Explicit output field groups aligned with input_groups
            for grouped value calls
        dress: Optional pivot dressing for scalar functional results
        target: Whether to transform field values or document keys
        keys: Key selection when target is keys
        _foo: Internal reference to the transform function
    """

    name: str | None = Field(
        default=None,
        description="Optional name for this transform (e.g. for reference in ingestion_model.transforms).",
    )
    module: str | None = Field(
        default=None,
        description="Python module path containing the transform function (e.g. my_package.transforms).",
    )
    params: dict[str, Any] = Field(
        default_factory=dict,
        description="Extra parameters passed to the transform function at runtime.",
    )
    foo: str | None = Field(
        default=None,
        description="Name of the callable in module to use as the transform function.",
    )
    input: tuple[str, ...] = Field(
        default_factory=tuple,
        description="Input field names passed to the transform function.",
    )
    output: tuple[str, ...] = Field(
        default_factory=tuple,
        description="Output field names produced by the transform (defaults to input if unset).",
    )
    input_groups: tuple[tuple[str, ...], ...] = Field(
        default_factory=tuple,
        description=(
            "Explicit groups of input fields for repeated tuple-style value calls."
        ),
    )
    output_groups: tuple[tuple[str, ...], ...] = Field(
        default_factory=tuple,
        description=(
            "Explicit output field groups aligned with input_groups for grouped value calls."
        ),
    )
    dress: DressConfig | None = Field(
        default=None,
        description=(
            "Dressing spec for pivoted output. Applies to ingestion_model.transforms "
            "entries and to inline transform steps. "
            "dress.key receives the input field name, dress.value receives the "
            "function result. E.g. dress={key: name, value: value} with "
            "input=(Open,) produces {name: 'Open', value: <result>}."
        ),
    )
    target: Literal["values", "keys"] = Field(
        default="values",
        description=(
            "Transform target. values=apply function to input values; "
            "keys=apply function to selected document keys."
        ),
    )
    keys: KeySelectionConfig = Field(
        default_factory=KeySelectionConfig,
        description="Key selection for key-target transforms.",
    )

    _foo: Any = PrivateAttr(default=None)

    @model_validator(mode="before")
    @classmethod
    def _normalize_input_output(cls, data: Any) -> Any:
        """Normalize raw input/output declarations into canonical tuple form."""
        if not isinstance(data, dict):
            return data
        data = dict(data)
        if "dress" in data and isinstance(data["dress"], (list, tuple)):
            raise ValueError(
                "List-style `dress` is no longer supported. "
                "Use a dict: dress={key: ..., value: ...}."
            )
        for key in ("input", "output"):
            if key in data:
                if data[key] is not None:
                    data[key] = _tuple_it(data[key])
                else:
                    data[key] = ()
        for key in ("input_groups", "output_groups"):
            if key in data:
                if data[key] is None:
                    data[key] = ()
                else:
                    data[key] = _tuple_groups_it(data[key])
        _normalize_keys_in_dict(data)
        return data

    @model_validator(mode="after")
    def _init_foo_and_output(self) -> Self:
        """Resolve the transform callable and derive output fields.

        Resolving eagerly makes a misconfigured module/function name fail at
        model-validation time rather than at first apply().
        """
        if self.module is not None and self.foo is not None:
            try:
                _module = importlib.import_module(self.module)
            except Exception as e:
                # Chain the original import error for easier debugging.
                raise TypeError(
                    f"Provided module {self.module} is not valid: {e}"
                ) from e
            try:
                object.__setattr__(self, "_foo", getattr(_module, self.foo))
            except Exception as e:
                raise ValueError(
                    f"Could not instantiate transform function. Exception: {e}"
                ) from e
        if self.dress is not None:
            if self.target == "keys":
                raise ValueError("target='keys' is not compatible with dress.")
            # Dressed output is always the (key, value) pivot pair.
            object.__setattr__(self, "output", (self.dress.key, self.dress.value))
        elif not self.output and self.input:
            # Default: output field names mirror the input field names.
            object.__setattr__(self, "output", self.input)
        return self

    @classmethod
    def get_fields_members(cls) -> list[str]:
        """Get list of field members (public model fields)."""
        return list(cls.model_fields.keys())

    def apply(self, *args: Any, **kwargs: Any) -> Any:
        """Apply the raw transform function to the given arguments.

        This is the core function invocation without any input extraction or
        output dressing — purely ``self._foo(*args, **kwargs, **self.params)``.

        Raises:
            TransformException: If no transform function has been set.
        """
        if self._foo is None:
            raise TransformException("No transform function set")
        return self._foo(*args, **kwargs, **self.params)

    def __lt__(self, other: object) -> bool:
        """Compare transforms for ordering.

        Args:
            other: Other transform to compare with

        Returns:
            bool: True if this transform should be ordered before other
        """
        if not isinstance(other, ProtoTransform):
            return NotImplemented
        # Transforms without a resolved function sort before those with one.
        if self._foo is None and other._foo is not None:
            return True
        return False

__lt__(other)

Compare transforms for ordering.

Parameters:

Name Type Description Default
other object

Other transform to compare with

required

Returns:

Name Type Description
bool bool

True if this transform should be ordered before other

Source code in graflo/architecture/contract/declarations/transform.py
def __lt__(self, other: object) -> bool:
    """Compare transforms for ordering.

    Args:
        other: Other transform to compare with

    Returns:
        bool: True if this transform should be ordered before other
    """
    if not isinstance(other, ProtoTransform):
        return NotImplemented
    # Transforms without a resolved function (_foo is None) sort before those
    # with one; every other pairing compares as not-less-than.
    if self._foo is None and other._foo is not None:
        return True
    return False

apply(*args, **kwargs)

Apply the raw transform function to the given arguments.

This is the core function invocation without any input extraction or output dressing — purely self._foo(*args, **kwargs, **self.params).

Raises:

Type Description
TransformException

If no transform function has been set.

Source code in graflo/architecture/contract/declarations/transform.py
def apply(self, *args: Any, **kwargs: Any) -> Any:
    """Apply the raw transform function to the given arguments.

    This is the core function invocation without any input extraction or
    output dressing — purely ``self._foo(*args, **kwargs, **self.params)``.

    Raises:
        TransformException: If no transform function has been set.
    """
    if self._foo is None:
        raise TransformException("No transform function set")
    # self.params are merged in as extra keyword arguments on every call.
    return self._foo(*args, **kwargs, **self.params)

get_fields_members() classmethod

Get list of field members (public model fields).

Source code in graflo/architecture/contract/declarations/transform.py
@classmethod
def get_fields_members(cls) -> list[str]:
    """Get list of field members (public model fields)."""
    # model_fields is pydantic's mapping of declared (non-private) fields.
    return list(cls.model_fields.keys())

Resource

Bases: ConfigBaseModel

Resource configuration and processing.

Represents a data resource that can be processed and transformed into graph structures. Manages the processing pipeline through actors and handles data encoding, transformation, and mapping. Suitable for LLM-generated schema constituents.

Dynamic vertex-type routing is handled by vertex_router steps in the pipeline (see :class:~graflo.architecture.pipeline.runtime.actor.VertexRouterActor). Per-row relationship labels and location matching for edges belong on edge pipeline steps (:class:~graflo.architecture.edge_derivation.EdgeDerivation), not on Resource.

Source code in graflo/architecture/contract/declarations/resource.py
class Resource(ConfigBaseModel):
    """Resource configuration and processing.

    Represents a data resource that can be processed and transformed into graph
    structures. Manages the processing pipeline through actors and handles data
    encoding, transformation, and mapping. Suitable for LLM-generated schema
    constituents.

    Dynamic vertex-type routing is handled by ``vertex_router`` steps in the
    pipeline (see :class:`~graflo.architecture.pipeline.runtime.actor.VertexRouterActor`).
    Per-row relationship labels and location matching for edges belong on
    ``edge`` pipeline steps (:class:`~graflo.architecture.edge_derivation.EdgeDerivation`),
    not on ``Resource``.
    """

    # Reject unknown keys in resource declarations (catches manifest typos early).
    model_config = {"extra": "forbid"}

    name: str = PydanticField(
        ...,
        description="Name of the resource (e.g. table or file identifier).",
    )
    pipeline: list[dict[str, Any]] = PydanticField(
        ...,
        description="Pipeline of actor steps to apply in sequence (vertex, edge, transform, descend). "
        'Each step is a dict, e.g. {"vertex": "user"} or {"edge": {"from": "a", "to": "b"}}.',
        validation_alias=AliasChoices("pipeline", "apply"),
    )
    encoding: EncodingType = PydanticField(
        default=EncodingType.UTF_8,
        description="Character encoding for input/output (e.g. utf-8, ISO-8859-1).",
    )
    merge_collections: list[str] = PydanticField(
        default_factory=list,
        description="List of collection names to merge when writing to the graph.",
    )
    extra_weights: list[ResourceExtraWeightEntry] = PydanticField(
        default_factory=list,
        description="Additional edge attribute / vertex-weight enrichment for this resource.",
    )
    types: dict[str, str] = PydanticField(
        default_factory=dict,
        description='Field name to Python type expression for casting (e.g. {"amount": "float"}).',
    )
    infer_edges: bool = PydanticField(
        default=True,
        description=(
            "If True, infer edges from current vertex population. "
            "If False, emit only edges explicitly declared as edge actors in the pipeline."
        ),
    )
    infer_edge_only: list[EdgeInferSpec] = PydanticField(
        default_factory=list,
        description=(
            "Optional allow-list for inferred edges. Applies only to inferred (greedy) edges, "
            "not explicit edge actors."
        ),
    )
    infer_edge_except: list[EdgeInferSpec] = PydanticField(
        default_factory=list,
        description=(
            "Optional deny-list for inferred edges. Applies only to inferred (greedy) edges, "
            "not explicit edge actors."
        ),
    )
    drop_trivial_input_fields: bool = PydanticField(
        default=False,
        description=(
            "If True, remove top-level input keys whose value is None or the empty string before "
            "the actor pipeline runs. Only the outer dict is filtered: nested dicts and list "
            "elements are left unchanged, and keys whose values are containers (dict/list) are "
            "kept even when empty. Numeric 0 and boolean False are kept. Use with wide or "
            "sparse tabular rows so VertexActor projection sees fewer irrelevant columns."
        ),
    )
    skip_actors_on_missing_input_keys: bool | None = PydanticField(
        default=None,
        description=(
            "If True, actors that declare required input keys may skip execution when keys are "
            "missing in the current document instead of raising indexing errors. "
            "If None, defaults to drop_trivial_input_fields."
        ),
    )

    # Runtime state populated by _build_root_and_types / _rebuild_runtime,
    # never part of the declarative schema.
    _root: ActorWrapper = PrivateAttr()
    _types: dict[str, Callable[..., Any]] = PrivateAttr(default_factory=dict)
    _vertex_config: VertexConfig = PrivateAttr()
    _edge_config: EdgeConfig = PrivateAttr()
    _executor: ActorExecutor = PrivateAttr()
    _initialized: bool = PrivateAttr(default=False)
    _edge_derivation_registry: EdgeDerivationRegistry | None = PrivateAttr(default=None)

    @model_validator(mode="after")
    def _build_root_and_types(self) -> Resource:
        """Build root ActorWrapper and resolve safe cast functions."""
        from graflo.architecture.pipeline.runtime.actor import ActorWrapper
        from graflo.architecture.pipeline.runtime.executor import ActorExecutor

        # object.__setattr__ writes private attrs without going through
        # pydantic's assignment machinery.
        object.__setattr__(self, "_root", ActorWrapper(*self.pipeline))
        object.__setattr__(self, "_executor", ActorExecutor(self._root))
        object.__setattr__(self, "_types", {})
        for k, v in self.types.items():
            caster = _resolve_type_caster(v)
            if caster is not None:
                self._types[k] = caster
            else:
                # Unresolvable cast expressions are logged, not fatal.
                logger.error(
                    "For resource %s for field %s failed to resolve cast type %s",
                    self.name,
                    k,
                    v,
                )
        # Placeholders until schema binds real configs.
        object.__setattr__(self, "_vertex_config", VertexConfig(vertices=[]))
        object.__setattr__(self, "_edge_config", EdgeConfig())
        object.__setattr__(self, "_initialized", False)
        self._validate_infer_edge_spec_policy()
        return self

    def _validate_infer_edge_spec_policy(self) -> None:
        # allow-list and deny-list cannot be combined.
        if self.infer_edge_only and self.infer_edge_except:
            raise ValueError(
                "Resource infer_edge_only and infer_edge_except are mutually exclusive."
            )

    def _validate_infer_edge_spec_targets(self, edge_config: EdgeConfig) -> None:
        """Reject infer-edge selectors that match no declared edge id."""
        known_edge_ids = {edge_id for edge_id, _ in edge_config.items()}

        def _validate_list(field_name: str, specs: list[EdgeInferSpec]) -> None:
            unknown: list[EdgeId] = []
            for spec in specs:
                # A spec is valid when it matches at least one known edge id.
                if not any(spec.matches(edge_id) for edge_id in known_edge_ids):
                    unknown.append(spec.edge_id)
            if unknown:
                raise ValueError(
                    f"Resource {field_name} contains unknown edge selectors: {unknown}"
                )

        _validate_list("infer_edge_only", self.infer_edge_only)
        _validate_list("infer_edge_except", self.infer_edge_except)

    @property
    def vertex_config(self) -> VertexConfig:
        """Vertex configuration (set by Schema.finish_init)."""
        return self._vertex_config

    @property
    def edge_config(self) -> EdgeConfig:
        """Edge configuration (set by Schema.finish_init)."""
        return self._edge_config

    @property
    def root(self) -> ActorWrapper:
        """Root actor wrapper for the processing pipeline."""
        return self._root

    def finish_init(
        self,
        vertex_config: VertexConfig,
        edge_config: EdgeConfig,
        transforms: dict[str, ProtoTransform],
        *,
        strict_references: bool = False,
        dynamic_edge_feedback: bool = False,
        allowed_vertex_names: set[str] | None = None,
        target_db_flavor: DBType | None = None,
    ) -> None:
        """Complete resource initialization.

        Initializes the resource with vertex and edge configurations,
        and sets up the processing pipeline. Called by Schema after load.

        Args:
            vertex_config: Configuration for vertices
            edge_config: Configuration for edges
            transforms: Dictionary of available transforms
            strict_references: Forwarded to actor initialization
                (ActorInitContext.strict_references).
            dynamic_edge_feedback: If True, edges newly registered by this
                resource's actors are propagated back into the shared
                schema-level edge_config.
            allowed_vertex_names: Optional allow-list used to filter the
                runtime vertex config and forwarded to actor initialization.
            target_db_flavor: Target graph DB flavor (for ingestion-time defaults, e.g. TigerGraph).
        """
        self._rebuild_runtime(
            vertex_config=vertex_config,
            edge_config=edge_config,
            transforms=transforms,
            strict_references=strict_references,
            dynamic_edge_feedback=dynamic_edge_feedback,
            allowed_vertex_names=allowed_vertex_names,
            target_db_flavor=target_db_flavor,
        )

    def _edge_ids_from_edge_actors(self) -> set[EdgeId]:
        """Collect (source, target, None) for every EdgeActor in this resource's pipeline.

        Used to auto-add to infer_edge_except so inferred edges do not duplicate
        edges produced by explicit edge actors.
        """
        from graflo.architecture.pipeline.runtime.actor import EdgeActor

        edge_actors = [
            a for a in self.root.collect_actors() if isinstance(a, EdgeActor)
        ]
        # Dynamic EdgeActors (ea.edge is None) resolve types at row time;
        # exclude them from static inference suppression.
        return {
            (ea.edge.source, ea.edge.target, None)
            for ea in edge_actors
            if ea.edge is not None
        }

    def _validate_dynamic_edge_vertices_exist(
        self, vertex_config: VertexConfig
    ) -> None:
        """Ensure all vertices implied by dynamic edge controls are declared."""
        known_vertices = set(vertex_config.vertex_set)
        referenced_vertices: set[str] = set()

        for spec in self.infer_edge_only:
            referenced_vertices.add(spec.source)
            referenced_vertices.add(spec.target)

        for spec in self.infer_edge_except:
            referenced_vertices.add(spec.source)
            referenced_vertices.add(spec.target)

        for source, target, _ in self._edge_ids_from_edge_actors():
            referenced_vertices.add(source)
            referenced_vertices.add(target)

        missing_vertices = sorted(referenced_vertices - known_vertices)
        if missing_vertices:
            raise ValueError(
                "Resource dynamic edge references undefined vertices: "
                f"{missing_vertices}. "
                "Declare these vertices in vertex_config before using dynamic/inferred edges."
            )

    def _rebuild_runtime(
        self,
        *,
        vertex_config: VertexConfig,
        edge_config: EdgeConfig,
        transforms: dict[str, ProtoTransform],
        strict_references: bool = False,
        dynamic_edge_feedback: bool = False,
        allowed_vertex_names: set[str] | None = None,
        target_db_flavor: DBType | None = None,
    ) -> None:
        """Rebuild runtime actor initialization state from typed context."""
        # Keep the full schema vertex_config for correctness validations, but
        # use the filtered runtime vertex_config for actor execution.
        runtime_vertex_config = _filter_vertex_config_by_allowed(
            vertex_config, allowed_vertex_names=allowed_vertex_names
        )
        object.__setattr__(self, "_vertex_config", runtime_vertex_config)
        # Runtime actors may register dynamic edges; keep per-resource edge state.
        local_edge_config = EdgeConfig.model_validate(
            edge_config.to_dict(skip_defaults=False)
        )
        object.__setattr__(self, "_edge_config", local_edge_config)
        self._validate_dynamic_edge_vertices_exist(vertex_config)
        self._validate_infer_edge_spec_targets(self._edge_config)

        # Snapshot edge ids known before actor init, so dynamic_edge_feedback
        # below can detect which edges the actors newly registered.
        baseline_edge_ids = {edge_id for edge_id, _ in edge_config.items()}
        infer_edge_except = {spec.edge_id for spec in self.infer_edge_except}
        # When not using infer_edge_only, auto-add (s,t,None) to infer_edge_except
        # for any edge type handled by explicit EdgeActors in this resource.
        if not self.infer_edge_only:
            infer_edge_except |= self._edge_ids_from_edge_actors()

        from graflo.architecture.pipeline.runtime.actor import ActorInitContext

        edge_derivation_registry = EdgeDerivationRegistry()
        object.__setattr__(self, "_edge_derivation_registry", edge_derivation_registry)

        logger.debug("total resource actor count : %s", self.root.count())
        # Explicit setting wins; otherwise mirror drop_trivial_input_fields.
        skip_on_missing_input_keys = (
            self.skip_actors_on_missing_input_keys
            if self.skip_actors_on_missing_input_keys is not None
            else self.drop_trivial_input_fields
        )
        init_ctx = ActorInitContext(
            vertex_config=runtime_vertex_config,
            edge_config=self._edge_config,
            transforms=transforms,
            edge_derivation=edge_derivation_registry,
            allowed_vertex_names=allowed_vertex_names,
            infer_edges=self.infer_edges,
            infer_edge_only={spec.edge_id for spec in self.infer_edge_only},
            infer_edge_except=infer_edge_except,
            strict_references=strict_references,
            skip_actors_on_missing_input_keys=skip_on_missing_input_keys,
            target_db_flavor=target_db_flavor,
        )
        self.root.finish_init(init_ctx=init_ctx)
        object.__setattr__(self, "_initialized", True)

        if dynamic_edge_feedback:
            # Edge actors register static edge definitions into the resource-local edge
            # config during finish_init(). Optionally propagate newly discovered edges
            # to the shared schema-level edge_config so schema definition and DB
            # writers can see them.
            for edge_id, edge in self._edge_config.items():
                if edge_id in baseline_edge_ids:
                    continue
                edge_config.update_edges(
                    edge.model_copy(deep=True), vertex_config=vertex_config
                )

        logger.debug("total resource actor count (after finit): %s", self.root.count())

        reg = self._edge_derivation_registry
        for entry in self.extra_weights:
            entry.edge.finish_init(vertex_config)
            if reg is not None and entry.vertex_weights:
                reg.merge_vertex_weights(entry.edge.edge_id, entry.vertex_weights)

    def __call__(self, doc: dict) -> defaultdict[GraphEntity, list]:
        """Process a document through the resource pipeline.

        Args:
            doc: Document to process

        Returns:
            defaultdict[GraphEntity, list]: Processed graph entities

        Raises:
            RuntimeError: If the resource has not been initialized via finish_init().
        """
        if not self._initialized:
            raise RuntimeError(
                f"Resource '{self.name}' must be initialized via finish_init() before use."
            )
        # Optionally drop top-level keys holding None/"" before extraction.
        work_doc: dict[str, Any] = (
            _strip_trivial_top_level_fields(doc)
            if self.drop_trivial_input_fields
            else doc
        )
        extraction_ctx = self._executor.extract(work_doc)
        result = self._executor.assemble_result(extraction_ctx)
        return result.entities

    def count(self) -> int:
        """Total number of actors in the resource pipeline."""
        # Delegates to the root ActorWrapper's count().
        return self.root.count()

edge_config property

Edge configuration (set by Schema.finish_init).

root property

Root actor wrapper for the processing pipeline.

vertex_config property

Vertex configuration (set by Schema.finish_init).

__call__(doc)

Process a document through the resource pipeline.

Parameters:

Name Type Description Default
doc dict

Document to process

required

Returns:

Type Description
defaultdict[GraphEntity, list]

defaultdict[GraphEntity, list]: Processed graph entities

Source code in graflo/architecture/contract/declarations/resource.py
def __call__(self, doc: dict) -> defaultdict[GraphEntity, list]:
    """Process a document through the resource pipeline.

    Args:
        doc: Document to process

    Returns:
        defaultdict[GraphEntity, list]: Processed graph entities

    Raises:
        RuntimeError: If the resource has not been initialized via finish_init().
    """
    if not self._initialized:
        raise RuntimeError(
            f"Resource '{self.name}' must be initialized via finish_init() before use."
        )
    # Optionally drop top-level keys holding None/"" before extraction.
    work_doc: dict[str, Any] = (
        _strip_trivial_top_level_fields(doc)
        if self.drop_trivial_input_fields
        else doc
    )
    extraction_ctx = self._executor.extract(work_doc)
    result = self._executor.assemble_result(extraction_ctx)
    return result.entities

count()

Total number of actors in the resource pipeline.

Source code in graflo/architecture/contract/declarations/resource.py
def count(self) -> int:
    """Total number of actors in the resource pipeline."""
    # Delegates to the root ActorWrapper's count().
    return self.root.count()

finish_init(vertex_config, edge_config, transforms, *, strict_references=False, dynamic_edge_feedback=False, allowed_vertex_names=None, target_db_flavor=None)

Complete resource initialization.

Initializes the resource with vertex and edge configurations, and sets up the processing pipeline. Called by Schema after load.

Parameters:

Name Type Description Default
vertex_config VertexConfig

Configuration for vertices

required
edge_config EdgeConfig

Configuration for edges

required
transforms dict[str, ProtoTransform]

Dictionary of available transforms

required
target_db_flavor DBType | None

Target graph DB flavor (for ingestion-time defaults, e.g. TigerGraph).

None
Source code in graflo/architecture/contract/declarations/resource.py
def finish_init(
    self,
    vertex_config: VertexConfig,
    edge_config: EdgeConfig,
    transforms: dict[str, ProtoTransform],
    *,
    strict_references: bool = False,
    dynamic_edge_feedback: bool = False,
    allowed_vertex_names: set[str] | None = None,
    target_db_flavor: DBType | None = None,
) -> None:
    """Complete resource initialization.

    Initializes the resource with vertex and edge configurations,
    and sets up the processing pipeline. Called by Schema after load.

    Args:
        vertex_config: Configuration for vertices
        edge_config: Configuration for edges
        transforms: Dictionary of available transforms
        strict_references: Forwarded to actor initialization.
        dynamic_edge_feedback: If True, edges newly registered by this
            resource's actors are propagated back to the shared edge config.
        allowed_vertex_names: Optional allow-list used to filter the runtime
            vertex config and forwarded to actor initialization.
        target_db_flavor: Target graph DB flavor (for ingestion-time defaults, e.g. TigerGraph).
    """
    self._rebuild_runtime(
        vertex_config=vertex_config,
        edge_config=edge_config,
        transforms=transforms,
        strict_references=strict_references,
        dynamic_edge_feedback=dynamic_edge_feedback,
        allowed_vertex_names=allowed_vertex_names,
        target_db_flavor=target_db_flavor,
    )

ResourceConnector

Bases: ConfigBaseModel, ABC

Abstract base class for resource connectors (files or tables).

Provides common API for connector matching and resource identification. All concrete connector types inherit from this class.

Connectors only describe source-side matching/query behavior. Resource-to- connector linkage is handled by Bindings.

Source code in graflo/architecture/contract/bindings/connectors.py
class ResourceConnector(ConfigBaseModel, abc.ABC):
    """Abstract base for source connectors (files, tables, SPARQL sources).

    A connector describes only source-side matching/query behavior;
    linking a connector to a resource is the job of ``Bindings``.
    Concrete connector types implement :meth:`matches` and
    :meth:`bound_source_kind`.
    """

    name: str | None = Field(
        default=None,
        description="Optional connector name used by top-level resource_connector mapping.",
    )
    resource_name: str | None = Field(
        default=None,
        description="Optional direct resource binding declared on the connector itself.",
    )
    hash: str = Field(
        default="",
        exclude=True,
        description="Deterministic internal connector id derived from defining fields.",
    )

    def _hash_payload(self) -> dict[str, Any]:
        """Collect the identity-defining fields, tagged with the concrete class name."""
        data = self.model_dump(
            mode="json",
            by_alias=True,
            exclude={"hash", "name", "resource_name"},
        )
        # Include the concrete type so two connector kinds with identical
        # field payloads never hash to the same id.
        data["_connector_type"] = type(self).__name__
        return data

    @model_validator(mode="after")
    def _compute_hash(self) -> Self:
        """Derive the deterministic ``hash`` id once validation has run."""
        serialized = json.dumps(
            self._hash_payload(), sort_keys=True, separators=(",", ":")
        )
        digest = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
        # Bypass pydantic's assignment machinery when storing the digest.
        object.__setattr__(self, "hash", digest)
        return self

    @abc.abstractmethod
    def matches(self, resource_identifier: str) -> bool:
        """Report whether this connector matches *resource_identifier*.

        Args:
            resource_identifier: Filename or table name to test.

        Returns:
            bool: True when the connector matches the identifier.
        """
        ...

    @abc.abstractmethod
    def bound_source_kind(self) -> BoundSourceKind:
        """Return the physical source kind handled by this connector."""
        ...

bound_source_kind() abstractmethod

Return the physical source kind for this connector.

Source code in graflo/architecture/contract/bindings/connectors.py
@abc.abstractmethod
def bound_source_kind(self) -> BoundSourceKind:
    """Return the physical source kind handled by this connector."""
    ...

matches(resource_identifier) abstractmethod

Check if connector matches a resource identifier.

Parameters:

Name Type Description Default
resource_identifier str

Identifier to match (filename or table name)

required

Returns:

Name Type Description
bool bool

True if connector matches

Source code in graflo/architecture/contract/bindings/connectors.py
@abc.abstractmethod
def matches(self, resource_identifier: str) -> bool:
    """Report whether this connector matches *resource_identifier*.

    Args:
        resource_identifier: Filename or table name to test.

    Returns:
        bool: True when the connector matches the identifier.
    """
    ...

Schema

Bases: ConfigBaseModel

Graph schema (A+B): metadata, core schema, and DB profile.

Source code in graflo/architecture/schema/document.py
class Schema(ConfigBaseModel):
    """Top-level graph schema (A+B): metadata, core schema, and DB profile."""

    metadata: GraphMetadata = PydanticField(
        ...,
        description="Schema metadata and versioning (name, version).",
    )
    core_schema: CoreSchema = PydanticField(
        ...,
        description="Core schema model (vertices + edges).",
        validation_alias=AliasChoices("core_schema", "graph"),
    )
    db_profile: DatabaseProfile = PydanticField(
        default_factory=DatabaseProfile,
        description=(
            "Database-specific physical profile (secondary indexes, naming, TigerGraph GSQL "
            "DEFAULT overrides via default_property_values, etc.)."
        ),
    )

    @model_validator(mode="after")
    def _init_schema(self) -> Schema:
        # Post-load initialization runs automatically after validation.
        self.finish_init()
        return self

    def finish_init(self) -> None:
        """Finalize the core schema after model construction."""
        self.core_schema.finish_init()

    def remove_disconnected_vertices(self) -> set[str]:
        """Delegate to :meth:`CoreSchema.remove_disconnected_vertices`."""
        return self.core_schema.remove_disconnected_vertices()

    def resolve_db_aware(self, db_flavor: DBType | None = None) -> SchemaDBAware:
        """Build DB-aware runtime wrappers without mutating logical schema.

        NOTE(review): a non-None *db_flavor* is written into
        ``self.db_profile``, so the DB profile (though not the core
        schema) is updated by this call.
        """
        from .db_aware import (
            EdgeConfigDBAware,
            SchemaDBAware,
            VertexConfigDBAware,
        )

        if db_flavor is not None:
            self.db_profile.db_flavor = db_flavor

        vertices = VertexConfigDBAware(self.core_schema.vertex_config, self.db_profile)
        edges = EdgeConfigDBAware(
            self.core_schema.edge_config, vertices, self.db_profile
        )
        edges.compile_identity_indexes()
        return SchemaDBAware(
            vertex_config=vertices,
            edge_config=edges,
            db_profile=self.db_profile,
        )

    @staticmethod
    def _slug_filename_token(token: str) -> str:
        """Collapse filename-unsafe characters into dashes; never return empty."""
        slug = re.sub(r"[^A-Za-z0-9._-]+", "-", token.strip()).strip("-")
        return slug or "schema"

    def default_dump_filename(self) -> str:
        """Return default schema dump filename: ``<name>-<version>.yaml``."""
        raw_version = (
            "unversioned" if self.metadata.version is None else self.metadata.version
        )
        name_slug = self._slug_filename_token(self.metadata.name)
        version_slug = self._slug_filename_token(raw_version)
        return f"{name_slug}-{version_slug}.yaml"

    def dump(
        self,
        path: str | pathlib.Path | None = None,
        *,
        exclude_defaults: bool = True,
    ) -> pathlib.Path:
        """Dump schema YAML to path, excluding defaults by default.

        If path is omitted, writes into the current working directory using
        ``<schema_name>-<version>.yaml``; an existing directory path gets
        the default filename appended.

        Returns:
            The path the YAML document was written to.
        """
        if path is None:
            destination = pathlib.Path.cwd() / self.default_dump_filename()
        else:
            destination = pathlib.Path(path)
            if destination.is_dir():
                destination = destination / self.default_dump_filename()

        body = (
            self.to_minimal_canonical_dict()
            if exclude_defaults
            else self.to_dict(skip_defaults=False)
        )
        destination.parent.mkdir(parents=True, exist_ok=True)
        serialized = yaml.safe_dump(
            body,
            default_flow_style=False,
            sort_keys=False,
        )
        destination.write_text(serialized, encoding="utf-8")
        return destination

default_dump_filename()

Return default schema dump filename: `<name>-<version>.yaml`.

Source code in graflo/architecture/schema/document.py
def default_dump_filename(self) -> str:
    """Return default schema dump filename: ``<name>-<version>.yaml``."""
    raw_version = (
        "unversioned" if self.metadata.version is None else self.metadata.version
    )
    name_slug = self._slug_filename_token(self.metadata.name)
    version_slug = self._slug_filename_token(raw_version)
    return f"{name_slug}-{version_slug}.yaml"

dump(path=None, *, exclude_defaults=True)

Dump schema YAML to path, excluding defaults by default.

If path is omitted, writes into current working directory using <schema_name>-<version>.yaml.

Source code in graflo/architecture/schema/document.py
def dump(
    self,
    path: str | pathlib.Path | None = None,
    *,
    exclude_defaults: bool = True,
) -> pathlib.Path:
    """Dump schema YAML to path, excluding defaults by default.

    If path is omitted, writes into the current working directory using
    ``<schema_name>-<version>.yaml``; an existing directory path gets
    the default filename appended.

    Returns:
        The path the YAML document was written to.
    """
    if path is None:
        destination = pathlib.Path.cwd() / self.default_dump_filename()
    else:
        destination = pathlib.Path(path)
        if destination.is_dir():
            destination = destination / self.default_dump_filename()

    body = (
        self.to_minimal_canonical_dict()
        if exclude_defaults
        else self.to_dict(skip_defaults=False)
    )
    destination.parent.mkdir(parents=True, exist_ok=True)
    serialized = yaml.safe_dump(
        body,
        default_flow_style=False,
        sort_keys=False,
    )
    destination.write_text(serialized, encoding="utf-8")
    return destination

resolve_db_aware(db_flavor=None)

Build DB-aware runtime wrappers without mutating logical schema.

Source code in graflo/architecture/schema/document.py
def resolve_db_aware(self, db_flavor: DBType | None = None) -> SchemaDBAware:
    """Build DB-aware runtime wrappers without mutating logical schema."""
    from .db_aware import (
        EdgeConfigDBAware,
        SchemaDBAware,
        VertexConfigDBAware,
    )

    if db_flavor is not None:
        self.db_profile.db_flavor = db_flavor

    vertex_db = VertexConfigDBAware(self.core_schema.vertex_config, self.db_profile)
    edge_db = EdgeConfigDBAware(
        self.core_schema.edge_config, vertex_db, self.db_profile
    )
    edge_db.compile_identity_indexes()
    return SchemaDBAware(
        vertex_config=vertex_db,
        edge_config=edge_db,
        db_profile=self.db_profile,
    )

SchemaDBAware dataclass

DB-aware schema runtime view.

Source code in graflo/architecture/schema/db_aware.py
@dataclass(frozen=True)
class SchemaDBAware:
    """DB-aware schema runtime view.

    Immutable bundle pairing DB-aware vertex/edge configs with the
    database profile they were resolved against.
    """

    # DB-aware wrapper around the logical vertex configuration.
    vertex_config: VertexConfigDBAware
    # DB-aware wrapper around the logical edge configuration.
    edge_config: EdgeConfigDBAware
    # Physical database profile used when resolving the wrappers.
    db_profile: DatabaseProfile

SparqlConnector

Bases: ResourceConnector

Connector for matching SPARQL / RDF data sources.

Each SparqlConnector targets instances of a single rdf:Class. It can be backed either by a remote SPARQL endpoint (Fuseki, Blazegraph, ...) or by a local RDF file parsed with rdflib.

Attributes:

Name Type Description
rdf_class str

Full URI of the rdf:Class whose instances this connector fetches (e.g. "http://example.org/Person").

endpoint_url str | None

SPARQL query endpoint URL. When set, instances are fetched via HTTP. When None the connector is for local file mode.

graph_uri str | None

Named-graph URI to restrict the query to (optional).

sparql_query str | None

Custom SPARQL SELECT query override. When provided the auto-generated per-class query is skipped.

rdf_file Path | None

Path to a local RDF file (.ttl, .rdf, .n3, .jsonld). Mutually exclusive with endpoint_url.

Source code in graflo/architecture/contract/bindings/connectors.py
class SparqlConnector(ResourceConnector):
    """Connector for SPARQL / RDF data sources.

    A ``SparqlConnector`` targets the instances of one ``rdf:Class``.
    It is backed either by a remote SPARQL endpoint (Fuseki, Blazegraph, ...)
    via *endpoint_url*, or by a local RDF file parsed with *rdflib*.

    Attributes:
        rdf_class: Full URI of the ``rdf:Class`` whose instances this
            connector fetches (e.g. ``"http://example.org/Person"``).
        endpoint_url: SPARQL query endpoint URL; ``None`` means local
            file mode.
        graph_uri: Optional named-graph URI restricting the query.
        sparql_query: Custom SPARQL ``SELECT`` override; when set, the
            auto-generated per-class query is skipped.
        rdf_file: Path to a local RDF file (``.ttl``, ``.rdf``, ``.n3``,
            ``.jsonld``); mutually exclusive with *endpoint_url*.
    """

    rdf_class: str = Field(
        ..., description="URI of the rdf:Class to fetch instances of"
    )
    endpoint_url: str | None = Field(
        default=None, description="SPARQL query endpoint URL"
    )
    graph_uri: str | None = Field(
        default=None, description="Named graph URI (optional)"
    )
    sparql_query: str | None = Field(
        default=None, description="Custom SPARQL query override"
    )
    rdf_file: pathlib.Path | None = Field(
        default=None, description="Path to a local RDF file"
    )

    def matches(self, resource_identifier: str) -> bool:
        """Match by the local name (fragment) of the rdf:Class URI.

        Args:
            resource_identifier: Identifier to match against

        Returns:
            True when *resource_identifier* equals the class local name
        """
        after_fragment = self.rdf_class.rsplit("#", 1)[-1]
        local_name = after_fragment.rsplit("/", 1)[-1]
        return local_name == resource_identifier

    def bound_source_kind(self) -> BoundSourceKind:
        """This connector always binds a ``BoundSourceKind.SPARQL`` source."""
        return BoundSourceKind.SPARQL

    def build_select_query(self) -> str:
        """Return the SPARQL SELECT query for instances of ``rdf_class``.

        ``sparql_query`` wins when present; otherwise a simple per-class
        triple query is generated, optionally wrapped in a GRAPH block::

            SELECT ?s ?p ?o WHERE {
              ?s a <rdf_class> .
              ?s ?p ?o .
            }

        Returns:
            SPARQL query string
        """
        if self.sparql_query:
            return self.sparql_query

        if self.graph_uri:
            graph_open = f"GRAPH <{self.graph_uri}> {{"
            graph_close = "}"
        else:
            graph_open = ""
            graph_close = ""

        return (
            "SELECT ?s ?p ?o WHERE { "
            f"{graph_open} "
            f"?s a <{self.rdf_class}> . "
            f"?s ?p ?o . "
            f"{graph_close} "
            "}"
        )

bound_source_kind()

Return BoundSourceKind.SPARQL.

Source code in graflo/architecture/contract/bindings/connectors.py
def bound_source_kind(self) -> BoundSourceKind:
    """This connector always binds a ``BoundSourceKind.SPARQL`` source."""
    return BoundSourceKind.SPARQL

build_select_query()

Build a SPARQL SELECT query for instances of rdf_class.

If sparql_query is set it is returned as-is. Otherwise a simple per-class query is generated::

SELECT ?s ?p ?o WHERE {
  ?s a <rdf_class> .
  ?s ?p ?o .
}

Returns:

Type Description
str

SPARQL query string

Source code in graflo/architecture/contract/bindings/connectors.py
def build_select_query(self) -> str:
    """Return the SPARQL SELECT query for instances of ``rdf_class``.

    ``sparql_query`` wins when present; otherwise a simple per-class
    triple query is generated, optionally wrapped in a GRAPH block::

        SELECT ?s ?p ?o WHERE {
          ?s a <rdf_class> .
          ?s ?p ?o .
        }

    Returns:
        SPARQL query string
    """
    if self.sparql_query:
        return self.sparql_query

    if self.graph_uri:
        graph_open = f"GRAPH <{self.graph_uri}> {{"
        graph_close = "}"
    else:
        graph_open = ""
        graph_close = ""

    return (
        "SELECT ?s ?p ?o WHERE { "
        f"{graph_open} "
        f"?s a <{self.rdf_class}> . "
        f"?s ?p ?o . "
        f"{graph_close} "
        "}"
    )

matches(resource_identifier)

Match by the local name (fragment) of the rdf:Class URI.

Parameters:

Name Type Description Default
resource_identifier str

Identifier to match against

required

Returns:

Type Description
bool

True when resource_identifier equals the class local name

Source code in graflo/architecture/contract/bindings/connectors.py
def matches(self, resource_identifier: str) -> bool:
    """Match by the local name (fragment) of the rdf:Class URI.

    Args:
        resource_identifier: Identifier to match against

    Returns:
        True when *resource_identifier* equals the class local name
    """
    after_fragment = self.rdf_class.rsplit("#", 1)[-1]
    local_name = after_fragment.rsplit("/", 1)[-1]
    return local_name == resource_identifier

TableConnector

Bases: ResourceConnector

Connector for matching database tables.

Supports simple single-table queries as well as multi-table JOINs and pushdown filters via FilterExpression.

Attributes:

Name Type Description
table_name str

Exact table name or regex pattern

schema_name str | None

Schema name (optional, defaults to public)

database str | None

Database name (optional)

date_field str | None

Name of the date field to filter on (for date-based filtering)

date_filter str | None

SQL-style date filter condition (e.g., "> '2020-10-10'")

date_range_start str | None

Start date for range filtering (e.g., "2015-11-11")

date_range_days int | None

Number of days after start date (used with date_range_start)

filters list[Any]

General-purpose pushdown filters rendered as SQL WHERE fragments.

joins list[JoinClause]

Multi-table JOIN specifications (auto-generated or explicit).

base_alias str

SQL alias for the base table when joins is non-empty.

select_columns list[str] | None

Explicit SELECT column list. None means * for the base table (plus aliased columns from joins).

Source code in graflo/architecture/contract/bindings/connectors.py
class TableConnector(ResourceConnector):
    """Connector for matching database tables.

    Supports simple single-table queries as well as multi-table JOINs and
    pushdown filters via ``FilterExpression``.

    Attributes:
        table_name: Exact table name or regex pattern
        schema_name: Schema name (optional, defaults to public)
        database: Database name (optional)
        date_field: Name of the date field to filter on (for date-based filtering)
        date_filter: SQL-style date filter condition (e.g., "> '2020-10-10'")
        date_range_start: Start date for range filtering (e.g., "2015-11-11")
        date_range_days: Number of days after start date (used with date_range_start)
        filters: General-purpose pushdown filters rendered as SQL WHERE fragments.
        joins: Multi-table JOIN specifications (auto-generated or explicit).
        base_alias: SQL alias for the base table when ``joins`` is non-empty.
        select_columns: Explicit SELECT column list. None means ``*`` for the
            base table (plus aliased columns from joins).
    """

    table_name: str = Field(
        default="", validation_alias=AliasChoices("table_name", "table")
    )
    schema_name: str | None = Field(
        default=None, validation_alias=AliasChoices("schema_name", "schema")
    )
    database: str | None = None
    date_field: str | None = None
    date_filter: str | None = None
    date_range_start: str | None = None
    date_range_days: int | None = None
    filters: list[Any] = Field(
        default_factory=list,
        description="Pushdown FilterExpression filters (rendered to SQL WHERE).",
    )
    joins: list[JoinClause] = Field(
        default_factory=list,
        description="JOIN clauses for multi-table queries.",
    )
    base_alias: str = Field(
        default="base",
        description="SQL alias for the base table row when joins are present.",
    )
    select_columns: list[str] | None = Field(
        default=None,
        description="Explicit SELECT columns. None = SELECT * (plus join aliases).",
    )
    view: Any = Field(
        default=None,
        description="SelectSpec or dict for declarative view (alternative to table+joins+filters).",
    )

    @field_validator("view", mode="before")
    @classmethod
    def _coerce_view(cls, v: Any) -> Any:
        """Coerce a dict payload into a SelectSpec; pass everything else through."""
        if v is None:
            return None
        if isinstance(v, dict):
            from graflo.filter.select import SelectSpec

            return SelectSpec.from_dict(v)
        return v

    @model_validator(mode="after")
    def _validate_table_connector(self) -> Self:
        """Validate table_name, alias hygiene, and date filtering parameters."""
        if not self.table_name:
            raise ValueError("table_name is required for TableConnector")
        if not _BASE_TABLE_ALIAS_IDENT.match(self.base_alias):
            raise ValueError(
                "base_alias must be a simple SQL identifier "
                "(ASCII letter, digit, underscore)"
            )
        join_aliases = {jc.alias or jc.table for jc in self.joins}
        if self.base_alias in join_aliases:
            raise ValueError(
                f"base_alias {self.base_alias!r} conflicts with a join alias "
                f"{sorted(join_aliases)}"
            )
        if (self.date_filter or self.date_range_start) and not self.date_field:
            raise ValueError(
                "date_field is required when using date_filter or date_range_start"
            )
        if self.date_range_days is not None and not self.date_range_start:
            raise ValueError("date_range_start is required when using date_range_days")
        return self

    def matches(self, resource_identifier: str) -> bool:
        """Check if connector matches a table name.

        Args:
            resource_identifier: Table name to match (format: schema.table or just table)

        Returns:
            bool: True if connector matches
        """
        if not self.table_name:
            return False

        # Compile regex expression: a leading ^ or trailing $ marks an
        # explicit regex, anything else is treated as an exact-match name.
        if self.table_name.startswith("^") or self.table_name.endswith("$"):
            # Already a regex expression
            compiled_regex = re.compile(self.table_name)
        else:
            # Exact match expression
            compiled_regex = re.compile(f"^{re.escape(self.table_name)}$")

        # Check if resource_identifier matches
        if compiled_regex.match(resource_identifier):
            return True

        # If schema_name is specified, also check schema.table format
        if self.schema_name:
            full_name = f"{self.schema_name}.{resource_identifier}"
            if compiled_regex.match(full_name):
                return True

        return False

    def bound_source_kind(self) -> BoundSourceKind:
        """Return ``BoundSourceKind.SQL_TABLE``."""
        return BoundSourceKind.SQL_TABLE

    def build_where_clause(self, base_alias: str | None = None) -> str:
        """Build SQL WHERE clause from date filtering parameters **and** general filters.

        Args:
            base_alias: Optional SQL alias used to qualify column references.

        Returns:
            WHERE clause string (without the WHERE keyword) or empty string if no filters
        """
        from graflo.onto import ExpressionFlavor

        conditions: list[str] = []

        # Date-specific conditions (legacy fields)
        if self.date_field:
            date_col = self._qualified_column_ref(self.date_field, base_alias)
            if self.date_range_start and self.date_range_days is not None:
                # Half-open window [start, start + days).
                start_date = self._parse_iso_date(self.date_range_start)
                end_date = start_date + timedelta(days=self.date_range_days)
                conditions.append(f"{date_col} >= '{start_date.isoformat()}'")
                conditions.append(f"{date_col} < '{end_date.isoformat()}'")
            elif self.date_filter:
                filter_parts = self.date_filter.strip().split(None, 1)
                if len(filter_parts) == 2:
                    operator, value = filter_parts
                    if not (value.startswith("'") and value.endswith("'")):
                        # Quote bare ISO-date literals (YYYY-MM-DD) for SQL.
                        if len(value) == 10 and value.count("-") == 2:
                            value = f"'{value}'"
                    conditions.append(f"{date_col} {operator} {value}")
                else:
                    conditions.append(f"{date_col} {self.date_filter}")

        # General-purpose FilterExpression filters
        for filt in self.filters:
            filt_expr = self._coerce_filter_expression(filt, base_alias)
            if filt_expr is not None:
                rendered = filt_expr(kind=ExpressionFlavor.SQL)
                if rendered:
                    conditions.append(str(rendered))

        if conditions:
            return " AND ".join(conditions)
        return ""

    def build_query(self, effective_schema: str | None = None) -> str:
        """Build a complete SQL SELECT query.

        When ``view`` is set, delegates to ``view.build_sql()``. Otherwise
        incorporates the base table, any JoinClauses, explicit select_columns,
        date filters, and FilterExpression filters.

        Args:
            effective_schema: Schema to use if ``self.schema_name`` is None.

        Returns:
            Complete SQL query string.
        """
        schema = self.schema_name or effective_schema or "public"
        if self.view is not None:
            from graflo.filter.select import SelectSpec

            if isinstance(self.view, SelectSpec):
                # Declarative view path: the view renders the SQL; connector
                # filters are appended onto any WHERE it already contains.
                query = self.view.build_sql(schema=schema, base_table=self.table_name)
                where = self.build_where_clause(base_alias=self.view.base_alias)
                if where:
                    return self._append_where_condition(query, where)
                return query
        base_alias = self.base_alias if self.joins else None
        base_ref = f'"{schema}"."{self.table_name}"'
        if base_alias:
            base_ref_aliased = f"{base_ref} {base_alias}"
        else:
            base_ref_aliased = base_ref

        # --- SELECT ---
        select_parts: list[str] = []
        if self.select_columns is not None:
            select_parts = list(self.select_columns)
        elif self.joins:
            select_parts.append(f"{base_alias}.*")
            for jc in self.joins:
                alias = jc.alias or jc.table
                if jc.select_fields is not None:
                    # Prefix joined columns so they cannot collide with
                    # base-table column names.
                    for col in jc.select_fields:
                        select_parts.append(f'{alias}."{col}" AS "{alias}__{col}"')
                else:
                    select_parts.append(f"{alias}.*")
        else:
            select_parts.append("*")

        select_clause = ", ".join(select_parts)

        # --- FROM + JOINs ---
        from_clause = base_ref_aliased
        for jc in self.joins:
            jc_schema = jc.schema_name or schema
            alias = jc.alias or jc.table
            join_ref = f'"{jc_schema}"."{jc.table}"'
            left_col = (
                f'{base_alias}."{jc.on_self}"' if base_alias else f'"{jc.on_self}"'
            )
            right_col = f'{alias}."{jc.on_other}"'
            from_clause += (
                f" {jc.join_type} JOIN {join_ref} {alias} ON {left_col} = {right_col}"
            )

        query = f"SELECT {select_clause} FROM {from_clause}"

        # --- WHERE ---
        where = self.build_where_clause(base_alias=base_alias)
        if where:
            query += f" WHERE {where}"

        return query

    @staticmethod
    def _append_where_condition(query: str, condition: str) -> str:
        """Append a SQL condition to *query* preserving an existing WHERE clause."""
        if re.search(r"\bWHERE\b", query, flags=re.IGNORECASE):
            return f"{query} AND {condition}"
        return f"{query} WHERE {condition}"

    @staticmethod
    def _qualified_column_ref(column: str, base_alias: str | None) -> str:
        """Return *column* double-quoted, qualified with *base_alias* when given."""
        if base_alias:
            return f'{base_alias}."{column}"'
        return f'"{column}"'

    @staticmethod
    def _parse_iso_date(raw: str) -> date:
        """Parse an ISO date (or datetime) string, tolerating surrounding quotes."""
        value = raw.strip().strip("'")
        try:
            return date.fromisoformat(value)
        except ValueError:
            # Fall back to full datetime strings; keep only the date part.
            dt = datetime.fromisoformat(value)
            return dt.date()

    @classmethod
    def _qualify_filter_payload(
        cls, payload: dict[str, Any], base_alias: str | None
    ) -> dict[str, Any]:
        """Prefix unqualified leaf field names with *base_alias* in a filter payload."""
        qualified = dict(payload)
        if base_alias is None:
            return qualified
        if qualified.get("kind") == "leaf":
            field = qualified.get("field")
            if isinstance(field, str) and "." not in field:
                qualified["field"] = f"{base_alias}.{field}"
            return qualified
        deps = qualified.get("deps")
        if isinstance(deps, list):
            # Recurse into nested filter expressions.
            qualified["deps"] = [
                cls._qualify_filter_payload(dep, base_alias)
                if isinstance(dep, dict)
                else dep
                for dep in deps
            ]
        return qualified

    @classmethod
    def _coerce_filter_expression(
        cls, raw_filter: Any, base_alias: str | None
    ) -> FilterExpression | None:
        """Normalize a raw filter (FilterExpression or dict) into a qualified
        FilterExpression; returns None for unsupported types."""
        from graflo.filter.onto import FilterExpression

        if isinstance(raw_filter, FilterExpression):
            payload = raw_filter.model_dump(mode="python")
            return FilterExpression.from_dict(
                cls._qualify_filter_payload(payload, base_alias)
            )
        if isinstance(raw_filter, dict):
            return FilterExpression.from_dict(
                cls._qualify_filter_payload(raw_filter, base_alias)
            )
        return None

build_query(effective_schema=None)

Build a complete SQL SELECT query.

When view is set, delegates to view.build_sql(). Otherwise incorporates the base table, any JoinClauses, explicit select_columns, date filters, and FilterExpression filters.

Parameters:

Name Type Description Default
effective_schema str | None

Schema to use if self.schema_name is None.

None

Returns:

Type Description
str

Complete SQL query string.

Source code in graflo/architecture/contract/bindings/connectors.py
def build_query(self, effective_schema: str | None = None) -> str:
    """Build a complete SQL SELECT query.

    When ``view`` is set, delegates to ``view.build_sql()``. Otherwise
    incorporates the base table, any JoinClauses, explicit select_columns,
    date filters, and FilterExpression filters.

    Args:
        effective_schema: Schema to use if ``self.schema_name`` is None.

    Returns:
        Complete SQL query string.
    """
    schema = self.schema_name or effective_schema or "public"
    if self.view is not None:
        from graflo.filter.select import SelectSpec

        if isinstance(self.view, SelectSpec):
            query = self.view.build_sql(schema=schema, base_table=self.table_name)
            where = self.build_where_clause(base_alias=self.view.base_alias)
            if where:
                return self._append_where_condition(query, where)
            return query
    base_alias = self.base_alias if self.joins else None
    base_ref = f'"{schema}"."{self.table_name}"'
    if base_alias:
        base_ref_aliased = f"{base_ref} {base_alias}"
    else:
        base_ref_aliased = base_ref

    # --- SELECT ---
    select_parts: list[str] = []
    if self.select_columns is not None:
        select_parts = list(self.select_columns)
    elif self.joins:
        select_parts.append(f"{base_alias}.*")
        for jc in self.joins:
            alias = jc.alias or jc.table
            jc_schema = jc.schema_name or schema
            if jc.select_fields is not None:
                for col in jc.select_fields:
                    select_parts.append(f'{alias}."{col}" AS "{alias}__{col}"')
            else:
                select_parts.append(f"{alias}.*")
    else:
        select_parts.append("*")

    select_clause = ", ".join(select_parts)

    # --- FROM + JOINs ---
    from_clause = base_ref_aliased
    for jc in self.joins:
        jc_schema = jc.schema_name or schema
        alias = jc.alias or jc.table
        join_ref = f'"{jc_schema}"."{jc.table}"'
        left_col = (
            f'{base_alias}."{jc.on_self}"' if base_alias else f'"{jc.on_self}"'
        )
        right_col = f'{alias}."{jc.on_other}"'
        from_clause += (
            f" {jc.join_type} JOIN {join_ref} {alias} ON {left_col} = {right_col}"
        )

    query = f"SELECT {select_clause} FROM {from_clause}"

    # --- WHERE ---
    where = self.build_where_clause(base_alias=base_alias)
    if where:
        query += f" WHERE {where}"

    return query

build_where_clause(base_alias=None)

Build SQL WHERE clause from date filtering parameters and general filters.

Returns:

Type Description
str

WHERE clause string (without the WHERE keyword) or empty string if no filters

Source code in graflo/architecture/contract/bindings/connectors.py
def build_where_clause(self, base_alias: str | None = None) -> str:
    """Render the WHERE clause body from date settings **and** general filters.

    Args:
        base_alias: Optional table alias used to qualify column references.

    Returns:
        WHERE clause string (without the WHERE keyword) or empty string if no filters
    """
    from graflo.onto import ExpressionFlavor

    parts: list[str] = []

    # Legacy date-based filtering fields.
    if self.date_field:
        column = self._qualified_column_ref(self.date_field, base_alias)
        if self.date_range_start and self.date_range_days is not None:
            # Half-open interval: [start, start + days).
            lower = self._parse_iso_date(self.date_range_start)
            upper = lower + timedelta(days=self.date_range_days)
            parts.append(f"{column} >= '{lower.isoformat()}'")
            parts.append(f"{column} < '{upper.isoformat()}'")
        elif self.date_filter:
            pieces = self.date_filter.strip().split(None, 1)
            if len(pieces) == 2:
                op, operand = pieces
                already_quoted = operand.startswith("'") and operand.endswith("'")
                if not already_quoted:
                    # Bare ISO dates (YYYY-MM-DD) get quoted for SQL.
                    if len(operand) == 10 and operand.count("-") == 2:
                        operand = f"'{operand}'"
                parts.append(f"{column} {op} {operand}")
            else:
                # Treat the raw filter text as a complete predicate tail.
                parts.append(f"{column} {self.date_filter}")

    # General-purpose FilterExpression filters, rendered in SQL flavor.
    for raw_filter in self.filters:
        expression = self._coerce_filter_expression(raw_filter, base_alias)
        if expression is None:
            continue
        rendered = expression(kind=ExpressionFlavor.SQL)
        if rendered:
            parts.append(str(rendered))

    return " AND ".join(parts) if parts else ""

matches(resource_identifier)

Check if connector matches a table name.

Parameters:

Name Type Description Default
resource_identifier str

Table name to match (format: schema.table or just table)

required

Returns:

Name Type Description
bool bool

True if connector matches

Source code in graflo/architecture/contract/bindings/connectors.py
def matches(self, resource_identifier: str) -> bool:
    """Check if connector matches a table name.

    Args:
        resource_identifier: Table name to match (format: schema.table or just table)

    Returns:
        bool: True if connector matches
    """
    pattern = self.table_name
    if not pattern:
        return False

    # Anchored patterns are taken verbatim; anything else is an exact match.
    if pattern.startswith("^") or pattern.endswith("$"):
        matcher = re.compile(pattern)
    else:
        matcher = re.compile(f"^{re.escape(pattern)}$")

    # Direct match against the identifier as given.
    if matcher.match(resource_identifier):
        return True

    # Fall back to the schema-qualified form when a schema is configured.
    if self.schema_name and matcher.match(
        f"{self.schema_name}.{resource_identifier}"
    ):
        return True

    return False

Transform

Bases: ProtoTransform

Concrete transform implementation.

Wraps a ProtoTransform with input extraction, output dressing, field mapping, and transform composition.

Attributes:

Name Type Description
fields tuple[str, ...]

Tuple of fields to transform

rename dict[str, str]

Dictionary mapping input fields to output fields

functional_transform bool

Whether this is a functional transform

Source code in graflo/architecture/contract/declarations/transform.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
class Transform(ProtoTransform):
    """Concrete transform implementation.

    Wraps a ProtoTransform with input extraction, output dressing, field
    mapping, and transform composition.

    Attributes:
        fields: Tuple of fields to transform
        rename: Dictionary mapping input fields to output fields
        functional_transform: Whether this is a functional transform
    """

    fields: tuple[str, ...] = Field(
        default_factory=tuple,
        description="Field names for declarative transform (used to derive input when input unset).",
    )
    rename: dict[str, str] = Field(
        default_factory=dict,
        description="Mapping of input_key -> output_key for pure field renaming (no function).",
    )
    strategy: Literal["single", "each", "all"] = Field(
        default="single",
        description=(
            "Functional call strategy. "
            "single: call function once with all input values. "
            "each: call function once per input field (unary). "
            "all: pass full document as a single argument."
        ),
    )
    passthrough_group_output: bool = Field(
        default=True,
        description=(
            "When grouped mode omits outputs, map function results back to input group keys."
        ),
    )

    functional_transform: bool = Field(
        default=False,
        description="True when a callable (module.foo) is set; False for pure map/dress transforms.",
    )

    @model_validator(mode="before")
    @classmethod
    def _normalize_fields(cls, data: Any) -> Any:
        # Pre-validation hook: normalize `fields` to a tuple and reject the
        # removed legacy `switch` key before pydantic field validation runs.
        if not isinstance(data, dict):
            return data
        # Copy so the caller's payload dict is never mutated.
        data = dict(data)
        if "fields" in data and data["fields"] is not None:
            data["fields"] = _tuple_it(data["fields"])
        if "switch" in data:
            raise ValueError(
                "Legacy `switch` is no longer supported. Use `input` + `dress`."
            )
        return data

    @model_validator(mode="after")
    def _init_derived(self) -> Self:
        # Record whether `rename` was user-supplied before derivation may
        # populate it, so validation can distinguish explicit vs derived maps.
        explicit_map = bool(self.rename)
        # object.__setattr__ bypasses pydantic's validation/assignment
        # machinery when writing derived state onto the validated model.
        object.__setattr__(self, "functional_transform", self._foo is not None)
        next_input, next_output, next_map = self._derive_effective_io_and_map()
        object.__setattr__(self, "input", next_input)
        object.__setattr__(self, "output", next_output)
        object.__setattr__(self, "map", next_map)
        self._validate_configuration(explicit_map=explicit_map)
        return self

    def _derive_grouped_default_output(self) -> tuple[str, ...]:
        # Default output derivation applies only when grouped inputs are set
        # and no explicit output/output_groups were provided.
        if not self.input_groups or self.output or self.output_groups:
            return self.output
        if not self.passthrough_group_output:
            return self.output
        scalar_names: list[str] = []
        for group in self.input_groups:
            # Passthrough only works when every group is a single field;
            # otherwise the mapping back to keys would be ambiguous.
            if len(group) != 1:
                return self.output
            scalar_names.append(group[0])
        return tuple(scalar_names) if scalar_names else self.output

    def _derive_effective_io_and_map(
        self,
    ) -> tuple[tuple[str, ...], tuple[str, ...], dict[str, str]]:
        """Compute effective input/output/map once using explicit precedence."""
        next_input = self.input
        next_output = self._derive_grouped_default_output()
        next_map = dict(self.rename)

        # `fields` acts only as a fallback for an unset `input`.
        if self.fields and not next_input:
            next_input = self.fields

        # A rename map can supply input and/or output when either is unset.
        if next_map:
            if not next_input and not next_output:
                next_input = tuple(next_map.keys())
                next_output = tuple(next_map.values())
            elif not next_input:
                next_input = tuple(next_map.keys())
            elif not next_output:
                next_output = tuple(next_map.values())

        # `dress` overrides any derived output; otherwise output defaults
        # to input (identity mapping).
        if self.dress is not None:
            next_output = (self.dress.key, self.dress.value)
        elif not next_output and next_input:
            next_output = next_input

        # Derive a positional rename map when none was given and lengths align.
        if (
            not next_map
            and next_input
            and next_output
            and len(next_input) == len(next_output)
        ):
            next_map = {src: dst for src, dst in zip(next_input, next_output)}

        return next_input, next_output, next_map

    def _init_io_from_map(self, force_init: bool = False) -> None:
        """Backwards-compatible shim; prefer sync_io_from_map()."""
        if not self.rename:
            return
        map_input = tuple(self.rename.keys())
        map_output = tuple(self.rename.values())
        if force_init or (not self.input and not self.output):
            object.__setattr__(self, "input", map_input)
            object.__setattr__(self, "output", map_output)
            return
        if not self.input:
            object.__setattr__(self, "input", map_input)
        elif not self.output:
            object.__setattr__(self, "output", map_output)

    def _validate_configuration(self, *, explicit_map: bool) -> None:
        """Validate that the transform has enough information to operate."""
        # Key-renaming mode has its own, stricter option set: only a callable
        # plus the `keys` selector are allowed.
        if self.target == "keys":
            if self.input_groups or self.output_groups:
                raise ValueError(
                    "target='keys' does not accept input_groups/output_groups."
                )
            if self._foo is None:
                raise ValueError("target='keys' requires a functional transform.")
            if self.rename:
                raise ValueError("target='keys' cannot be combined with map.")
            if self.input or self.output or self.fields:
                raise ValueError(
                    "target='keys' does not accept input/output/fields; use keys selector."
                )
            if self.dress is not None:
                raise ValueError("target='keys' is not compatible with dress.")
            if self.strategy != "single":
                raise ValueError(
                    "target='keys' uses implicit per-key execution and does not accept strategy."
                )
            return

        # Reject only user-specified map+function conflict. A derived map
        # (from input/output defaults) is valid for functional transforms.
        if explicit_map and self.rename and self._foo is not None:
            raise ValueError("map and functional transform cannot be used together.")
        if self.dress is not None:
            if len(self.input) != 1:
                raise ValueError("dress requires exactly one input field.")
        if self.strategy != "single" and self._foo is None:
            raise ValueError("strategy applies only to functional transforms.")
        if self.input_groups:
            # Grouped mode is mutually exclusive with most other options.
            if self._foo is None:
                raise ValueError(
                    "input_groups requires a functional transform (module + foo)."
                )
            if self.strategy != "single":
                raise ValueError(
                    "input_groups mode is explicit grouped execution and does not accept strategy."
                )
            if self.input or self.fields:
                raise ValueError("input_groups cannot be combined with input/fields.")
            if self.rename:
                raise ValueError("input_groups cannot be combined with map.")
            if self.dress is not None:
                raise ValueError("input_groups is not compatible with dress.")
            if self.output_groups and self.output:
                raise ValueError(
                    "Provide either output or output_groups for input_groups mode, not both."
                )
            if self.output_groups and len(self.output_groups) != len(self.input_groups):
                raise ValueError(
                    "output_groups must have same number of groups as input_groups."
                )
            if self.output and len(self.output) != len(self.input_groups):
                raise ValueError(
                    "When using input_groups with scalar outputs, output length must match number of input_groups."
                )
        elif self.output_groups:
            raise ValueError("output_groups requires input_groups.")
        if self._foo is not None and not self.input:
            # strategy='all' consumes the whole document, so no input needed.
            if self.strategy != "all" and not self.input_groups:
                raise ValueError(
                    "Functional transforms require `input` (string or list of field names)."
                )
        if self.strategy == "all":
            if self.input or self.fields:
                raise ValueError("strategy='all' does not accept input/fields.")
            if self.dress is not None:
                raise ValueError("strategy='all' is not compatible with dress.")
        if self.strategy == "each":
            if not self.input:
                raise ValueError("strategy='each' requires one or more input fields.")
            if self.output and len(self.input) != len(self.output):
                raise ValueError(
                    "strategy='each' requires output length to match input length."
                )
        if (
            self._foo is None
            and self.dress is None
            and self.input
            and self.output
            and len(self.input) != len(self.output)
        ):
            raise ValueError(
                "Non-functional transforms require input/output to have the same length."
            )
        if (
            not self.input
            and not self.output
            and not self.input_groups
            and not self.output_groups
            and not self.name
            and not (self._foo is not None and self.strategy == "all")
        ):
            raise ValueError(
                "Either input/output, fields, map or name must be provided in "
                "Transform constructor."
            )

    def _refresh_derived(self) -> None:
        """Re-run derived state (e.g. map from input/output) after mutating attributes."""
        # Never clobber an explicit rename map; skip if lengths can't align.
        if self.rename or not self.input or not self.output:
            return
        if len(self.input) != len(self.output):
            return
        object.__setattr__(
            self, "map", {src: dst for src, dst in zip(self.input, self.output)}
        )

    def __call__(self, *nargs: Any, **kwargs: Any) -> dict[str, Any] | Any:
        """Execute the transform.

        Args:
            *nargs: Positional arguments for the transform
            **kwargs: Keyword arguments for the transform

        Returns:
            dict: Transformed data
        """
        # Key-renaming mode operates on the document's keys, not its values.
        if self.target == "keys":
            input_doc = nargs[0] if nargs and isinstance(nargs[0], dict) else None
            if input_doc is None:
                raise TransformException(
                    "target='keys' requires a document dictionary."
                )
            return self._transform_keys(input_doc, **kwargs)

        # Grouped execution: the function is applied once per input group.
        if self.input_groups:
            input_doc = nargs[0] if nargs and isinstance(nargs[0], dict) else None
            if input_doc is None:
                raise TransformException(
                    "input_groups transforms require a document dictionary."
                )
            return self._transform_input_groups(input_doc, **kwargs)

        if self.is_mapping:
            # Pure mapping: pick values out of the document (or use the
            # positional args as-is) without calling any function.
            input_doc = nargs[0]
            if isinstance(input_doc, dict):
                output_values = [input_doc[k] for k in self.input]
            else:
                output_values = list(nargs)
            if self.dress is not None and len(output_values) == 1:
                # Non-functional dress shorthand: keep scalar value.
                output_values = output_values[0]
        else:
            # Functional execution, dispatched by strategy.
            if self.strategy == "all":
                if nargs and isinstance(nargs[0], dict):
                    output_values = self.apply(nargs[0], **kwargs)
                else:
                    output_values = self.apply(*nargs, **kwargs)
            elif self.strategy == "each":
                if nargs and isinstance(input_doc := nargs[0], dict):
                    output_values = [
                        self.apply(input_doc[k], **kwargs) for k in self.input
                    ]
                else:
                    output_values = [self.apply(value, **kwargs) for value in nargs]
            else:
                if nargs and isinstance(input_doc := nargs[0], dict):
                    new_args = [input_doc[k] for k in self.input]
                    output_values = self.apply(*new_args, **kwargs)
                else:
                    output_values = self.apply(*nargs, **kwargs)

        # With declared outputs the result is wrapped into a dict; otherwise
        # the raw value(s) pass through unchanged.
        if self.output:
            r = self._dress_as_dict(output_values)
        else:
            r = output_values
        return r

    def _apply_grouped_result(
        self,
        out: dict[str, Any],
        result: Any,
        input_group: tuple[str, ...],
        output_group: tuple[str, ...] | None,
        *,
        group_index: int,
    ) -> None:
        # Map one group's function result onto output keys, in precedence:
        # explicit output_groups, then scalar outputs, then passthrough to
        # the input group's own keys.
        if output_group is not None:
            if isinstance(result, (list, tuple)):
                values = list(result)
            else:
                values = [result]
            if len(values) != len(output_group):
                raise TransformException(
                    f"input_groups[{group_index}] produced {len(values)} values, "
                    f"but output_groups[{group_index}] expects {len(output_group)}."
                )
            pairs = zip(output_group, values)
        elif self.output:
            pairs = ((self.output[group_index], result),)
        else:
            if isinstance(result, (list, tuple)):
                values = list(result)
                if len(values) != len(input_group):
                    raise TransformException(
                        f"input_groups[{group_index}] has {len(input_group)} fields, "
                        f"but transform returned {len(values)} values. "
                        "Provide output/output_groups explicitly to resolve mapping."
                    )
                pairs = zip(input_group, values)
            else:
                if len(input_group) != 1:
                    raise TransformException(
                        f"input_groups[{group_index}] has {len(input_group)} fields "
                        "but transform returned a scalar. "
                        "Provide output/output_groups explicitly for scalar group results."
                    )
                pairs = ((input_group[0], result),)
        for key, value in pairs:
            # Two groups writing the same output key is a configuration error.
            if key in out:
                raise TransformException(
                    f"Grouped transform produced duplicate output key '{key}'."
                )
            out[key] = value

    def _transform_input_groups(
        self, doc: dict[str, Any], **kwargs: Any
    ) -> dict[str, Any]:
        # Apply the function once per group and merge results into one dict.
        out: dict[str, Any] = {}
        for idx, input_group in enumerate(self.input_groups):
            values = [doc[k] for k in input_group]
            result = self.apply(*values, **kwargs)
            output_group = self.output_groups[idx] if self.output_groups else None
            self._apply_grouped_result(
                out,
                result,
                input_group,
                output_group,
                group_index=idx,
            )
        return out

    @property
    def is_mapping(self) -> bool:
        """True when the transform is pure mapping (no function)."""
        return self._foo is None

    def _dress_as_dict(self, transform_result: Any) -> dict[str, Any]:
        """Convert transform result to dictionary format.

        When ``dress`` is set the result is pivoted: the input field name is
        stored under ``dress.key`` and the function result under ``dress.value``.
        Otherwise the result is mapped positionally to ``output`` fields.
        """
        if self.dress is not None:
            return {
                self.dress.key: self.input[0],
                self.dress.value: transform_result,
            }
        elif isinstance(transform_result, (list, tuple)):
            return {k: v for k, v in zip(self.output, transform_result)}
        else:
            # Scalar result: assign it to the last declared output field.
            return {self.output[-1]: transform_result}

    def _selected_keys(self, doc: dict[str, Any]) -> set[str]:
        """Return the set of document keys selected by the `keys` selector."""
        if self.keys.mode == "all":
            return set(doc.keys())
        selected = set(self.keys.names)
        if self.keys.mode == "include":
            return selected
        # mode == "exclude": everything except the named keys.
        return {k for k in doc if k not in selected}

    def _transform_keys(self, doc: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
        # Rename selected keys via the function; values pass through untouched.
        selected = self._selected_keys(doc)
        out: dict[str, Any] = {}
        for key, value in doc.items():
            new_key = self.apply(key, **kwargs) if key in selected else key
            if not isinstance(new_key, str):
                raise TransformException(
                    "Key transform functions must return str values."
                )
            if new_key in out:
                raise TransformException(
                    f"Key transform collision detected for key '{new_key}'."
                )
            out[new_key] = value
        return out

    @property
    def is_dummy(self) -> bool:
        """Check if this is a dummy transform.

        Returns:
            bool: True if this is a dummy transform
        """
        # A named transform with no rename map and no callable carries no
        # behavior of its own (placeholder/library reference).
        return self.name is not None and not self.rename and self._foo is None

    def merge_from(self, t: Transform) -> Transform:
        """Merge another transform's configuration into a copy of it.

        Returns a new Transform with values from self overriding t where set.
        Does not override ConfigBaseModel.update (in-place); use this for
        copy-and-merge semantics.

        Args:
            t: Transform to merge from

        Returns:
            Transform: New transform with merged configuration
        """
        t_copy = deepcopy(t)
        if self.input:
            t_copy.input = self.input
        if self.output:
            t_copy.output = self.output
        if self.params:
            # Self's params win on key collisions.
            t_copy.params = {**t_copy.params, **self.params}
        t_copy._refresh_derived()
        return t_copy

    def get_barebone(
        self, other: Transform | None
    ) -> tuple[Transform | None, Transform | None]:
        """Get the barebone transform configuration.

        Args:
            other: Optional transform to use as base

        Returns:
            tuple[Transform | None, Transform | None]: Updated self transform
            and transform to store in library
        """
        self_param = self.to_dict(exclude_defaults=True)
        if self.foo is not None:
            # self will be the lib transform
            return None, self
        elif other is not None and other.foo is not None:
            # init self from other
            self_param.pop("foo", None)
            self_param.pop("module", None)
            other_param = other.to_dict(exclude_defaults=True)
            other_param.update(self_param)
            return Transform(**other_param), None
        else:
            return None, None

is_dummy property

Check if this is a dummy transform.

Returns:

Name Type Description
bool bool

True if this is a dummy transform

is_mapping property

True when the transform is pure mapping (no function).

__call__(*nargs, **kwargs)

Execute the transform.

Parameters:

Name Type Description Default
*nargs Any

Positional arguments for the transform

()
**kwargs Any

Keyword arguments for the transform

{}

Returns:

Name Type Description
dict dict[str, Any] | Any

Transformed data

Source code in graflo/architecture/contract/declarations/transform.py
def __call__(self, *nargs: Any, **kwargs: Any) -> dict[str, Any] | Any:
    """Execute the transform.

    Args:
        *nargs: Positional arguments for the transform
        **kwargs: Keyword arguments for the transform

    Returns:
        dict: Transformed data
    """
    # Key-renaming mode operates on the document's keys, not its values.
    if self.target == "keys":
        input_doc = nargs[0] if nargs and isinstance(nargs[0], dict) else None
        if input_doc is None:
            raise TransformException(
                "target='keys' requires a document dictionary."
            )
        return self._transform_keys(input_doc, **kwargs)

    # Grouped execution: the function is applied once per input group.
    if self.input_groups:
        input_doc = nargs[0] if nargs and isinstance(nargs[0], dict) else None
        if input_doc is None:
            raise TransformException(
                "input_groups transforms require a document dictionary."
            )
        return self._transform_input_groups(input_doc, **kwargs)

    if self.is_mapping:
        # Pure mapping: pick values out of the document (or use the
        # positional args as-is) without calling any function.
        input_doc = nargs[0]
        if isinstance(input_doc, dict):
            output_values = [input_doc[k] for k in self.input]
        else:
            output_values = list(nargs)
        if self.dress is not None and len(output_values) == 1:
            # Non-functional dress shorthand: keep scalar value.
            output_values = output_values[0]
    else:
        # Functional execution, dispatched by strategy.
        if self.strategy == "all":
            if nargs and isinstance(nargs[0], dict):
                output_values = self.apply(nargs[0], **kwargs)
            else:
                output_values = self.apply(*nargs, **kwargs)
        elif self.strategy == "each":
            if nargs and isinstance(input_doc := nargs[0], dict):
                output_values = [
                    self.apply(input_doc[k], **kwargs) for k in self.input
                ]
            else:
                output_values = [self.apply(value, **kwargs) for value in nargs]
        else:
            if nargs and isinstance(input_doc := nargs[0], dict):
                new_args = [input_doc[k] for k in self.input]
                output_values = self.apply(*new_args, **kwargs)
            else:
                output_values = self.apply(*nargs, **kwargs)

    # With declared outputs the result is wrapped into a dict; otherwise
    # the raw value(s) pass through unchanged.
    if self.output:
        r = self._dress_as_dict(output_values)
    else:
        r = output_values
    return r

get_barebone(other)

Get the barebone transform configuration.

Parameters:

Name Type Description Default
other Transform | None

Optional transform to use as base

required

Returns:

Type Description
Transform | None

tuple[Transform | None, Transform | None]: Updated self transform

Transform | None

and transform to store in library

Source code in graflo/architecture/contract/declarations/transform.py
def get_barebone(
    self, other: Transform | None
) -> tuple[Transform | None, Transform | None]:
    """Get the barebone transform configuration.

    Args:
        other: Optional transform to use as base

    Returns:
        tuple[Transform | None, Transform | None]: Updated self transform
        and transform to store in library
    """
    own_params = self.to_dict(exclude_defaults=True)
    if self.foo is not None:
        # This transform carries the callable: it becomes the library entry.
        return None, self
    if other is not None and other.foo is not None:
        # Rebuild self on top of the other transform's configuration,
        # letting our own (non-callable) parameters win.
        own_params.pop("foo", None)
        own_params.pop("module", None)
        merged = other.to_dict(exclude_defaults=True)
        merged.update(own_params)
        return Transform(**merged), None
    return None, None

merge_from(t)

Merge another transform's configuration into a copy of it.

Returns a new Transform with values from self overriding t where set. Does not override ConfigBaseModel.update (in-place); use this for copy-and-merge semantics.

Parameters:

Name Type Description Default
t Transform

Transform to merge from

required

Returns:

Name Type Description
Transform Transform

New transform with merged configuration

Source code in graflo/architecture/contract/declarations/transform.py
def merge_from(self, t: Transform) -> Transform:
    """Merge another transform's configuration into a copy of it.

    Returns a new Transform with values from self overriding t where set.
    Does not override ConfigBaseModel.update (in-place); use this for
    copy-and-merge semantics.

    Args:
        t: Transform to merge from

    Returns:
        Transform: New transform with merged configuration
    """
    merged = deepcopy(t)
    # Our explicit input/output settings take precedence over t's.
    if self.input:
        merged.input = self.input
    if self.output:
        merged.output = self.output
    # Params are merged dict-wise; self's entries win on collisions.
    if self.params:
        merged.params = {**merged.params, **self.params}
    merged._refresh_derived()
    return merged

Vertex

Bases: ConfigBaseModel

Represents a vertex in the graph database.

A vertex is a fundamental unit in the graph that can have properties, identity, and filters. Properties can be specified as strings, Field objects, or dicts. Internally, properties are stored as Field objects but behave like strings where a string-like Field is needed.

Attributes:

Name Type Description
name str

Name of the vertex

properties list[Field]

List of field names (str), Field objects, or dicts. Will be normalized to Field objects by the validator.

identity list[str]

List of property names forming logical primary identity

filters list[FilterExpression]

List of filter expressions

Examples:

>>> # List of strings
>>> v1 = Vertex(name="user", properties=["id", "name"])
>>> # Typed properties: list of Field objects
>>> v2 = Vertex(name="user", properties=[
...     Field(name="id", type="INT"),
...     Field(name="name", type="STRING")
... ])
>>> # From dicts (e.g., from YAML/JSON)
>>> v3 = Vertex(name="user", properties=[
...     {"name": "id", "type": "INT"},
...     {"name": "name"}  # defaults to None type
... ])
Source code in graflo/architecture/schema/vertex.py
class Vertex(ConfigBaseModel):
    """Represents a vertex in the graph database.

    A vertex is a fundamental unit in the graph that can have properties, identity,
    and filters. Properties can be specified as strings, Field objects, or dicts.
    Internally, properties are stored as Field objects but behave like strings
    where a string-like Field is needed.

    Attributes:
        name: Name of the vertex
        properties: List of field names (str), Field objects, or dicts.
               Will be normalized to Field objects by the validator.
        identity: List of property names forming logical primary identity
        filters: List of filter expressions

    Examples:
        >>> # List of strings
        >>> v1 = Vertex(name="user", properties=["id", "name"])

        >>> # Typed properties: list of Field objects
        >>> v2 = Vertex(name="user", properties=[
        ...     Field(name="id", type="INT"),
        ...     Field(name="name", type="STRING")
        ... ])

        >>> # From dicts (e.g., from YAML/JSON)
        >>> v3 = Vertex(name="user", properties=[
        ...     {"name": "id", "type": "INT"},
        ...     {"name": "name"}  # defaults to None type
        ... ])
    """

    # Allow extra keys when loading from YAML (e.g. transforms, other runtime keys)
    model_config = ConfigDict(extra="ignore")

    name: str = PydanticField(
        ...,
        description="Name of the vertex type (e.g. user, post, company).",
    )
    properties: list[Field] = PydanticField(
        default_factory=list,
        description="List of fields (names, Field objects, or dicts). Normalized to Field objects.",
    )
    identity: list[str] = PydanticField(
        default_factory=list,
        description="Logical identity property names (primary key semantics for matching/upserts).",
    )
    filters: list[FilterExpression] = PydanticField(
        default_factory=list,
        description="Filter expressions (logical formulae) applied when querying this vertex.",
    )
    description: str | None = PydanticField(
        default=None,
        description="Optional semantic description of the vertex meaning, role, and intended interpretation.",
    )

    @field_validator("properties", mode="before")
    @classmethod
    def convert_to_properties(cls, v: Any) -> Any:
        """Normalize each entry (str/dict/Field) to a Field before validation."""
        if not isinstance(v, list):
            raise ValueError("properties must be a list")
        return [_normalize_fields_item(item) for item in v]

    @field_validator("filters", mode="before")
    @classmethod
    def convert_to_expressions(cls, v: Any) -> Any:
        """Coerce dict/list filter entries into FilterExpression instances."""
        # Non-list input is passed through for pydantic's own validation to reject.
        if not isinstance(v, list):
            return v
        result: list[FilterExpression] = []
        for item in v:
            if isinstance(item, FilterExpression):
                result.append(item)
            elif isinstance(item, (dict, list)):
                result.append(FilterExpression.from_dict(item))
            else:
                raise ValueError(
                    "each filter must be a FilterExpression instance or a dict/list (parsed as FilterExpression)"
                )
        return result

    @field_validator("identity", mode="before")
    @classmethod
    def convert_identity(cls, v: Any) -> Any:
        """Accept None (-> []) or a tuple (-> list); reject anything else non-list."""
        if v is None:
            return []
        if isinstance(v, tuple):
            return list(v)
        if isinstance(v, list):
            return v
        raise ValueError("identity must be a list[str]")

    @model_validator(mode="after")
    def set_identity(self) -> "Vertex":
        """Merge duplicate fields, dedupe identity, back-fill identity-only properties."""
        merged_properties = _merge_duplicate_fields(self.name, list(self.properties))
        identity_names = _dedupe_ordered(list(self.identity))
        seen_names = {f.name for f in merged_properties}
        augmented = list(merged_properties)
        for name in identity_names:
            if name not in seen_names:
                # Identity fields not declared as properties get untyped placeholders.
                augmented.append(Field(name=name, type=None))
                seen_names.add(name)
        # NOTE(review): object.__setattr__ presumably bypasses pydantic's
        # assignment validation here — confirm against ConfigBaseModel config.
        object.__setattr__(self, "identity", identity_names)
        object.__setattr__(self, "properties", augmented)
        return self

    @property
    def property_names(self) -> list[str]:
        """Property names as strings (Field.name for each entry)."""
        return [field.name for field in self.properties]

    def get_properties(self) -> list[Field]:
        """Return the normalized Field objects for this vertex."""
        return self.properties

    def finish_init(self):
        """Complete logical initialization for vertex."""
        return None

property_names property

Property names as strings (Field.name for each entry).

finish_init()

Complete logical initialization for vertex.

Source code in graflo/architecture/schema/vertex.py
def finish_init(self):
    """Logical-initialization hook for Vertex; no extra work is required."""
    return None

VertexConfig

Bases: ConfigBaseModel

Configuration for managing vertices.

This class manages vertices, providing methods for accessing and manipulating vertex configurations.

Attributes:

Name Type Description
vertices list[Vertex]

List of vertex configurations

blank_vertices list[str]

List of blank vertex names

force_types dict[str, list]

Dictionary mapping vertex names to type lists

Source code in graflo/architecture/schema/vertex.py
class VertexConfig(ConfigBaseModel):
    """Configuration for managing vertices.

    This class manages vertices, providing methods for accessing
    and manipulating vertex configurations.

    Attributes:
        vertices: List of vertex configurations
        blank_vertices: List of blank vertex names
        force_types: Dictionary mapping vertex names to type lists
    """

    # Allow extra keys when loading from YAML (e.g. vertex_config wrapper key)
    model_config = ConfigDict(extra="ignore")

    vertices: list[Vertex] = PydanticField(
        ...,
        description="List of vertex type definitions (name, properties, identity, filters).",
    )
    blank_vertices: list[str] = PydanticField(
        default_factory=list,
        description="Vertex names that may be created without explicit data (e.g. placeholders).",
    )
    force_types: dict[str, list] = PydanticField(
        default_factory=dict,
        description="Override mapping: vertex name -> list of field type names for type inference.",
    )
    identity_from_all_properties: bool = PydanticField(
        default=True,
        description=(
            "When true, vertices without explicit identity fall back to all property names. "
            "When false, explicit identity is required except for blank vertices."
        ),
    )
    # Private caches populated by the model validator below; not schema fields.
    _vertices_map: dict[str, Vertex] | None = PrivateAttr(default=None)
    _vertex_numeric_fields_map: dict[str, object] | None = PrivateAttr(default=None)

    @model_validator(mode="after")
    def build_vertices_map_and_validate_blank(self) -> "VertexConfig":
        """Build the name->Vertex cache, validate blank names, normalize identities."""
        # NOTE(review): object.__setattr__ presumably sidesteps pydantic's
        # assignment validation for private attrs — confirm.
        object.__setattr__(
            self,
            "_vertices_map",
            {item.name: item for item in self.vertices},
        )
        object.__setattr__(self, "_vertex_numeric_fields_map", {})
        # Every blank vertex name must correspond to a defined vertex.
        if set(self.blank_vertices) - set(self.vertex_set):
            raise ValueError(
                f" Blank vertices {self.blank_vertices} are not defined as vertices"
            )
        self._normalize_vertex_identities()
        return self

    def _normalize_vertex_identities(
        self,
    ) -> None:
        """Fill in missing vertex identities and back-fill identity-only properties."""
        blank_id_field = "id"
        for vertex in self.vertices:
            if not vertex.identity:
                if vertex.name in self.blank_vertices:
                    # Blank vertices get a synthetic "id" identity.
                    vertex.identity = [blank_id_field]
                elif self.identity_from_all_properties:
                    # Fallback: all declared properties form the identity.
                    vertex.identity = list(vertex.property_names)
                else:
                    raise ValueError(
                        f"Vertex '{vertex.name}' must define identity fields"
                    )
            vertex.identity = _dedupe_ordered(list(vertex.identity))
            # Identity fields not present as properties get untyped placeholders.
            missing = [f for f in vertex.identity if f not in vertex.property_names]
            for field_name in missing:
                vertex.properties.append(Field(name=field_name, type=None))

    def _get_vertices_map(self) -> dict[str, Vertex]:
        """Return the vertices map (set by model validator)."""
        if self._vertices_map is None:
            raise RuntimeError("VertexConfig not fully initialized")
        return self._vertices_map

    @property
    def vertex_set(self):
        """Get set of vertex names.

        Returns:
            set[str]: Set of vertex names
        """
        return set(self._get_vertices_map().keys())

    @property
    def vertex_list(self):
        """Get list of vertex configurations.

        Returns:
            list[Vertex]: List of vertex configurations
        """
        return list(self._get_vertices_map().values())

    def _get_vertex_by_name(self, identifier: str) -> Vertex:
        """Get vertex by logical vertex name."""
        m = self._get_vertices_map()
        if identifier in m:
            return m[identifier]
        available_names = list(m.keys())
        raise KeyError(
            f"Vertex '{identifier}' not found by logical name. "
            f"Available names: {available_names}"
        )

    def identity_fields(self, vertex_name: str) -> list[str]:
        """Get identity fields for a vertex."""
        # NOTE(review): direct indexing raises a bare KeyError for unknown
        # names, unlike the descriptive error from _get_vertex_by_name used
        # by properties()/property_names().
        return list(self._get_vertices_map()[vertex_name].identity)

    def properties(self, vertex_name: str) -> list[Field]:
        """Vertex properties as Field objects."""

        vertex = self._get_vertex_by_name(vertex_name)

        return vertex.properties

    def property_names(
        self,
        vertex_name: str,
    ) -> list[str]:
        """Vertex property names as strings."""

        vertex = self._get_vertex_by_name(vertex_name)
        return vertex.property_names

    def numeric_fields_list(self, vertex_name):
        """Get list of numeric fields for a vertex.

        Args:
            vertex_name: Name of the vertex

        Returns:
            tuple: Tuple of numeric field names

        Raises:
            ValueError: If vertex is not defined in config
        """
        if vertex_name in self.vertex_set:
            nmap = self._vertex_numeric_fields_map
            if nmap is not None and vertex_name in nmap:
                return nmap[vertex_name]
            else:
                # No numeric fields registered for this vertex.
                return ()
        else:
            raise ValueError(
                " Accessing vertex numeric fields: vertex"
                f" {vertex_name} was not defined in config"
            )

    def filters(self, vertex_name) -> list[FilterExpression]:
        """Get filter clauses for a vertex.

        Args:
            vertex_name: Name of the vertex

        Returns:
            list[FilterExpression]: List of filter expressions
        """
        m = self._get_vertices_map()
        if vertex_name in m:
            return m[vertex_name].filters
        else:
            # Unknown vertex: no filters rather than an error.
            return []

    def remove_vertices(self, names: set[str]) -> None:
        """Remove vertices by name.

        Removes vertices from the configuration and from blank_vertices
        when present. Mutates the instance in place.

        Args:
            names: Set of vertex names to remove
        """
        if not names:
            return
        self.vertices[:] = [v for v in self.vertices if v.name not in names]
        m = self._get_vertices_map()
        for n in names:
            m.pop(n, None)
        # NOTE(review): _vertex_numeric_fields_map entries for removed
        # vertices are not purged here — confirm whether stale entries matter.
        self.blank_vertices[:] = [b for b in self.blank_vertices if b not in names]

    def update_vertex(self, v: Vertex):
        """Update vertex configuration.

        Args:
            v: Vertex configuration to update
        """
        self._get_vertices_map()[v.name] = v

    def __getitem__(self, key: str):
        """Get vertex configuration by name.

        Args:
            key: Vertex name

        Returns:
            Vertex: Vertex configuration

        Raises:
            KeyError: If vertex is not found
        """
        m = self._get_vertices_map()
        if key in m:
            return m[key]
        else:
            raise KeyError(f"Vertex {key} absent")

    def __setitem__(self, key: str, value: Vertex):
        """Set vertex configuration by name.

        Args:
            key: Vertex name
            value: Vertex configuration
        """
        self._get_vertices_map()[key] = value

    def finish_init(self):
        """Complete logical initialization of vertices."""
        self._normalize_vertex_identities()
        for v in self.vertices:
            v.finish_init()

vertex_list property

Get list of vertex configurations.

Returns:

Type Description

list[Vertex]: List of vertex configurations

vertex_set property

Get set of vertex names.

Returns:

Type Description

set[str]: Set of vertex names

__getitem__(key)

Get vertex configuration by name.

Parameters:

Name Type Description Default
key str

Vertex name

required

Returns:

Name Type Description
Vertex

Vertex configuration

Raises:

Type Description
KeyError

If vertex is not found

Source code in graflo/architecture/schema/vertex.py
def __getitem__(self, key: str):
    """Get vertex configuration by name.

    Args:
        key: Vertex name

    Returns:
        Vertex: Vertex configuration

    Raises:
        KeyError: If vertex is not found
    """
    m = self._get_vertices_map()
    if key in m:
        return m[key]
    else:
        raise KeyError(f"Vertex {key} absent")

__setitem__(key, value)

Set vertex configuration by name.

Parameters:

Name Type Description Default
key str

Vertex name

required
value Vertex

Vertex configuration

required
Source code in graflo/architecture/schema/vertex.py
def __setitem__(self, key: str, value: Vertex):
    """Set vertex configuration by name.

    Args:
        key: Vertex name
        value: Vertex configuration
    """
    self._get_vertices_map()[key] = value

filters(vertex_name)

Get filter clauses for a vertex.

Parameters:

Name Type Description Default
vertex_name

Name of the vertex

required

Returns:

Type Description
list[FilterExpression]

list[FilterExpression]: List of filter expressions

Source code in graflo/architecture/schema/vertex.py
def filters(self, vertex_name) -> list[FilterExpression]:
    """Filter clauses attached to a vertex.

    Args:
        vertex_name: Name of the vertex

    Returns:
        list[FilterExpression]: The vertex's filter expressions, or an
        empty list when the vertex is unknown.
    """
    vertex = self._get_vertices_map().get(vertex_name)
    return vertex.filters if vertex is not None else []

finish_init()

Complete logical initialization of vertices.

Source code in graflo/architecture/schema/vertex.py
def finish_init(self):
    """Finalize logical initialization: normalize identities, then init each vertex."""
    self._normalize_vertex_identities()
    for vertex in self.vertices:
        vertex.finish_init()

identity_fields(vertex_name)

Get identity fields for a vertex.

Source code in graflo/architecture/schema/vertex.py
def identity_fields(self, vertex_name: str) -> list[str]:
    """Identity (logical primary-key) field names for a vertex.

    Args:
        vertex_name: Name of the vertex

    Returns:
        list[str]: Copy of the vertex's identity field names (safe to mutate).

    Raises:
        KeyError: If the vertex is not defined, with the available names
            listed — consistent with properties()/property_names().
    """
    # Route through _get_vertex_by_name so an unknown vertex raises the same
    # descriptive KeyError as the sibling accessors, instead of a bare one.
    return list(self._get_vertex_by_name(vertex_name).identity)

numeric_fields_list(vertex_name)

Get list of numeric fields for a vertex.

Parameters:

Name Type Description Default
vertex_name

Name of the vertex

required

Returns:

Name Type Description
tuple

Tuple of numeric field names

Raises:

Type Description
ValueError

If vertex is not defined in config

Source code in graflo/architecture/schema/vertex.py
def numeric_fields_list(self, vertex_name):
    """Get list of numeric fields for a vertex.

    Args:
        vertex_name: Name of the vertex

    Returns:
        tuple: Numeric field names registered for the vertex; empty tuple
        when none are known.

    Raises:
        ValueError: If vertex is not defined in config
    """
    # Guard first: only defined vertices may be queried.
    if vertex_name not in self.vertex_set:
        raise ValueError(
            " Accessing vertex numeric fields: vertex"
            f" {vertex_name} was not defined in config"
        )
    numeric_map = self._vertex_numeric_fields_map
    if numeric_map is None:
        return ()
    return numeric_map.get(vertex_name, ())

properties(vertex_name)

Vertex properties as Field objects.

Source code in graflo/architecture/schema/vertex.py
def properties(self, vertex_name: str) -> list[Field]:
    """Fields declared on the named vertex, as Field objects."""
    return self._get_vertex_by_name(vertex_name).properties

property_names(vertex_name)

Vertex property names as strings.

Source code in graflo/architecture/schema/vertex.py
def property_names(
    self,
    vertex_name: str,
) -> list[str]:
    """Names of the fields declared on the named vertex, as plain strings."""
    return self._get_vertex_by_name(vertex_name).property_names

remove_vertices(names)

Remove vertices by name.

Removes vertices from the configuration and from blank_vertices when present. Mutates the instance in place.

Parameters:

Name Type Description Default
names set[str]

Set of vertex names to remove

required
Source code in graflo/architecture/schema/vertex.py
def remove_vertices(self, names: set[str]) -> None:
    """Remove vertices by name, mutating this config in place.

    Drops the named vertices from the vertex list, the name->Vertex cache,
    and blank_vertices where present.

    Args:
        names: Set of vertex names to remove
    """
    if not names:
        return
    # Slice-assign so the existing list object is mutated, not rebound.
    kept = [vertex for vertex in self.vertices if vertex.name not in names]
    self.vertices[:] = kept
    vertices_map = self._get_vertices_map()
    for name in names:
        vertices_map.pop(name, None)
    self.blank_vertices[:] = [
        blank for blank in self.blank_vertices if blank not in names
    ]

update_vertex(v)

Update vertex configuration.

Parameters:

Name Type Description Default
v Vertex

Vertex configuration to update

required
Source code in graflo/architecture/schema/vertex.py
def update_vertex(self, v: Vertex):
    """Register *v* under its own name, replacing any existing entry.

    Args:
        v: Vertex configuration to store
    """
    vertices = self._get_vertices_map()
    vertices[v.name] = v

VertexConfigDBAware

DB-aware projection wrapper for VertexConfig.

Source code in graflo/architecture/schema/db_aware.py
class VertexConfigDBAware:
    """Projects a logical `VertexConfig` through a database profile.

    Wraps a logical vertex configuration and answers schema questions
    (identity fields, property types, storage names) in the terms a
    concrete database backend expects.
    """

    def __init__(self, logical: VertexConfig, database_features: DatabaseProfile):
        self.logical = logical
        self.db_profile = database_features

    @property
    def vertex_set(self):
        """Delegate: set of vertex names from the logical config."""
        return self.logical.vertex_set

    @property
    def blank_vertices(self):
        """Delegate: blank vertex names from the logical config."""
        return self.logical.blank_vertices

    @property
    def vertices(self):
        """Delegate: vertex definitions from the logical config."""
        return self.logical.vertices

    def vertex_dbname(self, vertex_name: str) -> str:
        """Storage-level name for a vertex, per the database profile."""
        return self.db_profile.vertex_storage_name(vertex_name)

    def index(self, vertex_name: str) -> Index:
        """Get primary index for a vertex (DB layer needs Index for collection setup)."""
        return Index(fields=self.identity_fields(vertex_name))

    def identity_fields(self, vertex_name: str) -> list[str]:
        """Identity fields, with a backend surrogate-key fallback for blank vertices."""
        identity = self.logical.identity_fields(vertex_name)
        if not identity and vertex_name in self.logical.blank_vertices:
            # Blank vertices fall back to the backend's surrogate-key field.
            if self.db_profile.db_flavor == DBType.ARANGO:
                return ["_key"]
            return ["id"]
        return identity

    def properties(self, vertex_name: str) -> list[Field]:
        """Vertex fields, with untyped fields defaulted for TigerGraph."""
        props = self.logical.properties(vertex_name)
        if self.db_profile.db_flavor != DBType.TIGERGRAPH:
            return props
        # TigerGraph needs explicit scalar defaults for schema definition.
        defaulted = []
        for f in props:
            field_type = f.type if f.type is not None else FieldType.STRING
            defaulted.append(Field(name=f.name, type=field_type))
        return defaulted

    def property_names(self, vertex_name: str) -> list[str]:
        """Names of the DB-projected vertex fields."""
        return [field.name for field in self.properties(vertex_name)]

index(vertex_name)

Get primary index for a vertex (DB layer needs Index for collection setup).

Source code in graflo/architecture/schema/db_aware.py
def index(self, vertex_name: str) -> Index:
    """Primary index over a vertex's identity fields.

    The DB layer consumes this Index when setting up collections.
    """
    identity = self.identity_fields(vertex_name)
    return Index(fields=identity)