S3-compatible object storage helpers (boto3): config, bucket ensure, bulk upload.
This package is separate from graph :class:`~graflo.db.ConnectionManager` targets.
Use :class:`~graflo.object_storage.config.MinioConfig` with
:meth:`~graflo.object_storage.config.MinioConfig.from_docker_env` alongside
``docker/minio/.env``, the same idea as ``Neo4jConfig.from_docker_env``.
S3EndpointConfig = MinioConfig
module-attribute
MinioConfig
Bases: BaseModel
Credentials and endpoint for MinIO or any S3-compatible API (boto3).
Source code in graflo/object_storage/config.py
class MinioConfig(BaseModel):
    """Credentials and endpoint for MinIO or any S3-compatible API (boto3).

    Holds the S3 endpoint URL, access/secret key pair, a default staging
    bucket, a region string, and an optional loader-facing endpoint. The
    ``Field(description=...)`` strings below are the per-field docs.
    """

    # Unknown keys are rejected so typos in caller-supplied config fail fast.
    model_config = ConfigDict(extra="forbid")

    endpoint_url: str = Field(
        description="Base URL for the S3 API, e.g. http://127.0.0.1:9000",
    )
    access_key: str = Field(
        description="S3 access key id (MinIO root user or IAM key)."
    )
    secret_key: str = Field(description="S3 secret access key.")
    bucket: str = Field(
        default="graflo-staging",
        description="Default bucket for staged bulk CSV objects.",
    )
    region: str = Field(
        default="us-east-1",
        description="AWS region string passed to boto3 (use us-east-1 for MinIO).",
    )
    loader_endpoint_url: str | None = Field(
        default=None,
        description=(
            "Optional S3 API URL for TigerGraph LOADING JOB (CREATE DATA_SOURCE). "
            "Set when TigerGraph runs in Docker and MinIO is on the host "
            "(e.g. http://172.17.0.1:9003). Boto3 uses endpoint_url above."
        ),
    )

    def to_s3_generalized_conn_config(self) -> "S3GeneralizedConnConfig":
        """Map to runtime provider config for :class:`~graflo.hq.connection_provider.S3GeneralizedConnConfig`.

        Returns:
            A new ``S3GeneralizedConnConfig`` carrying this config's bucket,
            region, endpoint, credentials, and optional loader endpoint.
        """
        # NOTE(review): local import — presumably avoids an import cycle with
        # graflo.hq; confirm against the package layout.
        from graflo.hq.connection_provider import S3GeneralizedConnConfig

        return S3GeneralizedConnConfig(
            bucket=self.bucket,
            region=self.region,
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            loader_endpoint_url=self.loader_endpoint_url,
        )

    @classmethod
    def from_docker_env(cls, docker_dir: str | Path | None = None) -> MinioConfig:
        """Load from ``docker/minio/.env`` (same layout as other ``docker/*`` stacks).

        Args:
            docker_dir: Directory containing the ``.env`` file. When ``None``,
                resolves to ``docker/minio`` three parent levels above this
                module's file.

        Returns:
            A validated :class:`MinioConfig` built from the env-file values.

        Raises:
            FileNotFoundError: If ``<docker_dir>/.env`` does not exist.
            ValueError: If no credential pair is present (neither
                MINIO_ROOT_USER/MINIO_ROOT_PASSWORD nor
                MINIO_ACCESS_KEY/MINIO_SECRET_KEY).
        """
        if docker_dir is None:
            # Three parents up from this module, then into docker/minio.
            docker_dir = (
                Path(__file__).resolve().parent.parent.parent / "docker" / "minio"
            )
        else:
            docker_dir = Path(docker_dir)
        env_file = docker_dir / ".env"
        if not env_file.exists():
            raise FileNotFoundError(f"Environment file not found: {env_file}")
        env_vars = parse_dotenv_file(env_file)
        data: dict[str, Any] = {}
        # Prefer an explicit full endpoint URL; otherwise assemble one from
        # protocol/host/port parts with local-dev defaults.
        if raw_endpoint := env_vars.get("MINIO_ENDPOINT"):
            data["endpoint_url"] = raw_endpoint
        else:
            host = env_vars.get("MINIO_HOSTNAME", "127.0.0.1")
            port = env_vars.get("MINIO_API_PORT", "9000")
            protocol = env_vars.get("MINIO_PROTOCOL", "http")
            data["endpoint_url"] = f"{protocol}://{host}:{port}"
        # Dedicated access/secret keys win; fall back to the root credentials.
        access = env_vars.get("MINIO_ACCESS_KEY") or env_vars.get("MINIO_ROOT_USER")
        secret = env_vars.get("MINIO_SECRET_KEY") or env_vars.get("MINIO_ROOT_PASSWORD")
        if not access or not secret:
            raise ValueError(
                "MinIO docker .env must set MINIO_ROOT_USER/MINIO_ROOT_PASSWORD "
                "or MINIO_ACCESS_KEY/MINIO_SECRET_KEY"
            )
        data["access_key"] = access
        data["secret_key"] = secret
        # The walrus binds the whole `a or b` fallback chain, not just the
        # first .get() — bucket is the first non-empty of the two variables.
        if bucket := env_vars.get("MINIO_STAGING_BUCKET") or env_vars.get(
            "BULK_S3_BUCKET"
        ):
            data["bucket"] = bucket
        if region := env_vars.get("AWS_REGION"):
            data["region"] = region
        if loader_ep := env_vars.get("MINIO_LOADER_ENDPOINT") or env_vars.get(
            "MINIO_TIGERGRAPH_ENDPOINT"
        ):
            # parse_dotenv_file already strips quotes; this re-strips defensively.
            data["loader_endpoint_url"] = loader_ep.strip().strip('"').strip("'")
        return cls(**data)
