Skip to content

graflo.data_source.file

File-based data source implementations.

This module provides data source implementations for file-based data sources, including JSON, JSONL, and CSV/TSV files. It integrates with the existing chunker logic for efficient batch processing.

FileDataSource dataclass

Bases: AbstractDataSource

Base class for file-based data sources.

This class provides a common interface for file-based data sources, integrating with the existing chunker system for batch processing.

Attributes:

Name Type Description
path Path | str

Path to the file

file_type str | None

Type of file (json, jsonl, table)

encoding EncodingType

File encoding (default: UTF_8)

Source code in graflo/data_source/file.py
@dataclasses.dataclass
class FileDataSource(AbstractDataSource):
    """Base class for file-based data sources.

    This class provides a common interface for file-based data sources,
    integrating with the existing chunker system for batch processing.

    Attributes:
        path: Path to the file
        file_type: Type of file (json, jsonl, table)
        encoding: File encoding (default: UTF_8)
    """

    path: Path | str
    file_type: str | None = None
    encoding: EncodingType = EncodingType.UTF_8

    def __post_init__(self):
        """Initialize the file data source."""
        self.source_type = DataSourceType.FILE
        if isinstance(self.path, str):
            self.path = Path(self.path)

    def iter_batches(
        self, batch_size: int = 1000, limit: int | None = None
    ) -> Iterator[list[dict]]:
        """Iterate over file data in batches.

        Args:
            batch_size: Number of items per batch
            limit: Maximum number of items to retrieve

        Yields:
            list[dict]: Batches of documents as dictionaries
        """
        # Determine chunker type
        chunker_type = None
        if self.file_type:
            chunker_type = ChunkerType(self.file_type.lower())

        # Create chunker using factory
        chunker_kwargs = {
            "resource": self.path,
            "type": chunker_type,
            "batch_size": batch_size,
            "limit": limit,
            "encoding": self.encoding,
        }
        # Only add sep for table files
        if chunker_type == ChunkerType.TABLE and hasattr(self, "sep"):
            chunker_kwargs["sep"] = self.sep

        chunker = ChunkerFactory.create_chunker(**chunker_kwargs)

        # Yield batches
        for batch in chunker:
            yield batch

__post_init__()

Initialize the file data source.

Source code in graflo/data_source/file.py
def __post_init__(self):
    """Initialize the file data source."""
    self.source_type = DataSourceType.FILE
    if isinstance(self.path, str):
        self.path = Path(self.path)

iter_batches(batch_size=1000, limit=None)

Iterate over file data in batches.

Parameters:

Name Type Description Default
batch_size int

Number of items per batch

1000
limit int | None

Maximum number of items to retrieve

None

Yields:

Type Description
list[dict]

list[dict]: Batches of documents as dictionaries

Source code in graflo/data_source/file.py
def iter_batches(
    self, batch_size: int = 1000, limit: int | None = None
) -> Iterator[list[dict]]:
    """Iterate over file data in batches.

    Args:
        batch_size: Number of items per batch
        limit: Maximum number of items to retrieve

    Yields:
        list[dict]: Batches of documents as dictionaries
    """
    # Determine chunker type
    chunker_type = None
    if self.file_type:
        chunker_type = ChunkerType(self.file_type.lower())

    # Create chunker using factory
    chunker_kwargs = {
        "resource": self.path,
        "type": chunker_type,
        "batch_size": batch_size,
        "limit": limit,
        "encoding": self.encoding,
    }
    # Only add sep for table files
    if chunker_type == ChunkerType.TABLE and hasattr(self, "sep"):
        chunker_kwargs["sep"] = self.sep

    chunker = ChunkerFactory.create_chunker(**chunker_kwargs)

    # Yield batches
    for batch in chunker:
        yield batch

JsonFileDataSource dataclass

Bases: FileDataSource

Data source for JSON files.

JSON files are expected to contain hierarchical data structures, similar to REST API responses. The chunker handles nested structures and converts them to dictionaries.

Attributes:

Name Type Description
path Path | str

Path to the JSON file

