Embed a knowledge base corpus by processing text data and extracting mentions.
For len(metadata.sources) > 1, pass output_parquet_paths with one path per
source (same order as metadata.sources). For a single source, pass
output_parquet_path.
Source code in pelinker/embedder.py
def embed_kb_corpus(
    *,
    metadata: EmbeddingModelMetadata,
    training: EmbeddingTrainingConfig,
    output_parquet_path: pathlib.Path | None = None,
    output_parquet_paths: Sequence[pathlib.Path] | None = None,
) -> None:
    """Embed a knowledge base corpus by processing text data and extracting mentions.

    For ``len(metadata.sources) > 1``, pass ``output_parquet_paths`` with one path per
    source (same order as ``metadata.sources``). For a single source, pass
    ``output_parquet_path`` (or, equivalently, ``output_parquet_paths`` with
    exactly one entry).

    Args:
        metadata: Embedding model metadata; ``metadata.sources`` determines how
            many output files are written.
        training: Training configuration; ``training.use_gpu`` requests GPU use.
        output_parquet_path: Destination parquet path for the single-source case.
        output_parquet_paths: Destination parquet paths for the multi-source
            case, one per entry of ``metadata.sources``, in the same order.

    Raises:
        ValueError: If the required output path argument is missing, a path
            resolves to ``None``, or the number of paths does not match the
            number of sources.
    """
    k = len(metadata.sources)
    if k == 1:
        # Backward-compatible convenience: a one-element output_parquet_paths
        # is accepted as an alias for output_parquet_path, so callers can use
        # the list-shaped API uniformly regardless of source count.
        if output_parquet_path is None and output_parquet_paths is not None:
            if len(output_parquet_paths) != 1:
                raise ValueError(
                    "output_parquet_paths must have length 1 for a single "
                    f"embedding source, got {len(output_parquet_paths)}"
                )
            output_parquet_path = output_parquet_paths[0]
        if output_parquet_path is None:
            raise ValueError(
                "output_parquet_path is required for a single embedding source"
            )
        ope = expand_config_path(output_parquet_path)
        if ope is None:
            raise ValueError("output_parquet_path resolved to None")
        paths: list[pathlib.Path] = [pathlib.Path(ope)]
    else:
        if output_parquet_paths is None:
            raise ValueError(
                "output_parquet_paths is required when metadata has multiple sources "
                f"({k}); provide one output path per source in metadata order."
            )
        # Fail fast on a count mismatch BEFORE expanding any entries, so the
        # caller sees the length error rather than an incidental None error
        # from a bad entry in an already-wrong-sized list.
        if len(output_parquet_paths) != k:
            raise ValueError(
                f"output_parquet_paths must have length {k} (metadata.sources), "
                f"got {len(output_parquet_paths)}"
            )
        paths = []
        for p in output_parquet_paths:
            ep = expand_config_path(p)
            if ep is None:
                raise ValueError("output_parquet_paths must not contain None")
            paths.append(pathlib.Path(ep))
    use_gpu = training.use_gpu
    if use_gpu:
        if torch.cuda.is_available():
            logger.info("Using GPU in upcoming processes")
        else:
            # Degrade gracefully to CPU instead of failing when CUDA is absent.
            logger.warning("CUDA is not available. Running on CPU instead")
            use_gpu = False
    # Lengths of metadata.sources and paths are guaranteed equal by the
    # validation above, so a plain zip is safe here.
    for spec, out_path in zip(metadata.sources, paths):
        _embed_corpus_single_source(
            source=spec,
            training=training,
            output_parquet_path=out_path,
            use_gpu=use_gpu,
        )
    logger.info("All %s embedding source(s) written.", k)