
graflo.db.conn

Abstract database connection interface for graph databases.

This module defines the abstract interface for database connections, providing a unified API for different graph database implementations. It includes methods for database management, graph structure operations, and data manipulation.

Key Components:

- Connection: Abstract base class for database connections
- ConnectionType: Type variable for connection implementations

The connection interface supports:

- Database/Graph creation and deletion
- Graph structure management (vertex types, edge types)
- Index definition
- Document operations (insert, update, fetch)
- Edge operations
- Aggregation queries

Database Organization Terminology

Different databases organize graph data differently:

  • ArangoDB:

    • Database: Top-level container (like a schema)
    • Collections (ArangoDB-specific): Container for vertices (vertex collections)
    • Edge Collections (ArangoDB-specific): Container for edges
    • Graph: Named graph that connects vertex and edge collections
  • Neo4j:

    • Database: Top-level container
    • Labels: Categories for nodes (equivalent to vertex types)
    • Relationship Types: Types of relationships (equivalent to edge types)
    • No explicit "graph" concept - all nodes/relationships are in the database
  • TigerGraph:

    • Graph: Top-level container (functions like a database in ArangoDB)
    • Vertex Types: Global vertex type definitions (can be shared across graphs)
    • Edge Types: Global edge type definitions (can be shared across graphs)
    • Vertex and edge types are associated with graphs

When using the Connection interface, the terms "vertex type" and "edge type" are used generically to refer to the appropriate concept in each database.

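The terminology above can be summarized as a small lookup table. This is an illustrative sketch only: the `TERMINOLOGY` dictionary and the `native_term` helper are hypothetical and are not part of graflo; they simply encode the correspondences listed for each database.

```python
# Hypothetical lookup table summarizing the terminology above (not a graflo API).
TERMINOLOGY: dict[str, dict[str, str]] = {
    "arangodb": {
        "vertex type": "vertex collection",
        "edge type": "edge collection",
        "graph": "named graph",
    },
    "neo4j": {
        "vertex type": "label",
        "edge type": "relationship type",
        "graph": "database (no separate graph object)",
    },
    "tigergraph": {
        "vertex type": "vertex type",
        "edge type": "edge type",
        "graph": "graph",
    },
}


def native_term(db: str, generic: str) -> str:
    """Translate a generic Connection term into a database's own vocabulary."""
    return TERMINOLOGY[db.lower()][generic]


print(native_term("Neo4j", "vertex type"))  # label
```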
Example

class MyConnection(Connection):
    # Subclasses must also set `flavor` and implement every other abstract method.
    def create_database(self, name: str):
        ...  # Implementation

    def execute(self, query, **kwargs):
        ...  # Implementation

Connection

Bases: ABC

Abstract base class for database connections.

This class defines the interface that all database connection implementations must follow. It provides methods for database/graph operations, graph structure management (vertex types, edge types), and data manipulation.

Note

All methods marked with @abc.abstractmethod must be implemented by concrete connection classes. Subclasses must set the class attribute flavor to their DBType.

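Because `Connection` derives from `abc.ABC`, Python refuses to instantiate a subclass that leaves any abstract method unimplemented. The sketch below demonstrates this with a hypothetical two-method stand-in (`MiniConnection`) rather than the real class, so it runs without graflo installed.

```python
import abc


class MiniConnection(abc.ABC):
    """Hypothetical stand-in for Connection with only two abstract methods."""

    @abc.abstractmethod
    def create_database(self, name: str): ...

    @abc.abstractmethod
    def close(self): ...


class Incomplete(MiniConnection):
    def create_database(self, name: str):
        return f"created {name}"
    # close() is not implemented, so this class is still abstract


class Complete(Incomplete):
    def close(self):
        return "closed"


try:
    Incomplete()
except TypeError as exc:
    print("TypeError:", exc)  # instantiation is refused

conn = Complete()
print(conn.create_database("demo"), conn.close())  # created demo closed
```

The check happens at instantiation time, not at class definition time, so an incomplete subclass only fails when code first tries to construct it.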
Source code in graflo/db/conn.py
class Connection(abc.ABC):
    """Abstract base class for database connections.

    This class defines the interface that all database connection implementations
    must follow. It provides methods for database/graph operations, graph structure
    management (vertex types, edge types), and data manipulation.

    Note:
        All methods marked with @abc.abstractmethod must be implemented by
        concrete connection classes. Subclasses must set the class attribute
        `flavor` to their DBType.
    """

    flavor: ClassVar[DBType] = DBType.ARANGO  # Overridden by subclasses

    def __init__(self):
        """Initialize the connection."""
        pass

    @classmethod
    def expression_flavor(cls) -> ExpressionFlavor:
        """Expression flavor for filter rendering (AQL, CYPHER, GSQL).

        Graph connection subclasses must set class attribute `flavor` to a
        DBType present in DB_TYPE_TO_EXPRESSION_FLAVOR.
        """
        return DB_TYPE_TO_EXPRESSION_FLAVOR[cls.flavor]

    @abc.abstractmethod
    def create_database(self, name: str):
        """Create a new database.

        Args:
            name: Name of the database to create
        """
        pass

    @abc.abstractmethod
    def delete_database(self, name: str):
        """Delete a database.

        Args:
            name: Name of the database to delete
        """
        pass

    @abc.abstractmethod
    def execute(self, query: str | Any, **kwargs: Any) -> Any:
        """Execute a database query.

        Args:
            query: Query to execute
            **kwargs: Additional query parameters

        Returns:
            Query result (database-specific)
        """
        pass

    @abc.abstractmethod
    def close(self):
        """Close the database connection."""
        pass

    def define_indexes(self, schema: Schema):
        """Define indexes for vertices and edges in the schema.

        Args:
            schema: Schema containing vertex and edge configurations
        """
        self.define_vertex_indexes(schema.core_schema.vertex_config, schema=schema)
        self.define_edge_indexes(
            list(schema.core_schema.edge_config.values()), schema=schema
        )

    @abc.abstractmethod
    def define_schema(self, schema: Schema):
        """Define vertex and edge classes based on the schema.

        Args:
            schema: Schema containing vertex and edge class definitions
        """
        pass

    @abc.abstractmethod
    def delete_graph_structure(
        self,
        vertex_types: tuple[str, ...] | list[str] = (),
        graph_names: tuple[str, ...] | list[str] = (),
        delete_all: bool = False,
    ) -> None:
        """Delete graph structure (graphs, vertex types, edge types) from the database.

        This method deletes graphs and their associated vertex/edge types.
        The exact behavior depends on the database implementation:

        - ArangoDB: Deletes graphs and collections (vertex/edge collections)
        - Neo4j: Deletes nodes from labels (vertex types) and relationships
        - TigerGraph: Deletes graphs, vertex types, edge types, and jobs

        Args:
            vertex_types: Vertex type names to delete (database-specific interpretation)
            graph_names: Graph/database names to delete
            delete_all: If True, delete all graphs and their associated structures
        """
        pass

    @abc.abstractmethod
    def init_db(self, schema: Schema, recreate_schema: bool) -> None:
        """Initialize the database with the given schema.

        If the schema/graph already exists and recreate_schema is False, raises
        SchemaExistsError, which halts the script unless the caller catches it.

        Args:
            schema: Schema to initialize the database with
            recreate_schema: If True, drop existing schema and define new one.
                If False and schema/graph already exists, raises SchemaExistsError.
        """
        pass

    @abc.abstractmethod
    def clear_data(self, schema: Schema) -> None:
        """Remove all data from the graph without dropping or changing the schema.

        Args:
            schema: Schema describing the graph (used to identify collections/labels).
        """
        pass

    @abc.abstractmethod
    def upsert_docs_batch(
        self,
        docs: list[dict[str, Any]],
        class_name: str,
        match_keys: list[str] | tuple[str, ...],
        **kwargs: Any,
    ) -> None:
        """Upsert a batch of documents.

        Args:
            docs: Documents to upsert
            class_name: Name of the vertex type (or collection/label in database-specific terms)
            match_keys: Keys to match for upsert
            **kwargs: Additional upsert parameters
        """
        pass

    @abc.abstractmethod
    def insert_edges_batch(
        self,
        docs_edges: list[list[dict[str, Any]]] | list[Any] | None,
        source_class: str,
        target_class: str,
        relation_name: str,
        match_keys_source: tuple[str, ...],
        match_keys_target: tuple[str, ...],
        filter_uniques: bool = True,
        head: int | None = None,
        **kwargs: Any,
    ) -> None:
        """Insert a batch of edges.

        Args:
            docs_edges: Edge documents to insert
            source_class: Source vertex type/class
            target_class: Target vertex type/class
            relation_name: Name of the edge type/relation
            match_keys_source: Keys to match source vertices
            match_keys_target: Keys to match target vertices
            filter_uniques: Whether to filter unique edges
            head: Optional limit on number of edges to insert
            **kwargs: Additional insertion parameters (see also
                :func:`consume_insert_edges_kwargs`):
                - dry: If True, do not execute writes (supported where implemented)
                - collection_name: Edge collection (ArangoDB) or unused type-specific name
                - uniq_weight_fields: Uniqueness fields (ArangoDB upsert)
                - uniq_weight_collections: Uniqueness collections (ArangoDB upsert)
                - upsert_option: Use upsert instead of insert (ArangoDB)
                - relationship_merge_properties: Property names for Cypher MERGE
                  (Neo4j, FalkorDB, Memgraph), so that parallel edges are
                  distinguished by their weight properties
        """
        pass

    @abc.abstractmethod
    def insert_return_batch(
        self, docs: list[dict[str, Any]], class_name: str
    ) -> list[dict[str, Any]] | str:
        """Insert documents and return the inserted documents.

        Args:
            docs: Documents to insert
            class_name: Name of the vertex type (or collection/label in database-specific terms)

        Returns:
            list | str: Inserted documents, or a query string (database-specific behavior).
                Most implementations return a list of inserted documents. ArangoDB returns
                an AQL query string for deferred execution.
        """
        pass

    @abc.abstractmethod
    def fetch_docs(
        self,
        class_name: str,
        filters: list[Any] | dict[str, Any] | None = None,
        limit: int | None = None,
        return_keys: list[str] | None = None,
        unset_keys: list[str] | None = None,
        **kwargs: Any,
    ) -> list[dict[str, Any]]:
        """Fetch documents from a vertex type.

        Args:
            class_name: Name of the vertex type (or collection/label in database-specific terms)
            filters: Query filters
            limit: Maximum number of documents to return
            return_keys: Keys to return
            unset_keys: Keys to unset
            **kwargs: Additional database-specific parameters (e.g., field_types for TigerGraph)

        Returns:
            list: Fetched documents
        """
        pass

    @abc.abstractmethod
    def fetch_edges(
        self,
        from_type: str,
        from_id: str,
        edge_type: str | None = None,
        to_type: str | None = None,
        to_id: str | None = None,
        filters: list[Any] | dict[str, Any] | None = None,
        limit: int | None = None,
        return_keys: list[str] | None = None,
        unset_keys: list[str] | None = None,
        **kwargs: Any,
    ) -> list[dict[str, Any]]:
        """Fetch edges from the database.

        Args:
            from_type: Source vertex type
            from_id: Source vertex ID (required)
            edge_type: Optional edge type to filter by
            to_type: Optional target vertex type to filter by
            to_id: Optional target vertex ID to filter by
            filters: Additional query filters
            limit: Maximum number of edges to return
            return_keys: Keys to return (projection)
            unset_keys: Keys to exclude (projection)
            **kwargs: Additional database-specific parameters

        Returns:
            list: List of fetched edges
        """
        pass

    @abc.abstractmethod
    def fetch_present_documents(
        self,
        batch: list[dict[str, Any]],
        class_name: str,
        match_keys: list[str] | tuple[str, ...],
        keep_keys: list[str] | tuple[str, ...] | None = None,
        flatten: bool = False,
        filters: list[Any] | dict[str, Any] | None = None,
    ) -> list[dict[str, Any]] | dict[int, list[dict[str, Any]]]:
        """Fetch documents that exist in the database.

        Args:
            batch: Batch of documents to check
            class_name: Name of the collection
            match_keys: Keys to match
            keep_keys: Keys to keep in result
            flatten: Whether to flatten the result. If True, returns a flat list.
                If False, returns a dict mapping batch indices to matching documents.
            filters: Additional query filters

        Returns:
            list | dict: Documents that exist in the database. Returns a list if
                flatten=True, otherwise returns a dict mapping batch indices to documents.
        """
        pass

    @abc.abstractmethod
    def aggregate(
        self,
        class_name: str,
        aggregation_function: AggregationType,
        discriminant: str | None = None,
        aggregated_field: str | None = None,
        filters: list[Any] | dict[str, Any] | None = None,
    ) -> int | float | list[dict[str, Any]] | dict[str, int | float] | None:
        """Perform aggregation on a collection.

        Args:
            class_name: Name of the collection
            aggregation_function: Type of aggregation to perform
            discriminant: Field to group by
            aggregated_field: Field to aggregate
            filters: Query filters

        Returns:
            Aggregation results (type depends on aggregation function)
        """
        pass

    @abc.abstractmethod
    def keep_absent_documents(
        self,
        batch: list[dict[str, Any]],
        class_name: str,
        match_keys: list[str] | tuple[str, ...],
        keep_keys: list[str] | tuple[str, ...] | None = None,
        filters: list[Any] | dict[str, Any] | None = None,
    ) -> list[dict[str, Any]]:
        """Keep documents that don't exist in the database.

        Args:
            batch: Batch of documents to check
            class_name: Name of the collection
            match_keys: Keys to match
            keep_keys: Keys to keep in result
            filters: Additional query filters

        Returns:
            list: Documents that don't exist in the database
        """
        pass

    @abc.abstractmethod
    def define_vertex_indexes(
        self, vertex_config: VertexConfig, schema: Schema | None = None
    ):
        """Define indexes for vertex classes.

        Args:
            vertex_config: Vertex configuration containing index definitions
            schema: Optional full schema (define_indexes() passes it through)
        """
        pass

    @abc.abstractmethod
    def define_edge_indexes(self, edges: list[Edge], schema: Schema | None = None):
        """Define indexes for edge classes.

        Args:
            edges: List of edge configurations containing index definitions
            schema: Optional full schema (define_indexes() passes it through)
        """
        pass

    def define_vertex_classes(self, schema: Schema) -> None:
        """Define vertex classes based on schema.

        This method is called from define_schema() to create vertex types/collections.
        Most implementations take a Schema. Some implementations (like TigerGraph)
        may override with a more specific signature (VertexConfig).

        Default implementation is a no-op. Override in subclasses as needed.

        Args:
            schema: Schema containing vertex definitions
        """
        pass

    def define_edge_classes(self, edges: list[Edge]) -> None:
        """Define edge classes based on edge configurations.

        This method is called from define_schema() to create edge types/collections.

        Default implementation is a no-op. Override in subclasses as needed.

        Args:
            edges: List of edge configurations to create
        """
        pass

__init__()

Initialize the connection.

aggregate(class_name, aggregation_function, discriminant=None, aggregated_field=None, filters=None) abstractmethod

Perform aggregation on a collection.

Parameters:

  • class_name (str, required): Name of the collection
  • aggregation_function (AggregationType, required): Type of aggregation to perform
  • discriminant (str | None, default None): Field to group by
  • aggregated_field (str | None, default None): Field to aggregate
  • filters (list[Any] | dict[str, Any] | None, default None): Query filters

Returns:

  • int | float | list[dict[str, Any]] | dict[str, int | float] | None: Aggregation results (the type depends on the aggregation function)


clear_data(schema) abstractmethod

Remove all data from the graph without dropping or changing the schema.

Parameters:

  • schema (Schema, required): Schema describing the graph (used to identify collections/labels)

close() abstractmethod

Close the database connection.

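Since `close()` is part of the interface, callers can use `contextlib.closing` from the standard library to release a connection even if an error occurs inside the block. `DummyConnection` below is a hypothetical stand-in for any concrete subclass.

```python
from contextlib import closing


class DummyConnection:
    """Hypothetical stand-in for a concrete Connection subclass."""

    def __init__(self) -> None:
        self.is_open = True

    def execute(self, query: str, **kwargs):
        if not self.is_open:
            raise RuntimeError("connection is closed")
        return f"ran: {query}"

    def close(self) -> None:
        self.is_open = False


conn = DummyConnection()
with closing(conn) as c:
    print(c.execute("RETURN 1"))  # ran: RETURN 1
print(conn.is_open)  # False: closing() called close() on exit
```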

create_database(name) abstractmethod

Create a new database.

Parameters:

  • name (str, required): Name of the database to create

define_edge_classes(edges)

Define edge classes based on edge configurations.

This method is called from define_schema() to create edge types/collections.

Default implementation is a no-op. Override in subclasses as needed.

Parameters:

  • edges (list[Edge], required): List of edge configurations to create

define_edge_indexes(edges, schema=None) abstractmethod

Define indexes for edge classes.

Parameters:

  • edges (list[Edge], required): List of edge configurations containing index definitions
  • schema (Schema | None, default None): Optional full schema; define_indexes() passes its schema here

define_indexes(schema)

Define indexes for vertices and edges in the schema.

Parameters:

  • schema (Schema, required): Schema containing vertex and edge configurations

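`define_indexes()` is one of the few concrete methods on the base class: it reads `schema.core_schema.vertex_config` and the values of `schema.core_schema.edge_config`, then delegates to the two abstract index hooks. The sketch below reproduces that delegation with hypothetical stand-ins (`SimpleNamespace` objects instead of a real `Schema`, strings instead of `VertexConfig`/`Edge` objects) so the call sequence can be observed. The tuple key in `edge_config` is an assumption; the source only shows that the mapping's values are passed on as a list.

```python
from types import SimpleNamespace


class RecordingConnection:
    """Hypothetical subclass stand-in that records which index hooks are called."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, object]] = []

    # Same body as Connection.define_indexes
    def define_indexes(self, schema):
        self.define_vertex_indexes(schema.core_schema.vertex_config, schema=schema)
        self.define_edge_indexes(
            list(schema.core_schema.edge_config.values()), schema=schema
        )

    def define_vertex_indexes(self, vertex_config, schema=None):
        self.calls.append(("vertex", vertex_config))

    def define_edge_indexes(self, edges, schema=None):
        self.calls.append(("edge", edges))


schema = SimpleNamespace(
    core_schema=SimpleNamespace(
        vertex_config="<VertexConfig>",
        edge_config={("person", "city"): "<Edge lives_in>"},  # assumed key shape
    )
)
conn = RecordingConnection()
conn.define_indexes(schema)
print(conn.calls)  # [('vertex', '<VertexConfig>'), ('edge', ['<Edge lives_in>'])]
```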
define_schema(schema) abstractmethod

Define vertex and edge classes based on the schema.

Parameters:

  • schema (Schema, required): Schema containing vertex and edge class definitions

define_vertex_classes(schema)

Define vertex classes based on schema.

This method is called from define_schema() to create vertex types/collections. Most implementations take a Schema. Some implementations (like TigerGraph) may override with a more specific signature (VertexConfig).

Default implementation is a no-op. Override in subclasses as needed.

Parameters:

  • schema (Schema, required): Schema containing vertex definitions

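The docstrings state that `define_schema()` calls `define_vertex_classes()` and `define_edge_classes()`, and that both hooks default to no-ops. A minimal sketch of that template-method arrangement follows. The body of `define_schema`, the dict-based schema, and the `Base`/`LoggingConnection` classes are assumptions for illustration, since each concrete connection implements its own `define_schema`.

```python
class Base:
    """Hypothetical base: hooks are no-ops, mirroring Connection's defaults."""

    def define_schema(self, schema: dict) -> None:
        # Assumed orchestration; real connections implement their own version.
        self.define_vertex_classes(schema)
        self.define_edge_classes(schema.get("edges", []))

    def define_vertex_classes(self, schema: dict) -> None:
        pass  # default no-op

    def define_edge_classes(self, edges: list) -> None:
        pass  # default no-op


class LoggingConnection(Base):
    """Overrides only the vertex hook; the edge hook keeps the no-op default."""

    def __init__(self) -> None:
        self.created: list[str] = []

    def define_vertex_classes(self, schema: dict) -> None:
        self.created.extend(schema.get("vertices", []))


conn = LoggingConnection()
conn.define_schema({"vertices": ["person", "city"], "edges": ["lives_in"]})
print(conn.created)  # ['person', 'city']
```

Keeping the hooks as no-ops rather than abstract methods lets a backend that has no separate "create type" step (for example, a schemaless one) skip them entirely.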
define_vertex_indexes(vertex_config, schema=None) abstractmethod

Define indexes for vertex classes.

Parameters:

  • vertex_config (VertexConfig, required): Vertex configuration containing index definitions
  • schema (Schema | None, default None): Optional full schema; define_indexes() passes its schema here

delete_database(name) abstractmethod

Delete a database.

Parameters:

  • name (str, required): Name of the database to delete

delete_graph_structure(vertex_types=(), graph_names=(), delete_all=False) abstractmethod

Delete graph structure (graphs, vertex types, edge types) from the database.

This method deletes graphs and their associated vertex/edge types. The exact behavior depends on the database implementation:

  • ArangoDB: Deletes graphs and collections (vertex/edge collections)
  • Neo4j: Deletes nodes from labels (vertex types) and relationships
  • TigerGraph: Deletes graphs, vertex types, edge types, and jobs

Parameters:

  • vertex_types (tuple[str, ...] | list[str], default ()): Vertex type names to delete (interpretation is database-specific)
  • graph_names (tuple[str, ...] | list[str], default ()): Graph/database names to delete
  • delete_all (bool, default False): If True, delete all graphs and their associated structures

execute(query, **kwargs) abstractmethod

Execute a database query.

Parameters:

  • query (str | Any, required): Query to execute
  • **kwargs (Any): Additional query parameters

Returns:

  • Any: Query result (database-specific)

expression_flavor() classmethod

Expression flavor for filter rendering (AQL, CYPHER, GSQL).

Graph connection subclasses must set class attribute flavor to a DBType present in DB_TYPE_TO_EXPRESSION_FLAVOR.

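`expression_flavor()` is a classmethod that looks up the subclass's `flavor` class attribute in `DB_TYPE_TO_EXPRESSION_FLAVOR`. The sketch below reproduces that lookup with stand-in enums. Only `DBType.ARANGO` and the AQL/CYPHER/GSQL flavors are named in the source; the `NEO4J` member and the mapping's contents are assumptions.

```python
from enum import Enum
from typing import ClassVar


class DBType(Enum):  # stand-in; only ARANGO is confirmed by the source
    ARANGO = "arango"
    NEO4J = "neo4j"  # assumed member name


class ExpressionFlavor(Enum):  # stand-in for the AQL / CYPHER / GSQL flavors
    AQL = "aql"
    CYPHER = "cypher"
    GSQL = "gsql"


# Assumed contents of the mapping
DB_TYPE_TO_EXPRESSION_FLAVOR = {
    DBType.ARANGO: ExpressionFlavor.AQL,
    DBType.NEO4J: ExpressionFlavor.CYPHER,
}


class MiniConnection:  # hypothetical stand-in for Connection
    flavor: ClassVar[DBType] = DBType.ARANGO  # default, overridden by subclasses

    @classmethod
    def expression_flavor(cls) -> ExpressionFlavor:
        return DB_TYPE_TO_EXPRESSION_FLAVOR[cls.flavor]


class Neo4jLikeConnection(MiniConnection):
    flavor = DBType.NEO4J


print(MiniConnection.expression_flavor())  # ExpressionFlavor.AQL
print(Neo4jLikeConnection.expression_flavor())  # ExpressionFlavor.CYPHER
```

Because the lookup is a plain dict index, a subclass whose `flavor` is missing from the mapping raises `KeyError` when `expression_flavor()` is called, which is why the docstring requires the DBType to be present.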

fetch_docs(class_name, filters=None, limit=None, return_keys=None, unset_keys=None, **kwargs) abstractmethod

Fetch documents from a vertex type.

Parameters:

  • class_name (str, required): Name of the vertex type (or collection/label in database-specific terms)
  • filters (list[Any] | dict[str, Any] | None, default None): Query filters
  • limit (int | None, default None): Maximum number of documents to return
  • return_keys (list[str] | None, default None): Keys to return
  • unset_keys (list[str] | None, default None): Keys to unset
  • **kwargs (Any): Additional database-specific parameters (e.g., field_types for TigerGraph)

Returns:

  • list[dict[str, Any]]: Fetched documents


fetch_edges(from_type, from_id, edge_type=None, to_type=None, to_id=None, filters=None, limit=None, return_keys=None, unset_keys=None, **kwargs) abstractmethod

Fetch edges from the database.

Parameters:

  • from_type (str, required): Source vertex type
  • from_id (str, required): Source vertex ID
  • edge_type (str | None, default None): Optional edge type to filter by
  • to_type (str | None, default None): Optional target vertex type to filter by
  • to_id (str | None, default None): Optional target vertex ID to filter by
  • filters (list[Any] | dict[str, Any] | None, default None): Additional query filters
  • limit (int | None, default None): Maximum number of edges to return
  • return_keys (list[str] | None, default None): Keys to return (projection)
  • unset_keys (list[str] | None, default None): Keys to exclude (projection)
  • **kwargs (Any): Additional database-specific parameters

Returns:

  • list[dict[str, Any]]: List of fetched edges


fetch_present_documents(batch, class_name, match_keys, keep_keys=None, flatten=False, filters=None) abstractmethod

Fetch documents that exist in the database.

Parameters:

  • batch (list[dict[str, Any]], required): Batch of documents to check
  • class_name (str, required): Name of the collection
  • match_keys (list[str] | tuple[str, ...], required): Keys to match
  • keep_keys (list[str] | tuple[str, ...] | None, default None): Keys to keep in the result
  • flatten (bool, default False): Whether to flatten the result. If True, returns a flat list; if False, returns a dict mapping batch indices to matching documents.
  • filters (list[Any] | dict[str, Any] | None, default None): Additional query filters

Returns:

  • list[dict[str, Any]] | dict[int, list[dict[str, Any]]]: Documents that exist in the database: a list if flatten=True, otherwise a dict mapping batch indices to documents


init_db(schema, recreate_schema) abstractmethod

Initialize the database with the given schema.

If the schema/graph already exists and recreate_schema is False, raises SchemaExistsError, which halts the script unless the caller catches it.

Parameters:

- schema (Schema, required): Schema to initialize the database with.
- recreate_schema (bool, required): If True, drop the existing schema and define the new one. If False and the schema/graph already exists, raises SchemaExistsError.

Source code in graflo/db/conn.py
@abc.abstractmethod
def init_db(self, schema: Schema, recreate_schema: bool) -> None:
    """Initialize the database with the given schema.

    If the schema/graph already exists and recreate_schema is False, raises
    SchemaExistsError and the script halts.

    Args:
        schema: Schema to initialize the database with
        recreate_schema: If True, drop existing schema and define new one.
            If False and schema/graph already exists, raises SchemaExistsError.
    """
    pass
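
The recreate_schema flag decides what happens on a second run against the same database. The sketch below models that decision with a toy class; InMemoryConnection is hypothetical, a plain dict stands in for graflo's Schema, and the local SchemaExistsError only mirrors the exception class documented further down this page. A real implementation would create or drop collections, labels, or graph types instead of assigning an attribute.

```python
from __future__ import annotations

from typing import Any


class SchemaExistsError(RuntimeError):
    """Local stand-in for graflo's SchemaExistsError."""


class InMemoryConnection:
    """Toy connection illustrating the init_db contract (hypothetical)."""

    def __init__(self) -> None:
        self.schema: dict[str, Any] | None = None

    def init_db(self, schema: dict[str, Any], recreate_schema: bool) -> None:
        if self.schema is not None and not recreate_schema:
            # Refuse to touch an existing schema unless explicitly asked to.
            raise SchemaExistsError("schema already exists; pass recreate_schema=True")
        # Drop the old schema (if any) and define the new one.
        self.schema = dict(schema)


conn = InMemoryConnection()
conn.init_db({"vertices": ["Person"]}, recreate_schema=False)  # first run: OK

try:
    conn.init_db({"vertices": ["Person", "City"]}, recreate_schema=False)
except SchemaExistsError as exc:
    print(f"kept existing schema: {exc}")

conn.init_db({"vertices": ["Person", "City"]}, recreate_schema=True)  # replaces it
print(conn.schema)  # {'vertices': ['Person', 'City']}
```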

insert_edges_batch(docs_edges, source_class, target_class, relation_name, match_keys_source, match_keys_target, filter_uniques=True, head=None, **kwargs) abstractmethod

Insert a batch of edges.

Parameters:

- docs_edges (list[list[dict[str, Any]]] | list[Any] | None, required): Edge documents to insert.
- source_class (str, required): Source vertex type/class.
- target_class (str, required): Target vertex type/class.
- relation_name (str, required): Name of the edge type/relation.
- match_keys_source (tuple[str, ...], required): Keys used to match source vertices.
- match_keys_target (tuple[str, ...], required): Keys used to match target vertices.
- filter_uniques (bool, default True): Whether to filter the batch down to unique edges (dropping duplicates).
- head (int | None, default None): Optional limit on the number of edges to insert.
- **kwargs (Any, default {}): Additional insertion parameters (see also consume_insert_edges_kwargs()):
  - dry: If True, do not execute writes (supported where implemented).
  - collection_name: Edge collection name (ArangoDB); otherwise an unused, type-specific name.
  - uniq_weight_fields: Uniqueness fields (ArangoDB upsert).
  - uniq_weight_collections: Uniqueness collections (ArangoDB upsert).
  - upsert_option: Use upsert instead of insert (ArangoDB).
  - relationship_merge_properties: Property names used in the Cypher MERGE (Neo4j, FalkorDB, Memgraph), so that parallel edges are distinguished by their weight properties.

Source code in graflo/db/conn.py
@abc.abstractmethod
def insert_edges_batch(
    self,
    docs_edges: list[list[dict[str, Any]]] | list[Any] | None,
    source_class: str,
    target_class: str,
    relation_name: str,
    match_keys_source: tuple[str, ...],
    match_keys_target: tuple[str, ...],
    filter_uniques: bool = True,
    head: int | None = None,
    **kwargs: Any,
) -> None:
    """Insert a batch of edges.

    Args:
        docs_edges: Edge documents to insert
        source_class: Source vertex type/class
        target_class: Target vertex type/class
        relation_name: Name of the edge type/relation
        match_keys_source: Keys to match source vertices
        match_keys_target: Keys to match target vertices
        filter_uniques: Whether to filter unique edges
        head: Optional limit on number of edges to insert
        **kwargs: Additional insertion parameters (see also
            :func:`consume_insert_edges_kwargs`):
            - dry: If True, do not execute writes (supported where implemented)
            - collection_name: Edge collection (ArangoDB) or unused type-specific name
            - uniq_weight_fields: Uniqueness fields (ArangoDB upsert)
            - uniq_weight_collections: Uniqueness collections (ArangoDB upsert)
            - upsert_option: Use upsert instead of insert (ArangoDB)
            - relationship_merge_properties: Property names for Cypher MERGE
              (Neo4j, FalkorDB, Memgraph) so parallel edges differ by weights
    """
    pass
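
The interface does not spell out the shape of each item in docs_edges or the exact semantics of filter_uniques and head. Purely as an illustration, the sketch below assumes each item is a [source_doc, target_doc] pair with an optional third dict of edge properties, treats "unique" as "identical endpoints and identical properties", and applies head after deduplication. prepare_edges is a hypothetical helper, not part of graflo; real backends may resolve and deduplicate edges differently (for example, inside the database query itself).

```python
from __future__ import annotations

from typing import Any

# (source key values, target key values, edge properties)
Edge = tuple[tuple[Any, ...], tuple[Any, ...], dict[str, Any]]


def prepare_edges(
    docs_edges: list[list[dict[str, Any]]],
    match_keys_source: tuple[str, ...],
    match_keys_target: tuple[str, ...],
    filter_uniques: bool = True,
    head: int | None = None,
) -> list[Edge]:
    """Hypothetical pre-processing step: resolve endpoints, dedupe, truncate."""
    edges: list[Edge] = []
    seen: set[tuple[Any, ...]] = set()
    for source_doc, target_doc, *rest in docs_edges:
        props = rest[0] if rest else {}
        src = tuple(source_doc[k] for k in match_keys_source)
        tgt = tuple(target_doc[k] for k in match_keys_target)
        # Assumes property values are hashable so the whole edge can be a set key.
        signature = (src, tgt, tuple(sorted(props.items())))
        if filter_uniques and signature in seen:
            continue
        seen.add(signature)
        edges.append((src, tgt, props))
    return edges if head is None else edges[:head]


raw = [
    [{"id": 1}, {"id": 2}, {"since": 2020}],
    [{"id": 1}, {"id": 2}, {"since": 2020}],  # exact duplicate
    [{"id": 1}, {"id": 2}, {"since": 2021}],  # parallel edge, different weight
    [{"id": 2}, {"id": 3}],
]
print(len(prepare_edges(raw, ("id",), ("id",))))                        # 3
print(len(prepare_edges(raw, ("id",), ("id",), filter_uniques=False)))  # 4
print(prepare_edges(raw, ("id",), ("id",), head=1))  # [((1,), (2,), {'since': 2020})]
```

Keeping the properties in the uniqueness signature is what lets the 2020 and 2021 edges survive as parallel edges, which is the situation relationship_merge_properties addresses for Cypher backends.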

insert_return_batch(docs, class_name) abstractmethod

Insert documents and return the inserted documents.

Parameters:

- docs (list[dict[str, Any]], required): Documents to insert.
- class_name (str, required): Name of the vertex type (or collection/label in database-specific terms).

Returns:

- list[dict[str, Any]] | str: Inserted documents, or a query string (database-specific behavior). Most implementations return a list of the inserted documents; ArangoDB returns an AQL query string for deferred execution.

Source code in graflo/db/conn.py
@abc.abstractmethod
def insert_return_batch(
    self, docs: list[dict[str, Any]], class_name: str
) -> list[dict[str, Any]] | str:
    """Insert documents and return the inserted documents.

    Args:
        docs: Documents to insert
        class_name: Name of the vertex type (or collection/label in database-specific terms)

    Returns:
        list | str: Inserted documents, or a query string (database-specific behavior).
            Most implementations return a list of inserted documents. ArangoDB returns
            an AQL query string for deferred execution.
    """
    pass
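
Because the return type is a union, callers that must work with any backend need to branch on it. A minimal sketch of such a branch follows; split_insert_result is a hypothetical helper, and the AQL text below is made up for illustration rather than the string the ArangoDB implementation actually builds.

```python
from __future__ import annotations

from typing import Any


def split_insert_result(
    result: list[dict[str, Any]] | str,
) -> tuple[list[dict[str, Any]], str | None]:
    """Normalize insert_return_batch output into (documents, deferred_query)."""
    if isinstance(result, str):
        # ArangoDB-style: a query string that still has to be executed.
        return [], result
    # Most backends: the inserted documents themselves.
    return result, None


docs, query = split_insert_result([{"_key": "1", "name": "a"}])
print(docs, query)  # [{'_key': '1', 'name': 'a'}] None

# Illustrative query text only (hypothetical).
docs, query = split_insert_result("FOR doc IN @batch INSERT doc INTO Person RETURN NEW")
print(docs, query)  # [] FOR doc IN @batch INSERT doc INTO Person RETURN NEW
```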

keep_absent_documents(batch, class_name, match_keys, keep_keys=None, filters=None) abstractmethod

Keep documents that don't exist in the database.

Parameters:

- batch (list[dict[str, Any]], required): Batch of documents to check.
- class_name (str, required): Name of the collection (vertex type).
- match_keys (list[str] | tuple[str, ...], required): Keys used to match batch documents against stored documents.
- keep_keys (list[str] | tuple[str, ...] | None, default None): Keys to keep in the result.
- filters (list[Any] | dict[str, Any] | None, default None): Additional query filters.

Returns:

- list[dict[str, Any]]: Documents that don't exist in the database.

Source code in graflo/db/conn.py
@abc.abstractmethod
def keep_absent_documents(
    self,
    batch: list[dict[str, Any]],
    class_name: str,
    match_keys: list[str] | tuple[str, ...],
    keep_keys: list[str] | tuple[str, ...] | None = None,
    filters: list[Any] | dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
    """Keep documents that don't exist in the database.

    Args:
        batch: Batch of documents to check
        class_name: Name of the collection
        match_keys: Keys to match
        keep_keys: Keys to keep in result
        filters: Additional query filters

    Returns:
        list: Documents that don't exist in the database
    """
    pass
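
keep_absent_documents is the complement of fetch_present_documents, and a common use is to insert only the documents that are not stored yet. The in-memory sketch below assumes a document is absent when no stored document has equal values for all match_keys, and that keep_keys projects the returned batch documents; keep_absent_in_memory is hypothetical and filters is omitted.

```python
from __future__ import annotations

from typing import Any


def keep_absent_in_memory(
    batch: list[dict[str, Any]],
    existing: list[dict[str, Any]],
    match_keys: list[str] | tuple[str, ...],
    keep_keys: list[str] | tuple[str, ...] | None = None,
) -> list[dict[str, Any]]:
    """Hypothetical stand-in for keep_absent_documents (no database involved)."""
    # Index stored documents by their match-key values for O(1) lookups.
    present = {tuple(doc.get(k) for k in match_keys) for doc in existing}
    absent = [doc for doc in batch if tuple(doc.get(k) for k in match_keys) not in present]
    if keep_keys is None:
        return absent
    # Project each absent document down to the requested keys.
    return [{k: doc[k] for k in keep_keys if k in doc} for doc in absent]


stored = [{"id": 1, "name": "a"}]
batch = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
print(keep_absent_in_memory(batch, stored, ["id"]))                    # [{'id': 2, 'name': 'b'}]
print(keep_absent_in_memory(batch, stored, ["id"], keep_keys=["name"]))  # [{'name': 'b'}]
```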

upsert_docs_batch(docs, class_name, match_keys, **kwargs) abstractmethod

Upsert a batch of documents.

Parameters:

- docs (list[dict[str, Any]], required): Documents to upsert.
- class_name (str, required): Name of the vertex type (or collection/label in database-specific terms).
- match_keys (list[str] | tuple[str, ...], required): Keys to match for upsert.
- **kwargs (Any, default {}): Additional upsert parameters.

Source code in graflo/db/conn.py
@abc.abstractmethod
def upsert_docs_batch(
    self,
    docs: list[dict[str, Any]],
    class_name: str,
    match_keys: list[str] | tuple[str, ...],
    **kwargs: Any,
) -> None:
    """Upsert a batch of documents.

    Args:
        docs: Documents to upsert
        class_name: Name of the vertex type (or collection/label in database-specific terms)
        match_keys: Keys to match for upsert
        **kwargs: Additional upsert parameters
    """
    pass
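
Upsert semantics vary by backend (for example, whether a matched document is merged or replaced), and this section does not pin them down. The sketch below assumes merge-on-match over a dict keyed by the match_keys values; upsert_in_memory is hypothetical and ignores **kwargs.

```python
from __future__ import annotations

from typing import Any


def upsert_in_memory(
    store: dict[tuple[Any, ...], dict[str, Any]],
    docs: list[dict[str, Any]],
    match_keys: list[str] | tuple[str, ...],
) -> None:
    """Hypothetical stand-in for upsert_docs_batch over a dict-backed 'collection'."""
    for doc in docs:
        key = tuple(doc[k] for k in match_keys)
        if key in store:
            # Match found: merge the new fields into the stored document.
            store[key].update(doc)
        else:
            # No match: insert a copy as a new document.
            store[key] = dict(doc)


people: dict[tuple[Any, ...], dict[str, Any]] = {}
upsert_in_memory(people, [{"email": "a@x.org", "name": "Ann"}], ["email"])
upsert_in_memory(
    people,
    [{"email": "a@x.org", "age": 31}, {"email": "b@x.org", "name": "Bob"}],
    ["email"],
)
print(people[("a@x.org",)])  # {'email': 'a@x.org', 'name': 'Ann', 'age': 31}
print(len(people))  # 2
```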

InsertEdgesKwArgs dataclass

Keyword arguments shared by Connection.insert_edges_batch() implementations.

Source code in graflo/db/conn.py
@dataclass(frozen=True)
class InsertEdgesKwArgs:
    """Keyword arguments shared by :meth:`Connection.insert_edges_batch` implementations."""

    dry: bool
    collection_name: str | None
    uniq_weight_fields: Any
    uniq_weight_collections: Any
    upsert_option: bool
    relationship_merge_properties: Any

SchemaExistsError

Bases: RuntimeError

Raised when schema/graph already exists and recreate_schema is False.

Set recreate_schema=True to replace the existing schema, or use clear_data=True before ingestion to only clear data without touching the schema.

Source code in graflo/db/conn.py
class SchemaExistsError(RuntimeError):
    """Raised when schema/graph already exists and recreate_schema is False.

    Set recreate_schema=True to replace the existing schema, or use clear_data=True
    before ingestion to only clear data without touching the schema.
    """

consume_insert_edges_kwargs(kwargs)

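Pop the standard insert_edges_batch keys from kwargs and warn on unknown keys.

Mutates kwargs in place: the documented keys are popped, then any remaining (unknown) keys are reported in a single logged warning and cleared, so kwargs is empty on return. Omitted options default to False (dry, upsert_option) or None (all others). Callers should not pass keyword arguments beyond those documented on Connection.insert_edges_batch().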

Source code in graflo/db/conn.py
def consume_insert_edges_kwargs(kwargs: dict[str, Any]) -> InsertEdgesKwArgs:
    """Pop standard ``insert_edges_batch`` keys from *kwargs* and warn on unknown keys.

    Mutates *kwargs* in place (removes consumed keys). Callers should not pass
    additional keyword arguments beyond those documented on
    :meth:`Connection.insert_edges_batch`.
    """
    result = InsertEdgesKwArgs(
        dry=bool(kwargs.pop("dry", False)),
        collection_name=kwargs.pop("collection_name", None),
        uniq_weight_fields=kwargs.pop("uniq_weight_fields", None),
        uniq_weight_collections=kwargs.pop("uniq_weight_collections", None),
        upsert_option=bool(kwargs.pop("upsert_option", False)),
        relationship_merge_properties=kwargs.pop("relationship_merge_properties", None),
    )
    if kwargs:
        logger.warning(
            "insert_edges_batch: unsupported keyword arguments ignored: %s",
            sorted(kwargs.keys()),
        )
        kwargs.clear()
    return result
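
To see how consume_insert_edges_kwargs behaves, the snippet below copies InsertEdgesKwArgs and the helper from the listings above so it runs on its own, then calls it from insert_edges_batch_sketch, a hypothetical backend method body. The unknown key is logged and removed, and omitted options fall back to False or None.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import Any

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class InsertEdgesKwArgs:  # copied from graflo/db/conn.py so the snippet runs standalone
    dry: bool
    collection_name: str | None
    uniq_weight_fields: Any
    uniq_weight_collections: Any
    upsert_option: bool
    relationship_merge_properties: Any


def consume_insert_edges_kwargs(kwargs: dict[str, Any]) -> InsertEdgesKwArgs:
    # Same logic as the listing above.
    result = InsertEdgesKwArgs(
        dry=bool(kwargs.pop("dry", False)),
        collection_name=kwargs.pop("collection_name", None),
        uniq_weight_fields=kwargs.pop("uniq_weight_fields", None),
        uniq_weight_collections=kwargs.pop("uniq_weight_collections", None),
        upsert_option=bool(kwargs.pop("upsert_option", False)),
        relationship_merge_properties=kwargs.pop("relationship_merge_properties", None),
    )
    if kwargs:
        logger.warning("insert_edges_batch: unsupported keyword arguments ignored: %s", sorted(kwargs))
        kwargs.clear()
    return result


def insert_edges_batch_sketch(docs_edges: list[Any], **kwargs: Any) -> int:
    """Hypothetical backend body: parse options once, then honor them."""
    opts = consume_insert_edges_kwargs(kwargs)
    if opts.dry:
        return 0  # dry run: report that nothing was written
    return len(docs_edges)  # a real backend would write the edges here


kw = {"dry": 1, "upsert_option": True, "typo_option": 5}
opts = consume_insert_edges_kwargs(kw)  # logs a warning about 'typo_option'
print(opts.dry, opts.upsert_option, kw)  # True True {}
print(insert_edges_batch_sketch([1, 2], dry=True))  # 0
```

Returning a frozen dataclass means an implementation can pass the parsed options around without any callee mutating them.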