Skip to content

graflo.hq.connection_provider

Runtime connection/config resolution for source connectors.

This module defines a connector-centric runtime indirection: Bindings (contract) -> conn_proxy name -> GeneralizedConnConfig (runtime).

ApiAuth

Bases: BaseModel

Authentication payload for REST API source access.

Source code in graflo/connection_models.py
class ApiAuth(BaseModel):
    """Authentication payload for REST API source access.

    ``header_name`` and ``prefix`` apply to bearer / api_key auth.

    The secret fields (``token``, ``password``) are declared with
    ``repr=False`` so they are omitted from ``repr()``/``str()`` output
    (log lines, tracebacks, debugger views). Their stored values are
    otherwise unaffected.
    """

    # One of the four schemes also accepted by ``RestApiConnConfig.from_env``.
    auth_type: Literal["bearer", "basic", "digest", "api_key"] = "bearer"
    token: str | None = Field(default=None, repr=False)
    username: str | None = None
    password: str | None = Field(default=None, repr=False)
    # Header name / value prefix for bearer and api_key auth.
    header_name: str = "Authorization"
    prefix: str = "Bearer"

ApiGeneralizedConnConfig

Bases: BaseModel

Generalized runtime config variant for REST API connections.

Source code in graflo/connection_models.py
class ApiGeneralizedConnConfig(BaseModel):
    """Generalized runtime config variant for REST API connections.

    Wraps a :class:`RestApiConnConfig` (base URL, optional auth, default
    headers) so it can be registered under a ``conn_proxy`` name next to the
    other generalized variants.
    """

    # Fixed variant tag; presumably the discriminator of the
    # ``GeneralizedConnConfig`` union. NOTE(review): confirm.
    kind: Literal["rest_api"] = "rest_api"
    # The actual REST API connection settings.
    config: RestApiConnConfig

ConnectionProvider

Bases: Protocol

Resolve runtime source connection/auth configuration.

New connector-centric resolution (preferred): get_generalized_conn_config takes a connector and returns the generalized runtime config.

Legacy helpers (kept for backwards compatibility): get_postgres_config and get_sparql_auth.

Source code in graflo/hq/connection_provider.py
class ConnectionProvider(Protocol):
    """Structural interface for resolving source connection/auth settings at runtime.

    Preferred, connector-centric entry points:

    - :meth:`get_generalized_conn_config` maps a connector to its generalized
      runtime config.
    - :meth:`get_generalized_config_by_proxy` maps a non-secret ``conn_proxy``
      name directly to a runtime config (e.g. S3 staging).

    Legacy helpers, retained for backwards compatibility:

    - :meth:`get_postgres_config`
    - :meth:`get_sparql_auth`

    Every lookup is optional: a ``None`` result means the implementation has
    nothing configured for the request.
    """

    def get_generalized_conn_config(
        self, connector: ResourceConnector
    ) -> GeneralizedConnConfig | None:
        """Look up the generalized runtime config bound to *connector*."""
        ...

    def get_generalized_config_by_proxy(
        self, conn_proxy: str
    ) -> GeneralizedConnConfig | None:
        """Look up runtime config by its non-secret proxy name (S3, etc.)."""
        ...

    def get_postgres_config(
        self, resource_name: str, connector: TableConnector
    ) -> PostgresConfig | None:
        """Legacy: source DB config for the SQL table resource *resource_name*."""
        ...

    def get_sparql_auth(
        self, resource_name: str, connector: SparqlConnector
    ) -> SparqlAuth | None:
        """Legacy: auth payload for the SPARQL resource *resource_name*."""
        ...

get_generalized_config_by_proxy(conn_proxy)

Resolve a non-secret proxy name to runtime config (S3, etc.).

Source code in graflo/hq/connection_provider.py
def get_generalized_config_by_proxy(
    self, conn_proxy: str
) -> GeneralizedConnConfig | None:
    """Look up runtime config by its non-secret ``conn_proxy`` name (S3, etc.).

    Interface stub; concrete providers supply the lookup.
    """
    ...

get_generalized_conn_config(connector)

Return generalized runtime config for a connector.

Source code in graflo/hq/connection_provider.py
def get_generalized_conn_config(
    self, connector: ResourceConnector
) -> GeneralizedConnConfig | None:
    """Look up the generalized runtime config bound to *connector*.

    Interface stub; concrete providers supply the lookup.
    """
    ...

