Skip to content

graflo.db.connection

DBConfig

Bases: BaseSettings, ABC

Abstract base class for all database connection configurations using Pydantic BaseSettings.

Source code in graflo/db/connection/onto.py
class DBConfig(BaseSettings, abc.ABC):
    """Abstract base class for all database connection configurations using Pydantic BaseSettings.

    The connection endpoint is stored as a single ``uri``; ``protocol``,
    ``hostname`` and ``port`` are derived from it on access. Concrete
    subclasses provide the default port and the database/schema mapping by
    implementing the abstract ``_get_*`` hooks.
    """

    uri: str | None = Field(default=None, description="Backend URI")
    username: str | None = Field(default=None, description="Authentication username")
    password: str | None = Field(default=None, description="Authentication Password")
    database: str | None = Field(
        default=None,
        description="Database name (backward compatibility, DB-specific mapping)",
    )
    schema_name: str | None = Field(
        default=None,
        validation_alias=AliasChoices("schema", "schema_name"),
        description="Schema/graph name (unified internal structure)",
    )
    request_timeout: float = Field(
        default=60.0, description="Request timeout in seconds"
    )

    @abc.abstractmethod
    def _get_default_port(self) -> int:
        """Get the default port for this db type."""
        pass

    @abc.abstractmethod
    def _get_effective_database(self) -> str | None:
        """Get the effective database name based on DB type.

        For SQL databases: returns the database name
        For graph databases: returns None (they don't have a database level)

        Returns:
            Database name or None
        """
        pass

    @abc.abstractmethod
    def _get_effective_schema(self) -> str | None:
        """Get the effective schema/graph name based on DB type.

        For SQL databases: returns the schema name
        For graph databases: returns the graph/database name (mapped from user-facing field)

        Returns:
            Schema/graph name or None
        """
        pass

    @property
    def effective_database(self) -> str | None:
        """Get the effective database name (delegates to concrete class)."""
        return self._get_effective_database()

    @property
    def effective_schema(self) -> str | None:
        """Get the effective schema/graph name (delegates to concrete class)."""
        return self._get_effective_schema()

    @model_validator(mode="after")
    def _add_default_port_to_uri(self):
        """Add the default port to the URI if it has none.

        Only the network-location part of the URI is touched, so any
        ``user:password@`` prefix, IPv6 brackets, path, params, query and
        fragment are carried over unchanged.

        Returns:
            The validated instance (``self``), with ``uri`` possibly updated.
        """
        if self.uri is None:
            return self

        parsed = urlparse(self.uri)
        if parsed.port is not None:
            return self

        default_port = self._get_default_port()
        # Nothing to add without a scheme/host to attach to or a port to attach.
        if not (parsed.scheme and parsed.hostname and default_port):
            return self

        # Append to the original netloc rather than rebuilding it from
        # ``parsed.hostname``: the latter loses userinfo and IPv6 brackets.
        # A dangling ":" (e.g. "http://host:/x") is stripped first.
        netloc = f"{parsed.netloc.rstrip(':')}:{default_port}"
        self.uri = parsed._replace(netloc=netloc).geturl()

        return self

    @property
    def url(self) -> str | None:
        """Backward compatibility property: alias for uri."""
        return self.uri

    @property
    def url_without_port(self) -> str:
        """Get URL without port, as ``scheme://hostname``.

        Raises:
            ValueError: If ``uri`` is not set.
        """
        if self.uri is None:
            raise ValueError("URI is not set")
        parsed = urlparse(self.uri)
        return f"{parsed.scheme}://{parsed.hostname}"

    @property
    def port(self) -> str | None:
        """Get port from URI as a string, or None if no URI/port is set."""
        if self.uri is None:
            return None
        parsed = urlparse(self.uri)
        return str(parsed.port) if parsed.port else None

    @property
    def protocol(self) -> str:
        """Get protocol/scheme from URI, defaulting to ``"http"``."""
        if self.uri is None:
            return "http"
        parsed = urlparse(self.uri)
        return parsed.scheme or "http"

    @property
    def hostname(self) -> str | None:
        """Get hostname from URI, or None if no URI is set."""
        if self.uri is None:
            return None
        parsed = urlparse(self.uri)
        return parsed.hostname

    @property
    def connection_type(self) -> "DBType":
        """Get database type from class.

        The class hierarchy is searched from most to least derived, so
        subclasses of a registered config class (such as the temporary
        prefixed class created by ``from_env(prefix=...)``) resolve to the
        type of their registered ancestor.
        """
        # Map class to DBType - need to import here to avoid circular import
        from .config_mapping import DB_TYPE_MAPPING

        # Reverse lookup: the first MRO entry is type(self) itself, so an
        # exact match still wins over any ancestor match.
        for klass in type(self).__mro__:
            for db_type, config_class in DB_TYPE_MAPPING.items():
                if klass is config_class:
                    return db_type

        # Fallback (shouldn't happen)
        return DBType.ARANGO

    def can_be_source(self) -> bool:
        """Check if this database type can be used as a source."""
        return self.connection_type in SOURCE_DATABASES

    def can_be_target(self) -> bool:
        """Check if this database type can be used as a target."""
        return self.connection_type in TARGET_DATABASES

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "DBConfig":
        """Create a connection config from a dictionary.

        The database type is read from ``db_type`` (or the legacy
        ``connection_type``) and selects the concrete config class. Legacy
        keys ``url``, ``cred_name`` and ``cred_pass`` are mapped to ``uri``,
        ``username`` and ``password`` when the new key is absent. If no
        ``uri`` is given, one is taken from ``hosts`` or assembled from
        ``protocol``/``hostname``/``port``.

        Args:
            data: Configuration mapping; it is not modified.

        Returns:
            An instance of the config class registered for the database type.

        Raises:
            TypeError: If ``data`` is not a dict.
            ValueError: If the database type is missing or not supported.
        """
        if not isinstance(data, dict):
            raise TypeError(f"Expected dict, got {type(data)}")

        # Copy the data to avoid modifying the original
        config_data = data.copy()

        # Remove both spellings so neither leaks into the constructor kwargs.
        primary_type = config_data.pop("db_type", None)
        legacy_type = config_data.pop("connection_type", None)
        db_type = primary_type or legacy_type
        if not db_type:
            raise ValueError("Missing 'db_type' or 'connection_type' in configuration")

        try:
            conn_type = DBType(db_type)
        except ValueError as exc:
            raise ValueError(
                f"Database type '{db_type}' not supported. "
                f"Should be one of: {list(DBType)}"
            ) from exc

        # Map old 'url' field to 'uri' for backward compatibility
        if "url" in config_data and "uri" not in config_data:
            config_data["uri"] = config_data.pop("url")

        # Map old credential fields
        if "cred_name" in config_data and "username" not in config_data:
            config_data["username"] = config_data.pop("cred_name")
        if "cred_pass" in config_data and "password" not in config_data:
            config_data["password"] = config_data.pop("cred_pass")

        # Construct URI from protocol/hostname/port if uri is not provided
        if "uri" not in config_data:
            protocol = config_data.pop("protocol", "http")
            hostname = config_data.pop("hostname", None)
            port = config_data.pop("port", None)
            hosts = config_data.pop("hosts", None)

            if hosts:
                # Use hosts as URI
                config_data["uri"] = hosts
            elif hostname:
                # Construct URI from components
                if port:
                    config_data["uri"] = f"{protocol}://{hostname}:{port}"
                else:
                    config_data["uri"] = f"{protocol}://{hostname}"

        # Get the appropriate config class and initialize it
        config_class = conn_type.config_class
        return config_class(**config_data)

    @classmethod
    def from_docker_env(cls, docker_dir: str | Path | None = None) -> "DBConfig":
        """Load config from docker .env file.

        Args:
            docker_dir: Path to docker directory. If None, uses default based on db type.

        Returns:
            DBConfig instance loaded from .env file

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("Subclasses must implement from_docker_env")

    @classmethod
    def from_env(cls: Type[T], prefix: str | None = None) -> T:
        """Load config from environment variables using Pydantic BaseSettings.

        Supports custom prefixes for multiple configs:
        - Default (prefix=None): Uses {BASE_PREFIX}URI, {BASE_PREFIX}USERNAME, etc.
        - With prefix (prefix="USER"): Uses USER_{BASE_PREFIX}URI, USER_{BASE_PREFIX}USERNAME, etc.

        Args:
            prefix: Optional prefix for environment variables (e.g., "USER", "LAKE", "KG").
                   If None, uses default {BASE_PREFIX}* variables.

        Returns:
            DBConfig instance loaded from environment variables using Pydantic BaseSettings

        Raises:
            ValueError: If ``prefix`` is given but the class has no
                ``env_prefix`` in its ``model_config``.

        Examples:
            # Load default config (ARANGO_URI, ARANGO_USERNAME, etc.)
            config = ArangoConfig.from_env()

            # Load config with prefix (USER_ARANGO_URI, USER_ARANGO_USERNAME, etc.)
            user_config = ArangoConfig.from_env(prefix="USER")
        """
        if prefix:
            # Get the base prefix from the class's model_config
            base_prefix = cls.model_config.get("env_prefix")
            if not base_prefix:
                raise ValueError(
                    f"Class {cls.__name__} does not have env_prefix configured in model_config"
                )
            # Create a new model class with modified env_prefix
            new_prefix = f"{prefix.upper()}_{base_prefix}"
            case_sensitive = cls.model_config.get("case_sensitive", False)
            model_config = SettingsConfigDict(
                env_prefix=new_prefix,
                case_sensitive=case_sensitive,
            )
            # Create a new class dynamically with the modified prefix
            temp_class = type(
                f"{cls.__name__}WithPrefix", (cls,), {"model_config": model_config}
            )
            return temp_class()
        else:
            # Use default prefix - Pydantic will read from environment automatically
            return cls()

connection_type property

Get database type from class.

effective_database property

Get the effective database name (delegates to concrete class).

effective_schema property

Get the effective schema/graph name (delegates to concrete class).

hostname property

Get hostname from URI.

port property

Get port from URI.

protocol property

Get protocol/scheme from URI.

url property

Backward compatibility property: alias for uri.

url_without_port property

Get URL without port.

can_be_source()

Check if this database type can be used as a source.

Source code in graflo/db/connection/onto.py
def can_be_source(self) -> bool:
    """Report whether this config's database type is listed in ``SOURCE_DATABASES``."""
    db_kind = self.connection_type
    return db_kind in SOURCE_DATABASES

