| |
| import math |
| import json |
| import base64 |
| import random |
| from dataclasses import dataclass, asdict, field |
| from typing import Dict, List, Tuple, Optional, Any |
|
|
| import numpy as np |
| from PIL import Image, ImageDraw, ImageFont |
|
|
| import gradio as gr |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
# --- Top-down map geometry -------------------------------------------------
GRID_W = 29                       # grid width, in tiles
GRID_H = 19                       # grid height, in tiles
TILE = 24                         # edge length of one tile in the SVG, in px
HUD_H = 64                        # height of the HUD banner above the grid, in px
SVG_W = TILE * GRID_W
SVG_H = HUD_H + TILE * GRID_H

# --- First-person (raycast) view -------------------------------------------
VIEW_W = 560                      # rendered image width, in px
VIEW_H = 315                      # rendered image height, in px
FOV_DEG = 74                      # horizontal field of view, in degrees
MAX_DEPTH = 22                    # rays stop marching after this many tiles

# Orientation index -> (dx, dy) unit step; y grows downwards on the grid.
DIRS = [(1, 0), (0, 1), (-1, 0), (0, -1)]
# Orientation index -> facing angle in degrees (same ordering as DIRS).
ORI_DEG = [0, 90, 180, 270]
|
|
| |
| |
| |
# Tile ids stored in the grid. They are consecutive integers starting at 0,
# assigned in the order listed here.
(
    EMPTY,
    WALL,
    PELLET,
    POWER,
    FLAG_A,
    FLAG_B,
    TREASURE,
    BASE_A,
    BASE_B,
    RESOURCE,
    HAZARD,
    GATE,
) = range(12)
|
|
# Human-readable label for every tile id.
TILE_NAMES = dict((
    (EMPTY, "Empty"),
    (WALL, "Wall"),
    (PELLET, "Pellet"),
    (POWER, "Power"),
    (FLAG_A, "Flag A"),
    (FLAG_B, "Flag B"),
    (TREASURE, "Treasure"),
    (BASE_A, "Base A"),
    (BASE_B, "Base B"),
    (RESOURCE, "Resource"),
    (HAZARD, "Hazard"),
    (GATE, "Gate"),
))
|
|
| |
# --- Palette: page chrome ---------------------------------------------------
COL_BG = "#0b1020"
COL_PANEL = "#0f1733"
COL_GRIDLINE = "#121a3b"

# --- Palette: terrain -------------------------------------------------------
COL_EMPTY = "#19214a"
COL_WALL = "#cdd2e6"
COL_GATE = "#7ad9ff"
COL_HAZARD = "#ff3b3b"

# --- Palette: collectibles --------------------------------------------------
COL_PELLET = "#ffd17a"
COL_POWER = "#ff7ad9"
COL_TREASURE = "#ffb86b"
COL_RESOURCE = "#9ab0ff"

# --- Palette: team markers (flags and bases) --------------------------------
COL_FLAG_A = "#7affc8"
COL_FLAG_B = "#ff7a7a"
COL_BASE_A = "#a0ffd9"
COL_BASE_B = "#ffb0b0"
|
|
# Display colour (hex "#rrggbb") for each agent, keyed by agent name.
AGENT_COLORS = dict(
    Predator="#ff6d6d",
    Prey="#6dffb0",
    Ghost1="#ff7ad9",
    Ghost2="#7ad9ff",
    RunnerA="#ffd17a",
    RunnerB="#9ab0ff",
    GuardA="#7affc8",
    GuardB="#ffb0b0",
    MinerA="#a0ffd9",
    MinerB="#c7d2fe",
    Raider="#ff9b6b",
)
|
|
| |
| |
| |
def clamp(v, lo, hi):
    """Limit ``v`` to the range ``[lo, hi]``.

    The lower bound is tested first, so ``lo`` wins if the bounds are
    given in inverted order and ``v`` is below ``lo``.
    """
    if v < lo:
        return lo
    if v > hi:
        return hi
    return v
|
|
def in_bounds(x: int, y: int) -> bool:
    """Return True when tile coordinate ``(x, y)`` lies on the grid."""
    if x < 0 or x >= GRID_W:
        return False
    return 0 <= y < GRID_H
|
|
def manhattan(a: Tuple[int, int], b: Tuple[int, int]) -> int:
    """Return the Manhattan (taxicab) distance between points ``a`` and ``b``."""
    horizontal = abs(a[0] - b[0])
    vertical = abs(a[1] - b[1])
    return horizontal + vertical
|
|
def rng(seed: int) -> random.Random:
    """Create an independent ``random.Random`` seeded from ``seed``.

    The seed is masked to its low 32 bits, so seeds that differ only above
    bit 31 (and negative seeds with the same low bits) produce identical
    streams.
    """
    return random.Random(seed & 0xFFFFFFFF)
|
|
def grid_copy(g: List[List[int]]) -> List[List[int]]:
    """Return a copy of ``g`` whose rows are new lists (cells are shared ints)."""
    duplicate: List[List[int]] = []
    for row in g:
        duplicate.append(row[:])
    return duplicate
|
|
def find_all(g: List[List[int]], tile: int) -> List[Tuple[int, int]]:
    """Return the ``(x, y)`` coordinates of every cell in ``g`` equal to ``tile``.

    Cells are reported in row-major order (top row first, left to right).
    The scan follows the grid's own shape rather than the module-level
    dimensions, so it also works on empty, smaller or ragged grids; for a
    standard full-size grid the result is unchanged.
    """
    return [
        (x, y)
        for y, row in enumerate(g)
        for x, cell in enumerate(row)
        if cell == tile
    ]
|
|
def bresenham_los(grid: List[List[int]], x0: int, y0: int, x1: int, y1: int) -> bool:
    """Report whether ``(x1, y1)`` is visible from ``(x0, y0)``.

    Walks the Bresenham line between the two cells and returns False as soon
    as an intermediate cell holds a WALL. The two endpoint cells themselves
    are never tested, so a viewer or target standing on a wall tile does not
    block the line.
    """
    span_x = abs(x1 - x0)
    span_y = abs(y1 - y0)
    step_x = 1 if x0 < x1 else -1
    step_y = 1 if y0 < y1 else -1
    error = span_x - span_y

    cx, cy = x0, y0
    while (cx, cy) != (x1, y1):
        # Skip the origin cell; every other cell reached here is strictly
        # between the endpoints.
        if (cx, cy) != (x0, y0) and grid[cy][cx] == WALL:
            return False
        doubled = 2 * error
        if doubled > -span_y:
            error -= span_y
            cx += step_x
        if doubled < span_x:
            error += span_x
            cy += step_y
    return True
|
|
def within_fov(ax: int, ay: int, ori: int, tx: int, ty: int, fov_deg: float = FOV_DEG) -> bool:
    """Return True if target ``(tx, ty)`` lies inside the observer's view cone.

    The observer stands at ``(ax, ay)`` facing orientation index ``ori``
    (looked up in ORI_DEG). The cone is ``fov_deg`` degrees wide, centred on
    the facing direction; a target on the observer's own cell always counts
    as visible.
    """
    off_x = tx - ax
    off_y = ty - ay
    if off_x == 0 and off_y == 0:
        return True

    bearing = math.degrees(math.atan2(off_y, off_x)) % 360
    heading = ORI_DEG[ori]
    # Signed angular difference folded into [-180, 180).
    delta = (bearing - heading + 540) % 360 - 180
    half_cone = fov_deg / 2
    return abs(delta) <= half_cone
|
|
| |
| |
| |
@dataclass
class Agent:
    """Mutable state of one actor on the grid."""

    name: str                       # unique key in World.agents
    team: str                       # team label, e.g. "A", "B" or "R"
    x: int                          # tile column
    y: int                          # tile row
    ori: int = 0                    # orientation index into DIRS / ORI_DEG
    hp: int = 5                     # agents with hp <= 0 are treated as dead
    energy: int = 200               # decremented by one per successful move
    # Per-agent counters, e.g. "pellets", "treasure", "res".
    inventory: Dict[str, int] = field(default_factory=dict)
    mode: str = "auto"              # NOTE(review): not read in the visible code
    brain: str = "heur"             # "random" = random walk; otherwise BFS heuristic
|
|
@dataclass
class Objective:
    """One objective entry: who it applies to and what they must do."""

    title: str    # role or team the objective belongs to
    detail: str   # one-line description of the goal
|
|
@dataclass
class EnvSpec:
    """Static description of a game mode."""

    key: str        # identifier used as World.env_key
    title: str      # display name
    summary: str    # short rules blurb
    max_steps: int  # step count at which the episode times out
|
|
@dataclass
class World:
    """Complete mutable state of one running episode."""

    # --- identity and clock ---
    seed: int
    step: int
    env_key: str                    # key into ENVS
    map_key: str                    # key into MAP_BUILDERS

    # --- board ---
    grid: List[List[int]]           # tile ids, indexed grid[y][x]
    agents: Dict[str, Agent]        # agents by name

    # --- episode status ---
    done: bool = False
    outcome: str = "ongoing"        # becomes "A_win", "B_win" or "draw"

    # --- pac_chase ---
    power_timer: int = 0            # > 0 while the chase is flipped
    pellets_left: int = 0           # PELLET + POWER tiles still on the grid

    # --- ctf ---
    flag_carrier: Optional[str] = None      # name of the agent holding a flag
    flag_taken_from: Optional[str] = None   # "A" or "B": whose flag is held

    # --- treasure ---
    treasure_collected_A: int = 0
    treasure_collected_B: int = 0

    # --- resource ---
    baseA_progress: int = 0
    baseB_progress: int = 0
    base_target: int = 10           # progress needed to complete a base

    # --- UI / control ---
    controlled: str = ""            # agent driven by manual_action
    pov: str = ""                   # NOTE(review): presumably the first-person camera agent
    overlay: bool = True            # draw the crosshair in the raycast view
    auto_camera: bool = True        # NOTE(review): not read in the visible code

    # --- event log (human-readable, oldest first) ---
    events: List[str] = field(default_factory=list)