get_postgres_config(resource_name, connector)

Return source DB config for a SQL table resource (legacy).

Source code in graflo/hq/connection_provider.py
def get_postgres_config(
    self, resource_name: str, connector: TableConnector
) -> PostgresConfig | None:
    """Legacy lookup: source DB config for the SQL table resource *resource_name*.

    Interface stub; concrete providers supply the lookup.
    """
    ...

get_sparql_auth(resource_name, connector)

Return source auth payload for a SPARQL resource (legacy).

Source code in graflo/hq/connection_provider.py
def get_sparql_auth(
    self, resource_name: str, connector: SparqlConnector
) -> SparqlAuth | None:
    """Legacy lookup: auth payload for the SPARQL resource *resource_name*.

    Interface stub; concrete providers supply the lookup.
    """
    ...

EmptyConnectionProvider

No-op provider when no source credentials/config are configured.

Source code in graflo/hq/connection_provider.py
class EmptyConnectionProvider:
    """Provider that resolves nothing.

    Stand-in for setups with no source credentials or runtime configs:
    every lookup answers ``None``.
    """

    def get_generalized_config_by_proxy(
        self, conn_proxy: str
    ) -> GeneralizedConnConfig | None:
        """No proxies are registered, so the answer is always ``None``."""
        return None

    def get_generalized_conn_config(
        self, connector: ResourceConnector
    ) -> GeneralizedConnConfig | None:
        """No connectors are bound, so the answer is always ``None``."""
        return None

    def get_postgres_config(
        self, resource_name: str, connector: TableConnector
    ) -> PostgresConfig | None:
        """No legacy SQL configs exist, so the answer is always ``None``."""
        return None

    def get_sparql_auth(
        self, resource_name: str, connector: SparqlConnector
    ) -> SparqlAuth | None:
        """No legacy SPARQL auth exists, so the answer is always ``None``."""
        return None

InMemoryConnectionProvider

Bases: BaseModel

Simple in-memory provider for proxy-based generalized configs.

Supports two wiring modes: new (proxy_by_connector_hash + configs_by_proxy) and legacy (per-resource maps postgres_by_resource / sparql_by_resource).

