Skip to content

graflo.data_source.api_response

API response envelope parsing and optional first-response auto-detection.

ResolvedApiResponse

Bases: ApiResponseStructure

Response paths resolved after optional auto-detection on the first body.

Source code in graflo/data_source/api_response.py
class ResolvedApiResponse(ApiResponseStructure):
    """Response paths resolved after optional auto-detection on the first body."""

    @classmethod
    def resolve(
        cls,
        config: ApiResponseStructure,
        body: dict[str, Any] | list[Any],
    ) -> ResolvedApiResponse:
        detected: dict[str, str] = {}
        if config.auto_detect and isinstance(body, dict):
            detected = detect_response_shape(body)
            if detected:
                logger.info("Auto-detected API response paths: %s", detected)

        return cls(
            records_path=config.records_path or detected.get("records_path"),
            total_count_path=config.total_count_path
            or detected.get("total_count_path"),
            offset_path=config.offset_path or detected.get("offset_path"),
            next_offset_path=config.next_offset_path
            or detected.get("next_offset_path"),
            has_more_path=config.has_more_path or detected.get("has_more_path"),
            cursor_path=config.cursor_path or detected.get("cursor_path"),
            batch_metadata_paths=dict(config.batch_metadata_paths),
            auto_detect=False,
        )

detect_response_shape(body)

Infer unset response path keys from a top-level response envelope.

Source code in graflo/data_source/api_response.py
def detect_response_shape(body: dict[str, Any]) -> dict[str, str]:
    """Infer unset response path keys from a top-level response envelope."""
    detected: dict[str, str] = {}

    records_path = _detect_records_path(body)
    if records_path is not None:
        detected["records_path"] = records_path

    for field, candidates in (
        ("next_offset_path", NEXT_OFFSET_CANDIDATES),
        ("total_count_path", TOTAL_COUNT_CANDIDATES),
        ("offset_path", OFFSET_CANDIDATES),
        ("has_more_path", HAS_MORE_CANDIDATES),
        ("cursor_path", CURSOR_CANDIDATES),
    ):
        key = _detect_top_level_key(body, candidates)
        if key is not None:
            detected[field] = key

    return detected

extract_records(body, resolved)

Extract record dicts from a parsed JSON body.

Source code in graflo/data_source/api_response.py
def extract_records(
    body: dict[str, Any] | list[Any],
    resolved: ResolvedApiResponse,
) -> list[dict[str, Any]]:
    """Extract record dicts from a parsed JSON body."""
    if resolved.records_path is None:
        if isinstance(body, list):
            return [_as_record(item) for item in body]
        raise ValueError(
            "API response is an object envelope but response.records_path is not "
            "configured. Set response.records_path or enable response.auto_detect."
        )

    data = get_at_path(body, resolved.records_path)
    if data is None:
        return []
    if isinstance(data, list):
        return [_as_record(item) for item in data]
    if isinstance(data, dict):
        return [cast(dict[str, Any], data)]
    return []

get_at_path(obj, path)

Resolve a dot-separated path against a JSON-like object.

Source code in graflo/data_source/api_response.py
def get_at_path(obj: object, path: str | None) -> object | None:
    """Resolve a dot-separated path against a JSON-like object."""
    if path is None:
        return None

    data: object = obj
    for part in path.split("."):
        if isinstance(data, dict):
            mapping = cast(dict[str, Any], data)
            data = mapping.get(part)
        elif isinstance(data, list):
            try:
                data = data[int(part)]
            except (ValueError, IndexError):
                return None
        else:
            return None
        if data is None:
            return None
    return data

get_batch_metadata(body, resolved)

Read batch-level metadata from the response envelope.

Source code in graflo/data_source/api_response.py
def get_batch_metadata(
    body: dict[str, Any] | list[Any],
    resolved: ResolvedApiResponse,
) -> dict[str, Any]:
    """Read batch-level metadata from the response envelope."""
    if not isinstance(body, dict) or not resolved.batch_metadata_paths:
        return {}

    metadata: dict[str, Any] = {}
    for annotation_key, response_path in resolved.batch_metadata_paths.items():
        value = get_at_path(body, response_path)
        if value is not None:
            metadata[annotation_key] = value
    return metadata

has_more_pages(body, resolved, items, *, strategy)

Return whether another HTTP page should be fetched.

Source code in graflo/data_source/api_response.py
def has_more_pages(
    body: dict[str, Any] | list[Any],
    resolved: ResolvedApiResponse,
    items: list[dict[str, Any]],
    *,
    strategy: str,
) -> bool:
    """Return whether another HTTP page should be fetched."""
    if not isinstance(body, dict):
        return len(items) > 0

    if resolved.has_more_path is not None:
        return bool(get_at_path(body, resolved.has_more_path))

    if resolved.next_offset_path is not None:
        return get_at_path(body, resolved.next_offset_path) is not None

    if resolved.total_count_path is not None and resolved.offset_path is not None:
        count = get_at_path(body, resolved.total_count_path)
        offset = get_at_path(body, resolved.offset_path)
        if count is not None and offset is not None:
            return _as_int(offset) + len(items) < _as_int(count)

    if strategy == "cursor" and resolved.cursor_path is not None:
        cursor = get_at_path(body, resolved.cursor_path)
        return cursor is not None and str(cursor) != ""

    return len(items) > 0

next_cursor_value(body, resolved)

Read the next cursor token from the response when configured.

Source code in graflo/data_source/api_response.py
def next_cursor_value(
    body: dict[str, Any] | list[Any],
    resolved: ResolvedApiResponse,
) -> str | None:
    """Read the next cursor token from the response when configured."""
    if not isinstance(body, dict) or resolved.cursor_path is None:
        return None

    value = get_at_path(body, resolved.cursor_path)
    if value is None:
        return None
    token = str(value)
    return token if token else None

next_offset_value(body, resolved)

Read the next offset from the response when configured.

Source code in graflo/data_source/api_response.py
def next_offset_value(
    body: dict[str, Any] | list[Any],
    resolved: ResolvedApiResponse,
) -> int | None:
    """Read the next offset from the response when configured."""
    if not isinstance(body, dict) or resolved.next_offset_path is None:
        return None

    value = get_at_path(body, resolved.next_offset_path)
    if value is None:
        return None
    return _as_int(value)