Skip to content

graphcast.db.neo4j.conn

Neo4j connection implementation for graph database operations.

This module implements the Connection interface for Neo4j, providing specific functionality for graph operations in Neo4j. It handles:
  • Node and relationship management
  • Cypher query execution
  • Index creation and management
  • Batch operations
  • Graph traversal and pattern matching

Key Features
  • Label-based node organization
  • Relationship type management
  • Property indices
  • Cypher query execution
  • Batch node and relationship operations
Example

conn = Neo4jConnection(config)
conn.init_db(schema, clean_start=True)
conn.upsert_docs_batch(docs, "User", match_keys=["email"])

Neo4jConnection

Bases: Connection

Neo4j-specific implementation of the Connection interface.

This class provides Neo4j-specific implementations for all database operations, including node management, relationship operations, and Cypher query execution. It uses the Neo4j Python driver for all operations.

Attributes:

Name Type Description
flavor

Database flavor identifier (NEO4J)

conn

Neo4j session instance

Source code in graphcast/db/neo4j/conn.py
class Neo4jConnection(Connection):
    """Neo4j-specific implementation of the Connection interface.

    Every operation is translated into a Cypher statement and run on a single
    session opened from the Neo4j Python driver.

    Attributes:
        flavor: Database flavor identifier (NEO4J)
        conn: Neo4j session instance used to run all queries
    """

    flavor = DBFlavor.NEO4J

    def __init__(self, config: Neo4jConnectionConfig):
        """Open a Neo4j driver and a session on it.

        Args:
            config: Neo4j connection configuration; ``url``, ``cred_name`` and
                ``cred_pass`` are read from it
        """
        super().__init__()
        # Keep a handle on the driver: without it the driver can never be
        # closed explicitly and its connection pool is left to the GC.
        self._driver = GraphDatabase.driver(
            uri=config.url, auth=(config.cred_name, config.cred_pass)
        )
        self.conn = self._driver.session()

    def execute(self, query, **kwargs):
        """Execute a Cypher query on the session.

        Args:
            query: Cypher query string to execute
            **kwargs: Query parameters, forwarded to ``session.run``

        Returns:
            Result: whatever ``session.run`` returns
        """
        return self.conn.run(query, **kwargs)

    def close(self):
        """Close the session and, when one was retained, the driver."""
        try:
            self.conn.close()
        finally:
            # getattr: tolerate instances built without __init__ (no driver).
            driver = getattr(self, "_driver", None)
            if driver is not None:
                driver.close()

    def create_database(self, name: str):
        """Create a new Neo4j database.

        Note: This operation is only supported in Neo4j Enterprise Edition.
        Failures are logged, not raised.

        Args:
            name: Name of the database to create (interpolated verbatim
                into the statement)
        """
        try:
            self.execute(f"CREATE DATABASE {name}")
        except Exception as e:
            logger.error(f"{e}")

    def delete_database(self, name: str):
        """Remove all nodes and relationships from the current database.

        Dropping a database is only supported in Neo4j Enterprise Edition, so
        this method wipes the data instead. Failures are logged, not raised.

        Args:
            name: Accepted for interface compatibility; not used
        """
        try:
            self.execute("MATCH (n) DETACH DELETE n")
        except Exception as e:
            logger.error(f"Could not clean database : {e}")

    def define_vertex_indices(self, vertex_config: VertexConfig):
        """Define indices for vertex labels.

        Args:
            vertex_config: Vertex configuration; one index is created for each
                entry of ``indexes(label)`` for each label in ``vertex_set``
        """
        for label in vertex_config.vertex_set:
            for index_obj in vertex_config.indexes(label):
                self._add_index(label, index_obj)

    def define_edge_indices(self, edges: list[Edge]):
        """Define indices for relationship types.

        Edges whose ``relation`` is None are skipped.

        Args:
            edges: List of edge configurations containing index definitions
        """
        for edge in edges:
            for index_obj in edge.indexes:
                if edge.relation is None:
                    continue
                self._add_index(edge.relation, index_obj, is_vertex_index=False)

    def _add_index(self, obj_name, index: Index, is_vertex_index=True):
        """Create an index on a label or relationship type if it is missing.

        The index is named ``<obj_name>_<field1>_<field2>...``.

        Args:
            obj_name: Label or relationship type name
            index: Index configuration; only ``index.fields`` is read
            is_vertex_index: If True, create index on nodes, otherwise on relationships
        """
        property_list = ", ".join(f"x.{f}" for f in index.fields)
        index_name = f"{obj_name}_" + "_".join(index.fields)
        if is_vertex_index:
            pattern = f"(x:{obj_name})"
        else:
            pattern = f"()-[x:{obj_name}]-()"

        q = f"CREATE INDEX {index_name} IF NOT EXISTS FOR {pattern} ON ({property_list});"
        self.execute(q)

    def define_collections(self, schema: Schema):
        """Define collections based on schema.

        Note: This is a no-op in Neo4j as collections are implicit.

        Args:
            schema: Schema containing collection definitions (unused)
        """
        pass

    def define_vertex_collections(self, schema: Schema):
        """Define vertex collections based on schema.

        Note: This is a no-op in Neo4j as vertex collections are implicit.

        Args:
            schema: Schema containing vertex definitions (unused)
        """
        pass

    def define_edge_collections(self, edges: list[Edge]):
        """Define edge collections based on schema.

        Note: This is a no-op in Neo4j as edge collections are implicit.

        Args:
            edges: List of edge configurations (unused)
        """
        pass

    def delete_collections(self, cnames=(), gnames=(), delete_all=False):
        """Delete nodes, together with their relationships, from the database.

        Args:
            cnames: Label names to delete nodes for
            gnames: Unused in Neo4j
            delete_all: If True, delete all nodes and relationships regardless
                of ``cnames``

        Note:
            Calling with no ``cnames`` also wipes everything; this keeps the
            behaviour callers of the earlier implementation relied on.
        """
        if delete_all or not cnames:
            self.execute("MATCH (n) DETACH DELETE n")
            return
        for label in cnames:
            # DETACH is required: a plain DELETE fails on any node that still
            # has relationships.
            self.execute(f"MATCH (n:{label}) DETACH DELETE n")

    def init_db(self, schema: Schema, clean_start):
        """Initialize Neo4j with the given schema.

        Args:
            schema: Schema containing graph structure definitions
            clean_start: If True, delete all existing data before initialization
        """
        if clean_start:
            # delete_database ignores its argument and wipes the current data.
            self.delete_database("")
        self.define_indexes(schema)

    def upsert_docs_batch(self, docs, class_name, match_keys, **kwargs):
        """Upsert a batch of nodes using Cypher ``MERGE``.

        Each document is merged on ``match_keys``; all of its properties are
        then written onto the matched or newly created node.

        Args:
            docs: List of node documents to upsert
            class_name: Label to upsert into
            match_keys: Keys to match for upsert operation; must not be empty
            **kwargs: Additional options:
                - dry: If True, don't execute the query

        Raises:
            ValueError: If ``match_keys`` is empty
        """
        dry = kwargs.pop("dry", False)

        if not match_keys:
            # An empty property map would make MERGE match every node of the
            # label, so each row would overwrite the same nodes.
            raise ValueError("match_keys must contain at least one key")
        if dry or not docs:
            return

        merge_props = ", ".join(f"{k}: row.{k}" for k in match_keys)
        q = (
            "UNWIND $batch AS row "
            f"MERGE (n:{class_name} {{ {merge_props} }}) "
            "SET n += row"
        )
        self.execute(q, batch=docs)

    def insert_edges_batch(
        self,
        docs_edges,
        source_class,
        target_class,
        relation_name,
        collection_name=None,
        match_keys_source=("_key",),
        match_keys_target=("_key",),
        filter_uniques=True,
        uniq_weight_fields=None,
        uniq_weight_collections=None,
        upsert_option=False,
        head=None,
        **kwargs,
    ):
        """Insert a batch of relationships using Cypher ``MERGE``.

        For every edge document the source and target nodes are looked up by
        their match keys and a relationship is merged between them.

        Args:
            docs_edges: List of edge documents in format [{__source: source_doc, __target: target_doc}]
            source_class: Source node label
            target_class: Target node label
            relation_name: Relationship type name
            collection_name: Unused in Neo4j
            match_keys_source: Keys to match source nodes
            match_keys_target: Keys to match target nodes
            filter_uniques: Unused in Neo4j
            uniq_weight_fields: Unused in Neo4j
            uniq_weight_collections: Unused in Neo4j
            upsert_option: Unused in Neo4j
            head: Optional non-negative limit on the number of edge documents
                taken from the start of ``docs_edges``
            **kwargs: Additional options:
                - dry: If True, don't execute the query
        """
        dry = kwargs.pop("dry", False)

        if head is not None:
            docs_edges = docs_edges[:head]
        if dry or not docs_edges:
            return

        conditions = [
            f"source.{key} = row['__source'].{key}" for key in match_keys_source
        ]
        conditions += [
            f"target.{key} = row['__target'].{key}" for key in match_keys_target
        ]
        match_clause = "WHERE " + " AND ".join(conditions)

        q = (
            "UNWIND $batch AS row "
            f"MATCH (source:{source_class}), (target:{target_class}) "
            f"{match_clause} "
            f"MERGE (source)-[r:{relation_name}]->(target)"
        )
        self.execute(q, batch=docs_edges)

    def insert_return_batch(self, docs, class_name):
        """Insert nodes and return their properties.

        Note: Not implemented in Neo4j.

        Args:
            docs: Documents to insert
            class_name: Label to insert into

        Raises:
            NotImplementedError: This method is not implemented for Neo4j
        """
        raise NotImplementedError()

    def fetch_docs(
        self,
        class_name,
        filters: list | dict | None = None,
        limit: int | None = None,
        return_keys: list | None = None,
        unset_keys: list | None = None,
    ):
        """Fetch nodes carrying a given label.

        Args:
            class_name: Label to fetch from
            filters: Query filters, converted with ``Expression.from_dict``
            limit: Maximum number of nodes to return
            return_keys: Keys to return (as a Cypher map projection)
            unset_keys: Unused in Neo4j

        Returns:
            list: The ``n`` column of every returned record
        """
        filter_clause = ""
        if filters is not None:
            predicate = Expression.from_dict(filters)
            filter_clause = f"WHERE {predicate(doc_name='n', kind=DBFlavor.NEO4J)}"

        keep_clause = ""
        if return_keys is not None:
            projected = ", ".join(f".{key}" for key in return_keys)
            keep_clause = f"{{ {projected} }}"

        limit_clause = f"LIMIT {limit}" if isinstance(limit, int) else ""

        q = (
            f"MATCH (n:{class_name})"
            f"  {filter_clause}"
            f"  RETURN n {keep_clause}"
            f"  {limit_clause}"
        )
        cursor = self.execute(q)
        return [record["n"] for record in cursor.data()]

    def fetch_present_documents(
        self,
        batch,
        class_name,
        match_keys,
        keep_keys,
        flatten=False,
        filters: list | dict | None = None,
    ):
        """Fetch nodes that exist in the database.

        Note: Not implemented in Neo4j.

        Args:
            batch: Batch of documents to check
            class_name: Label to check in
            match_keys: Keys to match nodes
            keep_keys: Keys to keep in result
            flatten: Unused in Neo4j
            filters: Additional query filters

        Raises:
            NotImplementedError: This method is not implemented for Neo4j
        """
        raise NotImplementedError

    def aggregate(
        self,
        class_name,
        aggregation_function: AggregationType,
        discriminant: str | None = None,
        aggregated_field: str | None = None,
        filters: list | dict | None = None,
    ):
        """Perform aggregation on nodes.

        Note: Not implemented in Neo4j.

        Args:
            class_name: Label to aggregate
            aggregation_function: Type of aggregation to perform
            discriminant: Field to group by
            aggregated_field: Field to aggregate
            filters: Query filters

        Raises:
            NotImplementedError: This method is not implemented for Neo4j
        """
        raise NotImplementedError

    def keep_absent_documents(
        self,
        batch,
        class_name,
        match_keys,
        keep_keys,
        filters: list | dict | None = None,
    ):
        """Keep nodes that don't exist in the database.

        Note: Not implemented in Neo4j.

        Args:
            batch: Batch of documents to check
            class_name: Label to check in
            match_keys: Keys to match nodes
            keep_keys: Keys to keep in result
            filters: Additional query filters

        Raises:
            NotImplementedError: This method is not implemented for Neo4j
        """
        raise NotImplementedError

