# orbit-wars-agent / generate_submission.py
# Hosting-page residue kept for provenance: "Builder-Neekhil's picture",
# "Upload generate_submission.py", revision 21c5a9a (verified).
#!/usr/bin/env python3
"""
Generate a self-contained Kaggle submission from trained controller weights.
Takes the base rule-based agent + trained ParameterController weights
and produces a single submission.py that includes:
1. The full base agent code
2. Embedded controller weights (base64-encoded)
3. Lightweight inference: feature extraction → controller → parameter override → base agent
4. In-match opponent profiling for real-time adaptation
"""
import base64
import io
import json
import os
import sys
from pathlib import Path
import torch
import numpy as np
def generate_submission(
    base_agent_path: str = "/app/submission.py",
    checkpoint_path: str = "/app/checkpoints/best_controller.pt",
    output_path: str = "/app/submission_adaptive.py",
) -> None:
    """Generate a self-contained adaptive submission file.

    The output module is assembled from three pieces, in this order:

    1. A generated header: imports, the controller weights embedded as
       base64, the controller network, feature extraction, the opponent
       profiler and the parameter decoder.
    2. The base agent source. When the marker
       ``"\\ndef agent(obs, config=None):"`` is found, only the text *before*
       it is kept (everything from the marker onward is dropped); otherwise
       the whole base file is embedded.
    3. An adaptive ``agent()`` entry point (plus ``_read``/``build_world``)
       that overrides the tunable module globals from the controller output
       before delegating to ``plan_moves``.

    Args:
        base_agent_path: Path of the rule-based agent source to embed.
        checkpoint_path: Path of a ``torch.save`` checkpoint that is indexed
            with the key ``"controller"`` to obtain the controller state.
        output_path: Path the generated submission is written to.

    Returns:
        None. The result is written to ``output_path`` and a short summary is
        printed to stdout.
    """
    # Read as UTF-8 explicitly: the locale default is platform dependent.
    base_code = Path(base_agent_path).read_text(encoding="utf-8")

    # NOTE(security): weights_only=False unpickles arbitrary objects, so only
    # checkpoints from a trusted source must be passed here.
    ckpt = torch.load(checkpoint_path, map_location="cpu", weights_only=False)
    controller_state = ckpt["controller"]

    # Serialize the controller state and base64-encode it so that it can be
    # embedded as a plain string literal in the generated module.
    buf = io.BytesIO()
    torch.save(controller_state, buf)
    weights_b64 = base64.b64encode(buf.getvalue()).decode('ascii')

    # NOTE(review): the embedded feature extractor and opponent profiler must
    # match the training-time implementation; in particular confirm the extent
    # of the `if self.sc > 0:` guard in `_OpponentProfiler.update`.
    # Literal braces in this f-string are doubled; the only placeholder is
    # {weights_b64}.
    header = f'''#!/usr/bin/env python3
"""
Orbit Wars — Adaptive Agent with Learned Parameter Controller
Generated by train_adaptive.py
This agent combines:
1. A strong rule-based core (1100+ ELO)
2. A PPO-trained neural network that adjusts 20 key parameters in real-time
3. In-match opponent profiling for style adaptation
"""
import base64
import io
import math
import time
from collections import defaultdict, namedtuple
from dataclasses import dataclass, field
from itertools import combinations

# ============================================================
# Embedded Controller Weights (base64)
# ============================================================
_CONTROLLER_WEIGHTS_B64 = """{weights_b64}"""

# ============================================================
# Lightweight NN (pure Python + minimal torch for inference)
# ============================================================
import numpy as np

_TORCH_AVAILABLE = False
try:
    import torch
    import torch.nn as nn
    _TORCH_AVAILABLE = True
except ImportError:
    pass

# Tunable parameter definitions
_TUNABLE_PARAMS = {{
    "HOSTILE_TARGET_VALUE_MULT": (2.05, 1.0, 3.0),
    "ELIMINATION_BONUS": (55.0, 10.0, 100.0),
    "PROACTIVE_DEFENSE_RATIO": (0.28, 0.05, 0.5),
    "FINISHING_HOSTILE_VALUE_MULT": (1.3, 0.8, 2.0),
    "WEAK_ENEMY_THRESHOLD": (110.0, 30.0, 200.0),
    "ATTACK_COST_TURN_WEIGHT": (0.50, 0.2, 0.8),
    "HOSTILE_MARGIN_BASE": (3.0, 1.0, 6.0),
    "FOUR_PLAYER_TARGET_MARGIN": (2.0, 0.0, 5.0),
    "FINISHING_HOSTILE_SEND_BONUS": (5.0, 1.0, 10.0),
    "STATIC_HOSTILE_VALUE_MULT": (1.65, 1.0, 2.5),
    "GANG_UP_VALUE_MULT": (1.4, 1.0, 2.0),
    "EXPOSED_PLANET_VALUE_MULT": (2.0, 1.0, 3.0),
    "REINFORCE_VALUE_MULT": (1.35, 0.8, 2.0),
    "DEFENSE_SHIP_VALUE": (0.55, 0.2, 1.0),
    "BEHIND_DOMINATION": (-0.20, -0.5, 0.0),
    "AHEAD_DOMINATION": (0.15, 0.0, 0.4),
    "LATE_REMAINING_TURNS": (70.0, 40.0, 100.0),
    "REAR_SEND_RATIO_TWO_PLAYER": (0.62, 0.3, 0.9),
    "COMET_VALUE_MULT": (0.65, 0.3, 1.2),
    "SNIPE_VALUE_MULT": (1.12, 0.7, 1.6),
}}
_PARAM_NAMES = list(_TUNABLE_PARAMS.keys())
_NUM_PARAMS = len(_PARAM_NAMES)
_INPUT_DIM = 38  # 33 features + 5 opponent profile

# The network class needs torch.nn, so it is only defined when the import
# above succeeded; otherwise this module must still import cleanly.
if _TORCH_AVAILABLE:
    class _ParameterController(nn.Module):
        def __init__(self, input_dim=_INPUT_DIM, hidden_size=128):
            super().__init__()
            self.shared = nn.Sequential(
                nn.Linear(input_dim, hidden_size),
                nn.ReLU(),
                nn.Linear(hidden_size, hidden_size),
                nn.ReLU(),
            )
            self.param_mean = nn.Sequential(
                nn.Linear(hidden_size, hidden_size // 2),
                nn.ReLU(),
                nn.Linear(hidden_size // 2, _NUM_PARAMS),
            )
            self.param_log_std = nn.Parameter(torch.zeros(_NUM_PARAMS))
            self.value_head = nn.Sequential(
                nn.Linear(hidden_size, hidden_size // 2),
                nn.ReLU(),
                nn.Linear(hidden_size // 2, 1),
            )

        def forward(self, x):
            hidden = self.shared(x)
            param_mean = torch.tanh(self.param_mean(hidden))
            value = self.value_head(hidden).squeeze(-1)
            return param_mean, self.param_log_std, value
else:
    _ParameterController = None


def _load_controller():
    if not _TORCH_AVAILABLE:
        return None
    try:
        raw = base64.b64decode(_CONTROLLER_WEIGHTS_B64)
        buf = io.BytesIO(raw)
        state_dict = torch.load(buf, map_location="cpu", weights_only=False)
        controller = _ParameterController()
        controller.load_state_dict(state_dict)
        controller.eval()
        return controller
    except Exception:
        return None


def _extract_features(obs):
    get = obs.get if isinstance(obs, dict) else lambda k, d=None: getattr(obs, k, d)
    player = int(get("player", 0) or 0)
    step = int(get("step", 0) or 0)
    planets = get("planets") or []
    fleets = get("fleets") or []
    ang_vel = float(get("angular_velocity", 0.0) or 0.0)
    comet_ids = set(get("comet_planet_ids") or [])
    my_p = my_s = my_pr = 0
    en_p = en_s = en_pr = 0
    ne_p = ne_s = 0
    my_st = my_ro = en_st = 0
    en_by = defaultdict(int)
    for p in planets:
        pid, owner, x, y, radius, ships, prod = p
        is_st = (math.hypot(x - 50, y - 50) + radius) >= 50.0
        if owner == player:
            my_p += 1; my_s += ships; my_pr += prod
            if is_st: my_st += 1
            else: my_ro += 1
        elif owner == -1:
            ne_p += 1; ne_s += ships
        else:
            en_p += 1; en_s += ships; en_pr += prod; en_by[owner] += ships
            if is_st: en_st += 1
    my_fs = sum(f[6] for f in fleets if f[1] == player)
    en_fs = sum(f[6] for f in fleets if f[1] != player)
    my_fc = sum(1 for f in fleets if f[1] == player)
    en_fc = sum(1 for f in fleets if f[1] != player)
    mt = my_s + my_fs; et = en_s + en_fs; ta = mt + et + ne_s
    ne = len(en_by)
    mx_e = max(en_by.values()) if en_by else 0
    mn_e = min(en_by.values()) if en_by else 0
    nc = sum(1 for p in planets if p[0] in comet_ids)
    return np.array([
        step/500, min(1,step/100), max(0,(500-step)/500), float(step>400),
        min(1,my_p/15), min(1,en_p/15), min(1,ne_p/15), min(1,my_st/10), min(1,my_ro/10),
        min(1,mt/max(1,ta)), min(1,et/max(1,ta)),
        math.log1p(mt)/10, math.log1p(et)/10, math.log1p(my_fs)/10, math.log1p(en_fs)/10,
        min(1,my_pr/max(1,my_pr+en_pr)), my_pr/30, en_pr/30,
        np.clip((mt-et)/max(1,ta),-1,1), np.clip((my_p-en_p)/15,-1,1), np.clip((my_pr-en_pr)/15,-1,1),
        min(1,ne/3), float(ne>=3), min(1,mx_e/max(1,et)), min(1,mn_e/max(1,mx_e+1)), min(1,en_fc/20),
        min(1,my_fc/20), my_fs/max(1,mt), en_fs/max(1,et),
        abs(ang_vel)*100, min(1,nc/5), min(1,len(planets)/30), ne_s/max(1,ta),
    ], dtype=np.float32)


class _OpponentProfiler:
    def __init__(self):
        self.a = 0.1; self.agg = 0.5; self.exp = 0.5; self.trt = 0.5
        self.pp = 0; self.pf = 0; self.ps = 0; self.sc = 0

    def update(self, obs):
        get = obs.get if isinstance(obs, dict) else lambda k, d=None: getattr(obs, k, d)
        player = int(get("player", 0) or 0)
        planets = get("planets") or []; fleets = get("fleets") or []
        ep = sum(1 for p in planets if p[1] not in (-1, player))
        ef = sum(1 for f in fleets if f[1] != player)
        es = sum(p[5] for p in planets if p[1] not in (-1, player))
        es += sum(f[6] for f in fleets if f[1] != player)
        if self.sc > 0:
            fd = max(0, ef - self.pf)
            self.agg = (1-self.a)*self.agg + self.a*min(1, fd/5)
            pd = ep - self.pp
            self.exp = (1-self.a)*self.exp + self.a*np.clip(pd/3+0.5, 0, 1)
            efs = sum(f[6] for f in fleets if f[1] != player)
            t = 1 - min(1, efs/max(1, es)) if es > 0 else 0.5
            self.trt = (1-self.a)*self.trt + self.a*t
        self.pp = ep; self.pf = ef; self.ps = es; self.sc += 1
        return np.array([self.agg, self.exp, self.trt, min(1,self.sc/100), float(self.sc>50)], dtype=np.float32)


def _decode_params(raw):
    params = {{}}
    for i, name in enumerate(_PARAM_NAMES):
        base, low, high = _TUNABLE_PARAMS[name]
        t = (float(raw[i]) + 1.0) / 2.0
        params[name] = low + t * (high - low)
    return params


# ============================================================
# Initialize controller at module load time
# ============================================================
_controller = _load_controller()
_profiler = _OpponentProfiler()
_base_globals = {{}}

# ============================================================
# === BASE AGENT CODE BELOW ===
# ============================================================
'''

    # Adaptive entry point appended after the base agent code. It is a plain
    # (non-f) string, so braces are written literally.
    entry_point = '''
# ============================================================
# Adaptive Agent Entry Point
# ============================================================
_agent_step = 0


def _read(obs, key, default=None):
    if isinstance(obs, dict):
        return obs.get(key, default)
    return getattr(obs, key, default)


def build_world(obs, inferred_step=None):
    player = _read(obs, "player", 0)
    obs_step = _read(obs, "step", 0) or 0
    step = max(obs_step, inferred_step or 0)
    raw_planets = _read(obs, "planets", []) or []
    raw_fleets = _read(obs, "fleets", []) or []
    ang_vel = _read(obs, "angular_velocity", 0.0) or 0.0
    raw_init = _read(obs, "initial_planets", []) or []
    comets = _read(obs, "comets", []) or []
    comet_ids = set(_read(obs, "comet_planet_ids", []) or [])
    planets = [Planet(*planet) for planet in raw_planets]
    fleets = [Fleet(*fleet) for fleet in raw_fleets]
    initial_planets = [Planet(*planet) for planet in raw_init]
    initial_by_id = {planet.id: planet for planet in initial_planets}
    return WorldModel(
        player=player, step=step, planets=planets, fleets=fleets,
        initial_by_id=initial_by_id, ang_vel=ang_vel, comets=comets, comet_ids=comet_ids,
    )


def agent(obs, config=None):
    global _agent_step, _controller, _profiler
    global HOSTILE_TARGET_VALUE_MULT, ELIMINATION_BONUS, PROACTIVE_DEFENSE_RATIO
    global FINISHING_HOSTILE_VALUE_MULT, WEAK_ENEMY_THRESHOLD, ATTACK_COST_TURN_WEIGHT
    global HOSTILE_MARGIN_BASE, FOUR_PLAYER_TARGET_MARGIN, FINISHING_HOSTILE_SEND_BONUS
    global STATIC_HOSTILE_VALUE_MULT, GANG_UP_VALUE_MULT, EXPOSED_PLANET_VALUE_MULT
    global REINFORCE_VALUE_MULT, DEFENSE_SHIP_VALUE, BEHIND_DOMINATION, AHEAD_DOMINATION
    global LATE_REMAINING_TURNS, REAR_SEND_RATIO_TWO_PLAYER, COMET_VALUE_MULT, SNIPE_VALUE_MULT
    _agent_step += 1
    start_time = time.perf_counter()
    # Apply learned parameter adjustments if controller is available
    if _controller is not None and _TORCH_AVAILABLE:
        try:
            features = _extract_features(obs)
            profile = _profiler.update(obs)
            combined = np.concatenate([features, profile])
            with torch.inference_mode():
                x = torch.from_numpy(combined).unsqueeze(0)
                param_mean, _, _ = _controller(x)
                raw = param_mean.squeeze(0).cpu().numpy()
            params = _decode_params(raw)
            # Apply parameter overrides
            HOSTILE_TARGET_VALUE_MULT = params["HOSTILE_TARGET_VALUE_MULT"]
            ELIMINATION_BONUS = params["ELIMINATION_BONUS"]
            PROACTIVE_DEFENSE_RATIO = params["PROACTIVE_DEFENSE_RATIO"]
            FINISHING_HOSTILE_VALUE_MULT = params["FINISHING_HOSTILE_VALUE_MULT"]
            WEAK_ENEMY_THRESHOLD = params["WEAK_ENEMY_THRESHOLD"]
            ATTACK_COST_TURN_WEIGHT = params["ATTACK_COST_TURN_WEIGHT"]
            HOSTILE_MARGIN_BASE = params["HOSTILE_MARGIN_BASE"]
            FOUR_PLAYER_TARGET_MARGIN = params["FOUR_PLAYER_TARGET_MARGIN"]
            FINISHING_HOSTILE_SEND_BONUS = params["FINISHING_HOSTILE_SEND_BONUS"]
            STATIC_HOSTILE_VALUE_MULT = params["STATIC_HOSTILE_VALUE_MULT"]
            GANG_UP_VALUE_MULT = params["GANG_UP_VALUE_MULT"]
            EXPOSED_PLANET_VALUE_MULT = params["EXPOSED_PLANET_VALUE_MULT"]
            REINFORCE_VALUE_MULT = params["REINFORCE_VALUE_MULT"]
            DEFENSE_SHIP_VALUE = params["DEFENSE_SHIP_VALUE"]
            BEHIND_DOMINATION = params["BEHIND_DOMINATION"]
            AHEAD_DOMINATION = params["AHEAD_DOMINATION"]
            LATE_REMAINING_TURNS = params["LATE_REMAINING_TURNS"]
            REAR_SEND_RATIO_TWO_PLAYER = params["REAR_SEND_RATIO_TWO_PLAYER"]
            COMET_VALUE_MULT = params["COMET_VALUE_MULT"]
            SNIPE_VALUE_MULT = params["SNIPE_VALUE_MULT"]
        except Exception:
            pass  # Fall through to base parameters if controller fails
    world = build_world(obs, inferred_step=_agent_step - 1)
    if not world.my_planets:
        return []
    act_timeout = _read(config, "actTimeout", 1.0) if config is not None else 1.0
    soft_budget = min(SOFT_ACT_DEADLINE, max(0.55, act_timeout * 0.82))
    deadline = start_time + soft_budget
    return plan_moves(world, deadline=deadline)


__all__ = ["agent", "build_world"]
'''

    # Embed the base agent, cutting it at its own agent() so that the adaptive
    # wrapper replaces the original entry point.
    agent_marker = "\ndef agent(obs, config=None):"
    agent_idx = base_code.find(agent_marker)
    if agent_idx == -1:
        # Fallback: embed the whole base file. The entry point appended below
        # is defined later in the module and therefore shadows any agent()
        # the base file defines under a different signature.
        # NOTE(review): confirm the wrapper is meant to be appended in this
        # branch as well.
        print(
            f"Warning: agent marker not found in {base_agent_path}; "
            "embedding the full base agent source",
            file=sys.stderr,
        )
        embedded_base = base_code
    else:
        # Keep everything up to (not including) the base agent() definition.
        embedded_base = base_code[:agent_idx]

    submission = "".join([header, embedded_base, entry_point])

    # Write as UTF-8: the generated docstring contains non-ASCII characters.
    Path(output_path).write_text(submission, encoding="utf-8")
    print(f"Generated adaptive submission: {output_path}")
    print(f" Size: {len(submission):,} chars")
    print(f" Controller weights: {len(weights_b64):,} chars (base64)")
# Script entry point: build the submission using the default paths declared
# in generate_submission's signature.
if __name__ == "__main__":
    generate_submission()