# jsflow / mode_diversity_check.py
# (uploaded by xiangzai via upload-large-folder tool, commit b65e56d, verified)
#!/usr/bin/env python3
"""
模式多样性/“有多少模式簇”的快速估计(不改原始代码)。
流程:
1) 从 npz 的 arr_0 顺序流式读取,使用 reservoir sampling 抽样 M 张图像(默认 2000)
2) 用 samples/evaluator.py 的 Evaluator 计算 Inception pool_3 特征(2048维)
3) 输出:
- cosine 距离的 pairwise 统计(mean/median/分位数)
- 在归一化特征上跑 k-means(候选 k 列表),打印 inertia 与“elbow”建议的簇数
注意:
- 这不是严格的“真实模式数”,而是基于特征距离/聚类启发式的估计。
- 抽样过程需要扫完整个 npz(一次读流),但 Inception 计算只对 M 张做,通常更省。
"""
from __future__ import annotations
import argparse
import os
from typing import Iterable, Tuple
import numpy as np
import tensorflow.compat.v1 as tf
from evaluator import Evaluator, open_npz_array
def reservoir_sample_images(reader, m: int, seed: int = 0) -> np.ndarray:
    """Uniformly sample ``m`` images from a streaming npz reader.

    Implements Algorithm R (reservoir sampling): the ``arr_0`` array is
    scanned once in batches, so every image ends up in the sample with
    probability m / N without the full array ever being held in memory.

    Args:
        reader: NpzArrayReader from ``evaluator.open_npz_array``; must
            support ``read_batch`` returning (bs, H, W, 3) chunks or None.
        m: number of images to keep.
        seed: RNG seed for reproducible sampling.

    Returns:
        (m, H, W, 3) uint8 array of sampled images.

    Raises:
        ValueError: if the stream contains fewer than ``m`` images.
    """
    rng = np.random.default_rng(seed)
    kept = []
    n_seen = 0
    chunk = 256  # read in large chunks to cut Python-level overhead
    while (batch := reader.read_batch(chunk)) is not None:
        for img in batch:
            if len(kept) < m:
                kept.append(img.copy())
            else:
                # replace a uniformly random slot with prob m / (n_seen + 1)
                slot = int(rng.integers(0, n_seen + 1))
                if slot < m:
                    kept[slot] = img.copy()
            n_seen += 1
    if len(kept) != m:
        raise ValueError(f"Reservoir size mismatch: expected {m}, got {len(kept)}")
    return np.stack(kept, axis=0)
def iter_batches(x: np.ndarray, batch_size: int) -> Iterable[np.ndarray]:
    """Yield consecutive row-slices of ``x`` with at most ``batch_size`` rows each."""
    total = x.shape[0]
    start = 0
    while start < total:
        yield x[start : start + batch_size]
        start += batch_size
def cosine_pairwise_stats(feat: np.ndarray) -> dict:
    """Summarize the pairwise cosine-distance distribution of feature rows.

    Rows are L2-normalized, the full (N, N) similarity matrix is formed,
    and the strict upper triangle of (1 - similarity) is reduced to
    summary statistics.

    Args:
        feat: (N, D) float32 feature matrix.

    Returns:
        dict with keys "n", "mean", "median" and quantiles q01..q99.
    """
    norms = np.linalg.norm(feat, axis=1, keepdims=True)
    unit = feat / (norms + 1e-12)  # epsilon guards against zero rows
    dist = 1.0 - unit @ unit.T
    n_rows = dist.shape[0]
    upper = dist[np.triu_indices(n_rows, k=1)]
    labels = ["q01", "q05", "q10", "q50", "q90", "q95", "q99"]
    levels = [0.01, 0.05, 0.1, 0.5, 0.9, 0.95, 0.99]
    stats = {
        "n": int(n_rows),
        "mean": float(upper.mean()),
        "median": float(np.median(upper)),
    }
    stats.update(
        {name: float(q) for name, q in zip(labels, np.quantile(upper, levels))}
    )
    return stats
def kmeans_euclidean(x: np.ndarray, k: int, iters: int = 10, seed: int = 0) -> Tuple[np.ndarray, float]:
    """Plain k-means (Euclidean distance) with k-means++ initialization.

    Args:
        x: (N, D) data matrix, float32/float64.
        k: number of clusters.
        iters: number of Lloyd iterations.
        seed: RNG seed for reproducibility.

    Returns:
        (centroids, inertia): (k, D) centroid matrix and the sum of
        squared distances from each point to its nearest centroid.
    """
    rng = np.random.default_rng(seed)
    n = x.shape[0]
    # --- k-means++ initialization ---
    centroids = np.empty((k, x.shape[1]), dtype=x.dtype)
    idx0 = int(rng.integers(0, n))
    centroids[0] = x[idx0]
    # squared distance of every point to its nearest chosen centroid
    dist2 = np.sum((x - centroids[0]) ** 2, axis=1)
    for ci in range(1, k):
        total = dist2.sum()
        if total <= 0.0:
            # Degenerate case: every point coincides with an already
            # chosen centroid. An all-zero probability vector would make
            # rng.choice raise ("probabilities do not sum to 1"), so fall
            # back to a uniform draw.
            next_idx = int(rng.integers(0, n))
        else:
            probs = dist2 / total
            next_idx = int(rng.choice(n, p=probs))
        centroids[ci] = x[next_idx]
        dist2 = np.minimum(dist2, np.sum((x - centroids[ci]) ** 2, axis=1))
    # --- Lloyd iterations ---
    for _ in range(iters):
        assign, _ = _nearest_centroids(x, centroids)
        new_centroids = np.zeros_like(centroids)
        for ci in range(k):
            mask = assign == ci
            if not np.any(mask):
                # empty cluster: re-seed it from a random data point
                new_centroids[ci] = x[int(rng.integers(0, n))]
            else:
                new_centroids[ci] = x[mask].mean(axis=0)
        centroids = new_centroids
    # inertia: total squared distance to the closest centroid
    assign, dists2 = _nearest_centroids(x, centroids)
    inertia = float(np.sum(dists2[np.arange(n), assign]))
    return centroids, inertia