__init__(config)

Initialize Neo4j connection.

Parameters:

Name Type Description Default
config Neo4jConnectionConfig

Neo4j connection configuration containing URL and credentials

required
Source code in graphcast/db/neo4j/conn.py
def __init__(self, config: Neo4jConnectionConfig):
    """Open a Neo4j driver and a session on it.

    Args:
        config: Neo4j connection configuration; ``url``, ``cred_name`` and
            ``cred_pass`` are read from it
    """
    super().__init__()
    # Keep a handle on the driver: as a bare local it could never be closed
    # explicitly and its connection pool would be left to the GC.
    self._driver = GraphDatabase.driver(
        uri=config.url, auth=(config.cred_name, config.cred_pass)
    )
    self.conn = self._driver.session()

aggregate(class_name, aggregation_function, discriminant=None, aggregated_field=None, filters=None)

Perform aggregation on nodes.

Note: Not implemented in Neo4j.

Parameters:

Name Type Description Default
class_name

Label to aggregate

required
aggregation_function AggregationType

Type of aggregation to perform

required
discriminant str | None

Field to group by

None
aggregated_field str | None

Field to aggregate

None
filters list | dict | None

Query filters

None

Raises:

Type Description
NotImplementedError

This method is not implemented for Neo4j

Source code in graphcast/db/neo4j/conn.py
def aggregate(
    self,
    class_name,
    aggregation_function: AggregationType,
    discriminant: str | None = None,
    aggregated_field: str | None = None,
    filters: list | dict | None = None,
):
    """Aggregate over nodes of a label (unsupported for Neo4j).

    The signature exists for interface compatibility only; every call raises.

    Args:
        class_name: Label to aggregate
        aggregation_function: Type of aggregation to perform
        discriminant: Field to group by
        aggregated_field: Field to aggregate
        filters: Query filters

    Raises:
        NotImplementedError: Always
    """
    raise NotImplementedError()

