
graflo.db.falkordb

FalkorDB connection implementation for graph database operations.

This module implements the Connection interface for FalkorDB, a Redis-based graph database that supports the OpenCypher query language.

Key Features
  • Label-based node organization (like Neo4j)
  • Relationship type management
  • Property indices
  • Cypher query execution
  • Batch node and relationship operations
  • Redis-based storage with graph namespacing
Example

from graflo.db.falkordb import FalkordbConnection
from graflo.db.connection import FalkordbConfig

config = FalkordbConfig(uri="redis://localhost:6379", database="mygraph")
conn = FalkordbConnection(config)
conn.init_db(schema, clean_start=True)

FalkordbConnection

Bases: Connection

FalkorDB connector implementing the graflo Connection interface.

Provides complete graph database operations for FalkorDB including node/relationship CRUD, batch operations, aggregations, and raw Cypher query execution.

Thread Safety

This class is NOT thread-safe. Each thread should use its own connection instance. For concurrent access, use ConnectionManager with separate instances per thread.
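
A minimal per-thread sketch (thread count and query are illustrative; only FalkordbConfig, FalkordbConnection, execute, and close from this page are assumed):

import threading

from graflo.db.connection import FalkordbConfig
from graflo.db.falkordb import FalkordbConnection

config = FalkordbConfig(uri="redis://localhost:6379", database="mygraph")

def worker():
    # One connection per thread; never share an instance across threads
    conn = FalkordbConnection(config)
    try:
        conn.execute("MATCH (n) RETURN count(n)")
    finally:
        conn.close()

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()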

Error Handling

  • Connection errors raise on instantiation
  • Query errors propagate as redis.exceptions.ResponseError
  • Invalid inputs raise ValueError with descriptive messages
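
For example, a syntactically invalid query surfaces through the redis driver (a minimal sketch, assuming an open conn as in the Examples below):

from redis.exceptions import ResponseError

try:
    conn.execute("MATCH (n:Person RETURN n")  # missing closing parenthesis
except ResponseError as e:
    # FalkorDB reports Cypher parse and runtime errors as ResponseError
    print(f"query failed: {e}")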

Attributes

flavor : DBFlavor
    Database type identifier (DBFlavor.FALKORDB)
config : FalkordbConfig
    Connection configuration (URI, database, credentials)
client : FalkorDB
    Underlying FalkorDB client instance
graph : Graph
    Active graph object for query execution
_graph_name : str
    Name of the currently selected graph

Examples

Direct instantiation (prefer ConnectionManager for production):

config = FalkordbConfig(uri="redis://localhost:6379")
conn = FalkordbConnection(config)
try:
    result = conn.execute("MATCH (n) RETURN count(n)")
finally:
    conn.close()
Source code in graflo/db/falkordb/conn.py
class FalkordbConnection(Connection):
    """FalkorDB connector implementing the graflo Connection interface.

    Provides complete graph database operations for FalkorDB including
    node/relationship CRUD, batch operations, aggregations, and raw
    Cypher query execution.

    Thread Safety
    -------------
    This class is NOT thread-safe. Each thread should use its own
    connection instance. For concurrent access, use ConnectionManager
    with separate instances per thread.

    Error Handling
    --------------
    - Connection errors raise on instantiation
    - Query errors propagate as redis.exceptions.ResponseError
    - Invalid inputs raise ValueError with descriptive messages

    Attributes
    ----------
    flavor : DBFlavor
        Database type identifier (DBFlavor.FALKORDB)
    config : FalkordbConfig
        Connection configuration (URI, database, credentials)
    client : FalkorDB
        Underlying FalkorDB client instance
    graph : Graph
        Active graph object for query execution
    _graph_name : str
        Name of the currently selected graph

    Examples
    --------
    Direct instantiation (prefer ConnectionManager for production)::

        config = FalkordbConfig(uri="redis://localhost:6379")
        conn = FalkordbConnection(config)
        try:
            result = conn.execute("MATCH (n) RETURN count(n)")
        finally:
            conn.close()
    """

    flavor = DBFlavor.FALKORDB

    # Type annotations for instance attributes
    client: FalkorDB | None
    graph: Graph | None
    _graph_name: str

    def __init__(self, config: FalkordbConfig):
        """Initialize FalkorDB connection and select graph.

        Establishes connection to the FalkorDB instance and selects
        the specified graph for subsequent operations.

        Parameters
        ----------
        config : FalkordbConfig
            Connection configuration with the following fields:
            - uri: Redis URI (redis://host:port)
            - database: Graph name (optional, defaults to "default")
            - password: Redis password (optional)

        Raises
        ------
        ValueError
            If URI is not provided in configuration
        redis.exceptions.ConnectionError
            If unable to connect to Redis instance
        """
        super().__init__()
        self.config = config

        if config.uri is None:
            raise ValueError("FalkorDB connection requires a URI to be configured")

        # Parse URI to extract host and port
        parsed = urlparse(config.uri)
        host = parsed.hostname or "localhost"
        port = parsed.port or 6379

        # Initialize FalkorDB client
        if config.password:
            self.client = FalkorDB(host=host, port=port, password=config.password)
        else:
            self.client = FalkorDB(host=host, port=port)

        # Select the graph (database in config maps to graph name)
        graph_name = config.database or "default"
        self.graph = self.client.select_graph(graph_name)
        self._graph_name = graph_name

    def execute(self, query: str, **kwargs):
        """Execute a raw OpenCypher query against the graph.

        Executes the provided Cypher query with optional parameters.
        Parameters are safely injected using FalkorDB's parameterized
        query mechanism to prevent injection attacks.

        Parameters
        ----------
        query : str
            OpenCypher query string. Can include parameter placeholders
            using $name syntax (e.g., "MATCH (n) WHERE n.id = $id")
        **kwargs
            Query parameters as keyword arguments. Values are safely
            escaped by the driver.

        Returns
        -------
        QueryResult
            FalkorDB result object containing:
            - result_set: List of result rows
            - statistics: Query execution statistics

        Examples
        --------
        Simple query::

            result = conn.execute("MATCH (n:Person) RETURN n.name")

        Parameterized query::

            result = conn.execute(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                min_age=21
            )
        """
        assert self.graph is not None, "Connection is closed"
        result = self.graph.query(query, kwargs if kwargs else None)
        return result

    def close(self):
        """Close the FalkorDB connection.

        Note: FalkorDB client uses Redis connection pooling,
        so explicit close is not always necessary.
        """
        # FalkorDB client handles connection pooling internally
        # No explicit close needed, but we can delete the reference
        self.graph = None
        self.client = None

    @staticmethod
    def _is_valid_property_value(value) -> bool:
        """Validate that a value can be stored as a FalkorDB property.

        FalkorDB (like most databases) cannot store special float values.
        This method rejects values that would cause query failures.

        Parameters
        ----------
        value : Any
            Value to validate

        Returns
        -------
        bool
            True if value can be safely stored, False otherwise

        Notes
        -----
        Rejected values:
        - float('nan'): Not a Number
        - float('inf'): Positive infinity
        - float('-inf'): Negative infinity
        """
        import math

        if isinstance(value, float):
            if math.isnan(value) or math.isinf(value):
                return False
        return True

    @staticmethod
    def _sanitize_string_value(value: str) -> str:
        """Remove characters that break the Cypher parser.

        Null bytes (\\x00) cause FalkorDB's Cypher parser to fail with
        cryptic errors. This method strips them from string values.

        Parameters
        ----------
        value : str
            String value to sanitize

        Returns
        -------
        str
            Sanitized string with problematic characters removed

        Notes
        -----
        Currently handles:
        - Null bytes (\\x00): Break Cypher parser tokenization
        """
        if "\x00" in value:
            value = value.replace("\x00", "")
        return value

    def _sanitize_document(
        self, doc: dict, match_keys: list[str] | None = None
    ) -> dict:
        """Sanitize a document for safe FalkorDB insertion.

        Performs comprehensive input validation and sanitization to ensure
        documents can be safely inserted without query errors or injection.

        Sanitization Steps
        ------------------
        1. Filter non-string property keys (log warning)
        2. Remove properties with invalid float values (NaN, Inf)
        3. Strip null bytes from string values
        4. Validate presence of required match keys

        Parameters
        ----------
        doc : dict
            Document to sanitize. Modified values are logged as warnings.
        match_keys : list[str], optional
            Keys that must be present with valid (non-None) values.
            Typically the fields used for MERGE matching.

        Returns
        -------
        dict
            Sanitized copy of the document

        Raises
        ------
        ValueError
            If a required match_key is missing or has None value

        Examples
        --------
        >>> doc = {"id": "1", "name": "test\\x00", 123: "bad_key"}
        >>> sanitized = conn._sanitize_document(doc, match_keys=["id"])
        # Logs: Skipping non-string property key: 123
        # Logs: Sanitized property 'name': removed null bytes
        >>> sanitized
        {"id": "1", "name": "test"}
        """
        sanitized = {}

        for key, value in doc.items():
            # Filter non-string keys
            if not isinstance(key, str):
                logger.warning(
                    f"Skipping non-string property key: {key!r} (type: {type(key).__name__})"
                )
                continue

            # Check for invalid float values
            if not self._is_valid_property_value(value):
                logger.warning(f"Skipping property '{key}' with invalid value: {value}")
                continue

            # Sanitize string values (remove null bytes that break Cypher)
            if isinstance(value, str):
                original = value
                value = self._sanitize_string_value(value)
                if value != original:
                    logger.warning(
                        f"Sanitized property '{key}': removed null bytes from value"
                    )

            sanitized[key] = value

        # Validate match_keys presence
        if match_keys:
            for key in match_keys:
                if key not in sanitized:
                    raise ValueError(
                        f"Required match key '{key}' is missing or has invalid value in document: {doc}"
                    )
                if sanitized[key] is None:
                    raise ValueError(
                        f"Match key '{key}' cannot be None in document: {doc}"
                    )

        return sanitized

    def _sanitize_batch(
        self, docs: list[dict], match_keys: list[str] | None = None
    ) -> list[dict]:
        """Sanitize a batch of documents.

        Args:
            docs: List of documents to sanitize
            match_keys: Optional list of required keys to validate

        Returns:
            list[dict]: List of sanitized documents
        """
        return [self._sanitize_document(doc, match_keys) for doc in docs]

    def create_database(self, name: str):
        """Create a new graph in FalkorDB.

        In FalkorDB, creating a database means selecting a new graph.
        The graph is created implicitly when data is first inserted.

        Args:
            name: Name of the graph to create
        """
        # In FalkorDB, graphs are created implicitly when you first insert data
        # We just need to select the graph
        assert self.client is not None, "Connection is closed"
        self.graph = self.client.select_graph(name)
        self._graph_name = name
        logger.info(f"Selected FalkorDB graph '{name}'")

    def delete_database(self, name: str):
        """Delete a graph from FalkorDB.

        Args:
            name: Name of the graph to delete (if empty, uses current graph)
        """
        graph_to_delete = name if name else self._graph_name
        assert self.client is not None, "Connection is closed"
        try:
            # Delete the graph using the FalkorDB API
            graph = self.client.select_graph(graph_to_delete)
            graph.delete()
            logger.info(f"Successfully deleted FalkorDB graph '{graph_to_delete}'")
        except Exception as e:
            logger.error(
                f"Failed to delete FalkorDB graph '{graph_to_delete}': {e}",
                exc_info=True,
            )
            raise

    def define_vertex_indices(self, vertex_config: VertexConfig):
        """Define indices for vertex labels.

        Creates indices for each vertex label based on the configuration.
        FalkorDB supports range indices on node properties.

        Args:
            vertex_config: Vertex configuration containing index definitions
        """
        for c in vertex_config.vertex_set:
            for index_obj in vertex_config.indexes(c):
                self._add_index(c, index_obj)

    def define_edge_indices(self, edges: list[Edge]):
        """Define indices for relationship types.

        Creates indices for each relationship type based on the configuration.
        FalkorDB supports range indices on relationship properties.

        Args:
            edges: List of edge configurations containing index definitions
        """
        for edge in edges:
            for index_obj in edge.indexes:
                if edge.relation is not None:
                    self._add_index(edge.relation, index_obj, is_vertex_index=False)

    def _add_index(self, obj_name: str, index: Index, is_vertex_index: bool = True):
        """Add an index to a label or relationship type.

        FalkorDB uses CREATE INDEX syntax similar to Neo4j but with some differences.

        Args:
            obj_name: Label or relationship type name
            index: Index configuration to create
            is_vertex_index: If True, create index on nodes, otherwise on relationships
        """
        for field in index.fields:
            try:
                if is_vertex_index:
                    # FalkorDB node index syntax
                    q = f"CREATE INDEX FOR (n:{obj_name}) ON (n.{field})"
                else:
                    # FalkorDB relationship index syntax
                    q = f"CREATE INDEX FOR ()-[r:{obj_name}]-() ON (r.{field})"

                self.execute(q)
                logger.debug(f"Created index on {obj_name}.{field}")
            except Exception as e:
                # Index may already exist, log and continue
                logger.debug(f"Index creation note for {obj_name}.{field}: {e}")

    def define_schema(self, schema: Schema):
        """Define collections based on schema.

        Note: This is a no-op in FalkorDB as collections are implicit.
        Labels and relationship types are created when data is inserted.

        Args:
            schema: Schema containing collection definitions
        """
        pass

    def define_vertex_collections(self, schema: Schema):
        """Define vertex collections based on schema.

        Note: This is a no-op in FalkorDB as vertex collections are implicit.

        Args:
            schema: Schema containing vertex definitions
        """
        pass

    def define_edge_collections(self, edges: list[Edge]):
        """Define edge collections based on schema.

        Note: This is a no-op in FalkorDB as edge collections are implicit.

        Args:
            edges: List of edge configurations
        """
        pass

    def delete_graph_structure(self, vertex_types=(), graph_names=(), delete_all=False):
        """Delete graph structure (nodes and relationships) from FalkorDB.

        In FalkorDB:
        - Labels: Categories for nodes (equivalent to vertex types)
        - Relationship Types: Types of relationships (equivalent to edge types)
        - Graph: Redis key containing all nodes and relationships

        Args:
            vertex_types: Label names to delete nodes for
            graph_names: Graph names to delete entirely
            delete_all: If True, delete all nodes and relationships
        """
        if delete_all or (not vertex_types and not graph_names):
            # Delete all nodes and relationships in current graph
            try:
                self.execute("MATCH (n) DETACH DELETE n")
                logger.debug("Deleted all nodes and relationships from graph")
            except Exception as e:
                logger.debug(f"Graph may be empty or not exist: {e}")
        elif vertex_types:
            # Delete nodes with specific labels
            for label in vertex_types:
                try:
                    self.execute(f"MATCH (n:{label}) DETACH DELETE n")
                    logger.debug(f"Deleted all nodes with label '{label}'")
                except Exception as e:
                    logger.warning(f"Failed to delete nodes with label '{label}': {e}")

        # Delete specific graphs
        assert self.client is not None, "Connection is closed"
        for graph_name in graph_names:
            try:
                graph = self.client.select_graph(graph_name)
                graph.delete()
                logger.debug(f"Deleted graph '{graph_name}'")
            except Exception as e:
                logger.warning(f"Failed to delete graph '{graph_name}': {e}")

    def init_db(self, schema: Schema, clean_start: bool):
        """Initialize FalkorDB with the given schema.

        Uses schema.general.name if database is not set in config.

        Args:
            schema: Schema containing graph structure definitions
            clean_start: If True, delete all existing data before initialization
        """
        # Determine graph name: use config.database if set, otherwise use schema.general.name
        graph_name = self.config.database
        if not graph_name:
            graph_name = schema.general.name
            self.config.database = graph_name

        # Select/create the graph
        assert self.client is not None, "Connection is closed"
        self.graph = self.client.select_graph(graph_name)
        self._graph_name = graph_name
        logger.info(f"Initialized FalkorDB graph '{graph_name}'")

        if clean_start:
            try:
                self.delete_graph_structure(delete_all=True)
                logger.debug(f"Cleaned graph '{graph_name}' for fresh start")
            except Exception as e:
                logger.debug(f"Clean start note for graph '{graph_name}': {e}")

        try:
            self.define_indexes(schema)
            logger.debug(f"Defined indexes for graph '{graph_name}'")
        except Exception as e:
            logger.error(
                f"Failed to define indexes for graph '{graph_name}': {e}",
                exc_info=True,
            )
            raise

    def upsert_docs_batch(
        self, docs: list[dict], class_name: str, match_keys: list[str], **kwargs
    ):
        """Upsert a batch of nodes using Cypher MERGE.

        Performs atomic upsert (update-or-insert) operations on a batch of
        documents. Uses Cypher MERGE with ON MATCH/ON CREATE for efficiency.

        The operation:
        1. Sanitizes all documents (removes invalid keys/values)
        2. For each document, attempts to MERGE on match_keys
        3. If node exists: updates all properties
        4. If node doesn't exist: creates with all properties

        Parameters
        ----------
        docs : list[dict]
            Documents to upsert. Each document must contain all match_keys.
        class_name : str
            Node label (e.g., "Person", "Product")
        match_keys : list[str]
            Properties used to identify existing nodes. These form the
            MERGE pattern: ``MERGE (n:Label {key1: val1, key2: val2})``
        **kwargs
            Additional options:
            - dry (bool): If True, build query but don't execute

        Raises
        ------
        ValueError
            If any document is missing a required match_key or has None value

        Examples
        --------
        Insert or update users by email::

            docs = [
                {"email": "alice@example.com", "name": "Alice", "age": 30},
                {"email": "bob@example.com", "name": "Bob", "age": 25}
            ]
            conn.upsert_docs_batch(docs, "User", match_keys=["email"])

        Notes
        -----
        The generated Cypher query uses UNWIND for batch efficiency::

            UNWIND $batch AS row
            MERGE (n:Label {match_key: row.match_key})
            ON MATCH SET n += row
            ON CREATE SET n += row
        """
        dry = kwargs.pop("dry", False)

        if not docs:
            return

        # Sanitize documents: filter invalid keys/values, validate match_keys
        sanitized_docs = self._sanitize_batch(docs, match_keys)

        if not sanitized_docs:
            return

        # Build the MERGE clause with match keys
        index_str = ", ".join([f"{k}: row.{k}" for k in match_keys])
        q = f"""
            UNWIND $batch AS row
            MERGE (n:{class_name} {{ {index_str} }})
            ON MATCH SET n += row
            ON CREATE SET n += row
        """
        if not dry:
            self.execute(q, batch=sanitized_docs)

    def insert_edges_batch(
        self,
        docs_edges: list,
        source_class: str,
        target_class: str,
        relation_name: str,
        collection_name: str | None = None,
        match_keys_source: tuple[str, ...] = ("_key",),
        match_keys_target: tuple[str, ...] = ("_key",),
        filter_uniques: bool = True,
        uniq_weight_fields=None,
        uniq_weight_collections=None,
        upsert_option: bool = False,
        head: int | None = None,
        **kwargs,
    ):
        """Create relationships between existing nodes using Cypher MERGE.

        Efficiently creates relationships in batch by matching source and
        target nodes, then creating or updating the relationship between them.

        Parameters
        ----------
        docs_edges : list
            Edge specifications as list of [source, target, props] triples:
            ``[[{source_props}, {target_props}, {edge_props}], ...]``
        source_class : str
            Label of source nodes (e.g., "Person")
        target_class : str
            Label of target nodes (e.g., "Company")
        relation_name : str
            Relationship type name (e.g., "WORKS_AT")
        collection_name : str, optional
            Unused in FalkorDB (kept for interface compatibility)
        match_keys_source : tuple[str, ...]
            Properties to match source nodes (default: ("_key",))
        match_keys_target : tuple[str, ...]
            Properties to match target nodes (default: ("_key",))
        filter_uniques : bool
            Unused in FalkorDB (kept for interface compatibility)
        uniq_weight_fields
            Unused in FalkorDB (kept for interface compatibility)
        uniq_weight_collections
            Unused in FalkorDB (kept for interface compatibility)
        upsert_option : bool
            Unused in FalkorDB (kept for interface compatibility)
        head : int, optional
            Unused in FalkorDB (kept for interface compatibility)
        **kwargs
            Additional options:
            - dry (bool): If True, build query but don't execute

        Examples
        --------
        Create KNOWS relationships between people::

            edges = [
                [{"id": "1"}, {"id": "2"}, {"since": 2020}],
                [{"id": "1"}, {"id": "3"}, {"since": 2021}]
            ]
            conn.insert_edges_batch(
                edges,
                source_class="Person",
                target_class="Person",
                relation_name="KNOWS",
                match_keys_source=["id"],
                match_keys_target=["id"]
            )

        Notes
        -----
        Generated Cypher pattern::

            UNWIND $batch AS row
            MATCH (source:Label), (target:Label)
            WHERE source.key = row[0].key AND target.key = row[1].key
            MERGE (source)-[r:REL_TYPE]->(target)
            SET r += row[2]
        """
        dry = kwargs.pop("dry", False)

        if not docs_edges:
            return

        # Build match conditions for source and target nodes
        source_match_str = [f"source.{key} = row[0].{key}" for key in match_keys_source]
        target_match_str = [f"target.{key} = row[1].{key}" for key in match_keys_target]

        match_clause = "WHERE " + " AND ".join(source_match_str + target_match_str)

        q = f"""
            UNWIND $batch AS row
            MATCH (source:{source_class}),
                  (target:{target_class}) {match_clause}
            MERGE (source)-[r:{relation_name}]->(target)
            SET r += row[2]
        """
        if not dry:
            self.execute(q, batch=docs_edges)

    def insert_return_batch(self, docs, class_name):
        """Insert nodes and return their properties.

        Note: Limited implementation in FalkorDB.

        Args:
            docs: Documents to insert
            class_name: Label to insert into

        Raises:
            NotImplementedError: This method is not fully implemented for FalkorDB
        """
        raise NotImplementedError("insert_return_batch is not implemented for FalkorDB")

    def fetch_docs(
        self,
        class_name,
        filters: list | dict | None = None,
        limit: int | None = None,
        return_keys: list | None = None,
        unset_keys: list | None = None,
        **kwargs,
    ):
        """Fetch nodes from a label.

        Args:
            class_name: Label to fetch from
            filters: Query filters
            limit: Maximum number of nodes to return
            return_keys: Keys to return
            unset_keys: Unused in FalkorDB

        Returns:
            list: Fetched nodes as dictionaries
        """
        # Build filter clause
        if filters is not None:
            ff = Expression.from_dict(filters)
            # Use NEO4J flavor since FalkorDB uses OpenCypher
            filter_clause = f"WHERE {ff(doc_name='n', kind=DBFlavor.NEO4J)}"
        else:
            filter_clause = ""

        # Build return clause
        if return_keys is not None:
            # Project specific keys
            keep_clause_ = ", ".join([f"n.{item} AS {item}" for item in return_keys])
            return_clause = f"RETURN {keep_clause_}"
        else:
            return_clause = "RETURN n"

        # Build limit clause (must be positive integer)
        if limit is not None and isinstance(limit, int) and limit > 0:
            limit_clause = f"LIMIT {limit}"
        else:
            limit_clause = ""

        q = f"""
            MATCH (n:{class_name})
            {filter_clause}
            {return_clause}
            {limit_clause}
        """

        result = self.execute(q)

        # Convert FalkorDB results to list of dictionaries
        if return_keys is not None:
            # Results are already projected
            return [dict(zip(return_keys, row)) for row in result.result_set]
        else:
            # Results contain node objects
            return [self._node_to_dict(row[0]) for row in result.result_set]

    def _node_to_dict(self, node) -> dict:
        """Convert a FalkorDB node to a dictionary.

        Args:
            node: FalkorDB node object

        Returns:
            dict: Node properties as dictionary
        """
        if hasattr(node, "properties"):
            return dict(node.properties)
        elif isinstance(node, dict):
            return node
        else:
            # Try to convert to dict
            return dict(node) if node else {}

    def fetch_edges(
        self,
        from_type: str,
        from_id: str,
        edge_type: str | None = None,
        to_type: str | None = None,
        to_id: str | None = None,
        filters: list | dict | None = None,
        limit: int | None = None,
        return_keys: list | None = None,
        unset_keys: list | None = None,
        **kwargs,
    ):
        """Fetch edges from FalkorDB using Cypher.

        Args:
            from_type: Source node label
            from_id: Source node ID (property name depends on match_keys used)
            edge_type: Optional relationship type to filter by
            to_type: Optional target node label to filter by
            to_id: Optional target node ID to filter by
            filters: Additional query filters
            limit: Maximum number of edges to return
            return_keys: Keys to return (projection)
            unset_keys: Keys to exclude (projection) - not supported in FalkorDB
            **kwargs: Additional parameters

        Returns:
            list: List of fetched edges as dictionaries
        """
        # Build source node match
        source_match = f"(source:{from_type} {{id: '{from_id}'}})"

        # Build relationship pattern
        if edge_type:
            rel_pattern = f"-[r:{edge_type}]->"
        else:
            rel_pattern = "-[r]->"

        # Build target node match
        if to_type:
            target_match = f"(target:{to_type})"
        else:
            target_match = "(target)"

        # Build WHERE clauses
        where_clauses = []
        if to_id:
            where_clauses.append(f"target.id = '{to_id}'")

        # Add additional filters if provided
        if filters is not None:
            ff = Expression.from_dict(filters)
            filter_clause = ff(doc_name="r", kind=ExpressionFlavor.NEO4J)
            where_clauses.append(filter_clause)

        where_clause = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""

        # Build return clause
        if return_keys is not None:
            return_parts = ", ".join([f"r.{key} AS {key}" for key in return_keys])
            return_clause = f"RETURN {return_parts}"
        else:
            return_clause = "RETURN r"

        limit_clause = f"LIMIT {limit}" if limit and limit > 0 else ""

        query = f"""
            MATCH {source_match}{rel_pattern}{target_match}
            {where_clause}
            {return_clause}
            {limit_clause}
        """

        result = self.execute(query)

        # Convert results
        if return_keys is not None:
            return [dict(zip(return_keys, row)) for row in result.result_set]
        else:
            return [self._edge_to_dict(row[0]) for row in result.result_set]

    def _edge_to_dict(self, edge) -> dict:
        """Convert a FalkorDB edge to a dictionary.

        Args:
            edge: FalkorDB edge object

        Returns:
            dict: Edge properties as dictionary
        """
        if hasattr(edge, "properties"):
            return dict(edge.properties)
        elif isinstance(edge, dict):
            return edge
        else:
            return dict(edge) if edge else {}

    def fetch_present_documents(
        self,
        batch,
        class_name,
        match_keys,
        keep_keys,
        flatten=False,
        filters: list | dict | None = None,
    ):
        """Fetch nodes that exist in the database.

        Args:
            batch: Batch of documents to check
            class_name: Label to check in
            match_keys: Keys to match nodes
            keep_keys: Keys to keep in result
            flatten: Unused in FalkorDB
            filters: Additional query filters

        Returns:
            list: Documents that exist in the database
        """
        if not batch:
            return []

        # Build match conditions for each document in batch
        results = []
        for doc in batch:
            match_conditions = " AND ".join([f"n.{key} = ${key}" for key in match_keys])
            params = {key: doc.get(key) for key in match_keys}

            q = f"""
                MATCH (n:{class_name})
                WHERE {match_conditions}
                RETURN n
                LIMIT 1
            """

            try:
                result = self.execute(q, **params)
                if result.result_set:
                    node_dict = self._node_to_dict(result.result_set[0][0])
                    if keep_keys:
                        node_dict = {k: node_dict.get(k) for k in keep_keys}
                    results.append(node_dict)
            except Exception as e:
                logger.debug(f"Error checking document presence: {e}")

        return results

    def aggregate(
        self,
        class_name,
        aggregation_function: AggregationType,
        discriminant: str | None = None,
        aggregated_field: str | None = None,
        filters: list | dict | None = None,
    ):
        """Perform aggregation on nodes.

        Args:
            class_name: Label to aggregate
            aggregation_function: Type of aggregation to perform
            discriminant: Field to group by
            aggregated_field: Field to aggregate
            filters: Query filters

        Returns:
            dict or int: Aggregation results
        """
        # Build filter clause
        if filters is not None:
            ff = Expression.from_dict(filters)
            filter_clause = f"WHERE {ff(doc_name='n', kind=DBFlavor.NEO4J)}"
        else:
            filter_clause = ""

        # Build aggregation query based on function type
        if aggregation_function == AggregationType.COUNT:
            if discriminant:
                q = f"""
                    MATCH (n:{class_name})
                    {filter_clause}
                    RETURN n.{discriminant} AS key, count(*) AS count
                """
                result = self.execute(q)
                return {row[0]: row[1] for row in result.result_set}
            else:
                q = f"""
                    MATCH (n:{class_name})
                    {filter_clause}
                    RETURN count(*) AS count
                """
                result = self.execute(q)
                return result.result_set[0][0] if result.result_set else 0

        elif aggregation_function == AggregationType.MAX:
            if not aggregated_field:
                raise ValueError("aggregated_field is required for MAX aggregation")
            q = f"""
                MATCH (n:{class_name})
                {filter_clause}
                RETURN max(n.{aggregated_field}) AS max_value
            """
            result = self.execute(q)
            return result.result_set[0][0] if result.result_set else None

        elif aggregation_function == AggregationType.MIN:
            if not aggregated_field:
                raise ValueError("aggregated_field is required for MIN aggregation")
            q = f"""
                MATCH (n:{class_name})
                {filter_clause}
                RETURN min(n.{aggregated_field}) AS min_value
            """
            result = self.execute(q)
            return result.result_set[0][0] if result.result_set else None

        elif aggregation_function == AggregationType.AVERAGE:
            if not aggregated_field:
                raise ValueError("aggregated_field is required for AVERAGE aggregation")
            q = f"""
                MATCH (n:{class_name})
                {filter_clause}
                RETURN avg(n.{aggregated_field}) AS avg_value
            """
            result = self.execute(q)
            return result.result_set[0][0] if result.result_set else None

        elif aggregation_function == AggregationType.SORTED_UNIQUE:
            if not aggregated_field:
                raise ValueError(
                    "aggregated_field is required for SORTED_UNIQUE aggregation"
                )
            q = f"""
                MATCH (n:{class_name})
                {filter_clause}
                RETURN DISTINCT n.{aggregated_field} AS value
                ORDER BY value
            """
            result = self.execute(q)
            return [row[0] for row in result.result_set]

        else:
            raise ValueError(
                f"Unsupported aggregation function: {aggregation_function}"
            )

    def keep_absent_documents(
        self,
        batch,
        class_name,
        match_keys,
        keep_keys,
        filters: list | dict | None = None,
    ):
        """Keep documents that don't exist in the database.

        Args:
            batch: Batch of documents to check
            class_name: Label to check in
            match_keys: Keys to match nodes
            keep_keys: Keys to keep in result
            filters: Additional query filters

        Returns:
            list: Documents that don't exist in the database
        """
        if not batch:
            return []

        # Find documents that exist
        present_docs = self.fetch_present_documents(
            batch, class_name, match_keys, match_keys, filters=filters
        )

        # Create a set of present document keys for efficient lookup
        present_keys = set()
        for doc in present_docs:
            key_tuple = tuple(doc.get(k) for k in match_keys)
            present_keys.add(key_tuple)

        # Filter out documents that exist
        absent_docs = []
        for doc in batch:
            key_tuple = tuple(doc.get(k) for k in match_keys)
            if key_tuple not in present_keys:
                if keep_keys:
                    absent_docs.append({k: doc.get(k) for k in keep_keys})
                else:
                    absent_docs.append(doc)

        return absent_docs

__init__(config)

Initialize FalkorDB connection and select graph.

Establishes connection to the FalkorDB instance and selects the specified graph for subsequent operations.

Parameters

config : FalkordbConfig
    Connection configuration with the following fields:
    - uri: Redis URI (redis://host:port)
    - database: Graph name (optional, defaults to "default")
    - password: Redis password (optional)

Raises

ValueError
    If URI is not provided in configuration
redis.exceptions.ConnectionError
    If unable to connect to the Redis instance

Source code in graflo/db/falkordb/conn.py
def __init__(self, config: FalkordbConfig):
    """Initialize FalkorDB connection and select graph.

    Establishes connection to the FalkorDB instance and selects
    the specified graph for subsequent operations.

    Parameters
    ----------
    config : FalkordbConfig
        Connection configuration with the following fields:
        - uri: Redis URI (redis://host:port)
        - database: Graph name (optional, defaults to "default")
        - password: Redis password (optional)

    Raises
    ------
    ValueError
        If URI is not provided in configuration
    redis.exceptions.ConnectionError
        If unable to connect to Redis instance
    """
    super().__init__()
    self.config = config

    if config.uri is None:
        raise ValueError("FalkorDB connection requires a URI to be configured")

    # Parse URI to extract host and port
    parsed = urlparse(config.uri)
    host = parsed.hostname or "localhost"
    port = parsed.port or 6379

    # Initialize FalkorDB client
    if config.password:
        self.client = FalkorDB(host=host, port=port, password=config.password)
    else:
        self.client = FalkorDB(host=host, port=port)

    # Select the graph (database in config maps to graph name)
    graph_name = config.database or "default"
    self.graph = self.client.select_graph(graph_name)
    self._graph_name = graph_name
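
Per the Raises section above, connection errors surface at construction time, so they can be handled when the connection is created (a sketch; host and graph name are illustrative):

from redis.exceptions import ConnectionError as RedisConnectionError

config = FalkordbConfig(uri="redis://localhost:6379", database="mygraph")
try:
    conn = FalkordbConnection(config)
except RedisConnectionError:
    # Redis instance unreachable at the configured host/port
    raise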

aggregate(class_name, aggregation_function, discriminant=None, aggregated_field=None, filters=None)

Perform aggregation on nodes.

Parameters:

  • class_name: Label to aggregate (required)
  • aggregation_function (AggregationType): Type of aggregation to perform (required)
  • discriminant (str | None): Field to group by (default: None)
  • aggregated_field (str | None): Field to aggregate (default: None)
  • filters (list | dict | None): Query filters (default: None)

Returns:

  • dict or int: Aggregation results

Source code in graflo/db/falkordb/conn.py
def aggregate(
    self,
    class_name,
    aggregation_function: AggregationType,
    discriminant: str | None = None,
    aggregated_field: str | None = None,
    filters: list | dict | None = None,
):
    """Perform aggregation on nodes.

    Args:
        class_name: Label to aggregate
        aggregation_function: Type of aggregation to perform
        discriminant: Field to group by
        aggregated_field: Field to aggregate
        filters: Query filters

    Returns:
        dict or int: Aggregation results
    """
    # Build filter clause
    if filters is not None:
        ff = Expression.from_dict(filters)
        filter_clause = f"WHERE {ff(doc_name='n', kind=DBFlavor.NEO4J)}"
    else:
        filter_clause = ""

    # Build aggregation query based on function type
    if aggregation_function == AggregationType.COUNT:
        if discriminant:
            q = f"""
                MATCH (n:{class_name})
                {filter_clause}
                RETURN n.{discriminant} AS key, count(*) AS count
            """
            result = self.execute(q)
            return {row[0]: row[1] for row in result.result_set}
        else:
            q = f"""
                MATCH (n:{class_name})
                {filter_clause}
                RETURN count(*) AS count
            """
            result = self.execute(q)
            return result.result_set[0][0] if result.result_set else 0

    elif aggregation_function == AggregationType.MAX:
        if not aggregated_field:
            raise ValueError("aggregated_field is required for MAX aggregation")
        q = f"""
            MATCH (n:{class_name})
            {filter_clause}
            RETURN max(n.{aggregated_field}) AS max_value
        """
        result = self.execute(q)
        return result.result_set[0][0] if result.result_set else None

    elif aggregation_function == AggregationType.MIN:
        if not aggregated_field:
            raise ValueError("aggregated_field is required for MIN aggregation")
        q = f"""
            MATCH (n:{class_name})
            {filter_clause}
            RETURN min(n.{aggregated_field}) AS min_value
        """
        result = self.execute(q)
        return result.result_set[0][0] if result.result_set else None

    elif aggregation_function == AggregationType.AVERAGE:
        if not aggregated_field:
            raise ValueError("aggregated_field is required for AVERAGE aggregation")
        q = f"""
            MATCH (n:{class_name})
            {filter_clause}
            RETURN avg(n.{aggregated_field}) AS avg_value
        """
        result = self.execute(q)
        return result.result_set[0][0] if result.result_set else None

    elif aggregation_function == AggregationType.SORTED_UNIQUE:
        if not aggregated_field:
            raise ValueError(
                "aggregated_field is required for SORTED_UNIQUE aggregation"
            )
        q = f"""
            MATCH (n:{class_name})
            {filter_clause}
            RETURN DISTINCT n.{aggregated_field} AS value
            ORDER BY value
        """
        result = self.execute(q)
        return [row[0] for row in result.result_set]

    else:
        raise ValueError(
            f"Unsupported aggregation function: {aggregation_function}"
        )
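
A short usage sketch (assumes a Person label with city and age properties; conn and AggregationType come from the surrounding graflo context):

# Count Person nodes grouped by city -> {"Berlin": 10, "Paris": 4, ...}
by_city = conn.aggregate(
    "Person",
    aggregation_function=AggregationType.COUNT,
    discriminant="city",
)

# Maximum age over all Person nodes -> scalar, or None if no rows match
max_age = conn.aggregate(
    "Person",
    aggregation_function=AggregationType.MAX,
    aggregated_field="age",
)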

close()

Close the FalkorDB connection.

Note: FalkorDB client uses Redis connection pooling, so explicit close is not always necessary.

Source code in graflo/db/falkordb/conn.py
def close(self):
    """Close the FalkorDB connection.

    Note: FalkorDB client uses Redis connection pooling,
    so explicit close is not always necessary.
    """
    # FalkorDB client handles connection pooling internally
    # No explicit close needed, but we can delete the reference
    self.graph = None
    self.client = None

create_database(name)

Create a new graph in FalkorDB.

In FalkorDB, creating a database means selecting a new graph. The graph is created implicitly when data is first inserted.

Parameters:

  • name (str): Name of the graph to create (required)
Source code in graflo/db/falkordb/conn.py
def create_database(self, name: str):
    """Create a new graph in FalkorDB.

    In FalkorDB, creating a database means selecting a new graph.
    The graph is created implicitly when data is first inserted.

    Args:
        name: Name of the graph to create
    """
    # In FalkorDB, graphs are created implicitly when you first insert data
    # We just need to select the graph
    assert self.client is not None, "Connection is closed"
    self.graph = self.client.select_graph(name)
    self._graph_name = name
    logger.info(f"Selected FalkorDB graph '{name}'")

define_edge_collections(edges)

Define edge collections based on schema.

Note: This is a no-op in FalkorDB as edge collections are implicit.

Parameters:

  • edges (list[Edge]): List of edge configurations (required)
Source code in graflo/db/falkordb/conn.py
def define_edge_collections(self, edges: list[Edge]):
    """Define edge collections based on schema.

    Note: This is a no-op in FalkorDB as edge collections are implicit.

    Args:
        edges: List of edge configurations
    """
    pass

define_edge_indices(edges)

Define indices for relationship types.

Creates indices for each relationship type based on the configuration. FalkorDB supports range indices on relationship properties.

Parameters:

  • edges (list[Edge]): List of edge configurations containing index definitions (required)
Source code in graflo/db/falkordb/conn.py
def define_edge_indices(self, edges: list[Edge]):
    """Define indices for relationship types.

    Creates indices for each relationship type based on the configuration.
    FalkorDB supports range indices on relationship properties.

    Args:
        edges: List of edge configurations containing index definitions
    """
    for edge in edges:
        for index_obj in edge.indexes:
            if edge.relation is not None:
                self._add_index(edge.relation, index_obj, is_vertex_index=False)

define_schema(schema)

Define collections based on schema.

Note: This is a no-op in FalkorDB as collections are implicit. Labels and relationship types are created when data is inserted.

Parameters:

  • schema (Schema): Schema containing collection definitions (required)
Source code in graflo/db/falkordb/conn.py
def define_schema(self, schema: Schema):
    """Define collections based on schema.

    Note: This is a no-op in FalkorDB as collections are implicit.
    Labels and relationship types are created when data is inserted.

    Args:
        schema: Schema containing collection definitions
    """
    pass

define_vertex_collections(schema)

Define vertex collections based on schema.

Note: This is a no-op in FalkorDB as vertex collections are implicit.

Parameters:

  • schema (Schema): Schema containing vertex definitions (required)
Source code in graflo/db/falkordb/conn.py
def define_vertex_collections(self, schema: Schema):
    """Define vertex collections based on schema.

    Note: This is a no-op in FalkorDB as vertex collections are implicit.

    Args:
        schema: Schema containing vertex definitions
    """
    pass

define_vertex_indices(vertex_config)

Define indices for vertex labels.

Creates indices for each vertex label based on the configuration. FalkorDB supports range indices on node properties.

Parameters:

  • vertex_config (VertexConfig): Vertex configuration containing index definitions (required)
Source code in graflo/db/falkordb/conn.py
def define_vertex_indices(self, vertex_config: VertexConfig):
    """Define indices for vertex labels.

    Creates indices for each vertex label based on the configuration.
    FalkorDB supports range indices on node properties.

    Args:
        vertex_config: Vertex configuration containing index definitions
    """
    for c in vertex_config.vertex_set:
        for index_obj in vertex_config.indexes(c):
            self._add_index(c, index_obj)

delete_database(name)

Delete a graph from FalkorDB.

Parameters:

  • name (str): Name of the graph to delete; if empty, uses the current graph (required)
Source code in graflo/db/falkordb/conn.py
def delete_database(self, name: str):
    """Delete a graph from FalkorDB.

    Args:
        name: Name of the graph to delete (if empty, uses current graph)
    """
    graph_to_delete = name if name else self._graph_name
    assert self.client is not None, "Connection is closed"
    try:
        # Delete the graph using the FalkorDB API
        graph = self.client.select_graph(graph_to_delete)
        graph.delete()
        logger.info(f"Successfully deleted FalkorDB graph '{graph_to_delete}'")
    except Exception as e:
        logger.error(
            f"Failed to delete FalkorDB graph '{graph_to_delete}': {e}",
            exc_info=True,
        )
        raise
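
Usage sketch (graph names are illustrative):

conn.delete_database("staging")  # drop a named graph
conn.delete_database("")         # empty name falls back to the current graph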

delete_graph_structure(vertex_types=(), graph_names=(), delete_all=False)

Delete graph structure (nodes and relationships) from FalkorDB.

In FalkorDB:

  • Labels: Categories for nodes (equivalent to vertex types)
  • Relationship Types: Types of relationships (equivalent to edge types)
  • Graph: Redis key containing all nodes and relationships

Parameters:

  • vertex_types: Label names to delete nodes for (default: ())
  • graph_names: Graph names to delete entirely (default: ())
  • delete_all: If True, delete all nodes and relationships (default: False)
Source code in graflo/db/falkordb/conn.py
def delete_graph_structure(self, vertex_types=(), graph_names=(), delete_all=False):
    """Delete graph structure (nodes and relationships) from FalkorDB.

    In FalkorDB:
    - Labels: Categories for nodes (equivalent to vertex types)
    - Relationship Types: Types of relationships (equivalent to edge types)
    - Graph: Redis key containing all nodes and relationships

    Args:
        vertex_types: Label names to delete nodes for
        graph_names: Graph names to delete entirely
        delete_all: If True, delete all nodes and relationships
    """
    if delete_all or (not vertex_types and not graph_names):
        # Delete all nodes and relationships in current graph
        try:
            self.execute("MATCH (n) DETACH DELETE n")
            logger.debug("Deleted all nodes and relationships from graph")
        except Exception as e:
            logger.debug(f"Graph may be empty or not exist: {e}")
    elif vertex_types:
        # Delete nodes with specific labels
        for label in vertex_types:
            try:
                self.execute(f"MATCH (n:{label}) DETACH DELETE n")
                logger.debug(f"Deleted all nodes with label '{label}'")
            except Exception as e:
                logger.warning(f"Failed to delete nodes with label '{label}': {e}")

    # Delete specific graphs
    assert self.client is not None, "Connection is closed"
    for graph_name in graph_names:
        try:
            graph = self.client.select_graph(graph_name)
            graph.delete()
            logger.debug(f"Deleted graph '{graph_name}'")
        except Exception as e:
            logger.warning(f"Failed to delete graph '{graph_name}': {e}")

execute(query, **kwargs)

Execute a raw OpenCypher query against the graph.

Executes the provided Cypher query with optional parameters. Parameters are bound through FalkorDB's parameterized query mechanism, which guards against injection attacks.

Parameters

    query : str
        OpenCypher query string. Can include parameter placeholders
        using $name syntax (e.g., "MATCH (n) WHERE n.id = $id")
    **kwargs
        Query parameters as keyword arguments. Values are safely
        escaped by the driver.

Returns

    QueryResult
        FalkorDB result object containing:
        - result_set: list of result rows
        - statistics: query execution statistics

Examples

Simple query::

result = conn.execute("MATCH (n:Person) RETURN n.name")

Parameterized query::

result = conn.execute(
    "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
    min_age=21
)
Source code in graflo/db/falkordb/conn.py
def execute(self, query: str, **kwargs):
    """Execute a raw OpenCypher query against the graph.

    Executes the provided Cypher query with optional parameters.
    Parameters are bound through FalkorDB's parameterized query
    mechanism, which guards against injection attacks.

    Parameters
    ----------
    query : str
        OpenCypher query string. Can include parameter placeholders
        using $name syntax (e.g., "MATCH (n) WHERE n.id = $id")
    **kwargs
        Query parameters as keyword arguments. Values are safely
        escaped by the driver.

    Returns
    -------
    QueryResult
        FalkorDB result object containing:
        - result_set: List of result rows
        - statistics: Query execution statistics

    Examples
    --------
    Simple query::

        result = conn.execute("MATCH (n:Person) RETURN n.name")

    Parameterized query::

        result = conn.execute(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            min_age=21
        )
    """
    assert self.graph is not None, "Connection is closed"
    result = self.graph.query(query, kwargs if kwargs else None)
    return result
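
Because the raw result object is returned, rows are read from its result_set attribute (a sketch; the label and properties are illustrative)::

    result = conn.execute("MATCH (n:Person) RETURN n.name, n.age")
    for name, age in result.result_set:  # each row is a list of column values
        print(name, age)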

fetch_docs(class_name, filters=None, limit=None, return_keys=None, unset_keys=None, **kwargs)

Fetch nodes from a label.

Parameters:

    class_name
        Label to fetch from (required)
    filters : list | dict | None
        Query filters (default: None)
    limit : int | None
        Maximum number of nodes to return (default: None)
    return_keys : list | None
        Keys to return (default: None)
    unset_keys : list | None
        Unused in FalkorDB (default: None)

Returns:

    list
        Fetched nodes as dictionaries

Source code in graflo/db/falkordb/conn.py
def fetch_docs(
    self,
    class_name,
    filters: list | dict | None = None,
    limit: int | None = None,
    return_keys: list | None = None,
    unset_keys: list | None = None,
    **kwargs,
):
    """Fetch nodes from a label.

    Args:
        class_name: Label to fetch from
        filters: Query filters
        limit: Maximum number of nodes to return
        return_keys: Keys to return
        unset_keys: Unused in FalkorDB

    Returns:
        list: Fetched nodes as dictionaries
    """
    # Build filter clause
    if filters is not None:
        ff = Expression.from_dict(filters)
        # Use NEO4J flavor since FalkorDB uses OpenCypher
        filter_clause = f"WHERE {ff(doc_name='n', kind=DBFlavor.NEO4J)}"
    else:
        filter_clause = ""

    # Build return clause
    if return_keys is not None:
        # Project specific keys
        keep_clause_ = ", ".join([f"n.{item} AS {item}" for item in return_keys])
        return_clause = f"RETURN {keep_clause_}"
    else:
        return_clause = "RETURN n"

    # Build limit clause (must be positive integer)
    if limit is not None and isinstance(limit, int) and limit > 0:
        limit_clause = f"LIMIT {limit}"
    else:
        limit_clause = ""

    q = f"""
        MATCH (n:{class_name})
        {filter_clause}
        {return_clause}
        {limit_clause}
    """

    result = self.execute(q)

    # Convert FalkorDB results to list of dictionaries
    if return_keys is not None:
        # Results are already projected
        return [dict(zip(return_keys, row)) for row in result.result_set]
    else:
        # Results contain node objects
        return [self._node_to_dict(row[0]) for row in result.result_set]
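
A sketch of both return modes (label and property names are illustrative; the shape of the filters dict is whatever Expression.from_dict accepts)::

    # Full node dictionaries
    people = conn.fetch_docs("Person", limit=10)

    # Projected keys only: a list of {"name": ..., "age": ...} dicts
    names = conn.fetch_docs("Person", return_keys=["name", "age"], limit=10)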

fetch_edges(from_type, from_id, edge_type=None, to_type=None, to_id=None, filters=None, limit=None, return_keys=None, unset_keys=None, **kwargs)

Fetch edges from FalkorDB using Cypher.

Parameters:

    from_type : str
        Source node label (required)
    from_id : str
        Source node ID (property name depends on match_keys used) (required)
    edge_type : str | None
        Optional relationship type to filter by (default: None)
    to_type : str | None
        Optional target node label to filter by (default: None)
    to_id : str | None
        Optional target node ID to filter by (default: None)
    filters : list | dict | None
        Additional query filters (default: None)
    limit : int | None
        Maximum number of edges to return (default: None)
    return_keys : list | None
        Keys to return (projection) (default: None)
    unset_keys : list | None
        Keys to exclude (projection); not supported in FalkorDB (default: None)
    **kwargs
        Additional parameters

Returns:

    list
        List of fetched edges as dictionaries

Source code in graflo/db/falkordb/conn.py
def fetch_edges(
    self,
    from_type: str,
    from_id: str,
    edge_type: str | None = None,
    to_type: str | None = None,
    to_id: str | None = None,
    filters: list | dict | None = None,
    limit: int | None = None,
    return_keys: list | None = None,
    unset_keys: list | None = None,
    **kwargs,
):
    """Fetch edges from FalkorDB using Cypher.

    Args:
        from_type: Source node label
        from_id: Source node ID (property name depends on match_keys used)
        edge_type: Optional relationship type to filter by
        to_type: Optional target node label to filter by
        to_id: Optional target node ID to filter by
        filters: Additional query filters
        limit: Maximum number of edges to return
        return_keys: Keys to return (projection)
        unset_keys: Keys to exclude (projection) - not supported in FalkorDB
        **kwargs: Additional parameters

    Returns:
        list: List of fetched edges as dictionaries
    """
    # Build source node match (parameterize the ID to avoid quoting/injection issues)
    source_match = f"(source:{from_type} {{id: $from_id}})"
    params = {"from_id": from_id}

    # Build relationship pattern
    if edge_type:
        rel_pattern = f"-[r:{edge_type}]->"
    else:
        rel_pattern = "-[r]->"

    # Build target node match
    if to_type:
        target_match = f"(target:{to_type})"
    else:
        target_match = "(target)"

    # Build WHERE clauses
    where_clauses = []
    if to_id:
        where_clauses.append("target.id = $to_id")
        params["to_id"] = to_id

    # Add additional filters if provided
    if filters is not None:
        ff = Expression.from_dict(filters)
        filter_clause = ff(doc_name="r", kind=ExpressionFlavor.NEO4J)
        where_clauses.append(filter_clause)

    where_clause = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""

    # Build return clause
    if return_keys is not None:
        return_parts = ", ".join([f"r.{key} AS {key}" for key in return_keys])
        return_clause = f"RETURN {return_parts}"
    else:
        return_clause = "RETURN r"

    limit_clause = f"LIMIT {limit}" if limit and limit > 0 else ""

    query = f"""
        MATCH {source_match}{rel_pattern}{target_match}
        {where_clause}
        {return_clause}
        {limit_clause}
    """

    result = self.execute(query, **params)

    # Convert results
    if return_keys is not None:
        return [dict(zip(return_keys, row)) for row in result.result_set]
    else:
        return [self._edge_to_dict(row[0]) for row in result.result_set]
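
A usage sketch (labels, IDs, and the relationship type are illustrative; note that nodes are matched on their ``id`` property)::

    # All outgoing edges of a node
    edges = conn.fetch_edges("Person", "1")

    # Only WORKS_AT edges to a specific Company, projected to one key
    works_at = conn.fetch_edges(
        "Person", "1",
        edge_type="WORKS_AT",
        to_type="Company",
        to_id="acme",
        return_keys=["since"],
    )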

fetch_present_documents(batch, class_name, match_keys, keep_keys, flatten=False, filters=None)

Fetch nodes that exist in the database.

Parameters:

    batch
        Batch of documents to check (required)
    class_name
        Label to check in (required)
    match_keys
        Keys to match nodes (required)
    keep_keys
        Keys to keep in result (required)
    flatten
        Unused in FalkorDB (default: False)
    filters : list | dict | None
        Additional query filters (default: None)

Returns:

    list
        Documents that exist in the database

Source code in graflo/db/falkordb/conn.py
def fetch_present_documents(
    self,
    batch,
    class_name,
    match_keys,
    keep_keys,
    flatten=False,
    filters: list | dict | None = None,
):
    """Fetch nodes that exist in the database.

    Args:
        batch: Batch of documents to check
        class_name: Label to check in
        match_keys: Keys to match nodes
        keep_keys: Keys to keep in result
        flatten: Unused in FalkorDB
        filters: Additional query filters

    Returns:
        list: Documents that exist in the database
    """
    if not batch:
        return []

    # Build match conditions for each document in batch
    results = []
    for doc in batch:
        match_conditions = " AND ".join([f"n.{key} = ${key}" for key in match_keys])
        params = {key: doc.get(key) for key in match_keys}

        q = f"""
            MATCH (n:{class_name})
            WHERE {match_conditions}
            RETURN n
            LIMIT 1
        """

        try:
            result = self.execute(q, **params)
            if result.result_set:
                node_dict = self._node_to_dict(result.result_set[0][0])
                if keep_keys:
                    node_dict = {k: node_dict.get(k) for k in keep_keys}
                results.append(node_dict)
        except Exception as e:
            logger.debug(f"Error checking document presence: {e}")

    return results
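
A sketch of checking which documents already exist (label and keys are illustrative)::

    batch = [{"id": "1", "name": "Alice"}, {"id": "2", "name": "Bob"}]
    present = conn.fetch_present_documents(
        batch, "Person", match_keys=["id"], keep_keys=["id"]
    )
    # e.g. [{"id": "1"}] if only Alice is already stored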

init_db(schema, clean_start)

Initialize FalkorDB with the given schema.

Uses schema.general.name if database is not set in config.

Parameters:

    schema : Schema
        Schema containing graph structure definitions (required)
    clean_start : bool
        If True, delete all existing data before initialization (required)
Source code in graflo/db/falkordb/conn.py
def init_db(self, schema: Schema, clean_start: bool):
    """Initialize FalkorDB with the given schema.

    Uses schema.general.name if database is not set in config.

    Args:
        schema: Schema containing graph structure definitions
        clean_start: If True, delete all existing data before initialization
    """
    # Determine graph name: use config.database if set, otherwise use schema.general.name
    graph_name = self.config.database
    if not graph_name:
        graph_name = schema.general.name
        self.config.database = graph_name

    # Select/create the graph
    assert self.client is not None, "Connection is closed"
    self.graph = self.client.select_graph(graph_name)
    self._graph_name = graph_name
    logger.info(f"Initialized FalkorDB graph '{graph_name}'")

    if clean_start:
        try:
            self.delete_graph_structure(delete_all=True)
            logger.debug(f"Cleaned graph '{graph_name}' for fresh start")
        except Exception as e:
            logger.debug(f"Clean start note for graph '{graph_name}': {e}")

    try:
        self.define_indexes(schema)
        logger.debug(f"Defined indexes for graph '{graph_name}'")
    except Exception as e:
        logger.error(
            f"Failed to define indexes for graph '{graph_name}': {e}",
            exc_info=True,
        )
        raise
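
A typical initialization sequence (a sketch; assumes a config with no database set and a Schema built elsewhere in your graflo setup)::

    config = FalkordbConfig(uri="redis://localhost:6379")  # database left unset
    conn = FalkordbConnection(config)
    conn.init_db(schema, clean_start=True)  # graph name falls back to schema.general.name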

insert_edges_batch(docs_edges, source_class, target_class, relation_name, collection_name=None, match_keys_source=('_key',), match_keys_target=('_key',), filter_uniques=True, uniq_weight_fields=None, uniq_weight_collections=None, upsert_option=False, head=None, **kwargs)

Create relationships between existing nodes using Cypher MERGE.

Efficiently creates relationships in batch by matching source and target nodes, then creating or updating the relationship between them.

Parameters

    docs_edges : list
        Edge specifications as list of [source, target, props] triples:
        [[{source_props}, {target_props}, {edge_props}], ...]
    source_class : str
        Label of source nodes (e.g., "Person")
    target_class : str
        Label of target nodes (e.g., "Company")
    relation_name : str
        Relationship type name (e.g., "WORKS_AT")
    collection_name : str, optional
        Unused in FalkorDB (kept for interface compatibility)
    match_keys_source : tuple[str, ...]
        Properties to match source nodes (default: ("_key",))
    match_keys_target : tuple[str, ...]
        Properties to match target nodes (default: ("_key",))
    filter_uniques : bool
        Unused in FalkorDB (kept for interface compatibility)
    uniq_weight_fields
        Unused in FalkorDB (kept for interface compatibility)
    uniq_weight_collections
        Unused in FalkorDB (kept for interface compatibility)
    upsert_option : bool
        Unused in FalkorDB (kept for interface compatibility)
    head : int, optional
        Unused in FalkorDB (kept for interface compatibility)
    **kwargs
        Additional options:
        - dry (bool): If True, build query but don't execute

Examples

Create KNOWS relationships between people::

edges = [
    [{"id": "1"}, {"id": "2"}, {"since": 2020}],
    [{"id": "1"}, {"id": "3"}, {"since": 2021}]
]
conn.insert_edges_batch(
    edges,
    source_class="Person",
    target_class="Person",
    relation_name="KNOWS",
    match_keys_source=["id"],
    match_keys_target=["id"]
)
Notes

Generated Cypher pattern::

UNWIND $batch AS row
MATCH (source:Label), (target:Label)
WHERE source.key = row[0].key AND target.key = row[1].key
MERGE (source)-[r:REL_TYPE]->(target)
SET r += row[2]
Source code in graflo/db/falkordb/conn.py
def insert_edges_batch(
    self,
    docs_edges: list,
    source_class: str,
    target_class: str,
    relation_name: str,
    collection_name: str | None = None,
    match_keys_source: tuple[str, ...] = ("_key",),
    match_keys_target: tuple[str, ...] = ("_key",),
    filter_uniques: bool = True,
    uniq_weight_fields=None,
    uniq_weight_collections=None,
    upsert_option: bool = False,
    head: int | None = None,
    **kwargs,
):
    """Create relationships between existing nodes using Cypher MERGE.

    Efficiently creates relationships in batch by matching source and
    target nodes, then creating or updating the relationship between them.

    Parameters
    ----------
    docs_edges : list
        Edge specifications as list of [source, target, props] triples:
        ``[[{source_props}, {target_props}, {edge_props}], ...]``
    source_class : str
        Label of source nodes (e.g., "Person")
    target_class : str
        Label of target nodes (e.g., "Company")
    relation_name : str
        Relationship type name (e.g., "WORKS_AT")
    collection_name : str, optional
        Unused in FalkorDB (kept for interface compatibility)
    match_keys_source : tuple[str, ...]
        Properties to match source nodes (default: ("_key",))
    match_keys_target : tuple[str, ...]
        Properties to match target nodes (default: ("_key",))
    filter_uniques : bool
        Unused in FalkorDB (kept for interface compatibility)
    uniq_weight_fields
        Unused in FalkorDB (kept for interface compatibility)
    uniq_weight_collections
        Unused in FalkorDB (kept for interface compatibility)
    upsert_option : bool
        Unused in FalkorDB (kept for interface compatibility)
    head : int, optional
        Unused in FalkorDB (kept for interface compatibility)
    **kwargs
        Additional options:
        - dry (bool): If True, build query but don't execute

    Examples
    --------
    Create KNOWS relationships between people::

        edges = [
            [{"id": "1"}, {"id": "2"}, {"since": 2020}],
            [{"id": "1"}, {"id": "3"}, {"since": 2021}]
        ]
        conn.insert_edges_batch(
            edges,
            source_class="Person",
            target_class="Person",
            relation_name="KNOWS",
            match_keys_source=["id"],
            match_keys_target=["id"]
        )

    Notes
    -----
    Generated Cypher pattern::

        UNWIND $batch AS row
        MATCH (source:Label), (target:Label)
        WHERE source.key = row[0].key AND target.key = row[1].key
        MERGE (source)-[r:REL_TYPE]->(target)
        SET r += row[2]
    """
    dry = kwargs.pop("dry", False)

    if not docs_edges:
        return

    # Build match conditions for source and target nodes
    source_match_str = [f"source.{key} = row[0].{key}" for key in match_keys_source]
    target_match_str = [f"target.{key} = row[1].{key}" for key in match_keys_target]

    match_clause = "WHERE " + " AND ".join(source_match_str + target_match_str)

    q = f"""
        UNWIND $batch AS row
        MATCH (source:{source_class}),
              (target:{target_class}) {match_clause}
        MERGE (source)-[r:{relation_name}]->(target)
        SET r += row[2]
    """
    if not dry:
        self.execute(q, batch=docs_edges)

insert_return_batch(docs, class_name)

Insert nodes and return their properties.

Note: Not implemented for FalkorDB.

Parameters:

    docs
        Documents to insert (required)
    class_name
        Label to insert into (required)

Raises:

    NotImplementedError
        This method is not implemented for FalkorDB

Source code in graflo/db/falkordb/conn.py
def insert_return_batch(self, docs, class_name):
    """Insert nodes and return their properties.

    Note: Not implemented for FalkorDB.

    Args:
        docs: Documents to insert
        class_name: Label to insert into

    Raises:
        NotImplementedError: This method is not fully implemented for FalkorDB
    """
    raise NotImplementedError("insert_return_batch is not implemented for FalkorDB")

keep_absent_documents(batch, class_name, match_keys, keep_keys, filters=None)

Keep documents that don't exist in the database.

Parameters:

    batch
        Batch of documents to check (required)
    class_name
        Label to check in (required)
    match_keys
        Keys to match nodes (required)
    keep_keys
        Keys to keep in result (required)
    filters : list | dict | None
        Additional query filters (default: None)

Returns:

    list
        Documents that don't exist in the database

Source code in graflo/db/falkordb/conn.py
def keep_absent_documents(
    self,
    batch,
    class_name,
    match_keys,
    keep_keys,
    filters: list | dict | None = None,
):
    """Keep documents that don't exist in the database.

    Args:
        batch: Batch of documents to check
        class_name: Label to check in
        match_keys: Keys to match nodes
        keep_keys: Keys to keep in result
        filters: Additional query filters

    Returns:
        list: Documents that don't exist in the database
    """
    if not batch:
        return []

    # Find documents that exist
    present_docs = self.fetch_present_documents(
        batch, class_name, match_keys, match_keys, filters=filters
    )

    # Create a set of present document keys for efficient lookup
    present_keys = set()
    for doc in present_docs:
        key_tuple = tuple(doc.get(k) for k in match_keys)
        present_keys.add(key_tuple)

    # Filter out documents that exist
    absent_docs = []
    for doc in batch:
        key_tuple = tuple(doc.get(k) for k in match_keys)
        if key_tuple not in present_keys:
            if keep_keys:
                absent_docs.append({k: doc.get(k) for k in keep_keys})
            else:
                absent_docs.append(doc)

    return absent_docs
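
This is the complement of fetch_present_documents and is handy for insert-only flows (a sketch; names are illustrative)::

    batch = [{"id": "1"}, {"id": "2"}, {"id": "3"}]
    new_docs = conn.keep_absent_documents(
        batch, "Person", match_keys=["id"], keep_keys=["id"]
    )
    conn.upsert_docs_batch(new_docs, "Person", match_keys=["id"])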

upsert_docs_batch(docs, class_name, match_keys, **kwargs)

Upsert a batch of nodes using Cypher MERGE.

Performs atomic upsert (update-or-insert) operations on a batch of documents. Uses Cypher MERGE with ON MATCH/ON CREATE for efficiency.

The operation:

1. Sanitizes all documents (removes invalid keys/values)
2. For each document, attempts to MERGE on match_keys
3. If the node exists: updates all properties
4. If the node doesn't exist: creates it with all properties

Parameters

    docs : list[dict]
        Documents to upsert. Each document must contain all match_keys.
    class_name : str
        Node label (e.g., "Person", "Product")
    match_keys : list[str]
        Properties used to identify existing nodes. These form the
        MERGE pattern: MERGE (n:Label {key1: val1, key2: val2})
    **kwargs
        Additional options:
        - dry (bool): If True, build query but don't execute

Raises

    ValueError
        If any document is missing a required match_key or has a None value

Examples

Insert or update users by email::

docs = [
    {"email": "alice@example.com", "name": "Alice", "age": 30},
    {"email": "bob@example.com", "name": "Bob", "age": 25}
]
conn.upsert_docs_batch(docs, "User", match_keys=["email"])
Notes

The generated Cypher query uses UNWIND for batch efficiency::

UNWIND $batch AS row
MERGE (n:Label {match_key: row.match_key})
ON MATCH SET n += row
ON CREATE SET n += row
Source code in graflo/db/falkordb/conn.py
def upsert_docs_batch(
    self, docs: list[dict], class_name: str, match_keys: list[str], **kwargs
):
    """Upsert a batch of nodes using Cypher MERGE.

    Performs atomic upsert (update-or-insert) operations on a batch of
    documents. Uses Cypher MERGE with ON MATCH/ON CREATE for efficiency.

    The operation:
    1. Sanitizes all documents (removes invalid keys/values)
    2. For each document, attempts to MERGE on match_keys
    3. If node exists: updates all properties
    4. If node doesn't exist: creates with all properties

    Parameters
    ----------
    docs : list[dict]
        Documents to upsert. Each document must contain all match_keys.
    class_name : str
        Node label (e.g., "Person", "Product")
    match_keys : list[str]
        Properties used to identify existing nodes. These form the
        MERGE pattern: ``MERGE (n:Label {key1: val1, key2: val2})``
    **kwargs
        Additional options:
        - dry (bool): If True, build query but don't execute

    Raises
    ------
    ValueError
        If any document is missing a required match_key or has a None value

    Examples
    --------
    Insert or update users by email::

        docs = [
            {"email": "alice@example.com", "name": "Alice", "age": 30},
            {"email": "bob@example.com", "name": "Bob", "age": 25}
        ]
        conn.upsert_docs_batch(docs, "User", match_keys=["email"])

    Notes
    -----
    The generated Cypher query uses UNWIND for batch efficiency::

        UNWIND $batch AS row
        MERGE (n:Label {match_key: row.match_key})
        ON MATCH SET n += row
        ON CREATE SET n += row
    """
    dry = kwargs.pop("dry", False)

    if not docs:
        return

    # Sanitize documents: filter invalid keys/values, validate match_keys
    sanitized_docs = self._sanitize_batch(docs, match_keys)

    if not sanitized_docs:
        return

    # Build the MERGE clause with match keys
    index_str = ", ".join([f"{k}: row.{k}" for k in match_keys])
    q = f"""
        UNWIND $batch AS row
        MERGE (n:{class_name} {{ {index_str} }})
        ON MATCH SET n += row
        ON CREATE SET n += row
    """
    if not dry:
        self.execute(q, batch=sanitized_docs)