import random

import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import log_loss
from sklearn.preprocessing import StandardScaler
from models import Observation, Action, EnvState
from sampling import sample_uncertainty, sample_diversity, sample_random, entropy
from reward import mean_cosine, running_mean
class DatasetState:
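    """Mutable per-episode state: data splits, labels, budget, and bookkeeping."""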
def __init__(self, X_pool, y_pool, X_val, y_val, X_train, y_train, budget, y_all, noise_mask):
self.X_pool = X_pool
self.y_pool = y_pool
self.X_val = X_val
self.y_val = y_val
self.X_train = X_train
self.y_train = y_train
self.budget = budget
self.steps = 0
self.performance = 0.0
self.train_centroid = np.mean(X_train, axis=0)
self.y_all = y_all
self.noise_mask = noise_mask
class DataSelectEnv:
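    """Active-learning data-selection environment (OpenEnv interface).

    Each episode, the agent spends a labeling budget by choosing batch sizes
    and strategy weights (uncertainty / diversity / random). Reward is the
    validation gain minus batch-size, redundancy, and noise costs.
    """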
WARMUP = 2
def __init__(self, cfg, seed: int = 42):
"""
cfg β€” environment config dict (budget, max_steps, alpha, data)
seed β€” random seed; stored here, applied at reset() so every
episode is reproducible regardless of how many times
the env has been constructed or stepped.
"""
self.cfg = cfg
self.seed = seed
self._episode_state = None # set by reset()
# ------------------------------------------------------------------
# Core OpenEnv interface
# ------------------------------------------------------------------
def reset(self):
"""Start a new episode. Returns initial Observation."""
        # Seed here: ensures every reset() call produces identical data
np.random.seed(self.seed)
random.seed(self.seed)
X, y = make_classification(
n_samples=1500,
n_features=20,
n_informative=5,
n_redundant=5,
n_clusters_per_class=2,
class_sep=1.0,
flip_y=0.1,
random_state=42, # dataset skeleton is fixed; noise injection varies by seed
)
        # Scaling: must happen before the splits
self.scaler = StandardScaler()
X = self.scaler.fit_transform(X)
X_seed, y_seed = X[:100], y[:100]
X_val, y_val = X[200:400], y[200:400]
X_pool, y_pool = X[400:], y[400:]
X_pool = X_pool + np.random.normal(0, 0.5, X_pool.shape)
        # Noise injection: flip_y from the task cfg overrides the base value
flip_prob = self.cfg["data"].get("flip_y", 0.1)
noise_mask = np.random.rand(len(y_pool)) < flip_prob
y_pool_noisy = y_pool.copy()
# Guaranteed label flip (1-y), not random assignment.
# Random assignment gives the correct label 50% of the time, halving
# effective noise. Flipping guarantees every noise_mask sample is wrong.
y_pool_noisy[noise_mask] = 1 - y_pool[noise_mask]
X_pool[noise_mask] += np.random.normal(0, 0.1, X_pool[noise_mask].shape)
# Fresh model every episode
self.model = SGDClassifier(
loss="log_loss",
learning_rate="adaptive",
eta0=0.01,
max_iter=1,
tol=None,
random_state=42,
)
# Warm start on seed data
for _ in range(10):
self.model.partial_fit(X_seed, y_seed, classes=np.unique(y))
self._episode_state = DatasetState(
X_pool, y_pool_noisy, X_val, y_val,
X_seed.copy(), y_seed.copy(),
self.cfg["budget"], y, noise_mask,
)
        # Noisy centroid, used during step() for the noise trap
if np.any(noise_mask):
self._episode_state.noisy_centroid = np.mean(X_pool[noise_mask], axis=0)
else:
self._episode_state.noisy_centroid = np.zeros(X_pool.shape[1])
self._episode_state.performance = self._score()
return self._obs()
def step(self, action: Action):
"""Execute one action. Returns (Observation, reward, done, info)."""
s = self._episode_state
if s is None:
raise RuntimeError("Call reset() before step().")
# Pool empty guard
if len(s.X_pool) == 0:
return self._obs(), 0.0, True, {"reason": "pool_empty"}
        # Sanity check: noise_mask must always stay in sync with the pool
assert len(s.noise_mask) == len(s.X_pool), (
f"noise_mask length {len(s.noise_mask)} != X_pool length {len(s.X_pool)}"
)
# Normalize strategy weights
w = action.strategy_weights
total = sum(w.values()) + 1e-8
w = {k: v / total for k, v in w.items()}
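        # e.g. {"uncertainty": 2, "diversity": 1, "random": 1} normalizes to
        # {0.5, 0.25, 0.25}; the epsilon guards against an all-zero weight dict.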
        # Handle stop before the batch-size checks so a stop request with
        # batch_size=0 still terminates the episode cleanly.
        if action.action_type == "stop":
            perf_threshold = self.cfg.get("stop_threshold", 0.60)
            if s.performance >= perf_threshold:
                stop_reward = 0.05 * s.budget
            else:
                stop_reward = -1.0
            return self._obs(), stop_reward, True, {}
        min_b = self.cfg.get("min_batch", 1)
        b = min(action.batch_size, s.budget)
        if b < min_b:
            b = min_b  # enforce minimum; prevents single-sample gaming
        if b <= 0:
            return self._obs(), -0.01, False, {"error": "empty batch"}
# Uncertainty + noise trap
proba_pool = self.model.predict_proba(s.X_pool)
H = entropy(proba_pool)
        # Noise trap: boost the entropy of noisy samples so uncertainty sampling
        # is attracted to them. The boost is capped at 0.55, below the maximum
        # binary entropy log(2) ≈ 0.693, so clean uncertain samples can still
        # compete: the trap misleads rather than completely overrides, keeping
        # uncertainty a near-miss on the hard task.
        flip_prob = self.cfg["data"].get("flip_y", 0.1)
        boost_raw = 0.1 + flip_prob * 2.0
        noise_boost = s.noise_mask.astype(float) * min(boost_raw, 0.55)
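        # Worked example: flip_prob=0.25 gives boost_raw=0.6, capped at 0.55;
        # flip_prob=0.1 gives boost_raw=0.3, well under the cap.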
H_adj = H + noise_boost
# Sampling
n_u = int(b * w.get("uncertainty", 0))
n_d = int(b * w.get("diversity", 0))
n_r = max(0, b - n_u - n_d)
sel = set()
sel.update(sample_uncertainty(s, H_adj, n_u, sel))
sel.update(sample_diversity(s, n_d, sel))
sel.update(sample_random(s, n_r, sel))
idx = np.array(list(sel), dtype=int)
if len(idx) == 0:
return self._obs(), -0.01, False, {"error": "no samples selected"}
Xb, yb = s.X_pool[idx], s.y_pool[idx]
selected_noise = s.noise_mask[idx]
noise_ratio = float(np.mean(selected_noise))
        # Remove selected samples from the pool; keep noise_mask in sync
keep = np.ones(len(s.X_pool), dtype=bool)
keep[idx] = False
s.X_pool = s.X_pool[keep]
s.y_pool = s.y_pool[keep]
s.noise_mask = s.noise_mask[keep]
# Incremental training
for _ in range(3):
self.model.partial_fit(Xb, yb, classes=np.unique(s.y_all))
old = s.performance
new = self._score()
# ----------------------------------------------------------------
# Reward design
# ----------------------------------------------------------------
gain = (new - old) * 5.0
# Distance-based diversity bonus: rewards batches that cover regions
# far from existing training data. Diversity sampling scores high
# (~0.25), random scores average (~0.22), uncertainty scores low
# (~0.15) because boundary samples cluster near the centroid.
diversity_bonus = float(np.mean(
np.linalg.norm(Xb - s.train_centroid, axis=1)
)) * 0.05
gain += diversity_bonus
redundancy = mean_cosine(Xb, s.train_centroid)
if redundancy > 0.8:
gain *= 0.5
if new > 0.85:
gain *= 0.7
# Noise penalty scales with task difficulty: easy is forgiving,
# hard severely punishes noisy selections.
flip_prob = self.cfg["data"].get("flip_y", 0.1)
noise_scale = 1.0 + flip_prob * 2.0 # 1.1 easy | 1.5 medium | 1.6 hard
noise_penalty = noise_scale * noise_ratio
reward = gain - 0.01 * b - 0.3 * redundancy - noise_penalty
        # Baseline offset: keeps reward in mixed-sign territory so RL agents
        # receive a positive signal for good steps.
        reward += 0.15
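        # Worked example (hypothetical numbers): gain=0.10 after bonuses, b=10,
        # redundancy=0.4, noise_ratio=0.1, flip_prob=0.1 (noise_scale=1.2):
        # reward = 0.10 - 0.10 - 0.12 - 0.12 + 0.15 = -0.09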
# ----------------------------------------------------------------
# Update state
s.train_centroid = running_mean(s.train_centroid, Xb)
s.X_train = np.vstack([s.X_train, Xb])
s.y_train = np.concatenate([s.y_train, yb])
s.performance = new
s.budget -= b
s.steps += 1
info = {
"gain": float(gain),
"redundancy": float(redundancy),
"noise_ratio": float(noise_ratio),
}
done = (s.budget <= 0) or (s.steps >= self.cfg["max_steps"])
return self._obs(), float(reward), done, info
def get_state(self) -> EnvState:
"""
OpenEnv spec state() requirement.
        Returns episode metadata; called by GET /state on the server.
Named get_state() to avoid collision with the self._episode_state attribute.
"""
if self._episode_state is None:
return EnvState(
step_count=0,
remaining_budget=None,
current_performance=None,
pool_size=None,
done=False,
)
s = self._episode_state
return EnvState(
step_count=int(s.steps),
remaining_budget=int(s.budget),
current_performance=float(s.performance),
pool_size=int(len(s.X_pool)),
done=(s.budget <= 0 or s.steps >= self.cfg["max_steps"]),
)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _score(self):
p = self.model.predict_proba(self._episode_state.X_val)
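        # Score is 1/(1 + log_loss), clamped to a 0.05 floor: chance-level loss
        # log(2) ≈ 0.693 maps to roughly 0.59; lower loss pushes the score toward 1.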
return max(0.05, float(1 / (1 + log_loss(self._episode_state.y_val, p))))
def _obs(self):
s = self._episode_state
return Observation(
remaining_budget=int(s.budget),
diversity_score=float(np.std(s.X_train)),
noise_estimate=float(np.mean(s.noise_mask)),
current_performance=float(s.performance),
samples_available=int(len(s.X_pool)),
)
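
# ----------------------------------------------------------------------
# Minimal smoke-test sketch. Assumptions: Action is a plain dataclass with
# the strategy_weights / batch_size / action_type fields read above, and
# cfg carries the keys this module reads; adjust to the real models.Action.
# ----------------------------------------------------------------------
if __name__ == "__main__":
    cfg = {
        "budget": 100,
        "max_steps": 20,
        "min_batch": 5,
        "stop_threshold": 0.60,
        "data": {"flip_y": 0.1},
    }
    env = DataSelectEnv(cfg, seed=42)
    obs = env.reset()
    done = False
    while not done:
        # Fixed mixed strategy; a real agent would adapt these weights.
        action = Action(
            strategy_weights={"uncertainty": 0.5, "diversity": 0.3, "random": 0.2},
            batch_size=10,
            action_type="select",
        )
        obs, reward, done, info = env.step(action)
        print(f"perf={obs.current_performance:.3f} reward={reward:.3f} info={info}")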