close()

Close the Neo4j connection and session.

Source code in graphcast/db/neo4j/conn.py
def close(self):
    """Close the Neo4j session and, when one was retained, the driver.

    The driver is looked up as ``self._driver``; if the instance has no such
    attribute only the session is closed.
    """
    try:
        self.conn.close()
    finally:
        # Release the driver even if closing the session raised.
        driver = getattr(self, "_driver", None)
        if driver is not None:
            driver.close()

create_database(name)

Create a new Neo4j database.

Note: This operation is only supported in Neo4j Enterprise Edition.

Parameters:

Name Type Description Default
name str

Name of the database to create

required
Source code in graphcast/db/neo4j/conn.py
def create_database(self, name: str):
    """Issue ``CREATE DATABASE`` for the given name.

    Note: This operation is only supported in Neo4j Enterprise Edition.
    Any exception raised while executing is logged and not propagated.

    Args:
        name: Name of the database to create (interpolated verbatim into
            the statement)
    """
    statement = f"CREATE DATABASE {name}"
    try:
        self.execute(statement)
    except Exception as err:
        logger.error(f"{err}")

define_collections(schema)

Define collections based on schema.

Note: This is a no-op in Neo4j as collections are implicit.

Parameters:

Name Type Description Default
schema Schema

Schema containing collection definitions

