Skip to content

graflo.db.nebula.util

NebulaGraph utility functions.

Type mapping, filter rendering, value escaping, and schema-propagation helpers.

escape_nebula_string(value)

Escape a string value for safe embedding in nGQL / GQL literals.

Source code in graflo/db/nebula/util.py
def escape_nebula_string(value: str) -> str:
    """Backslash-escape *value* for embedding in an nGQL / GQL string literal.

    Only two characters are rewritten: a backslash becomes two backslashes
    and a double quote becomes a backslash followed by the quote.
    """
    # One pass over the input; the outcome matches escaping backslashes first
    # and double quotes second, because inserted backslashes are never revisited.
    return value.translate({ord("\\"): "\\\\", ord('"'): '\\"'})

make_vid(doc, match_keys)

Derive a VID string from a document's match-key values.

When a single match key is used the raw value is taken. When multiple keys are present the values are joined with :: so the VID is deterministic and unique for the combination.

Source code in graflo/db/nebula/util.py
def make_vid(doc: dict[str, Any], match_keys: list[str] | tuple[str, ...]) -> str:
    """Derive a VID string from a document's match-key values.

    When a single match key is used the raw value is taken.  When multiple keys
    are present the values are joined with ``::`` so the VID is deterministic
    and unique for the combination.
    """
    parts = [str(doc.get(k, "")) for k in match_keys]
    return "::".join(parts)

nebula_type(ft)

Map a graflo FieldType to the corresponding NebulaGraph type name.

Source code in graflo/db/nebula/util.py
def nebula_type(ft: FieldType | None) -> str:
    """Return the NebulaGraph type name for a graflo ``FieldType``.

    ``None`` and any field type missing from ``FIELD_TYPE_TO_NEBULA`` both
    resolve to ``DEFAULT_NEBULA_TYPE``.
    """
    return (
        DEFAULT_NEBULA_TYPE
        if ft is None
        else FIELD_TYPE_TO_NEBULA.get(ft, DEFAULT_NEBULA_TYPE)
    )

render_filters_cypher(filters, doc_name)

Render a FilterExpression as a Cypher WHERE clause (without the keyword).

Source code in graflo/db/nebula/util.py
def render_filters_cypher(
    filters: list | dict | FilterExpression | None,
    doc_name: str,
) -> str:
    """Produce the Cypher condition text for *filters*, without ``WHERE``.

    Returns an empty string when *filters* is ``None``.  Anything that is not
    already a ``FilterExpression`` is first converted with
    ``FilterExpression.from_dict``; the expression is then rendered for
    *doc_name* using ``ExpressionFlavor.CYPHER``.
    """
    if filters is None:
        return ""
    expression = (
        filters
        if isinstance(filters, FilterExpression)
        else FilterExpression.from_dict(filters)
    )
    rendered = expression(doc_name=doc_name, kind=ExpressionFlavor.CYPHER)
    return str(rendered)

render_filters_ngql(filters, doc_name)

Render a FilterExpression as an nGQL WHERE clause (without the keyword).

Source code in graflo/db/nebula/util.py
def render_filters_ngql(
    filters: list | dict | FilterExpression | None,
    doc_name: str,
) -> str:
    """Produce the nGQL condition text for *filters*, without ``WHERE``.

    Returns an empty string when *filters* is ``None``.  Anything that is not
    already a ``FilterExpression`` is first converted with
    ``FilterExpression.from_dict``; the expression is then rendered for
    *doc_name* using ``ExpressionFlavor.NGQL``.
    """
    if filters is None:
        return ""
    expression = (
        filters
        if isinstance(filters, FilterExpression)
        else FilterExpression.from_dict(filters)
    )
    rendered = expression(doc_name=doc_name, kind=ExpressionFlavor.NGQL)
    return str(rendered)

serialize_nebula_value(value)

Serialise a Python value into an nGQL literal string.

