Skip to content

graflo.db.nebula.query

Query builders for NebulaGraph nGQL (v3.x) and ISO GQL (v5.x).

Each public function returns a query string ready to be passed to the adapter's execute method.

aggregate_ngql(tag_name, agg_func, discriminant=None, aggregated_field=None, filter_clause='')

Build an aggregation query using MATCH.

Source code in graflo/db/nebula/query.py
def aggregate_ngql(
    tag_name: str,
    agg_func: str,
    discriminant: str | None = None,
    aggregated_field: str | None = None,
    filter_clause: str = "",
) -> str:
    """Build an aggregation query using MATCH."""
    where = f" WHERE {filter_clause}" if filter_clause else ""
    if agg_func == "COUNT":
        if discriminant:
            return (
                f"MATCH (v:`{tag_name}`){where} "
                f"RETURN v.`{tag_name}`.`{discriminant}` AS `key`, count(*) AS `count`"
            )
        return f"MATCH (v:`{tag_name}`){where} RETURN count(*) AS `count`"
    elif agg_func in ("MAX", "MIN", "AVG"):
        func = agg_func.lower()
        return (
            f"MATCH (v:`{tag_name}`){where} "
            f"RETURN {func}(v.`{tag_name}`.`{aggregated_field}`) AS `val`"
        )
    elif agg_func == "SORTED_UNIQUE":
        return (
            f"MATCH (v:`{tag_name}`){where} "
            f"RETURN DISTINCT v.`{tag_name}`.`{aggregated_field}` AS `val` "
            f"ORDER BY `val`"
        )
    raise ValueError(f"Unsupported aggregation: {agg_func}")

batch_upsert_vertices_ngql(tag_name, docs, match_keys, tag_fields)

Return a list of UPSERT VERTEX statements for a document batch.

Source code in graflo/db/nebula/query.py
def batch_upsert_vertices_ngql(
    tag_name: str,
    docs: list[dict[str, Any]],
    match_keys: list[str] | tuple[str, ...],
    tag_fields: list[str],
) -> list[str]:
    """Return a list of ``UPSERT VERTEX`` statements for a document batch."""
    statements: list[str] = []
    for doc in docs:
        vid = make_vid(doc, match_keys)
        stmt = upsert_vertex_ngql(tag_name, vid, doc, tag_fields)
        if stmt:
            statements.append(stmt)
    return statements

create_edge_index_ngql(index_name, edge_type, index_fields, string_index_length=256)

CREATE EDGE INDEX IF NOT EXISTS idx ON edge_type(field(len), ...).

Source code in graflo/db/nebula/query.py
def create_edge_index_ngql(
    index_name: str,
    edge_type: str,
    index_fields: list[str],
    string_index_length: int = 256,
) -> str:
    """``CREATE EDGE INDEX IF NOT EXISTS idx ON edge_type(field(len), ...)``."""
    parts: list[str] = []
    for f in index_fields:
        parts.append(f"`{f}`({string_index_length})")
    fields_str = ", ".join(parts)
    return (
        f"CREATE EDGE INDEX IF NOT EXISTS `{index_name}` "
        f"ON `{edge_type}` ({fields_str})"
    )

create_edge_type_ngql(edge_type, fields=None)

CREATE EDGE IF NOT EXISTS EdgeType(prop type, ...).

Source code in graflo/db/nebula/query.py
def create_edge_type_ngql(edge_type: str, fields: list[Field] | None = None) -> str:
    """Render ``CREATE EDGE IF NOT EXISTS EdgeType(prop type, ...)``.

    Each field contributes its backtick-quoted ``name`` followed by
    ``nebula_type(field.type)``. With no fields (``None`` or empty) the
    statement ends in an empty ``()`` property list.
    """
    column_defs = [
        f"`{field.name}` {nebula_type(field.type)}" for field in (fields or ())
    ]
    columns = ", ".join(column_defs)
    return f"CREATE EDGE IF NOT EXISTS `{edge_type}` ({columns})"

create_tag_index_ngql(index_name, tag_name, index_fields, string_index_length=256, string_fields=None)

CREATE TAG INDEX IF NOT EXISTS idx ON tag(field(len), ...).