required
Source code in graphcast/db/neo4j/conn.py
def define_collections(self, schema: Schema):
    """Do nothing: Neo4j has no explicit collections to define.

    Present only to satisfy the Connection interface.

    Args:
        schema: Schema containing collection definitions (unused)
    """
    return None

define_edge_collections(edges)

Define edge collections based on schema.

Note: This is a no-op in Neo4j as edge collections are implicit.

Parameters:

Name Type Description Default
edges list[Edge]

List of edge configurations

required
Source code in graphcast/db/neo4j/conn.py
def define_edge_collections(self, edges: list[Edge]):
    """Do nothing: edge collections are implicit in Neo4j.

    Present only to satisfy the Connection interface.

    Args:
        edges: List of edge configurations (unused)
    """
    return None

define_edge_indices(edges)

Define indices for relationship types.

Creates indices for each relationship type based on the configuration.

Parameters:

Name Type Description Default
edges list[Edge]

List of edge configurations containing index definitions

required
Source code in graphcast/db/neo4j/conn.py
def define_edge_indices(self, edges: list[Edge]):
    """Create the configured indices on relationship types.

    Every index of every edge is visited; an index is only created when the
    edge has a relation name (``edge.relation`` is not None).

    Args:
        edges: List of edge configurations containing index definitions
    """
    for edge_cfg in edges:
        for idx in edge_cfg.indexes:
            if edge_cfg.relation is None:
                # No relationship type to attach the index to.
                continue
            self._add_index(edge_cfg.relation, idx, is_vertex_index=False)

