API connector and pagination¶
REST API ingestion follows the same bindings + conn_proxy pattern as SQL and SPARQL:
APIConnector(manifest) — endpoint path, HTTP options, static query params, andPaginationConfig(secret-free).RestApiConnConfig(runtime) —base_url,ApiAuth, and optional default headers, registered on aConnectionProviderunder aconn_proxylabel.APIDataSource(runtime) — HTTP executor built byRegistryBuilder; you do not construct it directly.
See also Creating a manifest — bindings and Explicit connector_connection proxy wiring.
Manifest shape¶
bindings:
connectors:
- name: users_api
path: /api/users
method: GET
params:
active: true
pagination:
request:
strategy: offset
offset_param: offset
limit_param: limit
page_size: 100
response:
records_path: data
has_more_path: has_more
resource_connector:
- resource: users
connector: users_api
connector_connection:
- connector: users_api
conn_proxy: api_source
Runtime (Python) — from environment variables (recommended for local dev and CI):
from graflo.hq.connection_provider import InMemoryConnectionProvider
provider = InMemoryConnectionProvider()
provider.register_all_api_configs_from_env(bindings=bindings)
With conn_proxy: api_source, set:
export API_SOURCE_BASE_URL=https://api.example.com
export API_SOURCE_AUTH_TYPE=bearer
export API_SOURCE_TOKEN=your-token
Each conn_proxy label maps to an env prefix: uppercase, - → _, trailing _
(e.g. user_service → USER_SERVICE_BASE_URL).
For multi-proxy manifests, register_all_api_configs_from_env registers every API
conn_proxy automatically. Override a prefix when needed:
provider.register_all_api_configs_from_env(
bindings=bindings,
env_prefix_map={"user_service": "USERS_API_"},
)
Register a single proxy explicitly:
Runtime (Python) — manual registration:
from graflo.hq.connection_provider import (
ApiAuth,
ApiGeneralizedConnConfig,
InMemoryConnectionProvider,
RestApiConnConfig,
)
provider = InMemoryConnectionProvider()
provider.register_generalized_config(
conn_proxy="api_source",
config=ApiGeneralizedConnConfig(
config=RestApiConnConfig(
base_url="https://api.example.com",
auth=ApiAuth(auth_type="bearer", token="..."),
)
),
)
provider.bind_from_bindings(bindings=bindings)
The resolved request URL is base_url.rstrip("/") + "/" + path.lstrip("/") — e.g. https://api.example.com/api/users.
Authentication (ApiAuth)¶
Credentials live in RestApiConnConfig, not in the manifest.
auth_type |
Fields | HTTP behaviour |
|---|---|---|
bearer (default) |
token, optional header_name (default Authorization), prefix (default Bearer) |
Header: {prefix} {token} |
basic |
username, password |
requests HTTP Basic auth |
digest |
username, password |
requests HTTP Digest auth |
api_key |
token, header_name (e.g. X-Api-Key) |
Header: raw token value |
Environment variables (RestApiConnConfig.from_env)¶
Call RestApiConnConfig.from_env(env_prefix) or use the provider helpers above.
env_prefix is required (no default). Supported variables:
| Env var (with prefix) | Required | Meaning |
|---|---|---|
BASE_URL |
yes | API base URL |
AUTH_TYPE |
no (default bearer) |
bearer, basic, digest, or api_key |
TOKEN |
when using bearer/api_key | Token or API key value |
USERNAME / PASSWORD |
when using basic/digest | Credentials |
HEADER_NAME |
no | Header for bearer/api_key (default Authorization) |
PREFIX |
no | Bearer prefix (default Bearer) |
AUTH_TYPE defaults to bearer when unset. Set TOKEN, USERNAME, or PASSWORD as needed for the chosen type.
Non-secret headers belong on APIConnector.headers or RestApiConnConfig.default_headers (connector headers override defaults).
Pagination overview¶
Pagination is declared on APIConnector.pagination as a PaginationConfig object. When pagination is omitted, GraFlo performs a single HTTP request and yields whatever JSON records APIDataSource extracts from the response.
When pagination is set, GraFlo loops: each iteration builds the request URL from base_url + path, merges static and pagination query parameters, parses the JSON body using pagination.response, yields records in iter_batches, then decides whether to fetch the next page.
flowchart TD
start[Start iter_batches]
build["Build request: config.url + static params + request pagination params"]
req[HTTP request]
parse[Parse JSON]
detect{response.auto_detect?}
resolve[Resolve unset response paths from first body]
extract["Extract records at records_path"]
yield[Yield batches]
more{More pages?}
start --> build --> req --> parse --> detect
detect -->|yes, first page| resolve --> extract
detect -->|no| extract
extract --> yield --> more
more -->|yes| advance["Advance offset/cursor from response or client increment"]
advance --> build
more -->|no| done[Done]
IngestionParams.batch_size overrides pagination.request.page_size when the connector defines pagination (same idea as SPARQL endpoint page size). That controls how many rows each API page requests, not the internal iter_batches chunk size (which still splits a page into smaller yield batches if needed).
iter_batches(..., limit=N) caps the total number of records read across all pages, not the number of HTTP calls.
PaginationConfig structure¶
PaginationConfig has two parts:
request— how to build paginated HTTP requests (PaginationRequestConfig)response— how to parse JSON response envelopes (ApiResponseStructure)
Next-page URLs are always constructed from the connector's base_url and path. The response may supply the next offset value or cursor token, but GraFlo does not follow response link fields such as next by default.
PaginationRequestConfig (pagination.request)¶
| Field | Default | Used by | Meaning |
|---|---|---|---|
strategy |
"offset" |
all | "offset", "page", or "cursor" |
offset_param |
"offset" |
offset | Query param for skip/offset |
limit_param |
"limit" |
offset | Query param name for page size; value comes from page_size |
page_param |
"page" |
page | Query param for 1-based page index |
per_page_param |
"per_page" |
page | Query param name for page size; value comes from page_size |
cursor_param |
"cursor" |
cursor | Query param for opaque cursor token |
initial_offset |
0 |
offset | First request offset |
initial_page |
1 |
page | First request page number |
initial_cursor |
null |
cursor | Send cursor on the first request when the API requires it |
page_size |
100 |
all | Records requested per HTTP call |
ApiResponseStructure (pagination.response)¶
| Field | Default | Meaning |
|---|---|---|
records_path |
null |
Dot path to the record list (e.g. results, 0.results for [{"results": [...]}]). Required for object envelopes unless auto_detect is enabled. |
total_count_path |
null |
Total items across all pages (e.g. count) |
offset_path |
null |
Echoed page start index (e.g. offset) |
next_offset_path |
null |
Server-provided next offset for the following request (e.g. next_offset) |
has_more_path |
null |
Boolean “more pages exist” flag (e.g. has_more) |
cursor_path |
null |
Next opaque cursor token |
batch_metadata_paths |
{} |
Map row annotation keys to response dot paths (e.g. _batch_id: result_id) |
auto_detect |
false |
Infer unset response paths from the first response body |
Dot paths use . segments. Each segment is either a dict key or a numeric list index (e.g. meta.next_cursor, 0.results). See Dot paths and response shapes for top-level arrays and array-wrapped envelopes.
When pagination is omitted, a top-level JSON array of record objects is accepted as-is. A top-level object without records_path (and without auto_detect) raises an error.
Dot paths and response shapes¶
GraFlo parses the JSON response body and walks dot paths with get_at_path: dict segments use key lookup; numeric segments index into lists (0 → first element).
Three common shapes:
| Shape | Example | Configuration |
|---|---|---|
| Object envelope | {"results": [{"id": 1}], "count": 100} |
records_path: results (or auto_detect: true) |
| Array of records | [{"id": 1}, {"id": 2}] |
Omit pagination, or leave records_path unset — each array element is a row |
| Array-wrapped envelope | [{"results": [{"id": 1}], "count": 100}] |
Prefix every path with the envelope index, e.g. records_path: 0.results |
Array-wrapped envelope¶
Some APIs return pagination metadata inside a single-element array instead of a plain object:
Set explicit paths with a leading 0. segment (index into the wrapper array):
pagination:
request:
strategy: offset
page_size: 100
response:
records_path: 0.results
total_count_path: 0.count
offset_path: 0.offset
next_offset_path: 0.next_offset
Nested paths combine freely: 0.meta.next_cursor reaches next_cursor inside body[0]["meta"].
auto_detect does not apply to array-wrapped envelopes. Detection runs only when the parsed body is a top-level object; a list body skips heuristics entirely. Configure 0.* paths manually.
Pagination metadata on list bodies: record extraction via 0.* paths works, but stop/advance helpers (has_more_path, next_offset_path, cursor_path, batch_metadata_paths) currently require a top-level object body. With an array-wrapped envelope, pagination falls back to “keep going while the extracted record list is non-empty” and offset advances by page_size rather than a server-provided next_offset. Prefer a plain object envelope from the API when possible; otherwise verify paging behaviour against your endpoint.
Stop conditions (evaluated in order)¶
| Priority | Configuration | Stop when |
|---|---|---|
| 1 | has_more_path set |
Boolean at path is falsy |
| 2 | next_offset_path set |
Value is absent or null after current page |
| 3 | total_count_path + offset_path set |
offset + len(records) >= count |
| 4 | cursor_path set (cursor strategy) |
Value is absent or empty |
| 5 | Fallback | Extracted records list is empty |
Advance rules (URL always base_url + path)¶
| Strategy | Advance rule |
|---|---|
| offset | If next_offset_path is set and present in response → use that value for offset_param. Else → offset += page_size. |
| page | page += 1 |
| cursor | Set cursor_param from value at cursor_path |
First-response heuristics (auto_detect: true)¶
When response.auto_detect is true, GraFlo inspects the first response body and fills any unset response paths. Detected paths are logged at INFO. Auto-detection applies only to top-level object envelopes — not to array-wrapped envelopes ([{...}]).
| Field | Candidate keys (priority order) |
|---|---|
records_path |
results, data, items, records, entries, rows; or sole top-level list-of-dicts key |
next_offset_path |
next_offset, nextOffset |
total_count_path |
count, total, total_count |
offset_path |
offset, skip |
has_more_path |
has_more, hasMore |
cursor_path |
next_cursor, cursor, next_page_token |
Link fields such as next are not inferred or followed.
Strategy: offset (default)¶
Best for APIs that accept offset + limit (or similarly named) query parameters.
Request loop:
- Set
offset_param=initial_offsetandlimit_param=page_size(names configurable). - Parse rows from
response.records_path. - Stop using the stop conditions above.
- Advance offset from
response.next_offset_pathwhen configured, else increment bypage_size.
Example API response:
Manifest:
pagination:
request:
strategy: offset
offset_param: skip # API uses ?skip=… instead of ?offset=…
limit_param: take
page_size: 50
initial_offset: 0
response:
records_path: data
has_more_path: has_more
Equivalent query progression: ?skip=0&take=50, then ?skip=50&take=50, …
Envelope with server-provided next offset¶
{
"count": 12345,
"offset": 0,
"results": [{"id": 1}, {"id": 2}],
"next_offset": 100,
"result_id": "batch-abc"
}
pagination:
request:
strategy: offset
offset_param: offset
limit_param: limit
page_size: 100
response:
records_path: results
total_count_path: count
offset_path: offset
next_offset_path: next_offset
batch_metadata_paths:
_batch_id: result_id
Request URLs stay base_url + path; only query params change (offset=0, then offset=100, …). Response fields such as next (URL links) are ignored.
Strategy: page¶
Best for APIs that use page + per_page (or page + limit) semantics with a 1-based page index.
Request loop:
- Set
page_param=initial_pageandper_page_param=page_size. - Parse rows from
response.records_path. - Stop using the stop conditions.
- Increment page by 1.
Example:
pagination:
request:
strategy: page
page_param: page
per_page_param: page_size
page_size: 25
initial_page: 1
response:
records_path: results.items
has_more_path: results.has_next_page
Strategy: cursor¶
Best for APIs that return an opaque next_cursor (or link token) instead of numeric offsets.
Request loop:
- First request: omit
cursor_paramunlessinitial_cursoris set. - Parse rows from
response.records_path. - Read the next token from
response.cursor_path. - Subsequent requests set
cursor_paramto that token. - Stop when
cursor_pathis missing or empty after a page.
Example API response:
Manifest:
pagination:
request:
strategy: cursor
cursor_param: cursor
page_size: 100
initial_cursor: null
response:
records_path: items
cursor_path: pagination.next_cursor
If the first call must include a cursor (some APIs use cursor=empty or a fixed start token), set initial_cursor accordingly.
Static query parameters¶
Use APIConnector.params for filters that do not change between pages (tenant id, active=true, API version flags). Pagination params are merged on top each iteration; static params are preserved.
Static row annotations¶
Use row_annotations on APIConnector (declared on the shared ResourceConnector base) to stamp constant fields onto every fetched row. Annotations are merged as defaults: if the API response already contains a key, the response value wins. Prefer _-prefixed keys (e.g. _src_type) to avoid colliding with payload fields.
Typical use: polymorphic edge queries where the HTTP response has uniform source / target columns but no vertex-type discriminator. Stamp types at the connector, then route with vertex_router in the resource pipeline:
bindings:
connector_templates:
- name: edge_query_base
path: /api/query
resource_name: polymorphic_edges
conn_proxy: api_source
pagination:
request:
strategy: offset
page_size: 100
conn_proxy: api_source # optional default for connectors without connector_connection
connectors:
- name: edge_typeA_typeB
base: edge_query_base
params:
query: 'search TypeA where ... show key as source, explode #RelationC:TypeB.key as target'
row_annotations:
_src_type: TypeA
_tgt_type: TypeB
_rel: RelationC
resources:
- name: polymorphic_edges
pipeline:
- type: vertex_router
type_field: _src_type
role: src
from: {id: source}
- type: vertex_router
type_field: _tgt_type
role: tgt
from: {id: target}
- type: edge
source_role: src
target_role: tgt
relation_field: _rel
row_annotations is only implemented for API connectors today; other connector types reject non-empty values.
Connector templates¶
Declare reusable defaults under bindings.connector_templates and reference them from connectors with base: <template_name>. Expansion happens when Bindings is loaded:
- Dict fields (
params,row_annotations,headers) are deep-merged (connector keys win). - Scalars and blocks such as
paginationare replaced when the connector entry provides them. - Template
conn_proxyauto-appendsconnector_connectionfor named connectors (explicit entries win). - Template
resource_nameis inherited unless overridden on the connector. - Top-level
conn_proxyonbindingsapplies to any connector without an explicitconnector_connectionmapping.
HTTP options on the connector¶
| Field | Default | Notes |
|---|---|---|
method |
GET |
Passed to requests |
timeout |
null |
Seconds; null = no timeout |
retries |
0 |
urllib3 retry count on 5xx |
retry_backoff_factor |
0.1 |
Backoff between retries |
retry_status_forcelist |
[500,502,503,504] |
Status codes to retry |
verify |
true |
TLS certificate verification |
Related¶
- Example 14 — API env wiring
- Data source reference — API
- Runtime connector updates — patch
APIConnectorfields viaConnectorUpdate(hash recomputes on change) - Quick Start — Using API Data Sources
Implementation: graflo.architecture.contract.bindings.APIConnector, PaginationConfig, graflo.data_source.api.APIDataSource, graflo.hq.connection_provider.