File size: 4,870 Bytes
#!/usr/bin/env python3
"""
Generate action labels by clustering task descriptions using text embeddings.
No manual rules — uses sentence-transformers + K-Means clustering.
"""
import os
import json
import glob
import argparse
import numpy as np
from collections import Counter
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
# Root directory of the annotation tree; collect_tasks() globs 'v*/s*.json'
# beneath it.
# NOTE(review): the value is a shell-style '${PULSE_ROOT}' placeholder and is
# stored verbatim by this assignment — confirm how it is meant to be resolved
# to a real path.
ANNOTATION_DIR = "${PULSE_ROOT}"
def collect_tasks():
    """Collect the task description of every segment in every annotation file.

    Annotation files are located by globbing ``v*/s*.json`` beneath
    ``ANNOTATION_DIR`` and are visited in sorted path order. Each file is
    parsed as JSON and the ``'task'`` value of every entry in its
    ``'segments'`` list is appended to the result; a file without a
    ``'segments'`` key contributes nothing.

    Returns:
        list: Task descriptions in file order, duplicates included.

    Raises:
        KeyError: If a segment entry has no ``'task'`` key.
    """
    # Expand $VAR / ${VAR} references so a root written as an
    # environment-variable placeholder resolves to a real directory. Strings
    # without such references (or naming unset variables) pass through
    # unchanged.
    root = os.path.expandvars(ANNOTATION_DIR)
    # Escape the root so glob metacharacters in the directory name are taken
    # literally; only the trailing 'v*/s*.json' part is meant as a pattern.
    pattern = os.path.join(glob.escape(root), 'v*/s*.json')

    tasks = []
    for path in sorted(glob.glob(pattern)):
        # Read as UTF-8 explicitly instead of relying on the locale default,
        # which can fail on non-ASCII task text.
        with open(path, encoding='utf-8') as f:
            data = json.load(f)
        tasks.extend(seg['task'] for seg in data.get('segments', []))
    return tasks
def _encode_with_sentence_transformers(texts):
    """Embed *texts* with the multilingual MiniLM sentence-transformers model."""
    # Imported lazily so the module still loads when the package is absent.
    from sentence_transformers import SentenceTransformer

    encoder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
    vectors = encoder.encode(texts, show_progress_bar=True, batch_size=128)
    print(f"Encoded {len(texts)} texts with sentence-transformers, dim={vectors.shape[1]}")
    return vectors


def _encode_with_tfidf(texts):
    """Embed *texts* as dense TF-IDF vectors over character 1- to 3-grams."""
    from sklearn.feature_extraction.text import TfidfVectorizer

    vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(1, 3), max_features=3000)
    matrix = vectorizer.fit_transform(texts).toarray()
    print(f"Encoded {len(texts)} texts with TF-IDF char n-grams, dim={matrix.shape[1]}")
    return matrix


def embed_texts(texts):
    """Turn a list of texts into one embedding row per text.

    The sentence-transformers encoder is tried first. If anything in that
    path raises (import, model construction, or encoding), the failure is
    printed and character n-gram TF-IDF vectors are returned instead.

    Args:
        texts: List of strings to embed.

    Returns:
        A 2-D array with one row per input text.
    """
    try:
        return _encode_with_sentence_transformers(texts)
    except Exception as e:
        # Deliberate best-effort: any failure degrades to the TF-IDF encoder.
        print(f"sentence-transformers failed ({e}), falling back to TF-IDF")
        return _encode_with_tfidf(texts)
