TigerGraph bulk load (CSV staging + LOADING JOB)¶
For TigerGraph targets, GraFlo can optionally bypass per-record REST++ JSON upserts and instead append typed CSV files during ingestion, then run a single CREATE LOADING JOB / RUN LOADING JOB sequence at the end of the ingest. The default REST path is unchanged when bulk load is disabled.
When to use it¶
- Large batch or initial loads where REST++ overhead dominates.
- When TigerGraph can read staged files from local disk (same host as the loader) or from S3 (or an S3-compatible endpoint).
Configuration¶
Target database (TigergraphConfig)¶
Set a nested bulk_load block on TigergraphConfig (see TigergraphBulkLoadConfig in code):
| Field | Role |
|---|---|
| `enabled` | Turn bulk mode on. |
| `staging_dir` | Local directory for CSV files (a session subfolder is created per run). |
| `separator`, `include_header`, etc. | CSV layout passed through to `LOAD ... USING`. |
| `loading_job` | `concurrency`, `batch_size`, `job_name_prefix`, `run_mode`, `drop_job_after_run`. |
| `s3_staging_name` | Optional: name of a row in `bindings.staging_proxy` → resolves `conn_proxy`. |
| `s3_conn_proxy` | Optional: use this proxy label directly (no manifest row). |
| `s3_bucket`, `s3_key_prefix` | Destination for upload; bucket may also live on `S3GeneralizedConnConfig`. |
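To make the fields above concrete, here is a hedged sketch of a `bulk_load` block expressed as plain data. The field names come from the table; the exact nesting, defaults, and allowed values (e.g. for `run_mode`) are assumptions — check `TigergraphBulkLoadConfig` in the GraFlo source for the authoritative schema.

```python
# Assumed-shape config data mirroring the documented fields; values such as
# run_mode="local" and the paths/bucket names are illustrative placeholders.
bulk_load = {
    "enabled": True,
    "staging_dir": "/var/tmp/graflo_staging",
    "separator": ",",
    "include_header": True,
    "loading_job": {
        "concurrency": 4,
        "batch_size": 100_000,
        "job_name_prefix": "graflo_bulk",
        "run_mode": "local",          # hypothetical value
        "drop_job_after_run": True,
    },
    # Either resolve credentials via a manifest row name ...
    "s3_staging_name": "minio_bulk",
    # ... or point at a registered proxy label directly:
    # "s3_conn_proxy": "minio_bulk",
    "s3_bucket": "graflo-staging",
    "s3_key_prefix": "tg/bulk",
}
```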
Manifest (Bindings.staging_proxy)¶
Input connectors stay on connectors / resource_connector / connector_connection as before. Staging (S3 credentials) uses a parallel list that carries only names, never secrets.
At runtime, conn_proxy must be registered on InMemoryConnectionProvider as an S3GeneralizedConnConfig.
Runtime provider¶
Call register_generalized_config(conn_proxy="minio_bulk", config=S3GeneralizedConnConfig(...)) and pass that provider into GraphEngine.ingest (or Caster.ingest). The manifest never stores AWS keys.
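The registration pattern can be illustrated with a minimal, stdlib-only stand-in (the real classes are `InMemoryConnectionProvider` and `S3GeneralizedConnConfig` in GraFlo; the field names and `resolve` method below are illustrative, not the library's actual API):

```python
from dataclasses import dataclass

# Stand-in for S3GeneralizedConnConfig: secrets live only in this in-process
# object, never in the manifest.
@dataclass
class S3Config:
    endpoint_url: str
    aws_access_key_id: str
    aws_secret_access_key: str
    bucket: str

# Stand-in for InMemoryConnectionProvider: a registry keyed by proxy label.
class Provider:
    def __init__(self):
        self._configs = {}

    def register_generalized_config(self, conn_proxy, config):
        self._configs[conn_proxy] = config

    def resolve(self, conn_proxy):
        return self._configs[conn_proxy]

provider = Provider()
provider.register_generalized_config(
    conn_proxy="minio_bulk",
    config=S3Config(
        endpoint_url="http://127.0.0.1:9000",
        aws_access_key_id="minioadmin",
        aws_secret_access_key="minioadmin",
        bucket="graflo-staging",
    ),
)
# The manifest row only names "minio_bulk"; ingestion resolves the secrets here.
```

The manifest/provider split is the design point: the manifest stays safe to commit, and credentials are injected at runtime by whatever process calls `ingest`.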
Execution flow¶
Ingestion coordinates begin/finalize through the backend-agnostic BulkSessionCoordinator (graflo.hq.bulk_session); each cast batch appends via DBWriter.write(..., bulk_session_id=...) when a session is active.
- Begin — First batch opens a bulk session (CSV writers under `staging_dir/<session_id>/`).
- Append — Each cast batch appends rows per physical vertex/edge type.
- Finalize — After all resources: optionally upload to S3, build GSQL with `DEFINE FILENAME` pointing at `file://` or `s3://` URLs, `RUN LOADING JOB`, optionally `DROP JOB`.
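The begin/append/finalize lifecycle can be sketched with a minimal, stdlib-only stand-in (the real coordinator is `BulkSessionCoordinator` in `graflo.hq.bulk_session`; the class and method names below are illustrative, and `finalize` here just lists the staged files instead of emitting GSQL):

```python
import csv
import uuid
from pathlib import Path

# Illustrative stand-in for the bulk-session lifecycle; not the real
# BulkSessionCoordinator API.
class BulkSession:
    def __init__(self, staging_dir: str):
        self.session_id = uuid.uuid4().hex
        # Begin: one session subfolder per run, staging_dir/<session_id>/.
        self.root = Path(staging_dir) / self.session_id
        self.root.mkdir(parents=True)

    def append(self, type_name: str, rows, header):
        # Append: one CSV per physical vertex/edge type, grown across batches.
        path = self.root / f"{type_name}.csv"
        is_new = not path.exists()
        with path.open("a", newline="") as f:
            writer = csv.writer(f)
            if is_new:
                writer.writerow(header)
            writer.writerows(rows)

    def finalize(self):
        # Finalize (real flow): upload to S3 if configured, build the GSQL
        # loading job, RUN LOADING JOB, optionally DROP JOB.
        return sorted(p.name for p in self.root.iterdir())

session = BulkSession("/tmp/graflo_staging")
session.append("Person", [["p1", "Ada"], ["p2", "Alan"]], header=["id", "name"])
session.append("KNOWS", [["p1", "p2"]], header=["src", "dst"])
files = session.finalize()
```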
Limitations (current release)¶
- `blank_vertices` in the logical schema are rejected at `bulk_load_begin`.
- Resources with `extra_weights` (DB lookups during ingest) cannot use bulk load for that resource; use REST ingest or remove extra weights for those resources.
- Upsert semantics differ from REST: native `LOAD` is oriented toward append semantics; plan idempotency and clears according to your operations model.
Emulating S3 in development¶
GraFlo uses boto3. Any endpoint that speaks the S3 API works if you set endpoint_url on S3GeneralizedConnConfig:
- MinIO (very common) — run a container, create a bucket, point `endpoint_url` at `http://127.0.0.1:9000` (or your mapped port), and use the MinIO root user/password as `aws_access_key_id`/`aws_secret_access_key`.
- LocalStack — S3-compatible endpoint for local stacks.
- moto (Python tests) — `@mock_aws` from the `moto` library mocks boto3 calls in-process; useful for unit tests, but not for TigerGraph itself (the database still needs a real `s3://` URL it can reach unless you test upload-only code paths).
For end-to-end tests against a real loader, MinIO or cloud S3 with matching VPC/network access is the usual approach.
TigerGraph resolves s3:// paths via a GSQL CREATE DATA_SOURCE (S3 credentials and, for MinIO, file.reader.settings.fs.s3a.endpoint and path-style access). GraFlo emits that statement when bulk staging uploads to S3. The endpoint in that data source must be reachable from the TigerGraph server; if TigerGraph runs in Docker and MinIO on the host, set MINIO_LOADER_ENDPOINT in docker/minio/.env to a URL visible inside the TG container (see docker/README.md MinIO section).
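Assuming the data source settings follow the Hadoop S3A option names mentioned above, the MinIO-specific portion of the configuration might look like this fragment (key names and the `minio:9000` hostname are illustrative; the exact shape depends on your TigerGraph version):

```json
{
  "file.reader.settings.fs.s3a.endpoint": "http://minio:9000",
  "file.reader.settings.fs.s3a.path.style.access": "true"
}
```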
See also¶
- Example 10: TigerGraph bulk load and S3 staging
- Implementation: `graflo/db/tigergraph/bulk_csv.py` (CSV layout), `graflo/db/tigergraph/bulk_gsql.py` (GSQL generation)