define_vertex_collections(schema)

Define vertex collections based on schema.

Note: This is a no-op in Neo4j as vertex collections are implicit.

Parameters:

Name Type Description Default
schema Schema

Schema containing vertex definitions

required
Source code in graphcast/db/neo4j/conn.py
def define_vertex_collections(self, schema: Schema):
    """Do nothing: vertex collections are implicit in Neo4j.

    Present only to satisfy the Connection interface.

    Args:
        schema: Schema containing vertex definitions (unused)
    """
    return None

define_vertex_indices(vertex_config)

Define indices for vertex labels.

Creates indices for each vertex label based on the configuration.

Parameters:

Name Type Description Default
vertex_config VertexConfig

Vertex configuration containing index definitions

required
Source code in graphcast/db/neo4j/conn.py
def define_vertex_indices(self, vertex_config: VertexConfig):
    """Create the configured indices on vertex labels.

    Args:
        vertex_config: Vertex configuration; for each label in ``vertex_set``
            every entry of ``indexes(label)`` is turned into an index
    """
    for label in vertex_config.vertex_set:
        label_indexes = vertex_config.indexes(label)
        for idx in label_indexes:
            self._add_index(label, idx)

delete_collections(cnames=(), gnames=(), delete_all=False)

Delete nodes and relationships from the database.

Parameters:

Name Type Description Default
cnames

Label names to delete nodes for

()
gnames

Unused in Neo4j

()
delete_all

If True, delete all nodes and relationships

False
Source code in graphcast/db/neo4j/conn.py
def delete_collections(self, cnames=(), gnames=(), delete_all=False):
    """Delete nodes, together with their relationships, from the database.

    Args:
        cnames: Label names to delete nodes for
        gnames: Unused in Neo4j
        delete_all: If True, delete all nodes and relationships regardless
            of ``cnames``

    Note:
        Calling with no ``cnames`` also wipes everything; this keeps the
        behaviour callers of the earlier implementation relied on.
    """
    if delete_all or not cnames:
        self.execute("MATCH (n) DETACH DELETE n")
        return
    for label in cnames:
        # DETACH is required: a plain DELETE fails on any node that still
        # has relationships.
        self.execute(f"MATCH (n:{label}) DETACH DELETE n")