Source code in graflo/db/nebula/query.py
def create_tag_index_ngql(
    index_name: str,
    tag_name: str,
    index_fields: list[str],
    string_index_length: int = 256,
    string_fields: set[str] | None = None,
) -> str:
    """``CREATE TAG INDEX IF NOT EXISTS idx ON tag(field(len), ...)``."""
    parts: list[str] = []
    for f in index_fields:
        if string_fields is None or f in string_fields:
            parts.append(f"`{f}`({string_index_length})")
        else:
            parts.append(f"`{f}`")
    fields_str = ", ".join(parts)
    return (
        f"CREATE TAG INDEX IF NOT EXISTS `{index_name}` ON `{tag_name}` ({fields_str})"
    )

create_tag_ngql(tag_name, fields)

CREATE TAG IF NOT EXISTS Tag(prop type, ...).

Source code in graflo/db/nebula/query.py
def create_tag_ngql(tag_name: str, fields: list[Field]) -> str:
    """Render ``CREATE TAG IF NOT EXISTS Tag(prop type, ...)``.

    Each field contributes its backtick-quoted ``name`` followed by
    ``nebula_type(field.type)``. A falsy ``fields`` argument yields an empty
    ``()`` property list.
    """
    column_defs = [
        f"`{field.name}` {nebula_type(field.type)}" for field in (fields or ())
    ]
    columns = ", ".join(column_defs)
    return f"CREATE TAG IF NOT EXISTS `{tag_name}` ({columns})"

fetch_docs_ngql(tag_name, filter_clause='', limit=None, return_keys=None)

MATCH (v:Tag) WHERE ... RETURN v LIMIT n.

Source code in graflo/db/nebula/query.py
def fetch_docs_ngql(
    tag_name: str,
    filter_clause: str = "",
    limit: int | None = None,
    return_keys: list[str] | None = None,
) -> str:
    """``MATCH (v:Tag) WHERE ... RETURN v LIMIT n``."""
    where = f" WHERE {filter_clause}" if filter_clause else ""
    if return_keys:
        ret = ", ".join(f"v.`{tag_name}`.`{k}` AS `{k}`" for k in return_keys)
    else:
        ret = "v"
    lim = f" LIMIT {limit}" if limit else ""
    return f"MATCH (v:`{tag_name}`){where} RETURN {ret}{lim}"

fetch_edges_ngql(from_tag, from_vid, edge_type=None, to_tag=None, to_vid=None, filter_clause='', limit=None)

Build a GO query for fetching edges.

Source code in graflo/db/nebula/query.py
def fetch_edges_ngql(
    from_tag: str,
    from_vid: str,
    edge_type: str | None = None,
    to_tag: str | None = None,
    to_vid: str | None = None,
    filter_clause: str = "",
    limit: int | None = None,
) -> str:
    """Build a ``GO`` query that fetches edges leaving one vertex.

    Args:
        from_tag: Accepted for interface compatibility; not used when
            rendering the query.
        from_vid: Id of the source vertex; escaped with
            ``escape_nebula_string`` before being quoted.
        edge_type: Edge type to traverse. ``None`` traverses all types
            (``OVER *``).
        to_tag: Accepted for interface compatibility; not used when
            rendering the query.
        to_vid: When given, restricts results to edges whose destination id
            equals this value.
        filter_clause: Extra pre-rendered condition for the ``WHERE`` part.
        limit: When truthy, appended as ``| LIMIT n``.

    Returns:
        The query string, yielding ``props``, ``src``, ``dst`` and
        ``edge_type`` for each edge.
    """
    escaped_from = escape_nebula_string(from_vid)
    over = f"`{edge_type}`" if edge_type else "*"
    where_parts: list[str] = []
    if to_vid:
        escaped_to = escape_nebula_string(to_vid)
        where_parts.append(f'id($$) == "{escaped_to}"')
    if filter_clause:
        # Parenthesise the caller's filter when combining it with the
        # destination check, so that a top-level OR inside the filter cannot
        # bypass the ``id($$)`` constraint through operator precedence.
        # A lone filter is emitted unchanged.
        where_parts.append(f"({filter_clause})" if where_parts else filter_clause)
    where = " WHERE " + " AND ".join(where_parts) if where_parts else ""
    lim = f"| LIMIT {limit}" if limit else ""
    return (
        f'GO FROM "{escaped_from}" OVER {over}{where} '
        f"YIELD properties(edge) AS props, src(edge) AS src, dst(edge) AS dst, "
        f"type(edge) AS edge_type {lim}"
    )

insert_edges_ngql(edge_type, edges, edge_fields=None)

