Join and concatenate embeddings from multiple parquet sources (same KB export schema).
Mention-level fusion joins on (pmid, entity, mention) and concatenates embed vectors
in source order. Entity-level fusion averages mentions within each file per entity,
takes the intersection of entities across sources, and concatenates those means.
Linker.predict fuses mention tensors across sources in metadata order; when
model_type differs between sources, span alignment is not guaranteed—use the same
backbone for all sources if order-sensitive fusion is required.
concat_mention_level_embedding_sources(paths, *, batch_size, n_embedding_batches=None, read_status=None, show_read_progress=False)
Load one or more mention-level parquet sources like estimate_model_clustering:
read each path in batches, optionally inner-join across sources, return one frame
(no frac sampling).
Source code in pelinker/embedding_fusion.py
| def concat_mention_level_embedding_sources(
paths: Sequence[pathlib.Path],
*,
batch_size: int,
n_embedding_batches: int | None = None,
read_status: Callable[[str], None] | None = None,
show_read_progress: bool = False,
) -> pd.DataFrame | None:
"""
Load one or more mention-level parquet sources like ``estimate_model_clustering``:
read each path in batches, optionally inner-join across sources, return one frame
(no ``frac`` sampling).
"""
if len(paths) == 0:
return None
parts: list[pd.DataFrame] = []
n_paths = len(paths)
for pi, p in enumerate(paths):
prefix = f"[{pi + 1}/{n_paths}] " if n_paths > 1 else ""
def path_read(msg: str, _pfx: str = prefix) -> None:
if read_status is not None:
read_status(_pfx + msg)
part = read_embedding_parquet_batches_concat(
p,
batch_size=batch_size,
n_embedding_batches=n_embedding_batches,
read_status=path_read if read_status is not None else None,
show_read_progress=show_read_progress and read_status is None,
)
if part is None or len(part) == 0:
return None
parts.append(part)
if len(parts) == 1:
frame = parts[0]
else:
try:
frame = mention_level_concat_frames(parts)
except Exception:
return None
if len(frame) == 0:
return None
return frame
|
dedupe_mean_embed_by_keys(df, *, keys=JOIN_KEYS)
Collapse duplicate rows sharing the same join keys by averaging embed vectors.
Source code in pelinker/embedding_fusion.py
def dedupe_mean_embed_by_keys(
    df: pd.DataFrame, *, keys: Sequence[str] = JOIN_KEYS
) -> pd.DataFrame:
    """
    Collapse duplicate rows sharing the same join keys by averaging embed vectors.
    """
    absent = [col for col in keys if col not in df.columns]
    if absent:
        raise ValueError(f"DataFrame missing columns: {absent}")

    def _average_embeds(series: pd.Series) -> np.ndarray:
        # Stack the per-row vectors and take the element-wise mean.
        stacked = np.stack([_to_embed_array(item) for item in series.values])
        return stacked.mean(axis=0)

    return (
        df.groupby(list(keys), sort=False)
        .agg(embed=("embed", _average_embeds))
        .reset_index()
    )