delete_database(name)

Delete a Neo4j database.

Note: This operation is only supported in Neo4j Enterprise Edition. As a fallback, it deletes all nodes and relationships.

Parameters:

Name Type Description Default
name str

Name of the database to delete

required
Source code in graphcast/db/neo4j/conn.py
def delete_database(self, name: str):
    """Remove every node and relationship from the current database.

    Dropping a database is only supported in Neo4j Enterprise Edition, so
    this method wipes the data instead. Any exception raised while executing
    is logged and not propagated.

    Args:
        name: Accepted for interface compatibility; not used
    """
    wipe_query = "MATCH (n) DETACH DELETE n"
    try:
        self.execute(wipe_query)
    except Exception as err:
        logger.error(f"Could not clean database : {err}")

execute(query, **kwargs)

Execute a Cypher query.

Parameters:

Name Type Description Default
query

Cypher query string to execute

required
**kwargs

Additional query parameters

{}

Returns:

Name Type Description
Result

Neo4j query result

Source code in graphcast/db/neo4j/conn.py
def execute(self, query, **kwargs):
    """Run a Cypher query on the session held in ``self.conn``.

    Args:
        query: Cypher query string to execute
        **kwargs: Query parameters, forwarded unchanged to ``self.conn.run``

    Returns:
        Result: whatever ``self.conn.run`` returns
    """
    return self.conn.run(query, **kwargs)

fetch_docs(class_name, filters=None, limit=None, return_keys=None, unset_keys=None)

Fetch nodes from a label.

Parameters:

Name Type Description Default
class_name

Label to fetch from

required
filters list | dict | None

Query filters

None
limit int | None

Maximum number of nodes to return

None
return_keys list | None

Keys to return

None
unset_keys list | None

Unused in Neo4j

None

Returns:

Name Type Description
list

Fetched nodes

Source code in graphcast/db/neo4j/conn.py
def fetch_docs(
    self,
    class_name,
    filters: list | dict | None = None,
    limit: int | None = None,
    return_keys: list | None = None,
    unset_keys: list | None = None,
):
    """Fetch nodes from a label.

    Args:
        class_name: Label to fetch from
        filters: Query filters
        limit: Maximum number of nodes to return
        return_keys: Keys to return
        unset_keys: Unused in Neo4j

    Returns:
        list: Fetched nodes
    """
    if filters is not None:
        ff = Expression.from_dict(filters)
        filter_clause = f"WHERE {ff(doc_name='n', kind=DBFlavor.NEO4J)}"
    else:
        filter_clause = ""

    if return_keys is not None:
        keep_clause_ = ", ".join([f".{item}" for item in return_keys])
        keep_clause = f"{{ {keep_clause_} }}"
    else:
        keep_clause = ""

    if limit is not None and isinstance(limit, int):
        limit_clause = f"LIMIT {limit}"
    else:
        limit_clause = ""

    q = (
        f"MATCH (n:{class_name})"
        f"  {filter_clause}"
        f"  RETURN n {keep_clause}"
        f"  {limit_clause}"
    )
    cursor = self.execute(q)
    r = [item["n"] for item in cursor.data()]
    return r

fetch_present_documents(batch, class_name, match_keys, keep_keys, flatten=False, filters=None)

Fetch nodes that exist in the database.

Note: Not implemented in Neo4j.

Parameters:

Name Type Description Default
batch

Batch of documents to check

required
class_name

Label to check in

required
match_keys

Keys to match nodes

required
keep_keys

Keys to keep in result

required
flatten

Unused in Neo4j

False
filters list | dict | None

Additional query filters

None

Raises:

Type Description
NotImplementedError

This method is not implemented for Neo4j

