ontocast.tool.triple_manager

Triple store management package for OntoCast.

This package provides a unified interface for managing RDF triple stores across different backends. It includes abstract base classes and concrete implementations for various triple store technologies.

The package supports:

- Abstract interfaces for triple store operations
- Neo4j implementation using the n10s plugin
- Fuseki implementation using Apache Fuseki
- Filesystem implementation for local storage

All implementations support:

- Fetching and storing ontologies
- Serializing and retrieving facts
- Authentication and connection management
- Error handling and logging

Example

>>> from ontocast.tool.triple_manager import Neo4jTripleStoreManager
>>> manager = Neo4jTripleStoreManager(uri="bolt://localhost:7687")
>>> ontologies = manager.fetch_ontologies()
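
Because every backend implements the same abstract interface, calling code can be written against TripleStoreManager and handed any concrete manager. A minimal sketch, assuming TripleStoreManager and FilesystemTripleStoreManager are exported from the package root like Neo4jTripleStoreManager (the paths are placeholders):

from ontocast.tool.triple_manager import (
    FilesystemTripleStoreManager,
    TripleStoreManager,
)

def reexport_ontologies(manager: TripleStoreManager) -> int:
    # Works with any backend: fetch every ontology, then write each back.
    ontologies = manager.fetch_ontologies()
    for onto in ontologies:
        manager.serialize_ontology(onto)
    return len(ontologies)

count = reexport_ontologies(
    FilesystemTripleStoreManager(
        working_directory="/path/to/work",    # placeholder
        ontology_path="/path/to/ontologies",  # placeholder
    )
)
print(f"Re-exported {count} ontologies")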

FilesystemTripleStoreManager

Bases: TripleStoreManager

Filesystem-based implementation of triple store management.

This class provides a concrete implementation of triple store management using the local filesystem for storage. It reads and writes ontologies and facts as Turtle (.ttl) files in specified directories.

The manager supports:

- Loading ontologies from a dedicated ontology directory
- Storing ontologies with versioned filenames
- Storing facts with customizable filenames based on specifications
- Error handling for file operations
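
A short usage sketch under placeholder paths; the filenames follow the rules documented for serialize_ontology and serialize_facts below:

from pathlib import Path

from ontocast.tool.triple_manager import FilesystemTripleStoreManager

manager = FilesystemTripleStoreManager(
    working_directory=Path("/tmp/ontocast-work"),    # placeholder
    ontology_path=Path("/tmp/ontocast-ontologies"),  # placeholder
)

# Loads every *.ttl file in ontology_path, in sorted order.
ontologies = manager.fetch_ontologies()

for onto in ontologies:
    # Written as ontology_<ontology_id>_<version>.ttl in working_directory.
    manager.serialize_ontology(onto)
    # Written as facts_demo_<ontology_id>.ttl (this spec is illustrative).
    manager.serialize_facts(onto.graph, spec=f"demo/{onto.ontology_id}")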

Attributes:

    working_directory (Optional[Path]): Path to the working directory for storing data.
    ontology_path (Optional[Path]): Optional path to the ontology directory for loading ontologies.

Source code in ontocast/tool/triple_manager/filesystem_manager.py
class FilesystemTripleStoreManager(TripleStoreManager):
    """Filesystem-based implementation of triple store management.

    This class provides a concrete implementation of triple store management
    using the local filesystem for storage. It reads and writes ontologies
    and facts as Turtle (.ttl) files in specified directories.

    The manager supports:
    - Loading ontologies from a dedicated ontology directory
    - Storing ontologies with versioned filenames
    - Storing facts with customizable filenames based on specifications
    - Error handling for file operations

    Attributes:
        working_directory: Path to the working directory for storing data.
        ontology_path: Optional path to the ontology directory for loading ontologies.
    """

    working_directory: Optional[pathlib.Path]
    ontology_path: Optional[pathlib.Path]

    def __init__(self, **kwargs):
        """Initialize the filesystem triple store manager.

        This method sets up the filesystem manager with the specified
        working and ontology directories.

        Args:
            **kwargs: Additional keyword arguments passed to the parent class.
                working_directory: Path to the working directory for storing data.
                ontology_path: Path to the ontology directory for loading ontologies.

        Example:
            >>> manager = FilesystemTripleStoreManager(
            ...     working_directory="/path/to/work",
            ...     ontology_path="/path/to/ontologies"
            ... )
        """
        super().__init__(**kwargs)

    def fetch_ontologies(self) -> list[Ontology]:
        """Fetch all available ontologies from the filesystem.

        This method scans the ontology directory for Turtle (.ttl) files
        and loads each one as an Ontology object. Files are processed
        in sorted order for consistent results.

        Returns:
            list[Ontology]: List of all ontologies found in the ontology directory.

        Example:
            >>> ontologies = manager.fetch_ontologies()
            >>> for onto in ontologies:
            ...     print(f"Loaded ontology: {onto.ontology_id}")
        """
        ontologies = []
        if self.ontology_path is not None:
            sorted_files = sorted(self.ontology_path.glob("*.ttl"))
            for fname in sorted_files:
                try:
                    ontology = Ontology.from_file(fname)
                    ontologies.append(ontology)
                    logger.debug(f"Successfully loaded ontology from {fname}")
                except Exception as e:
                    logger.error(f"Failed to load ontology {fname}: {str(e)}")
        return ontologies

    def serialize_ontology(self, o: Ontology, **kwargs):
        """Store an ontology in the filesystem.

        This method stores the given ontology as a Turtle file in the
        working directory. The filename is generated using the ontology
        ID and version to ensure uniqueness.

        Args:
            o: The ontology to store.
            **kwargs: Additional keyword arguments for serialization (not used).

        Example:
            >>> ontology = Ontology(ontology_id="test", version="1.0", graph=graph)
            >>> manager.serialize_ontology(ontology)
            # Creates: working_directory/ontology_test_1.0.ttl
        """
        if self.working_directory is not None:
            fname = f"ontology_{o.ontology_id}_{o.version}"
            output_path = self.working_directory / f"{fname}.ttl"
            o.graph.serialize(format="turtle", destination=output_path)
            logger.info(f"Ontology saved to {output_path}")

    def serialize_facts(self, g: Graph, **kwargs):
        """Store a graph with facts in the filesystem.

        This method stores the given RDF graph containing facts as a
        Turtle file in the working directory. The filename can be
        customized using the spec parameter.

        Args:
            g: The RDF graph containing facts to store.
            **kwargs: Additional keyword arguments for serialization.
                spec: Optional specification for the filename. If provided as a string,
                      it will be processed to create a meaningful filename.

        Raises:
            TypeError: If spec is provided but not a string.

        Example:
            >>> facts = RDFGraph()
            >>> manager.serialize_facts(facts, spec="domain/subdomain")
            # Creates: working_directory/facts_domain_subdomain.ttl
        """
        spec = kwargs.pop("spec", None)
        if spec is None:
            fname = "current.ttl"
        elif isinstance(spec, str):
            s = spec.split("/")[-2:]
            s = "_".join([x for x in s if x])
            fname = f"facts_{s}.ttl"
        else:
            raise TypeError(f"string expected for spec {spec}")

        if self.working_directory is not None:
            filename = self.working_directory / fname
            g.serialize(format="turtle", destination=filename)
            logger.info(f"Facts saved to {filename}")

__init__(**kwargs)

Initialize the filesystem triple store manager.

This method sets up the filesystem manager with the specified working and ontology directories.

Parameters:

    **kwargs (default: {}): Additional keyword arguments passed to the parent class.
        working_directory: Path to the working directory for storing data.
        ontology_path: Path to the ontology directory for loading ontologies.
Example

>>> manager = FilesystemTripleStoreManager(
...     working_directory="/path/to/work",
...     ontology_path="/path/to/ontologies"
... )

Source code in ontocast/tool/triple_manager/filesystem_manager.py
def __init__(self, **kwargs):
    """Initialize the filesystem triple store manager.

    This method sets up the filesystem manager with the specified
    working and ontology directories.

    Args:
        **kwargs: Additional keyword arguments passed to the parent class.
            working_directory: Path to the working directory for storing data.
            ontology_path: Path to the ontology directory for loading ontologies.

    Example:
        >>> manager = FilesystemTripleStoreManager(
        ...     working_directory="/path/to/work",
        ...     ontology_path="/path/to/ontologies"
        ... )
    """
    super().__init__(**kwargs)

fetch_ontologies()

Fetch all available ontologies from the filesystem.

This method scans the ontology directory for Turtle (.ttl) files and loads each one as an Ontology object. Files are processed in sorted order for consistent results.

Returns:

    list[Ontology]: List of all ontologies found in the ontology directory.

Example

>>> ontologies = manager.fetch_ontologies()
>>> for onto in ontologies:
...     print(f"Loaded ontology: {onto.ontology_id}")

Source code in ontocast/tool/triple_manager/filesystem_manager.py
def fetch_ontologies(self) -> list[Ontology]:
    """Fetch all available ontologies from the filesystem.

    This method scans the ontology directory for Turtle (.ttl) files
    and loads each one as an Ontology object. Files are processed
    in sorted order for consistent results.

    Returns:
        list[Ontology]: List of all ontologies found in the ontology directory.

    Example:
        >>> ontologies = manager.fetch_ontologies()
        >>> for onto in ontologies:
        ...     print(f"Loaded ontology: {onto.ontology_id}")
    """
    ontologies = []
    if self.ontology_path is not None:
        sorted_files = sorted(self.ontology_path.glob("*.ttl"))
        for fname in sorted_files:
            try:
                ontology = Ontology.from_file(fname)
                ontologies.append(ontology)
                logger.debug(f"Successfully loaded ontology from {fname}")
            except Exception as e:
                logger.error(f"Failed to load ontology {fname}: {str(e)}")
    return ontologies

serialize_facts(g, **kwargs)

Store a graph with facts in the filesystem.

This method stores the given RDF graph containing facts as a Turtle file in the working directory. The filename can be customized using the spec parameter.

Parameters:

    g (Graph, required): The RDF graph containing facts to store.
    **kwargs (default: {}): Additional keyword arguments for serialization.
        spec: Optional specification for the filename. If provided as a string,
            it will be processed to create a meaningful filename.

Raises:

    TypeError: If spec is provided but not a string.

Example

>>> facts = RDFGraph()
>>> manager.serialize_facts(facts, spec="domain/subdomain")
# Creates: working_directory/facts_domain_subdomain.ttl
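
The filename follows from the source below: the last two non-empty path segments of spec are joined with underscores. A standalone sketch of just that transformation (it mirrors the method body; facts_filename is a hypothetical helper for illustration):

from typing import Optional

def facts_filename(spec: Optional[str]) -> str:
    """Hypothetical mirror of the filename logic in serialize_facts."""
    if spec is None:
        return "current.ttl"
    segments = spec.split("/")[-2:]  # keep at most the last two path segments
    joined = "_".join(s for s in segments if s)
    return f"facts_{joined}.ttl"

assert facts_filename("domain/subdomain") == "facts_domain_subdomain.ttl"
assert facts_filename("a/b/c") == "facts_b_c.ttl"  # only the last two segments survive
assert facts_filename("single") == "facts_single.ttl"
assert facts_filename(None) == "current.ttl"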

Source code in ontocast/tool/triple_manager/filesystem_manager.py
def serialize_facts(self, g: Graph, **kwargs):
    """Store a graph with facts in the filesystem.

    This method stores the given RDF graph containing facts as a
    Turtle file in the working directory. The filename can be
    customized using the spec parameter.

    Args:
        g: The RDF graph containing facts to store.
        **kwargs: Additional keyword arguments for serialization.
            spec: Optional specification for the filename. If provided as a string,
                  it will be processed to create a meaningful filename.

    Raises:
        TypeError: If spec is provided but not a string.

    Example:
        >>> facts = RDFGraph()
        >>> manager.serialize_facts(facts, spec="domain/subdomain")
        # Creates: working_directory/facts_domain_subdomain.ttl
    """
    spec = kwargs.pop("spec", None)
    if spec is None:
        fname = "current.ttl"
    elif isinstance(spec, str):
        s = spec.split("/")[-2:]
        s = "_".join([x for x in s if x])
        fname = f"facts_{s}.ttl"
    else:
        raise TypeError(f"string expected for spec {spec}")

    if self.working_directory is not None:
        filename = self.working_directory / fname
        g.serialize(format="turtle", destination=filename)
        logger.info(f"Facts saved to {filename}")

serialize_ontology(o, **kwargs)

Store an ontology in the filesystem.

This method stores the given ontology as a Turtle file in the working directory. The filename is generated using the ontology ID and version to ensure uniqueness.

Parameters:

    o (Ontology, required): The ontology to store.
    **kwargs (default: {}): Additional keyword arguments for serialization (not used).
Example

>>> ontology = Ontology(ontology_id="test", version="1.0", graph=graph)
>>> manager.serialize_ontology(ontology)
# Creates: working_directory/ontology_test_1.0.ttl

Source code in ontocast/tool/triple_manager/filesystem_manager.py
def serialize_ontology(self, o: Ontology, **kwargs):
    """Store an ontology in the filesystem.

    This method stores the given ontology as a Turtle file in the
    working directory. The filename is generated using the ontology
    ID and version to ensure uniqueness.

    Args:
        o: The ontology to store.
        **kwargs: Additional keyword arguments for serialization (not used).

    Example:
        >>> ontology = Ontology(ontology_id="test", version="1.0", graph=graph)
        >>> manager.serialize_ontology(ontology)
        # Creates: working_directory/ontology_test_1.0.ttl
    """
    if self.working_directory is not None:
        fname = f"ontology_{o.ontology_id}_{o.version}"
        output_path = self.working_directory / f"{fname}.ttl"
        o.graph.serialize(format="turtle", destination=output_path)
        logger.info(f"Ontology saved to {output_path}")

FusekiTripleStoreManager

Bases: TripleStoreManagerWithAuth

Fuseki-based triple store manager.

This class provides a concrete implementation of triple store management using Apache Fuseki. It stores ontologies as named graphs using their URIs as graph names, and supports dataset creation and cleanup.

The manager uses Fuseki's REST API for all operations, including:

- Dataset creation and management
- Named graph operations for ontologies
- SPARQL queries for ontology discovery
- Graph-level data operations
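
These operations map onto Fuseki's standard HTTP endpoints, as the source below shows. A sketch of the equivalent raw calls with requests, assuming a local server at http://localhost:3030, a dataset named "test", and placeholder credentials:

import requests

base = "http://localhost:3030"  # assumed server location
auth = ("admin", "password")    # placeholder credentials

# Upload Turtle data as a named graph (Graph Store Protocol).
requests.put(
    f"{base}/test/data",
    params={"graph": "http://example.org/onto"},
    headers={"Content-Type": "text/turtle;charset=utf-8"},
    data="<http://example.org/s> <http://example.org/p> <http://example.org/o> .",
    auth=auth,
)

# Discover ontologies with a SPARQL query against the dataset endpoint.
query = """
SELECT DISTINCT ?s WHERE {
  GRAPH ?g { ?s a <http://www.w3.org/2002/07/owl#Ontology> }
}
"""
resp = requests.post(f"{base}/test/sparql", data={"query": query}, auth=auth)
print(resp.json()["results"]["bindings"])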

Attributes:

    dataset (Optional[str]): The Fuseki dataset name to use for storage.
    clean: Whether to clean the dataset on initialization.

Source code in ontocast/tool/triple_manager/fuseki.py
class FusekiTripleStoreManager(TripleStoreManagerWithAuth):
    """Fuseki-based triple store manager.

    This class provides a concrete implementation of triple store management
    using Apache Fuseki. It stores ontologies as named graphs using their
    URIs as graph names, and supports dataset creation and cleanup.

    The manager uses Fuseki's REST API for all operations, including:
    - Dataset creation and management
    - Named graph operations for ontologies
    - SPARQL queries for ontology discovery
    - Graph-level data operations

    Attributes:
        dataset: The Fuseki dataset name to use for storage.
        clean: Whether to clean the dataset on initialization.
    """

    dataset: Optional[str] = Field(default=None, description="Fuseki dataset name")

    def __init__(self, uri=None, auth=None, dataset=None, clean=False, **kwargs):
        """Initialize the Fuseki triple store manager.

        This method sets up the connection to Fuseki, creates the dataset
        if it doesn't exist, and optionally cleans all data from the dataset.

        Args:
            uri: Fuseki server URI (e.g., "http://localhost:3030").
            auth: Authentication tuple (username, password) or string in "user/password" format.
            dataset: Dataset name to use for storage.
            clean: If True, delete all data from the dataset on initialization.
            **kwargs: Additional keyword arguments passed to the parent class.

        Raises:
            ValueError: If dataset is not specified in URI or as argument.

        Example:
            >>> manager = FusekiTripleStoreManager(
            ...     uri="http://localhost:3030",
            ...     dataset="test",
            ...     clean=True
            ... )
        """
        super().__init__(
            uri=uri, auth=auth, env_uri="FUSEKI_URI", env_auth="FUSEKI_AUTH", **kwargs
        )
        self.dataset = dataset
        self.clean = clean
        if self.dataset is None:
            raise ValueError("Dataset must be specified in FUSEKI_URI or as argument")
        self.init_dataset(self.dataset)

        # Clean dataset if requested
        if self.clean:
            self._clean_dataset()

    def _clean_dataset(self):
        """Delete all data from the dataset.

        This method removes all named graphs and clears the default graph
        from the Fuseki dataset. It uses Fuseki's REST API to perform
        the cleanup operations.

        The method handles errors gracefully and logs the results of
        each cleanup operation.
        """
        try:
            # Delete all graphs in the dataset
            sparql_url = f"{self._get_dataset_url()}/sparql"
            query = """
            SELECT DISTINCT ?g WHERE {
              GRAPH ?g { ?s ?p ?o }
            }
            """
            response = requests.post(
                sparql_url,
                data={"query": query, "format": "application/sparql-results+json"},
                auth=self.auth,
            )

            if response.status_code == 200:
                results = response.json()
                for binding in results.get("results", {}).get("bindings", []):
                    graph_uri = binding["g"]["value"]
                    # Delete the named graph
                    delete_url = f"{self._get_dataset_url()}/data?graph={graph_uri}"
                    delete_response = requests.delete(delete_url, auth=self.auth)
                    if delete_response.status_code in (200, 204):
                        logger.debug(f"Deleted named graph: {graph_uri}")
                    else:
                        logger.warning(
                            f"Failed to delete graph {graph_uri}: {delete_response.status_code}"
                        )

            # Clear the default graph
            clear_url = f"{self._get_dataset_url()}/data"
            clear_response = requests.delete(clear_url, auth=self.auth)
            if clear_response.status_code in (200, 204):
                logger.debug("Cleared default graph")
            else:
                logger.warning(
                    f"Failed to clear default graph: {clear_response.status_code}"
                )

            logger.info(f"Fuseki dataset '{self.dataset}' cleaned (all data deleted)")

        except Exception as e:
            logger.warning(f"Fuseki cleanup failed: {e}")

    def init_dataset(self, dataset_name):
        """Initialize a Fuseki dataset.

        This method creates a new dataset in Fuseki if it doesn't already exist.
        It uses Fuseki's admin API to create the dataset with TDB2 storage.

        Args:
            dataset_name: Name of the dataset to create.

        Note:
            This method will not fail if the dataset already exists.
        """
        fuseki_admin_url = f"{self.uri}/$/datasets"

        payload = {"dbName": dataset_name, "dbType": "tdb2"}

        headers = {"Content-Type": "application/x-www-form-urlencoded"}

        response = requests.post(
            fuseki_admin_url, data=payload, headers=headers, auth=self.auth
        )

        if response.status_code in (200, 201):
            logger.info(f"Dataset '{dataset_name}' created successfully.")
        else:
            logger.error(f"Failed to create dataset. Status code: {response.status_code}")
            logger.error(f"Response: {response.text}")

    def _parse_dataset_from_uri(self, uri: str) -> Optional[str]:
        """Extract dataset name from a Fuseki URI.

        This method parses a Fuseki URI to extract the dataset name.
        It expects URIs in the format "http://host:port/dataset".

        Args:
            uri: The Fuseki URI to parse.

        Returns:
            Optional[str]: The dataset name if found, None otherwise.

        Example:
            >>> manager._parse_dataset_from_uri("http://localhost:3030/test")
            "test"
        """
        parts = uri.rstrip("/").split("/")
        if len(parts) > 0:
            return parts[-1]
        return None

    def _get_dataset_url(self):
        """Get the full URL for the dataset.

        Returns:
            str: The complete URL for the dataset endpoint.
        """
        return f"{self.uri}/{self.dataset}"

    def fetch_ontologies(self) -> list[Ontology]:
        """Fetch all ontologies from their corresponding named graphs.

        This method discovers all ontologies in the Fuseki dataset and
        fetches each one from its corresponding named graph. It uses
        a two-step process:

        1. Discovery: Query for all ontology URIs using SPARQL
        2. Fetching: Retrieve each ontology from its named graph

        The method handles both named graphs and the default graph,
        and verifies that each ontology is properly typed as owl:Ontology.

        Returns:
            list[Ontology]: List of all ontologies found in the dataset.

        Example:
            >>> ontologies = manager.fetch_ontologies()
            >>> for onto in ontologies:
            ...     print(f"Found ontology: {onto.iri}")
        """
        sparql_url = f"{self._get_dataset_url()}/sparql"

        # Step 1: List all ontology URIs from all graphs
        list_query = """
        SELECT DISTINCT ?s WHERE {
          { GRAPH ?g { ?s a <http://www.w3.org/2002/07/owl#Ontology> } }
          UNION
          { ?s a <http://www.w3.org/2002/07/owl#Ontology> }
        }
        """
        response = requests.post(
            sparql_url,
            data={"query": list_query, "format": "application/sparql-results+json"},
            auth=self.auth,
        )
        if response.status_code != 200:
            logger.error(f"Failed to list ontologies from Fuseki: {response.text}")
            return []

        results = response.json()
        ontology_iris = []
        for binding in results.get("results", {}).get("bindings", []):
            onto_iri = binding["s"]["value"]
            ontology_iris.append(onto_iri)

        logger.debug(f"Found {len(ontology_iris)} ontology URIs: {ontology_iris}")

        # Step 2: Fetch each ontology from its corresponding named graph
        ontologies = []
        for onto_iri in ontology_iris:
            # Fetch the ontology from its corresponding named graph
            graph = RDFGraph()
            export_url = f"{self._get_dataset_url()}/get?graph={onto_iri}"
            export_resp = requests.get(
                export_url, auth=self.auth, headers={"Accept": "text/turtle"}
            )

            if export_resp.status_code == 200:
                graph.parse(data=export_resp.text, format="turtle")
                # Verify the ontology is actually in this graph
                onto_iri_ref = URIRef(onto_iri)
                if (onto_iri_ref, RDF.type, OWL.Ontology) in graph:
                    ontology_id = derive_ontology_id(onto_iri)
                    ontologies.append(
                        Ontology(
                            graph=graph,
                            iri=onto_iri,
                            ontology_id=ontology_id,
                        )
                    )
                    logger.debug(f"Successfully loaded ontology: {onto_iri}")
                else:
                    logger.warning(f"Ontology {onto_iri} not found in its named graph")
            else:
                logger.warning(
                    f"Failed to fetch ontology graph {onto_iri}: {export_resp.status_code}"
                )

        logger.info(f"Successfully loaded {len(ontologies)} ontologies from Fuseki")
        return ontologies

    def serialize_ontology(self, o: Ontology, **kwargs):
        """Store an ontology as a named graph in Fuseki.

        This method stores the given ontology as a named graph in Fuseki,
        using the ontology's IRI as the graph name. This ensures that
        each ontology is stored separately and can be retrieved individually.

        Args:
            o: The ontology to store.
            **kwargs: Additional keyword arguments (not used).

        Returns:
            bool: True if the ontology was successfully stored, False otherwise.

        Example:
            >>> ontology = Ontology(iri="http://example.org/onto", graph=graph)
            >>> success = manager.serialize_ontology(ontology)
        """
        turtle_data = o.graph.serialize(format="turtle")
        graph_uri = o.iri or f"urn:ontology:{o.ontology_id}"
        url = f"{self._get_dataset_url()}/data?graph={graph_uri}"
        headers = {"Content-Type": "text/turtle;charset=utf-8"}
        response = requests.put(url, headers=headers, data=turtle_data, auth=self.auth)
        if response.status_code in (200, 201, 204):
            logger.info(f"Ontology {graph_uri} uploaded to Fuseki as named graph.")
            return True
        else:
            logger.error(
                f"Failed to upload ontology {graph_uri}. Status code: {response.status_code}"
            )
            logger.error(f"Response: {response.text}")
            return False

    def serialize_facts(self, g: Graph, **kwargs):
        """Store facts (RDF graph) as a named graph in Fuseki.

        This method stores the given RDF graph containing facts as a named
        graph in Fuseki. The graph name is taken from the chunk_uri parameter
        or defaults to "urn:chunk:default".

        Args:
            g: The RDF graph containing facts to store.
            **kwargs: Additional keyword arguments.
                chunk_uri: URI to use as the named graph name (optional).

        Returns:
            bool: True if the facts were successfully stored, False otherwise.

        Example:
            >>> facts = RDFGraph()
            >>> success = manager.serialize_facts(facts, chunk_uri="http://example.org/chunk1")
        """
        turtle_data = g.serialize(format="turtle")
        # Use chunk URI from kwargs or generate a default one
        chunk_uri = kwargs.get("chunk_uri", "urn:chunk:default")
        url = f"{self._get_dataset_url()}/data?graph={chunk_uri}"
        headers = {"Content-Type": "text/turtle;charset=utf-8"}
        response = requests.put(url, headers=headers, data=turtle_data, auth=self.auth)
        if response.status_code in (200, 201, 204):
            logger.info(f"Facts uploaded to Fuseki as named graph: {chunk_uri}")
            return True
        else:
            logger.error(f"Failed to upload facts. Status code: {response.status_code}")
            logger.error(f"Response: {response.text}")
            return False

__init__(uri=None, auth=None, dataset=None, clean=False, **kwargs)

Initialize the Fuseki triple store manager.

This method sets up the connection to Fuseki, creates the dataset if it doesn't exist, and optionally cleans all data from the dataset.

Parameters:

    uri (default: None): Fuseki server URI (e.g., "http://localhost:3030").
    auth (default: None): Authentication tuple (username, password) or string in "user/password" format.
    dataset (default: None): Dataset name to use for storage.
    clean (default: False): If True, delete all data from the dataset on initialization.
    **kwargs (default: {}): Additional keyword arguments passed to the parent class.

Raises:

    ValueError: If dataset is not specified in FUSEKI_URI or as argument.

Example

>>> manager = FusekiTripleStoreManager(
...     uri="http://localhost:3030",
...     dataset="test",
...     clean=True
... )

Source code in ontocast/tool/triple_manager/fuseki.py
def __init__(self, uri=None, auth=None, dataset=None, clean=False, **kwargs):
    """Initialize the Fuseki triple store manager.

    This method sets up the connection to Fuseki, creates the dataset
    if it doesn't exist, and optionally cleans all data from the dataset.

    Args:
        uri: Fuseki server URI (e.g., "http://localhost:3030").
        auth: Authentication tuple (username, password) or string in "user/password" format.
        dataset: Dataset name to use for storage.
        clean: If True, delete all data from the dataset on initialization.
        **kwargs: Additional keyword arguments passed to the parent class.

    Raises:
        ValueError: If dataset is not specified in URI or as argument.

    Example:
        >>> manager = FusekiTripleStoreManager(
        ...     uri="http://localhost:3030",
        ...     dataset="test",
        ...     clean=True
        ... )
    """
    super().__init__(
        uri=uri, auth=auth, env_uri="FUSEKI_URI", env_auth="FUSEKI_AUTH", **kwargs
    )
    self.dataset = dataset
    self.clean = clean
    if self.dataset is None:
        raise ValueError("Dataset must be specified in FUSEKI_URI or as argument")
    self.init_dataset(self.dataset)

    # Clean dataset if requested
    if self.clean:
        self._clean_dataset()

fetch_ontologies()

Fetch all ontologies from their corresponding named graphs.

This method discovers all ontologies in the Fuseki dataset and fetches each one from its corresponding named graph. It uses a two-step process:

  1. Discovery: Query for all ontology URIs using SPARQL
  2. Fetching: Retrieve each ontology from its named graph

The method handles both named graphs and the default graph, and verifies that each ontology is properly typed as owl:Ontology.

Returns:

    list[Ontology]: List of all ontologies found in the dataset.

Example

>>> ontologies = manager.fetch_ontologies()
>>> for onto in ontologies:
...     print(f"Found ontology: {onto.iri}")

Source code in ontocast/tool/triple_manager/fuseki.py
def fetch_ontologies(self) -> list[Ontology]:
    """Fetch all ontologies from their corresponding named graphs.

    This method discovers all ontologies in the Fuseki dataset and
    fetches each one from its corresponding named graph. It uses
    a two-step process:

    1. Discovery: Query for all ontology URIs using SPARQL
    2. Fetching: Retrieve each ontology from its named graph

    The method handles both named graphs and the default graph,
    and verifies that each ontology is properly typed as owl:Ontology.

    Returns:
        list[Ontology]: List of all ontologies found in the dataset.

    Example:
        >>> ontologies = manager.fetch_ontologies()
        >>> for onto in ontologies:
        ...     print(f"Found ontology: {onto.iri}")
    """
    sparql_url = f"{self._get_dataset_url()}/sparql"

    # Step 1: List all ontology URIs from all graphs
    list_query = """
    SELECT DISTINCT ?s WHERE {
      { GRAPH ?g { ?s a <http://www.w3.org/2002/07/owl#Ontology> } }
      UNION
      { ?s a <http://www.w3.org/2002/07/owl#Ontology> }
    }
    """
    response = requests.post(
        sparql_url,
        data={"query": list_query, "format": "application/sparql-results+json"},
        auth=self.auth,
    )
    if response.status_code != 200:
        logger.error(f"Failed to list ontologies from Fuseki: {response.text}")
        return []

    results = response.json()
    ontology_iris = []
    for binding in results.get("results", {}).get("bindings", []):
        onto_iri = binding["s"]["value"]
        ontology_iris.append(onto_iri)

    logger.debug(f"Found {len(ontology_iris)} ontology URIs: {ontology_iris}")

    # Step 2: Fetch each ontology from its corresponding named graph
    ontologies = []
    for onto_iri in ontology_iris:
        # Fetch the ontology from its corresponding named graph
        graph = RDFGraph()
        export_url = f"{self._get_dataset_url()}/get?graph={onto_iri}"
        export_resp = requests.get(
            export_url, auth=self.auth, headers={"Accept": "text/turtle"}
        )

        if export_resp.status_code == 200:
            graph.parse(data=export_resp.text, format="turtle")
            # Verify the ontology is actually in this graph
            onto_iri_ref = URIRef(onto_iri)
            if (onto_iri_ref, RDF.type, OWL.Ontology) in graph:
                ontology_id = derive_ontology_id(onto_iri)
                ontologies.append(
                    Ontology(
                        graph=graph,
                        iri=onto_iri,
                        ontology_id=ontology_id,
                    )
                )
                logger.debug(f"Successfully loaded ontology: {onto_iri}")
            else:
                logger.warning(f"Ontology {onto_iri} not found in its named graph")
        else:
            logger.warning(
                f"Failed to fetch ontology graph {onto_iri}: {export_resp.status_code}"
            )

    logger.info(f"Successfully loaded {len(ontologies)} ontologies from Fuseki")
    return ontologies

init_dataset(dataset_name)

Initialize a Fuseki dataset.

This method creates a new dataset in Fuseki if it doesn't already exist. It uses Fuseki's admin API to create the dataset with TDB2 storage.

Parameters:

    dataset_name (required): Name of the dataset to create.
Note

This method will not fail if the dataset already exists.
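
Fuseki's admin protocol also exposes the dataset list, so a caller can check for an existing dataset before creating one. A hedged sketch: the GET /$/datasets endpoint and its "ds.name" response key follow Fuseki's documented admin protocol, but verify them against your server version; host and credentials are placeholders.

import requests

base = "http://localhost:3030"  # placeholder server
auth = ("admin", "password")    # placeholder credentials

def dataset_exists(name: str) -> bool:
    # Admin protocol (assumed shape): {"datasets": [{"ds.name": "/test", ...}, ...]}
    resp = requests.get(f"{base}/$/datasets", auth=auth)
    resp.raise_for_status()
    names = [d["ds.name"].lstrip("/") for d in resp.json().get("datasets", [])]
    return name in names

if not dataset_exists("test"):
    requests.post(
        f"{base}/$/datasets",
        data={"dbName": "test", "dbType": "tdb2"},
        auth=auth,
    )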

Source code in ontocast/tool/triple_manager/fuseki.py
def init_dataset(self, dataset_name):
    """Initialize a Fuseki dataset.

    This method creates a new dataset in Fuseki if it doesn't already exist.
    It uses Fuseki's admin API to create the dataset with TDB2 storage.

    Args:
        dataset_name: Name of the dataset to create.

    Note:
        This method will not fail if the dataset already exists.
    """
    fuseki_admin_url = f"{self.uri}/$/datasets"

    payload = {"dbName": dataset_name, "dbType": "tdb2"}

    headers = {"Content-Type": "application/x-www-form-urlencoded"}

    response = requests.post(
        fuseki_admin_url, data=payload, headers=headers, auth=self.auth
    )

    if response.status_code in (200, 201):
        logger.info(f"Dataset '{dataset_name}' created successfully.")
    else:
        logger.error(f"Failed to create dataset. Status code: {response.status_code}")
        logger.error(f"Response: {response.text}")

serialize_facts(g, **kwargs)

Store facts (RDF graph) as a named graph in Fuseki.

This method stores the given RDF graph containing facts as a named graph in Fuseki. The graph name is taken from the chunk_uri parameter or defaults to "urn:chunk:default".

Parameters:

    g (Graph, required): The RDF graph containing facts to store.
    **kwargs (default: {}): Additional keyword arguments.
        chunk_uri: URI to use as the named graph name (optional).

Returns:

    bool: True if the facts were successfully stored, False otherwise.

Example

>>> facts = RDFGraph()
>>> success = manager.serialize_facts(facts, chunk_uri="http://example.org/chunk1")

Source code in ontocast/tool/triple_manager/fuseki.py
def serialize_facts(self, g: Graph, **kwargs):
    """Store facts (RDF graph) as a named graph in Fuseki.

    This method stores the given RDF graph containing facts as a named
    graph in Fuseki. The graph name is taken from the chunk_uri parameter
    or defaults to "urn:chunk:default".

    Args:
        g: The RDF graph containing facts to store.
        **kwargs: Additional keyword arguments.
            chunk_uri: URI to use as the named graph name (optional).

    Returns:
        bool: True if the facts were successfully stored, False otherwise.

    Example:
        >>> facts = RDFGraph()
        >>> success = manager.serialize_facts(facts, chunk_uri="http://example.org/chunk1")
    """
    turtle_data = g.serialize(format="turtle")
    # Use chunk URI from kwargs or generate a default one
    chunk_uri = kwargs.get("chunk_uri", "urn:chunk:default")
    url = f"{self._get_dataset_url()}/data?graph={chunk_uri}"
    headers = {"Content-Type": "text/turtle;charset=utf-8"}
    response = requests.put(url, headers=headers, data=turtle_data, auth=self.auth)
    if response.status_code in (200, 201, 204):
        logger.info(f"Facts uploaded to Fuseki as named graph: {chunk_uri}")
        return True
    else:
        logger.error(f"Failed to upload facts. Status code: {response.status_code}")
        logger.error(f"Response: {response.text}")
        return False

serialize_ontology(o, **kwargs)

Store an ontology as a named graph in Fuseki.

This method stores the given ontology as a named graph in Fuseki, using the ontology's IRI as the graph name. This ensures that each ontology is stored separately and can be retrieved individually.

Parameters:

    o (Ontology, required): The ontology to store.
    **kwargs (default: {}): Additional keyword arguments (not used).

Returns:

    bool: True if the ontology was successfully stored, False otherwise.

Example

>>> ontology = Ontology(iri="http://example.org/onto", graph=graph)
>>> success = manager.serialize_ontology(ontology)

Source code in ontocast/tool/triple_manager/fuseki.py
def serialize_ontology(self, o: Ontology, **kwargs):
    """Store an ontology as a named graph in Fuseki.

    This method stores the given ontology as a named graph in Fuseki,
    using the ontology's IRI as the graph name. This ensures that
    each ontology is stored separately and can be retrieved individually.

    Args:
        o: The ontology to store.
        **kwargs: Additional keyword arguments (not used).

    Returns:
        bool: True if the ontology was successfully stored, False otherwise.

    Example:
        >>> ontology = Ontology(iri="http://example.org/onto", graph=graph)
        >>> success = manager.serialize_ontology(ontology)
    """
    turtle_data = o.graph.serialize(format="turtle")
    graph_uri = o.iri or f"urn:ontology:{o.ontology_id}"
    url = f"{self._get_dataset_url()}/data?graph={graph_uri}"
    headers = {"Content-Type": "text/turtle;charset=utf-8"}
    response = requests.put(url, headers=headers, data=turtle_data, auth=self.auth)
    if response.status_code in (200, 201, 204):
        logger.info(f"Ontology {graph_uri} uploaded to Fuseki as named graph.")
        return True
    else:
        logger.error(
            f"Failed to upload ontology {graph_uri}. Status code: {response.status_code}"
        )
        logger.error(f"Response: {response.text}")
        return False

Neo4jTripleStoreManager

Bases: TripleStoreManagerWithAuth

Neo4j-based triple store manager using n10s (neosemantics) plugin.

This implementation handles RDF data more faithfully by using both the n10s property graph representation and raw RDF triple storage for accurate reconstruction. It provides comprehensive ontology management with namespace-based organization.

The manager uses Neo4j's n10s plugin for RDF operations, including:

- RDF import and export via n10s
- Ontology metadata storage and retrieval
- Namespace-based ontology organization
- Faithful RDF graph reconstruction
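
The n10s calls are ordinary Cypher procedures, so they can also be issued directly with the Neo4j Python driver. A minimal sketch, assuming a local instance with the n10s plugin installed (URI and credentials are placeholders):

from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

turtle = """
@prefix ex: <http://example.org/> .
ex:onto a <http://www.w3.org/2002/07/owl#Ontology> .
"""

with driver.session() as session:
    # Import RDF inline, exactly as serialize_ontology/serialize_facts do below.
    result = session.run("CALL n10s.rdf.import.inline($ttl, 'Turtle')", ttl=turtle)
    print(result.single())

driver.close()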

Attributes:

    clean (bool): Whether to clean the database on initialization.
    _driver: Private Neo4j driver instance.

Source code in ontocast/tool/triple_manager/neo4j.py
class Neo4jTripleStoreManager(TripleStoreManagerWithAuth):
    """Neo4j-based triple store manager using n10s (neosemantics) plugin.

    This implementation handles RDF data more faithfully by using both the n10s
    property graph representation and raw RDF triple storage for accurate reconstruction.
    It provides comprehensive ontology management with namespace-based organization.

    The manager uses Neo4j's n10s plugin for RDF operations, including:
    - RDF import and export via n10s
    - Ontology metadata storage and retrieval
    - Namespace-based ontology organization
    - Faithful RDF graph reconstruction

    Attributes:
        clean: Whether to clean the database on initialization.
        _driver: Private Neo4j driver instance.
    """

    clean: bool = Field(
        default=False, description="If True, clean the database on init."
    )
    _driver = None  # private attribute, not a pydantic field

    def __init__(self, uri=None, auth=None, clean=False, **kwargs):
        """Initialize the Neo4j triple store manager.

        This method sets up the connection to Neo4j, initializes the n10s
        plugin configuration, creates necessary constraints and indexes,
        and optionally cleans all data from the database.

        Args:
            uri: Neo4j connection URI (e.g., "bolt://localhost:7687").
            auth: Authentication tuple (username, password) or string in "user/password" format.
            clean: If True, delete all nodes in the database on initialization.
            **kwargs: Additional keyword arguments passed to the parent class.

        Raises:
            ImportError: If the neo4j Python driver is not installed.

        Example:
            >>> manager = Neo4jTripleStoreManager(
            ...     uri="bolt://localhost:7687",
            ...     auth="neo4j/password",
            ...     clean=True
            ... )
        """
        super().__init__(
            uri=uri, auth=auth, env_uri="NEO4J_URI", env_auth="NEO4J_AUTH", **kwargs
        )
        self.clean = clean
        if GraphDatabase is None:
            raise ImportError("neo4j Python driver is not installed.")
        self._driver = GraphDatabase.driver(self.uri, auth=self.auth)

        with self._driver.session() as session:
            # Clean database if requested
            if self.clean:
                try:
                    session.run("MATCH (n) DETACH DELETE n")
                    logger.debug("Neo4j database cleaned (all nodes deleted)")
                except Exception as e:
                    logger.debug(f"Neo4j cleanup failed: {e}")

            # Initialize n10s configuration
            self._init_n10s_config(session)

            # Create constraints and indexes
            self._create_constraints_and_indexes(session)

    def _init_n10s_config(self, session):
        """Initialize n10s configuration with better RDF handling.

        This method configures the n10s plugin for optimal RDF handling.
        It sets up the configuration to preserve vocabulary URIs, handle
        multivalued properties, and maintain RDF types as nodes.

        Args:
            session: Neo4j session for executing configuration commands.
        """
        try:
            # Check if already configured; skip re-initialization if so
            result = session.run("CALL n10s.graphconfig.show()")
            if result.single():
                logger.debug("n10s already configured")
                return
        except Exception:
            pass

        try:
            session.run("""
                CALL n10s.graphconfig.init({
                    handleVocabUris: "KEEP",
                    handleMultival: "OVERWRITE",
                    typesToLabels: false,
                    keepLangTag: false,
                    keepCustomDataTypes: true,
                    handleRDFTypes: "NODES"
                })
            """)
            logger.debug("n10s configuration initialized")
        except Exception as e:
            logger.warning(f"n10s configuration failed: {e}")

    def _create_constraints_and_indexes(self, session):
        """Create necessary constraints and indexes for optimal performance.

        This method creates Neo4j constraints and indexes that are needed
        for efficient ontology operations and data integrity.

        Args:
            session: Neo4j session for executing constraint/index creation commands.
        """
        constraints = [
            "CREATE CONSTRAINT n10s_unique_uri IF NOT EXISTS FOR (r:Resource) REQUIRE r.uri IS UNIQUE",
            "CREATE CONSTRAINT ontology_iri_unique IF NOT EXISTS FOR (o:Ontology) REQUIRE o.uri IS UNIQUE",
            "CREATE INDEX namespace_prefix IF NOT EXISTS FOR (ns:Namespace) ON (ns.prefix)",
        ]

        for constraint in constraints:
            try:
                session.run(constraint)
                logger.debug(f"Created constraint/index: {constraint.split()[-1]}")
            except Exception as e:
                logger.debug(f"Constraint/index creation (might already exist): {e}")

    def _extract_namespace_prefix(self, uri: str) -> tuple[str, str]:
        """Extract namespace and local name from URI.

        This method parses a URI to extract the namespace and local name
        using common separators (#, /, :).

        Args:
            uri: The URI to parse.

        Returns:
            tuple[str, str]: A tuple of (namespace, local_name).

        Example:
            >>> manager._extract_namespace_prefix("http://example.org/onto#Class")
            ("http://example.org/onto#", "Class")
        """
        common_separators = ["#", "/", ":"]
        for sep in common_separators:
            if sep in uri:
                parts = uri.rsplit(sep, 1)
                if len(parts) == 2:
                    return parts[0] + sep, parts[1]
        return uri, ""

    def _get_ontology_namespaces(self, session) -> dict:
        """Get all known ontology namespaces from the database.

        This method queries the Neo4j database to retrieve all known
        namespace prefixes and their corresponding URIs.

        Args:
            session: Neo4j session for executing queries.

        Returns:
            dict: Dictionary mapping namespace prefixes to URIs.
        """
        result = session.run("""
            MATCH (ns:Namespace)
            RETURN ns.prefix as prefix, ns.uri as uri
            UNION
            MATCH (o:Ontology)
            RETURN null as prefix, o.uri as uri
        """)

        namespaces = {}
        for record in result:
            uri = record.get("uri")
            prefix = record.get("prefix")
            if uri:
                if prefix:
                    namespaces[prefix] = uri
                else:
                    # Extract potential namespace from ontology URI
                    ns, _ = self._extract_namespace_prefix(uri)
                    if ns != uri:  # Only if we actually found a namespace
                        namespaces[ns] = ns

        return namespaces

    def fetch_ontologies(self) -> list[Ontology]:
        """Fetch ontologies from Neo4j with faithful RDF reconstruction.

        This method retrieves all ontologies from Neo4j and reconstructs
        their RDF graphs faithfully. It uses a multi-step process:

        1. Identifies distinct ontologies by their namespace URIs
        2. Fetches all entities belonging to each ontology
        3. Reconstructs the RDF graph faithfully using stored triples when available
        4. Falls back to n10s property graph conversion when needed

        Returns:
            list[Ontology]: List of all ontologies found in the database.

        Example:
            >>> ontologies = manager.fetch_ontologies()
            >>> for onto in ontologies:
            ...     print(f"Found ontology: {onto.iri}")
        """
        ontologies = []

        with self._driver.session() as session:
            try:
                # First, try to get explicitly stored ontology metadata
                ontology_iris = self._fetch_ontology_iris(session)

                if ontology_iris:
                    for ont_iri in ontology_iris:
                        ontology = self._reconstruct_ontology_from_metadata(
                            session, ont_iri
                        )
                        if ontology:
                            ontologies.append(ontology)

            except Exception as e:
                logger.error(f"Error in fetch_ontologies: {e}")

        logger.info(f"Successfully loaded {len(ontologies)} ontologies")
        return ontologies

    def _fetch_ontology_iris(self, session) -> list[str]:
        """Fetch explicit ontology metadata from Neo4j.

        This method queries Neo4j to find all entities that are explicitly
        typed as owl:Ontology.

        Args:
            session: Neo4j session for executing queries.

        Returns:
            list[str]: List of ontology IRIs found in the database.
        """
        result = session.run(f"""
            MATCH (o)-[:`{str(RDF.type)}`]->(t:Resource {{ uri: "{str(OWL.Ontology)}" }})
            WHERE o.uri IS NOT NULL
            RETURN
              o.uri AS iri
        """)

        return [
            record["iri"]
            for record in result
            if record.get("iri") is not None
        ]

    def _reconstruct_ontology_from_metadata(self, session, iri) -> Optional[Ontology]:
        """Reconstruct an ontology from its metadata and related entities.

        This method takes an ontology IRI and reconstructs the complete
        ontology by fetching all related entities from the namespace.

        Args:
            session: Neo4j session for executing queries.
            iri: The ontology IRI to reconstruct.

        Returns:
            Optional[Ontology]: The reconstructed ontology, or None if failed.
        """
        namespace_uri, _ = self._extract_namespace_prefix(iri)

        logger.debug(f"Reconstructing ontology: {iri} with namespace: {namespace_uri}")

        # Fallback to n10s export for this namespace
        graph = self._export_namespace_via_n10s(session, namespace_uri)
        if graph and len(graph) > 0:
            return self._create_ontology_object(iri, iri, graph)

    def _export_namespace_via_n10s(
        self, session, namespace_uri: str
    ) -> Optional[RDFGraph]:
        """Export entities belonging to a namespace using n10s.

        This method uses Neo4j's n10s plugin to export all entities
        belonging to a specific namespace as RDF triples.

        Args:
            session: Neo4j session for executing queries.
            namespace_uri: The namespace URI to export.

        Returns:
            Optional[RDFGraph]: The exported RDF graph, or None if failed.
        """
        try:
            result = session.run(
                f"""
                CALL n10s.rdf.export.cypher(
                    'MATCH (n)-[r]->(m) WHERE n.uri STARTS WITH "{namespace_uri}" RETURN n,r,m',
                    {{format: 'Turtle'}}
                )
                YIELD subject, predicate, object, isLiteral, literalType, literalLang
                RETURN subject, predicate, object, isLiteral, literalType, literalLang
                """
            )

            # Process into Turtle format
            turtle_lines = []

            for record in result:
                subj = record["subject"]
                pred = record["predicate"]
                obj = record["object"]
                is_literal = record["isLiteral"]
                literal_type = record["literalType"]
                literal_lang = record["literalLang"]

                # Format object
                if is_literal:
                    # Escape special characters in literals
                    obj = obj.replace('"', r"\"")
                    obj_str = f'"{obj}"'

                    # Add datatype or language tag if present
                    if literal_lang:
                        obj_str += f"@{literal_lang}"
                    elif literal_type:
                        obj_str += f"^^<{literal_type}>"
                else:
                    obj_str = f"<{obj}>"

                # Format triple
                turtle_lines.append(f"<{subj}> <{pred}> {obj_str} .")

            # Combine into single string
            turtle_string = "\n".join(turtle_lines)

            if turtle_string.strip():
                graph = RDFGraph()
                graph.parse(data=turtle_string, format="turtle")
                logger.debug(
                    f"Exported {len(graph)} triples via n10s for namespace {namespace_uri}"
                )
                return graph
            return None

        except Exception as e:
            logger.debug(
                f"Failed to export via n10s for namespace {namespace_uri}: {e}"
            )

        return None

    def _create_ontology_object(
        self, iri: str, metadata: dict, graph: RDFGraph
    ) -> Ontology:
        """Create an Ontology object from IRI, metadata, and graph.

        Args:
            iri: The ontology IRI.
            metadata: Metadata dictionary (currently unused, kept for compatibility).
            graph: The RDF graph containing the ontology data.

        Returns:
            Ontology: The created ontology object.
        """
        ontology_id = derive_ontology_id(iri)
        return Ontology(graph=graph, iri=iri, ontology_id=ontology_id)

    def serialize_ontology(self, o: Ontology, **kwargs):
        """Serialize an ontology to Neo4j with both n10s and raw triple storage.

        This method stores the given ontology in Neo4j using the n10s plugin
        for RDF import. The ontology is stored as RDF triples that can be
        faithfully reconstructed later.

        Args:
            o: The ontology to store.
            **kwargs: Additional keyword arguments (not used).

        Returns:
            Any: The result summary from n10s import operation.
        """
        turtle_data = o.graph.serialize(format="turtle")

        with self._driver.session() as session:
            # Store via n10s for graph queries
            result = session.run(
                "CALL n10s.rdf.import.inline($ttl, 'Turtle')", ttl=turtle_data
            )
            summary = result.single()

        return summary

    def serialize_facts(self, g: RDFGraph, **kwargs):
        """Serialize facts (RDF graph) to Neo4j.

        This method stores the given RDF graph containing facts in Neo4j
        using the n10s plugin for RDF import.

        Args:
            g: The RDF graph containing facts to store.
            **kwargs: Additional keyword arguments (not used).

        Returns:
            Any: The result summary from n10s import operation.
        """
        turtle_data = g.serialize(format="turtle")

        with self._driver.session() as session:
            # Store via n10s
            result = session.run(
                "CALL n10s.rdf.import.inline($ttl, 'Turtle')", ttl=turtle_data
            )
            summary = result.single()

        return summary

    def close(self):
        """Close the Neo4j driver connection.

        This method should be called when the manager is no longer needed
        to properly close the database connection and free resources.
        """
        if self._driver:
            self._driver.close()

__init__(uri=None, auth=None, clean=False, **kwargs)

Initialize the Neo4j triple store manager.

This method sets up the connection to Neo4j, initializes the n10s plugin configuration, creates necessary constraints and indexes, and optionally cleans all data from the database.

Parameters:

    uri (default: None): Neo4j connection URI (e.g., "bolt://localhost:7687").
    auth (default: None): Authentication tuple (username, password) or string in "user/password" format.
    clean (default: False): If True, delete all nodes in the database on initialization.
    **kwargs (default: {}): Additional keyword arguments passed to the parent class.

Raises:

    ImportError: If the neo4j Python driver is not installed.

Example

>>> manager = Neo4jTripleStoreManager(
...     uri="bolt://localhost:7687",
...     auth="neo4j/password",
...     clean=True
... )

Source code in ontocast/tool/triple_manager/neo4j.py
def __init__(self, uri=None, auth=None, clean=False, **kwargs):
    """Initialize the Neo4j triple store manager.

    This method sets up the connection to Neo4j, initializes the n10s
    plugin configuration, creates necessary constraints and indexes,
    and optionally cleans all data from the database.

    Args:
        uri: Neo4j connection URI (e.g., "bolt://localhost:7687").
        auth: Authentication tuple (username, password) or string in "user/password" format.
        clean: If True, delete all nodes in the database on initialization.
        **kwargs: Additional keyword arguments passed to the parent class.

    Raises:
        ImportError: If the neo4j Python driver is not installed.

    Example:
        >>> manager = Neo4jTripleStoreManager(
        ...     uri="bolt://localhost:7687",
        ...     auth="neo4j/password",
        ...     clean=True
        ... )
    """
    super().__init__(
        uri=uri, auth=auth, env_uri="NEO4J_URI", env_auth="NEO4J_AUTH", **kwargs
    )
    self.clean = clean
    if GraphDatabase is None:
        raise ImportError("neo4j Python driver is not installed.")
    self._driver = GraphDatabase.driver(self.uri, auth=self.auth)

    with self._driver.session() as session:
        # Clean database if requested
        if self.clean:
            try:
                session.run("MATCH (n) DETACH DELETE n")
                logger.debug("Neo4j database cleaned (all nodes deleted)")
            except Exception as e:
                logger.debug(f"Neo4j cleanup failed: {e}")

        # Initialize n10s configuration
        self._init_n10s_config(session)

        # Create constraints and indexes
        self._create_constraints_and_indexes(session)
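
The docstring above accepts auth either as a (username, password) tuple or as a single "user/password" string. A minimal sketch of how the string form could be normalized into the tuple the neo4j driver expects (the helper name is hypothetical, not part of the package):

def normalize_auth(auth):
    """Hypothetical helper: pass tuples through, split strings on the first '/'."""
    if auth is None or isinstance(auth, tuple):
        return auth
    user, _, password = auth.partition("/")
    return (user, password)

assert normalize_auth("neo4j/password") == ("neo4j", "password")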

close()

Close the Neo4j driver connection.

This method should be called when the manager is no longer needed to properly close the database connection and free resources.

Source code in ontocast/tool/triple_manager/neo4j.py
def close(self):
    """Close the Neo4j driver connection.

    This method should be called when the manager is no longer needed
    to properly close the database connection and free resources.
    """
    if self._driver:
        self._driver.close()
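
Because the manager holds an open driver for its lifetime, callers should pair construction with close(). A minimal usage sketch relying only on the documented methods (connection details are placeholders):

manager = Neo4jTripleStoreManager(
    uri="bolt://localhost:7687", auth="neo4j/password"
)
try:
    ontologies = manager.fetch_ontologies()
finally:
    manager.close()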

fetch_ontologies()

Fetch ontologies from Neo4j with faithful RDF reconstruction.

This method retrieves all ontologies from Neo4j and reconstructs their RDF graphs faithfully. It uses a multi-step process:

  1. Identifies distinct ontologies by their namespace URIs
  2. Fetches all entities belonging to each ontology
  3. Reconstructs the RDF graph faithfully using stored triples when available
  4. Falls back to n10s property graph conversion when needed

Returns:

    list[Ontology]: List of all ontologies found in the database.

Example:

    >>> ontologies = manager.fetch_ontologies()
    >>> for onto in ontologies:
    ...     print(f"Found ontology: {onto.iri}")

Source code in ontocast/tool/triple_manager/neo4j.py
def fetch_ontologies(self) -> list[Ontology]:
    """Fetch ontologies from Neo4j with faithful RDF reconstruction.

    This method retrieves all ontologies from Neo4j and reconstructs
    their RDF graphs faithfully. It uses a multi-step process:

    1. Identifies distinct ontologies by their namespace URIs
    2. Fetches all entities belonging to each ontology
    3. Reconstructs the RDF graph faithfully using stored triples when available
    4. Falls back to n10s property graph conversion when needed

    Returns:
        list[Ontology]: List of all ontologies found in the database.

    Example:
        >>> ontologies = manager.fetch_ontologies()
        >>> for onto in ontologies:
        ...     print(f"Found ontology: {onto.iri}")
    """
    ontologies = []

    with self._driver.session() as session:
        try:
            # First, try to get explicitly stored ontology metadata
            ontology_iris = self._fetch_ontology_iris(session)

            if ontology_iris:
                for ont_iri in ontology_iris:
                    ontology = self._reconstruct_ontology_from_metadata(
                        session, ont_iri
                    )
                    if ontology:
                        ontologies.append(ontology)

        except Exception as e:
            logger.error(f"Error in fetch_ontologies: {e}")

    logger.info(f"Successfully loaded {len(ontologies)} ontologies")
    return ontologies
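
The _fetch_ontology_iris helper is not reproduced here. Under a typical n10s configuration, where resources become nodes labeled Resource with a uri property and rdf:type is mapped to labels, ontology IRIs might be discovered with a query along these lines (a sketch under those assumptions, not the actual implementation):

def fetch_ontology_iris_sketch(session):
    # Assumes n10s maps owl:Ontology instances to an "Ontology" label;
    # the exact shape depends on the configured n10s type handling.
    result = session.run(
        "MATCH (o:Resource:Ontology) WHERE o.uri IS NOT NULL "
        "RETURN o.uri AS iri"
    )
    return [record["iri"] for record in result]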

serialize_facts(g, **kwargs)

Serialize facts (RDF graph) to Neo4j.

This method stores the given RDF graph containing facts in Neo4j using the n10s plugin for RDF import.

Parameters:

    g (RDFGraph): The RDF graph containing facts to store. Required.
    **kwargs: Additional keyword arguments (not used).

Returns:

    Any: The result summary from the n10s import operation.

Source code in ontocast/tool/triple_manager/neo4j.py
def serialize_facts(self, g: RDFGraph, **kwargs):
    """Serialize facts (RDF graph) to Neo4j.

    This method stores the given RDF graph containing facts in Neo4j
    using the n10s plugin for RDF import.

    Args:
        g: The RDF graph containing facts to store.
        **kwargs: Additional keyword arguments (not used).

    Returns:
        Any: The result summary from n10s import operation.
    """
    turtle_data = g.serialize(format="turtle")

    with self._driver.session() as session:
        # Store via n10s
        result = session.run(
            "CALL n10s.rdf.import.inline($ttl, 'Turtle')", ttl=turtle_data
        )
        summary = result.single()

    return summary
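
A short usage sketch: build a small graph with rdflib and hand it to serialize_facts (the example namespace is made up, and RDFGraph is assumed to be rdflib-compatible):

from rdflib import RDF, Graph, Literal, Namespace

EX = Namespace("http://example.org/")
g = Graph()
g.add((EX.acme, RDF.type, EX.Company))
g.add((EX.acme, EX.hasName, Literal("ACME Corp")))

summary = manager.serialize_facts(g)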

serialize_ontology(o, **kwargs)

Serialize an ontology to Neo4j via the n10s plugin.

This method stores the given ontology in Neo4j using the n10s plugin for RDF import. The ontology is stored as RDF triples that can be faithfully reconstructed later.

Parameters:

    o (Ontology): The ontology to store. Required.
    **kwargs: Additional keyword arguments (not used).

Returns:

    Any: The result summary from the n10s import operation.

Source code in ontocast/tool/triple_manager/neo4j.py
def serialize_ontology(self, o: Ontology, **kwargs):
    """Serialize an ontology to Neo4j with both n10s and raw triple storage.

    This method stores the given ontology in Neo4j using the n10s plugin
    for RDF import. The ontology is stored as RDF triples that can be
    faithfully reconstructed later.

    Args:
        o: The ontology to store.
        **kwargs: Additional keyword arguments (not used).

    Returns:
        Any: The result summary from n10s import operation.
    """
    turtle_data = o.graph.serialize(format="turtle")

    with self._driver.session() as session:
        # Store via n10s for graph queries
        result = session.run(
            "CALL n10s.rdf.import.inline($ttl, 'Turtle')", ttl=turtle_data
        )
        summary = result.single()

    return summary
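
The record returned by n10s.rdf.import.inline carries import statistics; per the n10s documentation its columns include terminationStatus and triplesLoaded. A sketch of checking the result (assuming those documented column names):

summary = manager.serialize_ontology(onto)
if summary is not None and summary["terminationStatus"] == "OK":
    print(f"Imported {summary['triplesLoaded']} triples")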

TripleStoreManager

Bases: Tool

Base class for managing RDF triple stores.

This class defines the interface for triple store management operations, including fetching and storing ontologies and their graphs. All concrete triple store implementations should inherit from this class.

This is an abstract base class that must be implemented by specific triple store backends (e.g., Neo4j, Fuseki, Filesystem).

Source code in ontocast/tool/triple_manager/core.py
class TripleStoreManager(Tool):
    """Base class for managing RDF triple stores.

    This class defines the interface for triple store management operations,
    including fetching and storing ontologies and their graphs. All concrete
    triple store implementations should inherit from this class.

    This is an abstract base class that must be implemented by specific
    triple store backends (e.g., Neo4j, Fuseki, Filesystem).
    """

    def __init__(self, **kwargs):
        """Initialize the triple store manager.

        Args:
            **kwargs: Additional keyword arguments passed to the parent class.
        """
        super().__init__(**kwargs)

    @abc.abstractmethod
    def fetch_ontologies(self) -> list[Ontology]:
        """Fetch all available ontologies from the triple store.

        This method should retrieve all ontologies stored in the triple store
        and return them as Ontology objects with their associated RDF graphs.

        Returns:
            list[Ontology]: List of available ontologies with their graphs.
        """
        return []

    @abc.abstractmethod
    def serialize_ontology(self, o: Ontology, **kwargs):
        """Store an ontology in the triple store.

        This method should store the given ontology and its associated RDF graph
        in the triple store. The implementation may choose how to organize
        the storage (e.g., as named graphs, in specific collections, etc.).

        Args:
            o: The ontology to store.
            **kwargs: Additional keyword arguments for serialization.
        """
        pass

    @abc.abstractmethod
    def serialize_facts(self, g: Graph, **kwargs):
        """Store a graph with facts in the triple store.

        This method should store the given RDF graph containing facts
        in the triple store. The implementation may choose how to organize
        the storage (e.g., as named graphs, in specific collections, etc.).

        Args:
            g: The RDF graph containing facts to store.
            **kwargs: Additional keyword arguments for serialization.
        """
        pass
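
To add a new backend, subclass TripleStoreManager and implement the three abstract methods. A minimal in-memory sketch (illustrative only, not part of the package):

from rdflib import Graph

class InMemoryTripleStoreManager(TripleStoreManager):
    """Illustrative backend that keeps everything in process memory."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._ontologies: list[Ontology] = []
        self._facts = Graph()

    def fetch_ontologies(self) -> list[Ontology]:
        return list(self._ontologies)

    def serialize_ontology(self, o: Ontology, **kwargs):
        self._ontologies.append(o)

    def serialize_facts(self, g: Graph, **kwargs):
        # Merge incoming facts into the accumulated graph.
        self._facts += g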

__init__(**kwargs)

Initialize the triple store manager.

Parameters:

    **kwargs: Additional keyword arguments passed to the parent class.

Source code in ontocast/tool/triple_manager/core.py
def __init__(self, **kwargs):
    """Initialize the triple store manager.

    Args:
        **kwargs: Additional keyword arguments passed to the parent class.
    """
    super().__init__(**kwargs)

fetch_ontologies() abstractmethod

Fetch all available ontologies from the triple store.

This method should retrieve all ontologies stored in the triple store and return them as Ontology objects with their associated RDF graphs.

Returns:

    list[Ontology]: List of available ontologies with their graphs.

Source code in ontocast/tool/triple_manager/core.py
@abc.abstractmethod
def fetch_ontologies(self) -> list[Ontology]:
    """Fetch all available ontologies from the triple store.

    This method should retrieve all ontologies stored in the triple store
    and return them as Ontology objects with their associated RDF graphs.

    Returns:
        list[Ontology]: List of available ontologies with their graphs.
    """
    return []

serialize_facts(g, **kwargs) abstractmethod

Store a graph with facts in the triple store.

This method should store the given RDF graph containing facts in the triple store. The implementation may choose how to organize the storage (e.g., as named graphs, in specific collections, etc.).

Parameters:

    g (Graph): The RDF graph containing facts to store. Required.
    **kwargs: Additional keyword arguments for serialization.

Source code in ontocast/tool/triple_manager/core.py
@abc.abstractmethod
def serialize_facts(self, g: Graph, **kwargs):
    """Store a graph with facts in the triple store.

    This method should store the given RDF graph containing facts
    in the triple store. The implementation may choose how to organize
    the storage (e.g., as named graphs, in specific collections, etc.).

    Args:
        g: The RDF graph containing facts to store.
        **kwargs: Additional keyword arguments for serialization.
    """
    pass

serialize_ontology(o, **kwargs) abstractmethod

Store an ontology in the triple store.

This method should store the given ontology and its associated RDF graph in the triple store. The implementation may choose how to organize the storage (e.g., as named graphs, in specific collections, etc.).

Parameters:

    o (Ontology): The ontology to store. Required.
    **kwargs: Additional keyword arguments for serialization.

Source code in ontocast/tool/triple_manager/core.py
@abc.abstractmethod
def serialize_ontology(self, o: Ontology, **kwargs):
    """Store an ontology in the triple store.

    This method should store the given ontology and its associated RDF graph
    in the triple store. The implementation may choose how to organize
    the storage (e.g., as named graphs, in specific collections, etc.).

    Args:
        o: The ontology to store.
        **kwargs: Additional keyword arguments for serialization.
    """
    pass