graflo.db.conn

Abstract database connection interface for graph databases.

This module defines the abstract interface for database connections, providing a unified API for different graph database implementations. It includes methods for database management, graph structure operations, and data manipulation.

Key Components

  • Connection: Abstract base class for database connections
  • ConnectionType: Type variable for connection implementations

The connection interface supports:

  • Database/Graph creation and deletion
  • Graph structure management (vertex types, edge types)
  • Index definition
  • Document operations (insert, update, fetch)
  • Edge operations
  • Aggregation queries

Database Organization Terminology

Different databases organize graph data differently:

  • ArangoDB:

    • Database: Top-level container (like a schema)
    • Collections: Container for vertices (vertex collections)
    • Edge Collections: Container for edges
    • Graph: Named graph that connects vertex and edge collections
  • Neo4j:

    • Database: Top-level container
    • Labels: Categories for nodes (equivalent to vertex types)
    • Relationship Types: Types of relationships (equivalent to edge types)
    • No explicit "graph" concept - all nodes/relationships are in the database
  • TigerGraph:

    • Graph: Top-level container (functions like a database in ArangoDB)
    • Vertex Types: Global vertex type definitions (can be shared across graphs)
    • Edge Types: Global edge type definitions (can be shared across graphs)
    • Vertex and edge types are associated with graphs

When using the Connection interface, the terms "vertex type" and "edge type" are used generically to refer to the appropriate concept in each database.
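
As an illustration of this mapping, the sketch below records how the generic terms translate for each backend. The `TERMINOLOGY` dictionary and the `native_term` helper are hypothetical names introduced here, not part of graflo; the values only restate the list above.

```python
# Hypothetical lookup table restating the terminology above; not part of graflo.
TERMINOLOGY = {
    "arangodb": {"vertex type": "collection", "edge type": "edge collection"},
    "neo4j": {"vertex type": "label", "edge type": "relationship type"},
    "tigergraph": {"vertex type": "vertex type", "edge type": "edge type"},
}


def native_term(backend: str, generic_term: str) -> str:
    """Return the backend-specific name for a generic graph term."""
    return TERMINOLOGY[backend.lower()][generic_term]


print(native_term("Neo4j", "vertex type"))
```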

Example

class MyConnection(Connection):
    def create_database(self, name: str):
        pass  # Implementation

    def execute(self, query, **kwargs):
        pass  # Implementation

Connection

Bases: ABC

Abstract base class for database connections.

This class defines the interface that all database connection implementations must follow. It provides methods for database/graph operations, graph structure management (vertex types, edge types), and data manipulation.

Note

All methods marked with @abc.abstractmethod must be implemented by concrete connection classes.
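
The effect of this rule can be seen with a small stand-in. The sketch below does not import graflo: `MiniConnection`, `Incomplete`, and `Complete` are hypothetical classes that mirror only two of the abstract methods, which is enough to show that Python refuses to instantiate a subclass until every abstract method is overridden.

```python
import abc


class MiniConnection(abc.ABC):
    """Stand-in for Connection with only two abstract methods (assumption)."""

    @abc.abstractmethod
    def execute(self, query, **kwargs): ...

    @abc.abstractmethod
    def close(self): ...


class Incomplete(MiniConnection):
    # Implements execute() but leaves close() abstract.
    def execute(self, query, **kwargs):
        return query


class Complete(Incomplete):
    def close(self):
        return None


try:
    Incomplete()
    instantiated = True
except TypeError:
    # close() is still abstract, so instantiation is refused.
    instantiated = False

conn = Complete()  # every abstract method is implemented
```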

Source code in graflo/db/conn.py
class Connection(abc.ABC):
    """Abstract base class for database connections.

    This class defines the interface that all database connection implementations
    must follow. It provides methods for database/graph operations, graph structure
    management (vertex types, edge types), and data manipulation.

    Note:
        All methods marked with @abc.abstractmethod must be implemented by
        concrete connection classes.
    """

    def __init__(self):
        """Initialize the connection."""
        pass

    @abc.abstractmethod
    def create_database(self, name: str):
        """Create a new database.

        Args:
            name: Name of the database to create
        """
        pass

    @abc.abstractmethod
    def delete_database(self, name: str):
        """Delete a database.

        Args:
            name: Name of the database to delete
        """
        pass

    @abc.abstractmethod
    def execute(self, query, **kwargs):
        """Execute a database query.

        Args:
            query: Query to execute
            **kwargs: Additional query parameters
        """
        pass

    @abc.abstractmethod
    def close(self):
        """Close the database connection."""
        pass

    def define_indexes(self, schema: Schema):
        """Define indexes for vertices and edges in the schema.

        Args:
            schema: Schema containing vertex and edge configurations
        """
        self.define_vertex_indices(schema.vertex_config)
        self.define_edge_indices(schema.edge_config.edges_list(include_aux=True))

    @abc.abstractmethod
    def define_schema(self, schema: Schema):
        """Define collections based on the schema.

        Args:
            schema: Schema containing collection definitions
        """
        pass

    @abc.abstractmethod
    def delete_graph_structure(self, vertex_types=(), graph_names=(), delete_all=False):
        """Delete graph structure (graphs, vertex types, edge types) from the database.

        This method deletes graphs and their associated vertex/edge types.
        The exact behavior depends on the database implementation:

        - ArangoDB: Deletes graphs and collections (vertex/edge collections)
        - Neo4j: Deletes nodes from labels (vertex types) and relationships
        - TigerGraph: Deletes graphs, vertex types, edge types, and jobs

        Args:
            vertex_types: Vertex type names to delete (database-specific interpretation)
            graph_names: Graph/database names to delete
            delete_all: If True, delete all graphs and their associated structures
        """
        pass

    @abc.abstractmethod
    def init_db(self, schema: Schema, clean_start):
        """Initialize the database with the given schema.

        Args:
            schema: Schema to initialize the database with
            clean_start: Whether to clean existing data
        """
        pass

    @abc.abstractmethod
    def upsert_docs_batch(self, docs, class_name, match_keys, **kwargs):
        """Upsert a batch of documents.

        Args:
            docs: Documents to upsert
            class_name: Name of the vertex type (or collection/label in database-specific terms)
            match_keys: Keys to match for upsert
            **kwargs: Additional upsert parameters
        """
        pass

    @abc.abstractmethod
    def insert_edges_batch(
        self,
        docs_edges,
        source_class,
        target_class,
        relation_name,
        collection_name,
        match_keys_source,
        match_keys_target,
        filter_uniques=True,
        uniq_weight_fields=None,
        uniq_weight_collections=None,
        upsert_option=False,
        head=None,
        **kwargs,
    ):
        """Insert a batch of edges.

        Args:
            docs_edges: Edge documents to insert
            source_class: Source vertex type/class
            target_class: Target vertex type/class
            relation_name: Name of the edge type/relation
            collection_name: Name of the edge type (database-specific: collection/relationship type)
            match_keys_source: Keys to match source vertices
            match_keys_target: Keys to match target vertices
            filter_uniques: Whether to filter unique edges
            uniq_weight_fields: Fields to consider for uniqueness
            uniq_weight_collections: Vertex/edge types to consider for uniqueness (database-specific)
            upsert_option: Whether to upsert existing edges
            head: Optional head document
            **kwargs: Additional insertion parameters
        """
        pass

    @abc.abstractmethod
    def insert_return_batch(self, docs, class_name):
        """Insert documents and return the inserted documents.

        Args:
            docs: Documents to insert
            class_name: Name of the vertex type (or collection/label in database-specific terms)

        Returns:
            list: Inserted documents
        """
        pass

    @abc.abstractmethod
    def fetch_docs(
        self,
        class_name,
        filters,
        limit,
        return_keys,
        unset_keys,
        **kwargs,
    ):
        """Fetch documents from a vertex type.

        Args:
            class_name: Name of the vertex type (or collection/label in database-specific terms)
            filters: Query filters
            limit: Maximum number of documents to return
            return_keys: Keys to return
            unset_keys: Keys to unset
            **kwargs: Additional database-specific parameters (e.g., field_types for TigerGraph)

        Returns:
            list: Fetched documents
        """
        pass

    @abc.abstractmethod
    def fetch_edges(
        self,
        from_type: str,
        from_id: str,
        edge_type: str | None = None,
        to_type: str | None = None,
        to_id: str | None = None,
        filters: list | dict | None = None,
        limit: int | None = None,
        return_keys: list | None = None,
        unset_keys: list | None = None,
        **kwargs,
    ):
        """Fetch edges from the database.

        Args:
            from_type: Source vertex type
            from_id: Source vertex ID (required)
            edge_type: Optional edge type to filter by
            to_type: Optional target vertex type to filter by
            to_id: Optional target vertex ID to filter by
            filters: Additional query filters
            limit: Maximum number of edges to return
            return_keys: Keys to return (projection)
            unset_keys: Keys to exclude (projection)
            **kwargs: Additional database-specific parameters

        Returns:
            list: List of fetched edges
        """
        pass

    @abc.abstractmethod
    def fetch_present_documents(
        self,
        batch,
        class_name,
        match_keys,
        keep_keys,
        flatten=False,
        filters: list | dict | None = None,
    ):
        """Fetch documents that exist in the database.

        Args:
            batch: Batch of documents to check
            class_name: Name of the collection
            match_keys: Keys to match
            keep_keys: Keys to keep in result
            flatten: Whether to flatten the result
            filters: Additional query filters

        Returns:
            list: Documents that exist in the database
        """
        pass

    @abc.abstractmethod
    def aggregate(
        self,
        class_name,
        aggregation_function: AggregationType,
        discriminant: str | None = None,
        aggregated_field: str | None = None,
        filters: list | dict | None = None,
    ):
        """Perform aggregation on a collection.

        Args:
            class_name: Name of the collection
            aggregation_function: Type of aggregation to perform
            discriminant: Field to group by
            aggregated_field: Field to aggregate
            filters: Query filters

        Returns:
            dict: Aggregation results
        """
        pass

    @abc.abstractmethod
    def keep_absent_documents(
        self,
        batch,
        class_name,
        match_keys,
        keep_keys,
        filters: list | dict | None = None,
    ):
        """Keep documents that don't exist in the database.

        Args:
            batch: Batch of documents to check
            class_name: Name of the collection
            match_keys: Keys to match
            keep_keys: Keys to keep in result
            filters: Additional query filters

        Returns:
            list: Documents that don't exist in the database
        """
        pass

    @abc.abstractmethod
    def define_vertex_indices(self, vertex_config: VertexConfig):
        """Define indices for vertex collections.

        Args:
            vertex_config: Vertex configuration containing index definitions
        """
        pass

    @abc.abstractmethod
    def define_edge_indices(self, edges: list[Edge]):
        """Define indices for edge collections.

        Args:
            edges: List of edge configurations containing index definitions
        """
        pass

__init__()

Initialize the connection.

Source code in graflo/db/conn.py
def __init__(self):
    """Initialize the connection."""
    pass

aggregate(class_name, aggregation_function, discriminant=None, aggregated_field=None, filters=None) abstractmethod

Perform aggregation on a collection.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| class_name | | Name of the collection | required |
| aggregation_function | AggregationType | Type of aggregation to perform | required |
| discriminant | str \| None | Field to group by | None |
| aggregated_field | str \| None | Field to aggregate | None |
| filters | list \| dict \| None | Query filters | None |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| | dict | Aggregation results |

Source code in graflo/db/conn.py
@abc.abstractmethod
def aggregate(
    self,
    class_name,
    aggregation_function: AggregationType,
    discriminant: str | None = None,
    aggregated_field: str | None = None,
    filters: list | dict | None = None,
):
    """Perform aggregation on a collection.

    Args:
        class_name: Name of the collection
        aggregation_function: Type of aggregation to perform
        discriminant: Field to group by
        aggregated_field: Field to aggregate
        filters: Query filters

    Returns:
        dict: Aggregation results
    """
    pass
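
To make the roles of `discriminant` and `aggregated_field` concrete, here is a minimal in-memory sketch. It is not how any graflo backend implements `aggregate`: the `Agg` enum is a hypothetical stand-in for `AggregationType` (whose real members are not shown on this page), the documents are plain dicts, and the shape of the returned dict is an assumption.

```python
from collections import defaultdict
from enum import Enum


class Agg(Enum):
    """Hypothetical stand-in for AggregationType; real members may differ."""

    COUNT = "count"
    SUM = "sum"


def aggregate(docs, aggregation_function, discriminant=None, aggregated_field=None):
    """Aggregate a list of dicts, optionally grouped by `discriminant`."""
    groups = defaultdict(list)
    for doc in docs:
        # Without a discriminant, everything lands in a single group keyed by None.
        key = doc.get(discriminant) if discriminant is not None else None
        groups[key].append(doc)

    if aggregation_function is Agg.COUNT:
        return {key: len(items) for key, items in groups.items()}
    if aggregation_function is Agg.SUM:
        return {
            key: sum(item.get(aggregated_field, 0) for item in items)
            for key, items in groups.items()
        }
    raise ValueError(f"unsupported aggregation: {aggregation_function}")


people = [
    {"city": "Oslo", "age": 30},
    {"city": "Oslo", "age": 40},
    {"city": "Rome", "age": 25},
]
counts = aggregate(people, Agg.COUNT, discriminant="city")
ages = aggregate(people, Agg.SUM, discriminant="city", aggregated_field="age")
```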

close() abstractmethod

Close the database connection.

Source code in graflo/db/conn.py
@abc.abstractmethod
def close(self):
    """Close the database connection."""
    pass
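
Because the interface exposes `close()`, a connection object can be wrapped in the standard library's `contextlib.closing` so that it is closed even if an error occurs. The sketch below uses a hypothetical `DummyConnection` rather than a real graflo backend; whether concrete graflo connections also support the `with` statement directly is not stated on this page.

```python
from contextlib import closing


class DummyConnection:
    """Hypothetical stand-in exposing only close()."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


with closing(DummyConnection()) as conn:
    pass  # run queries here

# closing() has called conn.close() on exit from the with block
```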

create_database(name) abstractmethod

Create a new database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the database to create | required |

Source code in graflo/db/conn.py
@abc.abstractmethod
def create_database(self, name: str):
    """Create a new database.

    Args:
        name: Name of the database to create
    """
    pass

define_edge_indices(edges) abstractmethod

Define indices for edge collections.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| edges | list[Edge] | List of edge configurations containing index definitions | required |

Source code in graflo/db/conn.py
@abc.abstractmethod
def define_edge_indices(self, edges: list[Edge]):
    """Define indices for edge collections.

    Args:
        edges: List of edge configurations containing index definitions
    """
    pass

define_indexes(schema)

Define indexes for vertices and edges in the schema.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| schema | Schema | Schema containing vertex and edge configurations | required |

Source code in graflo/db/conn.py
def define_indexes(self, schema: Schema):
    """Define indexes for vertices and edges in the schema.

    Args:
        schema: Schema containing vertex and edge configurations
    """
    self.define_vertex_indices(schema.vertex_config)
    self.define_edge_indices(schema.edge_config.edges_list(include_aux=True))
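
`define_indexes` is the one concrete method here: it delegates to the two abstract hooks, so a backend only has to implement `define_vertex_indices` and `define_edge_indices`. The sketch below reproduces that delegation with stand-ins. `IndexingBase`, `RecordingConnection`, and `fake_schema` are hypothetical, and the real `Schema`, `VertexConfig`, and `Edge` objects are richer than the placeholders used here.

```python
import abc
from types import SimpleNamespace


class IndexingBase(abc.ABC):
    """Stand-in mirroring only the index-related part of Connection."""

    def define_indexes(self, schema):
        self.define_vertex_indices(schema.vertex_config)
        self.define_edge_indices(schema.edge_config.edges_list(include_aux=True))

    @abc.abstractmethod
    def define_vertex_indices(self, vertex_config): ...

    @abc.abstractmethod
    def define_edge_indices(self, edges): ...


class RecordingConnection(IndexingBase):
    """Records the hook calls instead of talking to a database."""

    def __init__(self):
        self.calls = []

    def define_vertex_indices(self, vertex_config):
        self.calls.append(("vertex", vertex_config))

    def define_edge_indices(self, edges):
        self.calls.append(("edge", edges))


# Placeholder schema: only the attributes that define_indexes touches.
fake_schema = SimpleNamespace(
    vertex_config="vertex-config",
    edge_config=SimpleNamespace(edges_list=lambda include_aux=False: ["e1", "e2"]),
)

conn = RecordingConnection()
conn.define_indexes(fake_schema)
```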

define_schema(schema) abstractmethod

Define collections based on the schema.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| schema | Schema | Schema containing collection definitions | required |

Source code in graflo/db/conn.py
@abc.abstractmethod
def define_schema(self, schema: Schema):
    """Define collections based on the schema.

    Args:
        schema: Schema containing collection definitions
    """
    pass

define_vertex_indices(vertex_config) abstractmethod

Define indices for vertex collections.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| vertex_config | VertexConfig | Vertex configuration containing index definitions | required |

Source code in graflo/db/conn.py
@abc.abstractmethod
def define_vertex_indices(self, vertex_config: VertexConfig):
    """Define indices for vertex collections.

    Args:
        vertex_config: Vertex configuration containing index definitions
    """
    pass

delete_database(name) abstractmethod

Delete a database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the database to delete | required |

Source code in graflo/db/conn.py
@abc.abstractmethod
def delete_database(self, name: str):
    """Delete a database.

    Args:
        name: Name of the database to delete
    """
    pass

delete_graph_structure(vertex_types=(), graph_names=(), delete_all=False) abstractmethod

Delete graph structure (graphs, vertex types, edge types) from the database.

This method deletes graphs and their associated vertex/edge types. The exact behavior depends on the database implementation:

  • ArangoDB: Deletes graphs and collections (vertex/edge collections)
  • Neo4j: Deletes nodes from labels (vertex types) and relationships
  • TigerGraph: Deletes graphs, vertex types, edge types, and jobs

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| vertex_types | | Vertex type names to delete (database-specific interpretation) | () |
| graph_names | | Graph/database names to delete | () |
| delete_all | | If True, delete all graphs and their associated structures | False |

Source code in graflo/db/conn.py
@abc.abstractmethod
def delete_graph_structure(self, vertex_types=(), graph_names=(), delete_all=False):
    """Delete graph structure (graphs, vertex types, edge types) from the database.

    This method deletes graphs and their associated vertex/edge types.
    The exact behavior depends on the database implementation:

    - ArangoDB: Deletes graphs and collections (vertex/edge collections)
    - Neo4j: Deletes nodes from labels (vertex types) and relationships
    - TigerGraph: Deletes graphs, vertex types, edge types, and jobs

    Args:
        vertex_types: Vertex type names to delete (database-specific interpretation)
        graph_names: Graph/database names to delete
        delete_all: If True, delete all graphs and their associated structures
    """
    pass

execute(query, **kwargs) abstractmethod

Execute a database query.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | | Query to execute | required |
| **kwargs | | Additional query parameters | {} |

Source code in graflo/db/conn.py
@abc.abstractmethod
def execute(self, query, **kwargs):
    """Execute a database query.

    Args:
        query: Query to execute
        **kwargs: Additional query parameters
    """
    pass

fetch_docs(class_name, filters, limit, return_keys, unset_keys, **kwargs) abstractmethod

Fetch documents from a vertex type.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| class_name | | Name of the vertex type (or collection/label in database-specific terms) | required |
| filters | | Query filters | required |
| limit | | Maximum number of documents to return | required |
| return_keys | | Keys to return | required |
| unset_keys | | Keys to unset | required |
| **kwargs | | Additional database-specific parameters (e.g., field_types for TigerGraph) | {} |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| | list | Fetched documents |

Source code in graflo/db/conn.py
@abc.abstractmethod
def fetch_docs(
    self,
    class_name,
    filters,
    limit,
    return_keys,
    unset_keys,
    **kwargs,
):
    """Fetch documents from a vertex type.

    Args:
        class_name: Name of the vertex type (or collection/label in database-specific terms)
        filters: Query filters
        limit: Maximum number of documents to return
        return_keys: Keys to return
        unset_keys: Keys to unset
        **kwargs: Additional database-specific parameters (e.g., field_types for TigerGraph)

    Returns:
        list: Fetched documents
    """
    pass
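
The interplay of `filters`, `limit`, `return_keys`, and `unset_keys` can be sketched against a plain list of dicts. This is an illustration under assumptions, not the behavior of any graflo backend: `filters` is treated as a dict of equality conditions (the real filter format is not described on this page), and `return_keys` is assumed to take precedence over `unset_keys`.

```python
def fetch_docs(docs, filters=None, limit=None, return_keys=None, unset_keys=None):
    """In-memory sketch: equality filters, then limit, then projection."""
    filters = filters or {}
    matched = [d for d in docs if all(d.get(k) == v for k, v in filters.items())]
    if limit is not None:
        matched = matched[:limit]
    if return_keys is not None:
        # Keep only the requested keys.
        return [{k: d[k] for k in return_keys if k in d} for d in matched]
    if unset_keys is not None:
        # Drop the listed keys and keep everything else.
        return [{k: v for k, v in d.items() if k not in unset_keys} for d in matched]
    return matched


users = [
    {"name": "ann", "role": "admin", "pw": "x"},
    {"name": "bob", "role": "user", "pw": "y"},
    {"name": "cy", "role": "admin", "pw": "z"},
]
first_admin = fetch_docs(users, filters={"role": "admin"}, limit=1, return_keys=["name"])
without_pw = fetch_docs(users, unset_keys=["pw"])
```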

fetch_edges(from_type, from_id, edge_type=None, to_type=None, to_id=None, filters=None, limit=None, return_keys=None, unset_keys=None, **kwargs) abstractmethod

Fetch edges from the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| from_type | str | Source vertex type | required |
| from_id | str | Source vertex ID (required) | required |
| edge_type | str \| None | Optional edge type to filter by | None |
| to_type | str \| None | Optional target vertex type to filter by | None |
| to_id | str \| None | Optional target vertex ID to filter by | None |
| filters | list \| dict \| None | Additional query filters | None |
| limit | int \| None | Maximum number of edges to return | None |
| return_keys | list \| None | Keys to return (projection) | None |
| unset_keys | list \| None | Keys to exclude (projection) | None |
| **kwargs | | Additional database-specific parameters | {} |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| | list | List of fetched edges |

Source code in graflo/db/conn.py
@abc.abstractmethod
def fetch_edges(
    self,
    from_type: str,
    from_id: str,
    edge_type: str | None = None,
    to_type: str | None = None,
    to_id: str | None = None,
    filters: list | dict | None = None,
    limit: int | None = None,
    return_keys: list | None = None,
    unset_keys: list | None = None,
    **kwargs,
):
    """Fetch edges from the database.

    Args:
        from_type: Source vertex type
        from_id: Source vertex ID (required)
        edge_type: Optional edge type to filter by
        to_type: Optional target vertex type to filter by
        to_id: Optional target vertex ID to filter by
        filters: Additional query filters
        limit: Maximum number of edges to return
        return_keys: Keys to return (projection)
        unset_keys: Keys to exclude (projection)
        **kwargs: Additional database-specific parameters

    Returns:
        list: List of fetched edges
    """
    pass
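
The signature makes the source vertex mandatory and every other criterion optional, with `None` meaning "do not filter on this". A minimal in-memory sketch of that convention follows. The flat edge dicts and their field names are hypothetical; real backends store and address edges differently.

```python
def fetch_edges(edges, from_type, from_id, edge_type=None, to_type=None,
                to_id=None, limit=None):
    """In-memory sketch: required source, optional narrowing criteria."""
    wanted = {
        "from_type": from_type,
        "from_id": from_id,
        "edge_type": edge_type,
        "to_type": to_type,
        "to_id": to_id,
    }
    # None means "do not filter on this field".
    active = {k: v for k, v in wanted.items() if v is not None}
    matched = [e for e in edges if all(e.get(k) == v for k, v in active.items())]
    return matched if limit is None else matched[:limit]


edges = [
    {"from_type": "person", "from_id": "p1", "edge_type": "knows",
     "to_type": "person", "to_id": "p2"},
    {"from_type": "person", "from_id": "p1", "edge_type": "works_at",
     "to_type": "company", "to_id": "c1"},
    {"from_type": "person", "from_id": "p2", "edge_type": "knows",
     "to_type": "person", "to_id": "p1"},
]
from_p1 = fetch_edges(edges, "person", "p1")
p1_knows = fetch_edges(edges, "person", "p1", edge_type="knows")
```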

fetch_present_documents(batch, class_name, match_keys, keep_keys, flatten=False, filters=None) abstractmethod

Fetch documents that exist in the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| batch | | Batch of documents to check | required |
| class_name | | Name of the collection | required |
| match_keys | | Keys to match | required |
| keep_keys | | Keys to keep in result | required |
| flatten | | Whether to flatten the result | False |
| filters | list \| dict \| None | Additional query filters | None |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| | list | Documents that exist in the database |

Source code in graflo/db/conn.py
@abc.abstractmethod
def fetch_present_documents(
    self,
    batch,
    class_name,
    match_keys,
    keep_keys,
    flatten=False,
    filters: list | dict | None = None,
):
    """Fetch documents that exist in the database.

    Args:
        batch: Batch of documents to check
        class_name: Name of the collection
        match_keys: Keys to match
        keep_keys: Keys to keep in result
        flatten: Whether to flatten the result
        filters: Additional query filters

    Returns:
        list: Documents that exist in the database
    """
    pass

init_db(schema, clean_start) abstractmethod

Initialize the database with the given schema.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| schema | Schema | Schema to initialize the database with | required |
| clean_start | | Whether to clean existing data | required |

Source code in graflo/db/conn.py
@abc.abstractmethod
def init_db(self, schema: Schema, clean_start):
    """Initialize the database with the given schema.

    Args:
        schema: Schema to initialize the database with
        clean_start: Whether to clean existing data
    """
    pass
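
One plausible way for a backend to implement `init_db` is to compose other methods of the interface. The sketch below is an assumption about ordering (optional wipe, then structure, then indexes), not a description of what graflo's backends do; `RecordingConnection` is a hypothetical class that only records the steps.

```python
class RecordingConnection:
    """Hypothetical stand-in that records which steps init_db triggers."""

    def __init__(self):
        self.steps = []

    def delete_graph_structure(self, vertex_types=(), graph_names=(), delete_all=False):
        self.steps.append(f"delete(delete_all={delete_all})")

    def define_schema(self, schema):
        self.steps.append("define_schema")

    def define_indexes(self, schema):
        self.steps.append("define_indexes")

    def init_db(self, schema, clean_start):
        # Assumed ordering: optional wipe, then structure, then indexes.
        if clean_start:
            self.delete_graph_structure(delete_all=True)
        self.define_schema(schema)
        self.define_indexes(schema)


fresh = RecordingConnection()
fresh.init_db(schema=None, clean_start=True)

incremental = RecordingConnection()
incremental.init_db(schema=None, clean_start=False)
```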

insert_edges_batch(docs_edges, source_class, target_class, relation_name, collection_name, match_keys_source, match_keys_target, filter_uniques=True, uniq_weight_fields=None, uniq_weight_collections=None, upsert_option=False, head=None, **kwargs) abstractmethod

Insert a batch of edges.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| docs_edges | | Edge documents to insert | required |
| source_class | | Source vertex type/class | required |
| target_class | | Target vertex type/class | required |
| relation_name | | Name of the edge type/relation | required |
| collection_name | | Name of the edge type (database-specific: collection/relationship type) | required |
| match_keys_source | | Keys to match source vertices | required |
| match_keys_target | | Keys to match target vertices | required |
| filter_uniques | | Whether to filter unique edges | True |
| uniq_weight_fields | | Fields to consider for uniqueness | None |
| uniq_weight_collections | | Vertex/edge types to consider for uniqueness (database-specific) | None |
| upsert_option | | Whether to upsert existing edges | False |
| head | | Optional head document | None |
| **kwargs | | Additional insertion parameters | {} |

Source code in graflo/db/conn.py
@abc.abstractmethod
def insert_edges_batch(
    self,
    docs_edges,
    source_class,
    target_class,
    relation_name,
    collection_name,
    match_keys_source,
    match_keys_target,
    filter_uniques=True,
    uniq_weight_fields=None,
    uniq_weight_collections=None,
    upsert_option=False,
    head=None,
    **kwargs,
):
    """Insert a batch of edges.

    Args:
        docs_edges: Edge documents to insert
        source_class: Source vertex type/class
        target_class: Target vertex type/class
        relation_name: Name of the edge type/relation
        collection_name: Name of the edge type (database-specific: collection/relationship type)
        match_keys_source: Keys to match source vertices
        match_keys_target: Keys to match target vertices
        filter_uniques: Whether to filter unique edges
        uniq_weight_fields: Fields to consider for uniqueness
        uniq_weight_collections: Vertex/edge types to consider for uniqueness (database-specific)
        upsert_option: Whether to upsert existing edges
        head: Optional head document
        **kwargs: Additional insertion parameters
    """
    pass

insert_return_batch(docs, class_name) abstractmethod

Insert documents and return the inserted documents.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| docs | | Documents to insert | required |
| class_name | | Name of the vertex type (or collection/label in database-specific terms) | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| | list | Inserted documents |

Source code in graflo/db/conn.py
@abc.abstractmethod
def insert_return_batch(self, docs, class_name):
    """Insert documents and return the inserted documents.

    Args:
        docs: Documents to insert
        class_name: Name of the vertex type (or collection/label in database-specific terms)

    Returns:
        list: Inserted documents
    """
    pass

keep_absent_documents(batch, class_name, match_keys, keep_keys, filters=None) abstractmethod

Keep documents that don't exist in the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| batch | | Batch of documents to check | required |
| class_name | | Name of the collection | required |
| match_keys | | Keys to match | required |
| keep_keys | | Keys to keep in result | required |
| filters | list \| dict \| None | Additional query filters | None |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| | list | Documents that don't exist in the database |

Source code in graflo/db/conn.py
@abc.abstractmethod
def keep_absent_documents(
    self,
    batch,
    class_name,
    match_keys,
    keep_keys,
    filters: list | dict | None = None,
):
    """Keep documents that don't exist in the database.

    Args:
        batch: Batch of documents to check
        class_name: Name of the collection
        match_keys: Keys to match
        keep_keys: Keys to keep in result
        filters: Additional query filters

    Returns:
        list: Documents that don't exist in the database
    """
    pass
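
`keep_absent_documents` and `fetch_present_documents` split a batch by whether a matching document is already stored. The in-memory sketch below illustrates one reading of `match_keys` (the fields compared) and `keep_keys` (the fields projected into the result). The helper names, the `stored` list, and the exact result shapes are assumptions made for illustration.

```python
def _key(doc, match_keys):
    """Build a hashable identity from the match_keys values of a document."""
    return tuple(doc.get(k) for k in match_keys)


def fetch_present(batch, stored, match_keys, keep_keys):
    """Return stored documents matching a batch item, projected to keep_keys."""
    wanted = {_key(d, match_keys) for d in batch}
    return [
        {k: s[k] for k in keep_keys if k in s}
        for s in stored
        if _key(s, match_keys) in wanted
    ]


def keep_absent(batch, stored, match_keys, keep_keys=None):
    """Return the batch items that have no matching stored document."""
    existing = {_key(s, match_keys) for s in stored}
    absent = [d for d in batch if _key(d, match_keys) not in existing]
    if keep_keys is None:
        return absent
    return [{k: d[k] for k in keep_keys if k in d} for d in absent]


stored = [{"id": 1, "name": "ann", "age": 30}]
batch = [{"id": 1}, {"id": 2}]
present = fetch_present(batch, stored, match_keys=["id"], keep_keys=["id", "name"])
absent = keep_absent(batch, stored, match_keys=["id"])
```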

upsert_docs_batch(docs, class_name, match_keys, **kwargs) abstractmethod

Upsert a batch of documents.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| docs | | Documents to upsert | required |
| class_name | | Name of the vertex type (or collection/label in database-specific terms) | required |
| match_keys | | Keys to match for upsert | required |
| **kwargs | | Additional upsert parameters | {} |

Source code in graflo/db/conn.py
@abc.abstractmethod
def upsert_docs_batch(self, docs, class_name, match_keys, **kwargs):
    """Upsert a batch of documents.

    Args:
        docs: Documents to upsert
        class_name: Name of the vertex type (or collection/label in database-specific terms)
        match_keys: Keys to match for upsert
        **kwargs: Additional upsert parameters
    """
    pass
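
As a rough illustration of upsert semantics, the sketch below updates a stored document when all `match_keys` values agree and inserts a new one otherwise. It works on a plain list named `store`, which is a hypothetical stand-in for a vertex type; real backends express this in their own query language, and merging fields on update is an assumption.

```python
def upsert_docs_batch(store, docs, match_keys):
    """Update the stored document sharing all match_keys values, else insert."""
    index = {tuple(d.get(k) for k in match_keys): d for d in store}
    for doc in docs:
        key = tuple(doc.get(k) for k in match_keys)
        if key in index:
            index[key].update(doc)  # merge new fields into the existing document
        else:
            new_doc = dict(doc)
            store.append(new_doc)
            index[key] = new_doc
    return store


store = [{"id": 1, "name": "a"}]
upsert_docs_batch(store, [{"id": 1, "name": "b"}, {"id": 2, "name": "c"}], ["id"])
```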