Skip to content

graflo.architecture.contract.bindings

Resource connectors and named binding collections.

Bindings

Bases: ConfigBaseModel

Named resource connectors with explicit resource linkage.

Source code in graflo/architecture/contract/bindings/core.py
class Bindings(ConfigBaseModel):
    """Named resource connectors with explicit resource linkage.

    Holds the declared connectors plus two mapping lists:
    ``resource_connector`` (resource name -> connector reference) and
    ``connector_connection`` (connector reference -> runtime proxy name).
    Private attributes cache typed views and lookup indexes derived from
    the public fields; they are rebuilt by the ``mode="after"`` validator.
    """

    connectors: list[FileConnector | TableConnector | SparqlConnector] = Field(
        default_factory=list
    )
    # Accept dict entries at init-time (see validators below).
    # Internally and at runtime, Graflo uses typed lists derived from these.
    resource_connector: list[ResourceConnectorBinding | dict[str, str]] = Field(
        default_factory=list
    )
    # Connector -> runtime endpoint config indirection (proxy by name).
    connector_connection: list[ConnectorConnectionBinding | dict[str, str]] = Field(
        default_factory=list
    )
    # Typed mirror of resource_connector (no dict entries).
    _resource_connector_typed: list[ResourceConnectorBinding] = PrivateAttr(
        default_factory=list
    )
    # Typed mirror of connector_connection (no dict entries).
    _connector_connection_typed: list[ConnectorConnectionBinding] = PrivateAttr(
        default_factory=list
    )
    # connector.hash -> connector object.
    _connectors_index: dict[str, ResourceConnector] = PrivateAttr(default_factory=dict)
    # connector.name -> connector.hash (only for named connectors).
    _connectors_name_index: dict[str, str] = PrivateAttr(default_factory=dict)
    # resource name -> connector.hash (merged from both binding sources).
    _resource_to_connector_hash: dict[str, str] = PrivateAttr(default_factory=dict)
    # connector.hash -> runtime proxy name.
    _connector_to_conn_proxy: dict[str, str] = PrivateAttr(default_factory=dict)

    @property
    def connector_connection_bindings(
        self,
    ) -> list[ConnectorConnectionBinding]:
        """Typed (dict-free) view of ``connector_connection`` entries."""
        # Expose typed entries for downstream components (type-checker friendly).
        return self._connector_connection_typed

    def _rebuild_indexes(self) -> None:
        """Recompute hash and name indexes from ``self.connectors``.

        Raises:
            ValueError: On a connector hash collision or a duplicate
                connector name pointing at a different connector.
        """
        self._connectors_index = {}
        self._connectors_name_index = {}
        for connector in self.connectors:
            existing = self._connectors_index.get(connector.hash)
            if existing is not None:
                raise ValueError(
                    "Connector hash collision detected for connectors "
                    f"'{type(existing).__name__}' and '{type(connector).__name__}' "
                    f"(hash='{connector.hash}')."
                )
            self._connectors_index[connector.hash] = connector

            if connector.name:
                existing_hash = self._connectors_name_index.get(connector.name)
                # Same name mapping to the same hash is an idempotent re-add.
                if existing_hash is not None and existing_hash != connector.hash:
                    raise ValueError(
                        "Connector names must be unique when provided. "
                        f"Duplicate connector name '{connector.name}'."
                    )
                self._connectors_name_index[connector.name] = connector.hash

    @field_validator("resource_connector", mode="before")
    @classmethod
    def _coerce_resource_connector_entries(
        cls, v: Any
    ) -> list[ResourceConnectorBinding]:
        """Coerce raw manifest entries (dicts) into typed bindings.

        Accepts None (-> empty list), typed bindings, or dicts with
        'resource' and 'connector' keys; anything else raises ValueError
        with an index-annotated message.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(
                "resource_connector must be a list of {resource, connector} entries"
            )

        coerced: list[ResourceConnectorBinding] = []
        for i, item in enumerate(v):
            if isinstance(item, ResourceConnectorBinding):
                coerced.append(item)
                continue

            if isinstance(item, dict):
                missing = [k for k in ("resource", "connector") if k not in item]
                if missing:
                    raise ValueError(
                        f"Invalid resource_connector entry at index {i}: missing required keys {missing}. "
                        "Expected keys: ['resource', 'connector']."
                    )

                try:
                    coerced.append(ResourceConnectorBinding.model_validate(item))
                except Exception as e:  # noqa: BLE001
                    # Keep the message concise and contextual; nested pydantic
                    # errors can be noisy for config authors.
                    raise ValueError(
                        f"Invalid resource_connector entry at index {i}: {item!r}."
                    ) from e
                continue

            raise ValueError(
                f"Invalid resource_connector entry at index {i}: expected dict or "
                f"ResourceConnectorBinding, got {type(item).__name__}."
            )

        return coerced

    @field_validator("connector_connection", mode="before")
    @classmethod
    def _coerce_connector_connection_entries(
        cls, v: Any
    ) -> list[ConnectorConnectionBinding]:
        """Coerce raw manifest entries (dicts) into typed bindings.

        Mirrors ``_coerce_resource_connector_entries`` for the
        {connector, conn_proxy} entry shape.
        """
        if v is None:
            return []
        if not isinstance(v, list):
            raise ValueError(
                "connector_connection must be a list of {connector, conn_proxy} entries"
            )

        coerced: list[ConnectorConnectionBinding] = []
        for i, item in enumerate(v):
            if isinstance(item, ConnectorConnectionBinding):
                coerced.append(item)
                continue

            if isinstance(item, dict):
                missing = [k for k in ("connector", "conn_proxy") if k not in item]
                if missing:
                    raise ValueError(
                        f"Invalid connector_connection entry at index {i}: missing required keys {missing}. "
                        "Expected keys: ['connector', 'conn_proxy']."
                    )
                try:
                    coerced.append(ConnectorConnectionBinding.model_validate(item))
                except Exception as e:  # noqa: BLE001
                    raise ValueError(
                        f"Invalid connector_connection entry at index {i}: {item!r}."
                    ) from e
                continue

            raise ValueError(
                f"Invalid connector_connection entry at index {i}: expected dict or "
                f"ConnectorConnectionBinding, got {type(item).__name__}."
            )

        return coerced

    @staticmethod
    def default_connector_name(connector: ResourceConnector) -> str:
        """Derive a human-readable name for a connector.

        Uses the explicit name when set, otherwise a type-specific
        fallback (regex/sub_path, table name, or rdf class).

        Raises:
            TypeError: For connector types this method does not know.
        """
        if connector.name:
            return connector.name
        if isinstance(connector, FileConnector):
            return connector.regex or str(connector.sub_path)
        if isinstance(connector, TableConnector):
            return connector.table_name
        if isinstance(connector, SparqlConnector):
            return connector.rdf_class
        raise TypeError(f"Unsupported connector type: {type(connector)!r}")

    @model_validator(mode="after")
    def _populate_resource_connector(self) -> Self:
        """Build all derived private state after field validation.

        Rebuilds indexes, materializes typed mirrors of the public
        mapping lists, and merges resource bindings declared directly on
        connectors with top-level resource_connector entries, rejecting
        conflicts.
        """
        self._rebuild_indexes()
        self._resource_to_connector_hash = {}

        # Create typed views so internal code never has to handle dicts.
        self._resource_connector_typed = [
            ResourceConnectorBinding.model_validate(m) if isinstance(m, dict) else m
            for m in self.resource_connector
        ]
        self._connector_connection_typed = [
            ConnectorConnectionBinding.model_validate(m) if isinstance(m, dict) else m
            for m in self.connector_connection
        ]

        # Bindings declared inline on connectors (connector.resource_name).
        for connector in self.connectors:
            if connector.resource_name is None:
                continue
            existing_hash = self._resource_to_connector_hash.get(
                connector.resource_name
            )
            if existing_hash is not None and existing_hash != connector.hash:
                raise ValueError(
                    "Conflicting resource binding for resource "
                    f"'{connector.resource_name}'."
                )
            self._resource_to_connector_hash[connector.resource_name] = connector.hash

        # Top-level resource_connector entries reference connectors by name.
        for mapping in self._resource_connector_typed:
            connector_hash = self._connectors_name_index.get(mapping.connector)
            if connector_hash is None:
                raise ValueError(
                    f"resource_connector references unknown connector '{mapping.connector}' "
                    f"for resource '{mapping.resource}'."
                )
            existing_hash = self._resource_to_connector_hash.get(mapping.resource)
            if existing_hash is not None and existing_hash != connector_hash:
                raise ValueError(
                    f"Conflicting resource binding for resource '{mapping.resource}'."
                )
            self._resource_to_connector_hash[mapping.resource] = connector_hash
        self._rebuild_connector_to_conn_proxy()
        return self

    def _resolve_connector_ref_to_hash(self, connector_ref: str) -> str:
        """Resolve a connector reference to its canonical connector hash.

        The contract allows referencing either:
        - ``connector.hash`` (canonical internal id), or
        - ``connector.name`` (when a name is provided / auto-filled).
        - ``resource_name`` (alias when ``connector.name`` is omitted in manifests).
        """
        if connector_ref in self._connectors_index:
            return connector_ref
        resolved_hash = self._connectors_name_index.get(connector_ref)
        if resolved_hash is None:
            resolved_hash = self._resource_to_connector_hash.get(connector_ref)
        if resolved_hash is None:
            raise ValueError(f"Unknown connector reference '{connector_ref}'")
        return resolved_hash

    def _rebuild_connector_to_conn_proxy(self) -> None:
        """Recompute the connector-hash -> conn_proxy map.

        Raises:
            ValueError: When two entries map the same connector to
                different proxies, or a reference cannot be resolved.
        """
        self._connector_to_conn_proxy = {}
        for mapping in self._connector_connection_typed:
            connector_hash = self._resolve_connector_ref_to_hash(mapping.connector)
            existing = self._connector_to_conn_proxy.get(connector_hash)
            if existing is not None and existing != mapping.conn_proxy:
                raise ValueError(
                    "Conflicting conn_proxy mapping for connector "
                    f"'{connector_hash}' (existing='{existing}', new='{mapping.conn_proxy}')."
                )
            self._connector_to_conn_proxy[connector_hash] = mapping.conn_proxy

    def get_conn_proxy_for_connector(
        self, connector: TableConnector | FileConnector | SparqlConnector
    ) -> str | None:
        """Return the mapped runtime proxy name for a given connector."""
        return self._connector_to_conn_proxy.get(connector.hash)

    def bind_connector_to_conn_proxy(
        self,
        connector: TableConnector | FileConnector | SparqlConnector,
        conn_proxy: str,
    ) -> None:
        """Bind a connector to a non-secret runtime proxy name.

        Uses ``connector.name`` when available, falling back to ``connector.hash``.
        """
        # Ensure indexes include the connector and that a default name is set.
        if connector.hash not in self._connectors_index:
            self.add_connector(connector)
        # Pick a contract reference string that's stable and user-friendly.
        connector_ref = connector.name or connector.hash

        # Ensure uniqueness by connector.hash (not by ref-string).
        connector_hash = connector.hash
        existing_idx: int | None = None
        for i, m in enumerate(self._connector_connection_typed):
            try:
                if self._resolve_connector_ref_to_hash(m.connector) == connector_hash:
                    existing_idx = i
                    break
            except ValueError:
                # Unresolvable existing entries are skipped, not rewritten.
                continue

        if existing_idx is None:
            self._connector_connection_typed.append(
                ConnectorConnectionBinding(
                    connector=connector_ref, conn_proxy=conn_proxy
                )
            )
        else:
            self._connector_connection_typed[existing_idx] = ConnectorConnectionBinding(
                connector=connector_ref, conn_proxy=conn_proxy
            )
        # Keep the public contract field in sync for serialization / downstream.
        self.connector_connection = list(self._connector_connection_typed)

        self._rebuild_connector_to_conn_proxy()

    @classmethod
    def from_dict(cls, data: dict[str, Any] | list[Any]) -> Self:
        """Build Bindings from a mapping, rejecting legacy payload shapes.

        Raises:
            ValueError: For list-style payloads or legacy init keys.
        """
        if isinstance(data, list):
            raise ValueError(
                "Bindings.from_dict expects a mapping with 'connectors' and optional "
                "'resource_connector'. List-style connector payloads are not supported."
            )
        legacy_keys = {
            "postgres_connections",
            "table_connectors",
            "file_connectors",
            "sparql_connectors",
        }
        found_legacy = sorted(k for k in legacy_keys if k in data)
        if found_legacy:
            raise ValueError(
                "Legacy Bindings init keys are not supported. "
                f"Unsupported keys: {', '.join(found_legacy)}."
            )
        return cls.model_validate(data)

    def add_connector(
        self,
        connector: TableConnector | FileConnector | SparqlConnector,
    ) -> None:
        """Register (or replace) a connector and refresh indexes.

        Auto-fills a default name when none is set, enforces name
        uniqueness, replaces an existing connector with the same hash
        in place, and records a direct resource binding when
        ``connector.resource_name`` is set.
        """
        if connector.name is None:
            # Connectors may be frozen models; bypass assignment validation.
            object.__setattr__(
                connector, "name", self.default_connector_name(connector)
            )
        existing_name_hash = None
        if connector.name:
            existing_name_hash = self._connectors_name_index.get(connector.name)
        if (
            connector.name
            and existing_name_hash is not None
            and existing_name_hash != connector.hash
        ):
            raise ValueError(
                "Connector names must be unique when provided. "
                f"Duplicate connector name '{connector.name}'."
            )

        if connector.hash in self._connectors_index:
            # Same hash means same defining fields: swap the object in place.
            old_connector = self._connectors_index[connector.hash]
            for idx, existing in enumerate(self.connectors):
                if existing is old_connector:
                    self.connectors[idx] = connector
                    break
        else:
            self.connectors.append(connector)
        self._rebuild_indexes()
        if connector.resource_name is not None:
            existing_hash = self._resource_to_connector_hash.get(
                connector.resource_name
            )
            if existing_hash is not None and existing_hash != connector.hash:
                raise ValueError(
                    "Conflicting resource binding for resource "
                    f"'{connector.resource_name}'."
                )
            self._resource_to_connector_hash[connector.resource_name] = connector.hash

    def bind_resource(
        self,
        resource_name: str,
        connector: TableConnector | FileConnector | SparqlConnector,
    ) -> None:
        """Bind a resource name to an already-registered connector.

        Overwrites any previous binding for ``resource_name`` and keeps
        the public ``resource_connector`` field in sync.

        Raises:
            KeyError: When the connector has not been registered.
        """
        if connector.hash not in self._connectors_index:
            raise KeyError(f"Connector not found for hash='{connector.hash}'")
        self._resource_to_connector_hash[resource_name] = connector.hash
        connector_name = connector.name or self.default_connector_name(connector)
        mapping_idx = None
        for idx, mapping in enumerate(self._resource_connector_typed):
            if mapping.resource == resource_name:
                mapping_idx = idx
                break
        new_mapping = ResourceConnectorBinding(
            resource=resource_name,
            connector=connector_name,
        )
        if mapping_idx is None:
            self._resource_connector_typed.append(new_mapping)
        else:
            self._resource_connector_typed[mapping_idx] = new_mapping
        # Keep the public contract field in sync for serialization / downstream.
        self.resource_connector = list(self._resource_connector_typed)

    def get_connector_for_resource(
        self, resource_name: str
    ) -> TableConnector | FileConnector | SparqlConnector | None:
        """Return the connector bound to ``resource_name``, or None."""
        connector_hash = self._resource_to_connector_hash.get(resource_name)
        if connector_hash is None:
            return None
        connector = self._connectors_index.get(connector_hash)
        # Narrow to concrete connector types for the type checker.
        if isinstance(connector, (TableConnector, FileConnector, SparqlConnector)):
            return connector
        return None

    def get_resource_type(self, resource_name: str) -> ResourceType | None:
        """Return the ResourceType for a bound resource, or None."""
        connector = self.get_connector_for_resource(resource_name)
        if connector is None:
            return None
        return connector.get_resource_type()

    def get_table_info(self, resource_name: str) -> tuple[str, str | None] | None:
        """Return (table_name, schema_name) for SQL-backed resources, else None."""
        connector = self.get_connector_for_resource(resource_name)
        if isinstance(connector, TableConnector):
            return (connector.table_name, connector.schema_name)
        return None

bind_connector_to_conn_proxy(connector, conn_proxy)

Bind a connector to a non-secret runtime proxy name.

Uses connector.name when available, falling back to connector.hash.

Source code in graflo/architecture/contract/bindings/core.py
def bind_connector_to_conn_proxy(
    self,
    connector: TableConnector | FileConnector | SparqlConnector,
    conn_proxy: str,
) -> None:
    """Bind a connector to a non-secret runtime proxy name.

    Uses ``connector.name`` when available, falling back to ``connector.hash``.
    """
    # Register the connector first so the name/hash indexes are populated
    # and a default name gets auto-filled.
    if connector.hash not in self._connectors_index:
        self.add_connector(connector)

    target_hash = connector.hash
    # Prefer the human-readable name as the serialized reference string.
    connector_ref = connector.name or target_hash

    def _targets_connector(entry) -> bool:
        # Entries with unresolvable references never match; leave them alone.
        try:
            return self._resolve_connector_ref_to_hash(entry.connector) == target_hash
        except ValueError:
            return False

    # Upsert keyed by connector.hash (not by the reference string).
    slot = next(
        (
            idx
            for idx, entry in enumerate(self._connector_connection_typed)
            if _targets_connector(entry)
        ),
        None,
    )
    binding = ConnectorConnectionBinding(
        connector=connector_ref, conn_proxy=conn_proxy
    )
    if slot is None:
        self._connector_connection_typed.append(binding)
    else:
        self._connector_connection_typed[slot] = binding

    # Keep the public contract field in sync for serialization / downstream.
    self.connector_connection = list(self._connector_connection_typed)

    self._rebuild_connector_to_conn_proxy()

get_conn_proxy_for_connector(connector)

Return the mapped runtime proxy name for a given connector.

Source code in graflo/architecture/contract/bindings/core.py
def get_conn_proxy_for_connector(
    self, connector: TableConnector | FileConnector | SparqlConnector
) -> str | None:
    """Look up the runtime proxy name bound to *connector* (None when unbound)."""
    proxy_by_hash = self._connector_to_conn_proxy
    return proxy_by_hash.get(connector.hash)

FileConnector

Bases: ResourceConnector

Connector for matching files.

Attributes:

Name Type Description
regex str | None

Regular expression pattern for matching filenames

sub_path Path

Path to search for matching files (default: "./")

date_field str | None

Name of the date field to filter on (for date-based filtering)

date_filter str | None

SQL-style date filter condition (e.g., "> '2020-10-10'")

date_range_start str | None

Start date for range filtering (e.g., "2015-11-11")

date_range_days int | None

Number of days after start date (used with date_range_start)

Source code in graflo/architecture/contract/bindings/connectors.py
class FileConnector(ResourceConnector):
    """Connector that selects files by filename pattern.

    Attributes:
        regex: Regular expression matched against candidate filenames.
        sub_path: Directory searched for matching files (default: "./").
        date_field: Field name used for date-based filtering.
        date_filter: SQL-style date predicate (e.g., "> '2020-10-10'").
        date_range_start: Inclusive start date for range filtering (e.g., "2015-11-11").
        date_range_days: Number of days after the start date (paired with date_range_start).
    """

    regex: str | None = None
    sub_path: pathlib.Path = Field(default_factory=lambda: pathlib.Path("./"))
    date_field: str | None = None
    date_filter: str | None = None
    date_range_start: str | None = None
    date_range_days: int | None = None

    @model_validator(mode="after")
    def _validate_file_connector(self) -> Self:
        """Coerce sub_path to a Path and sanity-check the date-filter options."""
        if not isinstance(self.sub_path, pathlib.Path):
            # The model may be frozen; bypass assignment validation.
            object.__setattr__(self, "sub_path", pathlib.Path(self.sub_path))
        wants_date_filtering = self.date_filter or self.date_range_start
        if wants_date_filtering and not self.date_field:
            raise ValueError(
                "date_field is required when using date_filter or date_range_start"
            )
        if self.date_range_days is not None and not self.date_range_start:
            raise ValueError("date_range_start is required when using date_range_days")
        return self

    def matches(self, resource_identifier: str) -> bool:
        """Return True when *resource_identifier* matches ``regex``.

        A connector without a configured regex never matches.
        """
        pattern = self.regex
        if pattern is None:
            return False
        return re.match(pattern, resource_identifier) is not None

    def get_resource_type(self) -> ResourceType:
        """Always ``ResourceType.FILE``.

        The concrete file format (CSV, JSON, JSONL, Parquet, ...) is
        detected later by the loader from file extensions.
        """
        return ResourceType.FILE

get_resource_type()

Get resource type.

FileConnector always represents a FILE resource type. The specific file format (CSV, JSON, JSONL, Parquet, etc.) is automatically detected by the loader based on file extensions.

Source code in graflo/architecture/contract/bindings/connectors.py
def get_resource_type(self) -> ResourceType:
    """Report the resource category handled by this connector.

    Always ``ResourceType.FILE``; the concrete file format (CSV, JSON,
    JSONL, Parquet, etc.) is detected later by the loader from file
    extensions.
    """
    return ResourceType.FILE

matches(resource_identifier)

Check if connector matches a filename.

Parameters:

Name Type Description Default
resource_identifier str

Filename to match

required

Returns:

Name Type Description
bool bool

True if connector matches

Source code in graflo/architecture/contract/bindings/connectors.py
def matches(self, resource_identifier: str) -> bool:
    """Return True when *resource_identifier* matches this connector's regex.

    Connectors with no regex configured never match anything.
    """
    pattern = self.regex
    return pattern is not None and re.match(pattern, resource_identifier) is not None

JoinClause

Bases: ConfigBaseModel

Specification for a SQL JOIN operation.

Used by TableConnector to describe multi-table queries. Each JoinClause adds one JOIN to the generated SQL.

Attributes:

Name Type Description
table str

Table name to join (e.g. "all_classes").

schema_name str | None

Optional schema override for the joined table.

alias str | None

SQL alias for the joined table (e.g. "s", "t"). Required when the same table is joined more than once.

on_self str

Column on the base (left) table used in the ON condition.

on_other str

Column on the joined (right) table used in the ON condition.

join_type str

Type of join -- LEFT, INNER, etc. Defaults to LEFT.

select_fields list[str] | None

Explicit list of columns to SELECT from this join. When None every column of the joined table is included (aliased with the join alias prefix).

Source code in graflo/architecture/contract/bindings/connectors.py
class JoinClause(ConfigBaseModel):
    """Specification for a SQL JOIN operation.

    Used by TableConnector to describe multi-table queries. Each JoinClause
    adds one JOIN to the generated SQL. This model is purely declarative;
    SQL generation happens elsewhere.

    Attributes:
        table: Table name to join (e.g. "all_classes").
        schema_name: Optional schema override for the joined table.
        alias: SQL alias for the joined table (e.g. "s", "t"). Required when
            the same table is joined more than once.
        on_self: Column on the base (left) table used in the ON condition.
        on_other: Column on the joined (right) table used in the ON condition.
        join_type: Type of join -- LEFT, INNER, etc. Defaults to LEFT.
        select_fields: Explicit list of columns to SELECT from this join.
            When None every column of the joined table is included (aliased
            with the join alias prefix).
    """

    table: str = Field(..., description="Table name to join.")
    schema_name: str | None = Field(
        default=None, description="Schema override for the joined table."
    )
    alias: str | None = Field(
        default=None, description="SQL alias for the joined table."
    )
    on_self: str = Field(
        ..., description="Column on the base table for the ON condition."
    )
    on_other: str = Field(
        ..., description="Column on the joined table for the ON condition."
    )
    join_type: str = Field(default="LEFT", description="JOIN type (LEFT, INNER, etc.).")
    select_fields: list[str] | None = Field(
        default=None,
        description="Columns to SELECT from this join (None = all columns).",
    )

ResourceConnector

Bases: ConfigBaseModel, ABC

Abstract base class for resource connectors (files or tables).

Provides common API for connector matching and resource identification. All concrete connector types inherit from this class.

Connectors only describe source-side matching/query behavior. Resource-to- connector linkage is handled by Bindings.

Source code in graflo/architecture/contract/bindings/connectors.py
class ResourceConnector(ConfigBaseModel, abc.ABC):
    """Abstract base class for resource connectors (files or tables).

    Defines the shared API for connector matching and resource
    identification; every concrete connector type derives from it.

    Connectors only describe source-side matching/query behavior. Linking
    resources to connectors is the job of ``Bindings``.
    """

    name: str | None = Field(
        default=None,
        description="Optional connector name used by top-level resource_connector mapping.",
    )
    resource_name: str | None = Field(
        default=None,
        description="Optional direct resource binding declared on the connector itself.",
    )
    hash: str = Field(
        default="",
        exclude=True,
        description="Deterministic internal connector id derived from defining fields.",
    )

    def _hash_payload(self) -> dict[str, Any]:
        """Build the canonical dict from which the connector hash is derived."""
        # Identity fields must not influence the hash: two connectors with
        # the same defining fields share a hash regardless of name/binding.
        payload = self.model_dump(
            mode="json",
            by_alias=True,
            exclude={"hash", "name", "resource_name"},
        )
        # Disambiguate subclasses whose field values happen to coincide.
        payload["_connector_type"] = type(self).__name__
        return payload

    @model_validator(mode="after")
    def _compute_hash(self) -> Self:
        """Populate ``hash`` deterministically from the hash payload."""
        serialized = json.dumps(
            self._hash_payload(), sort_keys=True, separators=(",", ":")
        )
        digest = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
        # The model may be frozen; bypass assignment validation.
        object.__setattr__(self, "hash", digest)
        return self

    @abc.abstractmethod
    def matches(self, resource_identifier: str) -> bool:
        """Decide whether this connector applies to a resource identifier.

        Args:
            resource_identifier: Identifier to match (filename or table name)

        Returns:
            bool: True if connector matches
        """
        ...

    @abc.abstractmethod
    def get_resource_type(self) -> ResourceType:
        """Classify the kind of resource this connector targets.

        Returns:
            ResourceType: Resource type enum value
        """
        ...

get_resource_type() abstractmethod

Get the type of resource this connector matches.

Returns:

Name Type Description
ResourceType ResourceType

Resource type enum value

Source code in graflo/architecture/contract/bindings/connectors.py
@abc.abstractmethod
def get_resource_type(self) -> ResourceType:
    """Classify the kind of resource this connector targets.

    Returns:
        ResourceType: Resource type enum value
    """
    ...

matches(resource_identifier) abstractmethod

Check if connector matches a resource identifier.

Parameters:

Name Type Description Default
resource_identifier str

Identifier to match (filename or table name)

required

Returns:

Name Type Description
bool bool

True if connector matches

Source code in graflo/architecture/contract/bindings/connectors.py
@abc.abstractmethod
def matches(self, resource_identifier: str) -> bool:
    """Decide whether this connector applies to a resource identifier.

    Args:
        resource_identifier: Identifier to match (filename or table name)

    Returns:
        bool: True if connector matches
    """
    ...

ResourceConnectorBinding

Bases: ConfigBaseModel

Top-level resource -> connector-name mapping entry.

Source code in graflo/architecture/contract/bindings/core.py
class ResourceConnectorBinding(ConfigBaseModel):
    """Top-level resource -> connector-name mapping entry.

    Serialized in manifests as ``{resource, connector}`` dicts; the
    ``connector`` value references a connector by name (or hash).
    """

    # Resource name being bound.
    resource: str
    # Connector reference (name, hash, or resource alias).
    connector: str

ResourceType

Bases: BaseEnum

Resource types for data sources.

Resource types distinguish between different data source categories. File type detection (CSV, JSON, JSONL, Parquet, etc.) is handled automatically by the loader based on file extensions.

Attributes:

Name Type Description
FILE

File-based data source (any format: CSV, JSON, JSONL, Parquet, etc.)

SQL_TABLE

SQL database table (e.g., PostgreSQL table)

SPARQL

SPARQL / RDF data source (endpoint or .ttl/.rdf files via rdflib)

Source code in graflo/architecture/contract/bindings/connectors.py
class ResourceType(BaseEnum):
    """Resource types for data sources.

    Resource types distinguish between different data source categories.
    File type detection (CSV, JSON, JSONL, Parquet, etc.) is handled
    automatically by the loader based on file extensions.

    Attributes:
        FILE: File-based data source (any format: CSV, JSON, JSONL, Parquet, etc.)
        SQL_TABLE: SQL database table (e.g., PostgreSQL table)
        SPARQL: SPARQL / RDF data source (endpoint or .ttl/.rdf files via rdflib)
    """

    # Values are the lowercase strings used in serialized manifests.
    FILE = "file"
    SQL_TABLE = "sql_table"
    SPARQL = "sparql"

SparqlConnector

Bases: ResourceConnector

Connector for matching SPARQL / RDF data sources.

Each SparqlConnector targets instances of a single rdf:Class. It can be backed either by a remote SPARQL endpoint (Fuseki, Blazegraph, ...) or by a local RDF file parsed with rdflib.

Attributes:

Name Type Description
rdf_class str

Full URI of the rdf:Class whose instances this connector fetches (e.g. "http://example.org/Person").

endpoint_url str | None

SPARQL query endpoint URL. When set, instances are fetched via HTTP. When None the connector is for local file mode.

graph_uri str | None

Named-graph URI to restrict the query to (optional).

sparql_query str | None

Custom SPARQL SELECT query override. When provided the auto-generated per-class query is skipped.

rdf_file Path | None

Path to a local RDF file (.ttl, .rdf, .n3, .jsonld). Mutually exclusive with endpoint_url.

Source code in graflo/architecture/contract/bindings/connectors.py
class SparqlConnector(ResourceConnector):
    """Connector for matching SPARQL / RDF data sources.

    Each ``SparqlConnector`` targets instances of a single ``rdf:Class``.
    It can be backed either by a remote SPARQL endpoint (Fuseki, Blazegraph, ...)
    or by a local RDF file parsed with *rdflib*.

    Attributes:
        rdf_class: Full URI of the ``rdf:Class`` whose instances this connector
            fetches (e.g. ``"http://example.org/Person"``).
        endpoint_url: SPARQL query endpoint URL.  When set, instances are
            fetched via HTTP.  When ``None`` the connector is for local file mode.
        graph_uri: Named-graph URI to restrict the query to (optional).
        sparql_query: Custom SPARQL ``SELECT`` query override.  When provided
            the auto-generated per-class query is skipped.
        rdf_file: Path to a local RDF file (``.ttl``, ``.rdf``, ``.n3``,
            ``.jsonld``).  Mutually exclusive with *endpoint_url* (enforced
            by a model validator).
    """

    rdf_class: str = Field(
        ..., description="URI of the rdf:Class to fetch instances of"
    )
    endpoint_url: str | None = Field(
        default=None, description="SPARQL query endpoint URL"
    )
    graph_uri: str | None = Field(
        default=None, description="Named graph URI (optional)"
    )
    sparql_query: str | None = Field(
        default=None, description="Custom SPARQL query override"
    )
    rdf_file: pathlib.Path | None = Field(
        default=None, description="Path to a local RDF file"
    )

    @model_validator(mode="after")
    def _validate_source_mode(self) -> Self:
        """Enforce the documented contract: ``endpoint_url`` and ``rdf_file``
        are mutually exclusive source modes.

        Raises:
            ValueError: If both a remote endpoint and a local file are set.
        """
        if self.endpoint_url is not None and self.rdf_file is not None:
            raise ValueError(
                "endpoint_url and rdf_file are mutually exclusive on SparqlConnector"
            )
        return self

    def matches(self, resource_identifier: str) -> bool:
        """Match by the local name (fragment) of the rdf:Class URI.

        Args:
            resource_identifier: Identifier to match against

        Returns:
            True when *resource_identifier* equals the class local name
        """
        # Strip an optional "#fragment" first, then take the last "/" segment,
        # so both hash-style and slash-style class URIs yield their local name.
        local_name = self.rdf_class.rsplit("#", 1)[-1].rsplit("/", 1)[-1]
        return resource_identifier == local_name

    def get_resource_type(self) -> ResourceType:
        """Return ``ResourceType.SPARQL``."""
        return ResourceType.SPARQL

    def build_select_query(self) -> str:
        """Build a SPARQL SELECT query for instances of ``rdf_class``.

        If *sparql_query* is set it is returned as-is.  Otherwise a simple
        per-class query is generated::

            SELECT ?s ?p ?o WHERE {
              ?s a <rdf_class> .
              ?s ?p ?o .
            }

        Returns:
            SPARQL query string
        """
        if self.sparql_query:
            return self.sparql_query

        # Wrap the triple pattern in a GRAPH block only when a named graph is set.
        graph_open = f"GRAPH <{self.graph_uri}> {{" if self.graph_uri else ""
        graph_close = "}" if self.graph_uri else ""

        return (
            "SELECT ?s ?p ?o WHERE { "
            f"{graph_open} "
            f"?s a <{self.rdf_class}> . "
            f"?s ?p ?o . "
            f"{graph_close} "
            "}"
        )

build_select_query()

Build a SPARQL SELECT query for instances of rdf_class.

If sparql_query is set, it is returned as-is. Otherwise a simple per-class query is generated:

SELECT ?s ?p ?o WHERE {
  ?s a <rdf_class> .
  ?s ?p ?o .
}

Returns:

Type Description
str

SPARQL query string

Source code in graflo/architecture/contract/bindings/connectors.py
def build_select_query(self) -> str:
    """Build a SPARQL SELECT query fetching all triples of ``rdf_class`` instances.

    A custom *sparql_query* override, when present, wins unconditionally.
    Otherwise a per-class query of the form below is produced (optionally
    wrapped in a ``GRAPH`` block when *graph_uri* is set)::

        SELECT ?s ?p ?o WHERE {
          ?s a <rdf_class> .
          ?s ?p ?o .
        }

    Returns:
        SPARQL query string
    """
    if self.sparql_query:
        return self.sparql_query

    # Only emit GRAPH wrappers when a named graph restricts the query.
    if self.graph_uri:
        wrap_open, wrap_close = f"GRAPH <{self.graph_uri}> {{", "}"
    else:
        wrap_open = wrap_close = ""

    pieces = [
        "SELECT ?s ?p ?o WHERE {",
        wrap_open,
        f"?s a <{self.rdf_class}> .",
        "?s ?p ?o .",
        wrap_close,
        "}",
    ]
    return " ".join(pieces)

get_resource_type()

Return ResourceType.SPARQL.

Source code in graflo/architecture/contract/bindings/connectors.py
def get_resource_type(self) -> ResourceType:
    """Identify this connector as a SPARQL-backed data source."""
    return ResourceType.SPARQL

matches(resource_identifier)

Match by the local name (fragment) of the rdf:Class URI.

Parameters:

Name Type Description Default
resource_identifier str

Identifier to match against

required

Returns:

Type Description
bool

True when resource_identifier equals the class local name

Source code in graflo/architecture/contract/bindings/connectors.py
def matches(self, resource_identifier: str) -> bool:
    """Match by the local name (fragment) of the rdf:Class URI.

    Args:
        resource_identifier: Identifier to compare against the class local name

    Returns:
        True when *resource_identifier* equals the local name of ``rdf_class``
    """
    # Drop an optional "#fragment" separator, then keep the last "/" segment,
    # so both hash-style and slash-style URIs resolve to their local name.
    tail = self.rdf_class.rsplit("#", 1)[-1]
    local_name = tail.rsplit("/", 1)[-1]
    return local_name == resource_identifier

TableConnector

Bases: ResourceConnector

Connector for matching database tables.

Supports simple single-table queries as well as multi-table JOINs and pushdown filters via FilterExpression.

Attributes:

Name Type Description
table_name str

Exact table name or regex pattern

schema_name str | None

Schema name (optional, defaults to public)

database str | None

Database name (optional)

date_field str | None

Name of the date field to filter on (for date-based filtering)

date_filter str | None

SQL-style date filter condition (e.g., "> '2020-10-10'")

date_range_start str | None

Start date for range filtering (e.g., "2015-11-11")

date_range_days int | None

Number of days after start date (used with date_range_start)

filters list[Any]

General-purpose pushdown filters rendered as SQL WHERE fragments.

joins list[JoinClause]

Multi-table JOIN specifications (auto-generated or explicit).

select_columns list[str] | None

Explicit SELECT column list. None means * for the base table (plus aliased columns from joins).

Source code in graflo/architecture/contract/bindings/connectors.py
class TableConnector(ResourceConnector):
    """Connector for matching database tables.

    Supports simple single-table queries as well as multi-table JOINs and
    pushdown filters via ``FilterExpression``.

    Attributes:
        table_name: Exact table name or regex pattern
        schema_name: Schema name (optional, defaults to public)
        database: Database name (optional)
        date_field: Name of the date field to filter on (for date-based filtering)
        date_filter: SQL-style date filter condition (e.g., "> '2020-10-10'")
        date_range_start: Start date for range filtering (e.g., "2015-11-11")
        date_range_days: Number of days after start date (used with date_range_start)
        filters: General-purpose pushdown filters rendered as SQL WHERE fragments.
        joins: Multi-table JOIN specifications (auto-generated or explicit).
        select_columns: Explicit SELECT column list. None means ``*`` for the
            base table (plus aliased columns from joins).
    """

    table_name: str = Field(
        default="", validation_alias=AliasChoices("table_name", "table")
    )
    schema_name: str | None = Field(
        default=None, validation_alias=AliasChoices("schema_name", "schema")
    )
    database: str | None = None
    date_field: str | None = None
    date_filter: str | None = None
    date_range_start: str | None = None
    date_range_days: int | None = None
    filters: list[Any] = Field(
        default_factory=list,
        description="Pushdown FilterExpression filters (rendered to SQL WHERE).",
    )
    joins: list[JoinClause] = Field(
        default_factory=list,
        description="JOIN clauses for multi-table queries.",
    )
    select_columns: list[str] | None = Field(
        default=None,
        description="Explicit SELECT columns. None = SELECT * (plus join aliases).",
    )
    view: Any = Field(
        default=None,
        description="SelectSpec or dict for declarative view (alternative to table+joins+filters).",
    )

    @field_validator("view", mode="before")
    @classmethod
    def _coerce_view(cls, v: Any) -> Any:
        """Coerce a plain dict ``view`` into a ``SelectSpec``; pass others through."""
        if v is None:
            return None
        if isinstance(v, dict):
            from graflo.filter.select import SelectSpec

            return SelectSpec.from_dict(v)
        return v

    @model_validator(mode="after")
    def _validate_table_connector(self) -> Self:
        """Validate table_name and date filtering parameters.

        Raises:
            ValueError: If ``table_name`` is empty, or date-filter fields are
                combined inconsistently.
        """
        if not self.table_name:
            raise ValueError("table_name is required for TableConnector")
        if (self.date_filter or self.date_range_start) and not self.date_field:
            raise ValueError(
                "date_field is required when using date_filter or date_range_start"
            )
        if self.date_range_days is not None and not self.date_range_start:
            raise ValueError("date_range_start is required when using date_range_days")
        return self

    def matches(self, resource_identifier: str) -> bool:
        """Check if connector matches a table name.

        Args:
            resource_identifier: Table name to match (format: schema.table or just table)

        Returns:
            bool: True if connector matches
        """
        if not self.table_name:
            return False

        # Compile regex expression. Only anchored patterns ("^..." / "...$")
        # are treated as user-supplied regexes; anything else matches literally.
        if self.table_name.startswith("^") or self.table_name.endswith("$"):
            # Already a regex expression
            compiled_regex = re.compile(self.table_name)
        else:
            # Exact match expression
            compiled_regex = re.compile(f"^{re.escape(self.table_name)}$")

        # Check if resource_identifier matches
        if compiled_regex.match(resource_identifier):
            return True

        # If schema_name is specified, also check schema.table format
        if self.schema_name:
            full_name = f"{self.schema_name}.{resource_identifier}"
            if compiled_regex.match(full_name):
                return True

        return False

    def get_resource_type(self) -> ResourceType:
        """Get resource type."""
        return ResourceType.SQL_TABLE

    def build_where_clause(self) -> str:
        """Build SQL WHERE clause from date filtering parameters **and** general filters.

        Returns:
            WHERE clause string (without the WHERE keyword) or empty string if no filters
        """
        from graflo.filter.onto import FilterExpression
        from graflo.onto import ExpressionFlavor

        conditions: list[str] = []

        # Date-specific conditions (legacy fields)
        if self.date_field:
            if self.date_range_start and self.date_range_days is not None:
                conditions.append(
                    f"\"{self.date_field}\" >= '{self.date_range_start}'::date"
                )
                conditions.append(
                    f"\"{self.date_field}\" < '{self.date_range_start}'::date + INTERVAL '{self.date_range_days} days'"
                )
            elif self.date_filter:
                # Split "op value" once; quote a bare ISO date (YYYY-MM-DD) value.
                filter_parts = self.date_filter.strip().split(None, 1)
                if len(filter_parts) == 2:
                    operator, value = filter_parts
                    if not (value.startswith("'") and value.endswith("'")):
                        if len(value) == 10 and value.count("-") == 2:
                            value = f"'{value}'"
                    conditions.append(f'"{self.date_field}" {operator} {value}')
                else:
                    conditions.append(f'"{self.date_field}" {self.date_filter}')

        # General-purpose FilterExpression filters
        for filt in self.filters:
            if isinstance(filt, FilterExpression):
                rendered = filt(kind=ExpressionFlavor.SQL)
                if rendered:
                    conditions.append(str(rendered))

        if conditions:
            return " AND ".join(conditions)
        return ""

    def build_query(self, effective_schema: str | None = None) -> str:
        """Build a complete SQL SELECT query.

        When ``view`` is set, delegates to ``view.build_sql()``. Otherwise
        incorporates the base table, any JoinClauses, explicit select_columns,
        date filters, and FilterExpression filters.

        NOTE(review): identifiers and filter values are interpolated directly
        into the SQL string — assumes contract config is trusted input; confirm
        before exposing to untrusted callers.

        Args:
            effective_schema: Schema to use if ``self.schema_name`` is None.

        Returns:
            Complete SQL query string.
        """
        schema = self.schema_name or effective_schema or "public"
        if self.view is not None:
            from graflo.filter.select import SelectSpec

            if isinstance(self.view, SelectSpec):
                return self.view.build_sql(schema=schema, base_table=self.table_name)
        # Alias the base table only when joins need to disambiguate columns.
        base_alias = "r" if self.joins else None
        base_ref = f'"{schema}"."{self.table_name}"'
        if base_alias:
            base_ref_aliased = f"{base_ref} {base_alias}"
        else:
            base_ref_aliased = base_ref

        # --- SELECT ---
        select_parts: list[str] = []
        if self.select_columns is not None:
            select_parts = list(self.select_columns)
        elif self.joins:
            select_parts.append(f"{base_alias}.*")
            for jc in self.joins:
                alias = jc.alias or jc.table
                if jc.select_fields is not None:
                    # Prefix joined columns with "<alias>__" to avoid collisions.
                    for col in jc.select_fields:
                        select_parts.append(f'{alias}."{col}" AS "{alias}__{col}"')
                else:
                    select_parts.append(f"{alias}.*")
        else:
            select_parts.append("*")

        select_clause = ", ".join(select_parts)

        # --- FROM + JOINs ---
        from_clause = base_ref_aliased
        for jc in self.joins:
            jc_schema = jc.schema_name or schema
            alias = jc.alias or jc.table
            join_ref = f'"{jc_schema}"."{jc.table}"'
            left_col = (
                f'{base_alias}."{jc.on_self}"' if base_alias else f'"{jc.on_self}"'
            )
            right_col = f'{alias}."{jc.on_other}"'
            from_clause += (
                f" {jc.join_type} JOIN {join_ref} {alias} ON {left_col} = {right_col}"
            )

        query = f"SELECT {select_clause} FROM {from_clause}"

        # --- WHERE ---
        where = self.build_where_clause()
        if where:
            query += f" WHERE {where}"

        return query

build_query(effective_schema=None)

Build a complete SQL SELECT query.

When view is set, delegates to view.build_sql(). Otherwise incorporates the base table, any JoinClauses, explicit select_columns, date filters, and FilterExpression filters.

Parameters:

Name Type Description Default
effective_schema str | None

Schema to use if self.schema_name is None.

None

Returns:

Type Description
str

Complete SQL query string.

Source code in graflo/architecture/contract/bindings/connectors.py
def build_query(self, effective_schema: str | None = None) -> str:
    """Build a complete SQL SELECT query.

    When ``view`` is set, delegates to ``view.build_sql()``. Otherwise
    incorporates the base table, any JoinClauses, explicit select_columns,
    date filters, and FilterExpression filters.

    Args:
        effective_schema: Schema to use if ``self.schema_name`` is None.

    Returns:
        Complete SQL query string.
    """
    schema = self.schema_name or effective_schema or "public"
    if self.view is not None:
        from graflo.filter.select import SelectSpec

        if isinstance(self.view, SelectSpec):
            return self.view.build_sql(schema=schema, base_table=self.table_name)
    base_alias = "r" if self.joins else None
    base_ref = f'"{schema}"."{self.table_name}"'
    if base_alias:
        base_ref_aliased = f"{base_ref} {base_alias}"
    else:
        base_ref_aliased = base_ref

    # --- SELECT ---
    select_parts: list[str] = []
    if self.select_columns is not None:
        select_parts = list(self.select_columns)
    elif self.joins:
        select_parts.append(f"{base_alias}.*")
        for jc in self.joins:
            alias = jc.alias or jc.table
            jc_schema = jc.schema_name or schema
            if jc.select_fields is not None:
                for col in jc.select_fields:
                    select_parts.append(f'{alias}."{col}" AS "{alias}__{col}"')
            else:
                select_parts.append(f"{alias}.*")
    else:
        select_parts.append("*")

    select_clause = ", ".join(select_parts)

    # --- FROM + JOINs ---
    from_clause = base_ref_aliased
    for jc in self.joins:
        jc_schema = jc.schema_name or schema
        alias = jc.alias or jc.table
        join_ref = f'"{jc_schema}"."{jc.table}"'
        left_col = (
            f'{base_alias}."{jc.on_self}"' if base_alias else f'"{jc.on_self}"'
        )
        right_col = f'{alias}."{jc.on_other}"'
        from_clause += (
            f" {jc.join_type} JOIN {join_ref} {alias} ON {left_col} = {right_col}"
        )

    query = f"SELECT {select_clause} FROM {from_clause}"

    # --- WHERE ---
    where = self.build_where_clause()
    if where:
        query += f" WHERE {where}"

    return query

build_where_clause()

Build SQL WHERE clause from date filtering parameters and general filters.

Returns:

Type Description
str

WHERE clause string (without the WHERE keyword) or empty string if no filters

Source code in graflo/architecture/contract/bindings/connectors.py
def build_where_clause(self) -> str:
    """Build SQL WHERE clause from date filtering parameters **and** general filters.

    Returns:
        WHERE clause string (without the WHERE keyword) or empty string if no filters
    """
    from graflo.filter.onto import FilterExpression
    from graflo.onto import ExpressionFlavor

    clauses: list[str] = []

    # Legacy date-filter fields come first.
    if self.date_field:
        quoted_field = f'"{self.date_field}"'
        if self.date_range_start and self.date_range_days is not None:
            start_expr = f"'{self.date_range_start}'::date"
            clauses.append(f"{quoted_field} >= {start_expr}")
            clauses.append(
                f"{start_expr} + INTERVAL '{self.date_range_days} days'".join(
                    [f"{quoted_field} < ", ""]
                )
            )
        elif self.date_filter:
            # Split "op value" once; quote a bare ISO date (YYYY-MM-DD) value.
            pieces = self.date_filter.strip().split(None, 1)
            if len(pieces) == 2:
                operator, rhs = pieces
                already_quoted = rhs.startswith("'") and rhs.endswith("'")
                if not already_quoted and len(rhs) == 10 and rhs.count("-") == 2:
                    rhs = f"'{rhs}'"
                clauses.append(f"{quoted_field} {operator} {rhs}")
            else:
                clauses.append(f"{quoted_field} {self.date_filter}")

    # General-purpose FilterExpression filters rendered to SQL.
    for expr in self.filters:
        if isinstance(expr, FilterExpression):
            fragment = expr(kind=ExpressionFlavor.SQL)
            if fragment:
                clauses.append(str(fragment))

    return " AND ".join(clauses) if clauses else ""

get_resource_type()

Get resource type.

Source code in graflo/architecture/contract/bindings/connectors.py
def get_resource_type(self) -> ResourceType:
    """Identify this connector as a SQL-table data source."""
    return ResourceType.SQL_TABLE

matches(resource_identifier)

Check if connector matches a table name.

Parameters:

Name Type Description Default
resource_identifier str

Table name to match (format: schema.table or just table)

required

Returns:

Name Type Description
bool bool

True if connector matches

Source code in graflo/architecture/contract/bindings/connectors.py
def matches(self, resource_identifier: str) -> bool:
    """Check if connector matches a table name.

    Args:
        resource_identifier: Table name to match (format: schema.table or just table)

    Returns:
        bool: True if connector matches
    """
    name = self.table_name
    if not name:
        return False

    # Anchored input ("^..." or "...$") is treated as a user-supplied regex;
    # anything else is escaped and matched literally.
    if name.startswith("^") or name.endswith("$"):
        pattern = re.compile(name)
    else:
        pattern = re.compile(f"^{re.escape(name)}$")

    # Try the bare identifier, then the schema-qualified form when available.
    candidates = [resource_identifier]
    if self.schema_name:
        candidates.append(f"{self.schema_name}.{resource_identifier}")

    return any(pattern.match(candidate) for candidate in candidates)