# graphcast.util.chunker

Data chunking utilities for efficient file processing.

This module provides utilities for processing large files by breaking them into manageable chunks. It supports various file formats (JSON, JSONL, CSV) and provides both file-based and in-memory chunking capabilities.
## Key Components

- `AbstractChunker`: Base class for chunking implementations
- `FileChunker`: File-based chunking with encoding support
- `TableChunker`: CSV/TSV file chunking
- `JsonlChunker`: JSON Lines file chunking
- `JsonChunker`: JSON file chunking
- `TrivialChunker`: In-memory list chunking
- `ChunkerDataFrame`: Pandas DataFrame chunking
- `ChunkerFactory`: Factory for creating appropriate chunkers
Example:

```python
chunker = ChunkerFactory.create_chunker(
    resource="data.json",
    type=ChunkerType.JSON,
    batch_size=1000,
)
for batch in chunker:
    process_batch(batch)
```
## AbstractChunker

Bases: `ABC`

Abstract base class for chunking implementations.

This class defines the interface for all chunkers, providing common functionality for batch processing and iteration.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `batch_size` | | Number of items per batch | `10` |
| `limit` | | Maximum number of items to process | `None` |
Attributes:

| Name | Type | Description |
|---|---|---|
| `units_processed` | | Number of items processed |
| `batch_size` | | Size of each batch |
| `limit` | `int \| None` | Maximum number of items to process |
| `cnt` | | Current count of processed items |
| `iteration_tried` | | Whether iteration has been attempted |
Source code in graphcast/util/chunker.py
### `__iter__()`

Initialize iteration if not already done.

Returns:

| Name | Type | Description |
|---|---|---|
| `self` | | Iterator instance |
### `__next__()`

Get the next batch of items.

Returns:

| Name | Type | Description |
|---|---|---|
| `list` | | Next batch of items |

Raises:

| Type | Description |
|---|---|
| `StopIteration` | When no more items are available or the limit is reached |
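The iteration contract described above can be sketched as follows. This is a hypothetical reconstruction using the documented attributes (`batch_size`, `limit`, `cnt`, `iteration_tried`); the `_next_batch` helper and the `ListChunkerSketch` subclass are illustrative inventions (the real subclasses override `__next__` directly), and the actual code in `graphcast/util/chunker.py` may differ:

```python
from abc import ABC, abstractmethod

class AbstractChunkerSketch(ABC):
    """Hypothetical base chunker: yields batches until exhausted or a limit is hit."""

    def __init__(self, batch_size=10, limit=None):
        self.batch_size = batch_size
        self.limit = limit
        self.cnt = 0                  # items handed out so far
        self.iteration_tried = False  # whether iteration has started

    def __iter__(self):
        self.iteration_tried = True
        return self

    def __next__(self):
        # Stop once the optional item limit has been reached.
        if self.limit is not None and self.cnt >= self.limit:
            raise StopIteration
        batch = self._next_batch()
        if not batch:
            raise StopIteration
        self.cnt += len(batch)
        return batch

    @abstractmethod
    def _next_batch(self):
        """Return the next list of items, or an empty list when exhausted."""

class ListChunkerSketch(AbstractChunkerSketch):
    """In-memory list chunker built on the sketch above."""

    def __init__(self, items, **kwargs):
        super().__init__(**kwargs)
        self._items = items

    def _next_batch(self):
        # cnt doubles as the read position for an in-memory list.
        return self._items[self.cnt : self.cnt + self.batch_size]
```

Note that the limit is checked at batch boundaries, so the final batch may overshoot it slightly.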
## ChunkFlusherMono

Monolithic chunk flusher for writing data to files.

This class provides functionality for writing chunks of data to files, with support for file naming and size limits.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `target_prefix` | | Prefix for output files | *required* |
| `chunksize` | | Maximum number of items per file | *required* |
| `maxchunks` | | Maximum number of chunks to write | `None` |
| `suffix` | | File suffix (default: `'.json'`) | `None` |
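A minimal sketch of the push/flush cycle this class describes. The class name, file-naming scheme (`prefix_N.json`), and internal counters are assumptions for illustration; only the public method names (`push`, `flush_chunk`, `items_processed`) come from the documentation:

```python
import json
from pathlib import Path

class ChunkFlusherSketch:
    """Hypothetical flusher: buffers items and writes one file per full chunk."""

    def __init__(self, target_prefix, chunksize, maxchunks=None, suffix=".json"):
        self.target_prefix = target_prefix
        self.chunksize = chunksize
        self.maxchunks = maxchunks
        self.suffix = suffix
        self._chunk = []           # items buffered for the next file
        self._chunks_written = 0
        self._items = 0

    def push(self, item):
        """Add an item; auto-flush when the buffer reaches chunksize."""
        self._chunk.append(item)
        self._items += 1
        if len(self._chunk) >= self.chunksize:
            self.flush_chunk()

    def flush_chunk(self):
        """Write the buffered items to prefix_N.suffix, honoring maxchunks."""
        if not self._chunk:
            return
        if self.maxchunks is not None and self._chunks_written >= self.maxchunks:
            return
        path = Path(f"{self.target_prefix}_{self._chunks_written}{self.suffix}")
        path.write_text(json.dumps(self._chunk))
        self._chunks_written += 1
        self._chunk = []

    def items_processed(self):
        return self._items
```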
### `flush_chunk()`

Write the current chunk to a file.
### `items_processed()`

Get the total number of items processed.

Returns:

| Name | Type | Description |
|---|---|---|
| `int` | | Number of items processed |
### `push(item)`

Add an item to the current chunk.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `item` | | Item to add to the chunk | *required* |
## ChunkerDataFrame

Bases: `AbstractChunker`

Chunker for Pandas DataFrames.

This class provides chunking functionality for Pandas DataFrames, converting each chunk into a list of dictionaries.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | DataFrame to chunk | *required* |
| `**kwargs` | | Additional arguments for AbstractChunker | `{}` |
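The conversion this class performs, chunks of rows into lists of dictionaries, can be expressed with standard pandas calls. The function below is an illustrative sketch, not the class's actual code:

```python
import pandas as pd

def iter_dataframe_chunks(df, batch_size=10):
    """Yield successive row slices of df as lists of record dicts."""
    for start in range(0, len(df), batch_size):
        # to_dict(orient="records") turns each row into {column: value}
        yield df.iloc[start : start + batch_size].to_dict(orient="records")
```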
## ChunkerFactory

Factory for creating appropriate chunkers.

This class provides a factory method for creating chunkers based on the type of resource and configuration provided.

Example:

```python
chunker = ChunkerFactory.create_chunker(
    resource="data.json",
    type=ChunkerType.JSON,
    batch_size=1000,
)
```
### `create_chunker(**kwargs)` (classmethod)

Create an appropriate chunker for the given resource.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `**kwargs` | | Configuration for the chunker: `resource` (path to file, list, or DataFrame), `type` (type of chunker to create; guessed if `None`), `batch_size` (size of each batch), `limit` (maximum number of items to process) | `{}` |

Returns:

| Name | Type | Description |
|---|---|---|
| `AbstractChunker` | `AbstractChunker` | Appropriate chunker instance |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If the resource type is not supported or the chunker type cannot be guessed |
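One plausible way the factory could guess a chunker type from the resource, sketched here as a guess; the suffix map and the `.gz` handling are assumptions, not the factory's confirmed logic:

```python
from enum import Enum
from pathlib import Path

class ChunkerType(Enum):
    JSON = "json"
    JSONL = "jsonl"
    TABLE = "table"
    TRIVIAL = "trivial"

# Hypothetical extension-to-type mapping.
_SUFFIX_MAP = {
    ".json": ChunkerType.JSON,
    ".jsonl": ChunkerType.JSONL,
    ".csv": ChunkerType.TABLE,
    ".tsv": ChunkerType.TABLE,
}

def guess_chunker_type(resource):
    """Guess a ChunkerType from an in-memory list or a file path."""
    if isinstance(resource, list):
        return ChunkerType.TRIVIAL
    # Strip a trailing .gz before looking at the format suffix.
    suffixes = [s for s in Path(str(resource)).suffixes if s != ".gz"]
    if suffixes and suffixes[-1] in _SUFFIX_MAP:
        return _SUFFIX_MAP[suffixes[-1]]
    raise ValueError(f"cannot guess chunker type for {resource!r}")
```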
## ChunkerType

Bases: `BaseEnum`

Types of chunkers supported by the system.

- `JSON`: For JSON files
- `JSONL`: For JSON Lines files
- `TABLE`: For CSV/TSV files
- `TRIVIAL`: For in-memory lists
## FPSmart

Smart file pointer for pattern-based file processing.

This class provides a file-like interface with pattern-based transformation of the data being read.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `fp` | | File pointer to wrap | *required* |
| `pattern` | | Regular expression pattern to match | *required* |
| `substitute` | | String to substitute for matches | `''` |
| `count` | | Maximum number of substitutions (0 for unlimited) | `0` |
### `close()`

Close the wrapped file pointer.
### `read(n)`

Read and transform data from the file.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `n` | | Number of bytes to read | *required* |

Returns:

| Name | Type | Description |
|---|---|---|
| `str` | | Transformed data |
### `transform(s)`

Transform the data using the pattern.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `s` | | Data to transform | *required* |

Returns:

| Name | Type | Description |
|---|---|---|
| `str` | | Transformed data |
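A minimal sketch of the wrap-and-substitute idea behind `FPSmart`, assuming the documented parameters; the real class may buffer across read boundaries, which this naive version does not (a match split between two `read(n)` calls would be missed):

```python
import io
import re

class PatternFPSketch:
    """Hypothetical file wrapper: applies a regex substitution to data as it is read."""

    def __init__(self, fp, pattern, substitute="", count=0):
        self.fp = fp
        self.pattern = re.compile(pattern)
        self.substitute = substitute
        self.count = count  # 0 means unlimited, as in re.sub

    def transform(self, s):
        return self.pattern.sub(self.substitute, s, count=self.count)

    def read(self, n=-1):
        return self.transform(self.fp.read(n))

    def close(self):
        self.fp.close()
```

This kind of wrapper is useful, for example, for stripping XML namespace declarations before handing the stream to a parser.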
## FileChunker

Bases: `AbstractChunker`

Base class for file-based chunking.

This class provides functionality for reading and chunking files, with support for different encodings and compression.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `filename` | | Path to the file to process | *required* |
| `encoding` | `EncodingType` | File encoding | `UTF_8` |
| `mode` | | File mode (`'t'` for text, `'b'` for binary) | `'t'` |
| `**kwargs` | | Additional arguments for AbstractChunker | `{}` |

Attributes:

| Name | Type | Description |
|---|---|---|
| `filename` | `Path` | Path to the file |
| `file_obj` | `TextIO \| GzipFile \| None` | File object for reading |
| `encoding` | `EncodingType \| None` | File encoding |
| `mode` | | File mode |
### `__next__()`

Get the next batch of lines.

Returns:

| Type | Description |
|---|---|
| `list[str]` | Next batch of lines |

Raises:

| Type | Description |
|---|---|
| `StopIteration` | When the end of the file or the limit is reached |
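The compression-aware opening that the attribute table implies (`file_obj` may be a `TextIO` or a `GzipFile`) could look roughly like this; the function name and the gz-vs-plain dispatch are illustrative assumptions, and the real class additionally works with `EncodingType` enum values rather than plain strings:

```python
import gzip
from pathlib import Path

def open_resource(filename, encoding="utf-8", mode="t"):
    """Open a plain or gzipped file in text ('t') or binary ('b') mode."""
    path = Path(filename)
    # Text mode needs an encoding; binary mode must not pass one.
    enc = encoding if mode == "t" else None
    if path.suffix == ".gz":
        return gzip.open(path, "r" + mode, encoding=enc)
    return open(path, "r" + mode, encoding=enc)
```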
## JsonChunker

Bases: `FileChunker`

Chunker for JSON files.

This class extends FileChunker to handle JSON files using streaming JSON parsing for memory efficiency.
## JsonlChunker

Bases: `FileChunker`

Chunker for JSON Lines files.

This class extends FileChunker to handle the JSON Lines format, parsing each line as a JSON object.
### `__next__()`

Get the next batch of JSON objects.

Returns:

| Type | Description |
|---|---|
| `list[dict]` | Next batch of parsed JSON objects |
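The line-by-line parsing described here reduces to `json.loads` per non-empty line, batched. A generator sketch of that behavior (not the class's actual code):

```python
import json

def iter_jsonl_batches(lines, batch_size=10):
    """Parse an iterable of JSON Lines into batches of dicts."""
    batch = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        batch.append(json.loads(line))
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch
```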
## TableChunker

Bases: `FileChunker`

Chunker for CSV/TSV files.

This class extends FileChunker to handle tabular data, converting each row into a dictionary with column headers as keys.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `**kwargs` | | Arguments for FileChunker, including `sep`: field separator (default: `','`) | `{}` |
### `__next__()`

Get the next batch of rows as dictionaries.

Returns:

| Type | Description |
|---|---|
| `list[dict]` | Next batch of rows as dictionaries |
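The header-keyed row conversion described above is what `csv.DictReader` does out of the box. A batched sketch of that behavior (illustrative, not the class's actual code; note that all values come back as strings):

```python
import csv
import io

def iter_table_batches(fp, batch_size=10, sep=","):
    """Read delimited rows from fp as dicts keyed by the header row, in batches."""
    reader = csv.DictReader(fp, delimiter=sep)
    batch = []
    for row in reader:
        batch.append(dict(row))
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch
```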
## TrivialChunker

Bases: `AbstractChunker`

Chunker for in-memory lists.

This class provides chunking functionality for lists of dictionaries that are already in memory.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `array` | `list[dict]` | List of dictionaries to chunk | *required* |
| `**kwargs` | | Additional arguments for AbstractChunker | `{}` |
### `__next__()`

Get the next batch of items.

Returns:

| Type | Description |
|---|---|
| `list[dict]` | Next batch of items |

Raises:

| Type | Description |
|---|---|
| `StopIteration` | When no more items are available or the limit is reached |
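In-memory chunking with a limit reduces to simple slicing. A one-function sketch of the behavior this class describes (illustrative, not its actual code):

```python
def chunk_list(array, batch_size=10, limit=None):
    """Split a list into batch_size-sized slices, optionally capped at limit items."""
    items = array if limit is None else array[:limit]
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]
```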
## `convert(source, target_root, chunk_size=10000, max_chunks=None, pattern=None, force_list=None, root_tag=None)`

Convert an XML file to JSON chunks.

This function processes an XML file and converts it to a series of JSON files, with support for pattern-based transformation and chunking.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `source` | `Path` | Path to the source XML file | *required* |
| `target_root` | `str` | Root path for output files | *required* |
| `chunk_size` | `int` | Number of items per output file | `10000` |
| `max_chunks` | | Maximum number of chunks to create | `None` |
| `pattern` | `str \| None` | Regular expression pattern for transformation | `None` |
| `force_list` | | List of tags that should always be lists | `None` |
| `root_tag` | | Root tag to start parsing from | `None` |

Example:

```python
convert(
    source="data.xml",
    target_root="output",
    chunk_size=1000,
    pattern=r'xmlns="[^"]*"',
    root_tag="PubmedArticle",
)
```
## `gunzip_file(fname_in, fname_out)`

Decompress a gzipped file.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `fname_in` | | Path to the input gzipped file | *required* |
| `fname_out` | | Path to the output decompressed file | *required* |
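The documented behavior maps directly onto the standard library; a sketch with the same signature (the actual implementation may differ in buffering details):

```python
import gzip
import shutil

def gunzip_file(fname_in, fname_out):
    """Stream-decompress fname_in (gzip) into fname_out without loading it all in memory."""
    with gzip.open(fname_in, "rb") as f_in, open(fname_out, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
```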
## `nullcontext(enter_result=None)`

Context manager that does nothing.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `enter_result` | | Value to return when entering the context | `None` |

Yields:

| Type | Description |
|---|---|
| | The `enter_result` value |
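This helper matches the behavior of `contextlib.nullcontext` (available in the standard library since Python 3.7) and can be written in three lines:

```python
from contextlib import contextmanager

@contextmanager
def nullcontext(enter_result=None):
    """Do-nothing context manager; useful when a with-block is conditional."""
    yield enter_result
```

A typical use is passing either a real file or `nullcontext()` to the same `with` statement.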
## `parse_simple(fp, good_cf, force_list=None, root_tag=None)`

Parse an XML file with a simple structure.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `fp` | | File pointer to parse | *required* |
| `good_cf` | | Function to check whether an element is valid | *required* |
| `force_list` | | List of tags that should always be lists | `None` |
| `root_tag` | | Root tag to start parsing from | `None` |

Returns:

| Name | Type | Description |
|---|---|---|
| `dict` | | Parsed XML data |
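A loose sketch of simple XML-to-dict parsing in the spirit of `parse_simple`, using only the standard library. The `good_cf` filtering is omitted, and the `force_list` and `root_tag` semantics shown here are assumptions that may not match the real function:

```python
import io
import xml.etree.ElementTree as ET

def parse_simple_sketch(fp, force_list=(), root_tag=None):
    """Parse XML from fp into nested dicts; tags in force_list always map to lists."""
    root = ET.parse(fp).getroot()
    if root_tag is not None and root.tag != root_tag:
        root = root.find(root_tag)

    def to_dict(el):
        children = list(el)
        if not children:
            return el.text  # leaf element: just its text
        out = {}
        for child in children:
            val = to_dict(child)
            if child.tag in out or child.tag in force_list:
                existing = out.setdefault(child.tag, [])
                if not isinstance(existing, list):
                    out[child.tag] = [existing]  # promote earlier scalar to a list
                out[child.tag].append(val)
            else:
                out[child.tag] = val
        return out

    return {root.tag: to_dict(root)}
```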