Skip to content

graflo.hq.bulk_session

Backend-agnostic coordinator for optional native bulk ingestion.

The coordinator keeps the begin/finalize lifecycle out of :class:`Caster` and delegates feature-support decisions to the database connections.

BulkSessionCoordinator

Coordinate a single optional native bulk session for an ingest run.

Source code in graflo/hq/bulk_session.py
class BulkSessionCoordinator:
    """Coordinate a single optional native bulk session for an ingest run.

    The first successful :meth:`ensure_session` call opens a backend bulk
    session; later calls return the same id until :meth:`finalize` closes it.
    Blocking connection work runs in a worker thread via ``asyncio.to_thread``.
    """

    def __init__(self, schema: Schema):
        self._schema = schema
        # Id of the currently open native bulk session, or None if none is open.
        self._session_id: str | None = None
        # Serializes access to ``_session_id``: concurrent callers share one
        # begin, and finalize cannot slip in while a begin is still in flight.
        self._begin_lock = asyncio.Lock()

    async def ensure_session(self, conn_conf: DBConfig) -> str | None:
        """Return an active bulk session id, or ``None`` when unsupported/disabled.

        Args:
            conn_conf: Connection configuration. A session is only attempted
                when it has a ``bulk_load`` attribute whose ``enabled`` flag
                is truthy.

        Returns:
            The id of the open session; ``None`` if bulk loading is disabled
            in ``conn_conf`` or the backend raises ``UnsupportedBulkLoad``.
        """
        async with self._begin_lock:
            if self._session_id is not None:
                return self._session_id

            # Decide from configuration alone before touching the database:
            # a disabled bulk load never needs a connection.
            bulk_cfg = getattr(conn_conf, "bulk_load", None)
            if bulk_cfg is None or not getattr(bulk_cfg, "enabled", False):
                return None

            def _begin() -> str | None:
                from graflo.db.manager import ConnectionManager

                with ConnectionManager(connection_config=conn_conf) as db:
                    try:
                        return db.bulk_load_begin(self._schema, bulk_cfg)
                    except UnsupportedBulkLoad:
                        return None

            # NOTE: when the backend is unsupported the id stays None, so the
            # next call will attempt the begin again.
            self._session_id = await asyncio.to_thread(_begin)
            return self._session_id

    async def finalize(
        self,
        conn_conf: DBConfig,
        *,
        bindings: Bindings | None,
        connection_provider: ConnectionProvider | None,
    ) -> None:
        """Finalize the active session if one exists.

        The stored id is cleared before the backend call, so a failing
        finalize is not retried by a later call.

        Args:
            conn_conf: Connection configuration used to open the connection.
            bindings: Passed through to ``bulk_load_finalize``.
            connection_provider: Passed through to ``bulk_load_finalize``.
        """
        # Take and clear the id under the lock ensure_session holds while
        # beginning; otherwise a begin still in flight would store an id
        # after this check and that session would never be finalized.
        async with self._begin_lock:
            session_id = self._session_id
            self._session_id = None
        if session_id is None:
            return

        def _finalize() -> None:
            from graflo.db.manager import ConnectionManager

            with ConnectionManager(connection_config=conn_conf) as db:
                db.bulk_load_finalize(
                    session_id,
                    self._schema,
                    bindings=bindings,
                    connection_provider=connection_provider,
                )

        await asyncio.to_thread(_finalize)

ensure_session(conn_conf) async

Return an active bulk session id, or None when unsupported/disabled.

Source code in graflo/hq/bulk_session.py
async def ensure_session(self, conn_conf: DBConfig) -> str | None:
    """Return an active bulk session id, or ``None`` when unsupported/disabled.

    Args:
        conn_conf: Connection configuration. A session is only attempted
            when it has a ``bulk_load`` attribute whose ``enabled`` flag is
            truthy.

    Returns:
        The id of the open session; ``None`` if bulk loading is disabled in
        ``conn_conf`` or the backend raises ``UnsupportedBulkLoad``.
    """
    async with self._begin_lock:
        if self._session_id is not None:
            return self._session_id

        # Decide from configuration alone before touching the database:
        # a disabled bulk load never needs a connection.
        bulk_cfg = getattr(conn_conf, "bulk_load", None)
        if bulk_cfg is None or not getattr(bulk_cfg, "enabled", False):
            return None

        def _begin() -> str | None:
            from graflo.db.manager import ConnectionManager

            with ConnectionManager(connection_config=conn_conf) as db:
                try:
                    return db.bulk_load_begin(self._schema, bulk_cfg)
                except UnsupportedBulkLoad:
                    return None

        # NOTE: when the backend is unsupported the id stays None, so the
        # next call will attempt the begin again.
        self._session_id = await asyncio.to_thread(_begin)
        return self._session_id

finalize(conn_conf, *, bindings, connection_provider) async

Finalize the active session if one exists.

Source code in graflo/hq/bulk_session.py
async def finalize(
    self,
    conn_conf: DBConfig,
    *,
    bindings: Bindings | None,
    connection_provider: ConnectionProvider | None,
) -> None:
    """Finalize the active session if one exists.

    The stored id is cleared before the backend call, so a failing finalize
    is not retried by a later call.

    Args:
        conn_conf: Connection configuration used to open the connection.
        bindings: Passed through to ``bulk_load_finalize``.
        connection_provider: Passed through to ``bulk_load_finalize``.
    """
    # Take and clear the id under the lock ensure_session holds while
    # beginning; otherwise a begin still in flight would store an id after
    # this check and that session would never be finalized.
    async with self._begin_lock:
        session_id = self._session_id
        self._session_id = None
    if session_id is None:
        return

    def _finalize() -> None:
        from graflo.db.manager import ConnectionManager

        with ConnectionManager(connection_config=conn_conf) as db:
            db.bulk_load_finalize(
                session_id,
                self._schema,
                bindings=bindings,
                connection_provider=connection_provider,
            )

    await asyncio.to_thread(_finalize)