Source code in graflo/db/nebula/util.py
def serialize_nebula_value(value: Any) -> str:
    """Convert a Python value to the text of an nGQL literal.

    * ``None`` becomes ``NULL``.
    * Booleans become ``true`` / ``false``.
    * ``int`` and ``float`` are rendered with ``str``; ``Decimal`` is
      converted to ``float`` first.
    * ``datetime``, ``date`` and ``time`` values become their ISO-8601 text in
      double quotes.
    * Lists and dicts are JSON-encoded (non-JSON members fall back to ``str``),
      escaped, and double-quoted.
    * Everything else is stringified, escaped, and double-quoted.
    """
    if value is None:
        return "NULL"
    # bool is a subclass of int, so it has to be recognised before the
    # numeric branch or True/False would be emitted as "True"/"False".
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, Decimal):
        return str(float(value))
    if isinstance(value, (datetime, date, dt_time)):
        return f'"{value.isoformat()}"'

    if isinstance(value, (list, dict)):
        text = json.dumps(value, default=str)
    else:
        text = str(value)
    return f'"{escape_nebula_string(text)}"'

wait_for_schema_propagation(adapter, check_statement, *, max_retries=30, interval=1.0)

Poll check_statement until it succeeds or retries are exhausted.

NebulaGraph propagates schema changes asynchronously across the cluster. After CREATE SPACE / CREATE TAG / CREATE EDGE, subsequent statements may fail until propagation completes (typically within two heartbeat cycles, ~20 s for default settings).

Source code in graflo/db/nebula/util.py
def wait_for_schema_propagation(
    adapter: NebulaClientAdapter,
    check_statement: str,
    *,
    max_retries: int = 30,
    interval: float = 1.0,
) -> None:
    """Poll *check_statement* until it succeeds or retries are exhausted.

    NebulaGraph propagates schema changes asynchronously across the cluster.
    After ``CREATE SPACE`` / ``CREATE TAG`` / ``CREATE EDGE``, subsequent
    statements may fail until propagation completes (typically within two
    heartbeat cycles, ~20 s for default settings).

    Args:
        adapter: Client whose ``execute`` method runs the probe statement.
        check_statement: Statement executed on every attempt; an attempt
            counts as successful when ``execute`` returns without raising.
        max_retries: Maximum number of attempts.  Values below 1 are treated
            as 1 so the statement is always executed at least once.
        interval: Seconds to sleep between a failed attempt and the next one.

    Raises:
        Exception: Whatever ``adapter.execute`` raised on the final attempt.
    """
    # Always probe at least once.  Without this clamp a non-positive
    # ``max_retries`` skipped the loop entirely and the function returned as
    # though the schema were ready, without ever running the statement.
    attempts = max(1, max_retries)
    for attempt in range(1, attempts + 1):
        try:
            adapter.execute(check_statement)
        except Exception:
            # Out of attempts: surface the last failure to the caller
            # unchanged, and do not sleep after it.
            if attempt == attempts:
                raise
            logger.debug(
                "Schema not yet propagated (attempt %d/%d), retrying in %.1fs …",
                attempt,
                attempts,
                interval,
            )
            time.sleep(interval)
        else:
            return

wait_for_space_ready(adapter, space_name, *, max_retries=30, interval=1.0)

Wait until USE `space_name` succeeds.

Source code in graflo/db/nebula/util.py
def wait_for_space_ready(
    adapter: NebulaClientAdapter,
    space_name: str,
    *,
    max_retries: int = 30,
    interval: float = 1.0,
) -> None:
    """Block until a ``USE`` statement for *space_name* succeeds.

    The space name is wrapped in backticks and the resulting statement is
    handed to ``wait_for_schema_propagation`` together with *max_retries* and
    *interval*, which are forwarded unchanged.
    """
    use_statement = f"USE `{space_name}`"
    wait_for_schema_propagation(
        adapter, use_statement, max_retries=max_retries, interval=interval
    )