"""
Orbit Wars — Efficient PPO Self-Play Training for an Adaptive Parameter Controller.

Optimized version: loads the agent module ONCE and modifies its globals in place
for each training episode.
"""

import copy
import math
import os
import random
import sys
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import numpy as np
import torch
import torch.nn as nn
from torch.distributions import Normal

import urllib.request

_SUBMISSION_PATH = '/app/submission.py'
if not os.path.exists(_SUBMISSION_PATH):
    print("Downloading submission.py from HF Hub...")
    urllib.request.urlretrieve(
        "https://huggingface.co/Builder-Neekhil/orbit-wars-agent/resolve/main/submission.py",
        _SUBMISSION_PATH
    )
    print("Downloaded.")

sys.path.insert(0, '/app')

# Learner namespace: the base agent's module globals, loaded once and mutated in place.
_BASE_NS = {}
exec(open(_SUBMISSION_PATH).read(), _BASE_NS)
print("Base agent loaded successfully.")

# Opponent namespace: an independent second copy of the same agent.
_OPP_NS = {}
exec(open(_SUBMISSION_PATH).read(), _OPP_NS)
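
# A note on this pattern: functions defined by exec(source, ns) keep ns as their
# __globals__, so overwriting a module-level constant in the dict is immediately
# visible to the already-defined agent. For example (hypothetical override):
#
#     _BASE_NS["ELIMINATION_BONUS"] = 80.0
#     moves = _BASE_NS["agent"](obs)   # agent() now reads the overridden value
#
# This is what lets apply_params()/reset_params() below retune the agent without
# re-executing submission.py.

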
from kaggle_environments import make as _make_env


FEATURE_DIM = 33


def extract_features(obs):
    get = obs.get if isinstance(obs, dict) else lambda k, d=None: getattr(obs, k, d)
    player = int(get("player", 0) or 0)
    step = int(get("step", 0) or 0)
    planets = get("planets") or []
    fleets = get("fleets") or []
    ang_vel = float(get("angular_velocity", 0.0) or 0.0)
    comet_ids = set(get("comet_planet_ids") or [])

    my_p = my_s = my_pr = en_p = en_s = en_pr = ne_p = ne_s = 0
    my_st = my_ro = en_st = 0
    en_by = defaultdict(int)

    for p in planets:
        _, owner, x, y, radius, ships, prod = p
        is_st = (math.hypot(x - 50, y - 50) + radius) >= 50.0
        if owner == player:
            my_p += 1; my_s += ships; my_pr += prod
            my_st += is_st; my_ro += (not is_st)
        elif owner == -1:
            ne_p += 1; ne_s += ships
        else:
            en_p += 1; en_s += ships; en_pr += prod; en_by[owner] += ships
            en_st += is_st

    my_fs = sum(f[6] for f in fleets if f[1] == player)
    en_fs = sum(f[6] for f in fleets if f[1] != player)
    my_fc = sum(1 for f in fleets if f[1] == player)
    en_fc = sum(1 for f in fleets if f[1] != player)
    mt = my_s + my_fs; et = en_s + en_fs; ta = mt + et + ne_s
    ne = len(en_by)
    mx_e = max(en_by.values()) if en_by else 0
    mn_e = min(en_by.values()) if en_by else 0
    nc = sum(1 for p in planets if p[0] in comet_ids)

    return np.array([
        step/500, min(1, step/100), max(0, (500-step)/500), float(step > 400),
        min(1, my_p/15), min(1, en_p/15), min(1, ne_p/15), min(1, my_st/10), min(1, my_ro/10),
        min(1, mt/max(1, ta)), min(1, et/max(1, ta)),
        math.log1p(mt)/10, math.log1p(et)/10, math.log1p(my_fs)/10, math.log1p(en_fs)/10,
        min(1, my_pr/max(1, my_pr+en_pr)), my_pr/30, en_pr/30,
        np.clip((mt-et)/max(1, ta), -1, 1), np.clip((my_p-en_p)/15, -1, 1), np.clip((my_pr-en_pr)/15, -1, 1),
        min(1, ne/3), float(ne >= 3), min(1, mx_e/max(1, et)), min(1, mn_e/max(1, mx_e+1)), min(1, en_fc/20),
        min(1, my_fc/20), my_fs/max(1, mt), en_fs/max(1, et),
        abs(ang_vel)*100, min(1, nc/5), min(1, len(planets)/30), ne_s/max(1, ta),
    ], dtype=np.float32)


class OpponentProfiler:
    """Tracks coarse opponent tendencies as exponential moving averages: agg (aggression,
    rate of new fleet launches), exp (expansion, change in enemy planet count), and trt
    (turtling, share of enemy ships kept parked on planets rather than in flight)."""

    def __init__(self):
        self.a = 0.1; self.agg = 0.5; self.exp = 0.5; self.trt = 0.5
        self.pp = 0; self.pf = 0; self.ps = 0; self.sc = 0

    def update(self, obs):
        get = obs.get if isinstance(obs, dict) else lambda k, d=None: getattr(obs, k, d)
        player = int(get("player", 0) or 0)
        planets = get("planets") or []; fleets = get("fleets") or []
        ep = sum(1 for p in planets if p[1] not in (-1, player))
        ef = sum(1 for f in fleets if f[1] != player)
        es = sum(p[5] for p in planets if p[1] not in (-1, player))
        es += sum(f[6] for f in fleets if f[1] != player)
        if self.sc > 0:
            fd = max(0, ef - self.pf)
            self.agg = (1-self.a)*self.agg + self.a*min(1, fd/5)
            pd = ep - self.pp
            self.exp = (1-self.a)*self.exp + self.a*np.clip(pd/3+0.5, 0, 1)
            efs = sum(f[6] for f in fleets if f[1] != player)
            t = 1 - min(1, efs/max(1, es)) if es > 0 else 0.5
            self.trt = (1-self.a)*self.trt + self.a*t
        self.pp = ep; self.pf = ef; self.ps = es; self.sc += 1
        return np.array([self.agg, self.exp, self.trt, min(1, self.sc/100), float(self.sc > 50)], dtype=np.float32)
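
# The 5-dim profile above is appended to the 33 engineered features from
# extract_features(), giving the controller its 33 + 5 = 38 dimensional input
# (see INPUT_DIM below).

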
# Each entry maps a tunable constant in submission.py to (default, low, high);
# decode_params() rescales the controller's output into [low, high] and
# reset_params() restores the defaults.
TUNABLE_PARAMS = {
    "HOSTILE_TARGET_VALUE_MULT": (2.05, 1.0, 3.0),
    "ELIMINATION_BONUS": (55.0, 10.0, 100.0),
    "PROACTIVE_DEFENSE_RATIO": (0.28, 0.05, 0.5),
    "FINISHING_HOSTILE_VALUE_MULT": (1.3, 0.8, 2.0),
    "WEAK_ENEMY_THRESHOLD": (110.0, 30.0, 200.0),
    "ATTACK_COST_TURN_WEIGHT": (0.50, 0.2, 0.8),
    "HOSTILE_MARGIN_BASE": (3.0, 1.0, 6.0),
    "FOUR_PLAYER_TARGET_MARGIN": (2.0, 0.0, 5.0),
    "FINISHING_HOSTILE_SEND_BONUS": (5.0, 1.0, 10.0),
    "STATIC_HOSTILE_VALUE_MULT": (1.65, 1.0, 2.5),
    "GANG_UP_VALUE_MULT": (1.4, 1.0, 2.0),
    "EXPOSED_PLANET_VALUE_MULT": (2.0, 1.0, 3.0),
    "REINFORCE_VALUE_MULT": (1.35, 0.8, 2.0),
    "DEFENSE_SHIP_VALUE": (0.55, 0.2, 1.0),
    "BEHIND_DOMINATION": (-0.20, -0.5, 0.0),
    "AHEAD_DOMINATION": (0.15, 0.0, 0.4),
    "LATE_REMAINING_TURNS": (70.0, 40.0, 100.0),
    "REAR_SEND_RATIO_TWO_PLAYER": (0.62, 0.3, 0.9),
    "COMET_VALUE_MULT": (0.65, 0.3, 1.2),
    "SNIPE_VALUE_MULT": (1.12, 0.7, 1.6),
}
PARAM_NAMES = list(TUNABLE_PARAMS.keys())
NUM_PARAMS = len(PARAM_NAMES)
INPUT_DIM = FEATURE_DIM + 5


class ParameterController(nn.Module):
    def __init__(self, input_dim=INPUT_DIM, hidden_size=128):
        super().__init__()
        self.shared = nn.Sequential(
            nn.Linear(input_dim, hidden_size), nn.ReLU(),
            nn.Linear(hidden_size, hidden_size), nn.ReLU(),
        )
        self.param_mean = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2), nn.ReLU(),
            nn.Linear(hidden_size // 2, NUM_PARAMS),
        )
        self.param_log_std = nn.Parameter(torch.zeros(NUM_PARAMS))
        self.value_head = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2), nn.ReLU(),
            nn.Linear(hidden_size // 2, 1),
        )

    def forward(self, x):
        h = self.shared(x)
        return torch.tanh(self.param_mean(h)), self.param_log_std, self.value_head(h).squeeze(-1)
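
# Shape sketch (illustrative, for a batch of size B):
#     mean, log_std, value = ParameterController()(torch.zeros(B, INPUT_DIM))
#     mean:    (B, NUM_PARAMS), squashed into (-1, 1) by tanh
#     log_std: (NUM_PARAMS,), a state-independent learned parameter
#     value:   (B,), the critic's estimate of the episode return

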
def decode_params(raw):
    params = {}
    for i, name in enumerate(PARAM_NAMES):
        _, low, high = TUNABLE_PARAMS[name]
        t = (float(raw[i]) + 1.0) / 2.0
        params[name] = low + t * (high - low)
    return params
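
# Worked example: a raw action of 0.0 gives t = 0.5, i.e. the midpoint of [low, high].
# For "ELIMINATION_BONUS" with bounds (10.0, 100.0) that is 10 + 0.5 * 90 = 55.0, which
# equals its default, so an all-zero action roughly recovers the baseline tuning.

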
def apply_params(ns, params):
    """Apply parameter overrides to agent namespace (in-place, very fast)."""
    for name, value in params.items():
        if name in ns:
            ns[name] = value


def reset_params(ns):
    """Reset parameters to defaults."""
    for name, (default, _, _) in TUNABLE_PARAMS.items():
        if name in ns:
            ns[name] = default
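
# Typical per-episode usage (mirrors the training loop below): restore the defaults,
# then overlay the overrides sampled for this episode before the agent plays.
#
#     reset_params(_BASE_NS)
#     apply_params(_BASE_NS, decode_params(action))

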
def compute_potential(obs, player):
    get = obs.get if isinstance(obs, dict) else lambda k, d=None: getattr(obs, k, d)
    planets = get("planets") or []; fleets = get("fleets") or []
    my_p = my_s = my_pr = en_p = en_s = en_pr = 0
    for p in planets:
        _, owner, _, _, _, ships, prod = p
        if owner == player: my_p += 1; my_s += ships; my_pr += prod
        elif owner >= 0: en_p += 1; en_s += ships; en_pr += prod
    for f in fleets:
        _, owner, _, _, _, _, ships = f
        if owner == player: my_s += ships
        elif owner >= 0: en_s += ships
    eps = 1e-6; lr = math.log(10.0)
    pp = np.clip(math.log((my_p+eps)/(en_p+eps))/lr, -1, 1)
    ps = np.clip(math.log((my_s+eps)/(en_s+eps))/lr, -1, 1)
    pprod = np.clip(math.log((my_pr+eps)/(en_pr+eps))/lr, -1, 1)
    return 0.4*pp + 0.3*ps + 0.3*pprod
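
# compute_potential() is the potential function for standard potential-based reward
# shaping: the per-step shaped reward used below is 0.99 * Phi(s') - Phi(s), where Phi
# is the clipped log10 ratio of planets, ships, and production (weighted 0.4/0.3/0.3),
# so the learner is rewarded for improving its material position rather than only for
# the terminal win/loss signal.

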
def run_episode_vs_random(learner_ns, seed, learner_slot=0):
    """Run episode against Kaggle's built-in random agent (very fast)."""
    from kaggle_environments.envs.orbit_wars.orbit_wars import random_agent

    env = _make_env("orbit_wars", configuration={"seed": seed}, debug=False)
    env.reset(num_agents=2)
    learner_ns['_agent_step'] = 0

    profiler = OpponentProfiler()
    states = env.step([[], []])
    learner_obs = states[learner_slot].observation

    features = extract_features(learner_obs)
    profile = profiler.update(learner_obs)
    initial_obs_vec = np.concatenate([features, profile])

    player = int(learner_obs.get("player", 0) if isinstance(learner_obs, dict) else learner_obs.player)
    prev_potential = compute_potential(learner_obs, player)
    total_shaped_reward = 0.0
    step_count = 0
    done = False

    while not done:
        try:
            learner_moves = learner_ns['agent'](learner_obs)
        except Exception:
            learner_moves = []

        opp_obs = states[1 - learner_slot].observation
        try:
            opponent_moves = random_agent(opp_obs)
        except Exception:
            opponent_moves = []

        if learner_slot == 0:
            actions = [learner_moves, opponent_moves]
        else:
            actions = [opponent_moves, learner_moves]

        states = env.step(actions)
        learner_state = states[learner_slot]
        learner_obs = learner_state.observation
        done = learner_state.status != "ACTIVE"

        curr_potential = compute_potential(learner_obs, player)
        step_reward = 0.99 * curr_potential - prev_potential
        prev_potential = curr_potential

        if done:
            raw_reward = float(learner_state.reward) if learner_state.reward else 0.0
            step_reward += raw_reward

        total_shaped_reward += step_reward
        step_count += 1
        profile = profiler.update(learner_obs)

    final_features = extract_features(learner_obs)
    final_obs_vec = np.concatenate([final_features, profile])
    final_reward = float(learner_state.reward) if learner_state.reward else 0.0

    return initial_obs_vec, final_obs_vec, total_shaped_reward, final_reward, step_count


def run_episode(learner_ns, opponent_ns, seed, learner_slot=0):
    """Run a full game episode between two agent namespaces.

    Returns (initial_obs_vec, final_obs_vec, total_shaped_reward, final_reward, step_count).
    The controller makes ONE decision per episode (a parameter setting for the whole game),
    which is much more efficient than per-step parameter tuning.
    """
    env = _make_env("orbit_wars", configuration={"seed": seed}, debug=False)
    env.reset(num_agents=2)

    learner_ns['_agent_step'] = 0
    opponent_ns['_agent_step'] = 0

    profiler = OpponentProfiler()

    states = env.step([[], []])
    learner_obs = states[learner_slot].observation
    opp_obs = states[1 - learner_slot].observation

    features = extract_features(learner_obs)
    profile = profiler.update(learner_obs)
    initial_obs_vec = np.concatenate([features, profile])

    prev_potential = compute_potential(
        learner_obs,
        int(learner_obs.get("player", 0) if isinstance(learner_obs, dict) else learner_obs.player))

    total_shaped_reward = 0.0
    step_count = 0
    done = False

    while not done:
        try:
            learner_moves = learner_ns['agent'](learner_obs)
        except Exception:
            learner_moves = []

        try:
            opponent_moves = opponent_ns['agent'](opp_obs)
        except Exception:
            opponent_moves = []

        if learner_slot == 0:
            actions = [learner_moves, opponent_moves]
        else:
            actions = [opponent_moves, learner_moves]

        states = env.step(actions)
        learner_state = states[learner_slot]
        opp_state = states[1 - learner_slot]

        learner_obs = learner_state.observation
        opp_obs = opp_state.observation
        done = learner_state.status != "ACTIVE"

        player = int(learner_obs.get("player", 0) if isinstance(learner_obs, dict) else learner_obs.player)
        curr_potential = compute_potential(learner_obs, player)
        step_reward = 0.99 * curr_potential - prev_potential
        prev_potential = curr_potential

        if done:
            raw_reward = float(learner_state.reward) if learner_state.reward else 0.0
            step_reward += raw_reward

        total_shaped_reward += step_reward
        step_count += 1

        profile = profiler.update(learner_obs)

    final_features = extract_features(learner_obs)
    final_obs_vec = np.concatenate([final_features, profile])

    final_reward = float(learner_state.reward) if learner_state.reward else 0.0

    return initial_obs_vec, final_obs_vec, total_shaped_reward, final_reward, step_count
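
# Sketch of the intended call pattern (illustrative; the training loop below does this
# with a sampled action instead of the mean):
#
#     params = decode_params(action)                    # action in [-1, 1]^NUM_PARAMS
#     reset_params(_BASE_NS); apply_params(_BASE_NS, params)
#     _, _, shaped_r, raw_r, _ = run_episode(_BASE_NS, _OPP_NS, seed=123)
#
# Each episode therefore contributes a single-decision (one-step) sample to PPO.

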
def train():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Device: {device}")

    total_updates = int(os.environ.get("TOTAL_UPDATES", "500"))
    episodes_per_update = int(os.environ.get("EPISODES_PER_UPDATE", "4"))
    eval_every = int(os.environ.get("EVAL_EVERY", "25"))
    eval_games = int(os.environ.get("EVAL_GAMES", "6"))
    lr = float(os.environ.get("LR", "3e-4"))
    gamma = 0.99
    clip_coef = 0.2
    ent_coef = 0.01
    vf_coef = 0.5
    epochs = 4
    pool_size = 3
    save_dir = Path(os.environ.get("SAVE_DIR", "/app/checkpoints"))
    save_dir.mkdir(parents=True, exist_ok=True)
    random.seed(42); np.random.seed(42); torch.manual_seed(42)

    controller = ParameterController().to(device)
    optimizer = torch.optim.Adam(controller.parameters(), lr=lr)

    # Pool of opponent parameter sets for self-play; None means the baseline defaults.
    opponent_pool = [None]
    best_win_rate = 0.0
    seed_counter = 0

    from kaggle_environments.envs.orbit_wars.orbit_wars import random_agent

    def get_opponent_ns(update_idx):
        """Return opponent namespace and label based on training phase."""
        phase_fraction = update_idx / total_updates

        if phase_fraction < 0.2:
            # Phase 1: Kaggle's built-in random agent.
            return None, "random"
        elif phase_fraction < 0.5:
            # Phase 2: the unmodified baseline agent.
            reset_params(_OPP_NS)
            return _OPP_NS, "baseline"
        else:
            # Phase 3: self-play against a pool of previously successful parameter sets.
            opp_params = random.choice(opponent_pool)
            reset_params(_OPP_NS)
            if opp_params is not None:
                apply_params(_OPP_NS, opp_params)
            return _OPP_NS, "pool"

    print(f"\nTraining: {total_updates} updates × {episodes_per_update} episodes")
    print(f"Phase 1 (0-20%): vs random | Phase 2 (20-50%): vs baseline | Phase 3 (50-100%): self-play")
    print(f"Eval every {eval_every} updates, {eval_games} games\n")
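
    # With the default TOTAL_UPDATES=500, this schedule plays updates 0-99 against the
    # random agent, 100-249 against the baseline, and 250-499 against the self-play pool.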
    for update in range(total_updates):
        t0 = time.time()

        obs_batch = []
        reward_batch = []
        wins = 0
        total_steps = 0

        for ep in range(episodes_per_update):
            seed_counter += 1
            learner_slot = (update * episodes_per_update + ep) % 2

            opp_ns, opp_label = get_opponent_ns(update)

            with torch.inference_mode():
                # One decision per episode, made before the game starts from a fixed
                # all-zero observation.
                dummy_obs = np.zeros(INPUT_DIM, dtype=np.float32)
                dummy_obs[0] = 0.0
                x = torch.from_numpy(dummy_obs).unsqueeze(0).to(device)
                param_mean, log_std, value = controller(x)

                std = torch.exp(log_std)
                dist = Normal(param_mean.squeeze(0), std)
                action = dist.sample()
                log_prob = dist.log_prob(action).sum().item()
                value_np = value.item()
                action_np = action.cpu().numpy()

            params = decode_params(np.clip(action_np, -1, 1))
            reset_params(_BASE_NS)
            apply_params(_BASE_NS, params)

            if opp_ns is None:
                init_obs, final_obs, shaped_reward, raw_reward, steps = run_episode_vs_random(
                    _BASE_NS, seed=seed_counter * 37 + 1, learner_slot=learner_slot
                )
            else:
                init_obs, final_obs, shaped_reward, raw_reward, steps = run_episode(
                    _BASE_NS, opp_ns, seed=seed_counter * 37 + 1, learner_slot=learner_slot
                )

            # Store the observation the action was actually sampled from (the zero
            # vector above) so the stored log-prob and value stay consistent with
            # obs_t in the PPO update.
            obs_batch.append((dummy_obs, action_np, log_prob, value_np, shaped_reward))
            reward_batch.append(raw_reward)
            if raw_reward > 0:
                wins += 1
            total_steps += steps

        # PPO update on this batch of single-decision episodes.
        metrics = {}
        if obs_batch:
            obs_t = torch.tensor(np.stack([o[0] for o in obs_batch]), dtype=torch.float32, device=device)
            actions_t = torch.tensor(np.stack([o[1] for o in obs_batch]), dtype=torch.float32, device=device)
            old_log_probs_t = torch.tensor([o[2] for o in obs_batch], dtype=torch.float32, device=device)
            old_values_t = torch.tensor([o[3] for o in obs_batch], dtype=torch.float32, device=device)
            rewards_t = torch.tensor([o[4] for o in obs_batch], dtype=torch.float32, device=device)

            # One decision per episode, so the return is the episode's total shaped reward.
            returns_t = rewards_t
            advantages_t = returns_t - old_values_t
            if advantages_t.std() > 1e-6:
                advantages_t = (advantages_t - advantages_t.mean()) / (advantages_t.std() + 1e-8)

            metrics = {"loss": 0, "pl": 0, "vl": 0, "ent": 0}
            n_updates = 0

            for _ in range(epochs):
                param_mean, log_std, values = controller(obs_t)
                std = torch.exp(log_std)
                dist = Normal(param_mean, std)
                new_log_probs = dist.log_prob(actions_t).sum(-1)
                entropy = dist.entropy().sum(-1)

                # Clipped surrogate objective, written as a loss to minimize.
                ratio = (new_log_probs - old_log_probs_t).exp()
                s1 = -advantages_t * ratio
                s2 = -advantages_t * torch.clamp(ratio, 1 - clip_coef, 1 + clip_coef)
                pl = torch.max(s1, s2).mean()
                vl = 0.5 * (returns_t - values).pow(2).mean()
                el = -entropy.mean()

                loss = pl + vf_coef * vl + ent_coef * el
                optimizer.zero_grad()
                loss.backward()
                nn.utils.clip_grad_norm_(controller.parameters(), 0.5)
                optimizer.step()

                metrics["loss"] += loss.item()
                metrics["pl"] += pl.item()
                metrics["vl"] += vl.item()
                metrics["ent"] += entropy.mean().item()
                n_updates += 1

            metrics = {k: v / max(1, n_updates) for k, v in metrics.items()}

        elapsed = time.time() - t0
        win_rate = wins / episodes_per_update
        avg_reward = np.mean(reward_batch) if reward_batch else 0

        print(f"U{update+1:4d}/{total_updates} | "
              f"WR: {win_rate:.0%} | R: {avg_reward:+.2f} | "
              f"L: {metrics.get('loss',0):.4f} PL: {metrics.get('pl',0):.4f} "
              f"VL: {metrics.get('vl',0):.4f} Ent: {metrics.get('ent',0):.3f} | "
              f"Steps: {total_steps} | {elapsed:.1f}s | vs: {opp_label}")

        # Periodic evaluation against the unmodified baseline, using the controller's
        # mean (deterministic) parameter setting.
        if (update + 1) % eval_every == 0:
            print(f"\n Evaluating vs baseline ({eval_games} games)...")
            eval_wins = 0

            with torch.inference_mode():
                x = torch.zeros(1, INPUT_DIM, device=device)
                pm, _, _ = controller(x)
                eval_params = decode_params(pm.squeeze(0).cpu().numpy())

            for g in range(eval_games):
                slot = g % 2
                reset_params(_BASE_NS); apply_params(_BASE_NS, eval_params)
                reset_params(_OPP_NS)

                _, _, _, raw_r, _ = run_episode(_BASE_NS, _OPP_NS, seed=10000 + g, learner_slot=slot)
                if raw_r > 0:
                    eval_wins += 1
                print(f" Game {g+1}: {'WIN' if raw_r > 0 else 'LOSS'} (slot={slot})")

            wr = eval_wins / eval_games
            print(f" Win rate: {wr:.0%} ({eval_wins}/{eval_games})")

            # Keep reasonably strong parameter sets around as future self-play opponents.
            if wr >= 0.45:
                if len(opponent_pool) >= pool_size:
                    opponent_pool.pop(0)
                opponent_pool.append(copy.deepcopy(eval_params))
                print(f" ✓ Added to pool (size={len(opponent_pool)})")

            if wr > best_win_rate:
                best_win_rate = wr
                torch.save({
                    "controller": controller.state_dict(),
                    "params": eval_params,
                    "win_rate": wr,
                    "update": update + 1,
                }, save_dir / "best_controller.pt")
                print(f" ★ New best: {wr:.0%}")
            print()

        # Periodic full training checkpoint.
        if (update + 1) % 100 == 0:
            torch.save({
                "controller": controller.state_dict(),
                "optimizer": optimizer.state_dict(),
                "update": update + 1,
            }, save_dir / f"ckpt_{update+1:05d}.pt")

    # Final save, then optional upload of the trained artifacts to the HF Hub.
    torch.save({
        "controller": controller.state_dict(),
        "best_win_rate": best_win_rate,
    }, save_dir / "final_controller.pt")

    print(f"\nDone! Best win rate: {best_win_rate:.0%}")
    print(f"Checkpoints: {save_dir}")

    try:
        from huggingface_hub import HfApi
        api = HfApi(token=os.environ.get("HF_TOKEN"))

        best_path = save_dir / "best_controller.pt"
        if best_path.exists():
            api.upload_file(
                path_or_fileobj=str(best_path),
                path_in_repo="best_controller.pt",
                repo_id="Builder-Neekhil/orbit-wars-agent",
                commit_message=f"Upload trained controller (WR: {best_win_rate:.0%})"
            )
            print("Uploaded best_controller.pt to HF Hub")

        # Build and upload a standalone adaptive submission from the best (or final) checkpoint.
        final_path = save_dir / "final_controller.pt"
        if not best_path.exists():
            best_path = final_path
        if best_path.exists():
            gen_script = '/app/generate_submission.py'
            if not os.path.exists(gen_script):
                urllib.request.urlretrieve(
                    "https://huggingface.co/Builder-Neekhil/orbit-wars-agent/resolve/main/generate_submission.py",
                    gen_script
                )
            sys.path.insert(0, '/app')
            from generate_submission import generate_submission
            generate_submission(
                base_agent_path=_SUBMISSION_PATH,
                checkpoint_path=str(best_path),
                output_path="/app/submission_adaptive.py",
            )
            api.upload_file(
                path_or_fileobj="/app/submission_adaptive.py",
                path_in_repo="submission_adaptive.py",
                repo_id="Builder-Neekhil/orbit-wars-agent",
                commit_message=f"Upload adaptive submission (WR: {best_win_rate:.0%})"
            )
            print("Uploaded submission_adaptive.py to HF Hub")
    except Exception as e:
        print(f"Hub upload error: {e}")


if __name__ == "__main__":
    train()