#!/usr/bin/env python3 """ Full training pipeline: Phase 1 -> Phase 2 -> Phase 3 TIL-26-AE Bomberman Agent Training References: - Pommerman multi-agent RL: arxiv:2407.00662 - MAPPO best practices: arxiv:2103.01955 - Invalid Action Masking: arxiv:2006.14171 """ import os import sys import subprocess # Bootstrap: download and set up the TIL environment if not present repo_path = "/app/til-26-ae-repo/til-26-ae" if not os.path.exists(repo_path): try: from huggingface_hub import snapshot_download snapshot_download( repo_id='e-rong/til-26-ae', repo_type='space', local_dir='/app/til-26-ae-repo', local_dir_use_symlinks=False ) except Exception: subprocess.run( ["git", "clone", "https://huggingface.co/spaces/e-rong/til-26-ae", "/app/til-26-ae-repo"], capture_output=True, check=False ) if os.path.exists(repo_path): sys.path.insert(0, repo_path) elif os.path.exists("/app/til-26-ae-repo"): sys.path.insert(0, "/app/til-26-ae-repo") import numpy as np import gymnasium as gym from gymnasium.spaces import Box, Discrete import torch from til_environment.bomberman_env import Bomberman from til_environment.config import default_config from pettingzoo.utils.conversions import aec_to_parallel from sb3_contrib import MaskablePPO from sb3_contrib.common.wrappers import ActionMasker from stable_baselines3.common.callbacks import BaseCallback, CheckpointCallback from stable_baselines3.common.monitor import Monitor import trackio # ============================================================================ # PHASE 1: Base environment wrapper # ============================================================================ class BombermanSingleAgentEnv(gym.Env): """ Wraps parallel PettingZoo Bomberman into a single-agent gymnasium env. Agent 0 is the learning agent; opponents use random valid actions. """ def __init__(self, cfg=None, seed=None, opponent_policy="random"): super().__init__() self.cfg = cfg or default_config() self.cfg.env.render_mode = None raw = Bomberman(self.cfg) self._parallel_env = aec_to_parallel(raw) self.agent_id = "agent_0" self.opponent_policy = opponent_policy self._episode_seed = seed self._episode_count = 0 self.action_space = Discrete(6) self._last_action_mask = None self._obs_size = None self._last_obs_dict = None self._compute_obs_space() def _compute_obs_space(self): cfg = self.cfg viewcone_l = int(cfg.dynamics.vision.behind) + int(cfg.dynamics.vision.ahead) + 1 viewcone_w = int(cfg.dynamics.vision.left) + int(cfg.dynamics.vision.right) + 1 agent_viewcone_size = viewcone_l * viewcone_w * 25 base_r = int(cfg.entities.base.vision_radius) base_side = 2 * base_r + 1 base_viewcone_size = base_side * base_side * 25 scalar_size = 11 self._obs_size = agent_viewcone_size + base_viewcone_size + scalar_size self.observation_space = Box( low=-np.inf, high=np.inf, shape=(self._obs_size,), dtype=np.float32, ) def _get_agents(self): """Get list of currently active agents from obs_dict.""" if self._last_obs_dict is not None: return list(self._last_obs_dict.keys()) return self._parallel_env.possible_agents def reset(self, seed=None, options=None): if seed is not None: self._episode_seed = seed else: self._episode_seed = self._episode_count self._episode_count += 1 obs_dict, info_dict = self._parallel_env.reset(seed=self._episode_seed, options=options) self._last_obs_dict = obs_dict self._store_action_mask(obs_dict[self.agent_id]) return self._flatten_obs(obs_dict[self.agent_id]), {} def step(self, action): actions = {} for agent_id in self._get_agents(): if agent_id == self.agent_id: actions[agent_id] = action else: mask = ( self._last_obs_dict[agent_id].get("action_mask") if self._last_obs_dict and agent_id in self._last_obs_dict else np.ones(6, dtype=np.int8) ) valid = np.where(mask == 1)[0] actions[agent_id] = int(np.random.choice(valid)) if len(valid) > 0 else 0 obs_dict, rewards, terminations, truncations, infos = self._parallel_env.step(actions) self._last_obs_dict = obs_dict if self.agent_id not in obs_dict: return np.zeros(self._obs_size, dtype=np.float32), 0.0, True, False, {} self._store_action_mask(obs_dict[self.agent_id]) obs = self._flatten_obs(obs_dict[self.agent_id]) reward = float(rewards.get(self.agent_id, 0.0)) done = terminations.get(self.agent_id, False) or truncations.get(self.agent_id, False) return obs, reward, done, False, infos.get(self.agent_id, {}) def _store_action_mask(self, obs_dict): if "action_mask" in obs_dict: self._last_action_mask = obs_dict["action_mask"].copy().astype(bool) else: self._last_action_mask = np.ones(6, dtype=bool) def action_masks(self): return self._last_action_mask def _flatten_obs(self, obs_dict): return np.concatenate( [ obs_dict["agent_viewcone"].flatten(), obs_dict["base_viewcone"].flatten(), np.array([obs_dict["direction"]], dtype=np.float32), obs_dict["location"].flatten().astype(np.float32), obs_dict["base_location"].flatten().astype(np.float32), obs_dict["health"].flatten().astype(np.float32), np.array([obs_dict["frozen_ticks"]], dtype=np.float32), obs_dict["base_health"].flatten().astype(np.float32), obs_dict["team_resources"].flatten().astype(np.float32), np.array([obs_dict["team_bombs"]], dtype=np.float32), np.array([obs_dict["step"]], dtype=np.float32), ], dtype=np.float32, ) def close(self): self._parallel_env.close() # ============================================================================ # PHASE 2: Exploration reward shaping # ============================================================================ class RewardShapingWrapper(gym.Wrapper): """ Adds visit-count exploration bonus with adaptive annealing. alpha = 1 - tanh(k * avg_enemy_deaths) gradually reduces exploration weight. """ def __init__(self, env, adaptive_k=1.2, base_explore_weight=0.5): super().__init__(env) self.adaptive_k = adaptive_k self.base_explore_weight = base_explore_weight self._visit_counts = None self._grid_size = 16 self._avg_enemy_deaths = 0.0 self._episode_count = 0 self._episode_enemy_deaths = 0 self._explore_weight = base_explore_weight def reset(self, **kwargs): self._visit_counts = np.zeros((self._grid_size, self._grid_size), dtype=np.int32) self._episode_enemy_deaths = 0 return self.env.reset(**kwargs) def step(self, action): obs, reward, done, truncated, info = self.env.step(action) pos = info.get("location", None) visit_bonus = 0.0 if pos is not None: x, y = int(pos[0]), int(pos[1]) if 0 <= x < self._grid_size and 0 <= y < self._grid_size: visits = self._visit_counts[x, y] visit_bonus = 1.0 / (1.0 + visits) self._visit_counts[x, y] += 1 if done: self._episode_count += 1 alpha = 1.0 - np.tanh(self.adaptive_k * self._avg_enemy_deaths) self._explore_weight = self.base_explore_weight * max(0.1, alpha) self._avg_enemy_deaths = 0.95 * self._avg_enemy_deaths + 0.05 * self._episode_enemy_deaths shaped_reward = reward + self._explore_weight * visit_bonus info["raw_reward"] = reward info["explore_bonus"] = visit_bonus info["explore_weight"] = self._explore_weight return obs, shaped_reward, done, truncated, info def action_masks(self): return self.env.action_masks() # ============================================================================ # PHASE 3: Rule-based opponents + curriculum # ============================================================================ class RuleBasedOpponent: """Rule-based Bomberman opponent with three difficulty levels.""" def __init__(self, team_id=1, difficulty="simple"): self.team_id = team_id self.difficulty = difficulty self.visited = None self.grid_size = 16 def reset(self): self.visited = np.zeros((self.grid_size, self.grid_size), dtype=np.int32) def act(self, obs_dict): action_mask = obs_dict["action_mask"] valid_actions = np.where(action_mask == 1)[0] if len(valid_actions) == 0: return 4 # STAY if self.difficulty == "static": return 4 elif self.difficulty == "simple": viewcone = obs_dict["agent_viewcone"] has_enemy = np.any(viewcone[..., 10] > 0) has_enemy_base = np.any(viewcone[..., 12] > 0) if (has_enemy or has_enemy_base) and 5 in valid_actions: return 5 movement_actions = [a for a in valid_actions if a < 4] if len(movement_actions) > 0: return int(np.random.choice(movement_actions)) return 4 elif self.difficulty == "smart": return self._smart_policy(obs_dict, valid_actions) return 4 def _smart_policy(self, obs, valid_actions): viewcone = obs["agent_viewcone"] h, w, _ = viewcone.shape collectibles = np.stack([ viewcone[..., 7], viewcone[..., 8], viewcone[..., 6], ], axis=-1) has_collectible = np.any(collectibles > 0, axis=-1) cx, cy = 3, 2 best_action = 4 best_score = -1 for action in valid_actions: if action == 4 or action == 5: continue if action == 0: nx, ny = cx - 1, cy elif action == 1: nx, ny = cx + 1, cy elif action == 2: nx, ny = cx, cy - 1 elif action == 3: nx, ny = cx, cy + 1 else: continue if 0 <= nx < h and 0 <= ny < w: score = 0 if has_collectible[nx, ny]: score += 10.0 if viewcone[nx, ny, 0] < 1: score -= 5.0 wall_score = ( viewcone[nx, ny, 1] + viewcone[nx, ny, 2] + viewcone[nx, ny, 3] + viewcone[nx, ny, 4] ) score -= wall_score * 2.0 if score > best_score: best_score = score best_action = action for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1), (0, 0)]: nx, ny = cx + dx, cy + dy if 0 <= nx < h and 0 <= ny < w: if viewcone[nx, ny, 10] > 0 or viewcone[nx, ny, 12] > 0: if 5 in valid_actions and np.random.random() < 0.7: return 5 break return int(best_action) if best_score > -1 else 4 class CurriculumEnv(gym.Env): """Single-agent env with curriculum-based opponent difficulty.""" CURRICULUM_STAGES = ["static", "simple", "smart", "mixed"] WIN_RATE_THRESHOLD = 0.55 EPISODES_PER_STAGE = 500 def __init__(self, cfg=None, seed=None): super().__init__() self.cfg = cfg or default_config() self.cfg.env.render_mode = None raw = Bomberman(self.cfg) self._parallel_env = aec_to_parallel(raw) self.agent_id = "agent_0" self._episode_seed = seed self._episode_count = 0 self.action_space = Discrete(6) self._last_action_mask = None self._obs_size = None self._last_obs_dict = None self._compute_obs_space() self.stage_idx = 0 self.stage_episodes = 0 self.stage_wins = 0 self.stage_rewards = [] self.opponents = {} self._init_opponents() def _compute_obs_space(self): cfg = self.cfg viewcone_l = int(cfg.dynamics.vision.behind) + int(cfg.dynamics.vision.ahead) + 1 viewcone_w = int(cfg.dynamics.vision.left) + int(cfg.dynamics.vision.right) + 1 agent_viewcone_size = viewcone_l * viewcone_w * 25 base_r = int(cfg.entities.base.vision_radius) base_side = 2 * base_r + 1 base_viewcone_size = base_side * base_side * 25 scalar_size = 11 self._obs_size = agent_viewcone_size + base_viewcone_size + scalar_size self.observation_space = Box( low=-np.inf, high=np.inf, shape=(self._obs_size,), dtype=np.float32, ) def _get_agents(self): if self._last_obs_dict is not None: return list(self._last_obs_dict.keys()) return self._parallel_env.possible_agents def _init_opponents(self): for i in range(1, self.cfg.env.num_teams): opp_id = f"agent_{i}" self.opponents[opp_id] = RuleBasedOpponent(team_id=i, difficulty="static") def _update_opponent_difficulty(self): stage = self.CURRICULUM_STAGES[self.stage_idx] for opp_id, opp in self.opponents.items(): if stage == "mixed": opp.difficulty = "smart" if (int(opp_id.split("_")[1]) % 2 == 0) else "simple" else: opp.difficulty = stage def _check_stage_advance(self): if self.stage_idx >= len(self.CURRICULUM_STAGES) - 1: return False if len(self.stage_rewards) >= self.EPISODES_PER_STAGE: win_rate = self.stage_wins / max(1, len(self.stage_rewards)) avg_reward = np.mean(self.stage_rewards) if win_rate >= self.WIN_RATE_THRESHOLD or len(self.stage_rewards) >= self.EPISODES_PER_STAGE: trackio.alert( "Curriculum Advance", f"Stage {self.CURRICULUM_STAGES[self.stage_idx]} complete: " f"win_rate={win_rate:.2%}, avg_reward={avg_reward:.1f}. " f"Advancing to {self.CURRICULUM_STAGES[self.stage_idx + 1]}", trackio.AlertLevel.INFO, ) self.stage_idx += 1 self.stage_episodes = 0 self.stage_wins = 0 self.stage_rewards = [] self._update_opponent_difficulty() return True return False def reset(self, seed=None, options=None): if seed is not None: self._episode_seed = seed else: self._episode_seed = self._episode_count self._episode_count += 1 for opp in self.opponents.values(): opp.reset() obs_dict, info_dict = self._parallel_env.reset( seed=self._episode_seed, options=options ) self._last_obs_dict = obs_dict self._store_action_mask(obs_dict[self.agent_id]) return self._flatten_obs(obs_dict[self.agent_id]), {} def step(self, action): actions = {} for agent_id in self._get_agents(): if agent_id == self.agent_id: actions[agent_id] = action else: opp = self.opponents.get(agent_id) if opp is not None and agent_id in self._last_obs_dict: actions[agent_id] = opp.act(self._last_obs_dict[agent_id]) else: actions[agent_id] = 4 obs_dict, rewards, terminations, truncations, infos = self._parallel_env.step(actions) self._last_obs_dict = obs_dict if self.agent_id not in obs_dict: self.stage_episodes += 1 return np.zeros(self._obs_size, dtype=np.float32), 0.0, True, False, {} self._store_action_mask(obs_dict[self.agent_id]) obs = self._flatten_obs(obs_dict[self.agent_id]) reward = float(rewards.get(self.agent_id, 0.0)) done = terminations.get(self.agent_id, False) or truncations.get(self.agent_id, False) if done: self.stage_episodes += 1 self.stage_rewards.append(reward) if reward > 10.0: self.stage_wins += 1 self._check_stage_advance() info = dict(infos.get(self.agent_id, {})) info["curriculum_stage"] = self.stage_idx info["curriculum_stage_name"] = self.CURRICULUM_STAGES[self.stage_idx] return obs, reward, done, False, info def _store_action_mask(self, obs_dict): if "action_mask" in obs_dict: self._last_action_mask = obs_dict["action_mask"].copy().astype(bool) else: self._last_action_mask = np.ones(6, dtype=bool) def action_masks(self): return self._last_action_mask def _flatten_obs(self, obs_dict): return np.concatenate( [ obs_dict["agent_viewcone"].flatten(), obs_dict["base_viewcone"].flatten(), np.array([obs_dict["direction"]], dtype=np.float32), obs_dict["location"].flatten().astype(np.float32), obs_dict["base_location"].flatten().astype(np.float32), obs_dict["health"].flatten().astype(np.float32), np.array([obs_dict["frozen_ticks"]], dtype=np.float32), obs_dict["base_health"].flatten().astype(np.float32), obs_dict["team_resources"].flatten().astype(np.float32), np.array([obs_dict["team_bombs"]], dtype=np.float32), np.array([obs_dict["step"]], dtype=np.float32), ], dtype=np.float32, ) def close(self): self._parallel_env.close() # ============================================================================ # Trackio logging callback # ============================================================================ class TrackioLoggingCallback(BaseCallback): def __init__(self, project, run_name, log_interval=2048, verbose=0): super().__init__(verbose) self.project = project self.run_name = run_name self.log_interval = log_interval self._last_mean_reward = 0.0 def _on_training_start(self): trackio.init(project=self.project, name=self.run_name) trackio.alert("Training Started", f"{self.run_name} training began.", trackio.AlertLevel.INFO) def _on_step(self): if self.n_calls % self.log_interval == 0: infos = self.locals.get("infos", [{}]) ep_rewards = [info.get("episode", {}).get("r", 0) for info in infos if "episode" in info] ep_lengths = [info.get("episode", {}).get("l", 0) for info in infos if "episode" in info] explore_bonuses = [info.get("explore_bonus", 0) for info in infos] stages = [info.get("curriculum_stage", 0) for info in infos] if ep_rewards: mean_r = float(np.mean(ep_rewards)) self._last_mean_reward = mean_r log_dict = { "train/mean_episode_reward": mean_r, "train/mean_episode_length": float(np.mean(ep_lengths)) if ep_lengths else 0.0, "train/timesteps": self.num_timesteps, } if explore_bonuses: log_dict["train/mean_explore_bonus"] = float(np.mean(explore_bonuses)) if stages: log_dict["train/curriculum_stage"] = float(np.mean(stages)) trackio.log(log_dict) if mean_r < -5.0 and self.num_timesteps > 50_000: trackio.alert("Low Reward Warning", f"mean_reward={mean_r:.2f} at step {self.num_timesteps} -- may be camping.", trackio.AlertLevel.WARN) return True def _on_training_end(self): trackio.alert("Training Complete", f"Finished at {self.num_timesteps}. Final mean reward: {self._last_mean_reward:.2f}", trackio.AlertLevel.INFO) trackio.finish() # ============================================================================ # Main training pipeline # ============================================================================ def train_phase(cfg, phase, total_timesteps, model=None): trackio_project = os.environ.get("TRACKIO_PROJECT", "til-26-ae") if phase == 1: print("=== PHASE 1: MaskablePPO vs Random Opponents ===") base_env = BombermanSingleAgentEnv(cfg=cfg, opponent_policy="random") env = ActionMasker(base_env, lambda env: env.action_masks()) env = Monitor(env) run_name = "phase1-maskable-ppo-random" elif phase == 2: print("=== PHASE 2: Adaptive Exploration Annealing ===") base_env = BombermanSingleAgentEnv(cfg=cfg, opponent_policy="random") shaped_env = RewardShapingWrapper(base_env, adaptive_k=1.2, base_explore_weight=0.5) env = ActionMasker(shaped_env, lambda env: env.action_masks()) env = Monitor(env) run_name = "phase2-adaptive-explore" elif phase == 3: print("=== PHASE 3: Curriculum + Rule-Based Self-Play ===") cfg.env.num_teams = 3 base_env = CurriculumEnv(cfg=cfg) env = ActionMasker(base_env, lambda env: env.action_masks()) env = Monitor(env) run_name = "phase3-curriculum-selfplay" else: raise ValueError(f"Unknown phase: {phase}") if model is None: model = MaskablePPO( "MlpPolicy", env, learning_rate=3e-4, n_steps=2048, batch_size=64, n_epochs=10, gamma=0.99, gae_lambda=0.95, clip_range=0.2, ent_coef=0.01, vf_coef=0.5, max_grad_norm=0.5, verbose=1, device="cuda" if torch.cuda.is_available() else "cpu", ) else: model.set_env(env) checkpoint_callback = CheckpointCallback( save_freq=50_000, save_path=f"./checkpoints/phase{phase}", name_prefix=f"bomberman_phase{phase}", ) trackio_callback = TrackioLoggingCallback( trackio_project, run_name, log_interval=2048, ) model.learn( total_timesteps=total_timesteps, callback=[checkpoint_callback, trackio_callback], progress_bar=False, ) model.save(f"bomberman_phase{phase}_final") env.close() print(f"Phase {phase} complete. Model saved to bomberman_phase{phase}_final.zip") return model def main(): cfg = default_config() cfg.env.render_mode = None total_ts_env = os.environ.get("TOTAL_TIMESTEPS", "500_000:500_000:1_000_000") phase_ts = [int(x.replace("_", "")) for x in total_ts_env.split(":")] model = None model = train_phase(cfg, phase=1, total_timesteps=phase_ts[0], model=model) if len(phase_ts) > 1: model = train_phase(cfg, phase=2, total_timesteps=phase_ts[1], model=model) if len(phase_ts) > 2: model = train_phase(cfg, phase=3, total_timesteps=phase_ts[2], model=model) hub_model_id = os.environ.get("HUB_MODEL_ID", "") if hub_model_id: from huggingface_hub import HfApi api = HfApi() for phase in range(1, len(phase_ts) + 1): try: api.upload_file( path_or_fileobj=f"bomberman_phase{phase}_final.zip", path_in_repo=f"bomberman_phase{phase}_final.zip", repo_id=hub_model_id, repo_type="model", ) print(f"Phase {phase} model pushed to {hub_model_id}") except Exception as e: print(f"Failed to push phase {phase}: {e}") print("\n=== All phases complete! ===") if hub_model_id: print(f"Model repository: https://huggingface.co/{hub_model_id}") if __name__ == "__main__": main()