encoding EncodingType

File encoding (default: UTF_8)

Source code in graflo/data_source/file.py
@dataclasses.dataclass
class JsonFileDataSource(FileDataSource):
    """Data source for JSON files.

    JSON files are expected to contain hierarchical data structures,
    similar to REST API responses. The chunker handles nested structures
    and converts them to dictionaries.

    Attributes:
        path: Path to the JSON file
        encoding: File encoding (default: UTF_8)
    """

    def __post_init__(self):
        """Initialize the JSON file data source."""
        super().__post_init__()
        self.file_type = ChunkerType.JSON.value

__post_init__()

Initialize the JSON file data source.

Source code in graflo/data_source/file.py
def __post_init__(self):
    """Initialize the JSON file data source."""
    super().__post_init__()
    self.file_type = ChunkerType.JSON.value

JsonlFileDataSource dataclass

Bases: FileDataSource

Data source for JSONL (JSON Lines) files.

JSONL files contain one JSON object per line, making them suitable for streaming and batch processing.

Attributes:

Name Type Description
path Path | str

Path to the JSONL file

encoding EncodingType

File encoding (default: UTF_8)

Source code in graflo/data_source/file.py
@dataclasses.dataclass
class JsonlFileDataSource(FileDataSource):
    """Data source for JSONL (JSON Lines) files.

    JSONL files contain one JSON object per line, making them suitable
    for streaming and batch processing.

    Attributes:
        path: Path to the JSONL file
        encoding: File encoding (default: UTF_8)
    """

    def __post_init__(self):
        """Initialize the JSONL file data source."""
        super().__post_init__()
        self.file_type = ChunkerType.JSONL.value

__post_init__()

Initialize the JSONL file data source.

Source code in graflo/data_source/file.py
def __post_init__(self):
    """Initialize the JSONL file data source."""
    super().__post_init__()
    self.file_type = ChunkerType.JSONL.value

ParquetFileDataSource dataclass

Bases: FileDataSource

Data source for Parquet files.

Parquet files are columnar storage format files that are read using pandas. Each row becomes a dictionary with column names as keys.

Attributes:

Name Type Description
path Path | str

Path to the Parquet file

Source code in graflo/data_source/file.py
@dataclasses.dataclass
class ParquetFileDataSource(FileDataSource):
    """Data source for Parquet files.

    Parquet files are columnar storage format files that are read using pandas.
    Each row becomes a dictionary with column names as keys.

    Attributes:
        path: Path to the Parquet file
    """

    def __post_init__(self):
        """Initialize the Parquet file data source."""
        super().__post_init__()
        self.file_type = ChunkerType.PARQUET.value

__post_init__()

Initialize the Parquet file data source.

Source code in graflo/data_source/file.py
def __post_init__(self):
    """Initialize the Parquet file data source."""
    super().__post_init__()
    self.file_type = ChunkerType.PARQUET.value

TableFileDataSource dataclass

Bases: FileDataSource

Data source for CSV/TSV files.

Table files are converted to dictionaries with column headers as keys. Each row becomes a dictionary.

Attributes:

Name Type Description
path Path | str

Path to the CSV/TSV file

encoding EncodingType

File encoding (default: UTF_8)

sep str

Field separator (default: ',')

Source code in graflo/data_source/file.py
@dataclasses.dataclass
class TableFileDataSource(FileDataSource):
    """Data source for CSV/TSV files.

    Table files are converted to dictionaries with column headers as keys.
    Each row becomes a dictionary.

    Attributes:
        path: Path to the CSV/TSV file
        encoding: File encoding (default: UTF_8)
        sep: Field separator (default: ',')
    """

    sep: str = ","

    def __post_init__(self):
        """Initialize the table file data source."""
        super().__post_init__()
        self.file_type = ChunkerType.TABLE.value

__post_init__()

Initialize the table file data source.

Source code in graflo/data_source/file.py
def __post_init__(self):
    """Initialize the table file data source."""
    super().__post_init__()
    self.file_type = ChunkerType.TABLE.value