Skip to content

graflo.architecture.backend

On-disk GraFlo backend I/O primitives.

CollectionEntry

Bases: ConfigBaseModel

Inventory for one vertex type or edge collection on disk.

Source code in graflo/architecture/backend/index.py
class CollectionEntry(ConfigBaseModel):
    """Index record for the on-disk data of one vertex type or edge collection.

    Instances are stored in the ``vertices`` / ``edges`` mappings of the
    backend index manifest.
    """

    # Chunk files of this collection; the reader resolves them against the
    # backend root directory.
    chunks: list[str] = Field(
        description="Relative paths to gzip JSONL chunk files.",
        default_factory=list,
    )
    # Record count for the collection (validated non-negative); presumably the
    # total across all chunks -- TODO confirm against _CollectionWriter.snapshot.
    record_count: int = Field(ge=0, default=0)

GraFloBackendReader

Read schema and chunked graph data from a GraFlo backend directory.

Source code in graflo/architecture/backend/reader.py
class GraFloBackendReader:
    """Read schema and chunked graph data from a GraFlo backend directory."""

    def __init__(self, input_dir: Path) -> None:
        self._layout = GraFloLayout(input_dir)
        # Parsed index manifest; loaded lazily and cached by read_index().
        self._index: GraFloIndex | None = None

    def read_index(self) -> GraFloIndex:
        """Return the backend index manifest, parsing it from disk on first use."""
        if self._index is None:
            with open(self._layout.index_path, encoding="utf-8") as fin:
                payload = json.load(fin)
            self._index = GraFloIndex.model_validate(payload)
        return self._index

    def read_schema(self) -> Schema:
        """Load the stored schema document (re-read on every call)."""
        return Schema.from_yaml(str(self._layout.schema_path))

    def iter_vertex_batches(
        self,
        vertex_type: str,
        *,
        batch_size: int = 1000,
        limit: int | None = None,
    ) -> Iterator[list[dict[str, Any]]]:
        """Yield the stored documents of *vertex_type* in batches.

        Args:
            vertex_type: Vertex type name as recorded in the index.
            batch_size: Maximum number of documents per yielded list.
            limit: Maximum total number of documents to yield. ``None`` means
                unbounded; zero or a negative value yields nothing.

        Yields nothing when the vertex type is not present in the index.
        """
        entry = self.read_index().vertices.get(vertex_type)
        if entry is None:
            return
        yield from self._iter_entry_batches(
            entry, batch_size=batch_size, limit=limit
        )

    def iter_edge_batches(
        self,
        edge_key: tuple[str, str, str | None],
        *,
        batch_size: int = 1000,
        limit: int | None = None,
    ) -> Iterator[list[list[Any]]]:
        """Yield the stored edge records of *edge_key* in batches.

        Args:
            edge_key: ``(source, target, relation)`` tuple identifying the edge.
            batch_size: Maximum number of records per yielded list.
            limit: Maximum total number of records to yield. ``None`` means
                unbounded; zero or a negative value yields nothing.

        Yields nothing when the edge collection is not present in the index.
        """
        index_name = GraFloLayout.edge_key_to_index_name(edge_key)
        entry = self.read_index().edges.get(index_name)
        if entry is None:
            return
        yield from self._iter_entry_batches(
            entry, batch_size=batch_size, limit=limit
        )

    def load_graph_container(self) -> GraphContainer:
        """Materialize every vertex and edge collection declared by the schema.

        All records are loaded into memory. Vertex types and edges that have
        no stored records are omitted from the resulting container.
        """
        schema = self.read_schema()
        vertices: dict[str, list] = {}
        edges: dict[tuple[str, str, str | None], list] = {}

        for vertex in schema.core_schema.vertex_config.vertices:
            docs: list[dict[str, Any]] = []
            for batch in self.iter_vertex_batches(vertex.name):
                docs.extend(batch)
            if docs:
                vertices[vertex.name] = docs

        for edge in schema.core_schema.edge_config.values():
            edge_docs: list[list[Any]] = []
            edge_key = edge.edge_id
            for batch in self.iter_edge_batches(edge_key):
                edge_docs.extend(batch)
            if edge_docs:
                edges[edge_key] = edge_docs

        return GraphContainer(vertices=vertices, edges=edges, linear=[])

    def _iter_entry_batches(
        self,
        entry: CollectionEntry,
        *,
        batch_size: int,
        limit: int | None,
    ) -> Iterator[list[Any]]:
        """Stream the records of *entry*'s chunks in order, grouped into batches.

        Each yielded list holds at most ``batch_size`` records (a non-positive
        ``batch_size`` degrades to one record per batch), and no more than
        ``limit`` records are yielded in total. A batch is closed as soon as it
        reaches the remaining limit, so records past the limit are never read.
        """
        if limit is not None and limit <= 0:
            return
        remaining = limit  # records still allowed; None means unbounded
        batch: list[Any] = []
        for chunk in entry.chunks:
            for record in self._iter_chunk_records(chunk):
                batch.append(record)
                cap = batch_size if remaining is None else min(batch_size, remaining)
                if len(batch) < cap:
                    continue
                if remaining is not None:
                    remaining -= len(batch)
                # Hand the list off and start a fresh one so the consumer may
                # keep or mutate the yielded batch without affecting us.
                full, batch = batch, []
                yield full
                if remaining is not None and remaining <= 0:
                    return
        # Any leftover batch is shorter than the remaining limit by construction.
        if batch:
            yield batch

    def _iter_chunk_records(self, relative_chunk: str) -> Iterator[Any]:
        """Decode one gzip JSONL chunk (relative to the root), skipping blank lines."""
        chunk_path = self._layout.root / relative_chunk
        with gzip.open(chunk_path, "rt", encoding="utf-8") as fin:
            for line in fin:
                line = line.strip()
                if not line:
                    continue
                yield json.loads(line)