can_be_target()

Check if this database type can be used as a target.

Source code in graflo/db/connection/onto.py
def can_be_target(self) -> bool:
    """Report whether this config's database type is listed in ``TARGET_DATABASES``."""
    db_kind = self.connection_type
    return db_kind in TARGET_DATABASES

from_dict(data) classmethod

Create a connection config from a dictionary.

Source code in graflo/db/connection/onto.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "DBConfig":
    """Create a connection config from a dictionary.

    The database type is read from ``db_type`` (or the legacy
    ``connection_type``) and selects the concrete config class. Legacy keys
    ``url``, ``cred_name`` and ``cred_pass`` are mapped to ``uri``,
    ``username`` and ``password`` when the new key is absent. If no ``uri``
    is given, one is taken from ``hosts`` or assembled from
    ``protocol``/``hostname``/``port``.

    Args:
        data: Configuration mapping; it is not modified.

    Returns:
        An instance of the config class registered for the database type.

    Raises:
        TypeError: If ``data`` is not a dict.
        ValueError: If the database type is missing or not supported.
    """
    if not isinstance(data, dict):
        raise TypeError(f"Expected dict, got {type(data)}")

    # Copy the data to avoid modifying the original
    config_data = data.copy()

    # Remove both spellings so neither leaks into the constructor kwargs.
    primary_type = config_data.pop("db_type", None)
    legacy_type = config_data.pop("connection_type", None)
    db_type = primary_type or legacy_type
    if not db_type:
        raise ValueError("Missing 'db_type' or 'connection_type' in configuration")

    try:
        conn_type = DBType(db_type)
    except ValueError as exc:
        raise ValueError(
            f"Database type '{db_type}' not supported. "
            f"Should be one of: {list(DBType)}"
        ) from exc

    # Map old 'url' field to 'uri' for backward compatibility
    if "url" in config_data and "uri" not in config_data:
        config_data["uri"] = config_data.pop("url")

    # Map old credential fields
    if "cred_name" in config_data and "username" not in config_data:
        config_data["username"] = config_data.pop("cred_name")
    if "cred_pass" in config_data and "password" not in config_data:
        config_data["password"] = config_data.pop("cred_pass")

    # Construct URI from protocol/hostname/port if uri is not provided
    if "uri" not in config_data:
        protocol = config_data.pop("protocol", "http")
        hostname = config_data.pop("hostname", None)
        port = config_data.pop("port", None)
        hosts = config_data.pop("hosts", None)

        if hosts:
            # Use hosts as URI
            config_data["uri"] = hosts
        elif hostname:
            # Construct URI from components
            if port:
                config_data["uri"] = f"{protocol}://{hostname}:{port}"
            else:
                config_data["uri"] = f"{protocol}://{hostname}"

    # Get the appropriate config class and initialize it
    config_class = conn_type.config_class
    return config_class(**config_data)

