Skip to content

ontocast.agent

Agent module for OntoCast.

This module provides a collection of agents that handle various aspects of ontology processing, including document conversion, text chunking, fact aggregation, and ontology management. Each agent is designed to perform a specific task in the ontology processing pipeline.

aggregate_serialize(state, tools)

Create a node that saves the knowledge graph.

Source code in ontocast/agent/aggregate_facts.py
def aggregate_serialize(state: AgentState, tools: ToolBox) -> AgentState:
    """Aggregate processed chunk graphs and persist them to the triple store.

    Every processed chunk is sanitized first, then all chunk graphs are
    merged into a single facts graph under the document namespace. The
    current ontology is always serialized; the facts graph is serialized
    only when it actually contains triples.

    Args:
        state: The current agent state with processed chunks and ontology.
        tools: The toolbox providing the aggregator and triple store manager.

    Returns:
        AgentState: The same state, with ``aggregated_facts`` populated.
    """
    store = tools.triple_store_manager

    # Clean each chunk graph before merging.
    for chunk in state.chunks_processed:
        chunk.sanitize()

    state.aggregated_facts = tools.aggregator.aggregate_graphs(
        state.chunks_processed, state.doc_namespace
    )

    logger.info(
        f"chunks proc: {len(state.chunks_processed)}\n"
        f"facts graph: {len(state.aggregated_facts)} triples\n"
        f"onto graph {len(state.current_ontology.graph)} triples"
    )

    store.serialize_ontology(state.current_ontology)
    # Skip serializing an empty facts graph.
    if len(state.aggregated_facts) > 0:
        store.serialize_facts(state.aggregated_facts, spec=state.doc_namespace)

    return state

check_chunks_empty(state)

Check if chunks are available and manage chunk processing state.

This function checks if there are remaining chunks to process and manages the state transitions accordingly. If chunks are available, it sets up the next chunk for processing. If no chunks remain, it signals completion of the workflow.

The function performs the following operations:

1. Adds the current chunk to the processed list if it exists
2. Checks if there are remaining chunks to process
3. Sets up the next chunk and resets node visits if chunks remain
4. Sets appropriate status for workflow routing

Parameters:

Name Type Description Default
state AgentState

The current agent state containing chunks and processing status.

required

Returns:

Name Type Description
AgentState AgentState

Updated agent state with chunk processing information.

Example

>>> state = AgentState(chunks=[chunk1, chunk2], current_chunk=None)
>>> updated_state = check_chunks_empty(state)
>>> print(updated_state.current_chunk)  # chunk1
>>> print(updated_state.status)  # Status.FAILED

Source code in ontocast/agent/check_chunks.py
def check_chunks_empty(state: AgentState) -> AgentState:
    """Advance chunk processing: archive the current chunk, pick the next.

    The chunk just finished (if any) is appended to ``chunks_processed``.
    When unprocessed chunks remain, the first one becomes the current
    chunk, per-node visit counters are reset, and the status is set to
    FAILED so the workflow routes on to ontology selection. When the
    queue is empty, the current chunk is cleared and the status is set
    to SUCCESS, signalling that fact aggregation can begin.

    Args:
        state: The current agent state containing chunks and processing status.

    Returns:
        AgentState: Updated agent state with chunk processing information.

    Example:
        >>> state = AgentState(chunks=[chunk1, chunk2], current_chunk=None)
        >>> updated_state = check_chunks_empty(state)
        >>> print(updated_state.current_chunk)  # chunk1
        >>> print(updated_state.status)  # Status.FAILED
    """
    logger.info(
        f"Chunks (rem): {len(state.chunks)}, "
        f"chunks proc: {len(state.chunks_processed)}. "
        f"Setting up current chunk"
    )

    finished = state.current_chunk
    if finished is not None:
        state.chunks_processed.append(finished)

    # Queue exhausted: signal completion and route to aggregation.
    if not state.chunks:
        state.current_chunk = None
        state.status = Status.SUCCESS
        logger.info(
            "No more chunks, setting status to SUCCESS "
            "and proceeding to AGGREGATE_FACTS"
        )
        return state

    # More work to do: load next chunk and reset per-chunk bookkeeping.
    state.current_chunk = state.chunks.pop(0)
    state.node_visits = defaultdict(int)
    state.status = Status.FAILED
    logger.info(
        "Chunk available, setting status to FAILED"
        " and proceeding to SELECT_ONTOLOGY"
    )
    return state