Source code in graflo/hq/connection_provider.py
class InMemoryConnectionProvider(BaseModel):
    """Simple in-memory provider for proxy-based generalized configs.

    Supports two wiring modes:

    - New: ``proxy_by_connector_hash`` + ``configs_by_proxy``. A connector's
      ``hash`` maps to a ``conn_proxy`` name, and that name maps to a
      generalized runtime config.
    - Legacy: per-resource maps (``postgres_by_resource`` /
      ``sparql_by_resource``), plus SPARQL fallbacks by endpoint URL and a
      provider-wide default.

    The legacy getters consult the new mapping first. They fall back to the
    legacy maps only when it yields no config of the matching kind.
    """

    # conn_proxy name -> generalized runtime config (Postgres, SPARQL, API, S3, ...).
    configs_by_proxy: dict[str, GeneralizedConnConfig] = Field(default_factory=dict)
    # connector.hash -> conn_proxy name.
    proxy_by_connector_hash: dict[str, str] = Field(default_factory=dict)

    # Legacy wiring; consulted only when the proxy-based lookup misses.
    postgres_by_resource: dict[str, PostgresConfig] = Field(default_factory=dict)
    sparql_by_resource: dict[str, SparqlEndpointConfig] = Field(default_factory=dict)
    sparql_by_endpoint: dict[str, SparqlEndpointConfig] = Field(default_factory=dict)
    default_sparql: SparqlEndpointConfig | None = None

    @staticmethod
    def _collect_conn_proxies(
        bindings: Bindings, connector_type: type | None = None
    ) -> set[str]:
        """Return the distinct non-None ``conn_proxy`` labels in *bindings*.

        When *connector_type* is given, only connectors of that type count.
        """
        proxies: set[str] = set()
        for connector in bindings.connectors:
            if connector_type is not None and not isinstance(
                connector, connector_type
            ):
                continue
            proxy = bindings.get_conn_proxy_for_connector(connector)
            if proxy is not None:
                proxies.add(proxy)
        return proxies

    @staticmethod
    def _load_api_config_from_env(
        conn_proxy: str, env_prefix: str | None
    ) -> ApiGeneralizedConnConfig:
        """Build (without registering) the env-backed REST API config for *conn_proxy*."""
        prefix = (
            env_prefix if env_prefix is not None else _proxy_to_env_prefix(conn_proxy)
        )
        return ApiGeneralizedConnConfig(
            config=RestApiConnConfig.from_env(env_prefix=prefix)
        )

    def register_generalized_config(
        self, *, conn_proxy: str, config: GeneralizedConnConfig
    ) -> None:
        """Store *config* under *conn_proxy*, replacing any previous entry."""
        self.configs_by_proxy[conn_proxy] = config

    def bind_connector_to_conn_proxy(
        self, *, connector: ResourceConnector, conn_proxy: str
    ) -> None:
        """Route lookups for *connector* (keyed by its ``hash``) to *conn_proxy*."""
        self.proxy_by_connector_hash[connector.hash] = conn_proxy

    def bind_from_bindings(self, *, bindings: Bindings) -> None:
        """Populate ``proxy_by_connector_hash`` from the contract bindings.

        Connectors without a ``conn_proxy`` mapping are skipped. Entries for
        connectors not in *bindings* are left untouched.
        """
        for connector in bindings.connectors:
            proxy = bindings.get_conn_proxy_for_connector(connector)
            if proxy is not None:
                self.bind_connector_to_conn_proxy(connector=connector, conn_proxy=proxy)

    def bind_single_config_for_bindings(
        self,
        *,
        bindings: Bindings,
        conn_proxy: str,
        config: GeneralizedConnConfig,
    ) -> None:
        """Bind one generalized config to all connectors in bindings.

        This is intended for the common case where a single source DB
        (or single generalized API endpoint) supplies all SQL/SPARQL connectors
        in the manifest.

        Raises:
            ValueError: if bindings contain no connector -> ``conn_proxy``
                mappings, or if they use any ``conn_proxy`` label other than
                *conn_proxy*.
        """
        used_proxies = self._collect_conn_proxies(bindings)

        if not used_proxies:
            raise ValueError(
                "No connector_connection mappings found in bindings; "
                "expected connector -> conn_proxy rows."
            )

        if used_proxies != {conn_proxy}:
            used = ", ".join(sorted(used_proxies))
            raise ValueError(
                f"Expected all connector_connection mappings to use conn_proxy='{conn_proxy}', "
                f"but found proxies: {used}. For multi-proxy setups, bind explicitly "
                "with register_generalized_config(...) and bind_from_bindings(...)."
            )

        self.register_generalized_config(conn_proxy=conn_proxy, config=config)
        self.bind_from_bindings(bindings=bindings)

    def register_api_config_from_env(
        self,
        *,
        conn_proxy: str,
        env_prefix: str | None = None,
    ) -> None:
        """Register REST API runtime config for *conn_proxy* from environment variables.

        *env_prefix* defaults to a proxy-derived prefix (e.g. ``user_service`` →
        ``USER_SERVICE_``), reading ``{prefix}BASE_URL`` and optional auth vars.
        """
        self.register_generalized_config(
            conn_proxy=conn_proxy,
            config=self._load_api_config_from_env(conn_proxy, env_prefix),
        )

    def register_all_api_configs_from_env(
        self,
        *,
        bindings: Bindings,
        env_prefix_map: dict[str, str] | None = None,
    ) -> None:
        """Register env-backed API configs for all API ``conn_proxy`` labels in *bindings*.

        Discovers the unique ``conn_proxy`` values attached to
        :class:`APIConnector` instances and loads each one's config from the
        environment, using the same prefix rules as
        :meth:`register_api_config_from_env`. Nothing is registered or bound
        until *every* config has loaded. A missing or invalid variable for
        one proxy therefore leaves the provider unchanged.

        Args:
            bindings: Manifest bindings with ``connector_connection`` rows.
            env_prefix_map: Optional per-proxy env prefix overrides.

        Raises:
            ValueError: if no APIConnector in *bindings* has a ``conn_proxy``;
                errors from ``RestApiConnConfig.from_env`` also propagate.
        """
        prefix_map = env_prefix_map or {}
        api_proxies = self._collect_conn_proxies(bindings, APIConnector)

        if not api_proxies:
            raise ValueError(
                "No API connector_connection mappings found in bindings; "
                "expected at least one APIConnector with a conn_proxy."
            )

        # Phase 1: load everything (may raise) without touching provider state.
        loaded = {
            conn_proxy: self._load_api_config_from_env(
                conn_proxy, prefix_map.get(conn_proxy)
            )
            for conn_proxy in sorted(api_proxies)
        }
        # Phase 2: commit.
        for conn_proxy, config in loaded.items():
            self.register_generalized_config(conn_proxy=conn_proxy, config=config)
        self.bind_from_bindings(bindings=bindings)

    def get_generalized_conn_config(
        self, connector: ResourceConnector
    ) -> GeneralizedConnConfig | None:
        """Resolve *connector* via its bound proxy; ``None`` if unbound or unregistered."""
        proxy = self.proxy_by_connector_hash.get(connector.hash)
        if proxy is None:
            return None
        return self.configs_by_proxy.get(proxy)

    def get_generalized_config_by_proxy(
        self, conn_proxy: str
    ) -> GeneralizedConnConfig | None:
        """Return the config registered under *conn_proxy*, or ``None``."""
        return self.configs_by_proxy.get(conn_proxy)

    def register_s3_config(
        self, *, conn_proxy: str, config: S3GeneralizedConnConfig
    ) -> None:
        """Store S3 staging credentials/config under *conn_proxy*."""
        # Same single write path as every other generalized config.
        self.register_generalized_config(conn_proxy=conn_proxy, config=config)

    def get_postgres_config(
        self, resource_name: str, connector: TableConnector
    ) -> PostgresConfig | None:
        """Return Postgres config: proxy-bound config first, then the legacy per-resource map."""
        generalized = self.get_generalized_conn_config(connector)
        if isinstance(generalized, PostgresGeneralizedConnConfig):
            return generalized.config
        return self.postgres_by_resource.get(resource_name)

    def get_sparql_auth(
        self, resource_name: str, connector: SparqlConnector
    ) -> SparqlAuth | None:
        """Return SPARQL credentials.

        Resolution order: proxy-bound SPARQL config, then per-resource,
        then per-endpoint URL, then ``default_sparql``.
        """
        generalized = self.get_generalized_conn_config(connector)
        if isinstance(generalized, SparqlGeneralizedConnConfig):
            cfg = generalized.config
            return SparqlAuth(username=cfg.username, password=cfg.password)

        cfg = self.sparql_by_resource.get(resource_name)
        if cfg is None and connector.endpoint_url:
            cfg = self.sparql_by_endpoint.get(connector.endpoint_url)
        if cfg is None:
            cfg = self.default_sparql
        if cfg is None:
            return None
        return SparqlAuth(username=cfg.username, password=cfg.password)

