class FilterExpression(ConfigBaseModel):
"""Unified filter expression (discriminated: leaf or composite).
- kind="leaf": single field comparison (field, cmp_operator, value, optional unary_op).
- kind="composite": logical combination (operator AND/OR/NOT/IF_THEN, deps).
"""
kind: Literal["leaf", "composite"]
# Leaf fields (used when kind="leaf")
cmp_operator: ComparisonOperator | None = None
value: list[Any] = Field(default_factory=list)
field: str | None = None
unary_op: str | None = (
None # optional operator before comparison (YAML key: "operator")
)
# Composite fields (used when kind="composite")
operator: LogicalOperator | None = None # AND, OR, NOT, IF_THEN
deps: list[FilterExpression] = Field(default_factory=list)
@field_validator("value", mode="before")
@classmethod
def value_to_list(cls, v: list[Any] | Any) -> list[Any]:
"""Convert single value to list if necessary. Explicit None becomes [None] for null comparison."""
if v is None:
return [None]
if isinstance(v, list):
return v
return [v]
@model_validator(mode="before")
@classmethod
def leaf_operator_to_unary_op(cls, data: Any) -> Any:
"""Map leaf 'operator' or 'foo' (YAML/kwargs) to unary_op; infer kind and cmp_operator."""
if not isinstance(data, dict):
return data
data = dict(data)
if data.get("kind") == "composite":
return data
if data.get("kind") is None:
op = data.get("operator")
if isinstance(op, LogicalOperator):
data["kind"] = "composite"
return data
if isinstance(op, str) and op in _LOGICAL_OPERATOR_JSON_VALUES:
data["kind"] = "composite"
return data
deps = data.get("deps")
if isinstance(deps, list) and len(deps) > 0:
data["kind"] = "composite"
return data
if data.get("cmp_operator") is not None or data.get("field") is not None:
data["kind"] = "leaf"
raw_op = None
if "operator" in data and isinstance(data["operator"], str):
raw_op = data.pop("operator")
elif "foo" in data and isinstance(data["foo"], str):
raw_op = data.pop("foo")
if raw_op is not None:
data["unary_op"] = raw_op
if data.get("cmp_operator") is None and raw_op in DUNDER_TO_CMP:
data["cmp_operator"] = DUNDER_TO_CMP[raw_op]
if data.get("kind") is None:
data["kind"] = "leaf"
return data
@model_validator(mode="after")
def check_discriminated_shape(self) -> FilterExpression:
"""Enforce exactly one shape per kind and normalise null-check operators."""
if self.kind == "leaf":
if self.operator is not None or self.deps:
raise ValueError("leaf expression must not have operator or deps")
# IS_NULL / IS_NOT_NULL are unary; clear any spurious value list
if self.cmp_operator in (
ComparisonOperator.IS_NULL,
ComparisonOperator.IS_NOT_NULL,
):
object.__setattr__(self, "value", [])
else:
if self.operator is None:
raise ValueError("composite expression must have operator")
return self
@field_validator("deps", mode="before")
@classmethod
def parse_deps(cls, v: list[Any]) -> list[Any]:
"""Parse dict/list items into FilterExpression instances."""
if not isinstance(v, list):
return v
result = []
for item in v:
if isinstance(item, (dict, list)):
result.append(FilterExpression.from_dict(item))
else:
result.append(item)
return result
@classmethod
def from_list(cls, current: list[Any]) -> FilterExpression:
"""Build a leaf expression from list form [cmp_operator, value, field?, unary_op?]."""
cmp_operator = current[0]
value = current[1]
field = current[2] if len(current) > 2 else None
unary_op = current[3] if len(current) > 3 else None
return cls(
kind="leaf",
cmp_operator=cmp_operator,
value=value,
field=field,
unary_op=unary_op,
)
@classmethod
def from_dict(cls, data: dict[str, Any] | list[Any]) -> Self:
"""Create a filter expression from a dictionary or list.
Returns FilterExpression (leaf or composite). LSP-compliant: return type is Self.
"""
if isinstance(data, list):
if data[0] in ComparisonOperator:
return cast(Self, cls.from_list(data))
elif data[0] in LogicalOperator:
return cls(kind="composite", operator=data[0], deps=data[1])
elif isinstance(data, dict):
k = list(data.keys())[0]
norm_k = k.upper() if isinstance(k, str) else k
if norm_k in LogicalOperator:
deps: list[FilterExpression] = [cls.from_dict(v) for v in data[k]]
return cls(
kind="composite", operator=LogicalOperator(norm_k), deps=deps
)
else:
unary_op = data.get("operator") or data.get("foo")
cmp_operator = data.get("cmp_operator")
if cmp_operator is None and unary_op is not None:
cmp_operator = DUNDER_TO_CMP.get(unary_op)
return cls(
kind="leaf",
cmp_operator=cmp_operator,
value=data.get("value", []),
field=data.get("field"),
unary_op=unary_op,
)
raise ValueError(f"expected dict or list, got {type(data)}")
def __call__(
self,
doc_name="doc",
kind: ExpressionFlavor = ExpressionFlavor.AQL,
**kwargs,
) -> str | bool:
"""Render or evaluate the expression in the target language."""
if self.kind == "leaf":
return self._call_leaf(doc_name=doc_name, kind=kind, **kwargs)
return self._call_composite(doc_name=doc_name, kind=kind, **kwargs)
def _is_null_operator(self) -> bool:
"""Check if this is a null-checking operator (IS_NULL or IS_NOT_NULL)."""
return self.cmp_operator in (
ComparisonOperator.IS_NULL,
ComparisonOperator.IS_NOT_NULL,
)
def _call_leaf(
self,
doc_name="doc",
kind: ExpressionFlavor = ExpressionFlavor.AQL,
**kwargs,
) -> str | bool:
if not self._is_null_operator() and not self.value:
logger.warning(f"for {self} value is not set : {self.value}")
if self.cmp_operator is None and kind != ExpressionFlavor.PYTHON:
raise ValueError(
"leaf expression requires cmp_operator for non-PYTHON flavor"
)
if kind == ExpressionFlavor.AQL:
return self._cast_arango(doc_name)
elif kind == ExpressionFlavor.CYPHER:
return self._cast_cypher(doc_name)
elif kind == ExpressionFlavor.NGQL:
return self._cast_ngql(doc_name)
elif kind == ExpressionFlavor.GSQL:
if doc_name == "":
field_types = kwargs.get("field_types")
return self._cast_restpp(field_types=field_types)
return self._cast_tigergraph(doc_name)
elif kind == ExpressionFlavor.SQL:
return self._cast_sql()
elif kind == ExpressionFlavor.PYTHON:
return self._cast_python(**kwargs)
raise ValueError(f"kind {kind} not implemented")
def _call_composite(
self,
doc_name="doc",
kind: ExpressionFlavor = ExpressionFlavor.AQL,
**kwargs,
) -> str | bool:
if kind in (
ExpressionFlavor.AQL,
ExpressionFlavor.CYPHER,
ExpressionFlavor.NGQL,
ExpressionFlavor.GSQL,
ExpressionFlavor.SQL,
):
return self._cast_generic(doc_name=doc_name, kind=kind)
elif kind == ExpressionFlavor.PYTHON:
return self._cast_python_composite(kind=kind, **kwargs)
raise ValueError(f"kind {kind} not implemented")
def _cast_value(self) -> str:
value = f"{self.value[0]}" if len(self.value) == 1 else f"{self.value}"
if len(self.value) == 1:
if isinstance(self.value[0], str):
escaped = self.value[0].replace("\\", "\\\\").replace('"', '\\"')
value = f'"{escaped}"'
elif self.value[0] is None:
value = "null"
else:
value = f"{self.value[0]}"
return value
def _cast_arango(self, doc_name: str) -> str:
if self.cmp_operator == ComparisonOperator.IS_NULL:
return f'{doc_name}["{self.field}"] == null'
if self.cmp_operator == ComparisonOperator.IS_NOT_NULL:
return f'{doc_name}["{self.field}"] != null'
const = self._cast_value()
lemma = f"{self.cmp_operator} {const}"
if self.unary_op is not None:
lemma = f"{self.unary_op} {lemma}"
if self.field is not None:
lemma = f'{doc_name}["{self.field}"] {lemma}'
return lemma
def _cast_cypher(self, doc_name: str) -> str:
if self.cmp_operator == ComparisonOperator.IS_NULL:
return f"{doc_name}.{self.field} IS NULL"
if self.cmp_operator == ComparisonOperator.IS_NOT_NULL:
return f"{doc_name}.{self.field} IS NOT NULL"
const = self._cast_value()
cmp_op = (
"=" if self.cmp_operator == ComparisonOperator.EQ else self.cmp_operator
)
lemma = f"{cmp_op} {const}"
if self.unary_op is not None:
lemma = f"{self.unary_op} {lemma}"
if self.field is not None:
lemma = f"{doc_name}.{self.field} {lemma}"
return lemma
def _cast_ngql(self, doc_name: str) -> str:
"""Render leaf as nGQL expression (NebulaGraph 3.x).
Uses dot-access like Cypher but keeps ``==`` for equality (nGQL standard).
The caller passes *doc_name* as ``"v.TagName"`` so property access becomes
``v.TagName.field``.
"""
if self.cmp_operator == ComparisonOperator.IS_NULL:
return f"{doc_name}.{self.field} IS EMPTY"
if self.cmp_operator == ComparisonOperator.IS_NOT_NULL:
return f"{doc_name}.{self.field} IS NOT EMPTY"
const = self._cast_value()
lemma = f"{self.cmp_operator} {const}"
if self.unary_op is not None:
lemma = f"{self.unary_op} {lemma}"
if self.field is not None:
lemma = f"{doc_name}.{self.field} {lemma}"
return lemma
def _cast_tigergraph(self, doc_name: str) -> str:
if self.cmp_operator == ComparisonOperator.IS_NULL:
return f"{doc_name}.{self.field} IS NULL"
if self.cmp_operator == ComparisonOperator.IS_NOT_NULL:
return f"{doc_name}.{self.field} IS NOT NULL"
const = self._cast_value()
cmp_op = (
"==" if self.cmp_operator == ComparisonOperator.EQ else self.cmp_operator
)
lemma = f"{cmp_op} {const}"
if self.unary_op is not None:
lemma = f"{self.unary_op} {lemma}"
if self.field is not None:
lemma = f"{doc_name}.{self.field} {lemma}"
return lemma
@staticmethod
def _quote_sql_field(field: str) -> str:
"""Quote a SQL field name, handling dotted alias.column references.
``sys_id`` -> ``"sys_id"``
``s.sys_id`` -> ``s."sys_id"``
"""
if "." in field:
alias, col = field.split(".", 1)
return f'{alias}."{col}"'
return f'"{field}"'
def _cast_sql(self) -> str:
"""Render leaf as SQL WHERE fragment: \"column\" op value (strings/dates single-quoted)."""
if not self.field:
return ""
quoted = self._quote_sql_field(self.field)
if self.cmp_operator == ComparisonOperator.IS_NULL:
return f"{quoted} IS NULL"
if self.cmp_operator == ComparisonOperator.IS_NOT_NULL:
return f"{quoted} IS NOT NULL"
if self.cmp_operator == ComparisonOperator.EQ:
op_str = "="
elif self.cmp_operator == ComparisonOperator.NEQ:
op_str = "!="
elif self.cmp_operator in (
ComparisonOperator.GT,
ComparisonOperator.LT,
ComparisonOperator.GE,
ComparisonOperator.LE,
):
op_str = str(self.cmp_operator)
else:
op_str = str(self.cmp_operator)
value = self.value[0] if self.value else None
if value is None:
value_str = "null"
elif isinstance(value, (int, float)):
value_str = str(value)
else:
# Strings and ISO datetimes: single-quoted for SQL
value_str = str(value).replace("'", "''")
value_str = f"'{value_str}'"
return f"{quoted} {op_str} {value_str}"
def _cast_restpp(self, field_types: dict[str, Any] | None = None) -> str:
if not self.field:
return ""
if self.cmp_operator == ComparisonOperator.IS_NULL:
return f'{self.field}=""'
if self.cmp_operator == ComparisonOperator.IS_NOT_NULL:
return f'{self.field}!=""'
if self.cmp_operator == ComparisonOperator.EQ:
op_str = "="
elif self.cmp_operator == ComparisonOperator.NEQ:
op_str = "!="
elif self.cmp_operator == ComparisonOperator.GT:
op_str = ">"
elif self.cmp_operator == ComparisonOperator.LT:
op_str = "<"
elif self.cmp_operator == ComparisonOperator.GE:
op_str = ">="
elif self.cmp_operator == ComparisonOperator.LE:
op_str = "<="
else:
op_str = str(self.cmp_operator)
value = self.value[0] if self.value else None
if value is None:
value_str = "null"
elif isinstance(value, (int, float)):
value_str = str(value)
elif isinstance(value, str):
is_string_field = True
if field_types and self.field in field_types:
field_type = field_types[self.field]
field_type_str = (
field_type.value
if hasattr(field_type, "value")
else str(field_type).upper()
)
if field_type_str in ("INT", "UINT", "FLOAT", "DOUBLE"):
is_string_field = False
value_str = f'"{value}"' if is_string_field else str(value)
else:
value_str = str(value)
return f"{self.field}{op_str}{value_str}"
def _cast_python(self, **kwargs: Any) -> bool:
if self.field is not None:
field_val = kwargs.pop(self.field, None)
if self.cmp_operator == ComparisonOperator.IS_NULL:
return field_val is None
if self.cmp_operator == ComparisonOperator.IS_NOT_NULL:
return field_val is not None
if field_val is not None and self.unary_op is not None:
foo = getattr(field_val, self.unary_op)
return foo(self.value[0])
return False
@staticmethod
def _wrap_composite_operand(
dep: FilterExpression, rendered: str, kind: ExpressionFlavor
) -> str:
"""Parenthesize nested composite operands for SQL/Cypher precedence."""
if (
kind in (ExpressionFlavor.SQL, ExpressionFlavor.CYPHER)
and dep.kind == "composite"
):
return f"({rendered})"
return rendered
def _render_dep(
self, dep: FilterExpression, doc_name: str, kind: ExpressionFlavor
) -> str:
rendered = str(dep(kind=kind, doc_name=doc_name))
return self._wrap_composite_operand(dep, rendered, kind)
def _cast_generic(self, doc_name: str, kind: ExpressionFlavor) -> str:
if self.operator is None:
raise ValueError("composite expression requires operator")
if (
kind == ExpressionFlavor.SQL
and self.operator == LogicalOperator.IMPLICATION
):
if len(self.deps) != 2:
raise ValueError("IF_THEN composite requires exactly 2 deps")
antecedent = self._render_dep(self.deps[0], doc_name, kind)
consequent = self._render_dep(self.deps[1], doc_name, kind)
return f"(NOT ({antecedent}) OR ({consequent}))"
if len(self.deps) == 1:
if self.operator == LogicalOperator.NOT:
result = self._render_dep(self.deps[0], doc_name, kind)
if doc_name == "" and kind == ExpressionFlavor.GSQL:
return f"!{result}"
return f"NOT {result}"
raise ValueError(
f" length of deps = {len(self.deps)} but operator is not {LogicalOperator.NOT}"
)
deps_str_cast = [self._render_dep(dep, doc_name, kind) for dep in self.deps]
if doc_name == "" and kind == ExpressionFlavor.GSQL:
if self.operator == LogicalOperator.AND:
return " && ".join(deps_str_cast)
if self.operator == LogicalOperator.OR:
return " || ".join(deps_str_cast)
return f" {self.operator} ".join(deps_str_cast)
def _cast_python_composite(self, kind: ExpressionFlavor, **kwargs: Any) -> bool:
if self.operator is None:
raise ValueError("composite expression requires operator")
if len(self.deps) == 1:
if self.operator == LogicalOperator.NOT:
return not self.deps[0](kind=kind, **kwargs)
raise ValueError(
f" length of deps = {len(self.deps)} but operator is not {LogicalOperator.NOT}"
)
return OperatorMapping[self.operator](
[dep(kind=kind, **kwargs) for dep in self.deps]
)