graflo.data_source¶
Data source abstraction layer for graph database ingestion.
This package provides a unified interface for different data source types, separating "where data comes from" (DataSource) from "how it's transformed" (Resource).
Key Components
- AbstractDataSource: Base class for all data sources
- FileDataSource: File-based data sources (JSON, JSONL, CSV/TSV)
- APIDataSource: REST API data source
- SQLDataSource: SQL database data source
- DataSourceRegistry: Maps DataSources to Resource names
Example
    from graflo.data_source import FileDataSource, DataSourceRegistry

    source = FileDataSource(path="data.json", file_type="json")
    registry = DataSourceRegistry()
    registry.register(source, resource_name="users")
APIConfig
dataclass
¶
Bases: BaseDataclass
Configuration for REST API data source.
Attributes:
| Name | Type | Description |
|---|---|---|
| url | str | API endpoint URL |
| method | str | HTTP method (default: 'GET') |
| headers | dict[str, str] | HTTP headers as dictionary |
| auth | dict[str, Any] \| None | Authentication configuration. Basic auth: {'type': 'basic', 'username': '...', 'password': '...'}; Bearer token: {'type': 'bearer', 'token': '...'}; Digest auth: {'type': 'digest', 'username': '...', 'password': '...'} |
| params | dict[str, Any] | Query parameters as dictionary |
| timeout | float \| None | Request timeout in seconds (default: None for no timeout) |
| retries | int | Number of retry attempts (default: 0) |
| retry_backoff_factor | float | Backoff factor for retries (default: 0.1) |
| retry_status_forcelist | list[int] | HTTP status codes to retry on (default: [500, 502, 503, 504]) |
| verify | bool | Verify SSL certificates (default: True) |
| pagination | PaginationConfig \| None | Pagination configuration (default: None) |
Source code in graflo/data_source/api.py
APIDataSource
dataclass
¶
Bases: AbstractDataSource
Data source for REST API endpoints.
This class provides a data source for REST API endpoints, supporting full HTTP configuration, authentication, pagination, and retry logic. Returns JSON responses as hierarchical dictionaries, similar to JSON files.
Attributes:
| Name | Type | Description |
|---|---|---|
| config | APIConfig | API configuration |
Source code in graflo/data_source/api.py
__post_init__()
¶
iter_batches(batch_size=1000, limit=None)
¶
Iterate over API data in batches.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | Number of items per batch | 1000 |
| limit | int \| None | Maximum number of items to retrieve | None |

Yields:

| Type | Description |
|---|---|
| list[dict] | Batches of documents as dictionaries |
Source code in graflo/data_source/api.py
AbstractDataSource
dataclass
¶
Bases: BaseDataclass, ABC
Abstract base class for all data sources.
Data sources handle data retrieval from various sources and provide a unified interface for batch iteration. They are separate from Resources, which handle data transformation. Many DataSources can map to the same Resource.
Attributes:
| Name | Type | Description |
|---|---|---|
| source_type | DataSourceType | Type of the data source |
| resource_name | str \| None | Name of the resource this data source maps to (set externally via DataSourceRegistry) |
Source code in graflo/data_source/base.py
resource_name
property
writable
¶
Get the resource name this data source maps to.
Returns:
| Type | Description |
|---|---|
| str \| None | Resource name or None if not set |
__iter__()
¶
Make data source iterable, yielding individual items.
Yields:
| Name | Type | Description |
|---|---|---|
| dict |  | Individual documents |
__post_init__()
¶
iter_batches(batch_size=1000, limit=None)
abstractmethod
¶
Iterate over data in batches.
This method yields batches of documents (dictionaries) from the data source. Each batch is a list of dictionaries representing the data items.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | Number of items per batch | 1000 |
| limit | int \| None | Maximum number of items to retrieve (None for no limit) | None |

Yields:

| Type | Description |
|---|---|
| list[dict] | Batches of documents as dictionaries |

Raises:

| Type | Description |
|---|---|
| NotImplementedError | Must be implemented by subclasses |
Source code in graflo/data_source/base.py
DataSourceFactory
¶
Factory for creating data source instances.
This factory creates appropriate data source instances based on the provided configuration. It supports file-based, API, and SQL data sources.
Source code in graflo/data_source/factory.py
create_api_data_source(config)
classmethod
¶
Create an API data source.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | APIConfig | API configuration | required |

Returns:

| Type | Description |
|---|---|
| APIDataSource | APIDataSource instance |
Source code in graflo/data_source/factory.py
create_data_source(source_type=None, **kwargs)
classmethod
¶
Create a data source of the specified type.
This is a general factory method that routes to specific factory methods based on the source type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_type | DataSourceType \| str \| None | Type of data source to create. If None, will be inferred from kwargs (e.g., 'path' -> FILE, 'data' -> IN_MEMORY, 'config' with url -> API) | None |
| **kwargs | Any | Configuration parameters for the data source | {} |

Returns:

| Type | Description |
|---|---|
| AbstractDataSource | Data source instance |

Raises:

| Type | Description |
|---|---|
| ValueError | If source type is not supported or required parameters are missing |
Source code in graflo/data_source/factory.py
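The kwargs-based inference that `create_data_source` documents ('path' -> FILE, 'data' -> IN_MEMORY, 'config' with a url -> API) can be sketched as a small function. The real factory may recognize more patterns than the examples listed in its docstring:

```python
def infer_source_type(kwargs: dict) -> str:
    """Sketch of the documented source-type inference rules."""
    if "path" in kwargs:
        return "file"
    if "data" in kwargs:
        return "in_memory"
    config = kwargs.get("config")
    # For the sketch, treat config as a plain dict; the real factory
    # accepts config objects such as APIConfig.
    if isinstance(config, dict) and "url" in config:
        return "api"
    raise ValueError(f"Cannot infer source type from kwargs: {sorted(kwargs)}")
```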
create_data_source_from_config(config)
classmethod
¶
Create a data source from a configuration dictionary.
The configuration dict should contain:
- 'source_type': Type of data source (FILE, API, SQL, IN_MEMORY)
- Other parameters specific to the data source type
Examples:
- File source: {"source_type": "file", "path": "data.json"}
- API source: {"source_type": "api", "config": {"url": "https://api.example.com"}}
- SQL source: {"source_type": "sql", "config": {"connection_string": "...", "query": "..."}}
- In-memory source:
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict[str, Any] | Configuration dictionary | required |

Returns:

| Type | Description |
|---|---|
| AbstractDataSource | Data source instance |

Raises:

| Type | Description |
|---|---|
| ValueError | If configuration is invalid |
Source code in graflo/data_source/factory.py
create_file_data_source(path, file_type=None, encoding=EncodingType.UTF_8, sep=None)
classmethod
¶
Create a file-based data source.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | Path \| str | Path to the file | required |
| file_type | str \| ChunkerType \| None | Type of file ('json', 'jsonl', 'table', 'parquet') or ChunkerType. If None, will be guessed from file extension. | None |
| encoding | EncodingType | File encoding (default: UTF_8) | UTF_8 |
| sep | str \| None | Field separator for table files (default: ','). Only used for table files. | None |

Returns:

| Type | Description |
|---|---|
| JsonFileDataSource \| JsonlFileDataSource \| TableFileDataSource \| ParquetFileDataSource | Appropriate file data source instance |

Raises:

| Type | Description |
|---|---|
| ValueError | If file type cannot be determined |
Source code in graflo/data_source/factory.py
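The extension-based guess mentioned above can be sketched as a lookup table. The mapping here is an assumption inferred from the documented file types; the factory's actual table may differ:

```python
from pathlib import Path

# Assumed extension -> file_type mapping, based on the documented types
# ('json', 'jsonl', 'table', 'parquet'); illustrative only.
_EXT_TO_TYPE = {
    ".json": "json",
    ".jsonl": "jsonl",
    ".csv": "table",
    ".tsv": "table",
    ".parquet": "parquet",
}

def guess_file_type(path: str) -> str:
    """Guess a file_type from the file extension, raising on unknowns."""
    suffix = Path(path).suffix.lower()
    try:
        return _EXT_TO_TYPE[suffix]
    except KeyError:
        raise ValueError(f"Cannot determine file type for {path!r}") from None
```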
create_in_memory_data_source(data, columns=None)
classmethod
¶
Create an in-memory data source.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | list[dict] \| list[list] \| DataFrame | Data to process (list[dict], list[list], or pd.DataFrame) | required |
| columns | list[str] \| None | Optional column names for list[list] data | None |

Returns:

| Type | Description |
|---|---|
| InMemoryDataSource | InMemoryDataSource instance |
Source code in graflo/data_source/factory.py
create_sql_data_source(config)
classmethod
¶
Create a SQL data source.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | SQLConfig | SQL configuration | required |

Returns:

| Type | Description |
|---|---|
| SQLDataSource | SQLDataSource instance |
Source code in graflo/data_source/factory.py
DataSourceRegistry
dataclass
¶
Bases: BaseDataclass
Registry for mapping data sources to resource names.
This class maintains a mapping from resource names to lists of data sources. Many data sources can map to the same resource, allowing data to be ingested from multiple sources and combined.
Attributes:
| Name | Type | Description |
|---|---|---|
| sources | dict[str, list[AbstractDataSource]] | Dictionary mapping resource names to lists of data sources |
Source code in graflo/data_source/registry.py
clear()
¶
get_all_data_sources()
¶
Get all registered data sources.
Returns:
| Type | Description |
|---|---|
| list[AbstractDataSource] | List of all registered data sources |
Source code in graflo/data_source/registry.py
get_data_sources(resource_name)
¶
Get all data sources for a resource.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| resource_name | str | Name of the resource | required |

Returns:

| Type | Description |
|---|---|
| list[AbstractDataSource] | List of data sources for the resource (empty list if none found) |
Source code in graflo/data_source/registry.py
has_resource(resource_name)
¶
Check if a resource has any data sources.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| resource_name | str | Name of the resource | required |

Returns:

| Type | Description |
|---|---|
| bool | True if the resource has data sources, False otherwise |
Source code in graflo/data_source/registry.py
register(data_source, resource_name)
¶
Register a data source for a resource.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data_source | AbstractDataSource | Data source to register | required |
| resource_name | str | Name of the resource to map to | required |
Source code in graflo/data_source/registry.py
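The many-sources-per-resource mapping documented above can be sketched stand-alone with a dict of lists. Class and source names here are illustrative:

```python
from collections import defaultdict

class RegistrySketch:
    """Stand-alone sketch of the documented DataSourceRegistry behavior."""

    def __init__(self):
        # resource name -> list of data sources, as documented.
        self.sources: dict[str, list] = defaultdict(list)

    def register(self, data_source, resource_name: str) -> None:
        self.sources[resource_name].append(data_source)

    def get_data_sources(self, resource_name: str) -> list:
        # Empty list if none found, per the documented return value.
        return list(self.sources.get(resource_name, []))

    def has_resource(self, resource_name: str) -> bool:
        return bool(self.sources.get(resource_name))

    def get_all_data_sources(self) -> list:
        return [s for group in self.sources.values() for s in group]

registry = RegistrySketch()
# Two sources feeding the same "users" resource.
registry.register("users.json", resource_name="users")
registry.register("users_api", resource_name="users")
```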
DataSourceType
¶
Bases: BaseEnum
Types of data sources supported by the system.
- FILE: File-based data sources (JSON, JSONL, CSV/TSV)
- API: REST API data sources
- SQL: SQL database data sources
- IN_MEMORY: In-memory data sources (lists, DataFrames)
Source code in graflo/data_source/base.py
FileDataSource
dataclass
¶
Bases: AbstractDataSource
Base class for file-based data sources.
This class provides a common interface for file-based data sources, integrating with the existing chunker system for batch processing.
Attributes:
| Name | Type | Description |
|---|---|---|
| path | Path \| str | Path to the file |
| file_type | str \| None | Type of file (json, jsonl, table) |
| encoding | EncodingType | File encoding (default: UTF_8) |
Source code in graflo/data_source/file.py
__post_init__()
¶
iter_batches(batch_size=1000, limit=None)
¶
Iterate over file data in batches.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | Number of items per batch | 1000 |
| limit | int \| None | Maximum number of items to retrieve | None |

Yields:

| Type | Description |
|---|---|
| list[dict] | Batches of documents as dictionaries |
Source code in graflo/data_source/file.py
InMemoryDataSource
dataclass
¶
Bases: AbstractDataSource
Data source for in-memory data structures.
This class provides a data source for Python objects that are already in memory, including lists of dictionaries, lists of lists, and Pandas DataFrames.
Attributes:
| Name | Type | Description |
|---|---|---|
| data | list[dict] \| list[list] \| DataFrame | Data to process (list[dict], list[list], or pd.DataFrame) |
| columns | list[str] \| None | Optional column names for list[list] data |
Source code in graflo/data_source/memory.py
__post_init__()
¶
iter_batches(batch_size=1000, limit=None)
¶
Iterate over in-memory data in batches.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | Number of items per batch | 1000 |
| limit | int \| None | Maximum number of items to retrieve | None |

Yields:

| Type | Description |
|---|---|
| list[dict] | Batches of documents as dictionaries |
Source code in graflo/data_source/memory.py
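How list[list] data plus the optional `columns` attribute maps to row dictionaries can be sketched as below. The pairing behavior is an assumption based on the documented attributes, not the actual graflo code:

```python
def rows_to_dicts(data: list[list], columns: list[str]) -> list[dict]:
    """Pair each row with the column names, producing one dict per row."""
    return [dict(zip(columns, row)) for row in data]

records = rows_to_dicts(
    [[1, "alice"], [2, "bob"]],
    columns=["id", "name"],
)
```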
JsonFileDataSource
dataclass
¶
Bases: FileDataSource
Data source for JSON files.
JSON files are expected to contain hierarchical data structures, similar to REST API responses. The chunker handles nested structures and converts them to dictionaries.
Attributes:
| Name | Type | Description |
|---|---|---|
| path | Path \| str | Path to the JSON file |
| encoding | EncodingType | File encoding (default: UTF_8) |
Source code in graflo/data_source/file.py
JsonlFileDataSource
dataclass
¶
Bases: FileDataSource
Data source for JSONL (JSON Lines) files.
JSONL files contain one JSON object per line, making them suitable for streaming and batch processing.
Attributes:
| Name | Type | Description |
|---|---|---|
| path | Path \| str | Path to the JSONL file |
| encoding | EncodingType | File encoding (default: UTF_8) |
Source code in graflo/data_source/file.py
PaginationConfig
dataclass
¶
Bases: BaseDataclass
Configuration for API pagination.
Supports multiple pagination strategies:
- offset: Offset-based pagination (offset, limit)
- cursor: Cursor-based pagination (cursor parameter)
- page: Page-based pagination (page, per_page)
Attributes:
| Name | Type | Description |
|---|---|---|
| strategy | str | Pagination strategy ('offset', 'cursor', 'page') |
| offset_param | str | Parameter name for offset (default: 'offset') |
| limit_param | str | Parameter name for limit (default: 'limit') |
| cursor_param | str | Parameter name for cursor (default: 'cursor') |
| page_param | str | Parameter name for page (default: 'page') |
| per_page_param | str | Parameter name for per_page (default: 'per_page') |
| initial_offset | int | Initial offset value (default: 0) |
| initial_page | int | Initial page value (default: 1) |
| page_size | int | Number of items per page (default: 100) |
| cursor_path | str \| None | JSON path to cursor in response (for cursor-based) |
| has_more_path | str \| None | JSON path to has_more flag in response |
| data_path | str \| None | JSON path to data array in response (default: root) |
Source code in graflo/data_source/api.py
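For the offset strategy, the query parameters generated per request can be sketched as below. The stop condition here uses a known total for simplicity; a real client would typically stop on an empty page or a has_more flag instead:

```python
def offset_pages(total: int, page_size: int = 100,
                 offset_param: str = "offset", limit_param: str = "limit",
                 initial_offset: int = 0):
    """Yield one query-param dict per request for offset pagination,
    using the parameter names documented above."""
    offset = initial_offset
    while offset < total:
        yield {offset_param: offset, limit_param: page_size}
        offset += page_size

pages = list(offset_pages(total=250, page_size=100))
```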
SQLConfig
dataclass
¶
Bases: BaseDataclass
Configuration for SQL data source.
Uses SQLAlchemy connection string format.
Attributes:
| Name | Type | Description |
|---|---|---|
| connection_string | str | SQLAlchemy connection string (e.g., 'postgresql://user:pass@localhost/dbname') |
| query | str | SQL query string (supports parameterized queries) |
| params | dict[str, Any] | Query parameters as dictionary (for parameterized queries) |
| pagination | bool | Whether to use pagination (default: True) |
| page_size | int | Number of rows per page (default: 1000) |
Source code in graflo/data_source/sql.py
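The combination of a parameterized query, pagination, and rows-as-dicts output can be sketched stand-alone. This uses the stdlib sqlite3 module purely for illustration; graflo itself goes through SQLAlchemy, and appending LIMIT/OFFSET is an assumed pagination mechanism:

```python
import sqlite3

def paged_rows(conn, query: str, params: dict, page_size: int = 1000):
    """Yield pages of rows as dicts (column names as keys), sketching
    the documented parameterized-query plus pagination behavior."""
    conn.row_factory = sqlite3.Row
    offset = 0
    while True:
        page = conn.execute(
            f"{query} LIMIT :_limit OFFSET :_offset",
            {**params, "_limit": page_size, "_offset": offset},
        ).fetchall()
        if not page:
            break
        yield [dict(row) for row in page]
        offset += page_size

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)",
                 [(1, "alice"), (2, "bob"), (3, "eve")])
batches = list(paged_rows(
    conn,
    "SELECT * FROM users WHERE id >= :min_id ORDER BY id",
    {"min_id": 1},
    page_size=2,
))
```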
SQLDataSource
dataclass
¶
Bases: AbstractDataSource
Data source for SQL databases.
This class provides a data source for SQL databases using SQLAlchemy. It supports parameterized queries and pagination. Returns rows as dictionaries with column names as keys.
Attributes:
| Name | Type | Description |
|---|---|---|
| config | SQLConfig | SQL configuration |
| engine | Engine \| None | SQLAlchemy engine (created on first use) |
Source code in graflo/data_source/sql.py
__post_init__()
¶
iter_batches(batch_size=1000, limit=None)
¶
Iterate over SQL query results in batches.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | Number of items per batch | 1000 |
| limit | int \| None | Maximum number of items to retrieve | None |

Yields:

| Type | Description |
|---|---|
| list[dict] | Batches of rows as dictionaries |
Source code in graflo/data_source/sql.py
TableFileDataSource
dataclass
¶
Bases: FileDataSource
Data source for CSV/TSV files.
Table files are converted to dictionaries with column headers as keys. Each row becomes a dictionary.
Attributes:
| Name | Type | Description |
|---|---|---|
| path | Path \| str | Path to the CSV/TSV file |
| encoding | EncodingType | File encoding (default: UTF_8) |
| sep | str | Field separator (default: ',') |
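The headers-as-keys conversion described above can be sketched with the stdlib csv module. This mirrors the documented behavior rather than the graflo chunker itself:

```python
import csv
import io

def table_rows(text: str, sep: str = ",") -> list[dict]:
    """Parse CSV/TSV text into one dict per row, with column headers
    as keys, sketching the documented TableFileDataSource behavior."""
    return list(csv.DictReader(io.StringIO(text), delimiter=sep))

# Tab-separated input, so sep="\t" as it would be for a TSV file.
rows = table_rows("id\tname\n1\talice\n2\tbob\n", sep="\t")
```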