bind_from_bindings(*, bindings)

Populate proxy_by_connector_hash from the contract bindings.

Source code in graflo/hq/connection_provider.py
def bind_from_bindings(self, *, bindings: Bindings) -> None:
    """Record each connector's ``conn_proxy`` from *bindings* in ``proxy_by_connector_hash``.

    Connectors that have no proxy mapping are ignored. Existing entries for
    other connectors are kept.
    """
    for conn in bindings.connectors:
        conn_proxy = bindings.get_conn_proxy_for_connector(conn)
        if conn_proxy is None:
            continue
        self.proxy_by_connector_hash[conn.hash] = conn_proxy

bind_single_config_for_bindings(*, bindings, conn_proxy, config)

Bind one generalized config to all connectors in bindings.

This is intended for the common case where a single source DB (or single generalized API endpoint) supplies all SQL/SPARQL connectors in the manifest.

Raises:

ValueError — if bindings contain no connector → conn_proxy mappings, or if they use any conn_proxy label other than the requested one.

Source code in graflo/hq/connection_provider.py
def bind_single_config_for_bindings(
    self,
    *,
    bindings: Bindings,
    conn_proxy: str,
    config: GeneralizedConnConfig,
) -> None:
    """Attach one generalized config to every connector in *bindings*.

    Meant for manifests where a single source DB (or a single generalized
    API endpoint) serves all SQL/SPARQL connectors.

    Raises:
        ValueError: when *bindings* has no connector -> ``conn_proxy`` rows,
            or when any row names a proxy other than *conn_proxy*.
    """
    proxies = {
        bindings.get_conn_proxy_for_connector(connector)
        for connector in bindings.connectors
    }
    proxies.discard(None)

    if not proxies:
        raise ValueError(
            "No connector_connection mappings found in bindings; "
            "expected connector -> conn_proxy rows."
        )

    if proxies != {conn_proxy}:
        found = ", ".join(sorted(proxies))
        raise ValueError(
            f"Expected all connector_connection mappings to use conn_proxy='{conn_proxy}', "
            f"but found proxies: {found}. For multi-proxy setups, bind explicitly "
            "with register_generalized_config(...) and bind_from_bindings(...)."
        )

    self.register_generalized_config(conn_proxy=conn_proxy, config=config)
    self.bind_from_bindings(bindings=bindings)

