Skip to content

pelinker.clustering_grid

HDBSCAN min_cluster_size grid evaluation, cross-sample aggregation, and smooth optimum selection.

AggregatedGridPoint dataclass

One grid value of min_cluster_size with aggregated metrics across samples.

Source code in pelinker/clustering_grid.py
@dataclass(frozen=True)
class AggregatedGridPoint:
    """One grid value of ``min_cluster_size`` with aggregated metrics across samples."""

    # HDBSCAN grid value this point describes.
    min_cluster_size: int
    # DBCV mean / std / sample count pooled over samples at this grid value.
    dbcv: ScalarMetricAggregate
    # Mean per-cluster cosine-similarity std across samples (diagnostic only).
    icm_mean: float
    # Mean number of non-noise clusters across samples (diagnostic only).
    n_clusters_mean: float
    # Adjusted Rand index vs. entity codes, aggregated across samples
    # (NaN-filled when no "ari" column was present in the inputs).
    ari: ScalarMetricAggregate
AggregatedGridReport dataclass

Typed aggregation of per-sample grid metrics; points are sorted by min_cluster_size.

Source code in pelinker/clustering_grid.py
@dataclass(frozen=True)
class AggregatedGridReport:
    """Typed aggregation of per-sample grid metrics; points are sorted by ``min_cluster_size``."""

    points: tuple[AggregatedGridPoint, ...]

    def __post_init__(self) -> None:
        # Validate ordering and uniqueness of the grid axis up front so every
        # consumer can rely on a strictly increasing min_cluster_size sequence.
        sizes = [point.min_cluster_size for point in self.points]
        if any(left > right for left, right in zip(sizes, sizes[1:])):
            raise ValueError(
                "AggregatedGridReport.points must be sorted by min_cluster_size"
            )
        if len(sizes) != len(set(sizes)):
            raise ValueError(
                "Duplicate min_cluster_size in AggregatedGridReport.points"
            )

ScalarMetricAggregate dataclass

Mean, dispersion, and sample count for one metric at a single grid point.

Source code in pelinker/clustering_grid.py
@dataclass(frozen=True)
class ScalarMetricAggregate:
    """Mean, dispersion, and sample count for one metric at a single grid point."""

    # Sample mean of the metric across samples at one grid value.
    mean: float
    # Sample standard deviation; 0.0 is substituted when only one sample exists.
    std: float
    # Number of samples that contributed to mean/std.
    count: int

SmoothedGridOptimumResult dataclass

Diagnostics for solve_optimal_min_cluster_size_from_aggregated.

score_mean_at_chosen / score_std_at_chosen refer to the raw objective (before smoothing) at the chosen grid point.

Source code in pelinker/clustering_grid.py
@dataclass(frozen=True)
class SmoothedGridOptimumResult:
    """Diagnostics for ``solve_optimal_min_cluster_size_from_aggregated``.

    ``score_mean_at_chosen`` / ``score_std_at_chosen`` refer to the raw objective (before
    smoothing) at the chosen grid point.
    """

    # Selected min_cluster_size (a value present on the finite, sorted grid).
    chosen_min_cluster_size: int
    # Raw (pre-smoothing) objective mean at the chosen grid point.
    score_mean_at_chosen: float
    # Raw (pre-smoothing) objective std at the chosen grid point.
    score_std_at_chosen: float
    # Grid abscissa after non-finite filtering and ascending sort.
    x: tuple[float, ...]
    # Objective f(x) after the optimization method was applied, before smoothing.
    y_objective: tuple[float, ...]
    # Centered-moving-average smoothed objective.
    y_smooth: tuple[float, ...]
    # First derivative of the smoothed curve (np.gradient w.r.t. x).
    dy_dx: tuple[float, ...]
    # Second derivative of the smoothed curve.
    d2y_dx2: tuple[float, ...]
    # Which selection rule fired: leftmost plateau point, or argmax fallback.
    selection: Literal["plateau_derivative", "smoothed_argmax"]

aggregate_grid_metrics(all_metrics_dfs)

Aggregate grid evaluation metrics across multiple samples into a typed report.

Per min_cluster_size we keep DBCV mean, std, and count (so uncertainty is not discarded). ICM and cluster count are aggregated as means for diagnostics.

