Example 6: REST API Data Source [coming]¶
This example demonstrates how to ingest data from a REST API endpoint into a graph database.
Scenario¶
Suppose you have a REST API that serves user records through paginated responses, and you want to ingest those records into a graph database as person vertices.
API Response Format¶
The API returns data in the following format:
{
  "data": [
    {"id": 1, "name": "Alice", "department": "Engineering"},
    {"id": 2, "name": "Bob", "department": "Sales"}
  ],
  "has_more": true,
  "offset": 0,
  "limit": 100,
  "total": 250
}
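For orientation, the annotated sketch below maps the keys of this sample response to the PaginationConfig fields used later in this example; the mapping reflects this particular API shape, not a fixed convention.

# How the sample response keys line up with the PaginationConfig used below
sample_page = {
    "data": [],        # records to ingest; addressed by data_path="data"
    "has_more": True,  # checked via has_more_path="has_more"
    "offset": 0,       # echoes the value sent via offset_param
    "limit": 100,      # echoes the value sent via limit_param
    "total": 250,      # informational only in this example
}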
Schema Definition¶
Define your schema as usual:
general:
  name: users_api

vertices:
  - name: person
    fields:
      - id
      - name
      - department
    indexes:
      - fields:
          - id

resources:
  - resource_name: users
    apply:
      - vertex: person
Using API Data Source¶
Python Code¶
from suthing import FileHandle

from graflo import Caster, DataSourceRegistry, Schema
from graflo.caster import IngestionParams
from graflo.data_source import DataSourceFactory, APIConfig, PaginationConfig
from graflo.db.connection.onto import DBConfig

# Load schema
schema = Schema.from_dict(FileHandle.load("schema.yaml"))

# Create API configuration
api_config = APIConfig(
    url="https://api.example.com/users",
    method="GET",
    headers={"Authorization": "Bearer your-token"},
    pagination=PaginationConfig(
        strategy="offset",
        offset_param="offset",
        limit_param="limit",
        page_size=100,
        has_more_path="has_more",
        data_path="data",
    ),
)

# Create API data source
api_source = DataSourceFactory.create_api_data_source(api_config)

# Register with resource
registry = DataSourceRegistry()
registry.register(api_source, resource_name="users")

# Create caster
caster = Caster(schema)

# Load config from file
config_data = FileHandle.load("db.yaml")
conn_conf = DBConfig.from_dict(config_data)

ingestion_params = IngestionParams(
    batch_size=1000,  # Process 1000 items per batch
)

# Ingest all registered data sources
caster.ingest_data_sources(
    data_source_registry=registry,
    conn_conf=conn_conf,
    ingestion_params=ingestion_params,
)
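Before running a full ingestion, it can be worth fetching one page by hand to confirm that the token works and that the configured paths (data, has_more) are present in the response. A minimal sketch using the requests library (not part of graflo; URL and token are the placeholders from the example above):

import requests

response = requests.get(
    "https://api.example.com/users",
    headers={"Authorization": "Bearer your-token"},
    params={"offset": 0, "limit": 100},
    timeout=30,
)
response.raise_for_status()
page = response.json()

# These keys must match data_path and has_more_path in the PaginationConfig above
print(len(page["data"]), "records on the first page")
print("has_more:", page["has_more"])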
Using Configuration File¶
Create a data source configuration file (data_sources.yaml):
data_sources:
  - source_type: api
    resource_name: users
    config:
      url: https://api.example.com/users
      method: GET
      headers:
        Authorization: "Bearer your-token"
      pagination:
        strategy: offset
        offset_param: offset
        limit_param: limit
        page_size: 100
        has_more_path: has_more
        data_path: data
Then use the CLI:
uv run ingest \
  --db-config-path config/db.yaml \
  --schema-path config/schema.yaml \
  --data-source-config-path data_sources.yaml
Pagination Strategies¶
Offset-based Pagination¶
pagination = PaginationConfig(
    strategy="offset",
    offset_param="offset",
    limit_param="limit",
    page_size=100,
    has_more_path="has_more",
    data_path="data",
)
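Conceptually, offset pagination advances offset_param by page_size on each request and stops once the field at has_more_path is no longer true. The loop below illustrates that pattern with plain requests; it is a sketch of the strategy, not graflo's implementation:

import requests

url = "https://api.example.com/users"
offset, page_size = 0, 100
records = []

while True:
    page = requests.get(url, params={"offset": offset, "limit": page_size}, timeout=30).json()
    records.extend(page["data"])   # data_path="data"
    if not page.get("has_more"):   # has_more_path="has_more"
        break
    offset += page_size            # advance offset_param by page_size

print(f"fetched {len(records)} records")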
Cursor-based Pagination¶
pagination = PaginationConfig(
    strategy="cursor",
    cursor_param="next_cursor",
    cursor_path="next_cursor",
    page_size=100,
    data_path="items",
)
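Cursor-based pagination expects each response to carry an opaque cursor: cursor_path is where it is read from the response, and cursor_param is the query parameter it is sent back in on the next request. A response consistent with the configuration above might look like the following (shape and values are illustrative):

# Illustrative page returned by a cursor-paginated endpoint
cursor_page = {
    "items": [                      # records to ingest; addressed by data_path="items"
        {"id": 3, "name": "Carol", "department": "Marketing"},
    ],
    "next_cursor": "eyJpZCI6IDN9",  # read via cursor_path, resent as cursor_param
}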
Page-based Pagination¶
pagination = PaginationConfig(
    strategy="page",
    page_param="page",
    per_page_param="per_page",
    page_size=50,
    data_path="results",
)
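Page-based pagination requests numbered pages instead of offsets, sending the page index via page_param and the page size via per_page_param. An illustrative response matching the configuration above:

# Illustrative page returned by a page-numbered endpoint
page_response = {
    "results": [                     # records to ingest; addressed by data_path="results"
        {"id": 4, "name": "Dave", "department": "Engineering"},
    ],
    "page": 1,                       # current page, requested via page_param="page"
    "per_page": 50,                  # page size, requested via per_page_param="per_page"
}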
Authentication¶
Basic Authentication¶
api_config = APIConfig(
    url="https://api.example.com/users",
    auth={"type": "basic", "username": "user", "password": "pass"},
)
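Assuming graflo follows standard HTTP Basic authentication, this configuration amounts to sending an Authorization header of the form Basic base64(username:password). The snippet below only shows how that header value is formed:

import base64

# Standard HTTP Basic scheme: base64-encode "username:password"
token = base64.b64encode(b"user:pass").decode("ascii")
headers = {"Authorization": f"Basic {token}"}
print(headers)  # {'Authorization': 'Basic dXNlcjpwYXNz'}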
Bearer Token¶
api_config = APIConfig(
    url="https://api.example.com/users",
    auth={"type": "bearer", "token": "your-token"},
)
Custom Headers¶
api_config = APIConfig(
    url="https://api.example.com/users",
    headers={"X-API-Key": "your-api-key"},
)
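To keep secrets out of source and config files, the key can be read from an environment variable at runtime; a small sketch, assuming a hypothetical EXAMPLE_API_KEY variable is set in your environment:

import os

from graflo.data_source import APIConfig

api_config = APIConfig(
    url="https://api.example.com/users",
    headers={"X-API-Key": os.environ["EXAMPLE_API_KEY"]},
)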
Combining Multiple Data Sources¶
You can combine multiple data sources for the same resource:
registry = DataSourceRegistry()

# API source
api_source = DataSourceFactory.create_api_data_source(api_config)
registry.register(api_source, resource_name="users")

# File source
file_source = DataSourceFactory.create_file_data_source(path="users_backup.json")
registry.register(file_source, resource_name="users")

from graflo.caster import IngestionParams

ingestion_params = IngestionParams()  # Use default parameters

# Both will be processed and combined
caster.ingest_data_sources(
    data_source_registry=registry,
    conn_conf=conn_conf,
    ingestion_params=ingestion_params,
)
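The same registry can also hold sources for different resources. For instance, if the schema defined a second resource (a hypothetical departments endpoint is used here for illustration), it would be registered alongside users and ingested in the same call:

# Hypothetical second endpoint backing a separate "departments" resource
departments_config = APIConfig(url="https://api.example.com/departments")
departments_source = DataSourceFactory.create_api_data_source(departments_config)
registry.register(departments_source, resource_name="departments")

caster.ingest_data_sources(
    data_source_registry=registry,
    conn_conf=conn_conf,
    ingestion_params=ingestion_params,
)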