register_all_api_configs_from_env(*, bindings, env_prefix_map=None)

Register env-backed API configs for all API conn_proxy labels in bindings.

Discovers the unique conn_proxy values attached to APIConnector instances, loads an environment-backed config for each, then binds all connectors from bindings.

Parameters:

bindings (Bindings, required): Manifest bindings with connector_connection rows.
env_prefix_map (dict[str, str] | None, default None): Optional per-proxy env prefix overrides.
Source code in graflo/hq/connection_provider.py
def register_all_api_configs_from_env(
    self,
    *,
    bindings: Bindings,
    env_prefix_map: dict[str, str] | None = None,
) -> None:
    """Register env-backed API configs for all API ``conn_proxy`` labels in *bindings*.

    Discovers the unique ``conn_proxy`` values attached to
    :class:`APIConnector` instances and loads a :class:`RestApiConnConfig`
    for each from the environment. The prefix defaults to one derived from
    the proxy name unless overridden in *env_prefix_map*. Nothing is
    registered or bound until *every* config has loaded, so a missing or
    invalid variable for one proxy leaves the provider unchanged.

    Args:
        bindings: Manifest bindings with ``connector_connection`` rows.
        env_prefix_map: Optional per-proxy env prefix overrides.

    Raises:
        ValueError: if no APIConnector in *bindings* has a ``conn_proxy``;
            errors from ``RestApiConnConfig.from_env`` also propagate.
    """
    prefix_map = env_prefix_map or {}
    api_proxies: set[str] = set()
    for connector in bindings.connectors:
        if not isinstance(connector, APIConnector):
            continue
        proxy = bindings.get_conn_proxy_for_connector(connector)
        if proxy is not None:
            api_proxies.add(proxy)

    if not api_proxies:
        raise ValueError(
            "No API connector_connection mappings found in bindings; "
            "expected at least one APIConnector with a conn_proxy."
        )

    # Phase 1: load every config (may raise) without mutating provider state.
    loaded: dict[str, ApiGeneralizedConnConfig] = {}
    for conn_proxy in sorted(api_proxies):
        override = prefix_map.get(conn_proxy)
        prefix = override if override is not None else _proxy_to_env_prefix(conn_proxy)
        loaded[conn_proxy] = ApiGeneralizedConnConfig(
            config=RestApiConnConfig.from_env(env_prefix=prefix)
        )

    # Phase 2: commit all configs, then bind connectors.
    for conn_proxy, config in loaded.items():
        self.register_generalized_config(conn_proxy=conn_proxy, config=config)
    self.bind_from_bindings(bindings=bindings)

register_api_config_from_env(*, conn_proxy, env_prefix=None)

Register REST API runtime config for conn_proxy from environment variables.

env_prefix defaults to a proxy-derived prefix (e.g. user_service → USER_SERVICE_), reading {prefix}BASE_URL and optional auth vars.

Source code in graflo/hq/connection_provider.py
def register_api_config_from_env(
    self,
    *,
    conn_proxy: str,
    env_prefix: str | None = None,
) -> None:
    """Load a REST API config from the environment and register it as *conn_proxy*.

    With no *env_prefix*, the prefix is derived from the proxy name
    (e.g. ``user_service`` → ``USER_SERVICE_``). ``{prefix}BASE_URL`` and
    the optional auth variables are then read.
    """
    if env_prefix is None:
        env_prefix = _proxy_to_env_prefix(conn_proxy)
    generalized = ApiGeneralizedConnConfig(
        config=RestApiConnConfig.from_env(env_prefix=env_prefix)
    )
    self.register_generalized_config(conn_proxy=conn_proxy, config=generalized)