Source code in pelinker/clustering_grid.py
def aggregate_grid_metrics(all_metrics_dfs: list[pd.DataFrame]) -> AggregatedGridReport:
    """
    Aggregate grid evaluation metrics across multiple samples into a typed report.

    Per ``min_cluster_size`` we keep DBCV mean, std, and count (so uncertainty is not
    discarded). ICM and cluster count are aggregated as means for diagnostics.
    """
    if not all_metrics_dfs:
        return AggregatedGridReport(points=())

    stacked = pd.concat(all_metrics_dfs, ignore_index=True)
    # Ensure an "ari" column exists so the aggregation spec below is uniform.
    if "ari" not in stacked.columns:
        stacked["ari"] = np.nan

    grouped = (
        stacked.groupby("min_cluster_size")
        .agg(
            {
                "dbcv": ["mean", "std", "count"],
                "icm": "mean",
                "n_clusters": "mean",
                "ari": ["mean", "std", "count"],
            }
        )
        .reset_index()
    )

    # Flatten the MultiIndex columns produced by the dict-of-lists aggregation.
    grouped.columns = [
        "min_cluster_size",
        "dbcv_mean",
        "dbcv_std",
        "dbcv_count",
        "icm_mean",
        "n_clusters_mean",
        "ari_mean",
        "ari_std",
        "ari_count",
    ]

    # A single contributing sample yields std == NaN; report zero dispersion instead.
    grouped["dbcv_std"] = grouped["dbcv_std"].fillna(0.0)
    grouped["dbcv_count"] = grouped["dbcv_count"].astype(int)
    grouped["ari_std"] = grouped["ari_std"].fillna(0.0)
    grouped["ari_count"] = grouped["ari_count"].astype(int)

    def _row_to_point(row: pd.Series) -> AggregatedGridPoint:
        # Convert one aggregated table row into its typed grid point.
        return AggregatedGridPoint(
            min_cluster_size=int(row["min_cluster_size"]),
            dbcv=ScalarMetricAggregate(
                mean=float(row["dbcv_mean"]),
                std=float(row["dbcv_std"]),
                count=int(row["dbcv_count"]),
            ),
            icm_mean=float(row["icm_mean"]),
            n_clusters_mean=float(row["n_clusters_mean"]),
            ari=ScalarMetricAggregate(
                mean=float(row["ari_mean"]),
                std=float(row["ari_std"]),
                count=int(row["ari_count"]),
            ),
        )

    ordered = grouped.sort_values("min_cluster_size")
    return AggregatedGridReport(
        points=tuple(_row_to_point(row) for _, row in ordered.iterrows())
    )

aggregated_grid_report_to_dataframe(report)

Lossless round-trip style export for notebooks (typed report → table).

Source code in pelinker/clustering_grid.py
def aggregated_grid_report_to_dataframe(report: AggregatedGridReport) -> pd.DataFrame:
    """Lossless round-trip style export for notebooks (typed report → table)."""
    # One flat record per grid point; column order mirrors aggregate_grid_metrics.
    records: list[dict[str, float | int]] = [
        {
            "min_cluster_size": point.min_cluster_size,
            "dbcv_mean": point.dbcv.mean,
            "dbcv_std": point.dbcv.std,
            "dbcv_count": point.dbcv.count,
            "icm_mean": point.icm_mean,
            "n_clusters_mean": point.n_clusters_mean,
            "ari_mean": point.ari.mean,
            "ari_std": point.ari.std,
            "ari_count": point.ari.count,
        }
        for point in report.points
    ]
    return pd.DataFrame(records)

cosine_similarity_std(tensor, max_pairs=200000, random_seed=13)

Calculate the standard deviation of pairwise cosine similarities for a tensor of shape (n_b, dim_emb).

Source code in pelinker/clustering_grid.py
def cosine_similarity_std(
    tensor: torch.Tensor, max_pairs: int = 200_000, random_seed: int = 13
) -> torch.Tensor:
    """
    Standard deviation of pairwise cosine similarities for a ``(n_b, dim_emb)`` tensor.

    Uses all upper-triangle pairs exactly when that is at most ``max_pairs``;
    otherwise draws a seeded random sample of index pairs (i != j).
    Returns NaN when fewer than two rows are given.
    """
    unit_rows = F.normalize(tensor.float(), p=2, dim=1)

    n_rows = unit_rows.size(0)
    if n_rows < 2:
        # No pair exists; std is undefined.
        return torch.tensor(float("nan"), dtype=unit_rows.dtype)

    n_pairs = n_rows * (n_rows - 1) // 2

    if n_pairs <= max_pairs:
        # Exact path: every strict-upper-triangle entry of the Gram matrix.
        gram = unit_rows @ unit_rows.t()
        rows_ix, cols_ix = torch.triu_indices(n_rows, n_rows, offset=1)
        return torch.std(gram[rows_ix, cols_ix])

    # Sampled path: seeded generator keeps the estimate reproducible.
    n_sampled = min(max_pairs, n_pairs)
    rng = torch.Generator(device=unit_rows.device)
    rng.manual_seed(random_seed)

    left = torch.randint(
        0, n_rows, (n_sampled,), generator=rng, device=unit_rows.device
    )
    right = torch.randint(
        0, n_rows - 1, (n_sampled,), generator=rng, device=unit_rows.device
    )
    # Shift the second index past the first where needed so left != right.
    right = right + (right >= left).long()
    sampled_sims = (unit_rows[left] * unit_rows[right]).sum(dim=1)
    return torch.std(sampled_sims)

