OntoCast API server implementation.
This module provides a web server implementation for the OntoCast framework
using Robyn. It exposes REST API endpoints for processing documents and
extracting semantic triples with ontology assistance.
The server supports:
- Health check endpoint (/health)
- Service information endpoint (/info)
- Document processing endpoint (/process)
- Triple store flush endpoint (/flush)
- Multiple input formats (JSON, multipart/form-data)
- Streaming workflow execution
- Comprehensive error handling and logging
The server integrates with the OntoCast workflow graph to process documents
through the complete pipeline: chunking, ontology selection, fact extraction,
and aggregation.
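For example, once the server is running, the endpoints above can be exercised roughly as follows. The port, payload shape, and response schema in this sketch are illustrative assumptions, not the documented API contract:

```python
import requests

BASE = "http://localhost:8999"  # placeholder port; use the port from your server config

# Health check
print(requests.get(f"{BASE}/health").status_code)

# Submit a document for processing as multipart/form-data
with open("document.pdf", "rb") as f:
    resp = requests.post(f"{BASE}/process", files={"file": ("document.pdf", f)})
resp.raise_for_status()
print(resp.json())  # extracted triples / workflow output (schema assumed)
```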
Example

```bash
# With Fuseki backend (auto-detected from FUSEKI_URI and FUSEKI_AUTH)
ontocast --env-file .env

# Process a specific file
ontocast --env-file .env --input-path ./document.pdf

# Process with a chunk limit
ontocast --env-file .env --head-chunks 5
```
calculate_recursion_limit(head_chunks, server_config)
Calculate the recursion limit based on max_visits and head_chunks.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `head_chunks` | `int \| None` | Optional maximum number of chunks to process | required |
| `server_config` | `ServerConfig` | Server configuration providing `max_visits`, `base_recursion_limit`, and `estimated_chunks` | required |
Returns:

| Name | Type | Description |
| --- | --- | --- |
| `int` | `int` | Calculated recursion limit |
Source code in ontocast/cli/serve.py
```python
def calculate_recursion_limit(
    head_chunks: int | None,
    server_config: ServerConfig,
) -> int:
    """Calculate the recursion limit based on max_visits and head_chunks.

    Args:
        head_chunks: Optional maximum number of chunks to process
        server_config: Server configuration providing max_visits,
            base_recursion_limit, and estimated_chunks

    Returns:
        int: Calculated recursion limit
    """
    if head_chunks is not None:
        # If we know the number of chunks, calculate the exact limit
        return max(
            server_config.base_recursion_limit,
            server_config.max_visits * head_chunks * 10,
        )
    else:
        # If we don't know the chunk count, use a conservative estimate
        return max(
            server_config.base_recursion_limit,
            server_config.max_visits * server_config.estimated_chunks * 10,
        )
```
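For intuition, here is a small worked example using the function above. The `ServerConfig` stub and its values are illustrative assumptions; the real defaults may differ.

```python
# Stub standing in for the real ServerConfig; values are assumed for illustration.
class ServerConfig:
    base_recursion_limit = 100
    max_visits = 3
    estimated_chunks = 20

cfg = ServerConfig()

# Known chunk count: max(100, 3 * 5 * 10) = 150
assert calculate_recursion_limit(5, cfg) == 150

# Unknown chunk count falls back to estimated_chunks: max(100, 3 * 20 * 10) = 600
assert calculate_recursion_limit(None, cfg) == 600
```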
run(env_file, input_path, head_chunks)
Main entry point for the OntoCast server/CLI.
Backend selection is automatically inferred from available configuration:
- Fuseki: If FUSEKI_URI and FUSEKI_AUTH are provided (preferred)
- Neo4j: If NEO4J_URI and NEO4J_AUTH are provided (fallback)
- Filesystem Triple Store: If ONTOCAST_WORKING_DIRECTORY and ONTOCAST_ONTOLOGY_DIRECTORY are provided
- Filesystem Manager: If ONTOCAST_WORKING_DIRECTORY is provided (can be combined with other backends)
No explicit backend configuration flags are needed - backends are automatically detected.
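For illustration, a minimal `.env` enabling the Fuseki backend might look like the sketch below. The variable names come from the list above; the values (and the exact format of `FUSEKI_AUTH`) are placeholder assumptions.

```bash
# Fuseki backend (preferred when both variables are present)
FUSEKI_URI=http://localhost:3030
# Placeholder credentials; check the OntoCast docs for the expected FUSEKI_AUTH format
FUSEKI_AUTH=admin:password

# Filesystem manager / triple store; can be combined with Fuseki or Neo4j
ONTOCAST_WORKING_DIRECTORY=~/.ontocast/work
ONTOCAST_ONTOLOGY_DIRECTORY=~/.ontocast/ontologies
```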
Source code in ontocast/cli/serve.py
```python
@click.command()
@click.option(
    "--env-file",
    type=click.Path(path_type=pathlib.Path),
    required=True,
    default=".env",
    help="Path to .env file containing backend and configuration settings",
)
@click.option("--input-path", type=click.Path(path_type=pathlib.Path), default=None)
@click.option("--head-chunks", type=int, default=None)
def run(
    env_file: pathlib.Path,
    input_path: pathlib.Path | None,
    head_chunks: int | None,
):
    """Main entry point for the OntoCast server/CLI.

    Backend selection is automatically inferred from available configuration:

    - Fuseki: If FUSEKI_URI and FUSEKI_AUTH are provided (preferred)
    - Neo4j: If NEO4J_URI and NEO4J_AUTH are provided (fallback)
    - Filesystem Triple Store: If ONTOCAST_WORKING_DIRECTORY and
      ONTOCAST_ONTOLOGY_DIRECTORY are provided
    - Filesystem Manager: If ONTOCAST_WORKING_DIRECTORY is provided
      (can be combined with other backends)

    No explicit backend configuration flags are needed - backends are
    automatically detected.
    """
    _ = load_dotenv(dotenv_path=env_file.expanduser())

    # Global configuration instance
    config = Config()

    # Validate LLM configuration
    config.validate_llm_config()

    if config.logging_level is not None:
        try:
            logger_conf = f"logging.{config.logging_level}.conf"
            logging.config.fileConfig(logger_conf, disable_existing_loggers=False)
            logger.debug("debug is on")
        except Exception as e:
            logger.error(f"could not set logging level correctly: {e}")

    if config.tool_config.path_config.working_directory is not None:
        config.tool_config.path_config.working_directory = pathlib.Path(
            config.tool_config.path_config.working_directory
        ).expanduser()
        config.tool_config.path_config.working_directory.mkdir(
            parents=True, exist_ok=True
        )
    else:
        raise ValueError(
            "Working directory must be provided via CLI argument or WORKING_DIRECTORY config"
        )

    if config.tool_config.path_config.ontology_directory is not None:
        config.tool_config.path_config.ontology_directory = pathlib.Path(
            config.tool_config.path_config.ontology_directory
        ).expanduser()

    # Create ToolBox with config
    tools: ToolBox = ToolBox(config)
    asyncio.run(tools.initialize())
    workflow: CompiledStateGraph = create_agent_graph(tools)

    if input_path:
        input_path = input_path.expanduser()
        files = sorted(
            crawl_directories(
                input_path,
                suffixes=tuple([".json"] + list(tools.converter.supported_extensions)),
            )
        )
        recursion_limit = calculate_recursion_limit(
            head_chunks,
            config.server,
        )

        async def process_files():
            for file_path in files:
                try:
                    state = AgentState(
                        files={file_path.as_posix(): file_path.read_bytes()},
                        max_visits=config.server.max_visits,
                        max_chunks=head_chunks,
                        skip_ontology_development=config.server.skip_ontology_development,
                        skip_facts_rendering=config.server.skip_facts_rendering,
                        dataset=config.tool_config.fuseki.dataset,
                    )
                    async for _ in workflow.astream(
                        state,
                        stream_mode="values",
                        config=RunnableConfig(recursion_limit=recursion_limit),
                    ):
                        pass
                except Exception as e:
                    logger.error(f"Error processing {file_path}: {str(e)}")

        asyncio.run(process_files())
    else:
        app = create_app(
            tools=tools,
            server_config=config.server,
            head_chunks=head_chunks,
        )
        logger.info(f"Starting Ontocast server on port {config.server.port}")
        app.start(port=config.server.port)
```