register_s3_config(*, conn_proxy, config)

Store S3 staging credentials/config under conn_proxy.

Source code in graflo/hq/connection_provider.py
def register_s3_config(
    self, *, conn_proxy: str, config: S3GeneralizedConnConfig
) -> None:
    """Store S3 staging credentials/config under *conn_proxy*.

    Delegates to ``register_generalized_config`` so S3 configs take the
    same write path as every other generalized config.
    """
    self.register_generalized_config(conn_proxy=conn_proxy, config=config)

PostgresGeneralizedConnConfig

Bases: BaseModel

Generalized runtime config variant for SQL/Postgres connections.

Source code in graflo/connection_models.py
class PostgresGeneralizedConnConfig(BaseModel):
    """Generalized runtime config variant for SQL/Postgres connections.

    When a table connector is bound to a proxy holding this variant,
    ``InMemoryConnectionProvider.get_postgres_config`` returns its ``config``.
    """

    # Fixed variant tag; presumably the discriminator of the
    # ``GeneralizedConnConfig`` union. NOTE(review): confirm.
    kind: Literal["postgres"] = "postgres"
    # The wrapped Postgres connection settings.
    config: PostgresConfig

RestApiConnConfig

Bases: BaseModel

Runtime REST API connection settings (base URL and credentials).

Source code in graflo/connection_models.py
class RestApiConnConfig(BaseModel):
    """Runtime REST API connection settings (base URL and credentials)."""

    base_url: str
    auth: ApiAuth | None = None
    default_headers: dict[str, str] = Field(default_factory=dict)

    @classmethod
    def from_env(cls, env_prefix: str) -> RestApiConnConfig:
        """Load REST API config from environment variables.

        Supported variables (all prefixed with *env_prefix*):

        - ``BASE_URL`` (required)
        - ``AUTH_TYPE``: ``bearer``, ``basic``, ``digest``, or ``api_key``
          (case-insensitive, surrounding whitespace ignored; default: ``bearer``)
        - ``TOKEN``, ``USERNAME``, ``PASSWORD``
        - ``HEADER_NAME`` (default ``Authorization``), ``PREFIX`` (default
          ``Bearer``), used by bearer / api_key

        An empty variable is treated the same as an unset one. For example,
        ``AUTH_TYPE=`` falls back to ``bearer`` just as an empty
        ``HEADER_NAME`` falls back to ``Authorization``.

        Raises:
            ValueError: if ``BASE_URL`` is unset/empty or ``AUTH_TYPE`` is not
                one of the supported values.
        """
        env = os.environ

        base_url = env.get(f"{env_prefix}BASE_URL")
        if not base_url:
            raise ValueError(
                f"Environment variable {env_prefix}BASE_URL is required for RestApiConnConfig"
            )

        # ``or`` rather than a .get() default so that an empty AUTH_TYPE is
        # treated like an unset one, consistent with HEADER_NAME / PREFIX.
        auth_type_raw = env.get(f"{env_prefix}AUTH_TYPE") or "bearer"
        auth_type = auth_type_raw.strip().lower()
        if auth_type not in _VALID_AUTH_TYPES:
            raise ValueError(
                f"Invalid {env_prefix}AUTH_TYPE={auth_type_raw!r}; "
                "expected bearer, basic, digest, or api_key"
            )
        auth = ApiAuth(
            auth_type=cast(AuthType, auth_type),
            token=env.get(f"{env_prefix}TOKEN"),
            username=env.get(f"{env_prefix}USERNAME"),
            password=env.get(f"{env_prefix}PASSWORD"),
            header_name=env.get(f"{env_prefix}HEADER_NAME") or "Authorization",
            prefix=env.get(f"{env_prefix}PREFIX") or "Bearer",
        )

        return cls(base_url=base_url, auth=auth)

from_env(env_prefix) classmethod

Load REST API config from environment variables.

Supported variables (all prefixed with env_prefix):

  • BASE_URL (required)
  • AUTH_TYPE: bearer, basic, digest, or api_key (default: bearer)
  • TOKEN, USERNAME, PASSWORD
  • HEADER_NAME, PREFIX (bearer / api_key)
