Data Source Reference¶
This section documents the data source abstraction layer in graflo. Data sources define where data comes from, separate from Resources which define how data is transformed.
Overview¶
Data sources handle data retrieval from various sources:
- File Data Sources: JSON, JSONL, CSV/TSV files
- API Data Sources: REST API endpoints
- SQL Data Sources: SQL databases
- In-Memory Data Sources: Python objects (lists, DataFrames)
Many data sources can map to the same Resource, allowing flexible data ingestion.
Core Classes¶
AbstractDataSource¶
Base class for all data sources. Provides:
- Unified batch iteration interface (
iter_batches()) - Resource name mapping
- Type information
DataSourceFactory¶
Factory for creating data source instances:
- Automatic type detection
- Configuration-based creation
- Support for all data source types
DataSourceRegistry¶
Maps data sources to resource names:
- Register multiple data sources per resource
- Retrieve data sources by resource name
- Manage data source lifecycle
File Data Sources¶
FileDataSource¶
Base class for file-based data sources.
JsonFileDataSource¶
For JSON files with hierarchical data structures.
JsonlFileDataSource¶
For JSONL (JSON Lines) files - one JSON object per line.
TableFileDataSource¶
For CSV/TSV files with configurable separator.
API Data Sources¶
REST API ingestion uses APIConnector in manifest bindings plus runtime credentials via conn_proxy. Full guide: API connector and pagination.
APIConnector¶
Manifest contract for REST API endpoints:
path— relative path appended to runtimebase_urlmethod,params,headers, HTTP retry/timeout optionspagination— optionalPaginationConfig(see below)
APIDataSource¶
Runtime HTTP executor. Built by RegistryBuilder from APIConnector + RestApiConnConfig; not constructed directly.
PaginationConfig¶
Contract-level pagination on APIConnector. Two sub-blocks:
request(PaginationRequestConfig) — how to build paginated HTTP requestsresponse(ApiResponseStructure) — how to parse JSON response envelopes
Three request strategies:
| Strategy | Query params (defaults) | When to use |
|---|---|---|
offset |
offset, limit |
Skip/limit APIs (?offset=0&limit=100) |
page |
page, per_page |
Page-index APIs (?page=1&per_page=25) |
cursor |
cursor |
Opaque next-token APIs |
Key fields:
request.page_size— records per HTTP request (overridden byIngestionParams.batch_sizewhen set)response.records_path— dot path to the JSON record list (e.g.results,data, or0.resultsfor an array-wrapped envelope)response.next_offset_path— server-provided next offset (e.g.next_offset)response.has_more_path— boolean stop signal (e.g.has_more)response.cursor_path— next cursor token for cursor strategyresponse.auto_detect— infer unset response paths from the first object response body (not array-wrapped[{...}]envelopes)
Dot paths support numeric segments for list indexing (e.g. 0.results when the API returns [{"results": [...]}]). See Dot paths and response shapes in the API connector guide.
See API connector and pagination for loop behaviour, examples per strategy, and field reference.
ApiAuth / RestApiConnConfig¶
Runtime base_url and credentials (bearer, basic, digest, api_key) in graflo.hq.connection_provider, registered on a ConnectionProvider.
Env wiring — map each conn_proxy to env vars (user_service → USER_SERVICE_BASE_URL, USER_SERVICE_AUTH_TYPE, …) and call register_all_api_configs_from_env(bindings) or register_api_config_from_env(conn_proxy). See API connector and pagination and Example 14.
SQL Data Sources¶
SQLDataSource¶
SQL database connector using SQLAlchemy.
SQLConfig¶
SQL configuration:
- Connection string (SQLAlchemy format)
- Query string with parameterized queries
- Pagination support
In-Memory Data Sources¶
InMemoryDataSource¶
For Python objects already in memory:
list[dict]: List of dictionarieslist[list]: List of lists (requires column names)pd.DataFrame: Pandas DataFrame
Usage Examples¶
File Data Source¶
from graflo.data_source import DataSourceFactory
# Automatic type detection
source = DataSourceFactory.create_file_data_source(path="data.json")
# Explicit type with custom separator
source = DataSourceFactory.create_file_data_source(
path="data.csv",
file_type="table",
sep="\t"
)
API Data Source (via bindings)¶
from graflo.architecture.contract.bindings import (
APIConnector,
ApiResponseStructure,
Bindings,
PaginationConfig,
PaginationRequestConfig,
)
from graflo.hq.connection_provider import InMemoryConnectionProvider
connector = APIConnector(
name="users_api",
path="/api/users",
pagination=PaginationConfig(
request=PaginationRequestConfig(strategy="offset", page_size=100),
response=ApiResponseStructure(records_path="data"),
),
)
bindings = Bindings(
connectors=[connector],
resource_connector=[{"resource": "users", "connector": "users_api"}],
connector_connection=[{"connector": "users_api", "conn_proxy": "api_source"}],
)
# export API_SOURCE_BASE_URL, API_SOURCE_AUTH_TYPE=bearer, API_SOURCE_TOKEN=...
provider = InMemoryConnectionProvider()
provider.register_all_api_configs_from_env(bindings=bindings)
Manual registration remains available via register_generalized_config + RestApiConnConfig / ApiAuth — see API connector and pagination.
SQL Data Source¶
from graflo.data_source import DataSourceFactory, SQLConfig
config = SQLConfig(
connection_string="postgresql://user:pass@localhost/db",
query="SELECT * FROM users WHERE active = :active",
params={"active": True},
)
source = DataSourceFactory.create_sql_data_source(config)
Using with GraphEngine (API via bindings)¶
API sources are registered automatically when you call GraphEngine.define_and_ingest with bindings that include APIConnector rows and a ConnectionProvider. See Quick Start — Using API Data Sources and API connector and pagination.
For file/SQL sidecar configs and manual DataSourceRegistry wiring (non-API sources):
import asyncio
from graflo import Caster, DataSourceRegistry
from graflo.hq.caster import IngestionParams
registry = DataSourceRegistry()
registry.register(file_source, resource_name="users")
caster = Caster(schema=schema, ingestion_model=ingestion_model)
ingestion_params = IngestionParams(
batch_size=1000,
clear_data=False,
# resources=["users"],
# connectors=["users_files"], # connector name or hash; intersects with resources
)
asyncio.run(
caster.ingest_data_sources(
data_source_registry=registry,
conn_conf=conn_conf,
ingestion_params=ingestion_params,
)
)