from_docker_env(docker_dir=None) classmethod

Load config from docker .env file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| docker_dir | str \| Path \| None | Path to docker directory. If None, uses default based on db type. | None |

Returns:

Type Description
DBConfig

DBConfig instance loaded from .env file

Source code in graflo/db/connection/onto.py
@classmethod
def from_docker_env(cls, docker_dir: str | Path | None = None) -> "DBConfig":
    """Load config from docker .env file.

    Args:
        docker_dir: Path to docker directory. If None, uses default based on db type.

    Returns:
        DBConfig instance loaded from .env file
    """
    raise NotImplementedError("Subclasses must implement from_docker_env")

from_env(prefix=None) classmethod

Load config from environment variables using Pydantic BaseSettings.

Supports custom prefixes for multiple configs:

- Default (prefix=None): Uses {BASE_PREFIX}URI, {BASE_PREFIX}USERNAME, etc.
- With prefix (prefix="USER"): Uses USER_{BASE_PREFIX}URI, USER_{BASE_PREFIX}USERNAME, etc.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| prefix | str \| None | Optional prefix for environment variables (e.g., "USER", "LAKE", "KG"). If None, uses default {BASE_PREFIX}* variables. | None |

Returns:

Type Description
T

DBConfig instance loaded from environment variables using Pydantic BaseSettings