Source code in graflo/connection_models.py
@classmethod
def from_env(cls, env_prefix: str) -> RestApiConnConfig:
    """Load REST API config from environment variables.

    Supported variables (all prefixed with *env_prefix*):

    - ``BASE_URL`` (required)
    - ``AUTH_TYPE``: ``bearer``, ``basic``, ``digest``, or ``api_key`` (default: ``bearer``)
    - ``TOKEN``, ``USERNAME``, ``PASSWORD``
    - ``HEADER_NAME``, ``PREFIX`` (bearer / api_key)
    """
    base_url = os.environ.get(f"{env_prefix}BASE_URL")
    if not base_url:
        raise ValueError(
            f"Environment variable {env_prefix}BASE_URL is required for RestApiConnConfig"
        )

    auth_type_raw = os.environ.get(f"{env_prefix}AUTH_TYPE", "bearer")
    auth_type_lower = auth_type_raw.lower()
    if auth_type_lower not in _VALID_AUTH_TYPES:
        raise ValueError(
            f"Invalid {env_prefix}AUTH_TYPE={auth_type_raw!r}; "
            "expected bearer, basic, digest, or api_key"
        )
    auth = ApiAuth(
        auth_type=cast(AuthType, auth_type_lower),
        token=os.environ.get(f"{env_prefix}TOKEN"),
        username=os.environ.get(f"{env_prefix}USERNAME"),
        password=os.environ.get(f"{env_prefix}PASSWORD"),
        header_name=cast(
            str,
            os.environ.get(f"{env_prefix}HEADER_NAME") or "Authorization",
        ),
        prefix=cast(
            str,
            os.environ.get(f"{env_prefix}PREFIX") or "Bearer",
        ),
    )

    return cls(base_url=cast(str, base_url), auth=auth)

S3GeneralizedConnConfig

Bases: BaseModel

Runtime credentials and defaults for S3 staging (TigerGraph bulk ingest).

Source code in graflo/connection_models.py
class S3GeneralizedConnConfig(BaseModel):
    """Runtime credentials and defaults for S3 staging (TigerGraph bulk ingest).

    Registered under a ``conn_proxy`` name and resolved by proxy rather than
    by connector. ``aws_secret_access_key`` is declared with ``repr=False``
    so the secret is omitted from ``repr()``/``str()`` output (logs,
    tracebacks); its stored value is unaffected.
    """

    kind: Literal["s3"] = "s3"
    bucket: str | None = Field(
        default=None,
        description="Default bucket when TigergraphBulkLoadConfig.s3_bucket is unset.",
    )
    region: str | None = Field(default=None)
    aws_access_key_id: str | None = Field(default=None)
    aws_secret_access_key: str | None = Field(default=None, repr=False)
    endpoint_url: str | None = Field(
        default=None, description="For S3-compatible endpoints (MinIO, etc.)."
    )
    loader_endpoint_url: str | None = Field(
        default=None,
        description=(
            "S3 endpoint URL as seen by TigerGraph when it runs in another network "
            "namespace (e.g. Docker). Used only in CREATE DATA_SOURCE for LOADING JOB; "
            "boto3 continues to use endpoint_url."
        ),
    )

SparqlAuth

Bases: BaseModel

Authentication payload for SPARQL endpoint access.

Source code in graflo/connection_models.py
class SparqlAuth(BaseModel):
    """Authentication payload for SPARQL endpoint access.

    ``password`` is declared with ``repr=False`` so it is omitted from
    ``repr()``/``str()`` output (logs, tracebacks); its stored value is
    unaffected.
    """

    username: str | None = None
    password: str | None = Field(default=None, repr=False)

SparqlGeneralizedConnConfig

Bases: BaseModel

Generalized runtime config variant for SPARQL endpoint connections.

Source code in graflo/connection_models.py
class SparqlGeneralizedConnConfig(BaseModel):
    """Generalized runtime config variant for SPARQL endpoint connections.

    When a SPARQL connector is bound to a proxy holding this variant,
    ``InMemoryConnectionProvider.get_sparql_auth`` builds a ``SparqlAuth``
    from ``config.username`` / ``config.password``.
    """

    # Fixed variant tag; presumably the discriminator of the
    # ``GeneralizedConnConfig`` union. NOTE(review): confirm.
    kind: Literal["sparql"] = "sparql"
    # The wrapped SPARQL endpoint settings (including credentials).
    config: SparqlEndpointConfig