Skip to content

graphcast.db

Database connection and management components.

This package provides database connection implementations and management utilities for different graph databases (ArangoDB, Neo4j). It includes connection interfaces, query execution, and database operations.

Key Components
  • Connection: Abstract database connection interface
  • ConnectionManager: Database connection management
  • ArangoDB: ArangoDB-specific implementation
  • Neo4j: Neo4j-specific implementation
  • Query: Query generation and execution utilities
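Example

The connection class is chosen from the configuration's ``connection_type`` (see ``ConnectionManager.conn_class_mapping``). For that reason ``connection_config`` must be a ``ProtoConnectionConfig`` object, not a plain dict. Also, ``init_db`` takes a required ``clean_start`` argument.

    >>> from graphcast.db import ConnectionManager
    >>> manager = ConnectionManager(connection_config=config)  # config: ProtoConnectionConfig
    >>> with manager as conn:
    ...     conn.init_db(schema, clean_start=False)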

Connection

Bases: ABC

Abstract base class for database connections.

This class defines the interface that all database connection implementations must follow. It provides methods for database operations, collection management, and data manipulation.

Note

All methods marked with @abc.abstractmethod must be implemented by concrete connection classes.

Source code in graphcast/db/connection.py
class Connection(abc.ABC):
    """Interface shared by every graph-database connection backend.

    Concrete backends subclass this and provide:

    - database lifecycle (create/delete);
    - collection and index management;
    - query execution;
    - batched document and edge operations.

    Note:
        Every method decorated with @abc.abstractmethod has to be
        overridden before a subclass can be instantiated.
    """

    def __init__(self):
        """Initialize the base connection; the base class keeps no state."""

    @abc.abstractmethod
    def create_database(self, name: str):
        """Create a database called ``name``.

        Args:
            name: Name to give the new database.
        """

    @abc.abstractmethod
    def delete_database(self, name: str):
        """Drop the database called ``name``.

        Args:
            name: Name of the database to remove.
        """

    @abc.abstractmethod
    def execute(self, query, **kwargs):
        """Run a query against the database.

        Args:
            query: The query to run.
            **kwargs: Extra, backend-specific query options.
        """

    @abc.abstractmethod
    def close(self):
        """Release the database connection."""

    def define_indexes(self, schema: Schema):
        """Create the vertex and edge indices described by ``schema``.

        Vertex indices come from ``schema.vertex_config``. Edge indices come
        from ``schema.edge_config.edges_list(include_aux=True)``.

        Args:
            schema: Schema holding the vertex and edge configuration.
        """
        self.define_vertex_indices(schema.vertex_config)
        all_edges = schema.edge_config.edges_list(include_aux=True)
        self.define_edge_indices(all_edges)

    @abc.abstractmethod
    def define_collections(self, schema: Schema):
        """Create the collections that ``schema`` describes.

        Args:
            schema: Schema holding the collection definitions.
        """

    @abc.abstractmethod
    def delete_collections(self, cnames=(), gnames=(), delete_all=False):
        """Remove collections and/or graphs from the database.

        Args:
            cnames: Names of collections to remove.
            gnames: Names of graphs to remove.
            delete_all: If true, remove all collections.
        """

    @abc.abstractmethod
    def init_db(self, schema: Schema, clean_start):
        """Prepare the database according to ``schema``.

        Args:
            schema: Schema used to set up the database.
            clean_start: Whether existing data should be cleared first.
        """

    @abc.abstractmethod
    def upsert_docs_batch(self, docs, class_name, match_keys, **kwargs):
        """Insert or update a batch of documents.

        Args:
            docs: Documents to write.
            class_name: Target collection name.
            match_keys: Keys used to find existing documents to update.
            **kwargs: Extra, backend-specific upsert options.
        """

    @abc.abstractmethod
    def insert_edges_batch(
        self,
        docs_edges,
        source_class,
        target_class,
        relation_name,
        collection_name,
        match_keys_source,
        match_keys_target,
        filter_uniques=True,
        uniq_weight_fields=None,
        uniq_weight_collections=None,
        upsert_option=False,
        head=None,
        **kwargs,
    ):
        """Write a batch of edges.

        Args:
            docs_edges: Edge documents to write.
            source_class: Vertex class at the edge source.
            target_class: Vertex class at the edge target.
            relation_name: Relation name for the edges.
            collection_name: Edge collection to write into.
            match_keys_source: Keys identifying the source vertices.
            match_keys_target: Keys identifying the target vertices.
            filter_uniques: Whether to filter to unique edges.
            uniq_weight_fields: Fields taken into account for uniqueness.
            uniq_weight_collections: Collections taken into account for
                uniqueness.
            upsert_option: Whether existing edges should be upserted.
            head: Optional head document.
            **kwargs: Extra, backend-specific insertion options.
        """

    @abc.abstractmethod
    def insert_return_batch(self, docs, class_name):
        """Insert documents and hand back what was inserted.

        Args:
            docs: Documents to insert.
            class_name: Target collection name.

        Returns:
            list: The inserted documents.
        """

    @abc.abstractmethod
    def fetch_docs(self, class_name, filters, limit, return_keys, unset_keys):
        """Read documents from a collection.

        Args:
            class_name: Collection to read from.
            filters: Filters applied to the query.
            limit: Upper bound on the number of documents returned.
            return_keys: Keys to include in the result.
            unset_keys: Keys to strip from the result.

        Returns:
            list: The documents that were read.
        """

    @abc.abstractmethod
    def fetch_present_documents(
        self,
        batch,
        class_name,
        match_keys,
        keep_keys,
        flatten=False,
        filters: list | dict | None = None,
    ):
        """Look up which documents of ``batch`` already exist in the database.

        Args:
            batch: Documents to look up.
            class_name: Collection to search.
            match_keys: Keys used for matching.
            keep_keys: Keys retained in the result.
            flatten: Whether the result should be flattened.
            filters: Additional filters applied to the query.

        Returns:
            list: The documents found in the database.
        """

    @abc.abstractmethod
    def aggregate(
        self,
        class_name,
        aggregation_function: AggregationType,
        discriminant: str | None = None,
        aggregated_field: str | None = None,
        filters: list | dict | None = None,
    ):
        """Compute an aggregation over a collection.

        Args:
            class_name: Collection to aggregate over.
            aggregation_function: Which aggregation to compute.
            discriminant: Field to group by.
            aggregated_field: Field the aggregation is applied to.
            filters: Filters applied to the query.

        Returns:
            dict: The aggregation results.
        """

    @abc.abstractmethod
    def keep_absent_documents(
        self,
        batch,
        class_name,
        match_keys,
        keep_keys,
        filters: list | dict | None = None,
    ):
        """Filter ``batch`` down to documents missing from the database.

        Args:
            batch: Documents to check.
            class_name: Collection to search.
            match_keys: Keys used for matching.
            keep_keys: Keys retained in the result.
            filters: Additional filters applied to the query.

        Returns:
            list: The documents not found in the database.
        """

    @abc.abstractmethod
    def define_vertex_indices(self, vertex_config: VertexConfig):
        """Create indices on the vertex collections.

        Args:
            vertex_config: Vertex configuration carrying the index definitions.
        """

    @abc.abstractmethod
    def define_edge_indices(self, edges: list[Edge]):
        """Create indices on the edge collections.

        Args:
            edges: Edge configurations carrying the index definitions.
        """

__init__()

Initialize the connection.

Source code in graphcast/db/connection.py
def __init__(self):
    """Initialize the base connection.

    The abstract base keeps no state of its own, so nothing is done here.
    """

aggregate(class_name, aggregation_function, discriminant=None, aggregated_field=None, filters=None) abstractmethod

Perform aggregation on a collection.

Parameters:

Name Type Description Default
class_name

Name of the collection

required
aggregation_function AggregationType

Type of aggregation to perform

required
discriminant str | None

Field to group by

None
aggregated_field str | None

Field to aggregate

None
filters list | dict | None

Query filters

None

Returns:

Name Type Description
dict

Aggregation results

Source code in graphcast/db/connection.py
@abc.abstractmethod
def aggregate(
    self,
    class_name,
    aggregation_function: AggregationType,
    discriminant: str | None = None,
    aggregated_field: str | None = None,
    filters: list | dict | None = None,
):
    """Compute an aggregation over a collection.

    Args:
        class_name: Collection to aggregate over.
        aggregation_function: Which aggregation to compute.
        discriminant: Field to group by.
        aggregated_field: Field the aggregation is applied to.
        filters: Filters applied to the query.

    Returns:
        dict: The aggregation results.
    """

close() abstractmethod

Close the database connection.

Source code in graphcast/db/connection.py
@abc.abstractmethod
def close(self):
    """Release the database connection."""

create_database(name) abstractmethod

Create a new database.

Parameters:

Name Type Description Default
name str

Name of the database to create

required
Source code in graphcast/db/connection.py
@abc.abstractmethod
def create_database(self, name: str):
    """Create a database called ``name``.

    Args:
        name: Name to give the new database.
    """

define_collections(schema) abstractmethod

Define collections based on the schema.

Parameters:

Name Type Description Default
schema Schema

Schema containing collection definitions

required
Source code in graphcast/db/connection.py
@abc.abstractmethod
def define_collections(self, schema: Schema):
    """Create the collections that ``schema`` describes.

    Args:
        schema: Schema holding the collection definitions.
    """

define_edge_indices(edges) abstractmethod

Define indices for edge collections.

Parameters:

Name Type Description Default
edges list[Edge]

List of edge configurations containing index definitions

required
Source code in graphcast/db/connection.py
@abc.abstractmethod
def define_edge_indices(self, edges: list[Edge]):
    """Create indices on the edge collections.

    Args:
        edges: Edge configurations carrying the index definitions.
    """

define_indexes(schema)

Define indexes for vertices and edges in the schema.

Parameters:

Name Type Description Default
schema Schema

Schema containing vertex and edge configurations

required
Source code in graphcast/db/connection.py
def define_indexes(self, schema: Schema):
    """Create the vertex and edge indices described by ``schema``.

    Vertex indices come from ``schema.vertex_config``. Edge indices come from
    ``schema.edge_config.edges_list(include_aux=True)``.

    Args:
        schema: Schema holding the vertex and edge configuration.
    """
    self.define_vertex_indices(schema.vertex_config)
    all_edges = schema.edge_config.edges_list(include_aux=True)
    self.define_edge_indices(all_edges)

define_vertex_indices(vertex_config) abstractmethod

Define indices for vertex collections.

Parameters:

Name Type Description Default
vertex_config VertexConfig

Vertex configuration containing index definitions

required
Source code in graphcast/db/connection.py
@abc.abstractmethod
def define_vertex_indices(self, vertex_config: VertexConfig):
    """Create indices on the vertex collections.

    Args:
        vertex_config: Vertex configuration carrying the index definitions.
    """

delete_collections(cnames=(), gnames=(), delete_all=False) abstractmethod

Delete collections from the database.

Parameters:

Name Type Description Default
cnames

Collection names to delete

()
gnames

Graph names to delete

()
delete_all

Whether to delete all collections

False
Source code in graphcast/db/connection.py
@abc.abstractmethod
def delete_collections(self, cnames=(), gnames=(), delete_all=False):
    """Remove collections and/or graphs from the database.

    Args:
        cnames: Names of collections to remove.
        gnames: Names of graphs to remove.
        delete_all: If true, remove all collections.
    """

delete_database(name) abstractmethod

Delete a database.

Parameters:

Name Type Description Default
name str

Name of the database to delete

required
Source code in graphcast/db/connection.py
@abc.abstractmethod
def delete_database(self, name: str):
    """Drop the database called ``name``.

    Args:
        name: Name of the database to remove.
    """

execute(query, **kwargs) abstractmethod

Execute a database query.

Parameters:

Name Type Description Default
query

Query to execute

required
**kwargs

Additional query parameters

{}
Source code in graphcast/db/connection.py
@abc.abstractmethod
def execute(self, query, **kwargs):
    """Run a query against the database.

    Args:
        query: The query to run.
        **kwargs: Extra, backend-specific query options.
    """

fetch_docs(class_name, filters, limit, return_keys, unset_keys) abstractmethod

Fetch documents from a collection.

Parameters:

Name Type Description Default
class_name

Name of the collection

required
filters

Query filters

required
limit

Maximum number of documents to return

required
return_keys

Keys to return

required
unset_keys

Keys to unset

required

Returns:

Name Type Description
list

Fetched documents

Source code in graphcast/db/connection.py
@abc.abstractmethod
def fetch_docs(self, class_name, filters, limit, return_keys, unset_keys):
    """Read documents from a collection.

    Args:
        class_name: Collection to read from.
        filters: Filters applied to the query.
        limit: Upper bound on the number of documents returned.
        return_keys: Keys to include in the result.
        unset_keys: Keys to strip from the result.

    Returns:
        list: The documents that were read.
    """

fetch_present_documents(batch, class_name, match_keys, keep_keys, flatten=False, filters=None) abstractmethod

Fetch documents that exist in the database.

Parameters:

Name Type Description Default
batch

Batch of documents to check

required
class_name

Name of the collection

required
match_keys

Keys to match

required
keep_keys

Keys to keep in result

required
flatten

Whether to flatten the result

False
filters list | dict | None

Additional query filters

None

Returns:

Name Type Description
list

Documents that exist in the database

Source code in graphcast/db/connection.py
@abc.abstractmethod
def fetch_present_documents(
    self,
    batch,
    class_name,
    match_keys,
    keep_keys,
    flatten=False,
    filters: list | dict | None = None,
):
    """Fetch documents that exist in the database.

    Args:
        batch: Batch of documents to check
        class_name: Name of the collection
        match_keys: Keys to match
        keep_keys: Keys to keep in result
        flatten: Whether to flatten the result
        filters: Additional query filters

    Returns:
        list: Documents that exist in the database
    """
    pass

init_db(schema, clean_start) abstractmethod

Initialize the database with the given schema.

Parameters:

Name Type Description Default
schema Schema

Schema to initialize the database with

required
clean_start

Whether to clean existing data

required
Source code in graphcast/db/connection.py
@abc.abstractmethod
def init_db(self, schema: Schema, clean_start):
    """Prepare the database according to ``schema``.

    Args:
        schema: Schema used to set up the database.
        clean_start: Whether existing data should be cleared first.
    """

insert_edges_batch(docs_edges, source_class, target_class, relation_name, collection_name, match_keys_source, match_keys_target, filter_uniques=True, uniq_weight_fields=None, uniq_weight_collections=None, upsert_option=False, head=None, **kwargs) abstractmethod

Insert a batch of edges.

Parameters:

Name Type Description Default
docs_edges

Edge documents to insert

required
source_class

Source vertex class

required
target_class

Target vertex class

required
relation_name

Name of the relation

required
collection_name

Name of the edge collection

required
match_keys_source

Keys to match source vertices

required
match_keys_target

Keys to match target vertices

required
filter_uniques

Whether to filter unique edges

True
uniq_weight_fields

Fields to consider for uniqueness

None
uniq_weight_collections

Collections to consider for uniqueness

None
upsert_option

Whether to upsert existing edges

False
head

Optional head document

None
**kwargs

Additional insertion parameters

{}
Source code in graphcast/db/connection.py
@abc.abstractmethod
def insert_edges_batch(
    self,
    docs_edges,
    source_class,
    target_class,
    relation_name,
    collection_name,
    match_keys_source,
    match_keys_target,
    filter_uniques=True,
    uniq_weight_fields=None,
    uniq_weight_collections=None,
    upsert_option=False,
    head=None,
    **kwargs,
):
    """Write a batch of edges.

    Args:
        docs_edges: Edge documents to write.
        source_class: Vertex class at the edge source.
        target_class: Vertex class at the edge target.
        relation_name: Relation name for the edges.
        collection_name: Edge collection to write into.
        match_keys_source: Keys identifying the source vertices.
        match_keys_target: Keys identifying the target vertices.
        filter_uniques: Whether to filter to unique edges.
        uniq_weight_fields: Fields taken into account for uniqueness.
        uniq_weight_collections: Collections taken into account for uniqueness.
        upsert_option: Whether existing edges should be upserted.
        head: Optional head document.
        **kwargs: Extra, backend-specific insertion options.
    """

insert_return_batch(docs, class_name) abstractmethod

Insert documents and return the inserted documents.

Parameters:

Name Type Description Default
docs

Documents to insert

required
class_name

Name of the collection

required

Returns:

Name Type Description
list

Inserted documents

Source code in graphcast/db/connection.py
@abc.abstractmethod
def insert_return_batch(self, docs, class_name):
    """Insert documents and hand back what was inserted.

    Args:
        docs: Documents to insert.
        class_name: Target collection name.

    Returns:
        list: The inserted documents.
    """

keep_absent_documents(batch, class_name, match_keys, keep_keys, filters=None) abstractmethod

Keep documents that don't exist in the database.

Parameters:

Name Type Description Default
batch

Batch of documents to check

required
class_name

Name of the collection

required
match_keys

Keys to match

required
keep_keys

Keys to keep in result

required
filters list | dict | None

Additional query filters

None

Returns:

Name Type Description
list

Documents that don't exist in the database

Source code in graphcast/db/connection.py
@abc.abstractmethod
def keep_absent_documents(
    self,
    batch,
    class_name,
    match_keys,
    keep_keys,
    filters: list | dict | None = None,
):
    """Keep documents that don't exist in the database.

    Args:
        batch: Batch of documents to check
        class_name: Name of the collection
        match_keys: Keys to match
        keep_keys: Keys to keep in result
        filters: Additional query filters

    Returns:
        list: Documents that don't exist in the database
    """
    pass

upsert_docs_batch(docs, class_name, match_keys, **kwargs) abstractmethod

Upsert a batch of documents.

Parameters:

Name Type Description Default
docs

Documents to upsert

required
class_name

Name of the collection

required
match_keys

Keys to match for upsert

required
**kwargs

Additional upsert parameters

{}
Source code in graphcast/db/connection.py
@abc.abstractmethod
def upsert_docs_batch(self, docs, class_name, match_keys, **kwargs):
    """Insert or update a batch of documents.

    Args:
        docs: Documents to write.
        class_name: Target collection name.
        match_keys: Keys used to find existing documents to update.
        **kwargs: Extra, backend-specific upsert options.
    """

ConnectionManager

Manager for database connections.

This class manages database connections to different graph database implementations. It provides a context manager interface for safe connection handling and automatic cleanup.

Attributes:

Name Type Description
conn_class_mapping

Mapping of connection types to connection classes

config ProtoConnectionConfig

Connection configuration

working_db

Current working database name

conn

Active database connection

Source code in graphcast/db/manager.py
class ConnectionManager:
    """Context manager that opens and closes graph-database connections.

    The concrete connection class is looked up in ``conn_class_mapping``
    using ``config.connection_type``. Use it in a ``with`` statement so the
    connection is closed when the block exits.

    Attributes:
        conn_class_mapping: Maps each ``ConnectionKind`` to its connection class.
        config: Connection configuration.
        working_db: Optional database name that overrides ``config.database``
            when a connection is opened.
        conn: The active connection, or None when no connection is open.
    """

    conn_class_mapping = {
        ConnectionKind.ARANGO: ArangoConnection,
        ConnectionKind.NEO4J: Neo4jConnection,
    }

    def __init__(
        self,
        secret_path=None,
        args=None,
        connection_config: Optional[ProtoConnectionConfig] = None,
        **kwargs,
    ):
        """Initialize the connection manager.

        Args:
            secret_path: Path to a configuration file. Passed with ``args`` to
                ``ConfigFactory.create_config`` only when ``connection_config``
                is None.
            args: Command line arguments for ``ConfigFactory.create_config``.
            connection_config: Ready-made connection configuration. When
                given, ``secret_path`` and ``args`` are not used.
            **kwargs: Extra keyword arguments. Only ``working_db`` is read;
                any other keys are ignored.
        """
        self.config: ProtoConnectionConfig = (
            ConfigFactory.create_config(secret_path, args)
            if connection_config is None
            else connection_config
        )
        self.working_db = kwargs.pop("working_db", None)
        self.conn = None

    def __enter__(self):
        """Open a connection and return it.

        The connection class is selected from ``conn_class_mapping`` by
        ``config.connection_type``. If ``working_db`` is set, it is written
        to ``config.database`` before the connection is created.

        Returns:
            Connection: The new connection, also stored in ``self.conn``.
        """
        cls = self.conn_class_mapping[self.config.connection_type]
        if self.working_db is not None:
            # NOTE: this mutates the supplied config object in place.
            self.config.database = self.working_db
        self.conn = cls(config=self.config)
        return self.conn

    def close(self):
        """Close the active database connection, if one is open.

        For Neo4j connections the underlying ``conn.conn`` handle is closed
        too, even when closing the wrapper raises. When no connection is open
        (e.g. ``close()`` is called before ``__enter__`` or a second time),
        this is a no-op instead of an ``AttributeError`` on ``None``.
        """
        conn = self.conn
        if conn is None:
            return
        # Detach before closing so a repeated close() never reuses a closed handle.
        self.conn = None
        try:
            conn.close()
        finally:
            # The manager closes the Neo4j inner handle explicitly; `finally`
            # keeps it from leaking if conn.close() raises.
            if self.config.connection_type == ConnectionKind.NEO4J:
                conn.conn.close()

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Exit the context manager, closing the connection.

        Exceptions raised inside the ``with`` block are not suppressed.

        Args:
            exc_type: Exception type if an exception occurred.
            exc_value: Exception value if an exception occurred.
            exc_traceback: Exception traceback if an exception occurred.
        """
        self.close()

__enter__()

Enter the context manager.

Creates and returns a new database connection. If ``working_db`` was given, it is first written to ``config.database``.

Returns:

Name Type Description
Connection

Database connection instance

Source code in graphcast/db/manager.py
def __enter__(self):
    """Open a connection for the ``with`` block and return it.

    The connection class is looked up in ``conn_class_mapping`` by
    ``config.connection_type``. A non-None ``working_db`` is copied into
    ``config.database`` before the connection is constructed.

    Returns:
        Connection: The freshly created connection (also kept in ``self.conn``).
    """
    connection_cls = self.conn_class_mapping[self.config.connection_type]
    override_db = self.working_db
    if override_db is not None:
        self.config.database = override_db
    self.conn = connection_cls(config=self.config)
    return self.conn

__exit__(exc_type, exc_value, exc_traceback)

Exit the context manager.

Ensures the connection is properly closed when exiting the context.

Parameters:

Name Type Description Default
exc_type

Exception type if an exception occurred

required
exc_value

Exception value if an exception occurred

required
exc_traceback

Exception traceback if an exception occurred

required
Source code in graphcast/db/manager.py
def __exit__(self, exc_type, exc_value, exc_traceback):
    """Leave the ``with`` block by closing the managed connection.

    Nothing is returned, so exceptions raised inside the block propagate.

    Args:
        exc_type: Type of the exception raised in the block, if any.
        exc_value: The exception instance raised in the block, if any.
        exc_traceback: Traceback of the exception raised in the block, if any.
    """
    self.close()

__init__(secret_path=None, args=None, connection_config=None, **kwargs)

Initialize the connection manager.

Parameters:

Name Type Description Default
secret_path

Path to configuration file

None
args

Command line arguments

None
connection_config Optional[ProtoConnectionConfig]

Optional connection configuration

None
**kwargs

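Extra keyword arguments. Only working_db is read; it overrides the configured database name when the connection is opened. Any other keys are ignored.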

{}
Source code in graphcast/db/manager.py
def __init__(
    self,
    secret_path=None,
    args=None,
    connection_config: Optional[ProtoConnectionConfig] = None,
    **kwargs,
):
    """Set up the manager without opening a connection yet.

    Args:
        secret_path: Configuration file path. Used with ``args`` to build a
            config via ``ConfigFactory.create_config`` when
            ``connection_config`` is None.
        args: Command line arguments forwarded to ``ConfigFactory.create_config``.
        connection_config: Pre-built connection configuration. Takes
            precedence over ``secret_path``/``args`` when provided.
        **kwargs: Extra keyword arguments; only ``working_db`` is consulted.
    """
    if connection_config is None:
        connection_config = ConfigFactory.create_config(secret_path, args)
    self.config: ProtoConnectionConfig = connection_config
    self.working_db = kwargs.get("working_db")
    self.conn = None

close()

Close the database connection.

Closes the active connection. For Neo4j connections it also closes the underlying conn.conn handle.

Source code in graphcast/db/manager.py
def close(self):
    """Close the active database connection, if one is open.

    For Neo4j connections the underlying ``conn.conn`` handle is closed too,
    even when closing the wrapper raises. When no connection is open (e.g.
    ``close()`` is called before ``__enter__`` or a second time), this is a
    no-op instead of an ``AttributeError`` on ``None``.
    """
    conn = self.conn
    if conn is None:
        return
    # Detach before closing so a repeated close() never reuses a closed handle.
    self.conn = None
    try:
        conn.close()
    finally:
        # The manager closes the Neo4j inner handle explicitly; `finally`
        # keeps it from leaking if conn.close() raises.
        if self.config.connection_type == ConnectionKind.NEO4J:
            conn.conn.close()