|
from_docker_env(docker_dir=None)
classmethod
Load from docker/minio/.env (same layout as other docker/* stacks).
Source code in graflo/object_storage/config.py
@classmethod
def from_docker_env(cls, docker_dir: str | Path | None = None) -> MinioConfig:
    """Load from ``docker/minio`` ``.env`` (same layout as other ``docker/*`` stacks)."""
    base = (
        Path(__file__).resolve().parent.parent.parent / "docker" / "minio"
        if docker_dir is None
        else Path(docker_dir)
    )
    env_file = base / ".env"
    if not env_file.exists():
        raise FileNotFoundError(f"Environment file not found: {env_file}")
    env = parse_dotenv_file(env_file)

    fields: dict[str, Any] = {}
    # A full endpoint URL takes precedence over assembling one from parts.
    explicit_endpoint = env.get("MINIO_ENDPOINT")
    if explicit_endpoint:
        fields["endpoint_url"] = explicit_endpoint
    else:
        protocol = env.get("MINIO_PROTOCOL", "http")
        host = env.get("MINIO_HOSTNAME", "127.0.0.1")
        port = env.get("MINIO_API_PORT", "9000")
        fields["endpoint_url"] = f"{protocol}://{host}:{port}"

    # Dedicated keys win over the MinIO root credentials.
    user = env.get("MINIO_ACCESS_KEY") or env.get("MINIO_ROOT_USER")
    password = env.get("MINIO_SECRET_KEY") or env.get("MINIO_ROOT_PASSWORD")
    if not user or not password:
        raise ValueError(
            "MinIO docker .env must set MINIO_ROOT_USER/MINIO_ROOT_PASSWORD "
            "or MINIO_ACCESS_KEY/MINIO_SECRET_KEY"
        )
    fields["access_key"] = user
    fields["secret_key"] = password

    staging_bucket = env.get("MINIO_STAGING_BUCKET") or env.get("BULK_S3_BUCKET")
    if staging_bucket:
        fields["bucket"] = staging_bucket
    aws_region = env.get("AWS_REGION")
    if aws_region:
        fields["region"] = aws_region
    loader_endpoint = env.get("MINIO_LOADER_ENDPOINT") or env.get(
        "MINIO_TIGERGRAPH_ENDPOINT"
    )
    if loader_endpoint:
        fields["loader_endpoint_url"] = loader_endpoint.strip().strip('"').strip("'")
    return cls(**fields)
|
to_s3_generalized_conn_config()
Map to runtime provider config for :class:`~graflo.hq.connection_provider.S3GeneralizedConnConfig`.
Source code in graflo/object_storage/config.py
def to_s3_generalized_conn_config(self) -> "S3GeneralizedConnConfig":
    """Map to runtime provider config for :class:`~graflo.hq.connection_provider.S3GeneralizedConnConfig`."""
    from graflo.hq.connection_provider import S3GeneralizedConnConfig

    # Collect the field mapping once, then construct in a single call.
    kwargs = {
        "bucket": self.bucket,
        "region": self.region,
        "endpoint_url": self.endpoint_url,
        "aws_access_key_id": self.access_key,
        "aws_secret_access_key": self.secret_key,
        "loader_endpoint_url": self.loader_endpoint_url,
    }
    return S3GeneralizedConnConfig(**kwargs)
|
boto3_s3_client_from_generalized(cfg)
Build a boto3 S3 client from :class:`~graflo.hq.connection_provider.S3GeneralizedConnConfig`.
Source code in graflo/object_storage/s3_client.py
def boto3_s3_client_from_generalized(cfg: "S3GeneralizedConnConfig") -> Any:
    """Build a boto3 S3 client from :class:`~graflo.hq.connection_provider.S3GeneralizedConnConfig`."""
    import boto3

    # Empty strings are normalized to None so boto3 falls back to its defaults.
    region = cfg.region or None
    endpoint = cfg.endpoint_url or None
    key_id = cfg.aws_access_key_id or None
    secret = cfg.aws_secret_access_key or None
    return boto3.client(
        "s3",
        region_name=region,
        endpoint_url=endpoint,
        aws_access_key_id=key_id,
        aws_secret_access_key=secret,
    )
|
boto3_s3_client_from_minio(cfg)
Build a boto3 S3 client from :class:`~graflo.object_storage.config.MinioConfig`.
Source code in graflo/object_storage/s3_client.py
def boto3_s3_client_from_minio(cfg: "MinioConfig") -> Any:
    """Build a boto3 S3 client from :class:`~graflo.object_storage.config.MinioConfig`."""
    import boto3

    # region/endpoint may be empty strings; boto3 expects None in that case.
    # access_key/secret_key are required fields and pass through as-is.
    return boto3.client(
        "s3",
        endpoint_url=cfg.endpoint_url or None,
        region_name=cfg.region or None,
        aws_access_key_id=cfg.access_key,
        aws_secret_access_key=cfg.secret_key,
    )
|
ensure_bucket_exists(client, bucket)
Create bucket if it does not exist. Return bucket.
Safe to call repeatedly. Raises :exc:`botocore.exceptions.ClientError`
for non-recoverable API errors.
Source code in graflo/object_storage/bucket.py
def ensure_bucket_exists(client: Any, bucket: str) -> str:
    """Create *bucket* if it does not exist. Return *bucket*.

    Safe to call repeatedly. Raises :exc:`botocore.exceptions.ClientError`
    for non-recoverable API errors.
    """
    try:
        client.head_bucket(Bucket=bucket)
    except ClientError as head_err:
        # Only a "bucket missing" error falls through to creation; anything
        # else (auth failure, etc.) propagates to the caller.
        error_code = str(head_err.response.get("Error", {}).get("Code", ""))
        status = head_err.response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        bucket_missing = status == 404 or error_code in ("404", "NoSuchBucket", "NotFound")
        if not bucket_missing:
            raise
    else:
        logger.info("S3 bucket %r already exists", bucket)
        return bucket

    try:
        client.create_bucket(Bucket=bucket)
    except ClientError as create_err:
        # A concurrent creator may have won the race; treat that as success.
        race_code = str(create_err.response.get("Error", {}).get("Code", ""))
        if race_code not in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
            raise
        logger.info(
            "S3 bucket %r already exists (race or concurrent create)", bucket
        )
    else:
        logger.info("Created S3 bucket %r", bucket)
    return bucket
|
ensure_staging_bucket_for_config(cfg=None)
Ensure :attr:`~graflo.object_storage.config.MinioConfig.bucket` exists.
If *cfg* is ``None``, loads via :meth:`~graflo.object_storage.config.MinioConfig.from_docker_env`.
Source code in graflo/object_storage/bucket.py
def ensure_staging_bucket_for_config(cfg: "MinioConfig | None" = None) -> str:
    """Ensure :attr:`~graflo.object_storage.config.MinioConfig.bucket` exists.

    If *cfg* is ``None``, loads :meth:`~graflo.object_storage.config.MinioConfig.from_docker_env`.
    """
    from graflo.object_storage.config import MinioConfig
    from graflo.object_storage.s3_client import boto3_s3_client_from_minio

    config = cfg or MinioConfig.from_docker_env()
    s3 = boto3_s3_client_from_minio(config)
    try:
        return ensure_bucket_exists(s3, config.bucket)
    except EndpointConnectionError as conn_err:
        # Translate the low-level connection failure into an actionable
        # operator-facing message about starting / re-porting the MinIO stack.
        hint = (
            f"Cannot connect to {config.endpoint_url!r}. "
            "Start MinIO (e.g. `docker compose --env-file .env --profile graflo.minio up -d` "
            "in `docker/minio`) and ensure `MINIO_API_PORT` / `MINIO_ENDPOINT` matches the "
            "published S3 API port (default host 9000). The web console (e.g. :9001/endpoints) "
            "is not the S3 endpoint. If `docker compose` reports 'port is already allocated', "
            "set `MINIO_CONSOLE_PORT` and/or `MINIO_API_PORT` to free ports, remove the stuck "
            "container (`docker rm -f graflo.minio`), and bring the stack up again."
        )
        raise RuntimeError(hint) from conn_err
|
parse_dotenv_file(env_file)
Parse a simple KEY=value .env file (no shell expansion).
Source code in graflo/object_storage/config.py
def parse_dotenv_file(env_file: Path) -> dict[str, str]:
    """Parse a simple ``KEY=value`` .env file (no shell expansion)."""
    parsed: dict[str, str] = {}
    for raw_line in env_file.read_text(encoding="utf-8").splitlines():
        entry = raw_line.strip()
        # Skip blanks, comments, and lines without a key/value separator.
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        name, _, raw_value = entry.partition("=")
        # Trim whitespace, then surrounding double and single quotes.
        parsed[name.strip()] = raw_value.strip().strip('"').strip("'")
    return parsed
|
upload_staged_csvs(*, staged_files, bucket, key_prefix, session_id, s3_cfg)
Upload files and return manifest key -> s3://bucket/key for GSQL.
Source code in graflo/object_storage/upload.py
def upload_staged_csvs(
    *,
    staged_files: dict[str, Path],
    bucket: str,
    key_prefix: str,
    session_id: str,
    s3_cfg: "S3GeneralizedConnConfig",
) -> dict[str, str]:
    """Upload files and return manifest key -> ``s3://bucket/key`` for GSQL."""
    s3 = boto3_s3_client_from_generalized(s3_cfg)
    # Build the shared key prefix from the non-empty parts, slash-trimmed.
    prefix_parts = [part.strip("/") for part in (key_prefix, session_id) if part]
    prefix = "/".join(prefix_parts)
    manifest: dict[str, str] = {}
    for label, local_path in staged_files.items():
        object_key = f"{prefix}/{local_path.name}" if prefix else local_path.name
        s3.upload_file(str(local_path), bucket, object_key)
        manifest[label] = f"s3://{bucket}/{object_key}"
    return manifest
|