Examples:

Load default config (ARANGO_URI, ARANGO_USERNAME, etc.)

config = ArangoConfig.from_env()

Load config with prefix (USER_ARANGO_URI, USER_ARANGO_USERNAME, etc.)

user_config = ArangoConfig.from_env(prefix="USER")

Source code in graflo/db/connection/onto.py
@classmethod
def from_env(cls: Type[T], prefix: str | None = None) -> T:
    """Instantiate the config from environment variables via Pydantic BaseSettings.

    Variable naming:
    - ``prefix=None``: {BASE_PREFIX}URI, {BASE_PREFIX}USERNAME, etc.
    - ``prefix="USER"``: USER_{BASE_PREFIX}URI, USER_{BASE_PREFIX}USERNAME, etc.

    Args:
        prefix: Optional extra prefix for the environment variables
            (e.g., "USER", "LAKE", "KG"). It is upper-cased and joined to the
            class's own ``env_prefix`` with an underscore.

    Returns:
        Config instance populated from the environment.

    Raises:
        ValueError: If ``prefix`` is given but the class has no
            ``env_prefix`` in its ``model_config``.

    Examples:
        # Load default config (ARANGO_URI, ARANGO_USERNAME, etc.)
        config = ArangoConfig.from_env()

        # Load config with prefix (USER_ARANGO_URI, USER_ARANGO_USERNAME, etc.)
        user_config = ArangoConfig.from_env(prefix="USER")
    """
    # No extra prefix: the class's own settings config already applies.
    if not prefix:
        return cls()

    class_prefix = cls.model_config.get("env_prefix")
    if not class_prefix:
        raise ValueError(
            f"Class {cls.__name__} does not have env_prefix configured in model_config"
        )

    # Derive a throwaway subclass whose only difference is the env prefix.
    prefixed_settings = SettingsConfigDict(
        env_prefix=f"{prefix.upper()}_{class_prefix}",
        case_sensitive=cls.model_config.get("case_sensitive", False),
    )
    prefixed_cls = type(
        f"{cls.__name__}WithPrefix",
        (cls,),
        {"model_config": prefixed_settings},
    )
    return prefixed_cls()

DBType

Bases: StrEnum

Enum representing different types of databases.

Includes both graph databases and source databases (SQL, NoSQL, etc.).

Source code in graflo/db/connection/onto.py
class DBType(StrEnum, metaclass=EnumMetaWithContains):
    """Enumeration of the supported database backends.

    Members cover graph databases as well as source databases (SQL, NoSQL,
    etc.). Each member's value is its lowercase string identifier.
    """

    # --- graph databases ---
    ARANGO = "arango"
    NEO4J = "neo4j"
    TIGERGRAPH = "tigergraph"
    FALKORDB = "falkordb"

    # --- source databases (SQL, NoSQL) ---
    POSTGRES = "postgres"
    MYSQL = "mysql"
    MONGODB = "mongodb"
    SQLITE = "sqlite"

    @property
    def config_class(self) -> Type["DBConfig"]:
        """Return the config class registered for this member in ``DB_TYPE_MAPPING``."""
        # Imported at call time rather than at module import.
        from .config_mapping import DB_TYPE_MAPPING as registry

        return registry[self]

config_class property

Get the appropriate config class for this database type.