Runtime connector updates¶
Each FileConnector, TableConnector, and SparqlConnector gets a deterministic hash from its defining fields (excluding name and resource_name). Bindings indexes connectors and resource wiring by that hash, and connector_connection maps each connector to a conn_proxy by resolved hash.
If you change defining fields (for example narrowing a time_filter window (start / interval / end), adding filters, or adjusting file regex), the hash changes. You must replace the old connector in Bindings and re-wire internal maps; appending a second connector or using add_connector alone can leave stale hash entries.
Time filters (ColumnTimeFilter)¶
FileConnector and TableConnector share an optional nested time_filter (ColumnTimeFilter): a column name plus bounds. SQL is built with FilterExpression (same mechanism as filters).
| Field | Meaning |
|---|---|
column |
Identifier of the date/time column in generated SQL. |
start |
Optional lower bound (ISO date YYYY-MM-DD or ISO datetime). Combined with start_inclusive (default true → >=; false → >). |
end |
Optional upper bound. Combined with end_inclusive (default false → <; true → <=). |
interval |
Optional pandas Timedelta string (e.g. "7D", "2h"). Requires start; defines half-open [start, start + interval) with >= and <. Mutually exclusive with end. |
not_equals |
Single value for !=; mutually exclusive with start, end, and interval. |
Column-only hint: time_filter: { column: "created_at" } (no bounds) records the default datetime column for ingestion (IngestionParams.datetime_after / datetime_before) without adding a WHERE clause from the connector itself.
Durations must parse as a fixed pandas.Timedelta (wall-clock offset). Calendar-style strings that are not valid timedeltas (for example ambiguous month rolls) are unsupported; use explicit start / end instead.
At runtime, the read-only date_field property on connectors resolves to time_filter.column when present (for code and docs that read “which column is the event time?”). Manifests and patches must use the nested time_filter object; older flat date_* keys are not accepted.
Pushdown filters (TableConnector.filters)¶
TableConnector (and SelectSpec.where on view) accept FilterExpression trees in YAML. Filters are parsed when Bindings load (parse_filter_expression), so invalid shorthand fails at manifest validation—not on first SQL execution.
Use the same logical-operator shorthand as VertexConfig.filters in the schema block:
| Operator | YAML key | SQL pushdown |
|---|---|---|
| AND | AND: |
… AND … |
| OR | OR: |
… OR … |
| NOT | NOT: |
NOT … |
| Implication | IF_THEN: |
(NOT antecedent OR consequent) |
Example on a connector (also valid in ConnectorUpdate patches):
connectors:
- name: events_table
table_name: events
filters:
- OR:
- {field: status, cmp_operator: "==", value: [active]}
- {field: status, cmp_operator: "==", value: [pending]}
- IF_THEN:
- {field: kind, cmp_operator: "==", value: [quote]}
- {field: amount, cmp_operator: ">", value: [0]}
Semantics:
- Several top-level
filterslist entries areAND-joined (same as multiple pushdown clauses). - For OR across fields, use one composite entry (
OR:oroperator: OR+deps). - Nested composites are parenthesized in SQL for correct precedence.
Full cookbook and view.where notes: Table connector views — Bindings filter cookbook.
Manifest vs patches¶
The GraphManifest (and its bindings block) holds the normal contract only: connectors, resource_connector, optional connector_connection, optional staging_proxy. It does not include a connector_updates key—patches are outside the canonical manifest.
You apply patches after the manifest is loaded (or merged from disk), from:
- a separate YAML/JSON file (your own schema: list of dicts),
- environment or CLI-derived parameters,
- or plain Python.
Then call Bindings.apply_connector_update or replace_connector before GraphEngine, RegistryBuilder.build, or any code that assumes bindings are final.
Registry and DataSourceRegistry
Build the registry after patches are applied. Otherwise SQL/file/SPARQL sources may use stale hashes or queries.
Patch shape (ConnectorUpdate)¶
ConnectorUpdate is the typed carrier for one patch:
- Required:
connector— connectornameorhash(same resolution asresource_connector.connectorandconnector_connection.connector). - Any other keys: merged onto that connector’s current data; same field names as
TableConnector/FileConnector/SparqlConnector, but only for fields you change (patch-only). Do not repeattable_name,rdf_class, etc. unless you are actually changing them.
Extra keys use Pydantic extra="allow", so new connector fields do not require extending ConnectorUpdate.
External YAML (example only)¶
Your application can define any file layout; below is a minimal list you might load with yaml.safe_load and apply in a loop. This file is not part of GraphManifest.
# connector_patches.yaml (separate from manifest)
# Canonical: patch the nested time_filter (merged in full on the connector).
- connector: events_table
time_filter:
column: created_at
start: "2021-06-01"
interval: "30D"
Baseline manifest bindings (no patches)¶
bindings:
connectors:
- name: events_table
table_name: events
time_filter:
column: created_at
start: "2020-01-01"
interval: "365D"
resource_connector:
- resource: events
connector: events_table
Apply after load¶
from pathlib import Path
import yaml
from graflo.architecture.contract.bindings import Bindings, ConnectorUpdate
bindings = Bindings.model_validate(manifest_dict["bindings"])
for row in yaml.safe_load(Path("connector_patches.yaml").read_text()):
bindings.apply_connector_update(ConnectorUpdate.model_validate(row))
# Then attach bindings to your manifest object / pass to GraphEngine / build registry.
Or build ConnectorUpdate instances directly without a side file:
from graflo.architecture.contract.bindings import Bindings, ConnectorUpdate
bindings.apply_connector_update(
ConnectorUpdate.model_validate(
{
"connector": "events_table",
"time_filter": {
"column": "created_at",
"start": "2021-06-01",
"interval": "30D",
},
}
)
)
API details¶
Bindings.apply_connector_update¶
Resolves update.connector, merges old.model_dump(mode="python") with the patch, runs old.__class__.model_validate(merged), then swaps the instance and reindexes. Validators (including hash) must run; a plain model_copy(update=patch) would not re-run @model_validator and would leave a stale hash.
Bindings.replace_connector¶
Lower-level: replace_connector(old, new) where old is an existing connector instance or a name/hash string, and new is the fully built replacement (TableConnector | FileConnector | SparqlConnector). If new.name is unset and the old connector had a name, the name is copied onto new. Resource→connector hash lists and conn_proxy mappings move from the old hash to new.hash, then name/hash indexes are rebuilt.
Use this when you already built new yourself; use apply_connector_update for dict-shaped patches.
When patching time_filter, the merged payload replaces the entire nested object (provide a full time_filter dict in YAML/JSON, not single-key deltas inside it).
Behaviour summary¶
| Concern | Behaviour |
|---|---|
Resource bindings (resource_connector / resource_name on connector) |
Hash entries in internal resource maps are rewritten to the new hash. |
connector_connection / conn_proxy |
Mapping follows the connector from old hash to new hash. |
Connector name |
Preserved on replace when the new instance has no name. |
| Empty patch | apply_connector_update no-ops if there are no extra keys besides connector. |
| Invalid patch fields | Validation error from the concrete connector model when merging. |
Related concepts¶
- Table connector views and
SelectSpec— advancedTableConnectorSQL shape and Bindings filter cookbook. - Explicit
connector_connectionproxy wiring — manifest example forconn_proxy. - Bindings overview in Concepts overview.
Implementation entry points:
graflo.architecture.contract.bindings.ColumnTimeFiltergraflo.architecture.contract.bindings.ConnectorUpdategraflo.architecture.contract.bindings.Bindings.apply_connector_updategraflo.architecture.contract.bindings.Bindings.replace_connector