def build_create_and_run_loading_job(
*,
graph_name: str,
job_name: str,
schema_db: SchemaDBAware,
staged_files: dict[str, Path],
bulk_cfg: TigergraphBulkLoadConfig,
path_for_gsql: dict[str, str],
tigergraph_s3_loader: "S3GeneralizedConnConfig | None" = None,
tigergraph_s3_data_source_name: str | None = None,
) -> str:
"""Build a GSQL script: USE GRAPH, optional DATA_SOURCE, CREATE LOADING JOB, RUN.
*path_for_gsql* maps manifest keys (``v:...``, ``e:...``) to absolute local paths
or ``s3://`` URLs.
TigerGraph requires a ``CREATE DATA_SOURCE`` with credentials and (for MinIO) a
custom endpoint when using ``s3://`` paths; filenames must look like
``"$data_source_name:s3://bucket/key"``. Pass *tigergraph_s3_loader* when any path
is ``s3://`` (typically the same :class:`~graflo.hq.connection_provider.S3GeneralizedConnConfig`
used for boto3 upload).
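
    Example (a minimal sketch; ``schema_db``, ``bulk_cfg``, and the paths shown
    are placeholders assumed to exist in the caller's context)::

        script = build_create_and_run_loading_job(
            graph_name="MyGraph",
            job_name="load_people",
            schema_db=schema_db,
            staged_files={"v:Person": Path("/tmp/person.csv")},
            bulk_cfg=bulk_cfg,
            path_for_gsql={"v:Person": "/tmp/person.csv"},
        )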
"""
if schema_db.db_profile.db_flavor != DBType.TIGERGRAPH:
raise ValueError("bulk_gsql requires TigerGraph schema projection")
vc = schema_db.vertex_config
ec = schema_db.edge_config
sep = bulk_cfg.separator
header_flag = "true" if bulk_cfg.include_header else "false"
sep_esc = sep.replace("\\", "\\\\").replace('"', '\\"')
needs_s3_data_source = any(v.startswith("s3://") for v in path_for_gsql.values())
    ds_name: str | None = None
    ds_dict: dict | None = None
if needs_s3_data_source:
if tigergraph_s3_loader is None:
raise ValueError(
"TigerGraph LOADING JOB with s3:// paths requires "
"tigergraph_s3_loader (S3GeneralizedConnConfig): "
"use CREATE DATA_SOURCE with credentials and a MinIO endpoint. "
"Bare s3:// URLs are resolved against AWS and will not load from MinIO."
)
ds_name = tigergraph_s3_data_source_name or "gf_s3_loader"
if not _DATA_SOURCE_NAME_RE.match(ds_name):
raise ValueError(
f"Invalid GSQL data source name {ds_name!r} "
"(use letters, digits, underscore; start with letter or _)"
)
        # Build the DATA_SOURCE payload once; it is serialized below when the
        # CREATE DATA_SOURCE header line is emitted.
        ds_dict = _tigergraph_s3_data_source_dict(tigergraph_s3_loader)
header_lines: list[str] = [f"USE GRAPH {graph_name}"]
    if needs_s3_data_source and ds_name is not None and ds_dict is not None:
        ds_json = json.dumps(ds_dict, separators=(",", ":"))
header_lines.append(
f'CREATE DATA_SOURCE {ds_name} = """{ds_json}""" FOR GRAPH {graph_name}'
)
body_lines: list[str] = [
*header_lines,
f"CREATE LOADING JOB {job_name} FOR GRAPH {graph_name} {{",
]
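    # Counter used to mint unique DEFINE FILENAME variables (f0, f1, ...).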
fn_counter = 0

    def append_load(label: str, physical: str, ncols: int, edge_mode: bool) -> None:
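        """Emit the DEFINE FILENAME and LOAD statements for one staged file.

        Targets with no mapped columns are skipped.
        """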
nonlocal fn_counter
if ncols <= 0:
return
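        # Prefer the caller-supplied path (which may be s3://); fall back to
        # the resolved staged local file.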
gsql_path = path_for_gsql.get(label) or str(staged_files[label].resolve())
fname = f"f{fn_counter}"
fn_counter += 1
vals = _values_placeholders(ncols)
target_kw = "EDGE" if edge_mode else "VERTEX"
rhs = _define_filename_rhs(gsql_path, data_source_name=ds_name)
body_lines.append(f" DEFINE FILENAME {fname} = {rhs};")
body_lines.append(
f" LOAD {fname} TO {target_kw} {physical} VALUES ({vals}) "
f'USING HEADER="{header_flag}", SEPARATOR="{sep_esc}"'
)
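    # Vertex LOAD statements, in sorted manifest-key order so the generated
    # script is deterministic.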
for key in sorted(k for k in staged_files if k.startswith("v:")):
physical = key[2:]
logical = _logical_vertex_for_physical(vc, physical)
append_load(
key,
physical,
len(vertex_column_order(logical, schema_db)),
edge_mode=False,
)
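    # Edge LOAD statements, also in sorted manifest-key order.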
for key in sorted(k for k in staged_files if k.startswith("e:")):
physical = key[2:]
try:
edge = _first_edge_for_physical_relation(ec, physical)
except KeyError:
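            # No schema edge maps onto this physical relation; skip the file.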
continue
append_load(
key,
physical,
len(edge_column_order(edge, schema_db)),
edge_mode=True,
)
body_lines.append("}")
job_opts = bulk_cfg.loading_job
body_lines.append(
f"RUN LOADING JOB {job_name} "
f"USING CONCURRENCY={job_opts.concurrency}, BATCH_SIZE={job_opts.batch_size}"
)
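    # Optional cleanup so reruns start from a clean slate.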
if job_opts.drop_job_after_run:
body_lines.append(f"DROP JOB {job_name}")
if needs_s3_data_source and ds_name is not None:
body_lines.append(f"DROP DATA_SOURCE {ds_name}")
return "\n".join(body_lines) + "\n"