Data Source Reference

This section documents the data source abstraction layer in graflo. Data sources define where data comes from; Resources, by contrast, define how that data is transformed.

Overview

Data sources handle data retrieval from several kinds of source:

- File Data Sources: JSON, JSONL, CSV/TSV files
- API Data Sources: REST API endpoints
- SQL Data Sources: SQL databases
- In-Memory Data Sources: Python objects (lists, DataFrames)

Multiple data sources can map to the same Resource, so a single Resource definition can ingest data from any mix of these source types.

Core Classes

AbstractDataSource

Base class for all data sources. Provides:

- Unified batch iteration interface (iter_batches())
- Resource name mapping
- Type information
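
To illustrate what a unified batch iteration interface can look like, here is a minimal sketch. It is not graflo's implementation: the class names `SketchDataSource` and `ListSource`, the `iter_items()` hook, and the `batch_size` parameter are all assumptions made for this example. Only the method name `iter_batches()` comes from the description above.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from itertools import islice
from typing import Any, Iterable, Iterator


class SketchDataSource(ABC):
    """Hypothetical stand-in for a data source base class (not graflo's code)."""

    def __init__(self, resource_name: str | None = None):
        # Name of the Resource this source feeds (assumed attribute).
        self.resource_name = resource_name

    @abstractmethod
    def iter_items(self) -> Iterable[dict[str, Any]]:
        """Yield records one at a time; subclasses decide where they come from."""

    def iter_batches(self, batch_size: int = 1000) -> Iterator[list[dict[str, Any]]]:
        """Group the record stream into lists of at most batch_size items."""
        items = iter(self.iter_items())
        while batch := list(islice(items, batch_size)):
            yield batch


class ListSource(SketchDataSource):
    """Concrete source backed by a plain Python list."""

    def __init__(self, rows, resource_name=None):
        super().__init__(resource_name)
        self.rows = rows

    def iter_items(self):
        return iter(self.rows)


source = ListSource([{"id": i} for i in range(5)], resource_name="users")
batches = list(source.iter_batches(batch_size=2))
```

Because every subclass only has to supply records, the consumer can treat files, APIs, and databases identically: it simply loops over `iter_batches()`.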

DataSourceFactory

Factory for creating data source instances:

- Automatic type detection
- Configuration-based creation
- Support for all data source types
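
One plausible way to implement automatic type detection for files is to look at the file extension. The sketch below shows that idea only; whether graflo detects types this way is an assumption, and `SUFFIX_TO_TYPE` and `detect_file_type` are hypothetical names. The `"table"` label mirrors the `file_type="table"` value used in the usage examples in this document, while `"json"` and `"jsonl"` are guesses.

```python
from pathlib import Path

# Hypothetical mapping from file suffix to a file-type label.
SUFFIX_TO_TYPE = {
    ".json": "json",
    ".jsonl": "jsonl",
    ".csv": "table",
    ".tsv": "table",
}


def detect_file_type(path):
    """Infer a file-type label from the suffix, case-insensitively."""
    suffix = Path(path).suffix.lower()
    try:
        return SUFFIX_TO_TYPE[suffix]
    except KeyError:
        raise ValueError(f"cannot infer file type from {path!r}") from None


detected = [detect_file_type(p) for p in ("data.json", "events.JSONL", "users.tsv")]
```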

DataSourceRegistry

Maps data sources to resource names:

- Register multiple data sources per resource
- Retrieve data sources by resource name
- Manage data source lifecycle
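
The registry idea can be sketched as a mapping from a resource name to a list of sources. This is an illustration under stated assumptions, not the library's code: `SketchRegistry` and `get_data_sources` are hypothetical names, and plain strings stand in for real data source objects. The `register(source, resource_name=...)` call shape follows the usage example in this document.

```python
from collections import defaultdict


class SketchRegistry:
    """Hypothetical registry: resource name -> list of data sources."""

    def __init__(self):
        self._sources = defaultdict(list)

    def register(self, source, resource_name):
        # Appending (rather than overwriting) allows many sources per resource.
        self._sources[resource_name].append(source)

    def get_data_sources(self, resource_name):
        # Return a copy so callers cannot mutate the registry by accident.
        return list(self._sources.get(resource_name, []))


registry = SketchRegistry()
registry.register("users.json", resource_name="users")
registry.register("https://api.example.com/users", resource_name="users")
registry.register("orders.csv", resource_name="orders")
```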

File Data Sources

FileDataSource

Base class for file-based data sources.

JsonFileDataSource

For JSON files with hierarchical data structures.

JsonlFileDataSource

For JSONL (JSON Lines) files - one JSON object per line.
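
For readers unfamiliar with the format, the following sketch shows how JSONL is typically consumed with the standard library: each non-empty line is parsed as an independent JSON document. The helper `iter_jsonl` is a hypothetical name, and an in-memory `StringIO` stands in for a real file so the example runs on its own.

```python
import io
import json


def iter_jsonl(stream):
    """Yield one parsed object per non-empty line."""
    for line in stream:
        line = line.strip()
        if line:  # tolerate blank lines between records
            yield json.loads(line)


sample = io.StringIO('{"id": 1, "name": "Ada"}\n\n{"id": 2, "name": "Grace"}\n')
records = list(iter_jsonl(sample))
```

Since records are parsed line by line, a large file never has to be loaded into memory at once, which fits naturally with batch iteration.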

TableFileDataSource

For CSV/TSV files with configurable separator.
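
The effect of a configurable separator can be shown with Python's `csv` module. This is a sketch of the general technique, not of how TableFileDataSource is implemented; `iter_table` is a hypothetical helper, and the `sep` parameter name is borrowed from the usage example in this document.

```python
import csv
import io


def iter_table(stream, sep=","):
    """Yield each row as a dict keyed by the header row."""
    # DictReader uses the first row as column names.
    yield from csv.DictReader(stream, delimiter=sep)


tsv = io.StringIO("id\tname\n1\tAda\n2\tGrace\n")
rows = list(iter_table(tsv, sep="\t"))
```

Note that the `csv` module returns every value as a string; any type conversion would have to happen in a later step.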

API Data Sources

APIDataSource

REST API connector with full HTTP configuration support.

APIConfig

Configuration for API endpoints:

- URL, method, headers
- Authentication (Basic, Bearer, Digest)
- Query parameters, timeouts, retries
- SSL verification

PaginationConfig

Pagination configuration:

- Offset-based pagination
- Cursor-based pagination
- Page-based pagination
- JSON path configuration for data extraction
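
As an illustration of the offset-based strategy combined with a data path, the sketch below walks through pages until a short page signals the end. Everything here is an assumption made for the example: the helpers `extract` and `paginate_offset`, the `offset`/`limit` parameter names, the dot-separated path syntax, and the stop condition. A fake endpoint replaces the HTTP call so the code runs offline.

```python
def extract(payload, data_path):
    """Follow a dot-separated path such as "data" or "result.items"."""
    for key in data_path.split("."):
        payload = payload[key]
    return payload


def paginate_offset(fetch_page, page_size, data_path):
    """Request pages at increasing offsets until a short or empty page arrives."""
    offset = 0
    while True:
        items = extract(fetch_page(offset=offset, limit=page_size), data_path)
        yield from items
        if len(items) < page_size:
            break
        offset += page_size


# Fake endpoint standing in for an HTTP request (hypothetical response shape).
USERS = [{"id": i} for i in range(7)]


def fake_fetch(offset, limit):
    return {"data": USERS[offset:offset + limit], "total": len(USERS)}


fetched = list(paginate_offset(fake_fetch, page_size=3, data_path="data"))
```

Cursor-based and page-based strategies follow the same loop shape; only the value carried from one request to the next changes (an opaque cursor or a page number instead of an offset).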

SQL Data Sources

SQLDataSource

SQL database connector using SQLAlchemy.

SQLConfig

SQL configuration:

- Connection string (SQLAlchemy format)
- Query string with parameterized queries
- Pagination support
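
To show what a parameterized query with a named placeholder does, here is a small self-contained sketch. It deliberately uses the standard library's `sqlite3` module with an in-memory database instead of SQLAlchemy, so it runs without extra dependencies; the `:active` placeholder style matches the SQL usage example in this document. The table and its rows are made up for illustration.

```python
import sqlite3

# In-memory database with a few made-up rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, active INTEGER)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [(1, "Ada", 1), (2, "Grace", 0), (3, "Edsger", 1)],
)

# The value is bound by the driver, never interpolated into the SQL string.
query = "SELECT id, name FROM users WHERE active = :active ORDER BY id"
active_users = conn.execute(query, {"active": True}).fetchall()
conn.close()
```

Binding values through parameters rather than string formatting keeps the query safe from SQL injection and lets the same query text be reused with different values.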

In-Memory Data Sources

InMemoryDataSource

For Python objects already in memory:

- list[dict]: List of dictionaries
- list[list]: List of lists (requires column names)
- pd.DataFrame: Pandas DataFrame
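
The reason a list of lists needs column names is that rows must be turned into keyed records before they can be handled like the other inputs. The sketch below shows one way to do that normalization; `to_records` and its `columns` parameter are hypothetical names, not graflo's API.

```python
def to_records(data, columns=None):
    """Normalize list[dict] or list[list] input into a list of dicts."""
    if not data:
        return []
    if isinstance(data[0], dict):
        return list(data)
    if columns is None:
        raise ValueError("column names are required for list-of-lists input")
    # Pair each positional value with its column name.
    return [dict(zip(columns, row)) for row in data]


from_dicts = to_records([{"id": 1, "name": "Ada"}])
from_lists = to_records([[1, "Ada"], [2, "Grace"]], columns=["id", "name"])
```

A pandas DataFrame can be brought into the same shape with `df.to_dict(orient="records")`.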

Usage Examples

File Data Source

from graflo.data_source import DataSourceFactory

# Automatic type detection
source = DataSourceFactory.create_file_data_source(path="data.json")

# Explicit type with custom separator
source = DataSourceFactory.create_file_data_source(
    path="data.csv",
    file_type="table",
    sep="\t"
)

API Data Source

from graflo.data_source import DataSourceFactory, APIConfig, PaginationConfig

config = APIConfig(
    url="https://api.example.com/users",
    method="GET",
    headers={"Authorization": "Bearer token"},
    pagination=PaginationConfig(
        strategy="offset",
        page_size=100,
        data_path="data",
    ),
)

source = DataSourceFactory.create_api_data_source(config)

SQL Data Source

from graflo.data_source import DataSourceFactory, SQLConfig

config = SQLConfig(
    connection_string="postgresql://user:pass@localhost/db",
    query="SELECT * FROM users WHERE active = :active",
    params={"active": True},
)

source = DataSourceFactory.create_sql_data_source(config)

Using with Caster

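from graflo import Caster, DataSourceRegistry
from graflo.caster import IngestionParams

# file_source and api_source are data sources created as in the examples above.
registry = DataSourceRegistry()
registry.register(file_source, resource_name="users")
registry.register(api_source, resource_name="users")  # Multiple sources for same resource

# schema and conn_conf (the target database connection configuration) are
# assumed to be defined elsewhere.
caster = Caster(schema)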

ingestion_params = IngestionParams(
    batch_size=1000,  # Process 1000 items per batch
    clean_start=False,  # Set to True to wipe existing database
)

caster.ingest_data_sources(
    data_source_registry=registry,
    conn_conf=conn_conf,
    ingestion_params=ingestion_params,
)