Document cast errors and doc error sink¶
When a resource maps a source document (one item from a batch: a JSON object, CSV row as dict, grouped RDF subject, API element, etc.) into graph data, a single document can fail while others in the same batch succeed. Ingestion behavior is controlled by IngestionParams on Caster (and the same parameters flow through GraphEngine and the ingest CLI).
on_doc_error¶
- skip (default): the bad document is skipped; the batch continues. Failures are recorded (see below) and a summary is logged at WARNING for the batch.
- fail: any document exception fails the whole batch (same as a hard error during casting).
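The difference between the two modes can be illustrated with a small standalone sketch. This is not the Caster implementation: `cast_batch`, `cast_one`, and the failure dict keys are hypothetical names chosen for illustration, and only the skip/fail semantics are taken from the description above.

```python
from typing import Any, Callable, Literal


def cast_batch(
    docs: list[dict[str, Any]],
    cast_one: Callable[[dict[str, Any]], Any],
    on_doc_error: Literal["skip", "fail"] = "skip",
) -> tuple[list[Any], list[dict[str, Any]]]:
    """Cast each document in turn and return (results, failures)."""
    results: list[Any] = []
    failures: list[dict[str, Any]] = []
    for doc_index, doc in enumerate(docs):
        try:
            results.append(cast_one(doc))
        except Exception as exc:
            if on_doc_error == "fail":
                # "fail": the first bad document aborts the whole batch.
                raise
            # "skip": record the failure and keep going.
            failures.append(
                {
                    "doc_index": doc_index,
                    "exception_type": type(exc).__name__,
                    "message": str(exc),
                }
            )
    return results, failures


def cast_one(doc: dict[str, Any]) -> dict[str, int]:
    # Hypothetical cast step: fails when "id" is not an integer string.
    return {"id": int(doc["id"])}


batch = [{"id": "1"}, {"id": "oops"}, {"id": "3"}]
results, failures = cast_batch(batch, cast_one, on_doc_error="skip")
```

With `on_doc_error="skip"` the second document is recorded as a failure and the other two are still cast; with `on_doc_error="fail"` the same batch raises on the second document.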
Persisting failures: doc_error_sink_path¶
Set IngestionParams.doc_error_sink_path to a filesystem path (convention: *.jsonl.gz). The caster appends gzip-compressed JSONL: each line is one JSON object matching DocCastFailure (resource name, doc_index within the batch, exception type, message, traceback, optional document preview). Writes are serialized with an internal async lock so concurrent batches do not corrupt the file.
Each append may add a new gzip member to the file (normal for log-style gzip). Tools such as zcat, gzip -dc, or pigz -dc stream all concatenated members, for example:
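Python's standard `gzip` module also reads concatenated members as one stream, so the sink file can be inspected without shell tools. The sketch below uses only the standard library; the temporary path, the helper names `append_failures` and `read_failures`, and the record keys are illustrative assumptions, not the exact DocCastFailure field names or the library's writer.

```python
import gzip
import json
import tempfile
from pathlib import Path


def append_failures(path: Path, records: list[dict]) -> None:
    """Append records as JSONL; each call adds one new gzip member."""
    with gzip.open(path, "ab") as fh:
        for record in records:
            fh.write((json.dumps(record) + "\n").encode("utf-8"))


def read_failures(path: Path) -> list[dict]:
    """Read every JSONL record across all concatenated gzip members."""
    with gzip.open(path, "rt", encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


# Hypothetical sink location in a temporary directory.
sink_path = Path(tempfile.mkdtemp()) / "doc_cast_failures.jsonl.gz"

# Two separate appends produce two gzip members in the same file.
append_failures(sink_path, [{"resource": "people", "doc_index": 0, "message": "bad id"}])
append_failures(sink_path, [{"resource": "people", "doc_index": 4, "message": "missing key"}])

records = read_failures(sink_path)
```

Reading back yields the records from both appends in write order, which is the property that makes append-only gzip practical for a log-style sink.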
When no file sink is configured¶
If doc_error_sink_path is None, skipped failures are emitted as structured logger.error entries (with JSON-serializable metadata in the log extra under doc_cast_failure). Use a file sink when you need durable, replayable records for debugging or reprocessing.
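If the library logs through the standard `logging` module (an assumption; the logging backend is not stated here), the `extra` payload becomes an attribute on the log record and a custom handler can collect it. The logger name, the `FailureCollector` class, and the payload keys below are hypothetical; the sketch emits its own record to show the mechanism.

```python
import logging


class FailureCollector(logging.Handler):
    """Collect the doc_cast_failure payload from ERROR records."""

    def __init__(self) -> None:
        super().__init__(level=logging.ERROR)
        self.failures: list[dict] = []

    def emit(self, record: logging.LogRecord) -> None:
        # Keys passed via `extra` are set as attributes on the record.
        payload = getattr(record, "doc_cast_failure", None)
        if payload is not None:
            self.failures.append(payload)


# Hypothetical logger name; a real setup would attach the handler to
# whichever logger the library actually uses.
logger = logging.getLogger("doc_error_demo")
logger.propagate = False
collector = FailureCollector()
logger.addHandler(collector)

# Simulate the kind of entry described above.
logger.error(
    "document cast failed",
    extra={
        "doc_cast_failure": {
            "resource": "people",
            "doc_index": 2,
            "exception_type": "KeyError",
        }
    },
)
```

Records collected this way live only as long as the process, which is why a file sink is the better choice for replay.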
Optional caps¶
- max_doc_errors: if the total number of persisted document failures across the run exceeds this limit, ingestion raises DocErrorBudgetExceeded (after writing the failures that pushed over the limit). Use this to stop a bad source early.
- doc_error_preview_max_bytes and doc_error_preview_keys: bound the size and shape of the doc_preview field on DocCastFailure so logs and files stay readable and bounded.
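The two caps can be pictured with a standalone sketch. Everything here is hypothetical: `BudgetExceeded` is a local stand-in for DocErrorBudgetExceeded, and `FailureBudget` and `make_preview` are illustrative helpers whose exact truncation and key-selection rules are assumptions, not the library's behavior. The only detail taken from the text above is that failures are written before the budget error is raised.

```python
import json
from typing import Any, Optional, Sequence


class BudgetExceeded(RuntimeError):
    """Local stand-in for DocErrorBudgetExceeded (illustration only)."""


class FailureBudget:
    """Persist failures first, then check the run-wide total."""

    def __init__(self, max_doc_errors: Optional[int]) -> None:
        self.max_doc_errors = max_doc_errors
        self.persisted: list[dict[str, Any]] = []

    def record(self, failures: Sequence[dict[str, Any]]) -> None:
        # The failures that push the total over the limit are still kept.
        self.persisted.extend(failures)
        limit = self.max_doc_errors
        if limit is not None and len(self.persisted) > limit:
            raise BudgetExceeded(f"{len(self.persisted)} failures exceed limit {limit}")


def make_preview(
    doc: dict[str, Any],
    max_bytes: int,
    keys: Optional[Sequence[str]] = None,
) -> str:
    """Keep only selected keys, then cap the encoded size in bytes."""
    subset = doc if keys is None else {k: doc[k] for k in keys if k in doc}
    encoded = json.dumps(subset, default=str).encode("utf-8")
    # errors="ignore" drops a multi-byte character cut by the byte cap.
    return encoded[:max_bytes].decode("utf-8", errors="ignore")


budget = FailureBudget(max_doc_errors=2)
budget.record([{"doc_index": 0}, {"doc_index": 1}])  # total 2, within budget
try:
    budget.record([{"doc_index": 2}])  # total 3, over budget
    exceeded = False
except BudgetExceeded:
    exceeded = True

doc = {"id": 1, "name": "x" * 100}
preview_by_keys = make_preview(doc, max_bytes=20, keys=["id"])
preview_capped = make_preview(doc, max_bytes=20)
```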
CLI¶
The ingest command accepts:
uv run ingest \
--db-config-path config/db.yaml \
--schema-path config/manifest.yaml \
--source-path data/ \
--on-doc-error skip \
--doc-error-sink ./artifacts/doc_cast_failures.jsonl.gz
Programmatic use¶
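from pathlib import Path

from graflo.hq.caster import IngestionParams

ingestion_params = IngestionParams(
    on_doc_error="skip",  # skip bad documents and continue the batch
    doc_error_sink_path=Path("artifacts/doc_cast_failures.jsonl.gz"),  # append failures as gzip JSONL
    max_doc_errors=10_000,  # raise DocErrorBudgetExceeded once the run exceeds this many failures
)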
Extensibility¶
Additional sink types can implement the DocErrorSink protocol (async write_failures(failures)) and be wired from your own orchestration code; the built-in path is JsonlGzDocErrorSink behind doc_error_sink_path.
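As a rough sketch of what a custom sink might look like, the example below defines a local protocol with the `async write_failures(failures)` shape described above and an in-memory implementation. `DocErrorSinkLike` and `InMemoryDocErrorSink` are hypothetical names; the real DocErrorSink import path, the failure element type, and how a sink is wired into ingestion are not shown here and would need to be checked against the library.

```python
import asyncio
from typing import Any, Protocol, Sequence


class DocErrorSinkLike(Protocol):
    """Local stand-in mirroring the described DocErrorSink shape."""

    async def write_failures(self, failures: Sequence[Any]) -> None: ...


class InMemoryDocErrorSink:
    """Keep failures in a list; a lock serializes concurrent writers."""

    def __init__(self) -> None:
        self.records: list[Any] = []
        self._lock = asyncio.Lock()

    async def write_failures(self, failures: Sequence[Any]) -> None:
        async with self._lock:
            self.records.extend(failures)


async def main() -> InMemoryDocErrorSink:
    sink: DocErrorSinkLike = InMemoryDocErrorSink()
    # Two "batches" report failures concurrently.
    await asyncio.gather(
        sink.write_failures([{"resource": "people", "doc_index": 0}]),
        sink.write_failures([{"resource": "orders", "doc_index": 7}]),
    )
    return sink  # type: ignore[return-value]


sink = asyncio.run(main())
```

An in-memory sink like this is mostly useful in tests; a durable sink would replace the list with a database or object-store write inside the same lock.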