|
fused_property_vectors_from_paths(paths, kb_labels, *, batch_size=1000, n_embedding_batches=None, read_status=None, show_read_progress=False)
Per-source per-entity means, then concatenate vectors for entities in the intersection.
Source order matches paths order (must align with EmbeddingModelMetadata.sources).
Parquet inputs are read in batches (same mechanism as clustering analysis on loaded frames).
Source code in pelinker/embedding_fusion.py
| def fused_property_vectors_from_paths(
paths: Sequence[pathlib.Path],
kb_labels: set[str] | None,
*,
batch_size: int = 1000,
n_embedding_batches: int | None = None,
read_status: Callable[[str], None] | None = None,
show_read_progress: bool = False,
) -> dict[str, np.ndarray]:
"""
Per-source per-entity means, then concatenate vectors for entities in the intersection.
Source order matches ``paths`` order (must align with ``EmbeddingModelMetadata.sources``).
Parquet inputs are read in batches (same mechanism as clustering analysis on loaded frames).
"""
if len(paths) == 0:
raise ValueError("paths must be non-empty")
n_paths = len(paths)
per_source: list[dict[str, np.ndarray]] = []
for pi, p in enumerate(paths):
prefix = f"[{pi + 1}/{n_paths}] " if n_paths > 1 else ""
def path_read(msg: str, _pfx: str = prefix) -> None:
if read_status is not None:
read_status(_pfx + msg)
per_source.append(
property_mean_vectors_from_parquet_batches(
p,
kb_labels,
batch_size=batch_size,
n_embedding_batches=n_embedding_batches,
read_status=path_read if read_status is not None else None,
show_read_progress=show_read_progress and read_status is None,
)
)
common = set(per_source[0].keys())
for d in per_source[1:]:
common &= set(d.keys())
if not common:
return {}
fused: dict[str, np.ndarray] = {}
for entity in sorted(common):
parts = [per_source[i][entity] for i in range(len(paths))]
fused[entity] = np.concatenate(parts, axis=0)
return fused
|
mean_embedding_per_property(df, kb_labels, *, embed_column='embed', entity_column='entity')
Average all mention embeddings per entity label (same logic as Linker single-file load).
Source code in pelinker/embedding_fusion.py
def mean_embedding_per_property(
    df: pd.DataFrame,
    kb_labels: set[str] | None,
    *,
    embed_column: str = "embed",
    entity_column: str = "entity",
) -> dict[str, np.ndarray]:
    """
    Average all mention embeddings per entity label (same logic as Linker single-file load).
    """
    df = _normalize_entity_column(df)
    # Restrict to the KB vocabulary when one is supplied; copy so the helper
    # column below never touches the caller's frame.
    subset = (df if kb_labels is None else df[df[entity_column].isin(kb_labels)]).copy()
    if len(subset) == 0:
        return {}
    subset["_emb_arr"] = subset[embed_column].apply(
        lambda raw: np.asarray(raw, dtype=np.float64)
    )
    means: dict[str, np.ndarray] = {}
    for label, rows in subset.groupby(entity_column):
        means[str(label)] = np.stack(rows["_emb_arr"].tolist()).mean(axis=0)
    return means
|
mention_level_concat_frames(dfs)
Inner-join mention rows across sources and set embed to concatenated vectors.
Duplicate keys within a single source are averaged before the join.
Concatenation order matches dfs index order (must match metadata.sources).
Source code in pelinker/embedding_fusion.py
def mention_level_concat_frames(dfs: Sequence[pd.DataFrame]) -> pd.DataFrame:
    """
    Inner-join mention rows across sources and set ``embed`` to concatenated vectors.

    Duplicate keys within a single source are averaged before the join.
    Concatenation order matches ``dfs`` index order (must match metadata.sources).

    Raises:
        ValueError: if ``dfs`` is empty or a frame lacks the required mention columns.
    """
    if len(dfs) == 0:
        raise ValueError("mention_level_concat_frames requires at least one frame")
    prepared: list[pd.DataFrame] = []
    for i, df in enumerate(dfs):
        df = _normalize_entity_column(df)
        cols = set(df.columns)
        if not REQUIRED_MENTION_COLUMNS.issubset(cols):
            missing = REQUIRED_MENTION_COLUMNS - cols
            raise ValueError(f"Frame {i} missing columns: {sorted(missing)}")
        sub = df[list(JOIN_KEYS) + ["embed"]].copy()
        # Average duplicates so each source contributes one vector per key.
        sub = dedupe_mean_embed_by_keys(sub)
        sub = sub.rename(columns={"embed": f"_e{i}"})
        prepared.append(sub)
    out = prepared[0]
    for i in range(1, len(prepared)):
        out = out.merge(prepared[i], on=list(JOIN_KEYS), how="inner")
        if len(out) == 0:
            break  # intersection already empty; further joins cannot add rows
    if len(out) == 0:
        return pd.DataFrame(columns=list(JOIN_KEYS) + ["embed"])
    if len(dfs) == 1:
        return out.rename(columns={"_e0": "embed"})
    emb_cols = [f"_e{i}" for i in range(len(dfs))]
    out = out.copy()
    # Zip the embed columns directly instead of per-row ``out.iloc[i]`` lookups:
    # positional row indexing is O(n) work per row, so the old comprehension was
    # quadratic-ish on large joins. Output values/order are unchanged.
    out["embed"] = [
        np.concatenate([np.asarray(vec, dtype=np.float32) for vec in row_vecs], axis=0)
        for row_vecs in zip(*(out[c].tolist() for c in emb_cols))
    ]
    out = out.drop(columns=emb_cols)
    return out
