Skip to content

graflo.db.nebula.adapter

Version-specific client adapters for NebulaGraph.

Provides a uniform execute / close / use_space interface over nebula3-python (v3.x, Thrift) and nebula5-python (v5.x, gRPC).

NebulaClientAdapter

Bases: ABC

Abstract adapter that hides driver differences.

Source code in graflo/db/nebula/adapter.py
class NebulaClientAdapter(abc.ABC):
    """Abstract adapter that hides driver differences."""

    @abc.abstractmethod
    def connect(self, config: NebulaConfig) -> None: ...

    @abc.abstractmethod
    def execute(self, statement: str) -> NebulaResultSet: ...

    @abc.abstractmethod
    def close(self) -> None: ...

    @abc.abstractmethod
    def use_space(self, space_name: str) -> None: ...

NebulaResultSet

Thin wrapper around driver-specific result objects.

Normalises access so that NebulaConnection never has to know which driver is in use.

Source code in graflo/db/nebula/adapter.py
class NebulaResultSet:
    """Thin wrapper around driver-specific result objects.

    Normalises access so that ``NebulaConnection`` never has to know which
    driver is in use.
    """

    def __init__(self, raw: Any, *, is_v3: bool = True):
        self._raw = raw
        self._is_v3 = is_v3

    @property
    def raw(self) -> Any:
        return self._raw

    def is_succeeded(self) -> bool:
        if self._is_v3:
            return self._raw.is_succeeded()
        return True

    def error_msg(self) -> str:
        if self._is_v3:
            return self._raw.error_msg()
        return ""

    def column_values(self, col: str) -> list[Any]:
        if self._is_v3:
            return [v.cast() for v in self._raw.column_values(col)]
        return self._raw.as_primitive_by_column().get(col, [])

    def rows_as_dicts(self) -> list[dict[str, Any]]:
        """Return all rows as list of primitive-type dicts."""
        if self._is_v3:
            return self._raw.as_primitive()
        return self._raw.as_primitive()

rows_as_dicts()

Return all rows as list of primitive-type dicts.

Source code in graflo/db/nebula/adapter.py
def rows_as_dicts(self) -> list[dict[str, Any]]:
    """Return all rows as list of primitive-type dicts."""
    if self._is_v3:
        return self._raw.as_primitive()
    return self._raw.as_primitive()

NebulaV3Adapter

Bases: NebulaClientAdapter

Adapter for nebula3-python (NebulaGraph 3.x, Thrift).

Source code in graflo/db/nebula/adapter.py
class NebulaV3Adapter(NebulaClientAdapter):
    """Adapter for ``nebula3-python`` (NebulaGraph 3.x, Thrift)."""

    def __init__(self) -> None:
        self._pool: Any = None
        self._session: Any = None

    def connect(self, config: NebulaConfig) -> None:
        from nebula3.Config import Config as N3Config
        from nebula3.gclient.net import ConnectionPool

        hostname = config.hostname or "localhost"
        port = config.port or 9669
        username = config.username or "root"
        password = config.password or "nebula"

        n3_cfg = N3Config()
        n3_cfg.max_connection_pool_size = 10
        n3_cfg.timeout = int(config.request_timeout * 1000)

        self._pool = ConnectionPool()
        ok = self._pool.init([(hostname, port)], n3_cfg)
        if not ok:
            raise ConnectionError(
                f"Failed to connect to NebulaGraph at {hostname}:{port}"
            )

        self._session = self._pool.get_session(username, password)
        logger.info("Connected to NebulaGraph 3.x at %s:%s", hostname, port)

    def execute(self, statement: str) -> NebulaResultSet:
        if self._session is None:
            raise RuntimeError("Not connected")
        result = self._session.execute(statement)
        rs = NebulaResultSet(result, is_v3=True)
        if not rs.is_succeeded():
            raise RuntimeError(
                f"nGQL execution failed: {rs.error_msg()}\nStatement: {statement}"
            )
        return rs

    def use_space(self, space_name: str) -> None:
        self.execute(f"USE `{space_name}`")

    def close(self) -> None:
        if self._session is not None:
            self._session.release()
            self._session = None
        if self._pool is not None:
            self._pool.close()
            self._pool = None

NebulaV5Adapter

Bases: NebulaClientAdapter

Adapter for nebula5-python (NebulaGraph 5.x, gRPC / ISO GQL).

Source code in graflo/db/nebula/adapter.py
class NebulaV5Adapter(NebulaClientAdapter):
    """Adapter for ``nebula5-python`` (NebulaGraph 5.x, gRPC / ISO GQL)."""

    def __init__(self) -> None:
        self._client: Any = None

    def connect(self, config: NebulaConfig) -> None:
        from nebulagraph_python.client import NebulaClient

        hostname = config.hostname or "localhost"
        port = config.port or 9669
        username = config.username or "root"
        password = config.password or "nebula"

        self._client = NebulaClient(
            hosts=[f"{hostname}:{port}"],
            username=username,
            password=password,
        )
        logger.info("Connected to NebulaGraph 5.x at %s:%s", hostname, port)

    def execute(self, statement: str) -> NebulaResultSet:
        if self._client is None:
            raise RuntimeError("Not connected")
        result = self._client.execute(statement)
        return NebulaResultSet(result, is_v3=False)

    def use_space(self, space_name: str) -> None:
        self.execute(f"USE `{space_name}`")

    def close(self) -> None:
        if self._client is not None:
            self._client.close()
            self._client = None

create_adapter(config)

Source code in graflo/db/nebula/adapter.py
def create_adapter(config: NebulaConfig) -> NebulaClientAdapter:
    """Factory: instantiate and connect the correct adapter for *config.version*."""
    adapter: NebulaClientAdapter
    if config.is_v3:
        adapter = NebulaV3Adapter()
    else:
        adapter = NebulaV5Adapter()
    adapter.connect(config)
    return adapter