Source code in graphcast/db/neo4j/conn.py
def fetch_present_documents(
    self,
    batch,
    class_name,
    match_keys,
    keep_keys,
    flatten=False,
    filters: list | dict | None = None,
):
    """Fetch nodes that exist in the database.

    Note: Not implemented in Neo4j.

    Args:
        batch: Batch of documents to check
        class_name: Label to check in
        match_keys: Keys to match nodes
        keep_keys: Keys to keep in result
        flatten: Unused in Neo4j
        filters: Additional query filters

    Raises:
        NotImplementedError: This method is not implemented for Neo4j
    """
    raise NotImplementedError

init_db(schema, clean_start)

Initialize Neo4j with the given schema.

Parameters:

Name Type Description Default
schema Schema

Schema containing graph structure definitions

required
clean_start

If True, delete all existing data before initialization

required
Source code in graphcast/db/neo4j/conn.py
def init_db(self, schema: Schema, clean_start):
    """Prepare the database for the given schema.

    Optionally wipes existing data, then defines the schema's indexes via
    ``self.define_indexes``.

    Args:
        schema: Schema containing graph structure definitions
        clean_start: If truthy, delete all existing data before initialization
    """
    wipe_first = clean_start
    if wipe_first:
        # The database name is passed as an empty string.
        self.delete_database("")

    self.define_indexes(schema)

insert_edges_batch(docs_edges, source_class, target_class, relation_name, collection_name=None, match_keys_source=('_key',), match_keys_target=('_key',), filter_uniques=True, uniq_weight_fields=None, uniq_weight_collections=None, upsert_option=False, head=None, **kwargs)

Insert a batch of relationships using Cypher.

Creates relationships between source and target nodes, with support for property matching and unique constraints.

Parameters:

Name Type Description Default
docs_edges

List of edge documents in format [{__source: source_doc, __target: target_doc}]

required
source_class

Source node label

required
target_class

Target node label

required
relation_name

Relationship type name

required
collection_name

Unused in Neo4j

None
match_keys_source

Keys to match source nodes

('_key',)
match_keys_target

Keys to match target nodes

('_key',)
filter_uniques

Unused in Neo4j

True
uniq_weight_fields

Unused in Neo4j

None
uniq_weight_collections

Unused in Neo4j

None
upsert_option

Unused in Neo4j

False
head

Optional limit on number of relationships to insert

None
**kwargs

Additional options: - dry: If True, don't execute the query

{}
Source code in graphcast/db/neo4j/conn.py
def insert_edges_batch(
    self,
    docs_edges,
    source_class,
    target_class,
    relation_name,
    collection_name=None,
    match_keys_source=("_key",),
    match_keys_target=("_key",),
    filter_uniques=True,
    uniq_weight_fields=None,
    uniq_weight_collections=None,
    upsert_option=False,
    head=None,
    **kwargs,
):
    """Insert a batch of relationships using Cypher ``MERGE``.

    For every edge document the source and target nodes are looked up by
    their match keys and a relationship is merged between them.

    Args:
        docs_edges: List of edge documents in format [{__source: source_doc, __target: target_doc}]
        source_class: Source node label
        target_class: Target node label
        relation_name: Relationship type name
        collection_name: Unused in Neo4j
        match_keys_source: Keys to match source nodes
        match_keys_target: Keys to match target nodes
        filter_uniques: Unused in Neo4j
        uniq_weight_fields: Unused in Neo4j
        uniq_weight_collections: Unused in Neo4j
        upsert_option: Unused in Neo4j
        head: Optional non-negative limit on the number of edge documents
            taken from the start of ``docs_edges``
        **kwargs: Additional options:
            - dry: If True, don't execute the query
    """
    dry = kwargs.pop("dry", False)

    # Honour the documented limit; previously ``head`` was silently ignored.
    if head is not None:
        docs_edges = docs_edges[:head]
    if dry or not docs_edges:
        # Nothing to send: avoid a pointless round trip.
        return

    conditions = [
        f"source.{key} = row['__source'].{key}" for key in match_keys_source
    ]
    conditions += [
        f"target.{key} = row['__target'].{key}" for key in match_keys_target
    ]
    match_clause = "WHERE " + " AND ".join(conditions)

    q = (
        "UNWIND $batch AS row "
        f"MATCH (source:{source_class}), (target:{target_class}) "
        f"{match_clause} "
        f"MERGE (source)-[r:{relation_name}]->(target)"
    )
    self.execute(q, batch=docs_edges)