|
property_fused_dataframe_for_linker_order(fused_vectors, labels_map)
Rows sorted like Linker fused load: sorted entity labels that resolve to an
entity_id in labels_map. Columns: entity_id, entity, embed.
Source code in pelinker/embedding_fusion.py
def property_fused_dataframe_for_linker_order(
    fused_vectors: Mapping[str, np.ndarray],
    labels_map: Mapping[str, str],
) -> pd.DataFrame:
    """
    Rows sorted like Linker fused load: sorted entity labels that resolve to an
    ``entity_id`` in ``labels_map``. Columns: ``entity_id``, ``entity``, ``embed``.

    Labels absent from ``labels_map`` are skipped with a warning.
    """
    # Invert labels_map once instead of a linear scan per entity (the old loop
    # was O(len(labels_map) * len(fused_vectors))). ``setdefault`` keeps the
    # FIRST id seen for a duplicated label, matching the old first-match scan.
    label_to_id: dict[str, str] = {}
    for eid, label in labels_map.items():
        label_to_id.setdefault(label, eid)
    rows: list[dict[str, object]] = []
    for entity_label in sorted(fused_vectors.keys()):
        entity_id = label_to_id.get(entity_label)
        if entity_id is None:
            logger.warning(
                "Entity label '%s' not found in labels_map, skipping", entity_label
            )
            continue
        vec = fused_vectors[entity_label]
        rows.append(
            {
                "entity_id": entity_id,
                "entity": entity_label,
                # float32 to match the on-disk embed dtype; copy only if needed.
                "embed": vec.astype(np.float32, copy=False),
            }
        )
    if not rows:
        # Preserve the column contract even when nothing resolves.
        return pd.DataFrame(columns=["entity_id", "entity", "embed"])
    return pd.DataFrame(rows)
|
property_mean_vectors_from_parquet_batches(path, kb_labels, *, batch_size, n_embedding_batches=None, read_status=None, show_read_progress=False)
Mean embedding per entity by streaming parquet batches (no full-file read).
Source code in pelinker/embedding_fusion.py
def property_mean_vectors_from_parquet_batches(
    path: pathlib.Path,
    kb_labels: set[str] | None,
    *,
    batch_size: int,
    n_embedding_batches: int | None = None,
    read_status: Callable[[str], None] | None = None,
    show_read_progress: bool = False,
) -> dict[str, np.ndarray]:
    """Mean embedding per entity by streaming parquet batches (no full-file read)."""
    running_sums: dict[str, np.ndarray] = {}
    mention_counts: dict[str, int] = {}

    def accumulate(_idx: int, frame: pd.DataFrame) -> None:
        # Fold each streamed batch into the running per-entity totals.
        _accumulate_property_means_from_frame(
            running_sums, mention_counts, frame, kb_labels
        )

    _for_each_embedding_parquet_batch(
        path,
        batch_size,
        n_embedding_batches,
        read_status=read_status,
        show_read_progress=show_read_progress,
        on_batch=accumulate,
    )
    return {
        label: total / mention_counts[label]
        for label, total in running_sums.items()
    }
|
read_embedding_parquet_batches_concat(path, *, batch_size, n_embedding_batches=None, read_status=None, show_read_progress=False)
Stream mention-level embedding parquet via read_batches and concatenate batches.
Same batching contract as ClusteringOptimizationConfig (batch_size rows per
batch; optional cap on batch count).
If read_status is set, it receives short status lines (for combining with an
outer Rich progress bar). If it is omitted and show_read_progress is True, a
compact transient progress display is used for this read only.
Source code in pelinker/embedding_fusion.py
| def read_embedding_parquet_batches_concat(
path: pathlib.Path,
*,
batch_size: int,
n_embedding_batches: int | None = None,
read_status: Callable[[str], None] | None = None,
show_read_progress: bool = False,
) -> pd.DataFrame | None:
"""
Stream mention-level embedding parquet via ``read_batches`` and concatenate batches.
Same batching contract as ``ClusteringOptimizationConfig`` (``batch_size`` rows per
batch; optional cap on batch count).
If ``read_status`` is set, it receives short status lines (for combining with an
outer Rich progress bar). If it is omitted and ``show_read_progress`` is True, a
compact transient progress display is used for this read only.
"""
if not path.exists():
return None
agg: list[pd.DataFrame] = []
try:
def on_batch(_i: int, batch: pd.DataFrame) -> None:
agg.append(batch)
_for_each_embedding_parquet_batch(
path,
batch_size,
n_embedding_batches,
read_status=read_status,
show_read_progress=show_read_progress,
on_batch=on_batch,
)
except Exception:
return None
if not agg:
return None
return pd.concat(agg, ignore_index=True)
|