Skip to content

graflo.object_storage

S3-compatible object storage helpers (boto3): configuration, bucket creation, and bulk upload.

This package is kept separate from the graph database targets managed by :class:`~graflo.db.ConnectionManager`. Use :class:`~graflo.object_storage.config.MinioConfig` with :meth:`~graflo.object_storage.config.MinioConfig.from_docker_env` alongside ``docker/minio/.env``, following the same pattern as ``Neo4jConfig.from_docker_env``.

S3EndpointConfig = MinioConfig module-attribute

MinioConfig

Bases: BaseModel

Credentials and endpoint for MinIO or any S3-compatible API (boto3).

Source code in graflo/object_storage/config.py
class MinioConfig(BaseModel):
    """Endpoint and credentials for MinIO or any S3-compatible API (boto3)."""

    model_config = ConfigDict(extra="forbid")

    endpoint_url: str = Field(
        description="Base URL for the S3 API, e.g. http://127.0.0.1:9000",
    )
    access_key: str = Field(
        description="S3 access key id (MinIO root user or IAM key)."
    )
    secret_key: str = Field(description="S3 secret access key.")
    bucket: str = Field(
        default="graflo-staging",
        description="Default bucket for staged bulk CSV objects.",
    )
    region: str = Field(
        default="us-east-1",
        description="AWS region string passed to boto3 (use us-east-1 for MinIO).",
    )
    loader_endpoint_url: str | None = Field(
        default=None,
        description=(
            "Optional S3 API URL for TigerGraph LOADING JOB (CREATE DATA_SOURCE). "
            "Set when TigerGraph runs in Docker and MinIO is on the host "
            "(e.g. http://172.17.0.1:9003). Boto3 uses endpoint_url above."
        ),
    )

    def to_s3_generalized_conn_config(self) -> "S3GeneralizedConnConfig":
        """Map to runtime provider config for :class:`~graflo.hq.connection_provider.S3GeneralizedConnConfig`."""
        # Imported lazily to avoid a hard dependency at module import time.
        from graflo.hq.connection_provider import S3GeneralizedConnConfig

        kwargs = {
            "bucket": self.bucket,
            "region": self.region,
            "endpoint_url": self.endpoint_url,
            "aws_access_key_id": self.access_key,
            "aws_secret_access_key": self.secret_key,
            "loader_endpoint_url": self.loader_endpoint_url,
        }
        return S3GeneralizedConnConfig(**kwargs)

    @classmethod
    def from_docker_env(cls, docker_dir: str | Path | None = None) -> MinioConfig:
        """Load from ``docker/minio/.env`` (same layout as other ``docker/*`` stacks)."""
        base = (
            Path(docker_dir)
            if docker_dir is not None
            # Default: <repo root>/docker/minio relative to this module.
            else Path(__file__).resolve().parent.parent.parent / "docker" / "minio"
        )

        env_file = base / ".env"
        if not env_file.exists():
            raise FileNotFoundError(f"Environment file not found: {env_file}")

        env = parse_dotenv_file(env_file)

        # A full MINIO_ENDPOINT wins; otherwise assemble one from parts.
        endpoint = env.get("MINIO_ENDPOINT")
        if not endpoint:
            endpoint = "{}://{}:{}".format(
                env.get("MINIO_PROTOCOL", "http"),
                env.get("MINIO_HOSTNAME", "127.0.0.1"),
                env.get("MINIO_API_PORT", "9000"),
            )

        # Dedicated access/secret keys take precedence over the root account.
        access = env.get("MINIO_ACCESS_KEY") or env.get("MINIO_ROOT_USER")
        secret = env.get("MINIO_SECRET_KEY") or env.get("MINIO_ROOT_PASSWORD")
        if not (access and secret):
            raise ValueError(
                "MinIO docker .env must set MINIO_ROOT_USER/MINIO_ROOT_PASSWORD "
                "or MINIO_ACCESS_KEY/MINIO_SECRET_KEY"
            )

        payload: dict[str, Any] = {
            "endpoint_url": endpoint,
            "access_key": access,
            "secret_key": secret,
        }

        bucket = env.get("MINIO_STAGING_BUCKET") or env.get("BULK_S3_BUCKET")
        if bucket:
            payload["bucket"] = bucket

        region = env.get("AWS_REGION")
        if region:
            payload["region"] = region

        loader = env.get("MINIO_LOADER_ENDPOINT") or env.get(
            "MINIO_TIGERGRAPH_ENDPOINT"
        )
        if loader:
            # parse_dotenv_file already strips quotes; do it again defensively.
            payload["loader_endpoint_url"] = loader.strip().strip('"').strip("'")

        return cls(**payload)

from_docker_env(docker_dir=None) classmethod