insert_return_batch(docs, class_name)

Insert nodes and return their properties.

Note: Not implemented in Neo4j.

Parameters:

Name Type Description Default
docs

Documents to insert

required
class_name

Label to insert into

required

Raises:

Type Description
NotImplementedError

This method is not implemented for Neo4j

Source code in graphcast/db/neo4j/conn.py
def insert_return_batch(self, docs, class_name):
    """Insert nodes and return their properties (unsupported for Neo4j).

    The signature exists for interface compatibility only; every call raises.

    Args:
        docs: Documents to insert
        class_name: Label to insert into

    Raises:
        NotImplementedError: Always
    """
    raise NotImplementedError

keep_absent_documents(batch, class_name, match_keys, keep_keys, filters=None)

Keep nodes that don't exist in the database.

Note: Not implemented in Neo4j.

Parameters:

Name Type Description Default
batch

Batch of documents to check

required
class_name

Label to check in

required
match_keys

Keys to match nodes

required
keep_keys

Keys to keep in result

required
filters list | dict | None

Additional query filters

None

Raises:

Type Description
NotImplementedError

This method is not implemented for Neo4j

Source code in graphcast/db/neo4j/conn.py
def keep_absent_documents(
    self,
    batch,
    class_name,
    match_keys,
    keep_keys,
    filters: list | dict | None = None,
):
    """Keep nodes that don't exist in the database.

    Note: Not implemented in Neo4j.

    Args:
        batch: Batch of documents to check
        class_name: Label to check in
        match_keys: Keys to match nodes
        keep_keys: Keys to keep in result
        filters: Additional query filters

    Raises:
        NotImplementedError: This method is not implemented for Neo4j
    """
    raise NotImplementedError

upsert_docs_batch(docs, class_name, match_keys, **kwargs)

Upsert a batch of nodes using Cypher.

Performs an upsert operation on a batch of nodes, using the specified match keys to determine whether to update existing nodes or create new ones.

Parameters:

Name Type Description Default
docs

List of node documents to upsert

required
class_name

Label to upsert into

required
match_keys

Keys to match for upsert operation

required
**kwargs

Additional options: - dry: If True, don't execute the query

{}
Source code in graphcast/db/neo4j/conn.py
def upsert_docs_batch(self, docs, class_name, match_keys, **kwargs):
    """Upsert a batch of nodes using Cypher ``MERGE``.

    Each document is merged on ``match_keys``; all of its properties are then
    written onto the matched or newly created node.

    Args:
        docs: List of node documents to upsert
        class_name: Label to upsert into
        match_keys: Keys to match for upsert operation; must not be empty
        **kwargs: Additional options:
            - dry: If True, don't execute the query

    Raises:
        ValueError: If ``match_keys`` is empty
    """
    dry = kwargs.pop("dry", False)

    if not match_keys:
        # An empty property map would make MERGE match every node of the
        # label, so each row would overwrite the same nodes.
        raise ValueError("match_keys must contain at least one key")
    if dry or not docs:
        # Nothing to send: avoid a pointless round trip.
        return

    merge_props = ", ".join(f"{k}: row.{k}" for k in match_keys)
    # A single SET replaces the identical ON MATCH / ON CREATE branches.
    q = (
        "UNWIND $batch AS row "
        f"MERGE (n:{class_name} {{ {merge_props} }}) "
        "SET n += row"
    )
    self.execute(q, batch=docs)