def cluster_tasks(tasks, k_range=(10, 30)):
    """Cluster the distinct task descriptions with K-Means, choosing K by silhouette.

    The distinct task texts are embedded with ``embed_texts``. K-Means is
    fitted for every K in the (clamped) range and the K with the highest
    silhouette score is kept.

    Args:
        tasks: List of task description strings; duplicates are allowed and
            are collapsed before clustering.
        k_range: ``(k_min, k_max)`` inclusive range of cluster counts to try.
            The range is clamped to ``[2, n_unique - 1]``, the values for
            which a silhouette score is defined.

    Returns:
        tuple: ``(task_to_cluster, cluster_representatives, cluster_members,
        best_k, scores)`` where

        * ``task_to_cluster`` maps each distinct task text to its cluster id,
        * ``cluster_representatives`` maps cluster id to the member text
          closest to the cluster centroid,
        * ``cluster_members`` maps cluster id to the list of member texts,
        * ``best_k`` is the selected number of clusters,
        * ``scores`` maps every tried K to its silhouette score (plain float).

    Raises:
        ValueError: If there are fewer than three distinct task texts.
    """
    unique_tasks = sorted(set(tasks))
    n_unique = len(unique_tasks)
    print(f"Total segments: {len(tasks)}, Unique task texts: {n_unique}")

    # Silhouette needs 2 <= K <= n_samples - 1, so at least 3 samples.
    if n_unique < 3:
        raise ValueError(
            "Need at least 3 unique task texts to select K by silhouette "
            f"score, got {n_unique}"
        )

    # Clamp the requested range to what the data supports instead of letting
    # scikit-learn fail part-way through the search.
    k_min = max(2, min(k_range[0], n_unique - 1))
    k_max = min(k_range[1], n_unique - 1)
    if (k_min, k_max) != tuple(k_range):
        print(f"Adjusted K range to [{k_min}, {k_max}] for {n_unique} unique task texts")

    X = embed_texts(unique_tasks)

    # Find optimal K via silhouette score
    best_k, best_score = k_min, -1
    best_model, best_labels = None, None
    scores = {}
    for k in range(k_min, k_max + 1):
        km = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = km.fit_predict(X)
        # Seed the subsampling so every K is scored on the same subset and
        # repeated runs select the same K. float() turns the NumPy scalar
        # into a plain float that json can serialize.
        score = float(silhouette_score(
            X, labels, sample_size=min(2000, n_unique), random_state=42,
        ))
        scores[k] = score
        if score > best_score:
            best_k, best_score = k, score
            best_model, best_labels = km, labels
        print(f" K={k}: silhouette={score:.4f}" + (" *" if k == best_k else ""))
    print(f"\nBest K={best_k} (silhouette={best_score:.4f})")

    # Reuse the winning fit: with a fixed random_state a refit would only
    # reproduce it. A fresh fit is needed only when the search range was
    # empty (k_min > k_max) and no model was produced.
    if best_model is None:
        best_model = KMeans(n_clusters=best_k, random_state=42, n_init=10)
        best_labels = best_model.fit_predict(X)

    task_to_cluster = {
        task: int(label) for task, label in zip(unique_tasks, best_labels)
    }

    # Representative task per cluster (closest to centroid)
    cluster_representatives = {}
    cluster_members = {}
    for cid in range(best_k):
        member_idx = np.flatnonzero(best_labels == cid)
        cluster_members[cid] = [unique_tasks[i] for i in member_idx]
        centroid = best_model.cluster_centers_[cid]
        dists = np.linalg.norm(X[member_idx] - centroid, axis=1)
        closest = member_idx[np.argmin(dists)]
        cluster_representatives[cid] = unique_tasks[closest]

    return task_to_cluster, cluster_representatives, cluster_members, best_k, scores
def main():
    """Cluster all annotated task descriptions and save the label mapping.

    Command-line options:
        --output_dir: Directory for the result file. ``$VAR`` / ``${VAR}``
            references in the value are expanded from the environment.
        --k_min, --k_max: Inclusive range of cluster counts to evaluate.

    Prints a per-cluster summary (segment count, number of distinct texts,
    representative text and up to three other examples) and writes
    ``action_labels.json`` into the output directory.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--output_dir', type=str,
                        default='${PULSE_ROOT}/results/pred',
                        help='directory that receives action_labels.json')
    parser.add_argument('--k_min', type=int, default=10,
                        help='smallest number of clusters to try')
    parser.add_argument('--k_max', type=int, default=30,
                        help='largest number of clusters to try')
    args = parser.parse_args()

    # The default is an environment-variable placeholder that Python does not
    # expand by itself; values without such references pass through unchanged.
    output_dir = os.path.expandvars(args.output_dir)
    os.makedirs(output_dir, exist_ok=True)

    tasks = collect_tasks()
    task_to_cluster, representatives, members, K, scores = cluster_tasks(
        tasks, k_range=(args.k_min, args.k_max)
    )

    # Print summary
    segment_counts = Counter(task_to_cluster[t] for t in tasks)
    print(f"\n{'='*60}")
    print(f"Clusters (K={K}):")
    for cid in range(K):
        rep = representatives[cid]
        n_unique = len(members[cid])
        n_segs = segment_counts.get(cid, 0)
        examples = [m for m in members[cid] if m != rep][:3]
        print(f"\n [{cid:2d}] ({n_segs:4d} segs, {n_unique:3d} unique) \"{rep}\"")
        for ex in examples:
            print(f" - {ex}")

    # Save
    output = {
        'num_classes': K,
        'task_to_cluster': task_to_cluster,
        'cluster_representatives': {str(k): v for k, v in representatives.items()},
        'cluster_sizes_unique': {str(k): len(v) for k, v in members.items()},
        # Emit counts in cluster-id order, like the other per-cluster maps.
        'cluster_sizes_segments': {str(cid): segment_counts.get(cid, 0) for cid in range(K)},
        # float() guards against NumPy scalar scores, which json cannot encode.
        'silhouette_scores': {str(k): float(v) for k, v in scores.items()},
    }
    out_path = os.path.join(output_dir, 'action_labels.json')
    # ensure_ascii=False writes non-ASCII task text verbatim, so the file
    # encoding must not depend on the locale default.
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(output, f, indent=2, ensure_ascii=False)
    print(f"\nSaved to {out_path}")
# Script entry point: run the pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    main()
|