graflo.data_source
Data source abstraction layer for graph database ingestion.
This package provides a unified interface for different data source types, separating "where data comes from" (DataSource) from "how it's transformed" (Resource).
Key Components
- AbstractDataSource: Base class for all data sources
- FileDataSource: File-based data sources (JSON, JSONL, CSV/TSV)
- APIDataSource: REST API runtime executor (built from APIConnector + conn_proxy)
- SQLDataSource: SQL database data source
- DataSourceRegistry: Maps DataSources to Resource names
APIConfig
Bases: ConfigBaseModel
Merged runtime configuration for REST API requests.
Built exclusively via APIConnector.build_api_config; not intended
for direct construction in manifests or factory helpers.
Source code in graflo/data_source/api.py
APIConnector
Bases: ResourceConnector
Connector for REST API endpoints.
Declares the non-secret access pattern (path, method, pagination). Runtime
base_url and credentials are supplied via connector_connection ->
conn_proxy -> graflo.hq.connection_provider.ApiGeneralizedConnConfig.
Attributes:
| Name | Type | Description |
|---|---|---|
| path | str | Relative endpoint path (e.g. |
| method | str | HTTP method (default |
| params | dict[str, Any] | Static query parameters. |
| pagination | PaginationConfig \| None | Pagination strategy and response path configuration. |
| row_annotations | dict[str, Any] | Constant fields merged into every fetched row (doc wins). |
| headers | dict[str, str] | Non-secret HTTP headers. |
| timeout | float \| None | Request timeout in seconds. |
| retries | int | Number of retry attempts. |
| retry_backoff_factor | float | Backoff factor for retries. |
| retry_status_forcelist | list[int] | HTTP status codes to retry on. |
| verify | bool | Verify SSL certificates. |
Source code in graflo/architecture/contract/bindings/connectors.py
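The "doc wins" rule for row_annotations can be read as a dictionary merge in which fields already present in a fetched row take precedence over the constant annotations. The sketch below illustrates that reading only; `annotate_rows` is a hypothetical helper written for this page, not a graflo function, and the actual merge logic may differ.

```python
from typing import Any


def annotate_rows(
    rows: list[dict[str, Any]], row_annotations: dict[str, Any]
) -> list[dict[str, Any]]:
    """Merge constant annotations into each row; the row's own keys win on conflict."""
    # Unpacking the annotations first lets the row's values overwrite them.
    return [{**row_annotations, **row} for row in rows]


# Hypothetical sample data: the first row already carries a "source" field.
rows = [{"id": 1, "source": "upstream"}, {"id": 2}]
annotated = annotate_rows(rows, {"source": "api", "tenant": "acme"})
print(annotated)
```

In this sketch the first row keeps its own "source" value, while the second row receives the annotation because it has no such field.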
build_api_config(*, base_url, auth=None, default_headers=None, page_size_override=None)
Merge contract fields with runtime connection config into APIConfig.
Source code in graflo/architecture/contract/bindings/connectors.py
matches(resource_identifier)
Match resource name, connector name, or path tail.
Source code in graflo/architecture/contract/bindings/connectors.py
APIDataSource
Bases: AbstractDataSource
Data source for REST API endpoints.
Source code in graflo/data_source/api.py
AbstractDataSource
Bases: ConfigBaseModel, ABC
Abstract base class for all data sources.
Data sources handle data retrieval from various sources and provide a unified interface for batch iteration. They are separate from Resources, which handle data transformation. Many DataSources can map to the same Resource.
Attributes:
| Name | Type | Description |
|---|---|---|
| source_type | DataSourceType | Type of the data source |
| resource_name | str \| None | Name of the resource this data source maps to (set externally via DataSourceRegistry) |
Source code in graflo/data_source/base.py
resource_name
property
writable
Get the resource name this data source maps to.
Returns:
| Type | Description |
|---|---|
| str \| None | Resource name or None if not set |
__iter__()
Make data source iterable, yielding individual items.
Yields:
| Name | Type | Description |
|---|---|---|
| dict | | Individual documents |
iter_batches(batch_size=1000, limit=None)
abstractmethod
Iterate over data in batches.
This method yields batches of documents (dictionaries) from the data source. Each batch is a list of dictionaries representing the data items.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | Maximum number of items per yielded batch (last batch may be smaller). | 1000 |
| limit | int \| None | Cap on total items read from this source (rows, JSON documents, SPARQL subjects after grouping, etc.), not a cap on batches. | None |
Yields:
| Type | Description |
|---|---|
| list[dict] | Batches of documents as dictionaries |
Raises:
| Type | Description |
|---|---|
| NotImplementedError | Must be implemented by subclasses |
Source code in graflo/data_source/base.py
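The batch contract described above (batch_size bounds each yielded list, limit bounds the total number of items, and `__iter__` yields single items) can be sketched with the standard library alone. `SketchDataSource` and `ListSource` are hypothetical stand-ins written for illustration; they do not reproduce graflo's actual base class, which is also a ConfigBaseModel.

```python
from abc import ABC, abstractmethod
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import Any, Optional


class SketchDataSource(ABC):
    """Hypothetical stand-in for the batch-iteration contract (not graflo's class)."""

    @abstractmethod
    def iter_batches(
        self, batch_size: int = 1000, limit: Optional[int] = None
    ) -> Iterator[list[dict[str, Any]]]:
        """Yield lists of at most batch_size items, reading at most limit items."""

    def __iter__(self) -> Iterator[dict[str, Any]]:
        # Flatten batches so the source can be consumed item by item.
        for batch in self.iter_batches():
            yield from batch


class ListSource(SketchDataSource):
    """Hypothetical concrete source backed by a plain list."""

    def __init__(self, items: Iterable[dict[str, Any]]) -> None:
        self._items = list(items)

    def iter_batches(self, batch_size=1000, limit=None):
        # The outer islice caps the total number of items, not the number of batches.
        capped = islice(self._items, limit)
        while batch := list(islice(capped, batch_size)):
            yield batch


source = ListSource({"n": i} for i in range(7))
batches = list(source.iter_batches(batch_size=3, limit=5))
print([len(b) for b in batches])
```

With seven items, a batch size of 3, and a limit of 5, the sketch yields one full batch and one shorter final batch; iterating the source directly ignores batching and yields all seven items.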
ApiResponseStructure
Bases: ConfigBaseModel
Maps JSON response envelope fields to extraction and pagination signals.
Source code in graflo/architecture/contract/bindings/connectors.py
DataSourceFactory
Factory for creating data source instances.
Source code in graflo/data_source/factory.py
DataSourceRegistry
Bases: ConfigBaseModel
Registry for mapping data sources to resource names.
This class maintains a mapping from resource names to lists of data sources. Many data sources can map to the same resource, allowing data to be ingested from multiple sources and combined.
Attributes:
| Name | Type | Description |
|---|---|---|
| sources | dict[str, list[AbstractDataSource]] | Dictionary mapping resource names to lists of data sources |
Source code in graflo/data_source/registry.py
clear()
get_all_data_sources()
Get all registered data sources.
Returns:
| Type | Description |
|---|---|
| list[AbstractDataSource] | List of all registered data sources |
Source code in graflo/data_source/registry.py
get_data_sources(resource_name)
Get all data sources for a resource.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| resource_name | str | Name of the resource | required |
Returns:
| Type | Description |
|---|---|
| list[AbstractDataSource] | List of data sources for the resource (empty list if none found) |
Source code in graflo/data_source/registry.py
has_resource(resource_name)
Check if a resource has any data sources.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| resource_name | str | Name of the resource | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the resource has data sources, False otherwise |
Source code in graflo/data_source/registry.py
register(data_source, resource_name)
Register a data source for a resource.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data_source | AbstractDataSource | Data source to register | required |
| resource_name | str | Name of the resource to map to | required |
Source code in graflo/data_source/registry.py
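The registry semantics documented above (many sources per resource name, an empty list for unknown names) amount to a dictionary of lists. The following is a rough sketch under that assumption; `SketchRegistry` is a hypothetical stand-in that mirrors the documented method names and stores arbitrary objects instead of AbstractDataSource instances.

```python
from collections import defaultdict
from typing import Any


class SketchRegistry:
    """Hypothetical stand-in: resource name -> list of data sources."""

    def __init__(self) -> None:
        self.sources: dict[str, list[Any]] = defaultdict(list)

    def register(self, data_source: Any, resource_name: str) -> None:
        # Several sources may be appended under the same resource name.
        self.sources[resource_name].append(data_source)

    def get_data_sources(self, resource_name: str) -> list[Any]:
        # .get avoids creating an empty entry for unknown resources.
        return list(self.sources.get(resource_name, []))

    def has_resource(self, resource_name: str) -> bool:
        return bool(self.sources.get(resource_name))

    def get_all_data_sources(self) -> list[Any]:
        return [src for group in self.sources.values() for src in group]

    def clear(self) -> None:
        self.sources.clear()


registry = SketchRegistry()
registry.register("people.csv", "people")  # strings stand in for data sources
registry.register("people_api", "people")
print(registry.get_data_sources("people"))
```

Both placeholder sources end up under the same resource name, which is the "many data sources, one resource" case the class description mentions.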
DataSourceType
Bases: BaseEnum
Types of data sources supported by the system.
- FILE: File-based data sources (JSON, JSONL, CSV/TSV)
- API: REST API data sources
- SQL: SQL database data sources
- IN_MEMORY: In-memory data sources (lists, DataFrames)
- SPARQL: RDF data sources (local files via rdflib, remote endpoints via SPARQLWrapper)
Source code in graflo/data_source/base.py
FileDataSource
Bases: AbstractDataSource
Base class for file-based data sources.
This class provides a common interface for file-based data sources, integrating with the existing chunker system for batch processing.
Attributes:
| Name | Type | Description |
|---|---|---|
| path | Path \| str | Path to the file |
| file_type | str \| None | Type of file (json, jsonl, table) |
| encoding | EncodingType | File encoding (default: UTF_8) |
Source code in graflo/data_source/file.py
iter_batches(batch_size=1000, limit=None)
Iterate over file data in batches.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | Number of items per batch | 1000 |
| limit | int \| None | Maximum number of items to retrieve | None |
Yields:
| Type | Description |
|---|---|
| list[dict] | Batches of documents as dictionaries |
Source code in graflo/data_source/file.py
InMemoryDataSource
Bases: AbstractDataSource
Data source for in-memory data structures.
This class provides a data source for Python objects that are already in memory, including lists of dictionaries, lists of lists, and Pandas DataFrames.
Attributes:
| Name | Type | Description |
|---|---|---|
| data | list[dict] \| list[list] \| DataFrame | Data to process (list[dict], list[list], or pd.DataFrame) |
| columns | list[str] \| None | Optional column names for list[list] data |
Source code in graflo/data_source/memory.py
iter_batches(batch_size=1000, limit=None)
Iterate over in-memory data in batches.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | Number of items per batch | 1000 |
| limit | int \| None | Maximum number of items to retrieve | None |
Yields:
| Type | Description |
|---|---|
| list[dict] | Batches of documents as dictionaries |
Source code in graflo/data_source/memory.py
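To make the role of columns concrete, here is a small sketch of how list[dict] and list[list] inputs might be normalized into dictionaries before batching. `rows_to_dicts` is a hypothetical helper, and raising an error when columns is missing is this sketch's own choice; graflo's behavior in that case is not documented here. DataFrame input is left out to keep the example free of third-party imports.

```python
from typing import Any, Optional, Sequence


def rows_to_dicts(
    data: Sequence[Any], columns: Optional[list[str]] = None
) -> list[dict[str, Any]]:
    """Normalize list[dict] or list[list] input into a list of dictionaries."""
    if not data:
        return []
    if isinstance(data[0], dict):
        # Already dictionaries: copy so callers cannot mutate the input rows.
        return [dict(row) for row in data]
    if columns is None:
        # Assumption of this sketch: positional rows need explicit names.
        raise ValueError("columns are required for list[list] data")
    # Pair each positional value with its column name.
    return [dict(zip(columns, row)) for row in data]


records = rows_to_dicts([[1, "a"], [2, "b"]], columns=["id", "name"])
print(records)
```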
JsonFileDataSource
Bases: FileDataSource
Data source for JSON files.
JSON files are expected to contain hierarchical data structures, similar to REST API responses. The chunker handles nested structures and converts them to dictionaries.
Attributes:
| Name | Type | Description |
|---|---|---|
| path | Path \| str | Path to the JSON file |
| encoding | EncodingType | File encoding (default: UTF_8) |
Source code in graflo/data_source/file.py
JsonlFileDataSource
Bases: FileDataSource
Data source for JSONL (JSON Lines) files.
JSONL files contain one JSON object per line, making them suitable for streaming and batch processing.
Attributes:
| Name | Type | Description |
|---|---|---|
| path | Path \| str | Path to the JSONL file |
| encoding | EncodingType | File encoding (default: UTF_8) |
Source code in graflo/data_source/file.py
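Because JSONL holds one JSON object per line, it can be parsed lazily and batched without loading the whole file. The sketch below shows the idea with the standard library only; `iter_jsonl_batches` is a hypothetical function and does not use graflo's chunker, and skipping blank lines is an assumption of the sketch.

```python
import io
import json
from itertools import islice
from typing import Any, Iterator, Optional, TextIO


def iter_jsonl_batches(
    handle: TextIO, batch_size: int = 1000, limit: Optional[int] = None
) -> Iterator[list[dict[str, Any]]]:
    """Parse one JSON object per non-empty line and yield them in batches."""
    # The generator parses lines only as batches are requested.
    docs = (json.loads(line) for line in handle if line.strip())
    capped = islice(docs, limit)
    while batch := list(islice(capped, batch_size)):
        yield batch


# An in-memory file stands in for a real .jsonl path.
sample = io.StringIO('{"id": 1}\n{"id": 2}\n\n{"id": 3}\n')
jsonl_batches = list(iter_jsonl_batches(sample, batch_size=2))
print(jsonl_batches)
```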
PaginationConfig
Bases: ConfigBaseModel
Configuration for API pagination (contract-level, secret-free).
Combines request construction (request) with response envelope parsing
(response).
Source code in graflo/architecture/contract/bindings/connectors.py
PaginationRequestConfig
Bases: ConfigBaseModel
Configuration for building paginated HTTP requests.
Source code in graflo/architecture/contract/bindings/connectors.py
RdfDataSource
Bases: AbstractDataSource, ABC
Abstract base for RDF data sources (file and endpoint).
Captures the fields and batch-yielding logic shared by both
RdfFileDataSource and SparqlEndpointDataSource.
Attributes:
| Name | Type | Description |
|---|---|---|
| rdf_class | str \| None | Optional URI of the |
Source code in graflo/data_source/rdf.py
RdfFileDataSource
Bases: RdfDataSource
Data source for local RDF files.
Parses RDF files using rdflib and yields flat dictionaries grouped by
subject URI. Optionally filters by rdf_class so that only instances
of a specific class are returned.
Attributes:
| Name | Type | Description |
|---|---|---|
| path | Path | Path to the RDF file. |
| rdf_format | str \| None | Explicit rdflib format string (e.g. |
Source code in graflo/data_source/rdf.py
iter_batches(batch_size=1000, limit=None)
Parse the RDF file and yield batches of flat dictionaries.
Source code in graflo/data_source/rdf.py
SQLConfig
Bases: ConfigBaseModel
Configuration for SQL data source.
Uses SQLAlchemy connection string format.
Attributes:
| Name | Type | Description |
|---|---|---|
| connection_string | str | SQLAlchemy connection string (e.g., 'postgresql://user:pass@localhost/dbname') |
| query | str | SQL query string (supports parameterized queries) |
| params | dict[str, Any] | Query parameters as dictionary (for parameterized queries) |
| pagination | bool \| None | Deprecated. Ignored; retained for config compatibility. |
| page_size | int \| None | Deprecated. Ignored; use |
Source code in graflo/data_source/sql.py
SQLDataSource
Bases: AbstractDataSource
Data source for SQL databases.
This class provides a data source for SQL databases using SQLAlchemy.
Results are streamed with stream_results and fetchmany, so large
queries avoid OFFSET-based re-scans and memory stays bounded per chunk.
Rows are returned as dictionaries with column names as keys.
Attributes:
| Name | Type | Description |
|---|---|---|
| config | SQLConfig | SQL configuration |
| engine | SQLConfig | SQLAlchemy engine (created on first use) |
Source code in graflo/data_source/sql.py
iter_batches(batch_size=1000, limit=None)
Iterate over SQL query results in batches.
Executes the configured query once per call and reads via
fetchmany on a streaming result. Optional limit stops after
that many rows without adding LIMIT/OFFSET to the SQL text.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | Target size of each yielded batch of row dicts (last batch may be smaller). | 1000 |
| limit | int \| None | Maximum total rows to read, or | None |
Yields:
| Type | Description |
|---|---|
| list[dict] | Batches of rows as dictionaries |
Source code in graflo/data_source/sql.py
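The "execute once, read with fetchmany, stop at limit" pattern can be illustrated without SQLAlchemy. The sketch below swaps in the standard-library sqlite3 driver so that it runs anywhere; `iter_sql_batches` and the sample table are hypothetical, and the real class additionally relies on SQLAlchemy's stream_results option, which sqlite3 does not have.

```python
import sqlite3
from typing import Any, Iterator, Optional


def iter_sql_batches(
    conn: sqlite3.Connection,
    query: str,
    batch_size: int = 1000,
    limit: Optional[int] = None,
) -> Iterator[list[dict[str, Any]]]:
    """Run the query once and read it with fetchmany, stopping after limit rows."""
    cursor = conn.execute(query)
    columns = [col[0] for col in cursor.description]
    remaining = limit
    while remaining is None or remaining > 0:
        # Never request more rows than the limit still allows.
        size = batch_size if remaining is None else min(batch_size, remaining)
        rows = cursor.fetchmany(size)
        if not rows:
            break
        if remaining is not None:
            remaining -= len(rows)
        yield [dict(zip(columns, row)) for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO t VALUES (?, ?)", [(i, f"n{i}") for i in range(5)])
sql_batches = list(
    iter_sql_batches(conn, "SELECT id, name FROM t ORDER BY id", batch_size=2, limit=3)
)
print(sql_batches)
```

The SQL text is never modified: the limit is enforced on the client side by shrinking the last fetchmany call, which matches the description of limit above.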
SparqlEndpointDataSource
Bases: RdfDataSource
Data source that reads from a SPARQL endpoint.
Uses SPARQLWrapper to query an endpoint and returns flat dictionaries
grouped by subject.
Attributes:
| Name | Type | Description |
|---|---|---|
| config | SparqlSourceConfig | SPARQL source configuration. |
Source code in graflo/data_source/rdf.py
iter_batches(batch_size=1000, limit=None)
Query the SPARQL endpoint and yield batches of flat dictionaries.
Paginates with SPARQL LIMIT/OFFSET on bindings (triple rows), merges rows into subject documents in a streaming fashion, and stops fetching once limit subjects have been yielded (when set).
Source code in graflo/data_source/rdf.py
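The streaming merge of triple rows into subject documents can be sketched as a single pass over (subject, predicate, object) tuples. This is an illustration under stated assumptions, not graflo's implementation: `group_by_subject` is hypothetical, rows for one subject are assumed to arrive consecutively, the "_uri" key is invented for the example, and a repeated predicate simply overwrites the earlier value.

```python
from typing import Any, Iterable, Iterator, Optional


def group_by_subject(
    bindings: Iterable[tuple[str, str, Any]], limit: Optional[int] = None
) -> Iterator[dict[str, Any]]:
    """Merge consecutive (s, p, o) rows into one flat document per subject."""
    if limit is not None and limit <= 0:
        return
    current_subject: Optional[str] = None
    doc: dict[str, Any] = {}
    emitted = 0
    for s, p, o in bindings:
        if s != current_subject:
            if current_subject is not None:
                # A new subject starts, so the previous document is complete.
                yield doc
                emitted += 1
                if limit is not None and emitted >= limit:
                    return  # stop consuming rows once enough subjects were yielded
            current_subject, doc = s, {"_uri": s}
        doc[p] = o
    if current_subject is not None:
        yield doc


triples = [
    ("a", "name", "A"),
    ("a", "age", 1),
    ("b", "name", "B"),
    ("c", "name", "C"),
]
subjects = list(group_by_subject(triples, limit=2))
print(subjects)
```

Because the generator returns as soon as the limit is reached, a caller that pages through an endpoint lazily would not request further pages after that point.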
SparqlSourceConfig
Bases: ConfigBaseModel
Configuration for a SPARQL endpoint data source.
Attributes:
| Name | Type | Description |
|---|---|---|
| endpoint_url | str | Full SPARQL query endpoint URL (e.g. |
| rdf_class | str \| None | URI of the rdf:Class whose instances to fetch |
| graph_uri | str \| None | Named graph to restrict the query to (optional) |
| sparql_query | str \| None | Custom SPARQL query override (optional) |
| username | str \| None | HTTP basic-auth username (optional) |
| password | str \| None | HTTP basic-auth password (optional) |
| page_size | int | Number of results per SPARQL LIMIT/OFFSET page |
Source code in graflo/data_source/rdf.py
build_query(offset=0, limit=None)
Build a SPARQL SELECT query.
If sparql_query is set, it is returned with LIMIT/OFFSET appended. Otherwise the method generates:
    SELECT ?s ?p ?o WHERE { ?s a <rdf_class> . ?s ?p ?o . }
Source code in graflo/data_source/rdf.py
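The query template and the LIMIT/OFFSET suffix can be sketched as plain string assembly. `sketch_build_query` is a hypothetical free function, not the method itself; it ignores graph_uri, and the fallback used when neither sparql_query nor rdf_class is given is an assumption of this sketch.

```python
from typing import Optional


def sketch_build_query(
    rdf_class: Optional[str] = None,
    sparql_query: Optional[str] = None,
    offset: int = 0,
    limit: Optional[int] = None,
) -> str:
    """Return a SELECT query with optional LIMIT/OFFSET clauses appended."""
    if sparql_query is not None:
        # A custom query is used as given; only paging clauses are appended.
        query = sparql_query.strip()
    elif rdf_class is not None:
        query = f"SELECT ?s ?p ?o WHERE {{ ?s a <{rdf_class}> . ?s ?p ?o . }}"
    else:
        # Assumed fallback: select every triple when no class is configured.
        query = "SELECT ?s ?p ?o WHERE { ?s ?p ?o . }"
    if limit is not None:
        query += f" LIMIT {limit}"
    if offset:
        query += f" OFFSET {offset}"
    return query


page_query = sketch_build_query(
    rdf_class="http://example.org/Person", offset=100, limit=50
)
print(page_query)
```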
TableFileDataSource
Bases: FileDataSource
Data source for CSV/TSV files.
Table files are converted to dictionaries with column headers as keys. Each row becomes a dictionary.
Attributes:
| Name | Type | Description |
|---|---|---|
| path | Path \| str | Path to the CSV/TSV file |
| encoding | EncodingType | File encoding (default: UTF_8) |
| sep | str | Field separator (default: ',') |
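The row-to-dictionary conversion for table files, including the effect of sep, can be sketched with the standard csv module. `iter_table_batches` is a hypothetical function that does not go through graflo's chunker, and the in-memory TSV text is invented sample data.

```python
import csv
import io
from itertools import islice
from typing import Iterator, Optional, TextIO


def iter_table_batches(
    handle: TextIO,
    sep: str = ",",
    batch_size: int = 1000,
    limit: Optional[int] = None,
) -> Iterator[list[dict[str, str]]]:
    """Read a delimited table; the header row supplies the dictionary keys."""
    reader = csv.DictReader(handle, delimiter=sep)
    capped = islice(reader, limit)
    while batch := list(islice(capped, batch_size)):
        yield batch


# A tab-separated sample, so sep is set to "\t" instead of the default comma.
tsv = io.StringIO("id\tname\n1\tAda\n2\tGrace\n3\tEdsger\n")
table_batches = list(iter_table_batches(tsv, sep="\t", batch_size=2))
print(table_batches)
```

Note that csv.DictReader leaves every value as a string, so any type conversion would have to happen in a later transformation step.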