Batch INSERT EDGE IF NOT EXISTS type(cols) VALUES "src"->"dst":(vals), ....

Source code in graflo/db/nebula/query.py
def insert_edges_ngql(
    edge_type: str,
    edges: list[tuple[str, str, dict[str, Any]]],
    edge_fields: list[str] | None = None,
) -> str:
    """Batch ``INSERT EDGE IF NOT EXISTS type(cols) VALUES "src"->"dst":(vals), ...``."""
    if not edges:
        return ""

    if edge_fields:
        cols = ", ".join(f"`{f}`" for f in edge_fields)
        value_parts: list[str] = []
        for src_vid, dst_vid, props in edges:
            src = escape_nebula_string(src_vid)
            dst = escape_nebula_string(dst_vid)
            vals = ", ".join(serialize_nebula_value(props.get(f)) for f in edge_fields)
            value_parts.append(f'"{src}"->"{dst}":({vals})')
        values_str = ", ".join(value_parts)
        return f"INSERT EDGE IF NOT EXISTS `{edge_type}` ({cols}) VALUES {values_str}"
    else:
        value_parts_simple: list[str] = []
        for src_vid, dst_vid, _ in edges:
            src = escape_nebula_string(src_vid)
            dst = escape_nebula_string(dst_vid)
            value_parts_simple.append(f'"{src}"->"{dst}":()')
        values_str = ", ".join(value_parts_simple)
        return f"INSERT EDGE IF NOT EXISTS `{edge_type}` () VALUES {values_str}"

insert_vertices_ngql(tag_name, docs, match_keys, tag_fields)

Batch INSERT VERTEX IF NOT EXISTS tag(cols) VALUES "vid":(vals), ....

Source code in graflo/db/nebula/query.py
def insert_vertices_ngql(
    tag_name: str,
    docs: list[dict[str, Any]],
    match_keys: list[str] | tuple[str, ...],
    tag_fields: list[str],
) -> str:
    """Batch ``INSERT VERTEX IF NOT EXISTS tag(cols) VALUES "vid":(vals), ...``."""
    if not docs or not tag_fields:
        return ""
    ordered_fields = [f for f in tag_fields]
    cols = ", ".join(f"`{f}`" for f in ordered_fields)
    value_parts: list[str] = []
    for doc in docs:
        vid = make_vid(doc, match_keys)
        escaped_vid = escape_nebula_string(vid)
        vals = ", ".join(serialize_nebula_value(doc.get(f)) for f in ordered_fields)
        value_parts.append(f'"{escaped_vid}":({vals})')
    values_str = ", ".join(value_parts)
    return f"INSERT VERTEX IF NOT EXISTS `{tag_name}` ({cols}) VALUES {values_str}"

upsert_vertex_gql(tag_name, vid, props, tag_fields)

Source code in graflo/db/nebula/query.py
def upsert_vertex_gql(
    tag_name: str,
    vid: str,
    props: dict[str, Any],
    tag_fields: list[str],
) -> str:
    """v5 variant of the vertex upsert builder.

    Delegates unchanged to ``upsert_vertex_ngql``, so the returned statement
    is identical to the v3 one.
    """
    return upsert_vertex_ngql(
        tag_name=tag_name,
        vid=vid,
        props=props,
        tag_fields=tag_fields,
    )

upsert_vertex_ngql(tag_name, vid, props, tag_fields)

UPSERT VERTEX ON tag "vid" SET prop=val, ....

Source code in graflo/db/nebula/query.py
def upsert_vertex_ngql(
    tag_name: str,
    vid: str,
    props: dict[str, Any],
    tag_fields: list[str],
) -> str:
    """Render ``UPSERT VERTEX ON tag "vid" SET prop=val, ...``.

    Only the entries of ``props`` whose key appears in ``tag_fields`` are
    assigned; values are rendered with ``serialize_nebula_value`` and the
    vertex id is escaped with ``escape_nebula_string``.

    Returns an empty string when no property qualifies.
    """
    assignments: list[str] = []
    for key, value in props.items():
        if key not in tag_fields:
            continue
        assignments.append(f"`{key}` = {serialize_nebula_value(value)}")

    if assignments:
        joined = ", ".join(assignments)
        vid_literal = escape_nebula_string(vid)
        return f'UPSERT VERTEX ON `{tag_name}` "{vid_literal}" SET {joined}'
    return ""