Load from docker/minio/.env (same layout as other docker/* stacks).

Source code in graflo/object_storage/config.py
@classmethod
def from_docker_env(cls, docker_dir: str | Path | None = None) -> MinioConfig:
    """Load from ``docker/minio/.env`` (same layout as other ``docker/*`` stacks)."""
    if docker_dir is not None:
        root = Path(docker_dir)
    else:
        # Default: <repo root>/docker/minio relative to this module.
        root = Path(__file__).resolve().parent.parent.parent / "docker" / "minio"

    dotenv = root / ".env"
    if not dotenv.exists():
        raise FileNotFoundError(f"Environment file not found: {dotenv}")

    env = parse_dotenv_file(dotenv)
    fields: dict[str, Any] = {}

    # A full MINIO_ENDPOINT wins; otherwise assemble one from parts.
    endpoint = env.get("MINIO_ENDPOINT")
    if endpoint:
        fields["endpoint_url"] = endpoint
    else:
        scheme = env.get("MINIO_PROTOCOL", "http")
        hostname = env.get("MINIO_HOSTNAME", "127.0.0.1")
        api_port = env.get("MINIO_API_PORT", "9000")
        fields["endpoint_url"] = f"{scheme}://{hostname}:{api_port}"

    # Dedicated access/secret keys take precedence over the root account.
    access = env.get("MINIO_ACCESS_KEY") or env.get("MINIO_ROOT_USER")
    secret = env.get("MINIO_SECRET_KEY") or env.get("MINIO_ROOT_PASSWORD")
    if not (access and secret):
        raise ValueError(
            "MinIO docker .env must set MINIO_ROOT_USER/MINIO_ROOT_PASSWORD "
            "or MINIO_ACCESS_KEY/MINIO_SECRET_KEY"
        )
    fields["access_key"] = access
    fields["secret_key"] = secret

    bucket = env.get("MINIO_STAGING_BUCKET") or env.get("BULK_S3_BUCKET")
    if bucket:
        fields["bucket"] = bucket

    region = env.get("AWS_REGION")
    if region:
        fields["region"] = region

    loader = env.get("MINIO_LOADER_ENDPOINT") or env.get("MINIO_TIGERGRAPH_ENDPOINT")
    if loader:
        # parse_dotenv_file already strips quotes; do it again defensively.
        fields["loader_endpoint_url"] = loader.strip().strip('"').strip("'")

    return cls(**fields)

to_s3_generalized_conn_config()

Map to runtime provider config for :class:~graflo.hq.connection_provider.S3GeneralizedConnConfig.

Source code in graflo/object_storage/config.py
def to_s3_generalized_conn_config(self) -> "S3GeneralizedConnConfig":
    """Map to runtime provider config for :class:`~graflo.hq.connection_provider.S3GeneralizedConnConfig`."""
    # Imported lazily to avoid a hard dependency at module import time.
    from graflo.hq.connection_provider import S3GeneralizedConnConfig

    kwargs = {
        "bucket": self.bucket,
        "region": self.region,
        "endpoint_url": self.endpoint_url,
        "aws_access_key_id": self.access_key,
        "aws_secret_access_key": self.secret_key,
        "loader_endpoint_url": self.loader_endpoint_url,
    }
    return S3GeneralizedConnConfig(**kwargs)

boto3_s3_client_from_generalized(cfg)

Build a boto3 S3 client from :class:~graflo.hq.connection_provider.S3GeneralizedConnConfig.

Source code in graflo/object_storage/s3_client.py
def boto3_s3_client_from_generalized(cfg: "S3GeneralizedConnConfig") -> Any:
    """Build a boto3 S3 client from :class:`~graflo.hq.connection_provider.S3GeneralizedConnConfig`."""
    import boto3

    # Empty strings are coerced to None so boto3 falls back to its defaults.
    client_kwargs = {
        "region_name": cfg.region or None,
        "endpoint_url": cfg.endpoint_url or None,
        "aws_access_key_id": cfg.aws_access_key_id or None,
        "aws_secret_access_key": cfg.aws_secret_access_key or None,
    }
    return boto3.client("s3", **client_kwargs)

boto3_s3_client_from_minio(cfg)

Build a boto3 S3 client from :class:~graflo.object_storage.config.MinioConfig.

Source code in graflo/object_storage/s3_client.py
def boto3_s3_client_from_minio(cfg: "MinioConfig") -> Any:
    """Build a boto3 S3 client from :class:`~graflo.object_storage.config.MinioConfig`."""
    import boto3

    # region/endpoint may be empty strings; credentials are required fields.
    client_kwargs = {
        "region_name": cfg.region or None,
        "endpoint_url": cfg.endpoint_url or None,
        "aws_access_key_id": cfg.access_key,
        "aws_secret_access_key": cfg.secret_key,
    }
    return boto3.client("s3", **client_kwargs)

ensure_bucket_exists(client, bucket)

Create bucket if it does not exist. Return bucket.

Safe to call repeatedly. Raises :exc:botocore.exceptions.ClientError for non-recoverable API errors.

Source code in graflo/object_storage/bucket.py
def ensure_bucket_exists(client: Any, bucket: str) -> str:
    """Create *bucket* if it does not exist. Return *bucket*.

    Idempotent: safe to call repeatedly, including concurrently. Re-raises
    :exc:`botocore.exceptions.ClientError` for non-recoverable API errors.
    """

    def _error_code(exc: ClientError) -> str:
        # Normalize the S3 error code to a string for comparison.
        return str(exc.response.get("Error", {}).get("Code", ""))

    try:
        client.head_bucket(Bucket=bucket)
    except ClientError as exc:
        status = exc.response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        not_found = _error_code(exc) in ("404", "NoSuchBucket", "NotFound") or status == 404
        if not not_found:
            # Anything other than "bucket missing" is a real API failure.
            raise
    else:
        logger.info("S3 bucket %r already exists", bucket)
        return bucket

    try:
        client.create_bucket(Bucket=bucket)
    except ClientError as exc:
        # A concurrent creator may have won the race; treat that as success.
        if _error_code(exc) not in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
            raise
        logger.info(
            "S3 bucket %r already exists (race or concurrent create)", bucket
        )
    else:
        logger.info("Created S3 bucket %r", bucket)

    return bucket

ensure_staging_bucket_for_config(cfg=None)

Ensure :attr:~graflo.object_storage.config.MinioConfig.bucket exists.

If cfg is None, loads :meth:~graflo.object_storage.config.MinioConfig.from_docker_env.

Source code in graflo/object_storage/bucket.py
def ensure_staging_bucket_for_config(cfg: "MinioConfig | None" = None) -> str:
    """Ensure :attr:`~graflo.object_storage.config.MinioConfig.bucket` exists.

    If *cfg* is ``None``, loads :meth:`~graflo.object_storage.config.MinioConfig.from_docker_env`.
    """
    # Imported lazily to avoid circular imports at module load time.
    from graflo.object_storage.config import MinioConfig
    from graflo.object_storage.s3_client import boto3_s3_client_from_minio

    config = cfg if cfg is not None else MinioConfig.from_docker_env()
    s3 = boto3_s3_client_from_minio(config)
    try:
        return ensure_bucket_exists(s3, config.bucket)
    except EndpointConnectionError as exc:
        # Translate the low-level connection failure into actionable guidance.
        raise RuntimeError(
            f"Cannot connect to {config.endpoint_url!r}. "
            "Start MinIO (e.g. `docker compose --env-file .env --profile graflo.minio up -d` "
            "in `docker/minio`) and ensure `MINIO_API_PORT` / `MINIO_ENDPOINT` matches the "
            "published S3 API port (default host 9000). The web console (e.g. :9001/endpoints) "
            "is not the S3 endpoint. If `docker compose` reports 'port is already allocated', "
            "set `MINIO_CONSOLE_PORT` and/or `MINIO_API_PORT` to free ports, remove the stuck "
            "container (`docker rm -f graflo.minio`), and bring the stack up again."
        ) from exc

parse_dotenv_file(env_file)

Parse a simple KEY=value .env file (no shell expansion).

Source code in graflo/object_storage/config.py
def parse_dotenv_file(env_file: Path) -> dict[str, str]:
    """Parse a simple ``KEY=value`` .env file (no shell expansion).

    Blank lines, ``#`` comments, and lines without ``=`` are skipped.
    Surrounding single or double quotes are stripped from values.
    """
    parsed: dict[str, str] = {}
    with open(env_file, encoding="utf-8") as handle:
        for raw_line in handle:
            stripped = raw_line.strip()
            if not stripped or stripped.startswith("#") or "=" not in stripped:
                continue
            key, _, value = stripped.partition("=")
            parsed[key.strip()] = value.strip().strip('"').strip("'")
    return parsed

upload_staged_csvs(*, staged_files, bucket, key_prefix, session_id, s3_cfg)

Upload files and return manifest key -> s3://bucket/key for GSQL.

Source code in graflo/object_storage/upload.py
def upload_staged_csvs(
    *,
    staged_files: dict[str, Path],
    bucket: str,
    key_prefix: str,
    session_id: str,
    s3_cfg: "S3GeneralizedConnConfig",
) -> dict[str, str]:
    """Upload files and return manifest key -> ``s3://bucket/key`` for GSQL."""
    s3 = boto3_s3_client_from_generalized(s3_cfg)
    # Build "<prefix>/<session>" while tolerating empty or slash-padded parts.
    base = "/".join(part.strip("/") for part in (key_prefix, session_id) if part)
    manifest: dict[str, str] = {}
    for label, local_path in staged_files.items():
        object_key = f"{base}/{local_path.name}" if base else local_path.name
        s3.upload_file(str(local_path), bucket, object_key)
        manifest[label] = f"s3://{bucket}/{object_key}"
    return manifest