def _nearest_centroids(x: np.ndarray, centroids: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Return (assignments, squared-distance matrix) for each row of x.

    Uses ||x||^2 + ||c||^2 - 2 x·c to build the (N, K) squared-distance
    matrix without an explicit Python loop.
    """
    x2 = np.sum(x * x, axis=1, keepdims=True)  # (N, 1)
    c2 = np.sum(centroids * centroids, axis=1, keepdims=True).T  # (1, K)
    dists2 = x2 + c2 - 2.0 * (x @ centroids.T)  # (N, K)
    return np.argmin(dists2, axis=1), dists2
def estimate_k_by_elbow(k_list: list, inertia_list: list, rel_drop_threshold: float = 0.15) -> int:
    """Crude elbow heuristic for choosing k.

    Walk the candidates from small to large and stop at the first step
    whose relative inertia drop falls below the threshold; return the k
    just before that step, or the largest k if every drop is large.
    """
    chosen = k_list[-1]
    for idx, (prev_inertia, curr_inertia) in enumerate(
        zip(inertia_list, inertia_list[1:]), start=1
    ):
        rel_drop = (prev_inertia - curr_inertia) / max(prev_inertia, 1e-12)
        if rel_drop < rel_drop_threshold:
            chosen = k_list[idx - 1]
            break
    return int(chosen)
def main():
    """CLI entry point.

    Samples images from an npz, computes Inception pool_3 features, and
    reports pairwise cosine-distance statistics plus a k-means elbow
    estimate of the number of mode clusters.
    """
    ap = argparse.ArgumentParser(description="Mode diversity check via Inception features")
    ap.add_argument(
        "--npz",
        required=True,
        help="输入 npz(需包含 arr_0,形状 (N,H,W,3),数值范围 [0,255])",
    )
    ap.add_argument("--m", type=int, default=2000, help="抽样图片数(建议 1000~4000)")
    ap.add_argument("--seed", type=int, default=0)
    ap.add_argument("--tf-batch", type=int, default=64, help="Inception 前向 batch size")
    ap.add_argument("--max-k", type=int, default=20, help="k-means 候选最大 k(会使用一组默认候选)")
    args = ap.parse_args()
    npz_path = os.path.abspath(args.npz)
    # 1) reservoir sample: only M images are ever held in memory
    print(f"[1/3] Reservoir sampling from: {npz_path} (m={args.m})")
    with open_npz_array(npz_path, "arr_0") as reader:
        # Prefer the reader's static shape for the total count: its
        # remaining() changes while we stream, so shape[0] is more stable.
        n_total = int(reader.shape[0]) if hasattr(reader, "shape") else None
        if n_total is not None and args.m > n_total:
            raise ValueError(f"m={args.m} > N={n_total}")
        imgs = reservoir_sample_images(reader, m=args.m, seed=args.seed)
    print(f"[1/3] Done. sampled imgs shape={imgs.shape}, dtype={imgs.dtype}")
    # 2) Inception features (forced onto CPU)
    print("[2/3] Compute Inception pool_3 activations...")
    os.environ["CUDA_VISIBLE_DEVICES"] = ""
    config = tf.ConfigProto(allow_soft_placement=True, device_count={"GPU": 0})
    evaluator = Evaluator(tf.Session(config=config))
    evaluator.warmup()
    acts, _ = evaluator.compute_activations(iter_batches(imgs, args.tf_batch))
    # acts: (m, 2048)
    print(f"[2/3] Done. acts shape={acts.shape}")
    # 3) pairwise stats + k-means elbow
    print("[3/3] Pairwise cosine distance stats...")
    s = cosine_pairwise_stats(acts.astype(np.float32))
    print("pairwise_cosine_distance:")
    for k in ["mean", "median", "q01", "q05", "q10", "q50", "q90", "q95", "q99"]:
        print(f" {k}={s[k]:.6f}")
    # k-means on L2-normalized features (Euclidean distance on the unit sphere)
    fn = acts.astype(np.float32)
    fn = fn / (np.linalg.norm(fn, axis=1, keepdims=True) + 1e-12)
    k_candidates = [k for k in [2, 3, 4, 5, 6, 8, 10, 12, 15, 20] if k <= args.max_k]
    # Guard: with --max-k < 2 there is nothing to cluster over; the old
    # code would hit an IndexError later instead of a clear message.
    if not k_candidates:
        raise ValueError(f"--max-k={args.max_k} leaves no k-means candidates (need >= 2)")
    print(f"[3/3] K-means over candidates: {k_candidates}")
    inertia_list = []
    for k in k_candidates:
        _, inertia = kmeans_euclidean(fn, k=k, iters=10, seed=args.seed)
        inertia_list.append(inertia)
        print(f" k={k:2d} inertia={inertia:.4e}")
    est_k = estimate_k_by_elbow(k_candidates, inertia_list, rel_drop_threshold=0.12)
    print(f"[3/3] Estimated #mode clusters (elbow heuristic): {est_k}")
if __name__ == "__main__":
main()