class BulkCsvAppender:
"""Thread-safe appender for per-type CSV files under a staging directory."""
def __init__(
self,
*,
staging_dir: Path,
bulk_cfg: TigergraphBulkLoadConfig,
schema_db: SchemaDBAware,
) -> None:
if schema_db.db_profile.db_flavor != DBType.TIGERGRAPH:
raise ValueError("BulkCsvAppender requires TigerGraph db_flavor")
self._staging_dir = staging_dir
self._bulk_cfg = bulk_cfg
self._schema_db = schema_db
self._lock = threading.Lock()
self._open_files: dict[str, TextIO] = {}
self._writers: dict[str, Any] = {}
self._manifest: dict[str, Path] = {}

    @property
    def staged_file_paths(self) -> dict[str, Path]:
        """Map manifest keys ``v:<physical>`` / ``e:<edge>`` to local CSV paths."""
        return dict(self._manifest)

    def _csv_params(self) -> dict[str, Any]:
        """csv.writer kwargs derived from the bulk-load CSV dialect settings."""
        return {
            "delimiter": self._bulk_cfg.separator,
            "quotechar": self._bulk_cfg.quote_char,
            "lineterminator": self._bulk_cfg.line_terminator,
        }

    def _path_for_vertex(self, physical: str) -> Path:
        return self._staging_dir / f"{_slug_filename_token(physical)}.csv"

    def _path_for_edge(self, physical_edge: str) -> Path:
        # The "edge_" prefix keeps edge files from colliding with a vertex
        # file whose physical name slugs to the same token.
        return self._staging_dir / f"edge_{_slug_filename_token(physical_edge)}.csv"

    def _ensure_writer(self, key: str, path: Path, columns: list[str]) -> Any:
        """Return the CSV writer for ``key``, opening the file and writer lazily."""
        if key not in self._open_files:
            path.parent.mkdir(parents=True, exist_ok=True)
            self._manifest.setdefault(key, path)
            # Write a header only when the file is new or empty; re-opening an
            # already-staged file in append mode must not repeat the header.
            write_header = not path.exists() or path.stat().st_size == 0
            fh = open(path, "a", encoding="utf-8", newline="")
            self._open_files[key] = fh
            writer = csv.writer(fh, **self._csv_params())
            self._writers[key] = writer
            if self._bulk_cfg.include_header and write_header:
                writer.writerow(columns)
        return self._writers[key]

    def append_graph_container(self, gc: GraphContainer, schema: Schema) -> None:
        """Append all vertices and edges of ``gc`` to their staged CSV files."""
        with self._lock:
            vc = self._schema_db.vertex_config
            # Vertices: one row per document, ordered by the staged column list.
            for vlogical, docs in gc.vertices.items():
                phys = vc.vertex_dbname(vlogical)
                cols = vertex_column_order(vlogical, self._schema_db)
                w = self._ensure_writer(f"v:{phys}", self._path_for_vertex(phys), cols)
                for doc in docs:
                    clean = clean_document_for_staging(doc)
                    row = [_format_csv_value(clean.get(c)) for c in cols]
                    w.writerow(row)
            # Edges: skip edge types unknown to the schema, empty batches, and
            # edges that lack a physical relation name.
            ec = self._schema_db.edge_config
            for edge_id, docs in gc.edges.items():
                if edge_id not in schema.core_schema.edge_config:
                    continue
                if not docs:
                    continue
                edge = schema.core_schema.edge_config.edge_for(edge_id)
                runtime = ec.runtime(edge)
                relation_name = runtime.relation_name
                if not relation_name:
                    logger.warning("Skipping edge without relation name: %s", edge_id)
                    continue
                _, _, rel_key = edge_id
                projected, _ = _project_tg_edge_triples(docs, rel_key, runtime=runtime)
                cols = edge_column_order(edge, self._schema_db)
                w = self._ensure_writer(
                    f"e:{relation_name}", self._path_for_edge(relation_name), cols
                )
                # Column layout: source identity fields, target identity fields,
                # edge-identity discriminators, then the remaining attributes.
                match_src = tuple(vc.identity_fields(edge.source))
                match_tgt = tuple(vc.identity_fields(edge.target))
                ddl_edge = tigergraph_ddl_edge_projection(edge, ec)
                disc = edge_identity_discriminator_field_names(ddl_edge)
                n_src = len(match_src)
                n_tgt = len(match_tgt)
                attr_tail = cols[n_src + n_tgt + len(disc) :]
                for src_doc, tgt_doc, weight in projected:
                    clean_w = clean_document_for_staging(weight)
                    src_part = [_format_csv_value(src_doc.get(k)) for k in match_src]
                    tgt_part = [_format_csv_value(tgt_doc.get(k)) for k in match_tgt]
                    mid = [_format_csv_value(clean_w.get(k)) for k in disc]
                    tail = [_format_csv_value(clean_w.get(k)) for k in attr_tail]
                    w.writerow([*src_part, *tgt_part, *mid, *tail])

    def close(self) -> None:
        """Close all open staged files.

        The manifest is kept so ``staged_file_paths`` stays usable afterwards.
        """
        with self._lock:
            for fh in self._open_files.values():
                fh.close()
            self._open_files.clear()
            self._writers.clear()
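
# Minimal usage sketch (illustrative, not part of the module): ``bulk_cfg``,
# ``schema_db``, ``gc``, and ``schema`` are assumed to be constructed elsewhere
# in the project; only the appender calls below come from this class.
#
#     appender = BulkCsvAppender(
#         staging_dir=Path("/tmp/tg_staging"),
#         bulk_cfg=bulk_cfg,        # a TigergraphBulkLoadConfig
#         schema_db=schema_db,      # a TigerGraph-flavored SchemaDBAware
#     )
#     try:
#         appender.append_graph_container(gc, schema)
#     finally:
#         appender.close()
#     staged = appender.staged_file_paths  # {"v:<physical>": Path(...), ...}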