ontocast.cli.serve

OntoCast API server implementation.

This module provides a web server implementation for the OntoCast framework using Robyn. It exposes REST API endpoints for processing documents and extracting semantic triples with ontology assistance.

The server supports:

- Health check endpoint (/health)
- Service information endpoint (/info)
- Document processing endpoint (/process)
- Triple store flush endpoint (/flush)
- Multiple input formats (JSON, multipart/form-data)
- Streaming workflow execution
- Comprehensive error handling and logging

The server integrates with the OntoCast workflow graph to process documents through the complete pipeline: chunking, ontology selection, fact extraction, and aggregation.
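For instance, a client could exercise these endpoints as in the following sketch. It assumes the server runs locally (port 8000 is a placeholder for config.server.port) and that /process accepts a multipart upload under a "file" form field; the field name and response shape are assumptions, not taken from this page.

import requests

BASE = "http://localhost:8000"  # placeholder; the real port comes from config.server.port

# Check that the server is up
print(requests.get(f"{BASE}/health").status_code)

# Submit a document via multipart/form-data, one of the supported input formats
# (the "file" field name is an assumption)
with open("document.pdf", "rb") as fh:
    resp = requests.post(f"{BASE}/process", files={"file": fh})
print(resp.status_code, resp.text[:200])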

Example

# With Fuseki backend (auto-detected from FUSEKI_URI and FUSEKI_AUTH)
ontocast --env-file .env

# Process a specific file
ontocast --env-file .env --input-path ./document.pdf

# Process with a chunk limit
ontocast --env-file .env --head-chunks 5

calculate_recursion_limit(head_chunks, server_config)

Calculate the recursion limit based on max_visits and head_chunks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| head_chunks | int \| None | Optional maximum number of chunks to process | required |
| server_config | ServerConfig | Server configuration supplying base_recursion_limit, max_visits, and estimated_chunks | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| int | int | Calculated recursion limit |

Source code in ontocast/cli/serve.py
def calculate_recursion_limit(
    head_chunks: int | None,
    server_config: ServerConfig,
) -> int:
    """Calculate the recursion limit based on max_visits and head_chunks.

    Args:
        head_chunks: Optional maximum number of chunks to process
        server_config: Server configuration supplying base_recursion_limit,
            max_visits, and estimated_chunks

    Returns:
        int: Calculated recursion limit
    """
    if head_chunks is not None:
        # If we know the number of chunks, calculate exact limit
        return max(
            server_config.base_recursion_limit,
            server_config.max_visits * head_chunks * 10,
        )
    else:
        # If we don't know chunks, use a conservative estimate
        return max(
            server_config.base_recursion_limit,
            server_config.max_visits * server_config.estimated_chunks * 10,
        )
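As a usage illustration, assume a hypothetical ServerConfig with base_recursion_limit=50, max_visits=3, and estimated_chunks=20 (the field names come from the code above; the values are invented):

calculate_recursion_limit(head_chunks=5, server_config=cfg)
# -> max(50, 3 * 5 * 10) == 150
calculate_recursion_limit(head_chunks=None, server_config=cfg)
# -> max(50, 3 * 20 * 10) == 600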

run(env_file, input_path, head_chunks)

Main entry point for the OntoCast server/CLI.

Backend selection is automatically inferred from available configuration:

- Fuseki: If FUSEKI_URI and FUSEKI_AUTH are provided (preferred)
- Neo4j: If NEO4J_URI and NEO4J_AUTH are provided (fallback)
- Filesystem Triple Store: If ONTOCAST_WORKING_DIRECTORY and ONTOCAST_ONTOLOGY_DIRECTORY are provided
- Filesystem Manager: If ONTOCAST_WORKING_DIRECTORY is provided (can be combined with other backends)

No explicit backend configuration flags are needed; backends are detected automatically.
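For example, a minimal .env sketch for the Fuseki backend (the variable names come from the list above; all values, including the user:password form of FUSEKI_AUTH, are illustrative assumptions):

# values below are placeholders, not defaults
FUSEKI_URI=http://localhost:3030
# auth format is an assumption
FUSEKI_AUTH=admin:password
ONTOCAST_WORKING_DIRECTORY=~/.ontocast/work
ONTOCAST_ONTOLOGY_DIRECTORY=~/.ontocast/ontologies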

Source code in ontocast/cli/serve.py
@click.command()
@click.option(
    "--env-file",
    type=click.Path(path_type=pathlib.Path),
    required=True,
    default=".env",
    help="Path to .env file containing backend and configuration settings",
)
@click.option("--input-path", type=click.Path(path_type=pathlib.Path), default=None)
@click.option("--head-chunks", type=int, default=None)
def run(
    env_file: pathlib.Path,
    input_path: pathlib.Path | None,
    head_chunks: int | None,
):
    """
    Main entry point for the OntoCast server/CLI.

    Backend selection is automatically inferred from available configuration:
    - Fuseki: If FUSEKI_URI and FUSEKI_AUTH are provided (preferred)
    - Neo4j: If NEO4J_URI and NEO4J_AUTH are provided (fallback)
    - Filesystem Triple Store: If ONTOCAST_WORKING_DIRECTORY and ONTOCAST_ONTOLOGY_DIRECTORY are provided
    - Filesystem Manager: If ONTOCAST_WORKING_DIRECTORY is provided (can be combined with other backends)

    No explicit backend configuration flags are needed - backends are automatically detected.

    """

    _ = load_dotenv(dotenv_path=env_file.expanduser())
    # Global configuration instance
    config = Config()

    # Validate LLM configuration
    config.validate_llm_config()

    if config.logging_level is not None:
        try:
            logger_conf = f"logging.{config.logging_level}.conf"
            logging.config.fileConfig(logger_conf, disable_existing_loggers=False)
            logger.debug("debug is on")
        except Exception as e:
            logger.error(f"could set logging level correctly {e}")

    if config.tool_config.path_config.working_directory is not None:
        config.tool_config.path_config.working_directory = pathlib.Path(
            config.tool_config.path_config.working_directory
        ).expanduser()
        config.tool_config.path_config.working_directory.mkdir(
            parents=True, exist_ok=True
        )
    else:
        raise ValueError(
            "Working directory must be provided via CLI argument or WORKING_DIRECTORY config"
        )

    if config.tool_config.path_config.ontology_directory is not None:
        config.tool_config.path_config.ontology_directory = pathlib.Path(
            config.tool_config.path_config.ontology_directory
        ).expanduser()

    # Create ToolBox with config
    tools: ToolBox = ToolBox(config)
    asyncio.run(tools.initialize())

    workflow: CompiledStateGraph = create_agent_graph(tools)

    if input_path:
        input_path = input_path.expanduser()

        files = sorted(
            crawl_directories(
                input_path,
                suffixes=tuple([".json"] + list(tools.converter.supported_extensions)),
            )
        )

        recursion_limit = calculate_recursion_limit(
            head_chunks,
            config.server,
        )

        async def process_files():
            for file_path in files:
                try:
                    state = AgentState(
                        files={file_path.as_posix(): file_path.read_bytes()},
                        max_visits=config.server.max_visits,
                        max_chunks=head_chunks,
                        skip_ontology_development=config.server.skip_ontology_development,
                        skip_facts_rendering=config.server.skip_facts_rendering,
                        dataset=config.tool_config.fuseki.dataset,
                    )
                    async for _ in workflow.astream(
                        state,
                        stream_mode="values",
                        config=RunnableConfig(recursion_limit=recursion_limit),
                    ):
                        pass

                except Exception as e:
                    logger.error(f"Error processing {file_path}: {str(e)}")

        asyncio.run(process_files())
    else:
        app = create_app(
            tools=tools,
            server_config=config.server,
            head_chunks=head_chunks,
        )
        logger.info(f"Starting Ontocast server on port {config.server.port}")
        app.start(port=config.server.port)