Skip to content

pelinker.embedder

embed_kb_corpus(*, metadata, training, output_parquet_path=None, output_parquet_paths=None)

Embed a knowledge base corpus by processing text data and extracting mentions.

For len(metadata.sources) > 1, pass output_parquet_paths with one path per source (same order as metadata.sources). For a single source, pass output_parquet_path.

Source code in pelinker/embedder.py
def embed_kb_corpus(
    *,
    metadata: EmbeddingModelMetadata,
    training: EmbeddingTrainingConfig,
    output_parquet_path: pathlib.Path | None = None,
    output_parquet_paths: Sequence[pathlib.Path] | None = None,
) -> None:
    """
    Embed a knowledge base corpus by processing text data and extracting mentions.

    For ``len(metadata.sources) > 1``, pass ``output_parquet_paths`` with one path per
    source (same order as ``metadata.sources``). For a single source, pass
    ``output_parquet_path``.

    Parameters
    ----------
    metadata:
        Embedding model metadata; ``metadata.sources`` determines how many
        output files are written and in what order.
    training:
        Training configuration; ``training.use_gpu`` requests GPU execution
        (silently downgraded to CPU when CUDA is unavailable).
    output_parquet_path:
        Destination parquet file when there is exactly one source.
    output_parquet_paths:
        Destination parquet files, one per source in metadata order, when
        there are multiple sources.

    Raises
    ------
    ValueError
        If the output argument required for the source count is missing, if
        ``output_parquet_paths`` has the wrong length, or if any supplied
        path expands to ``None``.
    """
    k = len(metadata.sources)
    if k == 1:
        if output_parquet_path is None:
            raise ValueError(
                "output_parquet_path is required for a single embedding source"
            )
        ope = expand_config_path(output_parquet_path)
        if ope is None:
            raise ValueError("output_parquet_path resolved to None")
        paths: list[pathlib.Path] = [pathlib.Path(ope)]
    else:
        if output_parquet_paths is None:
            raise ValueError(
                "output_parquet_paths is required when metadata has multiple sources "
                f"({k}); provide one output path per source in metadata order."
            )
        # Validate the count BEFORE expanding any entry: a wrong-length input
        # should fail fast with the length error, not with a confusing
        # per-entry error (or after wasted expansion work).
        if len(output_parquet_paths) != k:
            raise ValueError(
                f"output_parquet_paths must have length {k} (metadata.sources), got {len(output_parquet_paths)}"
            )
        paths = []
        for p in output_parquet_paths:
            ep = expand_config_path(p)
            if ep is None:
                raise ValueError("output_parquet_paths must not contain None")
            paths.append(pathlib.Path(ep))

    # Honor the GPU request only when CUDA is actually available; otherwise
    # warn once and fall back to CPU for all per-source runs below.
    use_gpu = training.use_gpu
    if use_gpu:
        if torch.cuda.is_available():
            logger.info("Using GPU in upcoming processes")
        else:
            logger.warning("CUDA is not available. Running on CPU instead")
            use_gpu = False

    # Lengths are guaranteed equal by the validation above; strict=True makes
    # that invariant explicit and future-proof.
    for spec, out_path in zip(metadata.sources, paths, strict=True):
        _embed_corpus_single_source(
            source=spec,
            training=training,
            output_parquet_path=out_path,
            use_gpu=use_gpu,
        )

    logger.info("All %s embedding source(s) written.", k)