GraFloBackendWriter

Write schema and chunked graph data to a GraFlo backend directory.

Source code in graflo/architecture/backend/writer.py
class GraFloBackendWriter:
    """Write schema and chunked graph data to a GraFlo backend directory.

    Documents are appended through one ``_CollectionWriter`` per vertex type
    and per edge key. The index manifest is produced by :meth:`flush_index`.
    That method also runs automatically when the writer is used as a context
    manager and the ``with`` block exits without an exception.
    """

    def __init__(
        self,
        output_dir: Path,
        *,
        chunk_size: int = 50_000,
        resume: bool = False,
    ) -> None:
        """Prepare a writer rooted at *output_dir*.

        Args:
            output_dir: Backend root directory.
            chunk_size: Forwarded to every ``_CollectionWriter``.
            resume: When true and an index already exists on disk, load it.
                Collections it lists are kept in the next flushed index, and
                their entries are handed to new collection writers as
                ``existing``.
        """
        self._layout = GraFloLayout(output_dir)
        self._chunk_size = chunk_size
        self._resume = resume
        self._schema: Schema | None = None
        self._index: GraFloIndex | None = None
        self._vertex_writers: dict[str, _CollectionWriter] = {}
        self._edge_writers: dict[tuple[str, str, str | None], _CollectionWriter] = {}
        if resume and self._layout.index_path.exists():
            with open(self._layout.index_path, encoding="utf-8") as fin:
                payload = json.load(fin)
            self._index = GraFloIndex.model_validate(payload)

    def __enter__(self) -> GraFloBackendWriter:
        self._layout.ensure_dirs()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        # Only finalize the index on success; on error, leave disk state as-is.
        if exc is None:
            self.flush_index()

    @property
    def layout(self) -> GraFloLayout:
        return self._layout

    def reset_data(self) -> None:
        """Remove data chunks and index while keeping schema if present."""
        for path in (self._layout.vertices_dir, self._layout.edges_dir):
            if path.exists():
                shutil.rmtree(path)
        if self._layout.index_path.exists():
            self._layout.index_path.unlink()
        self._index = None
        self._vertex_writers = {}
        self._edge_writers = {}
        self._layout.ensure_dirs()

    def write_schema(self, schema: Schema) -> None:
        """Remember *schema* and write it as YAML to the backend schema path."""
        self._schema = schema
        self._layout.ensure_dirs()
        import yaml

        with open(self._layout.schema_path, "w", encoding="utf-8") as fout:
            yaml.safe_dump(
                schema.model_dump(mode="json", by_alias=True, exclude_none=True),
                fout,
                default_flow_style=False,
                sort_keys=False,
            )

    def write_vertex_batch(self, vertex_type: str, docs: list[dict[str, Any]]) -> None:
        """Append *docs* to the collection of *vertex_type*; empty batches are ignored."""
        if not docs:
            return
        writer = self._vertex_writers.get(vertex_type)
        if writer is None:
            # Create lazily: dict.setdefault would build (and discard) a new
            # writer on every call, even when one is already cached.
            writer = self._make_vertex_writer(vertex_type)
            self._vertex_writers[vertex_type] = writer
        writer.push_many(docs)

    def write_edge_batch(
        self,
        edge_key: tuple[str, str, str | None],
        docs: list[list[Any]],
    ) -> None:
        """Append *docs* to the collection of *edge_key*; empty batches are ignored."""
        if not docs:
            return
        writer = self._edge_writers.get(edge_key)
        if writer is None:
            # Same lazy creation as write_vertex_batch (avoid setdefault's eager arg).
            writer = self._make_edge_writer(edge_key)
            self._edge_writers[edge_key] = writer
        writer.push_many(docs)

    def flush_index(self) -> GraFloIndex:
        """Flush all collection writers and (re)write the index manifest.

        The schema comes from :meth:`write_schema` or, failing that, from the
        schema file on disk.

        Returns:
            The index that was written; it is also cached on the writer.

        Raises:
            ValueError: If no schema is available.
        """
        if self._schema is None and self._layout.schema_path.exists():
            self._schema = Schema.from_yaml(str(self._layout.schema_path))
        if self._schema is None:
            raise ValueError("Cannot flush GraFlo backend index without schema")

        for writer in self._vertex_writers.values():
            writer.flush()
        for writer in self._edge_writers.values():
            writer.flush()

        vertices = self._collect_vertex_entries()
        edges = self._collect_edge_entries()
        index = GraFloIndex(
            graflo_version=_graflo_package_version(),
            schema_hash=backend_schema_hash(self._schema),
            vertices=vertices,
            edges=edges,
        )
        payload = (
            json.dumps(
                index.model_dump(mode="json", by_alias=True, exclude_none=True),
                indent=2,
                sort_keys=True,
            )
            + "\n"
        )
        index_path = self._layout.index_path
        tmp_path = index_path.with_name(index_path.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as fout:
            fout.write(payload)
        # Atomic rename: a crash mid-write must not leave a truncated index
        # behind, since a later resume=True run would fail to parse it.
        tmp_path.replace(index_path)
        self._index = index
        return index

    def _make_vertex_writer(self, vertex_type: str) -> _CollectionWriter:
        """Build a collection writer for *vertex_type*, seeded from the loaded index."""
        existing = self._index.vertices.get(vertex_type) if self._index else None
        return _CollectionWriter(
            self._layout,
            chunk_size=self._chunk_size,
            vertex_type=vertex_type,
            existing=existing,
        )

    def _make_edge_writer(
        self, edge_key: tuple[str, str, str | None]
    ) -> _CollectionWriter:
        """Build a collection writer for *edge_key*, seeded from the loaded index."""
        index_name = GraFloLayout.edge_key_to_index_name(edge_key)
        existing = self._index.edges.get(index_name) if self._index else None
        return _CollectionWriter(
            self._layout,
            chunk_size=self._chunk_size,
            edge_key=edge_key,
            existing=existing,
        )

    def _collect_vertex_entries(self) -> dict[str, CollectionEntry]:
        """Merge previously indexed vertex entries with the active writers' snapshots."""
        entries = dict(self._index.vertices) if self._index is not None else {}
        for name, writer in self._vertex_writers.items():
            entries[name] = writer.snapshot()
        return entries

    def _collect_edge_entries(self) -> dict[str, CollectionEntry]:
        """Merge previously indexed edge entries with the active writers' snapshots."""
        entries = dict(self._index.edges) if self._index is not None else {}
        for edge_key, writer in self._edge_writers.items():
            index_name = GraFloLayout.edge_key_to_index_name(edge_key)
            entries[index_name] = writer.snapshot()
        return entries

reset_data()

Remove all data chunks and the index file, keeping the schema file if one exists.

Source code in graflo/architecture/backend/writer.py
def reset_data(self) -> None:
    """Delete all stored chunk data and the index manifest.

    The schema file is left untouched. The cached index and the per-collection
    writer caches are cleared. The (now empty) vertex and edge directories are
    then recreated.
    """
    layout = self._layout
    for data_dir in (layout.vertices_dir, layout.edges_dir):
        if data_dir.exists():
            shutil.rmtree(data_dir)
    index_file = layout.index_path
    if index_file.exists():
        index_file.unlink()
    # Drop in-memory state that referred to the deleted files.
    self._index, self._vertex_writers, self._edge_writers = None, {}, {}
    layout.ensure_dirs()

GraFloIndex

Bases: ConfigBaseModel

Self-describing manifest for a GraFlo file backend directory.

Source code in graflo/architecture/backend/index.py
class GraFloIndex(ConfigBaseModel):
    """Manifest describing the contents of a GraFlo file backend directory.

    It records the producing package version, a hash of the stored schema, a
    creation timestamp, and one :class:`CollectionEntry` per vertex type and
    per edge collection.
    """

    graflo_version: str = Field(
        default=..., description="GraFlo package version at export time."
    )
    # ISO-8601 string from a timezone-aware UTC datetime, taken at construction.
    created_at: str = Field(
        description="UTC timestamp when the export was finalized.",
        default_factory=lambda: datetime.now(tz=timezone.utc).isoformat(),
    )
    schema_hash: str = Field(
        default=..., description="Stable hash of the stored schema document."
    )
    # Keyed by vertex type name.
    vertices: dict[str, CollectionEntry] = Field(default_factory=dict)
    # Keyed by the edge index name (see GraFloLayout.edge_key_to_index_name).
    edges: dict[str, CollectionEntry] = Field(default_factory=dict)

GraFloLayout

Deterministic path builder for a GraFlo backend root directory.

Source code in graflo/architecture/backend/layout.py
class GraFloLayout:
    """Compute every on-disk path of a GraFlo backend from its root directory.

    All paths are derived purely from the resolved root and the collection
    names, so two layouts over the same directory always agree.
    """

    def __init__(self, root: Path) -> None:
        # Resolved once so that relative_to(self.root) works for every path below.
        self.root = root.resolve()

    # ---- fixed entries under the root ---------------------------------------

    @property
    def index_path(self) -> Path:
        """Location of the index manifest."""
        return self.root.joinpath(INDEX_FILENAME)

    @property
    def schema_path(self) -> Path:
        """Location of the stored schema document."""
        return self.root.joinpath(SCHEMA_FILENAME)

    @property
    def vertices_dir(self) -> Path:
        """Directory holding vertex chunk files."""
        return self.root.joinpath(VERTICES_DIR)

    @property
    def edges_dir(self) -> Path:
        """Directory holding edge chunk files."""
        return self.root.joinpath(EDGES_DIR)

    def ensure_dirs(self) -> None:
        """Create the vertex and edge directories (and parents) if missing."""
        for directory in (self.vertices_dir, self.edges_dir):
            directory.mkdir(parents=True, exist_ok=True)

    # ---- chunk files ----------------------------------------------------------

    @staticmethod
    def _chunk_filename(stem: str, chunk_index: int) -> str:
        # Chunk number is zero-padded to at least three digits.
        return f"{stem}.{chunk_index:03d}{CHUNK_SUFFIX}"

    def vertex_chunk_path(self, vertex_type: str, chunk_index: int) -> Path:
        """Absolute path of chunk *chunk_index* for *vertex_type*."""
        name = self._chunk_filename(self._vertex_stem(vertex_type), chunk_index)
        return self.vertices_dir.joinpath(name)

    def edge_chunk_path(
        self, edge_key: tuple[str, str, str | None], chunk_index: int
    ) -> Path:
        """Absolute path of chunk *chunk_index* for *edge_key*."""
        name = self._chunk_filename(self._edge_stem(edge_key), chunk_index)
        return self.edges_dir.joinpath(name)

    def relative_vertex_chunk(self, vertex_type: str, chunk_index: int) -> str:
        """POSIX-style path of a vertex chunk, relative to the root."""
        absolute = self.vertex_chunk_path(vertex_type, chunk_index)
        return absolute.relative_to(self.root).as_posix()

    def relative_edge_chunk(
        self, edge_key: tuple[str, str, str | None], chunk_index: int
    ) -> str:
        """POSIX-style path of an edge chunk, relative to the root."""
        absolute = self.edge_chunk_path(edge_key, chunk_index)
        return absolute.relative_to(self.root).as_posix()

    # ---- edge key <-> index name ----------------------------------------------

    @staticmethod
    def edge_key_to_index_name(edge_key: tuple[str, str, str | None]) -> str:
        """Human-readable edge key when safe, otherwise JSON-array encoding."""
        source, target, relation = edge_key
        if not (
            _SAFE_NAME_RE.fullmatch(source)
            and _SAFE_NAME_RE.fullmatch(target)
            and (relation is None or _SAFE_NAME_RE.fullmatch(relation))
        ):
            return serialize_edge_key(edge_key)
        # Readable form is source<delim>relation<delim>target; a missing
        # relation becomes the empty string.
        return _EDGE_DELIM.join((source, relation or "", target))

    @staticmethod
    def index_name_to_edge_key(name: str) -> tuple[str, str, str | None]:
        """Inverse of edge_key_to_index_name; empty relation maps back to None."""
        if name.startswith("["):
            return deserialize_edge_key(name)
        pieces = name.split(_EDGE_DELIM)
        if len(pieces) != 3:
            raise ValueError(f"Invalid edge index name: {name!r}")
        source, relation, target = pieces
        return source, target, (relation if relation else None)

    # ---- file-name stems --------------------------------------------------------

    @staticmethod
    def _vertex_stem(vertex_type: str) -> str:
        """Use the name as-is when safe, otherwise a base64 stem."""
        if not _SAFE_NAME_RE.fullmatch(vertex_type):
            return GraFloLayout._encode_stem(vertex_type)
        return vertex_type

    @staticmethod
    def _edge_stem(edge_key: tuple[str, str, str | None]) -> str:
        """Use the readable index name when safe, otherwise a base64 stem."""
        index_name = GraFloLayout.edge_key_to_index_name(edge_key)
        flattened = index_name.replace(_EDGE_DELIM, "_")
        if not _SAFE_NAME_RE.fullmatch(flattened):
            return GraFloLayout._encode_stem(serialize_edge_key(edge_key))
        return index_name

    @staticmethod
    def _encode_stem(value: str) -> str:
        """URL-safe base64 of the UTF-8 bytes of *value*, '=' padding stripped."""
        raw = value.encode("utf-8")
        return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")

edge_key_to_index_name(edge_key) (staticmethod)

Return a human-readable index name for the edge key when its source, target, and relation are all safe names; otherwise fall back to the JSON-array encoding.

Source code in graflo/architecture/backend/layout.py
@staticmethod
def edge_key_to_index_name(edge_key: tuple[str, str, str | None]) -> str:
    """Encode an edge key as the name used in the index ``edges`` mapping.

    When source, target and (if not ``None``) relation all fully match
    ``_SAFE_NAME_RE``, return the readable ``source<delim>relation<delim>target``
    form, with an empty relation standing in for ``None``. Otherwise return the
    encoding produced by ``serialize_edge_key``.
    """
    source, target, relation = edge_key
    # Short-circuit evaluation mirrors the original: later parts are only
    # checked when earlier ones are safe.
    readable = (
        _SAFE_NAME_RE.fullmatch(source)
        and _SAFE_NAME_RE.fullmatch(target)
        and (relation is None or _SAFE_NAME_RE.fullmatch(relation))
    )
    if not readable:
        return serialize_edge_key(edge_key)
    return _EDGE_DELIM.join((source, relation or "", target))