
ontocast.tool.triple_manager

Triple store management package for OntoCast.

This package provides a unified interface for managing RDF triple stores across different backends. It includes abstract base classes and concrete implementations for various triple store technologies.

The package supports:
- Abstract interfaces for triple store operations
- Neo4j implementation using the n10s plugin
- Fuseki implementation using Apache Fuseki
- Filesystem implementation for local storage

All implementations support:
- Fetching and storing ontologies
- Serializing and retrieving facts
- Authentication and connection management
- Error handling and logging

Example

from ontocast.tool.triple_manager import Neo4jTripleStoreManager

manager = Neo4jTripleStoreManager(uri="bolt://localhost:7687")
ontologies = manager.fetch_ontologies()

FilesystemTripleStoreManager

Bases: TripleStoreManager

Filesystem-based implementation of triple store management.

This class provides a concrete implementation of triple store management using the local filesystem for storage. It reads and writes ontologies and facts as Turtle (.ttl) files in specified directories.

The manager supports:
- Loading ontologies from a dedicated ontology directory
- Storing ontologies with versioned filenames
- Storing facts with customizable filenames based on specifications
- Error handling for file operations

Attributes:

working_directory (Path | None): Path to the working directory for storing data.
ontology_path (Path | None): Optional path to the ontology directory for loading ontologies.
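
A minimal usage sketch (directory paths are hypothetical; the import path follows the package-level example above):

from ontocast.tool.triple_manager import FilesystemTripleStoreManager

manager = FilesystemTripleStoreManager(
    working_directory="/path/to/work",
    ontology_path="/path/to/ontologies"
)
ontologies = manager.fetch_ontologies()  # loads every *.ttl file in ontology_path
if ontologies:
    manager.serialize(ontologies[0])  # writes ontology_<id>_<version>.ttl to working_directory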

Source code in ontocast/tool/triple_manager/filesystem_manager.py
class FilesystemTripleStoreManager(TripleStoreManager):
    """Filesystem-based implementation of triple store management.

    This class provides a concrete implementation of triple store management
    using the local filesystem for storage. It reads and writes ontologies
    and facts as Turtle (.ttl) files in specified directories.

    The manager supports:
    - Loading ontologies from a dedicated ontology directory
    - Storing ontologies with versioned filenames
    - Storing facts with customizable filenames based on specifications
    - Error handling for file operations

    Attributes:
        working_directory: Path to the working directory for storing data.
        ontology_path: Optional path to the ontology directory for loading ontologies.
    """

    working_directory: pathlib.Path | None
    ontology_path: pathlib.Path | None

    def __init__(self, **kwargs):
        """Initialize the filesystem triple store manager.

        This method sets up the filesystem manager with the specified
        working and ontology directories.

        Args:
            **kwargs: Additional keyword arguments passed to the parent class.
                working_directory: Path to the working directory for storing data.
                ontology_path: Path to the ontology directory for loading ontologies.

        Example:
            >>> manager = FilesystemTripleStoreManager(
            ...     working_directory="/path/to/work",
            ...     ontology_path="/path/to/ontologies"
            ... )
        """
        super().__init__(**kwargs)

    def fetch_ontologies(self) -> list[Ontology]:
        """Fetch all available ontologies from the filesystem.

        This method scans the ontology directory for Turtle (.ttl) files
        and loads each one as an Ontology object. Files are processed
        in sorted order for consistent results.

        Returns:
            list[Ontology]: List of all ontologies found in the ontology directory.

        Example:
            >>> ontologies = manager.fetch_ontologies()
            >>> for onto in ontologies:
            ...     print(f"Loaded ontology: {onto.ontology_id}")
        """
        ontologies = []
        if self.ontology_path is not None:
            sorted_files = sorted(self.ontology_path.glob("*.ttl"))
            for fname in sorted_files:
                try:
                    ontology = Ontology.from_file(fname)
                    ontologies.append(ontology)
                    logger.debug(f"Successfully loaded ontology from {fname}")
                except Exception as e:
                    logger.error(f"Failed to load ontology {fname}: {str(e)}")
        return ontologies

    def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
        """Store an RDF graph in the filesystem.

        This method stores the given RDF graph as a Turtle file in the
        working directory. The filename must be supplied via the fname
        keyword argument. If no working directory is configured, the call
        returns without writing anything.

        Args:
            graph: The RDF graph to store.
            **kwargs: Must include fname (str), the output filename.

        Example:
            >>> graph = RDFGraph()
            >>> manager.serialize_graph(graph, fname="current.ttl")
            # Creates: working_directory/current.ttl

            >>> manager.serialize_graph(graph, fname="facts_abc.ttl")
        """
        if self.working_directory is None:
            return

        fname: str = kwargs.pop("fname")
        output_path = self.working_directory / fname
        graph.serialize(format="turtle", destination=output_path)
        logger.info(f"Graph saved to {output_path}")

    def serialize(self, o: Ontology | RDFGraph, graph_uri: str | None = None):
        if isinstance(o, Ontology):
            graph = o.graph
            fname = f"ontology_{o.ontology_id}_{o.version}.ttl"
        elif isinstance(o, RDFGraph):
            graph = o
            if graph_uri:
                s = graph_uri.split("/")[-2:]
                s = "_".join([x for x in s if x])
                fname = f"facts_{s}.ttl"
            else:
                fname = "facts_default.ttl"
        else:
            raise TypeError(f"unsupported obj of type {type(o)} received")

        self.serialize_graph(graph=graph, fname=fname)

    async def clean(self, dataset: str | None = None) -> None:
        """Clean/flush all data from the filesystem triple store.

        This method is intended to delete all Turtle (.ttl) files from both
        the working directory and the ontology directory. The filesystem
        backend does not yet implement this and currently only logs warnings.

        Args:
            dataset: Optional dataset parameter (ignored for Filesystem, which doesn't
                support datasets). Included for interface compatibility.

        Warning: This operation is irreversible and will delete all data.

        Raises:
            Exception: If the cleanup operation fails.
        """
        if dataset is not None:
            logger.warning(
                f"Dataset parameter '{dataset}' ignored for Filesystem (datasets not supported)"
            )
            logger.warning(
                "clean method not implemented for FilesystemTripleStoreManager"
            )

__init__(**kwargs)

Initialize the filesystem triple store manager.

This method sets up the filesystem manager with the specified working and ontology directories.

Parameters:

**kwargs: Additional keyword arguments passed to the parent class. Default: {}.
    working_directory: Path to the working directory for storing data.
    ontology_path: Path to the ontology directory for loading ontologies.

Example

manager = FilesystemTripleStoreManager(
    working_directory="/path/to/work",
    ontology_path="/path/to/ontologies"
)

Source code in ontocast/tool/triple_manager/filesystem_manager.py
def __init__(self, **kwargs):
    """Initialize the filesystem triple store manager.

    This method sets up the filesystem manager with the specified
    working and ontology directories.

    Args:
        **kwargs: Additional keyword arguments passed to the parent class.
            working_directory: Path to the working directory for storing data.
            ontology_path: Path to the ontology directory for loading ontologies.

    Example:
        >>> manager = FilesystemTripleStoreManager(
        ...     working_directory="/path/to/work",
        ...     ontology_path="/path/to/ontologies"
        ... )
    """
    super().__init__(**kwargs)

clean(dataset=None) async

Clean/flush all data from the filesystem triple store.

This method is intended to delete all Turtle (.ttl) files from both the working directory and the ontology directory. The filesystem backend does not yet implement this and currently only logs warnings.

Parameters:

dataset (str | None): Optional dataset parameter (ignored for Filesystem, which doesn't support datasets). Included for interface compatibility. Default: None.

Raises:

Exception: If the cleanup operation fails.

Source code in ontocast/tool/triple_manager/filesystem_manager.py
async def clean(self, dataset: str | None = None) -> None:
    """Clean/flush all data from the filesystem triple store.

    This method is intended to delete all Turtle (.ttl) files from both the
    working directory and the ontology directory. The filesystem backend does
    not yet implement this and currently only logs warnings.

    Args:
        dataset: Optional dataset parameter (ignored for Filesystem, which doesn't
            support datasets). Included for interface compatibility.

    Warning: This operation is irreversible and will delete all data.

    Raises:
        Exception: If the cleanup operation fails.
    """
    if dataset is not None:
        logger.warning(
            f"Dataset parameter '{dataset}' ignored for Filesystem (datasets not supported)"
        )
        logger.warning(
            "clean method not implemented for FilesystemTripleStoreManager"
        )

fetch_ontologies()

Fetch all available ontologies from the filesystem.

This method scans the ontology directory for Turtle (.ttl) files and loads each one as an Ontology object. Files are processed in sorted order for consistent results.

Returns:

list[Ontology]: List of all ontologies found in the ontology directory.

Example

ontologies = manager.fetch_ontologies()
for onto in ontologies:
    print(f"Loaded ontology: {onto.ontology_id}")

Source code in ontocast/tool/triple_manager/filesystem_manager.py
def fetch_ontologies(self) -> list[Ontology]:
    """Fetch all available ontologies from the filesystem.

    This method scans the ontology directory for Turtle (.ttl) files
    and loads each one as an Ontology object. Files are processed
    in sorted order for consistent results.

    Returns:
        list[Ontology]: List of all ontologies found in the ontology directory.

    Example:
        >>> ontologies = manager.fetch_ontologies()
        >>> for onto in ontologies:
        ...     print(f"Loaded ontology: {onto.ontology_id}")
    """
    ontologies = []
    if self.ontology_path is not None:
        sorted_files = sorted(self.ontology_path.glob("*.ttl"))
        for fname in sorted_files:
            try:
                ontology = Ontology.from_file(fname)
                ontologies.append(ontology)
                logger.debug(f"Successfully loaded ontology from {fname}")
            except Exception as e:
                logger.error(f"Failed to load ontology {fname}: {str(e)}")
    return ontologies

serialize_graph(graph, **kwargs)

Store an RDF graph in the filesystem.

This method stores the given RDF graph as a Turtle file in the working directory. The filename must be supplied via the fname keyword argument; if no working directory is configured, the call returns without writing anything.

Parameters:

graph (Graph): The RDF graph to store. required
fname (str): Output filename for the Turtle file. required

Example

graph = RDFGraph()
manager.serialize_graph(graph, fname="current.ttl")
# Creates: working_directory/current.ttl

manager.serialize_graph(graph, fname="facts_abc.ttl")

Source code in ontocast/tool/triple_manager/filesystem_manager.py
def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
    """Store an RDF graph in the filesystem.

    This method stores the given RDF graph as a Turtle file in the
    working directory. The filename must be supplied via the fname
    keyword argument. If no working directory is configured, the call
    returns without writing anything.

    Args:
        graph: The RDF graph to store.
        **kwargs: Must include fname (str), the output filename.

    Example:
        >>> graph = RDFGraph()
        >>> manager.serialize_graph(graph, fname="current.ttl")
        # Creates: working_directory/current.ttl

        >>> manager.serialize_graph(graph, fname="facts_abc.ttl")
    """
    if self.working_directory is None:
        return

    fname: str = kwargs.pop("fname")
    output_path = self.working_directory / fname
    graph.serialize(format="turtle", destination=output_path)
    logger.info(f"Graph saved to {output_path}")

FusekiTripleStoreManager

Bases: TripleStoreManagerWithAuth

Fuseki-based triple store manager.

This class provides a concrete implementation of triple store management using Apache Fuseki. It stores ontologies as named graphs using their URIs as graph names, and supports dataset creation and cleanup.

The manager uses Fuseki's REST API for all operations, including:
- Dataset creation and management
- Named graph operations for ontologies
- SPARQL queries for ontology discovery
- Graph-level data operations

Attributes:

dataset (str | None): The Fuseki dataset name to use for storage.
ontologies_dataset (str): Fuseki dataset name for ontologies.
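
A minimal async usage sketch (server URI, credentials, and dataset name are hypothetical; the calls assume an async context):

manager = FusekiTripleStoreManager(
    uri="http://localhost:3030",
    auth=("admin", "password"),
    dataset="test"
)
ontologies = await manager.afetch_ontologies()  # preferred over fetch_ontologies() in async code
graph = RDFGraph()  # facts graph to store
await manager.aserialize(graph, graph_uri="http://example.org/chunk1")  # stored as a named graph
await manager.close()  # release the underlying httpx client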

Source code in ontocast/tool/triple_manager/fuseki.py
class FusekiTripleStoreManager(TripleStoreManagerWithAuth):
    """Fuseki-based triple store manager.

    This class provides a concrete implementation of triple store management
    using Apache Fuseki. It stores ontologies as named graphs using their
    URIs as graph names, and supports dataset creation and cleanup.

    The manager uses Fuseki's REST API for all operations, including:
    - Dataset creation and management
    - Named graph operations for ontologies
    - SPARQL queries for ontology discovery
    - Graph-level data operations

    Attributes:
        dataset: The Fuseki dataset name to use for storage.
        ontologies_dataset: Fuseki dataset name for ontologies.
    """

    dataset: str | None = Field(default=None, description="Fuseki dataset name")
    ontologies_dataset: str = Field(
        default=DEFAULT_ONTOLOGIES_DATASET,
        description="Fuseki dataset name for ontologies",
    )

    def __init__(
        self,
        uri=None,
        auth=None,
        dataset=None,
        ontologies_dataset=None,
        **kwargs,
    ):
        """Initialize the Fuseki triple store manager.

        This method sets up the connection to Fuseki and creates the dataset
        if it doesn't exist. The dataset is NOT cleaned on initialization.

        Args:
            uri: Fuseki server URI (e.g., "http://localhost:3030").
            auth: Authentication tuple (username, password) or string in "user/password" format.
            dataset: Dataset name to use for storage.
            ontologies_dataset: Dataset name for ontologies (defaults to separate dataset).
            **kwargs: Additional keyword arguments passed to the parent class.

        Raises:
            ValueError: If dataset is not specified in URI or as argument.

        Example:
            >>> manager = FusekiTripleStoreManager(
            ...     uri="http://localhost:3030",
            ...     dataset="test"
            ... )
            >>> # To clean the dataset, use the clean() method explicitly:
            >>> await manager.clean()
        """
        super().__init__(
            uri=uri, auth=auth, env_uri="FUSEKI_URI", env_auth="FUSEKI_AUTH", **kwargs
        )
        if dataset is None:
            self.dataset = DEFAULT_DATASET
        else:
            self.dataset = dataset
        self.ontologies_dataset = ontologies_dataset or DEFAULT_ONTOLOGIES_DATASET

        # Initialize httpx client for async operations
        self._client: httpx.AsyncClient | None = None

        # Initialize datasets synchronously (for backward compatibility)
        # In async contexts, use async_init() instead
        asyncio.run(self._async_init_with_cleanup())

    async def _async_init_with_cleanup(self):
        """Wrapper for async_init that ensures proper cleanup when using asyncio.run().

        This method creates a temporary client and ensures it's properly closed
        before returning, preventing "Event loop is closed" errors.
        """
        async with httpx.AsyncClient(
            auth=self._prepare_auth(), timeout=30.0
        ) as temp_client:
            # Temporarily replace the client
            original_client = self._client
            self._client = temp_client
            try:
                await self._async_init()
            finally:
                # Restore original client
                self._client = original_client

    async def _async_init(self):
        """Async initialization of datasets."""
        await self.init_dataset(self.dataset)
        if self.ontologies_dataset != self.dataset:
            await self.init_dataset(self.ontologies_dataset)

    def _prepare_auth(self) -> httpx.BasicAuth | None:
        """Prepare httpx BasicAuth from self.auth.

        Returns:
            httpx.BasicAuth instance or None if no auth is configured.
        """
        if self.auth:
            if isinstance(self.auth, tuple):
                return httpx.BasicAuth(*self.auth)
            elif isinstance(self.auth, str) and "/" in self.auth:
                parts = self.auth.split("/", 1)
                if len(parts) == 2:
                    username, password = parts[0], parts[1]
                    return httpx.BasicAuth(username, password)
        return None

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create the httpx async client."""
        if self._client is None:
            auth = self._prepare_auth()
            self._client = httpx.AsyncClient(auth=auth, timeout=30.0)
        return self._client

    async def close(self):
        """Close the httpx client."""
        if self._client is not None:
            await self._client.aclose()
            self._client = None

    async def update_dataset(self, new_dataset: str) -> None:
        """Update the dataset name for this manager.

        This method allows changing the dataset without recreating the entire
        manager, which is useful for API requests that specify different datasets.

        Args:
            new_dataset: The new dataset name to use.
        """
        if not new_dataset:
            raise ValueError("Dataset name cannot be empty")

        self.dataset = new_dataset
        await self.init_dataset(self.dataset)
        logger.info(f"Updated Fuseki dataset to: {self.dataset}")

    async def clean(self, dataset: str | None = None) -> None:
        """Clean/flush data from Fuseki dataset(s).

        This method removes all named graphs and clears the default graph
        from the specified dataset, or all datasets if no dataset is specified.

        Args:
            dataset: Optional dataset name to clean. If None, cleans both the main
                dataset and the ontologies dataset. If specified, cleans only that dataset.

        Warning: This operation is irreversible and will delete all data
        from the specified dataset(s).

        The method handles errors gracefully and logs the results of
        each cleanup operation.

        Example:
            >>> # Clean all datasets
            >>> await manager.clean()
            >>> # Clean specific dataset
            >>> await manager.clean(dataset="my_dataset")
        """
        if dataset is None:
            # Clean all datasets (main and ontologies)
            # self.dataset is guaranteed to be a string (set to DEFAULT_DATASET if None in __init__)
            assert self.dataset is not None, "Dataset should never be None"
            await self._clean_dataset_by_name(self.dataset)
            logger.info(f"Fuseki dataset '{self.dataset}' cleaned (all data deleted)")

            # Also clean the ontologies dataset if it's different
            if self.ontologies_dataset != self.dataset:
                await self._clean_dataset_by_name(self.ontologies_dataset)
                logger.info(
                    f"Fuseki ontologies dataset '{self.ontologies_dataset}' cleaned (all data deleted)"
                )
        else:
            # Clean only the specified dataset
            await self._clean_dataset_by_name(dataset)
            logger.info(f"Fuseki dataset '{dataset}' cleaned (all data deleted)")

    async def _clean_dataset_by_name(self, dataset_name: str) -> None:
        """Clean a specific dataset by name.

        This is a helper method that performs the actual cleaning of a single dataset.
        It deletes all named graphs and clears the default graph.

        Uses a temporary client to avoid event loop cleanup issues when called
        from different async contexts.

        Args:
            dataset_name: Name of the dataset to clean.

        Raises:
            Exception: If the cleanup operation fails.
        """
        # Use a temporary client to avoid event loop cleanup issues
        async with httpx.AsyncClient(auth=self._prepare_auth(), timeout=30.0) as client:
            try:
                dataset_url = f"{self.uri}/{dataset_name}"
                sparql_update_url = f"{dataset_url}/update"
                sparql_url = f"{dataset_url}/sparql"

                # Delete all named graphs
                query = """
                SELECT DISTINCT ?g WHERE {
                  GRAPH ?g { ?s ?p ?o }
                }
                """
                response = await client.post(
                    sparql_url,
                    data={"query": query, "format": "application/sparql-results+json"},
                )

                if response.status_code == 200:
                    results = response.json()
                    tasks = []
                    for binding in results.get("results", {}).get("bindings", []):
                        graph_uri = binding["g"]["value"]
                        # Delete the named graph using SPARQL UPDATE
                        drop_query = f"DROP GRAPH <{graph_uri}>"
                        tasks.append(
                            client.post(
                                sparql_update_url,
                                data={"update": drop_query},
                            )
                        )

                    # Execute all deletions in parallel
                    delete_responses = await asyncio.gather(
                        *tasks, return_exceptions=True
                    )
                    for i, delete_response in enumerate(delete_responses):
                        graph_uri = results["results"]["bindings"][i]["g"]["value"]
                        if isinstance(delete_response, Exception):
                            logger.warning(
                                f"Failed to delete graph {graph_uri}: {delete_response}"
                            )
                        elif isinstance(delete_response, httpx.Response):
                            if delete_response.status_code in (200, 204):
                                logger.debug(f"Deleted named graph: {graph_uri}")
                            else:
                                logger.warning(
                                    f"Failed to delete graph {graph_uri}: {delete_response.status_code}"
                                )

                # Clear the default graph using SPARQL UPDATE
                clear_query = "CLEAR DEFAULT"
                clear_response = await client.post(
                    sparql_update_url,
                    data={"update": clear_query},
                )
                if clear_response.status_code in (200, 204):
                    logger.debug(f"Cleared default graph in dataset '{dataset_name}'")
                else:
                    logger.warning(
                        f"Failed to clear default graph in dataset '{dataset_name}': {clear_response.status_code}"
                    )
            except Exception as e:
                logger.error(f"Failed to clean dataset '{dataset_name}': {e}")
                raise

    async def init_dataset(self, dataset_name):
        """Initialize a Fuseki dataset.

        This method creates a new dataset in Fuseki if it doesn't already exist.
        It uses Fuseki's admin API to create the dataset with TDB2 storage.

        Uses a temporary client to avoid event loop cleanup issues when called
        from different async contexts.

        Args:
            dataset_name: Name of the dataset to create.

        Note:
            This method will not fail if the dataset already exists.
        """
        # Use a temporary client to avoid event loop cleanup issues
        async with httpx.AsyncClient(auth=self._prepare_auth(), timeout=30.0) as client:
            fuseki_admin_url = f"{self.uri}/$/datasets"

            payload = {"dbName": dataset_name, "dbType": "tdb2"}

            headers = {"Content-Type": "application/x-www-form-urlencoded"}

            response = await client.post(
                fuseki_admin_url, data=payload, headers=headers
            )

            if response.status_code == 200 or response.status_code == 201:
                logger.info(f"Fuseki dataset '{dataset_name}' created successfully.")
            elif response.status_code == 409:
                logger.info(
                    f"Fuseki status code: {response.status_code}; {response.text.strip()}"
                )
            else:
                logger.error(
                    f"Failed to create dataset {dataset_name}. Status code: {response.status_code}"
                )
                logger.error(f"Response: {response.text.strip()}")

    def _get_dataset_url(self):
        """Get the full URL for the dataset.

        Returns:
            str: The complete URL for the dataset endpoint.
        """
        return f"{self.uri}/{self.dataset}"

    def _get_ontologies_dataset_url(self):
        """Get the full URL for the ontologies dataset.

        Returns:
            str: The complete URL for the ontologies dataset endpoint.
        """
        return f"{self.uri}/{self.ontologies_dataset}"

    def fetch_ontologies(self) -> list[Ontology]:
        """Synchronous wrapper for fetch_ontologies.

        For async usage, use afetch_ontologies() instead.
        """
        # Use a temporary client for this operation to avoid event loop cleanup issues
        return asyncio.run(self._fetch_ontologies_with_cleanup())

    async def afetch_ontologies(self) -> list[Ontology]:
        """Async version of fetch_ontologies.

        This is the preferred method when running in an async context.
        """
        return await self._fetch_ontologies_async()

    async def _fetch_ontologies_with_cleanup(self) -> list[Ontology]:
        """Wrapper that ensures proper cleanup when using asyncio.run().

        This method creates a temporary client and ensures it's properly closed
        before returning, preventing "Event loop is closed" errors.
        """
        async with httpx.AsyncClient(
            auth=self._prepare_auth(), timeout=30.0
        ) as temp_client:
            # Temporarily replace the client
            original_client = self._client
            self._client = temp_client
            try:
                return await self._fetch_ontologies_async()
            finally:
                # Restore original client
                self._client = original_client

    async def _fetch_ontologies_async(self) -> list[Ontology]:
        """Fetch all ontologies from their corresponding named graphs.

        This method discovers all ontologies in the Fuseki ontologies dataset and
        fetches each one from its corresponding named graph. For versioned ontologies,
        it returns only the latest version for each unique ontology IRI.

        1. Discovery: List all named graphs (which may be versioned URIs)
        2. Fetching: Retrieve each ontology from its named graph (in parallel)
        3. Deduplication: For versioned ontologies, keep only the latest version

        Returns:
            list[Ontology]: List of the latest version of each ontology found.

        Example:
            >>> ontologies = await manager.afetch_ontologies()
            >>> for onto in ontologies:
            ...     print(f"Found ontology: {onto.iri} v{onto.version}")
        """
        client = await self._get_client()
        sparql_url = f"{self._get_ontologies_dataset_url()}/sparql"

        # Step 1: List all named graphs
        list_query = """
        SELECT DISTINCT ?g WHERE {
          GRAPH ?g { ?s ?p ?o }
        }
        """
        response = await client.post(
            sparql_url,
            data={"query": list_query, "format": "application/sparql-results+json"},
        )
        if response.status_code != 200:
            logger.error(f"Failed to list graphs from Fuseki: {response.text}")
            return []

        results = response.json()
        graph_uris = []
        for binding in results.get("results", {}).get("bindings", []):
            graph_uri = binding["g"]["value"]
            graph_uris.append(graph_uri)

        logger.debug(f"Found {len(graph_uris)} named graphs: {graph_uris}")

        # Step 2: Fetch each ontology from its corresponding named graph (in parallel)
        async def fetch_single_ontology(graph_uri: str) -> Ontology | None:
            """Fetch a single ontology from a graph URI."""
            try:
                graph = RDFGraph()
                # URL encode the graph URI to handle special characters like #
                encoded_graph_uri = quote(str(graph_uri), safe="/:")
                export_url = f"{self._get_ontologies_dataset_url()}/get?graph={encoded_graph_uri}"
                export_resp = await client.get(
                    export_url, headers={"Accept": "text/turtle"}
                )

                if export_resp.status_code == 200:
                    graph.parse(data=export_resp.text, format="turtle")

                    # Re-serialize deterministically to ensure consistent cache keys
                    # This sorts both namespaces and triples alphabetically
                    deterministic_turtle = deterministic_turtle_serialization(graph)

                    # Re-parse from deterministic serialization to ensure we have RDFGraph
                    deterministic_graph = RDFGraph()
                    deterministic_graph.parse(
                        data=deterministic_turtle, format="turtle"
                    )

                    # Copy namespace bindings from original graph
                    for prefix, namespace in graph.namespaces():
                        if prefix:
                            deterministic_graph.bind(prefix, namespace)

                    graph = deterministic_graph

                    # Find the ontology IRI in the graph
                    for onto_subj, _, obj in graph.triples(
                        (None, RDF.type, OWL.Ontology)
                    ):
                        onto_iri = str(onto_subj)
                        # Extract base IRI if graph_uri is versioned
                        # Handle both hash fragments (#19193944...) and semantic versions (#v1.2.3)
                        if "#" in graph_uri:
                            base_iri = graph_uri.split("#")[0]
                            # Use base IRI from graph_uri (named graph identifier)
                            # The graph content should have simplified IRI, but use graph_uri as source of truth
                            onto_iri = base_iri

                        ontology = Ontology(
                            graph=graph,
                            iri=onto_iri,
                        )
                        # Load properties from graph (will strip any hash fragments if present)
                        ontology.sync_properties_from_graph()
                        logger.debug(
                            f"Successfully loaded ontology: {onto_iri} version: {ontology.version}"
                        )
                        return ontology
                else:
                    logger.warning(
                        f"Failed to fetch graph {graph_uri}: {export_resp.status_code}"
                    )
            except Exception as e:
                logger.warning(f"Error fetching ontology from {graph_uri}: {e}")
            return None

        # Fetch all ontologies in parallel
        all_ontologies_results = await asyncio.gather(
            *[fetch_single_ontology(uri) for uri in graph_uris], return_exceptions=True
        )

        # Filter out None and exceptions
        all_ontologies = []
        for result in all_ontologies_results:
            if isinstance(result, Exception):
                logger.warning(f"Exception fetching ontology: {result}")
            elif result is not None:
                all_ontologies.append(result)

        # Step 3: Deduplicate and keep latest terminal versions
        ontology_dict = defaultdict(list)

        for onto in all_ontologies:
            ontology_dict[onto.iri].append(onto)

        # Build set of all parent hashes to identify terminal ontologies
        # A terminal ontology is one that is not a parent for any other ontology
        all_parent_hashes = set()

        for onto in all_ontologies:
            if onto.hash:
                # Collect all parent hashes
                for parent_hash in onto.parent_hashes:
                    all_parent_hashes.add(parent_hash)

        # For each unique IRI, select the latest terminal ontology
        ontologies = []

        for iri, versions in ontology_dict.items():
            if len(versions) == 1:
                ontologies.append(versions[0])
            else:
                # Multiple versions - find terminal ontologies (not parents)
                terminal_versions = [
                    v for v in versions if v.hash and v.hash not in all_parent_hashes
                ]

                if not terminal_versions:
                    # No terminal ontologies found - all are parents
                    # Fall back to non-terminal versions
                    logger.warning(
                        f"No terminal ontologies found for {iri}, "
                        f"using all versions for selection"
                    )
                    terminal_versions = versions

                # Select latest by created_at among terminal ontologies
                try:
                    versions_with_created = [
                        v for v in terminal_versions if v.created_at is not None
                    ]

                    if versions_with_created:
                        # Sort by created_at (most recent first)
                        versions_with_created.sort(
                            key=lambda x: x.created_at, reverse=True
                        )
                        selected = versions_with_created[0]
                        hash_str = (
                            f"{selected.hash[:16]}..." if selected.hash else "no hash"
                        )
                        logger.debug(
                            f"Selected terminal ontology for {iri} "
                            f"by created_at: {selected.created_at} "
                            f"(hash: {hash_str})"
                        )
                        ontologies.append(selected)
                    else:
                        # No created_at available - fall back to version-based sorting
                        versions_with_ver = [v for v in terminal_versions if v.version]
                        if versions_with_ver:
                            versions_with_ver.sort(
                                key=lambda x: str(x.version), reverse=False
                            )
                            selected = versions_with_ver[-1]
                            logger.debug(
                                f"Selected terminal ontology for {iri} "
                                f"by version: {selected.version} "
                                f"(no created_at available)"
                            )
                            ontologies.append(selected)
                        else:
                            # No version info either - use first terminal ontology
                            selected = terminal_versions[0]
                            logger.debug(
                                f"Selected first terminal ontology for {iri} "
                                f"(no created_at or version available)"
                            )
                            ontologies.append(selected)
                except Exception as e:
                    logger.warning(
                        f"Could not select terminal ontology for {iri}: {e}, "
                        f"using first version"
                    )
                    ontologies.append(terminal_versions[0])

        logger.info(
            f"Successfully loaded {len(ontologies)} unique ontologies from Fuseki "
        )
        return ontologies

    def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
        """Synchronous wrapper for serialize_graph.

        For async usage, use aserialize_graph() instead.
        """
        return asyncio.run(self._serialize_graph_with_cleanup(graph, **kwargs))

    async def aserialize_graph(self, graph: Graph, **kwargs) -> bool | None:
        """Async version of serialize_graph.

        This is the preferred method when running in an async context.
        """
        return await self._serialize_graph_async(graph, **kwargs)

    async def _serialize_graph_with_cleanup(
        self, graph: Graph, **kwargs
    ) -> bool | None:
        """Wrapper that ensures proper cleanup when using asyncio.run().

        This method creates a temporary client and ensures it's properly closed
        before returning, preventing "Event loop is closed" errors.
        """
        async with httpx.AsyncClient(
            auth=self._prepare_auth(), timeout=30.0
        ) as temp_client:
            # Temporarily replace the client
            original_client = self._client
            self._client = temp_client
            try:
                return await self._serialize_graph_async(graph, **kwargs)
            finally:
                # Restore original client
                self._client = original_client

    async def _serialize_graph_async(self, graph: Graph, **kwargs) -> bool | None:
        """Store an RDF graph as a named graph in a specific Fuseki dataset.

        This is a private helper method that handles the common logic for storing
        graphs in Fuseki datasets.

        Args:
            graph: The RDF graph to store.
            **kwargs: Additional parameters including graph_uri, dataset_url, default_graph_uri, log_prefix.

        Returns:
            bool: True if the graph was successfully stored, False otherwise.
        """
        client = await self._get_client()
        graph_uri = kwargs.get("graph_uri")
        dataset_url = kwargs.get("dataset_url")
        default_graph_uri = kwargs.get("default_graph_uri")
        log_prefix = kwargs.get("log_prefix")

        turtle_data = graph.serialize(format="turtle")
        if graph_uri is None:
            graph_uri = default_graph_uri

        # URL encode the graph URI to handle special characters like #
        encoded_graph_uri = quote(str(graph_uri), safe="/:")
        url = f"{dataset_url}/data?graph={encoded_graph_uri}"
        headers = {"Content-Type": "text/turtle;charset=utf-8"}
        response = await client.put(url, headers=headers, content=turtle_data)
        if response.status_code in (200, 201, 204):
            logger.info(
                f"{log_prefix} graph {graph_uri} uploaded to Fuseki as named graph."
            )
            return True
        else:
            logger.error(
                f"Failed to upload {log_prefix.lower() if log_prefix else 'unknown'} graph {graph_uri}. Status code: {response.status_code}"
            )
            logger.error(f"Response: {response.text}")
            return False

    def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
        """Synchronous wrapper for serialize.

        For async usage, use aserialize() instead.
        """
        return asyncio.run(self._serialize_with_cleanup(o, **kwargs))

    async def aserialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
        """Async version of serialize.

        This is the preferred method when running in an async context.
        """
        return await self._serialize_async(o, **kwargs)

    async def _serialize_with_cleanup(
        self, o: Ontology | RDFGraph, **kwargs
    ) -> bool | None:
        """Wrapper that ensures proper cleanup when using asyncio.run().

        This method creates a temporary client and ensures it's properly closed
        before returning, preventing "Event loop is closed" errors.
        """
        async with httpx.AsyncClient(
            auth=self._prepare_auth(), timeout=30.0
        ) as temp_client:
            # Temporarily replace the client
            original_client = self._client
            self._client = temp_client
            try:
                return await self._serialize_async(o, **kwargs)
            finally:
                # Restore original client
                self._client = original_client

    async def _serialize_async(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
        """Store an RDF graph as a named graph in Fuseki.

        This method stores the given RDF graph as a named graph in Fuseki.
        The graph name is taken from the graph_uri parameter or defaults to
        "urn:data:default".

        Args:
            o: RDF graph or Ontology object.
            **kwargs: Additional parameters including graph_uri.

        Returns:
            bool: True if the graph was successfully stored, False otherwise.

        Example:
            >>> graph = RDFGraph()
            >>> success = await manager.serialize(graph)

            >>> success = await manager.serialize(graph, graph_uri="http://example.org/chunk1")
        """
        graph_uri = kwargs.get("graph_uri")

        if isinstance(o, Ontology):
            graph = o.graph
            # Use versioned IRI for storage to enable multiple versions to coexist
            graph_uri = o.versioned_iri
            default_graph_uri = "urn:ontology:default"
            log_prefix = "Ontology"
            # Use ontologies dataset for ontology storage
            dataset_url = self._get_ontologies_dataset_url()
        elif isinstance(o, RDFGraph):
            graph = o
            default_graph_uri = "urn:data:default"
            log_prefix = "Graph"
            # Use regular dataset for facts storage
            dataset_url = self._get_dataset_url()
        else:
            raise TypeError(f"unsupported obj of type {type(o)} received")

        return await self._serialize_graph_async(
            graph=graph,
            graph_uri=graph_uri,
            dataset_url=dataset_url,
            default_graph_uri=default_graph_uri,
            log_prefix=log_prefix,
        )

__init__(uri=None, auth=None, dataset=None, ontologies_dataset=None, **kwargs)

Initialize the Fuseki triple store manager.

This method sets up the connection to Fuseki and creates the dataset if it doesn't exist. The dataset is NOT cleaned on initialization.

Parameters:

uri: Fuseki server URI (e.g., "http://localhost:3030"). Default: None.
auth: Authentication tuple (username, password) or string in "user/password" format. Default: None.
dataset: Dataset name to use for storage. Default: None.
ontologies_dataset: Dataset name for ontologies (defaults to separate dataset). Default: None.
**kwargs: Additional keyword arguments passed to the parent class. Default: {}.

Raises:

ValueError: If dataset is not specified in URI or as argument.

Example

manager = FusekiTripleStoreManager(
    uri="http://localhost:3030",
    dataset="test"
)
# To clean the dataset, use the clean() method explicitly:
await manager.clean()
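
Both documented authentication forms are accepted (credentials here are hypothetical); the constructor also forwards the FUSEKI_URI and FUSEKI_AUTH environment variable names to the parent class:

manager = FusekiTripleStoreManager(uri="http://localhost:3030", auth=("admin", "secret"), dataset="test")
manager = FusekiTripleStoreManager(uri="http://localhost:3030", auth="admin/secret", dataset="test")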

Source code in ontocast/tool/triple_manager/fuseki.py
def __init__(
    self,
    uri=None,
    auth=None,
    dataset=None,
    ontologies_dataset=None,
    **kwargs,
):
    """Initialize the Fuseki triple store manager.

    This method sets up the connection to Fuseki and creates the dataset
    if it doesn't exist. The dataset is NOT cleaned on initialization.

    Args:
        uri: Fuseki server URI (e.g., "http://localhost:3030").
        auth: Authentication tuple (username, password) or string in "user/password" format.
        dataset: Dataset name to use for storage.
        ontologies_dataset: Dataset name for ontologies (defaults to separate dataset).
        **kwargs: Additional keyword arguments passed to the parent class.

    Raises:
        ValueError: If dataset is not specified in URI or as argument.

    Example:
        >>> manager = FusekiTripleStoreManager(
        ...     uri="http://localhost:3030",
        ...     dataset="test"
        ... )
        >>> # To clean the dataset, use the clean() method explicitly:
        >>> await manager.clean()
    """
    super().__init__(
        uri=uri, auth=auth, env_uri="FUSEKI_URI", env_auth="FUSEKI_AUTH", **kwargs
    )
    if dataset is None:
        self.dataset = DEFAULT_DATASET
    else:
        self.dataset = dataset
    self.ontologies_dataset = ontologies_dataset or DEFAULT_ONTOLOGIES_DATASET

    # Initialize httpx client for async operations
    self._client: httpx.AsyncClient | None = None

    # Initialize datasets synchronously (for backward compatibility)
    # In async contexts, use async_init() instead
    asyncio.run(self._async_init_with_cleanup())

afetch_ontologies() async

Async version of fetch_ontologies.

This is the preferred method when running in an async context.

Source code in ontocast/tool/triple_manager/fuseki.py
async def afetch_ontologies(self) -> list[Ontology]:
    """Async version of fetch_ontologies.

    This is the preferred method when running in an async context.
    """
    return await self._fetch_ontologies_async()
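
Inside an async context the coroutine is awaited directly, for example:

ontologies = await manager.afetch_ontologies()
for onto in ontologies:
    print(f"Found ontology: {onto.iri} v{onto.version}")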

aserialize(o, **kwargs) async

Async version of serialize.

This is the preferred method when running in an async context.

Source code in ontocast/tool/triple_manager/fuseki.py
async def aserialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
    """Async version of serialize.

    This is the preferred method when running in an async context.
    """
    return await self._serialize_async(o, **kwargs)

aserialize_graph(graph, **kwargs) async

Async version of serialize_graph.

This is the preferred method when running in an async context.

Source code in ontocast/tool/triple_manager/fuseki.py
async def aserialize_graph(self, graph: Graph, **kwargs) -> bool | None:
    """Async version of serialize_graph.

    This is the preferred method when running in an async context.
    """
    return await self._serialize_graph_async(graph, **kwargs)

clean(dataset=None) async

Clean/flush data from Fuseki dataset(s).

This method removes all named graphs and clears the default graph from the specified dataset, or all datasets if no dataset is specified.

Parameters:

dataset (str | None): Optional dataset name to clean. If None, cleans both the main dataset and the ontologies dataset. If specified, cleans only that dataset. Default: None.

Warning: This operation is irreversible and will delete all data from the specified dataset(s).

The method handles errors gracefully and logs the results of each cleanup operation.

Example

# Clean all datasets
await manager.clean()
# Clean specific dataset
await manager.clean(dataset="my_dataset")

Source code in ontocast/tool/triple_manager/fuseki.py
async def clean(self, dataset: str | None = None) -> None:
    """Clean/flush data from Fuseki dataset(s).

    This method removes all named graphs and clears the default graph
    from the specified dataset, or all datasets if no dataset is specified.

    Args:
        dataset: Optional dataset name to clean. If None, cleans both the main
            dataset and the ontologies dataset. If specified, cleans only that dataset.

    Warning: This operation is irreversible and will delete all data
    from the specified dataset(s).

    The method handles errors gracefully and logs the results of
    each cleanup operation.

    Example:
        >>> # Clean all datasets
        >>> await manager.clean()
        >>> # Clean specific dataset
        >>> await manager.clean(dataset="my_dataset")
    """
    if dataset is None:
        # Clean all datasets (main and ontologies)
        # self.dataset is guaranteed to be a string (set to DEFAULT_DATASET if None in __init__)
        assert self.dataset is not None, "Dataset should never be None"
        await self._clean_dataset_by_name(self.dataset)
        logger.info(f"Fuseki dataset '{self.dataset}' cleaned (all data deleted)")

        # Also clean the ontologies dataset if it's different
        if self.ontologies_dataset != self.dataset:
            await self._clean_dataset_by_name(self.ontologies_dataset)
            logger.info(
                f"Fuseki ontologies dataset '{self.ontologies_dataset}' cleaned (all data deleted)"
            )
    else:
        # Clean only the specified dataset
        await self._clean_dataset_by_name(dataset)
        logger.info(f"Fuseki dataset '{dataset}' cleaned (all data deleted)")

close() async

Close the httpx client.

Source code in ontocast/tool/triple_manager/fuseki.py
async def close(self):
    """Close the httpx client."""
    if self._client is not None:
        await self._client.aclose()
        self._client = None

fetch_ontologies()

Synchronous wrapper for fetch_ontologies.

For async usage, use afetch_ontologies() instead.

Source code in ontocast/tool/triple_manager/fuseki.py
def fetch_ontologies(self) -> list[Ontology]:
    """Synchronous wrapper for fetch_ontologies.

    For async usage, use afetch_ontologies() instead.
    """
    # Use a temporary client for this operation to avoid event loop cleanup issues
    return asyncio.run(self._fetch_ontologies_with_cleanup())

init_dataset(dataset_name) async

Initialize a Fuseki dataset.

This method creates a new dataset in Fuseki if it doesn't already exist. It uses Fuseki's admin API to create the dataset with TDB2 storage.

Uses a temporary client to avoid event loop cleanup issues when called from different async contexts.

Parameters:

dataset_name: Name of the dataset to create. required
Note

This method will not fail if the dataset already exists.

Source code in ontocast/tool/triple_manager/fuseki.py
async def init_dataset(self, dataset_name):
    """Initialize a Fuseki dataset.

    This method creates a new dataset in Fuseki if it doesn't already exist.
    It uses Fuseki's admin API to create the dataset with TDB2 storage.

    Uses a temporary client to avoid event loop cleanup issues when called
    from different async contexts.

    Args:
        dataset_name: Name of the dataset to create.

    Note:
        This method will not fail if the dataset already exists.
    """
    # Use a temporary client to avoid event loop cleanup issues
    async with httpx.AsyncClient(auth=self._prepare_auth(), timeout=30.0) as client:
        fuseki_admin_url = f"{self.uri}/$/datasets"

        payload = {"dbName": dataset_name, "dbType": "tdb2"}

        headers = {"Content-Type": "application/x-www-form-urlencoded"}

        response = await client.post(
            fuseki_admin_url, data=payload, headers=headers
        )

        if response.status_code == 200 or response.status_code == 201:
            logger.info(f"Fuseki dataset '{dataset_name}' created successfully.")
        elif response.status_code == 409:
            logger.info(
                f"Fuseki status code: {response.status_code}; {response.text.strip()}"
            )
        else:
            logger.error(
                f"Failed to create dataset {dataset_name}. Status code: {response.status_code}"
            )
            logger.error(f"Response: {response.text.strip()}")

serialize(o, **kwargs)

Synchronous wrapper for serialize.

For async usage, use aserialize() instead.

Source code in ontocast/tool/triple_manager/fuseki.py
def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
    """Synchronous wrapper for serialize.

    For async usage, use aserialize() instead.
    """
    return asyncio.run(self._serialize_with_cleanup(o, **kwargs))

serialize_graph(graph, **kwargs)

Synchronous wrapper for serialize_graph.

For async usage, use aserialize_graph() instead.

Source code in ontocast/tool/triple_manager/fuseki.py
def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
    """Synchronous wrapper for serialize_graph.

    For async usage, use aserialize_graph() instead.
    """
    return asyncio.run(self._serialize_graph_with_cleanup(graph, **kwargs))

update_dataset(new_dataset) async

Update the dataset name for this manager.

This method allows changing the dataset without recreating the entire manager, which is useful for API requests that specify different datasets.

Parameters:

new_dataset (str): The new dataset name to use. required
Source code in ontocast/tool/triple_manager/fuseki.py
async def update_dataset(self, new_dataset: str) -> None:
    """Update the dataset name for this manager.

    This method allows changing the dataset without recreating the entire
    manager, which is useful for API requests that specify different datasets.

    Args:
        new_dataset: The new dataset name to use.
    """
    if not new_dataset:
        raise ValueError("Dataset name cannot be empty")

    self.dataset = new_dataset
    await self.init_dataset(self.dataset)
    logger.info(f"Updated Fuseki dataset to: {self.dataset}")

MockFusekiTripleStoreManager

Bases: TripleStoreManagerWithAuth

Mock Fuseki triple store manager for testing.

This class simulates the behavior of FusekiTripleStoreManager without requiring an actual Fuseki server. It maintains in-memory storage and provides the same interface as the real implementation.

Attributes:

    dataset (str | None): The mock dataset name.
    ontologies_dataset (str): The mock ontologies dataset name.
    ontologies (List[Ontology]): In-memory storage for ontologies.
    graphs (Dict[str, Graph]): In-memory storage for RDF graphs.

Source code in ontocast/tool/triple_manager/mock.py
class MockFusekiTripleStoreManager(TripleStoreManagerWithAuth):
    """Mock Fuseki triple store manager for testing.

    This class simulates the behavior of FusekiTripleStoreManager without
    requiring an actual Fuseki server. It maintains in-memory storage and
    provides the same interface as the real implementation.

    Attributes:
        dataset: The mock dataset name.
        ontologies_dataset: The mock ontologies dataset name.
        ontologies: In-memory storage for ontologies.
        graphs: In-memory storage for RDF graphs.
    """

    model_config = {"arbitrary_types_allowed": True}

    dataset: str | None = None
    ontologies_dataset: str = "ontologies"
    ontologies: List[Ontology] = Field(
        default_factory=list, description="In-memory storage for ontologies"
    )
    graphs: Dict[str, Graph] = Field(
        default_factory=dict, description="In-memory storage for RDF graphs"
    )

    def __init__(
        self,
        uri=None,
        auth=None,
        dataset=None,
        ontologies_dataset=None,
        clean=False,
        **kwargs,
    ):
        """Initialize the mock Fuseki triple store manager.

        Args:
            uri: Mock URI (ignored but kept for interface compatibility).
            auth: Mock authentication (ignored but kept for interface compatibility).
            dataset: Mock dataset name.
            ontologies_dataset: Mock ontologies dataset name.
            clean: Whether to clean the store on initialization.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(uri=uri, auth=auth, **kwargs)
        self.dataset = dataset or "test"
        self.ontologies_dataset = ontologies_dataset or "ontologies"

        if clean:
            self.clear()

    def fetch_ontologies(self) -> List[Ontology]:
        """Fetch all available ontologies from the mock store.

        Returns:
            List[Ontology]: List of available ontologies with their graphs.
        """
        return self.ontologies.copy()

    def serialize_graph(
        self, graph: Graph, graph_uri: str | None = None
    ) -> bool | None:
        """Store an RDF graph in the mock store.

        Args:
            graph: The RDF graph to store.
            graph_uri: Optional URI to use as the graph identifier.

        Returns:
            bool: True if the graph was stored successfully.
        """
        # Create a new Graph and copy all triples
        new_graph = Graph()
        for triple in graph:
            new_graph.add(triple)

        if graph_uri:
            self.graphs[graph_uri] = new_graph
        else:
            # Generate a default URI based on graph content
            graph_uri = f"mock://{self.dataset}/graph/{len(self.graphs)}"
            self.graphs[graph_uri] = new_graph

        # Try to extract ontology information from the graph
        ontology_id = self._extract_ontology_id(graph)
        if ontology_id:
            ontology = Ontology(
                ontology_id=ontology_id,
                title=f"Mock Ontology {ontology_id}",
                description="Mock ontology for testing",
                version="1.0.0",
                iri=graph_uri,
                graph=self._create_rdf_graph_from_graph(graph),
            )
            # Update existing ontology or add new one
            existing = next(
                (o for o in self.ontologies if o.ontology_id == ontology_id), None
            )
            if existing:
                existing.graph = self._create_rdf_graph_from_graph(graph)
                existing.iri = graph_uri
            else:
                self.ontologies.append(ontology)

        return True

    def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
        """Store an Ontology or RDFGraph in the mock store.

        Args:
            o: Ontology or RDFGraph object to store.
            **kwargs: Additional keyword arguments.

        Returns:
            bool: True if the object was stored successfully.
        """
        if isinstance(o, Ontology):
            graph = o.graph
            graph_uri = o.iri
        elif isinstance(o, RDFGraph):
            graph = o
            graph_uri = kwargs.get("graph_uri")
        else:
            raise TypeError(f"unsupported obj of type {type(o)} received")

        return self.serialize_graph(graph, graph_uri)

    def _extract_ontology_id(self, graph: Graph) -> str | None:
        """Extract ontology ID from graph content.

        Args:
            graph: The RDF graph to analyze.

        Returns:
            str | None: The extracted ontology ID, or None if not found.
        """
        # Look for owl:Ontology declarations
        for s, p, o in graph.triples((None, RDF.type, OWL.Ontology)):
            if isinstance(s, URIRef):
                return derive_ontology_id(str(s))
        return None

    def clear(self):
        """Clear all stored data."""
        self.ontologies.clear()
        self.graphs.clear()

    async def clean(self, dataset: str | None = None) -> None:
        """Clean/flush data from the mock Fuseki triple store.

        Args:
            dataset: Optional dataset name (ignored for mock, kept for interface compatibility).
        """
        self.clear()

    def _create_rdf_graph_from_graph(self, graph: Graph) -> RDFGraph:
        """Create an RDFGraph from a regular Graph by copying all triples.

        Args:
            graph: The source graph to copy from.

        Returns:
            RDFGraph: A new RDFGraph with all triples copied.
        """
        rdf_graph = RDFGraph()
        for triple in graph:
            rdf_graph.add(triple)
        return rdf_graph
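
Because the mock keeps everything in memory, it can back unit tests that exercise the same code paths as the real Fuseki manager. A minimal pytest-style sketch (the import path follows the module shown above, the ontology IRI is made up for illustration, and the final assertion assumes derive_ontology_id() yields an identifier for that IRI):

    from rdflib import Graph

    from ontocast.tool.triple_manager.mock import MockFusekiTripleStoreManager

    def test_mock_fuseki_roundtrip():
        manager = MockFusekiTripleStoreManager(dataset="test", clean=True)
        g = Graph()
        g.parse(
            data="<http://example.org/onto> a <http://www.w3.org/2002/07/owl#Ontology> .",
            format="turtle",
        )
        assert manager.serialize_graph(g, graph_uri="http://example.org/onto") is True
        assert len(manager.fetch_ontologies()) == 1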

__init__(uri=None, auth=None, dataset=None, ontologies_dataset=None, clean=False, **kwargs)

Initialize the mock Fuseki triple store manager.

Parameters:

    uri: Mock URI (ignored but kept for interface compatibility). Default: None.
    auth: Mock authentication (ignored but kept for interface compatibility). Default: None.
    dataset: Mock dataset name. Default: None.
    ontologies_dataset: Mock ontologies dataset name. Default: None.
    clean: Whether to clean the store on initialization. Default: False.
    **kwargs: Additional keyword arguments. Default: {}.
Source code in ontocast/tool/triple_manager/mock.py
def __init__(
    self,
    uri=None,
    auth=None,
    dataset=None,
    ontologies_dataset=None,
    clean=False,
    **kwargs,
):
    """Initialize the mock Fuseki triple store manager.

    Args:
        uri: Mock URI (ignored but kept for interface compatibility).
        auth: Mock authentication (ignored but kept for interface compatibility).
        dataset: Mock dataset name.
        ontologies_dataset: Mock ontologies dataset name.
        clean: Whether to clean the store on initialization.
        **kwargs: Additional keyword arguments.
    """
    super().__init__(uri=uri, auth=auth, **kwargs)
    self.dataset = dataset or "test"
    self.ontologies_dataset = ontologies_dataset or "ontologies"

    if clean:
        self.clear()

clean(dataset=None) async

Clean/flush data from the mock Fuseki triple store.

Parameters:

    dataset (str | None): Optional dataset name (ignored for mock, kept for interface compatibility). Default: None.
Source code in ontocast/tool/triple_manager/mock.py
async def clean(self, dataset: str | None = None) -> None:
    """Clean/flush data from the mock Fuseki triple store.

    Args:
        dataset: Optional dataset name (ignored for mock, kept for interface compatibility).
    """
    self.clear()

clear()

Clear all stored data.

Source code in ontocast/tool/triple_manager/mock.py
def clear(self):
    """Clear all stored data."""
    self.ontologies.clear()
    self.graphs.clear()

fetch_ontologies()

Fetch all available ontologies from the mock store.

Returns:

    List[Ontology]: List of available ontologies with their graphs.

Source code in ontocast/tool/triple_manager/mock.py
def fetch_ontologies(self) -> List[Ontology]:
    """Fetch all available ontologies from the mock store.

    Returns:
        List[Ontology]: List of available ontologies with their graphs.
    """
    return self.ontologies.copy()

serialize(o, **kwargs)

Store an Ontology or RDFGraph in the mock store.

Parameters:

    o (Ontology | RDFGraph): Ontology or RDFGraph object to store. Required.
    **kwargs: Additional keyword arguments. Default: {}.

Returns:

    bool | None: True if the object was stored successfully.

Source code in ontocast/tool/triple_manager/mock.py
def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
    """Store an Ontology or RDFGraph in the mock store.

    Args:
        o: Ontology or RDFGraph object to store.
        **kwargs: Additional keyword arguments.

    Returns:
        bool: True if the object was stored successfully.
    """
    if isinstance(o, Ontology):
        graph = o.graph
        graph_uri = o.iri
    elif isinstance(o, RDFGraph):
        graph = o
        graph_uri = kwargs.get("graph_uri")
    else:
        raise TypeError(f"unsupported obj of type {type(o)} received")

    return self.serialize_graph(graph, graph_uri)

serialize_graph(graph, graph_uri=None)

Store an RDF graph in the mock store.

Parameters:

    graph (Graph): The RDF graph to store. Required.
    graph_uri (str | None): Optional URI to use as the graph identifier. Default: None.

Returns:

    bool | None: True if the graph was stored successfully.

Source code in ontocast/tool/triple_manager/mock.py
def serialize_graph(
    self, graph: Graph, graph_uri: str | None = None
) -> bool | None:
    """Store an RDF graph in the mock store.

    Args:
        graph: The RDF graph to store.
        graph_uri: Optional URI to use as the graph identifier.

    Returns:
        bool: True if the graph was stored successfully.
    """
    # Create a new Graph and copy all triples
    new_graph = Graph()
    for triple in graph:
        new_graph.add(triple)

    if graph_uri:
        self.graphs[graph_uri] = new_graph
    else:
        # Generate a default URI based on graph content
        graph_uri = f"mock://{self.dataset}/graph/{len(self.graphs)}"
        self.graphs[graph_uri] = new_graph

    # Try to extract ontology information from the graph
    ontology_id = self._extract_ontology_id(graph)
    if ontology_id:
        ontology = Ontology(
            ontology_id=ontology_id,
            title=f"Mock Ontology {ontology_id}",
            description="Mock ontology for testing",
            version="1.0.0",
            iri=graph_uri,
            graph=self._create_rdf_graph_from_graph(graph),
        )
        # Update existing ontology or add new one
        existing = next(
            (o for o in self.ontologies if o.ontology_id == ontology_id), None
        )
        if existing:
            existing.graph = self._create_rdf_graph_from_graph(graph)
            existing.iri = graph_uri
        else:
            self.ontologies.append(ontology)

    return True

MockNeo4jTripleStoreManager

Bases: TripleStoreManagerWithAuth

Mock Neo4j triple store manager for testing.

This class simulates the behavior of Neo4jTripleStoreManager without requiring an actual Neo4j server. It maintains in-memory storage and provides the same interface as the real implementation.

Attributes:

    ontologies (List[Ontology]): In-memory storage for ontologies.
    graphs (Dict[str, Graph]): In-memory storage for RDF graphs.

Source code in ontocast/tool/triple_manager/mock.py
class MockNeo4jTripleStoreManager(TripleStoreManagerWithAuth):
    """Mock Neo4j triple store manager for testing.

    This class simulates the behavior of Neo4jTripleStoreManager without
    requiring an actual Neo4j server. It maintains in-memory storage and
    provides the same interface as the real implementation.

    Attributes:
        ontologies: In-memory storage for ontologies.
        graphs: In-memory storage for RDF graphs.
    """

    model_config = {"arbitrary_types_allowed": True}

    ontologies: List[Ontology] = Field(
        default_factory=list, description="In-memory storage for ontologies"
    )
    graphs: Dict[str, Graph] = Field(
        default_factory=dict, description="In-memory storage for RDF graphs"
    )

    def __init__(self, uri=None, auth=None, clean=False, **kwargs):
        """Initialize the mock Neo4j triple store manager.

        Args:
            uri: Mock URI (ignored but kept for interface compatibility).
            auth: Mock authentication (ignored but kept for interface compatibility).
            clean: Whether to clean the store on initialization.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(uri=uri, auth=auth, **kwargs)

        if clean:
            self.clear()

    def fetch_ontologies(self) -> List[Ontology]:
        """Fetch all available ontologies from the mock store.

        Returns:
            List[Ontology]: List of available ontologies with their graphs.
        """
        return self.ontologies.copy()

    def serialize_graph(
        self, graph: Graph, graph_uri: str | None = None
    ) -> Dict[str, Any] | None:
        """Store an RDF graph in the mock store.

        Args:
            graph: The RDF graph to store.
            graph_uri: Optional URI to use as the graph identifier.

        Returns:
            Dict[str, Any]: Mock summary of the operation.
        """
        # Create a new Graph and copy all triples
        new_graph = Graph()
        for triple in graph:
            new_graph.add(triple)

        if graph_uri:
            self.graphs[graph_uri] = new_graph
        else:
            # Generate a default URI based on graph content
            graph_uri = f"mock://neo4j/graph/{len(self.graphs)}"
            self.graphs[graph_uri] = new_graph

        # Try to extract ontology information from the graph
        ontology_id = self._extract_ontology_id(graph)
        if ontology_id:
            ontology = Ontology(
                ontology_id=ontology_id,
                title=f"Mock Ontology {ontology_id}",
                description="Mock ontology for testing",
                version="1.0.0",
                iri=graph_uri,
                graph=self._create_rdf_graph_from_graph(graph),
            )
            # Update existing ontology or add new one
            existing = next(
                (o for o in self.ontologies if o.ontology_id == ontology_id), None
            )
            if existing:
                existing.graph = self._create_rdf_graph_from_graph(graph)
                existing.iri = graph_uri
            else:
                self.ontologies.append(ontology)

        # Return mock summary similar to Neo4j
        return {
            "nodes_created": len(graph),
            "relationships_created": 0,
            "properties_set": len(graph),
            "labels_added": 1,
        }

    def serialize(self, o: Ontology | RDFGraph, **kwargs) -> Dict[str, Any] | None:
        """Store an Ontology or RDFGraph in the mock store.

        Args:
            o: Ontology or RDFGraph object to store.
            **kwargs: Additional keyword arguments.

        Returns:
            Dict[str, Any]: Mock summary of the operation.
        """
        if isinstance(o, Ontology):
            graph = o.graph
            graph_uri = o.iri
        elif isinstance(o, RDFGraph):
            graph = o
            graph_uri = kwargs.get("graph_uri")
        else:
            raise TypeError(f"unsupported obj of type {type(o)} received")

        return self.serialize_graph(graph, graph_uri)

    def _extract_ontology_id(self, graph: Graph) -> str | None:
        """Extract ontology ID from graph content.

        Args:
            graph: The RDF graph to analyze.

        Returns:
            str | None: The extracted ontology ID, or None if not found.
        """
        # Look for owl:Ontology declarations
        for s, p, o in graph.triples((None, RDF.type, OWL.Ontology)):
            if isinstance(s, URIRef):
                return derive_ontology_id(str(s))
        return None

    def clear(self):
        """Clear all stored data."""
        self.ontologies.clear()
        self.graphs.clear()

    async def clean(self, dataset: str | None = None) -> None:
        """Clean/flush data from the mock Neo4j triple store.

        Args:
            dataset: Optional dataset name (ignored for Neo4j mock, kept for interface compatibility).
        """
        self.clear()

    def _create_rdf_graph_from_graph(self, graph: Graph) -> RDFGraph:
        """Create an RDFGraph from a regular Graph by copying all triples.

        Args:
            graph: The source graph to copy from.

        Returns:
            RDFGraph: A new RDFGraph with all triples copied.
        """
        rdf_graph = RDFGraph()
        for triple in graph:
            rdf_graph.add(triple)
        return rdf_graph
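
Unlike the Fuseki mock, this mock mirrors Neo4j's behaviour of returning an import summary rather than a boolean. A short sketch, assuming `g` is an rdflib Graph:

    manager = MockNeo4jTripleStoreManager(clean=True)
    summary = manager.serialize_graph(g)
    # The counts are derived from the size of the stored graph.
    print(summary["nodes_created"], summary["properties_set"])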

__init__(uri=None, auth=None, clean=False, **kwargs)

Initialize the mock Neo4j triple store manager.

Parameters:

    uri: Mock URI (ignored but kept for interface compatibility). Default: None.
    auth: Mock authentication (ignored but kept for interface compatibility). Default: None.
    clean: Whether to clean the store on initialization. Default: False.
    **kwargs: Additional keyword arguments. Default: {}.
Source code in ontocast/tool/triple_manager/mock.py
def __init__(self, uri=None, auth=None, clean=False, **kwargs):
    """Initialize the mock Neo4j triple store manager.

    Args:
        uri: Mock URI (ignored but kept for interface compatibility).
        auth: Mock authentication (ignored but kept for interface compatibility).
        clean: Whether to clean the store on initialization.
        **kwargs: Additional keyword arguments.
    """
    super().__init__(uri=uri, auth=auth, **kwargs)

    if clean:
        self.clear()

clean(dataset=None) async

Clean/flush data from the mock Neo4j triple store.

Parameters:

    dataset (str | None): Optional dataset name (ignored for the Neo4j mock, kept for interface compatibility). Default: None.
Source code in ontocast/tool/triple_manager/mock.py
async def clean(self, dataset: str | None = None) -> None:
    """Clean/flush data from the mock Neo4j triple store.

    Args:
        dataset: Optional dataset name (ignored for Neo4j mock, kept for interface compatibility).
    """
    self.clear()

clear()

Clear all stored data.

Source code in ontocast/tool/triple_manager/mock.py
def clear(self):
    """Clear all stored data."""
    self.ontologies.clear()
    self.graphs.clear()

fetch_ontologies()

Fetch all available ontologies from the mock store.

Returns:

    List[Ontology]: List of available ontologies with their graphs.

Source code in ontocast/tool/triple_manager/mock.py
def fetch_ontologies(self) -> List[Ontology]:
    """Fetch all available ontologies from the mock store.

    Returns:
        List[Ontology]: List of available ontologies with their graphs.
    """
    return self.ontologies.copy()

serialize(o, **kwargs)

Store an Ontology or RDFGraph in the mock store.

Parameters:

    o (Ontology | RDFGraph): Ontology or RDFGraph object to store. Required.
    **kwargs: Additional keyword arguments. Default: {}.

Returns:

    Dict[str, Any] | None: Mock summary of the operation.

Source code in ontocast/tool/triple_manager/mock.py
def serialize(self, o: Ontology | RDFGraph, **kwargs) -> Dict[str, Any] | None:
    """Store an Ontology or RDFGraph in the mock store.

    Args:
        o: Ontology or RDFGraph object to store.
        **kwargs: Additional keyword arguments.

    Returns:
        Dict[str, Any]: Mock summary of the operation.
    """
    if isinstance(o, Ontology):
        graph = o.graph
        graph_uri = o.iri
    elif isinstance(o, RDFGraph):
        graph = o
        graph_uri = kwargs.get("graph_uri")
    else:
        raise TypeError(f"unsupported obj of type {type(o)} received")

    return self.serialize_graph(graph, graph_uri)

serialize_graph(graph, graph_uri=None)

Store an RDF graph in the mock store.

Parameters:

    graph (Graph): The RDF graph to store. Required.
    graph_uri (str | None): Optional URI to use as the graph identifier. Default: None.

Returns:

    Dict[str, Any] | None: Mock summary of the operation.

Source code in ontocast/tool/triple_manager/mock.py
def serialize_graph(
    self, graph: Graph, graph_uri: str | None = None
) -> Dict[str, Any] | None:
    """Store an RDF graph in the mock store.

    Args:
        graph: The RDF graph to store.
        graph_uri: Optional URI to use as the graph identifier.

    Returns:
        Dict[str, Any]: Mock summary of the operation.
    """
    # Create a new Graph and copy all triples
    new_graph = Graph()
    for triple in graph:
        new_graph.add(triple)

    if graph_uri:
        self.graphs[graph_uri] = new_graph
    else:
        # Generate a default URI based on graph content
        graph_uri = f"mock://neo4j/graph/{len(self.graphs)}"
        self.graphs[graph_uri] = new_graph

    # Try to extract ontology information from the graph
    ontology_id = self._extract_ontology_id(graph)
    if ontology_id:
        ontology = Ontology(
            ontology_id=ontology_id,
            title=f"Mock Ontology {ontology_id}",
            description="Mock ontology for testing",
            version="1.0.0",
            iri=graph_uri,
            graph=self._create_rdf_graph_from_graph(graph),
        )
        # Update existing ontology or add new one
        existing = next(
            (o for o in self.ontologies if o.ontology_id == ontology_id), None
        )
        if existing:
            existing.graph = self._create_rdf_graph_from_graph(graph)
            existing.iri = graph_uri
        else:
            self.ontologies.append(ontology)

    # Return mock summary similar to Neo4j
    return {
        "nodes_created": len(graph),
        "relationships_created": 0,
        "properties_set": len(graph),
        "labels_added": 1,
    }

MockTripleStoreManager

Bases: TripleStoreManager

Mock triple store manager for testing.

This class provides an in-memory implementation of triple store operations that simulates the behavior of real triple stores without requiring external services. It stores ontologies and graphs in memory and provides the same interface as concrete implementations.

Attributes:

    ontologies (List[Ontology]): In-memory storage for ontologies.
    graphs (Dict[str, Graph]): In-memory storage for RDF graphs.

Source code in ontocast/tool/triple_manager/mock.py
class MockTripleStoreManager(TripleStoreManager):
    """Mock triple store manager for testing.

    This class provides an in-memory implementation of triple store operations
    that simulates the behavior of real triple stores without requiring external
    services. It stores ontologies and graphs in memory and provides the same
    interface as concrete implementations.

    Attributes:
        ontologies: In-memory storage for ontologies.
        graphs: In-memory storage for RDF graphs.
    """

    model_config = {"arbitrary_types_allowed": True}

    ontologies: List[Ontology] = Field(
        default_factory=list, description="In-memory storage for ontologies"
    )
    graphs: Dict[str, Graph] = Field(
        default_factory=dict, description="In-memory storage for RDF graphs"
    )

    def __init__(self, **kwargs):
        """Initialize the mock triple store manager.

        Args:
            **kwargs: Additional keyword arguments passed to the parent class.
        """
        super().__init__(**kwargs)

    def fetch_ontologies(self) -> List[Ontology]:
        """Fetch all available ontologies from the mock store.

        Returns:
            List[Ontology]: List of available ontologies with their graphs.
        """
        return self.ontologies.copy()

    def serialize_graph(
        self, graph: Graph, graph_uri: str | None = None
    ) -> bool | None:
        """Store an RDF graph in the mock store.

        Args:
            graph: The RDF graph to store.
            graph_uri: Optional URI to use as the graph identifier.

        Returns:
            bool: True if the graph was stored successfully.
        """
        # Create a new Graph and copy all triples
        new_graph = Graph()
        for triple in graph:
            new_graph.add(triple)

        if graph_uri:
            self.graphs[graph_uri] = new_graph
        else:
            # Generate a default URI based on graph content
            graph_uri = f"mock://graph/{len(self.graphs)}"
            self.graphs[graph_uri] = new_graph

        # Try to extract ontology information from the graph
        ontology_id = self._extract_ontology_id(graph)
        if ontology_id:
            ontology = Ontology(
                ontology_id=ontology_id,
                title=f"Mock Ontology {ontology_id}",
                description="Mock ontology for testing",
                version="1.0.0",
                iri=graph_uri,
                graph=self._create_rdf_graph_from_graph(graph),
            )
            # Update existing ontology or add new one
            existing = next(
                (o for o in self.ontologies if o.ontology_id == ontology_id), None
            )
            if existing:
                existing.graph = self._create_rdf_graph_from_graph(graph)
                existing.iri = graph_uri
            else:
                self.ontologies.append(ontology)

        return True

    def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
        """Store an Ontology or RDFGraph in the mock store.

        Args:
            o: Ontology or RDFGraph object to store.
            **kwargs: Additional keyword arguments.

        Returns:
            bool: True if the object was stored successfully.
        """
        if isinstance(o, Ontology):
            graph = o.graph
            graph_uri = o.iri
        elif isinstance(o, RDFGraph):
            graph = o
            graph_uri = kwargs.get("graph_uri")
        else:
            raise TypeError(f"unsupported obj of type {type(o)} received")

        return self.serialize_graph(graph, graph_uri)

    def _extract_ontology_id(self, graph: Graph) -> str | None:
        """Extract ontology ID from graph content.

        Args:
            graph: The RDF graph to analyze.

        Returns:
            str | None: The extracted ontology ID, or None if not found.
        """
        # Look for owl:Ontology declarations
        for s, p, o in graph.triples((None, RDF.type, OWL.Ontology)):
            if isinstance(s, URIRef):
                return derive_ontology_id(str(s))
        return None

    def clear(self):
        """Clear all stored data."""
        self.ontologies.clear()
        self.graphs.clear()

    async def clean(self, dataset: str | None = None) -> None:
        """Clean/flush data from the mock triple store.

        Args:
            dataset: Optional dataset name (ignored for mock, kept for interface compatibility).
        """
        self.clear()

    def _create_rdf_graph_from_graph(self, graph: Graph) -> RDFGraph:
        """Create an RDFGraph from a regular Graph by copying all triples.

        Args:
            graph: The source graph to copy from.

        Returns:
            RDFGraph: A new RDFGraph with all triples copied.
        """
        rdf_graph = RDFGraph()
        for triple in graph:
            rdf_graph.add(triple)
        return rdf_graph
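
Since all managers share the TripleStoreManager interface, the generic mock can be injected wherever a concrete backend is expected. A sketch (the `count_ontologies` helper is illustrative, not part of OntoCast):

    def count_ontologies(store: TripleStoreManager) -> int:
        """Count ontologies available in whatever backend is injected."""
        return len(store.fetch_ontologies())

    assert count_ontologies(MockTripleStoreManager()) == 0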

__init__(**kwargs)

Initialize the mock triple store manager.

Parameters:

    **kwargs: Additional keyword arguments passed to the parent class. Default: {}.
Source code in ontocast/tool/triple_manager/mock.py
def __init__(self, **kwargs):
    """Initialize the mock triple store manager.

    Args:
        **kwargs: Additional keyword arguments passed to the parent class.
    """
    super().__init__(**kwargs)

clean(dataset=None) async

Clean/flush data from the mock triple store.

Parameters:

    dataset (str | None): Optional dataset name (ignored for mock, kept for interface compatibility). Default: None.
Source code in ontocast/tool/triple_manager/mock.py
async def clean(self, dataset: str | None = None) -> None:
    """Clean/flush data from the mock triple store.

    Args:
        dataset: Optional dataset name (ignored for mock, kept for interface compatibility).
    """
    self.clear()

clear()

Clear all stored data.

Source code in ontocast/tool/triple_manager/mock.py
def clear(self):
    """Clear all stored data."""
    self.ontologies.clear()
    self.graphs.clear()

fetch_ontologies()

Fetch all available ontologies from the mock store.

Returns:

    List[Ontology]: List of available ontologies with their graphs.

Source code in ontocast/tool/triple_manager/mock.py
def fetch_ontologies(self) -> List[Ontology]:
    """Fetch all available ontologies from the mock store.

    Returns:
        List[Ontology]: List of available ontologies with their graphs.
    """
    return self.ontologies.copy()

serialize(o, **kwargs)

Store an Ontology or RDFGraph in the mock store.

Parameters:

    o (Ontology | RDFGraph): Ontology or RDFGraph object to store. Required.
    **kwargs: Additional keyword arguments. Default: {}.

Returns:

    bool | None: True if the object was stored successfully.

Source code in ontocast/tool/triple_manager/mock.py
def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
    """Store an Ontology or RDFGraph in the mock store.

    Args:
        o: Ontology or RDFGraph object to store.
        **kwargs: Additional keyword arguments.

    Returns:
        bool: True if the object was stored successfully.
    """
    if isinstance(o, Ontology):
        graph = o.graph
        graph_uri = o.iri
    elif isinstance(o, RDFGraph):
        graph = o
        graph_uri = kwargs.get("graph_uri")
    else:
        raise TypeError(f"unsupported obj of type {type(o)} received")

    return self.serialize_graph(graph, graph_uri)

serialize_graph(graph, graph_uri=None)

Store an RDF graph in the mock store.

Parameters:

    graph (Graph): The RDF graph to store. Required.
    graph_uri (str | None): Optional URI to use as the graph identifier. Default: None.

Returns:

    bool | None: True if the graph was stored successfully.

Source code in ontocast/tool/triple_manager/mock.py
def serialize_graph(
    self, graph: Graph, graph_uri: str | None = None
) -> bool | None:
    """Store an RDF graph in the mock store.

    Args:
        graph: The RDF graph to store.
        graph_uri: Optional URI to use as the graph identifier.

    Returns:
        bool: True if the graph was stored successfully.
    """
    # Create a new Graph and copy all triples
    new_graph = Graph()
    for triple in graph:
        new_graph.add(triple)

    if graph_uri:
        self.graphs[graph_uri] = new_graph
    else:
        # Generate a default URI based on graph content
        graph_uri = f"mock://graph/{len(self.graphs)}"
        self.graphs[graph_uri] = new_graph

    # Try to extract ontology information from the graph
    ontology_id = self._extract_ontology_id(graph)
    if ontology_id:
        ontology = Ontology(
            ontology_id=ontology_id,
            title=f"Mock Ontology {ontology_id}",
            description="Mock ontology for testing",
            version="1.0.0",
            iri=graph_uri,
            graph=self._create_rdf_graph_from_graph(graph),
        )
        # Update existing ontology or add new one
        existing = next(
            (o for o in self.ontologies if o.ontology_id == ontology_id), None
        )
        if existing:
            existing.graph = self._create_rdf_graph_from_graph(graph)
            existing.iri = graph_uri
        else:
            self.ontologies.append(ontology)

    return True

Neo4jTripleStoreManager

Bases: TripleStoreManagerWithAuth

Neo4j-based triple store manager using n10s (neosemantics) plugin.

This implementation handles RDF data more faithfully by using both the n10s property graph representation and raw RDF triple storage for accurate reconstruction. It provides comprehensive ontology management with namespace-based organization.

The manager uses Neo4j's n10s plugin for RDF operations, including:

- RDF import and export via n10s
- Ontology metadata storage and retrieval
- Namespace-based ontology organization
- Faithful RDF graph reconstruction

Attributes:

    _driver (Any): Private Neo4j driver instance.

Source code in ontocast/tool/triple_manager/neo4j.py
class Neo4jTripleStoreManager(TripleStoreManagerWithAuth):
    """Neo4j-based triple store manager using n10s (neosemantics) plugin.

    This implementation handles RDF data more faithfully by using both the n10s
    property graph representation and raw RDF triple storage for accurate reconstruction.
    It provides comprehensive ontology management with namespace-based organization.

    The manager uses Neo4j's n10s plugin for RDF operations, including:
    - RDF import and export via n10s
    - Ontology metadata storage and retrieval
    - Namespace-based ontology organization
    - Faithful RDF graph reconstruction

    Attributes:
        _driver: Private Neo4j driver instance.
    """

    _driver: Any = None  # private attribute, not a pydantic field

    def __init__(self, uri=None, auth=None, **kwargs):
        """Initialize the Neo4j triple store manager.

        This method sets up the connection to Neo4j, initializes the n10s
        plugin configuration, and creates necessary constraints and indexes.
        The database is NOT cleaned on initialization.

        Args:
            uri: Neo4j connection URI (e.g., "bolt://localhost:7687").
            auth: Authentication tuple (username, password) or string in "user/password" format.
            **kwargs: Additional keyword arguments passed to the parent class.

        Raises:
            ImportError: If the neo4j Python driver is not installed.

        Example:
            >>> manager = Neo4jTripleStoreManager(
            ...     uri="bolt://localhost:7687",
            ...     auth="neo4j/password"
            ... )
            >>> # To clean the database, use the clean() method explicitly:
            >>> await manager.clean()
        """
        super().__init__(
            uri=uri, auth=auth, env_uri="NEO4J_URI", env_auth="NEO4J_AUTH", **kwargs
        )
        if GraphDatabase is None:
            raise ImportError("neo4j Python driver is not installed.")
        if self.uri is None:
            raise ValueError("Neo4j URI is required but not provided.")
        self._driver = GraphDatabase.driver(self.uri, auth=self.auth)

        # Type assertion: we know _driver is not None after initialization
        assert self._driver is not None

        with self._driver.session() as session:
            # Initialize n10s configuration
            self._init_n10s_config(session)

            # Create constraints and indexes
            self._create_constraints_and_indexes(session)

    async def clean(self, dataset: str | None = None) -> None:
        """Clean/flush all data from the Neo4j database.

        This method deletes all nodes and relationships from the Neo4j database,
        effectively clearing all stored data.

        Args:
            dataset: Optional dataset parameter (ignored for Neo4j, which doesn't
                support datasets). Included for interface compatibility.

        Warning: This operation is irreversible and will delete all data.

        Raises:
            Exception: If the cleanup operation fails.
        """
        if dataset is not None:
            logger.warning(
                f"Dataset parameter '{dataset}' ignored for Neo4j (datasets not supported)"
            )

        if self._driver is None:
            raise ValueError("Neo4j driver is not initialized")

        with self._driver.session() as session:
            try:
                session.run("MATCH (n) DETACH DELETE n")
                logger.info("Neo4j database cleaned (all nodes deleted)")
            except Exception as e:
                logger.error(f"Neo4j cleanup failed: {e}")
                raise

    def _init_n10s_config(self, session):
        """Initialize n10s configuration with better RDF handling.

        This method configures the n10s plugin for optimal RDF handling.
        It sets up the configuration to preserve vocabulary URIs, handle
        multivalued properties, and maintain RDF types as nodes.

        Args:
            session: Neo4j session for executing configuration commands.
        """
        try:
            # Check whether n10s is already configured; if so, skip re-initialization.
            result = session.run("CALL n10s.graphconfig.show()")
            if result.single():
                logger.debug("n10s already configured")
                return
        except Exception:
            # If the check fails (e.g. n10s not yet configured), fall through to init below.
            pass

        try:
            session.run("""
                CALL n10s.graphconfig.init({
                    handleVocabUris: "KEEP",
                    handleMultival: "OVERWRITE",
                    typesToLabels: false,
                    keepLangTag: false,
                    keepCustomDataTypes: true,
                    handleRDFTypes: "NODES"
                })
            """)
            logger.debug("n10s configuration initialized")
        except Exception as e:
            logger.warning(f"n10s configuration failed: {e}")

    def _create_constraints_and_indexes(self, session):
        """Create necessary constraints and indexes for optimal performance.

        This method creates Neo4j constraints and indexes that are needed
        for efficient ontology operations and data integrity.

        Args:
            session: Neo4j session for executing constraint/index creation commands.
        """
        constraints = [
            "CREATE CONSTRAINT n10s_unique_uri IF NOT EXISTS FOR (r:Resource) REQUIRE r.uri IS UNIQUE",
            "CREATE CONSTRAINT ontology_iri_unique IF NOT EXISTS FOR (o:Ontology) REQUIRE o.uri IS UNIQUE",
            "CREATE INDEX namespace_prefix IF NOT EXISTS FOR (ns:Namespace) ON (ns.prefix)",
        ]

        for constraint in constraints:
            try:
                session.run(constraint)
                logger.debug(f"Created constraint/index: {constraint.split()[-1]}")
            except Exception as e:
                logger.debug(f"Constraint/index creation (might already exist): {e}")

    def _extract_namespace_prefix(self, uri: str) -> tuple[str, str]:
        """Extract namespace and local name from URI.

        This method parses a URI to extract the namespace and local name
        using common separators (#, /, :).

        Args:
            uri: The URI to parse.

        Returns:
            tuple[str, str]: A tuple of (namespace, local_name).

        Example:
            >>> manager._extract_namespace_prefix("http://example.org/onto#Class")
            ("http://example.org/onto#", "Class")
        """
        common_separators = ["#", "/", ":"]
        for sep in common_separators:
            if sep in uri:
                parts = uri.rsplit(sep, 1)
                if len(parts) == 2:
                    return parts[0] + sep, parts[1]
        return uri, ""

    def _get_ontology_namespaces(self, session) -> dict:
        """Get all known ontology namespaces from the database.

        This method queries the Neo4j database to retrieve all known
        namespace prefixes and their corresponding URIs.

        Args:
            session: Neo4j session for executing queries.

        Returns:
            dict: Dictionary mapping namespace prefixes to URIs.
        """
        result = session.run("""
            MATCH (ns:Namespace)
            RETURN ns.prefix as prefix, ns.uri as uri
            UNION
            MATCH (o:Ontology)
            RETURN null as prefix, o.uri as uri
        """)

        namespaces = {}
        for record in result:
            uri = record.get("uri")
            prefix = record.get("prefix")
            if uri:
                if prefix:
                    namespaces[prefix] = uri
                else:
                    # Extract potential namespace from ontology URI
                    ns, _ = self._extract_namespace_prefix(uri)
                    if ns != uri:  # Only if we actually found a namespace
                        namespaces[ns] = ns

        return namespaces

    def fetch_ontologies(self) -> list[Ontology]:
        """Fetch ontologies from Neo4j with faithful RDF reconstruction.

        This method retrieves all ontologies from Neo4j and reconstructs
        their RDF graphs faithfully. It uses a multi-step process:

        1. Identifies distinct ontologies by their namespace URIs
        2. Fetches all entities belonging to each ontology
        3. Reconstructs the RDF graph faithfully using stored triples when available
        4. Falls back to n10s property graph conversion when needed

        Returns:
            list[Ontology]: List of all ontologies found in the database.

        Example:
            >>> ontologies = manager.fetch_ontologies()
            >>> for onto in ontologies:
            ...     print(f"Found ontology: {onto.iri}")
        """
        ontologies = []

        # Type assertion: we know _driver is not None after initialization
        assert self._driver is not None
        with self._driver.session() as session:
            try:
                # First, try to get explicitly stored ontology metadata
                ontology_iris = self._fetch_ontology_iris(session)

                if ontology_iris:
                    for ont_iri in ontology_iris:
                        ontology = self._reconstruct_ontology_from_metadata(
                            session, ont_iri
                        )
                        if ontology:
                            ontologies.append(ontology)

            except Exception as e:
                logger.error(f"Error in fetch_ontologies: {e}")

        logger.info(f"Successfully loaded {len(ontologies)} ontologies")
        return ontologies

    def _fetch_ontology_iris(self, session) -> list[str]:
        """Fetch explicit ontology metadata from Neo4j.

        This method queries Neo4j to find all entities that are explicitly
        typed as owl:Ontology.

        Args:
            session: Neo4j session for executing queries.

        Returns:
            list[str]: List of ontology IRIs found in the database.
        """
        result = session.run(f"""
            MATCH (o)-[:`{str(RDF.type)}`]->(t:Resource {{ uri: "{str(OWL.Ontology)}" }})
            WHERE o.uri IS NOT NULL
            RETURN
              o.uri AS iri
        """)

        iris = []
        for record in result:
            iri = record.get("iri", None)
            iris += [iri]
        iris = [iri for iri in iris if iri is not None]
        return iris

    def _reconstruct_ontology_from_metadata(self, session, iri) -> Ontology | None:
        """Reconstruct an ontology from its metadata and related entities.

        This method takes an ontology IRI and reconstructs the complete
        ontology by fetching all related entities from the namespace.

        Args:
            session: Neo4j session for executing queries.
            iri: The ontology IRI to reconstruct.

        Returns:
            Ontology | None: The reconstructed ontology, or None if failed.
        """
        namespace_uri, _ = self._extract_namespace_prefix(iri)

        logger.debug(f"Reconstructing ontology: {iri} with namespace: {namespace_uri}")

        # Fallback to n10s export for this namespace
        graph = self._export_namespace_via_n10s(session, namespace_uri)
        if graph and len(graph) > 0:
            return self._create_ontology_object(iri, iri, graph)

    def _export_namespace_via_n10s(
        self, session, namespace_uri: str
    ) -> RDFGraph | None:
        """Export entities belonging to a namespace using n10s.

        This method uses Neo4j's n10s plugin to export all entities
        belonging to a specific namespace as RDF triples.

        Args:
            session: Neo4j session for executing queries.
            namespace_uri: The namespace URI to export.

        Returns:
            RDFGraph | None: The exported RDF graph, or None if failed.
        """
        try:
            result = session.run(
                f"""
                CALL n10s.rdf.export.cypher(
                    'MATCH (n)-[r]->(m) WHERE n.uri STARTS WITH "{namespace_uri}" RETURN n,r,m',
                    {{format: 'Turtle'}}
                )
                YIELD subject, predicate, object, isLiteral, literalType, literalLang
                RETURN subject, predicate, object, isLiteral, literalType, literalLang
                """
            )

            # Process into Turtle format
            turtle_lines = []

            for record in result:
                subj = record["subject"]
                pred = record["predicate"]
                obj = record["object"]
                is_literal = record["isLiteral"]
                literal_type = record["literalType"]
                literal_lang = record["literalLang"]

                # Format object
                if is_literal:
                    # Escape special characters in literals
                    obj = obj.replace('"', r"\"")
                    obj_str = f'"{obj}"'

                    # Add datatype or language tag if present
                    if literal_lang:
                        obj_str += f"@{literal_lang}"
                    elif literal_type:
                        obj_str += f"^^<{literal_type}>"
                else:
                    obj_str = f"<{obj}>"

                # Format triple
                turtle_lines.append(f"<{subj}> <{pred}> {obj_str} .")

            # Combine into single string
            turtle_string = "\n".join(turtle_lines)

            if turtle_string.strip():
                graph = RDFGraph()
                graph.parse(data=turtle_string, format="turtle")
                logger.debug(
                    f"Exported {len(graph)} triples via n10s for namespace {namespace_uri}"
                )
                return graph
            return None

        except Exception as e:
            logger.debug(
                f"Failed to export via n10s for namespace {namespace_uri}: {e}"
            )

        return None

    def _create_ontology_object(
        self, iri: str, metadata: dict, graph: RDFGraph
    ) -> Ontology:
        """Create an Ontology object from IRI, metadata, and graph.

        Args:
            iri: The ontology IRI.
            metadata: Metadata dictionary (currently unused, kept for compatibility).
            graph: The RDF graph containing the ontology data.

        Returns:
            Ontology: The created ontology object.
        """
        ontology_id = derive_ontology_id(iri)
        return Ontology(graph=graph, iri=iri, ontology_id=ontology_id)

    def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
        """Serialize an RDF graph to Neo4j with both n10s and raw triple storage.

        This method stores the given RDF graph in Neo4j using the n10s plugin
        for RDF import. The data is stored as RDF triples that can be faithfully
        reconstructed later.

        Args:
            graph: The RDF graph to store.
            **kwargs: Additional parameters (not used by Neo4j implementation).

        Returns:
            Any: The result summary from n10s import operation.
        """
        # Convert to RDFGraph if needed
        if not isinstance(graph, RDFGraph):
            rdf_graph = RDFGraph()
            for triple in graph:
                rdf_graph.add(triple)
            for prefix, namespace in graph.namespaces():
                rdf_graph.bind(prefix, namespace)
            graph = rdf_graph

        turtle_data = graph.serialize(format="turtle")

        # Type assertion: we know _driver is not None after initialization
        assert self._driver is not None
        with self._driver.session() as session:
            # Store via n10s for graph queries
            result = session.run(
                "CALL n10s.rdf.import.inline($ttl, 'Turtle')", ttl=turtle_data
            )
            summary = result.single()

        return summary

    def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
        """Serialize an Ontology or RDFGraph to Neo4j with both n10s and raw triple storage.

        This method stores the given Ontology or RDFGraph in Neo4j using the n10s plugin
        for RDF import. The data is stored as RDF triples that can be faithfully
        reconstructed later.

        Args:
            o: Ontology or RDFGraph object to store.
            **kwargs: Additional keyword arguments (not used by Neo4j implementation).

        Returns:
            Any: The result summary from n10s import operation.
        """
        if isinstance(o, Ontology):
            graph = o.graph
        elif isinstance(o, RDFGraph):
            graph = o
        else:
            raise TypeError(f"unsupported obj of type {type(o)} received")

        return self.serialize_graph(graph)

    def close(self):
        """Close the Neo4j driver connection.

        This method should be called when the manager is no longer needed
        to properly close the database connection and free resources.
        """
        if self._driver:
            self._driver.close()

__init__(uri=None, auth=None, **kwargs)

Initialize the Neo4j triple store manager.

This method sets up the connection to Neo4j, initializes the n10s plugin configuration, and creates necessary constraints and indexes. The database is NOT cleaned on initialization.

Parameters:

    uri: Neo4j connection URI (e.g., "bolt://localhost:7687"). Default: None.
    auth: Authentication tuple (username, password) or string in "user/password" format. Default: None.
    **kwargs: Additional keyword arguments passed to the parent class. Default: {}.

Raises:

    ImportError: If the neo4j Python driver is not installed.

Example:

    >>> manager = Neo4jTripleStoreManager(
    ...     uri="bolt://localhost:7687",
    ...     auth="neo4j/password"
    ... )
    >>> # To clean the database, use the clean() method explicitly:
    >>> await manager.clean()

Source code in ontocast/tool/triple_manager/neo4j.py
def __init__(self, uri=None, auth=None, **kwargs):
    """Initialize the Neo4j triple store manager.

    This method sets up the connection to Neo4j, initializes the n10s
    plugin configuration, and creates necessary constraints and indexes.
    The database is NOT cleaned on initialization.

    Args:
        uri: Neo4j connection URI (e.g., "bolt://localhost:7687").
        auth: Authentication tuple (username, password) or string in "user/password" format.
        **kwargs: Additional keyword arguments passed to the parent class.

    Raises:
        ImportError: If the neo4j Python driver is not installed.

    Example:
        >>> manager = Neo4jTripleStoreManager(
        ...     uri="bolt://localhost:7687",
        ...     auth="neo4j/password"
        ... )
        >>> # To clean the database, use the clean() method explicitly:
        >>> await manager.clean()
    """
    super().__init__(
        uri=uri, auth=auth, env_uri="NEO4J_URI", env_auth="NEO4J_AUTH", **kwargs
    )
    if GraphDatabase is None:
        raise ImportError("neo4j Python driver is not installed.")
    if self.uri is None:
        raise ValueError("Neo4j URI is required but not provided.")
    self._driver = GraphDatabase.driver(self.uri, auth=self.auth)

    # Type assertion: we know _driver is not None after initialization
    assert self._driver is not None

    with self._driver.session() as session:
        # Initialize n10s configuration
        self._init_n10s_config(session)

        # Create constraints and indexes
        self._create_constraints_and_indexes(session)

clean(dataset=None) async

Clean/flush all data from the Neo4j database.

This method deletes all nodes and relationships from the Neo4j database, effectively clearing all stored data.

Parameters:

    dataset (str | None): Optional dataset parameter (ignored for Neo4j, which doesn't support datasets). Included for interface compatibility. Default: None.

Warning:

    This operation is irreversible and will delete all data.

Raises:

    Exception: If the cleanup operation fails.

Source code in ontocast/tool/triple_manager/neo4j.py
async def clean(self, dataset: str | None = None) -> None:
    """Clean/flush all data from the Neo4j database.

    This method deletes all nodes and relationships from the Neo4j database,
    effectively clearing all stored data.

    Args:
        dataset: Optional dataset parameter (ignored for Neo4j, which doesn't
            support datasets). Included for interface compatibility.

    Warning: This operation is irreversible and will delete all data.

    Raises:
        Exception: If the cleanup operation fails.
    """
    if dataset is not None:
        logger.warning(
            f"Dataset parameter '{dataset}' ignored for Neo4j (datasets not supported)"
        )

    if self._driver is None:
        raise ValueError("Neo4j driver is not initialized")

    with self._driver.session() as session:
        try:
            session.run("MATCH (n) DETACH DELETE n")
            logger.info("Neo4j database cleaned (all nodes deleted)")
        except Exception as e:
            logger.error(f"Neo4j cleanup failed: {e}")
            raise

close()

Close the Neo4j driver connection.

This method should be called when the manager is no longer needed to properly close the database connection and free resources.

Source code in ontocast/tool/triple_manager/neo4j.py
def close(self):
    """Close the Neo4j driver connection.

    This method should be called when the manager is no longer needed
    to properly close the database connection and free resources.
    """
    if self._driver:
        self._driver.close()

fetch_ontologies()

Fetch ontologies from Neo4j with faithful RDF reconstruction.

This method retrieves all ontologies from Neo4j and reconstructs their RDF graphs faithfully. It uses a multi-step process:

  1. Identifies distinct ontologies by their namespace URIs
  2. Fetches all entities belonging to each ontology
  3. Reconstructs the RDF graph faithfully using stored triples when available
  4. Falls back to n10s property graph conversion when needed

Returns:
    list[Ontology]: List of all ontologies found in the database.

Example:
    >>> ontologies = manager.fetch_ontologies()
    >>> for onto in ontologies:
    ...     print(f"Found ontology: {onto.iri}")

Source code in ontocast/tool/triple_manager/neo4j.py
def fetch_ontologies(self) -> list[Ontology]:
    """Fetch ontologies from Neo4j with faithful RDF reconstruction.

    This method retrieves all ontologies from Neo4j and reconstructs
    their RDF graphs faithfully. It uses a multi-step process:

    1. Identifies distinct ontologies by their namespace URIs
    2. Fetches all entities belonging to each ontology
    3. Reconstructs the RDF graph faithfully using stored triples when available
    4. Falls back to n10s property graph conversion when needed

    Returns:
        list[Ontology]: List of all ontologies found in the database.

    Example:
        >>> ontologies = manager.fetch_ontologies()
        >>> for onto in ontologies:
        ...     print(f"Found ontology: {onto.iri}")
    """
    ontologies = []

    # Type assertion: we know _driver is not None after initialization
    assert self._driver is not None
    with self._driver.session() as session:
        try:
            # First, try to get explicitly stored ontology metadata
            ontology_iris = self._fetch_ontology_iris(session)

            if ontology_iris:
                for ont_iri in ontology_iris:
                    ontology = self._reconstruct_ontology_from_metadata(
                        session, ont_iri
                    )
                    if ontology:
                        ontologies.append(ontology)

        except Exception as e:
            logger.error(f"Error in fetch_ontologies: {e}")

    logger.info(f"Successfully loaded {len(ontologies)} ontologies")
    return ontologies
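
Each returned Ontology carries its reconstructed RDF graph, so the results can be inspected or re-serialized directly. A short sketch (the iri and graph attributes follow the usage shown elsewhere on this page):

    manager = Neo4jTripleStoreManager(uri="bolt://localhost:7687", auth="neo4j/password")
    try:
        for onto in manager.fetch_ontologies():
            print(f"{onto.iri}: {len(onto.graph)} triples")
            # The reconstructed graph can be dumped back to Turtle if needed.
            print(onto.graph.serialize(format="turtle")[:200])
    finally:
        manager.close()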

serialize(o, **kwargs)

Serialize an Ontology or RDFGraph to Neo4j with both n10s and raw triple storage.

This method stores the given Ontology or RDFGraph in Neo4j using the n10s plugin for RDF import. The data is stored as RDF triples that can be faithfully reconstructed later.

Parameters:
    o (Ontology | RDFGraph, required): Ontology or RDFGraph object to store.
    **kwargs: Additional keyword arguments (not used by the Neo4j implementation).

Returns:
    bool | None: The result summary from the n10s import operation.

Source code in ontocast/tool/triple_manager/neo4j.py
def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
    """Serialize an Ontology or RDFGraph to Neo4j with both n10s and raw triple storage.

    This method stores the given Ontology or RDFGraph in Neo4j using the n10s plugin
    for RDF import. The data is stored as RDF triples that can be faithfully
    reconstructed later.

    Args:
        o: Ontology or RDFGraph object to store.
        **kwargs: Additional keyword arguments (not used by Neo4j implementation).

    Returns:
        bool | None: The result summary from the n10s import operation.
    """
    if isinstance(o, Ontology):
        graph = o.graph
    elif isinstance(o, RDFGraph):
        graph = o
    else:
        raise TypeError(f"unsupported obj of type {type(o)} received")

    return self.serialize_graph(graph)
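
serialize() only dispatches on the argument type, so an Ontology and a bare RDFGraph end up in the same n10s import path. A hypothetical round-trip sketch (source_manager and neo4j_manager are placeholders for two configured managers):

    # Copy every ontology from one backend into Neo4j.
    for onto in source_manager.fetch_ontologies():
        neo4j_manager.serialize(onto)  # an Ontology: its .graph gets imported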

serialize_graph(graph, **kwargs)

Serialize an RDF graph to Neo4j with both n10s and raw triple storage.

This method stores the given RDF graph in Neo4j using the n10s plugin for RDF import. The data is stored as RDF triples that can be faithfully reconstructed later.

Parameters:
    graph (Graph, required): The RDF graph to store.
    **kwargs: Additional parameters (not used by the Neo4j implementation).

Returns:
    bool | None: The result summary from the n10s import operation.

Source code in ontocast/tool/triple_manager/neo4j.py
def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
    """Serialize an RDF graph to Neo4j with both n10s and raw triple storage.

    This method stores the given RDF graph in Neo4j using the n10s plugin
    for RDF import. The data is stored as RDF triples that can be faithfully
    reconstructed later.

    Args:
        graph: The RDF graph to store.
        **kwargs: Additional parameters (not used by Neo4j implementation).

    Returns:
        bool | None: The result summary from the n10s import operation.
    """
    # Convert to RDFGraph if needed
    if not isinstance(graph, RDFGraph):
        rdf_graph = RDFGraph()
        for triple in graph:
            rdf_graph.add(triple)
        for prefix, namespace in graph.namespaces():
            rdf_graph.bind(prefix, namespace)
        graph = rdf_graph

    turtle_data = graph.serialize(format="turtle")

    # Type assertion: we know _driver is not None after initialization
    assert self._driver is not None
    with self._driver.session() as session:
        # Store via n10s for graph queries
        result = session.run(
            "CALL n10s.rdf.import.inline($ttl, 'Turtle')", ttl=turtle_data
        )
        summary = result.single()

    return summary
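
serialize_graph() also accepts a plain rdflib.Graph and converts it to an RDFGraph before the n10s import. A minimal sketch with an illustrative example namespace, assuming manager is a Neo4jTripleStoreManager constructed as above:

    from rdflib import RDF, Graph, Literal, Namespace

    EX = Namespace("http://example.org/")

    g = Graph()
    g.bind("ex", EX)
    g.add((EX.doc1, RDF.type, EX.Document))
    g.add((EX.doc1, EX.title, Literal("Quarterly report")))

    # The graph is serialized to Turtle and imported via n10s.rdf.import.inline.
    summary = manager.serialize_graph(g)
    print(summary)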

TripleStoreManager

Bases: Tool

Base class for managing RDF triple stores.

This class defines the interface for triple store management operations, including fetching and storing ontologies and their graphs. All concrete triple store implementations should inherit from this class.

This is an abstract base class that must be implemented by specific triple store backends (e.g., Neo4j, Fuseki, Filesystem).

Source code in ontocast/tool/triple_manager/core.py
class TripleStoreManager(Tool):
    """Base class for managing RDF triple stores.

    This class defines the interface for triple store management operations,
    including fetching and storing ontologies and their graphs. All concrete
    triple store implementations should inherit from this class.

    This is an abstract base class that must be implemented by specific
    triple store backends (e.g., Neo4j, Fuseki, Filesystem).
    """

    def __init__(self, **kwargs):
        """Initialize the triple store manager.

        Args:
            **kwargs: Additional keyword arguments passed to the parent class.
        """
        super().__init__(**kwargs)

    @abc.abstractmethod
    def fetch_ontologies(self) -> list[Ontology]:
        """Fetch all available ontologies from the triple store.

        This method should retrieve all ontologies stored in the triple store
        and return them as Ontology objects with their associated RDF graphs.

        Returns:
            list[Ontology]: List of available ontologies with their graphs.
        """
        return []

    @abc.abstractmethod
    def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
        """Store an RDF graph in the triple store.

        This method should store the given RDF graph in the triple store.
        The implementation may choose how to organize the storage (e.g., as named graphs,
        in specific collections, etc.).

        Args:
            graph: The RDF graph to store.
            **kwargs: Implementation-specific arguments (e.g., fname for filesystem, graph_uri for Fuseki).

        Returns:
            bool | None: Implementation-specific return value (bool for Fuseki, summary for Neo4j, None for Filesystem).
        """
        pass

    @abc.abstractmethod
    def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
        """Store an RDF graph in the triple store.

        This method should store the given RDF graph in the triple store.
        The implementation may choose how to organize the storage (e.g., as named graphs,
        in specific collections, etc.).

        Args:
            o: RDF graph or Ontology object to store.
            **kwargs: Implementation-specific arguments (e.g., graph_uri for Fuseki).

        Returns:
            bool | None: Implementation-specific return value (bool for Fuseki, summary for Neo4j, None for Filesystem).
        """
        pass

    @abc.abstractmethod
    async def clean(self, dataset: str | None = None) -> None:
        """Clean/flush data from the triple store.

        This method removes data from the triple store. For Fuseki, the optional
        dataset parameter allows cleaning a specific dataset, or all datasets if None.
        For Neo4j and Filesystem, the dataset parameter is ignored.

        Args:
            dataset: Optional dataset name to clean (Fuseki only). If None, cleans
                all data. For other stores, this parameter is ignored.

        Warning: This operation is irreversible and will delete all data.

        Raises:
            NotImplementedError: If the triple store doesn't support cleaning.
        """
        raise NotImplementedError("clean() method must be implemented by subclasses")
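
A new backend only has to implement the four abstract members. The following in-memory manager is a deliberately minimal sketch, not part of OntoCast, shown purely to illustrate the interface (the import paths for Ontology and RDFGraph are assumptions):

    from rdflib import Graph

    from ontocast.onto import Ontology, RDFGraph  # import path is an assumption
    from ontocast.tool.triple_manager import TripleStoreManager


    class InMemoryTripleStoreManager(TripleStoreManager):
        """Toy backend that keeps all triples in a single in-process graph."""

        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            self._graph = Graph()

        def fetch_ontologies(self) -> list[Ontology]:
            # A real backend would split the stored triples back into Ontology objects.
            return []

        def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
            for triple in graph:
                self._graph.add(triple)
            return True

        def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
            graph = o.graph if isinstance(o, Ontology) else o
            return self.serialize_graph(graph)

        async def clean(self, dataset: str | None = None) -> None:
            # Drop everything; the dataset argument is ignored, as in the Neo4j backend.
            self._graph = Graph()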

__init__(**kwargs)

Initialize the triple store manager.

Parameters:
    **kwargs: Additional keyword arguments passed to the parent class.

Source code in ontocast/tool/triple_manager/core.py
def __init__(self, **kwargs):
    """Initialize the triple store manager.

    Args:
        **kwargs: Additional keyword arguments passed to the parent class.
    """
    super().__init__(**kwargs)

clean(dataset=None) abstractmethod async

Clean/flush data from the triple store.

This method removes data from the triple store. For Fuseki, the optional dataset parameter allows cleaning a specific dataset, or all datasets if None. For Neo4j and Filesystem, the dataset parameter is ignored.

Parameters:
    dataset (str | None, default None): Optional dataset name to clean (Fuseki only). If None, cleans all data. For other stores, this parameter is ignored.

Raises:
    NotImplementedError: If the triple store doesn't support cleaning.

Source code in ontocast/tool/triple_manager/core.py
@abc.abstractmethod
async def clean(self, dataset: str | None = None) -> None:
    """Clean/flush data from the triple store.

    This method removes data from the triple store. For Fuseki, the optional
    dataset parameter allows cleaning a specific dataset, or all datasets if None.
    For Neo4j and Filesystem, the dataset parameter is ignored.

    Args:
        dataset: Optional dataset name to clean (Fuseki only). If None, cleans
            all data. For other stores, this parameter is ignored.

    Warning: This operation is irreversible and will delete all data.

    Raises:
        NotImplementedError: If the triple store doesn't support cleaning.
    """
    raise NotImplementedError("clean() method must be implemented by subclasses")

fetch_ontologies() abstractmethod

Fetch all available ontologies from the triple store.

This method should retrieve all ontologies stored in the triple store and return them as Ontology objects with their associated RDF graphs.

Returns:
    list[Ontology]: List of available ontologies with their graphs.

Source code in ontocast/tool/triple_manager/core.py
@abc.abstractmethod
def fetch_ontologies(self) -> list[Ontology]:
    """Fetch all available ontologies from the triple store.

    This method should retrieve all ontologies stored in the triple store
    and return them as Ontology objects with their associated RDF graphs.

    Returns:
        list[Ontology]: List of available ontologies with their graphs.
    """
    return []

serialize(o, **kwargs) abstractmethod

Store an RDF graph in the triple store.

This method should store the given RDF graph in the triple store. The implementation may choose how to organize the storage (e.g., as named graphs, in specific collections, etc.).

Parameters:
    o (Ontology | RDFGraph, required): RDF graph or Ontology object to store.
    **kwargs: Implementation-specific arguments (e.g., graph_uri for Fuseki).

Returns:
    bool | None: Implementation-specific return value (bool for Fuseki, summary for Neo4j, None for Filesystem).

Source code in ontocast/tool/triple_manager/core.py
@abc.abstractmethod
def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
    """Store an RDF graph in the triple store.

    This method should store the given RDF graph in the triple store.
    The implementation may choose how to organize the storage (e.g., as named graphs,
    in specific collections, etc.).

    Args:
        o: RDF graph or Ontology object to store.
        **kwargs: Implementation-specific arguments (e.g., graph_uri for Fuseki).

    Returns:
        bool | None: Implementation-specific return value (bool for Fuseki, summary for Neo4j, None for Filesystem).
    """
    pass

serialize_graph(graph, **kwargs) abstractmethod

Store an RDF graph in the triple store.

This method should store the given RDF graph in the triple store. The implementation may choose how to organize the storage (e.g., as named graphs, in specific collections, etc.).

Parameters:
    graph (Graph, required): The RDF graph to store.
    **kwargs: Implementation-specific arguments (e.g., fname for filesystem, graph_uri for Fuseki).

Returns:
    bool | None: Implementation-specific return value (bool for Fuseki, summary for Neo4j, None for Filesystem).

Source code in ontocast/tool/triple_manager/core.py
@abc.abstractmethod
def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
    """Store an RDF graph in the triple store.

    This method should store the given RDF graph in the triple store.
    The implementation may choose how to organize the storage (e.g., as named graphs,
    in specific collections, etc.).

    Args:
        graph: The RDF graph to store.
        **kwargs: Implementation-specific arguments (e.g., fname for filesystem, graph_uri for Fuseki).

    Returns:
        bool | None: Implementation-specific return value (bool for Fuseki, summary for Neo4j, None for Filesystem).
    """
    pass