Spaces:
Sleeping
Sleeping
| import math | |
| import numpy as np | |
| import random | |
| from sklearn.datasets import make_classification | |
| from sklearn.linear_model import SGDClassifier | |
| from sklearn.metrics import log_loss | |
| from sklearn.preprocessing import StandardScaler | |
| from models import Observation, Action, EnvState | |
| from sampling import sample_uncertainty, sample_diversity, sample_random, entropy | |
| from reward import mean_cosine, running_mean | |
class DatasetState:
    """Mutable per-episode state for the data-selection environment.

    Bundles the noisy candidate pool, the fixed validation split, the
    growing training set, the remaining labeling budget, and the
    bookkeeping fields that the environment's step() mutates in place.
    """

    def __init__(self, X_pool, y_pool, X_val, y_val, X_train, y_train, budget, y_all, noise_mask):
        # Candidate pool: features plus (possibly label-flipped) labels.
        self.X_pool = X_pool
        self.y_pool = y_pool
        # Ground-truth bookkeeping: full label vector and the per-pool-row
        # indicator of which samples had their labels corrupted.
        self.y_all = y_all
        self.noise_mask = noise_mask
        # Held-out validation split used for scoring the model.
        self.X_val = X_val
        self.y_val = y_val
        # Labeled training data accumulated so far.
        self.X_train = X_train
        self.y_train = y_train
        # Remaining labeling budget and episode progress counters.
        self.budget = budget
        self.steps = 0
        self.performance = 0.0
        # Centroid of the current training set; updated incrementally later.
        self.train_centroid = np.mean(X_train, axis=0)