evaluate_cluster_size_grid(dfr2, umap_columns, sizes, max_pairs_per_cluster=200000)

Evaluate clustering metrics on a grid of min_cluster_size values.

Uses DBCV (Density-Based Clustering Validation) and, when entity is present, adjusted Rand index vs. entity codes (noise label -1 excluded).

Returns:

DataFrame with columns: min_cluster_size, icm, n_clusters, dbcv, ari

Source code in pelinker/clustering_grid.py
def evaluate_cluster_size_grid(
    dfr2: pd.DataFrame,
    umap_columns: list[str],
    sizes: list[int],
    max_pairs_per_cluster: int = 200_000,
) -> pd.DataFrame:
    """
    Evaluate clustering metrics on a grid of min_cluster_size values.

    Uses DBCV (Density-Based Clustering Validation) and, when ``entity`` is present,
    adjusted Rand index vs. entity codes (noise label -1 excluded).

    Returns:
        DataFrame with columns: min_cluster_size, icm, n_clusters, dbcv, ari
    """
    embedding = dfr2[umap_columns].to_numpy(dtype=np.float32, copy=False)
    entity_codes: np.ndarray | None = None
    if "entity" in dfr2.columns:
        entity_codes = (
            dfr2["entity"]
            .astype("category")
            .cat.codes.to_numpy(dtype=np.int64, copy=False)
        )

    rows = []
    for min_size in sizes:
        model = hdbscan.HDBSCAN(min_cluster_size=min_size, gen_min_span_tree=True)
        labels = model.fit_predict(embedding)

        # Per-cluster cosine-similarity dispersion; skip noise (-1) and singletons.
        per_cluster_stds = []
        for label in np.unique(labels):
            if label == -1:
                continue
            members = embedding[labels == label]
            if len(members) < 2:
                continue
            dispersion = cosine_similarity_std(
                torch.from_numpy(members),
                max_pairs=max_pairs_per_cluster,
                random_seed=13,
            )
            per_cluster_stds.append(float(dispersion))

        icm = np.mean(per_cluster_stds) if per_cluster_stds else np.nan

        label_set = set(labels)
        n_clusters = len(label_set) - (1 if -1 in label_set else 0)

        # DBCV (relative_validity_) is only meaningful with at least two clusters.
        if n_clusters >= 2:
            relative_validity = getattr(model, "relative_validity_", None)
            dbcv = float(relative_validity) if relative_validity is not None else float("nan")
        else:
            dbcv = np.nan

        ari = (
            _adjusted_rand_index_vs_entity_codes(entity_codes, labels)
            if entity_codes is not None
            else float("nan")
        )

        # Grid points that found no clusters at all are dropped from the table.
        if n_clusters >= 1:
            rows.append((min_size, icm, n_clusters, dbcv, ari))

    return pd.DataFrame(
        rows, columns=["min_cluster_size", "icm", "n_clusters", "dbcv", "ari"]
    )

solve_optimal_min_cluster_size_from_aggregated(report, *, objective='dbcv', method='mean', uncertainty_penalty=1.0, smooth_window=3, plateau_fraction=0.92, derivative_rel_tol=0.12, precision_weighted_smooth=None)

Choose min_cluster_size from aggregated noisy grid scores.

Builds f(x) from objective (single metric or pooled DBCV+ARI), then optionally method (mean / lower_bound / weighted). Smooths f with a centered moving average, then prefers the leftmost x where the smoothed curve is near the top of its range (f ≥ y_min + plateau_fraction · (y_max - y_min) on the smoothed curve, with |df/dx| small). If none qualify, uses the smoothed argmax.

precision_weighted_smooth defaults to True for lower_bound and weighted, and False for mean.

