| |
| """ |
| Generate a self-contained Kaggle submission from trained controller weights. |
| |
| Takes the base rule-based agent + trained ParameterController weights |
| and produces a single submission.py that includes: |
| 1. The full base agent code |
| 2. Embedded controller weights (base64-encoded) |
| 3. Lightweight inference: feature extraction → controller → parameter override → base agent |
| 4. In-match opponent profiling for real-time adaptation |
| """ |
|
|
| import base64 |
| import io |
| import json |
| import os |
| import sys |
| from pathlib import Path |
|
|
| import torch |
| import numpy as np |
|
|
|
|
def generate_submission(
    base_agent_path: str = "/app/submission.py",
    checkpoint_path: str = "/app/checkpoints/best_controller.pt",
    output_path: str = "/app/submission_adaptive.py",
) -> None:
    """Generate a self-contained submission file.

    Reads the rule-based base agent source and the trained controller
    checkpoint, serializes the controller state dict to base64, and writes
    a single Python file that embeds the weights plus a lightweight
    inference path (feature extraction -> controller -> parameter override
    -> base agent).

    Args:
        base_agent_path: Path to the base rule-based agent source file.
        checkpoint_path: Path to the trained checkpoint; must contain a
            ``"controller"`` entry holding the controller's state dict.
        output_path: Destination path for the generated submission.
    """
    # Pin the encoding so generation is byte-identical across platforms.
    base_code = Path(base_agent_path).read_text(encoding="utf-8")

    # weights_only=False is required because the checkpoint is a full
    # pickled dict (not just tensors). It is a locally produced, trusted
    # artifact -- never load untrusted checkpoints this way.
    ckpt = torch.load(checkpoint_path, map_location="cpu", weights_only=False)
    controller_state = ckpt["controller"]

    # Serialize the state dict into an in-memory buffer and embed it as
    # ASCII base64 text inside the generated file.
    buf = io.BytesIO()
    torch.save(controller_state, buf)
    weights_b64 = base64.b64encode(buf.getvalue()).decode('ascii')

    # NOTE: everything inside this f-string is the *generated* program.
    # Literal braces are doubled ({{ }}); only {weights_b64} interpolates.
    submission = f'''#!/usr/bin/env python3
"""
Orbit Wars — Adaptive Agent with Learned Parameter Controller
Generated by train_adaptive.py

This agent combines:
1. A strong rule-based core (1100+ ELO)
2. A PPO-trained neural network that adjusts 20 key parameters in real-time
3. In-match opponent profiling for style adaptation
"""

import base64
import io
import math
import time
from collections import defaultdict, namedtuple
from dataclasses import dataclass, field
from itertools import combinations

# ============================================================
# Embedded Controller Weights (base64)
# ============================================================
_CONTROLLER_WEIGHTS_B64 = """{weights_b64}"""

# ============================================================
# Lightweight NN (pure Python + minimal torch for inference)
# ============================================================
import numpy as np

_TORCH_AVAILABLE = False
try:
    import torch
    import torch.nn as nn
    _TORCH_AVAILABLE = True
except ImportError:
    pass

# Tunable parameter definitions: name -> (base value, low bound, high bound)
_TUNABLE_PARAMS = {{
    "HOSTILE_TARGET_VALUE_MULT": (2.05, 1.0, 3.0),
    "ELIMINATION_BONUS": (55.0, 10.0, 100.0),
    "PROACTIVE_DEFENSE_RATIO": (0.28, 0.05, 0.5),
    "FINISHING_HOSTILE_VALUE_MULT": (1.3, 0.8, 2.0),
    "WEAK_ENEMY_THRESHOLD": (110.0, 30.0, 200.0),
    "ATTACK_COST_TURN_WEIGHT": (0.50, 0.2, 0.8),
    "HOSTILE_MARGIN_BASE": (3.0, 1.0, 6.0),
    "FOUR_PLAYER_TARGET_MARGIN": (2.0, 0.0, 5.0),
    "FINISHING_HOSTILE_SEND_BONUS": (5.0, 1.0, 10.0),
    "STATIC_HOSTILE_VALUE_MULT": (1.65, 1.0, 2.5),
    "GANG_UP_VALUE_MULT": (1.4, 1.0, 2.0),
    "EXPOSED_PLANET_VALUE_MULT": (2.0, 1.0, 3.0),
    "REINFORCE_VALUE_MULT": (1.35, 0.8, 2.0),
    "DEFENSE_SHIP_VALUE": (0.55, 0.2, 1.0),
    "BEHIND_DOMINATION": (-0.20, -0.5, 0.0),
    "AHEAD_DOMINATION": (0.15, 0.0, 0.4),
    "LATE_REMAINING_TURNS": (70.0, 40.0, 100.0),
    "REAR_SEND_RATIO_TWO_PLAYER": (0.62, 0.3, 0.9),
    "COMET_VALUE_MULT": (0.65, 0.3, 1.2),
    "SNIPE_VALUE_MULT": (1.12, 0.7, 1.6),
}}
_PARAM_NAMES = list(_TUNABLE_PARAMS.keys())
_NUM_PARAMS = len(_PARAM_NAMES)
_INPUT_DIM = 38  # 33 features + 5 opponent profile


# Only define the network when torch imported successfully; otherwise
# referencing nn.Module at module level would raise NameError and the
# fallback to base parameters would never be reached.
if _TORCH_AVAILABLE:
    class _ParameterController(nn.Module):
        def __init__(self, input_dim=_INPUT_DIM, hidden_size=128):
            super().__init__()
            self.shared = nn.Sequential(
                nn.Linear(input_dim, hidden_size),
                nn.ReLU(),
                nn.Linear(hidden_size, hidden_size),
                nn.ReLU(),
            )
            self.param_mean = nn.Sequential(
                nn.Linear(hidden_size, hidden_size // 2),
                nn.ReLU(),
                nn.Linear(hidden_size // 2, _NUM_PARAMS),
            )
            self.param_log_std = nn.Parameter(torch.zeros(_NUM_PARAMS))
            self.value_head = nn.Sequential(
                nn.Linear(hidden_size, hidden_size // 2),
                nn.ReLU(),
                nn.Linear(hidden_size // 2, 1),
            )

        def forward(self, x):
            hidden = self.shared(x)
            param_mean = torch.tanh(self.param_mean(hidden))
            value = self.value_head(hidden).squeeze(-1)
            return param_mean, self.param_log_std, value


def _load_controller():
    if not _TORCH_AVAILABLE:
        return None
    try:
        raw = base64.b64decode(_CONTROLLER_WEIGHTS_B64)
        buf = io.BytesIO(raw)
        state_dict = torch.load(buf, map_location="cpu", weights_only=False)
        controller = _ParameterController()
        controller.load_state_dict(state_dict)
        controller.eval()
        return controller
    except Exception:
        return None


def _extract_features(obs):
    get = obs.get if isinstance(obs, dict) else lambda k, d=None: getattr(obs, k, d)
    player = int(get("player", 0) or 0)
    step = int(get("step", 0) or 0)
    planets = get("planets") or []
    fleets = get("fleets") or []
    ang_vel = float(get("angular_velocity", 0.0) or 0.0)
    comet_ids = set(get("comet_planet_ids") or [])

    my_p = my_s = my_pr = 0
    en_p = en_s = en_pr = 0
    ne_p = ne_s = 0
    my_st = my_ro = en_st = 0
    en_by = defaultdict(int)

    for p in planets:
        pid, owner, x, y, radius, ships, prod = p
        is_st = (math.hypot(x - 50, y - 50) + radius) >= 50.0
        if owner == player:
            my_p += 1; my_s += ships; my_pr += prod
            if is_st: my_st += 1
            else: my_ro += 1
        elif owner == -1:
            ne_p += 1; ne_s += ships
        else:
            en_p += 1; en_s += ships; en_pr += prod; en_by[owner] += ships
            if is_st: en_st += 1

    my_fs = sum(f[6] for f in fleets if f[1] == player)
    en_fs = sum(f[6] for f in fleets if f[1] != player)
    my_fc = sum(1 for f in fleets if f[1] == player)
    en_fc = sum(1 for f in fleets if f[1] != player)

    mt = my_s + my_fs; et = en_s + en_fs; ta = mt + et + ne_s
    ne = len(en_by)
    mx_e = max(en_by.values()) if en_by else 0
    mn_e = min(en_by.values()) if en_by else 0
    nc = sum(1 for p in planets if p[0] in comet_ids)

    return np.array([
        step/500, min(1,step/100), max(0,(500-step)/500), float(step>400),
        min(1,my_p/15), min(1,en_p/15), min(1,ne_p/15), min(1,my_st/10), min(1,my_ro/10),
        min(1,mt/max(1,ta)), min(1,et/max(1,ta)),
        math.log1p(mt)/10, math.log1p(et)/10, math.log1p(my_fs)/10, math.log1p(en_fs)/10,
        min(1,my_pr/max(1,my_pr+en_pr)), my_pr/30, en_pr/30,
        np.clip((mt-et)/max(1,ta),-1,1), np.clip((my_p-en_p)/15,-1,1), np.clip((my_pr-en_pr)/15,-1,1),
        min(1,ne/3), float(ne>=3), min(1,mx_e/max(1,et)), min(1,mn_e/max(1,mx_e+1)), min(1,en_fc/20),
        min(1,my_fc/20), my_fs/max(1,mt), en_fs/max(1,et),
        abs(ang_vel)*100, min(1,nc/5), min(1,len(planets)/30), ne_s/max(1,ta),
    ], dtype=np.float32)


class _OpponentProfiler:
    def __init__(self):
        self.a = 0.1; self.agg = 0.5; self.exp = 0.5; self.trt = 0.5
        self.pp = 0; self.pf = 0; self.ps = 0; self.sc = 0

    def update(self, obs):
        get = obs.get if isinstance(obs, dict) else lambda k, d=None: getattr(obs, k, d)
        player = int(get("player", 0) or 0)
        planets = get("planets") or []; fleets = get("fleets") or []
        ep = sum(1 for p in planets if p[1] not in (-1, player))
        ef = sum(1 for f in fleets if f[1] != player)
        es = sum(p[5] for p in planets if p[1] not in (-1, player))
        es += sum(f[6] for f in fleets if f[1] != player)
        if self.sc > 0:
            fd = max(0, ef - self.pf)
            self.agg = (1-self.a)*self.agg + self.a*min(1, fd/5)
            pd = ep - self.pp
            self.exp = (1-self.a)*self.exp + self.a*np.clip(pd/3+0.5, 0, 1)
            efs = sum(f[6] for f in fleets if f[1] != player)
            t = 1 - min(1, efs/max(1, es)) if es > 0 else 0.5
            self.trt = (1-self.a)*self.trt + self.a*t
        self.pp = ep; self.pf = ef; self.ps = es; self.sc += 1
        return np.array([self.agg, self.exp, self.trt, min(1,self.sc/100), float(self.sc>50)], dtype=np.float32)


def _decode_params(raw):
    params = {{}}
    for i, name in enumerate(_PARAM_NAMES):
        base, low, high = _TUNABLE_PARAMS[name]
        t = (float(raw[i]) + 1.0) / 2.0
        params[name] = low + t * (high - low)
    return params


# ============================================================
# Initialize controller at module load time
# ============================================================
_controller = _load_controller()
_profiler = _OpponentProfiler()
_base_globals = {{}}


# ============================================================
# === BASE AGENT CODE BELOW ===
# ============================================================

'''

    # Splice the adaptive entry point in where the base agent() begins.
    agent_marker = "\ndef agent(obs, config=None):"
    agent_idx = base_code.find(agent_marker)

    if agent_idx == -1:
        # Marker not found: ship the base agent verbatim as a safe fallback
        # (the embedded controller preamble is simply unused).
        submission += base_code
    else:
        # Keep everything before the base agent() definition, then append
        # an adaptive agent() that overrides parameters before planning.
        pre_agent = base_code[:agent_idx]
        submission += pre_agent
        submission += f'''

# ============================================================
# Adaptive Agent Entry Point
# ============================================================

_agent_step = 0

def _read(obs, key, default=None):
    if isinstance(obs, dict):
        return obs.get(key, default)
    return getattr(obs, key, default)


def build_world(obs, inferred_step=None):
    player = _read(obs, "player", 0)
    obs_step = _read(obs, "step", 0) or 0
    step = max(obs_step, inferred_step or 0)
    raw_planets = _read(obs, "planets", []) or []
    raw_fleets = _read(obs, "fleets", []) or []
    ang_vel = _read(obs, "angular_velocity", 0.0) or 0.0
    raw_init = _read(obs, "initial_planets", []) or []
    comets = _read(obs, "comets", []) or []
    comet_ids = set(_read(obs, "comet_planet_ids", []) or [])

    planets = [Planet(*planet) for planet in raw_planets]
    fleets = [Fleet(*fleet) for fleet in raw_fleets]
    initial_planets = [Planet(*planet) for planet in raw_init]
    initial_by_id = {{planet.id: planet for planet in initial_planets}}

    return WorldModel(
        player=player, step=step, planets=planets, fleets=fleets,
        initial_by_id=initial_by_id, ang_vel=ang_vel, comets=comets, comet_ids=comet_ids,
    )


def agent(obs, config=None):
    global _agent_step, _controller, _profiler
    global HOSTILE_TARGET_VALUE_MULT, ELIMINATION_BONUS, PROACTIVE_DEFENSE_RATIO
    global FINISHING_HOSTILE_VALUE_MULT, WEAK_ENEMY_THRESHOLD, ATTACK_COST_TURN_WEIGHT
    global HOSTILE_MARGIN_BASE, FOUR_PLAYER_TARGET_MARGIN, FINISHING_HOSTILE_SEND_BONUS
    global STATIC_HOSTILE_VALUE_MULT, GANG_UP_VALUE_MULT, EXPOSED_PLANET_VALUE_MULT
    global REINFORCE_VALUE_MULT, DEFENSE_SHIP_VALUE, BEHIND_DOMINATION, AHEAD_DOMINATION
    global LATE_REMAINING_TURNS, REAR_SEND_RATIO_TWO_PLAYER, COMET_VALUE_MULT, SNIPE_VALUE_MULT

    _agent_step += 1
    start_time = time.perf_counter()

    # Apply learned parameter adjustments if controller is available
    if _controller is not None and _TORCH_AVAILABLE:
        try:
            features = _extract_features(obs)
            profile = _profiler.update(obs)
            combined = np.concatenate([features, profile])
            with torch.inference_mode():
                x = torch.from_numpy(combined).unsqueeze(0)
                param_mean, _, _ = _controller(x)
            raw = param_mean.squeeze(0).cpu().numpy()
            params = _decode_params(raw)

            # Apply parameter overrides
            HOSTILE_TARGET_VALUE_MULT = params["HOSTILE_TARGET_VALUE_MULT"]
            ELIMINATION_BONUS = params["ELIMINATION_BONUS"]
            PROACTIVE_DEFENSE_RATIO = params["PROACTIVE_DEFENSE_RATIO"]
            FINISHING_HOSTILE_VALUE_MULT = params["FINISHING_HOSTILE_VALUE_MULT"]
            WEAK_ENEMY_THRESHOLD = params["WEAK_ENEMY_THRESHOLD"]
            ATTACK_COST_TURN_WEIGHT = params["ATTACK_COST_TURN_WEIGHT"]
            HOSTILE_MARGIN_BASE = params["HOSTILE_MARGIN_BASE"]
            FOUR_PLAYER_TARGET_MARGIN = params["FOUR_PLAYER_TARGET_MARGIN"]
            FINISHING_HOSTILE_SEND_BONUS = params["FINISHING_HOSTILE_SEND_BONUS"]
            STATIC_HOSTILE_VALUE_MULT = params["STATIC_HOSTILE_VALUE_MULT"]
            GANG_UP_VALUE_MULT = params["GANG_UP_VALUE_MULT"]
            EXPOSED_PLANET_VALUE_MULT = params["EXPOSED_PLANET_VALUE_MULT"]
            REINFORCE_VALUE_MULT = params["REINFORCE_VALUE_MULT"]
            DEFENSE_SHIP_VALUE = params["DEFENSE_SHIP_VALUE"]
            BEHIND_DOMINATION = params["BEHIND_DOMINATION"]
            AHEAD_DOMINATION = params["AHEAD_DOMINATION"]
            LATE_REMAINING_TURNS = params["LATE_REMAINING_TURNS"]
            REAR_SEND_RATIO_TWO_PLAYER = params["REAR_SEND_RATIO_TWO_PLAYER"]
            COMET_VALUE_MULT = params["COMET_VALUE_MULT"]
            SNIPE_VALUE_MULT = params["SNIPE_VALUE_MULT"]
        except Exception:
            pass  # Fall through to base parameters if controller fails

    world = build_world(obs, inferred_step=_agent_step - 1)
    if not world.my_planets:
        return []
    act_timeout = _read(config, "actTimeout", 1.0) if config is not None else 1.0
    soft_budget = min(SOFT_ACT_DEADLINE, max(0.55, act_timeout * 0.82))
    deadline = start_time + soft_budget
    return plan_moves(world, deadline=deadline)


__all__ = ["agent", "build_world"]
'''

    # Pin the encoding on write as well, matching the read side.
    Path(output_path).write_text(submission, encoding="utf-8")
    print(f"Generated adaptive submission: {output_path}")
    print(f"  Size: {len(submission):,} chars")
    print(f"  Controller weights: {len(weights_b64):,} chars (base64)")
| |
|
|
| if __name__ == "__main__": |
| generate_submission() |
|
|