graflo.util.chunker¶
Data chunking utilities for efficient file processing.
This module provides utilities for processing large files by breaking them into manageable chunks. It supports various file formats (JSON, JSONL, CSV) and provides both file-based and in-memory chunking capabilities.
Key Components
- AbstractChunker: Base class for chunking implementations
- FileChunker: File-based chunking with encoding support
- TableChunker: CSV/TSV file chunking
- JsonlChunker: JSON Lines file chunking
- JsonChunker: JSON file chunking
- TrivialChunker: In-memory list chunking
- ChunkerDataFrame: Pandas DataFrame chunking
- ChunkerFactory: Factory for creating appropriate chunkers
Example
>>> chunker = ChunkerFactory.create_chunker(
...     resource="data.json",
...     type=ChunkerType.JSON,
...     batch_size=1000,
... )
>>> for batch in chunker:
...     process_batch(batch)
AbstractChunker¶
Bases: ABC
Abstract base class for chunking implementations.
This class defines the interface for all chunkers, providing common functionality for batch processing and iteration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_size |  | Number of items per batch | 10 |
| limit |  | Maximum number of items to process | None |
Attributes:
| Name | Type | Description |
|---|---|---|
| units_processed |  | Number of items processed |
| batch_size |  | Size of each batch |
| limit | int \| None | Maximum number of items to process |
| cnt |  | Current count of processed items |
| iteration_tried |  | Whether iteration has been attempted |
Source code in graflo/util/chunker.py
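For illustration, the batching and limit bookkeeping that concrete chunkers inherit can be sketched in a few lines of plain Python. This is a simplified stand-in, not graflo's actual implementation; the class name `BatchIterator` is invented here:

```python
class BatchIterator:
    """Minimal sketch of the AbstractChunker iteration contract:
    hand out fixed-size batches, stop at exhaustion or at ``limit``."""

    def __init__(self, items, batch_size=10, limit=None):
        self.items = items
        self.batch_size = batch_size
        self.limit = limit
        self.cnt = 0  # items handed out so far

    def __iter__(self):
        return self

    def __next__(self):
        exhausted = self.cnt >= len(self.items)
        capped = self.limit is not None and self.cnt >= self.limit
        if exhausted or capped:
            raise StopIteration
        end = self.cnt + self.batch_size
        if self.limit is not None:
            end = min(end, self.limit)  # never exceed the overall limit
        batch = self.items[self.cnt:end]
        self.cnt += len(batch)
        return batch

abc_batches = list(BatchIterator(list(range(5)), batch_size=2))
abc_limited = list(BatchIterator(list(range(5)), batch_size=2, limit=3))
```

With five items and `batch_size=2`, the last batch is a partial one; with `limit=3`, iteration stops after three items even mid-batch.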
__iter__()¶
Initialize iteration if not already done.
Returns:
| Name | Type | Description |
|---|---|---|
| self |  | Iterator instance |
__next__()¶
Get the next batch of items.
Returns:
| Name | Type | Description |
|---|---|---|
| list |  | Next batch of items |
Raises:
| Type | Description |
|---|---|
| StopIteration | When no more items are available or the limit is reached |
Source code in graflo/util/chunker.py
ChunkFlusherMono¶
Monolithic chunk flusher for writing data to files.
This class provides functionality for writing chunks of data to files, with support for file naming and size limits.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target_prefix |  | Prefix for output files | required |
| chunksize |  | Maximum number of items per file | required |
| maxchunks |  | Maximum number of chunks to write | None |
| suffix |  | File suffix (default: '.json') | None |
Source code in graflo/util/chunker.py
flush_chunk()¶
Write the current chunk to a file.
Source code in graflo/util/chunker.py
items_processed()¶
Get the total number of items processed.
Returns:
| Name | Type | Description |
|---|---|---|
| int |  | Number of items processed |
push(item)¶
Add an item to the current chunk.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| item |  | Item to add to the chunk | required |
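The push/flush cycle can be illustrated with a self-contained sketch. `MiniFlusher` is an invented name for illustration and omits details of the real class (such as the `maxchunks` cap):

```python
import json
import tempfile
from pathlib import Path

class MiniFlusher:
    """Sketch of the push/flush pattern: buffer items, write a numbered
    JSON file whenever the buffer reaches ``chunksize``."""

    def __init__(self, target_prefix, chunksize, suffix=".json"):
        self.target_prefix = target_prefix
        self.chunksize = chunksize
        self.suffix = suffix
        self.buffer = []
        self.chunk_ix = 0   # index of the next output file
        self.total = 0      # items seen so far

    def push(self, item):
        self.buffer.append(item)
        self.total += 1
        if len(self.buffer) >= self.chunksize:
            self.flush_chunk()

    def flush_chunk(self):
        if not self.buffer:
            return
        path = Path(f"{self.target_prefix}_{self.chunk_ix}{self.suffix}")
        path.write_text(json.dumps(self.buffer))
        self.buffer = []
        self.chunk_ix += 1

    def items_processed(self):
        return self.total

# demo: 5 items with chunksize 2 -> three files (2 + 2 + 1 items)
with tempfile.TemporaryDirectory() as tmp:
    flusher = MiniFlusher(f"{tmp}/chunk", chunksize=2)
    for i in range(5):
        flusher.push({"i": i})
    flusher.flush_chunk()  # write the trailing partial chunk
    n_files = len(list(Path(tmp).glob("chunk_*.json")))
    n_items = flusher.items_processed()
```

Note the explicit final `flush_chunk()` call: without it, a partial last buffer would never reach disk.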
ChunkerDataFrame¶
Bases: AbstractChunker
Chunker for Pandas DataFrames.
This class provides chunking functionality for Pandas DataFrames, converting each chunk into a list of dictionaries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | DataFrame to chunk | required |
| **kwargs |  | Additional arguments for AbstractChunker | {} |
Source code in graflo/util/chunker.py
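The slice-and-convert step such a chunker performs can be sketched directly with pandas (the helper name `iter_df_batches` is invented; this is not graflo's code):

```python
import pandas as pd

def iter_df_batches(df, batch_size):
    """Yield successive slices of ``df`` as lists of row dicts,
    mirroring what a DataFrame chunker produces per batch."""
    for start in range(0, len(df), batch_size):
        yield df.iloc[start:start + batch_size].to_dict(orient="records")

df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
df_batches = list(iter_df_batches(df, batch_size=2))
```

Each batch is a plain `list[dict]`, so downstream consumers never need to know the data came from a DataFrame.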
ChunkerFactory¶
Factory for creating appropriate chunkers.
This class provides a factory method for creating chunkers based on the type of resource and configuration provided.
Example
>>> chunker = ChunkerFactory.create_chunker(
...     resource="data.json",
...     type=ChunkerType.JSON,
...     batch_size=1000,
... )
Source code in graflo/util/chunker.py
create_chunker(**kwargs) classmethod¶
Create an appropriate chunker for the given resource.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| **kwargs |  | Configuration for the chunker, including: resource (path to file, list, or DataFrame); type (type of chunker to create; guessed if None); batch_size (size of each batch); limit (maximum number of items to process) | {} |
Returns:
| Name | Type | Description |
|---|---|---|
| AbstractChunker | AbstractChunker | Appropriate chunker instance |
Raises:
| Type | Description |
|---|---|
| ValueError | If the resource type is not supported or the chunker type cannot be guessed |
Source code in graflo/util/chunker.py
ChunkerType¶
Bases: BaseEnum
Types of chunkers supported by the system.
- JSON: For JSON files
- JSONL: For JSON Lines files
- TABLE: For CSV/TSV files
- PARQUET: For Parquet files (columnar storage format)
- TRIVIAL: For in-memory lists
Source code in graflo/util/chunker.py
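How a factory might map file extensions to these types can be sketched as follows. The mapping and helper below are hypothetical; `ChunkerFactory`'s actual guessing logic may differ:

```python
from pathlib import Path

# Hypothetical extension -> chunker-type mapping, for illustration only.
EXTENSION_MAP = {
    ".json": "JSON",
    ".jsonl": "JSONL",
    ".csv": "TABLE",
    ".tsv": "TABLE",
    ".parquet": "PARQUET",
}

def guess_chunker_type(filename):
    """Guess a chunker type from the filename, ignoring a .gz suffix
    so compressed files map to their base format."""
    suffixes = [s for s in Path(filename).suffixes if s != ".gz"]
    ctype = EXTENSION_MAP.get(suffixes[-1]) if suffixes else None
    if ctype is None:
        raise ValueError(f"cannot guess chunker type for {filename!r}")
    return ctype
```

Stripping `.gz` before the lookup lets `dump.csv.gz` resolve to the TABLE type just like `dump.csv`.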
FPSmart¶
Smart file pointer for pattern-based file processing.
This class provides a file-like interface with pattern-based transformation of the data being read.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fp |  | File pointer to wrap | required |
| pattern |  | Regular expression pattern to match | required |
| substitute |  | String to substitute for matches | '' |
| count |  | Maximum number of substitutions (0 for unlimited) | 0 |
Source code in graflo/util/chunker.py
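The idea can be illustrated with a minimal wrapper (an invented `PatternReader`, not the real class): every `read` passes through `re.sub` before the caller sees the data. Reading everything at once sidesteps matches split across chunk boundaries, which a real streaming wrapper must handle with buffering:

```python
import io
import re

class PatternReader:
    """Sketch of a pattern-rewriting file wrapper."""

    def __init__(self, fp, pattern, substitute="", count=0):
        self.fp = fp
        self.pattern = re.compile(pattern)
        self.substitute = substitute
        self.count = count  # 0 means replace all matches

    def read(self, n=-1):
        data = self.fp.read(n)
        return self.pattern.sub(self.substitute, data, count=self.count)

    def close(self):
        self.fp.close()

# strip an XML namespace declaration on the fly
raw = io.StringIO('<doc xmlns="http://example.org/ns"><a/></doc>')
cleaned = PatternReader(raw, r'\sxmlns="[^"]*"').read()
```

This is the same trick the module's `convert` function uses a pattern argument for: dropping namespace noise before XML parsing.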
close()¶
Close the wrapped file pointer.
read(n)¶
Read and transform data from the file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| n |  | Number of bytes to read | required |
Returns:
| Name | Type | Description |
|---|---|---|
| str |  | Transformed data |
transform(s)¶
Transform the data using the pattern.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s |  | Data to transform | required |
Returns:
| Name | Type | Description |
|---|---|---|
| str |  | Transformed data |
FileChunker¶
Bases: AbstractChunker
Base class for file-based chunking.
This class provides functionality for reading and chunking files, with support for different encodings and compression.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| filename |  | Path to the file to process | required |
| encoding | EncodingType | File encoding (default: UTF_8) | UTF_8 |
| mode |  | File mode ('t' for text, 'b' for binary) | 't' |
| **kwargs |  | Additional arguments for AbstractChunker | {} |
Attributes:
| Name | Type | Description |
|---|---|---|
| filename | Path | Path to the file |
| file_obj | TextIO \| GzipFile \| None | File object for reading |
| encoding | EncodingType \| None | File encoding |
| mode |  | File mode |
Source code in graflo/util/chunker.py
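The compression dispatch such a chunker needs before it can iterate lines can be sketched with the standard library alone (the helper name `open_maybe_gzip` is invented; graflo's handling may differ in detail):

```python
import gzip
import tempfile
from pathlib import Path

def open_maybe_gzip(filename, mode="t", encoding="utf-8"):
    """Open plain or .gz files transparently, returning a readable
    handle in text ('t') or binary ('b') mode."""
    path = Path(filename)
    opener = gzip.open if path.suffix == ".gz" else open
    # encoding is only meaningful in text mode
    return opener(path, "r" + mode, encoding=encoding if mode == "t" else None)

# demo: write a gzipped file, then read it back through the dispatcher
with tempfile.TemporaryDirectory() as tmp:
    gz_path = Path(tmp) / "lines.txt.gz"
    with gzip.open(gz_path, "wt", encoding="utf-8") as fh:
        fh.write("first\nsecond\n")
    with open_maybe_gzip(gz_path) as fh:
        gz_lines = fh.read().splitlines()
```

Both `open` and `gzip.open` accept the same `"rt"`/`"rb"` mode strings, which is what makes the single-opener dispatch work.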
__next__()¶
Get the next batch of lines.
Returns:
| Type | Description |
|---|---|
| list[str] | Next batch of lines |
Raises:
| Type | Description |
|---|---|
| StopIteration | When end of file is reached or the limit is reached |
| RuntimeError | If the file is not opened (should not happen in normal flow) |
Source code in graflo/util/chunker.py
JsonChunker¶
Bases: FileChunker
Chunker for JSON files.
This class extends FileChunker to handle JSON files using streaming JSON parsing for memory efficiency.
Source code in graflo/util/chunker.py
JsonlChunker¶
Bases: FileChunker
Chunker for JSON Lines files.
This class extends FileChunker to handle JSON Lines format, parsing each line as a JSON object.
Source code in graflo/util/chunker.py
__next__()¶
Get the next batch of JSON objects.
Returns:
| Type | Description |
|---|---|
| list[dict] | Next batch of parsed JSON objects |
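The line-by-line parse-and-batch step can be sketched as a generator (an illustrative `iter_jsonl_batches`, not graflo's code):

```python
import io
import json

def iter_jsonl_batches(fp, batch_size):
    """Parse a JSON Lines stream into batches of dicts,
    skipping blank lines."""
    batch = []
    for line in fp:
        line = line.strip()
        if not line:
            continue
        batch.append(json.loads(line))
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # trailing partial batch
        yield batch

stream = io.StringIO('{"id": 1}\n{"id": 2}\n{"id": 3}\n')
jsonl_batches = list(iter_jsonl_batches(stream, batch_size=2))
```

Because each line is an independent JSON document, memory use stays bounded by the batch size, not the file size.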
ParquetChunker¶
Bases: FileChunker
Chunker for Parquet files.
This class extends FileChunker to handle Parquet files (columnar storage format). It reads Parquet files in batches using pandas, converting each batch to dictionaries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| filename | Path | Path to the Parquet file | required |
| **kwargs |  | Additional arguments for FileChunker | {} |
Source code in graflo/util/chunker.py
TableChunker¶
Bases: FileChunker
Chunker for CSV/TSV files.
This class extends FileChunker to handle tabular data, converting each row into a dictionary with column headers as keys.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| **kwargs |  | Arguments for FileChunker, including sep: field separator (default: ',') | {} |
Source code in graflo/util/chunker.py
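The row-to-dict conversion can be sketched with the standard library's `csv.DictReader` (an illustrative `iter_table_batches`, not graflo's code):

```python
import csv
import io

def iter_table_batches(fp, batch_size, sep=","):
    """Read delimited rows as dicts keyed by the header row,
    yielding them in fixed-size batches."""
    reader = csv.DictReader(fp, delimiter=sep)
    batch = []
    for row in reader:
        batch.append(dict(row))
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # trailing partial batch
        yield batch

table = io.StringIO("name,age\nada,36\nalan,41\ngrace,45\n")
table_batches = list(iter_table_batches(table, batch_size=2))
```

Note that `csv.DictReader` yields all values as strings; any numeric conversion is left to downstream processing.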
__next__()¶
Get the next batch of rows as dictionaries.
Returns:
| Type | Description |
|---|---|
| list[dict] | Next batch of rows as dictionaries |
Source code in graflo/util/chunker.py
TrivialChunker¶
Bases: AbstractChunker
Chunker for in-memory lists.
This class provides chunking functionality for lists of dictionaries that are already in memory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| array | list[dict] | List of dictionaries to chunk | required |
| **kwargs |  | Additional arguments for AbstractChunker | {} |
Source code in graflo/util/chunker.py
__next__()¶
Get the next batch of items.
Returns:
| Type | Description |
|---|---|
| list[dict] | Next batch of items |
Raises:
| Type | Description |
|---|---|
| StopIteration | When no more items are available or the limit is reached |
Source code in graflo/util/chunker.py
convert(source, target_root, chunk_size=10000, max_chunks=None, pattern=None, force_list=None, root_tag=None)¶
Convert XML file to JSON chunks.
This function processes an XML file and converts it to a series of JSON files, with support for pattern-based transformation and chunking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | Path | Path to source XML file | required |
| target_root | str | Root path for output files | required |
| chunk_size | int | Number of items per output file | 10000 |
| max_chunks |  | Maximum number of chunks to create | None |
| pattern | str \| None | Regular expression pattern for transformation | None |
| force_list |  | List of tags that should always be lists | None |
| root_tag |  | Root tag to start parsing from | None |
Example
>>> convert(
...     source="data.xml",
...     target_root="output",
...     chunk_size=1000,
...     pattern=r'xmlns="[^"]*"',
...     root_tag="PubmedArticle",
... )
Source code in graflo/util/chunker.py
gunzip_file(fname_in, fname_out)¶
Decompress a gzipped file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fname_in |  | Path to input gzipped file | required |
| fname_out |  | Path to output decompressed file | required |
Source code in graflo/util/chunker.py
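An equivalent decompression routine can be written with the standard library alone, streaming the data so the whole file never sits in memory (an illustrative `gunzip`, not necessarily graflo's implementation):

```python
import gzip
import shutil
import tempfile
from pathlib import Path

def gunzip(fname_in, fname_out):
    """Stream-decompress a .gz file to ``fname_out``."""
    with gzip.open(fname_in, "rb") as src, open(fname_out, "wb") as dst:
        shutil.copyfileobj(src, dst)  # copies in fixed-size buffers

# demo: compress, decompress, and recover the original payload
with tempfile.TemporaryDirectory() as tmp:
    src_path = Path(tmp) / "data.txt.gz"
    out_path = Path(tmp) / "data.txt"
    with gzip.open(src_path, "wt", encoding="utf-8") as fh:
        fh.write("payload")
    gunzip(src_path, out_path)
    recovered = out_path.read_text(encoding="utf-8")
```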
nullcontext(enter_result=None)¶
Context manager that does nothing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| enter_result |  | Value to return when entering the context | None |
Yields:
| Type | Description |
|---|---|
|  | The enter_result value |
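The typical use is letting one `with` statement handle both a path the function opens itself and a file object the caller already owns. The standard library's `contextlib.nullcontext` behaves the same way, so the sketch below uses it (the helper `line_count` is invented for illustration):

```python
import io
from contextlib import nullcontext

def line_count(fp=None, path=None):
    """Count lines from either an open file object or a path.
    A path gets a real context manager that closes the file on exit;
    a caller-supplied file object gets a do-nothing wrapper instead."""
    cm = open(path, encoding="utf-8") if path else nullcontext(fp)
    with cm as handle:
        return sum(1 for _ in handle)

n = line_count(fp=io.StringIO("a\nb\nc\n"))
```

The do-nothing wrapper matters: wrapping the caller's file in a real closing context would close a handle the caller may still need.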
parse_simple(fp, good_cf, force_list=None, root_tag=None)¶
Parse XML file with simple structure.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fp |  | File pointer to parse | required |
| good_cf |  | Function to check if an element is valid | required |
| force_list |  | List of tags that should always be lists | None |
| root_tag |  | Root tag to start parsing from | None |
Returns:
| Name | Type | Description |
|---|---|---|
| dict |  | Parsed XML data |