Skip to content

ontocast.tool

Tool package for OntoCast.

This package provides a collection of tools that support the OntoCast workflow, including document processing, ontology management, triple store operations, and LLM interactions.

The package includes:

- `LLMTool`: language-model interaction and prompting
- `OntologyManager`: ontology loading and management
- `TripleStoreManager`: abstract interface for triple store operations
- `FusekiTripleStoreManager`: Fuseki-specific triple store implementation (preferred)
- `Neo4jTripleStoreManager`: Neo4j-specific triple store implementation
- `FilesystemTripleStoreManager`: filesystem-based triple store implementation
- `ConverterTool`: document format conversion utilities
- `ChunkerTool`: text chunking and segmentation

All tools inherit from the base Tool class and provide standardized interfaces for integration into the OntoCast workflow.

Example

from ontocast.tool import LLMTool, OntologyManager

llm = LLMTool.create(provider="openai", model="gpt-4")
om = OntologyManager()

AtomicToolBox

Small tool surface used by atomic render/critic paths.

Source code in ontocast/tool/atomic.py
class AtomicToolBox:
    """Small tool surface used by atomic render/critic paths.

    Bundles the LLM provider, an optional web-search provider, and the
    web-search settings consulted by ``search`` and
    ``web_grounding_enabled_for_node``.

    When ``web_search_config`` is given, every web-search setting is read from
    it and the individual ``web_search_*`` keyword arguments are ignored;
    otherwise those keyword arguments are used as-is. Allowed/blocked domain
    lists are normalized to sets of stripped, lower-cased names with blank
    entries dropped.
    """

    def __init__(
        self,
        llm_provider: AtomicLLMProvider,
        search_provider: AtomicSearchProvider | None = None,
        web_search_config: WebSearchConfig | None = None,
        web_search_enabled: bool = False,
        web_search_top_k: int = 3,
        web_search_max_snippet_chars: int = 400,
        web_search_max_total_chars: int = 1800,
        web_search_for_ontology_render: bool = True,
        web_search_for_ontology_critic: bool = True,
        web_search_for_facts_render: bool = False,
        web_search_for_facts_critic: bool = False,
        web_search_planner_enabled: bool = True,
        web_search_planner_max_queries: int = 3,
        web_search_planner_min_query_chars: int = 12,
        web_search_planner_min_confidence: float = 0.35,
        web_search_reuse_evidence_across_attempt: bool = True,
        web_search_allowed_domains: tuple[str, ...] = (),
        web_search_blocked_domains: tuple[str, ...] = (),
        web_search_min_snippet_chars: int = 40,
    ):
        self.llm_provider = llm_provider
        self.search_provider = search_provider
        self.web_search_config = web_search_config

        if web_search_config is not None:
            # The config object takes precedence over every keyword setting.
            self.web_search_enabled = web_search_config.enabled
            self.web_search_top_k = web_search_config.top_k
            self.web_search_max_snippet_chars = web_search_config.max_snippet_chars
            self.web_search_max_total_chars = web_search_config.max_total_chars
            self.web_search_for_ontology_render = (
                web_search_config.ontology_render_enabled
            )
            self.web_search_for_ontology_critic = (
                web_search_config.ontology_critic_enabled
            )
            self.web_search_for_facts_render = web_search_config.facts_render_enabled
            self.web_search_for_facts_critic = web_search_config.facts_critic_enabled
            self.web_search_planner_enabled = web_search_config.planner_enabled
            self.web_search_planner_max_queries = web_search_config.planner_max_queries
            self.web_search_planner_min_query_chars = (
                web_search_config.planner_min_query_chars
            )
            self.web_search_planner_min_confidence = (
                web_search_config.planner_min_confidence
            )
            self.web_search_reuse_evidence_across_attempt = (
                web_search_config.reuse_evidence_across_attempt
            )
            self.web_search_min_snippet_chars = web_search_config.min_snippet_chars
            allowed_domains = web_search_config.allowed_domains
            blocked_domains = web_search_config.blocked_domains
        else:
            self.web_search_enabled = web_search_enabled
            self.web_search_top_k = web_search_top_k
            self.web_search_max_snippet_chars = web_search_max_snippet_chars
            self.web_search_max_total_chars = web_search_max_total_chars
            self.web_search_for_ontology_render = web_search_for_ontology_render
            self.web_search_for_ontology_critic = web_search_for_ontology_critic
            self.web_search_for_facts_render = web_search_for_facts_render
            self.web_search_for_facts_critic = web_search_for_facts_critic
            self.web_search_planner_enabled = web_search_planner_enabled
            self.web_search_planner_max_queries = web_search_planner_max_queries
            self.web_search_planner_min_query_chars = web_search_planner_min_query_chars
            self.web_search_planner_min_confidence = web_search_planner_min_confidence
            self.web_search_reuse_evidence_across_attempt = (
                web_search_reuse_evidence_across_attempt
            )
            self.web_search_min_snippet_chars = web_search_min_snippet_chars
            allowed_domains = web_search_allowed_domains
            blocked_domains = web_search_blocked_domains

        # Both sources share one normalization rule, applied in one place.
        self.web_search_allowed_domains = self._normalize_domains(allowed_domains)
        self.web_search_blocked_domains = self._normalize_domains(blocked_domains)

    @staticmethod
    def _normalize_domains(domains) -> set[str]:
        """Return the iterable ``domains`` as a set of stripped, lower-cased names.

        Entries that are empty or whitespace-only are dropped.
        """
        return {value.strip().lower() for value in domains if value.strip()}

    async def get_llm_tool(self, budget_tracker) -> LLMTool:
        """Return a budget-aware LLM tool instance.

        Args:
            budget_tracker: Tracker forwarded unchanged to the LLM provider.
        """
        return await self.llm_provider.get_llm_tool(budget_tracker)

    async def search(
        self, query: str, max_results: int | None = None
    ) -> list[SearchHit]:
        """Run optional web search and return normalized hits.

        Args:
            query: Search query text.
            max_results: Maximum number of hits to request; ``None`` falls back
                to ``self.web_search_top_k``.

        Returns:
            The provider's hits, or an empty list when web search is disabled
            or no search provider was supplied.
        """
        if not self.web_search_enabled or self.search_provider is None:
            return []

        result_limit = max_results if max_results is not None else self.web_search_top_k
        return await self.search_provider.search(query=query, max_results=result_limit)

    def web_grounding_enabled_for_node(self, node: WorkflowNode) -> bool:
        """Return whether web grounding is enabled for a workflow node.

        Only the ontology/facts render and critic nodes have per-node switches;
        any other node, or any node while web search is disabled, yields False.
        """
        if not self.web_search_enabled:
            return False
        mapping = {
            WorkflowNode.TEXT_TO_ONTOLOGY: self.web_search_for_ontology_render,
            WorkflowNode.CRITICISE_ONTOLOGY: self.web_search_for_ontology_critic,
            WorkflowNode.TEXT_TO_FACTS: self.web_search_for_facts_render,
            WorkflowNode.CRITICISE_FACTS: self.web_search_for_facts_critic,
        }
        return mapping.get(node, False)

get_llm_tool(budget_tracker) async

Return a budget-aware LLM tool instance.

Source code in ontocast/tool/atomic.py
async def get_llm_tool(self, budget_tracker) -> LLMTool:
    """Obtain an LLM tool tied to ``budget_tracker`` from the configured provider.

    Args:
        budget_tracker: Tracker passed through unchanged to the provider.

    Returns:
        Whatever ``self.llm_provider.get_llm_tool`` resolves to.
    """
    provider = self.llm_provider
    tool = await provider.get_llm_tool(budget_tracker)
    return tool

search(query, max_results=None) async

Run optional web search and return normalized hits.

Source code in ontocast/tool/atomic.py
async def search(
    self, query: str, max_results: int | None = None
) -> list[SearchHit]:
    """Query the optional web-search provider.

    Args:
        query: Search query text.
        max_results: Upper bound on requested hits; ``None`` means
            ``self.web_search_top_k``.

    Returns:
        Hits from the provider, or an empty list when web search is disabled
        or no provider is configured.
    """
    searchable = self.web_search_enabled and self.search_provider is not None
    if searchable:
        limit = self.web_search_top_k if max_results is None else max_results
        return await self.search_provider.search(query=query, max_results=limit)
    return []

web_grounding_enabled_for_node(node)

Return whether web grounding is enabled for a workflow node.

Source code in ontocast/tool/atomic.py
def web_grounding_enabled_for_node(self, node: WorkflowNode) -> bool:
    """Report whether web grounding applies to ``node``.

    Only the ontology/facts render and critic nodes carry per-node switches;
    every other node, and every node while web search is disabled, yields
    False.
    """
    if self.web_search_enabled:
        per_node_flags = {
            WorkflowNode.TEXT_TO_ONTOLOGY: self.web_search_for_ontology_render,
            WorkflowNode.CRITICISE_ONTOLOGY: self.web_search_for_ontology_critic,
            WorkflowNode.TEXT_TO_FACTS: self.web_search_for_facts_render,
            WorkflowNode.CRITICISE_FACTS: self.web_search_for_facts_critic,
        }
        return per_node_flags[node] if node in per_node_flags else False
    return False

ChunkerTool

Bases: Tool

Tool for semantic chunking of documents.

Falls back to naive chunking if sentence-transformers is not available. Includes caching to avoid re-chunking the same text with the same parameters.

Source code in ontocast/tool/chunk/chunker.py
class ChunkerTool(Tool):
    """Tool for semantic chunking of documents.

    Falls back to naive chunking if sentence-transformers is not available.
    Includes caching to avoid re-chunking the same text with the same parameters.
    """

    model: str = Field(
        default="sentence-transformers/paraphrase-multilingual-mpnet-base-v2",
        description="HuggingFace model name for embeddings",
    )
    config: ChunkConfig = Field(
        default_factory=ChunkConfig, description="Chunking configuration parameters"
    )
    chunking_mode: Literal["semantic", "naive"] = Field(
        default="semantic" if SEMANTIC_CHUNKING_AVAILABLE else "naive",
        description="Chunking mode: semantic (requires sentence-transformers) or naive (fallback)",
    )
    cache: Any = Field(default=None, exclude=True)

    def __init__(
        self,
        chunk_config: ChunkConfig | None = None,
        cache: Cacher | None = None,
        **kwargs,
    ):
        """Initialize the ChunkerTool.

        Args:
            chunk_config: Chunking configuration. If None, uses default ChunkConfig.
            cache: Optional shared Cacher instance. If None, creates a new one.
            **kwargs: Additional keyword arguments passed to the parent class.
        """
        super().__init__(**kwargs)
        # The embedding model is created lazily by _init_model().
        self._model: Any | None = None
        # Set once model construction has raised, so later calls do not retry
        # (and re-log) an initialization that already failed.
        self._model_init_failed = False
        self._model_lock = threading.Lock()  # Lock for thread-safe model initialization

        # Initialize cache - use shared cacher or create new one
        if cache is not None:
            self.cache = ToolCacher(cache, "chunker")
        else:
            # Fallback for backward compatibility
            shared_cache = Cacher()
            self.cache = ToolCacher(shared_cache, "chunker")

        # Override config if provided
        if chunk_config is not None:
            self.config = chunk_config

        # Override chunking mode if semantic chunking is not available
        if not SEMANTIC_CHUNKING_AVAILABLE and self.chunking_mode == "semantic":
            self.chunking_mode = "naive"
            logger.warning(
                "Semantic chunking not available (sentence-transformers not installed). "
                "Falling back to naive chunking."
            )

    def _init_model(self):
        """Initialize the embedding model in a thread-safe manner.

        Uses double-checked locking so the model is constructed at most once,
        even when called concurrently from multiple threads. If construction
        raises, the failure is remembered and later calls return immediately,
        leaving ``self._model`` as None (callers then fall back to naive
        chunking).
        """
        # Fast path: already initialized, or already known to fail.
        if self._model is not None or self._model_init_failed:
            return

        with self._model_lock:
            # Re-check under the lock: another thread may have finished (or
            # failed) initialization while this one was waiting.
            if self._model is not None or self._model_init_failed:
                return
            if not SEMANTIC_CHUNKING_AVAILABLE or embedding_model_cls is None:
                return
            try:
                self._model = embedding_model_cls(
                    model_name=self.model,
                    model_kwargs={
                        "device": "cuda"
                        if torch_module is not None
                        and torch_module.cuda.is_available()
                        else "cpu"
                    },
                    encode_kwargs={"normalize_embeddings": False},
                )
                logger.debug(f"Initialized embedding model: {self.model}")
            except Exception as e:
                logger.error(f"Failed to initialize embedding model: {e}")
                self._model = None
                self._model_init_failed = True

    def _naive_chunk(self, doc: str) -> list[str]:
        """Naive chunking fallback when semantic chunking is not available.

        Paragraphs (separated by blank lines) are greedily packed into chunks
        of at most ``config.max_size`` characters. A paragraph that is longer
        than ``max_size`` on its own is split on sentence boundaries instead;
        a single sentence longer than ``max_size`` is kept whole. Chunks
        shorter than ``config.min_size`` are dropped.

        Args:
            doc: The document text to chunk.

        Returns:
            List of text chunks.
        """
        max_size = self.config.max_size
        # Split by paragraphs first (double newlines)
        paragraphs = re.split(r"\n\s*\n", doc.strip())

        chunks: list[str] = []
        current_chunk = ""

        for paragraph in paragraphs:
            paragraph = paragraph.strip()
            if not paragraph:
                continue

            # +2 accounts for the "\n\n" separator used to join paragraphs.
            if current_chunk and len(current_chunk) + len(paragraph) + 2 > max_size:
                chunks.append(current_chunk.strip())
                current_chunk = paragraph
            elif current_chunk:
                current_chunk += "\n\n" + paragraph
            else:
                current_chunk = paragraph

            # Paragraphs are only merged when the result fits, so an oversized
            # chunk here is always this single paragraph on its own.
            if len(current_chunk) > max_size:
                sentences = re.split(r"(?<=[.!?])\s+", paragraph)
                temp_chunk = ""

                for sentence in sentences:
                    if len(temp_chunk) + len(sentence) + 1 > max_size:
                        if temp_chunk:
                            chunks.append(temp_chunk.strip())
                        temp_chunk = sentence
                    elif temp_chunk:
                        temp_chunk += " " + sentence
                    else:
                        temp_chunk = sentence

                # The trailing sentences stay open so the next paragraph can
                # be packed after them.
                current_chunk = temp_chunk

        # Add the last chunk
        if current_chunk:
            chunks.append(current_chunk.strip())

        # Filter out chunks that are too small
        chunks = [chunk for chunk in chunks if len(chunk) >= self.config.min_size]

        logger.info(f"Naive chunking produced {len(chunks)} chunks")
        return chunks

    def __call__(self, doc: str) -> list[str]:
        """Chunk the document using either semantic or naive chunking.

        Results are cached per document text and chunking configuration.
        Semantic chunking falls back to naive chunking when the semantic
        dependencies or the embedding model are unavailable.

        Args:
            doc: The document text to chunk.

        Returns:
            List of text chunks.
        """
        # Every parameter that affects the output is part of the cache key.
        config_dict = {
            "model": self.model,
            "chunking_mode": self.chunking_mode,
            "max_size": self.config.max_size,
            "min_size": self.config.min_size,
            "breakpoint_threshold_type": self.config.breakpoint_threshold_type,
            "breakpoint_threshold_amount": self.config.breakpoint_threshold_amount,
        }

        # Check cache first
        cached_result = self.cache.get(doc, config=config_dict)
        if cached_result is not None:
            logger.debug("Cache hit for document chunking")
            return cached_result

        # Perform chunking
        if self.chunking_mode == "naive":
            result = self._naive_chunk(doc)
        elif not SEMANTIC_CHUNKING_AVAILABLE:
            logger.warning(
                "Semantic chunking requested but not available. Falling back to naive chunking."
            )
            result = self._naive_chunk(doc)
        else:
            self._init_model()

            if self._model is None:
                logger.warning(
                    "Model not initialized. Falling back to naive chunking."
                )
                result = self._naive_chunk(doc)
            elif SemanticChunker is None:
                logger.warning(
                    "SemanticChunker not available. Falling back to naive chunking."
                )
                result = self._naive_chunk(doc)
            else:
                text_splitter = SemanticChunker(
                    embeddings=self._model,
                    chunk_config=self.config,
                    sentence_split_regex=SENTENCE_SPLIT_REGEX,
                )

                # SemanticChunker now handles max_size internally
                result_docs = text_splitter.create_documents([doc])
                result = [chunk_doc.page_content for chunk_doc in result_docs]

                # Log chunk lengths for debugging
                lens = [len(chunk) for chunk in result]
                logger.info(
                    f"Semantic chunking produced {len(result)} chunks with lengths: {lens}"
                )

        # Cache the result
        self.cache.set(doc, result, config=config_dict)
        logger.debug("Cached document chunking result")

        return result

__call__(doc)

Chunk the document using either semantic or naive chunking.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `doc` | `str` | The document text to chunk. | required |

Returns:

| Type | Description |
|------|-------------|
| `list[str]` | List of text chunks. |

Source code in ontocast/tool/chunk/chunker.py
def __call__(self, doc: str) -> list[str]:
    """Split ``doc`` into chunks, consulting the tool cache first.

    Semantic chunking is used when the mode asks for it and the semantic
    dependencies, embedding model, and ``SemanticChunker`` are all present;
    otherwise naive chunking is used, with a warning explaining why.

    Args:
        doc: The document text to chunk.

    Returns:
        List of text chunks.
    """
    # Every parameter that influences the output goes into the cache key.
    cache_config = dict(
        model=self.model,
        chunking_mode=self.chunking_mode,
        max_size=self.config.max_size,
        min_size=self.config.min_size,
        breakpoint_threshold_type=self.config.breakpoint_threshold_type,
        breakpoint_threshold_amount=self.config.breakpoint_threshold_amount,
    )

    hit = self.cache.get(doc, config=cache_config)
    if hit is not None:
        logger.debug("Cache hit for document chunking")
        return hit

    use_semantic = self.chunking_mode != "naive"
    if use_semantic and not SEMANTIC_CHUNKING_AVAILABLE:
        logger.warning(
            "Semantic chunking requested but not available. Falling back to naive chunking."
        )
        use_semantic = False

    if use_semantic:
        self._init_model()
        if self._model is None:
            logger.warning("Model not initialized. Falling back to naive chunking.")
            use_semantic = False
        elif SemanticChunker is None:
            logger.warning(
                "SemanticChunker not available. Falling back to naive chunking."
            )
            use_semantic = False

    if use_semantic:
        splitter = SemanticChunker(
            embeddings=self._model,
            chunk_config=self.config,
            sentence_split_regex=SENTENCE_SPLIT_REGEX,
        )
        # The splitter enforces max_size itself.
        chunks = [piece.page_content for piece in splitter.create_documents([doc])]
        lengths = list(map(len, chunks))
        logger.info(
            f"Semantic chunking produced {len(chunks)} chunks with lengths: {lengths}"
        )
    else:
        chunks = self._naive_chunk(doc)

    self.cache.set(doc, chunks, config=cache_config)
    logger.debug("Cached document chunking result")
    return chunks

__init__(chunk_config=None, cache=None, **kwargs)

Initialize the ChunkerTool.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `chunk_config` | `ChunkConfig \| None` | Chunking configuration. If None, uses default ChunkConfig. | `None` |
| `cache` | `Cacher \| None` | Optional shared Cacher instance. If None, creates a new one. | `None` |
| `**kwargs` | | Additional keyword arguments passed to the parent class. | `{}` |
Source code in ontocast/tool/chunk/chunker.py
def __init__(
    self,
    chunk_config: ChunkConfig | None = None,
    cache: Cacher | None = None,
    **kwargs,
):
    """Set up the chunker, its cache wrapper, and its chunking mode.

    Args:
        chunk_config: Chunking parameters; when None the field default is kept.
        cache: Shared Cacher to wrap; when None a fresh Cacher is created.
        **kwargs: Forwarded to the parent class constructor.
    """
    super().__init__(**kwargs)
    # The embedding model is loaded lazily; the lock guards that one-time load.
    self._model: Any | None = None
    self._model_lock = threading.Lock()

    # Wrap the caller's cacher when supplied, otherwise a private one.
    backing_cache = Cacher() if cache is None else cache
    self.cache = ToolCacher(backing_cache, "chunker")

    if chunk_config is not None:
        self.config = chunk_config

    # Downgrade to naive chunking when the semantic dependencies are missing.
    semantic_requested = self.chunking_mode == "semantic"
    if semantic_requested and not SEMANTIC_CHUNKING_AVAILABLE:
        self.chunking_mode = "naive"
        logger.warning(
            "Semantic chunking not available (sentence-transformers not installed). "
            "Falling back to naive chunking."
        )

ConverterTool

Bases: Tool

Tool for converting documents to structured data.

This class provides functionality for converting various document formats into structured data that can be processed by the OntoCast system. It includes caching to avoid re-converting the same documents.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `supported_extensions` | `set[str]` | Set of supported file extensions. |
| `cache` | `Any` | Cacher instance for caching conversion results. |

Source code in ontocast/tool/converter.py
class ConverterTool(Tool):
    """Tool for converting documents to structured data.

    This class provides functionality for converting various document formats
    into structured data that can be processed by the OntoCast system.
    It includes caching to avoid re-converting the same documents.

    Attributes:
        supported_extensions: Set of supported file extensions.
        cache: ToolCacher wrapper used for caching conversion results.
    """

    supported_extensions: set[str] = Field(
        default={".pdf", ".ppt", ".pptx"},
        description="Set of supported file extensions",
    )
    cache: Any = Field(default=None, exclude=True)

    def __init__(
        self,
        cache: Cacher | None = None,
        **kwargs,
    ):
        """Initialize the converter tool.

        If docling cannot be imported, the error is logged and the converter
        stays None; conversion of bytes/Path input then raises ImportError.

        Args:
            cache: Optional shared Cacher instance. If None, creates a new one.
            **kwargs: Additional keyword arguments passed to the parent class.
        """
        super().__init__(**kwargs)
        self._converter = None
        self._converter_lock = threading.Lock()  # Lock for thread-safe converter access

        # Initialize cache - use shared cacher or create new one
        if cache is not None:
            self.cache = ToolCacher(cache, "converter")
        else:
            # Fallback for backward compatibility
            shared_cache = Cacher()
            self.cache = ToolCacher(shared_cache, "converter")

        try:
            document_converter_module = importlib.import_module(
                "docling.document_converter"
            )
            DocumentConverter = getattr(document_converter_module, "DocumentConverter")
            self._converter = DocumentConverter()
        except ImportError as e:
            logger.error(f"Could not import DocumentConverter: {e}")

    def __call__(self, file_input: Union[bytes, str, pathlib.Path]) -> dict[str, Any]:
        """Convert a document to structured data.

        ``str`` input is returned as-is under ``"text"`` without conversion or
        caching. ``bytes`` and ``pathlib.Path`` input is converted to Markdown
        with docling, and the result is cached keyed on the raw file content.
        Any other type is stringified.

        Args:
            file_input: The input file as either bytes, string, or pathlib.Path.

        Returns:
            dict[str, Any]: The converted document data.

        Raises:
            ImportError: If docling (DocumentConverter or DocumentStream) is
                unavailable for bytes/Path input.
        """
        # For plain text input, no caching needed
        if isinstance(file_input, str):
            return {"text": file_input}

        # The cache key is the raw document content.
        if isinstance(file_input, bytes):
            content_for_cache = file_input
        elif isinstance(file_input, pathlib.Path):
            content_for_cache = file_input.read_bytes()
        else:
            # Fallback for other types
            return {"text": str(file_input)}

        # Check cache first
        cached_result = self.cache.get(content_for_cache)
        if cached_result is not None:
            logger.debug("Cache hit for document conversion")
            return cached_result

        # Convert document (with thread-safe access to converter).
        # Only bytes or pathlib.Path can reach this point.
        with self._converter_lock:
            if isinstance(file_input, bytes):
                if self._converter is None:
                    raise ImportError("DocumentConverter not available")
                try:
                    base_models_module = importlib.import_module(
                        "docling.datamodel.base_models"
                    )
                except ImportError as e:
                    # Do not embed the raw document bytes in the message.
                    raise ImportError(f"Could not import DocumentStream: {e}") from e
                DocumentStream = base_models_module.DocumentStream
                ds = DocumentStream(name="doc", stream=BytesIO(file_input))
                result = self._converter.convert(ds)
            else:
                if self._converter is None:
                    raise ImportError(
                        f"Could not import DocumentConverter: {file_input}"
                    )
                result = self._converter.convert(file_input)
            converted_result = {"text": result.document.export_to_markdown()}

        # Cache the result
        self.cache.set(content_for_cache, converted_result)
        logger.debug("Cached document conversion result")

        return converted_result

__call__(file_input)

Convert a document to structured data.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `file_input` | `Union[bytes, str, Path]` | The input file as either bytes, string, or pathlib.Path. | required |

Returns:

| Type | Description |
|------|-------------|
| `dict[str, Any]` | The converted document data. |

Source code in ontocast/tool/converter.py
def __call__(self, file_input: Union[bytes, str, pathlib.Path]) -> dict[str, Any]:
    """Convert a document to structured data.

    ``str`` input is returned as-is under ``"text"`` without conversion or
    caching. ``bytes`` and ``pathlib.Path`` input is converted to Markdown
    with docling, and the result is cached keyed on the raw file content. Any
    other type is stringified.

    Args:
        file_input: The input file as either bytes, string, or pathlib.Path.

    Returns:
        dict[str, Any]: The converted document data.

    Raises:
        ImportError: If docling (DocumentConverter or DocumentStream) is
            unavailable for bytes/Path input.
    """
    # For plain text input, no caching needed
    if isinstance(file_input, str):
        return {"text": file_input}

    # The cache key is the raw document content.
    if isinstance(file_input, bytes):
        content_for_cache = file_input
    elif isinstance(file_input, pathlib.Path):
        content_for_cache = file_input.read_bytes()
    else:
        # Fallback for other types
        return {"text": str(file_input)}

    # Check cache first
    cached_result = self.cache.get(content_for_cache)
    if cached_result is not None:
        logger.debug("Cache hit for document conversion")
        return cached_result

    # Convert document (with thread-safe access to converter).
    # Only bytes or pathlib.Path can reach this point.
    with self._converter_lock:
        if isinstance(file_input, bytes):
            if self._converter is None:
                raise ImportError("DocumentConverter not available")
            try:
                base_models_module = importlib.import_module(
                    "docling.datamodel.base_models"
                )
            except ImportError as e:
                # Do not embed the raw document bytes in the message.
                raise ImportError(f"Could not import DocumentStream: {e}") from e
            DocumentStream = base_models_module.DocumentStream
            ds = DocumentStream(name="doc", stream=BytesIO(file_input))
            result = self._converter.convert(ds)
        else:
            if self._converter is None:
                raise ImportError(
                    f"Could not import DocumentConverter: {file_input}"
                )
            result = self._converter.convert(file_input)
        converted_result = {"text": result.document.export_to_markdown()}

    # Cache the result
    self.cache.set(content_for_cache, converted_result)
    logger.debug("Cached document conversion result")

    return converted_result

__init__(cache=None, **kwargs)

Initialize the converter tool.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `cache` | `Cacher \| None` | Optional shared Cacher instance. If None, creates a new one. | `None` |
| `**kwargs` | | Additional keyword arguments passed to the parent class. | `{}` |
Source code in ontocast/tool/converter.py
def __init__(
    self,
    cache: Cacher | None = None,
    **kwargs,
):
    """Set up the converter tool: cache wrapper, converter lock, docling backend.

    Args:
        cache: Shared Cacher to wrap under the "converter" namespace. When
            None, a brand-new Cacher is created for this tool.
        **kwargs: Forwarded unchanged to the parent class initializer.
    """
    super().__init__(**kwargs)
    # Guards every use of self._converter during conversion.
    self._converter_lock = threading.Lock()
    self._converter = None

    # Prefer the caller's shared cacher; otherwise keep the legacy behaviour
    # of owning a private one.
    backing_cache = cache if cache is not None else Cacher()
    self.cache = ToolCacher(backing_cache, "converter")

    # docling is optional: when it cannot be imported, _converter stays None
    # and conversion of bytes/Path inputs raises ImportError later.
    try:
        converter_module = importlib.import_module("docling.document_converter")
        converter_cls = getattr(converter_module, "DocumentConverter")
        self._converter = converter_cls()
    except ImportError as exc:
        logger.error(f"Could not import DocumentConverter: {exc}")

EmbeddingBasedAggregator

Main aggregator using embedding-based entity disambiguation.

Pipeline stages: 1. Entity normalisation (with semantic context) 2. Parallel embedding 3. Similarity-based clustering 4. Representative selection (prefer ontology, then simplicity) 5. URI normalisation (PascalCase/camelCase under DEFAULT_IRI) 6. Graph rewriting

Entities are handled as follows: - facts: entities under base_iri (or under the producing unit's doc_iri namespace) are normalised. - ontology: all other entities are treated as ontology entities and preserved.

Source code in ontocast/tool/agg/aggregate.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
class EmbeddingBasedAggregator:
    """Main aggregator using embedding-based entity disambiguation.

    Pipeline stages:
    1. Entity normalisation (with semantic context)
    2. Parallel embedding
    3. Similarity-based clustering
    4. Representative selection (prefer ontology, then simplicity)
    5. URI normalisation (PascalCase/camelCase under DEFAULT_IRI)
    6. Graph rewriting

    ContentUnit types are handled as follows:
    - ``facts``: entities under ``base_iri`` are normalised.
    - ``ontology``: all other entities are considered ontology entities and preserved.
    """

    def __init__(
        self,
        embedding_model: str = "paraphrase-multilingual-MiniLM-L12-v2",
        similarity_threshold: float = 0.80,
        candidate_similarity_threshold: float = 0.70,
        add_sameas_links: bool = True,
        base_iri: str = DEFAULT_IRI,
    ):
        """Wire up the normaliser, clusterer, selector, URI builder and rewriter.

        Args:
            embedding_model: Sentence-transformer model name handed to the
                clusterer.
            similarity_threshold: Cosine threshold (0-1) configured on the
                clusterer.
            candidate_similarity_threshold: Looser cosine threshold used while
                proposing merge candidates, before symbolic validation.
            add_sameas_links: Forwarded to the graph rewriter (owl:sameAs links
                for merged entities).
            base_iri: Facts namespace. Entities under it are facts; it is also
                given to the normaliser and URI builder, and passed to the
                rewriter as a namespace blocked from sameAs links.
        """
        self.base_iri = base_iri
        self.candidate_similarity_threshold = candidate_similarity_threshold

        facts_namespace = self.base_iri
        self.normalizer = EntityNormalizer(facts_iri=facts_namespace)
        self.clusterer = EntityClusterer(
            similarity_threshold=similarity_threshold,
            embedding_model=embedding_model,
        )
        self.selector = ClusterRepresentativeSelector()
        self.uri_builder = URIBuilder(base_iri=facts_namespace)
        self.rewriter = GraphRewriter(
            blocked_sameas_namespaces=(facts_namespace,),
            add_sameas_links=add_sameas_links,
        )

    @staticmethod
    def _entity_in_namespace(entity: URIRef, namespace: URIRef | str | None) -> bool:
        """Check whether *entity* lies in *namespace*; a None namespace never matches."""
        return namespace is not None and is_in_namespace(
            str(entity), str(namespace), context="auto"
        )

    def _is_fact_entity_in_unit(self, entity: URIRef, unit: ContentUnit) -> bool:
        """Decide whether *entity* counts as a fact for this content unit.

        An entity is a fact when it belongs to the aggregator's base facts
        namespace (``self.base_iri``); failing that, when it belongs to the
        unit's own document namespace (``unit.doc_iri``).
        """
        if self._entity_in_namespace(entity, self.base_iri):
            return True
        return self._entity_in_namespace(entity, unit.doc_iri)

    @staticmethod
    def _is_standard_ontology_entity(entity: URIRef) -> bool:
        """Check whether *entity* starts with one of the ``_STANDARD_NAMESPACES`` prefixes."""
        # str.startswith accepts a tuple of prefixes and tests them all in one call.
        return str(entity).startswith(tuple(_STANDARD_NAMESPACES))

    def _build_known_ontology_entities(
        self, ontology_graph: RDFGraph | None
    ) -> set[URIRef]:
        """Collect every URI term (subject, predicate or object) of the ontology graph.

        Standard RDF vocabularies are *not* added here; callers such as
        ``_classify_entity_for_unit`` check them separately with
        ``_is_standard_ontology_entity``.

        Args:
            ontology_graph: Selected ontology graph, or None.

        Returns:
            Set of URIRef terms occurring anywhere in the graph's triples;
            an empty set when ``ontology_graph`` is None.
        """
        if ontology_graph is None:
            return set()
        return {
            term
            for s, p, o in ontology_graph
            for term in (s, p, o)
            if isinstance(term, URIRef)
        }

    @staticmethod
    def _tokenize(text: str) -> set[str]:
        """Split *text* on whitespace and keep only tokens longer than two characters."""
        return set(filter(lambda token: len(token) > 2, text.split()))

    @staticmethod
    def _role_key(representation: EntityRepresentation) -> str:
        """Return the string key of the representation's role (INSTANCE when unset)."""
        role = representation.role
        if role is None:
            role = EntityRole.INSTANCE
        return str(role)

    @staticmethod
    def _jaccard(left: set[str], right: set[str]) -> float:
        """Jaccard similarity of two token sets; two empty sets score 1.0."""
        union_size = len(left | right)
        if union_size == 0:
            return 1.0
        return len(left & right) / union_size

    @staticmethod
    def _instance_like_local_name(entity: URIRef) -> str | None:
        """Return the space-free local name if it matches the instance pattern.

        The local name must match ``_INSTANCE_LOCAL_NAME_RE`` and its ``stem``
        group must be at least three characters long; otherwise None.
        """
        compact = normalize_uri_local_name(entity).replace(" ", "")
        match = _INSTANCE_LOCAL_NAME_RE.match(compact) if compact else None
        if match is None or len(match.group("stem")) < 3:
            return None
        return compact

    def _are_roles_compatible(
        self,
        left: URIRef,
        right: URIRef,
        representations: dict[URIRef, EntityRepresentation],
    ) -> bool:
        """True when both entities have representations with the same role key."""
        left_rep = representations.get(left)
        right_rep = representations.get(right)
        if left_rep is not None and right_rep is not None:
            return self._role_key(left_rep) == self._role_key(right_rep)
        return False

    def _are_types_compatible(
        self,
        left: URIRef,
        right: URIRef,
        representations: dict[URIRef, EntityRepresentation],
    ) -> bool:
        """True when the entities share a type, or when either one is untyped.

        Returns False if either entity has no representation.
        """
        left_rep = representations.get(left)
        right_rep = representations.get(right)
        if left_rep is None or right_rep is None:
            return False
        left_types, right_types = set(left_rep.types), set(right_rep.types)
        # Missing type information is treated as "compatible with anything".
        return (
            not left_types
            or not right_types
            or not left_types.isdisjoint(right_types)
        )

    def _are_lexical_aliases(
        self,
        left: URIRef,
        right: URIRef,
        representations: dict[URIRef, EntityRepresentation],
    ) -> bool:
        """Return True when two entities look like surface variants of one name.

        Checks, in order:
        1. identical normal forms;
        2. identical numeric-suffixed ("instance-like") local names;
        3. a shared normalised label (labels + alt labels);
        4. token Jaccard >= 0.2 between any pair of tokenizable labels;
        5. veto (False) when one normal form is a strict word-prefix of the other;
        6. ``SequenceMatcher`` ratio >= 0.90 between normal forms;
        7. token Jaccard >= 0.75 when both normal forms have >= 2 tokens.

        Returns False when either entity has no representation.
        """
        left_rep = representations.get(left)
        right_rep = representations.get(right)
        if left_rep is None or right_rep is None:
            return False
        if left_rep.normal_form == right_rep.normal_form:
            return True

        left_instance_name = self._instance_like_local_name(left)
        right_instance_name = self._instance_like_local_name(right)
        if (
            left_instance_name is not None
            and right_instance_name is not None
            and left_instance_name == right_instance_name
        ):
            return True

        # Normalised label variants. Labels that normalise to "" are dropped:
        # two such labels would otherwise "match" on the empty string.
        left_labels = {
            normalized
            for normalized in (
                self.normalizer.normalize_string(label)
                for label in left_rep.labels + left_rep.alt_labels
                if label.strip()
            )
            if normalized
        }
        right_labels = {
            normalized
            for normalized in (
                self.normalizer.normalize_string(label)
                for label in right_rep.labels + right_rep.alt_labels
                if label.strip()
            )
            if normalized
        }
        if left_labels & right_labels:
            return True

        # Token overlap between label variants. _tokenize drops tokens of <= 2
        # chars, so short labels (e.g. "AI", "UK") yield empty token sets, and
        # _jaccard scores two empty sets as 1.0, which previously made any two
        # short labels count as aliases. Only labels that produced tokens are
        # compared. Tokenization is done once per label, not per pair.
        left_label_tokens = [
            tokens for tokens in map(self._tokenize, left_labels) if tokens
        ]
        right_label_tokens = [
            tokens for tokens in map(self._tokenize, right_labels) if tokens
        ]
        if any(
            self._jaccard(left_tokens, right_tokens) >= 0.2
            for left_tokens in left_label_tokens
            for right_tokens in right_label_tokens
        ):
            return True

        # "acme" vs "acme corp": one name strictly extends the other word-wise,
        # which usually denotes a different (more specific) entity.
        left_normalized = left_rep.normal_form.strip()
        right_normalized = right_rep.normal_form.strip()
        if left_normalized and right_normalized:
            if left_normalized != right_normalized and (
                left_normalized.startswith(f"{right_normalized} ")
                or right_normalized.startswith(f"{left_normalized} ")
            ):
                return False

        ratio = SequenceMatcher(
            None, left_rep.normal_form, right_rep.normal_form
        ).ratio()
        if ratio >= 0.90:
            return True

        left_tokens = self._tokenize(left_rep.normal_form)
        right_tokens = self._tokenize(right_rep.normal_form)
        if len(left_tokens) >= 2 and len(right_tokens) >= 2:
            if self._jaccard(left_tokens, right_tokens) >= 0.75:
                return True

        return False

    def _can_merge_as_identity(
        self,
        left: URIRef,
        right: URIRef,
        representations: dict[URIRef, EntityRepresentation],
        direct_relation_pairs: set[frozenset[URIRef]] | None = None,
    ) -> bool:
        """Decide whether two entities may be merged as the same identity.

        Entities directly linked by a subject-object triple are never merged.
        Otherwise they must pass the role, type and lexical-alias checks, which
        run in that order and short-circuit on the first failure.
        """
        if direct_relation_pairs and frozenset((left, right)) in direct_relation_pairs:
            return False
        if not self._are_roles_compatible(left, right, representations):
            return False
        if not self._are_types_compatible(left, right, representations):
            return False
        return self._are_lexical_aliases(left, right, representations)

    def _cluster_entities_by_role(
        self, representations: dict[URIRef, EntityRepresentation]
    ) -> tuple[list[list[URIRef]], dict[URIRef, np.ndarray]]:
        """Run candidate clustering separately for each role group.

        The clusterer's ``similarity_threshold`` is temporarily lowered to
        ``self.candidate_similarity_threshold`` so that clustering produces
        permissive candidates; the original value is restored even on error.

        Returns:
            (clusters from all role groups, embeddings from all role groups).
        """
        by_role: dict[str, dict[URIRef, EntityRepresentation]] = {}
        for entity, representation in representations.items():
            role_bucket = by_role.setdefault(self._role_key(representation), {})
            role_bucket[entity] = representation

        clusters: list[list[URIRef]] = []
        embeddings: dict[URIRef, np.ndarray] = {}
        saved_threshold = self.clusterer.similarity_threshold
        self.clusterer.similarity_threshold = self.candidate_similarity_threshold
        try:
            for bucket in by_role.values():
                bucket_clusters, bucket_embeddings = self.clusterer.cluster_entities(
                    bucket
                )
                clusters += bucket_clusters
                embeddings.update(bucket_embeddings)
        finally:
            self.clusterer.similarity_threshold = saved_threshold
        return clusters, embeddings

    @staticmethod
    def _candidate_similarity(
        left: URIRef,
        right: URIRef,
        embeddings: dict[URIRef, np.ndarray],
    ) -> float | None:
        """Cosine similarity of the two entities' embeddings.

        Returns None when an embedding is missing or either vector has zero norm.
        """
        left_vec = embeddings.get(left)
        right_vec = embeddings.get(right)
        if left_vec is None or right_vec is None:
            return None

        norm_product = float(np.linalg.norm(left_vec) * np.linalg.norm(right_vec))
        if norm_product == 0:
            return None
        return float(np.dot(left_vec, right_vec) / norm_product)

    def _merge_validation_failures(
        self,
        left: URIRef,
        right: URIRef,
        representations: dict[URIRef, EntityRepresentation],
    ) -> list[str]:
        """List the names of every identity check the pair fails.

        All three checks always run (no short-circuit), so the result is a
        complete diagnostic, ordered "role", "type", "lexical".
        """
        checks = (
            ("role", self._are_roles_compatible),
            ("type", self._are_types_compatible),
            ("lexical", self._are_lexical_aliases),
        )
        return [
            check_name
            for check_name, check in checks
            if not check(left, right, representations)
        ]

    def _build_identity_clusters(
        self,
        candidate_clusters: list[list[URIRef]],
        representations: dict[URIRef, EntityRepresentation],
        embeddings: dict[URIRef, np.ndarray],
        direct_relation_pairs: set[frozenset[URIRef]] | None = None,
    ) -> tuple[
        list[list[URIRef]], list[tuple[URIRef, URIRef, float | None, tuple[str, ...]]]
    ]:
        """Split embedding candidate clusters into symbolically validated clusters.

        Within each candidate cluster, every pair whose cosine score is not
        below ``candidate_similarity_threshold`` (or has no score) is checked
        with ``_can_merge_as_identity``. Accepted pairs are joined with a
        union-find (the lexicographically smaller root wins), so merging is
        transitive. Rejected pairs are recorded with their score and the names
        of the failed checks.

        Returns:
            (validated clusters, each sorted by string form;
             rejected (left, right, score, failed_checks) tuples).
        """
        accepted: list[list[URIRef]] = []
        rejected: list[tuple[URIRef, URIRef, float | None, tuple[str, ...]]] = []
        threshold = self.candidate_similarity_threshold

        for cluster in candidate_clusters:
            if len(cluster) < 2:
                accepted.append(cluster)
                continue

            parent: dict[URIRef, URIRef] = {member: member for member in cluster}

            def root_of(node: URIRef) -> URIRef:
                # Recursive lookup with path compression.
                up = parent[node]
                if up == node:
                    return up
                parent[node] = root_of(up)
                return parent[node]

            def link(a: URIRef, b: URIRef) -> None:
                root_a, root_b = root_of(a), root_of(b)
                if root_a == root_b:
                    return
                keep, drop = (
                    (root_a, root_b) if str(root_a) <= str(root_b) else (root_b, root_a)
                )
                parent[drop] = keep

            for left, right in combinations(cluster, 2):
                score = self._candidate_similarity(left, right, embeddings)
                if score is not None and score < threshold:
                    continue
                merge_ok = self._can_merge_as_identity(
                    left,
                    right,
                    representations,
                    direct_relation_pairs=direct_relation_pairs,
                )
                if not merge_ok:
                    failed = tuple(
                        self._merge_validation_failures(left, right, representations)
                    )
                    rejected.append((left, right, score, failed))
                    continue
                link(left, right)

            components: dict[URIRef, list[URIRef]] = {}
            for member in cluster:
                components.setdefault(root_of(member), []).append(member)
            accepted.extend(
                cast(list[URIRef], sorted(component, key=str))
                for component in components.values()
            )

        return accepted, rejected

    def _select_ontology_anchor_candidates(
        self,
        tentative_entities: list[URIRef],
        tentative_representations: dict[URIRef, EntityRepresentation],
        tentative_doc_iris: dict[URIRef, URIRef],
        ontology_graph: RDFGraph | None,
        known_ontology_entities: set[URIRef],
    ) -> dict[URIRef, URIRef]:
        """Pick ontology anchors and preserve the triggering document IRI.

        For every tentative entity with a representation, a doc IRI and a
        non-empty token set, non-standard known ontology entities sharing at
        least two representation tokens are scored by overlap. The top three
        (ties broken by URI string) are selected. Each selected ontology entity
        maps to the doc IRI of the first tentative entity that selected it.

        Args:
            tentative_entities: Entities classified as tentative ontology.
            tentative_representations: Representations keyed by entity.
            tentative_doc_iris: Source document IRI keyed by entity.
            ontology_graph: Ontology graph used to build ontology representations.
            known_ontology_entities: Candidate ontology entities.

        Returns:
            Mapping of selected ontology entity -> triggering document IRI.
        """
        if (
            ontology_graph is None
            or not tentative_entities
            or not known_ontology_entities
        ):
            return {}

        ontology_entities = [
            entity
            for entity in known_ontology_entities
            if not self._is_standard_ontology_entity(entity)
        ]
        if not ontology_entities:
            return {}

        ontology_graphs = {entity: ontology_graph for entity in ontology_entities}
        ontology_representations = self.normalizer.create_representations_batch(
            ontology_entities, ontology_graphs
        )

        # Tokenize each ontology representation once. The same token sets feed
        # both the inverted index and the overlap scoring, which previously
        # re-tokenized every candidate for every tentative entity.
        ontology_tokens: dict[URIRef, set[str]] = {
            entity: self._tokenize(representation.representation)
            for entity, representation in ontology_representations.items()
        }
        token_index: dict[str, set[URIRef]] = {}
        for entity, tokens in ontology_tokens.items():
            for token in tokens:
                token_index.setdefault(token, set()).add(entity)

        selected: dict[URIRef, URIRef] = {}
        for tentative_entity in tentative_entities:
            tentative_representation = tentative_representations.get(tentative_entity)
            if tentative_representation is None:
                continue
            tentative_doc_iri = tentative_doc_iris.get(tentative_entity)
            if tentative_doc_iri is None:
                continue
            tentative_tokens = self._tokenize(tentative_representation.representation)
            if not tentative_tokens:
                continue

            candidate_pool: set[URIRef] = set()
            for token in tentative_tokens:
                candidate_pool.update(token_index.get(token, ()))
            if not candidate_pool:
                continue

            scored: list[tuple[int, URIRef]] = []
            for candidate in candidate_pool:
                candidate_tokens = ontology_tokens.get(candidate)
                if candidate_tokens is None:
                    continue
                overlap = len(tentative_tokens & candidate_tokens)
                if overlap >= 2:
                    scored.append((overlap, candidate))

            scored.sort(key=lambda item: (-item[0], str(item[1])))
            for _, candidate in scored[:3]:
                # First tentative entity to select a candidate keeps its doc IRI.
                selected.setdefault(candidate, tentative_doc_iri)

        return selected

    def _classify_entity_for_unit(
        self,
        entity: URIRef,
        unit: ContentUnit,
        known_ontology_entities: set[URIRef],
    ) -> EntityClassification:
        """Classify *entity* as seen in *unit*.

        Rules, first match wins:
        - every entity of an ontology-type unit is a known ontology entity;
        - entities in the facts or document namespace are facts;
        - entities in the known set or a standard vocabulary are known ontology;
        - anything else is a tentative ontology entity.
        """
        if unit.type == OutputType.ONTOLOGIES:
            return EntityClassification.KNOWN_ONTOLOGY
        if self._is_fact_entity_in_unit(entity, unit):
            return EntityClassification.FACT
        is_known = entity in known_ontology_entities or self._is_standard_ontology_entity(
            entity
        )
        return (
            EntityClassification.KNOWN_ONTOLOGY
            if is_known
            else EntityClassification.TENTATIVE_ONTOLOGY
        )

    @staticmethod
    def _classification_priority(classification: EntityClassification) -> int:
        """Return priority for multi-unit classification merging."""
        if classification == EntityClassification.KNOWN_ONTOLOGY:
            return 3
        if classification == EntityClassification.TENTATIVE_ONTOLOGY:
            return 2
        return 1

    @staticmethod
    def _merge_into_context_graph(target: RDFGraph, source: RDFGraph) -> None:
        """Merge *source* into a per-entity context graph via ``target += source``.

        Intended to mutate *target* in place (the caller keeps using the graph
        object it stored). NOTE(review): whether namespace bindings are merged
        as well as triples depends on RDFGraph's ``__iadd__``, which is not
        visible here; confirm before relying on merged prefixes.
        """
        target += source

    def _register_entity(
        self,
        *,
        entity: URIRef,
        unit: ContentUnit,
        state: _EntityCollectionState,
    ) -> None:
        """Record one URI entity from *unit* into the collection *state*.

        - adds it to ``entities`` and ``source_entities``;
        - stores a copy of the unit graph as its context, or merges the unit
          graph into the existing context;
        - keeps the doc IRI of the first unit that mentioned it;
        - keeps the highest-priority classification seen so far.
        """
        state.entities.add(entity)
        state.source_entities.add(entity)

        context = state.entity_graphs.get(entity)
        if context is None:
            state.entity_graphs[entity] = unit.graph.copy()
        else:
            self._merge_into_context_graph(context, unit.graph)

        state.entity_doc_iris.setdefault(entity, unit.doc_iri)

        previous = state.entity_classification.get(entity, EntityClassification.FACT)
        proposed = self._classify_entity_for_unit(entity, unit, state.known_entities)
        if self._classification_priority(proposed) < self._classification_priority(
            previous
        ):
            proposed = previous
        state.entity_classification[entity] = proposed

    @staticmethod
    def _register_direct_relation(
        state: _EntityCollectionState,
        subject: URIRef,
        obj: URIRef,
    ) -> None:
        """Store the unordered {subject, obj} pair unless it is a self-loop."""
        if subject != obj:
            state.direct_relation_pairs.add(frozenset((subject, obj)))

    def _collect_all_entities(
        self,
        units: list[ContentUnit],
        known_ontology_entities: set[URIRef] | None = None,
    ) -> tuple[
        list[URIRef],
        set[URIRef],
        dict[URIRef, RDFGraph],
        dict[URIRef, URIRef],
        dict[URIRef, EntityClassification],
        set[frozenset[URIRef]],
    ]:
        """Collect all URI entities from all content unit graphs.

        Units whose ``graph`` is None are skipped. For every other unit,
        ``graph.sanitize_prefixes_namespaces()`` is called first, which may
        modify that graph. Then every URI subject, predicate and object is
        registered.

        When an entity appears in several units:
        - its context graph is a copy of the first unit graph that contains it,
          with later unit graphs merged in;
        - the *first-seen* ``doc_iri`` is kept (in practice most pipelines
          aggregate chunks of one document, so all ``doc_iri`` values match);
        - the highest-priority classification wins
          (known ontology > tentative ontology > fact).

        Args:
            units: Content units to aggregate.
            known_ontology_entities: URIs known from the ontology graph; used
                to classify entities as known ontology. None means empty.

        Returns:
            Tuple of (
                entities: list of collected URI entities,
                source_entities: set of the entities found in the units,
                entity_graphs: per-entity merged context graph,
                entity_doc_iris: per-entity first-seen document IRI,
                entity_classification: per-entity classification,
                direct_relation_pairs: unordered pairs of distinct URIs linked
                    as subject and object of some triple,
            ).
        """
        state = _EntityCollectionState(known_entities=known_ontology_entities or set())

        for unit in units:
            if unit.graph is None:
                continue
            unit.graph.sanitize_prefixes_namespaces()
            # Keep collection in the same URI space that rewrite/merge consumes
            # (unit.graph). Using graph_absolute here causes mapping keys to miss
            # during rewrite, because unit.graph still contains the original terms.
            for s, p, o in unit.graph:
                if isinstance(s, URIRef) and isinstance(o, URIRef):
                    self._register_direct_relation(state=state, subject=s, obj=o)
                for term in (s, p, o):
                    if isinstance(term, URIRef):
                        self._register_entity(entity=term, unit=unit, state=state)

        return (
            list(state.entities),
            state.source_entities,
            state.entity_graphs,
            state.entity_doc_iris,
            state.entity_classification,
            state.direct_relation_pairs,
        )

    def aggregate_graphs(
        self,
        units: list[ContentUnit],
        ontology_graph: RDFGraph,
    ) -> RDFGraph:
        """Aggregate multiple content unit graphs with embedding-based disambiguation.

        Args:
            units: List of ContentUnits to aggregate.
            ontology_graph: Selected ontology graph used to distinguish
                known ontology entities from tentative ontology-like aliases.

        Returns:
            Merged RDF graph with provenance annotations.
        """
        logger.info(f"Starting aggregation with metadata for {len(units)} units")
        if ontology_graph is None:
            raise ValueError("ontology_graph must not be None for facts aggregation")

        if not units:
            return RDFGraph()

        # Steps 1-3: Collect, normalise, candidate clustering
        known_ontology_entities = self._build_known_ontology_entities(ontology_graph)
        (
            entities,
            source_entities,
            entity_graphs,
            entity_doc_iris,
            entity_classification,
            direct_relation_pairs,
        ) = self._collect_all_entities(units, known_ontology_entities)
        representations = self.normalizer.create_representations_batch(
            entities, entity_graphs
        )
        decisions: dict[URIRef, EntityDecision] = {
            entity: EntityDecision(
                classification=classification,
                identity_target=entity,
            )
            for entity, classification in entity_classification.items()
        }
        tentative_entities = [
            entity
            for entity, decision in decisions.items()
            if decision.classification == EntityClassification.TENTATIVE_ONTOLOGY
        ]
        anchor_candidates = self._select_ontology_anchor_candidates(
            tentative_entities=tentative_entities,
            tentative_representations=representations,
            tentative_doc_iris=entity_doc_iris,
            ontology_graph=ontology_graph,
            known_ontology_entities=known_ontology_entities,
        )
        if anchor_candidates:
            for ontology_entity, anchor_doc_iri in anchor_candidates.items():
                if ontology_entity in entity_graphs:
                    continue
                entities.append(ontology_entity)
                entity_graphs[ontology_entity] = ontology_graph
                entity_doc_iris[ontology_entity] = anchor_doc_iri
                entity_classification[ontology_entity] = (
                    EntityClassification.KNOWN_ONTOLOGY
                )
                decisions[ontology_entity] = EntityDecision(
                    classification=EntityClassification.KNOWN_ONTOLOGY,
                    identity_target=ontology_entity,
                )
                representations[ontology_entity] = (
                    self.normalizer.create_representation(
                        ontology_entity, ontology_graph
                    )
                )
        entity_is_known_ontology = {
            entity: decision.classification == EntityClassification.KNOWN_ONTOLOGY
            for entity, decision in decisions.items()
        }
        if logger.isEnabledFor(logging.INFO):
            known_count = sum(
                1 for is_known in entity_is_known_ontology.values() if is_known
            )
            fact_count = sum(
                1
                for decision in decisions.values()
                if decision.classification == EntityClassification.FACT
            )
            logger.info(
                "Aggregation entity classification stats: fact=%d known_ontology=%d "
                "tentative_ontology=%d",
                fact_count,
                known_count,
                len(tentative_entities),
            )

        candidate_clusters, embeddings = self._cluster_entities_by_role(representations)
        clusters, rejected_merges = self._build_identity_clusters(
            candidate_clusters=candidate_clusters,
            representations=representations,
            embeddings=embeddings,
            direct_relation_pairs=direct_relation_pairs,
        )
        if rejected_merges:
            logger.info(
                "Rejected %d candidate merges after symbolic validation",
                len(rejected_merges),
            )
            for left, right, score, failed_checks in rejected_merges:
                logger.debug(
                    "Rejected candidate merge: %s <-> %s (score=%s, failed=%s)",
                    left,
                    right,
                    f"{score:.3f}" if score is not None else "n/a",
                    ",".join(failed_checks) if failed_checks else "unknown",
                )

        # Step 4: Canonical identity mapping (no URI policy yet)
        identity_mapping = self.selector.create_mapping(
            clusters,
            representations,
            entity_is_known_ontology=entity_is_known_ontology,
        )

        # Keep known ontology entities stable. Tentative ontology-like entities are:
        # - mapped to known ontology representatives when present in a mixed cluster
        # - preserved as-is when only tentative entities are present
        suppress_sameas_origins: set[URIRef] = set()
        suppress_fact_subject_sources: set[URIRef] = set()
        for cluster in clusters:
            known_ontology_entities_in_cluster = [
                entity
                for entity in cluster
                if decisions.get(entity) is not None
                and decisions[entity].classification
                == EntityClassification.KNOWN_ONTOLOGY
            ]
            tentative_entities_in_cluster = [
                entity
                for entity in cluster
                if decisions.get(entity) is not None
                and decisions[entity].classification
                == EntityClassification.TENTATIVE_ONTOLOGY
            ]
            fact_entities_in_cluster = [
                entity
                for entity in cluster
                if decisions.get(entity) is not None
                and decisions[entity].classification == EntityClassification.FACT
            ]

            for entity in known_ontology_entities_in_cluster:
                identity_mapping[entity] = entity

            if known_ontology_entities_in_cluster:
                canonical_known_ontology = self.selector.select_representative(
                    known_ontology_entities_in_cluster,
                    representations,
                    entity_is_known_ontology=entity_is_known_ontology,
                )
                for tentative_entity in tentative_entities_in_cluster:
                    if self._can_merge_as_identity(
                        tentative_entity,
                        canonical_known_ontology,
                        representations,
                        direct_relation_pairs=direct_relation_pairs,
                    ):
                        identity_mapping[tentative_entity] = canonical_known_ontology
                        decisions[tentative_entity].suppress_sameas = True
                    else:
                        identity_mapping[tentative_entity] = tentative_entity
                for fact_entity in fact_entities_in_cluster:
                    if self._can_merge_as_identity(
                        fact_entity,
                        canonical_known_ontology,
                        representations,
                        direct_relation_pairs=direct_relation_pairs,
                    ):
                        identity_mapping[fact_entity] = canonical_known_ontology
                        decisions[fact_entity].suppress_sameas = True
                        decisions[fact_entity].suppress_fact_subject_assertions = True
                    else:
                        identity_mapping[fact_entity] = fact_entity

            elif tentative_entities_in_cluster:
                # In mixed FACT + TENTATIVE clusters with no known ontology
                # entity, prefer the FACT side when symbolic identity checks
                # agree (e.g. hallucinated ontology prefix on an instance).
                if fact_entities_in_cluster:
                    canonical_fact = self.selector.select_representative(
                        fact_entities_in_cluster,
                        representations,
                        entity_is_known_ontology=entity_is_known_ontology,
                    )
                    for fact_entity in fact_entities_in_cluster:
                        identity_mapping[fact_entity] = canonical_fact
                    for tentative_entity in tentative_entities_in_cluster:
                        if self._can_merge_as_identity(
                            tentative_entity,
                            canonical_fact,
                            representations,
                            direct_relation_pairs=direct_relation_pairs,
                        ):
                            identity_mapping[tentative_entity] = canonical_fact
                            decisions[tentative_entity].suppress_sameas = True
                        else:
                            identity_mapping[tentative_entity] = tentative_entity
                else:
                    for tentative_entity in tentative_entities_in_cluster:
                        identity_mapping[tentative_entity] = tentative_entity

        for entity, target in identity_mapping.items():
            if entity in decisions:
                decisions[entity].identity_target = target

        suppress_sameas_origins = {
            entity for entity, decision in decisions.items() if decision.suppress_sameas
        }
        suppress_fact_subject_sources = {
            entity
            for entity, decision in decisions.items()
            if decision.suppress_fact_subject_assertions
        }

        # Step 5: URI assignment from canonical identity + namespace policy
        final_mapping = self.uri_builder.create_entity_uri_mapping(
            identity_mapping=identity_mapping,
            representations=representations,
            entity_doc_iris=entity_doc_iris,
            entity_is_ontology={
                entity: (
                    decisions.get(entity) is not None
                    and decisions[entity].classification != EntityClassification.FACT
                )
                for entity in representations
            },
        )
        for entity, final_uri in final_mapping.items():
            if entity in decisions:
                decisions[entity].final_uri = final_uri
        known_ontology_entities_all = {
            entity
            for entity, decision in decisions.items()
            if decision.classification == EntityClassification.KNOWN_ONTOLOGY
        }
        assert all(
            identity_mapping.get(entity, entity) == entity
            for entity in known_ontology_entities_all
        ), "Known ontology entities must remain identity-mapped"
        assert not (known_ontology_entities_all & suppress_sameas_origins), (
            "Known ontology entities cannot be suppress_sameas origins"
        )
        assert not (known_ontology_entities_all & suppress_fact_subject_sources), (
            "Known ontology entities cannot be suppress_fact_subject origins"
        )
        assert all(entity in decisions for entity in source_entities), (
            "Every source entity must have a decision record"
        )
        final_mapping = {
            entity: mapped
            for entity, mapped in final_mapping.items()
            if entity in source_entities
        }

        # Step 7: Rewrite and merge with provenance
        active_units = [u for u in units if u.graph is not None]
        merged_graph = self.rewriter.merge_graphs_with_provenance(
            active_units,
            final_mapping,
            suppress_sameas_origins=suppress_sameas_origins,
            suppress_fact_subject_sources=suppress_fact_subject_sources,
        )

        logger.info("Aggregation with metadata complete")
        return merged_graph

    def postprocess_facts_units(
        self,
        units: list[ContentUnit],
        ontology_graph: RDFGraph,
    ) -> RDFGraph:
        """Clean every unit in place, then aggregate them into one graph.

        Both the unit pipeline and the graph pipeline call this, so they get
        identical post-processing whether one unit or many is supplied.

        Args:
            units: Content units; each is sanitized in place before
                aggregation.
            ontology_graph: Ontology graph forwarded to ``aggregate_graphs``.

        Returns:
            The graph produced by ``aggregate_graphs``.
        """
        for content_unit in units:
            content_unit.sanitize()
        merged = self.aggregate_graphs(units=units, ontology_graph=ontology_graph)
        return merged

__init__(embedding_model='paraphrase-multilingual-MiniLM-L12-v2', similarity_threshold=0.8, candidate_similarity_threshold=0.7, add_sameas_links=True, base_iri=DEFAULT_IRI)

Initialise the embedding-based aggregator.

Parameters:

Name Type Description Default
embedding_model str

Name of sentence transformer model.

'paraphrase-multilingual-MiniLM-L12-v2'
similarity_threshold float

Cosine similarity threshold for clustering (0-1).

0.8
candidate_similarity_threshold float

Lower cosine threshold used to generate permissive merge candidates before symbolic validation.

0.7
add_sameas_links bool

Whether to add owl:sameAs for merged entities.

True
base_iri str

Base IRI for fact entity URIs (default: DEFAULT_IRI). Entities under this namespace are facts; everything else is treated as an ontology entity and left unchanged.

DEFAULT_IRI
Source code in ontocast/tool/agg/aggregate.py
def __init__(
    self,
    embedding_model: str = "paraphrase-multilingual-MiniLM-L12-v2",
    similarity_threshold: float = 0.80,
    candidate_similarity_threshold: float = 0.70,
    add_sameas_links: bool = True,
    base_iri: str = DEFAULT_IRI,
):
    """Build the embedding-based aggregator and its pipeline stages.

    Args:
        embedding_model: Sentence-transformer model name handed to the
            entity clusterer.
        similarity_threshold: Cosine similarity threshold (0-1) used by the
            clusterer.
        candidate_similarity_threshold: Looser cosine threshold, stored on
            the instance, for proposing permissive merge candidates that are
            validated symbolically afterwards.
        add_sameas_links: Whether the graph rewriter adds owl:sameAs links
            for merged entities.
        base_iri: Namespace of fact entity URIs (default: DEFAULT_IRI).
            Entities under it are facts; all others are treated as
            ontology entities and left unchanged.
    """
    # Settings kept on the instance for later pipeline steps.
    self.base_iri = base_iri
    self.candidate_similarity_threshold = candidate_similarity_threshold
    namespace = self.base_iri

    # Pipeline stages; all share the same facts namespace.
    self.normalizer = EntityNormalizer(facts_iri=namespace)
    self.clusterer = EntityClusterer(
        similarity_threshold=similarity_threshold,
        embedding_model=embedding_model,
    )
    self.selector = ClusterRepresentativeSelector()
    self.uri_builder = URIBuilder(base_iri=namespace)
    # owl:sameAs links pointing into the facts namespace are blocked.
    self.rewriter = GraphRewriter(
        blocked_sameas_namespaces=(namespace,),
        add_sameas_links=add_sameas_links,
    )

aggregate_graphs(units, ontology_graph)

Aggregate multiple content unit graphs with embedding-based disambiguation.

Parameters:

Name Type Description Default
units list[ContentUnit]

List of ContentUnits to aggregate.

required
ontology_graph RDFGraph

Selected ontology graph used to distinguish known ontology entities from tentative ontology-like aliases.

required

Returns:

Type Description
RDFGraph

Merged RDF graph with provenance annotations.

Source code in ontocast/tool/agg/aggregate.py
def aggregate_graphs(
    self,
    units: list[ContentUnit],
    ontology_graph: RDFGraph,
) -> RDFGraph:
    """Aggregate multiple content unit graphs with embedding-based disambiguation.

    Outline of what this method does:

    * Collect entities from all units, build normalised representations and
      obtain a classification per entity (fact, known ontology, tentative
      ontology).
    * Inject known ontology "anchor" entities selected for the tentative
      entities, so they can take part in clustering.
    * Cluster entities into candidate identity groups and validate the
      candidates symbolically (rejected merges are logged).
    * Build a canonical identity mapping and reconcile each cluster by
      classification. Known ontology entities always map to themselves.
      Tentative or fact entities that pass ``_can_merge_as_identity`` map
      onto a known ontology (or fact) representative.
    * Assign final URIs, check invariants, and rewrite/merge the unit
      graphs with provenance.

    Args:
        units: List of ContentUnits to aggregate. Units whose ``graph`` is
            None are skipped when merging.
        ontology_graph: Selected ontology graph used to distinguish
            known ontology entities from tentative ontology-like aliases.

    Returns:
        Merged RDF graph with provenance annotations, or an empty
        ``RDFGraph`` when ``units`` is empty.

    Raises:
        ValueError: If ``ontology_graph`` is None.
        AssertionError: If an internal invariant about known ontology
            entities or decision records is violated. Assert statements are
            stripped under ``python -O``.
    """
    logger.info(f"Starting aggregation with metadata for {len(units)} units")
    if ontology_graph is None:
        raise ValueError("ontology_graph must not be None for facts aggregation")

    if not units:
        return RDFGraph()

    # Steps 1-3: Collect, normalise, candidate clustering
    # The known-ontology entity set is passed to _collect_all_entities, which
    # returns the per-entity classification used below.
    known_ontology_entities = self._build_known_ontology_entities(ontology_graph)
    (
        entities,
        source_entities,
        entity_graphs,
        entity_doc_iris,
        entity_classification,
        direct_relation_pairs,
    ) = self._collect_all_entities(units, known_ontology_entities)
    representations = self.normalizer.create_representations_batch(
        entities, entity_graphs
    )
    # One mutable decision record per classified entity. Later steps fill in
    # identity_target, the suppress_* flags and final_uri on these records.
    decisions: dict[URIRef, EntityDecision] = {
        entity: EntityDecision(
            classification=classification,
            identity_target=entity,
        )
        for entity, classification in entity_classification.items()
    }
    tentative_entities = [
        entity
        for entity, decision in decisions.items()
        if decision.classification == EntityClassification.TENTATIVE_ONTOLOGY
    ]
    # Known ontology entities chosen as anchors for tentative aliases. Any
    # anchor not already seen in the units is injected as a KNOWN_ONTOLOGY
    # entity. Its representation is built from the ontology graph so it can
    # join the clustering below.
    anchor_candidates = self._select_ontology_anchor_candidates(
        tentative_entities=tentative_entities,
        tentative_representations=representations,
        tentative_doc_iris=entity_doc_iris,
        ontology_graph=ontology_graph,
        known_ontology_entities=known_ontology_entities,
    )
    if anchor_candidates:
        for ontology_entity, anchor_doc_iri in anchor_candidates.items():
            if ontology_entity in entity_graphs:
                continue
            entities.append(ontology_entity)
            entity_graphs[ontology_entity] = ontology_graph
            entity_doc_iris[ontology_entity] = anchor_doc_iri
            entity_classification[ontology_entity] = (
                EntityClassification.KNOWN_ONTOLOGY
            )
            decisions[ontology_entity] = EntityDecision(
                classification=EntityClassification.KNOWN_ONTOLOGY,
                identity_target=ontology_entity,
            )
            representations[ontology_entity] = (
                self.normalizer.create_representation(
                    ontology_entity, ontology_graph
                )
            )
    entity_is_known_ontology = {
        entity: decision.classification == EntityClassification.KNOWN_ONTOLOGY
        for entity, decision in decisions.items()
    }
    # Stats are only computed when INFO logging is enabled.
    if logger.isEnabledFor(logging.INFO):
        known_count = sum(
            1 for is_known in entity_is_known_ontology.values() if is_known
        )
        fact_count = sum(
            1
            for decision in decisions.values()
            if decision.classification == EntityClassification.FACT
        )
        logger.info(
            "Aggregation entity classification stats: fact=%d known_ontology=%d "
            "tentative_ontology=%d",
            fact_count,
            known_count,
            len(tentative_entities),
        )

    # Embedding-based candidate clusters, then symbolic validation that may
    # split candidates apart (those splits are reported as rejected merges).
    candidate_clusters, embeddings = self._cluster_entities_by_role(representations)
    clusters, rejected_merges = self._build_identity_clusters(
        candidate_clusters=candidate_clusters,
        representations=representations,
        embeddings=embeddings,
        direct_relation_pairs=direct_relation_pairs,
    )
    if rejected_merges:
        logger.info(
            "Rejected %d candidate merges after symbolic validation",
            len(rejected_merges),
        )
        for left, right, score, failed_checks in rejected_merges:
            logger.debug(
                "Rejected candidate merge: %s <-> %s (score=%s, failed=%s)",
                left,
                right,
                f"{score:.3f}" if score is not None else "n/a",
                ",".join(failed_checks) if failed_checks else "unknown",
            )

    # Step 4: Canonical identity mapping (no URI policy yet)
    identity_mapping = self.selector.create_mapping(
        clusters,
        representations,
        entity_is_known_ontology=entity_is_known_ontology,
    )

    # Keep known ontology entities stable. Tentative ontology-like entities are:
    # - mapped to known ontology representatives when present in a mixed cluster
    # - preserved as-is when only tentative entities are present
    # NOTE(review): these two initialisations are dead. Both sets are
    # recomputed from `decisions` after the loop below.
    suppress_sameas_origins: set[URIRef] = set()
    suppress_fact_subject_sources: set[URIRef] = set()
    for cluster in clusters:
        # Partition the cluster by classification. Entities without a
        # decision record are ignored here.
        known_ontology_entities_in_cluster = [
            entity
            for entity in cluster
            if decisions.get(entity) is not None
            and decisions[entity].classification
            == EntityClassification.KNOWN_ONTOLOGY
        ]
        tentative_entities_in_cluster = [
            entity
            for entity in cluster
            if decisions.get(entity) is not None
            and decisions[entity].classification
            == EntityClassification.TENTATIVE_ONTOLOGY
        ]
        fact_entities_in_cluster = [
            entity
            for entity in cluster
            if decisions.get(entity) is not None
            and decisions[entity].classification == EntityClassification.FACT
        ]

        # Known ontology entities always stay identity-mapped, overriding
        # whatever create_mapping chose for them.
        for entity in known_ontology_entities_in_cluster:
            identity_mapping[entity] = entity

        if known_ontology_entities_in_cluster:
            canonical_known_ontology = self.selector.select_representative(
                known_ontology_entities_in_cluster,
                representations,
                entity_is_known_ontology=entity_is_known_ontology,
            )
            # Each tentative or fact member is re-checked against the known
            # representative and falls back to itself when the check fails.
            for tentative_entity in tentative_entities_in_cluster:
                if self._can_merge_as_identity(
                    tentative_entity,
                    canonical_known_ontology,
                    representations,
                    direct_relation_pairs=direct_relation_pairs,
                ):
                    identity_mapping[tentative_entity] = canonical_known_ontology
                    decisions[tentative_entity].suppress_sameas = True
                else:
                    identity_mapping[tentative_entity] = tentative_entity
            for fact_entity in fact_entities_in_cluster:
                if self._can_merge_as_identity(
                    fact_entity,
                    canonical_known_ontology,
                    representations,
                    direct_relation_pairs=direct_relation_pairs,
                ):
                    identity_mapping[fact_entity] = canonical_known_ontology
                    decisions[fact_entity].suppress_sameas = True
                    # Facts merged into an ontology entity are additionally
                    # flagged so the rewriter can suppress their subject
                    # assertions.
                    decisions[fact_entity].suppress_fact_subject_assertions = True
                else:
                    identity_mapping[fact_entity] = fact_entity

        elif tentative_entities_in_cluster:
            # In mixed FACT + TENTATIVE clusters with no known ontology
            # entity, prefer the FACT side when symbolic identity checks
            # agree (e.g. hallucinated ontology prefix on an instance).
            if fact_entities_in_cluster:
                canonical_fact = self.selector.select_representative(
                    fact_entities_in_cluster,
                    representations,
                    entity_is_known_ontology=entity_is_known_ontology,
                )
                # All FACT members collapse onto the canonical fact without
                # a per-entity re-check. Only tentative members are re-checked.
                for fact_entity in fact_entities_in_cluster:
                    identity_mapping[fact_entity] = canonical_fact
                for tentative_entity in tentative_entities_in_cluster:
                    if self._can_merge_as_identity(
                        tentative_entity,
                        canonical_fact,
                        representations,
                        direct_relation_pairs=direct_relation_pairs,
                    ):
                        identity_mapping[tentative_entity] = canonical_fact
                        decisions[tentative_entity].suppress_sameas = True
                    else:
                        identity_mapping[tentative_entity] = tentative_entity
            else:
                for tentative_entity in tentative_entities_in_cluster:
                    identity_mapping[tentative_entity] = tentative_entity
        # Clusters containing only FACT entities keep the mapping produced by
        # create_mapping unchanged.

    # Record the reconciled identity target on each decision.
    for entity, target in identity_mapping.items():
        if entity in decisions:
            decisions[entity].identity_target = target

    suppress_sameas_origins = {
        entity for entity, decision in decisions.items() if decision.suppress_sameas
    }
    suppress_fact_subject_sources = {
        entity
        for entity, decision in decisions.items()
        if decision.suppress_fact_subject_assertions
    }

    # Step 5: URI assignment from canonical identity + namespace policy
    # Every non-FACT entity (known or tentative ontology) counts as ontology here.
    final_mapping = self.uri_builder.create_entity_uri_mapping(
        identity_mapping=identity_mapping,
        representations=representations,
        entity_doc_iris=entity_doc_iris,
        entity_is_ontology={
            entity: (
                decisions.get(entity) is not None
                and decisions[entity].classification != EntityClassification.FACT
            )
            for entity in representations
        },
    )
    for entity, final_uri in final_mapping.items():
        if entity in decisions:
            decisions[entity].final_uri = final_uri
    # Internal invariants. These are assert-based, so they are disabled
    # under python -O.
    known_ontology_entities_all = {
        entity
        for entity, decision in decisions.items()
        if decision.classification == EntityClassification.KNOWN_ONTOLOGY
    }
    assert all(
        identity_mapping.get(entity, entity) == entity
        for entity in known_ontology_entities_all
    ), "Known ontology entities must remain identity-mapped"
    assert not (known_ontology_entities_all & suppress_sameas_origins), (
        "Known ontology entities cannot be suppress_sameas origins"
    )
    assert not (known_ontology_entities_all & suppress_fact_subject_sources), (
        "Known ontology entities cannot be suppress_fact_subject origins"
    )
    assert all(entity in decisions for entity in source_entities), (
        "Every source entity must have a decision record"
    )
    # Only entities in source_entities are rewritten. Injected anchor entities
    # are presumably excluded here, because they were appended to `entities`
    # and not to `source_entities`. Verify that the two are distinct objects.
    final_mapping = {
        entity: mapped
        for entity, mapped in final_mapping.items()
        if entity in source_entities
    }

    # Step 7: Rewrite and merge with provenance
    # (The numbering skips Step 6; only the comment label is affected.)
    active_units = [u for u in units if u.graph is not None]
    merged_graph = self.rewriter.merge_graphs_with_provenance(
        active_units,
        final_mapping,
        suppress_sameas_origins=suppress_sameas_origins,
        suppress_fact_subject_sources=suppress_fact_subject_sources,
    )

    logger.info("Aggregation with metadata complete")
    return merged_graph

postprocess_facts_units(units, ontology_graph)

Sanitize facts units, then run aggregation/normalization.

This method is intentionally safe for both single-unit and multi-unit inputs so unit-pipeline and graph-pipeline paths share the same post-processing behavior.

Source code in ontocast/tool/agg/aggregate.py
def postprocess_facts_units(
    self,
    units: list[ContentUnit],
    ontology_graph: RDFGraph,
) -> RDFGraph:
    """Clean every unit in place, then aggregate them into one graph.

    Both the unit pipeline and the graph pipeline call this, so they get
    identical post-processing whether one unit or many is supplied.

    Args:
        units: Content units; each is sanitized in place before aggregation.
        ontology_graph: Ontology graph forwarded to ``aggregate_graphs``.

    Returns:
        The graph produced by ``aggregate_graphs``.
    """
    for content_unit in units:
        content_unit.sanitize()
    merged = self.aggregate_graphs(units=units, ontology_graph=ontology_graph)
    return merged

EmbeddingTool

Bases: Tool

Base embedding tool with provider-specific implementations.

Source code in ontocast/tool/vector_store/embedding.py
class EmbeddingTool(Tool):
    """Base embedding tool with provider-specific implementations."""

    config: EmbeddingConfig = Field(default_factory=EmbeddingConfig)

    @abc.abstractmethod
    def embed(self, texts: list[str]) -> list[list[float]]:
        """Return vectors for all given texts."""

    def embed_one(self, text: str) -> list[float]:
        """Return a vector for one text."""
        vectors = self.embed([text])
        if not vectors:
            raise ValueError("Embedding provider returned no vectors for query text")
        return vectors[0]

    @classmethod
    def create(cls, config: EmbeddingConfig) -> "EmbeddingTool":
        """Factory for provider-specific embedding tools."""
        if config.provider == EmbeddingProvider.HUGGINGFACE:
            return HuggingFaceEmbeddingTool(config=config)
        if config.provider == EmbeddingProvider.OPENAI:
            return OpenAIEmbeddingTool(config=config)
        if config.provider == EmbeddingProvider.OLLAMA:
            return OllamaEmbeddingTool(config=config)
        raise ValueError(f"Unsupported embedding provider: {config.provider}")

create(config) classmethod

Factory for provider-specific embedding tools.

Source code in ontocast/tool/vector_store/embedding.py
@classmethod
def create(cls, config: EmbeddingConfig) -> "EmbeddingTool":
    """Factory for provider-specific embedding tools."""
    if config.provider == EmbeddingProvider.HUGGINGFACE:
        return HuggingFaceEmbeddingTool(config=config)
    if config.provider == EmbeddingProvider.OPENAI:
        return OpenAIEmbeddingTool(config=config)
    if config.provider == EmbeddingProvider.OLLAMA:
        return OllamaEmbeddingTool(config=config)
    raise ValueError(f"Unsupported embedding provider: {config.provider}")

embed(texts) abstractmethod

Return vectors for all given texts.

Source code in ontocast/tool/vector_store/embedding.py
@abc.abstractmethod
def embed(self, texts: list[str]) -> list[list[float]]:
    """Embed each text, returning one vector per input text.

    Declared abstract; provider-specific subclasses supply the
    implementation.
    """
    ...

embed_one(text)

Return a vector for one text.

Source code in ontocast/tool/vector_store/embedding.py
def embed_one(self, text: str) -> list[float]:
    """Embed a single text and return its vector.

    Raises:
        ValueError: If the provider returns an empty result.
    """
    batch = self.embed([text])
    if batch:
        return batch[0]
    raise ValueError("Embedding provider returned no vectors for query text")

FilesystemTripleStoreManager

Bases: TripleStoreManager

Filesystem-based implementation of triple store management.

This class provides a concrete implementation of triple store management using the local filesystem for storage. It reads and writes ontologies and facts as Turtle (.ttl) files in specified directories.

The manager supports:

- Loading ontologies from a dedicated ontology directory
- Storing ontologies with versioned filenames
- Storing facts with customizable filenames based on specifications
- Error handling for file operations

Attributes:

Name Type Description
working_directory Path | None

Path to the working directory for storing data.

ontology_path Path | None

Optional path to the ontology directory for loading ontologies.

Source code in ontocast/tool/triple_manager/filesystem_manager.py
class FilesystemTripleStoreManager(TripleStoreManager):
    """Filesystem-based implementation of triple store management.

    This class provides a concrete implementation of triple store management
    using the local filesystem for storage. It reads and writes ontologies
    and facts as Turtle (.ttl) files in specified directories.

    The manager supports:
    - Loading ontologies from a dedicated ontology directory
    - Storing ontologies with versioned filenames
    - Storing facts with filenames derived from the graph URI
    - Logging (rather than raising) per-file ontology load errors

    Attributes:
        working_directory: Path to the working directory for storing data.
            When None, serialization is a no-op.
        ontology_path: Optional path to the ontology directory for loading ontologies.
    """

    working_directory: pathlib.Path | None
    ontology_path: pathlib.Path | None

    def __init__(self, **kwargs):
        """Initialize the filesystem triple store manager.

        This method sets up the filesystem manager with the specified
        working and ontology directories.

        Args:
            **kwargs: Additional keyword arguments passed to the parent class.
                working_directory: Path to the working directory for storing data.
                ontology_path: Path to the ontology directory for loading ontologies.

        Example:
            >>> manager = FilesystemTripleStoreManager(
            ...     working_directory="/path/to/work",
            ...     ontology_path="/path/to/ontologies"
            ... )
        """
        super().__init__(**kwargs)

    def fetch_ontologies(self) -> list[Ontology]:
        """Fetch all available ontologies from the filesystem.

        This method scans the ontology directory for Turtle (.ttl) files
        and loads each one as an Ontology object. Files are processed
        in sorted order for consistent results. A file that fails to load
        is logged and skipped; it does not abort the scan.

        Returns:
            list[Ontology]: List of all ontologies found in the ontology
            directory (empty when ``ontology_path`` is None).

        Example:
            >>> ontologies = manager.fetch_ontologies()
            >>> for onto in ontologies:
            ...     print(f"Loaded ontology: {onto.ontology_id}")
        """
        ontologies = []
        if self.ontology_path is not None:
            sorted_files = sorted(self.ontology_path.glob("*.ttl"))
            for fname in sorted_files:
                try:
                    ontology = Ontology.from_file(fname)
                    ontologies.append(ontology)
                    logger.debug(f"Successfully loaded ontology from {fname}")
                except Exception as e:
                    # Best-effort: one unreadable file must not hide the others.
                    logger.error(f"Failed to load ontology {fname}: {str(e)}")
        return ontologies

    def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
        """Store an RDF graph in the working directory as Turtle.

        Args:
            graph: The RDF graph to store.
            **kwargs:
                fname: Output file name relative to ``working_directory``.
                    Defaults to ``"current.ttl"``.
                dump_clean_graph: When True, also write a copy with
                    provenance stripped (via ``strip_provenance``) to
                    ``<stem>_clean<suffix>``. Defaults to False.

        Returns:
            None. Nothing is written when ``working_directory`` is None.

        Example:
            >>> graph = RDFGraph()
            >>> manager.serialize_graph(graph)
            # Creates: working_directory/current.ttl

            >>> manager.serialize_graph(graph, fname="facts_abc.ttl")
        """
        if self.working_directory is None:
            return None

        # Previously a missing ``fname`` raised KeyError even though the
        # documented default is "current.ttl".
        fname: str = kwargs.pop("fname", "current.ttl")
        dump_clean_graph: bool = kwargs.pop("dump_clean_graph", False)
        output_path = self.working_directory / fname
        self._write_turtle(graph, output_path)
        logger.info(f"Graph saved to {output_path}")

        if dump_clean_graph:
            clean_graph = self.strip_provenance(graph)
            clean_output_path = self.working_directory / self._with_suffix(
                fname, "clean"
            )
            self._write_turtle(
                clean_graph,
                clean_output_path,
                serialization_format="turtle",
            )
            logger.info(f"Graph without provenance saved to {clean_output_path}")
        return None

    @staticmethod
    def _with_suffix(fname: str, suffix: str) -> str:
        """Return ``fname`` with ``_<suffix>`` inserted before its extension."""
        path = pathlib.Path(fname)
        return f"{path.stem}_{suffix}{path.suffix}"

    @staticmethod
    def _write_turtle(
        graph: Graph,
        output_path: pathlib.Path,
        *,
        serialization_format: str | None = None,
    ) -> None:
        """Serialize ``graph`` to ``output_path``.

        When no format is given, "turtle" is used for graphs backed by an
        OxigraphStore and "longturtle" otherwise.
        """
        # Use longturtle where available (non-oxigraph) for easier local inspection.
        if serialization_format is None:
            is_oxigraph = (
                isinstance(graph, RDFGraph)
                and type(graph.store).__name__ == "OxigraphStore"
            )
            serialization_format = "turtle" if is_oxigraph else "longturtle"
        graph.serialize(format=serialization_format, destination=output_path)

    def serialize(self, o: Ontology | RDFGraph, graph_uri: str | None = None):  # type: ignore[override]
        """Write an ontology or a facts graph to the working directory.

        Ontologies go to ``ontology_<id>_<version>.ttl``. Facts graphs go to
        ``facts_<last two non-empty path segments of graph_uri>.ttl``, or to
        ``facts_default.ttl`` when no usable segment exists. A
        provenance-stripped ``*_clean.ttl`` copy is also written for facts.

        Args:
            o: The ontology or facts graph to store.
            graph_uri: Optional URI used to derive the facts file name.

        Raises:
            TypeError: If ``o`` is neither an Ontology nor an RDFGraph.
        """
        if isinstance(o, Ontology):
            graph = o.graph
            fname = f"ontology_{o.ontology_id}_{o.version}.ttl"
            dump_clean_graph = False
        elif isinstance(o, RDFGraph):
            graph = o
            fname = "facts_default.ttl"
            if graph_uri:
                # Last two path segments, ignoring empty ones (e.g. from a
                # trailing slash). Without this fallback, a URI with no usable
                # segment produced "facts_.ttl".
                segments = [x for x in graph_uri.split("/")[-2:] if x]
                if segments:
                    fname = f"facts_{'_'.join(segments)}.ttl"
            dump_clean_graph = True
        else:
            raise TypeError(f"unsupported obj of type {type(o)} received")

        self.serialize_graph(
            graph=graph,
            fname=fname,
            dump_clean_graph=dump_clean_graph,
        )

    async def clean(self) -> None:
        """Clean/flush all data from the filesystem triple store (not implemented).

        The filesystem backend does not support cleaning yet. This coroutine
        only logs a warning and leaves every file in the working and ontology
        directories untouched.
        """
        logger.warning("clean method not implemented for FilesystemTripleStoreManager")

__init__(**kwargs)

Initialize the filesystem triple store manager.

This method sets up the filesystem manager with the specified working and ontology directories.

Parameters:

Name Type Description Default
**kwargs

Additional keyword arguments passed to the parent class, such as `working_directory` (path to the working directory for storing data) and `ontology_path` (path to the ontology directory for loading ontologies).

{}
Example

>>> manager = FilesystemTripleStoreManager(working_directory="/path/to/work", ontology_path="/path/to/ontologies")

Source code in ontocast/tool/triple_manager/filesystem_manager.py
def __init__(self, **kwargs):
    """Initialize the filesystem triple store manager.

    Every keyword argument is forwarded unchanged to the parent class; this
    constructor adds no logic of its own.

    Args:
        **kwargs: Keyword arguments passed to the parent class, for example:
            working_directory: Directory where ``serialize_graph`` writes
                Turtle files (nothing is written when it is None).
            ontology_path: Directory scanned by ``fetch_ontologies`` for
                ``*.ttl`` ontology files.

    Example:
        >>> manager = FilesystemTripleStoreManager(
        ...     working_directory="/path/to/work",
        ...     ontology_path="/path/to/ontologies"
        ... )
    """
    # Presumably the parent stores these as self.working_directory /
    # self.ontology_path (the names used by the other methods) — confirm.
    super().__init__(**kwargs)

clean() async

Clean/flush all data from the filesystem triple store.

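This method is intended to delete all Turtle (.ttl) files from both the working directory and the ontology directory. It is not implemented yet: the current implementation only logs a warning and deletes nothing.

Warning: Once implemented, this operation will be irreversible and will delete all data.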

Raises:

Type Description
Exception

If the cleanup operation fails.

Source code in ontocast/tool/triple_manager/filesystem_manager.py
async def clean(self) -> None:
    """Clean/flush all data from the filesystem triple store.

    Not implemented for the filesystem backend. This method deletes nothing:
    no file in the working directory or the ontology directory is touched.
    It only logs a warning saying that cleaning is unsupported.
    """
    logger.warning("clean method not implemented for FilesystemTripleStoreManager")

fetch_ontologies()

Fetch all available ontologies from the filesystem.

This method scans the ontology directory for Turtle (.ttl) files and loads each one as an Ontology object. Files are processed in sorted order for consistent results.

Returns:

- list[Ontology]: List of all ontologies found in the ontology directory.

Example

>>> ontologies = manager.fetch_ontologies()
>>> for onto in ontologies:
...     print(f"Loaded ontology: {onto.ontology_id}")

Source code in ontocast/tool/triple_manager/filesystem_manager.py
def fetch_ontologies(self) -> list[Ontology]:
    """Load every ontology stored as a Turtle file in ``ontology_path``.

    Files matching ``*.ttl`` are visited in sorted path order so the result
    is reproducible. A file that fails to load is logged at error level and
    skipped; it does not abort the scan.

    Returns:
        list[Ontology]: The successfully loaded ontologies, in sorted path
        order. Empty when ``ontology_path`` is None.

    Example:
        >>> for onto in manager.fetch_ontologies():
        ...     print(f"Loaded ontology: {onto.ontology_id}")
    """
    loaded = []
    if self.ontology_path is None:
        return loaded

    for ttl_path in sorted(self.ontology_path.glob("*.ttl")):
        try:
            loaded.append(Ontology.from_file(ttl_path))
            logger.debug(f"Successfully loaded ontology from {ttl_path}")
        except Exception as err:
            # Best effort: one unreadable file must not hide the others.
            logger.error(f"Failed to load ontology {ttl_path}: {str(err)}")
    return loaded

serialize_graph(graph, **kwargs)

Store an RDF graph in the filesystem.

This method stores the given RDF graph as a Turtle file in the working directory. The file name is given by the `fname` keyword argument.

Parameters:

- `graph` (Graph, required): The RDF graph to store.
- `fname` (str): Name of the Turtle file to write inside the working directory.
Example

>>> graph = RDFGraph()
>>> manager.serialize_graph(graph, fname="current.ttl")
# Creates: working_directory/current.ttl

>>> manager.serialize_graph(graph, fname="facts_abc.ttl")
# Creates: working_directory/facts_abc.ttl

Source code in ontocast/tool/triple_manager/filesystem_manager.py
def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
    """Store an RDF graph as a Turtle file in the working directory.

    Nothing is written (and None is returned) when ``working_directory`` is
    not configured.

    Args:
        graph: The RDF graph to store.
        **kwargs: Supported keys:
            fname (str): Output file name inside the working directory.
                Defaults to ``"current.ttl"`` when omitted.
            dump_clean_graph (bool): When True, also write the graph with
                provenance removed (via ``strip_provenance``) to the file
                name returned by ``_with_suffix(fname, "clean")``.
                Defaults to False.
            Any other key is ignored.

    Returns:
        None in every case.

    Example:
        >>> graph = RDFGraph()
        >>> manager.serialize_graph(graph)
        # Creates: working_directory/current.ttl

        >>> manager.serialize_graph(graph, fname="facts_abc.ttl")
        # Creates: working_directory/facts_abc.ttl
    """
    if self.working_directory is None:
        return None

    # Honour the documented default instead of failing with a bare KeyError.
    fname: str = kwargs.pop("fname", "current.ttl")
    dump_clean_graph: bool = kwargs.pop("dump_clean_graph", False)
    output_path = self.working_directory / fname
    self._write_turtle(graph, output_path)
    logger.info(f"Graph saved to {output_path}")

    if dump_clean_graph:
        clean_graph = self.strip_provenance(graph)
        clean_output_path = self.working_directory / self._with_suffix(
            fname, "clean"
        )
        # The provenance-free copy is always written as plain "turtle".
        self._write_turtle(
            clean_graph,
            clean_output_path,
            serialization_format="turtle",
        )
        logger.info(f"Graph without provenance saved to {clean_output_path}")

FusekiTripleStoreManager

Bases: TripleStoreManagerWithAuth

Fuseki-based triple store manager.

This class provides a concrete implementation of triple store management using Apache Fuseki. It stores ontologies as named graphs using their URIs as graph names, and supports dataset creation and cleanup.

URI shape: uri must be the Fuseki HTTP server root (e.g. http://localhost:3032), not a dataset path or UI URL. Dataset names are dataset / ontologies_dataset; the client calls {uri}/{dataset_name}/sparql and similar. The UI route /#/dataset/dataset_name is only for the browser; paste the origin (and optional non-dataset path prefix) into FUSEKI_URI, and set FUSEKI_DATASET to dataset_name.

The manager uses Fuseki's REST API for all operations, including:

- Dataset creation and management
- Named graph operations for ontologies
- SPARQL queries for ontology discovery
- Graph-level data operations

Attributes:

- `dataset` (str | None): Facts dataset name (first path segment in the Fuseki HTTP API).
- `ontologies_dataset` (str): Ontologies dataset name.

Source code in ontocast/tool/triple_manager/fuseki.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
class FusekiTripleStoreManager(TripleStoreManagerWithAuth):
    """Fuseki-based triple store manager.

    This class provides a concrete implementation of triple store management
    using Apache Fuseki. It stores ontologies as named graphs using their
    URIs as graph names, and supports dataset creation and cleanup.

    **URI shape:** ``uri`` must be the Fuseki **HTTP server root** (e.g.
    ``http://localhost:3032``), not a dataset path or UI URL. Dataset names are
    ``dataset`` / ``ontologies_dataset``; the client calls
    ``{uri}/{dataset_name}/sparql`` and similar. The UI route
    ``/#/dataset/dataset_name`` is only for the browser; paste the origin (and
    optional non-dataset path prefix) into ``FUSEKI_URI``, and set
    ``FUSEKI_DATASET`` to ``dataset_name``.

    The manager uses Fuseki's REST API for all operations, including:
    - Dataset creation and management
    - Named graph operations for ontologies
    - SPARQL queries for ontology discovery
    - Graph-level data operations

    Attributes:
        dataset: Facts dataset name (first path segment in Fuseki HTTP API).
        ontologies_dataset: Ontologies dataset name.
    """

    dataset: str | None = Field(default=None, description="Fuseki dataset name")
    ontologies_dataset: str = Field(
        default=DEFAULT_ONTOLOGIES_DATASET,
        description="Fuseki dataset name for ontologies",
    )

    def __init__(
        self,
        uri=None,
        auth=None,
        dataset=None,
        ontologies_dataset=None,
        **kwargs,
    ):
        """Initialize the Fuseki triple store manager.

        The constructor only records configuration. It performs no network
        I/O and neither creates nor cleans any dataset. Await
        :meth:`async_init` afterwards to create the configured datasets when
        they are missing.

        Args:
            uri: Fuseki HTTP service root (e.g. ``http://localhost:3030``), not
                ``.../dataset/name`` and not a ``#/dataset/...`` UI link.
                ``env_uri="FUSEKI_URI"`` is passed to the parent, which
                presumably reads it when ``uri`` is omitted.
            auth: Authentication tuple (username, password) or string in
                "user/password" format. ``env_auth="FUSEKI_AUTH"`` is passed
                to the parent as the environment fallback name.
            dataset: Facts dataset name (Fuseki API path segment). Only
                ``None`` selects ``DEFAULT_DATASET``.
            ontologies_dataset: Ontologies dataset name (separate Fuseki
                dataset). Any falsy value (``None`` or ``""``) selects
                ``DEFAULT_ONTOLOGIES_DATASET``.
            **kwargs: Additional keyword arguments passed to the parent class.

        Example:
            >>> manager = FusekiTripleStoreManager(
            ...     uri="http://localhost:3030",
            ...     dataset="acme--demo--facts",
            ...     ontologies_dataset="acme--demo--ontologies",
            ... )
            >>> await manager.async_init()
        """
        super().__init__(
            uri=uri, auth=auth, env_uri="FUSEKI_URI", env_auth="FUSEKI_AUTH", **kwargs
        )
        # All request URLs are built as f"{self.uri}/{dataset}/...", so the
        # configured URI is normalized to the server-root form up front.
        self.uri = normalize_fuseki_server_uri(self.uri)
        self.dataset = DEFAULT_DATASET if dataset is None else dataset
        self.ontologies_dataset = ontologies_dataset or DEFAULT_ONTOLOGIES_DATASET

        # httpx.AsyncClient is bound to the event loop it was created on, so the
        # shared client is created lazily by _get_client() and recreated per loop.
        self._client: httpx.AsyncClient | None = None
        self._client_loop: asyncio.AbstractEventLoop | None = None

    async def async_init(self) -> None:
        """Initialize configured Fuseki datasets explicitly.

        Constructors stay side-effect free so callers can resolve tenancy first
        and then create datasets for the final dataset names.

        For the duration of the call a temporary client, bound to the running
        event loop, is installed as the shared client. The previous client and
        its loop are restored afterwards, so initialization never touches any
        long-lived, loop-bound client state.
        """
        async with httpx.AsyncClient(
            auth=self._prepare_auth(), timeout=30.0
        ) as temp_client:
            original_client = self._client
            original_loop = self._client_loop
            self._client = temp_client
            # _get_client() only reuses a client recorded for the running loop.
            # Record it too; otherwise the temporary client would be ignored and
            # a fresh, never-closed client created in its place.
            self._client_loop = asyncio.get_running_loop()
            try:
                await self._initialize_datasets()
            finally:
                self._client = original_client
                self._client_loop = original_loop

    async def _initialize_datasets(self) -> None:
        """Ensure the facts dataset exists, plus the ontologies dataset if it differs."""
        await self.init_dataset(self.dataset)
        shares_facts_dataset = self.ontologies_dataset == self.dataset
        if not shares_facts_dataset:
            await self.init_dataset(self.ontologies_dataset)

    def _prepare_auth(self) -> httpx.BasicAuth | None:
        """Build httpx basic-auth credentials from ``self.auth``.

        Accepted forms are a tuple, unpacked positionally into
        ``httpx.BasicAuth``, or a ``"user/password"`` string, split at the first
        ``/``. Any other value, including a string without ``/``, yields no
        auth.

        Returns:
            httpx.BasicAuth instance or None if no usable auth is configured.
        """
        credentials = self.auth
        if not credentials:
            return None
        if isinstance(credentials, tuple):
            return httpx.BasicAuth(*credentials)
        if isinstance(credentials, str) and "/" in credentials:
            username, _, password = credentials.partition("/")
            return httpx.BasicAuth(username, password)
        return None

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the shared httpx client, (re)creating it for the running loop.

        ``httpx.AsyncClient`` is tied to the loop it was created on. The cached
        client is therefore reused only when it was recorded for the current
        loop; otherwise a fresh one is built and recorded.
        """
        current_loop = asyncio.get_running_loop()
        cached = self._client
        if cached is not None and self._client_loop is current_loop:
            return cached

        # A client from an earlier asyncio.run() belongs to a closed loop.
        # Awaiting aclose() on it would schedule callbacks on that dead loop,
        # so it is simply dropped.
        self._client = None
        self._client_loop = None
        credentials = self._prepare_auth()
        fresh = httpx.AsyncClient(auth=credentials, timeout=30.0)
        self._client, self._client_loop = fresh, current_loop
        return fresh

    async def close(self):
        """Close and forget the shared httpx client, if any.

        ``aclose()`` is awaited only when the client was recorded for the
        currently running event loop, or when no loop was recorded. A client
        recorded for a different loop (e.g. one left over from an earlier
        ``asyncio.run()``) is dropped without awaiting ``aclose()``. As in
        ``_get_client``, closing it would schedule work on a loop that may no
        longer be running.

        The cached client state is cleared before closing, so it is reset even
        if ``aclose()`` raises.
        """
        client = self._client
        client_loop = self._client_loop
        self._client = None
        self._client_loop = None
        if client is None:
            return
        if client_loop is None or client_loop is asyncio.get_running_loop():
            await client.aclose()
        else:
            logger.debug(
                "Dropping Fuseki httpx client bound to another event loop "
                "without closing it"
            )

    def supports_tenancy_partition(self) -> bool:
        """Report that this backend can partition data per tenant/project.

        Each tenant/project pair gets its own facts and ontologies datasets
        (see ``update_tenancy``), so partitioning is always available.
        """
        return True

    async def update_tenancy(
        self,
        tenant: str,
        project: str,
        *,
        sep: str = TENANCY_SEP,
    ) -> None:
        """Switch facts and ontologies Fuseki datasets for ``tenant`` / ``project``.

        Dataset names are derived with ``tenant_project_facts_name`` and
        ``tenant_project_ontologies_name``. Both datasets are then created when
        missing, using the same routine as ``async_init``.

        Args:
            tenant: Tenant identifier.
            project: Project identifier.
            sep: Separator passed to the dataset-name builders.
        """
        # Derive both names before mutating state, so a failure leaves the
        # current tenancy untouched.
        facts = tenant_project_facts_name(tenant, project, sep=sep)
        ontos = tenant_project_ontologies_name(tenant, project, sep=sep)
        self.dataset = facts
        self.ontologies_dataset = ontos
        # Shared with async_init() so dataset creation logic lives in one place.
        await self._initialize_datasets()
        logger.info(
            "Fuseki tenancy set to tenant=%r project=%r (facts=%s ontologies=%s)",
            tenant,
            project,
            self.dataset,
            self.ontologies_dataset,
        )

    async def clean(self) -> None:
        """Clear the configured facts dataset and ontologies dataset (when distinct).

        Raises:
            RuntimeError: If no facts dataset is configured.
            Exception: Propagated from ``_clean_dataset_by_name`` on failure.
        """
        # Explicit check rather than ``assert``: asserts are stripped under
        # ``python -O``, which would clean a dataset literally named "None".
        if self.dataset is None:
            raise RuntimeError("Dataset should never be None")
        await self._clean_dataset_by_name(self.dataset)
        logger.info("Fuseki dataset '%s' cleaned (all data deleted)", self.dataset)

        if self.ontologies_dataset != self.dataset:
            await self._clean_dataset_by_name(self.ontologies_dataset)
            logger.info(
                "Fuseki ontologies dataset '%s' cleaned (all data deleted)",
                self.ontologies_dataset,
            )

    async def clean_tenancy(
        self,
        tenant: str,
        project: str,
        *,
        sep: str = TENANCY_SEP,
    ) -> None:
        """Flush the facts and ontologies datasets derived for ``tenant`` / ``project``.

        The configured ``dataset`` / ``ontologies_dataset`` are not consulted
        or changed. Names are rebuilt from ``tenant``, ``project`` and ``sep``,
        and the ontologies dataset is flushed only when its name differs from
        the facts dataset.
        """
        facts_name = tenant_project_facts_name(tenant, project, sep=sep)
        ontologies_name = tenant_project_ontologies_name(tenant, project, sep=sep)

        targets = [facts_name]
        if ontologies_name != facts_name:
            targets.append(ontologies_name)
        for target in targets:
            await self._clean_dataset_by_name(target)

        logger.info(
            "Fuseki tenancy flush tenant=%r project=%r (facts=%s ontologies=%s)",
            tenant,
            project,
            facts_name,
            ontologies_name,
        )

    async def _clean_dataset_by_name(self, dataset_name: str) -> None:
        """Clean a specific dataset by name.

        Drops every named graph reported by the listing query, issuing the
        drops concurrently, then clears the default graph with
        ``CLEAR DEFAULT``. The following are logged as warnings and do not
        abort the clean-up:

        - a failed graph drop;
        - a failed listing query (the named graphs are then left in place);
        - a failed default-graph clear.

        Uses a temporary client to avoid event loop cleanup issues when called
        from different async contexts.

        Args:
            dataset_name: Name of the dataset to clean.

        Raises:
            Exception: Any exception raised while listing graphs or clearing the
                default graph (e.g. a transport error) is logged and re-raised.
        """
        # Use a temporary client to avoid event loop cleanup issues
        async with httpx.AsyncClient(auth=self._prepare_auth(), timeout=30.0) as client:
            try:
                dataset_url = f"{self.uri}/{dataset_name}"
                sparql_update_url = f"{dataset_url}/update"
                sparql_url = f"{dataset_url}/sparql"

                # List named graphs; only graphs holding at least one triple
                # match this pattern.
                query = """
                SELECT DISTINCT ?g WHERE {
                  GRAPH ?g { ?s ?p ?o }
                }
                """
                response = await client.post(
                    sparql_url,
                    data={"query": query, "format": "application/sparql-results+json"},
                )

                if response.status_code == 200:
                    bindings = response.json().get("results", {}).get("bindings", [])
                    graph_uris = [binding["g"]["value"] for binding in bindings]

                    # Drop all named graphs in parallel. With return_exceptions,
                    # a failing request is reported in place instead of aborting
                    # the rest of the batch.
                    delete_responses = await asyncio.gather(
                        *(
                            client.post(
                                sparql_update_url,
                                data={"update": f"DROP GRAPH <{graph_uri}>"},
                            )
                            for graph_uri in graph_uris
                        ),
                        return_exceptions=True,
                    )
                    # gather() preserves order, so zip pairs each result with its graph.
                    for graph_uri, delete_response in zip(graph_uris, delete_responses):
                        if isinstance(delete_response, Exception):
                            logger.warning(
                                f"Failed to delete graph {graph_uri}: {delete_response}"
                            )
                        elif isinstance(delete_response, httpx.Response):
                            if delete_response.status_code in (200, 204):
                                logger.debug(f"Deleted named graph: {graph_uri}")
                            else:
                                logger.warning(
                                    f"Failed to delete graph {graph_uri}: {delete_response.status_code}"
                                )
                else:
                    # Surface the failure: otherwise every named graph would
                    # silently survive a clean that appears to succeed.
                    logger.warning(
                        f"Failed to list named graphs in dataset '{dataset_name}': "
                        f"{response.status_code}; named graphs were not dropped"
                    )

                # Clear the default graph using SPARQL UPDATE
                clear_query = "CLEAR DEFAULT"
                clear_response = await client.post(
                    sparql_update_url,
                    data={"update": clear_query},
                )
                if clear_response.status_code in (200, 204):
                    logger.debug(f"Cleared default graph in dataset '{dataset_name}'")
                else:
                    logger.warning(
                        f"Failed to clear default graph in dataset '{dataset_name}': {clear_response.status_code}"
                    )
            except Exception as e:
                logger.error(f"Failed to clean dataset '{dataset_name}': {e}")
                raise

    async def init_dataset(self, dataset_name):
        """Create the Fuseki dataset ``dataset_name`` (TDB2 storage) if missing.

        Posts a form-encoded request to the admin endpoint ``{uri}/$/datasets``.
        The outcome is reported only through logging; no exception is raised
        for an unsuccessful status:

        * 200 / 201: dataset created (info);
        * 409: conflict, e.g. the dataset already exists (info);
        * anything else: logged as an error together with the response body.

        Uses a temporary client to avoid event loop cleanup issues when called
        from different async contexts.

        Args:
            dataset_name: Name of the dataset to create.
        """
        async with httpx.AsyncClient(auth=self._prepare_auth(), timeout=30.0) as client:
            response = await client.post(
                f"{self.uri}/$/datasets",
                data={"dbName": dataset_name, "dbType": "tdb2"},
                headers={"Content-Type": "application/x-www-form-urlencoded"},
            )

            status = response.status_code
            if status in (200, 201):
                logger.info(f"Fuseki dataset '{dataset_name}' created successfully.")
            elif status == 409:
                logger.info(f"Fuseki status code: {status}; {response.text.strip()}")
            else:
                logger.error(
                    f"Failed to create dataset {dataset_name}. Status code: {status}"
                )
                logger.error(f"Response: {response.text.strip()}")

    def _get_dataset_url(self):
        """Return the base URL of the facts dataset, ``{uri}/{dataset}``.

        Returns:
            str: Endpoint prefix to which ``/sparql``, ``/update`` etc. are appended.
        """
        return "{}/{}".format(self.uri, self.dataset)

    def _get_ontologies_dataset_url(self):
        """Return the base URL of the ontologies dataset, ``{uri}/{ontologies_dataset}``.

        Returns:
            str: Endpoint prefix to which ``/sparql``, ``/update``, ``/get`` etc. are appended.
        """
        return "{}/{}".format(self.uri, self.ontologies_dataset)

    async def adrop_named_graph(
        self, graph_uri: str, *, use_ontologies_dataset: bool = True
    ) -> None:
        """Drop one named graph via SPARQL ``DROP GRAPH``.

        Targets the ontologies dataset by default, or the facts dataset when
        ``use_ontologies_dataset`` is False. A status other than 200/204 is
        logged as a warning, not raised.
        """
        if use_ontologies_dataset:
            base_url = self._get_ontologies_dataset_url()
        else:
            base_url = self._get_dataset_url()
        endpoint = f"{base_url}/update"
        statement = f"DROP GRAPH <{graph_uri}>"

        async with httpx.AsyncClient(auth=self._prepare_auth(), timeout=30.0) as client:
            reply = await client.post(endpoint, data={"update": statement})
            if reply.status_code in (200, 204):
                return
            logger.warning(
                "Fuseki DROP GRAPH failed for %s: %s %s",
                graph_uri,
                reply.status_code,
                reply.text,
            )

    async def adrop_all_ontology_graphs_for_iri(self, ontology_iri: str) -> None:
        """Drop every ontologies-dataset graph belonging to ``ontology_iri``.

        A graph matches when it is named exactly ``ontology_iri`` or starts
        with ``ontology_iri#`` (versioned copies). Matching graphs are dropped
        one after another. If listing fails, an error is logged and nothing is
        dropped; individual drop failures are logged as warnings.
        """
        versioned_prefix = f"{ontology_iri}#"
        async with httpx.AsyncClient(auth=self._prepare_auth(), timeout=30.0) as client:
            sparql_url = f"{self._get_ontologies_dataset_url()}/sparql"
            list_query = """
            SELECT DISTINCT ?g WHERE {
              GRAPH ?g { ?s ?p ?o }
            }
            """
            listing = await client.post(
                sparql_url,
                data={"query": list_query, "format": "application/sparql-results+json"},
            )
            if listing.status_code != 200:
                logger.error(
                    "Failed to list graphs from Fuseki ontologies dataset: %s",
                    listing.text,
                )
                return

            bindings = listing.json().get("results", {}).get("bindings", [])
            graph_names = (binding["g"]["value"] for binding in bindings)
            doomed: list[str] = [
                name
                for name in graph_names
                if name == ontology_iri or name.startswith(versioned_prefix)
            ]

            update_url = f"{self._get_ontologies_dataset_url()}/update"
            for graph_uri in doomed:
                outcome = await client.post(
                    update_url, data={"update": f"DROP GRAPH <{graph_uri}>"}
                )
                if outcome.status_code in (200, 204):
                    continue
                logger.warning(
                    "Failed to drop graph %s: %s %s",
                    graph_uri,
                    outcome.status_code,
                    outcome.text,
                )

    def fetch_ontologies(self) -> list[Ontology]:
        """Synchronous wrapper for fetch_ontologies.

        Runs the fetch in a fresh event loop via ``asyncio.run()``, using a
        temporary client that is closed before returning. For async usage,
        await ``afetch_ontologies()`` instead.

        Raises:
            RuntimeError: If called while an event loop is already running in
                this thread; ``asyncio.run()`` cannot be nested there.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No running loop: safe to start one below.
        else:
            # Fail before creating the coroutine, so no "never awaited"
            # warning is emitted, and point the caller at the async API.
            raise RuntimeError(
                "FusekiTripleStoreManager.fetch_ontologies() cannot be called from "
                "a running event loop; use 'await afetch_ontologies()' instead"
            )
        return asyncio.run(self._fetch_ontologies_with_cleanup())

    async def afetch_ontologies(self) -> list[Ontology]:
        """Fetch the latest version of each ontology, from async code.

        Preferred over ``fetch_ontologies()`` whenever an event loop is
        already running. It uses the shared, loop-bound client from
        ``_get_client()``.
        """
        ontologies = await self._fetch_ontologies_async()
        return ontologies

    async def _fetch_ontologies_with_cleanup(self) -> list[Ontology]:
        """Wrapper that ensures proper cleanup when using asyncio.run().

        Installs a temporary client, bound to the running loop, as the shared
        client for the duration of the fetch. The client is closed by the
        ``async with`` before returning, which prevents "Event loop is closed"
        errors. The previous client and its recorded loop are restored
        afterwards.
        """
        async with httpx.AsyncClient(
            auth=self._prepare_auth(), timeout=30.0
        ) as temp_client:
            original_client = self._client
            original_loop = self._client_loop
            self._client = temp_client
            # _get_client() reuses a client only if it is recorded for the
            # running loop. Without this, it would discard temp_client, build a
            # new client that is never closed, and leave _client_loop pointing
            # at this soon-to-be-closed loop.
            self._client_loop = asyncio.get_running_loop()
            try:
                return await self._fetch_ontologies_async()
            finally:
                self._client = original_client
                self._client_loop = original_loop

    async def _fetch_ontologies_async(self) -> list[Ontology]:
        """Fetch all ontologies from their corresponding named graphs.

        This method discovers all ontologies in the Fuseki ontologies dataset and
        fetches each one from its corresponding named graph. For versioned ontologies,
        it returns only the latest version for each unique ontology IRI.

        1. Discovery: List all named graphs (which may be versioned URIs)
        2. Fetching: Retrieve each ontology from its named graph (in parallel)
        3. Deduplication: For versioned ontologies, keep only the latest version

        Returns:
            list[Ontology]: List of the latest version of each ontology found.

        Example:
            >>> ontologies = await manager.fetch_ontologies()
            >>> for onto in ontologies:
            ...     print(f"Found ontology: {onto.iri} v{onto.version}")
        """
        client = await self._get_client()
        sparql_url = f"{self._get_ontologies_dataset_url()}/sparql"

        # Step 1: List all named graphs
        list_query = """
        SELECT DISTINCT ?g WHERE {
          GRAPH ?g { ?s ?p ?o }
        }
        """
        response = await client.post(
            sparql_url,
            data={"query": list_query, "format": "application/sparql-results+json"},
        )
        if response.status_code != 200:
            logger.error(f"Failed to list graphs from Fuseki: {response.text}")
            return []

        results = response.json()
        graph_uris = []
        for binding in results.get("results", {}).get("bindings", []):
            graph_uri = binding["g"]["value"]
            graph_uris.append(graph_uri)

        logger.debug(f"Found {len(graph_uris)} named graphs: {graph_uris}")

        # Step 2: Fetch each ontology from its corresponding named graph (in parallel)
        async def fetch_single_ontology(graph_uri: str) -> Ontology | None:
            """Fetch a single ontology from a graph URI."""
            try:
                graph = RDFGraph()
                # URL encode the graph URI to handle special characters like #
                encoded_graph_uri = quote(str(graph_uri), safe="/:")
                export_url = f"{self._get_ontologies_dataset_url()}/get?graph={encoded_graph_uri}"
                export_resp = await client.get(
                    export_url, headers={"Accept": "text/turtle"}
                )

                if export_resp.status_code == 200:
                    graph.parse(data=export_resp.text, format="turtle")

                    # Re-serialize deterministically to ensure consistent cache keys
                    # This sorts both namespaces and triples alphabetically
                    deterministic_turtle = deterministic_turtle_serialization(graph)

                    # Re-parse from deterministic serialization to ensure we have RDFGraph
                    deterministic_graph = RDFGraph()
                    deterministic_graph.parse(
                        data=deterministic_turtle, format="turtle"
                    )

                    # Copy namespace bindings from original graph
                    for prefix, namespace in graph.namespaces():
                        if prefix:
                            deterministic_graph.bind(prefix, namespace)

                    graph = deterministic_graph

                    # Find the ontology IRI in the graph
                    for onto_subj, _, obj in graph.triples(
                        (None, RDF.type, OWL.Ontology)
                    ):
                        onto_iri = str(onto_subj)
                        # Extract base IRI if graph_uri is versioned
                        # Handle both hash fragments (#19193944...) and semantic versions (#v1.2.3)
                        if "#" in graph_uri:
                            namespace, _ = split_namespace_local(graph_uri)
                            base_iri = graph_uri
                            if namespace is not None and namespace.endswith("#"):
                                base_iri = namespace[:-1]
                            # Use base IRI from graph_uri (named graph identifier)
                            # The graph content should have simplified IRI, but use graph_uri as source of truth
                            onto_iri = base_iri

                        ontology = Ontology(
                            graph=graph,
                            iri=onto_iri,
                        )
                        # Load properties from graph (will strip any hash fragments if present)
                        ontology.sync_properties_from_graph()
                        logger.debug(
                            f"Successfully loaded ontology: {onto_iri} version: {ontology.version}"
                        )
                        return ontology
                else:
                    logger.warning(
                        f"Failed to fetch graph {graph_uri}: {export_resp.status_code}"
                    )
            except Exception as e:
                logger.warning(f"Error fetching ontology from {graph_uri}: {e}")
            return None

        # Fetch all ontologies in parallel
        all_ontologies_results = await asyncio.gather(
            *[fetch_single_ontology(uri) for uri in graph_uris], return_exceptions=True
        )

        # Filter out None and exceptions
        all_ontologies = []
        for result in all_ontologies_results:
            if isinstance(result, Exception):
                logger.warning(f"Exception fetching ontology: {result}")
            elif result is not None:
                all_ontologies.append(result)

        # Step 3: Deduplicate and keep latest terminal versions
        ontology_dict = defaultdict(list)

        for onto in all_ontologies:
            ontology_dict[onto.iri].append(onto)

        # Build set of all parent hashes to identify terminal ontologies
        # A terminal ontology is one that is not a parent for any other ontology
        all_parent_hashes = set()

        for onto in all_ontologies:
            if onto.hash:
                # Collect all parent hashes
                for parent_hash in onto.parent_hashes:
                    all_parent_hashes.add(parent_hash)

        # For each unique IRI, select the latest terminal ontology
        ontologies = []

        for iri, versions in ontology_dict.items():
            if len(versions) == 1:
                ontologies.append(versions[0])
            else:
                # Multiple versions - find terminal ontologies (not parents)
                terminal_versions = [
                    v for v in versions if v.hash and v.hash not in all_parent_hashes
                ]

                if not terminal_versions:
                    # No terminal ontologies found - all are parents
                    # Fall back to non-terminal versions
                    logger.warning(
                        f"No terminal ontologies found for {iri}, "
                        f"using all versions for selection"
                    )
                    terminal_versions = versions

                # Select latest by created_at among terminal ontologies
                try:
                    versions_with_created = [
                        v for v in terminal_versions if v.created_at is not None
                    ]

                    if versions_with_created:
                        # Sort by created_at (most recent first)
                        versions_with_created.sort(
                            key=lambda x: x.created_at, reverse=True
                        )
                        selected = versions_with_created[0]
                        hash_str = (
                            f"{selected.hash[:16]}..." if selected.hash else "no hash"
                        )
                        logger.debug(
                            f"Selected terminal ontology for {iri} "
                            f"by created_at: {selected.created_at} "
                            f"(hash: {hash_str})"
                        )
                        ontologies.append(selected)
                    else:
                        # No created_at available - fall back to version-based sorting
                        versions_with_ver = [v for v in terminal_versions if v.version]
                        if versions_with_ver:
                            versions_with_ver.sort(
                                key=lambda x: str(x.version), reverse=False
                            )
                            selected = versions_with_ver[-1]
                            logger.debug(
                                f"Selected terminal ontology for {iri} "
                                f"by version: {selected.version} "
                                f"(no created_at available)"
                            )
                            ontologies.append(selected)
                        else:
                            # No version info either - use first terminal ontology
                            selected = terminal_versions[0]
                            logger.debug(
                                f"Selected first terminal ontology for {iri} "
                                f"(no created_at or version available)"
                            )
                            ontologies.append(selected)
                except Exception as e:
                    logger.warning(
                        f"Could not select terminal ontology for {iri}: {e}, "
                        f"using first version"
                    )
                    ontologies.append(terminal_versions[0])

        logger.info(
            f"Successfully loaded {len(ontologies)} unique ontologies from Fuseki "
        )
        return ontologies

    def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
        """Blocking entry point that stores ``graph`` in Fuseki.

        The upload runs inside ``asyncio.run`` with a short-lived HTTP client,
        so this must not be called while an event loop is already running;
        async callers should use ``aserialize_graph()`` instead.
        """
        upload = self._serialize_graph_with_cleanup(graph, **kwargs)
        return asyncio.run(upload)

    async def aserialize_graph(self, graph: Graph, **kwargs) -> bool | None:
        """Store ``graph`` from async code (preferred over ``serialize_graph``).

        Keyword arguments are forwarded unchanged to the upload helper.
        """
        outcome = await self._serialize_graph_async(graph, **kwargs)
        return outcome

    async def _serialize_graph_with_cleanup(
        self, graph: Graph, **kwargs
    ) -> bool | None:
        """Upload ``graph`` through an HTTP client scoped to this call.

        Backs the synchronous ``serialize_graph`` wrapper. ``asyncio.run``
        closes its loop on return, so the client is opened and closed inside
        that loop. The instance's ``_client`` is swapped for the scoped one
        during the upload and restored afterwards, even if the upload raises.
        """
        http_auth = self._prepare_auth()
        async with httpx.AsyncClient(auth=http_auth, timeout=30.0) as scoped:
            previous = self._client
            self._client = scoped
            try:
                result = await self._serialize_graph_async(graph, **kwargs)
            finally:
                self._client = previous
        return result

    async def _serialize_graph_async(self, graph: Graph, **kwargs) -> bool | None:
        """Store an RDF graph as a named graph in a specific Fuseki dataset.

        Shared helper behind ``serialize``/``aserialize`` and
        ``serialize_graph``/``aserialize_graph``. The graph is rendered as
        canonical Turtle and sent with an HTTP ``PUT`` to
        ``{dataset_url}/data?graph=<graph_uri>`` (SPARQL Graph Store Protocol).

        Args:
            graph: The RDF graph to store. Graphs that are not ``RDFGraph``
                instances are copied (triples and prefix bindings) into an
                ``RDFGraph`` so canonical Turtle serialization can be used.
            **kwargs: Upload options:
                graph_uri: Target named graph; falls back to ``default_graph_uri``.
                dataset_url: Base URL of the Fuseki dataset.
                default_graph_uri: Graph name used when ``graph_uri`` is None.
                log_prefix: Label used in log messages (e.g. "Ontology").

        Returns:
            bool: True if Fuseki answered 200/201/204, False otherwise.

        Raises:
            ValueError: If neither ``graph_uri`` nor ``default_graph_uri`` is set.
        """
        graph_uri = kwargs.get("graph_uri")
        dataset_url = kwargs.get("dataset_url")
        default_graph_uri = kwargs.get("default_graph_uri")
        log_prefix = kwargs.get("log_prefix")
        # One fallback label for both the success and the failure log, so a
        # missing prefix no longer renders as "None graph ...".
        label = log_prefix or "Unknown"

        if graph_uri is None:
            graph_uri = default_graph_uri
        if graph_uri is None:
            # Without this guard str(None) would upload into a graph named "None".
            raise ValueError(
                "graph_uri or default_graph_uri is required to store a graph"
            )

        client = await self._get_client()

        if isinstance(graph, RDFGraph):
            rdf_graph = graph
        else:
            rdf_graph = RDFGraph()
            for triple in graph:
                rdf_graph.add(triple)
            for prefix, namespace in graph.namespaces():
                rdf_graph.bind(prefix, namespace)
        turtle_data = rdf_graph.serialize_canonical_turtle()

        # URL encode the graph URI to handle special characters like #
        encoded_graph_uri = quote(str(graph_uri), safe="/:")
        url = f"{dataset_url}/data?graph={encoded_graph_uri}"
        headers = {"Content-Type": "text/turtle;charset=utf-8"}
        response = await client.put(url, headers=headers, content=turtle_data)
        if response.status_code in (200, 201, 204):
            logger.info(f"{label} graph {graph_uri} uploaded to Fuseki as named graph.")
            return True

        logger.error(
            f"Failed to upload {label.lower()} graph {graph_uri}. "
            f"Status code: {response.status_code}"
        )
        logger.error(f"Response: {response.text}")
        return False

    def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
        """Blocking entry point that stores an ontology or graph in Fuseki.

        Drives the upload with ``asyncio.run`` and a short-lived HTTP client;
        do not call it from inside a running event loop. Async callers should
        use ``aserialize()``.
        """
        upload = self._serialize_with_cleanup(o, **kwargs)
        return asyncio.run(upload)

    async def aserialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
        """Store an ontology or graph from async code (preferred over ``serialize``).

        Keyword arguments are forwarded unchanged to the routing helper.
        """
        outcome = await self._serialize_async(o, **kwargs)
        return outcome

    async def _serialize_with_cleanup(
        self, o: Ontology | RDFGraph, **kwargs
    ) -> bool | None:
        """Store ``o`` through an HTTP client scoped to this call.

        Backs the synchronous ``serialize`` wrapper: the client lives and dies
        inside the loop created by ``asyncio.run``. The instance's ``_client``
        is temporarily replaced and always restored afterwards.
        """
        http_auth = self._prepare_auth()
        async with httpx.AsyncClient(auth=http_auth, timeout=30.0) as scoped:
            previous = self._client
            self._client = scoped
            try:
                result = await self._serialize_async(o, **kwargs)
            finally:
                self._client = previous
        return result

    async def _serialize_async(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
        """Store an ``Ontology`` or ``RDFGraph`` as a named graph in Fuseki.

        Where the data goes depends on the object type:

        * ``Ontology``: written to the ontologies dataset under
          ``o.versioned_iri``, so several versions can coexist. A ``graph_uri``
          passed by the caller is ignored. ``"urn:ontology:default"`` is used
          only if the versioned IRI is None.
        * ``RDFGraph``: written to the facts dataset under ``graph_uri``, or
          ``"urn:data:default"`` when no ``graph_uri`` is given.

        Args:
            o: RDF graph or Ontology object.
            **kwargs: Additional parameters; only ``graph_uri`` is read.

        Returns:
            bool: True if the graph was successfully stored, False otherwise.

        Raises:
            TypeError: If ``o`` is neither an ``Ontology`` nor an ``RDFGraph``.

        Example:
            >>> graph = RDFGraph()
            >>> success = await manager.aserialize(graph)

            >>> success = await manager.aserialize(
            ...     graph, graph_uri="http://example.org/chunk1"
            ... )
        """
        graph_uri = kwargs.get("graph_uri")

        if isinstance(o, Ontology):
            graph = o.graph
            # Use versioned IRI for storage to enable multiple versions to coexist
            graph_uri = o.versioned_iri
            default_graph_uri = "urn:ontology:default"
            log_prefix = "Ontology"
            # Use ontologies dataset for ontology storage
            dataset_url = self._get_ontologies_dataset_url()
        elif isinstance(o, RDFGraph):
            graph = o
            default_graph_uri = "urn:data:default"
            log_prefix = "Graph"
            # Use regular dataset for facts storage
            dataset_url = self._get_dataset_url()
        else:
            raise TypeError(f"unsupported obj of type {type(o)} received")

        return await self._serialize_graph_async(
            graph=graph,
            graph_uri=graph_uri,
            dataset_url=dataset_url,
            default_graph_uri=default_graph_uri,
            log_prefix=log_prefix,
        )

__init__(uri=None, auth=None, dataset=None, ontologies_dataset=None, **kwargs)

Initialize the Fuseki triple store manager.

This method resolves the Fuseki connection settings and dataset names only; it does not create datasets (call async_init() for that), and it does NOT clean any data on initialization.

Parameters:

Name Type Description Default
uri

Fuseki HTTP service root (e.g. http://localhost:3030), not .../dataset/name and not a #/dataset/... UI link.

None
auth

Authentication tuple (username, password) or string in "user/password" format.

None
dataset

Facts dataset name (Fuseki API path segment).

None
ontologies_dataset

Ontologies dataset name (separate Fuseki dataset).

None
**kwargs

Additional keyword arguments passed to the parent class.

{}
Example

manager = FusekiTripleStoreManager( ... uri="http://localhost:3030", ... dataset="acme--demo--facts", ... ontologies_dataset="acme--demo--ontologies", ... ) await manager.clean()

Source code in ontocast/tool/triple_manager/fuseki.py
def __init__(
    self,
    uri=None,
    auth=None,
    dataset=None,
    ontologies_dataset=None,
    **kwargs,
):
    """Initialize the Fuseki triple store manager.

    This constructor only resolves configuration and does not create any
    dataset. Datasets are created later by ``async_init()`` (or
    ``init_dataset()`` / ``update_tenancy()``), so callers can settle the
    final dataset names first. No data is cleaned on initialization.

    Args:
        uri: Fuseki HTTP service root (e.g. ``http://localhost:3030``), not
            ``.../dataset/name`` and not a ``#/dataset/...`` UI link. Forwarded
            to the parent together with the ``FUSEKI_URI`` env-var name.
        auth: Authentication tuple (username, password) or string in
            "user/password" format. Forwarded to the parent together with the
            ``FUSEKI_AUTH`` env-var name.
        dataset: Facts dataset name (Fuseki API path segment). Falls back to
            ``DEFAULT_DATASET`` when None or empty.
        ontologies_dataset: Ontologies dataset name (separate Fuseki dataset).
            Falls back to ``DEFAULT_ONTOLOGIES_DATASET`` when None or empty.
        **kwargs: Additional keyword arguments passed to the parent class.

    Example:
        >>> manager = FusekiTripleStoreManager(
        ...     uri="http://localhost:3030",
        ...     dataset="acme--demo--facts",
        ...     ontologies_dataset="acme--demo--ontologies",
        ... )
        >>> await manager.async_init()
        >>> await manager.clean()
    """
    super().__init__(
        uri=uri, auth=auth, env_uri="FUSEKI_URI", env_auth="FUSEKI_AUTH", **kwargs
    )
    self.uri = normalize_fuseki_server_uri(self.uri)
    # Treat None and "" the same way for both datasets: an empty name would
    # produce an unusable "{uri}/" dataset path.
    self.dataset = dataset or DEFAULT_DATASET
    self.ontologies_dataset = ontologies_dataset or DEFAULT_ONTOLOGIES_DATASET

    # Initialize httpx client for async operations (recreated per event loop;
    # httpx.AsyncClient is bound to the loop it was created on).
    self._client: httpx.AsyncClient | None = None
    self._client_loop: asyncio.AbstractEventLoop | None = None

adrop_all_ontology_graphs_for_iri(ontology_iri) async

Remove named graphs for ontology_iri (base and iri#... versioned).

Source code in ontocast/tool/triple_manager/fuseki.py
async def adrop_all_ontology_graphs_for_iri(self, ontology_iri: str) -> None:
    """Drop every named graph belonging to ``ontology_iri``.

    This covers the graph named exactly ``ontology_iri`` and all versioned
    graphs named ``ontology_iri#...`` in the ontologies dataset. If listing
    graphs fails, the error is logged and nothing is dropped. Individual DROP
    failures are logged as warnings and do not stop the remaining drops.
    """
    versioned_prefix = f"{ontology_iri}#"
    async with httpx.AsyncClient(auth=self._prepare_auth(), timeout=30.0) as client:
        listing = await client.post(
            f"{self._get_ontologies_dataset_url()}/sparql",
            data={
                "query": """
        SELECT DISTINCT ?g WHERE {
          GRAPH ?g { ?s ?p ?o }
        }
        """,
                "format": "application/sparql-results+json",
            },
        )
        if listing.status_code != 200:
            logger.error(
                "Failed to list graphs from Fuseki ontologies dataset: %s",
                listing.text,
            )
            return

        rows = listing.json().get("results", {}).get("bindings", [])
        graph_names = (row["g"]["value"] for row in rows)
        doomed: list[str] = [
            name
            for name in graph_names
            if name == ontology_iri or name.startswith(versioned_prefix)
        ]

        endpoint = f"{self._get_ontologies_dataset_url()}/update"
        for graph_uri in doomed:
            reply = await client.post(
                endpoint, data={"update": f"DROP GRAPH <{graph_uri}>"}
            )
            if reply.status_code in (200, 204):
                continue
            logger.warning(
                "Failed to drop graph %s: %s %s",
                graph_uri,
                reply.status_code,
                reply.text,
            )

adrop_named_graph(graph_uri, *, use_ontologies_dataset=True) async

Drop a single named graph in the ontologies or main dataset.

Source code in ontocast/tool/triple_manager/fuseki.py
async def adrop_named_graph(
    self, graph_uri: str, *, use_ontologies_dataset: bool = True
) -> None:
    """Issue ``DROP GRAPH`` for ``graph_uri`` against a single dataset.

    Args:
        graph_uri: Named graph to remove.
        use_ontologies_dataset: When True (default), target the ontologies
            dataset; otherwise target the facts dataset.

    Any response other than 200/204 is logged as a warning, not raised.
    """
    if use_ontologies_dataset:
        target_dataset = self._get_ontologies_dataset_url()
    else:
        target_dataset = self._get_dataset_url()
    endpoint = f"{target_dataset}/update"
    statement = f"DROP GRAPH <{graph_uri}>"
    async with httpx.AsyncClient(auth=self._prepare_auth(), timeout=30.0) as client:
        reply = await client.post(endpoint, data={"update": statement})
        if reply.status_code in (200, 204):
            return
        logger.warning(
            "Fuseki DROP GRAPH failed for %s: %s %s",
            graph_uri,
            reply.status_code,
            reply.text,
        )

afetch_ontologies() async

Async version of fetch_ontologies.

This is the preferred method when running in an async context.

Source code in ontocast/tool/triple_manager/fuseki.py
async def afetch_ontologies(self) -> list[Ontology]:
    """Load ontologies from Fuseki inside an already-running event loop.

    Preferred over the blocking ``fetch_ontologies`` wrapper in async code.
    """
    ontologies = await self._fetch_ontologies_async()
    return ontologies

aserialize(o, **kwargs) async

Async version of serialize.

This is the preferred method when running in an async context.

Source code in ontocast/tool/triple_manager/fuseki.py
async def aserialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
    """Store an ontology or graph from async code (preferred over ``serialize``).

    Keyword arguments are forwarded unchanged to the routing helper.
    """
    outcome = await self._serialize_async(o, **kwargs)
    return outcome

aserialize_graph(graph, **kwargs) async

Async version of serialize_graph.

This is the preferred method when running in an async context.

Source code in ontocast/tool/triple_manager/fuseki.py
async def aserialize_graph(self, graph: Graph, **kwargs) -> bool | None:
    """Store ``graph`` from async code (preferred over ``serialize_graph``).

    Keyword arguments are forwarded unchanged to the upload helper.
    """
    outcome = await self._serialize_graph_async(graph, **kwargs)
    return outcome

async_init() async

Initialize configured Fuseki datasets explicitly.

Constructors stay side-effect free so callers can resolve tenancy first and then create datasets for the final dataset names.

Source code in ontocast/tool/triple_manager/fuseki.py
async def async_init(self) -> None:
    """Create the configured Fuseki datasets on demand.

    The constructor does no dataset creation, so tenancy can be resolved
    first and the final dataset names created here. A scoped HTTP client
    stands in for ``self._client`` during initialization and the previous
    client is restored afterwards, even on error.
    """
    auth_config = self._prepare_auth()
    async with httpx.AsyncClient(auth=auth_config, timeout=30.0) as scoped:
        saved_client = self._client
        self._client = scoped
        try:
            await self._initialize_datasets()
        finally:
            self._client = saved_client

clean() async

Clear the configured facts dataset and ontologies dataset (when distinct).

Source code in ontocast/tool/triple_manager/fuseki.py
async def clean(self) -> None:
    """Delete all data in the facts dataset, and in the ontologies dataset if separate."""
    assert self.dataset is not None, "Dataset should never be None"
    await self._clean_dataset_by_name(self.dataset)
    logger.info("Fuseki dataset '%s' cleaned (all data deleted)", self.dataset)

    if self.ontologies_dataset == self.dataset:
        # Shared dataset: it was already flushed above.
        return

    await self._clean_dataset_by_name(self.ontologies_dataset)
    logger.info(
        "Fuseki ontologies dataset '%s' cleaned (all data deleted)",
        self.ontologies_dataset,
    )

clean_tenancy(tenant, project, *, sep=TENANCY_SEP) async

Flush facts and ontologies datasets for tenant / project (by derived names).

Source code in ontocast/tool/triple_manager/fuseki.py
async def clean_tenancy(
    self,
    tenant: str,
    project: str,
    *,
    sep: str = TENANCY_SEP,
) -> None:
    """Wipe the facts and ontologies datasets derived from ``tenant``/``project``.

    Dataset names are derived from the tenancy, not taken from ``self``, so
    this does not change which datasets the manager currently targets. When
    both derived names coincide, the dataset is cleaned only once.
    """
    facts_name = tenant_project_facts_name(tenant, project, sep=sep)
    ontologies_name = tenant_project_ontologies_name(tenant, project, sep=sep)
    # dict.fromkeys de-duplicates while keeping facts-first order.
    for dataset_name in dict.fromkeys((facts_name, ontologies_name)):
        await self._clean_dataset_by_name(dataset_name)
    logger.info(
        "Fuseki tenancy flush tenant=%r project=%r (facts=%s ontologies=%s)",
        tenant,
        project,
        facts_name,
        ontologies_name,
    )

close() async

Close the httpx client.

Source code in ontocast/tool/triple_manager/fuseki.py
async def close(self):
    """Close and forget the cached httpx client, if any, and reset its loop marker."""
    client = self._client
    if client is None:
        self._client_loop = None
        return
    await client.aclose()
    self._client = None
    self._client_loop = None

fetch_ontologies()

Synchronous wrapper for fetch_ontologies.

For async usage, use afetch_ontologies() instead.

Source code in ontocast/tool/triple_manager/fuseki.py
def fetch_ontologies(self) -> list[Ontology]:
    """Blocking entry point that loads ontologies from Fuseki.

    Runs the fetch in a fresh event loop via ``asyncio.run``, with a scoped
    HTTP client, so it must not be called from inside a running loop. Async
    callers should use ``afetch_ontologies()``.
    """
    pending = self._fetch_ontologies_with_cleanup()
    return asyncio.run(pending)

init_dataset(dataset_name) async

Initialize a Fuseki dataset.

This method creates a new dataset in Fuseki if it doesn't already exist. It uses Fuseki's admin API to create the dataset with TDB2 storage.

Uses a temporary client to avoid event loop cleanup issues when called from different async contexts.

Parameters:

Name Type Description Default
dataset_name

Name of the dataset to create.

required
Note

This method will not fail if the dataset already exists.

Source code in ontocast/tool/triple_manager/fuseki.py
async def init_dataset(self, dataset_name):
    """Create ``dataset_name`` as a TDB2 dataset through Fuseki's admin API.

    A throwaway HTTP client is used so the call works from any event loop.
    A 409 response (dataset already present) is logged at info level. Other
    non-success status codes are logged as errors rather than raised.

    Args:
        dataset_name: Name of the dataset to create.
    """
    form_body = {"dbName": dataset_name, "dbType": "tdb2"}
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    async with httpx.AsyncClient(auth=self._prepare_auth(), timeout=30.0) as client:
        reply = await client.post(
            f"{self.uri}/$/datasets", data=form_body, headers=form_headers
        )
        status = reply.status_code
        if status in (200, 201):
            logger.info(f"Fuseki dataset '{dataset_name}' created successfully.")
        elif status == 409:
            logger.info(f"Fuseki status code: {status}; {reply.text.strip()}")
        else:
            logger.error(
                f"Failed to create dataset {dataset_name}. Status code: {status}"
            )
            logger.error(f"Response: {reply.text.strip()}")

serialize(o, **kwargs)

Synchronous wrapper for serialize.

For async usage, use aserialize() instead.

Source code in ontocast/tool/triple_manager/fuseki.py
def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
    """Blocking entry point that stores an ontology or graph in Fuseki.

    Drives the upload with ``asyncio.run`` and a short-lived HTTP client; do
    not call it from inside a running event loop. Async callers should use
    ``aserialize()``.
    """
    upload = self._serialize_with_cleanup(o, **kwargs)
    return asyncio.run(upload)

serialize_graph(graph, **kwargs)

Synchronous wrapper for serialize_graph.

For async usage, use aserialize_graph() instead.

Source code in ontocast/tool/triple_manager/fuseki.py
def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
    """Blocking entry point that stores ``graph`` in Fuseki.

    The upload runs inside ``asyncio.run`` with a short-lived HTTP client, so
    this must not be called while an event loop is already running; async
    callers should use ``aserialize_graph()`` instead.
    """
    upload = self._serialize_graph_with_cleanup(graph, **kwargs)
    return asyncio.run(upload)

update_tenancy(tenant, project, *, sep=TENANCY_SEP) async

Switch facts and ontologies Fuseki datasets for tenant / project.

Source code in ontocast/tool/triple_manager/fuseki.py
async def update_tenancy(
    self,
    tenant: str,
    project: str,
    *,
    sep: str = TENANCY_SEP,
) -> None:
    """Point the manager at the datasets derived from ``tenant``/``project``.

    Both dataset names are computed before either attribute is reassigned.
    The datasets are then created via ``init_dataset``; the ontologies
    dataset is created separately only when its name differs from the facts
    dataset.
    """
    facts_name, ontologies_name = (
        tenant_project_facts_name(tenant, project, sep=sep),
        tenant_project_ontologies_name(tenant, project, sep=sep),
    )
    self.dataset, self.ontologies_dataset = facts_name, ontologies_name

    await self.init_dataset(self.dataset)
    shares_dataset = self.ontologies_dataset == self.dataset
    if not shares_dataset:
        await self.init_dataset(self.ontologies_dataset)

    logger.info(
        "Fuseki tenancy set to tenant=%r project=%r (facts=%s ontologies=%s)",
        tenant,
        project,
        self.dataset,
        self.ontologies_dataset,
    )

LLMTool

Bases: Tool

Tool for interacting with language models.

This class provides a unified interface for working with different language model providers (OpenAI, Ollama, Anthropic, Google) through LangChain. It supports both synchronous and asynchronous operations.

Attributes:

Name Type Description
config LLMConfig

LLMConfig object containing all LLM settings.

cache Any

Cacher instance for caching LLM responses.

Source code in ontocast/tool/llm.py
class LLMTool(Tool):
    """Tool for interacting with language models.

    This class provides a unified interface for working with different language
    model providers (OpenAI, Ollama, Anthropic, Google) through LangChain.
    Instances can be created synchronously (``create``) or asynchronously
    (``acreate``). The model calls themselves (``__call__``, ``acall``,
    ``complete``, ``extract``) are coroutines that first look the request up in
    ``cache`` and store fresh responses there.

    Attributes:
        config: LLMConfig object containing all LLM settings.
        cache: ``ToolCacher`` (namespace ``"llm"``) for caching LLM responses.
        budget_tracker: Optional budget tracker instance for usage statistics.
    """

    config: LLMConfig = Field(default_factory=LLMConfig)
    cache: Any = Field(default=None, exclude=True)
    budget_tracker: Any = Field(default=None, exclude=True)

    def __init__(
        self,
        cache: Cacher | None = None,
        budget_tracker: Any = None,
        **kwargs,
    ):
        """Initialize the LLM tool.

        The chat model itself is not created here. Call ``setup()`` (or use
        ``create``/``acreate``, which do so) before invoking the model.

        Args:
            cache: Optional shared Cacher instance. If None, creates a new one.
            budget_tracker: Optional budget tracker instance for usage statistics.
            **kwargs: Additional keyword arguments passed to the parent class.
        """
        super().__init__(**kwargs)
        self._llm = None
        self.budget_tracker = budget_tracker
        # Namespace the shared cacher under "llm". When none is supplied, a
        # private Cacher is created for backward compatibility.
        self.cache = ToolCacher(cache if cache is not None else Cacher(), "llm")

    @classmethod
    def create(
        cls,
        config: LLMConfig,
        cache: Cacher | None = None,
        budget_tracker: Any = None,
        **kwargs,
    ):
        """Create a new LLM tool instance synchronously.

        Runs ``acreate`` through ``asyncio.run``, so it cannot be called while
        an event loop is already running in the current thread. Use
        ``acreate`` from async code.

        Args:
            config: LLMConfig object containing LLM settings.
            cache: Optional shared Cacher instance.
            budget_tracker: Optional budget tracker instance for usage statistics.
            **kwargs: Additional keyword arguments for initialization.

        Returns:
            LLMTool: A new instance of the LLM tool.
        """
        return asyncio.run(
            cls.acreate(
                config=config, cache=cache, budget_tracker=budget_tracker, **kwargs
            )
        )

    @classmethod
    async def acreate(
        cls,
        config: LLMConfig,
        cache: Cacher | None = None,
        budget_tracker: Any = None,
        **kwargs,
    ):
        """Create a new LLM tool instance asynchronously.

        Args:
            config: LLMConfig object containing LLM settings.
            cache: Optional shared Cacher instance.
            budget_tracker: Optional budget tracker instance for usage statistics.
            **kwargs: Additional keyword arguments for initialization.

        Returns:
            LLMTool: A new instance of the LLM tool, with ``setup()`` already run.
        """
        self = cls(config=config, cache=cache, budget_tracker=budget_tracker, **kwargs)
        await self.setup()
        return self

    async def setup(self):
        """Set up the language model based on the configured provider.

        For OpenAI model names starting with ``"gpt-5"``, ``self.config.temperature``
        is overwritten with ``1.0`` before the client is built. This writes to
        the config object itself, so the new value is also part of later cache
        keys. NOTE(review): if the same LLMConfig instance is shared with other
        tools they will see the change too; confirm this is intended.

        Raises:
            ValueError: If the provider is not supported.
        """
        if self.config.provider == LLMProvider.OPENAI:
            if self.config.model_name.startswith("gpt-5"):
                self.config.temperature = 1.0
                logger.warning(
                    f"Setting temperature to {self.config.temperature} for gpt-5 class "
                    f"model {self.config.model_name}"
                )
            self._llm = ChatOpenAI(
                model=self.config.model_name,  # type: ignore
                temperature=self.config.temperature,
                base_url=self.config.base_url,  # type: ignore
                api_key=(
                    SecretStr(self.config.api_key) if self.config.api_key else None
                ),  # type: ignore
            )
        elif self.config.provider == LLMProvider.OLLAMA:
            self._llm = ChatOllama(
                model=self.config.model_name,
                base_url=self.config.base_url,
                temperature=self.config.temperature,
            )
        elif self.config.provider == LLMProvider.ANTHROPIC:
            # Only forward credentials / endpoint when configured so the client
            # keeps its own defaults otherwise.
            anthropic_kwargs: dict[str, Any] = {
                "model": self.config.model_name,
                "temperature": self.config.temperature,
            }
            if self.config.api_key:
                anthropic_kwargs["anthropic_api_key"] = SecretStr(self.config.api_key)
            if self.config.base_url:
                anthropic_kwargs["anthropic_api_url"] = self.config.base_url
            self._llm = ChatAnthropic(**anthropic_kwargs)
        elif self.config.provider == LLMProvider.GOOGLE:
            self._llm = ChatGoogleGenerativeAI(
                model=self.config.model_name,
                temperature=self.config.temperature,
                google_api_key=self.config.api_key,
            )
        else:
            raise ValueError(f"Unsupported provider: {self.config.provider}")

    def _cache_config(self, **extra: Any) -> dict[str, Any]:
        """Build the model-settings dict passed as ``config`` to the cache.

        Args:
            **extra: Additional entries appended after the common settings.

        Returns:
            dict[str, Any]: Provider, model name, temperature and base URL,
            followed by ``extra`` (insertion order preserved).
        """
        return {
            "provider": self.config.provider,
            "model_name": self.config.model_name,
            "temperature": self.config.temperature,
            "base_url": self.config.base_url,
            **extra,
        }

    async def _ainvoke_cached(
        self, label: str, args: tuple[Any, ...], kwds: dict[str, Any]
    ) -> Any:
        """Shared cache-then-invoke path behind ``__call__`` and ``acall``.

        Args:
            label: Caller name used in debug log messages.
            args: Positional arguments for ``ainvoke``. ``args[0]`` (if any) is
                the prompt used as the cache key.
            kwds: Keyword arguments for ``ainvoke``, also passed to the cache.

        Returns:
            Any: An ``AIMessage`` rebuilt from cached content on a hit,
            otherwise the model's response (whose content is then cached).
        """
        prompt = args[0] if args else ""
        prompt_str = self._prompt_to_string(prompt)
        config_dict = self._cache_config()

        cached_response = self.cache.get(prompt, config=config_dict, **kwds)
        if cached_response is not None:
            logger.debug("Cache hit for %s: %s...", label, prompt_str[:50])
            # Only the raw content is cached; wrap it so callers always receive
            # a message object exposing ``.content``.
            content = cached_response["content"]
            return AIMessage(
                content=content if isinstance(content, str) else str(content)
            )

        # Report the full prompt length, not the length of the 50-char preview.
        logger.debug(
            "Cache miss, calling LLM for %s (prompt size %d): %s...",
            label,
            len(prompt_str),
            prompt_str[:50],
        )
        response = await self.llm.ainvoke(*args, **kwds)

        response_data = {
            "content": response.content,
            "prompt": prompt_str,
            "kwargs": kwds,
        }
        self.cache.set(prompt, response_data, config=config_dict, **kwds)
        return response

    @track_llm_usage
    async def __call__(self, *args: Any, **kwds: Any) -> Any:
        """Call the language model directly (asynchronous).

        Args:
            *args: Positional arguments passed to the LLM. The first one, if
                present, is treated as the prompt for caching.
            **kwds: Keyword arguments passed to the LLM (also used for the
                cache lookup).

        Returns:
            Any: The LLM's response, or an ``AIMessage`` rebuilt from the cache.
        """
        return await self._ainvoke_cached("__call__", args, kwds)

    @track_llm_usage
    async def acall(self, *args: Any, **kwds: Any) -> Any:
        """Call the language model directly (asynchronous).

        Args:
            *args: Positional arguments passed to the LLM. The first one, if
                present, is treated as the prompt for caching.
            **kwds: Keyword arguments passed to the LLM (also used for the
                cache lookup).

        Returns:
            Any: The LLM's response, or an ``AIMessage`` rebuilt from the cache.
        """
        return await self._ainvoke_cached("acall", args, kwds)

    @property
    def llm(self) -> BaseChatModel:
        """Get the underlying language model instance.

        Returns:
            BaseChatModel: The configured language model.

        Raises:
            RuntimeError: If the LLM has not been properly initialized.
        """
        if self._llm is None:
            raise RuntimeError(
                "LLM resource not properly initialized. Call setup() first."
            )
        return self._llm

    def _prompt_to_string(self, prompt) -> str:
        """Convert various prompt types to string for caching.

        Tries, in order: the value itself if it is a ``str``, a callable
        ``to_string()``, a ``str`` ``text`` attribute, a non-None ``content``
        attribute, and finally ``str(prompt)``.

        Args:
            prompt: The prompt object (string, StringPromptValue, etc.)

        Returns:
            str: String representation of the prompt.
        """
        if isinstance(prompt, str):
            return prompt
        to_string = getattr(prompt, "to_string", None)
        if callable(to_string):
            return str(to_string())
        text_attr = getattr(prompt, "text", None)
        if isinstance(text_attr, str):
            return text_attr
        content_attr = getattr(prompt, "content", None)
        if content_attr is not None:
            return str(content_attr)
        return str(prompt)

    @track_llm_usage
    async def complete(self, prompt: str, **kwargs) -> Any:
        """Generate a completion for the given prompt.

        Args:
            prompt: The input prompt for generation (also the cache key).
            **kwargs: Additional keyword arguments for generation.

        Returns:
            Any: The generated content. Cache hits are always returned as
            ``str``; fresh responses return ``response.content`` unchanged.
        """
        config_dict = self._cache_config()

        cached_response = self.cache.get(prompt, config=config_dict, **kwargs)
        if cached_response is not None:
            logger.debug(f"Cache hit for prompt: {prompt[:50]}...")
            content = cached_response["content"]
            return content if isinstance(content, str) else str(content)

        logger.debug(f"Cache miss, calling LLM for prompt: {prompt[:50]}...")
        response = await self.llm.ainvoke(prompt, **kwargs)

        response_data = {
            "content": response.content,
            "prompt": self._prompt_to_string(prompt),
            "kwargs": kwargs,
        }
        self.cache.set(prompt, response_data, config=config_dict, **kwargs)

        return response.content

    @track_llm_usage
    async def extract(self, prompt: str, output_schema: Type[T], **kwargs) -> T:
        """Extract structured data from the prompt according to a schema.

        The prompt is suffixed with the ``PydanticOutputParser`` format
        instructions for ``output_schema``. That combined prompt, the model
        settings and the schema name form the cache lookup.

        A fresh model reply is parsed *before* it is written to the cache. A
        reply that does not fit the schema therefore raises without being
        cached, and a retry goes back to the model instead of replaying the
        same unparseable reply.

        Args:
            prompt: The input prompt for extraction.
            output_schema: The Pydantic model class defining the output structure.
            **kwargs: Additional keyword arguments for extraction.

        Returns:
            T: The extracted data conforming to the output schema.

        Raises:
            Exception: Whatever ``parser.parse`` raises when the content does not
                match the schema (LangChain's ``OutputParserException``).
        """
        parser = PydanticOutputParser(pydantic_object=output_schema)
        format_instructions = parser.get_format_instructions()
        full_prompt = f"{prompt}\n\n{format_instructions}"

        config_dict = self._cache_config(output_schema=output_schema.__name__)

        cached_response = self.cache.get(full_prompt, config=config_dict, **kwargs)
        if cached_response is not None:
            logger.debug(f"Cache hit for extraction: {prompt[:50]}...")
            content = cached_response["content"]
            return parser.parse(content if isinstance(content, str) else str(content))

        logger.debug(f"Cache miss, calling LLM for extraction: {prompt[:50]}...")
        response = await self.llm.ainvoke(full_prompt, **kwargs)

        content = response.content
        # Parse first so that only schema-conforming replies are ever cached.
        parsed = parser.parse(content if isinstance(content, str) else str(content))

        response_data = {
            "content": content,
            "prompt": self._prompt_to_string(full_prompt),
            "output_schema": output_schema.__name__,
            "kwargs": kwargs,
        }
        self.cache.set(full_prompt, response_data, config=config_dict, **kwargs)

        return parsed

llm property

Get the underlying language model instance.

Returns:

Name Type Description
BaseChatModel BaseChatModel

The configured language model.

Raises:

Type Description
RuntimeError

If the LLM has not been properly initialized.

__call__(*args, **kwds) async

Call the language model directly (asynchronous).

Parameters:

Name Type Description Default
*args Any

Positional arguments passed to the LLM.

()
**kwds Any

Keyword arguments passed to the LLM.

{}

Returns:

Name Type Description
Any Any

The LLM's response.

Source code in ontocast/tool/llm.py
@track_llm_usage
async def __call__(self, *args: Any, **kwds: Any) -> Any:
    """Call the language model directly (asynchronous), with response caching.

    The first positional argument, if any, is treated as the prompt. It is
    looked up in ``self.cache`` together with the provider/model settings and
    ``kwds``. A hit returns the cached content wrapped in an ``AIMessage``. A
    miss awaits ``self.llm.ainvoke(*args, **kwds)`` and stores its content.

    Args:
        *args: Positional arguments passed to the LLM.
        **kwds: Keyword arguments passed to the LLM (also used for the cache
            lookup).

    Returns:
        Any: The LLM's response, or an ``AIMessage`` rebuilt from the cache.
    """
    # Extract prompt from args (first argument is typically the prompt)
    prompt = args[0] if args else ""
    prompt_str = self._prompt_to_string(prompt)

    # Model settings that form part of the cache key.
    config_dict = {
        "provider": self.config.provider,
        "model_name": self.config.model_name,
        "temperature": self.config.temperature,
        "base_url": self.config.base_url,
    }

    cached_response = self.cache.get(prompt, config=config_dict, **kwds)
    if cached_response is not None:
        logger.debug(f"Cache hit for __call__: {prompt_str[:50]}...")
        # Only raw content is cached; re-wrap it as a message object.
        content = cached_response["content"]
        content_str = content if isinstance(content, str) else str(content)
        return AIMessage(content=content_str)

    # Log the full prompt length, not the length of the 50-char preview.
    logger.debug(
        f"Cache miss, calling LLM for __call__, prompt size {len(prompt_str)}..."
    )

    response = await self.llm.ainvoke(*args, **kwds)

    response_data = {
        "content": response.content,
        "prompt": prompt_str,
        "kwargs": kwds,
    }
    self.cache.set(prompt, response_data, config=config_dict, **kwds)

    return response

__init__(cache=None, budget_tracker=None, **kwargs)

Initialize the LLM tool.

Parameters:

Name Type Description Default
cache Cacher | None

Optional shared Cacher instance. If None, creates a new one.

None
budget_tracker Any

Optional budget tracker instance for usage statistics.

None
**kwargs

Additional keyword arguments passed to the parent class.

{}
Source code in ontocast/tool/llm.py
def __init__(
    self,
    cache: Cacher | None = None,
    budget_tracker: Any = None,
    **kwargs,
):
    """Set up the tool's state without building the chat model.

    Args:
        cache: Shared Cacher to use. When omitted, a private Cacher is created
            so older call sites that pass nothing keep working.
        budget_tracker: Optional tracker that collects usage statistics.
        **kwargs: Forwarded to the parent class initializer.
    """
    super().__init__(**kwargs)
    self._llm = None
    self.budget_tracker = budget_tracker

    # Responses are stored under the "llm" namespace of the backing cacher.
    backing_cacher = Cacher() if cache is None else cache
    self.cache = ToolCacher(backing_cacher, "llm")

acall(*args, **kwds) async

Call the language model directly (asynchronous).

Parameters:

Name Type Description Default
*args Any

Positional arguments passed to the LLM.

()
**kwds Any

Keyword arguments passed to the LLM.

{}

Returns:

Name Type Description
Any Any

The LLM's response.

Source code in ontocast/tool/llm.py
@track_llm_usage
async def acall(self, *args: Any, **kwds: Any) -> Any:
    """Invoke the language model asynchronously, consulting the cache first.

    ``args[0]`` (or ``""`` when no positional arguments are given) serves as
    the cache key, combined with the current provider/model settings and
    ``kwds``.

    Args:
        *args: Positional arguments forwarded to ``self.llm.ainvoke``.
        **kwds: Keyword arguments forwarded to ``self.llm.ainvoke``.

    Returns:
        Any: An ``AIMessage`` built from cached content, or the fresh response.
    """
    key_prompt = args[0] if args else ""
    cache_settings = {
        "provider": self.config.provider,
        "model_name": self.config.model_name,
        "temperature": self.config.temperature,
        "base_url": self.config.base_url,
    }

    hit = self.cache.get(key_prompt, config=cache_settings, **kwds)
    prompt_text = self._prompt_to_string(key_prompt)

    if hit is None:
        logger.debug(f"Cache miss, calling LLM for acall: {prompt_text[:50]}...")
        reply = await self.llm.ainvoke(*args, **kwds)
        entry = {
            "content": reply.content,
            "prompt": prompt_text,
            "kwargs": kwds,
        }
        self.cache.set(key_prompt, entry, config=cache_settings, **kwds)
        return reply

    logger.debug(f"Cache hit for acall: {prompt_text[:50]}...")
    # Cached entries only hold the content, so wrap it back into a message.
    raw = hit["content"]
    return AIMessage(content=raw if isinstance(raw, str) else str(raw))

acreate(config, cache=None, budget_tracker=None, **kwargs) async classmethod

Create a new LLM tool instance asynchronously.

Parameters:

Name Type Description Default
config LLMConfig

LLMConfig object containing LLM settings.

required
cache Cacher | None

Optional shared Cacher instance.

None
budget_tracker Any

Optional budget tracker instance for usage statistics.

None
**kwargs

Additional keyword arguments for initialization.

{}

Returns:

Name Type Description
LLMTool

A new instance of the LLM tool.

Source code in ontocast/tool/llm.py
@classmethod
async def acreate(
    cls,
    config: LLMConfig,
    cache: Cacher | None = None,
    budget_tracker: Any = None,
    **kwargs,
):
    """Construct an LLM tool and await its ``setup()`` before returning it.

    Args:
        config: Settings describing the provider and model.
        cache: Shared Cacher instance, if any.
        budget_tracker: Tracker for usage statistics, if any.
        **kwargs: Extra keyword arguments for the constructor.

    Returns:
        LLMTool: A ready-to-use instance whose chat model has been created.
    """
    instance = cls(
        config=config,
        cache=cache,
        budget_tracker=budget_tracker,
        **kwargs,
    )
    await instance.setup()
    return instance

complete(prompt, **kwargs) async

Generate a completion for the given prompt.

Parameters:

Name Type Description Default
prompt str

The input prompt for generation.

required
**kwargs

Additional keyword arguments for generation.

{}

Returns:

Name Type Description
Any Any

The generated completion.

Source code in ontocast/tool/llm.py
@track_llm_usage
async def complete(self, prompt: str, **kwargs) -> Any:
    """Return the model's completion for ``prompt``, using the cache if possible.

    Args:
        prompt: Text sent to the model; also the cache key.
        **kwargs: Extra generation options, forwarded to ``ainvoke`` and
            included in the cache lookup.

    Returns:
        Any: Cached content as ``str`` on a hit, otherwise the fresh
        ``response.content``.
    """
    settings = dict(
        provider=self.config.provider,
        model_name=self.config.model_name,
        temperature=self.config.temperature,
        base_url=self.config.base_url,
    )

    hit = self.cache.get(prompt, config=settings, **kwargs)
    if hit is None:
        logger.debug(f"Cache miss, calling LLM for prompt: {prompt[:50]}...")
        reply = await self.llm.ainvoke(prompt, **kwargs)
        entry = {
            "content": reply.content,
            "prompt": self._prompt_to_string(prompt),
            "kwargs": kwargs,
        }
        self.cache.set(prompt, entry, config=settings, **kwargs)
        return reply.content

    logger.debug(f"Cache hit for prompt: {prompt[:50]}...")
    stored = hit["content"]
    return stored if isinstance(stored, str) else str(stored)

create(config, cache=None, budget_tracker=None, **kwargs) classmethod

Create a new LLM tool instance synchronously.

Parameters:

Name Type Description Default
config LLMConfig

LLMConfig object containing LLM settings.

required
cache Cacher | None

Optional shared Cacher instance.

None
budget_tracker Any

Optional budget tracker instance for usage statistics.

None
**kwargs

Additional keyword arguments for initialization.

{}

Returns:

Name Type Description
LLMTool

A new instance of the LLM tool.

Source code in ontocast/tool/llm.py
@classmethod
def create(
    cls,
    config: LLMConfig,
    cache: Cacher | None = None,
    budget_tracker: Any = None,
    **kwargs,
):
    """Blocking factory: run ``acreate`` to completion and return its result.

    Uses ``asyncio.run``, which refuses to start while another event loop is
    running in the same thread. From async code, await ``acreate`` instead.

    Args:
        config: Settings describing the provider and model.
        cache: Shared Cacher instance, if any.
        budget_tracker: Tracker for usage statistics, if any.
        **kwargs: Extra keyword arguments for the constructor.

    Returns:
        LLMTool: A fully set-up instance.
    """
    factory = cls.acreate(
        config=config,
        cache=cache,
        budget_tracker=budget_tracker,
        **kwargs,
    )
    return asyncio.run(factory)

extract(prompt, output_schema, **kwargs) async

Extract structured data from the prompt according to a schema.

Parameters:

Name Type Description Default
prompt str

The input prompt for extraction.

required
output_schema Type[T]

The Pydantic model class defining the output structure.

required
**kwargs

Additional keyword arguments for extraction.

{}

Returns:

Name Type Description
T T

The extracted data conforming to the output schema.

Source code in ontocast/tool/llm.py
@track_llm_usage
async def extract(self, prompt: str, output_schema: Type[T], **kwargs) -> T:
    """Extract structured data from the prompt according to a schema.

    The prompt is suffixed with the ``PydanticOutputParser`` format
    instructions for ``output_schema``. That combined prompt, the model
    settings, the schema name and ``kwargs`` form the cache lookup.

    A fresh model reply is parsed *before* it is written to the cache. A reply
    that does not fit the schema therefore raises without being cached, and a
    retry goes back to the model instead of replaying the same bad reply.

    Args:
        prompt: The input prompt for extraction.
        output_schema: The Pydantic model class defining the output structure.
        **kwargs: Additional keyword arguments for extraction.

    Returns:
        T: The extracted data conforming to the output schema.

    Raises:
        Exception: Whatever ``parser.parse`` raises when the content does not
            match the schema (LangChain's ``OutputParserException``).
    """
    parser = PydanticOutputParser(pydantic_object=output_schema)
    format_instructions = parser.get_format_instructions()
    full_prompt = f"{prompt}\n\n{format_instructions}"

    # Model settings (plus schema name) that form part of the cache key.
    config_dict = {
        "provider": self.config.provider,
        "model_name": self.config.model_name,
        "temperature": self.config.temperature,
        "base_url": self.config.base_url,
        "output_schema": output_schema.__name__,
    }

    cached_response = self.cache.get(full_prompt, config=config_dict, **kwargs)
    if cached_response is not None:
        logger.debug(f"Cache hit for extraction: {prompt[:50]}...")
        content = cached_response["content"]
        return parser.parse(content if isinstance(content, str) else str(content))

    logger.debug(f"Cache miss, calling LLM for extraction: {prompt[:50]}...")
    response = await self.llm.ainvoke(full_prompt, **kwargs)

    content = response.content
    # Parse first so that only schema-conforming replies are ever cached.
    parsed = parser.parse(content if isinstance(content, str) else str(content))

    response_data = {
        "content": content,
        "prompt": self._prompt_to_string(full_prompt),
        "output_schema": output_schema.__name__,
        "kwargs": kwargs,
    }
    self.cache.set(full_prompt, response_data, config=config_dict, **kwargs)

    return parsed

setup() async

Set up the language model based on the configured provider.

Raises:

Type Description
ValueError

If the provider is not supported.

Source code in ontocast/tool/llm.py
async def setup(self):
    """Build the LangChain chat model for ``self.config.provider``.

    The client is stored in ``self._llm``. For OpenAI model names beginning
    with ``"gpt-5"``, the configured temperature is overwritten with ``1.0``
    (and a warning logged) before the client is constructed. This assignment
    writes to ``self.config`` itself.

    Raises:
        ValueError: If the provider is not supported.
    """
    cfg = self.config
    provider = cfg.provider

    if provider == LLMProvider.OPENAI:
        if cfg.model_name.startswith("gpt-5"):
            cfg.temperature = 1.0
            logger.warning(
                f"Setting temperature to {cfg.temperature} for gpt-5 class "
                f"model {cfg.model_name}"
            )
        openai_key = SecretStr(cfg.api_key) if cfg.api_key else None
        self._llm = ChatOpenAI(
            model=cfg.model_name,  # type: ignore
            temperature=cfg.temperature,
            base_url=cfg.base_url,  # type: ignore
            api_key=openai_key,  # type: ignore
        )
        return

    if provider == LLMProvider.OLLAMA:
        self._llm = ChatOllama(
            model=cfg.model_name,
            base_url=cfg.base_url,
            temperature=cfg.temperature,
        )
        return

    if provider == LLMProvider.ANTHROPIC:
        # Credentials and endpoint are only passed when set, leaving the
        # client's own defaults in place otherwise.
        options: dict[str, Any] = {
            "model": cfg.model_name,
            "temperature": cfg.temperature,
        }
        if cfg.api_key:
            options["anthropic_api_key"] = SecretStr(cfg.api_key)
        if cfg.base_url:
            options["anthropic_api_url"] = cfg.base_url
        self._llm = ChatAnthropic(**options)
        return

    if provider == LLMProvider.GOOGLE:
        self._llm = ChatGoogleGenerativeAI(
            model=cfg.model_name,
            temperature=cfg.temperature,
            google_api_key=cfg.api_key,
        )
        return

    raise ValueError(f"Unsupported provider: {cfg.provider}")

Neo4jTripleStoreManager

Bases: TripleStoreManagerWithAuth

Neo4j-based triple store manager using n10s (neosemantics) plugin.

This implementation handles RDF data more faithfully by using both the n10s property graph representation and raw RDF triple storage for accurate reconstruction. It provides comprehensive ontology management with namespace-based organization.

The manager uses Neo4j's n10s plugin for RDF operations, including:

- RDF import and export via n10s
- Ontology metadata storage and retrieval
- Namespace-based ontology organization
- Faithful RDF graph reconstruction

Attributes:

Name Type Description
_driver Any

Private Neo4j driver instance.

Source code in ontocast/tool/triple_manager/neo4j.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
class Neo4jTripleStoreManager(TripleStoreManagerWithAuth):
    """Neo4j-based triple store manager using the n10s (neosemantics) plugin.

    RDF is written with ``n10s.rdf.import.inline`` and read back with
    ``n10s.rdf.export.cypher``. Ontologies are discovered through their
    ``rdf:type owl:Ontology`` statements and reconstructed by exporting all
    triples whose subject URI starts with the ontology's namespace.

    The manager uses Neo4j's n10s plugin for RDF operations, including:
    - RDF import and export via n10s
    - Ontology discovery and reconstruction
    - Namespace-based ontology organization
    - RDF graph reconstruction from the n10s property-graph representation

    Attributes:
        _driver: Private Neo4j driver instance.
    """

    _driver: Any = None  # private attribute, not a pydantic field

    def __init__(self, uri=None, auth=None, **kwargs):
        """Initialize the Neo4j triple store manager.

        This method sets up the connection to Neo4j, initializes the n10s
        plugin configuration, and creates necessary constraints and indexes.
        The database is NOT cleaned on initialization.

        Args:
            uri: Neo4j connection URI (e.g., "bolt://localhost:7687"). When
                omitted, the parent class may resolve it from ``NEO4J_URI``.
            auth: Authentication tuple (username, password) or string in
                "user/password" format. When omitted, the parent class may
                resolve it from ``NEO4J_AUTH``.
            **kwargs: Additional keyword arguments passed to the parent class.

        Raises:
            ImportError: If the neo4j Python driver is not installed.
            ValueError: If no Neo4j URI was provided or resolved.

        Example:
            >>> manager = Neo4jTripleStoreManager(
            ...     uri="bolt://localhost:7687",
            ...     auth="neo4j/password"
            ... )
            >>> # To clean the database, use the clean() method explicitly:
            >>> await manager.clean()
        """
        super().__init__(
            uri=uri, auth=auth, env_uri="NEO4J_URI", env_auth="NEO4J_AUTH", **kwargs
        )
        if GraphDatabase is None:
            raise ImportError("neo4j Python driver is not installed.")
        if self.uri is None:
            raise ValueError("Neo4j URI is required but not provided.")
        self._driver = GraphDatabase.driver(self.uri, auth=self.auth)

        try:
            with self._driver.session() as session:
                # Initialize n10s configuration
                self._init_n10s_config(session)

                # Create constraints and indexes
                self._create_constraints_and_indexes(session)
        except Exception:
            # Don't leak the driver's connection pool if setup fails.
            self._driver.close()
            raise

    async def clean(self) -> None:
        """Clean/flush all data from the Neo4j database.

        This method deletes all nodes and relationships from the Neo4j
        database, effectively clearing all stored data.

        Warning: This operation is irreversible and will delete all data.

        Note:
            The driver calls used here are synchronous, so this coroutine
            blocks the event loop while the deletion runs.

        Raises:
            ValueError: If the Neo4j driver is not initialized.
            Exception: If the cleanup operation fails (logged, then re-raised).
        """
        if self._driver is None:
            raise ValueError("Neo4j driver is not initialized")

        with self._driver.session() as session:
            try:
                # consume() drains the result so server-side failures surface
                # inside this try, and the success log is only emitted once
                # the deletion has actually completed.
                session.run("MATCH (n) DETACH DELETE n").consume()
                logger.info("Neo4j database cleaned (all nodes deleted)")
            except Exception as e:
                logger.error(f"Neo4j cleanup failed: {e}")
                raise

    def _init_n10s_config(self, session):
        """Initialize the n10s graph configuration if none exists yet.

        The configuration keeps full vocabulary URIs, overwrites multivalued
        properties, keeps custom datatypes, and represents rdf:type as nodes.
        Any row returned by ``n10s.graphconfig.show()`` is treated as an
        existing configuration, in which case initialization is skipped.
        Both steps are best-effort: failures are logged, not raised.

        Args:
            session: Neo4j session for executing configuration commands.
        """
        try:
            if session.run("CALL n10s.graphconfig.show()").peek() is not None:
                logger.debug("n10s already configured")
                return
        except Exception as e:
            # Probe failed (e.g. plugin missing); fall through and try init.
            logger.debug(f"Could not read n10s configuration: {e}")

        try:
            session.run("""
                CALL n10s.graphconfig.init({
                    handleVocabUris: "KEEP",
                    handleMultival: "OVERWRITE",
                    typesToLabels: false,
                    keepLangTag: false,
                    keepCustomDataTypes: true,
                    handleRDFTypes: "NODES"
                })
            """).consume()
            logger.debug("n10s configuration initialized")
        except Exception as e:
            logger.warning(f"n10s configuration failed: {e}")

    def _create_constraints_and_indexes(self, session):
        """Create the constraints and indexes used by this manager.

        Each statement uses ``IF NOT EXISTS``. Failures are logged at debug
        level and otherwise ignored.

        Args:
            session: Neo4j session for executing constraint/index creation commands.
        """
        constraints = [
            "CREATE CONSTRAINT n10s_unique_uri IF NOT EXISTS FOR (r:Resource) REQUIRE r.uri IS UNIQUE",
            "CREATE CONSTRAINT ontology_iri_unique IF NOT EXISTS FOR (o:Ontology) REQUIRE o.uri IS UNIQUE",
            "CREATE INDEX namespace_prefix IF NOT EXISTS FOR (ns:Namespace) ON (ns.prefix)",
        ]

        for constraint in constraints:
            try:
                session.run(constraint).consume()
                # Every statement reads "CREATE <CONSTRAINT|INDEX> <name> ...".
                logger.debug(f"Created constraint/index: {constraint.split()[2]}")
            except Exception as e:
                logger.debug(f"Constraint/index creation (might already exist): {e}")

    def _extract_namespace_prefix(self, uri: str) -> tuple[str, str]:
        """Split a URI into namespace and local name.

        The separators "#", "/" and ":" are tried in that order; the first one
        present is used, splitting at its last occurrence. If none is present,
        the whole URI is returned as the namespace with an empty local name.

        Args:
            uri: The URI to parse.

        Returns:
            tuple[str, str]: A tuple of (namespace, local_name); the namespace
            keeps the trailing separator.

        Example:
            >>> manager._extract_namespace_prefix("http://example.org/onto#Class")
            ("http://example.org/onto#", "Class")
        """
        common_separators = ["#", "/", ":"]
        for sep in common_separators:
            if sep in uri:
                parts = uri.rsplit(sep, 1)
                if len(parts) == 2:
                    return parts[0] + sep, parts[1]
        return uri, ""

    def _get_ontology_namespaces(self, session) -> dict:
        """Get all known ontology namespaces from the database.

        Collects explicit ``Namespace`` nodes (keyed by their prefix) and
        namespaces derived from ``Ontology`` node URIs. Derived namespaces have
        no prefix, so they are keyed by the namespace URI itself.

        Args:
            session: Neo4j session for executing queries.

        Returns:
            dict: Mapping of prefix (or namespace URI, for derived entries)
            to namespace URI.
        """
        result = session.run("""
            MATCH (ns:Namespace)
            RETURN ns.prefix as prefix, ns.uri as uri
            UNION
            MATCH (o:Ontology)
            RETURN null as prefix, o.uri as uri
        """)

        namespaces = {}
        for record in result:
            uri = record.get("uri")
            prefix = record.get("prefix")
            if uri:
                if prefix:
                    namespaces[prefix] = uri
                else:
                    # Extract potential namespace from ontology URI
                    ns, _ = self._extract_namespace_prefix(uri)
                    if ns != uri:  # Only if we actually found a namespace
                        namespaces[ns] = ns

        return namespaces

    def fetch_ontologies(self) -> list[Ontology]:
        """Fetch all ontologies stored in Neo4j.

        The process is:

        1. Find every node typed as owl:Ontology and collect its IRI
        2. Derive a namespace from each ontology IRI
        3. Export all triples whose subject URI starts with that namespace
           via ``n10s.rdf.export.cypher``
        4. Wrap each non-empty exported graph in an Ontology object

        Errors are logged rather than raised. A failure while reconstructing
        one ontology does not prevent the others from being returned.

        Returns:
            list[Ontology]: List of all ontologies found in the database.

        Example:
            >>> ontologies = manager.fetch_ontologies()
            >>> for onto in ontologies:
            ...     print(f"Found ontology: {onto.iri}")
        """
        ontologies: list[Ontology] = []

        # Type assertion: we know _driver is not None after initialization
        assert self._driver is not None
        with self._driver.session() as session:
            try:
                ontology_iris = self._fetch_ontology_iris(session)
            except Exception as e:
                logger.error(f"Error in fetch_ontologies: {e}")
                ontology_iris = []

            for ont_iri in ontology_iris:
                # Isolate failures so one broken ontology does not hide the rest.
                try:
                    ontology = self._reconstruct_ontology_from_metadata(
                        session, ont_iri
                    )
                except Exception as e:
                    logger.error(f"Error reconstructing ontology {ont_iri}: {e}")
                    continue
                if ontology:
                    ontologies.append(ontology)

        logger.info(f"Successfully loaded {len(ontologies)} ontologies")
        return ontologies

    def _fetch_ontology_iris(self, session) -> list[str]:
        """Fetch the IRIs of all nodes explicitly typed as owl:Ontology.

        Args:
            session: Neo4j session for executing queries.

        Returns:
            list[str]: List of ontology IRIs found in the database.
        """
        # Relationship types cannot be query parameters, so rdf:type (a fixed
        # constant) is inlined; the owl:Ontology URI is passed as a parameter.
        result = session.run(
            f"""
            MATCH (o)-[:`{str(RDF.type)}`]->(t:Resource {{ uri: $ontology_class }})
            WHERE o.uri IS NOT NULL
            RETURN
              o.uri AS iri
            """,
            ontology_class=str(OWL.Ontology),
        )

        return [
            iri for iri in (record.get("iri", None) for record in result)
            if iri is not None
        ]

    def _reconstruct_ontology_from_metadata(self, session, iri) -> Ontology | None:
        """Reconstruct an ontology from its IRI by exporting its namespace.

        Args:
            session: Neo4j session for executing queries.
            iri: The ontology IRI to reconstruct.

        Returns:
            Ontology | None: The reconstructed ontology, or None if the export
            failed or produced no triples.
        """
        namespace_uri, _ = self._extract_namespace_prefix(iri)

        logger.debug(f"Reconstructing ontology: {iri} with namespace: {namespace_uri}")

        # n10s export is the only reconstruction path implemented here.
        graph = self._export_namespace_via_n10s(session, namespace_uri)
        if graph is None or len(graph) == 0:
            return None
        return self._create_ontology_object(iri, {}, graph)

    @staticmethod
    def _escape_turtle_string(value: str) -> str:
        """Escape a lexical form for use inside a Turtle double-quoted literal.

        Backslash, double quote, line feed and carriage return are not allowed
        unescaped in single-line Turtle string literals. The backslash is
        escaped first so later escapes are not doubled.
        """
        return (
            value.replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
            .replace("\r", "\\r")
        )

    def _export_namespace_via_n10s(
        self, session, namespace_uri: str
    ) -> RDFGraph | None:
        """Export all triples whose subject URI starts with a namespace.

        Uses ``n10s.rdf.export.cypher`` to stream the triples, formats them as
        N-Triples-style Turtle lines, and parses them into an ``RDFGraph``.

        Args:
            session: Neo4j session for executing queries.
            namespace_uri: The namespace URI to export.

        Returns:
            RDFGraph | None: The exported RDF graph, or None if nothing was
            exported or the export failed (failures are logged at debug level).
        """
        try:
            # The inner query is passed as a Cypher parameter, so only the
            # namespace inside its double-quoted string literal needs escaping.
            escaped_ns = namespace_uri.replace("\\", "\\\\").replace('"', '\\"')
            export_query = (
                f'MATCH (n)-[r]->(m) WHERE n.uri STARTS WITH "{escaped_ns}" '
                "RETURN n,r,m"
            )
            result = session.run(
                """
                CALL n10s.rdf.export.cypher(
                    $export_query,
                    {format: 'Turtle'}
                )
                YIELD subject, predicate, object, isLiteral, literalType, literalLang
                RETURN subject, predicate, object, isLiteral, literalType, literalLang
                """,
                export_query=export_query,
            )

            # Process into Turtle format
            turtle_lines = []

            for record in result:
                subj = record["subject"]
                pred = record["predicate"]
                obj = record["object"]
                is_literal = record["isLiteral"]
                literal_type = record["literalType"]
                literal_lang = record["literalLang"]

                # Format object
                if is_literal:
                    obj_str = f'"{self._escape_turtle_string(obj)}"'

                    # Add language tag or datatype if present
                    if literal_lang:
                        obj_str += f"@{literal_lang}"
                    elif literal_type:
                        obj_str += f"^^<{literal_type}>"
                else:
                    obj_str = f"<{obj}>"

                # Format triple
                turtle_lines.append(f"<{subj}> <{pred}> {obj_str} .")

            # Combine into single string
            turtle_string = "\n".join(turtle_lines)

            if turtle_string.strip():
                graph = RDFGraph()
                graph.parse(data=turtle_string, format="turtle")
                logger.debug(
                    f"Exported {len(graph)} triples via n10s for namespace {namespace_uri}"
                )
                return graph
            return None

        except Exception as e:
            logger.debug(
                f"Failed to export via n10s for namespace {namespace_uri}: {e}"
            )

        return None

    def _create_ontology_object(
        self, iri: str, metadata: dict, graph: RDFGraph
    ) -> Ontology:
        """Create an Ontology object from IRI, metadata, and graph.

        Args:
            iri: The ontology IRI.
            metadata: Metadata dictionary (currently unused, kept for compatibility).
            graph: The RDF graph containing the ontology data.

        Returns:
            Ontology: The created ontology object.
        """
        ontology_id = derive_ontology_id(iri)
        return Ontology(graph=graph, iri=iri, ontology_id=ontology_id)

    def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
        """Serialize an RDF graph to Neo4j through the n10s plugin.

        The graph is serialized to canonical Turtle and imported with
        ``n10s.rdf.import.inline``. n10s maps the triples onto its
        property-graph representation; no separate raw-triple copy is stored.

        Args:
            graph: The RDF graph to store. A graph that is not an ``RDFGraph``
                is first copied (triples and prefix bindings) into one.
            **kwargs: Additional parameters (not used by Neo4j implementation).

        Returns:
            The single record returned by the n10s import call (its import
            summary), or None if the call returned no row.
        """
        # Convert to RDFGraph if needed
        if not isinstance(graph, RDFGraph):
            rdf_graph = RDFGraph()
            for triple in graph:
                rdf_graph.add(triple)
            for prefix, namespace in graph.namespaces():
                rdf_graph.bind(prefix, namespace)
            graph = rdf_graph

        turtle_data = graph.serialize_canonical_turtle()

        # Type assertion: we know _driver is not None after initialization
        assert self._driver is not None
        with self._driver.session() as session:
            # Store via n10s for graph queries
            result = session.run(
                "CALL n10s.rdf.import.inline($ttl, 'Turtle')", ttl=turtle_data
            )
            summary = result.single()

        return summary

    def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
        """Serialize an Ontology or RDFGraph to Neo4j through the n10s plugin.

        An Ontology contributes its ``graph``; an RDFGraph is stored directly.
        Storage is delegated to ``serialize_graph``.

        Args:
            o: Ontology or RDFGraph object to store.
            **kwargs: Additional keyword arguments, forwarded to
                ``serialize_graph`` (which does not use them).

        Returns:
            The single record returned by the n10s import call (its import
            summary), or None if the call returned no row.

        Raises:
            TypeError: If ``o`` is neither an Ontology nor an RDFGraph.
        """
        if isinstance(o, Ontology):
            graph = o.graph
        elif isinstance(o, RDFGraph):
            graph = o
        else:
            raise TypeError(f"unsupported obj of type {type(o)} received")

        return self.serialize_graph(graph, **kwargs)

    def close(self):
        """Close the Neo4j driver connection, if one was created.

        Call this when the manager is no longer needed so the driver's
        connections and resources are released.
        """
        driver = self._driver
        if not driver:
            return
        driver.close()

__init__(uri=None, auth=None, **kwargs)

Initialize the Neo4j triple store manager.

This method sets up the connection to Neo4j, initializes the n10s plugin configuration, and creates necessary constraints and indexes. The database is NOT cleaned on initialization.

Parameters:

Name Type Description Default
uri

Neo4j connection URI (e.g., "bolt://localhost:7687").

None
auth

Authentication tuple (username, password) or string in "user/password" format.

None
**kwargs

Additional keyword arguments passed to the parent class.

{}

Raises:

Type Description
ImportError

If the neo4j Python driver is not installed.
ValueError

If no Neo4j URI is provided.

Example

manager = Neo4jTripleStoreManager( ... uri="bolt://localhost:7687", ... auth="neo4j/password" ... )

To clean the database, use the clean() method explicitly:

await manager.clean()

Source code in ontocast/tool/triple_manager/neo4j.py
def __init__(self, uri=None, auth=None, **kwargs):
    """Initialize the Neo4j triple store manager.

    This method sets up the connection to Neo4j, initializes the n10s
    plugin configuration, and creates necessary constraints and indexes.
    The database is NOT cleaned on initialization.

    Args:
        uri: Neo4j connection URI (e.g., "bolt://localhost:7687"). When
            omitted, the parent class may resolve it from ``NEO4J_URI``.
        auth: Authentication tuple (username, password) or string in
            "user/password" format. When omitted, the parent class may
            resolve it from ``NEO4J_AUTH``.
        **kwargs: Additional keyword arguments passed to the parent class.

    Raises:
        ImportError: If the neo4j Python driver is not installed.
        ValueError: If no Neo4j URI was provided or resolved.

    Example:
        >>> manager = Neo4jTripleStoreManager(
        ...     uri="bolt://localhost:7687",
        ...     auth="neo4j/password"
        ... )
        >>> # To clean the database, use the clean() method explicitly:
        >>> await manager.clean()
    """
    super().__init__(
        uri=uri, auth=auth, env_uri="NEO4J_URI", env_auth="NEO4J_AUTH", **kwargs
    )
    if GraphDatabase is None:
        raise ImportError("neo4j Python driver is not installed.")
    if self.uri is None:
        raise ValueError("Neo4j URI is required but not provided.")
    self._driver = GraphDatabase.driver(self.uri, auth=self.auth)

    try:
        with self._driver.session() as session:
            # Initialize n10s configuration
            self._init_n10s_config(session)

            # Create constraints and indexes
            self._create_constraints_and_indexes(session)
    except Exception:
        # Don't leak the driver's connection pool if setup fails.
        self._driver.close()
        raise

clean() async

Clean/flush all data from the Neo4j database.

This method deletes all nodes and relationships from the Neo4j database, effectively clearing all stored data.

Warning: This operation is irreversible and will delete all data.

Raises:

Type Description
Exception

If the cleanup operation fails.

Source code in ontocast/tool/triple_manager/neo4j.py
async def clean(self) -> None:
    """Clean/flush all data from the Neo4j database.

    This method deletes all nodes and relationships from the Neo4j
    database, effectively clearing all stored data.

    Warning: This operation is irreversible and will delete all data.

    Note:
        The driver calls used here are synchronous, so this coroutine
        blocks the event loop while the deletion runs.

    Raises:
        ValueError: If the Neo4j driver is not initialized.
        Exception: If the cleanup operation fails (logged, then re-raised).
    """
    if self._driver is None:
        raise ValueError("Neo4j driver is not initialized")

    with self._driver.session() as session:
        try:
            # consume() drains the result so server-side failures surface
            # inside this try, and the success log is only emitted once
            # the deletion has actually completed.
            session.run("MATCH (n) DETACH DELETE n").consume()
            logger.info("Neo4j database cleaned (all nodes deleted)")
        except Exception as e:
            logger.error(f"Neo4j cleanup failed: {e}")
            raise

close()

Close the Neo4j driver connection.

This method should be called when the manager is no longer needed to properly close the database connection and free resources.

Source code in ontocast/tool/triple_manager/neo4j.py
def close(self):
    """Close the Neo4j driver connection, if one was created.

    Call this when the manager is no longer needed so the driver's
    connections and resources are released.
    """
    driver = self._driver
    if not driver:
        return
    driver.close()

fetch_ontologies()

Fetch ontologies from Neo4j with faithful RDF reconstruction.

This method retrieves all ontologies from Neo4j and reconstructs their RDF graphs. It uses a multi-step process:

  1. Finds every node typed as owl:Ontology and collects its IRI
  2. Derives a namespace from each ontology IRI
  3. Exports all triples whose subject URI starts with that namespace via n10s.rdf.export.cypher
  4. Wraps each non-empty exported graph in an Ontology object

Returns:

Type Description
list[Ontology]

list[Ontology]: List of all ontologies found in the database.

Example

ontologies = manager.fetch_ontologies() for onto in ontologies: ... print(f"Found ontology: {onto.iri}")

Source code in ontocast/tool/triple_manager/neo4j.py
def fetch_ontologies(self) -> list[Ontology]:
    """Fetch all ontologies stored in Neo4j.

    The process is:

    1. Find every node typed as owl:Ontology and collect its IRI
    2. Derive a namespace from each ontology IRI
    3. Export all triples whose subject URI starts with that namespace
       via ``n10s.rdf.export.cypher``
    4. Wrap each non-empty exported graph in an Ontology object

    Errors are logged rather than raised. A failure while reconstructing
    one ontology does not prevent the others from being returned.

    Returns:
        list[Ontology]: List of all ontologies found in the database.

    Example:
        >>> ontologies = manager.fetch_ontologies()
        >>> for onto in ontologies:
        ...     print(f"Found ontology: {onto.iri}")
    """
    ontologies: list[Ontology] = []

    # Type assertion: we know _driver is not None after initialization
    assert self._driver is not None
    with self._driver.session() as session:
        try:
            ontology_iris = self._fetch_ontology_iris(session)
        except Exception as e:
            logger.error(f"Error in fetch_ontologies: {e}")
            ontology_iris = []

        for ont_iri in ontology_iris:
            # Isolate failures so one broken ontology does not hide the rest.
            try:
                ontology = self._reconstruct_ontology_from_metadata(
                    session, ont_iri
                )
            except Exception as e:
                logger.error(f"Error reconstructing ontology {ont_iri}: {e}")
                continue
            if ontology:
                ontologies.append(ontology)

    logger.info(f"Successfully loaded {len(ontologies)} ontologies")
    return ontologies

serialize(o, **kwargs)

Serialize an Ontology or RDFGraph to Neo4j through the n10s plugin.

This method stores the given Ontology or RDFGraph in Neo4j using the n10s plugin for RDF import. The data is stored as RDF triples that can be faithfully reconstructed later.

Parameters:

Name Type Description Default
o Ontology | RDFGraph

Ontology or RDFGraph object to store.

required
**kwargs

Additional keyword arguments (not used by Neo4j implementation).

{}

Returns:

Name Type Description
Any bool | None

The result summary from n10s import operation.

Source code in ontocast/tool/triple_manager/neo4j.py
def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
    """Serialize an Ontology or RDFGraph to Neo4j through the n10s plugin.

    An Ontology contributes its ``graph``; an RDFGraph is stored directly.
    Storage is delegated to ``serialize_graph``.

    Args:
        o: Ontology or RDFGraph object to store.
        **kwargs: Additional keyword arguments, forwarded to
            ``serialize_graph`` (which does not use them).

    Returns:
        The single record returned by the n10s import call (its import
        summary), or None if the call returned no row.

    Raises:
        TypeError: If ``o`` is neither an Ontology nor an RDFGraph.
    """
    if isinstance(o, Ontology):
        graph = o.graph
    elif isinstance(o, RDFGraph):
        graph = o
    else:
        raise TypeError(f"unsupported obj of type {type(o)} received")

    return self.serialize_graph(graph, **kwargs)

serialize_graph(graph, **kwargs)

Serialize an RDF graph to Neo4j through the n10s plugin.

This method stores the given RDF graph in Neo4j using the n10s plugin for RDF import. The data is stored as RDF triples that can be faithfully reconstructed later.

Parameters:

Name Type Description Default
graph Graph

The RDF graph to store.

required
**kwargs

Additional parameters (not used by Neo4j implementation).

{}

Returns:

Name Type Description
Any bool | None

The result summary from n10s import operation.

Source code in ontocast/tool/triple_manager/neo4j.py
def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
    """Serialize an RDF graph to Neo4j through the n10s plugin.

    The graph is serialized to canonical Turtle and imported with
    ``n10s.rdf.import.inline``. n10s maps the triples onto its
    property-graph representation; no separate raw-triple copy is stored.

    Args:
        graph: The RDF graph to store. A graph that is not an ``RDFGraph``
            is first copied (triples and prefix bindings) into one.
        **kwargs: Additional parameters (not used by Neo4j implementation).

    Returns:
        The single record returned by the n10s import call (its import
        summary), or None if the call returned no row.
    """
    # Convert to RDFGraph if needed
    if not isinstance(graph, RDFGraph):
        rdf_graph = RDFGraph()
        for triple in graph:
            rdf_graph.add(triple)
        for prefix, namespace in graph.namespaces():
            rdf_graph.bind(prefix, namespace)
        graph = rdf_graph

    turtle_data = graph.serialize_canonical_turtle()

    # Type assertion: we know _driver is not None after initialization
    assert self._driver is not None
    with self._driver.session() as session:
        # Store via n10s for graph queries
        result = session.run(
            "CALL n10s.rdf.import.inline($ttl, 'Turtle')", ttl=turtle_data
        )
        summary = result.single()

    return summary

OntologyManager

Bases: Tool

Manager for handling multiple ontologies with version tracking.

This class provides functionality for managing a collection of ontologies, tracking version lineage using hash-based identifiers. For each IRI, it maintains a tree/graph of all versions identified by their hashes.

Attributes:

Name Type Description
ontology_versions dict[str, list[Ontology]]

Dictionary mapping IRI to list of all ontology versions (identified by hash). Each IRI can have multiple versions forming a lineage tree.

Source code in ontocast/tool/ontology_manager.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
class OntologyManager(Tool):
    """Manager for handling multiple ontologies with version tracking.

    This class provides functionality for managing a collection of ontologies,
    tracking version lineage using hash-based identifiers. For each IRI,
    it maintains a tree/graph of all versions identified by their hashes.

    Each IRI is also bound one-to-one to an identity key (the stripped,
    lower-cased ``ontology_id`` or, failing that, ``prefix``); conflicting
    bindings are rejected with ``ValueError`` when ontologies are added.

    Attributes:
        ontology_versions: Dictionary mapping IRI to list of all
            ontology versions (identified by hash). Each IRI can have
            multiple versions forming a lineage tree.
    """

    ontology_versions: dict[str, list[Ontology]] = Field(default_factory=dict)

    def __init__(self, **kwargs):
        """Initialize the ontology manager.

        Args:
            **kwargs: Additional keyword arguments passed to the parent class.
        """
        super().__init__(**kwargs)
        # Cache dictionary mapping IRI to hash of freshest terminal ontology.
        # Updated incrementally when ontologies are added.
        self._cached_ontologies: dict[str, str] = {}
        self._patch_retriever: OntologyPatchRetriever | None = None
        # Bidirectional IRI <-> identity maps; keys/values use the stripped IRI.
        self._iri_to_identity: dict[str, str] = {}
        self._identity_to_iri: dict[str, str] = {}

    @staticmethod
    def _build_identity_key(ontology: Ontology) -> str:
        """Return the normalized identity key for ``ontology``.

        The key is ``ontology_id`` (or ``prefix`` when the id is empty),
        stripped of surrounding whitespace and lower-cased.

        Raises:
            ValueError: If neither ontology_id nor prefix yields a non-empty key.
        """
        identity = (ontology.ontology_id or ontology.prefix or "").strip().lower()
        if not identity:
            raise ValueError(
                "Ontology identity is missing: provide ontology_id or ontology prefix"
            )
        return identity

    def validate_identity_uniqueness(self, ontology: Ontology) -> None:
        """Validate ontology IRI<->identity bijection across the manager.

        Args:
            ontology: Candidate ontology whose IRI and identity are checked
                against the bindings already registered.

        Raises:
            ValueError: If the IRI is missing or is the null-ontology IRI, if
                the identity key is missing, or if either the IRI or the
                identity is already bound to a different counterpart.
        """
        iri = (ontology.iri or "").strip()
        if not iri:
            raise ValueError("Ontology IRI is missing")
        if iri == NULL_ONTOLOGY.iri:
            raise ValueError("Null ontology IRI cannot be registered")

        identity = self._build_identity_key(ontology)

        existing_identity = self._iri_to_identity.get(iri)
        if existing_identity is not None and existing_identity != identity:
            raise ValueError(
                "Ontology identity conflict: IRI "
                f"'{iri}' is already bound to identity '{existing_identity}', "
                f"received '{identity}'"
            )

        existing_iri = self._identity_to_iri.get(identity)
        if existing_iri is not None and existing_iri != iri:
            raise ValueError(
                "Ontology identity conflict: identity "
                f"'{identity}' is already bound to IRI '{existing_iri}', "
                f"received '{iri}'"
            )

    def _register_identity(self, ontology: Ontology) -> None:
        """Record the (stripped IRI, identity key) binding in both maps."""
        iri = ontology.iri.strip()
        identity = self._build_identity_key(ontology)
        self._iri_to_identity[iri] = identity
        self._identity_to_iri[identity] = iri

    def _iris_for_ontology_id(self, ontology_id: str) -> list[str]:
        """Return IRIs (in insertion order) having a version with ``ontology_id``."""
        return [
            iri
            for iri, versions in self.ontology_versions.items()
            if any(o.ontology_id == ontology_id for o in versions)
        ]

    @staticmethod
    def _most_recent(candidates: list[Ontology]) -> Ontology | None:
        """Pick the candidate with the latest ``created_at``.

        Candidates lacking ``created_at`` are ignored when at least one
        candidate has it; otherwise the first candidate is returned. On equal
        timestamps the earliest candidate wins (``max`` keeps the first
        maximum). Returns None for an empty list.
        """
        from datetime import datetime
        from typing import cast

        if not candidates:
            return None
        stamped = [o for o in candidates if o.created_at is not None]
        if stamped:
            return max(stamped, key=lambda o: cast(datetime, o.created_at))
        return candidates[0]

    def __contains__(self, item):
        """Check if an item (IRI or ontology_id) is in the ontology manager.

        Args:
            item: The IRI or ontology_id to check.

        Returns:
            bool: True if the item exists in any version of any ontology.
        """
        # Check by IRI (primary key)
        if item in self.ontology_versions:
            return True
        # Check by ontology_id (fallback for backward compatibility)
        return any(
            o.ontology_id == item
            for versions in self.ontology_versions.values()
            for o in versions
        )

    def add_ontology(
        self, ontology: Ontology, *, skip_vector_index: bool = False
    ) -> None:
        """Add an ontology to the version tree for its IRI.

        If an ontology with the same hash already exists, it is not added again.
        The ontology is added to the version tree for its IRI.
        Ensures that created_at is set if not already present.

        Args:
            ontology: The ontology to add.
            skip_vector_index: If True, do not call the vector store (caller
                already materialized embeddings, e.g. during ToolBox.initialize).

        Raises:
            ValueError: Propagated from ``validate_identity_uniqueness`` when
                the IRI/identity binding conflicts with a registered one.
        """
        if not ontology.iri or ontology.iri == NULL_ONTOLOGY.iri:
            logger.warning(
                f"Cannot add ontology without valid IRI (ontology_id: {ontology.ontology_id})"
            )
            return

        if not ontology.hash:
            logger.warning(f"Cannot add ontology without hash (IRI: {ontology.iri})")
            return

        self.validate_identity_uniqueness(ontology)
        self._register_identity(ontology)

        # Ensure created_at is set
        if not ontology.created_at:
            from datetime import datetime, timezone

            ontology.created_at = datetime.now(timezone.utc)
            logger.debug(
                f"Set created_at for ontology {ontology.iri} with hash {ontology.hash[:8]}..."
            )

        versions = self.ontology_versions.setdefault(ontology.iri, [])

        # Check if this hash already exists
        if any(o.hash == ontology.hash for o in versions):
            logger.debug(
                f"Ontology {ontology.iri} with hash {ontology.hash[:8]}... already exists"
            )
            return

        versions.append(ontology)
        if self._patch_retriever is not None and not skip_vector_index:
            self._patch_retriever.vector_store.reindex_ontology(ontology)
        # Update cache for this specific IRI (store hash only)
        freshest = self.get_freshest_terminal_ontology_by_iri(ontology.iri)
        if freshest and freshest.hash:
            self._cached_ontologies[ontology.iri] = freshest.hash
        logger.debug(
            f"Added ontology {ontology.iri} with hash {ontology.hash[:8]}..."
        )

    def remove_ontology_by_iri(self, iri: str) -> None:
        """Drop all tracked versions for an ontology IRI and clear caches.

        The identity maps are keyed by the stripped IRI (see
        ``_register_identity``), so the lookup there strips ``iri`` too;
        otherwise a stale identity binding could survive and block a later
        re-registration. The registered patch retriever is not touched.
        """
        self.ontology_versions.pop(iri, None)
        self._cached_ontologies.pop(iri, None)
        removed_identity = self._iri_to_identity.pop(iri.strip(), None)
        if removed_identity is not None:
            self._identity_to_iri.pop(removed_identity, None)

    def register_vector_store(self, retriever: "OntologyPatchRetriever") -> None:
        """Register a patch retriever for vector context lookups."""
        self._patch_retriever = retriever

    def _effective_patch_top_k(self, top_k: int | None) -> int:
        """Resolve ``top_k``: explicit value, else retriever config, else 10."""
        if top_k is not None:
            return top_k
        if self._patch_retriever is not None:
            return self._patch_retriever.vector_store.config.top_k
        return 10

    @staticmethod
    def _normalize_ensemble_result(
        graph: RDFGraph | None, sources: list[str]
    ) -> tuple[RDFGraph, list[str]]:
        """Replace a missing or empty ensemble graph with a fresh empty RDFGraph."""
        if graph is None or len(graph) == 0:
            return RDFGraph(), sources
        return graph, sources

    def _fallback_patch_contexts(
        self, queries: list[str]
    ) -> list[tuple[RDFGraph | None, list[str]]]:
        """Per-query fallback used when no patch retriever is registered.

        Each query gets its own deep copy of the freshest terminal ontology
        graph (so callers may mutate results independently), or
        ``(None, [])`` when no ontology is loaded.
        """
        fallback = self.get_freshest_terminal_ontology_by_iri(None)
        if fallback is None:
            return [(None, []) for _ in queries]
        return [(deepcopy(fallback.graph), [fallback.iri]) for _ in queries]

    def get_patch_context(
        self,
        query: str,
        top_k: int | None = None,
        subgraph_depth: int = 1,
        max_total_triples: int = 300,
        estimated_triples_per_query: int = 24,
    ) -> RDFGraph | None:
        """Retrieve multi-ontology patch context for a query.

        With a registered patch retriever, returns the retrieved graph (an
        empty ``RDFGraph`` when retrieval finds nothing). Without one, falls
        back to a copy of the freshest available ontology graph, or None when
        no ontology is loaded.
        """
        graph, _ = self.get_patch_context_with_sources(
            query=query,
            top_k=top_k,
            subgraph_depth=subgraph_depth,
            max_total_triples=max_total_triples,
            estimated_triples_per_query=estimated_triples_per_query,
        )
        return graph

    async def aget_patch_context(
        self,
        query: str,
        top_k: int | None = None,
        subgraph_depth: int = 1,
        max_total_triples: int = 300,
        estimated_triples_per_query: int = 24,
    ) -> RDFGraph | None:
        """Async variant of :meth:`get_patch_context`."""
        graph, _ = await self.aget_patch_context_with_sources(
            query=query,
            top_k=top_k,
            subgraph_depth=subgraph_depth,
            max_total_triples=max_total_triples,
            estimated_triples_per_query=estimated_triples_per_query,
        )
        return graph

    def get_patch_context_with_sources(
        self,
        query: str,
        top_k: int | None = None,
        subgraph_depth: int = 1,
        max_total_triples: int = 300,
        estimated_triples_per_query: int = 24,
    ) -> tuple[RDFGraph | None, list[str]]:
        """Retrieve patch context and contributing ontology IRIs."""
        results = self.get_patch_contexts_with_sources(
            queries=[query],
            top_k=top_k,
            subgraph_depth=subgraph_depth,
            max_total_triples=max_total_triples,
            estimated_triples_per_query=estimated_triples_per_query,
        )
        if not results:
            return None, []
        return results[0]

    async def aget_patch_context_with_sources(
        self,
        query: str,
        top_k: int | None = None,
        subgraph_depth: int = 1,
        max_total_triples: int = 300,
        estimated_triples_per_query: int = 24,
    ) -> tuple[RDFGraph | None, list[str]]:
        """Async variant of :meth:`get_patch_context_with_sources`."""
        results = await self.aget_patch_contexts_with_sources(
            queries=[query],
            top_k=top_k,
            subgraph_depth=subgraph_depth,
            max_total_triples=max_total_triples,
            estimated_triples_per_query=estimated_triples_per_query,
        )
        if not results:
            return None, []
        return results[0]

    def get_patch_contexts_with_sources(
        self,
        queries: list[str],
        top_k: int | None = None,
        subgraph_depth: int = 1,
        max_total_triples: int = 300,
        estimated_triples_per_query: int = 24,
    ) -> list[tuple[RDFGraph | None, list[str]]]:
        """Retrieve patch contexts for many queries in a batched pass.

        With a patch retriever, the list has length 1 (ensemble graph + sources).
        Without it, length matches ``queries`` (fallback ontology per query).
        An empty ``queries`` list yields an empty result.
        """
        if not queries:
            return []
        if self._patch_retriever is None:
            return self._fallback_patch_contexts(queries)
        graph, sources = self._patch_retriever.retrieve_ensemble(
            queries=queries,
            top_k=self._effective_patch_top_k(top_k),
            subgraph_depth=subgraph_depth,
            max_total_triples=max_total_triples,
            estimated_triples_per_query=estimated_triples_per_query,
        )
        return [self._normalize_ensemble_result(graph, sources)]

    async def aget_patch_contexts_with_sources(
        self,
        queries: list[str],
        top_k: int | None = None,
        subgraph_depth: int = 1,
        max_total_triples: int = 300,
        estimated_triples_per_query: int = 24,
    ) -> list[tuple[RDFGraph | None, list[str]]]:
        """Async patch retrieval (vector + induced subgraph) for many queries.

        With a patch retriever, returns a one-element list: a single induced graph for
        the union of hits over ``queries``, plus contributing ontology IRIs.
        Without it, returns one fallback entry per query.
        """
        if not queries:
            return []
        if self._patch_retriever is None:
            return self._fallback_patch_contexts(queries)
        graph, sources = await self._patch_retriever.aretrieve_ensemble(
            queries=queries,
            top_k=self._effective_patch_top_k(top_k),
            subgraph_depth=subgraph_depth,
            max_total_triples=max_total_triples,
            estimated_triples_per_query=estimated_triples_per_query,
        )
        return [self._normalize_ensemble_result(graph, sources)]

    def get_terminal_ontologies_by_iri(self, iri: str | None = None) -> list[Ontology]:
        """Get terminal (leaf) ontologies in the version graph.

        Terminal ontologies are those that are not parents of any other ontology
        in the version tree. If iri is provided, returns terminals for
        that ontology only; otherwise returns terminals for all ontologies.

        Args:
            iri: Optional IRI to filter by.

        Returns:
            list[Ontology]: List of terminal ontologies.
        """
        if iri:
            ontologies = self.ontology_versions.get(iri, [])
        else:
            ontologies = [
                o for versions in self.ontology_versions.values() for o in versions
            ]

        if not ontologies:
            return []

        # Terminal nodes are those whose hash is not in any parent_hashes
        all_parent_hashes = {h for o in ontologies for h in o.parent_hashes}
        return [o for o in ontologies if o.hash not in all_parent_hashes]

    def get_terminal_ontologies(self, ontology_id: str | None = None) -> list[Ontology]:
        """Get terminal (leaf) ontologies by ontology_id (backward compatibility wrapper).

        Args:
            ontology_id: Optional ontology_id to filter by. When falsy,
                terminals across all IRIs are returned.

        Returns:
            list[Ontology]: List of terminal ontologies.
        """
        if not ontology_id:
            return self.get_terminal_ontologies_by_iri(None)
        all_terminals: list[Ontology] = []
        for iri in self._iris_for_ontology_id(ontology_id):
            all_terminals.extend(self.get_terminal_ontologies_by_iri(iri))
        return all_terminals

    def get_freshest_terminal_ontology_by_iri(
        self, iri: str | None = None
    ) -> Ontology | None:
        """Get the freshest terminal ontology based on created_at timestamp.

        Returns the terminal ontology with the most recent `created_at` timestamp.
        If multiple terminal ontologies exist, returns the one that was most recently
        created. If no created_at is set, falls back to the first terminal ontology.

        Args:
            iri: Optional IRI to filter by. If None, searches across
                all ontologies.

        Returns:
            Ontology: The freshest terminal ontology, or None if no terminal
                ontologies exist.
        """
        return self._most_recent(self.get_terminal_ontologies_by_iri(iri))

    def get_freshest_terminal_ontology(
        self, ontology_id: str | None = None
    ) -> Ontology | None:
        """Get the freshest terminal ontology by ontology_id (backward compatibility wrapper).

        Args:
            ontology_id: Optional ontology_id to filter by.

        Returns:
            Ontology: The freshest terminal ontology, or None if no terminal
                ontologies exist.
        """
        if not ontology_id:
            return self.get_freshest_terminal_ontology_by_iri(None)
        # Freshest per matching IRI, then the most recent among those.
        candidates: list[Ontology] = []
        for iri in self._iris_for_ontology_id(ontology_id):
            freshest = self.get_freshest_terminal_ontology_by_iri(iri)
            if freshest:
                candidates.append(freshest)
        return self._most_recent(candidates)

    def get_ontology_versions_by_iri(self, iri: str) -> list[Ontology]:
        """Get all versions of an ontology by IRI.

        Args:
            iri: The IRI to retrieve versions for.

        Returns:
            list[Ontology]: List of all versions of the ontology.
        """
        return self.ontology_versions.get(iri, [])

    def get_ontology_versions(self, ontology_id: str) -> list[Ontology]:
        """Get all versions of an ontology by ontology_id (backward compatibility wrapper).

        Args:
            ontology_id: The ontology_id to retrieve versions for.

        Returns:
            list[Ontology]: List of all versions of the ontology, concatenated
                across every IRI that has a version with this ontology_id.
        """
        all_versions: list[Ontology] = []
        for iri in self._iris_for_ontology_id(ontology_id):
            all_versions.extend(self.ontology_versions[iri])
        return all_versions

    def get_lineage_graph_by_iri(self, iri: str):
        """Get the lineage graph for a specific IRI.

        Args:
            iri: The IRI to get the lineage graph for.

        Returns:
            networkx.DiGraph: The lineage graph for the ontology, or None if not found.
        """
        if iri not in self.ontology_versions:
            return None

        return Ontology.build_lineage_graph(self.ontology_versions[iri])

    def get_lineage_graph(self, ontology_id: str):
        """Get the lineage graph for a specific ontology_id (backward compatibility wrapper).

        Args:
            ontology_id: The ontology_id to get the lineage graph for.

        Returns:
            networkx.DiGraph: The lineage graph for the first IRI having a
                version with this ontology_id, or None if not found.
        """
        matching_iris = self._iris_for_ontology_id(ontology_id)
        if not matching_iris:
            return None
        return Ontology.build_lineage_graph(self.ontology_versions[matching_iris[0]])

    def _latest_version_by_iri(self, iri: str) -> Ontology | None:
        """Freshest terminal version for ``iri``; first stored version if none.

        Returns None when the IRI has no stored versions.
        """
        freshest = self.get_freshest_terminal_ontology_by_iri(iri)
        if freshest is not None:
            return freshest
        versions = self.ontology_versions.get(iri, [])
        return versions[0] if versions else None

    def get_ontology(
        self,
        ontology_id: str | None = None,
        ontology_iri: str | None = None,
        hash: str | None = None,
    ) -> Ontology:
        """Get an ontology by its IRI, ontology_id, or hash.

        If hash is provided, returns the version with that hash (searched
        across all IRIs), or NULL_ONTOLOGY when no version has it. Otherwise
        returns the freshest terminal version (latest ``created_at``) for the
        IRI, or for the first IRI whose versions carry ``ontology_id``.
        IRI is preferred over ontology_id for lookup.

        Args:
            ontology_id: The short name of the ontology to retrieve (optional, for backward compatibility).
            ontology_iri: The IRI of the ontology to retrieve (preferred).
            hash: The hash of a specific version to retrieve (optional).

        Returns:
            Ontology: The matching ontology if found, NULL_ONTOLOGY otherwise.
        """
        if hash:
            for versions in self.ontology_versions.values():
                for o in versions:
                    if o.hash == hash:
                        return o
            return NULL_ONTOLOGY

        # Try by IRI first (preferred method)
        if ontology_iri is not None and ontology_iri in self.ontology_versions:
            latest = self._latest_version_by_iri(ontology_iri)
            if latest is not None:
                return latest

        # Try by ontology_id if provided (backward compatibility)
        if ontology_id is not None:
            matching_iris = self._iris_for_ontology_id(ontology_id)
            if matching_iris:
                # Use first matching IRI
                iri = matching_iris[0]
                # Warn before returning so a conflicting IRI is actually reported
                if ontology_iri and ontology_iri != iri:
                    logger.warning(
                        f"Ontology id '{ontology_id}' matches IRI '{iri}' but different IRI '{ontology_iri}' was provided"
                    )
                latest = self._latest_version_by_iri(iri)
                if latest is not None:
                    return latest

        # Not found
        return NULL_ONTOLOGY

    def get_ontology_iris(self) -> list[str]:
        """Get a list of all ontology IRIs.

        Returns:
            list[str]: List of ontology IRIs.
        """
        return list(self.ontology_versions.keys())

    def get_ontology_names(self) -> list[str]:
        """Get a list of all ontology short names (backward compatibility wrapper).

        Returns:
            list[str]: Sorted list of unique, non-empty ontology short names.
        """
        return sorted(
            {
                o.ontology_id
                for versions in self.ontology_versions.values()
                for o in versions
                if o.ontology_id
            }
        )

    @property
    def has_ontologies(self) -> bool:
        """Check if there are any ontologies available.

        Returns:
            bool: True if there are any ontologies, False otherwise.
        """
        return len(self._cached_ontologies) > 0 or len(self.ontology_versions) > 0

    @property
    def ontologies(self) -> list[Ontology]:
        """Get freshest terminal ontology for each IRI.

        This property provides backward compatibility with code that expects
        a list of ontologies. Returns the freshest (most recently created)
        terminal version for each IRI.

        The result is cached per IRI (as hashes) and updated incrementally
        when ontologies are added.

        Returns:
            list[Ontology]: List of freshest terminal ontologies, one per IRI.
        """
        result = []

        # Ensure cache is up to date for all IRIs
        for iri in self.ontology_versions:
            if iri not in self._cached_ontologies:
                freshest = self.get_freshest_terminal_ontology_by_iri(iri)
                if freshest and freshest.hash:
                    self._cached_ontologies[iri] = freshest.hash

        # Remove entries for IRIs that no longer exist
        stale_iris = set(self._cached_ontologies) - set(self.ontology_versions)
        for removed_iri in stale_iris:
            del self._cached_ontologies[removed_iri]

        # Look up actual ontology objects by hash
        for iri, cached_hash in self._cached_ontologies.items():
            if iri in self.ontology_versions:
                # Find ontology with matching hash
                for ontology in self.ontology_versions[iri]:
                    if ontology.hash == cached_hash:
                        result.append(ontology)
                        break

        return result

    def update_ontology(self, ontology_id: str, ontology_addendum: RDFGraph):
        """Update an existing ontology with additional triples.

        Note: This method is deprecated. Use add_ontology() with a new version
        that has the current hash in parent_hashes instead.

        Args:
            ontology_id: The short name of the ontology to update.
            ontology_addendum: The RDF graph containing additional triples to add.
        """
        logger.warning(
            "update_ontology() is deprecated. Use add_ontology() with version tracking instead."
        )
        terminals = self.get_terminal_ontologies(ontology_id)
        if terminals:
            # NOTE(review): relies on Ontology supporting in-place `+=`; if it
            # only defines `__add__`, this rebinds the list slot and the
            # stored version is left unchanged — confirm against Ontology.
            terminals[0] += ontology_addendum
            # Update cache for the IRI (though this method is deprecated)
            iri = terminals[0].iri
            freshest = self.get_freshest_terminal_ontology_by_iri(iri)
            if freshest and freshest.hash:
                self._cached_ontologies[iri] = freshest.hash

has_ontologies property

Check if there are any ontologies available.

Returns:

bool: True if there are any ontologies, False otherwise.

ontologies property

Get freshest terminal ontology for each IRI.

This property provides backward compatibility with code that expects a list of ontologies. Returns the freshest (most recently created) terminal version for each IRI.

The result is cached per IRI (as hashes) and updated incrementally when ontologies are added.

Returns:

list[Ontology]: List of freshest terminal ontologies, one per IRI.

__contains__(item)

Check if an item (IRI or ontology_id) is in the ontology manager.

Parameters:

item (required): The IRI or ontology_id to check.

Returns:

bool: True if the item exists in any version of any ontology.

Source code in ontocast/tool/ontology_manager.py
def __contains__(self, item):
    """Check if an item (IRI or ontology_id) is in the ontology manager.

    Args:
        item: The IRI or ontology_id to check.

    Returns:
        bool: True if the item exists in any version of any ontology.
    """
    # Check by IRI (primary key)
    if item in self.ontology_versions:
        return True
    # Check by ontology_id (fallback for backward compatibility)
    for versions in self.ontology_versions.values():
        for o in versions:
            if o.ontology_id == item:
                return True
    return False

__init__(**kwargs)

Initialize the ontology manager.

Parameters:

**kwargs (default: {}): Additional keyword arguments passed to the parent class.
Source code in ontocast/tool/ontology_manager.py
def __init__(self, **kwargs):
    """Create an empty manager and its private bookkeeping state.

    Args:
        **kwargs: Forwarded unchanged to the parent class constructor.
    """
    super().__init__(**kwargs)
    # Retriever for vector-based patch lookups; absent until one is registered.
    self._patch_retriever: OntologyPatchRetriever | None = None
    # Paired maps that keep each IRI bound to exactly one identity key.
    self._iri_to_identity: dict[str, str] = {}
    self._identity_to_iri: dict[str, str] = {}
    # IRI -> hash of that IRI's freshest terminal version, refreshed as
    # ontologies are added.
    self._cached_ontologies: dict[str, str] = {}

add_ontology(ontology, *, skip_vector_index=False)

Add an ontology to the version tree for its IRI.

If an ontology with the same hash already exists, it is not added again. The ontology is added to the version tree for its IRI. Ensures that created_at is set if not already present.

Parameters:

ontology (Ontology, required): The ontology to add.

skip_vector_index (bool, default: False): If True, do not call the vector store (the caller has already materialized embeddings, e.g. during ToolBox.initialize).
Source code in ontocast/tool/ontology_manager.py
def add_ontology(
    self, ontology: Ontology, *, skip_vector_index: bool = False
) -> None:
    """Record ``ontology`` as one version in the lineage of its IRI.

    Ontologies with a missing or null IRI, or without a hash, are rejected
    with a warning. A version whose hash is already stored under the same
    IRI is ignored. Otherwise ``created_at`` is stamped when absent, the
    version is appended, the vector store is optionally reindexed, and the
    per-IRI freshest-version cache is refreshed.

    Args:
        ontology: Version to register.
        skip_vector_index: When True, leave the vector store alone (for
            callers that have already built the embeddings themselves).
    """
    has_valid_iri = bool(ontology.iri) and ontology.iri != NULL_ONTOLOGY.iri
    if not has_valid_iri:
        logger.warning(
            f"Cannot add ontology without valid IRI (ontology_id: {ontology.ontology_id})"
        )
        return
    if not ontology.hash:
        logger.warning(f"Cannot add ontology without hash (IRI: {ontology.iri})")
        return

    # Enforce, then record, the one-to-one IRI <-> identity binding.
    self.validate_identity_uniqueness(ontology)
    self._register_identity(ontology)

    if not ontology.created_at:
        from datetime import datetime, timezone

        ontology.created_at = datetime.now(timezone.utc)
        logger.debug(
            f"Set created_at for ontology {ontology.iri} with hash {ontology.hash[:8]}..."
        )

    lineage = self.ontology_versions.setdefault(ontology.iri, [])
    if any(known.hash == ontology.hash for known in lineage):
        logger.debug(
            f"Ontology {ontology.iri} with hash {ontology.hash[:8]}... already exists"
        )
        return

    lineage.append(ontology)
    should_index = not skip_vector_index and self._patch_retriever is not None
    if should_index:
        self._patch_retriever.vector_store.reindex_ontology(ontology)
    # Remember only the hash of the freshest terminal for this IRI.
    newest = self.get_freshest_terminal_ontology_by_iri(ontology.iri)
    if newest and newest.hash:
        self._cached_ontologies[ontology.iri] = newest.hash
    logger.debug(
        f"Added ontology {ontology.iri} with hash {ontology.hash[:8]}..."
    )

aget_patch_context(query, top_k=None, subgraph_depth=1, max_total_triples=300, estimated_triples_per_query=24) async

Async variant of `get_patch_context`.

Source code in ontocast/tool/ontology_manager.py
async def aget_patch_context(
    self,
    query: str,
    top_k: int | None = None,
    subgraph_depth: int = 1,
    max_total_triples: int = 300,
    estimated_triples_per_query: int = 24,
) -> RDFGraph | None:
    """Async counterpart of :meth:`get_patch_context`.

    Awaits :meth:`aget_patch_context_with_sources` with the same arguments
    and keeps only the graph, discarding the list of contributing IRIs.
    """
    context_graph, _ignored_sources = await self.aget_patch_context_with_sources(
        query=query,
        top_k=top_k,
        subgraph_depth=subgraph_depth,
        max_total_triples=max_total_triples,
        estimated_triples_per_query=estimated_triples_per_query,
    )
    return context_graph

aget_patch_context_with_sources(query, top_k=None, subgraph_depth=1, max_total_triples=300, estimated_triples_per_query=24) async

Async variant of `get_patch_context_with_sources`.

Source code in ontocast/tool/ontology_manager.py
async def aget_patch_context_with_sources(
    self,
    query: str,
    top_k: int | None = None,
    subgraph_depth: int = 1,
    max_total_triples: int = 300,
    estimated_triples_per_query: int = 24,
) -> tuple[RDFGraph | None, list[str]]:
    """Async counterpart of :meth:`get_patch_context_with_sources`.

    Runs the batched :meth:`aget_patch_contexts_with_sources` with a single
    query. Returns its first ``(graph, source_iris)`` entry, or ``(None, [])``
    when the batch comes back empty.
    """
    per_query = await self.aget_patch_contexts_with_sources(
        queries=[query],
        top_k=top_k,
        subgraph_depth=subgraph_depth,
        max_total_triples=max_total_triples,
        estimated_triples_per_query=estimated_triples_per_query,
    )
    if per_query:
        return per_query[0]
    return None, []

aget_patch_contexts_with_sources(queries, top_k=None, subgraph_depth=1, max_total_triples=300, estimated_triples_per_query=24) async

Async patch retrieval (vector + induced subgraph) for many queries.

With a patch retriever, returns a one-element list: a single induced graph for the union of hits over queries, plus contributing ontology IRIs.

Source code in ontocast/tool/ontology_manager.py
async def aget_patch_contexts_with_sources(
    self,
    queries: list[str],
    top_k: int | None = None,
    subgraph_depth: int = 1,
    max_total_triples: int = 300,
    estimated_triples_per_query: int = 24,
) -> list[tuple[RDFGraph | None, list[str]]]:
    """Async patch retrieval (vector + induced subgraph) for many queries.

    With a patch retriever, returns a one-element list: a single induced graph for
    the union of hits over ``queries``, plus contributing ontology IRIs. An empty
    retrieval result is returned as a fresh, empty ``RDFGraph``.

    Without a patch retriever, returns one entry per query. Each entry holds an
    independent deep copy of the freshest terminal ontology graph and that
    ontology's IRI, or ``(None, [])`` when no terminal ontology exists.

    Args:
        queries: Query strings to retrieve context for. An empty list yields ``[]``.
        top_k: Optional per-query hit limit; resolved via ``_effective_patch_top_k``.
        subgraph_depth: Expansion depth passed to the retriever.
        max_total_triples: Triple budget passed to the retriever.
        estimated_triples_per_query: Per-query triple estimate passed to the retriever.

    Returns:
        list of ``(graph, source_iris)`` tuples as described above.
    """
    if not queries:
        return []
    if self._patch_retriever is not None:
        graph, sources = await self._patch_retriever.aretrieve_ensemble(
            queries=queries,
            top_k=self._effective_patch_top_k(top_k),
            subgraph_depth=subgraph_depth,
            max_total_triples=max_total_triples,
            estimated_triples_per_query=estimated_triples_per_query,
        )
        return [(graph, sources) if len(graph) > 0 else (RDFGraph(), sources)]

    fallback = self.get_freshest_terminal_ontology_by_iri(None)
    if fallback is None:
        return [(None, []) for _ in queries]
    # One deep copy per query so callers can mutate entries independently.
    # Copying straight from the stored graph avoids an extra full-graph copy.
    return [(deepcopy(fallback.graph), [fallback.iri]) for _ in queries]

get_freshest_terminal_ontology(ontology_id=None)

Get the freshest terminal ontology by ontology_id (backward compatibility wrapper).

Parameters:

Name Type Description Default
ontology_id str | None

Optional ontology_id to filter by.

None

Returns:

Name Type Description
Ontology Ontology | None

The freshest terminal ontology, or None if no terminal ontologies exist.

Source code in ontocast/tool/ontology_manager.py
def get_freshest_terminal_ontology(
    self, ontology_id: str | None = None
) -> Ontology | None:
    """Return the freshest terminal ontology, optionally filtered by ontology_id.

    Kept for backward compatibility; lookups by IRI should use
    :meth:`get_freshest_terminal_ontology_by_iri`.

    Args:
        ontology_id: Short name to filter by. When falsy, all ontologies are
            considered.

    Returns:
        Ontology | None: With an ontology_id, the candidate (one freshest
            terminal per matching IRI) with the latest ``created_at``. If no
            candidate has a timestamp, the first candidate is returned. None
            when nothing matches.
    """
    if not ontology_id:
        return self.get_freshest_terminal_ontology_by_iri(None)

    from datetime import datetime
    from typing import cast

    # One freshest terminal per IRI whose versions carry this ontology_id.
    candidates = []
    for iri, history in self.ontology_versions.items():
        if not any(version.ontology_id == ontology_id for version in history):
            continue
        best_for_iri = self.get_freshest_terminal_ontology_by_iri(iri)
        if best_for_iri:
            candidates.append(best_for_iri)

    if not candidates:
        return None

    dated = [c for c in candidates if c.created_at is not None]
    if not dated:
        return candidates[0]
    return max(dated, key=lambda c: cast(datetime, c.created_at))

get_freshest_terminal_ontology_by_iri(iri=None)

Get the freshest terminal ontology based on created_at timestamp.

Returns the terminal ontology with the most recent created_at timestamp. If multiple terminal ontologies exist, returns the one that was most recently created. If no created_at is set, falls back to the first terminal ontology.

Parameters:

Name Type Description Default
iri str | None

Optional IRI to filter by. If None, searches across all ontologies.

None

Returns:

Name Type Description
Ontology Ontology | None

The freshest terminal ontology, or None if no terminal ontologies exist.

Source code in ontocast/tool/ontology_manager.py
def get_freshest_terminal_ontology_by_iri(
    self, iri: str | None = None
) -> Ontology | None:
    """Return the terminal ontology with the latest ``created_at``.

    Terminals come from :meth:`get_terminal_ontologies_by_iri`. Among those
    with a ``created_at`` value, the maximum is returned. If none of them has
    a timestamp, the first terminal is returned instead.

    Args:
        iri: IRI to restrict the search to. When None, all ontologies are
            searched.

    Returns:
        Ontology | None: The selected terminal ontology, or None when there
            are no terminal ontologies.
    """
    terminals = self.get_terminal_ontologies_by_iri(iri)
    if not terminals:
        return None

    from datetime import datetime
    from typing import cast

    stamped = [t for t in terminals if t.created_at is not None]
    if not stamped:
        # No timestamps at all: fall back to the first untimestamped terminal.
        unstamped = [t for t in terminals if t.created_at is None]
        return unstamped[0] if unstamped else None

    # created_at is guaranteed non-None here, so cast only informs the type checker.
    return max(stamped, key=lambda t: cast(datetime, t.created_at))

get_lineage_graph(ontology_id)

Get the lineage graph for a specific ontology_id (backward compatibility wrapper).

Parameters:

Name Type Description Default
ontology_id str

The ontology_id to get the lineage graph for.

required

Returns:

Type Description

networkx.DiGraph: The lineage graph for the ontology, or None if not found.

Source code in ontocast/tool/ontology_manager.py
def get_lineage_graph(self, ontology_id: str):
    """Build the lineage graph for the first IRI carrying ``ontology_id``.

    Kept for backward compatibility; prefer :meth:`get_lineage_graph_by_iri`.

    Args:
        ontology_id: Short name whose lineage graph is requested.

    Returns:
        networkx.DiGraph: Result of ``Ontology.build_lineage_graph`` over all
            versions of the first matching IRI, or None if no IRI matches.
    """
    first_history = next(
        (
            history
            for history in self.ontology_versions.values()
            if any(version.ontology_id == ontology_id for version in history)
        ),
        None,
    )
    if first_history is None:
        return None
    return Ontology.build_lineage_graph(first_history)

get_lineage_graph_by_iri(iri)

Get the lineage graph for a specific IRI.

Parameters:

Name Type Description Default
iri str

The IRI to get the lineage graph for.

required

Returns:

Type Description

networkx.DiGraph: The lineage graph for the ontology, or None if not found.

Source code in ontocast/tool/ontology_manager.py
def get_lineage_graph_by_iri(self, iri: str):
    """Build the lineage graph for all tracked versions of ``iri``.

    Args:
        iri: IRI of the ontology whose lineage graph is requested.

    Returns:
        networkx.DiGraph: Result of ``Ontology.build_lineage_graph`` over the
            IRI's versions, or None if the IRI is not tracked.
    """
    if iri in self.ontology_versions:
        return Ontology.build_lineage_graph(self.ontology_versions[iri])
    return None

get_ontology(ontology_id=None, ontology_iri=None, hash=None)

Get an ontology by its IRI, ontology_id, or hash.

If hash is provided, returns the specific version. Otherwise, returns a terminal (most recent) version if multiple versions exist. IRI is preferred over ontology_id for lookup.

Parameters:

Name Type Description Default
ontology_id str | None

The short name of the ontology to retrieve (optional, for backward compatibility).

None
ontology_iri str | None

The IRI of the ontology to retrieve (preferred).

None
hash str | None

The hash of a specific version to retrieve (optional).

None

Returns:

Name Type Description
Ontology Ontology

The matching ontology if found, NULL_ONTOLOGY otherwise.

Source code in ontocast/tool/ontology_manager.py
def get_ontology(
    self,
    ontology_id: str | None = None,
    ontology_iri: str | None = None,
    hash: str | None = None,
) -> Ontology:
    """Get an ontology by its IRI, ontology_id, or hash.

    If ``hash`` is provided, returns the version with that hash, searched
    across every tracked IRI, or NULL_ONTOLOGY when no version matches.
    Otherwise returns the freshest terminal version (latest ``created_at``,
    as chosen by :meth:`get_freshest_terminal_ontology_by_iri`) of the
    requested ontology. IRI is preferred over ontology_id for lookup. The
    ontology_id is used only when no IRI is given, or when the given IRI is
    not tracked.

    Args:
        ontology_id: The short name of the ontology to retrieve (optional, for backward compatibility).
        ontology_iri: The IRI of the ontology to retrieve (preferred).
        hash: The hash of a specific version to retrieve (optional).

    Returns:
        Ontology: The matching ontology if found, NULL_ONTOLOGY otherwise.
    """

    def _latest_version(iri: str):
        # Freshest terminal by created_at. If no terminal can be identified
        # (every hash appears among some version's parent_hashes), fall back
        # to the first recorded version, or None when there are no versions.
        freshest = self.get_freshest_terminal_ontology_by_iri(iri)
        if freshest is not None:
            return freshest
        versions = self.ontology_versions.get(iri) or []
        return versions[0] if versions else None

    if hash:
        for versions in self.ontology_versions.values():
            for o in versions:
                if o.hash == hash:
                    return o
        # The loop above already covered every tracked version, so narrowing
        # the search by IRI or ontology_id cannot find anything else.
        return NULL_ONTOLOGY

    # Try by IRI first (preferred method)
    if ontology_iri is not None and ontology_iri in self.ontology_versions:
        found = _latest_version(ontology_iri)
        if found is not None:
            return found

    # Try by ontology_id if provided (backward compatibility)
    if ontology_id is not None:
        # Use the first IRI whose versions carry this ontology_id
        matched_iri = next(
            (
                iri
                for iri, versions in self.ontology_versions.items()
                if any(o.ontology_id == ontology_id for o in versions)
            ),
            None,
        )
        if matched_iri is not None:
            # A provided IRI that reaches this point was not tracked (or had
            # no versions). Warn before resolving so the mismatch is reported.
            if ontology_iri and ontology_iri != matched_iri:
                logger.warning(
                    f"Ontology id '{ontology_id}' matches IRI '{matched_iri}' but different IRI '{ontology_iri}' was provided"
                )
            found = _latest_version(matched_iri)
            if found is not None:
                return found

    # Not found
    return NULL_ONTOLOGY

get_ontology_iris()

Get a list of all ontology IRIs.

Returns:

Type Description
list[str]

list[str]: List of ontology IRIs.

Source code in ontocast/tool/ontology_manager.py
def get_ontology_iris(self) -> list[str]:
    """Return the IRIs of all tracked ontologies.

    Returns:
        list[str]: A new list with the keys of ``ontology_versions``, in the
            mapping's iteration order.
    """
    return [*self.ontology_versions]

get_ontology_names()

Get a list of all ontology short names (backward compatibility wrapper).

Returns:

Type Description
list[str]

list[str]: List of unique ontology short names.

Source code in ontocast/tool/ontology_manager.py
def get_ontology_names(self) -> list[str]:
    """Return the sorted, de-duplicated ontology short names (backward compatibility).

    Versions with a falsy ``ontology_id`` are skipped.

    Returns:
        list[str]: Unique ontology short names in ascending order.
    """
    unique_ids = {
        version.ontology_id
        for history in self.ontology_versions.values()
        for version in history
        if version.ontology_id
    }
    return sorted(unique_ids)

get_ontology_versions(ontology_id)

Get all versions of an ontology by ontology_id (backward compatibility wrapper).

Parameters:

Name Type Description Default
ontology_id str

The ontology_id to retrieve versions for.

required

Returns:

Type Description
list[Ontology]

list[Ontology]: List of all versions of the ontology.

Source code in ontocast/tool/ontology_manager.py
def get_ontology_versions(self, ontology_id: str) -> list[Ontology]:
    """Collect every version under IRIs that carry ``ontology_id`` (backward compatibility).

    An IRI matches when any of its versions has this ontology_id. All of that
    IRI's versions are then included, in IRI order and then version order.

    Args:
        ontology_id: Short name to look up.

    Returns:
        list[Ontology]: A new list of the matching versions (possibly empty).
    """
    return [
        version
        for history in self.ontology_versions.values()
        if any(v.ontology_id == ontology_id for v in history)
        for version in history
    ]

get_ontology_versions_by_iri(iri)

Get all versions of an ontology by IRI.

Parameters:

Name Type Description Default
iri str

The IRI to retrieve versions for.

required

Returns:

Type Description
list[Ontology]

list[Ontology]: List of all versions of the ontology.

Source code in ontocast/tool/ontology_manager.py
def get_ontology_versions_by_iri(self, iri: str) -> list[Ontology]:
    """Return all recorded versions for ``iri``.

    Note: for a tracked IRI this is the manager's internal list itself, not a
    copy, so mutating it changes the manager's state. An untracked IRI yields
    a fresh empty list.

    Args:
        iri: IRI whose versions are requested.

    Returns:
        list[Ontology]: The versions stored for the IRI, or ``[]``.
    """
    if iri in self.ontology_versions:
        return self.ontology_versions[iri]
    return []

get_patch_context(query, top_k=None, subgraph_depth=1, max_total_triples=300, estimated_triples_per_query=24)

Retrieve multi-ontology patch context for a query.

Falls back to the freshest available ontology graph only when no vector retriever is configured; if a retriever is configured but yields no atoms, an empty graph is returned.

Source code in ontocast/tool/ontology_manager.py
def get_patch_context(
    self,
    query: str,
    top_k: int | None = None,
    subgraph_depth: int = 1,
    max_total_triples: int = 300,
    estimated_triples_per_query: int = 24,
) -> RDFGraph | None:
    """Retrieve multi-ontology patch context for a query.

    Delegates to :meth:`get_patch_context_with_sources` and drops the list of
    contributing ontology IRIs. When a patch retriever is registered, the
    result is the induced graph for the retrieved hits. If nothing is
    retrieved, the result is an empty ``RDFGraph``; there is no fallback in
    that case. Only when no retriever is registered does this fall back to a
    copy of the freshest terminal ontology graph, or ``None`` if no ontology
    is loaded.

    Args:
        query: Text to retrieve ontology context for.
        top_k: Optional per-query hit limit.
        subgraph_depth: Expansion depth for the induced subgraph.
        max_total_triples: Triple budget for the returned graph.
        estimated_triples_per_query: Per-query triple estimate for budgeting.

    Returns:
        RDFGraph | None: The context graph as described above.
    """
    graph, _ = self.get_patch_context_with_sources(
        query=query,
        top_k=top_k,
        subgraph_depth=subgraph_depth,
        max_total_triples=max_total_triples,
        estimated_triples_per_query=estimated_triples_per_query,
    )
    return graph

get_patch_context_with_sources(query, top_k=None, subgraph_depth=1, max_total_triples=300, estimated_triples_per_query=24)

Retrieve patch context and contributing ontology IRIs.

Source code in ontocast/tool/ontology_manager.py
def get_patch_context_with_sources(
    self,
    query: str,
    top_k: int | None = None,
    subgraph_depth: int = 1,
    max_total_triples: int = 300,
    estimated_triples_per_query: int = 24,
) -> tuple[RDFGraph | None, list[str]]:
    """Retrieve patch context for one query together with its source IRIs.

    Single-query convenience over :meth:`get_patch_contexts_with_sources`.
    Returns its first ``(graph, source_iris)`` entry, or ``(None, [])`` if
    the batch is empty.
    """
    per_query = self.get_patch_contexts_with_sources(
        queries=[query],
        top_k=top_k,
        subgraph_depth=subgraph_depth,
        max_total_triples=max_total_triples,
        estimated_triples_per_query=estimated_triples_per_query,
    )
    if per_query:
        return per_query[0]
    return None, []

get_patch_contexts_with_sources(queries, top_k=None, subgraph_depth=1, max_total_triples=300, estimated_triples_per_query=24)

Retrieve patch contexts for many queries in a batched pass.

With a patch retriever, the list has length 1 (ensemble graph + sources). Without it, length matches queries (fallback ontology per query).

Source code in ontocast/tool/ontology_manager.py
def get_patch_contexts_with_sources(
    self,
    queries: list[str],
    top_k: int | None = None,
    subgraph_depth: int = 1,
    max_total_triples: int = 300,
    estimated_triples_per_query: int = 24,
) -> list[tuple[RDFGraph | None, list[str]]]:
    """Retrieve patch contexts for many queries in a batched pass.

    With a patch retriever, the list has length 1 (ensemble graph + sources).
    An empty retrieval result is returned as a fresh, empty ``RDFGraph``.
    Without it, length matches ``queries`` (fallback ontology per query). Each
    entry holds an independent deep copy of the freshest terminal ontology
    graph and that ontology's IRI, or ``(None, [])`` if no terminal ontology
    exists.

    Args:
        queries: Query strings to retrieve context for. An empty list yields ``[]``.
        top_k: Optional per-query hit limit; resolved via ``_effective_patch_top_k``.
        subgraph_depth: Expansion depth passed to the retriever.
        max_total_triples: Triple budget passed to the retriever.
        estimated_triples_per_query: Per-query triple estimate passed to the retriever.

    Returns:
        list of ``(graph, source_iris)`` tuples as described above.
    """
    if not queries:
        return []
    if self._patch_retriever is not None:
        graph, sources = self._patch_retriever.retrieve_ensemble(
            queries=queries,
            top_k=self._effective_patch_top_k(top_k),
            subgraph_depth=subgraph_depth,
            max_total_triples=max_total_triples,
            estimated_triples_per_query=estimated_triples_per_query,
        )
        return [(graph, sources) if len(graph) > 0 else (RDFGraph(), sources)]

    fallback = self.get_freshest_terminal_ontology_by_iri(None)
    if fallback is None:
        return [(None, []) for _ in queries]
    # One deep copy per query so callers can mutate entries independently.
    # Copying straight from the stored graph avoids an extra full-graph copy.
    return [(deepcopy(fallback.graph), [fallback.iri]) for _ in queries]

get_terminal_ontologies(ontology_id=None)

Get terminal (leaf) ontologies by ontology_id (backward compatibility wrapper).

Parameters:

Name Type Description Default
ontology_id str | None

Optional ontology_id to filter by.

None

Returns:

Type Description
list[Ontology]

list[Ontology]: List of terminal ontologies.

Source code in ontocast/tool/ontology_manager.py
def get_terminal_ontologies(self, ontology_id: str | None = None) -> list[Ontology]:
    """Return leaf versions, optionally limited to IRIs carrying ``ontology_id``.

    Kept for backward compatibility; prefer :meth:`get_terminal_ontologies_by_iri`.

    Args:
        ontology_id: Short name to filter by. When falsy, terminals across all
            ontologies are returned.

    Returns:
        list[Ontology]: Terminal versions of every matching IRI, concatenated
            in IRI order (empty if nothing matches).
    """
    if not ontology_id:
        return self.get_terminal_ontologies_by_iri(None)

    leaves = []
    for iri, history in self.ontology_versions.items():
        if any(version.ontology_id == ontology_id for version in history):
            leaves.extend(self.get_terminal_ontologies_by_iri(iri))
    return leaves

get_terminal_ontologies_by_iri(iri=None)

Get terminal (leaf) ontologies in the version graph.

Terminal ontologies are those that are not parents of any other ontology in the version tree. If iri is provided, returns terminals for that ontology only; otherwise returns terminals for all ontologies.

Parameters:

Name Type Description Default
iri str | None

Optional IRI to filter by.

None

Returns:

Type Description
list[Ontology]

list[Ontology]: List of terminal ontologies.

Source code in ontocast/tool/ontology_manager.py
def get_terminal_ontologies_by_iri(self, iri: str | None = None) -> list[Ontology]:
    """Return the leaf versions of the version graph.

    A version is terminal when no version in the considered pool lists its
    hash in ``parent_hashes``. With a truthy ``iri``, the pool is that IRI's
    versions; an untracked IRI yields ``[]``. Otherwise the pool is every
    version of every tracked ontology.

    Args:
        iri: Optional IRI to restrict the search to.

    Returns:
        list[Ontology]: Terminal versions, in the pool's original order.
    """
    if not iri:
        pool = [
            version
            for history in self.ontology_versions.values()
            for version in history
        ]
    elif iri in self.ontology_versions:
        pool = self.ontology_versions[iri]
    else:
        return []

    # Every hash that some version in the pool names as a parent.
    referenced_as_parent = set()
    for version in pool:
        referenced_as_parent.update(version.parent_hashes)

    return [version for version in pool if version.hash not in referenced_as_parent]

register_vector_store(retriever)

Register a patch retriever for vector context lookups.

Source code in ontocast/tool/ontology_manager.py
def register_vector_store(self, retriever: "OntologyPatchRetriever") -> None:
    """Attach ``retriever`` as the patch retriever used for vector context lookups.

    Replaces any previously registered retriever.
    """
    self._patch_retriever = retriever

remove_ontology_by_iri(iri)

Drop all tracked versions for an ontology IRI and clear caches.

Source code in ontocast/tool/ontology_manager.py
def remove_ontology_by_iri(self, iri: str) -> None:
    """Forget every tracked version of ``iri`` and its cached/identity entries.

    Removing an untracked IRI is a no-op.
    """
    for per_iri_map in (self.ontology_versions, self._cached_ontologies):
        per_iri_map.pop(iri, None)

    identity = self._iri_to_identity.pop(iri, None)
    if identity is None:
        return
    # Drop the reverse mapping too, so the identity can be bound again later.
    self._identity_to_iri.pop(identity, None)

update_ontology(ontology_id, ontology_addendum)

Update an existing ontology with additional triples.

Note: This method is deprecated. Use add_ontology() with a new version that has the current hash in parent_hashes instead.

Parameters:

Name Type Description Default
ontology_id str

The short name of the ontology to update.

required
ontology_addendum RDFGraph

The RDF graph containing additional triples to add.

required
Source code in ontocast/tool/ontology_manager.py
def update_ontology(self, ontology_id: str, ontology_addendum: RDFGraph):
    """Deprecated: merge extra triples into the first terminal version of an ontology.

    Prefer add_ontology() with a new version whose parent_hashes contains the
    current hash. Logs a deprecation warning on every call and does nothing
    further when no terminal version matches ``ontology_id``.

    NOTE(review): ``terminals[0] += ...`` rebinds a slot of a locally built
    list. The stored ontology only changes if ``Ontology`` implements an
    in-place ``__iadd__``. Confirm against the Ontology class.

    Args:
        ontology_id: The short name of the ontology to update.
        ontology_addendum: The RDF graph containing additional triples to add.
    """
    logger.warning(
        "update_ontology() is deprecated. Use add_ontology() with version tracking instead."
    )
    terminals = self.get_terminal_ontologies(ontology_id)
    if not terminals:
        return
    terminals[0] += ontology_addendum
    # Refresh the per-IRI cache with the hash of the freshest terminal.
    target_iri = terminals[0].iri
    latest = self.get_freshest_terminal_ontology_by_iri(target_iri)
    if latest and latest.hash:
        self._cached_ontologies[target_iri] = latest.hash

validate_identity_uniqueness(ontology)

Validate ontology IRI<->identity bijection across the manager.

Source code in ontocast/tool/ontology_manager.py
def validate_identity_uniqueness(self, ontology: Ontology) -> None:
    """Check that registering ``ontology`` keeps IRI <-> identity one-to-one.

    The IRI is whitespace-stripped before all checks. Nothing is registered
    here; this only raises on conflicts.

    Raises:
        ValueError: If the IRI is blank or equals the null ontology IRI, if
            the IRI is already bound to a different identity, or if the
            identity is already bound to a different IRI (checked in that
            order).
    """
    iri = (ontology.iri or "").strip()
    if not iri:
        raise ValueError("Ontology IRI is missing")
    if iri == NULL_ONTOLOGY.iri:
        raise ValueError("Null ontology IRI cannot be registered")

    identity = self._build_identity_key(ontology)

    bound_identity = self._iri_to_identity.get(iri)
    iri_conflict = bound_identity is not None and bound_identity != identity
    if iri_conflict:
        raise ValueError(
            "Ontology identity conflict: IRI "
            f"'{iri}' is already bound to identity '{bound_identity}', "
            f"received '{identity}'"
        )

    bound_iri = self._identity_to_iri.get(identity)
    identity_conflict = bound_iri is not None and bound_iri != iri
    if identity_conflict:
        raise ValueError(
            "Ontology identity conflict: identity "
            f"'{identity}' is already bound to IRI '{bound_iri}', "
            f"received '{iri}'"
        )

OntologyPatchRetriever

Bases: Tool

Combines vector retrieval into one composite ontology graph.

Source code in ontocast/tool/vector_store/patch_retriever.py
class OntologyPatchRetriever(Tool):
    """Combines vector retrieval into one composite ontology graph.

    Searches ``vector_store`` for one or more queries and filters/merges the
    hits according to ``patch``. When ``sparql_tool`` is set and
    ``expand_sparql`` is true, it expands the surviving hits into an induced
    RDF subgraph.
    """

    vector_store: QdrantVectorStore = Field(exclude=True)
    sparql_tool: Any | None = Field(default=None, exclude=True)
    patch: PatchRetrievalConfig = Field(
        default_factory=PatchRetrievalConfig,
        exclude=True,
    )

    def _effective_top_k(self, top_k: int | None) -> int:
        """Return ``top_k`` if given, else the vector store's configured ``top_k``."""
        if top_k is not None:
            return top_k
        return self.vector_store.config.top_k

    def retrieve(
        self,
        query: str,
        top_k: int | None = None,
        expand_sparql: bool = True,
        subgraph_depth: int = 1,
        max_total_triples: int = 300,
        estimated_triples_per_query: int = 24,
    ) -> tuple[RDFGraph, list[str]]:
        """Retrieve top-k hits for one query and optional induced subgraph; returns source ontology IRIs.

        Synchronous bridge over :meth:`aretrieve` that starts its own event loop.

        Raises:
            RuntimeError: If called while an event loop is running in this thread.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No running loop: safe to start one below.
        else:
            raise RuntimeError(
                "retrieve() cannot be called from async code; use await aretrieve()"
            )
        # Run outside the ``except`` clause so errors raised by retrieval are
        # not chained onto the "no running event loop" RuntimeError.
        return asyncio.run(
            self.aretrieve(
                query=query,
                top_k=top_k,
                expand_sparql=expand_sparql,
                subgraph_depth=subgraph_depth,
                max_total_triples=max_total_triples,
                estimated_triples_per_query=estimated_triples_per_query,
            )
        )

    def retrieve_ensemble(
        self,
        queries: list[str],
        top_k: int | None = None,
        expand_sparql: bool = True,
        subgraph_depth: int = 1,
        max_total_triples: int = 300,
        estimated_triples_per_query: int = 24,
    ) -> tuple[RDFGraph, list[str]]:
        """Sync: one induced graph and source IRIs for the union of vector hits over ``queries``.

        Synchronous bridge over :meth:`aretrieve_ensemble` that starts its own
        event loop.

        Raises:
            RuntimeError: If called while an event loop is running in this thread.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No running loop: safe to start one below.
        else:
            raise RuntimeError(
                "retrieve_ensemble() is not allowed inside async code; use aretrieve_ensemble()"
            )
        # Run outside the ``except`` clause so errors raised by retrieval are
        # not chained onto the "no running event loop" RuntimeError.
        return asyncio.run(
            self.aretrieve_ensemble(
                queries=queries,
                top_k=top_k,
                expand_sparql=expand_sparql,
                subgraph_depth=subgraph_depth,
                max_total_triples=max_total_triples,
                estimated_triples_per_query=estimated_triples_per_query,
            )
        )

    async def aretrieve(
        self,
        query: str,
        top_k: int | None = None,
        expand_sparql: bool = True,
        subgraph_depth: int = 1,
        max_total_triples: int = 300,
        estimated_triples_per_query: int = 24,
    ) -> tuple[RDFGraph, list[str]]:
        """Async single-query variant of :meth:`aretrieve_ensemble`."""
        return await self.aretrieve_ensemble(
            queries=[query],
            top_k=top_k,
            expand_sparql=expand_sparql,
            subgraph_depth=subgraph_depth,
            max_total_triples=max_total_triples,
            estimated_triples_per_query=estimated_triples_per_query,
        )

    async def aretrieve_ensemble(
        self,
        queries: list[str],
        top_k: int | None = None,
        expand_sparql: bool = True,
        subgraph_depth: int = 1,
        max_total_triples: int = 300,
        estimated_triples_per_query: int = 24,
    ) -> tuple[RDFGraph, list[str]]:
        """Vector search over all ``queries`` once, score-filter, dedupe, single subgraph expansion.

        Hits are filtered per query and per channel relative to each channel's best
        score (see ``PatchRetrievalConfig`` per-query ratio fields for core,
        neighborhood, and BM25), then merged by rank fusion so channels with
        different score distributions all contribute. Optional per-channel
        min-best filters and ``min_merged_max_score`` reject weak or irrelevant
        candidates.

        Returns the merged RDF graph (possibly disconnected across ontologies) and sorted
        distinct ontology IRIs that contributed vector hits. When SPARQL expansion
        is disabled or no ``sparql_tool`` is set, the graph is empty but the
        source IRIs are still returned.
        """
        if not queries:
            return RDFGraph(), []
        eff_top_k = self._effective_top_k(top_k)
        hits_by_query = await self.vector_store.asearch_patch_hits_many(
            queries=queries,
            top_k=eff_top_k,
        )
        qc = self.vector_store.config
        pc = self.patch
        merged = _filter_and_merge_patch_hits(
            hits_by_query,
            qdrant_config=qc,
            per_query_core_score_ratio=pc.per_query_core_score_ratio,
            per_query_neighborhood_score_ratio=pc.per_query_neighborhood_score_ratio,
            per_query_bm25_score_ratio=pc.per_query_bm25_score_ratio,
            min_core_query_best_score=pc.min_core_query_best_score,
            min_neighborhood_query_best_score=pc.min_neighborhood_query_best_score,
            min_bm25_query_best_score=pc.min_bm25_query_best_score,
            min_merged_max_score=pc.min_merged_max_score,
        )
        if merged and pc.merged_score_ratio > 0.0:
            # Keep atoms scoring at least merged_score_ratio of merged[0]'s
            # score (assumes merged is sorted best-first; verify in helper).
            merged_top = float(merged[0].score or 0.0)
            merged_floor = merged_top * pc.merged_score_ratio
            merged = [
                atom for atom in merged if float(atom.score or 0.0) >= merged_floor
            ]
        if merged and pc.mmr_lambda < 1.0:
            # Diversity re-ranking over stored vectors; _mmr_rerank also caps at max_atoms.
            vectors = await self.vector_store.afetch_vectors(
                [atom.atom_id for atom in merged]
            )
            merged = _mmr_rerank(
                merged,
                vectors,
                mmr_lambda=pc.mmr_lambda,
                max_atoms=pc.max_atoms,
                core_weight=qc.fusion_core_weight,
                neighborhood_weight=qc.fusion_neighborhood_weight,
            )
        elif pc.max_atoms > 0:
            merged = merged[: pc.max_atoms]
        source_iris = _source_iris_from_atoms(merged)

        if not expand_sparql or self.sparql_tool is None:
            return RDFGraph(), source_iris

        if not merged:
            return RDFGraph(), []

        entity_uris, entity_relevance = _ranked_entity_weights(merged)
        ontology_iris = sorted(
            {atom.ontology_iri for atom in merged if atom.ontology_iri}
        )
        # Restrict expansion to the ontology versions/hashes the hits came from.
        ontology_version_filters: dict[str, set[str]] = {}
        ontology_hash_filters: dict[str, set[str]] = {}
        for atom in merged:
            if atom.ontology_iri and atom.ontology_version:
                ontology_version_filters.setdefault(atom.ontology_iri, set()).add(
                    str(atom.ontology_version)
                )
            if atom.ontology_iri and atom.ontology_hash:
                ontology_hash_filters.setdefault(atom.ontology_iri, set()).add(
                    atom.ontology_hash
                )

        graph = await self.sparql_tool.aget_induced_subgraph(
            entity_uris=entity_uris,
            entity_relevance=entity_relevance,
            ontology_iris=ontology_iris,
            depth=subgraph_depth,
            max_total_triples=max_total_triples,
            estimated_triples_per_query=estimated_triples_per_query,
            ontology_version_filters=ontology_version_filters or None,
            ontology_hash_filters=ontology_hash_filters or None,
        )
        _bind_common_vocab_prefixes(graph)
        return graph, source_iris

aretrieve(query, top_k=None, expand_sparql=True, subgraph_depth=1, max_total_triples=300, estimated_triples_per_query=24) async

Async single-query variant of `aretrieve_ensemble`.

Source code in ontocast/tool/vector_store/patch_retriever.py
async def aretrieve(
    self,
    query: str,
    top_k: int | None = None,
    expand_sparql: bool = True,
    subgraph_depth: int = 1,
    max_total_triples: int = 300,
    estimated_triples_per_query: int = 24,
) -> tuple[RDFGraph, list[str]]:
    """Async single-query variant of :meth:`aretrieve_ensemble`."""
    return await self.aretrieve_ensemble(
        queries=[query],
        top_k=top_k,
        expand_sparql=expand_sparql,
        subgraph_depth=subgraph_depth,
        max_total_triples=max_total_triples,
        estimated_triples_per_query=estimated_triples_per_query,
    )

aretrieve_ensemble(queries, top_k=None, expand_sparql=True, subgraph_depth=1, max_total_triples=300, estimated_triples_per_query=24) async

Vector search over all queries once, score-filter, dedupe, single subgraph expansion.

Hits are filtered per query and per channel relative to each channel's best score (see PatchRetrievalConfig per-query ratio fields for core, neighborhood, and BM25), then merged by rank fusion so channels with different score distributions all contribute. Optional per-channel min-best filters and min_merged_max_score reject weak or irrelevant candidates.

Returns the merged RDF graph (possibly disconnected across ontologies) and sorted distinct ontology IRIs that contributed vector hits.

Source code in ontocast/tool/vector_store/patch_retriever.py
async def aretrieve_ensemble(
    self,
    queries: list[str],
    top_k: int | None = None,
    expand_sparql: bool = True,
    subgraph_depth: int = 1,
    max_total_triples: int = 300,
    estimated_triples_per_query: int = 24,
) -> tuple[RDFGraph, list[str]]:
    """Vector search over all ``queries`` once, score-filter, dedupe, single subgraph expansion.

    Hits are filtered per query and per channel relative to each channel's best
    score (see ``PatchRetrievalConfig`` per-query ratio fields for core,
    neighborhood, and BM25), then merged by rank fusion so channels with
    different score distributions all contribute. Optional per-channel
    min-best filters and ``min_merged_max_score`` reject weak or irrelevant
    candidates.

    Returns the merged RDF graph (possibly disconnected across ontologies) and sorted
    distinct ontology IRIs that contributed vector hits.
    """
    if not queries:
        return RDFGraph(), []
    eff_top_k = self._effective_top_k(top_k)
    hits_by_query = await self.vector_store.asearch_patch_hits_many(
        queries=queries,
        top_k=eff_top_k,
    )
    qc = self.vector_store.config
    pc = self.patch
    merged = _filter_and_merge_patch_hits(
        hits_by_query,
        qdrant_config=qc,
        per_query_core_score_ratio=pc.per_query_core_score_ratio,
        per_query_neighborhood_score_ratio=pc.per_query_neighborhood_score_ratio,
        per_query_bm25_score_ratio=pc.per_query_bm25_score_ratio,
        min_core_query_best_score=pc.min_core_query_best_score,
        min_neighborhood_query_best_score=pc.min_neighborhood_query_best_score,
        min_bm25_query_best_score=pc.min_bm25_query_best_score,
        min_merged_max_score=pc.min_merged_max_score,
    )
    if merged and pc.merged_score_ratio > 0.0:
        merged_top = float(merged[0].score or 0.0)
        merged_floor = merged_top * pc.merged_score_ratio
        merged = [
            atom for atom in merged if float(atom.score or 0.0) >= merged_floor
        ]
    if merged and pc.mmr_lambda < 1.0:
        vectors = await self.vector_store.afetch_vectors(
            [atom.atom_id for atom in merged]
        )
        merged = _mmr_rerank(
            merged,
            vectors,
            mmr_lambda=pc.mmr_lambda,
            max_atoms=pc.max_atoms,
            core_weight=qc.fusion_core_weight,
            neighborhood_weight=qc.fusion_neighborhood_weight,
        )
    elif pc.max_atoms > 0:
        merged = merged[: pc.max_atoms]
    source_iris = _source_iris_from_atoms(merged)

    if not expand_sparql or self.sparql_tool is None:
        return RDFGraph(), source_iris

    if not merged:
        return RDFGraph(), []

    entity_uris, entity_relevance = _ranked_entity_weights(merged)
    ontology_iris = sorted(
        {atom.ontology_iri for atom in merged if atom.ontology_iri}
    )
    ontology_version_filters: dict[str, set[str]] = {}
    ontology_hash_filters: dict[str, set[str]] = {}
    for atom in merged:
        if atom.ontology_iri and atom.ontology_version:
            ontology_version_filters.setdefault(atom.ontology_iri, set()).add(
                str(atom.ontology_version)
            )
        if atom.ontology_iri and atom.ontology_hash:
            ontology_hash_filters.setdefault(atom.ontology_iri, set()).add(
                atom.ontology_hash
            )

    graph = await self.sparql_tool.aget_induced_subgraph(
        entity_uris=entity_uris,
        entity_relevance=entity_relevance,
        ontology_iris=ontology_iris,
        depth=subgraph_depth,
        max_total_triples=max_total_triples,
        estimated_triples_per_query=estimated_triples_per_query,
        ontology_version_filters=ontology_version_filters or None,
        ontology_hash_filters=ontology_hash_filters or None,
    )
    _bind_common_vocab_prefixes(graph)
    return graph, source_iris

retrieve(query, top_k=None, expand_sparql=True, subgraph_depth=1, max_total_triples=300, estimated_triples_per_query=24)

Retrieve top-k hits for one query and optional induced subgraph; returns source ontology IRIs.

Source code in ontocast/tool/vector_store/patch_retriever.py
def retrieve(
    self,
    query: str,
    top_k: int | None = None,
    expand_sparql: bool = True,
    subgraph_depth: int = 1,
    max_total_triples: int = 300,
    estimated_triples_per_query: int = 24,
) -> tuple[RDFGraph, list[str]]:
    """Retrieve top-k hits for one query and optional induced subgraph; returns source ontology IRIs."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(
            self.aretrieve(
                query=query,
                top_k=top_k,
                expand_sparql=expand_sparql,
                subgraph_depth=subgraph_depth,
                max_total_triples=max_total_triples,
                estimated_triples_per_query=estimated_triples_per_query,
            )
        )
    raise RuntimeError(
        "retrieve() cannot be called from async code; use await aretrieve()"
    )

retrieve_ensemble(queries, top_k=None, expand_sparql=True, subgraph_depth=1, max_total_triples=300, estimated_triples_per_query=24)

Source code in ontocast/tool/vector_store/patch_retriever.py
def retrieve_ensemble(
    self,
    queries: list[str],
    top_k: int | None = None,
    expand_sparql: bool = True,
    subgraph_depth: int = 1,
    max_total_triples: int = 300,
    estimated_triples_per_query: int = 24,
) -> tuple[RDFGraph, list[str]]:
    """Sync: one induced graph and source IRIs for the union of vector hits over ``queries``."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(
            self.aretrieve_ensemble(
                queries=queries,
                top_k=top_k,
                expand_sparql=expand_sparql,
                subgraph_depth=subgraph_depth,
                max_total_triples=max_total_triples,
                estimated_triples_per_query=estimated_triples_per_query,
            )
        )
    raise RuntimeError(
        "retrieve_ensemble() is not allowed inside async code; use aretrieve_ensemble()"
    )

QdrantVectorStore

Bases: VectorStoreTool

Stores ontology atoms in Qdrant and supports similarity lookup.

Source code in ontocast/tool/vector_store/qdrant.py
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
class QdrantVectorStore(VectorStoreTool):
    """Stores ontology atoms in Qdrant and supports similarity lookup."""

    config: QdrantConfig = Field(default_factory=QdrantConfig)
    embedding: EmbeddingTool = Field(..., exclude=True)
    sparse_embedding: FastembedBm25SparseTool | None = Field(default=None, exclude=True)
    atomizer: GraphAtomizer = Field(default_factory=GraphAtomizer, exclude=True)
    _client: QdrantClient | None = PrivateAttr(default=None)

    @property
    def embedding_config(self) -> EmbeddingConfig:
        return self.embedding.config

    def _require_sparse_embedding_tool(self) -> FastembedBm25SparseTool:
        if self.sparse_embedding is None:
            raise ValueError(
                "BM25 sparse embedding is required for vector search but "
                "sparse_embedding was not wired"
            )
        return self.sparse_embedding

    def _encode_single_query_vectors(
        self, query: str
    ) -> tuple[list[float], list[float], qdrant_models.SparseVector]:
        triples = self._encode_query_vectors_batch([query])
        return triples[0]

    def _encode_query_vectors_batch(
        self, queries: list[str]
    ) -> list[tuple[list[float], list[float], qdrant_models.SparseVector]]:
        n = len(queries)
        if n == 0:
            return []
        dense_vecs = self.embedding.embed(queries)
        if len(dense_vecs) != n:
            raise ValueError(
                "Embedding provider returned mismatched vectors for queries"
            )
        for i, vec in enumerate(dense_vecs):
            self._require_embedding_vector_length(vec, role=f"Query embedding[{i}]")
        sparse_vecs = self._require_sparse_embedding_tool().embed_sparse(queries)
        if len(sparse_vecs) != n:
            raise ValueError(
                "BM25 embedder returned mismatched sparse vectors for queries"
            )
        return [(dense_vecs[i], dense_vecs[i], sparse_vecs[i]) for i in range(n)]

    def _normalized_fusion_weights(self) -> tuple[float, float, float]:
        """Weights for core / neighborhood / BM25 reciprocal-rank fusion (sum to 1)."""
        cw = self.config.fusion_core_weight
        nw = self.config.fusion_neighborhood_weight
        bw = self.config.fusion_bm25_weight
        total = cw + nw + bw
        if total <= 0.0:
            return (1.0 / 3.0, 1.0 / 3.0, 1.0 / 3.0)
        return (cw / total, nw / total, bw / total)

    @property
    def client(self) -> QdrantClient:
        if self._client is None:
            if self.config.uri is None:
                raise ValueError(
                    "Qdrant URI is required to initialize vector store client"
                )
            self._client = QdrantClient(
                url=self.config.uri,
                api_key=self.config.api_key,
                grpc_port=self.config.grpc_port,
                prefer_grpc=self.config.use_grpc,
            )
        return self._client

    def _ontology_collection_name(self) -> str:
        name = self.config.ontology_collection
        if name is None:
            raise ValueError(
                "Qdrant ontology_collection is unset; ensure QdrantConfig validation"
                " ran or call apply_tenancy before vector operations"
            )
        return name

    def supports_tenancy_partition(self) -> bool:
        return True

    async def initialize(self) -> None:
        """Create ontology/facts collections and payload indexes if missing."""
        ontology_col = self.config.ontology_collection
        facts_col = self.config.facts_collection
        assert ontology_col is not None
        assert facts_col is not None
        self._ensure_named_vector_collection(ontology_col)
        self._ensure_named_vector_collection(facts_col)

        self._ensure_payload_index(
            collection_name=ontology_col, field_name="ontology_iri"
        )
        self._ensure_payload_index(
            collection_name=ontology_col, field_name="ontology_version"
        )
        self._ensure_payload_index(
            collection_name=ontology_col, field_name="ontology_hash"
        )
        self._ensure_payload_index(collection_name=ontology_col, field_name="iri")

    async def clean_tenancy(
        self,
        tenant: str,
        project: str,
        *,
        sep: str = TENANCY_SEP,
    ) -> None:
        """Delete Qdrant collections named for ``tenant`` / ``project``."""
        t, p = tenant.strip(), project.strip()
        for name in (
            tenant_project_ontologies_name(t, p, sep=sep),
            tenant_project_facts_name(t, p, sep=sep),
        ):
            if self.client.collection_exists(collection_name=name):
                self.client.delete_collection(collection_name=name)
                logger.info("Deleted Qdrant collection %s", name)

    def apply_tenancy(
        self,
        tenant: str,
        project: str,
        *,
        sep: str = TENANCY_SEP,
    ) -> None:
        """Point config at collections for ``tenant`` / ``project``.

        Call :meth:`initialize` after.
        """
        t, p = tenant.strip(), project.strip()
        self.config.ontology_collection = tenant_project_ontologies_name(t, p, sep=sep)
        self.config.facts_collection = tenant_project_facts_name(t, p, sep=sep)

    def _dense_dimension(self) -> int:
        """Dense vector length for ``VectorParams`` and dense query validation."""
        return self.config.vector_size or self.embedding_config.dimension

    def _metadata_embedding_dimension(self) -> int:
        """Dense dimension stored in collection metadata."""
        return self._dense_dimension()

    def _embedding_model_fingerprint(self) -> str:
        ec = self.embedding_config
        dense_part = f"dense:{ec.provider.value}:{ec.model_name}"
        return f"{dense_part}|bm25={ec.bm25_model_name}"

    def _embedding_fingerprint_matches(self, stored: str) -> bool:
        return stored == self._embedding_model_fingerprint()

    def _collection_embedding_metadata(self, metadata_dim: int) -> dict[str, Any]:
        return {
            QDRANT_META_EMBEDDING_DIMENSION: metadata_dim,
            QDRANT_META_EMBEDDING_MODEL: self._embedding_model_fingerprint(),
        }

    def _coerce_metadata_int(self, value: Any, *, field: str, collection: str) -> int:
        if type(value) is bool:
            raise ValueError(
                f"Qdrant collection '{collection}' metadata {field!r} has invalid type"
            )
        if isinstance(value, int):
            return value
        if isinstance(value, float) and value.is_integer():
            return int(value)
        if isinstance(value, str):
            try:
                return int(value.strip(), 10)
            except ValueError as exc:
                raise ValueError(
                    f"Qdrant collection '{collection}' metadata {field!r} "
                    "is not an integer"
                ) from exc
        raise ValueError(
            f"Qdrant collection '{collection}' metadata {field!r} has invalid type"
        )

    def _validate_existing_embedding_contract(
        self, collection: str, info: qdrant_models.CollectionInfo
    ) -> None:
        raw = info.config.metadata
        if raw is None:
            meta = {}
        elif isinstance(raw, Mapping):
            meta = dict(raw)
        else:
            raise ValueError(
                f"Qdrant collection '{collection}' has unsupported metadata type "
                f"{type(raw).__name__}"
            )
        dim_key = QDRANT_META_EMBEDDING_DIMENSION
        model_key = QDRANT_META_EMBEDDING_MODEL
        if dim_key not in meta or model_key not in meta:
            raise EmbeddingContractMismatchError(
                f"Qdrant collection '{collection}' is missing OntoCast "
                f"embedding metadata ({dim_key!r}, {model_key!r}). "
                "Drop and recreate the collection. " + _embedding_contract_help()
            )
        stored_dim = self._coerce_metadata_int(
            meta[dim_key], field=dim_key, collection=collection
        )
        stored_model = meta[model_key]
        if not isinstance(stored_model, str):
            raise ValueError(
                f"Qdrant collection '{collection}' metadata {model_key!r} "
                "must be a string"
            )
        expected_meta_dim = self._metadata_embedding_dimension()
        if stored_dim != expected_meta_dim or not self._embedding_fingerprint_matches(
            stored_model
        ):
            raise EmbeddingContractMismatchError(
                f"Qdrant collection '{collection}' embedding contract mismatch: "
                f"collection has dimension={stored_dim}, model={stored_model!r}; "
                f"current config expects dimension={expected_meta_dim}, "
                f"model={self._embedding_model_fingerprint()!r}. "
                + _embedding_contract_help()
            )

    def _vectors_and_sparse_for_create(
        self,
    ) -> tuple[
        dict[str, qdrant_models.VectorParams],
        dict[str, qdrant_models.SparseVectorParams],
    ]:
        distance = self.config.distance
        dense_dim = self._dense_dimension()
        vectors: dict[str, qdrant_models.VectorParams] = {
            CORE_VECTOR_NAME: qdrant_models.VectorParams(
                size=dense_dim, distance=distance
            ),
            NEIGHBORHOOD_VECTOR_NAME: qdrant_models.VectorParams(
                size=dense_dim, distance=distance
            ),
        }
        # BM25 sparse scoring on plain dot product (no sparse modifier).
        sparse: dict[str, qdrant_models.SparseVectorParams] = {
            BM25_VECTOR_NAME: qdrant_models.SparseVectorParams(modifier=None)
        }
        return (vectors, sparse)

    def _validate_collection_vector_layout(
        self, collection: str, info: qdrant_models.CollectionInfo
    ) -> None:
        distance = self.config.distance
        dense_dim = self._dense_dimension()
        params = info.config.params
        raw_vectors = params.vectors
        vectors_map: dict[str, qdrant_models.VectorParams] = (
            dict(raw_vectors) if isinstance(raw_vectors, dict) else {}
        )
        raw_sparse = params.sparse_vectors
        sparse_map: dict[str, qdrant_models.SparseVectorParams] = (
            dict(raw_sparse) if isinstance(raw_sparse, dict) else {}
        )

        def _require_dense(name: str) -> None:
            if name not in vectors_map:
                raise ValueError(
                    f"Qdrant collection '{collection}' missing dense vector {name!r}; "
                    f"have dense keys {set(vectors_map.keys())}"
                )
            cfg = vectors_map[name]
            if cfg.size != dense_dim:
                raise EmbeddingContractMismatchError(
                    f"Qdrant collection '{collection}' vector {name!r} size "
                    f"{cfg.size} does not match configured dense size {dense_dim}. "
                    + _embedding_contract_help()
                )
            if cfg.distance != distance:
                raise ValueError(
                    f"Qdrant collection '{collection}' vector {name!r} "
                    f"uses distance {cfg.distance!r}; config expects {distance!r}."
                )

        _require_dense(CORE_VECTOR_NAME)
        _require_dense(NEIGHBORHOOD_VECTOR_NAME)

        bm25_cfg = sparse_map.get(BM25_VECTOR_NAME)
        if bm25_cfg is None:
            raise ValueError(
                f"Qdrant collection '{collection}' missing sparse vector "
                f"{BM25_VECTOR_NAME!r}; have sparse keys {set(sparse_map.keys())}"
            )
        if bm25_cfg.modifier is not None:
            raise ValueError(
                f"Qdrant collection '{collection}' sparse vector {BM25_VECTOR_NAME!r} "
                f"uses modifier {bm25_cfg.modifier!r}; expected no modifier "
                "(dot-product sparse scoring). Recreate the collection."
            )

    def _ensure_named_vector_collection(self, collection: str) -> None:
        metadata_dim = self._metadata_embedding_dimension()
        embedding_meta = self._collection_embedding_metadata(metadata_dim)
        vectors_cfg, sparse_cfg = self._vectors_and_sparse_for_create()
        if not self.client.collection_exists(collection_name=collection):
            self.client.create_collection(
                collection_name=collection,
                vectors_config=vectors_cfg,
                sparse_vectors_config=sparse_cfg,
                metadata=embedding_meta,
            )
            logger.info(
                "Created Qdrant collection '%s' metadata_dim=%s distance=%s model=%s",
                collection,
                metadata_dim,
                self.config.distance.value,
                embedding_meta[QDRANT_META_EMBEDDING_MODEL],
            )
        else:
            info = self.client.get_collection(collection_name=collection)
            self._validate_collection_vector_layout(collection, info)
            self._validate_existing_embedding_contract(collection, info)

    def index_ontology(self, ontology: Ontology) -> int:
        """Atomize + embed + upsert ontology neighborhoods."""
        atoms = self.atomizer.atomize(source=ontology, depth=1)
        if not atoms:
            return 0
        core_texts = [atom.core_representation for atom in atoms]
        neighborhood_texts = [atom.neighborhood_representation for atom in atoms]
        minimal_texts = [atom.minimal_representation for atom in atoms]

        core_vectors = self._embed_texts_batched(core_texts)
        neighborhood_vectors = self._embed_texts_batched(neighborhood_texts)
        bm25_vectors = self._embed_texts_batched_sparse(minimal_texts)

        if len(core_vectors) != len(atoms) or len(neighborhood_vectors) != len(atoms):
            raise ValueError(
                "Embedding provider returned mismatched vector counts for atoms"
            )
        if len(bm25_vectors) != len(atoms):
            raise ValueError(
                "BM25 embedder returned mismatched sparse vector counts for atoms"
            )

        points: list[qdrant_models.PointStruct] = []
        for i, atom in enumerate(atoms):
            vec_map: dict[str, Any] = {
                CORE_VECTOR_NAME: core_vectors[i],
                NEIGHBORHOOD_VECTOR_NAME: neighborhood_vectors[i],
                BM25_VECTOR_NAME: bm25_vectors[i],
            }
            points.append(
                qdrant_models.PointStruct(
                    id=self._point_id_for_atom(atom),
                    vector=vec_map,
                    payload=self._atom_payload(atom),
                )
            )
        collection = self._ontology_collection_name()
        for points_batch in self._iter_batches(points, self.config.upsert_batch_size):
            self.client.upsert(collection_name=collection, points=points_batch)
        return len(points)

    def search_patches(
        self,
        query: str,
        top_k: int | None = None,
        filter_iri: str | None = None,
        filter_version: str | None = None,
        filter_hash: str | None = None,
    ) -> list[GraphAtom]:
        """Search ontology atoms by text query using weighted multi-vector fusion."""
        core_q, neigh_q, bm25_q = self._encode_single_query_vectors(query)
        return self.search_by_vector(
            core_vector=core_q,
            neighborhood_vector=neigh_q,
            bm25_query_vector=bm25_q,
            top_k=top_k,
            filter_iri=filter_iri,
            filter_version=filter_version,
            filter_hash=filter_hash,
        )

    def search_patch_hits(
        self,
        query: str,
        top_k: int | None = None,
        filter_iri: str | None = None,
        filter_version: str | None = None,
        filter_hash: str | None = None,
    ) -> list[OntologySearchHit]:
        """Search ontology atoms and return rank-fused scored hit objects."""
        core_q, neigh_q, bm25_q = self._encode_single_query_vectors(query)
        channel_hits = self.search_hits_by_vector(
            core_vector=core_q,
            neighborhood_vector=neigh_q,
            bm25_query_vector=bm25_q,
            top_k=top_k,
            filter_iri=filter_iri,
            filter_version=filter_version,
            filter_hash=filter_hash,
        )
        eff_top_k = self._effective_top_k(top_k)
        return self._rank_fuse_channel_hits(
            channel_hits.core_hits,
            channel_hits.neighborhood_hits,
            channel_hits.bm25_hits,
            limit=eff_top_k,
        )

    def _search_patch_hits_for_query_triples(
        self,
        triples: list[tuple[list[float], list[float], qdrant_models.SparseVector]],
        top_k: int,
        filter_iri: str | None,
        filter_version: str | None,
        filter_hash: str | None,
    ) -> list[OntologySearchHitsByChannel]:
        """Run split-channel search per query (dense core/neighborhood + BM25)."""
        if not triples:
            return []

        def search_one(
            t: tuple[list[float], list[float], qdrant_models.SparseVector],
        ) -> OntologySearchHitsByChannel:
            core_v, neigh_v, bm25_v = t
            return self.search_hits_by_vector(
                core_vector=core_v,
                neighborhood_vector=neigh_v,
                bm25_query_vector=bm25_v,
                top_k=top_k,
                filter_iri=filter_iri,
                filter_version=filter_version,
                filter_hash=filter_hash,
            )

        workers = min(32, len(triples))
        with ThreadPoolExecutor(max_workers=workers) as pool:
            return list(pool.map(search_one, triples))

    def _search_patch_hits_many_impl(
        self,
        queries: list[str],
        top_k: int | None,
        filter_iri: str | None,
        filter_version: str | None,
        filter_hash: str | None,
    ) -> list[OntologySearchHitsByChannel]:
        if not queries:
            return []

        eff_top_k = self._effective_top_k(top_k)
        triples = self._encode_query_vectors_batch(queries)
        return self._search_patch_hits_for_query_triples(
            triples,
            eff_top_k,
            filter_iri,
            filter_version,
            filter_hash,
        )

    def search_patch_hits_many(
        self,
        queries: list[str],
        top_k: int | None = None,
        filter_iri: str | None = None,
        filter_version: str | None = None,
        filter_hash: str | None = None,
    ) -> list[OntologySearchHitsByChannel]:
        """Search ontology atoms for many queries with split-channel outputs."""
        return self._search_patch_hits_many_impl(
            queries,
            top_k,
            filter_iri,
            filter_version,
            filter_hash,
        )

    async def asearch_patch_hits_many(
        self,
        queries: list[str],
        top_k: int | None = None,
        filter_iri: str | None = None,
        filter_version: str | None = None,
        filter_hash: str | None = None,
    ) -> list[OntologySearchHitsByChannel]:
        """Async counterpart of :meth:`search_patch_hits_many`.

        All queries are encoded with one batched call in a worker thread. Each
        resulting ``(core, neighborhood, bm25)`` triple is then searched through
        :meth:`search_hits_by_vector` in its own worker thread, and the searches
        are awaited together with :func:`asyncio.gather` (result order follows
        the input order).
        """
        if not queries:
            return []
        resolved_top_k = self._effective_top_k(top_k)
        encoded_triples = await asyncio.to_thread(
            self._encode_query_vectors_batch, queries
        )
        pending = []
        for core_vec, neighborhood_vec, sparse_vec in encoded_triples:
            pending.append(
                asyncio.to_thread(
                    self.search_hits_by_vector,
                    core_vec,
                    neighborhood_vec,
                    sparse_vec,
                    resolved_top_k,
                    filter_iri,
                    filter_version,
                    filter_hash,
                )
            )
        return await asyncio.gather(*pending)

    def _parse_dense_vector(self, raw: Any) -> list[float] | None:
        if isinstance(raw, list):
            if not raw or not all(isinstance(v, int | float) for v in raw):
                return None
            return [float(v) for v in cast(list[int | float], raw)]
        return None

    def fetch_vectors(
        self,
        atom_ids: list[str],
    ) -> dict[str, tuple[list[float], list[float]]]:
        """Batch-fetch dense core/neighborhood vectors for MMR (BM25 not used).

        Points are first retrieved by the point id derived directly from each
        atom id. This is how :meth:`_point_id_for_atom` keys points when
        ``config.dedup_mode`` is ``QdrantDedupMode.ATOM_ID``. In every other
        dedup mode, points are keyed by the identity key instead, so an id
        lookup by atom id misses them. In those modes, atoms not found by id
        are looked up through their ``atom_id`` payload field.

        Args:
            atom_ids: Atom ids whose vectors should be fetched.

        Returns:
            Mapping ``atom_id -> (core_vector, neighborhood_vector)``. Atoms that
            are not found, or whose stored dense vectors are not non-empty numeric
            lists, are omitted.
        """
        if not atom_ids:
            return {}

        def dense_pair(point: Any) -> tuple[list[float], list[float]] | None:
            # Both named dense vectors must be present and numeric.
            point_vector = point.vector
            if not isinstance(point_vector, dict):
                return None
            core = self._parse_dense_vector(point_vector.get(CORE_VECTOR_NAME))
            neighborhood = self._parse_dense_vector(
                point_vector.get(NEIGHBORHOOD_VECTOR_NAME)
            )
            if core is None or neighborhood is None:
                return None
            return core, neighborhood

        collection_name = self._ontology_collection_name()
        point_id_to_atom_id = {self._point_id(atom_id): atom_id for atom_id in atom_ids}
        points = self.client.retrieve(
            collection_name=collection_name,
            ids=list(point_id_to_atom_id.keys()),
            with_vectors=True,
            with_payload=False,
        )
        out: dict[str, tuple[list[float], list[float]]] = {}
        for point in points:
            atom_id = point_id_to_atom_id.get(str(point.id))
            if atom_id is None:
                continue
            pair = dense_pair(point)
            if pair is not None:
                out[atom_id] = pair

        if self.config.dedup_mode == QdrantDedupMode.ATOM_ID:
            return out

        # Identity-keyed points cannot be addressed by atom id; match the
        # ``atom_id`` payload field for whatever the id lookup did not find.
        missing = [atom_id for atom_id in dict.fromkeys(atom_ids) if atom_id not in out]
        if not missing:
            return out
        scroll_filter = qdrant_models.Filter(
            must=[
                qdrant_models.FieldCondition(
                    key="atom_id", match=qdrant_models.MatchAny(any=missing)
                )
            ]
        )
        offset: Any = None
        while True:
            records, next_offset = self.client.scroll(
                collection_name=collection_name,
                scroll_filter=scroll_filter,
                with_payload=["atom_id"],
                with_vectors=True,
                offset=offset,
                limit=len(missing),
            )
            for record in records:
                payload_atom_id = (record.payload or {}).get("atom_id")
                if payload_atom_id is None:
                    continue
                key = str(payload_atom_id)
                if key in out:
                    continue
                pair = dense_pair(record)
                if pair is not None:
                    out[key] = pair
            if not records or next_offset is None:
                break
            offset = next_offset
        return out

    async def afetch_vectors(
        self,
        atom_ids: list[str],
    ) -> dict[str, tuple[list[float], list[float]]]:
        """Run :meth:`fetch_vectors` in a worker thread and await its result."""
        vectors = await asyncio.to_thread(self.fetch_vectors, atom_ids)
        return vectors

    def search_by_vector(
        self,
        core_vector: list[float],
        neighborhood_vector: list[float],
        bm25_query_vector: qdrant_models.SparseVector | None = None,
        top_k: int | None = None,
        filter_iri: str | None = None,
        filter_version: str | None = None,
        filter_hash: str | None = None,
    ) -> list[GraphAtom]:
        """Search ontology atoms and rank-fuse the per-channel results.

        The channel queries are delegated to :meth:`search_hits_by_vector`.
        The core, neighborhood and BM25 hit lists are then fused with
        :meth:`_rank_fuse_channel_hits`, capped at the effective ``top_k``,
        and only the atoms are returned.
        """
        per_channel = self.search_hits_by_vector(
            core_vector=core_vector,
            neighborhood_vector=neighborhood_vector,
            bm25_query_vector=bm25_query_vector,
            top_k=top_k,
            filter_iri=filter_iri,
            filter_version=filter_version,
            filter_hash=filter_hash,
        )
        limit = self._effective_top_k(top_k)
        fused = self._rank_fuse_channel_hits(
            per_channel.core_hits,
            per_channel.neighborhood_hits,
            per_channel.bm25_hits,
            limit=limit,
        )
        atoms = [fused_hit.atom for fused_hit in fused]
        return atoms

    def search_hits_by_vector(
        self,
        core_vector: list[float],
        neighborhood_vector: list[float],
        bm25_query_vector: qdrant_models.SparseVector | None = None,
        top_k: int | None = None,
        filter_iri: str | None = None,
        filter_version: str | None = None,
        filter_hash: str | None = None,
    ) -> OntologySearchHitsByChannel:
        """Query each named vector channel and return the hits per channel.

        Both dense query vectors are validated against the collection's
        embedding dimension. The BM25 channel is queried only when
        ``bm25_query_vector`` is given; otherwise its hit list is empty. The
        neighborhood channel applies the empty-neighborhood penalty in
        :meth:`_points_to_hits`. When ``config.dedup_query_hits_by_iri`` is set,
        each channel is deduplicated by identity key.
        """
        limit = self._effective_top_k(top_k)
        self._require_embedding_vector_length(core_vector, role="Query core vector")
        self._require_embedding_vector_length(
            neighborhood_vector, role="Query neighborhood vector"
        )
        search_filter = self._build_filter(
            filter_iri=filter_iri,
            filter_version=filter_version,
            filter_hash=filter_hash,
        )

        def run_channel(name, query):
            return self._query_named_vector(
                vector_name=name,
                vector=query,
                limit=limit,
                search_filter=search_filter,
            )

        core_points = run_channel(CORE_VECTOR_NAME, core_vector)
        neighborhood_points = run_channel(NEIGHBORHOOD_VECTOR_NAME, neighborhood_vector)
        bm25_points: list[Any] = (
            []
            if bm25_query_vector is None
            else run_channel(BM25_VECTOR_NAME, bm25_query_vector)
        )

        channels = [
            self._points_to_hits(core_points),
            self._points_to_hits(
                neighborhood_points, apply_neighborhood_empty_penalty=True
            ),
            self._points_to_hits(bm25_points),
        ]
        if self.config.dedup_query_hits_by_iri:
            channels = [self._dedupe_hits_by_identity(channel) for channel in channels]
        core_channel, neighborhood_channel, bm25_channel = channels
        return OntologySearchHitsByChannel(
            core_hits=core_channel,
            neighborhood_hits=neighborhood_channel,
            bm25_hits=bm25_channel,
        )

    def _points_to_hits(
        self,
        points: list[Any],
        *,
        apply_neighborhood_empty_penalty: bool = False,
    ) -> list[OntologySearchHit]:
        """Convert raw Qdrant points into :class:`OntologySearchHit` objects.

        Each hit's score (``0.0`` when the point has none) is also written onto
        the converted atom. When ``apply_neighborhood_empty_penalty`` is set,
        some points get score ``0.0``: those whose stored
        ``neighborhood_representation`` equals the placeholder
        "no neighborhood facts available", compared after stripping whitespace
        and lowercasing.
        """
        converted: list[OntologySearchHit] = []
        for point in points:
            raw_score = point.score
            hit_score = 0.0 if raw_score is None else float(raw_score)
            if apply_neighborhood_empty_penalty:
                stored_text = (point.payload or {}).get("neighborhood_representation", "")
                if str(stored_text).strip().lower() == "no neighborhood facts available":
                    hit_score = 0.0
            atom = self._point_to_atom(point)
            atom.score = hit_score
            converted.append(OntologySearchHit(atom=atom, score=hit_score))
        return converted

    def _rank_fuse_channel_hits(
        self,
        core_hits: list[OntologySearchHit],
        neighborhood_hits: list[OntologySearchHit],
        bm25_hits: list[OntologySearchHit],
        *,
        limit: int,
    ) -> list[OntologySearchHit]:
        """Fuse per-channel rankings into one weighted reciprocal-rank list.

        Each channel adds ``weight / rank`` (1-based rank) to an atom's fused
        score. The weights come from :meth:`_normalized_fusion_weights`.
        Results are ordered by fused score, then by the atom's best raw
        channel score, then by ``atom_id``, all descending, and truncated to
        ``limit``. Each returned hit is built from that atom's best-scoring
        source hit. The atom is a copy whose ``score`` is set to the fused
        score.
        """
        core_weight, neighborhood_weight, bm25_weight = (
            self._normalized_fusion_weights()
        )
        fused: dict[str, float] = {}
        representative: dict[str, OntologySearchHit] = {}

        weighted_channels = (
            (core_hits, core_weight),
            (neighborhood_hits, neighborhood_weight),
            (bm25_hits, bm25_weight),
        )
        for channel_hits, weight in weighted_channels:
            for position, hit in enumerate(channel_hits, start=1):
                key = hit.atom.atom_id
                fused[key] = fused.get(key, 0.0) + (weight / position)
                current = representative.get(key)
                if current is None or hit.score > current.score:
                    representative[key] = hit

        def ordering(key: str) -> tuple[float, float, str]:
            return fused[key], float(representative[key].score), key

        winners = sorted(fused.keys(), key=ordering, reverse=True)[:limit]
        return [
            OntologySearchHit(
                atom=representative[key].atom.model_copy(update={"score": fused[key]}),
                score=fused[key],
            )
            for key in winners
        ]

    def delete_ontology(
        self,
        iri: str,
        version: str | None = None,
        ontology_hash: str | None = None,
    ) -> None:
        """Delete atoms associated with one ontology IRI and optional version/hash."""
        delete_filter = self._build_filter(
            filter_iri=iri, filter_version=version, filter_hash=ontology_hash
        )
        if delete_filter is None:
            return
        self.client.delete(
            collection_name=self._ontology_collection_name(),
            points_selector=qdrant_models.FilterSelector(filter=delete_filter),
        )

    def reindex_ontology(self, ontology: Ontology) -> int:
        """Drop every indexed atom for ``ontology.iri``, then index it afresh.

        The deletion is not narrowed by version or hash. The return value is
        the count reported by :meth:`index_ontology`.
        """
        ontology_iri = ontology.iri
        self.delete_ontology(ontology_iri)
        indexed_count = self.index_ontology(ontology)
        return indexed_count

    def _build_filter(
        self,
        filter_iri: str | None = None,
        filter_version: str | None = None,
        filter_hash: str | None = None,
    ) -> qdrant_models.Filter | None:
        """Build an AND filter over the ontology IRI/version/hash payload keys.

        Only non-``None`` arguments contribute a keyword-match condition.
        Returns ``None`` when no condition applies.
        """
        criteria = (
            ("ontology_iri", filter_iri),
            ("ontology_version", filter_version),
            ("ontology_hash", filter_hash),
        )
        conditions: list[qdrant_models.Condition] = [
            qdrant_models.FieldCondition(
                key=field_key, match=qdrant_models.MatchValue(value=field_value)
            )
            for field_key, field_value in criteria
            if field_value is not None
        ]
        return qdrant_models.Filter(must=conditions) if conditions else None

    def _point_to_atom(self, point: Any) -> GraphAtom:
        """Rebuild a :class:`GraphAtom` from a Qdrant point's payload.

        Used both for query results and for scroll results (see
        :meth:`delete_duplicate_iri_points`). Only query results
        (``ScoredPoint``) carry a ``score``. The ``Record`` objects returned by
        ``client.scroll`` / ``client.retrieve`` have no such field, so the
        score is read defensively and left as ``None`` when absent.

        Missing payload fields fall back to empty strings or ``None``. The atom
        id falls back to the point id.
        """
        payload = point.payload or {}
        created_at = self._parse_created_at(payload.get("created_at"))
        # ``getattr`` instead of ``point.score``: Record objects have no score.
        raw_score = getattr(point, "score", None)
        return GraphAtom(
            atom_id=str(payload.get("atom_id", point.id)),
            ontology_iri=str(payload.get("ontology_iri", "")),
            ontology_id=payload.get("ontology_id"),
            ontology_hash=payload.get("ontology_hash"),
            ontology_version=payload.get("ontology_version"),
            iri=str(payload.get("iri", "")),
            entity_role=canonicalize_entity_role(payload.get("entity_role")),
            core_representation=str(payload.get("core_representation", "")),
            minimal_representation=str(payload.get("minimal_representation", "")),
            neighborhood_representation=str(
                payload.get("neighborhood_representation", "")
            ),
            created_at=created_at,
            score=float(raw_score) if raw_score is not None else None,
        )

    def _atom_payload(self, atom: GraphAtom) -> dict[str, Any]:
        """Serialize ``atom`` into the Qdrant payload stored next to its vectors.

        ``entity_role`` is canonicalized, and ``created_at`` is stored as an
        ISO-8601 string.
        """
        payload: dict[str, Any] = dict(
            atom_id=atom.atom_id,
            ontology_iri=atom.ontology_iri,
            ontology_id=atom.ontology_id,
            ontology_hash=atom.ontology_hash,
            ontology_version=atom.ontology_version,
            iri=atom.iri,
            entity_role=canonicalize_entity_role(atom.entity_role),
            core_representation=atom.core_representation,
            minimal_representation=atom.minimal_representation,
            neighborhood_representation=atom.neighborhood_representation,
            created_at=atom.created_at.isoformat(),
        )
        return payload

    def _parse_created_at(self, value: Any) -> datetime:
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            try:
                return datetime.fromisoformat(value.replace("Z", "+00:00"))
            except ValueError:
                pass
        return datetime.now(timezone.utc)

    def _effective_top_k(self, top_k: int | None) -> int:
        """Resolve retrieval depth: explicit ``top_k`` overrides :attr:`QdrantConfig.top_k`."""
        if top_k is not None:
            return top_k
        return self.config.top_k

    def _require_embedding_vector_length(
        self,
        vector: list[float],
        *,
        role: str,
    ) -> None:
        expected = self._dense_dimension()
        if len(vector) != expected:
            raise EmbeddingContractMismatchError(
                f"{role} vector length {len(vector)} does not match the configured "
                f"collection embedding dimension {expected}. "
                + _embedding_contract_help()
            )

    def _point_id(self, atom_id: str) -> str:
        """Return a Qdrant-compatible point id (UUID string)."""
        try:
            return str(uuid.UUID(atom_id))
        except ValueError:
            return str(uuid.uuid5(uuid.NAMESPACE_URL, atom_id))

    def _point_id_for_atom(self, atom: GraphAtom) -> str:
        """Return the point id under which ``atom`` is stored.

        In ``QdrantDedupMode.ATOM_ID`` mode the id derives from the atom id.
        Otherwise it derives from the identity key, so atoms sharing a key
        overwrite each other on upsert.
        """
        source_key = (
            atom.atom_id
            if self.config.dedup_mode == QdrantDedupMode.ATOM_ID
            else self._identity_key_for_atom(atom)
        )
        return self._point_id(source_key)

    def _identity_key_for_atom(self, atom: GraphAtom) -> str:
        """Compute the dedup identity key for ``atom``.

        In ``ATOM_ID`` mode this is simply the atom id. Otherwise it is
        ``ontology_iri|iri``, optionally extended with the ontology version
        and/or hash per ``config.dedup_include_version`` /
        ``config.dedup_include_hash`` (missing values become ``""``).
        """
        if self.config.dedup_mode == QdrantDedupMode.ATOM_ID:
            return atom.atom_id
        components = [atom.ontology_iri or "", atom.iri or ""]
        optional_components = (
            (self.config.dedup_include_version, atom.ontology_version),
            (self.config.dedup_include_hash, atom.ontology_hash),
        )
        components.extend(
            value or "" for enabled, value in optional_components if enabled
        )
        return "|".join(components)

    def _dedupe_hits_by_identity(
        self, hits: list[OntologySearchHit]
    ) -> list[OntologySearchHit]:
        """Keep the best-scoring hit per identity key.

        Within a key, a later hit replaces the kept one only with a strictly
        higher score. Survivors are sorted by descending score. Ties are
        broken by the position at which their key first appeared in ``hits``.
        """
        if not hits:
            return []
        winners: dict[str, tuple[int, OntologySearchHit]] = {}
        for position, hit in enumerate(hits):
            identity = self._identity_key_for_atom(hit.atom)
            kept = winners.get(identity)
            if kept is None:
                winners[identity] = (position, hit)
            elif float(hit.score) > float(kept[1].score):
                winners[identity] = (kept[0], hit)
        ranked = sorted(
            winners.values(),
            key=lambda entry: (-float(entry[1].score), entry[0]),
        )
        return [hit for _, hit in ranked]

    def delete_duplicate_iri_points(self, *, batch_size: int = 512) -> int:
        """Delete duplicate points sharing the same configured identity key.

        Scrolls the ontology collection page by page (``batch_size`` points per
        page). The first point seen for each identity key (per
        :meth:`_identity_key_for_atom`) is kept, and every later one is
        deleted. Deletion requests are chunked to ``batch_size`` ids, so a
        heavily duplicated collection never produces one unbounded request.
        Intended as a one-off cleanup for collections created before strict
        dedup mode.

        Args:
            batch_size: Page size for scrolling and chunk size for deletion.

        Returns:
            Number of points deleted.

        Raises:
            ValueError: if ``batch_size`` is less than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
        collection_name = self._ontology_collection_name()
        seen_keys: set[str] = set()
        duplicate_ids: list[qdrant_models.ExtendedPointId] = []
        offset: Any = None
        while True:
            points, next_offset = self.client.scroll(
                collection_name=collection_name,
                with_payload=True,
                with_vectors=False,
                offset=offset,
                limit=batch_size,
            )
            if not points:
                break
            for point in points:
                key = self._identity_key_for_atom(self._point_to_atom(point))
                if key in seen_keys:
                    duplicate_ids.append(point.id)
                else:
                    seen_keys.add(key)
            if next_offset is None:
                break
            offset = next_offset
        for start in range(0, len(duplicate_ids), batch_size):
            self.client.delete(
                collection_name=collection_name,
                points_selector=qdrant_models.PointIdsList(
                    points=duplicate_ids[start : start + batch_size]
                ),
            )
        return len(duplicate_ids)

    def _query_named_vector(
        self,
        vector_name: str,
        vector: ChannelVector,
        limit: int,
        search_filter: qdrant_models.Filter | None,
    ) -> list[Any]:
        """Query the ontology collection on one named vector channel.

        Payloads are included. The raw ``points`` of the response are
        returned unchanged.
        """
        result = self.client.query_points(
            collection_name=self._ontology_collection_name(),
            using=vector_name,
            query=vector,
            limit=limit,
            query_filter=search_filter,
            with_payload=True,
        )
        matched = result.points
        return matched

    def _embed_texts_batched(self, texts: list[str]) -> list[list[float]]:
        if not texts:
            return []
        vectors: list[list[float]] = []
        for batch in self._iter_batches(texts, self.config.embedding_batch_size):
            batch_vectors = self.embedding.embed(batch)
            if len(batch_vectors) != len(batch):
                raise ValueError(
                    "Embedding provider returned mismatched vectors for batch"
                )
            for j, vec in enumerate(batch_vectors):
                self._require_embedding_vector_length(
                    vec,
                    role=f"Index embedding batch offset {len(vectors) + j}",
                )
            vectors.extend(batch_vectors)
        return vectors

    def _embed_texts_batched_sparse(
        self, texts: list[str]
    ) -> list[qdrant_models.SparseVector]:
        """BM25-embed ``texts`` in chunks of ``config.embedding_batch_size``.

        The sparse embedder is resolved only when there is something to embed.

        Raises:
            ValueError: if the embedder returns a different number of sparse
                vectors than texts for a chunk.
        """
        if not texts:
            return []
        sparse_vectors: list[qdrant_models.SparseVector] = []
        embedder = self._require_sparse_embedding_tool()
        for chunk in self._iter_batches(texts, self.config.embedding_batch_size):
            chunk_vectors = embedder.embed_sparse(chunk)
            if len(chunk) != len(chunk_vectors):
                raise ValueError(
                    "BM25 embedder returned mismatched sparse vectors for batch"
                )
            sparse_vectors += chunk_vectors
        return sparse_vectors

    def _iter_batches(self, items: list[Any], batch_size: int) -> list[list[Any]]:
        batches: list[list[Any]] = []
        for index in range(0, len(items), batch_size):
            batches.append(items[index : index + batch_size])
        return batches

    def _ensure_payload_index(self, collection_name: str, field_name: str) -> None:
        """Best-effort creation of a keyword payload index on ``field_name``.

        Any failure is logged at debug level and otherwise ignored. The usual
        cause is an index that already exists, but the error is included in
        the log so that other failures (connectivity, permissions) are not
        misreported as "already exists".
        """
        try:
            self.client.create_payload_index(
                collection_name=collection_name,
                field_name=field_name,
                field_schema=qdrant_models.PayloadSchemaType.KEYWORD,
            )
        except Exception as exc:  # deliberate best-effort, see docstring
            logger.debug(
                "Qdrant payload index '%s' on '%s' not created "
                "(it may already exist): %s",
                field_name,
                collection_name,
                exc,
            )

afetch_vectors(atom_ids) async

Async wrapper around `fetch_vectors`.

Source code in ontocast/tool/vector_store/qdrant.py
async def afetch_vectors(
    self,
    atom_ids: list[str],
) -> dict[str, tuple[list[float], list[float]]]:
    """Run :meth:`fetch_vectors` in a worker thread and await its result."""
    vectors = await asyncio.to_thread(self.fetch_vectors, atom_ids)
    return vectors

apply_tenancy(tenant, project, *, sep=TENANCY_SEP)

Point the config at the collections for `tenant` / `project`.

Call `initialize` afterwards.

Source code in ontocast/tool/vector_store/qdrant.py
def apply_tenancy(
    self,
    tenant: str,
    project: str,
    *,
    sep: str = TENANCY_SEP,
) -> None:
    """Retarget ``self.config`` at the collections of ``tenant`` / ``project``.

    Surrounding whitespace is stripped from both names. Only the configured
    collection names change. Call :meth:`initialize` afterwards.
    """
    tenant_name = tenant.strip()
    project_name = project.strip()
    cfg = self.config
    cfg.ontology_collection = tenant_project_ontologies_name(
        tenant_name, project_name, sep=sep
    )
    cfg.facts_collection = tenant_project_facts_name(
        tenant_name, project_name, sep=sep
    )

asearch_patch_hits_many(queries, top_k=None, filter_iri=None, filter_version=None, filter_hash=None) async

Async variant: one batched embed, then parallel split-channel searches.

Source code in ontocast/tool/vector_store/qdrant.py
async def asearch_patch_hits_many(
    self,
    queries: list[str],
    top_k: int | None = None,
    filter_iri: str | None = None,
    filter_version: str | None = None,
    filter_hash: str | None = None,
) -> list[OntologySearchHitsByChannel]:
    """Async counterpart of ``search_patch_hits_many``.

    Encodes every query with one batched call in a worker thread. It then runs
    :meth:`search_hits_by_vector` for each encoded triple in its own worker
    thread and gathers the results in input order.
    """
    if not queries:
        return []
    resolved_top_k = self._effective_top_k(top_k)
    encoded_triples = await asyncio.to_thread(
        self._encode_query_vectors_batch, queries
    )
    pending = []
    for core_vec, neighborhood_vec, sparse_vec in encoded_triples:
        pending.append(
            asyncio.to_thread(
                self.search_hits_by_vector,
                core_vec,
                neighborhood_vec,
                sparse_vec,
                resolved_top_k,
                filter_iri,
                filter_version,
                filter_hash,
            )
        )
    return await asyncio.gather(*pending)

clean_tenancy(tenant, project, *, sep=TENANCY_SEP) async

Delete Qdrant collections named for tenant / project.

Source code in ontocast/tool/vector_store/qdrant.py
async def clean_tenancy(
    self,
    tenant: str,
    project: str,
    *,
    sep: str = TENANCY_SEP,
) -> None:
    """Drop the ontology and facts collections of ``tenant`` / ``project``.

    Collections that do not exist are skipped. Each deletion is logged at
    info level.
    """
    tenant_name, project_name = tenant.strip(), project.strip()
    targets = (
        tenant_project_ontologies_name(tenant_name, project_name, sep=sep),
        tenant_project_facts_name(tenant_name, project_name, sep=sep),
    )
    for collection_name in targets:
        if not self.client.collection_exists(collection_name=collection_name):
            continue
        self.client.delete_collection(collection_name=collection_name)
        logger.info("Deleted Qdrant collection %s", collection_name)

delete_duplicate_iri_points(*, batch_size=512)

Delete duplicate points sharing the same configured identity key.

Keeps the first point for each key encountered in collection order. Intended as a one-off cleanup for collections created before strict dedup mode.

Source code in ontocast/tool/vector_store/qdrant.py
def delete_duplicate_iri_points(self, *, batch_size: int = 512) -> int:
    """Delete duplicate points sharing the same configured identity key.

    Scrolls the ontology collection page by page (``batch_size`` points per
    page). The first point seen for each identity key is kept, and every
    later one is deleted. Deletion requests are chunked to ``batch_size``
    ids, so a heavily duplicated collection never produces one unbounded
    request. Intended as a one-off cleanup for collections created before
    strict dedup mode.

    Args:
        batch_size: Page size for scrolling and chunk size for deletion.

    Returns:
        Number of points deleted.

    Raises:
        ValueError: if ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
    collection_name = self._ontology_collection_name()
    seen_keys: set[str] = set()
    duplicate_ids: list[qdrant_models.ExtendedPointId] = []
    offset: Any = None
    while True:
        points, next_offset = self.client.scroll(
            collection_name=collection_name,
            with_payload=True,
            with_vectors=False,
            offset=offset,
            limit=batch_size,
        )
        if not points:
            break
        for point in points:
            key = self._identity_key_for_atom(self._point_to_atom(point))
            if key in seen_keys:
                duplicate_ids.append(point.id)
            else:
                seen_keys.add(key)
        if next_offset is None:
            break
        offset = next_offset
    for start in range(0, len(duplicate_ids), batch_size):
        self.client.delete(
            collection_name=collection_name,
            points_selector=qdrant_models.PointIdsList(
                points=duplicate_ids[start : start + batch_size]
            ),
        )
    return len(duplicate_ids)

delete_ontology(iri, version=None, ontology_hash=None)

Delete atoms associated with one ontology IRI and optional version/hash.

Source code in ontocast/tool/vector_store/qdrant.py
def delete_ontology(
    self,
    iri: str,
    version: str | None = None,
    ontology_hash: str | None = None,
) -> None:
    """Delete atoms associated with one ontology IRI and optional version/hash."""
    delete_filter = self._build_filter(
        filter_iri=iri, filter_version=version, filter_hash=ontology_hash
    )
    if delete_filter is None:
        return
    self.client.delete(
        collection_name=self._ontology_collection_name(),
        points_selector=qdrant_models.FilterSelector(filter=delete_filter),
    )

fetch_vectors(atom_ids)

Batch-fetch dense core/neighborhood vectors for MMR (BM25 not used).

Source code in ontocast/tool/vector_store/qdrant.py
def fetch_vectors(
    self,
    atom_ids: list[str],
) -> dict[str, tuple[list[float], list[float]]]:
    """Batch-fetch dense core/neighborhood vectors for MMR (BM25 not used).

    Points are first retrieved by the point id derived directly from each atom
    id. That matches how points are keyed when ``config.dedup_mode`` is
    ``QdrantDedupMode.ATOM_ID``. In other dedup modes, points are keyed by the
    identity key instead. In those modes, atoms not found by id are looked up
    through their ``atom_id`` payload field.

    Args:
        atom_ids: Atom ids whose vectors should be fetched.

    Returns:
        Mapping ``atom_id -> (core_vector, neighborhood_vector)``. Atoms that
        are not found, or whose stored dense vectors are not non-empty numeric
        lists, are omitted.
    """
    if not atom_ids:
        return {}

    def dense_pair(point: Any) -> tuple[list[float], list[float]] | None:
        # Both named dense vectors must be present and numeric.
        point_vector = point.vector
        if not isinstance(point_vector, dict):
            return None
        core = self._parse_dense_vector(point_vector.get(CORE_VECTOR_NAME))
        neighborhood = self._parse_dense_vector(
            point_vector.get(NEIGHBORHOOD_VECTOR_NAME)
        )
        if core is None or neighborhood is None:
            return None
        return core, neighborhood

    collection_name = self._ontology_collection_name()
    point_id_to_atom_id = {self._point_id(atom_id): atom_id for atom_id in atom_ids}
    points = self.client.retrieve(
        collection_name=collection_name,
        ids=list(point_id_to_atom_id.keys()),
        with_vectors=True,
        with_payload=False,
    )
    out: dict[str, tuple[list[float], list[float]]] = {}
    for point in points:
        atom_id = point_id_to_atom_id.get(str(point.id))
        if atom_id is None:
            continue
        pair = dense_pair(point)
        if pair is not None:
            out[atom_id] = pair

    if self.config.dedup_mode == QdrantDedupMode.ATOM_ID:
        return out

    # Identity-keyed points cannot be addressed by atom id; match the
    # ``atom_id`` payload field for whatever the id lookup did not find.
    missing = [atom_id for atom_id in dict.fromkeys(atom_ids) if atom_id not in out]
    if not missing:
        return out
    scroll_filter = qdrant_models.Filter(
        must=[
            qdrant_models.FieldCondition(
                key="atom_id", match=qdrant_models.MatchAny(any=missing)
            )
        ]
    )
    offset: Any = None
    while True:
        records, next_offset = self.client.scroll(
            collection_name=collection_name,
            scroll_filter=scroll_filter,
            with_payload=["atom_id"],
            with_vectors=True,
            offset=offset,
            limit=len(missing),
        )
        for record in records:
            payload_atom_id = (record.payload or {}).get("atom_id")
            if payload_atom_id is None:
                continue
            key = str(payload_atom_id)
            if key in out:
                continue
            pair = dense_pair(record)
            if pair is not None:
                out[key] = pair
        if not records or next_offset is None:
            break
        offset = next_offset
    return out

index_ontology(ontology)

Atomize, embed, and upsert ontology neighborhoods.

Source code in ontocast/tool/vector_store/qdrant.py
def index_ontology(self, ontology: Ontology) -> int:
    """Atomize ``ontology`` (depth 1), embed each atom, and upsert the points.

    Each point carries three named vectors. Two are dense embeddings of the
    core and neighborhood representations. The third is a sparse BM25 vector
    of the minimal representation. Points are upserted in chunks of
    ``config.upsert_batch_size``.

    Returns:
        Number of points built (0 when the atomizer yields no atoms).

    Raises:
        ValueError: if any embedder returns a vector count different from the
            atom count.
    """
    atoms = self.atomizer.atomize(source=ontology, depth=1)
    if not atoms:
        return 0
    atom_count = len(atoms)

    core_vectors = self._embed_texts_batched(
        [atom.core_representation for atom in atoms]
    )
    neighborhood_vectors = self._embed_texts_batched(
        [atom.neighborhood_representation for atom in atoms]
    )
    bm25_vectors = self._embed_texts_batched_sparse(
        [atom.minimal_representation for atom in atoms]
    )

    if len(core_vectors) != atom_count or len(neighborhood_vectors) != atom_count:
        raise ValueError(
            "Embedding provider returned mismatched vector counts for atoms"
        )
    if len(bm25_vectors) != atom_count:
        raise ValueError(
            "BM25 embedder returned mismatched sparse vector counts for atoms"
        )

    points: list[qdrant_models.PointStruct] = [
        qdrant_models.PointStruct(
            id=self._point_id_for_atom(atom),
            vector={
                CORE_VECTOR_NAME: core_vec,
                NEIGHBORHOOD_VECTOR_NAME: neighborhood_vec,
                BM25_VECTOR_NAME: sparse_vec,
            },
            payload=self._atom_payload(atom),
        )
        for atom, core_vec, neighborhood_vec, sparse_vec in zip(
            atoms, core_vectors, neighborhood_vectors, bm25_vectors
        )
    ]
    collection_name = self._ontology_collection_name()
    for chunk in self._iter_batches(points, self.config.upsert_batch_size):
        self.client.upsert(collection_name=collection_name, points=chunk)
    return len(points)

initialize() async

Create ontology/facts collections and payload indexes if missing.

Source code in ontocast/tool/vector_store/qdrant.py
async def initialize(self) -> None:
    """Create ontology/facts collections and payload indexes if missing.

    Both collections are ensured first. Then keyword payload indexes on
    ``ontology_iri``, ``ontology_version``, ``ontology_hash`` and ``iri`` are
    ensured on the ontology collection.

    Raises:
        ValueError: if ``config.ontology_collection`` or
            ``config.facts_collection`` is unset. This is an explicit check
            rather than ``assert``, which ``python -O`` strips.
    """
    ontology_col = self.config.ontology_collection
    facts_col = self.config.facts_collection
    if ontology_col is None or facts_col is None:
        raise ValueError(
            "Qdrant collections are not configured: "
            f"ontology_collection={ontology_col!r}, facts_collection={facts_col!r}"
        )
    self._ensure_named_vector_collection(ontology_col)
    self._ensure_named_vector_collection(facts_col)

    for field_name in ("ontology_iri", "ontology_version", "ontology_hash", "iri"):
        self._ensure_payload_index(collection_name=ontology_col, field_name=field_name)

reindex_ontology(ontology)

Replace all atoms for a given ontology and return indexed count.

Source code in ontocast/tool/vector_store/qdrant.py
def reindex_ontology(self, ontology: Ontology) -> int:
    """Drop every indexed atom for ``ontology.iri``, then index it afresh.

    The deletion is not narrowed by version or hash. The return value is the
    count reported by :meth:`index_ontology`.
    """
    ontology_iri = ontology.iri
    self.delete_ontology(ontology_iri)
    indexed_count = self.index_ontology(ontology)
    return indexed_count

search_by_vector(core_vector, neighborhood_vector, bm25_query_vector=None, top_k=None, filter_iri=None, filter_version=None, filter_hash=None)

Search ontology atoms with rank fusion over named vectors.

Source code in ontocast/tool/vector_store/qdrant.py
def search_by_vector(
    self,
    core_vector: list[float],
    neighborhood_vector: list[float],
    bm25_query_vector: qdrant_models.SparseVector | None = None,
    top_k: int | None = None,
    filter_iri: str | None = None,
    filter_version: str | None = None,
    filter_hash: str | None = None,
) -> list[GraphAtom]:
    """Search ontology atoms and rank-fuse the per-channel results.

    The channel queries are delegated to :meth:`search_hits_by_vector`. The
    per-channel hits are fused with :meth:`_rank_fuse_channel_hits`, capped at
    the effective ``top_k``, and only the atoms are returned.
    """
    per_channel = self.search_hits_by_vector(
        core_vector=core_vector,
        neighborhood_vector=neighborhood_vector,
        bm25_query_vector=bm25_query_vector,
        top_k=top_k,
        filter_iri=filter_iri,
        filter_version=filter_version,
        filter_hash=filter_hash,
    )
    limit = self._effective_top_k(top_k)
    fused = self._rank_fuse_channel_hits(
        per_channel.core_hits,
        per_channel.neighborhood_hits,
        per_channel.bm25_hits,
        limit=limit,
    )
    atoms = [fused_hit.atom for fused_hit in fused]
    return atoms

search_hits_by_vector(core_vector, neighborhood_vector, bm25_query_vector=None, top_k=None, filter_iri=None, filter_version=None, filter_hash=None)

Search ontology atoms and return channel-separated scored hit objects.

Source code in ontocast/tool/vector_store/qdrant.py
def search_hits_by_vector(
    self,
    core_vector: list[float],
    neighborhood_vector: list[float],
    bm25_query_vector: qdrant_models.SparseVector | None = None,
    top_k: int | None = None,
    filter_iri: str | None = None,
    filter_version: str | None = None,
    filter_hash: str | None = None,
) -> OntologySearchHitsByChannel:
    """Query each named vector channel and return the hits per channel.

    Both dense query vectors are validated against the collection's embedding
    dimension. The BM25 channel is queried only when ``bm25_query_vector`` is
    given. The neighborhood channel applies the empty-neighborhood penalty.
    Each channel is deduplicated by identity key when
    ``config.dedup_query_hits_by_iri`` is set.
    """
    limit = self._effective_top_k(top_k)
    self._require_embedding_vector_length(core_vector, role="Query core vector")
    self._require_embedding_vector_length(
        neighborhood_vector, role="Query neighborhood vector"
    )
    search_filter = self._build_filter(
        filter_iri=filter_iri,
        filter_version=filter_version,
        filter_hash=filter_hash,
    )

    def run_channel(name, query):
        return self._query_named_vector(
            vector_name=name,
            vector=query,
            limit=limit,
            search_filter=search_filter,
        )

    core_points = run_channel(CORE_VECTOR_NAME, core_vector)
    neighborhood_points = run_channel(NEIGHBORHOOD_VECTOR_NAME, neighborhood_vector)
    bm25_points: list[Any] = (
        []
        if bm25_query_vector is None
        else run_channel(BM25_VECTOR_NAME, bm25_query_vector)
    )

    channels = [
        self._points_to_hits(core_points),
        self._points_to_hits(
            neighborhood_points, apply_neighborhood_empty_penalty=True
        ),
        self._points_to_hits(bm25_points),
    ]
    if self.config.dedup_query_hits_by_iri:
        channels = [self._dedupe_hits_by_identity(channel) for channel in channels]
    core_channel, neighborhood_channel, bm25_channel = channels
    return OntologySearchHitsByChannel(
        core_hits=core_channel,
        neighborhood_hits=neighborhood_channel,
        bm25_hits=bm25_channel,
    )

search_patch_hits(query, top_k=None, filter_iri=None, filter_version=None, filter_hash=None)

Search ontology atoms and return rank-fused scored hit objects.

Source code in ontocast/tool/vector_store/qdrant.py
def search_patch_hits(
    self,
    query: str,
    top_k: int | None = None,
    filter_iri: str | None = None,
    filter_version: str | None = None,
    filter_hash: str | None = None,
) -> list[OntologySearchHit]:
    """Run one text query and fuse its per-channel hits into a single ranking.

    The query is encoded into core, neighborhood and BM25 query vectors via
    ``_encode_single_query_vectors``. Each channel is searched through
    ``search_hits_by_vector`` with the given filters. The three hit lists are
    then merged by ``_rank_fuse_channel_hits`` with ``limit`` set to the
    effective ``top_k``.

    Args:
        query: Free-text search query.
        top_k: Requested number of results; resolved by ``_effective_top_k``.
        filter_iri: Optional ontology IRI filter, forwarded unchanged.
        filter_version: Optional ontology version filter, forwarded unchanged.
        filter_hash: Optional ontology hash filter, forwarded unchanged.

    Returns:
        Rank-fused list of scored ontology hits.
    """
    query_vectors = self._encode_single_query_vectors(query)
    core_vec, neighborhood_vec, bm25_vec = query_vectors
    by_channel = self.search_hits_by_vector(
        core_vector=core_vec,
        neighborhood_vector=neighborhood_vec,
        bm25_query_vector=bm25_vec,
        top_k=top_k,
        filter_iri=filter_iri,
        filter_version=filter_version,
        filter_hash=filter_hash,
    )
    fused_limit = self._effective_top_k(top_k)
    return self._rank_fuse_channel_hits(
        by_channel.core_hits,
        by_channel.neighborhood_hits,
        by_channel.bm25_hits,
        limit=fused_limit,
    )

search_patch_hits_many(queries, top_k=None, filter_iri=None, filter_version=None, filter_hash=None)

Search ontology atoms for many queries with split-channel outputs.

Source code in ontocast/tool/vector_store/qdrant.py
def search_patch_hits_many(
    self,
    queries: list[str],
    top_k: int | None = None,
    filter_iri: str | None = None,
    filter_version: str | None = None,
    filter_hash: str | None = None,
) -> list[OntologySearchHitsByChannel]:
    """Batch form of the channel search, keeping channels separated per result.

    This is a thin delegate. Every argument is forwarded positionally and
    unchanged to ``_search_patch_hits_many_impl``, which does the work.

    Args:
        queries: Free-text queries to search for.
        top_k: Requested number of results per query.
        filter_iri: Optional ontology IRI filter.
        filter_version: Optional ontology version filter.
        filter_hash: Optional ontology hash filter.

    Returns:
        Split-channel hit containers as produced by the implementation helper.
    """
    forwarded = (queries, top_k, filter_iri, filter_version, filter_hash)
    return self._search_patch_hits_many_impl(*forwarded)

search_patches(query, top_k=None, filter_iri=None, filter_version=None, filter_hash=None)

Search ontology atoms by text query using weighted multi-vector fusion.

Source code in ontocast/tool/vector_store/qdrant.py
def search_patches(
    self,
    query: str,
    top_k: int | None = None,
    filter_iri: str | None = None,
    filter_version: str | None = None,
    filter_hash: str | None = None,
) -> list[GraphAtom]:
    """Return ontology atoms matching a text query (multi-vector search).

    The query is turned into core, neighborhood and BM25 query vectors. The
    actual search, including any fusion of the channels, is delegated to
    ``search_by_vector``.

    Args:
        query: Free-text search query.
        top_k: Requested number of results.
        filter_iri: Optional ontology IRI filter.
        filter_version: Optional ontology version filter.
        filter_hash: Optional ontology hash filter.

    Returns:
        Matching graph atoms as returned by ``search_by_vector``.
    """
    encoded = self._encode_single_query_vectors(query)
    core_vec, neighborhood_vec, bm25_vec = encoded
    filters = {
        "filter_iri": filter_iri,
        "filter_version": filter_version,
        "filter_hash": filter_hash,
    }
    return self.search_by_vector(
        core_vector=core_vec,
        neighborhood_vector=neighborhood_vec,
        bm25_query_vector=bm25_vec,
        top_k=top_k,
        **filters,
    )

SearchHit

Bases: BaseModel

Single web-search hit used as optional grounding context.

Source code in ontocast/tool/atomic.py
class SearchHit(BaseModel):
    """Single web-search hit used as optional grounding context.

    All three fields are required strings; no defaults or validators are
    declared on this model.
    """

    # Title of the search result.
    title: str
    # URL of the search result.
    url: str
    # Text excerpt for the result. Presumably truncated upstream
    # (AtomicToolBox exposes web_search_max_snippet_chars) — confirm.
    snippet: str

Tool

Bases: BasePydanticModel

Base class for all OntoCast tools.

This class serves as the foundation for all tools in the OntoCast system. It provides the common functionality and interface shared by all tools. Tools should inherit from this class and implement their specific functionality.

Source code in ontocast/tool/onto.py
class Tool(BasePydanticModel):
    """Common base model for every OntoCast tool.

    It is the shared root of the tool hierarchy in the OntoCast system. It
    declares no fields of its own, so its attributes are exactly those of
    ``BasePydanticModel``. Concrete tools subclass it and add their specific
    behaviour.
    """

    def __init__(self, **kwargs):
        """Construct the tool by handing every keyword to the parent model.

        Args:
            **kwargs: Forwarded verbatim to ``BasePydanticModel.__init__``.
        """
        super().__init__(**kwargs)

__init__(**kwargs)

Initialize the tool.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `**kwargs` | | Keyword arguments passed to the parent class. | `{}` |
Source code in ontocast/tool/onto.py
def __init__(self, **kwargs):
    """Construct the tool by handing every keyword to the parent model.

    Args:
        **kwargs: Forwarded verbatim to the parent class initializer.
    """
    super().__init__(**kwargs)

TripleStoreManager

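Bases: Tool

Base class for managing RDF triple stores. It defines the interface for fetching and storing ontologies and their graphs. Concrete backends (e.g., Neo4j, Fuseki, Filesystem) inherit from it and implement its abstract methods.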

Source code in ontocast/tool/triple_manager/core.py
class TripleStoreManager(Tool):
    """Base class for managing RDF triple stores.

    This class defines the interface for triple store management operations,
    including fetching and storing ontologies and their graphs. All concrete
    triple store implementations should inherit from this class.

    This is an abstract base class that must be implemented by specific
    triple store backends (e.g., Neo4j, Fuseki, Filesystem).
    """

    # The docstring above must be the first statement of the class body.
    # Previously it followed this ClassVar, so it was a no-op string
    # expression and ``__doc__`` stayed unset.
    #
    # Provenance metadata predicates (generation time, position, identifier).
    # Not referenced by the methods of this class; presumably consumed by
    # subclasses or helpers elsewhere — confirm before changing.
    _PROVENANCE_METADATA_PREDICATES: ClassVar[set] = {
        PROV.generatedAtTime,
        SCHEMA.position,
        SCHEMA.identifier,
    }

    def __init__(self, **kwargs):
        """Initialize the triple store manager.

        Args:
            **kwargs: Additional keyword arguments passed to the parent class.
        """
        super().__init__(**kwargs)

    @abc.abstractmethod
    def fetch_ontologies(self) -> list[Ontology]:
        """Fetch all available ontologies from the triple store.

        This method should retrieve all ontologies stored in the triple store
        and return them as Ontology objects with their associated RDF graphs.

        Returns:
            list[Ontology]: List of available ontologies with their graphs.
        """
        return []

    async def afetch_ontologies(self) -> list[Ontology]:
        """Async fetch helper for backends without native async I/O.

        Runs the blocking :meth:`fetch_ontologies` in a worker thread via
        ``asyncio.to_thread`` and returns its result.
        """
        return await asyncio.to_thread(self.fetch_ontologies)

    @abc.abstractmethod
    def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
        """Store an RDF graph in the triple store.

        This method should store the given RDF graph in the triple store.
        The implementation may choose how to organize the storage (e.g., as named graphs,
        in specific collections, etc.).

        Args:
            graph: The RDF graph to store.
            **kwargs: Implementation-specific arguments (e.g., fname for filesystem, graph_uri for Fuseki).

        Returns:
            bool | None: Implementation-specific return value (bool for Fuseki, summary for Neo4j, None for Filesystem).
                NOTE(review): the annotation is ``bool | None``; confirm what
                the Neo4j backend actually returns.
        """
        pass

    @abc.abstractmethod
    def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:  # type: ignore[override]
        """Store an RDF graph in the triple store.

        This method should store the given RDF graph in the triple store.
        The implementation may choose how to organize the storage (e.g., as named graphs,
        in specific collections, etc.).

        Args:
            o: RDF graph or Ontology object to store.
            **kwargs: Implementation-specific arguments (e.g., graph_uri for Fuseki).

        Returns:
            bool | None: Implementation-specific return value (bool for Fuseki, summary for Neo4j, None for Filesystem).
                NOTE(review): the annotation is ``bool | None``; confirm what
                the Neo4j backend actually returns.
        """
        pass

    async def aserialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
        """Async serialize helper for backends without native async I/O.

        Runs the blocking :meth:`serialize` in a worker thread via
        ``asyncio.to_thread``, forwarding ``o`` and ``kwargs`` unchanged.
        """
        return await asyncio.to_thread(self.serialize, o, **kwargs)

    @classmethod
    def strip_provenance(cls, graph: Graph) -> RDFGraph:
        """Return a copy of ``graph`` without reification/provenance scaffolding.

        Namespace bindings are copied to the result. A triple is dropped when
        any of the following holds:

        * its predicate is ``RDF_REIFIES`` or ``prov:wasDerivedFrom``;
        * its subject is a reifier node (subject of an ``RDF_REIFIES`` triple);
        * its subject is a source node (object of a ``prov:wasDerivedFrom`` triple).

        The input graph is only read, never modified.
        """
        stripped = RDFGraph()
        for prefix, namespace in graph.namespaces():
            stripped.bind(prefix, namespace)

        # Built once instead of once per triple.
        scaffold_predicates = {RDF_REIFIES, PROV.wasDerivedFrom}
        # Every triple about a reifier node or a provenance source node is
        # scaffolding, so both kinds of node are collected into one set.
        scaffold_subjects = set(graph.subjects(RDF_REIFIES, None))
        scaffold_subjects.update(graph.objects(None, PROV.wasDerivedFrom))

        for triple in graph:
            subject, predicate, _ = triple
            if predicate in scaffold_predicates or subject in scaffold_subjects:
                continue
            stripped.add(triple)

        return stripped

    @abc.abstractmethod
    async def clean(self) -> None:
        """Clean/flush data managed by this store (backend-specific scope).

        Warning: This operation is irreversible and will delete data.

        Raises:
            NotImplementedError: If the triple store doesn't support cleaning.
        """
        raise NotImplementedError("clean() method must be implemented by subclasses")

    def supports_tenancy_partition(self) -> bool:
        """True if this backend isolates facts/ontologies by ``tenant_project_*`` names.

        The base implementation returns ``False``.
        """
        return False

    async def clean_tenancy(self, tenant: str, project: str) -> None:
        """Remove all triples for datasets derived from ``tenant`` / ``project``.

        Backends without per-tenant partitions raise :class:`NotImplementedError`.
        The base implementation always raises it, naming the concrete class.
        """
        raise NotImplementedError(
            f"{type(self).__name__} does not isolate data by tenant/project"
        )

__init__(**kwargs)

Initialize the triple store manager.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `**kwargs` | | Additional keyword arguments passed to the parent class. | `{}` |
Source code in ontocast/tool/triple_manager/core.py
def __init__(self, **kwargs):
    """Set up the triple store manager, deferring entirely to the parent class.

    Args:
        **kwargs: Forwarded verbatim to the parent class initializer.
    """
    super().__init__(**kwargs)

afetch_ontologies() async

Async fetch helper for backends without native async I/O.

Source code in ontocast/tool/triple_manager/core.py
async def afetch_ontologies(self) -> list[Ontology]:
    """Await :meth:`fetch_ontologies` run in a worker thread.

    Intended for backends whose fetch is blocking, so the event loop stays
    free while it runs.
    """
    blocking_fetch = self.fetch_ontologies
    return await asyncio.to_thread(blocking_fetch)

aserialize(o, **kwargs) async

Async serialize helper for backends without native async I/O.

Source code in ontocast/tool/triple_manager/core.py
async def aserialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:
    """Await :meth:`serialize` run in a worker thread.

    ``o`` and every keyword argument are passed through unchanged. The result
    of ``serialize`` is returned as-is.
    """
    outcome = await asyncio.to_thread(self.serialize, o, **kwargs)
    return outcome

clean() abstractmethod async

Clean/flush data managed by this store (backend-specific scope).

Warning: This operation is irreversible and will delete data.

Raises:

| Type | Description |
| --- | --- |
| `NotImplementedError` | If the triple store doesn't support cleaning. |

Source code in ontocast/tool/triple_manager/core.py
@abc.abstractmethod
async def clean(self) -> None:
    """Flush the data this store manages; the exact scope depends on the backend.

    Warning: the deletion cannot be undone.

    Raises:
        NotImplementedError: Always, in this base version. Backends that
            support cleaning override it.
    """
    message = "clean() method must be implemented by subclasses"
    raise NotImplementedError(message)

clean_tenancy(tenant, project) async

Remove all triples for datasets derived from tenant / project.

Backends without per-tenant partitions raise `NotImplementedError`.

Source code in ontocast/tool/triple_manager/core.py
async def clean_tenancy(self, tenant: str, project: str) -> None:
    """Delete every triple in datasets derived from ``tenant`` / ``project``.

    This base version does no deletion. It always raises
    ``NotImplementedError`` naming the concrete class, which is the behaviour
    for backends without per-tenant partitions.
    """
    backend_name = type(self).__name__
    raise NotImplementedError(
        f"{backend_name} does not isolate data by tenant/project"
    )

fetch_ontologies() abstractmethod

Fetch all available ontologies from the triple store.

This method should retrieve all ontologies stored in the triple store and return them as Ontology objects with their associated RDF graphs.

Returns:

| Type | Description |
| --- | --- |
| `list[Ontology]` | List of available ontologies with their graphs. |

Source code in ontocast/tool/triple_manager/core.py
@abc.abstractmethod
def fetch_ontologies(self) -> list[Ontology]:
    """Load every ontology held by the triple store.

    Implementations return one ``Ontology`` per stored ontology, each with
    its RDF graph attached.

    Returns:
        list[Ontology]: The stored ontologies. This base version returns an
        empty list.
    """
    return list()

serialize(o, **kwargs) abstractmethod

Store an RDF graph in the triple store.

This method should store the given RDF graph in the triple store. The implementation may choose how to organize the storage (e.g., as named graphs, in specific collections, etc.).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `o` | `Ontology \| RDFGraph` | RDF graph or Ontology object to store. | required |
| `**kwargs` | | Implementation-specific arguments (e.g., `graph_uri` for Fuseki). | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `bool \| None` | Implementation-specific return value (bool for Fuseki, summary for Neo4j, None for Filesystem). |

Source code in ontocast/tool/triple_manager/core.py
@abc.abstractmethod
def serialize(self, o: Ontology | RDFGraph, **kwargs) -> bool | None:  # type: ignore[override]
    """Persist an ontology or RDF graph in the backing triple store.

    Each backend decides how the data is laid out (named graphs, specific
    collections, files, ...).

    Args:
        o: The ``Ontology`` or ``RDFGraph`` to store.
        **kwargs: Backend-specific options (e.g. ``graph_uri`` for Fuseki).

    Returns:
        bool | None: Backend-specific result (bool for Fuseki, summary for
        Neo4j, None for Filesystem). This base version returns ``None``.
    """
    return None

serialize_graph(graph, **kwargs) abstractmethod

Store an RDF graph in the triple store.

This method should store the given RDF graph in the triple store. The implementation may choose how to organize the storage (e.g., as named graphs, in specific collections, etc.).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `graph` | `Graph` | The RDF graph to store. | required |
| `**kwargs` | | Implementation-specific arguments (e.g., `fname` for filesystem, `graph_uri` for Fuseki). | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `bool \| None` | Implementation-specific return value (bool for Fuseki, summary for Neo4j, None for Filesystem). |

Source code in ontocast/tool/triple_manager/core.py
@abc.abstractmethod
def serialize_graph(self, graph: Graph, **kwargs) -> bool | None:
    """Persist a raw RDF graph in the backing triple store.

    Each backend decides how the graph is laid out (named graphs, specific
    collections, files, ...).

    Args:
        graph: The RDF graph to store.
        **kwargs: Backend-specific options (e.g. ``fname`` for the
            filesystem backend, ``graph_uri`` for Fuseki).

    Returns:
        bool | None: Backend-specific result (bool for Fuseki, summary for
        Neo4j, None for Filesystem). This base version returns ``None``.
    """
    return None

strip_provenance(graph) classmethod

Return a graph without reification/provenance scaffolding triples.

Source code in ontocast/tool/triple_manager/core.py
@classmethod
def strip_provenance(cls, graph: Graph) -> RDFGraph:
    """Copy ``graph`` minus its reification/provenance scaffolding.

    Namespace bindings are carried over. A triple is omitted when its
    predicate is ``RDF_REIFIES`` or ``prov:wasDerivedFrom``, or when its
    subject is a reifier node (subject of ``RDF_REIFIES``) or a source node
    (object of ``prov:wasDerivedFrom``). The input graph is only read.
    """
    stripped = RDFGraph()
    for prefix, namespace in graph.namespaces():
        stripped.bind(prefix, namespace)

    scaffold_predicates = {RDF_REIFIES, PROV.wasDerivedFrom}
    # Reifier nodes and provenance source nodes are both dropped wholesale.
    scaffold_subjects = set(graph.subjects(RDF_REIFIES, None))
    scaffold_subjects.update(graph.objects(None, PROV.wasDerivedFrom))

    for triple in graph:
        subject, predicate, _ = triple
        if predicate in scaffold_predicates or subject in scaffold_subjects:
            continue
        stripped.add(triple)

    return stripped

supports_tenancy_partition()

True if this backend isolates facts/ontologies by `tenant_project_*` names.

Source code in ontocast/tool/triple_manager/core.py
def supports_tenancy_partition(self) -> bool:
    """Report whether facts/ontologies are isolated by ``tenant_project_*`` names.

    The base implementation answers ``False``; backends that do partition
    per tenant/project are expected to override it.
    """
    partitioned = False
    return partitioned