def bulk_load_finalize(  # noqa: PLR0912
    conn: TigerGraphConnection,
    session_id: str,
    schema: Schema,
    *,
    bindings: Bindings | None = None,
    connection_provider: ConnectionProvider | None = None,
) -> str:
    """Finish a TigerGraph bulk-load session and run its loading job.

    The session stored under ``session_id`` is removed from the shared
    session registry (under its lock), its appender is closed, and the staged
    CSV files are collected. If an S3 connection proxy resolves (from
    ``bindings``) to an ``S3GeneralizedConnConfig`` via
    ``connection_provider``, the staged files are uploaded with
    ``upload_staged_csvs`` and the returned locations are used in the GSQL;
    otherwise the absolute local file paths are used. Depending on
    ``bulk_cfg.loading_job.run_mode``, either a RUN-only statement or a
    CREATE-and-RUN loading-job script is built and executed on ``conn``.

    Args:
        conn: Connection used to look up the configured graph name and to
            execute the generated GSQL.
        session_id: Key of the bulk session to finalize; also embedded in the
            loading-job name and the S3 data-source name.
        schema: Accepted for interface compatibility; not used here.
        bindings: Passed to ``resolve_s3_conn_proxy`` to select the S3 proxy.
        connection_provider: Resolves the proxy to a generalized connection
            config; when ``None``, no S3 upload is attempted.

    Returns:
        ``""`` if the session staged no files, otherwise ``str()`` of the
        value returned by ``conn._execute_gsql``.

    Raises:
        KeyError: If ``session_id`` is not a known bulk session.
        ValueError: If S3 staging applies but neither the bulk config nor the
            S3 connection config provides a bucket.
    """
    del schema  # unused; kept in the signature for callers

    # Check and detach the session atomically, so a concurrent second
    # finalize of the same id sees it as unknown and raises KeyError.
    with _tiger_bulk_sessions_lock:
        if session_id not in _tiger_bulk_sessions:
            raise KeyError(f"Unknown TigerGraph bulk session {session_id!r}")
        writer, bulk_cfg, schema_db, _ = _tiger_bulk_sessions.pop(session_id)

    writer.close()
    staged_files = writer.staged_file_paths
    if not staged_files:
        # Nothing was staged, so there is nothing to load.
        return ""

    graph_name = conn._require_configured_graph_name()
    job_opts = bulk_cfg.loading_job
    job_name = f"{job_opts.job_name_prefix}_{session_id}"

    # Default: reference the staged CSVs by their absolute local paths.
    gsql_paths: dict[str, str] = {
        name: str(path.resolve()) for name, path in staged_files.items()
    }

    s3_proxy = bulk_cfg.resolve_s3_conn_proxy(bindings)
    configured_bucket = bulk_cfg.s3_bucket
    s3_conn: S3GeneralizedConnConfig | None = None
    if s3_proxy and connection_provider is not None:
        # Local import; presumably avoids a circular import at module
        # load time. TODO confirm.
        from graflo.hq.connection_provider import S3GeneralizedConnConfig

        candidate = connection_provider.get_generalized_config_by_proxy(s3_proxy)
        if isinstance(candidate, S3GeneralizedConnConfig):
            s3_conn = candidate
            # The bulk config's bucket takes precedence over the connection's.
            target_bucket = configured_bucket or candidate.bucket
            if not target_bucket:
                raise ValueError(
                    "S3 bulk staging requires TigergraphBulkLoadConfig.s3_bucket "
                    "or S3GeneralizedConnConfig.bucket"
                )
            # Replace the local paths with the locations of the uploaded files.
            gsql_paths = upload_staged_csvs(
                staged_files=staged_files,
                bucket=target_bucket,
                key_prefix=bulk_cfg.s3_key_prefix,
                session_id=session_id,
                s3_cfg=candidate,
            )

    if job_opts.run_mode == "run_only":
        gsql = build_run_loading_job_only(job_name=job_name, opts=job_opts)
    else:
        gsql = build_create_and_run_loading_job(
            graph_name=graph_name,
            job_name=job_name,
            schema_db=schema_db,
            staged_files=staged_files,
            bulk_cfg=bulk_cfg,
            path_for_gsql=gsql_paths,
            tigergraph_s3_loader=s3_conn,
            tigergraph_s3_data_source_name=f"gf_s3_{session_id}",
        )
    return str(conn._execute_gsql(gsql))