| |
| """ |
| 模式多样性/“有多少模式簇”的快速估计(不改原始代码)。 |
| |
| 流程: |
| 1) 从 npz 的 arr_0 顺序流式读取,使用 reservoir sampling 抽样 M 张图像(默认 2000) |
| 2) 用 samples/evaluator.py 的 Evaluator 计算 Inception pool_3 特征(2048维) |
| 3) 输出: |
| - cosine 距离的 pairwise 统计(mean/median/分位数) |
| - 在归一化特征上跑 k-means(候选 k 列表),打印 inertia 与“elbow”建议的簇数 |
| |
| 注意: |
| - 这不是严格的“真实模式数”,而是基于特征距离/聚类启发式的估计。 |
| - 抽样过程需要扫完整个 npz(一次读流),但 Inception 计算只对 M 张做,通常更省。 |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import os |
| from typing import Iterable, Tuple |
|
|
| import numpy as np |
| import tensorflow.compat.v1 as tf |
|
|
| from evaluator import Evaluator, open_npz_array |
|
|
|
|
def reservoir_sample_images(reader, m: int, seed: int = 0) -> np.ndarray:
    """
    Uniformly sample m images from a streamed npz array (Algorithm R).

    reader: an NpzArrayReader from evaluator.open_npz_array (supports read_batch)
    m:      number of images to keep
    seed:   RNG seed for reproducible sampling

    Returns: (m, H, W, 3) uint8 array.
    Raises ValueError if the stream holds fewer than m images.
    """
    rng = np.random.default_rng(seed)
    kept = []
    seen = 0

    while True:
        batch = reader.read_batch(256)
        if batch is None:
            break

        for img in batch:
            if len(kept) < m:
                # Still filling the reservoir: keep everything.
                kept.append(img.copy())
            else:
                # Replace a random slot with probability m / (seen + 1).
                slot = int(rng.integers(0, seen + 1))
                if slot < m:
                    kept[slot] = img.copy()
            seen += 1

    if len(kept) != m:
        raise ValueError(f"Reservoir size mismatch: expected {m}, got {len(kept)}")
    return np.stack(kept, axis=0)
|
|
|
|
def iter_batches(x: np.ndarray, batch_size: int) -> Iterable[np.ndarray]:
    """Yield consecutive chunks of x along axis 0; the last chunk may be short."""
    return (x[off : off + batch_size] for off in range(0, x.shape[0], batch_size))
|
|
|
|
def cosine_pairwise_stats(feat: np.ndarray) -> dict:
    """
    Summarize the pairwise cosine-distance distribution over feature rows.

    Rows are L2-normalized (with a small epsilon for zero rows) and distances
    are taken from the strict upper triangle of 1 - cosine similarity.

    feat: (N, D) float array, N >= 2.
    Returns a dict with n, mean, median, and q01/q05/q10/q50/q90/q95/q99.
    Raises ValueError when N < 2 (the original crashed inside numpy with an
    opaque zero-size reduction error in that case).
    """
    n = feat.shape[0]
    if n < 2:
        raise ValueError(f"Need at least 2 feature rows, got {n}")

    f = feat / (np.linalg.norm(feat, axis=1, keepdims=True) + 1e-12)
    cos_dist = 1.0 - f @ f.T

    # Strict upper triangle: each unordered pair exactly once.
    vals = cos_dist[np.triu_indices(n, k=1)]

    q_keys = ("q01", "q05", "q10", "q50", "q90", "q95", "q99")
    qs = np.quantile(vals, [0.01, 0.05, 0.1, 0.5, 0.9, 0.95, 0.99])

    stats = {
        "n": int(n),
        "mean": float(vals.mean()),
        "median": float(np.median(vals)),
    }
    stats.update({k: float(v) for k, v in zip(q_keys, qs)})
    return stats
|
|
|
|
def _sq_dists(x: np.ndarray, c: np.ndarray) -> np.ndarray:
    """(N, K) squared Euclidean distances via the expanded-norm identity."""
    return (
        np.sum(x * x, axis=1, keepdims=True)
        + np.sum(c * c, axis=1)[None, :]
        - 2.0 * (x @ c.T)
    )


def _nearest(x: np.ndarray, c: np.ndarray) -> np.ndarray:
    """Index of the nearest centroid for each row of x."""
    return np.argmin(_sq_dists(x, c), axis=1)


def kmeans_euclidean(x: np.ndarray, k: int, iters: int = 10, seed: int = 0) -> Tuple[np.ndarray, float]:
    """
    Simple Euclidean k-means with k-means++ initialization.

    x:     (N, D) float32/float64 data
    k:     number of clusters
    iters: Lloyd iterations
    seed:  RNG seed

    Returns: (centroids (k, D), inertia). Inertia is clamped at >= 0 since the
    expanded-norm distance identity can round to slightly negative values.
    """
    rng = np.random.default_rng(seed)
    n = x.shape[0]

    # k-means++ seeding: first centroid uniform, then proportional to dist^2.
    centroids = np.empty((k, x.shape[1]), dtype=x.dtype)
    centroids[0] = x[int(rng.integers(0, n))]
    dist2 = np.sum((x - centroids[0]) ** 2, axis=1)
    for ci in range(1, k):
        total = dist2.sum()
        if total <= 0.0:
            # Fix: every point coincides with an existing centroid. The original
            # fed a zero distribution to rng.choice, which raises
            # "probabilities do not sum to 1"; fall back to uniform sampling.
            probs = np.full(n, 1.0 / n)
        else:
            probs = dist2 / total
        centroids[ci] = x[int(rng.choice(n, p=probs))]
        dist2 = np.minimum(dist2, np.sum((x - centroids[ci]) ** 2, axis=1))

    # Lloyd iterations: assign, then recompute means.
    for _ in range(iters):
        assign = _nearest(x, centroids)
        new_centroids = np.zeros_like(centroids)
        for ci in range(k):
            members = x[assign == ci]
            if members.shape[0] == 0:
                # Re-seed an empty cluster with a random point.
                new_centroids[ci] = x[int(rng.integers(0, n))]
            else:
                new_centroids[ci] = members.mean(axis=0)
        centroids = new_centroids

    # Final assignment and inertia.
    d2 = _sq_dists(x, centroids)
    assign = np.argmin(d2, axis=1)
    inertia = float(max(np.sum(d2[np.arange(n), assign]), 0.0))
    return centroids, inertia
|
|
|
|
def estimate_k_by_elbow(k_list: list, inertia_list: list, rel_drop_threshold: float = 0.15) -> int:
    """
    Crude elbow heuristic: walk k from small to large and stop at the first
    step whose relative inertia drop falls below the threshold, returning the
    k just before it; otherwise return the largest candidate.
    """
    chosen = k_list[-1]
    steps = zip(zip(k_list, inertia_list), zip(k_list[1:], inertia_list[1:]))
    for (k_prev, inertia_prev), (_k_now, inertia_now) in steps:
        rel_drop = (inertia_prev - inertia_now) / max(inertia_prev, 1e-12)
        if rel_drop < rel_drop_threshold:
            chosen = k_prev
            break
    return int(chosen)
|
|
|
|
def main():
    """
    CLI entry point: reservoir-sample images from an npz, embed them with
    Inception pool_3, then report pairwise cosine-distance stats and a
    k-means elbow estimate of the number of mode clusters.
    """
    ap = argparse.ArgumentParser(description="Mode diversity check via Inception features")
    ap.add_argument(
        "--npz",
        required=True,
        help="输入 npz(需包含 arr_0,形状 (N,H,W,3),数值范围 [0,255])",
    )
    ap.add_argument("--m", type=int, default=2000, help="抽样图片数(建议 1000~4000)")
    ap.add_argument("--seed", type=int, default=0)
    ap.add_argument("--tf-batch", type=int, default=64, help="Inception 前向 batch size")
    ap.add_argument("--max-k", type=int, default=20, help="k-means 候选最大 k(会使用一组默认候选)")
    args = ap.parse_args()

    npz_path = os.path.abspath(args.npz)

    # [1/3] One streaming pass over the npz to uniformly sample m images.
    print(f"[1/3] Reservoir sampling from: {npz_path} (m={args.m})")
    with open_npz_array(npz_path, "arr_0") as reader:
        # Fail fast when the reader exposes the total count and m exceeds it.
        n_total = int(reader.shape[0]) if hasattr(reader, "shape") else None
        if n_total is not None and args.m > n_total:
            raise ValueError(f"m={args.m} > N={n_total}")
        imgs = reservoir_sample_images(reader, m=args.m, seed=args.seed)
    print(f"[1/3] Done. sampled imgs shape={imgs.shape}, dtype={imgs.dtype}")

    # [2/3] Inception pool_3 activations, forced onto CPU.
    print("[2/3] Compute Inception pool_3 activations...")
    os.environ["CUDA_VISIBLE_DEVICES"] = ""
    config = tf.ConfigProto(allow_soft_placement=True, device_count={"GPU": 0})
    evaluator = Evaluator(tf.Session(config=config))
    evaluator.warmup()

    acts, _ = evaluator.compute_activations(iter_batches(imgs, args.tf_batch))
    print(f"[2/3] Done. acts shape={acts.shape}")

    # [3/3] Pairwise-distance stats + k-means elbow.
    print("[3/3] Pairwise cosine distance stats...")
    s = cosine_pairwise_stats(acts.astype(np.float32))
    print("pairwise_cosine_distance:")
    for k in ["mean", "median", "q01", "q05", "q10", "q50", "q90", "q95", "q99"]:
        print(f"  {k}={s[k]:.6f}")

    # L2-normalize so Euclidean k-means approximates cosine clustering.
    fn = acts.astype(np.float32)
    fn = fn / (np.linalg.norm(fn, axis=1, keepdims=True) + 1e-12)

    # Fix: the original built this list in a loop followed by a dead
    # `if k_candidates[-1] != min(...): pass` that indexed an empty list
    # (IndexError) whenever --max-k < 2. Build it directly and guard instead.
    k_candidates = [k for k in (2, 3, 4, 5, 6, 8, 10, 12, 15, 20) if k <= args.max_k]
    if not k_candidates:
        raise ValueError(f"--max-k={args.max_k} leaves no k-means candidates (need >= 2)")

    print(f"[3/3] K-means over candidates: {k_candidates}")
    inertia_list = []
    for k in k_candidates:
        _, inertia = kmeans_euclidean(fn, k=k, iters=10, seed=args.seed)
        inertia_list.append(inertia)
        print(f"  k={k:2d} inertia={inertia:.4e}")

    est_k = estimate_k_by_elbow(k_candidates, inertia_list, rel_drop_threshold=0.12)
    print(f"[3/3] Estimated #mode clusters (elbow heuristic): {est_k}")
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|
|
|