render_onto_triples(state, tools)

Render ontology triples into a human-readable format.

This function takes the triples from the current ontology and renders them into a more accessible format, making the ontological knowledge easier to understand.

Parameters:

Name Type Description Default
state AgentState

The current agent state containing the ontology to render.

required
tools ToolBox

The toolbox instance providing utility functions.

required

Returns:

Name Type Description
AgentState AgentState

Updated state with rendered triples.

Source code in ontocast/agent/render_ontology_triples.py
def render_onto_triples(state: AgentState, tools: ToolBox) -> AgentState:
    """Produce ontology triples for the current chunk via the LLM.

    Builds a prompt from the current chunk's text plus ontology-specific
    instructions — fresh-ontology instructions when no ontology is
    selected (``ONTOLOGY_NULL_ID``), update instructions (including the
    existing ontology's IRI, description, and Turtle serialization)
    otherwise. On retries, prior failure context is fed back into the
    prompt. The LLM reply is parsed into ``state.ontology_addendum``;
    parse or LLM errors mark the state failed at stage
    ``PARSE_TEXT_TO_ONTOLOGY_TRIPLES``.

    Args:
        state: The current agent state containing the ontology to render.
        tools: The toolbox instance providing utility functions.

    Returns:
        AgentState: Updated state with rendered triples.
    """
    logger.info("Starting to render ontology triples")
    model = tools.llm

    output_parser = PydanticOutputParser(pydantic_object=Ontology)

    logger.debug(f"Using domain: {state.current_domain}")

    ontology = state.current_ontology
    if ontology.ontology_id == ONTOLOGY_NULL_ID:
        # No ontology selected: ask the LLM to build one from scratch.
        logger.info("Creating fresh ontology")
        onto_instruction = ontology_instruction_fresh
        specific_instruction = specific_ontology_instruction_fresh.format(
            current_domain=state.current_domain
        )
    else:
        # Extend the selected ontology: include its IRI, description,
        # and full Turtle serialization in the prompt.
        onto_instruction = ontology_instruction_update.format(
            ontology_iri=ontology.iri,
            ontology_desc=ontology.describe(),
            ontology_str=ontology.graph.serialize(format="turtle"),
        )
        specific_instruction = specific_ontology_instruction_update.format(
            ontology_namespace=ontology.namespace
        )

    merged_instructions = instructions.format(
        specific_ontology_instruction=specific_instruction
    )

    # Feed failure context back into the prompt on retry attempts.
    if state.status != Status.SUCCESS and state.failure_reason is not None:
        retry_instruction = failure_instruction.format(
            failure_stage=state.failure_stage,
            failure_reason=state.failure_reason,
        )
    else:
        retry_instruction = ""

    prompt = PromptTemplate(
        template=template_prompt,
        input_variables=[
            "text",
            "instructions",
            "ontology_instruction",
            "failure_instruction",
            "format_instructions",
        ],
    )

    try:
        reply = model(
            prompt.format_prompt(
                text=state.current_chunk.text,
                instructions=merged_instructions,
                ontology_instruction=onto_instruction,
                failure_instruction=retry_instruction,
                format_instructions=output_parser.get_format_instructions(),
            )
        )

        state.ontology_addendum = output_parser.parse(reply.content)
        state.ontology_addendum.graph.sanitize_prefixes_namespaces()

        logger.info(
            f"Ontology addendum has {len(state.ontology_addendum.graph)} triples."
        )
        state.clear_failure()
        return state

    except Exception as e:
        logger.error(f"Failed to generate triples: {str(e)}")
        state.set_failure(FailureStages.PARSE_TEXT_TO_ONTOLOGY_TRIPLES, str(e))
        return state