ontocast.agent

Agent module for OntoCast.

This module provides a collection of agents that handle various aspects of ontology processing, including document conversion, text chunking, fact aggregation, and ontology management. Each agent is designed to perform a specific task in the ontology processing pipeline.

aggregate(state, tools)

Aggregate facts from multiple processed chunks into a single RDF graph.

Parameters:

Name   Type        Description                                 Default
state  AgentState  Current agent state with processed chunks   required
tools  ToolBox     ToolBox containing aggregation tools        required

Returns:

Type        Description
AgentState  Updated agent state with aggregated facts

Source code in ontocast/agent/aggregate_serialize.py
def aggregate(state: AgentState, tools: ToolBox) -> AgentState:
    """Aggregate facts from multiple processed chunks into a single RDF graph.

    Args:
        state: Current agent state with processed chunks
        tools: ToolBox containing aggregation tools

    Returns:
        Updated agent state with aggregated facts
    """
    for c in state.chunks_processed:
        c.sanitize()

    state.aggregated_facts = tools.aggregator.aggregate_graphs(
        chunks=state.chunks_processed, doc_namespace=state.doc_namespace
    )
    total_chunks = len(state.chunks_processed)
    logger.info(
        f"Aggregating {total_chunks} processed chunks: "
        f"ontology {len(state.current_ontology.graph)} triples; "
        f"facts graph: {len(state.aggregated_facts)} triples"
    )

    # Add provenance information if source URL is available
    if state.source_url and state.doc_namespace:
        doc_iri = URIRef(state.doc_namespace)
        source_url_uri = URIRef(state.source_url)
        state.aggregated_facts.add((doc_iri, DCTERMS.source, source_url_uri))
        logger.info(
            f"Added provenance: {state.doc_namespace} dcterms:source {state.source_url}"
        )

    return state
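
The aggregation and provenance steps above can be illustrated without OntoCast or rdflib. The following is a minimal sketch, assuming each chunk graph can be modelled as a set of plain `(subject, predicate, object)` string tuples; `merge_chunk_graphs` and `add_provenance` are hypothetical helper names and do not reproduce what `tools.aggregator.aggregate_graphs` actually does internally. Only the `dcterms:source` IRI is taken from the Dublin Core vocabulary.

```python
from typing import Iterable, Optional, Set, Tuple

# Stand-in for an RDF triple; the real code works on RDF graph objects.
Triple = Tuple[str, str, str]

# Full IRI of dcterms:source in the Dublin Core Terms vocabulary.
DCTERMS_SOURCE = "http://purl.org/dc/terms/source"


def merge_chunk_graphs(chunk_graphs: Iterable[Set[Triple]]) -> Set[Triple]:
    """Union the triples of all chunk graphs; duplicate triples collapse."""
    merged: Set[Triple] = set()
    for graph in chunk_graphs:
        merged |= graph
    return merged


def add_provenance(
    graph: Set[Triple], doc_namespace: Optional[str], source_url: Optional[str]
) -> Set[Triple]:
    """Attach a dcterms:source triple only when both values are present."""
    if source_url and doc_namespace:
        graph.add((doc_namespace, DCTERMS_SOURCE, source_url))
    return graph


chunks = [
    {("ex:alice", "ex:knows", "ex:bob")},
    {("ex:alice", "ex:knows", "ex:bob"), ("ex:bob", "ex:worksAt", "ex:acme")},
]
facts = merge_chunk_graphs(chunks)
add_provenance(facts, "https://example.org/doc/1/", "https://example.org/paper.pdf")
```

As in `aggregate`, the provenance triple is skipped when either the document namespace or the source URL is missing, so the facts graph never carries a half-specified source.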

check_chunks_empty(state)

Check if chunks are available and manage chunk processing state.

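This function checks whether any chunks remain to be processed and updates the state accordingly. If chunks remain, it pops the next one into current_chunk, resets the node visit counters, and sets status to Status.FAILED, which the source uses as the routing signal to keep processing rather than as an error report. If no chunks remain, it installs a placeholder chunk and sets status to Status.SUCCESS, signalling that the workflow can proceed to fact aggregation.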

The function performs the following operations:

1. Adds the current chunk to the processed list if it exists
2. Checks if there are remaining chunks to process
3. Sets up the next chunk and resets node visits if chunks remain
4. Sets appropriate status for workflow routing

Parameters:

Name   Type        Description                                                       Default
state  AgentState  The current agent state containing chunks and processing status.  required

Returns:

Name        Type        Description
AgentState  AgentState  Updated agent state with chunk processing information.

Example

    state = AgentState(chunks=[chunk1, chunk2], current_chunk=None)
    updated_state = check_chunks_empty(state)
    print(updated_state.current_chunk)  # chunk1
    print(updated_state.status)  # Status.FAILED

Source code in ontocast/agent/check_chunks.py
def check_chunks_empty(state: AgentState) -> AgentState:
    """Check if chunks are available and manage chunk processing state.

    This function checks if there are remaining chunks to process and
    manages the state transitions accordingly. If chunks are available,
    it sets up the next chunk for processing. If no chunks remain,
    it signals completion of the workflow.

    The function performs the following operations:
    1. Adds the current chunk to the processed list if it exists
    2. Checks if there are remaining chunks to process
    3. Sets up the next chunk and resets node visits if chunks remain
    4. Sets appropriate status for workflow routing

    Args:
        state: The current agent state containing chunks and processing status.

    Returns:
        AgentState: Updated agent state with chunk processing information.

    Example:
        >>> state = AgentState(chunks=[chunk1, chunk2], current_chunk=None)
        >>> updated_state = check_chunks_empty(state)
        >>> print(updated_state.current_chunk)  # chunk1
        >>> print(updated_state.status)  # Status.FAILED
    """

    if CHUNK_NULL_IRI not in state.current_chunk.iri:
        state.current_chunk.processed = True
        state.current_chunk.graph.remap_namespaces(
            old_namespace=DEFAULT_CHUNK_IRI, new_namespace=state.current_chunk.namespace
        )
        state.chunks_processed.append(state.current_chunk)

    if state.chunks:
        state.current_chunk = state.chunks.pop(0)
        state.node_visits = defaultdict(int)
        state.status = Status.FAILED
    else:
        state.current_chunk = Chunk(
            text="",
            hid="default",
            doc_iri=CHUNK_NULL_IRI,
        )
        state.status = Status.SUCCESS
        logger.info(
            f"All chunks processed ({len(state.chunks_processed)} total), "
            "setting status to SUCCESS and proceeding to AGGREGATE_FACTS"
        )

    return state
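
The way this node drives a processing loop can be sketched in isolation. The example below is a simplified model, not OntoCast code: `MiniState`, `advance`, and `drain` are hypothetical names, chunks are plain strings, and `None` stands in for the placeholder `Chunk` carrying `CHUNK_NULL_IRI`. It keeps the one behaviour that is easy to misread in the source, namely that `Status.FAILED` means "more chunks to process" and `Status.SUCCESS` means "queue drained".

```python
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from typing import DefaultDict, List, Optional


class Status(Enum):
    SUCCESS = "success"
    FAILED = "failed"


@dataclass
class MiniState:
    """Hypothetical, stripped-down stand-in for AgentState."""
    chunks: List[str]
    current_chunk: Optional[str] = None  # None plays the role of the null chunk
    chunks_processed: List[str] = field(default_factory=list)
    node_visits: DefaultDict[str, int] = field(
        default_factory=lambda: defaultdict(int)
    )
    status: Status = Status.FAILED


def advance(state: MiniState) -> MiniState:
    """Archive the current chunk, then load the next one or signal completion."""
    if state.current_chunk is not None:
        state.chunks_processed.append(state.current_chunk)
    if state.chunks:
        state.current_chunk = state.chunks.pop(0)
        state.node_visits = defaultdict(int)  # fresh visit counters per chunk
        state.status = Status.FAILED  # routing signal: keep processing
    else:
        state.current_chunk = None
        state.status = Status.SUCCESS  # routing signal: go aggregate
    return state


def drain(state: MiniState) -> List[str]:
    """Run the loop a workflow router would run; return the visiting order."""
    order: List[str] = []
    advance(state)
    while state.status is Status.FAILED:
        order.append(state.current_chunk)  # per-chunk work would happen here
        advance(state)
    return order
```

Calling `drain(MiniState(chunks=["c1", "c2", "c3"]))` visits the chunks in list order and leaves the state with an empty queue and `Status.SUCCESS`.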

render_facts_fresh(state, tools) async

Render fresh facts from the current chunk into Turtle format.

Parameters:

Name   Type        Description                                              Default
state  AgentState  The current agent state containing the chunk to render.  required
tools  ToolBox     The toolbox instance providing utility functions.        required

Returns:

Name        Type        Description
AgentState  AgentState  Updated state with rendered facts.

Source code in ontocast/agent/render_facts.py
async def render_facts_fresh(state: AgentState, tools: ToolBox) -> AgentState:
    """Render fresh facts from the current chunk into Turtle format.

    Args:
        state: The current agent state containing the chunk to render.
        tools: The toolbox instance providing utility functions.

    Returns:
        AgentState: Updated state with rendered facts.
    """
    logger.info("Rendering fresh facts")
    llm_tool = tools.llm
    parser = PydanticOutputParser(pydantic_object=SemanticTriplesFactsReport)

    prompt_data = _prepare_prompt_data(state)
    prompt_data_fresh = {
        "preamble": preamble,
        "improvement_instruction": "",
        "output_instruction": output_instruction_empty,
    }
    prompt_data.update(prompt_data_fresh)

    prompt = _create_prompt_template()

    try:
        proj = await call_llm_with_retry(
            llm_tool=llm_tool,
            prompt=prompt,
            parser=parser,
            prompt_kwargs={
                "format_instructions": parser.get_format_instructions(),
                **prompt_data,
            },
        )
        proj.semantic_graph.sanitize_prefixes_namespaces()
        state.current_chunk.graph = proj.semantic_graph

        # Track triples in budget tracker (fresh facts)
        num_triples = len(proj.semantic_graph)
        logger.info(f"Fresh facts generated with {num_triples} triple(s).")
        state.budget_tracker.add_facts_update(num_operations=1, num_triples=num_triples)

        state.clear_failure()
        return state

    except Exception as e:
        return _handle_rendering_error(state, e, FailureStage.GENERATE_TTL_FOR_FACTS)
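
The source relies on `call_llm_with_retry`, whose implementation is not shown on this page. As a rough illustration of the general pattern only (call the model, parse the output, retry when either step fails), here is a minimal asyncio sketch. `call_with_retry`, its parameters, and the backoff policy are all assumptions made for the example and should not be read as OntoCast's actual helper.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def call_with_retry(
    make_call: Callable[[], Awaitable[str]],
    parse: Callable[[str], T],
    max_attempts: int = 3,
    base_delay: float = 0.0,
) -> T:
    """Await make_call and parse its result, retrying on any exception.

    Hypothetical helper: waits base_delay * 2**(attempt - 1) seconds between
    attempts and raises RuntimeError once max_attempts is exhausted.
    """
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            raw = await make_call()
            return parse(raw)  # a parse failure also triggers a retry
        except Exception as exc:
            last_error = exc
            if attempt < max_attempts:
                await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError(f"all {max_attempts} attempts failed") from last_error


async def demo() -> tuple:
    """Simulate a model that returns malformed output twice, then succeeds."""
    attempts = {"n": 0}

    async def flaky_model() -> str:
        attempts["n"] += 1
        if attempts["n"] < 3:
            raise ValueError("malformed output")
        return "42"

    value = await call_with_retry(flaky_model, int)
    return value, attempts["n"]
```

In `render_facts_fresh` the final failure is not re-raised to the caller: the surrounding `try`/`except` hands the exception to `_handle_rendering_error`, so the workflow receives a state with the failure recorded instead of an exception.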

render_ontology_fresh(state, tools) async

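Render a fresh ontology from the text of the current chunk.

The docstring describes this function as rendering existing triples into a human-readable format. Judging by the source below, it instead generates a new ontology: it builds a prompt from the current chunk's text and state.current_domain, calls the LLM with an empty ontology_ttl and no improvement instruction, and stores the parsed Ontology in state.current_ontology. On failure it marks WorkflowNode.TEXT_TO_ONTOLOGY as failed and records the error on the state.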

Parameters:

Name   Type        Description                                                 Default
state  AgentState  The current agent state containing the ontology to render.  required
tools  ToolBox     The toolbox instance providing utility functions.           required

Returns:

Name        Type        Description
AgentState  AgentState  Updated state with rendered triples.

Source code in ontocast/agent/render_ontology.py
async def render_ontology_fresh(state: AgentState, tools: ToolBox) -> AgentState:
    """Render ontology triples into a human-readable format.

    This function takes the triples from the current ontology and renders them
    into a more accessible format, making the ontological knowledge easier to
    understand.

    Args:
        state: The current agent state containing the ontology to render.
        tools: The toolbox instance providing utility functions.

    Returns:
        AgentState: Updated state with rendered triples.
    """

    parser = PydanticOutputParser(pydantic_object=Ontology)
    logger.info("Rendering fresh ontology")
    intro_instruction = intro_instruction_fresh.format(
        current_domain=state.current_domain
    )
    output_instruction = output_instruction_ttl
    ontology_ttl = ""
    improvement_instruction_str = ""
    general_ontology_instruction_str = general_ontology_instruction.format(
        prefix_instruction=prefix_instruction_fresh
    )

    text_chapter = text_template.format(text=state.current_chunk.text)

    prompt = PromptTemplate(
        template=template_prompt,
        input_variables=[
            "preamble",
            "intro_instruction",
            "ontology_instruction",
            "output_instruction",
            "user_instruction",
            "improvement_instruction",
            "ontology_ttl",
            "text",
            "format_instructions",
        ],
    )

    try:
        llm_tool = await tools.get_llm_tool(state.budget_tracker)
        state.current_ontology = await call_llm_with_retry(
            llm_tool=llm_tool,
            prompt=prompt,
            parser=parser,
            prompt_kwargs={
                "preamble": system_preamble,
                "intro_instruction": intro_instruction,
                "ontology_instruction": general_ontology_instruction_str,
                "output_instruction": output_instruction,
                "ontology_ttl": ontology_ttl,
                "user_instruction": state.ontology_user_instruction,
                "improvement_instruction": improvement_instruction_str,
                "text": text_chapter,
                "format_instructions": parser.get_format_instructions(),
            },
        )
        state.current_ontology.graph.sanitize_prefixes_namespaces()

        num_triples = len(state.current_ontology.graph)
        logger.info(f"New ontology created with {num_triples} triple(s).")

        # Track triples in budget tracker (fresh ontology)
        state.budget_tracker.add_ontology_update(
            num_operations=1, num_triples=num_triples
        )

        state.clear_failure()
        state.set_node_status(WorkflowNode.TEXT_TO_ONTOLOGY, Status.SUCCESS)
        return state

    except Exception as e:
        logger.error(f"Failed to generate triples: {str(e)}")
        state.set_node_status(WorkflowNode.TEXT_TO_ONTOLOGY, Status.FAILED)
        state.set_failure(FailureStage.GENERATE_TTL_FOR_ONTOLOGY, str(e))
        return state
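
The prompt in the source is assembled from nine named variables, two of which (`ontology_ttl` and `improvement_instruction`) are left empty in the fresh case. The sketch below mimics that assembly with the standard library only, as a way to see how a missing variable would surface. `TEMPLATE`, `required_variables`, and `render_prompt` are hypothetical; the real code uses LangChain's `PromptTemplate` and the project's own `template_prompt`, neither of which is reproduced here.

```python
from string import Formatter
from typing import Set

# Hypothetical template using the same variable names as the source listing.
TEMPLATE = (
    "{preamble}\n{intro_instruction}\n{ontology_instruction}\n"
    "{user_instruction}\n{improvement_instruction}\n{ontology_ttl}\n"
    "{text}\n{output_instruction}\n{format_instructions}"
)


def required_variables(template: str) -> Set[str]:
    """Collect the placeholder names that appear in a format string."""
    return {name for _, name, _, _ in Formatter().parse(template) if name}


def render_prompt(template: str, **values: str) -> str:
    """Fill the template, failing loudly if any placeholder is unset."""
    missing = required_variables(template) - set(values)
    if missing:
        raise KeyError(f"missing prompt variables: {sorted(missing)}")
    return template.format(**values)


# Fresh mode: no existing ontology and no improvement feedback to include.
fresh_values = {
    "preamble": "You are an ontology engineer.",
    "intro_instruction": "Propose an ontology for the domain below.",
    "ontology_instruction": "Use standard prefixes.",
    "user_instruction": "",
    "improvement_instruction": "",
    "ontology_ttl": "",
    "text": "Chunk text goes here.",
    "output_instruction": "Return Turtle.",
    "format_instructions": "Respond as JSON.",
}
prompt = render_prompt(TEMPLATE, **fresh_values)
```

Passing empty strings rather than omitting the variables keeps a single template usable for both the fresh and the update paths, which appears to be the intent of the empty `ontology_ttl` and `improvement_instruction_str` values in the source.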

serialize(state, tools)

Serialize the knowledge graph to the triple store.

This function:

- Handles version management for updated ontologies
- Tracks budget usage
- Serializes both ontology and facts to the triple store

Parameters:

Name   Type        Description                                  Default
state  AgentState  Current agent state with ontology and facts  required
tools  ToolBox     ToolBox containing serialization tools       required

Returns:

Type        Description
AgentState  Updated agent state after serialization

Source code in ontocast/agent/aggregate_serialize.py
def serialize(state: AgentState, tools: ToolBox) -> AgentState:
    """Serialize the knowledge graph to the triple store.

    This function:
    - Handles version management for updated ontologies
    - Tracks budget usage
    - Serializes both ontology and facts to the triple store

    Args:
        state: Current agent state with ontology and facts
        tools: ToolBox containing serialization tools

    Returns:
        Updated agent state after serialization
    """
    # Initialize empty facts graph if not set (for skip_facts_rendering case)
    if not hasattr(state, "aggregated_facts") or state.aggregated_facts is None:
        state.aggregated_facts = RDFGraph()
        logger.info("No facts to serialize (skip_facts_rendering mode)")

    # Check if the ontology was updated during processing
    # If there were updates applied, increment the version (MAJOR/MINOR/PATCH)
    if state.ontology_updates_applied:
        logger.info(
            f"Ontology was updated during processing ({len(state.ontology_updates_applied)} update operations). "
            f"Analyzing changes to determine version increment..."
        )
        # Pass the updates to analyze and increment version appropriately
        state.current_ontology.mark_as_updated(state.ontology_updates_applied)
        # Sync the updated properties (version and created_at) to the graph
        state.current_ontology.sync_properties_to_graph()
    else:
        logger.debug(
            f"Ontology unchanged during processing (version: {state.current_ontology.version})"
        )

    # Report LLM budget usage
    if state.budget_tracker:
        logger.info(state.budget_tracker.get_summary())
    tools.serialize(state)
    return state
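
The comment in the source mentions a MAJOR/MINOR/PATCH version increment, but how `mark_as_updated` classifies the applied updates is not shown on this page. The following is a minimal sketch of one plausible scheme, assuming each update has already been classified into a severity level and the ontology version is a plain `MAJOR.MINOR.PATCH` string. `Change` and `bump` are hypothetical names, not part of OntoCast.

```python
from enum import IntEnum
from typing import Sequence


class Change(IntEnum):
    """Hypothetical severity levels, ordered so that max() picks the largest."""
    PATCH = 0
    MINOR = 1
    MAJOR = 2


def bump(version: str, changes: Sequence[Change]) -> str:
    """Return the next version implied by the most severe change.

    An empty change list leaves the version untouched, mirroring the
    'ontology unchanged' branch of serialize.
    """
    if not changes:
        return version
    major, minor, patch = (int(part) for part in version.split("."))
    level = max(changes)
    if level is Change.MAJOR:
        return f"{major + 1}.0.0"
    if level is Change.MINOR:
        return f"{major}.{minor + 1}.0"
    return f"{major}.{minor}.{patch + 1}"
```

For instance, `bump("1.2.3", [Change.PATCH, Change.MINOR])` yields `"1.3.0"`, because only the most severe change decides which component is incremented and the lower components reset to zero.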