Skip to content

ontocast.cli.batch_process

Batch processing client for OntoCast.

This module provides a command-line client for batch processing multiple files through the OntoCast API server. It supports async processing with configurable concurrency limits.

The client supports:

- Recursive directory scanning
- File pattern matching (e.g., by extension)
- Async processing with concurrency control
- Progress tracking and error reporting
- JSON and PDF file types

Example

Process all JSON files in a directory (max 3 concurrent)

python batch_process.py --url http://localhost:8999 --path ./data --pattern "*.json" --max-concurrent 3

Target tenant/project (optional; server defaults apply when omitted)

python batch_process.py --url http://localhost:8999 --path ./data --tenant acme --project demo

find_files(path, pattern, recursive)

Find files matching the given pattern.

Parameters:

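| Name | Type | Description | Default |
|------|------|-------------|---------|
| `path` | `Path` | Base path to search | *required* |
| `pattern` | `Optional[str]` | Glob pattern (e.g., `"*.json"`, `"*.pdf"`) or `None` for all files | *required* |
| `recursive` | `bool` | Whether to search recursively | *required* |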

Returns:

| Type | Description |
|------|-------------|
| `list[Path]` | List of matching file paths |

Source code in ontocast/cli/batch_process.py
def find_files(
    path: pathlib.Path, pattern: Optional[str], recursive: bool
) -> list[pathlib.Path]:
    """Find supported files (``.json`` / ``.pdf``) matching the given pattern.

    If ``path`` is itself a file it is returned as-is (no extension check).
    Otherwise ``path`` is globbed with ``pattern`` (``"*"`` when no pattern is
    given). Only regular files whose suffix is ``.json`` or ``.pdf``
    (case-insensitive) are kept.

    Args:
        path: Base path to search
        pattern: Glob pattern (e.g., "*.json", "*.pdf") or None for all files
        recursive: Whether to search recursively

    Returns:
        Sorted list of matching file paths

    Raises:
        click.BadParameter: If ``path`` does not exist.
    """
    if not path.exists():
        raise click.BadParameter(f"Path does not exist: {path}", param_hint="--path")

    if path.is_file():
        return [path]

    supported_extensions = {".json", ".pdf"}
    glob_pattern = pattern or "*"
    candidates = path.rglob(glob_pattern) if recursive else path.glob(glob_pattern)

    # is_file() is applied regardless of pattern: a user pattern can also match
    # directories (e.g. a folder named "data.json"), which cannot be uploaded.
    return sorted(
        f
        for f in candidates
        if f.is_file() and f.suffix.lower() in supported_extensions
    )

main(url, path, pattern, recursive, max_concurrent, output, tenant, project)

Batch process files through the OntoCast API server.

This command finds files matching the given pattern (or all supported files) and sends them to the OntoCast API server for processing. Files are processed asynchronously with a configurable concurrency limit.

Supported file types: .json, .pdf

Examples:

Process all JSON files in a directory

batch_process.py --url http://localhost:8999 --path ./data --pattern "*.json"

Process all PDFs recursively with 5 concurrent requests

batch_process.py --url http://localhost:8999 --path ./documents --pattern "*.pdf" --recursive --max-concurrent 5

Process a single file

batch_process.py --url http://localhost:8999 --path ./document.pdf

Source code in ontocast/cli/batch_process.py
@click.command()
@click.option(
    "--url",
    required=True,
    help="Base URL for the server (e.g. http://localhost:8999)",
)
@click.option(
    "--path",
    type=click.Path(path_type=pathlib.Path, exists=True),
    required=True,
    help="Path to file or directory to process",
)
@click.option(
    "--pattern",
    type=str,
    default=None,
    help="Glob pattern to match files (e.g., '*.json', '*.pdf'). If not provided, processes all supported files.",
)
@click.option(
    # A plain is_flag option with default=True can never be switched off; the
    # on/off form keeps "--recursive" valid while adding "--no-recursive".
    "--recursive/--no-recursive",
    default=True,
    help="Search for files recursively in subdirectories (default: enabled; use --no-recursive to disable)",
)
@click.option(
    "--max-concurrent",
    type=int,
    default=3,
    help="Maximum number of concurrent requests (default: 3)",
)
@click.option(
    "--output",
    type=click.Path(path_type=pathlib.Path),
    default=None,
    help="Optional path to save results summary as JSON",
)
@click.option(
    "--tenant",
    type=str,
    default=None,
    help="Tenant id sent to /process (optional).",
)
@click.option(
    "--project",
    type=str,
    default=None,
    help="Project id sent to /process (optional).",
)
def main(
    url: str,
    path: pathlib.Path,
    pattern: Optional[str],
    recursive: bool,
    max_concurrent: int,
    output: Optional[pathlib.Path],
    tenant: Optional[str],
    project: Optional[str],
):
    """Batch process files through the OntoCast API server.

    This command finds files matching the given pattern (or all supported files)
    and sends them to the OntoCast API server for processing. Files are processed
    asynchronously with a configurable concurrency limit. Directory search is
    recursive by default; pass --no-recursive to scan only the top level.

    Supported file types: .json, .pdf

    Examples:
        # Process all JSON files in a directory
        batch_process.py --url http://localhost:8999 --path ./data --pattern "*.json"

        # Process all PDFs recursively with 5 concurrent requests
        batch_process.py --url http://localhost:8999 --path ./documents --pattern "*.pdf" --recursive --max-concurrent 5

        # Process only the top level of a directory
        batch_process.py --url http://localhost:8999 --path ./data --no-recursive

        # Process a single file
        batch_process.py --url http://localhost:8999 --path ./document.pdf
    """
    # Normalise to ".../process" exactly once. The trailing slash is stripped
    # before the suffix check so "http://host/process/" does not become
    # "http://host/process/process".
    base_url = url.rstrip("/")
    if not base_url.endswith("/process"):
        base_url = f"{base_url}/process"
    url = base_url

    if max_concurrent < 1:
        raise click.BadParameter(
            "max-concurrent must be at least 1", param_hint="--max-concurrent"
        )

    # Expand user path
    path = path.expanduser()

    # Find files
    try:
        file_paths = find_files(path, pattern, recursive)
    except Exception as e:
        raise click.ClickException(f"Error finding files: {e}") from e

    if not file_paths:
        click.echo(f"No files found matching pattern '{pattern or '*.*'}' in {path}")
        return

    click.echo(f"Found {len(file_paths)} file(s) to process")
    if pattern:
        click.echo(f"Pattern: {pattern}")
    if recursive:
        click.echo("Recursive search: enabled")
    if tenant is not None or project is not None:
        click.echo(f"Tenancy: tenant={tenant!r} project={project!r}")
    click.echo(f"Max concurrent requests: {max_concurrent}")
    click.echo("")

    # Process files
    results = asyncio.run(
        process_files_async(url, file_paths, max_concurrent, tenant, project)
    )

    # Print summary
    click.echo("")
    click.echo("=" * 60)
    click.echo("Processing Summary")
    click.echo("=" * 60)
    click.echo(f"Total files: {results['total']}")
    click.echo(f"Successful: {results['success']}")
    click.echo(f"Errors: {results['errors']}")

    if results["error_details"]:
        click.echo("")
        click.echo("Error Details:")
        for filename, details in results["error_details"].items():
            click.echo(f"  {filename}: {details['error']}")

    # Save results if output path provided
    if output:
        output = output.expanduser()
        output.parent.mkdir(parents=True, exist_ok=True)
        with open(output, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)
        click.echo("")
        click.echo(f"Results saved to: {output}")

process_file(client, url, file_path, semaphore, results, tenant=None, project=None) async

Process a single file by sending it to the OntoCast API.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `client` | `AsyncClient` | httpx async client | *required* |
| `url` | `str` | API endpoint URL | *required* |
| `file_path` | `Path` | Path to the file to process | *required* |
| `semaphore` | `Semaphore` | Semaphore to limit concurrent requests | *required* |
| `results` | `dict` | Dictionary to store results (success/error counts) | *required* |
| `tenant` | `Optional[str]` | Optional tenant id (passed as query parameter when set) | `None` |
| `project` | `Optional[str]` | Optional project id (passed as query parameter when set) | `None` |
Source code in ontocast/cli/batch_process.py
async def process_file(
    client: httpx.AsyncClient,
    url: str,
    file_path: pathlib.Path,
    semaphore: asyncio.Semaphore,
    results: dict,
    tenant: Optional[str] = None,
    project: Optional[str] = None,
) -> None:
    """Process a single file by sending it to the OntoCast API.

    The file is POSTed as the multipart form field ``file``. A ``.pdf`` file is
    sent as ``application/pdf``; anything else is sent as ``application/json``.

    Outcomes:

    - Any 2xx response increments ``results["success"]``.
    - Any other status, or an exception, increments ``results["errors"]``.
      It also stores ``{"status", "error"}`` in ``results["error_details"]``,
      keyed by the file's full path.

    Args:
        client: httpx async client
        url: API endpoint URL
        file_path: Path to the file to process
        semaphore: Semaphore to limit concurrent requests
        results: Dictionary to store results (success/error counts)
        tenant: Optional tenant id (passed as query parameter when set)
        project: Optional project id (passed as query parameter when set)
    """
    # Keyed by full path rather than bare name: a recursive scan can yield
    # several files with the same name in different directories.
    detail_key = str(file_path)

    async with semaphore:
        try:
            file_ext = file_path.suffix.lower()
            mime_type = "application/pdf" if file_ext == ".pdf" else "application/json"

            # Read in a worker thread so a large file does not stall the event
            # loop (and with it every other in-flight upload).
            file_content = await asyncio.to_thread(file_path.read_bytes)

            files = {"file": (file_path.name, file_content, mime_type)}

            params: dict[str, str] = {}
            if tenant:
                params["tenant"] = tenant
            if project:
                params["project"] = project

            response = await client.post(url, files=files, params=params)
            status = response.status_code

            # Accept every 2xx (e.g. 201 Created / 202 Accepted), not only 200.
            if 200 <= status < 300:
                results["success"] += 1
                click.echo(f"✓ {file_path.name} - Success")
            else:
                error_text = (
                    response.text[:200] if response.text else "No error message"
                )
                results["errors"] += 1
                results["error_details"][detail_key] = {
                    "status": status,
                    "error": error_text,
                }
                click.echo(f"✗ {file_path.name} - Error {status}")

        except Exception as e:
            # Boundary handler: record the failure for this file and keep the
            # rest of the batch going.
            results["errors"] += 1
            results["error_details"][detail_key] = {
                "status": None,
                "error": str(e)[:200],
            }
            click.echo(f"✗ {file_path.name} - Exception: {str(e)[:100]}")

process_files_async(url, file_paths, max_concurrent, tenant=None, project=None) async

Process multiple files asynchronously with concurrency control.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `url` | `str` | API endpoint URL | *required* |
| `file_paths` | `list[Path]` | List of file paths to process | *required* |
| `max_concurrent` | `int` | Maximum number of concurrent requests | *required* |
| `tenant` | `Optional[str]` | Optional tenant query parameter | `None` |
| `project` | `Optional[str]` | Optional project query parameter | `None` |

Returns:

| Type | Description |
|------|-------------|
| `dict` | Dictionary with processing results (success count, error count, details) |

Source code in ontocast/cli/batch_process.py
async def process_files_async(
    url: str,
    file_paths: list[pathlib.Path],
    max_concurrent: int,
    tenant: Optional[str] = None,
    project: Optional[str] = None,
) -> dict:
    """Upload every file in ``file_paths``, with at most ``max_concurrent`` in flight.

    All uploads share one ``httpx.AsyncClient`` (300 s timeout) and one
    semaphore. Each file is handed to ``process_file``, which updates the
    shared summary dict in place.

    Args:
        url: API endpoint URL
        file_paths: List of file paths to process
        max_concurrent: Maximum number of concurrent requests
        tenant: Optional tenant query parameter
        project: Optional project query parameter

    Returns:
        Dict with keys ``success``, ``errors``, ``error_details`` and ``total``.
    """
    file_count = len(file_paths)
    summary: dict = {
        "success": 0,
        "errors": 0,
        "error_details": {},
        "total": file_count,
    }

    # Guard clause: nothing to upload, so skip opening an HTTP client.
    if file_count == 0:
        click.echo("No files found to process.")
        return summary

    limiter = asyncio.Semaphore(max_concurrent)
    click.echo(
        f"Processing {file_count} file(s) with max {max_concurrent} concurrent requests..."
    )
    if not (tenant is None and project is None):
        click.echo(f"Using tenancy: tenant={tenant!r} project={project!r}")

    async with httpx.AsyncClient(timeout=300.0) as http:
        await asyncio.gather(
            *(
                process_file(http, url, one_path, limiter, summary, tenant, project)
                for one_path in file_paths
            )
        )

    return summary