
graphcast.db.arango.conn

ArangoDB connection implementation for graph database operations.

This module implements the Connection interface for ArangoDB, providing ArangoDB-specific functionality for graph operations. It handles:

  • Graph and collection management
  • Document and edge operations
  • Index creation and management
  • AQL query execution
  • Batch operations with upsert support

Key Features
  • Graph-based document organization
  • Edge collection management
  • Persistent, hash, skiplist, and fulltext indices
  • Batch document and edge operations
  • AQL query generation and execution
Example

conn = ArangoConnection(config)
conn.init_db(schema, clean_start=True)
conn.upsert_docs_batch(docs, "users", match_keys=["email"])

ArangoConnection

Bases: Connection

ArangoDB-specific implementation of the Connection interface.

This class provides ArangoDB-specific implementations for all database operations, including graph management, document operations, and query execution. It uses the ArangoDB Python driver for all operations.

Attributes:

    conn: ArangoDB database connection instance

Source code in graphcast/db/arango/conn.py
class ArangoConnection(Connection):
    """ArangoDB-specific implementation of the Connection interface.

    This class provides ArangoDB-specific implementations for all database
    operations, including graph management, document operations, and query
    execution. It uses the ArangoDB Python driver for all operations.

    Attributes:
        conn: ArangoDB database connection instance
    """

    def __init__(self, config: ArangoConnectionConfig):
        """Initialize ArangoDB connection.

        Args:
            config: ArangoDB connection configuration containing URL, credentials,
                and database name
        """
        super().__init__()
        client = ArangoClient(hosts=config.url, request_timeout=config.request_timeout)

        self.conn = client.db(
            config.database,
            username=config.cred_name,
            password=config.cred_pass,
        )

    def create_database(self, name: str):
        """Create a new ArangoDB database.

        Args:
            name: Name of the database to create
        """
        if not self.conn.has_database(name):
            self.conn.create_database(name)

    def delete_database(self, name: str):
        """Delete an ArangoDB database.

        Args:
            name: Name of the database to delete
        """
        if self.conn.has_database(name):
            self.conn.delete_database(name)

    def execute(self, query, **kwargs):
        """Execute an AQL query.

        Args:
            query: AQL query string to execute
            **kwargs: Additional query parameters

        Returns:
            Cursor: ArangoDB cursor for the query results
        """
        cursor = self.conn.aql.execute(query, **kwargs)
        return cursor

    def close(self):
        """Close the ArangoDB connection."""
        # self.conn.close()
        pass

    def init_db(self, schema: Schema, clean_start):
        """Initialize ArangoDB with the given schema.

        Args:
            schema: Schema containing graph structure definitions
            clean_start: If True, delete all existing collections before initialization
        """
        if clean_start:
            self.delete_collections([], [], delete_all=True)
        self.define_collections(schema)
        self.define_indexes(schema)

    def define_collections(self, schema: Schema):
        """Define ArangoDB collections based on schema.

        Args:
            schema: Schema containing collection definitions
        """
        self.define_vertex_collections(schema)
        self.define_edge_collections(schema.edge_config.edges_list(include_aux=True))

    def define_vertex_collections(self, schema: Schema):
        """Define vertex collections in ArangoDB.

        Creates vertex collections for both connected and disconnected vertices,
        organizing them into appropriate graphs.

        Args:
            schema: Schema containing vertex definitions
        """
        vertex_config = schema.vertex_config
        disconnected_vertex_collections = (
            set(vertex_config.vertex_set) - schema.edge_config.vertices
        )
        for item in schema.edge_config.edges_list():
            u, v = item.source, item.target
            gname = item.graph_name
            logger.info(f"{item.source}, {item.target}, {gname}")
            if self.conn.has_graph(gname):
                g = self.conn.graph(gname)
            else:
                g = self.conn.create_graph(gname)  # type: ignore

            _ = self.create_collection(
                vertex_config.vertex_dbname(u), vertex_config.index(u), g
            )

            _ = self.create_collection(
                vertex_config.vertex_dbname(v), vertex_config.index(v), g
            )
        for v in disconnected_vertex_collections:
            _ = self.create_collection(
                vertex_config.vertex_dbname(v), vertex_config.index(v), None
            )

    def define_edge_collections(self, edges: list[Edge]):
        """Define edge collections in ArangoDB.

        Creates edge collections and their definitions in the appropriate graphs.

        Args:
            edges: List of edge configurations to create
        """
        for item in edges:
            gname = item.graph_name
            if self.conn.has_graph(gname):
                g = self.conn.graph(gname)
            else:
                g = self.conn.create_graph(gname)  # type: ignore
            if not g.has_edge_definition(item.collection_name):
                _ = g.create_edge_definition(
                    edge_collection=item.collection_name,
                    from_vertex_collections=[item.source_collection],
                    to_vertex_collections=[item.target_collection],
                )

    def _add_index(self, general_collection, index: Index):
        """Add an index to an ArangoDB collection.

        Supports persistent, hash, skiplist, and fulltext indices.

        Args:
            general_collection: ArangoDB collection to add index to
            index: Index configuration to create

        Returns:
            IndexHandle: Handle to the created index
        """
        data = index.db_form(DBFlavor.ARANGO)
        if index.type == IndexType.PERSISTENT:
            ih = general_collection.add_index(data)
        elif index.type == IndexType.HASH:
            ih = general_collection.add_index(data)
        elif index.type == IndexType.SKIPLIST:
            ih = general_collection.add_skiplist_index(
                fields=index.fields, unique=index.unique
            )
        elif index.type == IndexType.FULLTEXT:
            ih = general_collection.add_index(
                data={"fields": index.fields, "type": "fulltext"}
            )
        else:
            ih = None
        return ih

    def define_vertex_indices(self, vertex_config: VertexConfig):
        """Define indices for vertex collections.

        Creates indices for each vertex collection based on the configuration.

        Args:
            vertex_config: Vertex configuration containing index definitions
        """
        for c in vertex_config.vertex_set:
            general_collection = self.conn.collection(vertex_config.vertex_dbname(c))
            ixs = general_collection.indexes()
            field_combinations = [tuple(ix["fields"]) for ix in ixs]
            for index_obj in vertex_config.indexes(c):
                if tuple(index_obj.fields) not in field_combinations:
                    self._add_index(general_collection, index_obj)

    def define_edge_indices(self, edges: list[Edge]):
        """Define indices for edge collections.

        Creates indices for each edge collection based on the configuration.

        Args:
            edges: List of edge configurations containing index definitions
        """
        for edge in edges:
            general_collection = self.conn.collection(edge.collection_name)
            for index_obj in edge.indexes:
                self._add_index(general_collection, index_obj)

    def fetch_indexes(self, db_class_name: Optional[str] = None):
        """Fetch all indices from the database.

        Args:
            db_class_name: Optional collection name to fetch indices for

        Returns:
            dict: Mapping of collection names to their indices
        """
        if db_class_name is None:
            classes = self.conn.collections()
        elif self.conn.has_collection(db_class_name):
            # normalize to the dict shape returned by collections()
            classes = [{"name": db_class_name}]
        else:
            classes = []

        r = {}
        for cname in classes:
            assert isinstance(cname["name"], str)
            c = self.conn.collection(cname["name"])
            r[cname["name"]] = c.indexes()
        return r

    def create_collection(self, db_class_name, index: None | Index = None, g=None):
        """Create a new ArangoDB collection.

        Args:
            db_class_name: Name of the collection to create
            index: Optional index to create on the collection
            g: Optional graph to create the collection in

        Returns:
            IndexHandle: Handle to the created index if one was created
        """
        if not self.conn.has_collection(db_class_name):
            if g is not None:
                _ = g.create_vertex_collection(db_class_name)
            else:
                self.conn.create_collection(db_class_name)
            general_collection = self.conn.collection(db_class_name)
            if index is not None and index.fields != ["_key"]:
                ih = self._add_index(general_collection, index)
                return ih
            else:
                return None

    def delete_collections(self, cnames=(), gnames=(), delete_all=False):
        """Delete collections and graphs from ArangoDB.

        Args:
            cnames: Collection names to delete
            gnames: Graph names to delete
            delete_all: If True, delete all non-system collections and graphs
        """
        logger.info("collections (non system):")
        logger.info([c for c in self.conn.collections() if c["name"][0] != "_"])

        if delete_all:
            cnames = [c["name"] for c in self.conn.collections() if c["name"][0] != "_"]
            gnames = [g["name"] for g in self.conn.graphs()]

        for gn in gnames:
            if self.conn.has_graph(gn):
                self.conn.delete_graph(gn)

        logger.info("graphs (after delete operation):")
        logger.info(self.conn.graphs())

        for cn in cnames:
            if self.conn.has_collection(cn):
                self.conn.delete_collection(cn)

        logger.info("collections (after delete operation):")
        logger.info([c for c in self.conn.collections() if c["name"][0] != "_"])

        logger.info("graphs:")
        logger.info(self.conn.graphs())

    def get_collections(self):
        """Get all collections in the database.

        Returns:
            list: List of collection information dictionaries
        """
        return self.conn.collections()

    def upsert_docs_batch(
        self,
        docs,
        class_name,
        match_keys: list[str] | None = None,
        **kwargs,
    ):
        """Upsert a batch of documents using AQL.

        Performs an upsert operation on a batch of documents, using the specified
        match keys to determine whether to update existing documents or insert new ones.

        Args:
            docs: List of documents to upsert
            class_name: Collection name to upsert into
            match_keys: Keys to match for upsert operation
            **kwargs: Additional options:
                - dry: If True, don't execute the query
                - update_keys: Keys to update on match
                - filter_uniques: If True, filter duplicate documents
        """
        dry = kwargs.pop("dry", False)
        update_keys = kwargs.pop("update_keys", None)
        filter_uniques = kwargs.pop("filter_uniques", True)

        if isinstance(docs, list):
            if filter_uniques:
                docs = pick_unique_dict(docs)
            docs = json.dumps(docs)
        if match_keys is None:
            upsert_clause = ""
            update_clause = ""
        else:
            upsert_clause = ", ".join([f'"{k}": doc.{k}' for k in match_keys])
            upsert_clause = f"UPSERT {{{upsert_clause}}}"

            if isinstance(update_keys, list):
                update_clause = ", ".join([f'"{k}": doc.{k}' for k in update_keys])
                update_clause = f"{{{update_clause}}}"
            elif update_keys == "doc":
                update_clause = "doc"
            else:
                update_clause = "{}"
            update_clause = f"UPDATE {update_clause}"

        options = "OPTIONS {exclusive: true, ignoreErrors: true}"

        q_update = f"""FOR doc in {docs}
                            {upsert_clause}
                            INSERT doc
                            {update_clause} 
                                IN {class_name} {options}"""
        if not dry:
            self.execute(q_update)

    def insert_edges_batch(
        self,
        docs_edges,
        source_class,
        target_class,
        relation_name=None,
        collection_name=None,
        match_keys_source=("_key",),
        match_keys_target=("_key",),
        filter_uniques=True,
        uniq_weight_fields=None,
        uniq_weight_collections=None,
        upsert_option=False,
        head=None,
        **kwargs,
    ):
        """Insert a batch of edges using AQL.

        Creates edges between source and target vertices, with support for
        weight fields and unique constraints.

        Args:
            docs_edges: List of edge documents in format [{_source_aux: source_doc, _target_aux: target_doc}]
            source_class: Source vertex collection name
            target_class: Target vertex collection name
            relation_name: Optional relation name for the edges
            collection_name: Edge collection name
            match_keys_source: Keys to match source vertices
            match_keys_target: Keys to match target vertices
            filter_uniques: If True, filter duplicate edges
            uniq_weight_fields: Fields to consider for uniqueness
            uniq_weight_collections: Collections to consider for uniqueness
            upsert_option: If True, use upsert instead of insert
            head: Optional limit on number of edges to insert
            **kwargs: Additional options:
                - dry: If True, don't execute the query
        """
        dry = kwargs.pop("dry", False)

        if isinstance(docs_edges, list):
            if docs_edges:
                logger.debug(f" docs_edges[0] = {docs_edges[0]}")
            if head is not None:
                docs_edges = docs_edges[:head]
            if filter_uniques:
                docs_edges = pick_unique_dict(docs_edges)
            docs_edges_str = json.dumps(docs_edges)
        else:
            return ""

        if match_keys_source[0] == "_key":
            result_from = f'CONCAT("{source_class}/", edge.{SOURCE_AUX}._key)'
            source_filter = ""
        else:
            result_from = "sources[0]._id"
            filter_source = " && ".join(
                [f"v.{k} == edge.{SOURCE_AUX}.{k}" for k in match_keys_source]
            )
            source_filter = (
                f"LET sources = (FOR v IN {source_class} FILTER"
                f" {filter_source} LIMIT 1 RETURN v)"
            )

        if match_keys_target[0] == "_key":
            result_to = f'CONCAT("{target_class}/", edge.{TARGET_AUX}._key)'
            target_filter = ""
        else:
            result_to = "targets[0]._id"
            filter_target = " && ".join(
                [f"v.{k} == edge.{TARGET_AUX}.{k}" for k in match_keys_target]
            )
            target_filter = (
                f"LET targets = (FOR v IN {target_class} FILTER"
                f" {filter_target} LIMIT 1 RETURN v)"
            )

        doc_definition = (
            f"MERGE({{_from : {result_from}, _to : {result_to}}},"
            f" UNSET(edge, '{SOURCE_AUX}', '{TARGET_AUX}'))"
        )

        logger.debug(f" source_filter = {source_filter}")
        logger.debug(f" target_filter = {target_filter}")
        logger.debug(f" doc = {doc_definition}")

        if upsert_option:
            ups_from = result_from if source_filter else "doc._from"
            ups_to = result_to if target_filter else "doc._to"

            weight_fs = []
            if uniq_weight_fields is not None:
                weight_fs += uniq_weight_fields
            if uniq_weight_collections is not None:
                weight_fs += uniq_weight_collections
            if relation_name is not None:
                weight_fs += ["relation"]

            if weight_fs:
                weights_clause = ", " + ", ".join(
                    [f"'{x}' : edge.{x}" for x in weight_fs]
                )
            else:
                weights_clause = ""

            upsert = f"{{'_from': {ups_from}, '_to': {ups_to}" + weights_clause + "}"
            logger.debug(f" upsert clause: {upsert}")
            clauses = f"UPSERT {upsert} INSERT doc UPDATE {{}}"
            options = "OPTIONS {exclusive: true}"
        else:
            if relation_name is None:
                doc_clause = "doc"
            else:
                doc_clause = f"MERGE(doc, {{'relation': '{relation_name}' }})"
            clauses = f"INSERT {doc_clause}"
            options = "OPTIONS {exclusive: true, ignoreErrors: true}"

        q_update = f"""
            FOR edge in {docs_edges_str} {source_filter} {target_filter}
                LET doc = {doc_definition}
                {clauses}
                in {collection_name} {options}"""
        if not dry:
            self.execute(q_update)

    def insert_return_batch(self, docs, class_name):
        """Insert documents and return their keys.

        Args:
            docs: Documents to insert
            class_name: Collection to insert into

        Returns:
            str: AQL query string for the operation
        """
        docs = json.dumps(docs)
        query0 = f"""FOR doc in {docs}
              INSERT doc
              INTO {class_name}
              LET inserted = NEW
              RETURN {{_key: inserted._key}}
        """
        return query0

    def fetch_present_documents(
        self,
        batch,
        class_name,
        match_keys,
        keep_keys,
        flatten=False,
        filters: None | Clause | list | dict = None,
    ) -> list | dict:
        """Fetch documents that exist in the database.

        Args:
            batch: Batch of documents to check
            class_name: Collection to check in
            match_keys: Keys to match documents
            keep_keys: Keys to keep in result
            flatten: If True, flatten the result into a list
            filters: Additional query filters

        Returns:
            Union[list, dict]: Documents that exist in the database, either as a
                flat list or a dictionary mapping batch indices to documents
        """
        q0 = fetch_fields_query(
            collection_name=class_name,
            docs=batch,
            match_keys=match_keys,
            keep_keys=keep_keys,
            filters=filters,
        )
        # {"__i": i, "_group": [doc]}
        cursor = self.execute(q0)

        if flatten:
            rdata = []
            for item in get_data_from_cursor(cursor):
                group = item.pop("_group", [])
                rdata += [sub_item for sub_item in group]
            return rdata
        else:
            rdata_dict = {}
            for item in get_data_from_cursor(cursor):
                __i = item.pop("__i")
                group = item.pop("_group")
                rdata_dict[__i] = group
            return rdata_dict

    def fetch_docs(
        self,
        class_name,
        filters: None | Clause | list | dict = None,
        limit: int | None = None,
        return_keys: list | None = None,
        unset_keys: list | None = None,
    ):
        """Fetch documents from a collection.

        Args:
            class_name: Collection to fetch from
            filters: Query filters
            limit: Maximum number of documents to return
            return_keys: Keys to return
            unset_keys: Keys to unset

        Returns:
            list: Fetched documents
        """
        filter_clause = render_filters(filters, doc_name="d")

        if return_keys is None:
            if unset_keys is None:
                return_clause = "d"
            else:
                tmp_clause = ", ".join([f'"{item}"' for item in unset_keys])
                return_clause = f"UNSET(d, {tmp_clause})"
        else:
            if unset_keys is None:
                tmp_clause = ", ".join([f'"{item}"' for item in return_keys])
                return_clause = f"KEEP(d, {tmp_clause})"
            else:
                raise ValueError("both return_keys and unset_keys are set")

        if limit is not None and isinstance(limit, int):
            limit_clause = f"LIMIT {limit}"
        else:
            limit_clause = ""

        q = (
            f"FOR d in {class_name}"
            f"  {filter_clause}"
            f"  {limit_clause}"
            f"  RETURN {return_clause}"
        )
        cursor = self.execute(q)
        return get_data_from_cursor(cursor)

    def aggregate(
        self,
        class_name,
        aggregation_function: AggregationType,
        discriminant: str | None = None,
        aggregated_field: str | None = None,
        filters: None | Clause | list | dict = None,
    ):
        """Perform aggregation on a collection.

        Args:
            class_name: Collection to aggregate
            aggregation_function: Type of aggregation to perform
            discriminant: Field to group by
            aggregated_field: Field to aggregate
            filters: Query filters

        Returns:
            list: Aggregation results
        """
        filter_clause = render_filters(filters, doc_name="doc")

        if (
            aggregated_field is not None
            and aggregation_function != AggregationType.COUNT
        ):
            group_unit = f"g[*].doc.{aggregated_field}"
        else:
            group_unit = "g"

        if discriminant is not None:
            collect_clause = f"COLLECT value = doc['{discriminant}'] INTO g"
            return_clause = f"""{{ '{discriminant}' : value, '_value': {aggregation_function}({group_unit})}}"""
        else:
            if (
                aggregated_field is None
                and aggregation_function == AggregationType.COUNT
            ):
                collect_clause = (
                    f"COLLECT AGGREGATE value =  {aggregation_function} (doc)"
                )
            else:
                collect_clause = (
                    "COLLECT AGGREGATE value ="
                    f" {aggregation_function}(doc['{aggregated_field}'])"
                )
            return_clause = """{ '_value' : value }"""

        q = f"""FOR doc IN {class_name} 
                    {filter_clause}
                    {collect_clause}
                    RETURN {return_clause}"""

        cursor = self.execute(q)
        data = get_data_from_cursor(cursor)
        return data

    def keep_absent_documents(
        self,
        batch,
        class_name,
        match_keys,
        keep_keys,
        filters: None | Clause | list | dict = None,
    ):
        """Keep documents that don't exist in the database.

        Args:
            batch: Batch of documents to check
            class_name: Collection to check in
            match_keys: Keys to match documents
            keep_keys: Keys to keep in result
            filters: Additional query filters

        Returns:
            list: Documents that don't exist in the database
        """
        present_docs_keys = self.fetch_present_documents(
            batch=batch,
            class_name=class_name,
            match_keys=match_keys,
            keep_keys=keep_keys,
            flatten=False,
            filters=filters,
        )

        assert isinstance(present_docs_keys, dict)

        if any([len(v) > 1 for v in present_docs_keys.values()]):
            logger.warning(
                "fetch_present_documents returned multiple docs per filtering condition"
            )

        absent_indices = sorted(set(range(len(batch))) - set(present_docs_keys.keys()))
        batch_absent = [batch[j] for j in absent_indices]
        return batch_absent

    def update_to_numeric(self, collection_name, field):
        """Update a field to numeric type in all documents.

        Args:
            collection_name: Collection to update
            field: Field to convert to numeric

        Returns:
            str: AQL query string for the operation
        """
        s1 = f"FOR p IN {collection_name} FILTER p.{field} update p with {{"
        s2 = f"{field}: TO_NUMBER(p.{field}) "
        s3 = f"}} in {collection_name}"
        q0 = s1 + s2 + s3
        return q0

__init__(config)

Initialize ArangoDB connection.

Parameters:

    config (ArangoConnectionConfig): ArangoDB connection configuration containing URL, credentials, and database name. Required.
Source code in graphcast/db/arango/conn.py
def __init__(self, config: ArangoConnectionConfig):
    """Initialize ArangoDB connection.

    Args:
        config: ArangoDB connection configuration containing URL, credentials,
            and database name
    """
    super().__init__()
    client = ArangoClient(hosts=config.url, request_timeout=config.request_timeout)

    self.conn = client.db(
        config.database,
        username=config.cred_name,
        password=config.cred_pass,
    )
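
For orientation, a minimal construction sketch. The keyword names passed to ArangoConnectionConfig below are assumptions inferred from the attributes that __init__ reads (url, request_timeout, database, cred_name, cred_pass); the actual constructor may differ.

# Hypothetical sketch -- ArangoConnectionConfig field names are assumed.
config = ArangoConnectionConfig(
    url="http://localhost:8529",
    database="mydb",
    cred_name="root",
    cred_pass="secret",
    request_timeout=60,
)
conn = ArangoConnection(config)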

aggregate(class_name, aggregation_function, discriminant=None, aggregated_field=None, filters=None)

Perform aggregation on a collection.

Parameters:

    class_name: Collection to aggregate. Required.
    aggregation_function (AggregationType): Type of aggregation to perform. Required.
    discriminant (str | None): Field to group by. Default: None.
    aggregated_field (str | None): Field to aggregate. Default: None.
    filters (None | Clause | list | dict): Query filters. Default: None.

Returns:

    list: Aggregation results

Source code in graphcast/db/arango/conn.py
def aggregate(
    self,
    class_name,
    aggregation_function: AggregationType,
    discriminant: str | None = None,
    aggregated_field: str | None = None,
    filters: None | Clause | list | dict = None,
):
    """Perform aggregation on a collection.

    Args:
        class_name: Collection to aggregate
        aggregation_function: Type of aggregation to perform
        discriminant: Field to group by
        aggregated_field: Field to aggregate
        filters: Query filters

    Returns:
        list: Aggregation results
    """
    filter_clause = render_filters(filters, doc_name="doc")

    if (
        aggregated_field is not None
        and aggregation_function != AggregationType.COUNT
    ):
        group_unit = f"g[*].doc.{aggregated_field}"
    else:
        group_unit = "g"

    if discriminant is not None:
        collect_clause = f"COLLECT value = doc['{discriminant}'] INTO g"
        return_clause = f"""{{ '{discriminant}' : value, '_value': {aggregation_function}({group_unit})}}"""
    else:
        if (
            aggregated_field is None
            and aggregation_function == AggregationType.COUNT
        ):
            collect_clause = (
                f"COLLECT AGGREGATE value =  {aggregation_function} (doc)"
            )
        else:
            collect_clause = (
                "COLLECT AGGREGATE value ="
                f" {aggregation_function}(doc['{aggregated_field}'])"
            )
        return_clause = """{ '_value' : value }"""

    q = f"""FOR doc IN {class_name} 
                {filter_clause}
                {collect_clause}
                RETURN {return_clause}"""

    cursor = self.execute(q)
    data = get_data_from_cursor(cursor)
    return data
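
A usage sketch, assuming AggregationType.COUNT is a member of the AggregationType enum (as the COUNT comparisons in the body suggest); collection and field names are illustrative.

# Count "users" documents per distinct value of "country".
rows = conn.aggregate(
    "users",
    aggregation_function=AggregationType.COUNT,
    discriminant="country",
)
# Each row has the shape {"country": <value>, "_value": <count>},
# following the return_clause built above.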

close()

Close the ArangoDB connection.

Source code in graphcast/db/arango/conn.py
def close(self):
    """Close the ArangoDB connection."""
    # self.conn.close()
    pass

create_collection(db_class_name, index=None, g=None)

Create a new ArangoDB collection.

Parameters:

    db_class_name: Name of the collection to create. Required.
    index (None | Index): Optional index to create on the collection. Default: None.
    g: Optional graph to create the collection in. Default: None.

Returns:

    IndexHandle: Handle to the created index if one was created

Source code in graphcast/db/arango/conn.py
def create_collection(self, db_class_name, index: None | Index = None, g=None):
    """Create a new ArangoDB collection.

    Args:
        db_class_name: Name of the collection to create
        index: Optional index to create on the collection
        g: Optional graph to create the collection in

    Returns:
        IndexHandle: Handle to the created index if one was created
    """
    if not self.conn.has_collection(db_class_name):
        if g is not None:
            _ = g.create_vertex_collection(db_class_name)
        else:
            self.conn.create_collection(db_class_name)
        general_collection = self.conn.collection(db_class_name)
        if index is not None and index.fields != ["_key"]:
            ih = self._add_index(general_collection, index)
            return ih
        else:
            return None

create_database(name)

Create a new ArangoDB database.

Parameters:

    name (str): Name of the database to create. Required.
Source code in graphcast/db/arango/conn.py
def create_database(self, name: str):
    """Create a new ArangoDB database.

    Args:
        name: Name of the database to create
    """
    if not self.conn.has_database(name):
        self.conn.create_database(name)

define_collections(schema)

Define ArangoDB collections based on schema.

Parameters:

    schema (Schema): Schema containing collection definitions. Required.
Source code in graphcast/db/arango/conn.py
def define_collections(self, schema: Schema):
    """Define ArangoDB collections based on schema.

    Args:
        schema: Schema containing collection definitions
    """
    self.define_vertex_collections(schema)
    self.define_edge_collections(schema.edge_config.edges_list(include_aux=True))

define_edge_collections(edges)

Define edge collections in ArangoDB.

Creates edge collections and their definitions in the appropriate graphs.

Parameters:

    edges (list[Edge]): List of edge configurations to create. Required.
Source code in graphcast/db/arango/conn.py
def define_edge_collections(self, edges: list[Edge]):
    """Define edge collections in ArangoDB.

    Creates edge collections and their definitions in the appropriate graphs.

    Args:
        edges: List of edge configurations to create
    """
    for item in edges:
        gname = item.graph_name
        if self.conn.has_graph(gname):
            g = self.conn.graph(gname)
        else:
            g = self.conn.create_graph(gname)  # type: ignore
        if not g.has_edge_definition(item.collection_name):
            _ = g.create_edge_definition(
                edge_collection=item.collection_name,
                from_vertex_collections=[item.source_collection],
                to_vertex_collections=[item.target_collection],
            )

define_edge_indices(edges)

Define indices for edge collections.

Creates indices for each edge collection based on the configuration.

Parameters:

    edges (list[Edge]): List of edge configurations containing index definitions. Required.
Source code in graphcast/db/arango/conn.py
def define_edge_indices(self, edges: list[Edge]):
    """Define indices for edge collections.

    Creates indices for each edge collection based on the configuration.

    Args:
        edges: List of edge configurations containing index definitions
    """
    for edge in edges:
        general_collection = self.conn.collection(edge.collection_name)
        for index_obj in edge.indexes:
            self._add_index(general_collection, index_obj)

define_vertex_collections(schema)

Define vertex collections in ArangoDB.

Creates vertex collections for both connected and disconnected vertices, organizing them into appropriate graphs.

Parameters:

    schema (Schema): Schema containing vertex definitions. Required.
Source code in graphcast/db/arango/conn.py
def define_vertex_collections(self, schema: Schema):
    """Define vertex collections in ArangoDB.

    Creates vertex collections for both connected and disconnected vertices,
    organizing them into appropriate graphs.

    Args:
        schema: Schema containing vertex definitions
    """
    vertex_config = schema.vertex_config
    disconnected_vertex_collections = (
        set(vertex_config.vertex_set) - schema.edge_config.vertices
    )
    for item in schema.edge_config.edges_list():
        u, v = item.source, item.target
        gname = item.graph_name
        logger.info(f"{item.source}, {item.target}, {gname}")
        if self.conn.has_graph(gname):
            g = self.conn.graph(gname)
        else:
            g = self.conn.create_graph(gname)  # type: ignore

        _ = self.create_collection(
            vertex_config.vertex_dbname(u), vertex_config.index(u), g
        )

        _ = self.create_collection(
            vertex_config.vertex_dbname(v), vertex_config.index(v), g
        )
    for v in disconnected_vertex_collections:
        _ = self.create_collection(
            vertex_config.vertex_dbname(v), vertex_config.index(v), None
        )

define_vertex_indices(vertex_config)

Define indices for vertex collections.

Creates indices for each vertex collection based on the configuration.

Parameters:

    vertex_config (VertexConfig): Vertex configuration containing index definitions. Required.
Source code in graphcast/db/arango/conn.py
def define_vertex_indices(self, vertex_config: VertexConfig):
    """Define indices for vertex collections.

    Creates indices for each vertex collection based on the configuration.

    Args:
        vertex_config: Vertex configuration containing index definitions
    """
    for c in vertex_config.vertex_set:
        general_collection = self.conn.collection(vertex_config.vertex_dbname(c))
        ixs = general_collection.indexes()
        field_combinations = [tuple(ix["fields"]) for ix in ixs]
        for index_obj in vertex_config.indexes(c):
            if tuple(index_obj.fields) not in field_combinations:
                self._add_index(general_collection, index_obj)

delete_collections(cnames=(), gnames=(), delete_all=False)

Delete collections and graphs from ArangoDB.

Parameters:

    cnames: Collection names to delete. Default: ().
    gnames: Graph names to delete. Default: ().
    delete_all: If True, delete all non-system collections and graphs. Default: False.
Source code in graphcast/db/arango/conn.py
def delete_collections(self, cnames=(), gnames=(), delete_all=False):
    """Delete collections and graphs from ArangoDB.

    Args:
        cnames: Collection names to delete
        gnames: Graph names to delete
        delete_all: If True, delete all non-system collections and graphs
    """
    logger.info("collections (non system):")
    logger.info([c for c in self.conn.collections() if c["name"][0] != "_"])

    if delete_all:
        cnames = [c["name"] for c in self.conn.collections() if c["name"][0] != "_"]
        gnames = [g["name"] for g in self.conn.graphs()]

    for gn in gnames:
        if self.conn.has_graph(gn):
            self.conn.delete_graph(gn)

    logger.info("graphs (after delete operation):")
    logger.info(self.conn.graphs())

    for cn in cnames:
        if self.conn.has_collection(cn):
            self.conn.delete_collection(cn)

    logger.info("collections (after delete operation):")
    logger.info([c for c in self.conn.collections() if c["name"][0] != "_"])

    logger.info("graphs:")
    logger.info(self.conn.graphs())

delete_database(name)

Delete an ArangoDB database.

Parameters:

    name (str): Name of the database to delete. Required.
Source code in graphcast/db/arango/conn.py
def delete_database(self, name: str):
    """Delete an ArangoDB database.

    Args:
        name: Name of the database to delete
    """
    if self.conn.has_database(name):
        self.conn.delete_database(name)

execute(query, **kwargs)

Execute an AQL query.

Parameters:

    query: AQL query string to execute. Required.
    **kwargs: Additional query parameters. Default: {}.

Returns:

    Cursor: ArangoDB cursor for the query results

Source code in graphcast/db/arango/conn.py
def execute(self, query, **kwargs):
    """Execute an AQL query.

    Args:
        query: AQL query string to execute
        **kwargs: Additional query parameters

    Returns:
        Cursor: ArangoDB cursor for the query results
    """
    cursor = self.conn.aql.execute(query, **kwargs)
    return cursor

fetch_docs(class_name, filters=None, limit=None, return_keys=None, unset_keys=None)

Fetch documents from a collection.

Parameters:

    class_name: Collection to fetch from. Required.
    filters (None | Clause | list | dict): Query filters. Default: None.
    limit (int | None): Maximum number of documents to return. Default: None.
    return_keys (list | None): Keys to return. Default: None.
    unset_keys (list | None): Keys to unset. Default: None.

Returns:

    list: Fetched documents

Source code in graphcast/db/arango/conn.py
def fetch_docs(
    self,
    class_name,
    filters: None | Clause | list | dict = None,
    limit: int | None = None,
    return_keys: list | None = None,
    unset_keys: list | None = None,
):
    """Fetch documents from a collection.

    Args:
        class_name: Collection to fetch from
        filters: Query filters
        limit: Maximum number of documents to return
        return_keys: Keys to return
        unset_keys: Keys to unset

    Returns:
        list: Fetched documents
    """
    filter_clause = render_filters(filters, doc_name="d")

    if return_keys is None:
        if unset_keys is None:
            return_clause = "d"
        else:
            tmp_clause = ", ".join([f'"{item}"' for item in unset_keys])
            return_clause = f"UNSET(d, {tmp_clause})"
    else:
        if unset_keys is None:
            tmp_clause = ", ".join([f'"{item}"' for item in return_keys])
            return_clause = f"KEEP(d, {tmp_clause})"
        else:
            raise ValueError("both return_keys and unset_keys are set")

    if limit is not None and isinstance(limit, int):
        limit_clause = f"LIMIT {limit}"
    else:
        limit_clause = ""

    q = (
        f"FOR d in {class_name}"
        f"  {filter_clause}"
        f"  {limit_clause}"
        f"  RETURN {return_clause}"
    )
    cursor = self.execute(q)
    return get_data_from_cursor(cursor)
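
For example, a limited projection (the filter syntax is delegated to render_filters and is not shown here); collection and field names are illustrative.

# At most 10 documents, keeping only the listed fields; this renders
# a LIMIT 10 clause and RETURN KEEP(d, "email", "name").
users = conn.fetch_docs("users", limit=10, return_keys=["email", "name"])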

fetch_indexes(db_class_name=None)

Fetch all indices from the database.

Parameters:

    db_class_name (Optional[str]): Optional collection name to fetch indices for. Default: None.

Returns:

    dict: Mapping of collection names to their indices

Source code in graphcast/db/arango/conn.py
def fetch_indexes(self, db_class_name: Optional[str] = None):
    """Fetch all indices from the database.

    Args:
        db_class_name: Optional collection name to fetch indices for

    Returns:
        dict: Mapping of collection names to their indices
    """
    if db_class_name is None:
        classes = self.conn.collections()
    elif self.conn.has_collection(db_class_name):
        # normalize to the dict shape returned by collections()
        classes = [{"name": db_class_name}]
    else:
        classes = []

    r = {}
    for cname in classes:
        assert isinstance(cname["name"], str)
        c = self.conn.collection(cname["name"])
        r[cname["name"]] = c.indexes()
    return r

fetch_present_documents(batch, class_name, match_keys, keep_keys, flatten=False, filters=None)

Fetch documents that exist in the database.

Parameters:

    batch: Batch of documents to check. Required.
    class_name: Collection to check in. Required.
    match_keys: Keys to match documents. Required.
    keep_keys: Keys to keep in result. Required.
    flatten: If True, flatten the result into a list. Default: False.
    filters (None | Clause | list | dict): Additional query filters. Default: None.

Returns:

    list | dict: Documents that exist in the database, either as a flat list or a dictionary mapping batch indices to documents

Source code in graphcast/db/arango/conn.py
def fetch_present_documents(
    self,
    batch,
    class_name,
    match_keys,
    keep_keys,
    flatten=False,
    filters: None | Clause | list | dict = None,
) -> list | dict:
    """Fetch documents that exist in the database.

    Args:
        batch: Batch of documents to check
        class_name: Collection to check in
        match_keys: Keys to match documents
        keep_keys: Keys to keep in result
        flatten: If True, flatten the result into a list
        filters: Additional query filters

    Returns:
        Union[list, dict]: Documents that exist in the database, either as a
            flat list or a dictionary mapping batch indices to documents
    """
    q0 = fetch_fields_query(
        collection_name=class_name,
        docs=batch,
        match_keys=match_keys,
        keep_keys=keep_keys,
        filters=filters,
    )
    # {"__i": i, "_group": [doc]}
    cursor = self.execute(q0)

    if flatten:
        rdata = []
        for item in get_data_from_cursor(cursor):
            group = item.pop("_group", [])
            rdata += [sub_item for sub_item in group]
        return rdata
    else:
        rdata_dict = {}
        for item in get_data_from_cursor(cursor):
            __i = item.pop("__i")
            group = item.pop("_group")
            rdata_dict[__i] = group
        return rdata_dict
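
A usage sketch with illustrative collection and field names.

# Which of these documents already exist in "users", matched on "email"?
present = conn.fetch_present_documents(
    batch=[{"email": "a@x.io"}, {"email": "b@x.io"}],
    class_name="users",
    match_keys=["email"],
    keep_keys=["_key"],
    flatten=False,
)
# With flatten=False the result maps batch indices to matching documents,
# e.g. {0: [{"_key": "..."}]} if only the first document exists.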

get_collections()

Get all collections in the database.

Returns:

    list: List of collection information dictionaries

Source code in graphcast/db/arango/conn.py
def get_collections(self):
    """Get all collections in the database.

    Returns:
        list: List of collection information dictionaries
    """
    return self.conn.collections()

init_db(schema, clean_start)

Initialize ArangoDB with the given schema.

Parameters:

    schema (Schema): Schema containing graph structure definitions. Required.
    clean_start: If True, delete all existing collections before initialization. Required.
Source code in graphcast/db/arango/conn.py
def init_db(self, schema: Schema, clean_start):
    """Initialize ArangoDB with the given schema.

    Args:
        schema: Schema containing graph structure definitions
        clean_start: If True, delete all existing collections before initialization
    """
    if clean_start:
        self.delete_collections([], [], delete_all=True)
    self.define_collections(schema)
    self.define_indexes(schema)

insert_edges_batch(docs_edges, source_class, target_class, relation_name=None, collection_name=None, match_keys_source=('_key',), match_keys_target=('_key',), filter_uniques=True, uniq_weight_fields=None, uniq_weight_collections=None, upsert_option=False, head=None, **kwargs)

Insert a batch of edges using AQL.

Creates edges between source and target vertices, with support for weight fields and unique constraints.

Parameters:

    docs_edges: List of edge documents in format [{_source_aux: source_doc, _target_aux: target_doc}]. Required.
    source_class: Source vertex collection name. Required.
    target_class: Target vertex collection name. Required.
    relation_name: Optional relation name for the edges. Default: None.
    collection_name: Edge collection name. Default: None.
    match_keys_source: Keys to match source vertices. Default: ('_key',).
    match_keys_target: Keys to match target vertices. Default: ('_key',).
    filter_uniques: If True, filter duplicate edges. Default: True.
    uniq_weight_fields: Fields to consider for uniqueness. Default: None.
    uniq_weight_collections: Collections to consider for uniqueness. Default: None.
    upsert_option: If True, use upsert instead of insert. Default: False.
    head: Optional limit on number of edges to insert. Default: None.
    **kwargs: Additional options: dry (if True, don't execute the query). Default: {}.
Source code in graphcast/db/arango/conn.py
def insert_edges_batch(
    self,
    docs_edges,
    source_class,
    target_class,
    relation_name=None,
    collection_name=None,
    match_keys_source=("_key",),
    match_keys_target=("_key",),
    filter_uniques=True,
    uniq_weight_fields=None,
    uniq_weight_collections=None,
    upsert_option=False,
    head=None,
    **kwargs,
):
    """Insert a batch of edges using AQL.

    Creates edges between source and target vertices, with support for
    weight fields and unique constraints.

    Args:
        docs_edges: List of edge documents in format [{_source_aux: source_doc, _target_aux: target_doc}]
        source_class: Source vertex collection name
        target_class: Target vertex collection name
        relation_name: Optional relation name for the edges
        collection_name: Edge collection name
        match_keys_source: Keys to match source vertices
        match_keys_target: Keys to match target vertices
        filter_uniques: If True, filter duplicate edges
        uniq_weight_fields: Fields to consider for uniqueness
        uniq_weight_collections: Collections to consider for uniqueness
        upsert_option: If True, use upsert instead of insert
        head: Optional limit on number of edges to insert
        **kwargs: Additional options:
            - dry: If True, don't execute the query
    """
    dry = kwargs.pop("dry", False)

    if isinstance(docs_edges, list):
        if docs_edges:
            logger.debug(f" docs_edges[0] = {docs_edges[0]}")
        if head is not None:
            docs_edges = docs_edges[:head]
        if filter_uniques:
            docs_edges = pick_unique_dict(docs_edges)
        docs_edges_str = json.dumps(docs_edges)
    else:
        return ""

    if match_keys_source[0] == "_key":
        result_from = f'CONCAT("{source_class}/", edge.{SOURCE_AUX}._key)'
        source_filter = ""
    else:
        result_from = "sources[0]._id"
        filter_source = " && ".join(
            [f"v.{k} == edge.{SOURCE_AUX}.{k}" for k in match_keys_source]
        )
        source_filter = (
            f"LET sources = (FOR v IN {source_class} FILTER"
            f" {filter_source} LIMIT 1 RETURN v)"
        )

    if match_keys_target[0] == "_key":
        result_to = f'CONCAT("{target_class}/", edge.{TARGET_AUX}._key)'
        target_filter = ""
    else:
        result_to = "targets[0]._id"
        filter_target = " && ".join(
            [f"v.{k} == edge.{TARGET_AUX}.{k}" for k in match_keys_target]
        )
        target_filter = (
            f"LET targets = (FOR v IN {target_class} FILTER"
            f" {filter_target} LIMIT 1 RETURN v)"
        )

    doc_definition = (
        f"MERGE({{_from : {result_from}, _to : {result_to}}},"
        f" UNSET(edge, '{SOURCE_AUX}', '{TARGET_AUX}'))"
    )

    logger.debug(f" source_filter = {source_filter}")
    logger.debug(f" target_filter = {target_filter}")
    logger.debug(f" doc = {doc_definition}")

    if upsert_option:
        ups_from = result_from if source_filter else "doc._from"
        ups_to = result_to if target_filter else "doc._to"

        weight_fs = []
        if uniq_weight_fields is not None:
            weight_fs += uniq_weight_fields
        if uniq_weight_collections is not None:
            weight_fs += uniq_weight_collections
        if relation_name is not None:
            weight_fs += ["relation"]

        if weight_fs:
            weights_clause = ", " + ", ".join(
                [f"'{x}' : edge.{x}" for x in weight_fs]
            )
        else:
            weights_clause = ""

        upsert = f"{{'_from': {ups_from}, '_to': {ups_to}" + weights_clause + "}"
        logger.debug(f" upsert clause: {upsert}")
        clauses = f"UPSERT {upsert} INSERT doc UPDATE {{}}"
        options = "OPTIONS {exclusive: true}"
    else:
        if relation_name is None:
            doc_clause = "doc"
        else:
            doc_clause = f"MERGE(doc, {{'relation': '{relation_name}' }})"
        clauses = f"INSERT {doc_clause}"
        options = "OPTIONS {exclusive: true, ignoreErrors: true}"

    q_update = f"""
        FOR edge in {docs_edges_str} {source_filter} {target_filter}
            LET doc = {doc_definition}
            {clauses}
            in {collection_name} {options}"""
    if not dry:
        self.execute(q_update)
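
A usage sketch. The literal auxiliary keys "_source_aux" and "_target_aux" follow the format quoted in the docstring and are an assumption about the SOURCE_AUX/TARGET_AUX module constants; collection and field names are illustrative.

# Each edge document carries endpoint-matching fields under the auxiliary
# keys; any remaining fields (e.g. "weight") stay on the edge itself.
edges = [
    {
        "_source_aux": {"email": "a@x.io"},  # matched in source_class
        "_target_aux": {"email": "b@x.io"},  # matched in target_class
        "weight": 0.7,
    },
]
conn.insert_edges_batch(
    edges,
    source_class="users",
    target_class="users",
    collection_name="knows",
    match_keys_source=("email",),
    match_keys_target=("email",),
)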

insert_return_batch(docs, class_name)

Insert documents and return their keys.

Parameters:

    docs: Documents to insert. Required.
    class_name: Collection to insert into. Required.

Returns:

    str: AQL query string for the operation

Source code in graphcast/db/arango/conn.py
def insert_return_batch(self, docs, class_name):
    """Insert documents and return their keys.

    Args:
        docs: Documents to insert
        class_name: Collection to insert into

    Returns:
        str: AQL query string for the operation
    """
    docs = json.dumps(docs)
    query0 = f"""FOR doc in {docs}
          INSERT doc
          INTO {class_name}
          LET inserted = NEW
          RETURN {{_key: inserted._key}}
    """
    return query0

keep_absent_documents(batch, class_name, match_keys, keep_keys, filters=None)

Keep documents that don't exist in the database.

Parameters:

    batch: Batch of documents to check. Required.
    class_name: Collection to check in. Required.
    match_keys: Keys to match documents. Required.
    keep_keys: Keys to keep in result. Required.
    filters (None | Clause | list | dict): Additional query filters. Default: None.

Returns:

    list: Documents that don't exist in the database

Source code in graphcast/db/arango/conn.py
def keep_absent_documents(
    self,
    batch,
    class_name,
    match_keys,
    keep_keys,
    filters: None | Clause | list | dict = None,
):
    """Keep documents that don't exist in the database.

    Args:
        batch: Batch of documents to check
        class_name: Collection to check in
        match_keys: Keys to match documents
        keep_keys: Keys to keep in result
        filters: Additional query filters

    Returns:
        list: Documents that don't exist in the database
    """
    present_docs_keys = self.fetch_present_documents(
        batch=batch,
        class_name=class_name,
        match_keys=match_keys,
        keep_keys=keep_keys,
        flatten=False,
        filters=filters,
    )

    assert isinstance(present_docs_keys, dict)

    if any([len(v) > 1 for v in present_docs_keys.values()]):
        logger.warning(
            "fetch_present_documents returned multiple docs per filtering condition"
        )

    absent_indices = sorted(set(range(len(batch))) - set(present_docs_keys.keys()))
    batch_absent = [batch[j] for j in absent_indices]
    return batch_absent
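
A usage sketch combining this method with upsert_docs_batch so that only new documents are written (names illustrative).

# Keep only the documents whose "email" is not yet in "users" ...
new_docs = conn.keep_absent_documents(
    batch=docs,
    class_name="users",
    match_keys=["email"],
    keep_keys=["_key"],
)
# ... then write just those.
conn.upsert_docs_batch(new_docs, "users", match_keys=["email"])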

update_to_numeric(collection_name, field)

Update a field to numeric type in all documents.

Parameters:

    collection_name: Collection to update. Required.
    field: Field to convert to numeric. Required.

Returns:

    str: AQL query string for the operation

Source code in graphcast/db/arango/conn.py
def update_to_numeric(self, collection_name, field):
    """Update a field to numeric type in all documents.

    Args:
        collection_name: Collection to update
        field: Field to convert to numeric

    Returns:
        str: AQL query string for the operation
    """
    s1 = f"FOR p IN {collection_name} FILTER p.{field} update p with {{"
    s2 = f"{field}: TO_NUMBER(p.{field}) "
    s3 = f"}} in {collection_name}"
    q0 = s1 + s2 + s3
    return q0
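
Note that this method only builds the query; pass the returned string to execute() to run it. For example:

q = conn.update_to_numeric("products", "price")
# q == "FOR p IN products FILTER p.price update p with {price: TO_NUMBER(p.price) } in products"
conn.execute(q)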

upsert_docs_batch(docs, class_name, match_keys=None, **kwargs)

Upsert a batch of documents using AQL.

Performs an upsert operation on a batch of documents, using the specified match keys to determine whether to update existing documents or insert new ones.

Parameters:

    docs: List of documents to upsert. Required.
    class_name: Collection name to upsert into. Required.
    match_keys (list[str] | None): Keys to match for upsert operation. Default: None.
    **kwargs: Additional options: dry (if True, don't execute the query), update_keys (keys to update on match), filter_uniques (if True, filter duplicate documents). Default: {}.
Source code in graphcast/db/arango/conn.py
def upsert_docs_batch(
    self,
    docs,
    class_name,
    match_keys: list[str] | None = None,
    **kwargs,
):
    """Upsert a batch of documents using AQL.

    Performs an upsert operation on a batch of documents, using the specified
    match keys to determine whether to update existing documents or insert new ones.

    Args:
        docs: List of documents to upsert
        class_name: Collection name to upsert into
        match_keys: Keys to match for upsert operation
        **kwargs: Additional options:
            - dry: If True, don't execute the query
            - update_keys: Keys to update on match
            - filter_uniques: If True, filter duplicate documents
    """
    dry = kwargs.pop("dry", False)
    update_keys = kwargs.pop("update_keys", None)
    filter_uniques = kwargs.pop("filter_uniques", True)

    if isinstance(docs, list):
        if filter_uniques:
            docs = pick_unique_dict(docs)
        docs = json.dumps(docs)
    if match_keys is None:
        upsert_clause = ""
        update_clause = ""
    else:
        upsert_clause = ", ".join([f'"{k}": doc.{k}' for k in match_keys])
        upsert_clause = f"UPSERT {{{upsert_clause}}}"

        if isinstance(update_keys, list):
            update_clause = ", ".join([f'"{k}": doc.{k}' for k in update_keys])
            update_clause = f"{{{update_clause}}}"
        elif update_keys == "doc":
            update_clause = "doc"
        else:
            update_clause = "{}"
        update_clause = f"UPDATE {update_clause}"

    options = "OPTIONS {exclusive: true, ignoreErrors: true}"

    q_update = f"""FOR doc in {docs}
                        {upsert_clause}
                        INSERT doc
                        {update_clause} 
                            IN {class_name} {options}"""
    if not dry:
        self.execute(q_update)
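
A usage sketch with illustrative names. Matching on "email", this renders AQL of the form FOR doc in [...] UPSERT {"email": doc.email} INSERT doc UPDATE {"name": doc.name} IN users OPTIONS {exclusive: true, ignoreErrors: true}.

# Insert new users; on an email match, update only the "name" field.
conn.upsert_docs_batch(
    docs,
    "users",
    match_keys=["email"],
    update_keys=["name"],
)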