|
|
| |
| |
| |
def base_border_grid() -> List[List[int]]:
    """Build a GRID_H x GRID_W grid of EMPTY tiles enclosed by a WALL border."""
    grid: List[List[int]] = []
    for y in range(GRID_H):
        if y in (0, GRID_H - 1):
            # Top and bottom rows are solid wall.
            grid.append([WALL] * GRID_W)
            continue
        row = [EMPTY] * GRID_W
        row[0] = WALL
        row[GRID_W - 1] = WALL
        grid.append(row)
    return grid
|
|
def carve_maze(seed: int, density: float = 0.66) -> List[List[int]]:
    """
    Generate a maze deterministically from ``seed``.

    The grid starts as solid WALL. An iterative depth-first walk over the
    odd-coordinate cells carves corridors, after which a handful of
    rectangular plazas are cleared. ``density`` only influences the plaza
    count: ``int((1 - density) * 8) + 2``, so lower values open more space.
    The outer border always stays WALL.
    """
    r = rng(seed)
    # Everything (border included) starts as wall; carving never touches
    # the outermost ring.
    g = [[WALL] * GRID_W for _ in range(GRID_H)]

    origin = (
        1 + 2 * r.randint(0, (GRID_W - 3) // 2),
        1 + 2 * r.randint(0, (GRID_H - 3) // 2),
    )
    g[origin[1]][origin[0]] = EMPTY
    visited = {origin}
    stack = [origin]

    while stack:
        cx, cy = stack[-1]
        # A fresh shuffle per visit keeps the RNG stream aligned with the
        # carving order.
        jumps = [(2, 0), (-2, 0), (0, 2), (0, -2)]
        r.shuffle(jumps)
        for dx, dy in jumps:
            nx, ny = cx + dx, cy + dy
            inside = 1 <= nx < GRID_W - 1 and 1 <= ny < GRID_H - 1
            if not inside or (nx, ny) in visited:
                continue
            visited.add((nx, ny))
            # Knock out the wall between the two cells, then the cell itself.
            g[cy + dy // 2][cx + dx // 2] = EMPTY
            g[ny][nx] = EMPTY
            stack.append((nx, ny))
            break
        else:
            # Dead end: every neighbour is visited or out of range.
            stack.pop()

    plaza_count = int((1.0 - density) * 8) + 2
    for _ in range(plaza_count):
        px = r.randint(3, GRID_W - 4)
        py = r.randint(3, GRID_H - 4)
        half_w = r.randint(2, 4)
        half_h = r.randint(2, 3)
        # Clamp the rectangle to the interior so the border survives.
        for yy in range(max(1, py - half_h), min(GRID_H - 2, py + half_h) + 1):
            for xx in range(max(1, px - half_w), min(GRID_W - 2, px + half_w) + 1):
                g[yy][xx] = EMPTY

    return g
|
|
def map_pac_chase(seed: int) -> List[List[int]]:
    """Build the pac-chase board.

    Layout: bordered arena, a horizontal wall across the middle row with a
    three-tile GATE at its centre, pellets on every remaining open tile,
    POWER pellets two tiles in from each corner, and 26 seeded attempts to
    drop a random WALL block onto a pellet or empty tile.
    """
    g = base_border_grid()
    mid_row = GRID_H // 2
    gate_x = GRID_W // 2

    # Central divider with a passable gate.
    for x in range(4, GRID_W - 4):
        g[mid_row][x] = WALL
    for gx in (gate_x, gate_x - 1, gate_x + 1):
        g[mid_row][gx] = GATE

    # Pellets everywhere that is still open.
    for y in range(1, GRID_H - 1):
        row = g[y]
        for x in range(1, GRID_W - 1):
            if row[x] == EMPTY:
                row[x] = PELLET

    # Power pellets near the four corners.
    far_x, far_y = GRID_W - 3, GRID_H - 3
    for x, y in ((2, 2), (far_x, 2), (2, far_y), (far_x, far_y)):
        g[y][x] = POWER

    # Random obstacles; power pellets and gates are never overwritten.
    r = rng(seed)
    for _ in range(26):
        x = r.randint(2, GRID_W - 3)
        y = r.randint(2, GRID_H - 3)
        if g[y][x] in (PELLET, EMPTY):
            g[y][x] = WALL

    return g
|
|
def map_ctf_arena(seed: int) -> List[List[int]]:
    """Build the capture-the-flag board.

    Starts from a procedural maze, clears an 11x7 arena around the centre,
    places both flags and both bases near the corners, then makes 18 seeded
    attempts to turn a random EMPTY tile into a HAZARD.
    """
    g = carve_maze(seed, density=0.60)

    # Open arena in the middle of the map.
    mid_x, mid_y = GRID_W // 2, GRID_H // 2
    for y in range(mid_y - 3, mid_y + 4):
        if not 1 <= y < GRID_H - 1:
            continue
        for x in range(mid_x - 5, mid_x + 6):
            if 1 <= x < GRID_W - 1:
                g[y][x] = EMPTY

    # Flags and bases sit two tiles in from the corners.
    near, far_x, far_y = 2, GRID_W - 3, GRID_H - 3
    g[near][near] = FLAG_A
    g[far_y][far_x] = FLAG_B
    g[near][far_x] = BASE_A
    g[far_y][near] = BASE_B

    # Hazards use their own seed offset so they do not mirror the maze RNG.
    r = rng(seed + 11)
    for _ in range(18):
        hx = r.randint(2, GRID_W - 3)
        hy = r.randint(2, GRID_H - 3)
        if g[hy][hx] == EMPTY:
            g[hy][hx] = HAZARD

    return g
|
|
def map_treasure_run(seed: int) -> List[List[int]]:
    """Build the treasure-run board.

    A procedural maze with 12 seeded attempts to place TREASURE on EMPTY
    tiles, then BASE_A near the top-left and BASE_B near the bottom-right
    corner. Bases are written last, so they replace whatever was there.
    """
    g = carve_maze(seed, density=0.70)

    r = rng(seed + 7)
    attempts = 12
    while attempts > 0:
        attempts -= 1
        tx = r.randint(2, GRID_W - 3)
        ty = r.randint(2, GRID_H - 3)
        if g[ty][tx] == EMPTY:
            g[ty][tx] = TREASURE

    g[2][2] = BASE_A
    g[GRID_H - 3][GRID_W - 3] = BASE_B
    return g
|
|
def map_resource_raid(seed: int) -> List[List[int]]:
    """Build the resource-raid board.

    A procedural maze with 22 seeded attempts to place RESOURCE on EMPTY
    tiles, then BASE_A near the top-left and BASE_B near the bottom-right
    corner. Bases are written last, so they replace whatever was there.
    """
    g = carve_maze(seed, density=0.64)

    r = rng(seed + 23)
    attempts = 22
    while attempts > 0:
        attempts -= 1
        rx = r.randint(2, GRID_W - 3)
        ry = r.randint(2, GRID_H - 3)
        if g[ry][rx] == EMPTY:
            g[ry][rx] = RESOURCE

    g[2][2] = BASE_A
    g[GRID_H - 3][GRID_W - 3] = BASE_B
    return g
|
|
def _map_procedural_general(seed: int) -> List[List[int]]:
    """Plain procedural maze with no mode-specific tiles."""
    return carve_maze(seed, density=0.62)


# Display name -> builder; every builder takes a seed and returns a new grid.
MAP_BUILDERS = {
    "Classic Pac-Chase": map_pac_chase,
    "CTF Maze Arena": map_ctf_arena,
    "Treasure Labyrinth": map_treasure_run,
    "Resource Raid Maze": map_resource_raid,
    "Procedural Maze (General)": _map_procedural_general,
}
|
|
| |
| |
| |
# Registry of game modes, keyed by each spec's own ``key``.
ENVS: Dict[str, EnvSpec] = {
    spec.key: spec
    for spec in (
        EnvSpec(
            key="pac_chase",
            title="Predator/Prey (Pac-Chase)",
            summary="Predator hunts Prey. Prey scores by eating pellets; power flips the chase temporarily.",
            max_steps=650,
        ),
        EnvSpec(
            key="ctf",
            title="Capture The Flag",
            summary="Steal the opponent’s flag and return it to your base. Hazards drain HP.",
            max_steps=800,
        ),
        EnvSpec(
            key="treasure",
            title="Treasure Run",
            summary="Collect treasures scattered in the maze and deposit at base. First to 6 deposits wins.",
            max_steps=750,
        ),
        EnvSpec(
            key="resource",
            title="Resource Raid",
            summary="Mine resources, deposit to build base progress. Raider tries to disrupt and tag.",
            max_steps=850,
        ),
    )
}
|
|
def env_objectives(env_key: str) -> List[Objective]:
    """Return the objective list for game mode ``env_key``.

    Unknown keys get a single generic "Explore." objective. A fresh list is
    built on every call, so callers may mutate the result.
    """
    by_env = {
        "pac_chase": [
            Objective("Prey", "Eat pellets (+) and survive. Power pellet makes Predator vulnerable temporarily."),
            Objective("Predator", "Catch the Prey (tag on same tile). Avoid chasing into power windows."),
        ],
        "ctf": [
            Objective("Team A", "Grab Flag B and return to Base A."),
            Objective("Team B", "Grab Flag A and return to Base B."),
        ],
        "treasure": [
            Objective("Both Teams", "Collect Treasures and deposit at your Base. First to 6 deposits wins."),
        ],
        "resource": [
            Objective("Builders (A & B)", "Collect Resources and deposit to raise base progress."),
            Objective("Raider", "Tag builders (collision) to slow progress; win by eliminating both or forcing timeout."),
        ],
    }
    fallback = [Objective("Objective", "Explore.")]
    return by_env.get(env_key, fallback)
|
|
| |
| |
| |
def random_empty_cell(g: List[List[int]], r: random.Random) -> Tuple[int, int]:
    """Pick a random interior cell holding EMPTY or PELLET, using ``r``.

    Falls back to ``(2, 2)``, without consuming any randomness, when the
    grid has no such cell.
    """
    candidates = []
    for y in range(1, GRID_H - 1):
        for x in range(1, GRID_W - 1):
            if g[y][x] in (EMPTY, PELLET):
                candidates.append((x, y))
    if not candidates:
        return (2, 2)
    return r.choice(candidates)
|
|
def _nearest_open_cell(g: List[List[int]], x: int, y: int) -> Tuple[int, int]:
    """Return the non-blocking cell closest to ``(x, y)`` in 4-connected steps.

    The search expands ring by ring *through* walls, so it succeeds even
    when the starting cell is completely enclosed. ``(x, y)`` is returned
    unchanged if it is already open, lies off the grid, or the grid has no
    open cell at all.
    """
    if not in_bounds(x, y) or not is_blocking(g[y][x]):
        return (x, y)
    seen = {(x, y)}
    ring = [(x, y)]
    while ring:
        next_ring = []
        for cx, cy in ring:
            for nx, ny in neighbors4(cx, cy):
                if (nx, ny) in seen or not in_bounds(nx, ny):
                    continue
                if not is_blocking(g[ny][nx]):
                    return (nx, ny)
                seen.add((nx, ny))
                next_ring.append((nx, ny))
        ring = next_ring
    return (x, y)


def init_world(seed: int, env_key: str, map_key: str) -> World:
    """Create the initial World for a game mode on a freshly built map.

    Args:
        seed: Seed passed to the map builder and stored on the world.
        env_key: Key into ENVS. Any valid key other than "pac_chase", "ctf"
            or "treasure" gets the resource-raid roster.
        map_key: Key into MAP_BUILDERS.

    Returns:
        A World at step 0 with the mode's agents placed, the pellet counter
        initialised (pac_chase only) and one "Initialized" event logged.

    Raises:
        KeyError: If ``env_key`` or ``map_key`` is unknown.
    """
    g = MAP_BUILDERS[map_key](seed)
    spec = ENVS[env_key]

    agents: Dict[str, Agent] = {}
    far_x, far_y = GRID_W - 3, GRID_H - 3
    pellets = 0

    if env_key == "pac_chase":
        agents["Predator"] = Agent("Predator", "A", 2, 2, ori=0, hp=6, mode="auto", brain="heur")
        agents["Prey"] = Agent("Prey", "B", far_x, far_y, ori=2, hp=5, mode="auto", brain="heur")
        agents["Ghost1"] = Agent("Ghost1", "A", GRID_W // 2, 2, ori=1, hp=4, mode="auto", brain="random")
        agents["Ghost2"] = Agent("Ghost2", "A", GRID_W // 2, far_y, ori=3, hp=4, mode="auto", brain="random")
        # Power pellets count towards the "board cleared" win condition.
        pellets = sum(row.count(PELLET) + row.count(POWER) for row in g)
        controlled = pov = "Prey"

    elif env_key == "ctf":
        agents["RunnerA"] = Agent("RunnerA", "A", 2, far_y, ori=0, hp=6, mode="auto", brain="heur")
        agents["GuardA"] = Agent("GuardA", "A", 2, 2, ori=0, hp=7, mode="auto", brain="heur")
        agents["RunnerB"] = Agent("RunnerB", "B", far_x, 2, ori=2, hp=6, mode="auto", brain="heur")
        agents["GuardB"] = Agent("GuardB", "B", far_x, far_y, ori=2, hp=7, mode="auto", brain="heur")
        controlled = pov = "RunnerA"

    elif env_key == "treasure":
        agents["RunnerA"] = Agent("RunnerA", "A", 2, 2, ori=0, hp=6, mode="auto", brain="heur")
        agents["RunnerB"] = Agent("RunnerB", "B", far_x, far_y, ori=2, hp=6, mode="auto", brain="heur")
        agents["GuardA"] = Agent("GuardA", "A", 2, far_y, ori=0, hp=6, mode="auto", brain="heur")
        agents["GuardB"] = Agent("GuardB", "B", far_x, 2, ori=2, hp=6, mode="auto", brain="heur")
        controlled = pov = "RunnerA"

    else:
        agents["MinerA"] = Agent("MinerA", "A", 2, 2, ori=0, hp=6, mode="auto", brain="heur")
        agents["MinerB"] = Agent("MinerB", "B", far_x, far_y, ori=2, hp=6, mode="auto", brain="heur")
        agents["Raider"] = Agent("Raider", "R", far_x, 2, ori=2, hp=7, mode="auto", brain="heur")
        controlled = pov = "MinerA"

    # The fixed spawn points can land on wall tiles: maze corners at even
    # coordinates, or random obstacles on the pac-chase board. An agent
    # spawned inside a wall sees only wall in first-person view, so nudge
    # such spawns to the nearest open cell.
    for agent in agents.values():
        agent.x, agent.y = _nearest_open_cell(g, agent.x, agent.y)

    return World(
        seed=seed,
        step=0,
        env_key=env_key,
        map_key=map_key,
        grid=g,
        agents=agents,
        pellets_left=pellets,
        controlled=controlled,
        pov=pov,
        overlay=True,
        auto_camera=True,
        events=[f"Initialized: env={env_key} ({spec.title}) | map={map_key} | seed={seed}"],
    )
|
|
| |
| |
| |
def is_blocking(tile: int) -> bool:
    """Return True for tiles that stop movement; only WALL does."""
    impassable = tile == WALL
    return impassable
|
|
def neighbors4(x: int, y: int) -> List[Tuple[int, int]]:
    """Return the four orthogonal neighbours of ``(x, y)``.

    Order is +x, +y, -x, -y. No bounds checking is done; callers filter.
    """
    offsets = ((1, 0), (0, 1), (-1, 0), (0, -1))
    return [(x + dx, y + dy) for dx, dy in offsets]
|
|
def bfs_next_step(grid: List[List[int]], start: Tuple[int, int], goal: Tuple[int, int]) -> Optional[Tuple[int, int]]:
    """Return the first cell to step onto along a shortest path to ``goal``.

    Runs a breadth-first search over non-blocking, in-bounds cells using
    4-connected moves. The start cell is not itself tested for blocking.

    Returns:
        The cell adjacent to ``start`` that begins a shortest path, or None
        when ``start == goal`` or the goal cannot be reached (including a
        goal that is a blocking tile).
    """
    # Function-scope import keeps this block self-contained; a deque gives
    # O(1) pops from the queue head where list.pop(0) is O(n).
    from collections import deque

    if start == goal:
        return None

    came_from: Dict[Tuple[int, int], Optional[Tuple[int, int]]] = {start: None}
    frontier = deque([start])
    while frontier:
        cell = frontier.popleft()
        if cell == goal:
            break
        for nxt in neighbors4(*cell):
            if not in_bounds(*nxt):
                continue
            if is_blocking(grid[nxt[1]][nxt[0]]):
                continue
            if nxt not in came_from:
                came_from[nxt] = cell
                frontier.append(nxt)

    if goal not in came_from:
        return None

    # Walk the parent chain back from the goal until we reach the cell
    # whose parent is the start: that is the first move.
    cur = goal
    while came_from[cur] != start and came_from[cur] is not None:
        cur = came_from[cur]
    return cur
|
|
def face_towards(a: Agent, tx: int, ty: int):
    """Turn agent ``a`` to the grid direction that best points at ``(tx, ty)``.

    The dominant axis wins; ties, including a target on the agent's own
    cell, resolve to the vertical axis.
    """
    dx, dy = tx - a.x, ty - a.y
    if abs(dx) > abs(dy):
        # Mostly horizontal: east (0) or west (2).
        a.ori = 0 if dx > 0 else 2
        return
    # Vertical or tie: south (1, +y) or north (3).
    a.ori = 1 if dy > 0 else 3
|
|
def move_to(world: World, a: Agent, nx: int, ny: int) -> bool:
    """Move agent ``a`` onto ``(nx, ny)`` if that cell is on-grid and walkable.

    A successful move costs one energy (never dropping below zero).
    Returns True when the agent moved, False when the move was rejected.
    """
    walkable = in_bounds(nx, ny) and not is_blocking(world.grid[ny][nx])
    if not walkable:
        return False
    a.x = nx
    a.y = ny
    a.energy = max(0, a.energy - 1)
    return True
|
|
| |
| |
| |
def apply_tile_effects(world: World, a: Agent):
    """Apply the effect of the tile agent ``a`` is standing on.

    Effects by mode:
      * any mode: HAZARD costs 1 hp.
      * pac_chase: the Prey eats PELLET / POWER tiles; POWER starts a
        26-step power window. Other agents leave pellets untouched.
      * ctf: picking up the enemy flag (only while nobody carries one) and
        capturing it at the carrier's own base, which ends the episode.
      * treasure: picking up TREASURE and depositing everything carried at
        the agent's own team base.
      * resource: mining RESOURCE; MinerA / MinerB deposit at most 2 units
        per visit at their base.

    Mutates ``world`` (grid, counters, events, done/outcome) and ``a``.
    """
    t = world.grid[a.y][a.x]
    env = world.env_key

    if t == HAZARD:
        a.hp -= 1
        world.events.append(f"t={world.step}: {a.name} hit a hazard (-hp).")

    if env == "pac_chase":
        # Only the Prey consumes pellets. Otherwise the Predator and the
        # wandering Ghosts would clear the board (a Prey win) and could
        # start a power window against their own side.
        if a.name != "Prey":
            return
        if t == PELLET:
            world.grid[a.y][a.x] = EMPTY
            world.pellets_left = max(0, world.pellets_left - 1)
            a.inventory["pellets"] = a.inventory.get("pellets", 0) + 1
        elif t == POWER:
            world.grid[a.y][a.x] = EMPTY
            world.pellets_left = max(0, world.pellets_left - 1)
            world.power_timer = 26
            world.events.append(f"t={world.step}: POWER ACTIVE — chase flips for a bit.")

    elif env == "ctf":
        if t == FLAG_A and a.team == "B" and world.flag_carrier is None:
            world.flag_carrier = a.name
            world.flag_taken_from = "A"
            world.grid[a.y][a.x] = EMPTY
            world.events.append(f"t={world.step}: {a.name} stole Flag A!")
        if t == FLAG_B and a.team == "A" and world.flag_carrier is None:
            world.flag_carrier = a.name
            world.flag_taken_from = "B"
            world.grid[a.y][a.x] = EMPTY
            world.events.append(f"t={world.step}: {a.name} stole Flag B!")

        if world.flag_carrier == a.name:
            # Re-read the tile: a pickup above has just cleared it.
            here = world.grid[a.y][a.x]
            if a.team == "A" and here == BASE_A and world.flag_taken_from == "B":
                world.done = True
                world.outcome = "A_win"
                world.events.append(f"t={world.step}: Team A captured the flag!")
            if a.team == "B" and here == BASE_B and world.flag_taken_from == "A":
                world.done = True
                world.outcome = "B_win"
                world.events.append(f"t={world.step}: Team B captured the flag!")

    elif env == "treasure":
        if t == TREASURE:
            world.grid[a.y][a.x] = EMPTY
            a.inventory["treasure"] = a.inventory.get("treasure", 0) + 1
            world.events.append(f"t={world.step}: {a.name} picked treasure.")
        if t == BASE_A and a.team == "A":
            dep = a.inventory.get("treasure", 0)
            if dep > 0:
                a.inventory["treasure"] = 0
                world.treasure_collected_A += dep
                world.events.append(f"t={world.step}: Team A deposited {dep} treasure (total={world.treasure_collected_A}).")
        if t == BASE_B and a.team == "B":
            dep = a.inventory.get("treasure", 0)
            if dep > 0:
                a.inventory["treasure"] = 0
                world.treasure_collected_B += dep
                world.events.append(f"t={world.step}: Team B deposited {dep} treasure (total={world.treasure_collected_B}).")

    elif env == "resource":
        if t == RESOURCE:
            world.grid[a.y][a.x] = EMPTY
            a.inventory["res"] = a.inventory.get("res", 0) + 1
            world.events.append(f"t={world.step}: {a.name} mined resource.")
        if t == BASE_A and a.name == "MinerA":
            # Deposits are throttled to two units per visit.
            dep = min(2, a.inventory.get("res", 0))
            if dep > 0:
                a.inventory["res"] -= dep
                world.baseA_progress += dep
                world.events.append(f"t={world.step}: MinerA deposited +{dep} (A={world.baseA_progress}/{world.base_target}).")
        if t == BASE_B and a.name == "MinerB":
            dep = min(2, a.inventory.get("res", 0))
            if dep > 0:
                a.inventory["res"] -= dep
                world.baseB_progress += dep
                world.events.append(f"t={world.step}: MinerB deposited +{dep} (B={world.baseB_progress}/{world.base_target}).")
|
|
| |
| |
| |
def resolve_tags(world: World):
    """Resolve collisions between living agents of different teams.

    Agents sharing a tile "tag" each other when at least two teams are
    present on it:
      * pac_chase, Predator + Prey: outside a power window the Predator
        wins immediately; inside one the Predator loses 2 hp.
      * every tagged agent then loses 1 hp.
      * ctf: if the flag carrier is among them, the flag returns to its
        home tile.
    """
    # Group living agents by the tile they occupy.
    occupants: Dict[Tuple[int, int], List[str]] = {}
    for name, agent in world.agents.items():
        if agent.hp > 0:
            occupants.setdefault((agent.x, agent.y), []).append(name)

    for (x, y), names in occupants.items():
        teams_here = {world.agents[n].team for n in names}
        if len(names) < 2 or len(teams_here) <= 1:
            continue

        if world.env_key == "pac_chase" and "Predator" in names and "Prey" in names:
            if world.power_timer <= 0:
                world.done = True
                world.outcome = "A_win"
                world.events.append(f"t={world.step}: Predator CAUGHT Prey.")
                return
            # NOTE(review): execution continues into the generic tag below,
            # so the Predator loses 3 hp in total and the Prey loses 1,
            # although the message says -2hp. Confirm this is intended.
            world.agents["Predator"].hp -= 2
            world.events.append(f"t={world.step}: Prey TAGGED Predator during POWER (-2hp Predator).")

        # Generic tag: everyone on the tile takes a hit.
        for n in names:
            world.agents[n].hp -= 1
        world.events.append(f"t={world.step}: TAG at ({x},{y}) {names} (-hp).")

        if world.env_key == "ctf" and world.flag_carrier in names:
            carrier = world.flag_carrier
            world.flag_carrier = None
            # The flag goes back to its original tile, not where it fell.
            origin = world.flag_taken_from
            if origin == "A":
                world.grid[2][2] = FLAG_A
            elif origin == "B":
                world.grid[GRID_H - 3][GRID_W - 3] = FLAG_B
            world.events.append(f"t={world.step}: {carrier} dropped the flag!")
|
|
| |
| |
| |
def check_done(world: World):
    """Evaluate end-of-episode conditions and record the result on ``world``.

    Does nothing if the episode is already done. The timeout is checked
    first and yields a draw; otherwise the mode-specific win conditions are
    tested. Ending an episode sets ``done`` and ``outcome`` and appends an
    event.
    """
    spec = ENVS[world.env_key]
    if world.done:
        return

    def finish(outcome: str, note: str) -> None:
        # Single place that closes the episode and logs why.
        world.done = True
        world.outcome = outcome
        world.events.append(f"t={world.step}: {note}")

    if world.step >= spec.max_steps:
        finish("draw", "TIMEOUT (draw).")
        return

    env = world.env_key
    if env == "pac_chase":
        prey = world.agents["Prey"]
        pred = world.agents["Predator"]
        if prey.hp <= 0:
            finish("A_win", "Prey eliminated — Predator wins.")
        elif pred.hp <= 0:
            finish("B_win", "Predator eliminated — Prey wins.")
        elif world.pellets_left <= 0:
            finish("B_win", "All pellets cleared — Prey wins.")

    elif env == "ctf":
        # Besides a capture (handled in the tile effects), a team also wins
        # when the whole opposing team is dead.
        team_alive = {
            team: any(ag.hp > 0 for ag in world.agents.values() if ag.team == team)
            for team in ("A", "B")
        }
        if team_alive["B"] and not team_alive["A"]:
            finish("B_win", "Team A eliminated — Team B wins.")
        elif team_alive["A"] and not team_alive["B"]:
            finish("A_win", "Team B eliminated — Team A wins.")

    elif env == "treasure":
        if world.treasure_collected_A >= 6:
            finish("A_win", "Team A reached 6 treasure — wins.")
        elif world.treasure_collected_B >= 6:
            finish("B_win", "Team B reached 6 treasure — wins.")

    elif env == "resource":
        if world.baseA_progress >= world.base_target:
            finish("A_win", "Base A complete — MinerA wins.")
        elif world.baseB_progress >= world.base_target:
            finish("B_win", "Base B complete — MinerB wins.")

        living_miners = 0
        for nm in ("MinerA", "MinerB"):
            if world.agents.get(nm) and world.agents[nm].hp > 0:
                living_miners += 1
        # NOTE(review): a Raider victory is recorded as "B_win" although the
        # Raider is on team "R"; confirm what downstream consumers expect.
        if living_miners == 0 and world.agents["Raider"].hp > 0:
            finish("B_win", "Miners eliminated — Raider wins.")
|
|
| |
| |
| |
def choose_target_pac(world: World, who: str) -> Tuple[int, int]:
    """Pick the cell agent ``who`` should head for in pac_chase.

    * Prey: hunts the Predator during a power window; otherwise goes for
      the nearest pellet or power pellet (Manhattan distance), staying put
      when none remain.
    * Predator: chases the Prey; during a power window retreats to the
      corner spot farthest from the Prey.
    * Anyone else: the Prey's position.
    """
    me = world.agents[who]
    prey = world.agents["Prey"]
    pred = world.agents["Predator"]
    powered = world.power_timer > 0

    if who == "Prey":
        if powered:
            return (pred.x, pred.y)
        edible = find_all(world.grid, PELLET) + find_all(world.grid, POWER)
        if not edible:
            return (me.x, me.y)
        # min() keeps the first of several equally near pellets.
        return min(edible, key=lambda p: manhattan((me.x, me.y), p))

    if who == "Predator" and powered:
        far_x, far_y = GRID_W - 3, GRID_H - 3
        hideouts = [(2, 2), (far_x, 2), (2, far_y), (far_x, far_y)]
        # max() keeps the first of several equally distant corners.
        return max(hideouts, key=lambda c: manhattan((prey.x, prey.y), c))

    return (prey.x, prey.y)
|
|
def choose_target_ctf(world: World, who: str) -> Tuple[int, int]:
    """Pick the cell agent ``who`` should head for in capture-the-flag.

    Priority:
      1. The flag carrier runs to its own base.
      2. While anyone carries a flag, everybody else converges on the
         carrier.
      3. Runners go for the enemy flag, or home if it is not on the grid.
      4. Other agents (guards) hold the midpoint between the enemy flag
         and their home base, or go home if the flag is not on the grid.

    If the map has no base tile for the agent's team, the agent's current
    position stands in for "home" instead of raising IndexError.
    """
    a = world.agents[who]
    if a.team == "A":
        home_tile, enemy_flag = BASE_A, FLAG_B
    else:
        home_tile, enemy_flag = BASE_B, FLAG_A

    # Maps built for other modes may carry no base tiles at all.
    bases = find_all(world.grid, home_tile)
    home_base_pos = bases[0] if bases else (a.x, a.y)

    if world.flag_carrier == who:
        return home_base_pos

    if world.flag_carrier is not None:
        carrier = world.agents[world.flag_carrier]
        return (carrier.x, carrier.y)

    flags = find_all(world.grid, enemy_flag)
    if not flags:
        return home_base_pos
    if "Runner" in who:
        return flags[0]

    ex, ey = flags[0]
    bx, by = home_base_pos
    return ((ex + bx) // 2, (ey + by) // 2)
|
|
def choose_target_treasure(world: World, who: str) -> Tuple[int, int]:
    """Pick the cell agent ``who`` should head for in treasure run.

    An agent carrying two or more treasures returns to its team base;
    otherwise it goes for the nearest treasure (Manhattan distance), or
    home when none remain. If the map has no base tile for the team, the
    agent's current position stands in for "home" instead of raising
    IndexError.
    """
    a = world.agents[who]
    here = (a.x, a.y)

    # Maps built for other modes may carry no base tiles at all.
    bases = find_all(world.grid, BASE_A if a.team == "A" else BASE_B)
    base_pos = bases[0] if bases else here

    if a.inventory.get("treasure", 0) >= 2:
        return base_pos

    treasures = find_all(world.grid, TREASURE)
    if not treasures:
        return base_pos
    # min() keeps the first of several equally near treasures, matching a
    # stable sort by distance.
    return min(treasures, key=lambda p: manhattan(here, p))
|
|
def choose_target_resource(world: World, who: str) -> Tuple[int, int]:
    """Pick the cell agent ``who`` should head for in resource raid.

    * Raider: the nearest living miner, or its own cell if none are alive.
    * Miners: home base once carrying three or more resources; otherwise
      the nearest resource (Manhattan distance), or home when none remain.

    If the map has no base tile for the miner, the miner's current position
    stands in for "home" instead of raising IndexError.
    """
    a = world.agents[who]
    here = (a.x, a.y)

    if who == "Raider":
        miners = [
            world.agents[n]
            for n in ("MinerA", "MinerB")
            if world.agents.get(n) and world.agents[n].hp > 0
        ]
        if not miners:
            return here
        # min() keeps the first of several equally near miners.
        prey = min(miners, key=lambda m: manhattan(here, (m.x, m.y)))
        return (prey.x, prey.y)

    # Maps built for other modes may carry no base tiles at all.
    bases = find_all(world.grid, BASE_A if who == "MinerA" else BASE_B)
    base_pos = bases[0] if bases else here

    if a.inventory.get("res", 0) >= 3:
        return base_pos

    deposits = find_all(world.grid, RESOURCE)
    if not deposits:
        return base_pos
    return min(deposits, key=lambda p: manhattan(here, p))
|
|
def choose_target(world: World, who: str) -> Tuple[int, int]:
    """Dispatch to the target picker for the world's game mode.

    For an unrecognised mode the agent's own position is returned.
    """
    pickers = {
        "pac_chase": choose_target_pac,
        "ctf": choose_target_ctf,
        "treasure": choose_target_treasure,
        "resource": choose_target_resource,
    }
    picker = pickers.get(world.env_key)
    if picker is None:
        me = world.agents[who]
        return (me.x, me.y)
    return picker(world, who)
|
|
def auto_step_agent(world: World, who: str):
    """Advance agent ``who`` by one automatic move.

    Dead agents do nothing. A "random" brain steps onto a random walkable
    neighbour (module-level ``random``, so not tied to the world seed). Any
    other brain follows the BFS path towards its chosen target; when no
    path step exists it cycles through its walkable neighbours by step
    number so it does not freeze in place.
    """
    a = world.agents[who]
    if a.hp <= 0:
        return

    def walkable_neighbours():
        # Same +x, +y, -x, -y ordering as neighbors4.
        return [
            (cx, cy)
            for cx, cy in neighbors4(a.x, a.y)
            if in_bounds(cx, cy) and not is_blocking(world.grid[cy][cx])
        ]

    if a.brain == "random":
        options = walkable_neighbours()
        dest = random.choice(options) if options else None
    else:
        goal = choose_target(world, who)
        dest = bfs_next_step(world.grid, (a.x, a.y), goal)
        if dest is None:
            # Already on the target or no route: shuffle around instead.
            options = walkable_neighbours()
            dest = options[world.step % len(options)] if options else None

    if dest is None:
        return
    face_towards(a, dest[0], dest[1])
    move_to(world, a, dest[0], dest[1])
|
|
def manual_action(world: World, action: str):
    """
    Apply one manual command to the world's ``controlled`` agent.

    Commands: "L" / "R" turn a quarter step, "F" moves one tile forward
    when the tile is walkable, "I" (ctf only) drops a carried flag onto the
    agent's current tile. Anything else, or a dead agent, is a no-op.
    """
    who = world.controlled
    a = world.agents[who]
    if a.hp <= 0:
        return

    if action in ("L", "R"):
        turn = -1 if action == "L" else 1
        a.ori = (a.ori + turn) % 4
    elif action == "F":
        step_x, step_y = DIRS[a.ori]
        ahead_x, ahead_y = a.x + step_x, a.y + step_y
        if in_bounds(ahead_x, ahead_y) and not is_blocking(world.grid[ahead_y][ahead_x]):
            move_to(world, a, ahead_x, ahead_y)
    elif action == "I":
        if world.env_key != "ctf" or world.flag_carrier != who:
            return
        world.flag_carrier = None
        # The flag tile replaces whatever is on the carrier's current cell.
        if world.flag_taken_from == "A":
            world.grid[a.y][a.x] = FLAG_A
        elif world.flag_taken_from == "B":
            world.grid[a.y][a.x] = FLAG_B
        world.events.append(f"t={world.step}: {who} dropped the flag manually.")
|
|
| |
| |
| |
# RGB palette (uint8) for the first-person renderer.
SKY = np.array((12, 14, 26), dtype=np.uint8)          # backdrop above the horizon
FLOOR1 = np.array((24, 28, 54), dtype=np.uint8)       # floor colour at the horizon
FLOOR2 = np.array((10, 12, 22), dtype=np.uint8)       # floor colour at the bottom edge
WALL1 = np.array((205, 210, 232), dtype=np.uint8)     # wall, lighter face
WALL2 = np.array((160, 168, 195), dtype=np.uint8)     # wall, darker face
GATEC = np.array((120, 220, 255), dtype=np.uint8)     # gate tiles
|
|
def raycast_pov(world: World, who: str) -> np.ndarray:
    """Render the first-person view of agent ``who``.

    Returns a ``(VIEW_H, VIEW_W, 3)`` uint8 RGB image composed of:
      1. a sky colour with a floor gradient below the horizon,
      2. one wall/gate slice per pixel column, found by marching a ray in
         0.06-tile steps up to MAX_DEPTH,
      3. a flat coloured rectangle for every other living agent that is
         inside the field of view and has line of sight,
      4. a crosshair when ``world.overlay`` is set.
    """
    viewer = world.agents[who]
    horizon = VIEW_H // 2

    # --- backdrop -------------------------------------------------------
    frame = np.full((VIEW_H, VIEW_W, 3), SKY, dtype=np.uint8)
    for row in range(horizon, VIEW_H):
        blend = (row - horizon) / max(1, horizon)
        shade = (1 - blend) * FLOOR1 + blend * FLOOR2
        frame[row, :] = shade.astype(np.uint8)

    # --- walls: one ray per pixel column ----------------------------------
    columns = VIEW_W
    half_fov = math.radians(FOV_DEG / 2)
    heading = math.radians(ORI_DEG[viewer.ori])
    eye_x, eye_y = viewer.x + 0.5, viewer.y + 0.5   # centre of the viewer's tile

    for col_idx in range(columns):
        # Map the column to [-1, 1] across the field of view.
        offset = (2 * col_idx / (columns - 1)) - 1
        ray_ang = heading + offset * half_fov
        ray_sin = math.sin(ray_ang)
        ray_cos = math.cos(ray_ang)

        travelled = 0.0
        struck = None
        face = 0
        while travelled < MAX_DEPTH:
            travelled += 0.06
            cell_x = int(eye_x + ray_cos * travelled)
            cell_y = int(eye_y + ray_sin * travelled)
            if not in_bounds(cell_x, cell_y):
                break
            cell = world.grid[cell_y][cell_x]
            if cell == WALL:
                struck = "wall"
                # Approximate which face was hit from the ray's dominant axis.
                face = 1 if abs(ray_cos) > abs(ray_sin) else 0
                break
            if cell == GATE:
                struck = "gate"
                break

        if struck is None:
            continue

        # Project the distance onto the view direction so flat walls do not
        # bulge, and keep it strictly positive for the division below.
        dist = max(travelled * math.cos(ray_ang - heading), 0.001)

        slab = int((VIEW_H * 0.92) / dist)
        top = max(0, horizon - slab // 2)
        bottom = min(VIEW_H - 1, horizon + slab // 2)

        if struck == "gate":
            base_rgb = GATEC
        elif face == 0:
            base_rgb = WALL1
        else:
            base_rgb = WALL2
        fade = max(0.28, 1.0 - dist / MAX_DEPTH)   # distance dimming, floored
        frame[top:bottom, col_idx:col_idx + 1] = (base_rgb * fade).astype(np.uint8)

    # --- other agents as flat billboards ----------------------------------
    for nm, other in world.agents.items():
        if nm == who or other.hp <= 0:
            continue
        if not within_fov(viewer.x, viewer.y, viewer.ori, other.x, other.y):
            continue
        if not bresenham_los(world.grid, viewer.x, viewer.y, other.x, other.y):
            continue

        rel_x = other.x - viewer.x
        rel_y = other.y - viewer.y
        bearing = math.degrees(math.atan2(rel_y, rel_x)) % 360
        facing = ORI_DEG[viewer.ori]
        delta = (bearing - facing + 540) % 360 - 180
        screen_x = int((delta / (FOV_DEG / 2)) * (VIEW_W / 2) + (VIEW_W / 2))

        gap = math.sqrt(rel_x * rel_x + rel_y * rel_y)
        size = clamp(int((VIEW_H * 0.55) / max(gap, 1.0)), 10, 110)

        left = clamp(screen_x - size // 4, 0, VIEW_W - 1)
        right = clamp(screen_x + size // 4, 0, VIEW_W - 1)
        upper = clamp(horizon - size // 2, 0, VIEW_H - 1)
        lower = clamp(horizon + size // 2, 0, VIEW_H - 1)

        # "#rrggbb" -> uint8 RGB triple.
        hex_digits = AGENT_COLORS.get(nm, "#ffd17a").lstrip("#")
        rgb = np.array([int(hex_digits[i:i + 2], 16) for i in (0, 2, 4)], dtype=np.uint8)
        frame[upper:lower, left:right] = rgb

    # --- crosshair overlay ------------------------------------------------
    if world.overlay:
        mid_x, mid_y = VIEW_W // 2, VIEW_H // 2
        reticle = np.array([110, 210, 255], dtype=np.uint8)
        frame[mid_y - 1:mid_y + 2, mid_x - 16:mid_x + 16] = reticle
        frame[mid_y - 16:mid_y + 16, mid_x - 1:mid_x + 2] = reticle

    return frame
|
|
| |
| |
| |
def tile_color(tile: int) -> str:
    """Return the fill colour for a tile id; unknown ids use the EMPTY colour."""
    palette = {
        EMPTY: COL_EMPTY,
        WALL: COL_WALL,
        PELLET: COL_PELLET,
        POWER: COL_POWER,
        FLAG_A: COL_FLAG_A,
        FLAG_B: COL_FLAG_B,
        TREASURE: COL_TREASURE,
        BASE_A: COL_BASE_A,
        BASE_B: COL_BASE_B,
        RESOURCE: COL_RESOURCE,
        HAZARD: COL_HAZARD,
        GATE: COL_GATE,
    }
    if tile in palette:
        return palette[tile]
    return COL_EMPTY
|
|
def objective_hud(world: World) -> Tuple[str, str]:
    """Build the two HUD text lines for the current mode.

    Returns ``(headline, detail)``: a status line with live counters and a
    one-sentence rules reminder. Any mode other than pac_chase, ctf or
    treasure uses the resource-raid layout.
    """
    spec = ENVS[world.env_key]
    env = world.env_key

    if env == "pac_chase":
        prey_score = world.agents["Prey"].inventory.get("pellets", 0)
        return (
            f"{spec.title} • pellets_left={world.pellets_left} • prey_score={prey_score} • power={world.power_timer}",
            "Prey clears pellets; Predator catches. Power flips vulnerability briefly.",
        )

    if env == "ctf":
        carrier = world.flag_carrier or "none"
        return (
            f"{spec.title} • carrier={carrier} • step={world.step}/{spec.max_steps}",
            "Steal opponent flag → return to base. Tagging drops the flag.",
        )

    if env == "treasure":
        return (
            f"{spec.title} • A={world.treasure_collected_A}/6 • B={world.treasure_collected_B}/6 • step={world.step}/{spec.max_steps}",
            "Collect treasures and deposit at base. First to 6 wins.",
        )

    return (
        f"{spec.title} • A={world.baseA_progress}/{world.base_target} • B={world.baseB_progress}/{world.base_target} • step={world.step}/{spec.max_steps}",
        "Mine resources, deposit to build progress. Raider tags to disrupt.",
    )
|
|
def svg_render(world: World, highlight: Optional[Tuple[int, int]] = None) -> str:
    """Render the arena as an HTML fragment (``<div>`` + inline ``<svg>``).

    The fragment contains, in paint order: the HUD banner with the two lines
    from ``objective_hud``, one rectangle per grid tile (pellet and power
    tiles get a dot on an empty-coloured tile), grid lines, an optional
    highlight frame, one ``<g>`` per agent (pulse halo, body, heading line,
    name badge, health bar, and a marker on the controlled agent), and a
    "DONE" badge once ``world.done`` is set.

    Args:
        world: Simulation state to draw.
        highlight: Optional ``(x, y)`` grid cell to frame; ignored when it
            lies outside the grid.

    Returns:
        The markup as a single string.
    """
    # Local import so this block does not depend on a file-level import.
    from html import escape

    headline, detail = objective_hud(world)

    css = f"""
<style>
  .root {{
    background: {COL_BG};
    border-radius: 18px;
    overflow: hidden;
    box-shadow: 0 18px 40px rgba(0,0,0,0.45);
  }}
  .hud {{
    font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Arial;
    fill: rgba(235,240,255,0.92);
  }}
  .hudSmall {{
    fill: rgba(235,240,255,0.72);
  }}
  .tile {{
    shape-rendering: crispEdges;
  }}
  .gridline {{
    stroke: {COL_GRIDLINE};
    stroke-width: 1;
    opacity: 0.45;
  }}
  .agent {{
    transition: transform 220ms cubic-bezier(.2,.8,.2,1);
    filter: drop-shadow(0px 8px 10px rgba(0,0,0,0.45));
  }}
  .agentCore {{
    transition: r 220ms cubic-bezier(.2,.8,.2,1);
  }}
  .pulse {{
    animation: pulse 1.2s ease-in-out infinite;
    opacity: 0.24;
    /* Scale about the halo's own centre; SVG elements otherwise default to
       an origin at 0 0, which makes the pulse drift away from the agent. */
    transform-box: fill-box;
    transform-origin: center;
  }}
  @keyframes pulse {{
    0% {{ transform: scale(1.0); opacity: 0.16; }}
    50% {{ transform: scale(1.15); opacity: 0.28; }}
    100% {{ transform: scale(1.0); opacity: 0.16; }}
  }}
  .badge {{
    fill: rgba(15,23,51,0.72);
    stroke: rgba(170,195,255,0.16);
    stroke-width: 1;
  }}
  .hint {{
    fill: rgba(110,180,255,0.95);
  }}
  .dead {{
    opacity: 0.22;
    filter: none;
  }}
  .banner {{
    fill: rgba(255,255,255,0.08);
  }}
</style>
"""

    # Header: container, background and HUD banner. Text is escaped so that
    # titles containing "&" or "<" cannot break the markup.
    parts: List[str] = [f"""
<div class="root">
{css}
<svg width="{SVG_W}" height="{SVG_H}" viewBox="0 0 {SVG_W} {SVG_H}">
  <rect x="0" y="0" width="{SVG_W}" height="{SVG_H}" fill="{COL_BG}"/>
  <rect class="banner" x="0" y="0" width="{SVG_W}" height="{HUD_H}" rx="0" ry="0"/>
  <text class="hud" x="18" y="28" font-size="16" font-weight="700">{escape(str(headline))}</text>
  <text class="hud hudSmall" x="18" y="50" font-size="12">{escape(str(detail))}</text>
"""]

    # Tiles. Pellet/power tiles are an empty-coloured square plus a dot of
    # the listed radius and colour; every other tile is a flat square.
    dots = {PELLET: (3, COL_PELLET), POWER: (7, COL_POWER)}
    for y in range(GRID_H):
        py = HUD_H + y * TILE
        for x in range(GRID_W):
            t = world.grid[y][x]
            px = x * TILE
            dot = dots.get(t)
            if dot is None:
                parts.append(f'<rect class="tile" x="{px}" y="{py}" width="{TILE}" height="{TILE}" fill="{tile_color(t)}"/>')
                continue
            radius, dot_col = dot
            cx = px + TILE * 0.5
            cy = py + TILE * 0.5
            parts.append(f'<rect class="tile" x="{px}" y="{py}" width="{TILE}" height="{TILE}" fill="{COL_EMPTY}"/>')
            parts.append(f'<circle cx="{cx}" cy="{cy}" r="{radius}" fill="{dot_col}" opacity="0.95"/>')

    # Grid lines over the play field (below the HUD strip).
    for x in range(GRID_W + 1):
        px = x * TILE
        parts.append(f'<line class="gridline" x1="{px}" y1="{HUD_H}" x2="{px}" y2="{SVG_H}"/>')
    for y in range(GRID_H + 1):
        py = HUD_H + y * TILE
        parts.append(f'<line class="gridline" x1="0" y1="{py}" x2="{SVG_W}" y2="{py}"/>')

    # Optional highlight frame around a single cell.
    if highlight is not None:
        hx, hy = highlight
        if in_bounds(hx, hy):
            px = hx * TILE
            py = HUD_H + hy * TILE
            parts.append(f'<rect x="{px+2}" y="{py+2}" width="{TILE-4}" height="{TILE-4}" rx="10" fill="none" stroke="rgba(110,180,255,0.95)" stroke-width="2"/>')

    # Agents. Each group is positioned with a CSS transform so the
    # ".agent" transition animates movement between renders.
    for nm, a in world.agents.items():
        px = a.x * TILE
        py = HUD_H + a.y * TILE
        col = AGENT_COLORS.get(nm, "#ffd17a")
        dead_cls = " dead" if a.hp <= 0 else ""

        parts.append(f"""
<g class="agent{dead_cls}" style="transform: translate({px}px, {py}px);">
  <circle class="pulse" cx="{TILE/2}" cy="{TILE/2}" r="{TILE*0.46}" fill="{col}"></circle>
  <circle class="agentCore" cx="{TILE/2}" cy="{TILE/2}" r="{TILE*0.34}" fill="{col}" opacity="0.98"></circle>
""")

        # Heading indicator: a short line from the centre along DIRS[ori].
        dx, dy = DIRS[a.ori]
        x2 = TILE/2 + dx*(TILE*0.32)
        y2 = TILE/2 + dy*(TILE*0.32)
        parts.append(f'<line x1="{TILE/2}" y1="{TILE/2}" x2="{x2}" y2="{y2}" stroke="rgba(10,10,14,0.85)" stroke-width="4" stroke-linecap="round"/>')

        # Name badge, widened for longer names (minimum width 46).
        label = str(nm)
        badge_w = max(46, 10 * len(label) * 0.62)
        parts.append(f'<rect class="badge" x="{TILE/2 - badge_w/2}" y="{TILE*0.05}" rx="10" width="{badge_w}" height="16"/>')
        parts.append(f'<text x="{TILE/2}" y="{TILE*0.05 + 12}" text-anchor="middle" font-size="10" fill="rgba(235,240,255,0.92)" font-family="ui-sans-serif, system-ui">{escape(label)}</text>')

        # Health bar: background track plus a fill proportional to hp/10.
        hp = clamp(a.hp, 0, 10)
        bar_w = TILE * 0.78
        bx = TILE/2 - bar_w/2
        by = TILE * 0.80
        parts.append(f'<rect x="{bx}" y="{by}" width="{bar_w}" height="6" rx="4" fill="rgba(255,255,255,0.12)"/>')
        parts.append(f'<rect x="{bx}" y="{by}" width="{bar_w*(hp/10.0)}" height="6" rx="4" fill="rgba(122,255,200,0.85)"/>')

        # Marker dot on the agent currently under user control.
        if nm == world.controlled:
            parts.append(f'<circle cx="{TILE*0.88}" cy="{TILE*0.18}" r="6" fill="rgba(110,180,255,0.95)"/>')

        parts.append("</g>")

    # Outcome badge, coloured by result ("A_win", "B_win", anything else).
    if world.done:
        outcome = world.outcome
        outcome_col = {
            "A_win": "rgba(122,255,200,0.95)",
            "B_win": "rgba(255,122,122,0.95)",
        }.get(outcome, "rgba(255,209,122,0.95)")
        parts.append(f"""
<rect x="{SVG_W-300}" y="14" width="280" height="36" rx="14" fill="rgba(0,0,0,0.28)" stroke="rgba(255,255,255,0.10)"/>
<text x="{SVG_W-160}" y="38" text-anchor="middle" font-size="14" font-weight="700"
      fill="{outcome_col}" font-family="ui-sans-serif, system-ui">
  DONE • {escape(str(outcome))}
</text>
""")

    parts.append("</svg></div>")
    return "".join(parts)
|
|
| |
| |
| |
def agent_table(world: World) -> str:
    """Format every agent as one row of a fixed-width plain-text table.

    Columns are agent, team, hp, x, y, ori (degrees via ``ORI_DEG``), mode and
    inv (the inventory as JSON with sorted keys). Each column is left-justified
    to its widest cell and a dashed rule follows the header row.
    """
    header = ["agent", "team", "hp", "x", "y", "ori", "mode", "inv"]
    records = [
        [name, ag.team, ag.hp, ag.x, ag.y, ORI_DEG[ag.ori], ag.mode, json.dumps(ag.inventory, sort_keys=True)]
        for name, ag in world.agents.items()
    ]

    # Stringify once, then size each column from its widest cell.
    cells = [[str(value) for value in row] for row in [header, *records]]
    widths = [max(len(cell) for cell in column) for column in zip(*cells)]

    out = []
    for idx, row in enumerate(cells):
        out.append(" | ".join(cell.ljust(w) for cell, w in zip(row, widths)))
        if idx == 0:
            out.append("-+-".join("-" * w for w in widths))
    return "\n".join(out)
|
|
def status_text(world: World) -> str:
    """Return a three-line status summary for the status textbox.

    Line 1: environment, map and seed. Line 2: step counter and completion
    state. Line 3: controlled agent (with its mode), POV agent and the
    auto-camera flag.
    """
    spec = ENVS[world.env_key]
    controlled_agent_mode = world.agents[world.controlled].mode
    summary = [
        f"env={world.env_key} ({spec.title}) | map={world.map_key} | seed={world.seed}",
        f"step={world.step}/{spec.max_steps} | done={world.done} outcome={world.outcome}",
        f"controlled={world.controlled} (mode={controlled_agent_mode}) | pov={world.pov} | auto_camera={world.auto_camera}",
    ]
    return "\n".join(summary)
|
|
| |
| |
| |
def tick_world(world: World, manual: Optional[str] = None):
    """Advance the simulation by one tick, mutating ``world`` in place.

    Does nothing once ``world.done`` is set. Otherwise, in order: count down
    the power timer, apply the manual action (if any), step every living
    agent whose mode is ``"auto"`` (skipping the controlled agent when a
    manual action was supplied), apply tile effects, resolve tags, floor
    hp at zero, optionally retarget the POV camera, check for completion,
    increment the step counter and trim the event log to 220 entries.

    Args:
        world: Simulation state.
        manual: Action code forwarded to ``manual_action``, or ``None`` for a
            fully automatic tick.
    """
    if world.done:
        return

    # Power-up countdown.
    if world.power_timer > 0:
        world.power_timer -= 1

    # The user's action is applied before any automatic movement.
    if manual is not None:
        manual_action(world, manual)

    # Automatic agents. The controlled agent already acted if a manual
    # action was given, so it is skipped in that case.
    for name, agent in world.agents.items():
        if agent.hp <= 0:
            continue
        acted_manually = manual is not None and name == world.controlled
        if not acted_manually and agent.mode == "auto":
            auto_step_agent(world, name)

    # Effects of the tile each living agent now stands on.
    for agent in world.agents.values():
        if agent.hp > 0:
            apply_tile_effects(world, agent)

    resolve_tags(world)

    # Never leave negative hit points behind.
    for agent in world.agents.values():
        if agent.hp <= 0:
            agent.hp = 0

    if world.auto_camera:

        def camera_score(name, agent):
            """Interest score: damage taken, nearby enemies, objective in hand."""
            total = 0.0
            total += (10 - agent.hp) * 0.8
            for other_name, other in world.agents.items():
                if other_name == name or other.hp <= 0:
                    continue
                if agent.team != other.team:
                    dist = manhattan((agent.x, agent.y), (other.x, other.y))
                    total += max(0, 10 - dist) * 0.25
            if world.env_key == "ctf" and world.flag_carrier == name:
                total += 4.0
            if world.env_key == "treasure" and agent.inventory.get("treasure", 0) > 0:
                total += 2.0
            if world.env_key == "resource" and agent.inventory.get("res", 0) > 0:
                total += 1.5
            return total

        # Follow the highest-scoring living agent; ties keep the earlier one.
        target = None
        top = -1e9
        for name, agent in world.agents.items():
            if agent.hp <= 0:
                continue
            current = camera_score(name, agent)
            if current > top:
                top, target = current, name
        if target is not None:
            world.pov = target

    check_done(world)

    world.step += 1

    # Keep only the most recent 220 events.
    if len(world.events) > 220:
        world.events = world.events[-220:]
|
|
| |
| |
| |
def rebuild_views(world: World, highlight: Optional[Tuple[int, int]] = None):
    """Recompute every display artefact for the current world.

    Returns:
        A 5-tuple ``(svg, pov, status, agents_txt, events_txt)``: the arena
        markup, the POV render for ``world.pov``, the status summary, the
        agent table, and the last 18 event-log lines joined by newlines.
    """
    return (
        svg_render(world, highlight=highlight),
        raycast_pov(world, world.pov),
        status_text(world),
        agent_table(world),
        "\n".join(world.events[-18:]),
    )
|
|
def set_agent_modes(world: World, controlled_mode: str, other_mode: str):
    """Assign one mode to the controlled agent and another to all others.

    If ``world.controlled`` is not a key of ``world.agents``, every agent gets
    ``other_mode``. The change is recorded in ``world.events``.
    """
    # A single pass suffices: the controlled agent (when present) is the only
    # one whose name matches, everyone else receives ``other_mode``.
    for name, agent in world.agents.items():
        agent.mode = controlled_mode if name == world.controlled else other_mode
    world.events.append(f"t={world.step}: Modes set — controlled={controlled_mode}, others={other_mode}")
|
|
def swap_controlled(world: World):
    """Hand control to the next agent in ``world.agents`` order (wrapping).

    If ``world.controlled`` does not name an existing agent, control moves to
    the first agent instead of raising ``ValueError``; ``set_agent_modes``
    already tolerates that state, so this keeps the two consistent. With no
    agents at all the call is a no-op and nothing is logged.
    """
    names = list(world.agents)
    if not names:
        return
    try:
        successor = names[(names.index(world.controlled) + 1) % len(names)]
    except ValueError:
        # Stale or unknown name: restart the cycle from the first agent.
        successor = names[0]
    world.controlled = successor
    world.events.append(f"t={world.step}: Controlled -> {world.controlled}")
|
|
def swap_pov(world: World):
    """Move the POV camera to the next agent in ``world.agents`` order (wrapping).

    If ``world.pov`` does not name an existing agent, the camera jumps to the
    first agent instead of raising ``ValueError``. With no agents at all the
    call is a no-op and nothing is logged.
    """
    names = list(world.agents)
    if not names:
        return
    try:
        successor = names[(names.index(world.pov) + 1) % len(names)]
    except ValueError:
        # Stale or unknown name: restart the cycle from the first agent.
        successor = names[0]
    world.pov = successor
    world.events.append(f"t={world.step}: POV -> {world.pov}")
|
|
def apply_env_map_seed(seed: int, env_key: str, map_key: str) -> World:
    """Create a fresh world for the given environment, map and seed.

    ``seed`` is coerced with ``int()`` before being passed to ``init_world``.
    """
    return init_world(seed=int(seed), env_key=env_key, map_key=map_key)
|
|
| |
| |
| |
TITLE = "ZEN AgentLab++ — Animated Multi-Map Agent Simulation Arena"


# NOTE(review): the source this block was rebuilt from had lost its
# indentation; the layout nesting below (in particular whether the two
# camera checkboxes sit inside or below the button row) should be confirmed.
with gr.Blocks(title=TITLE) as demo:
    gr.Markdown(
        f"## {TITLE}\n"
        "A living playground: agents **navigate real maps/courses**, chase objectives, and animate smoothly.\n"
        "Use **Autoplay** for hands-free demos (Pac-Chase feels like chaotic Pac-Man)."
    )

    # Initial world plus the state objects threaded through every handler.
    w0 = init_world(seed=1337, env_key="pac_chase", map_key="Classic Pac-Chase")
    w_state = gr.State(w0)
    autoplay_on = gr.State(False)
    highlight_state = gr.State(None)

    with gr.Row():
        # Top-down animated arena.
        arena = gr.HTML(label="Arena (Animated SVG)")

        # First-person view and textual status.
        with gr.Column(scale=1):
            pov_img = gr.Image(label="Agent POV (Pseudo-3D)", type="numpy", width=VIEW_W, height=VIEW_H)
            status_box = gr.Textbox(label="Status", lines=3)
            agent_box = gr.Textbox(label="Agents", lines=10)

    with gr.Row():
        events_box = gr.Textbox(label="Event Log", lines=10)

    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("### Scenario Controls")
            env_pick = gr.Radio(
                choices=[
                    (ENVS["pac_chase"].title, "pac_chase"),
                    (ENVS["ctf"].title, "ctf"),
                    (ENVS["treasure"].title, "treasure"),
                    (ENVS["resource"].title, "resource"),
                ],
                value="pac_chase",
                label="Gameplay Type",
            )
            map_pick = gr.Dropdown(
                choices=list(MAP_BUILDERS.keys()),
                value="Classic Pac-Chase",
                label="Map / Course",
            )
            seed_box = gr.Number(value=1337, precision=0, label="Seed")

            with gr.Row():
                btn_apply = gr.Button("Apply (Env + Map + Seed)")
                btn_reset = gr.Button("Reset (Same Env/Map/Seed)")

            gr.Markdown("### Autoplay / Demo Mode")
            autoplay_speed = gr.Slider(0.05, 0.8, value=0.18, step=0.01, label="Autoplay tick interval (sec)")
            with gr.Row():
                btn_play = gr.Button("▶ Start Autoplay")
                btn_pause = gr.Button("⏸ Stop Autoplay")
            with gr.Row():
                run_n = gr.Number(value=25, precision=0, label="Run N ticks")
                btn_run = gr.Button("Run")

        with gr.Column(scale=2):
            gr.Markdown("### Control & Camera")
            with gr.Row():
                btn_ctrl = gr.Button("Swap Controlled Agent")
                btn_pov = gr.Button("Swap POV Agent")
            overlay = gr.Checkbox(value=True, label="POV Overlay Reticle")
            auto_camera = gr.Checkbox(value=True, label="Auto Camera Cuts (Spectator Mode)")

            gr.Markdown("### Agent Modes")
            controlled_mode = gr.Radio(choices=["auto", "manual"], value="auto", label="Controlled Agent Mode")
            other_mode = gr.Radio(choices=["auto", "manual"], value="auto", label="Other Agents Mode")
            btn_modes = gr.Button("Apply Agent Modes")

            gr.Markdown("### Manual Actions (Controlled Agent)")
            with gr.Row():
                btn_L = gr.Button("L")
                btn_F = gr.Button("F")
                btn_R = gr.Button("R")
                btn_I = gr.Button("I")

    # Drives autoplay; inactive until "Start Autoplay" is pressed.
    timer = gr.Timer(value=0.18, active=False)

    # Every view-refreshing handler reads and writes the same components, so
    # the lists are defined once instead of being repeated per binding.
    state_inputs = [w_state, highlight_state]
    view_outputs = [arena, pov_img, status_box, agent_box, events_box, w_state, highlight_state]

    def ui_refresh(world: World, highlight):
        """Rebuild all views and echo the world/highlight state back."""
        world.overlay = bool(world.overlay)
        return (*rebuild_views(world, highlight=highlight), world, highlight)

    def on_load(world: World, highlight):
        """Populate the views when the page first loads."""
        return ui_refresh(world, highlight)

    demo.load(on_load, inputs=state_inputs, outputs=view_outputs, queue=True)

    # --- Scenario: apply / reset -------------------------------------------
    def apply_clicked(world: World, highlight, env_key: str, map_key: str, seed: int,
                      overlay_on: bool, auto_camera_on: bool):
        """Start a fresh world from the selected environment, map and seed.

        An empty seed field arrives as ``None``; the current world's seed is
        reused in that case. Overlay and auto-camera follow the checkboxes so
        the UI and the world state cannot drift apart.
        """
        if seed is None:
            seed = world.seed
        fresh = apply_env_map_seed(seed=seed, env_key=env_key, map_key=map_key)
        fresh.overlay = bool(overlay_on)
        fresh.auto_camera = bool(auto_camera_on)
        return ui_refresh(fresh, None)

    btn_apply.click(
        apply_clicked,
        inputs=[w_state, highlight_state, env_pick, map_pick, seed_box, overlay, auto_camera],
        outputs=view_outputs,
        queue=True,
    )

    def reset_clicked(world: World, highlight, overlay_on: bool, auto_camera_on: bool):
        """Rebuild the current scenario from its own env/map/seed.

        Overlay and auto-camera are taken from the checkboxes for the same
        reason as in ``apply_clicked``.
        """
        fresh = init_world(seed=world.seed, env_key=world.env_key, map_key=world.map_key)
        fresh.overlay = bool(overlay_on)
        fresh.auto_camera = bool(auto_camera_on)
        return ui_refresh(fresh, None)

    btn_reset.click(
        reset_clicked,
        inputs=[w_state, highlight_state, overlay, auto_camera],
        outputs=view_outputs,
        queue=True,
    )

    # --- Agent modes, control and camera ------------------------------------
    def modes_clicked(world: World, highlight, cmode: str, omode: str):
        """Apply the selected modes to the controlled and the other agents."""
        set_agent_modes(world, cmode, omode)
        return ui_refresh(world, highlight)

    btn_modes.click(
        modes_clicked,
        inputs=[w_state, highlight_state, controlled_mode, other_mode],
        outputs=view_outputs,
        queue=True,
    )

    def ctrl_clicked(world: World, highlight):
        """Cycle the controlled agent."""
        swap_controlled(world)
        return ui_refresh(world, highlight)

    btn_ctrl.click(ctrl_clicked, inputs=state_inputs, outputs=view_outputs, queue=True)

    def pov_clicked(world: World, highlight):
        """Cycle the POV agent."""
        swap_pov(world)
        return ui_refresh(world, highlight)

    btn_pov.click(pov_clicked, inputs=state_inputs, outputs=view_outputs, queue=True)

    def overlay_changed(world: World, highlight, v: bool):
        """Mirror the overlay checkbox into the world."""
        world.overlay = bool(v)
        return ui_refresh(world, highlight)

    overlay.change(
        overlay_changed,
        inputs=[w_state, highlight_state, overlay],
        outputs=view_outputs,
        queue=True,
    )

    def auto_camera_changed(world: World, highlight, v: bool):
        """Mirror the auto-camera checkbox into the world and log the change."""
        world.auto_camera = bool(v)
        world.events.append(f"t={world.step}: auto_camera={world.auto_camera}")
        return ui_refresh(world, highlight)

    auto_camera.change(
        auto_camera_changed,
        inputs=[w_state, highlight_state, auto_camera],
        outputs=view_outputs,
        queue=True,
    )

    # --- Manual actions -------------------------------------------------------
    def manual_btn(world: World, highlight, act: str):
        """Run one tick with ``act`` as the controlled agent's manual action."""
        tick_world(world, manual=act)
        return ui_refresh(world, highlight)

    btn_L.click(lambda w, h: manual_btn(w, h, "L"), inputs=state_inputs, outputs=view_outputs, queue=True)
    btn_F.click(lambda w, h: manual_btn(w, h, "F"), inputs=state_inputs, outputs=view_outputs, queue=True)
    btn_R.click(lambda w, h: manual_btn(w, h, "R"), inputs=state_inputs, outputs=view_outputs, queue=True)
    btn_I.click(lambda w, h: manual_btn(w, h, "I"), inputs=state_inputs, outputs=view_outputs, queue=True)

    # --- Run N ticks ------------------------------------------------------------
    def run_clicked(world: World, highlight, n: int):
        """Run up to ``n`` automatic ticks (at least one), stopping when done.

        An empty number field arrives as ``None`` and is treated as 1.
        """
        ticks = 1 if n is None else max(1, int(n))
        for _ in range(ticks):
            if world.done:
                break
            tick_world(world, manual=None)
        return ui_refresh(world, highlight)

    btn_run.click(
        run_clicked,
        inputs=[w_state, highlight_state, run_n],
        outputs=view_outputs,
        queue=True,
    )

    # --- Autoplay ---------------------------------------------------------------
    def autoplay_start(world: World, highlight, interval: float):
        """Activate the timer at the chosen interval and flag autoplay on."""
        interval = float(interval)
        return gr.update(value=interval, active=True), True, world, highlight

    def autoplay_stop(world: World, highlight):
        """Deactivate the timer and flag autoplay off."""
        return gr.update(active=False), False, world, highlight

    autoplay_outputs = [timer, autoplay_on, w_state, highlight_state]

    btn_play.click(
        autoplay_start,
        inputs=[w_state, highlight_state, autoplay_speed],
        outputs=autoplay_outputs,
        queue=True,
    )

    btn_pause.click(
        autoplay_stop,
        inputs=state_inputs,
        outputs=autoplay_outputs,
        queue=True,
    )

    def autoplay_tick(world: World, highlight, is_on: bool):
        """Timer callback: advance one tick while autoplay is on.

        When the world finishes, autoplay is switched off and the timer is
        deactivated; otherwise the timer is left untouched.
        """
        timer_update = gr.update()
        if is_on:
            if not world.done:
                tick_world(world, manual=None)
            if world.done:
                is_on = False
                timer_update = gr.update(active=False)
            else:
                is_on = True
        return (*rebuild_views(world, highlight=highlight), world, highlight, is_on, timer_update)

    timer.tick(
        autoplay_tick,
        inputs=[w_state, highlight_state, autoplay_on],
        outputs=[*view_outputs, autoplay_on, timer],
        queue=True,
    )


# Enable the request queue and start the server when the module is executed.
demo.queue().launch(ssr_mode=False)
|
|