Bases: `AbstractDataSource`
Data source for in-memory data structures.
This class provides a data source for Python objects that are already
in memory, including lists of dictionaries, lists of lists, and Pandas DataFrames.
Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `data` | `list[dict] \| list[list] \| DataFrame` | Data to process (list[dict], list[list], or pd.DataFrame) |
| `columns` | `list[str] \| None` | Optional column names for list[list] data |
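A minimal construction sketch for the three supported shapes. The import path is an assumption based on the source path shown below (`graflo/data_source/memory.py`); adjust it to how the package actually exposes the class:

```python
import pandas as pd

from graflo.data_source.memory import InMemoryDataSource  # assumed import path

# list[dict]: each row is already a dictionary
source = InMemoryDataSource(data=[{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])

# list[list]: columns supplies the key for each position in a row
source = InMemoryDataSource(data=[[1, "a"], [2, "b"]], columns=["id", "name"])

# pd.DataFrame: column names come from the frame itself
source = InMemoryDataSource(data=pd.DataFrame({"id": [1, 2], "name": ["a", "b"]}))
```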
Source code in `graflo/data_source/memory.py`

```python
class InMemoryDataSource(AbstractDataSource):
    """Data source for in-memory data structures.

    This class provides a data source for Python objects that are already
    in memory, including lists of dictionaries, lists of lists, and Pandas
    DataFrames.

    Attributes:
        data: Data to process (list[dict], list[list], or pd.DataFrame)
        columns: Optional column names for list[list] data
    """

    model_config = {"arbitrary_types_allowed": True}

    data: list[dict] | list[list] | pd.DataFrame
    columns: list[str] | None = None
    source_type: DataSourceType = DataSourceType.IN_MEMORY

    def iter_batches(
        self, batch_size: int = 1000, limit: int | None = None
    ) -> Iterator[list[dict]]:
        """Iterate over in-memory data in batches.

        Args:
            batch_size: Number of items per batch
            limit: Maximum number of items to retrieve

        Yields:
            list[dict]: Batches of documents as dictionaries
        """
        # Normalize data: convert list[list] to list[dict] if needed
        data = self.data
        if isinstance(data, list) and len(data) > 0 and isinstance(data[0], list):
            # list[list] - convert to list[dict] using columns
            if self.columns is None:
                raise ValueError(
                    "columns parameter is required when data is list[list]"
                )
            # Type narrowing: we've confirmed data[0] is a list, so data is
            # list[list]; build a properly typed list for iteration
            data_list: list[list] = []
            for item in data:
                if isinstance(item, list):
                    data_list.append(item)
            data = [{k: v for k, v in zip(self.columns, item)} for item in data_list]

        # Create chunker via the factory; columns is never passed here because
        # the list[list] conversion is handled above and the DataFrame chunker
        # does not need it either
        chunker_kwargs = {
            "resource": data,
            "batch_size": batch_size,
            "limit": limit,
        }
        chunker = ChunkerFactory.create_chunker(**chunker_kwargs)

        # Yield batches
        for batch in chunker:
            yield batch
```
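The list[list] normalization in `iter_batches` above amounts to zipping `columns` with each row. A standalone sketch of that step:

```python
columns = ["id", "name"]
rows = [[1, "a"], [2, "b"]]

# The same conversion iter_batches applies before chunking:
# each positional row is zipped with the column names to form a dict.
normalized = [{k: v for k, v in zip(columns, row)} for row in rows]
assert normalized == [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
```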
`iter_batches(batch_size=1000, limit=None)`
Iterate over in-memory data in batches.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `batch_size` | `int` | Number of items per batch | `1000` |
| `limit` | `int \| None` | Maximum number of items to retrieve | `None` |

Yields:

| Type | Description |
| --- | --- |
| `list[dict]` | Batches of documents as dictionaries |
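A usage sketch for the iterator itself, reusing the assumed import from the constructor example above; `batch_size` and `limit` behave as documented:

```python
source = InMemoryDataSource(
    data=[[i, f"name-{i}"] for i in range(25)],
    columns=["id", "name"],
)

# At most 10 dicts per batch; iteration stops after 25 items total,
# so this yields batches of 10, 10, and 5.
for batch in source.iter_batches(batch_size=10, limit=25):
    print(len(batch), batch[0])
```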
Source code in `graflo/data_source/memory.py`

```python
def iter_batches(
    self, batch_size: int = 1000, limit: int | None = None
) -> Iterator[list[dict]]:
    """Iterate over in-memory data in batches.

    Args:
        batch_size: Number of items per batch
        limit: Maximum number of items to retrieve

    Yields:
        list[dict]: Batches of documents as dictionaries
    """
    # Normalize data: convert list[list] to list[dict] if needed
    data = self.data
    if isinstance(data, list) and len(data) > 0 and isinstance(data[0], list):
        # list[list] - convert to list[dict] using columns
        if self.columns is None:
            raise ValueError(
                "columns parameter is required when data is list[list]"
            )
        # Type narrowing: we've confirmed data[0] is a list, so data is
        # list[list]; build a properly typed list for iteration
        data_list: list[list] = []
        for item in data:
            if isinstance(item, list):
                data_list.append(item)
        data = [{k: v for k, v in zip(self.columns, item)} for item in data_list]

    # Create chunker via the factory; columns is never passed here because
    # the list[list] conversion is handled above and the DataFrame chunker
    # does not need it either
    chunker_kwargs = {
        "resource": data,
        "batch_size": batch_size,
        "limit": limit,
    }
    chunker = ChunkerFactory.create_chunker(**chunker_kwargs)

    # Yield batches
    for batch in chunker:
        yield batch
```