Source code in pelinker/clustering_grid.py
def solve_optimal_min_cluster_size_from_aggregated(
    report: AggregatedGridReport,
    *,
    objective: GridObjectiveSpec = "dbcv",
    method: str = "mean",
    uncertainty_penalty: float = 1.0,
    smooth_window: int = 3,
    plateau_fraction: float = 0.92,
    derivative_rel_tol: float = 0.12,
    precision_weighted_smooth: bool | None = None,
) -> SmoothedGridOptimumResult:
    """
    Choose ``min_cluster_size`` from aggregated noisy grid scores.

    Builds f(x) from ``objective`` (single metric or pooled DBCV+ARI), then optionally
    ``method`` (mean / lower_bound / weighted). Smooths f with a centered moving average,
    then prefers the **leftmost** x where the smoothed curve is near the top of its **range**
    (f ≥ ``y_min + plateau_fraction · (y_max - y_min)`` on the smoothed curve, with
    ``|df/dx|`` small). If none qualify, uses the smoothed argmax.

    ``precision_weighted_smooth`` defaults to True for ``lower_bound`` and ``weighted``,
    and False for ``mean``.

    A grid with a single finite point is returned directly (there is nothing to
    smooth or differentiate).

    Raises:
        ValueError: if ``report`` is empty or has no finite objective values.
    """
    if len(report.points) == 0:
        raise ValueError("No aggregated grid points provided")

    x = np.array([p.min_cluster_size for p in report.points], dtype=np.float64)
    base_means, stds, counts = _objective_mean_std_count(report, objective)
    y = _apply_optimization_method(base_means, stds, method, uncertainty_penalty)

    # Keep only grid points where both the abscissa and the objective are finite.
    finite = np.isfinite(x) & np.isfinite(y)
    if not np.any(finite):
        raise ValueError("No finite objective values in aggregated grid report")

    x = x[finite]
    y = y[finite]
    base_means = base_means[finite]
    stds = stds[finite]
    counts = counts[finite]
    order = np.argsort(x)
    x = x[order]
    y = y[order]
    base_means = base_means[order]
    stds = stds[order]
    counts = counts[order]

    # Degenerate grid: np.gradient requires at least two samples, so with a single
    # finite point we return it immediately instead of crashing downstream.
    if x.size == 1:
        return SmoothedGridOptimumResult(
            chosen_min_cluster_size=int(x[0]),
            score_mean_at_chosen=float(base_means[0]),
            score_std_at_chosen=float(stds[0]),
            x=(float(x[0]),),
            y_objective=(float(y[0]),),
            y_smooth=(float(y[0]),),
            dy_dx=(0.0,),
            d2y_dx2=(0.0,),
            selection="smoothed_argmax",
        )

    if precision_weighted_smooth is None:
        # Methods that already use per-point uncertainty get precision weighting.
        precision_weighted_smooth = method in ("lower_bound", "weighted")

    if precision_weighted_smooth:
        pweights = _precision_weights(stds, counts)
        y_s = _weighted_centered_moving_average(y, pweights, smooth_window)
    else:
        y_s = _uniform_centered_moving_average(y, smooth_window)

    dydx = np.gradient(y_s, x)
    d2ydx2 = np.gradient(dydx, x)

    smooth_finite = np.isfinite(y_s)
    if not np.any(smooth_finite):
        raise ValueError("Smoothed objective is non-finite")
    # Max/min over the finite mask are finite by construction.
    y_max = float(np.max(y_s[smooth_finite]))
    y_min = float(np.min(y_s[smooth_finite]))

    # Flatness threshold is relative to the steepest slope on the smoothed curve.
    abs_dydx = np.abs(dydx)
    scale = float(np.nanmax(abs_dydx))
    if not np.isfinite(scale) or scale <= 0:
        scale = 1.0
    thresh = derivative_rel_tol * scale
    level = y_min + plateau_fraction * (y_max - y_min)

    # Scan left to right for the first near-top, near-flat point (plateau start).
    chosen_idx: int | None = None
    selection: Literal["plateau_derivative", "smoothed_argmax"] = "smoothed_argmax"
    for i in range(len(x)):
        if not np.isfinite(y_s[i]) or not np.isfinite(dydx[i]):
            continue
        if y_s[i] >= level and abs_dydx[i] <= thresh:
            chosen_idx = i
            selection = "plateau_derivative"
            break

    if chosen_idx is None:
        # Fallback: global maximum of the smoothed curve.
        chosen_idx = int(np.nanargmax(y_s))

    chosen_x = int(x[chosen_idx])
    score_mean_at = float(base_means[chosen_idx])
    score_std_at = float(stds[chosen_idx])

    return SmoothedGridOptimumResult(
        chosen_min_cluster_size=chosen_x,
        score_mean_at_chosen=score_mean_at,
        score_std_at_chosen=score_std_at,
        x=tuple(float(v) for v in x),
        y_objective=tuple(float(v) for v in y),
        y_smooth=tuple(float(v) for v in y_s),
        dy_dx=tuple(float(v) for v in dydx),
        d2y_dx2=tuple(float(v) for v in d2ydx2),
        selection=selection,
    )