class DataSelectEnv:
    """Pool-based active-learning environment (OpenEnv-style interface).

    Each episode the agent repeatedly picks batches from a noisy unlabeled
    pool — mixing uncertainty, diversity, and random sampling via per-action
    strategy weights — and the environment incrementally trains an SGD
    classifier on the selections. Reward combines validation gain, a
    distance-based diversity bonus, redundancy and noise penalties, and a
    per-sample labeling cost. A deliberate "noise trap" inflates the entropy
    of corrupted samples so naive uncertainty sampling is misled.
    """

    WARMUP = 2  # NOTE(review): not referenced in this file — presumably used by callers; verify.

    def __init__(self, cfg, seed: int = 42):
        """
        cfg  -- environment config dict (budget, max_steps, alpha, data)
        seed -- random seed; stored here, applied at reset() so every
                episode is reproducible regardless of how many times
                the env has been constructed or stepped.
        """
        self.cfg = cfg
        self.seed = seed
        self._episode_state = None  # set by reset()

    # ------------------------------------------------------------------
    # Core OpenEnv interface
    # ------------------------------------------------------------------
    def reset(self):
        """Start a new episode. Returns the initial Observation."""
        # Seed here -- ensures every reset() call produces identical data.
        np.random.seed(self.seed)
        random.seed(self.seed)
        X, y = make_classification(
            n_samples=1500,
            n_features=20,
            n_informative=5,
            n_redundant=5,
            n_clusters_per_class=2,
            class_sep=1.0,
            flip_y=0.1,
            random_state=42,  # dataset skeleton is fixed; noise injection varies by seed
        )
        # Scaling -- must happen before splits.
        self.scaler = StandardScaler()
        X = self.scaler.fit_transform(X)
        # Fixed index splits: seed/warm-start set, validation set, and the
        # selection pool (rows 100:200 are intentionally unused as a gap).
        X_seed, y_seed = X[:100], y[:100]
        X_val, y_val = X[200:400], y[200:400]
        X_pool, y_pool = X[400:], y[400:]
        # Feature jitter on the pool so it is harder than the clean splits.
        X_pool = X_pool + np.random.normal(0, 0.5, X_pool.shape)
        # Noise injection -- flip_y from task cfg overrides base value.
        flip_prob = self.cfg["data"].get("flip_y", 0.1)
        noise_mask = np.random.rand(len(y_pool)) < flip_prob
        y_pool_noisy = y_pool.copy()
        # Guaranteed label flip (1-y), not random assignment.
        # Random assignment gives the correct label 50% of the time, halving
        # effective noise. Flipping guarantees every noise_mask sample is wrong.
        y_pool_noisy[noise_mask] = 1 - y_pool[noise_mask]
        X_pool[noise_mask] += np.random.normal(0, 0.1, X_pool[noise_mask].shape)
        # Fresh model every episode.
        self.model = SGDClassifier(
            loss="log_loss",
            learning_rate="adaptive",
            eta0=0.01,
            max_iter=1,
            tol=None,
            random_state=42,
        )
        # Warm start on seed data (10 partial_fit passes).
        for _ in range(10):
            self.model.partial_fit(X_seed, y_seed, classes=np.unique(y))
        self._episode_state = DatasetState(
            X_pool, y_pool_noisy, X_val, y_val,
            X_seed.copy(), y_seed.copy(),
            self.cfg["budget"], y, noise_mask,
        )
        # Noisy centroid -- used in step() for the noise trap.
        if np.any(noise_mask):
            self._episode_state.noisy_centroid = np.mean(X_pool[noise_mask], axis=0)
        else:
            self._episode_state.noisy_centroid = np.zeros(X_pool.shape[1])
        self._episode_state.performance = self._score()
        return self._obs()

    def step(self, action: Action):
        """Execute one action. Returns (Observation, reward, done, info)."""
        s = self._episode_state
        if s is None:
            raise RuntimeError("Call reset() before step().")
        # Pool empty guard.
        if len(s.X_pool) == 0:
            return self._obs(), 0.0, True, {"reason": "pool_empty"}
        # Sanity check -- noise_mask must always stay in sync with pool.
        assert len(s.noise_mask) == len(s.X_pool), (
            f"noise_mask length {len(s.noise_mask)} != X_pool length {len(s.X_pool)}"
        )
        # BUG FIX: handle "stop" before batch-size validation. Previously a
        # stop action with batch_size <= 0 hit the empty-batch guard first and
        # returned a -0.01 error step instead of terminating the episode.
        # Stop ignores batch_size entirely, so it is processed up front.
        if action.action_type == "stop":
            perf_threshold = self.cfg.get("stop_threshold", 0.60)
            if s.performance >= perf_threshold:
                # Good-enough model: bonus scales with unspent budget.
                stop_reward = 0.05 * s.budget
            else:
                stop_reward = -1.0
            return self._obs(), stop_reward, True, {}
        # Normalize strategy weights (epsilon guards the all-zero case).
        w = action.strategy_weights
        total = sum(w.values()) + 1e-8
        w = {k: v / total for k, v in w.items()}
        min_b = self.cfg.get("min_batch", 1)
        b = min(action.batch_size, s.budget)
        if b < min_b:
            b = min_b  # enforce minimum; prevents single-sample gaming
        if b <= 0:
            return self._obs(), -0.01, False, {"error": "empty batch"}
        # flip_prob is read once; it drives both the noise-trap boost and the
        # noise penalty below (was previously looked up twice).
        flip_prob = self.cfg["data"].get("flip_y", 0.1)
        # Uncertainty + noise trap.
        proba_pool = self.model.predict_proba(s.X_pool)
        H = entropy(proba_pool)
        # Noise trap: boost entropy of noisy samples so uncertainty sampling is
        # attracted to them. Capped at 0.55 (< log(2) ~ 0.693, the max binary
        # entropy) so clean uncertain samples can still compete -- the trap
        # misleads rather than completely overrides, keeping uncertainty a
        # near-miss on hard difficulty.
        boost_raw = 0.1 + flip_prob * 2.0
        noise_boost = s.noise_mask.astype(float) * min(boost_raw, 0.55)
        H_adj = H + noise_boost
        # Sampling: split the batch across strategies; random absorbs the
        # integer-truncation remainder so the batch always sums to b.
        n_u = int(b * w.get("uncertainty", 0))
        n_d = int(b * w.get("diversity", 0))
        n_r = max(0, b - n_u - n_d)
        sel = set()
        sel.update(sample_uncertainty(s, H_adj, n_u, sel))
        sel.update(sample_diversity(s, n_d, sel))
        sel.update(sample_random(s, n_r, sel))
        idx = np.array(list(sel), dtype=int)
        if len(idx) == 0:
            return self._obs(), -0.01, False, {"error": "no samples selected"}
        Xb, yb = s.X_pool[idx], s.y_pool[idx]
        selected_noise = s.noise_mask[idx]
        noise_ratio = float(np.mean(selected_noise))
        # Remove selected samples from pool -- keep noise_mask in sync.
        keep = np.ones(len(s.X_pool), dtype=bool)
        keep[idx] = False
        s.X_pool = s.X_pool[keep]
        s.y_pool = s.y_pool[keep]
        s.noise_mask = s.noise_mask[keep]
        # Incremental training: three partial_fit passes on the new batch.
        for _ in range(3):
            self.model.partial_fit(Xb, yb, classes=np.unique(s.y_all))
        old = s.performance
        new = self._score()
        # ----------------------------------------------------------------
        # Reward design
        # ----------------------------------------------------------------
        gain = (new - old) * 5.0
        # Distance-based diversity bonus: rewards batches that cover regions
        # far from existing training data. Diversity sampling scores high
        # (~0.25), random scores average (~0.22), uncertainty scores low
        # (~0.15) because boundary samples cluster near the centroid.
        diversity_bonus = float(np.mean(
            np.linalg.norm(Xb - s.train_centroid, axis=1)
        )) * 0.05
        gain += diversity_bonus
        # Redundancy: cosine similarity of the batch to the training centroid;
        # highly redundant batches have their gain halved.
        redundancy = mean_cosine(Xb, s.train_centroid)
        if redundancy > 0.8:
            gain *= 0.5
        # Diminishing returns once performance is already high.
        if new > 0.85:
            gain *= 0.7
        # Noise penalty scales with task difficulty: easy is forgiving,
        # hard severely punishes noisy selections.
        noise_scale = 1.0 + flip_prob * 2.0  # 1.1 easy | 1.5 medium | 1.6 hard
        noise_penalty = noise_scale * noise_ratio
        reward = gain - 0.01 * b - 0.3 * redundancy - noise_penalty
        reward += 0.15  # baseline: keeps reward in mixed-sign territory so
        #               RL agents receive positive signal for good steps
        # ----------------------------------------------------------------
        # Update state
        s.train_centroid = running_mean(s.train_centroid, Xb)
        s.X_train = np.vstack([s.X_train, Xb])
        s.y_train = np.concatenate([s.y_train, yb])
        s.performance = new
        s.budget -= b
        s.steps += 1
        info = {
            "gain": float(gain),
            "redundancy": float(redundancy),
            "noise_ratio": float(noise_ratio),
        }
        done = (s.budget <= 0) or (s.steps >= self.cfg["max_steps"])
        return self._obs(), float(reward), done, info

    def get_state(self) -> EnvState:
        """
        OpenEnv spec state() requirement.
        Returns episode metadata -- called by GET /state on the server.
        Named get_state() to avoid collision with the self._episode_state attribute.
        """
        if self._episode_state is None:
            # No episode yet: report an empty, not-done state.
            return EnvState(
                step_count=0,
                remaining_budget=None,
                current_performance=None,
                pool_size=None,
                done=False,
            )
        s = self._episode_state
        return EnvState(
            step_count=int(s.steps),
            remaining_budget=int(s.budget),
            current_performance=float(s.performance),
            pool_size=int(len(s.X_pool)),
            done=(s.budget <= 0 or s.steps >= self.cfg["max_steps"]),
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _score(self):
        """Validation score in (0.05, 1]: 1/(1+log_loss), floored at 0.05."""
        p = self.model.predict_proba(self._episode_state.X_val)
        return max(0.05, float(1 / (1 + log_loss(self._episode_state.y_val, p))))

    def _obs(self):
        """Build the agent-facing Observation from current episode state."""
        s = self._episode_state
        return Observation(
            remaining_budget=int(s.budget),
            diversity_score=float(np.std(s.X_train)),
            # NOTE(review): exposes the ground-truth pool noise fraction to
            # the agent -- presumably intentional; confirm.
            noise_estimate=float(np.mean(s.noise_mask)),
            current_performance=float(s.performance),
            samples_available=int(len(s.X_pool)),
        )