graflo.data_source

Data source abstraction layer for graph database ingestion.

This package provides a unified interface for different data source types, separating "where data comes from" (DataSource) from "how it's transformed" (Resource).

Key Components
  • AbstractDataSource: Base class for all data sources
  • FileDataSource: File-based data sources (JSON, JSONL, CSV/TSV)
  • APIDataSource: REST API data source
  • SQLDataSource: SQL database data source
  • DataSourceRegistry: Maps DataSources to Resource names
Example

from graflo.data_source import FileDataSource, DataSourceRegistry

source = FileDataSource(path="data.json", file_type="json")
registry = DataSourceRegistry()
registry.register(source, resource_name="users")

APIConfig dataclass

Bases: BaseDataclass

Configuration for REST API data source.

Attributes:

  • url (str): API endpoint URL
  • method (str): HTTP method (default: 'GET')
  • headers (dict[str, str]): HTTP headers as dictionary
  • auth (dict[str, Any] | None): Authentication configuration
      • Basic auth: {'type': 'basic', 'username': '...', 'password': '...'}
      • Bearer token: {'type': 'bearer', 'token': '...'}
      • Digest auth: {'type': 'digest', 'username': '...', 'password': '...'}
  • params (dict[str, Any]): Query parameters as dictionary
  • timeout (float | None): Request timeout in seconds (default: None for no timeout)
  • retries (int): Number of retry attempts (default: 0)
  • retry_backoff_factor (float): Backoff factor for retries (default: 0.1)
  • retry_status_forcelist (list[int]): HTTP status codes to retry on (default: [500, 502, 503, 504])
  • verify (bool): Verify SSL certificates (default: True)
  • pagination (PaginationConfig | None): Pagination configuration (default: None)

Source code in graflo/data_source/api.py
@dataclasses.dataclass
class APIConfig(BaseDataclass):
    """Configuration for REST API data source.

    Attributes:
        url: API endpoint URL
        method: HTTP method (default: 'GET')
        headers: HTTP headers as dictionary
        auth: Authentication configuration
            - For Basic auth: {'type': 'basic', 'username': '...', 'password': '...'}
            - For Bearer token: {'type': 'bearer', 'token': '...'}
            - For Digest auth: {'type': 'digest', 'username': '...', 'password': '...'}
        params: Query parameters as dictionary
        timeout: Request timeout in seconds (default: None for no timeout)
        retries: Number of retry attempts (default: 0)
        retry_backoff_factor: Backoff factor for retries (default: 0.1)
        retry_status_forcelist: HTTP status codes to retry on (default: [500, 502, 503, 504])
        verify: Verify SSL certificates (default: True)
        pagination: Pagination configuration (default: None)
    """

    url: str
    method: str = "GET"
    headers: dict[str, str] = dataclasses.field(default_factory=dict)
    auth: dict[str, Any] | None = None
    params: dict[str, Any] = dataclasses.field(default_factory=dict)
    timeout: float | None = None
    retries: int = 0
    retry_backoff_factor: float = 0.1
    retry_status_forcelist: list[int] = dataclasses.field(
        default_factory=lambda: [500, 502, 503, 504]
    )
    verify: bool = True
    pagination: PaginationConfig | None = None
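
A minimal sketch of how an APIConfig might be assembled for a bearer-token endpoint with retries and offset pagination. The pagination field names (strategy, page_size, offset_param, limit_param, data_path) are taken from the references in APIDataSource.iter_batches below; the exact PaginationConfig signature and defaults are not shown on this page, so treat them as assumptions.

from graflo.data_source.api import APIConfig, PaginationConfig

# Illustrative endpoint and token; retries apply to 5xx responses by default.
config = APIConfig(
    url="https://api.example.com/users",
    headers={"Accept": "application/json"},
    auth={"type": "bearer", "token": "..."},
    timeout=30.0,
    retries=3,
    retry_backoff_factor=0.5,
    # Offset-based pagination; the response body is expected to hold the
    # items under "results" (data_path). Field names are assumptions.
    pagination=PaginationConfig(
        strategy="offset",
        page_size=100,
        offset_param="offset",
        limit_param="limit",
        data_path="results",
    ),
)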

APIDataSource dataclass

Bases: AbstractDataSource

Data source for REST API endpoints.

This class provides a data source for REST API endpoints, supporting full HTTP configuration, authentication, pagination, and retry logic. Returns JSON responses as hierarchical dictionaries, similar to JSON files.

Attributes:

  • config (APIConfig): API configuration

Source code in graflo/data_source/api.py
@dataclasses.dataclass
class APIDataSource(AbstractDataSource):
    """Data source for REST API endpoints.

    This class provides a data source for REST API endpoints, supporting
    full HTTP configuration, authentication, pagination, and retry logic.
    Returns JSON responses as hierarchical dictionaries, similar to JSON files.

    Attributes:
        config: API configuration
    """

    config: APIConfig

    def __post_init__(self):
        """Initialize the API data source."""
        self.source_type = DataSourceType.API

    def _create_session(self) -> requests.Session:
        """Create a requests session with retry configuration.

        Returns:
            Configured requests session
        """
        session = requests.Session()

        # Configure retries
        if self.config.retries > 0:
            retry_strategy = Retry(
                total=self.config.retries,
                backoff_factor=self.config.retry_backoff_factor,
                status_forcelist=self.config.retry_status_forcelist,
            )
            adapter = HTTPAdapter(max_retries=retry_strategy)
            session.mount("http://", adapter)
            session.mount("https://", adapter)

        # Configure authentication
        if self.config.auth:
            auth_type = self.config.auth.get("type", "").lower()
            if auth_type == "basic":
                session.auth = HTTPBasicAuth(
                    self.config.auth.get("username", ""),
                    self.config.auth.get("password", ""),
                )
            elif auth_type == "digest":
                session.auth = HTTPDigestAuth(
                    self.config.auth.get("username", ""),
                    self.config.auth.get("password", ""),
                )
            elif auth_type == "bearer":
                token = self.config.auth.get("token", "")
                session.headers["Authorization"] = f"Bearer {token}"

        # Set headers
        session.headers.update(self.config.headers)

        return session

    def _extract_data(self, response: dict | list) -> list[dict]:
        """Extract data array from API response.

        Args:
            response: API response as dictionary or list

        Returns:
            List of data items
        """
        if self.config.pagination and self.config.pagination.data_path:
            # Navigate JSON path
            parts = self.config.pagination.data_path.split(".")
            data = response
            for part in parts:
                if isinstance(data, dict):
                    data = data.get(part)
                elif isinstance(data, list):
                    data = data[int(part)]
                else:
                    return []
            if isinstance(data, list):
                return data
            elif isinstance(data, dict):
                return [data]
            else:
                return []
        else:
            # Root level data
            if isinstance(response, list):
                return response
            elif isinstance(response, dict):
                return [response]
            else:
                return []

    def _has_more(self, response: dict) -> bool:
        """Check if there are more pages to fetch.

        Args:
            response: API response as dictionary

        Returns:
            True if there are more pages
        """
        if not self.config.pagination:
            return False

        if self.config.pagination.has_more_path:
            parts = self.config.pagination.has_more_path.split(".")
            value = response
            for part in parts:
                if isinstance(value, dict):
                    value = value.get(part)
                else:
                    return False
            return bool(value)

        # Default: check if data array is not empty
        data = self._extract_data(response)
        return len(data) > 0

    def _get_next_cursor(self, response: dict) -> str | None:
        """Get next cursor from response.

        Args:
            response: API response as dictionary

        Returns:
            Next cursor value or None
        """
        if not self.config.pagination or not self.config.pagination.cursor_path:
            return None

        parts = self.config.pagination.cursor_path.split(".")
        value = response
        for part in parts:
            if isinstance(value, dict):
                value = value.get(part)
            else:
                return None
        return str(value) if value is not None else None

    def iter_batches(
        self, batch_size: int = 1000, limit: int | None = None
    ) -> Iterator[list[dict]]:
        """Iterate over API data in batches.

        Args:
            batch_size: Number of items per batch
            limit: Maximum number of items to retrieve

        Yields:
            list[dict]: Batches of documents as dictionaries
        """
        session = self._create_session()
        total_items = 0

        try:
            # Initialize pagination state
            offset = (
                self.config.pagination.initial_offset if self.config.pagination else 0
            )
            page = self.config.pagination.initial_page if self.config.pagination else 1
            cursor: str | None = None

            while True:
                # Build request parameters
                params = self.config.params.copy()

                # Add pagination parameters
                if self.config.pagination:
                    if self.config.pagination.strategy == "offset":
                        params[self.config.pagination.offset_param] = offset
                        params[self.config.pagination.limit_param] = (
                            self.config.pagination.page_size
                        )
                    elif self.config.pagination.strategy == "page":
                        params[self.config.pagination.page_param] = page
                        params[self.config.pagination.per_page_param] = (
                            self.config.pagination.page_size
                        )
                    elif self.config.pagination.strategy == "cursor" and cursor:
                        params[self.config.pagination.cursor_param] = cursor

                # Make request
                try:
                    response = session.request(
                        method=self.config.method,
                        url=self.config.url,
                        params=params,
                        timeout=self.config.timeout,
                        verify=self.config.verify,
                    )
                    response.raise_for_status()
                    data = response.json()
                except requests.RequestException as e:
                    logger.error(f"API request failed: {e}")
                    break

                # Extract data from response
                items = self._extract_data(data)

                # Process items in batches
                batch = []
                for item in items:
                    if limit and total_items >= limit:
                        break
                    batch.append(item)
                    total_items += 1

                    if len(batch) >= batch_size:
                        yield batch
                        batch = []

                # Yield remaining items
                if batch:
                    yield batch

                # Check if we should continue
                if limit and total_items >= limit:
                    break

                # Update pagination state
                if self.config.pagination:
                    if self.config.pagination.strategy == "offset":
                        if not self._has_more(data):
                            break
                        offset += self.config.pagination.page_size
                    elif self.config.pagination.strategy == "page":
                        if not self._has_more(data):
                            break
                        page += 1
                    elif self.config.pagination.strategy == "cursor":
                        cursor = self._get_next_cursor(data)
                        if not cursor:
                            break
                else:
                    # No pagination, single request
                    break

        finally:
            session.close()
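
A short usage sketch, assuming the package-level export shown in the Key Components list and the hypothetical config built above: iter_batches drives the request, authentication, and pagination loop internally, so the caller only sees lists of dictionaries.

from graflo.data_source import APIDataSource

source = APIDataSource(config=config)  # config from the APIConfig sketch above

# Pull at most 1000 items, 200 per batch; pagination and retries are
# handled inside iter_batches.
for batch in source.iter_batches(batch_size=200, limit=1000):
    for item in batch:
        process(item)  # hypothetical downstream handler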

__post_init__()

Initialize the API data source.

Source code in graflo/data_source/api.py
def __post_init__(self):
    """Initialize the API data source."""
    self.source_type = DataSourceType.API

iter_batches(batch_size=1000, limit=None)

Iterate over API data in batches.

Parameters:

  • batch_size (int, default: 1000): Number of items per batch
  • limit (int | None, default: None): Maximum number of items to retrieve

Yields:

  • list[dict]: Batches of documents as dictionaries

Source code in graflo/data_source/api.py
def iter_batches(
    self, batch_size: int = 1000, limit: int | None = None
) -> Iterator[list[dict]]:
    """Iterate over API data in batches.

    Args:
        batch_size: Number of items per batch
        limit: Maximum number of items to retrieve

    Yields:
        list[dict]: Batches of documents as dictionaries
    """
    session = self._create_session()
    total_items = 0

    try:
        # Initialize pagination state
        offset = (
            self.config.pagination.initial_offset if self.config.pagination else 0
        )
        page = self.config.pagination.initial_page if self.config.pagination else 1
        cursor: str | None = None

        while True:
            # Build request parameters
            params = self.config.params.copy()

            # Add pagination parameters
            if self.config.pagination:
                if self.config.pagination.strategy == "offset":
                    params[self.config.pagination.offset_param] = offset
                    params[self.config.pagination.limit_param] = (
                        self.config.pagination.page_size
                    )
                elif self.config.pagination.strategy == "page":
                    params[self.config.pagination.page_param] = page
                    params[self.config.pagination.per_page_param] = (
                        self.config.pagination.page_size
                    )
                elif self.config.pagination.strategy == "cursor" and cursor:
                    params[self.config.pagination.cursor_param] = cursor

            # Make request
            try:
                response = session.request(
                    method=self.config.method,
                    url=self.config.url,
                    params=params,
                    timeout=self.config.timeout,
                    verify=self.config.verify,
                )
                response.raise_for_status()
                data = response.json()
            except requests.RequestException as e:
                logger.error(f"API request failed: {e}")
                break

            # Extract data from response
            items = self._extract_data(data)

            # Process items in batches
            batch = []
            for item in items:
                if limit and total_items >= limit:
                    break
                batch.append(item)
                total_items += 1

                if len(batch) >= batch_size:
                    yield batch
                    batch = []

            # Yield remaining items
            if batch:
                yield batch

            # Check if we should continue
            if limit and total_items >= limit:
                break

            # Update pagination state
            if self.config.pagination:
                if self.config.pagination.strategy == "offset":
                    if not self._has_more(data):
                        break
                    offset += self.config.pagination.page_size
                elif self.config.pagination.strategy == "page":
                    if not self._has_more(data):
                        break
                    page += 1
                elif self.config.pagination.strategy == "cursor":
                    cursor = self._get_next_cursor(data)
                    if not cursor:
                        break
            else:
                # No pagination, single request
                break

    finally:
        session.close()

AbstractDataSource dataclass

Bases: BaseDataclass, ABC

Abstract base class for all data sources.

Data sources handle data retrieval from various sources and provide a unified interface for batch iteration. They are separate from Resources, which handle data transformation. Many DataSources can map to the same Resource.

Attributes:

  • source_type (DataSourceType): Type of the data source
  • resource_name (str | None): Name of the resource this data source maps to (set externally via DataSourceRegistry)

Source code in graflo/data_source/base.py
class AbstractDataSource(BaseDataclass, abc.ABC):
    """Abstract base class for all data sources.

    Data sources handle data retrieval from various sources and provide
    a unified interface for batch iteration. They are separate from Resources,
    which handle data transformation. Many DataSources can map to the same Resource.

    Attributes:
        source_type: Type of the data source
        resource_name: Name of the resource this data source maps to
            (set externally via DataSourceRegistry)
    """

    source_type: DataSourceType

    def __post_init__(self):
        """Initialize the data source after dataclass initialization."""
        self._resource_name: str | None = None

    @property
    def resource_name(self) -> str | None:
        """Get the resource name this data source maps to.

        Returns:
            Resource name or None if not set
        """
        return self._resource_name

    @resource_name.setter
    def resource_name(self, value: str | None):
        """Set the resource name this data source maps to.

        Args:
            value: Resource name to set
        """
        self._resource_name = value

    @abc.abstractmethod
    def iter_batches(
        self, batch_size: int = 1000, limit: int | None = None
    ) -> Iterator[list[dict]]:
        """Iterate over data in batches.

        This method yields batches of documents (dictionaries) from the data source.
        Each batch is a list of dictionaries representing the data items.

        Args:
            batch_size: Number of items per batch
            limit: Maximum number of items to retrieve (None for no limit)

        Yields:
            list[dict]: Batches of documents as dictionaries

        Raises:
            NotImplementedError: Must be implemented by subclasses
        """
        raise NotImplementedError("Subclasses must implement iter_batches")

    def __iter__(self):
        """Make data source iterable, yielding individual items.

        Yields:
            dict: Individual documents
        """
        for batch in self.iter_batches(batch_size=1, limit=None):
            for item in batch:
                yield item
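
To illustrate the contract subclasses must satisfy, here is a minimal, hypothetical subclass serving a fixed list of dictionaries: only iter_batches has to be implemented, and the inherited __iter__ then yields individual items for free. This is a sketch, not part of the library.

import dataclasses
from collections.abc import Iterator

from graflo.data_source.base import AbstractDataSource, DataSourceType


@dataclasses.dataclass
class StaticDataSource(AbstractDataSource):
    """Toy data source over an in-memory list of dicts (illustrative only)."""

    items: list[dict] = dataclasses.field(default_factory=list)

    def __post_init__(self):
        super().__post_init__()  # mirrors the base initializer shown above
        self.source_type = DataSourceType.IN_MEMORY

    def iter_batches(
        self, batch_size: int = 1000, limit: int | None = None
    ) -> Iterator[list[dict]]:
        data = self.items if limit is None else self.items[:limit]
        for start in range(0, len(data), batch_size):
            yield data[start : start + batch_size]


batches = list(
    StaticDataSource(items=[{"id": 1}, {"id": 2}, {"id": 3}]).iter_batches(batch_size=2)
)
# -> [[{'id': 1}, {'id': 2}], [{'id': 3}]]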

resource_name property writable

Get the resource name this data source maps to.

Returns:

  • str | None: Resource name or None if not set

__iter__()

Make data source iterable, yielding individual items.

Yields:

  • dict: Individual documents

Source code in graflo/data_source/base.py
def __iter__(self):
    """Make data source iterable, yielding individual items.

    Yields:
        dict: Individual documents
    """
    for batch in self.iter_batches(batch_size=1, limit=None):
        for item in batch:
            yield item

__post_init__()

Initialize the data source after dataclass initialization.

Source code in graflo/data_source/base.py
def __post_init__(self):
    """Initialize the data source after dataclass initialization."""
    self._resource_name: str | None = None

iter_batches(batch_size=1000, limit=None) abstractmethod

Iterate over data in batches.

This method yields batches of documents (dictionaries) from the data source. Each batch is a list of dictionaries representing the data items.

Parameters:

  • batch_size (int, default: 1000): Number of items per batch
  • limit (int | None, default: None): Maximum number of items to retrieve (None for no limit)

Yields:

  • list[dict]: Batches of documents as dictionaries

Raises:

  • NotImplementedError: Must be implemented by subclasses

Source code in graflo/data_source/base.py
@abc.abstractmethod
def iter_batches(
    self, batch_size: int = 1000, limit: int | None = None
) -> Iterator[list[dict]]:
    """Iterate over data in batches.

    This method yields batches of documents (dictionaries) from the data source.
    Each batch is a list of dictionaries representing the data items.

    Args:
        batch_size: Number of items per batch
        limit: Maximum number of items to retrieve (None for no limit)

    Yields:
        list[dict]: Batches of documents as dictionaries

    Raises:
        NotImplementedError: Must be implemented by subclasses
    """
    raise NotImplementedError("Subclasses must implement iter_batches")

DataSourceFactory

Factory for creating data source instances.

This factory creates appropriate data source instances based on the provided configuration. It supports file-based, API, and SQL data sources.

Source code in graflo/data_source/factory.py
class DataSourceFactory:
    """Factory for creating data source instances.

    This factory creates appropriate data source instances based on the
    provided configuration. It supports file-based, API, and SQL data sources.
    """

    @staticmethod
    def _guess_file_type(filename: Path) -> ChunkerType:
        """Guess the file type based on file extension.

        Args:
            filename: Path to the file

        Returns:
            ChunkerType: Guessed file type

        Raises:
            ValueError: If file extension is not recognized
        """
        return ChunkerFactory._guess_chunker_type(filename)

    @classmethod
    def create_file_data_source(
        cls,
        path: Path | str,
        file_type: str | ChunkerType | None = None,
        encoding: EncodingType = EncodingType.UTF_8,
        sep: str | None = None,
    ) -> (
        JsonFileDataSource
        | JsonlFileDataSource
        | TableFileDataSource
        | ParquetFileDataSource
    ):
        """Create a file-based data source.

        Args:
            path: Path to the file
            file_type: Type of file ('json', 'jsonl', 'table', 'parquet') or ChunkerType.
                If None, will be guessed from file extension.
            encoding: File encoding (default: UTF_8)
            sep: Field separator for table files (default: ',').
                Only used for table files.

        Returns:
            Appropriate file data source instance (JsonFileDataSource,
            JsonlFileDataSource, TableFileDataSource, or ParquetFileDataSource)

        Raises:
            ValueError: If file type cannot be determined
        """
        if isinstance(path, str):
            path = Path(path)

        # Determine file type
        if file_type is None:
            try:
                file_type_enum = cls._guess_file_type(path)
            except ValueError as e:
                raise ValueError(
                    f"Could not determine file type for {path}. "
                    f"Please specify file_type explicitly. Error: {e}"
                )
        elif isinstance(file_type, str):
            file_type_enum = ChunkerType(file_type.lower())
        else:
            file_type_enum = file_type

        # Create appropriate data source
        if file_type_enum == ChunkerType.JSON:
            return JsonFileDataSource(path=path, encoding=encoding)
        elif file_type_enum == ChunkerType.JSONL:
            return JsonlFileDataSource(path=path, encoding=encoding)
        elif file_type_enum == ChunkerType.TABLE:
            # sep is only for table files
            return TableFileDataSource(path=path, encoding=encoding, sep=sep or ",")
        elif file_type_enum == ChunkerType.PARQUET:
            return ParquetFileDataSource(path=path)
        else:
            raise ValueError(f"Unsupported file type: {file_type_enum}")

    @classmethod
    def create_api_data_source(cls, config: APIConfig) -> APIDataSource:
        """Create an API data source.

        Args:
            config: API configuration

        Returns:
            APIDataSource instance
        """
        return APIDataSource(config=config)

    @classmethod
    def create_sql_data_source(cls, config: SQLConfig) -> SQLDataSource:
        """Create a SQL data source.

        Args:
            config: SQL configuration

        Returns:
            SQLDataSource instance
        """
        return SQLDataSource(config=config)

    @classmethod
    def create_in_memory_data_source(
        cls,
        data: list[dict] | list[list] | pd.DataFrame,
        columns: list[str] | None = None,
    ) -> InMemoryDataSource:
        """Create an in-memory data source.

        Args:
            data: Data to process (list[dict], list[list], or pd.DataFrame)
            columns: Optional column names for list[list] data

        Returns:
            InMemoryDataSource instance
        """
        return InMemoryDataSource(data=data, columns=columns)

    @classmethod
    def create_data_source(
        cls,
        source_type: DataSourceType | str | None = None,
        **kwargs: Any,
    ) -> AbstractDataSource:
        """Create a data source of the specified type.

        This is a general factory method that routes to specific factory methods
        based on the source type.

        Args:
            source_type: Type of data source to create. If None, will be inferred
                from kwargs (e.g., 'path' -> FILE, 'data' -> IN_MEMORY, 'config' with url -> API)
            **kwargs: Configuration parameters for the data source

        Returns:
            Data source instance

        Raises:
            ValueError: If source type is not supported or required parameters are missing
        """
        # Auto-detect source type if not provided
        if source_type is None:
            if "path" in kwargs or "file_type" in kwargs:
                source_type = DataSourceType.FILE
            elif "data" in kwargs:
                source_type = DataSourceType.IN_MEMORY
            elif "config" in kwargs:
                config = kwargs["config"]
                # Check if it's an API config (has 'url') or SQL config (has 'connection_string')
                if isinstance(config, dict):
                    if "url" in config:
                        source_type = DataSourceType.API
                    elif "connection_string" in config or "query" in config:
                        source_type = DataSourceType.SQL
                    else:
                        # Try to create from dict
                        if "source_type" in config:
                            source_type = DataSourceType(config["source_type"].lower())
                        else:
                            raise ValueError(
                                "Cannot determine source type from config. "
                                "Please specify source_type or provide 'url' (API) "
                                "or 'connection_string'/'query' (SQL) in config."
                            )
                elif hasattr(config, "url"):
                    source_type = DataSourceType.API
                elif hasattr(config, "connection_string") or hasattr(config, "query"):
                    source_type = DataSourceType.SQL
                else:
                    raise ValueError(
                        "Cannot determine source type from config. "
                        "Please specify source_type explicitly."
                    )
            else:
                raise ValueError(
                    "Cannot determine source type. Please specify source_type or "
                    "provide one of: path (FILE), data (IN_MEMORY), or config (API/SQL)."
                )

        if isinstance(source_type, str):
            source_type = DataSourceType(source_type.lower())

        if source_type == DataSourceType.FILE:
            return cls.create_file_data_source(**kwargs)
        elif source_type == DataSourceType.API:
            if "config" not in kwargs:
                # Create APIConfig from kwargs
                from graflo.data_source.api import APIConfig, PaginationConfig

                # Handle nested pagination config manually
                api_kwargs = kwargs.copy()
                pagination_dict = api_kwargs.pop("pagination", None)
                pagination = None
                if pagination_dict is not None:
                    if isinstance(pagination_dict, dict):
                        # Manually construct PaginationConfig to avoid dataclass_wizard issues
                        pagination = PaginationConfig(**pagination_dict)
                    else:
                        pagination = pagination_dict
                api_kwargs["pagination"] = pagination
                config = APIConfig(**api_kwargs)
                return cls.create_api_data_source(config=config)
            config = kwargs["config"]
            if isinstance(config, dict):
                from graflo.data_source.api import APIConfig, PaginationConfig

                # Handle nested pagination config manually
                config_copy = config.copy()
                pagination_dict = config_copy.pop("pagination", None)
                pagination = None
                if pagination_dict is not None:
                    if isinstance(pagination_dict, dict):
                        # Manually construct PaginationConfig to avoid dataclass_wizard issues
                        pagination = PaginationConfig(**pagination_dict)
                    else:
                        pagination = pagination_dict
                config_copy["pagination"] = pagination
                config = APIConfig(**config_copy)
            return cls.create_api_data_source(config=config)
        elif source_type == DataSourceType.SQL:
            if "config" not in kwargs:
                # Create SQLConfig from kwargs
                from graflo.data_source.sql import SQLConfig

                config = SQLConfig.from_dict(kwargs)
                return cls.create_sql_data_source(config=config)
            config = kwargs["config"]
            if isinstance(config, dict):
                from graflo.data_source.sql import SQLConfig

                config = SQLConfig.from_dict(config)
            return cls.create_sql_data_source(config=config)
        elif source_type == DataSourceType.IN_MEMORY:
            if "data" not in kwargs:
                raise ValueError("In-memory data source requires 'data' parameter")
            return cls.create_in_memory_data_source(**kwargs)
        else:
            raise ValueError(f"Unsupported data source type: {source_type}")

    @classmethod
    def create_data_source_from_config(
        cls, config: dict[str, Any]
    ) -> AbstractDataSource:
        """Create a data source from a configuration dictionary.

        The configuration dict should contain:
        - 'source_type': Type of data source (FILE, API, SQL, IN_MEMORY)
        - Other parameters specific to the data source type

        Examples:
            File source:
                {"source_type": "file", "path": "data.json"}
            API source:
                {"source_type": "api", "config": {"url": "https://api.example.com"}}
            SQL source:
                {"source_type": "sql", "config": {"connection_string": "...", "query": "..."}}
            In-memory source:
                {"source_type": "in_memory", "data": [...]}

        Args:
            config: Configuration dictionary

        Returns:
            Data source instance

        Raises:
            ValueError: If configuration is invalid
        """
        config = config.copy()
        source_type = config.pop("source_type", None)
        return cls.create_data_source(source_type=source_type, **config)

create_api_data_source(config) classmethod

Create an API data source.

Parameters:

  • config (APIConfig): API configuration (required)

Returns:

  • APIDataSource: APIDataSource instance

Source code in graflo/data_source/factory.py
@classmethod
def create_api_data_source(cls, config: APIConfig) -> APIDataSource:
    """Create an API data source.

    Args:
        config: API configuration

    Returns:
        APIDataSource instance
    """
    return APIDataSource(config=config)

create_data_source(source_type=None, **kwargs) classmethod

Create a data source of the specified type.

This is a general factory method that routes to specific factory methods based on the source type.

Parameters:

  • source_type (DataSourceType | str | None, default: None): Type of data source to create. If None, will be inferred from kwargs (e.g., 'path' -> FILE, 'data' -> IN_MEMORY, 'config' with url -> API)
  • **kwargs (Any): Configuration parameters for the data source

Returns:

  • AbstractDataSource: Data source instance

Raises:

  • ValueError: If source type is not supported or required parameters are missing

Source code in graflo/data_source/factory.py
@classmethod
def create_data_source(
    cls,
    source_type: DataSourceType | str | None = None,
    **kwargs: Any,
) -> AbstractDataSource:
    """Create a data source of the specified type.

    This is a general factory method that routes to specific factory methods
    based on the source type.

    Args:
        source_type: Type of data source to create. If None, will be inferred
            from kwargs (e.g., 'path' -> FILE, 'data' -> IN_MEMORY, 'config' with url -> API)
        **kwargs: Configuration parameters for the data source

    Returns:
        Data source instance

    Raises:
        ValueError: If source type is not supported or required parameters are missing
    """
    # Auto-detect source type if not provided
    if source_type is None:
        if "path" in kwargs or "file_type" in kwargs:
            source_type = DataSourceType.FILE
        elif "data" in kwargs:
            source_type = DataSourceType.IN_MEMORY
        elif "config" in kwargs:
            config = kwargs["config"]
            # Check if it's an API config (has 'url') or SQL config (has 'connection_string')
            if isinstance(config, dict):
                if "url" in config:
                    source_type = DataSourceType.API
                elif "connection_string" in config or "query" in config:
                    source_type = DataSourceType.SQL
                else:
                    # Try to create from dict
                    if "source_type" in config:
                        source_type = DataSourceType(config["source_type"].lower())
                    else:
                        raise ValueError(
                            "Cannot determine source type from config. "
                            "Please specify source_type or provide 'url' (API) "
                            "or 'connection_string'/'query' (SQL) in config."
                        )
            elif hasattr(config, "url"):
                source_type = DataSourceType.API
            elif hasattr(config, "connection_string") or hasattr(config, "query"):
                source_type = DataSourceType.SQL
            else:
                raise ValueError(
                    "Cannot determine source type from config. "
                    "Please specify source_type explicitly."
                )
        else:
            raise ValueError(
                "Cannot determine source type. Please specify source_type or "
                "provide one of: path (FILE), data (IN_MEMORY), or config (API/SQL)."
            )

    if isinstance(source_type, str):
        source_type = DataSourceType(source_type.lower())

    if source_type == DataSourceType.FILE:
        return cls.create_file_data_source(**kwargs)
    elif source_type == DataSourceType.API:
        if "config" not in kwargs:
            # Create APIConfig from kwargs
            from graflo.data_source.api import APIConfig, PaginationConfig

            # Handle nested pagination config manually
            api_kwargs = kwargs.copy()
            pagination_dict = api_kwargs.pop("pagination", None)
            pagination = None
            if pagination_dict is not None:
                if isinstance(pagination_dict, dict):
                    # Manually construct PaginationConfig to avoid dataclass_wizard issues
                    pagination = PaginationConfig(**pagination_dict)
                else:
                    pagination = pagination_dict
            api_kwargs["pagination"] = pagination
            config = APIConfig(**api_kwargs)
            return cls.create_api_data_source(config=config)
        config = kwargs["config"]
        if isinstance(config, dict):
            from graflo.data_source.api import APIConfig, PaginationConfig

            # Handle nested pagination config manually
            config_copy = config.copy()
            pagination_dict = config_copy.pop("pagination", None)
            pagination = None
            if pagination_dict is not None:
                if isinstance(pagination_dict, dict):
                    # Manually construct PaginationConfig to avoid dataclass_wizard issues
                    pagination = PaginationConfig(**pagination_dict)
                else:
                    pagination = pagination_dict
            config_copy["pagination"] = pagination
            config = APIConfig(**config_copy)
        return cls.create_api_data_source(config=config)
    elif source_type == DataSourceType.SQL:
        if "config" not in kwargs:
            # Create SQLConfig from kwargs
            from graflo.data_source.sql import SQLConfig

            config = SQLConfig.from_dict(kwargs)
            return cls.create_sql_data_source(config=config)
        config = kwargs["config"]
        if isinstance(config, dict):
            from graflo.data_source.sql import SQLConfig

            config = SQLConfig.from_dict(config)
        return cls.create_sql_data_source(config=config)
    elif source_type == DataSourceType.IN_MEMORY:
        if "data" not in kwargs:
            raise ValueError("In-memory data source requires 'data' parameter")
        return cls.create_in_memory_data_source(**kwargs)
    else:
        raise ValueError(f"Unsupported data source type: {source_type}")

create_data_source_from_config(config) classmethod

Create a data source from a configuration dictionary.

The configuration dict should contain:

  • 'source_type': Type of data source (FILE, API, SQL, IN_MEMORY)
  • Other parameters specific to the data source type

Examples:

  • File source: {"source_type": "file", "path": "data.json"}
  • API source: {"source_type": "api", "config": {"url": "https://api.example.com"}}
  • SQL source: {"source_type": "sql", "config": {"connection_string": "...", "query": "..."}}
  • In-memory source: {"source_type": "in_memory", "data": [...]}

Parameters:

  • config (dict[str, Any]): Configuration dictionary (required)

Returns:

  • AbstractDataSource: Data source instance

Raises:

  • ValueError: If configuration is invalid

Source code in graflo/data_source/factory.py
@classmethod
def create_data_source_from_config(
    cls, config: dict[str, Any]
) -> AbstractDataSource:
    """Create a data source from a configuration dictionary.

    The configuration dict should contain:
    - 'source_type': Type of data source (FILE, API, SQL, IN_MEMORY)
    - Other parameters specific to the data source type

    Examples:
        File source:
            {"source_type": "file", "path": "data.json"}
        API source:
            {"source_type": "api", "config": {"url": "https://api.example.com"}}
        SQL source:
            {"source_type": "sql", "config": {"connection_string": "...", "query": "..."}}
        In-memory source:
            {"source_type": "in_memory", "data": [...]}

    Args:
        config: Configuration dictionary

    Returns:
        Data source instance

    Raises:
        ValueError: If configuration is invalid
    """
    config = config.copy()
    source_type = config.pop("source_type", None)
    return cls.create_data_source(source_type=source_type, **config)
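
For instance, a sketch of the SQL variant from the docstring above; the connection string and query are placeholders, and the config keys are assumed to match what SQLConfig.from_dict accepts.

from graflo.data_source.factory import DataSourceFactory

sql_source = DataSourceFactory.create_data_source_from_config(
    {
        "source_type": "sql",
        "config": {
            # Placeholder connection string; use your own database URL.
            "connection_string": "postgresql://user:pass@localhost/db",
            "query": "SELECT id, name FROM users",
        },
    }
)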

create_file_data_source(path, file_type=None, encoding=EncodingType.UTF_8, sep=None) classmethod

Create a file-based data source.

Parameters:

  • path (Path | str): Path to the file (required)
  • file_type (str | ChunkerType | None, default: None): Type of file ('json', 'jsonl', 'table', 'parquet') or ChunkerType. If None, will be guessed from file extension.
  • encoding (EncodingType, default: UTF_8): File encoding
  • sep (str | None, default: None): Field separator for table files (default: ','). Only used for table files.

Returns:

  • JsonFileDataSource | JsonlFileDataSource | TableFileDataSource | ParquetFileDataSource: Appropriate file data source instance

Raises:

  • ValueError: If file type cannot be determined

Source code in graflo/data_source/factory.py
@classmethod
def create_file_data_source(
    cls,
    path: Path | str,
    file_type: str | ChunkerType | None = None,
    encoding: EncodingType = EncodingType.UTF_8,
    sep: str | None = None,
) -> (
    JsonFileDataSource
    | JsonlFileDataSource
    | TableFileDataSource
    | ParquetFileDataSource
):
    """Create a file-based data source.

    Args:
        path: Path to the file
        file_type: Type of file ('json', 'jsonl', 'table', 'parquet') or ChunkerType.
            If None, will be guessed from file extension.
        encoding: File encoding (default: UTF_8)
        sep: Field separator for table files (default: ',').
            Only used for table files.

    Returns:
        Appropriate file data source instance (JsonFileDataSource,
        JsonlFileDataSource, TableFileDataSource, or ParquetFileDataSource)

    Raises:
        ValueError: If file type cannot be determined
    """
    if isinstance(path, str):
        path = Path(path)

    # Determine file type
    if file_type is None:
        try:
            file_type_enum = cls._guess_file_type(path)
        except ValueError as e:
            raise ValueError(
                f"Could not determine file type for {path}. "
                f"Please specify file_type explicitly. Error: {e}"
            )
    elif isinstance(file_type, str):
        file_type_enum = ChunkerType(file_type.lower())
    else:
        file_type_enum = file_type

    # Create appropriate data source
    if file_type_enum == ChunkerType.JSON:
        return JsonFileDataSource(path=path, encoding=encoding)
    elif file_type_enum == ChunkerType.JSONL:
        return JsonlFileDataSource(path=path, encoding=encoding)
    elif file_type_enum == ChunkerType.TABLE:
        # sep is only for table files
        return TableFileDataSource(path=path, encoding=encoding, sep=sep or ",")
    elif file_type_enum == ChunkerType.PARQUET:
        return ParquetFileDataSource(path=path)
    else:
        raise ValueError(f"Unsupported file type: {file_type_enum}")

create_in_memory_data_source(data, columns=None) classmethod

Create an in-memory data source.

Parameters:

  • data (list[dict] | list[list] | DataFrame): Data to process (list[dict], list[list], or pd.DataFrame) (required)
  • columns (list[str] | None, default: None): Optional column names for list[list] data

Returns:

  • InMemoryDataSource: InMemoryDataSource instance

Source code in graflo/data_source/factory.py
@classmethod
def create_in_memory_data_source(
    cls,
    data: list[dict] | list[list] | pd.DataFrame,
    columns: list[str] | None = None,
) -> InMemoryDataSource:
    """Create an in-memory data source.

    Args:
        data: Data to process (list[dict], list[list], or pd.DataFrame)
        columns: Optional column names for list[list] data

    Returns:
        InMemoryDataSource instance
    """
    return InMemoryDataSource(data=data, columns=columns)
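
For example, wrapping an existing DataFrame (illustrative values; no columns argument is needed in this case):

import pandas as pd

from graflo.data_source.factory import DataSourceFactory

df = pd.DataFrame({"id": [1, 2], "name": ["alice", "bob"]})
source = DataSourceFactory.create_in_memory_data_source(data=df)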

create_sql_data_source(config) classmethod

Create a SQL data source.

Parameters:

  • config (SQLConfig): SQL configuration (required)

Returns:

  • SQLDataSource: SQLDataSource instance

Source code in graflo/data_source/factory.py
@classmethod
def create_sql_data_source(cls, config: SQLConfig) -> SQLDataSource:
    """Create a SQL data source.

    Args:
        config: SQL configuration

    Returns:
        SQLDataSource instance
    """
    return SQLDataSource(config=config)

DataSourceRegistry dataclass

Bases: BaseDataclass

Registry for mapping data sources to resource names.

This class maintains a mapping from resource names to lists of data sources. Many data sources can map to the same resource, allowing data to be ingested from multiple sources and combined.

Attributes:

  • sources (dict[str, list[AbstractDataSource]]): Dictionary mapping resource names to lists of data sources

Source code in graflo/data_source/registry.py
@dataclasses.dataclass
class DataSourceRegistry(BaseDataclass):
    """Registry for mapping data sources to resource names.

    This class maintains a mapping from resource names to lists of data sources.
    Many data sources can map to the same resource, allowing data to be ingested
    from multiple sources and combined.

    Attributes:
        sources: Dictionary mapping resource names to lists of data sources
    """

    sources: dict[str, list[AbstractDataSource]] = dataclasses.field(
        default_factory=dict
    )

    def register(self, data_source: AbstractDataSource, resource_name: str) -> None:
        """Register a data source for a resource.

        Args:
            data_source: Data source to register
            resource_name: Name of the resource to map to
        """
        if resource_name not in self.sources:
            self.sources[resource_name] = []
        self.sources[resource_name].append(data_source)
        data_source.resource_name = resource_name

    def get_data_sources(self, resource_name: str) -> list[AbstractDataSource]:
        """Get all data sources for a resource.

        Args:
            resource_name: Name of the resource

        Returns:
            List of data sources for the resource (empty list if none found)
        """
        return self.sources.get(resource_name, [])

    def get_all_data_sources(self) -> list[AbstractDataSource]:
        """Get all registered data sources.

        Returns:
            List of all registered data sources
        """
        all_sources = []
        for sources_list in self.sources.values():
            all_sources.extend(sources_list)
        return all_sources

    def has_resource(self, resource_name: str) -> bool:
        """Check if a resource has any data sources.

        Args:
            resource_name: Name of the resource

        Returns:
            True if the resource has data sources, False otherwise
        """
        return resource_name in self.sources and len(self.sources[resource_name]) > 0

    def clear(self) -> None:
        """Clear all registered data sources."""
        self.sources.clear()
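
A brief sketch of the many-sources-to-one-resource mapping described above (file paths are illustrative):

from graflo.data_source import DataSourceRegistry, FileDataSource

registry = DataSourceRegistry()

# Two files feeding the same "users" resource
registry.register(FileDataSource(path="users_2023.jsonl", file_type="jsonl"), "users")
registry.register(FileDataSource(path="users_2024.jsonl", file_type="jsonl"), "users")

registry.has_resource("users")            # True
len(registry.get_data_sources("users"))   # 2
len(registry.get_all_data_sources())      # 2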

clear()

Clear all registered data sources.

Source code in graflo/data_source/registry.py
def clear(self) -> None:
    """Clear all registered data sources."""
    self.sources.clear()

get_all_data_sources()

Get all registered data sources.

Returns:

  • list[AbstractDataSource]: List of all registered data sources

Source code in graflo/data_source/registry.py
def get_all_data_sources(self) -> list[AbstractDataSource]:
    """Get all registered data sources.

    Returns:
        List of all registered data sources
    """
    all_sources = []
    for sources_list in self.sources.values():
        all_sources.extend(sources_list)
    return all_sources

get_data_sources(resource_name)

Get all data sources for a resource.

Parameters:

  • resource_name (str): Name of the resource (required)

Returns:

  • list[AbstractDataSource]: List of data sources for the resource (empty list if none found)

Source code in graflo/data_source/registry.py
def get_data_sources(self, resource_name: str) -> list[AbstractDataSource]:
    """Get all data sources for a resource.

    Args:
        resource_name: Name of the resource

    Returns:
        List of data sources for the resource (empty list if none found)
    """
    return self.sources.get(resource_name, [])

has_resource(resource_name)

Check if a resource has any data sources.

Parameters:

  • resource_name (str): Name of the resource (required)

Returns:

  • bool: True if the resource has data sources, False otherwise

Source code in graflo/data_source/registry.py
def has_resource(self, resource_name: str) -> bool:
    """Check if a resource has any data sources.

    Args:
        resource_name: Name of the resource

    Returns:
        True if the resource has data sources, False otherwise
    """
    return resource_name in self.sources and len(self.sources[resource_name]) > 0

register(data_source, resource_name)

Register a data source for a resource.

Parameters:

  • data_source (AbstractDataSource): Data source to register (required)
  • resource_name (str): Name of the resource to map to (required)

Source code in graflo/data_source/registry.py
def register(self, data_source: AbstractDataSource, resource_name: str) -> None:
    """Register a data source for a resource.

    Args:
        data_source: Data source to register
        resource_name: Name of the resource to map to
    """
    if resource_name not in self.sources:
        self.sources[resource_name] = []
    self.sources[resource_name].append(data_source)
    data_source.resource_name = resource_name

DataSourceType

Bases: BaseEnum

Types of data sources supported by the system.

  • FILE: File-based data sources (JSON, JSONL, CSV/TSV)
  • API: REST API data sources
  • SQL: SQL database data sources
  • IN_MEMORY: In-memory data sources (lists, DataFrames)

Source code in graflo/data_source/base.py
class DataSourceType(BaseEnum):
    """Types of data sources supported by the system.

    FILE: File-based data sources (JSON, JSONL, CSV/TSV)
    API: REST API data sources
    SQL: SQL database data sources
    IN_MEMORY: In-memory data sources (lists, DataFrames)
    """

    FILE = "file"
    API = "api"
    SQL = "sql"
    IN_MEMORY = "in_memory"

FileDataSource dataclass

Bases: AbstractDataSource

Base class for file-based data sources.

This class provides a common interface for file-based data sources, integrating with the existing chunker system for batch processing.

Attributes:

  • path (Path | str): Path to the file
  • file_type (str | None): Type of file (json, jsonl, table)
  • encoding (EncodingType): File encoding (default: UTF_8)

Source code in graflo/data_source/file.py
@dataclasses.dataclass
class FileDataSource(AbstractDataSource):
    """Base class for file-based data sources.

    This class provides a common interface for file-based data sources,
    integrating with the existing chunker system for batch processing.

    Attributes:
        path: Path to the file
        file_type: Type of file (json, jsonl, table)
        encoding: File encoding (default: UTF_8)
    """

    path: Path | str
    file_type: str | None = None
    encoding: EncodingType = EncodingType.UTF_8

    def __post_init__(self):
        """Initialize the file data source."""
        self.source_type = DataSourceType.FILE
        if isinstance(self.path, str):
            self.path = Path(self.path)

    def iter_batches(
        self, batch_size: int = 1000, limit: int | None = None
    ) -> Iterator[list[dict]]:
        """Iterate over file data in batches.

        Args:
            batch_size: Number of items per batch
            limit: Maximum number of items to retrieve

        Yields:
            list[dict]: Batches of documents as dictionaries
        """
        # Determine chunker type
        chunker_type = None
        if self.file_type:
            chunker_type = ChunkerType(self.file_type.lower())

        # Create chunker using factory
        chunker_kwargs = {
            "resource": self.path,
            "type": chunker_type,
            "batch_size": batch_size,
            "limit": limit,
            "encoding": self.encoding,
        }
        # Only add sep for table files
        if chunker_type == ChunkerType.TABLE and hasattr(self, "sep"):
            chunker_kwargs["sep"] = self.sep

        chunker = ChunkerFactory.create_chunker(**chunker_kwargs)

        # Yield batches
        for batch in chunker:
            yield batch
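
A short sketch, reusing the hypothetical file from the package example: the chunker is created lazily inside iter_batches, so iteration streams the file in batches.

from graflo.data_source import FileDataSource

source = FileDataSource(path="data.json", file_type="json")

# Stream the file 500 documents at a time, stopping after 2000 items.
for batch in source.iter_batches(batch_size=500, limit=2000):
    print(len(batch))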

__post_init__()

Initialize the file data source.

Source code in graflo/data_source/file.py
def __post_init__(self):
    """Initialize the file data source."""
    self.source_type = DataSourceType.FILE
    if isinstance(self.path, str):
        self.path = Path(self.path)

iter_batches(batch_size=1000, limit=None)

Iterate over file data in batches.

Parameters:

  • batch_size (int, default: 1000): Number of items per batch
  • limit (int | None, default: None): Maximum number of items to retrieve

Yields:

  • list[dict]: Batches of documents as dictionaries

Source code in graflo/data_source/file.py
def iter_batches(
    self, batch_size: int = 1000, limit: int | None = None
) -> Iterator[list[dict]]:
    """Iterate over file data in batches.

    Args:
        batch_size: Number of items per batch
        limit: Maximum number of items to retrieve

    Yields:
        list[dict]: Batches of documents as dictionaries
    """
    # Determine chunker type
    chunker_type = None
    if self.file_type:
        chunker_type = ChunkerType(self.file_type.lower())

    # Create chunker using factory
    chunker_kwargs = {
        "resource": self.path,
        "type": chunker_type,
        "batch_size": batch_size,
        "limit": limit,
        "encoding": self.encoding,
    }
    # Only add sep for table files
    if chunker_type == ChunkerType.TABLE and hasattr(self, "sep"):
        chunker_kwargs["sep"] = self.sep

    chunker = ChunkerFactory.create_chunker(**chunker_kwargs)

    # Yield batches
    for batch in chunker:
        yield batch

InMemoryDataSource dataclass

Bases: AbstractDataSource

Data source for in-memory data structures.

This class provides a data source for Python objects that are already in memory, including lists of dictionaries, lists of lists, and Pandas DataFrames.

Attributes:

  • data (list[dict] | list[list] | DataFrame): Data to process (list[dict], list[list], or pd.DataFrame)
  • columns (list[str] | None): Optional column names for list[list] data

Source code in graflo/data_source/memory.py
@dataclasses.dataclass
class InMemoryDataSource(AbstractDataSource):
    """Data source for in-memory data structures.

    This class provides a data source for Python objects that are already
    in memory, including lists of dictionaries, lists of lists, and Pandas DataFrames.

    Attributes:
        data: Data to process (list[dict], list[list], or pd.DataFrame)
        columns: Optional column names for list[list] data
    """

    data: list[dict] | list[list] | pd.DataFrame
    columns: list[str] | None = None

    def __post_init__(self):
        """Initialize the in-memory data source."""
        self.source_type = DataSourceType.IN_MEMORY

    def iter_batches(
        self, batch_size: int = 1000, limit: int | None = None
    ) -> Iterator[list[dict]]:
        """Iterate over in-memory data in batches.

        Args:
            batch_size: Number of items per batch
            limit: Maximum number of items to retrieve

        Yields:
            list[dict]: Batches of documents as dictionaries
        """
        # Normalize data: convert list[list] to list[dict] if needed
        data = self.data
        if isinstance(data, list) and len(data) > 0 and isinstance(data[0], list):
            # list[list] - convert to list[dict] using columns
            if self.columns is None:
                raise ValueError(
                    "columns parameter is required when data is list[list]"
                )
            data = [{k: v for k, v in zip(self.columns, item)} for item in data]

        # Create chunker using factory (only pass columns if it's a DataFrame)
        chunker_kwargs = {
            "resource": data,
            "batch_size": batch_size,
            "limit": limit,
        }
        # Note: columns is not passed to chunker - we handle list[list] conversion above
        # DataFrame chunker doesn't need columns either

        chunker = ChunkerFactory.create_chunker(**chunker_kwargs)

        # Yield batches
        for batch in chunker:
            yield batch

__post_init__()

Initialize the in-memory data source.

Source code in graflo/data_source/memory.py
def __post_init__(self):
    """Initialize the in-memory data source."""
    self.source_type = DataSourceType.IN_MEMORY

iter_batches(batch_size=1000, limit=None)

Iterate over in-memory data in batches.

Parameters:

batch_size (int): Number of items per batch. Default: 1000
limit (int | None): Maximum number of items to retrieve. Default: None

Yields:

list[dict]: Batches of documents as dictionaries

Source code in graflo/data_source/memory.py
def iter_batches(
    self, batch_size: int = 1000, limit: int | None = None
) -> Iterator[list[dict]]:
    """Iterate over in-memory data in batches.

    Args:
        batch_size: Number of items per batch
        limit: Maximum number of items to retrieve

    Yields:
        list[dict]: Batches of documents as dictionaries
    """
    # Normalize data: convert list[list] to list[dict] if needed
    data = self.data
    if isinstance(data, list) and len(data) > 0 and isinstance(data[0], list):
        # list[list] - convert to list[dict] using columns
        if self.columns is None:
            raise ValueError(
                "columns parameter is required when data is list[list]"
            )
        data = [{k: v for k, v in zip(self.columns, item)} for item in data]

    # Create chunker using factory (only pass columns if it's a DataFrame)
    chunker_kwargs = {
        "resource": data,
        "batch_size": batch_size,
        "limit": limit,
    }
    # Note: columns is not passed to chunker - we handle list[list] conversion above
    # DataFrame chunker doesn't need columns either

    chunker = ChunkerFactory.create_chunker(**chunker_kwargs)

    # Yield batches
    for batch in chunker:
        yield batch
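
Example (a minimal sketch: the row values and column names are made up, and the import assumes InMemoryDataSource is re-exported from graflo.data_source rather than only from graflo.data_source.memory):

from graflo.data_source import InMemoryDataSource

# list[list] data requires explicit column names; each row becomes a dict.
rows = [[1, "alice"], [2, "bob"], [3, "carol"]]
source = InMemoryDataSource(data=rows, columns=["id", "name"])

for batch in source.iter_batches(batch_size=2):
    print(batch)  # [{'id': 1, 'name': 'alice'}, {'id': 2, 'name': 'bob'}], then the remainder

# list[dict] (or a pandas DataFrame) can be passed directly; columns is not needed.
source = InMemoryDataSource(data=[{"id": 1, "name": "alice"}])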

JsonFileDataSource dataclass

Bases: FileDataSource

Data source for JSON files.

JSON files are expected to contain hierarchical data structures, similar to REST API responses. The chunker handles nested structures and converts them to dictionaries.

Attributes:

path (Path | str): Path to the JSON file
encoding (EncodingType): File encoding (default: UTF_8)

Source code in graflo/data_source/file.py
@dataclasses.dataclass
class JsonFileDataSource(FileDataSource):
    """Data source for JSON files.

    JSON files are expected to contain hierarchical data structures,
    similar to REST API responses. The chunker handles nested structures
    and converts them to dictionaries.

    Attributes:
        path: Path to the JSON file
        encoding: File encoding (default: UTF_8)
    """

    def __post_init__(self):
        """Initialize the JSON file data source."""
        super().__post_init__()
        self.file_type = ChunkerType.JSON.value

__post_init__()

Initialize the JSON file data source.

Source code in graflo/data_source/file.py
def __post_init__(self):
    """Initialize the JSON file data source."""
    super().__post_init__()
    self.file_type = ChunkerType.JSON.value

JsonlFileDataSource dataclass

Bases: FileDataSource

Data source for JSONL (JSON Lines) files.

JSONL files contain one JSON object per line, making them suitable for streaming and batch processing.

Attributes:

path (Path | str): Path to the JSONL file
encoding (EncodingType): File encoding (default: UTF_8)

Source code in graflo/data_source/file.py
@dataclasses.dataclass
class JsonlFileDataSource(FileDataSource):
    """Data source for JSONL (JSON Lines) files.

    JSONL files contain one JSON object per line, making them suitable
    for streaming and batch processing.

    Attributes:
        path: Path to the JSONL file
        encoding: File encoding (default: UTF_8)
    """

    def __post_init__(self):
        """Initialize the JSONL file data source."""
        super().__post_init__()
        self.file_type = ChunkerType.JSONL.value

__post_init__()

Initialize the JSONL file data source.

Source code in graflo/data_source/file.py
def __post_init__(self):
    """Initialize the JSONL file data source."""
    super().__post_init__()
    self.file_type = ChunkerType.JSONL.value
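
Example (illustrative: the file paths are placeholders, and the import assumes both subclasses are exported from graflo.data_source; otherwise import them from graflo.data_source.file):

from graflo.data_source import JsonFileDataSource, JsonlFileDataSource

# One hierarchical document (or array of documents) per file.
users = JsonFileDataSource(path="users.json")

# One JSON object per line; well suited to streaming large exports.
events = JsonlFileDataSource(path="events.jsonl")

for batch in events.iter_batches(batch_size=1000):
    print(len(batch))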

PaginationConfig dataclass

Bases: BaseDataclass

Configuration for API pagination.

Supports multiple pagination strategies:

- offset: Offset-based pagination (offset, limit)
- cursor: Cursor-based pagination (cursor parameter)
- page: Page-based pagination (page, per_page)

Attributes:

strategy (str): Pagination strategy ('offset', 'cursor', 'page')
offset_param (str): Parameter name for offset (default: 'offset')
limit_param (str): Parameter name for limit (default: 'limit')
cursor_param (str): Parameter name for cursor (default: 'cursor')
page_param (str): Parameter name for page (default: 'page')
per_page_param (str): Parameter name for per_page (default: 'per_page')
initial_offset (int): Initial offset value (default: 0)
initial_page (int): Initial page value (default: 1)
page_size (int): Number of items per page (default: 100)
cursor_path (str | None): JSON path to cursor in response (for cursor-based pagination)
has_more_path (str | None): JSON path to has_more flag in response
data_path (str | None): JSON path to data array in response (default: root)

Source code in graflo/data_source/api.py
@dataclasses.dataclass
class PaginationConfig(BaseDataclass):
    """Configuration for API pagination.

    Supports multiple pagination strategies:
    - offset: Offset-based pagination (offset, limit)
    - cursor: Cursor-based pagination (cursor parameter)
    - page: Page-based pagination (page, per_page)

    Attributes:
        strategy: Pagination strategy ('offset', 'cursor', 'page')
        offset_param: Parameter name for offset (default: 'offset')
        limit_param: Parameter name for limit (default: 'limit')
        cursor_param: Parameter name for cursor (default: 'cursor')
        page_param: Parameter name for page (default: 'page')
        per_page_param: Parameter name for per_page (default: 'per_page')
        initial_offset: Initial offset value (default: 0)
        initial_page: Initial page value (default: 1)
        page_size: Number of items per page (default: 100)
        cursor_path: JSON path to cursor in response (for cursor-based)
        has_more_path: JSON path to has_more flag in response
        data_path: JSON path to data array in response (default: root)
    """

    strategy: str = "offset"  # 'offset', 'cursor', 'page'
    offset_param: str = "offset"
    limit_param: str = "limit"
    cursor_param: str = "cursor"
    page_param: str = "page"
    per_page_param: str = "per_page"
    initial_offset: int = 0
    initial_page: int = 1
    page_size: int = 100
    cursor_path: str | None = None  # JSON path like "next_cursor"
    has_more_path: str | None = None  # JSON path like "has_more"
    data_path: str | None = None  # JSON path to data array, None means root
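
Example (a sketch of a cursor-based configuration; the parameter names and JSON paths describe a hypothetical API):

from graflo.data_source.api import PaginationConfig

# Cursor-based pagination: follow response["next_cursor"] until
# response["has_more"] is false, reading records from response["items"].
pagination = PaginationConfig(
    strategy="cursor",
    cursor_param="cursor",
    page_size=200,
    cursor_path="next_cursor",
    has_more_path="has_more",
    data_path="items",
)

The resulting object is typically supplied as the pagination field of an APIConfig.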

SQLConfig dataclass

Bases: BaseDataclass

Configuration for SQL data source.

Uses SQLAlchemy connection string format.

Attributes:

connection_string (str): SQLAlchemy connection string (e.g., 'postgresql://user:pass@localhost/dbname')
query (str): SQL query string (supports parameterized queries)
params (dict[str, Any]): Query parameters as dictionary (for parameterized queries)
pagination (bool): Whether to use pagination (default: True)
page_size (int): Number of rows per page (default: 1000)

Source code in graflo/data_source/sql.py
@dataclasses.dataclass
class SQLConfig(BaseDataclass):
    """Configuration for SQL data source.

    Uses SQLAlchemy connection string format.

    Attributes:
        connection_string: SQLAlchemy connection string
            (e.g., 'postgresql://user:pass@localhost/dbname')
        query: SQL query string (supports parameterized queries)
        params: Query parameters as dictionary (for parameterized queries)
        pagination: Whether to use pagination (default: True)
        page_size: Number of rows per page (default: 1000)
    """

    connection_string: str
    query: str
    params: dict[str, Any] = dataclasses.field(default_factory=dict)
    pagination: bool = True
    page_size: int = 1000
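
Example (a minimal sketch; the connection string, table, and parameter values are placeholders):

from graflo.data_source.sql import SQLConfig

config = SQLConfig(
    connection_string="postgresql://user:pass@localhost/dbname",
    query="SELECT id, name, balance FROM accounts WHERE status = :status",
    params={"status": "active"},  # bound as a named parameter by SQLAlchemy
    pagination=True,
    page_size=500,
)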

SQLDataSource dataclass

Bases: AbstractDataSource

Data source for SQL databases.

This class provides a data source for SQL databases using SQLAlchemy. It supports parameterized queries and pagination. Returns rows as dictionaries with column names as keys.

Attributes:

config (SQLConfig): SQL configuration
engine (Engine | None): SQLAlchemy engine (created on first use)

Source code in graflo/data_source/sql.py
@dataclasses.dataclass
class SQLDataSource(AbstractDataSource):
    """Data source for SQL databases.

    This class provides a data source for SQL databases using SQLAlchemy.
    It supports parameterized queries and pagination. Returns rows as
    dictionaries with column names as keys.

    Attributes:
        config: SQL configuration
        engine: SQLAlchemy engine (created on first use)
    """

    config: SQLConfig
    engine: Engine | None = dataclasses.field(default=None, init=False)

    def __post_init__(self):
        """Initialize the SQL data source."""
        super().__post_init__()
        self.source_type = DataSourceType.SQL

    def _get_engine(self) -> Engine:
        """Get or create SQLAlchemy engine.

        Returns:
            SQLAlchemy engine instance
        """
        if self.engine is None:
            self.engine = create_engine(self.config.connection_string)
        return self.engine

    def _add_pagination(self, query: str, offset: int, limit: int) -> str:
        """Add pagination to SQL query.

        Args:
            query: Original SQL query
            offset: Offset value
            limit: Limit value

        Returns:
            Query with pagination added
        """
        # Check if query already has LIMIT/OFFSET
        query_upper = query.upper().strip()
        if "LIMIT" in query_upper or "OFFSET" in query_upper:
            # Query already has pagination, return as-is
            return query

        # Add pagination based on database type
        # For most SQL databases, use LIMIT/OFFSET
        # For SQL Server, use TOP and OFFSET/FETCH
        connection_string_lower = self.config.connection_string.lower()

        if "sqlserver" in connection_string_lower or "mssql" in connection_string_lower:
            # SQL Server syntax
            return f"{query} OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY"
        elif "oracle" in connection_string_lower:
            # Oracle syntax (using ROWNUM or FETCH)
            return f"{query} OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY"
        else:
            # Standard SQL (PostgreSQL, MySQL, SQLite, etc.)
            return f"{query} LIMIT {limit} OFFSET {offset}"

    def iter_batches(
        self, batch_size: int = 1000, limit: int | None = None
    ) -> Iterator[list[dict]]:
        """Iterate over SQL query results in batches.

        Args:
            batch_size: Number of items per batch
            limit: Maximum number of items to retrieve

        Yields:
            list[dict]: Batches of rows as dictionaries
        """
        engine = self._get_engine()
        total_items = 0
        offset = 0

        # Use configured page size or batch size, whichever is smaller
        page_size = min(self.config.page_size, batch_size)

        while True:
            # Build query
            if self.config.pagination:
                query_str = self._add_pagination(
                    self.config.query, offset=offset, limit=page_size
                )
            else:
                query_str = self.config.query

            # Execute query
            try:
                with engine.connect() as conn:
                    result = conn.execute(text(query_str), self.config.params)
                    rows = result.fetchall()

                    # Convert rows to dictionaries
                    batch = []
                    from decimal import Decimal

                    for row in rows:
                        if limit and total_items >= limit:
                            break

                        # Convert row to dictionary
                        row_dict = dict(row._mapping)
                        # Convert Decimal to float for JSON compatibility
                        for key, value in row_dict.items():
                            if isinstance(value, Decimal):
                                row_dict[key] = float(value)
                        batch.append(row_dict)
                        total_items += 1

                        # Yield when batch is full
                        if len(batch) >= batch_size:
                            yield batch
                            batch = []

                    # Yield remaining items
                    if batch:
                        yield batch

                    # Check if we should continue
                    if limit and total_items >= limit:
                        break

                    # Check if there are more rows
                    if len(rows) < page_size:
                        # No more rows
                        break

                    # Update offset for next iteration
                    if self.config.pagination:
                        offset += page_size
                    else:
                        # No pagination, single query
                        break

            except Exception as e:
                logger.error(f"SQL query execution failed: {e}")
                break

__post_init__()

Initialize the SQL data source.

Source code in graflo/data_source/sql.py
def __post_init__(self):
    """Initialize the SQL data source."""
    super().__post_init__()
    self.source_type = DataSourceType.SQL

iter_batches(batch_size=1000, limit=None)

Iterate over SQL query results in batches.

Parameters:

batch_size (int): Number of items per batch. Default: 1000
limit (int | None): Maximum number of items to retrieve. Default: None

Yields:

list[dict]: Batches of rows as dictionaries

Source code in graflo/data_source/sql.py
def iter_batches(
    self, batch_size: int = 1000, limit: int | None = None
) -> Iterator[list[dict]]:
    """Iterate over SQL query results in batches.

    Args:
        batch_size: Number of items per batch
        limit: Maximum number of items to retrieve

    Yields:
        list[dict]: Batches of rows as dictionaries
    """
    engine = self._get_engine()
    total_items = 0
    offset = 0

    # Use configured page size or batch size, whichever is smaller
    page_size = min(self.config.page_size, batch_size)

    while True:
        # Build query
        if self.config.pagination:
            query_str = self._add_pagination(
                self.config.query, offset=offset, limit=page_size
            )
        else:
            query_str = self.config.query

        # Execute query
        try:
            with engine.connect() as conn:
                result = conn.execute(text(query_str), self.config.params)
                rows = result.fetchall()

                # Convert rows to dictionaries
                batch = []
                from decimal import Decimal

                for row in rows:
                    if limit and total_items >= limit:
                        break

                    # Convert row to dictionary
                    row_dict = dict(row._mapping)
                    # Convert Decimal to float for JSON compatibility
                    for key, value in row_dict.items():
                        if isinstance(value, Decimal):
                            row_dict[key] = float(value)
                    batch.append(row_dict)
                    total_items += 1

                    # Yield when batch is full
                    if len(batch) >= batch_size:
                        yield batch
                        batch = []

                # Yield remaining items
                if batch:
                    yield batch

                # Check if we should continue
                if limit and total_items >= limit:
                    break

                # Check if there are more rows
                if len(rows) < page_size:
                    # No more rows
                    break

                # Update offset for next iteration
                if self.config.pagination:
                    offset += page_size
                else:
                    # No pagination, single query
                    break

        except Exception as e:
            logger.error(f"SQL query execution failed: {e}")
            break
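
Example (an illustrative end-to-end sketch: the database file, table, and columns are placeholders, SQLite is used only to keep the snippet self-contained, and SQLDataSource is assumed to be importable from graflo.data_source):

from graflo.data_source import SQLDataSource
from graflo.data_source.sql import SQLConfig

config = SQLConfig(
    connection_string="sqlite:///example.db",
    query="SELECT id, name, balance FROM accounts",
    page_size=1000,
)
source = SQLDataSource(config=config)

# Rows arrive as dicts keyed by column name; Decimal values are converted to float.
for batch in source.iter_batches(batch_size=1000, limit=10000):
    for row in batch:
        print(row["id"], row["name"], row["balance"])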

TableFileDataSource dataclass

Bases: FileDataSource

Data source for CSV/TSV files.

Table files are converted to dictionaries with column headers as keys. Each row becomes a dictionary.

Attributes:

path (Path | str): Path to the CSV/TSV file
encoding (EncodingType): File encoding (default: UTF_8)
sep (str): Field separator (default: ',')

Source code in graflo/data_source/file.py
@dataclasses.dataclass
class TableFileDataSource(FileDataSource):
    """Data source for CSV/TSV files.

    Table files are converted to dictionaries with column headers as keys.
    Each row becomes a dictionary.

    Attributes:
        path: Path to the CSV/TSV file
        encoding: File encoding (default: UTF_8)
        sep: Field separator (default: ',')
    """

    sep: str = ","

    def __post_init__(self):
        """Initialize the table file data source."""
        super().__post_init__()
        self.file_type = ChunkerType.TABLE.value

__post_init__()

Initialize the table file data source.

Source code in graflo/data_source/file.py
def __post_init__(self):
    """Initialize the table file data source."""
    super().__post_init__()
    self.file_type = ChunkerType.TABLE.value
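
Example (illustrative: the file path is a placeholder, and the import assumes the class is exported from graflo.data_source; otherwise import it from graflo.data_source.file):

from graflo.data_source import TableFileDataSource

# Tab-separated file; each row becomes a dict keyed by the header columns.
source = TableFileDataSource(path="measurements.tsv", sep="\t")

for batch in source.iter_batches(batch_size=1000):
    print(list(batch[0].keys()))  # column headers from the file
    break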