Backend-agnostic coordinator for optional native bulk ingestion.
The coordinator keeps begin/finalize lifecycle out of :class:Caster and
delegates feature support decisions to database connections.
BulkSessionCoordinator
Coordinate a single optional native bulk session for an ingest run.
Source code in graflo/hq/bulk_session.py
| class BulkSessionCoordinator:
"""Coordinate a single optional native bulk session for an ingest run."""
def __init__(self, schema: Schema):
self._schema = schema
self._session_id: str | None = None
self._begin_lock = asyncio.Lock()
async def ensure_session(self, conn_conf: DBConfig) -> str | None:
"""Return an active bulk session id, or ``None`` when unsupported/disabled."""
async with self._begin_lock:
if self._session_id is not None:
return self._session_id
def _begin() -> str | None:
from graflo.db.manager import ConnectionManager
with ConnectionManager(connection_config=conn_conf) as db:
bulk_cfg = getattr(conn_conf, "bulk_load", None)
if bulk_cfg is None or not getattr(bulk_cfg, "enabled", False):
return None
try:
return db.bulk_load_begin(self._schema, bulk_cfg)
except UnsupportedBulkLoad:
return None
self._session_id = await asyncio.to_thread(_begin)
return self._session_id
async def finalize(
self,
conn_conf: DBConfig,
*,
bindings: Bindings | None,
connection_provider: ConnectionProvider | None,
) -> None:
"""Finalize the active session if one exists."""
session_id = self._session_id
self._session_id = None
if session_id is None:
return
def _finalize() -> None:
from graflo.db.manager import ConnectionManager
with ConnectionManager(connection_config=conn_conf) as db:
db.bulk_load_finalize(
session_id,
self._schema,
bindings=bindings,
connection_provider=connection_provider,
)
await asyncio.to_thread(_finalize)
|
ensure_session(conn_conf)
async
Return an active bulk session id, or None when unsupported/disabled.
Source code in graflo/hq/bulk_session.py
| async def ensure_session(self, conn_conf: DBConfig) -> str | None:
"""Return an active bulk session id, or ``None`` when unsupported/disabled."""
async with self._begin_lock:
if self._session_id is not None:
return self._session_id
def _begin() -> str | None:
from graflo.db.manager import ConnectionManager
with ConnectionManager(connection_config=conn_conf) as db:
bulk_cfg = getattr(conn_conf, "bulk_load", None)
if bulk_cfg is None or not getattr(bulk_cfg, "enabled", False):
return None
try:
return db.bulk_load_begin(self._schema, bulk_cfg)
except UnsupportedBulkLoad:
return None
self._session_id = await asyncio.to_thread(_begin)
return self._session_id
|
finalize(conn_conf, *, bindings, connection_provider)
async
Finalize the active session if one exists.
Source code in graflo/hq/bulk_session.py
| async def finalize(
self,
conn_conf: DBConfig,
*,
bindings: Bindings | None,
connection_provider: ConnectionProvider | None,
) -> None:
"""Finalize the active session if one exists."""
session_id = self._session_id
self._session_id = None
if session_id is None:
return
def _finalize() -> None:
from graflo.db.manager import ConnectionManager
with ConnectionManager(connection_config=conn_conf) as db:
db.bulk_load_finalize(
session_id,
self._schema,
bindings=bindings,
connection_provider=connection_provider,
)
await asyncio.to_thread(_finalize)
|