| import math |
| import time |
| from collections import defaultdict, namedtuple |
| from dataclasses import dataclass, field |
| from itertools import combinations |
|
|
| |
| |
| |
|
|
| BOARD = 100.0 |
| CENTER_X = 50.0 |
| CENTER_Y = 50.0 |
| SUN_R = 10.0 |
| MAX_SPEED = 6.0 |
| SUN_SAFETY = 1.5 |
| ROTATION_LIMIT = 50.0 |
| TOTAL_STEPS = 500 |
| SIM_HORIZON = 130 |
| ROUTE_SEARCH_HORIZON = 60 |
| HORIZON = SIM_HORIZON |
| LAUNCH_CLEARANCE = 0.1 |
|
|
| EARLY_TURN_LIMIT = 40 |
| OPENING_TURN_LIMIT = 80 |
| LATE_REMAINING_TURNS = 70 |
| VERY_LATE_REMAINING_TURNS = 25 |
| TOTAL_WAR_REMAINING_TURNS = 55 |
|
|
| SAFE_NEUTRAL_MARGIN = 2 |
| CONTESTED_NEUTRAL_MARGIN = 2 |
| INTERCEPT_TOLERANCE = 1 |
|
|
| SAFE_OPENING_PROD_THRESHOLD = 4 |
| SAFE_OPENING_TURN_LIMIT = 10 |
| ROTATING_OPENING_MAX_TURNS = 13 |
| ROTATING_OPENING_LOW_PROD = 2 |
| FOUR_PLAYER_ROTATING_REACTION_GAP = 3 |
| FOUR_PLAYER_ROTATING_SEND_RATIO = 0.55 |
| FOUR_PLAYER_ROTATING_TURN_LIMIT = 10 |
|
|
| COMET_MAX_CHASE_TURNS = 10 |
|
|
| ATTACK_COST_TURN_WEIGHT = 0.50 |
| SNIPE_COST_TURN_WEIGHT = 0.42 |
| INDIRECT_VALUE_SCALE = 0.15 |
| INDIRECT_FRIENDLY_WEIGHT = 0.35 |
| INDIRECT_NEUTRAL_WEIGHT = 0.9 |
| INDIRECT_ENEMY_WEIGHT = 1.25 |
|
|
| STATIC_NEUTRAL_VALUE_MULT = 1.4 |
| STATIC_HOSTILE_VALUE_MULT = 1.65 |
| ROTATING_OPENING_VALUE_MULT = 0.9 |
| HOSTILE_TARGET_VALUE_MULT = 2.05 |
| OPENING_HOSTILE_TARGET_VALUE_MULT = 1.55 |
| SAFE_NEUTRAL_VALUE_MULT = 1.2 |
| CONTESTED_NEUTRAL_VALUE_MULT = 0.7 |
| EARLY_NEUTRAL_VALUE_MULT = 1.2 |
| COMET_VALUE_MULT = 0.65 |
| SNIPE_VALUE_MULT = 1.12 |
| SWARM_VALUE_MULT = 1.05 |
| REINFORCE_VALUE_MULT = 1.35 |
| CRASH_EXPLOIT_VALUE_MULT = 1.18 |
| FINISHING_HOSTILE_VALUE_MULT = 1.3 |
| BEHIND_ROTATING_NEUTRAL_VALUE_MULT = 0.92 |
| EXPOSED_PLANET_VALUE_MULT = 2.0 |
|
|
| |
| WEAKEST_ENEMY_VALUE_MULT_4P = 1.5 |
| WEAKEST_ENEMY_VALUE_MULT_2P = 1.25 |
| GANG_UP_VALUE_MULT = 1.4 |
| GANG_UP_POST_BATTLE_DELAY = 2 |
| GANG_UP_ETA_WINDOW = 4 |
|
|
| NEUTRAL_MARGIN_BASE = 2 |
| NEUTRAL_MARGIN_PROD_WEIGHT = 2 |
| NEUTRAL_MARGIN_CAP = 8 |
| HOSTILE_MARGIN_BASE = 3 |
| HOSTILE_MARGIN_PROD_WEIGHT = 2 |
| HOSTILE_MARGIN_CAP = 12 |
| HOSTILE_REINFORCE_HORIZON = 8 |
| HOSTILE_REINFORCE_RATIO = 0.25 |
| HOSTILE_REINFORCE_CAP = 15 |
| STATIC_TARGET_MARGIN = 4 |
| CONTESTED_TARGET_MARGIN = 5 |
| FOUR_PLAYER_TARGET_MARGIN = 2 |
| LONG_TRAVEL_MARGIN_START = 18 |
| LONG_TRAVEL_MARGIN_DIVISOR = 3 |
| LONG_TRAVEL_MARGIN_CAP = 8 |
| COMET_MARGIN_RELIEF = 6 |
| FINISHING_HOSTILE_SEND_BONUS = 5 |
|
|
| STATIC_TARGET_SCORE_MULT = 1.18 |
| EARLY_STATIC_NEUTRAL_SCORE_MULT = 1.25 |
| FOUR_PLAYER_ROTATING_NEUTRAL_SCORE_MULT = 0.84 |
| DENSE_STATIC_NEUTRAL_COUNT = 4 |
| DENSE_ROTATING_NEUTRAL_SCORE_MULT = 0.86 |
| SNIPE_SCORE_MULT = 1.12 |
| SWARM_SCORE_MULT = 1.06 |
| CRASH_EXPLOIT_SCORE_MULT = 1.05 |
|
|
| FOLLOWUP_MIN_SHIPS = 8 |
| LOW_VALUE_COMET_PRODUCTION = 1 |
| LATE_CAPTURE_BUFFER = 5 |
| VERY_LATE_CAPTURE_BUFFER = 3 |
|
|
| DEFENSE_LOOKAHEAD_TURNS = 28 |
| DEFENSE_COST_TURN_WEIGHT = 0.4 |
| DEFENSE_FRONTIER_SCORE_MULT = 1.12 |
| DEFENSE_SEND_MARGIN_BASE = 1 |
| DEFENSE_SEND_MARGIN_PROD_WEIGHT = 1 |
| DEFENSE_SHIP_VALUE = 0.55 |
|
|
| REINFORCE_ENABLED = True |
| REINFORCE_MIN_PRODUCTION = 2 |
| REINFORCE_MAX_TRAVEL_TURNS = 22 |
| REINFORCE_SAFETY_MARGIN = 2 |
| REINFORCE_MAX_SOURCE_FRACTION = 0.75 |
| REINFORCE_MIN_FUTURE_TURNS = 40 |
| REINFORCE_HOLD_LOOKAHEAD = 20 |
| REINFORCE_COST_TURN_WEIGHT = 0.35 |
|
|
| RECAPTURE_LOOKAHEAD_TURNS = 10 |
| RECAPTURE_COST_TURN_WEIGHT = 0.52 |
| RECAPTURE_VALUE_MULT = 0.88 |
| RECAPTURE_FRONTIER_MULT = 1.08 |
| RECAPTURE_PRODUCTION_WEIGHT = 0.6 |
| RECAPTURE_IMMEDIATE_WEIGHT = 0.4 |
|
|
| REAR_SOURCE_MIN_SHIPS = 16 |
| REAR_DISTANCE_RATIO = 1.25 |
| REAR_STAGE_PROGRESS = 0.78 |
| REAR_SEND_RATIO_TWO_PLAYER = 0.62 |
| REAR_SEND_RATIO_FOUR_PLAYER = 0.60 |
| REAR_SEND_MIN_SHIPS = 10 |
| REAR_MAX_TRAVEL_TURNS = 40 |
|
|
| PARTIAL_SOURCE_MIN_SHIPS = 6 |
| MULTI_SOURCE_TOP_K = 8 |
| MULTI_SOURCE_ETA_TOLERANCE = 2 |
| MULTI_SOURCE_PLAN_PENALTY = 0.97 |
| HOSTILE_SWARM_ETA_TOLERANCE = 1 |
| THREE_SOURCE_SWARM_ENABLED = True |
| THREE_SOURCE_MIN_TARGET_SHIPS = 20 |
| THREE_SOURCE_ETA_TOLERANCE = 1 |
| THREE_SOURCE_PLAN_PENALTY = 0.93 |
|
|
| |
| FOUR_SOURCE_SWARM_ENABLED = True |
| FOUR_SOURCE_ETA_TOLERANCE = 2 |
| FOUR_SOURCE_MIN_TARGET_SHIPS = 40 |
| FOUR_SOURCE_PLAN_PENALTY = 0.91 |
|
|
| PROACTIVE_DEFENSE_HORIZON = 12 |
| PROACTIVE_DEFENSE_RATIO = 0.28 |
| MULTI_ENEMY_PROACTIVE_HORIZON = 14 |
| MULTI_ENEMY_PROACTIVE_RATIO = 0.35 |
| MULTI_ENEMY_STACK_WINDOW = 4 |
| REACTION_SOURCE_TOP_K_MY = 4 |
| REACTION_SOURCE_TOP_K_ENEMY = 4 |
| PROACTIVE_ENEMY_TOP_K = 3 |
|
|
| CRASH_EXPLOIT_ENABLED = True |
| CRASH_EXPLOIT_MIN_TOTAL_SHIPS = 7 |
| CRASH_EXPLOIT_ETA_WINDOW = 3 |
| CRASH_EXPLOIT_POST_CRASH_DELAY = 1 |
|
|
| LATE_IMMEDIATE_SHIP_VALUE = 0.75 |
| WEAK_ENEMY_THRESHOLD = 110 |
| ELIMINATION_BONUS = 55.0 |
|
|
| BEHIND_DOMINATION = -0.20 |
| AHEAD_DOMINATION = 0.15 |
| FINISHING_DOMINATION = 0.28 |
| FINISHING_PROD_RATIO = 1.15 |
| AHEAD_ATTACK_MARGIN_BONUS = 0.12 |
| BEHIND_ATTACK_MARGIN_PENALTY = 0.05 |
| FINISHING_ATTACK_MARGIN_BONUS = 0.12 |
|
|
| DOOMED_EVAC_TURN_LIMIT = 24 |
| DOOMED_MIN_SHIPS = 8 |
|
|
| SOFT_ACT_DEADLINE = 0.82 |
| HEAVY_PHASE_MIN_TIME = 0.16 |
| OPTIONAL_PHASE_MIN_TIME = 0.08 |
| HEAVY_ROUTE_PLANET_LIMIT = 32 |
|
|
| |
| |
| |
|
|
| Planet = namedtuple( |
| "Planet", ["id", "owner", "x", "y", "radius", "ships", "production"] |
| ) |
| Fleet = namedtuple( |
| "Fleet", ["id", "owner", "x", "y", "angle", "from_planet_id", "ships"] |
| ) |
|
|
|
|
| @dataclass(frozen=True) |
| class ShotOption: |
| score: float |
| src_id: int |
| target_id: int |
| angle: float |
| turns: int |
| needed: int |
| send_cap: int |
| mission: str = "capture" |
| anchor_turn: int | None = None |
|
|
|
|
| @dataclass |
| class Mission: |
| kind: str |
| score: float |
| target_id: int |
| turns: int |
| options: list[ShotOption] = field(default_factory=list) |
|
|
| |
| |
| |
|
|
| def dist(ax, ay, bx, by): |
| return math.hypot(ax - bx, ay - by) |
|
|
|
|
| def orbital_radius(planet): |
| return dist(planet.x, planet.y, CENTER_X, CENTER_Y) |
|
|
|
|
| def is_static_planet(planet): |
| return orbital_radius(planet) + planet.radius >= ROTATION_LIMIT |
|
|
|
|
| def fleet_speed(ships): |
| if ships <= 1: |
| return 1.0 |
| ratio = math.log(ships) / math.log(1000.0) |
| ratio = max(0.0, min(1.0, ratio)) |
| return 1.0 + (MAX_SPEED - 1.0) * (ratio**1.5) |
|
|
|
|
| def point_to_segment_distance(px, py, x1, y1, x2, y2): |
| dx = x2 - x1 |
| dy = y2 - y1 |
| seg_len_sq = dx * dx + dy * dy |
| if seg_len_sq <= 1e-9: |
| return dist(px, py, x1, y1) |
| t = ((px - x1) * dx + (py - y1) * dy) / seg_len_sq |
| t = max(0.0, min(1.0, t)) |
| proj_x = x1 + t * dx |
| proj_y = y1 + t * dy |
| return dist(px, py, proj_x, proj_y) |
|
|
|
|
| def segment_hits_sun(x1, y1, x2, y2, safety=SUN_SAFETY): |
| return point_to_segment_distance(CENTER_X, CENTER_Y, x1, y1, x2, y2) < SUN_R + safety |
|
|
|
|
| def launch_point(sx, sy, sr, angle): |
| clearance = sr + LAUNCH_CLEARANCE |
| return sx + math.cos(angle) * clearance, sy + math.sin(angle) * clearance |
|
|
|
|
| def actual_path_geometry(sx, sy, sr, tx, ty, tr): |
| angle = math.atan2(ty - sy, tx - sx) |
| start_x, start_y = launch_point(sx, sy, sr, angle) |
| hit_distance = max(0.0, dist(sx, sy, tx, ty) - (sr + LAUNCH_CLEARANCE) - tr) |
| end_x = start_x + math.cos(angle) * hit_distance |
| end_y = start_y + math.sin(angle) * hit_distance |
| return angle, start_x, start_y, end_x, end_y, hit_distance |
|
|
|
|
| def safe_angle_and_distance(sx, sy, sr, tx, ty, tr): |
| angle, start_x, start_y, end_x, end_y, hit_distance = actual_path_geometry( |
| sx, sy, sr, tx, ty, tr, |
| ) |
| if segment_hits_sun(start_x, start_y, end_x, end_y): |
| return None |
| return angle, hit_distance |
|
|
|
|
| def predict_planet_position(planet, initial_by_id, angular_velocity, turns): |
| init = initial_by_id.get(planet.id) |
| if init is None: |
| return planet.x, planet.y |
| r = dist(init.x, init.y, CENTER_X, CENTER_Y) |
| if r + init.radius >= ROTATION_LIMIT: |
| return planet.x, planet.y |
| cur_ang = math.atan2(planet.y - CENTER_Y, planet.x - CENTER_X) |
| new_ang = cur_ang + angular_velocity * turns |
| return ( |
| CENTER_X + r * math.cos(new_ang), |
| CENTER_Y + r * math.sin(new_ang), |
| ) |
|
|
|
|
| def predict_comet_position(planet_id, comets, turns): |
| for group in comets: |
| pids = group.get("planet_ids", []) |
| if planet_id not in pids: |
| continue |
| idx = pids.index(planet_id) |
| paths = group.get("paths", []) |
| path_index = group.get("path_index", 0) |
| if idx >= len(paths): |
| return None |
| path = paths[idx] |
| future_idx = path_index + int(turns) |
| if 0 <= future_idx < len(path): |
| return path[future_idx][0], path[future_idx][1] |
| return None |
| return None |
|
|
|
|
| def comet_remaining_life(planet_id, comets): |
| for group in comets: |
| pids = group.get("planet_ids", []) |
| if planet_id not in pids: |
| continue |
| idx = pids.index(planet_id) |
| paths = group.get("paths", []) |
| path_index = group.get("path_index", 0) |
| if idx < len(paths): |
| return max(0, len(paths[idx]) - path_index) |
| return 0 |
|
|
|
|
| def estimate_arrival(sx, sy, sr, tx, ty, tr, ships): |
| safe = safe_angle_and_distance(sx, sy, sr, tx, ty, tr) |
| if safe is None: |
| return None |
| angle, total_d = safe |
| turns = max(1, int(math.ceil(total_d / fleet_speed(max(1, ships))))) |
| return angle, turns |
|
|
|
|
| def travel_time(sx, sy, sr, tx, ty, tr, ships): |
| est = estimate_arrival(sx, sy, sr, tx, ty, tr, ships) |
| if est is None: |
| return 10**9 |
| return est[1] |
|
|
|
|
| def predict_target_position(target, turns, initial_by_id, ang_vel, comets, comet_ids): |
| if target.id in comet_ids: |
| return predict_comet_position(target.id, comets, turns) |
| return predict_planet_position(target, initial_by_id, ang_vel, turns) |
|
|
|
|
| def target_can_move(target, initial_by_id, comet_ids): |
| if target.id in comet_ids: |
| return True |
| init = initial_by_id.get(target.id) |
| if init is None: |
| return False |
| r = dist(init.x, init.y, CENTER_X, CENTER_Y) |
| return r + init.radius < ROTATION_LIMIT |
|
|
|
|
| def search_safe_intercept(src, target, ships, initial_by_id, ang_vel, comets, comet_ids): |
| best = None |
| best_score = None |
| max_turns = min(HORIZON, ROUTE_SEARCH_HORIZON) |
| if target.id in comet_ids: |
| max_turns = min(max_turns, max(0, comet_remaining_life(target.id, comets) - 1)) |
|
|
| for candidate_turns in range(1, max_turns + 1): |
| pos = predict_target_position( |
| target, candidate_turns, initial_by_id, ang_vel, comets, comet_ids, |
| ) |
| if pos is None: |
| continue |
| est = estimate_arrival(src.x, src.y, src.radius, pos[0], pos[1], target.radius, ships) |
| if est is None: |
| continue |
| _, turns = est |
| if abs(turns - candidate_turns) > INTERCEPT_TOLERANCE: |
| continue |
|
|
| actual_turns = max(turns, candidate_turns) |
| actual_pos = predict_target_position( |
| target, actual_turns, initial_by_id, ang_vel, comets, comet_ids, |
| ) |
| if actual_pos is None: |
| continue |
|
|
| confirm = estimate_arrival( |
| src.x, src.y, src.radius, actual_pos[0], actual_pos[1], target.radius, ships, |
| ) |
| if confirm is None: |
| continue |
|
|
| delta = abs(confirm[1] - actual_turns) |
| if delta > INTERCEPT_TOLERANCE: |
| continue |
|
|
| score = (delta, confirm[1], candidate_turns) |
| if best is None or score < best_score: |
| best_score = score |
| best = (confirm[0], confirm[1], actual_pos[0], actual_pos[1]) |
|
|
| return best |
|
|
|
|
| def aim_with_prediction(src, target, ships, initial_by_id, ang_vel, comets, comet_ids): |
| est = estimate_arrival(src.x, src.y, src.radius, target.x, target.y, target.radius, ships) |
| if est is None: |
| if not target_can_move(target, initial_by_id, comet_ids): |
| return None |
| return search_safe_intercept( |
| src, target, ships, initial_by_id, ang_vel, comets, comet_ids, |
| ) |
|
|
| tx, ty = target.x, target.y |
| for _ in range(5): |
| _, turns = est |
| pos = predict_target_position(target, turns, initial_by_id, ang_vel, comets, comet_ids) |
| if pos is None: |
| return None |
| ntx, nty = pos |
| next_est = estimate_arrival(src.x, src.y, src.radius, ntx, nty, target.radius, ships) |
| if next_est is None: |
| if not target_can_move(target, initial_by_id, comet_ids): |
| return None |
| return search_safe_intercept( |
| src, target, ships, initial_by_id, ang_vel, comets, comet_ids, |
| ) |
| if ( |
| abs(ntx - tx) < 0.3 |
| and abs(nty - ty) < 0.3 |
| and abs(next_est[1] - turns) <= INTERCEPT_TOLERANCE |
| ): |
| return next_est[0], next_est[1], ntx, nty |
| tx, ty = ntx, nty |
| est = next_est |
|
|
| final_est = estimate_arrival(src.x, src.y, src.radius, tx, ty, target.radius, ships) |
| if final_est is None: |
| return search_safe_intercept( |
| src, target, ships, initial_by_id, ang_vel, comets, comet_ids, |
| ) |
| return final_est[0], final_est[1], tx, ty |
|
|
| |
| |
| |
|
|
| def fleet_target_planet(fleet, planets): |
| best_planet = None |
| best_time = 1e9 |
| dir_x = math.cos(fleet.angle) |
| dir_y = math.sin(fleet.angle) |
| speed = fleet_speed(fleet.ships) |
|
|
| for planet in planets: |
| dx = planet.x - fleet.x |
| dy = planet.y - fleet.y |
| proj = dx * dir_x + dy * dir_y |
| if proj < 0: |
| continue |
| perp_sq = dx * dx + dy * dy - proj * proj |
| radius_sq = planet.radius * planet.radius |
| if perp_sq >= radius_sq: |
| continue |
| hit_d = max(0.0, proj - math.sqrt(max(0.0, radius_sq - perp_sq))) |
| turns = hit_d / speed |
| if turns <= HORIZON and turns < best_time: |
| best_time = turns |
| best_planet = planet |
|
|
| if best_planet is None: |
| return None, None |
| return best_planet, int(math.ceil(best_time)) |
|
|
|
|
| def build_arrival_ledger(fleets, planets): |
| arrivals_by_planet = {planet.id: [] for planet in planets} |
| for fleet in fleets: |
| target, eta = fleet_target_planet(fleet, planets) |
| if target is None: |
| continue |
| arrivals_by_planet[target.id].append((eta, fleet.owner, int(fleet.ships))) |
| return arrivals_by_planet |
|
|
|
|
| def resolve_arrival_event(owner, garrison, arrivals): |
| by_owner = {} |
| for _, attacker_owner, ships in arrivals: |
| by_owner[attacker_owner] = by_owner.get(attacker_owner, 0) + ships |
|
|
| if not by_owner: |
| return owner, max(0.0, garrison) |
|
|
| sorted_players = sorted(by_owner.items(), key=lambda item: item[1], reverse=True) |
| top_owner, top_ships = sorted_players[0] |
|
|
| if len(sorted_players) > 1: |
| second_ships = sorted_players[1][1] |
| if top_ships == second_ships: |
| survivor_owner = -1 |
| survivor_ships = 0 |
| else: |
| survivor_owner = top_owner |
| survivor_ships = top_ships - second_ships |
| else: |
| survivor_owner = top_owner |
| survivor_ships = top_ships |
|
|
| if survivor_ships <= 0: |
| return owner, max(0.0, garrison) |
|
|
| if owner == survivor_owner: |
| return owner, garrison + survivor_ships |
|
|
| garrison -= survivor_ships |
| if garrison < 0: |
| return survivor_owner, -garrison |
| return owner, garrison |
|
|
|
|
| def normalize_arrivals(arrivals, horizon): |
| events = [] |
| for turns, owner, ships in arrivals: |
| if ships <= 0: |
| continue |
| eta = max(1, int(math.ceil(turns))) |
| if eta > horizon: |
| continue |
| events.append((eta, owner, int(ships))) |
| events.sort(key=lambda item: item[0]) |
| return events |
|
|
|
|
| def simulate_planet_timeline(planet, arrivals, player, horizon): |
| horizon = max(0, int(math.ceil(horizon))) |
| events = normalize_arrivals(arrivals, horizon) |
| by_turn = defaultdict(list) |
| for item in events: |
| by_turn[item[0]].append(item) |
|
|
| owner = planet.owner |
| garrison = float(planet.ships) |
| owner_at = {0: owner} |
| ships_at = {0: max(0.0, garrison)} |
| min_owned = garrison if owner == player else 0.0 |
| first_enemy = None |
| fall_turn = None |
|
|
| for turn in range(1, horizon + 1): |
| if owner != -1: |
| garrison += planet.production |
|
|
| group = by_turn.get(turn, []) |
| prev_owner = owner |
| if group: |
| if prev_owner == player and first_enemy is None: |
| if any(item[1] not in (-1, player) for item in group): |
| first_enemy = turn |
| owner, garrison = resolve_arrival_event(owner, garrison, group) |
| if prev_owner == player and owner != player and fall_turn is None: |
| fall_turn = turn |
|
|
| owner_at[turn] = owner |
| ships_at[turn] = max(0.0, garrison) |
| if owner == player: |
| min_owned = min(min_owned, garrison) |
|
|
| keep_needed = 0 |
| holds_full = True |
|
|
| if planet.owner == player: |
| def survives_with_keep(keep): |
| sim_owner = planet.owner |
| sim_garrison = float(keep) |
| for turn in range(1, horizon + 1): |
| if sim_owner != -1: |
| sim_garrison += planet.production |
| group = by_turn.get(turn, []) |
| if group: |
| sim_owner, sim_garrison = resolve_arrival_event(sim_owner, sim_garrison, group) |
| if sim_owner != player: |
| return False |
| return sim_owner == player |
|
|
| if survives_with_keep(int(planet.ships)): |
| lo, hi = 0, int(planet.ships) |
| while lo < hi: |
| mid = (lo + hi) // 2 |
| if survives_with_keep(mid): |
| hi = mid |
| else: |
| lo = mid + 1 |
| keep_needed = lo |
| else: |
| holds_full = False |
| keep_needed = int(planet.ships) |
|
|
| return { |
| "owner_at": owner_at, |
| "ships_at": ships_at, |
| "keep_needed": keep_needed, |
| "min_owned": max(0, int(math.floor(min_owned))) if planet.owner == player else 0, |
| "first_enemy": first_enemy, |
| "fall_turn": fall_turn, |
| "holds_full": holds_full, |
| "horizon": horizon, |
| } |
|
|
|
|
| def state_at_timeline(timeline, arrival_turn): |
| turn = max(0, int(math.ceil(arrival_turn))) |
| turn = min(turn, timeline["horizon"]) |
| owner = timeline["owner_at"].get(turn, timeline["owner_at"][timeline["horizon"]]) |
| ships = timeline["ships_at"].get(turn, timeline["ships_at"][timeline["horizon"]]) |
| return owner, max(0.0, ships) |
|
|
|
|
| def count_players(planets, fleets): |
| owners = set() |
| for planet in planets: |
| if planet.owner != -1: |
| owners.add(planet.owner) |
| for fleet in fleets: |
| owners.add(fleet.owner) |
| return max(2, len(owners)) |
|
|
|
|
| def nearest_distance_to_set(px, py, planets): |
| if not planets: |
| return 10**9 |
| return min(dist(px, py, planet.x, planet.y) for planet in planets) |
|
|
|
|
| def indirect_features(planet, planets, player): |
| friendly = 0.0 |
| neutral = 0.0 |
| enemy = 0.0 |
| for other in planets: |
| if other.id == planet.id: |
| continue |
| d = dist(planet.x, planet.y, other.x, other.y) |
| if d < 1: |
| continue |
| factor = other.production / (d + 12.0) |
| if other.owner == player: |
| friendly += factor |
| elif other.owner == -1: |
| neutral += factor |
| else: |
| enemy += factor |
| return friendly, neutral, enemy |
|
|
|
|
| def detect_exposed_enemy_planets(fleets, enemy_planets): |
| exposed = set() |
| for planet in enemy_planets: |
| outbound = sum( |
| int(f.ships) |
| for f in fleets |
| if f.owner == planet.owner and f.from_planet_id == planet.id and f.ships >= 5 |
| ) |
| if outbound >= 12 and outbound >= planet.ships * 0.8: |
| exposed.add(planet.id) |
| return exposed |
|
|
|
|
| def _compute_weakest_enemy(enemy_planets, owner_strength, owner_production): |
| """Return the player ID of the weakest enemy (lowest ships + future production).""" |
| enemy_owners = set(p.owner for p in enemy_planets) |
| if not enemy_owners: |
| return None |
| return min( |
| enemy_owners, |
| key=lambda owner: owner_strength.get(owner, 0) + owner_production.get(owner, 0) * 15, |
| ) |
|
|
|
|
| class WorldModel: |
| def __init__(self, player, step, planets, fleets, initial_by_id, ang_vel, comets, comet_ids): |
| self.player = player |
| self.step = step |
| self.planets = planets |
| self.fleets = fleets |
| self.initial_by_id = initial_by_id |
| self.ang_vel = ang_vel |
| self.comets = comets |
| self.comet_ids = set(comet_ids) |
|
|
| self.planet_by_id = {planet.id: planet for planet in planets} |
| self.my_planets = [planet for planet in planets if planet.owner == player] |
| self.enemy_planets = [planet for planet in planets if planet.owner not in (-1, player)] |
| self.neutral_planets = [planet for planet in planets if planet.owner == -1] |
| self.static_neutral_planets = [ |
| planet for planet in self.neutral_planets if is_static_planet(planet) |
| ] |
|
|
| |
| self.opp_planets = defaultdict(list) |
| for p in self.enemy_planets: |
| self.opp_planets[p.owner].append(p) |
|
|
| self.num_players = count_players(planets, fleets) |
| self.remaining_steps = max(1, TOTAL_STEPS - step) |
| self.is_early = step < EARLY_TURN_LIMIT |
| self.is_opening = step < OPENING_TURN_LIMIT |
| self.is_late = self.remaining_steps < LATE_REMAINING_TURNS |
| self.is_very_late = self.remaining_steps < VERY_LATE_REMAINING_TURNS |
| self.is_total_war = self.remaining_steps < TOTAL_WAR_REMAINING_TURNS |
| self.is_four_player = self.num_players >= 4 |
|
|
| self.owner_strength = defaultdict(int) |
| self.owner_production = defaultdict(int) |
| for planet in planets: |
| if planet.owner != -1: |
| self.owner_strength[planet.owner] += int(planet.ships) |
| self.owner_production[planet.owner] += int(planet.production) |
| for fleet in fleets: |
| self.owner_strength[fleet.owner] += int(fleet.ships) |
|
|
| self.my_total = self.owner_strength.get(player, 0) |
| self.enemy_total = sum( |
| strength for owner, strength in self.owner_strength.items() if owner != player |
| ) |
| self.max_enemy_strength = max( |
| (strength for owner, strength in self.owner_strength.items() if owner != player), |
| default=0, |
| ) |
| self.my_prod = self.owner_production.get(player, 0) |
| self.enemy_prod = sum( |
| production |
| for owner, production in self.owner_production.items() |
| if owner != player |
| ) |
|
|
| |
| self._weakest_enemy = _compute_weakest_enemy( |
| self.enemy_planets, self.owner_strength, self.owner_production |
| ) |
| self._weakest_enemy_strength = ( |
| self.owner_strength.get(self._weakest_enemy, 0) |
| if self._weakest_enemy is not None else 0 |
| ) |
|
|
| self.arrivals_by_planet = build_arrival_ledger(fleets, planets) |
| self.base_timeline = { |
| planet.id: simulate_planet_timeline( |
| planet, |
| self.arrivals_by_planet[planet.id], |
| player, |
| HORIZON, |
| ) |
| for planet in planets |
| } |
| self.keep_needed_map = { |
| planet.id: self.base_timeline[planet.id]["keep_needed"] for planet in planets |
| } |
| self.min_owned_map = { |
| planet.id: self.base_timeline[planet.id]["min_owned"] for planet in planets |
| } |
| self.first_enemy_map = { |
| planet.id: self.base_timeline[planet.id]["first_enemy"] for planet in planets |
| } |
| self.fall_turn_map = { |
| planet.id: self.base_timeline[planet.id]["fall_turn"] for planet in planets |
| } |
| self.holds_full_map = { |
| planet.id: self.base_timeline[planet.id]["holds_full"] for planet in planets |
| } |
| self.indirect_feature_map = { |
| planet.id: indirect_features(planet, planets, player) for planet in planets |
| } |
| self.exposed_planet_ids = detect_exposed_enemy_planets(fleets, self.enemy_planets) |
| self.shot_cache = {} |
| self.probe_candidate_cache = {} |
| self.best_probe_cache = {} |
| self.reaction_cache = {} |
| self.exact_need_cache = {} |
|
|
| self.total_visible_ships = sum(int(planet.ships) for planet in planets) + sum( |
| int(fleet.ships) for fleet in fleets |
| ) |
| self.total_production = sum(int(planet.production) for planet in planets) |
|
|
| def is_static(self, planet_id): |
| return is_static_planet(self.planet_by_id[planet_id]) |
|
|
| def comet_life(self, planet_id): |
| return comet_remaining_life(planet_id, self.comets) |
|
|
| def source_inventory_left(self, source_id, spent_total): |
| return max(0, int(self.planet_by_id[source_id].ships) - spent_total[source_id]) |
|
|
| def plan_shot(self, src_id, target_id, ships): |
| ships = int(ships) |
| key = (src_id, target_id, ships) |
| if key in self.shot_cache: |
| return self.shot_cache[key] |
| src = self.planet_by_id[src_id] |
| target = self.planet_by_id[target_id] |
| result = aim_with_prediction( |
| src, target, ships, self.initial_by_id, self.ang_vel, self.comets, self.comet_ids, |
| ) |
| self.shot_cache[key] = result |
| return result |
|
|
| def probe_ship_candidates(self, src_id, target_id, source_cap, hints=()): |
| source_cap = max(1, int(source_cap)) |
| normalized_hints = tuple( |
| int(math.ceil(hint)) for hint in hints if hint is not None |
| ) |
| cache_key = (src_id, target_id, source_cap, normalized_hints) |
| cached = self.probe_candidate_cache.get(cache_key) |
| if cached is not None: |
| return cached |
| target = self.planet_by_id[target_id] |
| target_ships = max(1, int(math.ceil(target.ships))) |
|
|
| values = set(range(1, min(6, source_cap) + 1)) |
| values.update({ |
| source_cap, |
| max(1, source_cap // 2), |
| max(1, source_cap // 3), |
| min(source_cap, PARTIAL_SOURCE_MIN_SHIPS), |
| min(source_cap, target_ships + 1), |
| min(source_cap, target_ships + 2), |
| min(source_cap, target_ships + 4), |
| min(source_cap, target_ships + 8), |
| }) |
|
|
| for hint in normalized_hints: |
| base = max(1, min(source_cap, hint)) |
| for delta in (-2, -1, 0, 1, 2): |
| candidate = base + delta |
| if 1 <= candidate <= source_cap: |
| values.add(candidate) |
|
|
| result = sorted(values) |
| self.probe_candidate_cache[cache_key] = result |
| return result |
|
|
| def best_probe_aim( |
| self, |
| src_id, |
| target_id, |
| source_cap, |
| hints=(), |
| min_turn=None, |
| max_turn=None, |
| anchor_turn=None, |
| max_anchor_diff=None, |
| ): |
| cache_key = ( |
| src_id, |
| target_id, |
| max(1, int(source_cap)), |
| tuple(hints), |
| min_turn, |
| max_turn, |
| anchor_turn, |
| max_anchor_diff, |
| ) |
| if cache_key in self.best_probe_cache: |
| return self.best_probe_cache[cache_key] |
|
|
| best = None |
| best_key = None |
|
|
| for ships in self.probe_ship_candidates(src_id, target_id, source_cap, hints=hints): |
| aim = self.plan_shot(src_id, target_id, ships) |
| if aim is None: |
| continue |
|
|
| angle, turns, dist_to_target, path_target = aim |
| if min_turn is not None and turns < min_turn: |
| continue |
| if max_turn is not None and turns > max_turn: |
| continue |
| if ( |
| anchor_turn is not None |
| and max_anchor_diff is not None |
| and abs(turns - anchor_turn) > max_anchor_diff |
| ): |
| continue |
|
|
| if anchor_turn is None: |
| key = (turns, ships) |
| else: |
| key = (abs(turns - anchor_turn), turns, ships) |
|
|
| if best_key is None or key < best_key: |
| best_key = key |
| best = (ships, (angle, turns, dist_to_target, path_target)) |
|
|
| self.best_probe_cache[cache_key] = best |
| return best |
|
|
| def reaction_times(self, target_id): |
| cached = self.reaction_cache.get(target_id) |
| if cached is not None: |
| return cached |
|
|
| target = self.planet_by_id[target_id] |
| my_t = 10**9 |
| for planet in self.my_planets: |
| seeded = self.best_probe_aim(planet.id, target.id, max(1, int(planet.ships))) |
| if seeded is None: |
| continue |
| _, aim = seeded |
| my_t = min(my_t, aim[1]) |
|
|
| enemy_t = 10**9 |
| for planet in self.enemy_planets: |
| seeded = self.best_probe_aim(planet.id, target.id, max(1, int(planet.ships))) |
| if seeded is None: |
| continue |
| _, aim = seeded |
| enemy_t = min(enemy_t, aim[1]) |
|
|
| cached = (my_t, enemy_t) |
| self.reaction_cache[target_id] = cached |
| return cached |
|
|
| def projected_state(self, target_id, arrival_turn, planned_commitments=None, extra_arrivals=()): |
| planned_commitments = planned_commitments or {} |
| cutoff = max(1, int(math.ceil(arrival_turn))) |
| if not planned_commitments.get(target_id) and not extra_arrivals: |
| return state_at_timeline(self.base_timeline[target_id], cutoff) |
|
|
| arrivals = [ |
| item |
| for item in self.arrivals_by_planet.get(target_id, []) |
| if item[0] <= cutoff |
| ] |
| arrivals.extend( |
| item |
| for item in planned_commitments.get(target_id, []) |
| if item[0] <= cutoff |
| ) |
| arrivals.extend(item for item in extra_arrivals if item[0] <= cutoff) |
|
|
| target = self.planet_by_id[target_id] |
| dyn = simulate_planet_timeline(target, arrivals, self.player, cutoff) |
| return state_at_timeline(dyn, cutoff) |
|
|
| def projected_timeline(self, target_id, horizon, planned_commitments=None, extra_arrivals=()): |
| planned_commitments = planned_commitments or {} |
| horizon = max(1, int(math.ceil(horizon))) |
| arrivals = [ |
| item for item in self.arrivals_by_planet.get(target_id, []) if item[0] <= horizon |
| ] |
| arrivals.extend( |
| item for item in planned_commitments.get(target_id, []) if item[0] <= horizon |
| ) |
| arrivals.extend(item for item in extra_arrivals if item[0] <= horizon) |
| target = self.planet_by_id[target_id] |
| return simulate_planet_timeline(target, arrivals, self.player, horizon) |
|
|
| def hold_status(self, target_id, planned_commitments=None, horizon=HORIZON): |
| planned_commitments = planned_commitments or {} |
| if planned_commitments.get(target_id): |
| tl = self.projected_timeline( |
| target_id, horizon, planned_commitments=planned_commitments, |
| ) |
| else: |
| tl = self.base_timeline[target_id] |
| return { |
| "keep_needed": tl["keep_needed"], |
| "min_owned": tl["min_owned"], |
| "first_enemy": tl["first_enemy"], |
| "fall_turn": tl["fall_turn"], |
| "holds_full": tl["holds_full"], |
| } |
|
|
| def _ownership_search_cap(self, eval_turn): |
| productive_cap = self.total_production * max(2, eval_turn + 2) |
| return max(32, int(self.total_visible_ships + productive_cap + 32)) |
|
|
| def min_ships_to_own_by( |
| self, |
| target_id, |
| eval_turn, |
| attacker_owner, |
| arrival_turn=None, |
| planned_commitments=None, |
| extra_arrivals=(), |
| upper_bound=None, |
| ): |
| planned_commitments = planned_commitments or {} |
| eval_turn = max(1, int(math.ceil(eval_turn))) |
| arrival_turn = eval_turn if arrival_turn is None else max(1, int(math.ceil(arrival_turn))) |
| if arrival_turn > eval_turn: |
| if upper_bound is not None: |
| return max(1, int(upper_bound)) + 1 |
| return self._ownership_search_cap(eval_turn) + 1 |
|
|
| normalized_extra = tuple( |
| (max(1, int(math.ceil(turns))), owner, int(ships)) |
| for turns, owner, ships in extra_arrivals |
| if ships > 0 and max(1, int(math.ceil(turns))) <= eval_turn |
| ) |
|
|
| cache_key = None |
| if ( |
| arrival_turn == eval_turn |
| and not planned_commitments.get(target_id) |
| and not normalized_extra |
| ): |
| cache_key = (target_id, eval_turn, attacker_owner) |
| cached = self.exact_need_cache.get(cache_key) |
| if cached is not None: |
| return cached |
|
|
| owner_before, ships_before = self.projected_state( |
| target_id, |
| eval_turn, |
| planned_commitments=planned_commitments, |
| extra_arrivals=normalized_extra, |
| ) |
| if owner_before == attacker_owner: |
| if cache_key is not None: |
| self.exact_need_cache[cache_key] = 0 |
| return 0 |
|
|
| def owns_at(ships): |
| owner_after, _ = self.projected_state( |
| target_id, |
| eval_turn, |
| planned_commitments=planned_commitments, |
| extra_arrivals=normalized_extra + ((arrival_turn, attacker_owner, int(ships)),), |
| ) |
| return owner_after == attacker_owner |
|
|
| if upper_bound is not None: |
| hi = max(1, int(upper_bound)) |
| if not owns_at(hi): |
| return hi + 1 |
| else: |
| hi = max(1, int(math.ceil(ships_before)) + 1) |
| search_cap = self._ownership_search_cap(eval_turn) |
| while hi <= search_cap and not owns_at(hi): |
| hi *= 2 |
| if hi > search_cap: |
| hi = search_cap |
| if not owns_at(hi): |
| return hi + 1 |
|
|
| lo = 1 |
| while lo < hi: |
| mid = (lo + hi) // 2 |
| if owns_at(mid): |
| hi = mid |
| else: |
| lo = mid + 1 |
|
|
| if cache_key is not None: |
| self.exact_need_cache[cache_key] = lo |
| return lo |
|
|
| def min_ships_to_own_at( |
| self, |
| target_id, |
| arrival_turn, |
| attacker_owner, |
| planned_commitments=None, |
| extra_arrivals=(), |
| upper_bound=None, |
| ): |
| return self.min_ships_to_own_by( |
| target_id, |
| arrival_turn, |
| attacker_owner, |
| arrival_turn=arrival_turn, |
| planned_commitments=planned_commitments, |
| extra_arrivals=extra_arrivals, |
| upper_bound=upper_bound, |
| ) |
|
|
| def reinforcement_needed_to_hold_until( |
| self, |
| planet_id, |
| arrival_turn, |
| hold_until, |
| planned_commitments=None, |
| upper_bound=None, |
| ): |
| planned_commitments = planned_commitments or {} |
| target = self.planet_by_id[planet_id] |
| arrival_turn = max(1, int(math.ceil(arrival_turn))) |
| hold_until = max(arrival_turn, int(math.ceil(hold_until))) |
|
|
| if target.owner != self.player: |
| return self.min_ships_to_own_by( |
| planet_id, |
| hold_until, |
| self.player, |
| arrival_turn=arrival_turn, |
| planned_commitments=planned_commitments, |
| upper_bound=upper_bound, |
| ) |
|
|
| def holds_with_reinforcement(ships): |
| timeline = self.projected_timeline( |
| planet_id, |
| hold_until, |
| planned_commitments=planned_commitments, |
| extra_arrivals=((arrival_turn, self.player, int(ships)),), |
| ) |
| for turn in range(arrival_turn, hold_until + 1): |
| if timeline["owner_at"].get(turn) != self.player: |
| return False |
| return True |
|
|
| if upper_bound is not None: |
| hi = max(1, int(upper_bound)) |
| if not holds_with_reinforcement(hi): |
| return hi + 1 |
| else: |
| hi = 1 |
| search_cap = self._ownership_search_cap(hold_until) |
| while hi <= search_cap and not holds_with_reinforcement(hi): |
| hi *= 2 |
| if hi > search_cap: |
| hi = search_cap |
| if not holds_with_reinforcement(hi): |
| return hi + 1 |
|
|
| lo = 1 |
| while lo < hi: |
| mid = (lo + hi) // 2 |
| if holds_with_reinforcement(mid): |
| hi = mid |
| else: |
| lo = mid + 1 |
| return lo |
|
|
| def ships_needed_to_capture( |
| self, |
| target_id, |
| arrival_turn, |
| planned_commitments=None, |
| extra_arrivals=(), |
| ): |
| return self.min_ships_to_own_at( |
| target_id, |
| arrival_turn, |
| self.player, |
| planned_commitments=planned_commitments, |
| extra_arrivals=extra_arrivals, |
| ) |
|
|
| |
| |
| |
|
|
| def planet_distance(first, second): |
| return math.hypot(first.x - second.x, first.y - second.y) |
|
|
|
|
| def nearest_sources_to_target(target, sources, top_k): |
| if top_k <= 0 or len(sources) <= top_k: |
| return sources |
| return sorted( |
| sources, |
| key=lambda src: (planet_distance(src, target), -int(src.ships), src.id), |
| )[:top_k] |
|
|
|
|
| def min_legal_reaction_time(target, sources, world): |
| best = 10**9 |
| for src in sources: |
| seeded = world.best_probe_aim(src.id, target.id, max(1, int(src.ships))) |
| if seeded is None: |
| continue |
| _, aim = seeded |
| best = min(best, aim[1]) |
| return best |
|
|
|
|
| def policy_reaction_times(target_id, policy): |
| return policy["reaction_time_map"].get(target_id, (10**9, 10**9)) |
|
|
|
|
| def candidate_time_valid(target, turns, world, remaining_buffer): |
| if turns > world.remaining_steps - remaining_buffer: |
| return False |
| if target.id in world.comet_ids: |
| life = world.comet_life(target.id) |
| if turns >= life or turns > COMET_MAX_CHASE_TURNS: |
| return False |
| return True |
|
|
|
|
| def stacked_enemy_proactive_keep(planet, world): |
| threats = [] |
| for enemy in world.enemy_planets: |
| seeded = world.best_probe_aim(enemy.id, planet.id, max(1, int(enemy.ships))) |
| if seeded is None: |
| continue |
| _, aim = seeded |
| eta = aim[1] |
| if eta > MULTI_ENEMY_PROACTIVE_HORIZON: |
| continue |
| threats.append((eta, int(enemy.ships))) |
|
|
| if not threats: |
| return 0 |
|
|
| threats.sort() |
| best_stacked = 0 |
| left = 0 |
| running = 0 |
| for right in range(len(threats)): |
| running += threats[right][1] |
| while threats[right][0] - threats[left][0] > MULTI_ENEMY_STACK_WINDOW: |
| running -= threats[left][1] |
| left += 1 |
| best_stacked = max(best_stacked, running) |
|
|
| return int(best_stacked * MULTI_ENEMY_PROACTIVE_RATIO) |
|
|
|
|
| def swarm_eta_tolerance(options, target, world): |
| if len(options) >= 3: |
| return THREE_SOURCE_ETA_TOLERANCE |
| if target.owner not in (-1, world.player): |
| return HOSTILE_SWARM_ETA_TOLERANCE |
| return MULTI_SOURCE_ETA_TOLERANCE |
|
|
|
|
| def detect_enemy_crashes(world): |
| crashes = [] |
| for target_id, arrivals in world.arrivals_by_planet.items(): |
| enemy_events = [ |
| (int(math.ceil(eta)), owner, int(ships)) |
| for eta, owner, ships in arrivals |
| if owner not in (-1, world.player) and ships > 0 |
| ] |
| enemy_events.sort() |
| for i in range(len(enemy_events)): |
| eta_a, owner_a, ships_a = enemy_events[i] |
| for j in range(i + 1, len(enemy_events)): |
| eta_b, owner_b, ships_b = enemy_events[j] |
| if owner_a == owner_b: |
| continue |
| if abs(eta_a - eta_b) > CRASH_EXPLOIT_ETA_WINDOW: |
| break |
| if ships_a + ships_b < CRASH_EXPLOIT_MIN_TOTAL_SHIPS: |
| continue |
| crashes.append({ |
| "target_id": target_id, |
| "crash_turn": max(eta_a, eta_b), |
| "owners": (owner_a, owner_b), |
| "ships": (ships_a, ships_b), |
| }) |
| return crashes |
|
|
|
|
| def detect_enemy_planet_battles(world): |
| """ |
| Detect where an enemy fleet is attacking another enemy's owned planet. |
| These create attack windows β the winning side will be weakened. |
| Returns list of dicts with target_id, battle_turn, estimated post_battle_ships. |
| """ |
| battles = [] |
| for target in world.enemy_planets: |
| attacking_fleets = [ |
| (int(math.ceil(eta)), owner, int(ships)) |
| for eta, owner, ships in world.arrivals_by_planet.get(target.id, []) |
| if owner not in (-1, world.player) and owner != target.owner and ships > 0 |
| ] |
| if not attacking_fleets: |
| continue |
|
|
| for eta, attacker_owner, attacker_ships in attacking_fleets: |
| |
| garrison_at_eta = target.ships + target.production * eta |
| surviving_attacker = max(0, attacker_ships - garrison_at_eta) |
| surviving_defender = max(0, garrison_at_eta - attacker_ships) |
|
|
| if attacker_ships > garrison_at_eta: |
| post_owner = attacker_owner |
| post_ships = surviving_attacker |
| else: |
| post_owner = target.owner |
| post_ships = surviving_defender |
|
|
| |
| if post_ships < 25: |
| battles.append({ |
| "target_id": target.id, |
| "battle_turn": eta, |
| "post_owner": post_owner, |
| "post_ships": post_ships, |
| "original_owner": target.owner, |
| }) |
|
|
| return battles |
|
|
|
|
| def build_policy_state(world, deadline=None): |
| def expired(): |
| return deadline is not None and time.perf_counter() > deadline |
|
|
| indirect_wealth_map = {} |
| for target_id, features in world.indirect_feature_map.items(): |
| friendly, neutral, enemy = features |
| indirect_wealth_map[target_id] = ( |
| friendly * INDIRECT_FRIENDLY_WEIGHT |
| + neutral * INDIRECT_NEUTRAL_WEIGHT |
| + enemy * INDIRECT_ENEMY_WEIGHT |
| ) |
|
|
| reserve = {} |
| attack_budget = {} |
| reaction_time_map = {} |
|
|
| for target in world.planets: |
| if expired(): |
| break |
| if target.owner == world.player: |
| continue |
| my_sources = nearest_sources_to_target(target, world.my_planets, REACTION_SOURCE_TOP_K_MY) |
| enemy_sources = nearest_sources_to_target(target, world.enemy_planets, REACTION_SOURCE_TOP_K_ENEMY) |
| my_t = min_legal_reaction_time(target, my_sources, world) |
| enemy_t = min_legal_reaction_time(target, enemy_sources, world) |
| reaction_time_map[target.id] = (my_t, enemy_t) |
|
|
| for planet in world.my_planets: |
| if expired(): |
| break |
| exact_keep = world.keep_needed_map.get(planet.id, 0) |
|
|
| proactive_keep = 0 |
| for enemy in nearest_sources_to_target(planet, world.enemy_planets, PROACTIVE_ENEMY_TOP_K): |
| enemy_aim = world.plan_shot(enemy.id, planet.id, max(1, int(enemy.ships))) |
| if enemy_aim is None: |
| continue |
| enemy_eta = enemy_aim[1] |
| if enemy_eta > PROACTIVE_DEFENSE_HORIZON: |
| continue |
| proactive_keep = max(proactive_keep, int(enemy.ships * PROACTIVE_DEFENSE_RATIO)) |
| proactive_keep = max(proactive_keep, stacked_enemy_proactive_keep(planet, world)) |
|
|
| if world.is_total_war: |
| exact_keep = min(exact_keep, max(1, exact_keep // 2)) |
| proactive_keep = min(proactive_keep, max(1, proactive_keep // 2)) |
|
|
| reserve[planet.id] = min(int(planet.ships), max(exact_keep, proactive_keep)) |
| attack_budget[planet.id] = max(0, int(planet.ships) - reserve[planet.id]) |
|
|
| return { |
| "indirect_wealth_map": indirect_wealth_map, |
| "reserve": reserve, |
| "attack_budget": attack_budget, |
| "reaction_time_map": reaction_time_map, |
| } |
|
|
|
|
| def build_modes(world): |
| domination = (world.my_total - world.enemy_total) / max(1, world.my_total + world.enemy_total) |
| is_behind = domination < BEHIND_DOMINATION |
| is_ahead = domination > AHEAD_DOMINATION |
| is_dominating = is_ahead or ( |
| world.max_enemy_strength > 0 and world.my_total > world.max_enemy_strength * 1.25 |
| ) |
| is_finishing = ( |
| domination > FINISHING_DOMINATION |
| and world.my_prod > world.enemy_prod * FINISHING_PROD_RATIO |
| and world.step > 80 |
| ) |
|
|
| attack_margin_mult = 1.0 |
| if is_ahead: |
| attack_margin_mult += AHEAD_ATTACK_MARGIN_BONUS |
| if is_behind: |
| attack_margin_mult -= BEHIND_ATTACK_MARGIN_PENALTY |
| if is_finishing: |
| attack_margin_mult += FINISHING_ATTACK_MARGIN_BONUS |
|
|
| return { |
| "domination": domination, |
| "is_behind": is_behind, |
| "is_ahead": is_ahead, |
| "is_dominating": is_dominating, |
| "is_finishing": is_finishing, |
| "attack_margin_mult": attack_margin_mult, |
| } |
|
|
|
|
| def is_safe_neutral(target, policy): |
| if target.owner != -1: |
| return False |
| my_t, enemy_t = policy_reaction_times(target.id, policy) |
| return my_t <= enemy_t - SAFE_NEUTRAL_MARGIN |
|
|
|
|
| def is_contested_neutral(target, policy): |
| if target.owner != -1: |
| return False |
| my_t, enemy_t = policy_reaction_times(target.id, policy) |
| return abs(my_t - enemy_t) <= CONTESTED_NEUTRAL_MARGIN |
|
|
|
|
| def opening_filter(target, arrival_turns, needed, src_available, world, policy): |
| if not world.is_opening or target.owner != -1: |
| return False |
| if target.id in world.comet_ids: |
| return False |
| if world.is_static(target.id): |
| return False |
|
|
| my_t, enemy_t = policy_reaction_times(target.id, policy) |
| reaction_gap = enemy_t - my_t |
| if ( |
| target.production >= SAFE_OPENING_PROD_THRESHOLD |
| and arrival_turns <= SAFE_OPENING_TURN_LIMIT |
| and reaction_gap >= SAFE_NEUTRAL_MARGIN |
| ): |
| return False |
|
|
| if world.is_four_player: |
| affordable = needed <= max( |
| PARTIAL_SOURCE_MIN_SHIPS, |
| int(src_available * FOUR_PLAYER_ROTATING_SEND_RATIO), |
| ) |
| if ( |
| affordable |
| and arrival_turns <= FOUR_PLAYER_ROTATING_TURN_LIMIT |
| and reaction_gap >= FOUR_PLAYER_ROTATING_REACTION_GAP |
| ): |
| return False |
| return True |
|
|
| return arrival_turns > ROTATING_OPENING_MAX_TURNS or target.production <= ROTATING_OPENING_LOW_PROD |
|
|
|
|
| def target_value(target, arrival_turns, mission, world, modes, policy): |
| turns_profit = max(1, world.remaining_steps - arrival_turns) |
| if target.id in world.comet_ids: |
| life = world.comet_life(target.id) |
| turns_profit = max(0, min(turns_profit, life - arrival_turns)) |
| if turns_profit <= 0: |
| return -1.0 |
|
|
| value = target.production * turns_profit |
| value += policy["indirect_wealth_map"][target.id] * turns_profit * INDIRECT_VALUE_SCALE |
|
|
| if world.is_static(target.id): |
| value *= STATIC_NEUTRAL_VALUE_MULT if target.owner == -1 else STATIC_HOSTILE_VALUE_MULT |
| else: |
| value *= ROTATING_OPENING_VALUE_MULT if world.is_opening else 1.0 |
|
|
| if target.owner not in (-1, world.player): |
| value *= OPENING_HOSTILE_TARGET_VALUE_MULT if world.is_opening else HOSTILE_TARGET_VALUE_MULT |
|
|
| if target.owner == -1: |
| if is_safe_neutral(target, policy): |
| value *= SAFE_NEUTRAL_VALUE_MULT |
| elif is_contested_neutral(target, policy): |
| value *= CONTESTED_NEUTRAL_VALUE_MULT |
| if world.is_early: |
| value *= EARLY_NEUTRAL_VALUE_MULT |
|
|
| if target.id in world.comet_ids: |
| value *= COMET_VALUE_MULT |
|
|
| if mission == "snipe": |
| value *= SNIPE_VALUE_MULT |
| elif mission == "swarm": |
| value *= SWARM_VALUE_MULT |
| elif mission == "reinforce": |
| value *= REINFORCE_VALUE_MULT |
| elif mission == "crash_exploit": |
| value *= CRASH_EXPLOIT_VALUE_MULT |
|
|
| if target.id in world.exposed_planet_ids: |
| value *= EXPOSED_PLANET_VALUE_MULT |
|
|
| if world.is_late: |
| value += max(0, target.ships) * LATE_IMMEDIATE_SHIP_VALUE |
|
|
| |
| if target.owner not in (-1, world.player): |
| enemy_strength = world.owner_strength.get(target.owner, 0) |
| if enemy_strength <= WEAK_ENEMY_THRESHOLD: |
| value += ELIMINATION_BONUS |
|
|
| |
| if target.owner not in (-1, world.player) and world._weakest_enemy is not None: |
| if target.owner == world._weakest_enemy: |
| mult = WEAKEST_ENEMY_VALUE_MULT_4P if world.is_four_player else WEAKEST_ENEMY_VALUE_MULT_2P |
| value *= mult |
|
|
| if modes["is_finishing"] and target.owner not in (-1, world.player): |
| value *= FINISHING_HOSTILE_VALUE_MULT |
| if modes["is_behind"] and target.owner == -1 and not world.is_static(target.id): |
| value *= BEHIND_ROTATING_NEUTRAL_VALUE_MULT |
| if modes["is_behind"] and target.owner == -1 and is_safe_neutral(target, policy): |
| value *= 1.08 |
| if modes["is_dominating"] and target.owner == -1 and is_contested_neutral(target, policy): |
| value *= 0.92 |
|
|
| return value |
|
|
|
|
| def reinforce_value(target, hold_until, world, policy): |
| saved_turns = max(1, world.remaining_steps - hold_until) |
| value = target.production * saved_turns + max(0, target.ships) * DEFENSE_SHIP_VALUE |
| if world.enemy_planets and nearest_distance_to_set(target.x, target.y, world.enemy_planets) < 22: |
| value *= DEFENSE_FRONTIER_SCORE_MULT |
| value += policy["indirect_wealth_map"][target.id] * saved_turns * INDIRECT_VALUE_SCALE * 0.35 |
| return value * REINFORCE_VALUE_MULT |
|
|
|
|
| def preferred_send(target, base_needed, arrival_turns, src_available, world, modes, policy): |
| send = max(base_needed, int(math.ceil(base_needed * modes["attack_margin_mult"]))) |
| margin = 0 |
| if target.owner == -1: |
| margin += min( |
| NEUTRAL_MARGIN_CAP, |
| NEUTRAL_MARGIN_BASE + target.production * NEUTRAL_MARGIN_PROD_WEIGHT, |
| ) |
| else: |
| margin += min( |
| HOSTILE_MARGIN_CAP, |
| HOSTILE_MARGIN_BASE + target.production * HOSTILE_MARGIN_PROD_WEIGHT, |
| ) |
| |
| |
| |
| |
| reinforce_est = 0 |
| for ep in world.opp_planets.get(target.owner, []): |
| if ep.id == target.id: |
| continue |
| ep_aim = world.plan_shot(ep.id, target.id, max(1, int(ep.ships))) |
| if ep_aim is None: |
| continue |
| ep_eta = ep_aim[1] |
| |
| |
| |
| if ep_eta <= arrival_turns + HOSTILE_REINFORCE_HORIZON: |
| sendable = max(0, int(ep.ships) - 3) |
| reinforce_est += sendable |
| reinforce_margin = min(HOSTILE_REINFORCE_CAP, int(reinforce_est * HOSTILE_REINFORCE_RATIO)) |
| margin += reinforce_margin |
| if world.is_static(target.id): |
| margin += STATIC_TARGET_MARGIN |
| if is_contested_neutral(target, policy): |
| margin += CONTESTED_TARGET_MARGIN |
| if world.is_four_player: |
| margin += FOUR_PLAYER_TARGET_MARGIN |
| if arrival_turns > LONG_TRAVEL_MARGIN_START: |
| margin += min(LONG_TRAVEL_MARGIN_CAP, arrival_turns // LONG_TRAVEL_MARGIN_DIVISOR) |
| if target.id in world.comet_ids: |
| margin = max(0, margin - COMET_MARGIN_RELIEF) |
| if modes["is_finishing"] and target.owner not in (-1, world.player): |
| margin += FINISHING_HOSTILE_SEND_BONUS |
| if target.id in world.exposed_planet_ids: |
| margin = max(0, margin - 2) |
| |
| if world.is_four_player and world._weakest_enemy and target.owner == world._weakest_enemy: |
| margin = max(0, margin - 2) |
| return min(src_available, send + margin) |
|
|
|
|
| def apply_score_modifiers(base_score, target, mission, world): |
| score = base_score |
| if world.is_static(target.id): |
| score *= STATIC_TARGET_SCORE_MULT |
| if world.is_early and target.owner == -1 and world.is_static(target.id): |
| score *= EARLY_STATIC_NEUTRAL_SCORE_MULT |
| if world.is_four_player and target.owner == -1 and not world.is_static(target.id): |
| score *= FOUR_PLAYER_ROTATING_NEUTRAL_SCORE_MULT |
| if ( |
| len(world.static_neutral_planets) >= DENSE_STATIC_NEUTRAL_COUNT |
| and target.owner == -1 |
| and not world.is_static(target.id) |
| ): |
| score *= DENSE_ROTATING_NEUTRAL_SCORE_MULT |
| if mission == "snipe": |
| score *= SNIPE_SCORE_MULT |
| elif mission == "swarm": |
| score *= SWARM_SCORE_MULT |
| elif mission == "crash_exploit": |
| score *= CRASH_EXPLOIT_SCORE_MULT |
| if target.id in world.exposed_planet_ids: |
| score *= 1.25 |
| |
| if target.owner not in (-1, world.player) and world._weakest_enemy == target.owner: |
| score *= 1.2 |
| return score |
|
|
|
|
| def settle_plan( |
| src, |
| target, |
| src_cap, |
| send_guess, |
| world, |
| planned_commitments, |
| modes, |
| policy, |
| mission="capture", |
| eval_turn_fn=None, |
| anchor_turn=None, |
| anchor_tolerance=None, |
| max_iter=4, |
| ): |
| if src_cap < 1: |
| return None |
|
|
| seed_hint = max(1, min(src_cap, int(send_guess))) |
| eval_turn_fn = eval_turn_fn or (lambda turns: turns) |
| anchor_tolerance = ( |
| anchor_tolerance |
| if anchor_tolerance is not None |
| else (1 if mission == "snipe" else None) |
| ) |
| tested = {} |
| tested_order = [] |
|
|
| def evaluate(send): |
| send = max(1, min(src_cap, int(send))) |
| cached = tested.get(send) |
| if cached is not None or send in tested: |
| return cached |
|
|
| aim = world.plan_shot(src.id, target.id, send) |
| if aim is None: |
| tested[send] = None |
| return None |
|
|
| angle, turns, _, _ = aim |
| if mission == "crash_exploit" and anchor_turn is not None and turns < anchor_turn: |
| tested[send] = None |
| return None |
| raw_eval_turn = int(math.ceil(eval_turn_fn(turns))) |
| if raw_eval_turn < turns: |
| tested[send] = None |
| return None |
| eval_turn = raw_eval_turn |
| need = world.min_ships_to_own_by( |
| target.id, |
| eval_turn, |
| world.player, |
| arrival_turn=turns, |
| planned_commitments=planned_commitments, |
| upper_bound=src_cap, |
| ) |
| if need <= 0 or need > src_cap: |
| tested[send] = None |
| return None |
|
|
| if mission in ("snipe", "crash_exploit"): |
| desired = need |
| elif mission == "rescue": |
| desired = min( |
| src_cap, |
| max( |
| need, |
| need + DEFENSE_SEND_MARGIN_BASE + target.production * DEFENSE_SEND_MARGIN_PROD_WEIGHT, |
| ), |
| ) |
| else: |
| desired = min( |
| src_cap, |
| max(need, preferred_send(target, need, turns, src_cap, world, modes, policy)), |
| ) |
|
|
| result = (angle, turns, eval_turn, need, send, desired) |
| tested[send] = result |
| tested_order.append(send) |
| return result |
|
|
| initial_candidates = sorted( |
| world.probe_ship_candidates(src.id, target.id, src_cap, hints=(seed_hint,)), |
| key=lambda send: (abs(send - seed_hint), send), |
| ) |
|
|
| current_send = None |
| for seed in initial_candidates: |
| result = evaluate(seed) |
| if result is None: |
| continue |
| if ( |
| anchor_turn is not None |
| and anchor_tolerance is not None |
| and abs(result[1] - anchor_turn) > anchor_tolerance |
| ): |
| continue |
| current_send = seed |
| break |
|
|
| if current_send is None: |
| return None |
|
|
| for _ in range(max_iter): |
| result = evaluate(current_send) |
| if result is None: |
| break |
|
|
| angle, turns, eval_turn, need, actual_send, desired = result |
| if desired == actual_send: |
| if ( |
| anchor_turn is not None |
| and anchor_tolerance is not None |
| and abs(turns - anchor_turn) > anchor_tolerance |
| ): |
| return None |
| if mission == "rescue" and turns > eval_turn: |
| return None |
| return angle, turns, eval_turn, need, actual_send |
|
|
| next_send = max(1, min(src_cap, int(desired))) |
| if next_send in tested: |
| current_send = next_send |
| break |
| current_send = next_send |
|
|
| candidate_sends = sorted( |
| [send for send in tested_order if tested.get(send) is not None], |
| key=lambda send: ( |
| 0 if mission != "snipe" or anchor_turn is None else abs(tested[send][1] - anchor_turn), |
| abs(send - seed_hint), |
| tested[send][1], |
| send, |
| ), |
| ) |
|
|
| seen = set() |
| for send in candidate_sends: |
| if send in seen: |
| continue |
| seen.add(send) |
| result = tested.get(send) |
| if result is None: |
| continue |
| angle, turns, eval_turn, need, actual_send, _ = result |
| if actual_send < need: |
| continue |
| if ( |
| anchor_turn is not None |
| and anchor_tolerance is not None |
| and abs(turns - anchor_turn) > anchor_tolerance |
| ): |
| continue |
| if mission == "rescue" and turns > eval_turn: |
| continue |
| return angle, turns, eval_turn, need, actual_send |
|
|
| return None |
|
|
|
|
| def settle_reinforce_plan( |
| src, |
| target, |
| src_cap, |
| send_guess, |
| world, |
| planned_commitments, |
| hold_until, |
| max_arrival_turn, |
| max_iter=4, |
| ): |
| if src_cap < 1: |
| return None |
|
|
| seed_hint = max(1, min(src_cap, int(send_guess))) |
| tested = {} |
| tested_order = [] |
|
|
| def evaluate(send): |
| send = max(1, min(src_cap, int(send))) |
| cached = tested.get(send) |
| if cached is not None or send in tested: |
| return cached |
|
|
| aim = world.plan_shot(src.id, target.id, send) |
| if aim is None: |
| tested[send] = None |
| return None |
|
|
| angle, turns, _, _ = aim |
| if turns > max_arrival_turn: |
| tested[send] = None |
| return None |
|
|
| need = world.reinforcement_needed_to_hold_until( |
| target.id, |
| turns, |
| hold_until, |
| planned_commitments=planned_commitments, |
| upper_bound=src_cap, |
| ) |
| if need <= 0 or need > src_cap: |
| tested[send] = None |
| return None |
|
|
| desired = min(src_cap, need + REINFORCE_SAFETY_MARGIN) |
| result = (angle, turns, hold_until, need, send, desired) |
| tested[send] = result |
| tested_order.append(send) |
| return result |
|
|
| initial_candidates = sorted( |
| world.probe_ship_candidates(src.id, target.id, src_cap, hints=(seed_hint,)), |
| key=lambda send: (abs(send - seed_hint), send), |
| ) |
|
|
| current_send = None |
| for seed in initial_candidates: |
| result = evaluate(seed) |
| if result is None: |
| continue |
| current_send = seed |
| break |
|
|
| if current_send is None: |
| return None |
|
|
| for _ in range(max_iter): |
| result = evaluate(current_send) |
| if result is None: |
| break |
|
|
| angle, turns, eval_turn, need, actual_send, desired = result |
| if desired == actual_send: |
| return angle, turns, eval_turn, need, actual_send |
|
|
| next_send = max(1, min(src_cap, int(desired))) |
| if next_send in tested: |
| current_send = next_send |
| break |
| current_send = next_send |
|
|
| candidate_sends = sorted( |
| [send for send in tested_order if tested.get(send) is not None], |
| key=lambda send: (abs(send - seed_hint), tested[send][1], send), |
| ) |
| for send in candidate_sends: |
| result = tested.get(send) |
| if result is None: |
| continue |
| angle, turns, eval_turn, need, actual_send, _ = result |
| if actual_send < need or turns > max_arrival_turn: |
| continue |
| return angle, turns, eval_turn, need, actual_send |
|
|
| return None |
|
|
|
|
| def build_snipe_mission(src, target, src_available, world, planned_commitments, modes, policy): |
| if target.owner != -1: |
| return None |
|
|
| enemy_etas = sorted({ |
| int(math.ceil(eta)) |
| for eta, owner, ships in world.arrivals_by_planet.get(target.id, []) |
| if owner not in (-1, world.player) and ships > 0 |
| }) |
| if not enemy_etas: |
| return None |
|
|
| best = None |
| for enemy_eta in enemy_etas[:3]: |
| seeded = world.best_probe_aim( |
| src.id, |
| target.id, |
| src_available, |
| hints=(int(target.ships) + 1, int(target.ships) + 8), |
| anchor_turn=enemy_eta, |
| max_anchor_diff=1, |
| ) |
| if seeded is None: |
| continue |
|
|
| probe, rough = seeded |
| sync_turn = max(rough[1], enemy_eta) |
| if target.id in world.comet_ids: |
| life = world.comet_life(target.id) |
| if sync_turn >= life or sync_turn > COMET_MAX_CHASE_TURNS: |
| continue |
|
|
| plan = settle_plan( |
| src, |
| target, |
| src_available, |
| probe, |
| world, |
| planned_commitments, |
| modes, |
| policy, |
| mission="snipe", |
| eval_turn_fn=lambda turns, enemy_eta=enemy_eta: max(turns, enemy_eta), |
| anchor_turn=enemy_eta, |
| ) |
| if plan is None: |
| continue |
|
|
| angle, turns, sync_turn, need, send_pref = plan |
| if target.id in world.comet_ids: |
| life = world.comet_life(target.id) |
| if sync_turn >= life or sync_turn > COMET_MAX_CHASE_TURNS: |
| continue |
|
|
| value = target_value(target, sync_turn, "snipe", world, modes, policy) |
| if value <= 0: |
| continue |
|
|
| score = apply_score_modifiers( |
| value / (send_pref + sync_turn * SNIPE_COST_TURN_WEIGHT + 1.0), |
| target, |
| "snipe", |
| world, |
| ) |
| option = ShotOption( |
| score=score, |
| src_id=src.id, |
| target_id=target.id, |
| angle=angle, |
| turns=turns, |
| needed=need, |
| send_cap=send_pref, |
| mission="snipe", |
| anchor_turn=enemy_eta, |
| ) |
| mission_obj = Mission( |
| kind="snipe", |
| score=score, |
| target_id=target.id, |
| turns=sync_turn, |
| options=[option], |
| ) |
| if best is None or mission_obj.score > best.score: |
| best = mission_obj |
|
|
| return best |
|
|
|
|
| def build_rescue_missions(world, policy, planned_commitments, modes): |
| missions = [] |
|
|
| for target in world.my_planets: |
| fall_turn = world.fall_turn_map.get(target.id) |
| if fall_turn is None or fall_turn > DEFENSE_LOOKAHEAD_TURNS: |
| continue |
|
|
| for src in world.my_planets: |
| if src.id == target.id: |
| continue |
|
|
| src_available = policy["attack_budget"].get(src.id, 0) |
| if src_available < PARTIAL_SOURCE_MIN_SHIPS: |
| continue |
|
|
| seeded = world.best_probe_aim( |
| src.id, |
| target.id, |
| src_available, |
| hints=(target.production + DEFENSE_SEND_MARGIN_BASE + 2,), |
| max_turn=fall_turn, |
| ) |
| if seeded is None: |
| continue |
| probe, probe_aim = seeded |
| plan = settle_plan( |
| src, |
| target, |
| src_available, |
| probe, |
| world, |
| planned_commitments, |
| modes, |
| policy, |
| mission="rescue", |
| eval_turn_fn=lambda _turns, fall_turn=fall_turn: fall_turn, |
| anchor_turn=fall_turn, |
| ) |
| if plan is None: |
| continue |
|
|
| angle, turns, _, need, send_pref = plan |
| saved_turns = max(1, world.remaining_steps - fall_turn) |
| value = target.production * saved_turns + max(0, target.ships) * DEFENSE_SHIP_VALUE |
| if world.enemy_planets and nearest_distance_to_set(target.x, target.y, world.enemy_planets) < 22: |
| value *= DEFENSE_FRONTIER_SCORE_MULT |
| score = value / (send_pref + turns * DEFENSE_COST_TURN_WEIGHT + 1.0) |
|
|
| option = ShotOption( |
| score=score, |
| src_id=src.id, |
| target_id=target.id, |
| angle=angle, |
| turns=turns, |
| needed=need, |
| send_cap=send_pref, |
| mission="rescue", |
| anchor_turn=fall_turn, |
| ) |
| missions.append(Mission( |
| kind="rescue", |
| score=score, |
| target_id=target.id, |
| turns=fall_turn, |
| options=[option], |
| )) |
|
|
| return missions |
|
|
|
|
| def build_recapture_missions(world, policy, planned_commitments, modes): |
| missions = [] |
|
|
| for target in world.my_planets: |
| fall_turn = world.fall_turn_map.get(target.id) |
| if fall_turn is None or fall_turn > DEFENSE_LOOKAHEAD_TURNS: |
| continue |
|
|
| for src in world.my_planets: |
| if src.id == target.id: |
| continue |
|
|
| src_available = policy["attack_budget"].get(src.id, 0) |
| if src_available < PARTIAL_SOURCE_MIN_SHIPS: |
| continue |
|
|
| seeded = world.best_probe_aim( |
| src.id, |
| target.id, |
| src_available, |
| hints=(target.production + DEFENSE_SEND_MARGIN_BASE + 2,), |
| min_turn=fall_turn + 1, |
| max_turn=fall_turn + RECAPTURE_LOOKAHEAD_TURNS, |
| ) |
| if seeded is None: |
| continue |
| probe, probe_aim = seeded |
| probe_turns = probe_aim[1] |
|
|
| plan = settle_plan( |
| src, |
| target, |
| src_available, |
| probe, |
| world, |
| planned_commitments, |
| modes, |
| policy, |
| mission="capture", |
| ) |
| if plan is None: |
| continue |
|
|
| angle, turns, _, need, send_pref = plan |
| if turns <= fall_turn or turns - fall_turn > RECAPTURE_LOOKAHEAD_TURNS: |
| continue |
|
|
| saved_turns = max(1, world.remaining_steps - turns) |
| value = ( |
| RECAPTURE_PRODUCTION_WEIGHT * target.production * saved_turns |
| + RECAPTURE_IMMEDIATE_WEIGHT * max(0, target.ships) |
| ) |
| if world.enemy_planets and nearest_distance_to_set(target.x, target.y, world.enemy_planets) < 22: |
| value *= RECAPTURE_FRONTIER_MULT |
| value *= RECAPTURE_VALUE_MULT |
| score = value / (send_pref + turns * RECAPTURE_COST_TURN_WEIGHT + 1.0) |
|
|
| option = ShotOption( |
| score=score, |
| src_id=src.id, |
| target_id=target.id, |
| angle=angle, |
| turns=turns, |
| needed=need, |
| send_cap=send_pref, |
| mission="recapture", |
| anchor_turn=fall_turn, |
| ) |
| missions.append(Mission( |
| kind="recapture", |
| score=score, |
| target_id=target.id, |
| turns=turns, |
| options=[option], |
| )) |
|
|
| return missions |
|
|
|
|
| def build_reinforce_missions(world, policy, planned_commitments, modes, inventory_left_fn): |
| if not REINFORCE_ENABLED: |
| return [] |
|
|
| missions = [] |
| if world.remaining_steps < REINFORCE_MIN_FUTURE_TURNS: |
| return missions |
|
|
| for target in world.my_planets: |
| fall_turn = world.fall_turn_map.get(target.id) |
| if fall_turn is None: |
| continue |
| if target.production < REINFORCE_MIN_PRODUCTION: |
| continue |
|
|
| hold_until = min(HORIZON, fall_turn + REINFORCE_HOLD_LOOKAHEAD) |
| max_arrival_turn = min(fall_turn, REINFORCE_MAX_TRAVEL_TURNS) |
|
|
| for src in world.my_planets: |
| if src.id == target.id: |
| continue |
|
|
| budget = inventory_left_fn(src.id) |
| source_cap = min(budget, int(src.ships * REINFORCE_MAX_SOURCE_FRACTION)) |
| if source_cap < PARTIAL_SOURCE_MIN_SHIPS: |
| continue |
|
|
| seeded = world.best_probe_aim( |
| src.id, |
| target.id, |
| source_cap, |
| hints=(target.production + REINFORCE_SAFETY_MARGIN + 2,), |
| max_turn=max_arrival_turn, |
| ) |
| if seeded is None: |
| continue |
| probe, _ = seeded |
|
|
| plan = settle_reinforce_plan( |
| src, |
| target, |
| source_cap, |
| probe, |
| world, |
| planned_commitments, |
| hold_until, |
| max_arrival_turn, |
| ) |
| if plan is None: |
| continue |
|
|
| angle, turns, _, need, send_pref = plan |
| value = reinforce_value(target, hold_until, world, policy) |
| score = value / (send_pref + turns * REINFORCE_COST_TURN_WEIGHT + 1.0) |
|
|
| option = ShotOption( |
| score=score, |
| src_id=src.id, |
| target_id=target.id, |
| angle=angle, |
| turns=turns, |
| needed=need, |
| send_cap=send_pref, |
| mission="reinforce", |
| anchor_turn=hold_until, |
| ) |
| missions.append(Mission( |
| kind="reinforce", |
| score=score, |
| target_id=target.id, |
| turns=fall_turn, |
| options=[option], |
| )) |
|
|
| return missions |
|
|
|
|
| def build_crash_exploit_missions(world, policy, planned_commitments, modes): |
| if not CRASH_EXPLOIT_ENABLED or not world.is_four_player: |
| return [] |
|
|
| missions = [] |
| for crash in detect_enemy_crashes(world): |
| target = world.planet_by_id[crash["target_id"]] |
| if target.owner == world.player: |
| continue |
| desired_arrival = crash["crash_turn"] + CRASH_EXPLOIT_POST_CRASH_DELAY |
|
|
| for src in world.my_planets: |
| src_available = policy["attack_budget"].get(src.id, 0) |
| if src_available < PARTIAL_SOURCE_MIN_SHIPS: |
| continue |
|
|
| seeded = world.best_probe_aim( |
| src.id, |
| target.id, |
| src_available, |
| hints=(12, int(target.ships) + 1), |
| anchor_turn=desired_arrival, |
| max_anchor_diff=CRASH_EXPLOIT_ETA_WINDOW, |
| ) |
| if seeded is None: |
| continue |
| probe, _ = seeded |
|
|
| plan = settle_plan( |
| src, |
| target, |
| src_available, |
| probe, |
| world, |
| planned_commitments, |
| modes, |
| policy, |
| mission="crash_exploit", |
| eval_turn_fn=lambda turns, desired_arrival=desired_arrival: max(turns, desired_arrival), |
| anchor_turn=desired_arrival, |
| anchor_tolerance=CRASH_EXPLOIT_ETA_WINDOW, |
| ) |
| if plan is None: |
| continue |
|
|
| angle, turns, _, need, send_pref = plan |
| if not candidate_time_valid(target, turns, world, LATE_CAPTURE_BUFFER): |
| continue |
| value = target_value(target, turns, "crash_exploit", world, modes, policy) |
| if value <= 0: |
| continue |
|
|
| score = apply_score_modifiers( |
| value / (send_pref + turns * SNIPE_COST_TURN_WEIGHT + 1.0), |
| target, |
| "crash_exploit", |
| world, |
| ) |
| option = ShotOption( |
| score=score, |
| src_id=src.id, |
| target_id=target.id, |
| angle=angle, |
| turns=turns, |
| needed=need, |
| send_cap=send_pref, |
| mission="crash_exploit", |
| anchor_turn=desired_arrival, |
| ) |
| missions.append(Mission( |
| kind="crash_exploit", |
| score=score, |
| target_id=target.id, |
| turns=turns, |
| options=[option], |
| )) |
|
|
| return missions |
|
|
|
|
| def build_gang_up_missions(world, policy, planned_commitments, modes): |
| """ |
| Attack enemy planets that will be weakened by inter-enemy combat. |
| Schedule our fleet to arrive shortly after the battle resolves. |
| This is the key 4P tactic: use enemies fighting each other to our advantage. |
| """ |
| missions = [] |
|
|
| for battle in detect_enemy_planet_battles(world): |
| target = world.planet_by_id[battle["target_id"]] |
| if target.owner == world.player: |
| continue |
|
|
| |
| desired_arrival = battle["battle_turn"] + GANG_UP_POST_BATTLE_DELAY |
|
|
| for src in world.my_planets: |
| src_available = policy["attack_budget"].get(src.id, 0) |
| if src_available < PARTIAL_SOURCE_MIN_SHIPS: |
| continue |
|
|
| |
| post_hint = max(3, int(battle["post_ships"]) + 3) |
| seeded = world.best_probe_aim( |
| src.id, |
| target.id, |
| src_available, |
| hints=(post_hint, int(target.ships) + 1), |
| anchor_turn=desired_arrival, |
| max_anchor_diff=GANG_UP_ETA_WINDOW, |
| ) |
| if seeded is None: |
| continue |
| probe, _ = seeded |
|
|
| plan = settle_plan( |
| src, |
| target, |
| src_available, |
| probe, |
| world, |
| planned_commitments, |
| modes, |
| policy, |
| mission="capture", |
| eval_turn_fn=lambda turns, da=desired_arrival: max(turns, da), |
| anchor_turn=desired_arrival, |
| anchor_tolerance=GANG_UP_ETA_WINDOW, |
| ) |
| if plan is None: |
| continue |
|
|
| angle, turns, _, need, send_pref = plan |
| if not candidate_time_valid(target, turns, world, LATE_CAPTURE_BUFFER): |
| continue |
|
|
| value = target_value(target, turns, "capture", world, modes, policy) |
| value *= GANG_UP_VALUE_MULT |
| if value <= 0: |
| continue |
|
|
| score = apply_score_modifiers( |
| value / (send_pref + turns * ATTACK_COST_TURN_WEIGHT + 1.0), |
| target, |
| "capture", |
| world, |
| ) |
|
|
| option = ShotOption( |
| score=score, |
| src_id=src.id, |
| target_id=target.id, |
| angle=angle, |
| turns=turns, |
| needed=need, |
| send_cap=send_pref, |
| mission="capture", |
| anchor_turn=desired_arrival, |
| ) |
| missions.append(Mission( |
| kind="single", |
| score=score, |
| target_id=target.id, |
| turns=turns, |
| options=[option], |
| )) |
|
|
| return missions |
|
|
|
|
| def build_elimination_missions(world, policy, planned_commitments, modes): |
| """ |
| Dedicated missions to eliminate the weakest enemy entirely. |
| High priority in 4P β removing a player is a massive advantage. |
| Only activates when we have a clear strength advantage over the target. |
| """ |
| if world._weakest_enemy is None: |
| return [] |
|
|
| weakest = world._weakest_enemy |
| weakest_total = world._weakest_enemy_strength |
|
|
| |
| if weakest_total > world.my_total * 0.9: |
| return [] |
|
|
| |
| other_enemies = [ |
| s for owner, s in world.owner_strength.items() |
| if owner not in (world.player, weakest) |
| ] |
| if other_enemies and weakest_total > min(other_enemies) * 0.95: |
| return [] |
|
|
| weakest_planets = [p for p in world.enemy_planets if p.owner == weakest] |
| if not weakest_planets: |
| return [] |
|
|
| missions = [] |
| elimination_bonus_mult = 1.5 if world.is_four_player else 1.25 |
|
|
| for target in weakest_planets: |
| for src in world.my_planets: |
| src_available = policy["attack_budget"].get(src.id, 0) |
| if src_available < PARTIAL_SOURCE_MIN_SHIPS: |
| continue |
|
|
| seeded = world.best_probe_aim( |
| src.id, |
| target.id, |
| src_available, |
| hints=(int(target.ships) + 1, int(target.ships) + 5), |
| ) |
| if seeded is None: |
| continue |
| probe, rough_aim = seeded |
| rough_turns = rough_aim[1] |
|
|
| if not candidate_time_valid(target, rough_turns, world, LATE_CAPTURE_BUFFER): |
| continue |
|
|
| global_needed = world.min_ships_to_own_at( |
| target.id, rough_turns, world.player, |
| planned_commitments=planned_commitments, |
| ) |
| if global_needed <= 0 or global_needed > src_available: |
| continue |
|
|
| send_guess = preferred_send(target, global_needed, rough_turns, src_available, world, modes, policy) |
| plan = settle_plan( |
| src, target, src_available, send_guess, |
| world, planned_commitments, modes, policy, |
| mission="capture", |
| ) |
| if plan is None: |
| continue |
|
|
| angle, turns, _, need, send_pref = plan |
| if not candidate_time_valid(target, turns, world, LATE_CAPTURE_BUFFER): |
| continue |
| if send_pref < need: |
| continue |
|
|
| value = target_value(target, turns, "capture", world, modes, policy) |
| if value <= 0: |
| continue |
|
|
| score = apply_score_modifiers( |
| value * elimination_bonus_mult / (send_pref + turns * ATTACK_COST_TURN_WEIGHT + 1.0), |
| target, "capture", world, |
| ) |
|
|
| option = ShotOption( |
| score=score, src_id=src.id, target_id=target.id, |
| angle=angle, turns=turns, needed=need, send_cap=send_pref, |
| mission="capture", |
| ) |
| missions.append(Mission( |
| kind="single", score=score, target_id=target.id, |
| turns=turns, options=[option], |
| )) |
|
|
| return missions |
|
|
|
|
| def plan_moves(world, deadline=None): |
| def expired(): |
| return deadline is not None and time.perf_counter() > deadline |
|
|
| def time_left(): |
| if deadline is None: |
| return 10**9 |
| return deadline - time.perf_counter() |
|
|
| def allow_heavy_phase(): |
| return time_left() > HEAVY_PHASE_MIN_TIME and len(world.planets) <= HEAVY_ROUTE_PLANET_LIMIT |
|
|
| def allow_optional_phase(): |
| return time_left() > OPTIONAL_PHASE_MIN_TIME |
|
|
| modes = build_modes(world) |
| policy = build_policy_state(world, deadline=deadline) |
| planned_commitments = defaultdict(list) |
| source_options_by_target = defaultdict(list) |
| missions = [] |
| moves = [] |
| spent_total = defaultdict(int) |
|
|
| def source_inventory_left(source_id): |
| return world.source_inventory_left(source_id, spent_total) |
|
|
| def source_attack_left(source_id): |
| budget = policy["attack_budget"].get(source_id, 0) |
| return max(0, budget - spent_total[source_id]) |
|
|
| def append_move(src_id, angle, ships): |
| send = min(int(ships), source_inventory_left(src_id)) |
| if send < 1: |
| return 0 |
| moves.append([src_id, float(angle), int(send)]) |
| spent_total[src_id] += send |
| return send |
|
|
| def finalize_moves(): |
| final_moves = [] |
| used_final = defaultdict(int) |
| for src_id, angle, ships in moves: |
| source = world.planet_by_id[src_id] |
| max_allowed = int(source.ships) - used_final[src_id] |
| send = min(int(ships), max_allowed) |
| if send >= 1: |
| final_moves.append([src_id, float(angle), int(send)]) |
| used_final[src_id] += send |
| return final_moves |
|
|
| def compute_live_doomed(): |
| doomed = set() |
| for planet in world.my_planets: |
| status = world.hold_status( |
| planet.id, |
| planned_commitments=planned_commitments, |
| horizon=DOOMED_EVAC_TURN_LIMIT, |
| ) |
| if ( |
| not status["holds_full"] |
| and status["fall_turn"] is not None |
| and status["fall_turn"] <= DOOMED_EVAC_TURN_LIMIT |
| and source_inventory_left(planet.id) >= DOOMED_MIN_SHIPS |
| ): |
| doomed.add(planet.id) |
| return doomed |
|
|
| def time_filters_pass(target, turns, needed, src_cap): |
| if not candidate_time_valid(target, turns, world, VERY_LATE_CAPTURE_BUFFER if world.is_very_late else LATE_CAPTURE_BUFFER): |
| return False |
| if opening_filter(target, turns, needed, src_cap, world, policy): |
| return False |
| return True |
|
|
| |
|
|
| if allow_heavy_phase(): |
| missions.extend( |
| build_reinforce_missions(world, policy, planned_commitments, modes, source_inventory_left) |
| ) |
|
|
| missions.extend(build_rescue_missions(world, policy, planned_commitments, modes)) |
| missions.extend(build_recapture_missions(world, policy, planned_commitments, modes)) |
|
|
| |
| if allow_heavy_phase(): |
| missions.extend(build_elimination_missions(world, policy, planned_commitments, modes)) |
|
|
| |
| if allow_heavy_phase(): |
| missions.extend(build_gang_up_missions(world, policy, planned_commitments, modes)) |
|
|
| for src in world.my_planets: |
| if expired(): |
| return finalize_moves() |
| src_available = source_attack_left(src.id) |
| if src_available <= 0: |
| continue |
|
|
| for target in world.planets: |
| if expired(): |
| return finalize_moves() |
| if target.id == src.id or target.owner == world.player: |
| continue |
|
|
| seeded = world.best_probe_aim( |
| src.id, |
| target.id, |
| src_available, |
| hints=(int(target.ships) + 1,), |
| ) |
| if seeded is None: |
| continue |
| _, rough_aim = seeded |
|
|
| rough_turns = rough_aim[1] |
| if not candidate_time_valid( |
| target, |
| rough_turns, |
| world, |
| VERY_LATE_CAPTURE_BUFFER if world.is_very_late else LATE_CAPTURE_BUFFER, |
| ): |
| continue |
|
|
| global_needed = world.min_ships_to_own_at( |
| target.id, |
| rough_turns, |
| world.player, |
| planned_commitments=planned_commitments, |
| ) |
| if global_needed <= 0: |
| continue |
| if opening_filter(target, rough_turns, global_needed, src_available, world, policy): |
| continue |
|
|
| partial_send_cap = min( |
| src_available, |
| preferred_send(target, global_needed, rough_turns, src_available, world, modes, policy), |
| ) |
| if partial_send_cap >= PARTIAL_SOURCE_MIN_SHIPS: |
| partial_seed = world.best_probe_aim( |
| src.id, |
| target.id, |
| partial_send_cap, |
| hints=(partial_send_cap, global_needed, int(target.ships) + 1), |
| ) |
| if partial_seed is not None: |
| _, partial_aim = partial_seed |
| p_angle, p_turns, _, _ = partial_aim |
| if time_filters_pass(target, p_turns, global_needed, src_available): |
| partial_value = target_value(target, p_turns, "swarm", world, modes, policy) |
| if partial_value > 0: |
| partial_score = apply_score_modifiers( |
| partial_value / (partial_send_cap + p_turns * ATTACK_COST_TURN_WEIGHT + 1.0), |
| target, |
| "swarm", |
| world, |
| ) |
| source_options_by_target[target.id].append( |
| ShotOption( |
| score=partial_score, |
| src_id=src.id, |
| target_id=target.id, |
| angle=p_angle, |
| turns=p_turns, |
| needed=global_needed, |
| send_cap=partial_send_cap, |
| mission="swarm", |
| ) |
| ) |
|
|
| if global_needed <= src_available: |
| send_guess = preferred_send( |
| target, global_needed, rough_turns, src_available, world, modes, policy, |
| ) |
| plan = settle_plan( |
| src, |
| target, |
| src_available, |
| send_guess, |
| world, |
| planned_commitments, |
| modes, |
| policy, |
| mission="capture", |
| ) |
| if plan is None: |
| continue |
|
|
| angle, turns, _, needed, send_cap = plan |
| if not time_filters_pass(target, turns, needed, src_available): |
| continue |
| if send_cap < 1: |
| continue |
|
|
| value = target_value(target, turns, "capture", world, modes, policy) |
| if value <= 0: |
| continue |
|
|
| score = apply_score_modifiers( |
| value / (send_cap + turns * ATTACK_COST_TURN_WEIGHT + 1.0), |
| target, |
| "capture", |
| world, |
| ) |
|
|
| option = ShotOption( |
| score=score, |
| src_id=src.id, |
| target_id=target.id, |
| angle=angle, |
| turns=turns, |
| needed=needed, |
| send_cap=send_cap, |
| mission="capture", |
| ) |
|
|
| if send_cap >= needed: |
| missions.append(Mission( |
| kind="single", |
| score=score, |
| target_id=target.id, |
| turns=turns, |
| options=[option], |
| )) |
|
|
| snipe = build_snipe_mission(src, target, src_available, world, planned_commitments, modes, policy) |
| if snipe is not None: |
| missions.append(snipe) |
|
|
| for target_id, options in source_options_by_target.items(): |
| if expired(): |
| return finalize_moves() |
| if len(options) < 2: |
| continue |
|
|
| target = world.planet_by_id[target_id] |
| top_options = sorted(options, key=lambda item: -item.score)[:MULTI_SOURCE_TOP_K] |
| for i in range(len(top_options)): |
| for j in range(i + 1, len(top_options)): |
| first = top_options[i] |
| second = top_options[j] |
| if first.src_id == second.src_id: |
| continue |
| pair_tol = swarm_eta_tolerance((first, second), target, world) |
| if abs(first.turns - second.turns) > pair_tol: |
| continue |
|
|
| joint_turn = max(first.turns, second.turns) |
| total_cap = first.send_cap + second.send_cap |
| need = world.min_ships_to_own_at( |
| target_id, |
| joint_turn, |
| world.player, |
| planned_commitments=planned_commitments, |
| upper_bound=total_cap, |
| ) |
| if need <= 0: |
| continue |
| if first.send_cap >= need or second.send_cap >= need: |
| continue |
| if total_cap < need: |
| continue |
|
|
| value = target_value(target, joint_turn, "swarm", world, modes, policy) |
| if value <= 0: |
| continue |
|
|
| pair_score = apply_score_modifiers( |
| value / (need + joint_turn * ATTACK_COST_TURN_WEIGHT + 1.0), |
| target, |
| "swarm", |
| world, |
| ) |
| pair_score *= MULTI_SOURCE_PLAN_PENALTY |
| missions.append(Mission( |
| kind="swarm", |
| score=pair_score, |
| target_id=target_id, |
| turns=joint_turn, |
| options=[first, second], |
| )) |
|
|
| if ( |
| THREE_SOURCE_SWARM_ENABLED |
| and allow_heavy_phase() |
| and target.owner not in (-1, world.player) |
| and int(target.ships) >= THREE_SOURCE_MIN_TARGET_SHIPS |
| and len(top_options) >= 3 |
| ): |
| for i in range(len(top_options)): |
| for j in range(i + 1, len(top_options)): |
| for k in range(j + 1, len(top_options)): |
| if expired(): |
| return finalize_moves() |
| trio = [top_options[i], top_options[j], top_options[k]] |
| if len({option.src_id for option in trio}) < 3: |
| continue |
| trio_tol = swarm_eta_tolerance(tuple(trio), target, world) |
| turns = [option.turns for option in trio] |
| if max(turns) - min(turns) > trio_tol: |
| continue |
|
|
| joint_turn = max(turns) |
| total_cap = sum(option.send_cap for option in trio) |
| need = world.min_ships_to_own_at( |
| target_id, |
| joint_turn, |
| world.player, |
| planned_commitments=planned_commitments, |
| upper_bound=total_cap, |
| ) |
| if need <= 0 or total_cap < need: |
| continue |
| if any( |
| trio[a].send_cap + trio[b].send_cap >= need |
| for a in range(3) |
| for b in range(a + 1, 3) |
| ): |
| continue |
|
|
| value = target_value(target, joint_turn, "swarm", world, modes, policy) |
| if value <= 0: |
| continue |
|
|
| trio_score = apply_score_modifiers( |
| value / (need + joint_turn * ATTACK_COST_TURN_WEIGHT + 1.0), |
| target, |
| "swarm", |
| world, |
| ) |
| trio_score *= THREE_SOURCE_PLAN_PENALTY |
| missions.append(Mission( |
| kind="swarm", |
| score=trio_score, |
| target_id=target_id, |
| turns=joint_turn, |
| options=trio, |
| )) |
|
|
| |
| if ( |
| FOUR_SOURCE_SWARM_ENABLED |
| and allow_heavy_phase() |
| and target.owner not in (-1, world.player) |
| and int(target.ships) >= FOUR_SOURCE_MIN_TARGET_SHIPS |
| and len(top_options) >= 4 |
| ): |
| for i in range(len(top_options)): |
| for j in range(i + 1, len(top_options)): |
| for k in range(j + 1, len(top_options)): |
| for l in range(k + 1, len(top_options)): |
| if expired(): |
| return finalize_moves() |
| quad = [top_options[i], top_options[j], top_options[k], top_options[l]] |
| if len({option.src_id for option in quad}) < 4: |
| continue |
| turns_list = [option.turns for option in quad] |
| if max(turns_list) - min(turns_list) > FOUR_SOURCE_ETA_TOLERANCE: |
| continue |
|
|
| joint_turn = max(turns_list) |
| total_cap = sum(option.send_cap for option in quad) |
| need = world.min_ships_to_own_at( |
| target_id, |
| joint_turn, |
| world.player, |
| planned_commitments=planned_commitments, |
| upper_bound=total_cap, |
| ) |
| if need <= 0 or total_cap < need: |
| continue |
| |
| if any( |
| sum(quad[idx].send_cap for idx in combo) >= need |
| for combo in combinations(range(4), 3) |
| ): |
| continue |
|
|
| value = target_value(target, joint_turn, "swarm", world, modes, policy) |
| if value <= 0: |
| continue |
|
|
| quad_score = apply_score_modifiers( |
| value / (need + joint_turn * ATTACK_COST_TURN_WEIGHT + 1.0), |
| target, |
| "swarm", |
| world, |
| ) |
| quad_score *= FOUR_SOURCE_PLAN_PENALTY |
| missions.append(Mission( |
| kind="swarm", |
| score=quad_score, |
| target_id=target_id, |
| turns=joint_turn, |
| options=quad, |
| )) |
|
|
| if allow_heavy_phase(): |
| missions.extend(build_crash_exploit_missions(world, policy, planned_commitments, modes)) |
|
|
| missions.sort(key=lambda item: -item.score) |
|
|
| |
|
|
| for mission in missions: |
| if expired(): |
| return finalize_moves() |
| target = world.planet_by_id[mission.target_id] |
|
|
| if mission.kind in ("single", "snipe", "rescue", "recapture", "reinforce", "crash_exploit"): |
| option = mission.options[0] |
| src = world.planet_by_id[option.src_id] |
| if mission.kind == "reinforce": |
| left = min( |
| source_inventory_left(option.src_id), |
| int(src.ships * REINFORCE_MAX_SOURCE_FRACTION), |
| ) |
| else: |
| left = source_attack_left(option.src_id) |
| if left <= 0: |
| continue |
|
|
| if mission.kind == "reinforce": |
| plan = settle_reinforce_plan( |
| src, |
| target, |
| left, |
| min(left, option.send_cap), |
| world, |
| planned_commitments, |
| option.anchor_turn, |
| mission.turns, |
| ) |
| elif mission.kind == "rescue": |
| plan = settle_plan( |
| src, |
| target, |
| left, |
| min(left, option.send_cap), |
| world, |
| planned_commitments, |
| modes, |
| policy, |
| mission="rescue", |
| eval_turn_fn=lambda _turns, hold_turn=mission.turns: hold_turn, |
| anchor_turn=option.anchor_turn, |
| ) |
| elif mission.kind == "snipe": |
| plan = settle_plan( |
| src, |
| target, |
| left, |
| min(left, option.send_cap), |
| world, |
| planned_commitments, |
| modes, |
| policy, |
| mission="snipe", |
| eval_turn_fn=lambda turns, enemy_eta=option.anchor_turn: max(turns, enemy_eta), |
| anchor_turn=option.anchor_turn, |
| ) |
| elif mission.kind == "crash_exploit": |
| plan = settle_plan( |
| src, |
| target, |
| left, |
| min(left, option.send_cap), |
| world, |
| planned_commitments, |
| modes, |
| policy, |
| mission="crash_exploit", |
| eval_turn_fn=lambda turns, desired_arrival=option.anchor_turn: max(turns, desired_arrival), |
| anchor_turn=option.anchor_turn, |
| anchor_tolerance=CRASH_EXPLOIT_ETA_WINDOW, |
| ) |
| else: |
| plan = settle_plan( |
| src, |
| target, |
| left, |
| min(left, option.send_cap), |
| world, |
| planned_commitments, |
| modes, |
| policy, |
| mission="capture", |
| ) |
| if plan is None: |
| continue |
|
|
| angle, turns, _, need, send = plan |
| if send < need or need > left: |
| continue |
|
|
| sent = append_move(option.src_id, angle, send) |
| if sent < need: |
| continue |
| planned_commitments[target.id].append((turns, world.player, int(sent))) |
| continue |
|
|
| limits = [] |
| for option in mission.options: |
| left = source_attack_left(option.src_id) |
| limits.append(min(left, option.send_cap)) |
| if min(limits) <= 0: |
| continue |
|
|
| missing = world.min_ships_to_own_at( |
| target.id, |
| mission.turns, |
| world.player, |
| planned_commitments=planned_commitments, |
| upper_bound=sum(limits), |
| ) |
| if missing <= 0 or sum(limits) < missing: |
| continue |
|
|
| ordered = sorted( |
| zip(mission.options, limits), |
| key=lambda item: (item[0].turns, -item[1], item[0].src_id), |
| ) |
| remaining = missing |
| sends = {} |
| for idx, (option, limit) in enumerate(ordered): |
| remaining_other = sum(other_limit for _, other_limit in ordered[idx + 1:]) |
| send = min(limit, max(0, remaining - remaining_other)) |
| sends[option.src_id] = send |
| remaining -= send |
| if remaining > 0: |
| continue |
|
|
| reaimed = [] |
| for option, _ in ordered: |
| send = sends.get(option.src_id, 0) |
| if send <= 0: |
| continue |
| src = world.planet_by_id[option.src_id] |
| fixed_aim = world.plan_shot(src.id, target.id, send) |
| if fixed_aim is None: |
| reaimed = [] |
| break |
| angle, turns, _, _ = fixed_aim |
| reaimed.append((option.src_id, angle, turns, send)) |
| if not reaimed: |
| continue |
|
|
| turns_only = [item[2] for item in reaimed] |
| eta_tol = swarm_eta_tolerance(mission.options, target, world) |
| if max(turns_only) - min(turns_only) > eta_tol: |
| continue |
|
|
| actual_joint_turn = max(turns_only) |
| owner_after, _ = world.projected_state( |
| target.id, |
| actual_joint_turn, |
| planned_commitments=planned_commitments, |
| extra_arrivals=[(turns, world.player, send) for _, _, turns, send in reaimed], |
| ) |
| if owner_after != world.player: |
| continue |
|
|
| committed = [] |
| for src_id, angle, turns, send in reaimed: |
| actual = append_move(src_id, angle, send) |
| if actual <= 0: |
| continue |
| committed.append((turns, world.player, int(actual))) |
| if sum(item[2] for item in committed) < missing: |
| continue |
| planned_commitments[target.id].extend(committed) |
|
|
| |
|
|
| if not world.is_very_late and allow_optional_phase(): |
| for src in world.my_planets: |
| if expired(): |
| return finalize_moves() |
| src_left = source_attack_left(src.id) |
| if src_left < FOLLOWUP_MIN_SHIPS: |
| continue |
|
|
| best = None |
| for target in world.planets: |
| if expired(): |
| return finalize_moves() |
| if target.id == src.id or target.owner == world.player: |
| continue |
| if target.id in world.comet_ids and target.production <= LOW_VALUE_COMET_PRODUCTION: |
| continue |
|
|
| seeded = world.best_probe_aim( |
| src.id, |
| target.id, |
| src_left, |
| hints=(int(target.ships) + 1,), |
| ) |
| if seeded is None: |
| continue |
| rough_ships, rough_aim = seeded |
|
|
| est_turns = rough_aim[1] |
| if world.is_late and est_turns > world.remaining_steps - LATE_CAPTURE_BUFFER: |
| continue |
|
|
| rough_needed = world.min_ships_to_own_at( |
| target.id, |
| est_turns, |
| world.player, |
| planned_commitments=planned_commitments, |
| upper_bound=src_left, |
| ) |
| if rough_needed <= 0 or rough_needed > src_left: |
| continue |
| if opening_filter(target, est_turns, rough_needed, src_left, world, policy): |
| continue |
|
|
| send = preferred_send(target, rough_needed, est_turns, src_left, world, modes, policy) |
| if send < rough_needed: |
| continue |
|
|
| plan = settle_plan( |
| src, |
| target, |
| src_left, |
| send, |
| world, |
| planned_commitments, |
| modes, |
| policy, |
| mission="capture", |
| ) |
| if plan is None: |
| continue |
|
|
| _, turns, _, need, final_send = plan |
| if world.is_late and turns > world.remaining_steps - LATE_CAPTURE_BUFFER: |
| continue |
| if final_send < need: |
| continue |
|
|
| value = target_value(target, turns, "capture", world, modes, policy) |
| if value <= 0: |
| continue |
|
|
| score = apply_score_modifiers( |
| value / (final_send + turns * ATTACK_COST_TURN_WEIGHT + 1.0), |
| target, |
| "capture", |
| world, |
| ) |
| if best is None or score > best[0]: |
| best = (score, target, plan) |
|
|
| if best is None: |
| continue |
|
|
| _, target, plan = best |
| angle, turns, _, need, send = plan |
| src_left = source_attack_left(src.id) |
| if need > src_left: |
| continue |
|
|
| plan = settle_plan( |
| src, |
| target, |
| src_left, |
| min(src_left, send), |
| world, |
| planned_commitments, |
| modes, |
| policy, |
| mission="capture", |
| ) |
| if plan is None: |
| continue |
|
|
| angle, turns, _, need, send = plan |
| if send < need: |
| continue |
|
|
| actual = append_move(src.id, angle, send) |
| if actual < need: |
| continue |
| planned_commitments[target.id].append((turns, world.player, int(actual))) |
|
|
| |
|
|
| if expired(): |
| return finalize_moves() |
| live_doomed = compute_live_doomed() |
| if live_doomed: |
| frontier_targets = ( |
| world.enemy_planets |
| if world.enemy_planets |
| else (world.static_neutral_planets or world.neutral_planets) |
| ) |
| if frontier_targets: |
| frontier_distance = { |
| planet.id: nearest_distance_to_set(planet.x, planet.y, frontier_targets) |
| for planet in world.my_planets |
| } |
| else: |
| frontier_distance = {planet.id: 10**9 for planet in world.my_planets} |
|
|
| for planet in world.my_planets: |
| if expired(): |
| return finalize_moves() |
| if planet.id not in live_doomed: |
| continue |
|
|
| available_now = source_inventory_left(planet.id) |
| if available_now < policy["reserve"].get(planet.id, 0): |
| continue |
|
|
| best_capture = None |
| for target in world.planets: |
| if expired(): |
| return finalize_moves() |
| if target.id == planet.id or target.owner == world.player: |
| continue |
| seeded = world.best_probe_aim( |
| planet.id, |
| target.id, |
| available_now, |
| hints=(available_now, int(target.ships) + 1), |
| ) |
| if seeded is None: |
| continue |
| _, probe_aim = seeded |
| probe_turns = probe_aim[1] |
| if probe_turns > world.remaining_steps - 2: |
| continue |
|
|
| need = world.min_ships_to_own_at( |
| target.id, |
| probe_turns, |
| world.player, |
| planned_commitments=planned_commitments, |
| upper_bound=available_now, |
| ) |
| if need <= 0 or need > available_now: |
| continue |
|
|
| plan = settle_plan( |
| planet, |
| target, |
| available_now, |
| min(available_now, max(need, int(target.ships) + 1)), |
| world, |
| planned_commitments, |
| modes, |
| policy, |
| mission="capture", |
| ) |
| if plan is None: |
| continue |
| angle, turns, _, plan_need, send = plan |
| if send < plan_need: |
| continue |
| score = target_value(target, turns, "capture", world, modes, policy) / (send + turns + 1.0) |
| if target.owner not in (-1, world.player): |
| score *= 1.05 |
| if best_capture is None or score > best_capture[0]: |
| best_capture = (score, target.id, angle, turns, send) |
|
|
| if best_capture is not None: |
| _, target_id, angle, turns, need = best_capture |
| actual = append_move(planet.id, angle, need) |
| if actual >= 1: |
| planned_commitments[target_id].append((turns, world.player, int(actual))) |
| continue |
|
|
| safe_allies = [ |
| ally |
| for ally in world.my_planets |
| if ally.id != planet.id and ally.id not in live_doomed |
| ] |
| if not safe_allies: |
| continue |
|
|
| retreat_target = min( |
| safe_allies, |
| key=lambda ally: ( |
| frontier_distance.get(ally.id, 10**9), |
| planet_distance(planet, ally), |
| ), |
| ) |
| aim = world.plan_shot(planet.id, retreat_target.id, available_now) |
| if aim is None: |
| continue |
| angle, _, _, _ = aim |
| append_move(planet.id, angle, available_now) |
|
|
| |
|
|
| if ( |
| (world.enemy_planets or world.neutral_planets) |
| and len(world.my_planets) > 1 |
| and not world.is_late |
| and allow_optional_phase() |
| ): |
| live_doomed = compute_live_doomed() |
| frontier_targets = ( |
| world.enemy_planets |
| if world.enemy_planets |
| else (world.static_neutral_planets or world.neutral_planets) |
| ) |
| frontier_distance = { |
| planet.id: nearest_distance_to_set(planet.x, planet.y, frontier_targets) |
| for planet in world.my_planets |
| } |
| safe_fronts = [planet for planet in world.my_planets if planet.id not in live_doomed] |
| if safe_fronts: |
| front_anchor = min(safe_fronts, key=lambda planet: frontier_distance[planet.id]) |
| send_ratio = REAR_SEND_RATIO_FOUR_PLAYER if world.is_four_player else REAR_SEND_RATIO_TWO_PLAYER |
| if modes["is_finishing"]: |
| send_ratio = max(send_ratio, REAR_SEND_RATIO_FOUR_PLAYER) |
|
|
| for rear in sorted(world.my_planets, key=lambda planet: -frontier_distance[planet.id]): |
| if expired(): |
| return finalize_moves() |
| if rear.id == front_anchor.id or rear.id in live_doomed: |
| continue |
| if source_attack_left(rear.id) < REAR_SOURCE_MIN_SHIPS: |
| continue |
| if frontier_distance[rear.id] < frontier_distance[front_anchor.id] * REAR_DISTANCE_RATIO: |
| continue |
|
|
| stage_candidates = [ |
| planet |
| for planet in safe_fronts |
| if planet.id != rear.id |
| and frontier_distance[planet.id] < frontier_distance[rear.id] * REAR_STAGE_PROGRESS |
| ] |
| if stage_candidates: |
| front = min(stage_candidates, key=lambda planet: planet_distance(rear, planet)) |
| else: |
| objective = min(frontier_targets, key=lambda target: planet_distance(rear, target)) |
| remaining_fronts = [planet for planet in safe_fronts if planet.id != rear.id] |
| if not remaining_fronts: |
| continue |
| front = min(remaining_fronts, key=lambda planet: planet_distance(planet, objective)) |
|
|
| if front.id == rear.id: |
| continue |
|
|
| send = int(source_attack_left(rear.id) * send_ratio) |
| if send < REAR_SEND_MIN_SHIPS: |
| continue |
|
|
| aim = world.plan_shot(rear.id, front.id, send) |
| if aim is None: |
| continue |
|
|
| angle, turns, _, _ = aim |
| if turns > REAR_MAX_TRAVEL_TURNS: |
| continue |
| append_move(rear.id, angle, send) |
|
|
| |
|
|
| if world.is_total_war and world.enemy_planets and allow_optional_phase(): |
| |
| if world._weakest_enemy is not None: |
| weakest_planets = [p for p in world.enemy_planets if p.owner == world._weakest_enemy] |
| if weakest_planets: |
| primary_targets = weakest_planets |
| else: |
| primary_targets = world.enemy_planets |
| else: |
| primary_targets = world.enemy_planets |
|
|
| |
| for target_set in [primary_targets, world.enemy_planets]: |
| if not target_set: |
| continue |
| for src in world.my_planets: |
| if expired(): |
| return finalize_moves() |
| atk_left = source_attack_left(src.id) |
| if atk_left < 5: |
| continue |
|
|
| |
| best_target = None |
| best_dist = float('inf') |
| for ep in target_set: |
| d = planet_distance(src, ep) |
| if d < best_dist: |
| aim_test = world.plan_shot(src.id, ep.id, atk_left) |
| if aim_test is not None: |
| best_dist = d |
| best_target = ep |
|
|
| if best_target is None: |
| continue |
| aim = world.plan_shot(src.id, best_target.id, atk_left) |
| if aim is None: |
| continue |
| angle, turns, _, _ = aim |
| if turns >= world.remaining_steps: |
| continue |
| append_move(src.id, angle, atk_left) |
| break |
|
|
| return finalize_moves() |
|
|
| |
| |
| |
|
|
| _agent_step = 0 |
|
|
|
|
| |
| |
| |
|
|
| class _OpponentProfilerImpl: |
| """Tracks opponent behavior during a match for real-time adaptation. |
| |
| Maintains exponential moving averages of: |
| - Aggression: fleet launch rate (how often they attack) |
| - Expansion: planet capture speed |
| - Strength balance: whether we're ahead or behind |
| """ |
| |
| def __init__(self): |
| self.alpha = 0.08 |
| self.aggression = 0.5 |
| self.expansion_rate = 0.5 |
| self.prev_enemy_planets = 0 |
| self.prev_enemy_fleets = 0 |
| self.prev_my_ships = 0 |
| self.prev_enemy_ships = 0 |
| self.my_planet_count = 0 |
| self.enemy_planet_count = 0 |
| self.step_count = 0 |
| self.current_step = 0 |
| |
| def update(self, obs): |
| get = obs.get if isinstance(obs, dict) else lambda k, d=None: getattr(obs, k, d) |
| player = int(get("player", 0) or 0) |
| step = int(get("step", 0) or 0) |
| planets = get("planets") or [] |
| fleets = get("fleets") or [] |
| |
| self.current_step = step |
| |
| enemy_planets = sum(1 for p in planets if p[1] not in (-1, player)) |
| enemy_fleets = sum(1 for f in fleets if f[1] != player) |
| my_ships = sum(p[5] for p in planets if p[1] == player) |
| my_ships += sum(f[6] for f in fleets if f[1] == player) |
| enemy_ships = sum(p[5] for p in planets if p[1] not in (-1, player)) |
| enemy_ships += sum(f[6] for f in fleets if f[1] != player) |
| |
| self.my_planet_count = sum(1 for p in planets if p[1] == player) |
| self.enemy_planet_count = enemy_planets |
| |
| if self.step_count > 0: |
| |
| fleet_delta = max(0, enemy_fleets - self.prev_enemy_fleets) |
| self.aggression = (1 - self.alpha) * self.aggression + self.alpha * min(1.0, fleet_delta / 4.0) |
| |
| |
| planet_delta = enemy_planets - self.prev_enemy_planets |
| expansion_signal = max(0.0, min(1.0, (planet_delta + 1) / 3.0)) |
| self.expansion_rate = (1 - self.alpha) * self.expansion_rate + self.alpha * expansion_signal |
| |
| self.prev_enemy_planets = enemy_planets |
| self.prev_enemy_fleets = enemy_fleets |
| self.prev_my_ships = my_ships |
| self.prev_enemy_ships = enemy_ships |
| self.step_count += 1 |
| |
| def get_profile(self): |
| total = self.prev_my_ships + self.prev_enemy_ships |
| my_share = self.prev_my_ships / max(1, total) |
| |
| return { |
| "aggression": self.aggression, |
| "expansion_rate": self.expansion_rate, |
| "we_are_ahead": my_share > 0.55, |
| "we_are_behind": my_share < 0.42, |
| "enemy_expanding_fast": self.expansion_rate > 0.6, |
| "late_game": self.current_step > 350, |
| "confident": self.step_count > 30, |
| } |
|
|
|
|
| _profiler = _OpponentProfilerImpl() |
|
|
|
|
| def _read(obs, key, default=None): |
| if isinstance(obs, dict): |
| return obs.get(key, default) |
| return getattr(obs, key, default) |
|
|
|
|
| def build_world(obs, inferred_step=None): |
| player = _read(obs, "player", 0) |
| obs_step = _read(obs, "step", 0) or 0 |
| step = max(obs_step, inferred_step or 0) |
| raw_planets = _read(obs, "planets", []) or [] |
| raw_fleets = _read(obs, "fleets", []) or [] |
| ang_vel = _read(obs, "angular_velocity", 0.0) or 0.0 |
| raw_init = _read(obs, "initial_planets", []) or [] |
| comets = _read(obs, "comets", []) or [] |
| comet_ids = set(_read(obs, "comet_planet_ids", []) or []) |
|
|
| planets = [Planet(*planet) for planet in raw_planets] |
| fleets = [Fleet(*fleet) for fleet in raw_fleets] |
| initial_planets = [Planet(*planet) for planet in raw_init] |
| initial_by_id = {planet.id: planet for planet in initial_planets} |
|
|
| return WorldModel( |
| player=player, |
| step=step, |
| planets=planets, |
| fleets=fleets, |
| initial_by_id=initial_by_id, |
| ang_vel=ang_vel, |
| comets=comets, |
| comet_ids=comet_ids, |
| ) |
|
|
|
|
| def agent(obs, config=None): |
| global _agent_step |
| global HOSTILE_TARGET_VALUE_MULT, ELIMINATION_BONUS, PROACTIVE_DEFENSE_RATIO |
| global FINISHING_HOSTILE_VALUE_MULT, WEAK_ENEMY_THRESHOLD, ATTACK_COST_TURN_WEIGHT |
| global HOSTILE_MARGIN_BASE, FOUR_PLAYER_TARGET_MARGIN, FINISHING_HOSTILE_SEND_BONUS |
| global STATIC_HOSTILE_VALUE_MULT, GANG_UP_VALUE_MULT, EXPOSED_PLANET_VALUE_MULT |
| global REINFORCE_VALUE_MULT, DEFENSE_SHIP_VALUE, BEHIND_DOMINATION, AHEAD_DOMINATION |
| global LATE_REMAINING_TURNS, REAR_SEND_RATIO_TWO_PLAYER, COMET_VALUE_MULT, SNIPE_VALUE_MULT |
| |
| _agent_step += 1 |
| start_time = time.perf_counter() |
| |
| |
| _profiler.update(obs) |
| |
| if _agent_step > 20: |
| profile = _profiler.get_profile() |
| |
| |
| if profile["aggression"] > 0.6: |
| |
| PROACTIVE_DEFENSE_RATIO = 0.38 |
| DEFENSE_SHIP_VALUE = 0.70 |
| HOSTILE_TARGET_VALUE_MULT = 1.80 |
| REINFORCE_VALUE_MULT = 1.55 |
| BEHIND_DOMINATION = -0.30 |
| elif profile["aggression"] < 0.3: |
| |
| PROACTIVE_DEFENSE_RATIO = 0.15 |
| DEFENSE_SHIP_VALUE = 0.40 |
| HOSTILE_TARGET_VALUE_MULT = 2.30 |
| ELIMINATION_BONUS = 70.0 |
| EXPOSED_PLANET_VALUE_MULT = 2.5 |
| GANG_UP_VALUE_MULT = 1.6 |
| else: |
| |
| PROACTIVE_DEFENSE_RATIO = 0.28 |
| DEFENSE_SHIP_VALUE = 0.55 |
| HOSTILE_TARGET_VALUE_MULT = 2.05 |
| ELIMINATION_BONUS = 55.0 |
| EXPOSED_PLANET_VALUE_MULT = 2.0 |
| GANG_UP_VALUE_MULT = 1.4 |
| |
| |
| if profile["we_are_ahead"]: |
| |
| ATTACK_COST_TURN_WEIGHT = 0.55 |
| BEHIND_DOMINATION = -0.15 |
| AHEAD_DOMINATION = 0.20 |
| REAR_SEND_RATIO_TWO_PLAYER = 0.55 |
| elif profile["we_are_behind"]: |
| |
| ATTACK_COST_TURN_WEIGHT = 0.40 |
| BEHIND_DOMINATION = -0.30 |
| WEAK_ENEMY_THRESHOLD = 140 |
| FINISHING_HOSTILE_VALUE_MULT = 1.5 |
| FINISHING_HOSTILE_SEND_BONUS = 7 |
| SNIPE_VALUE_MULT = 1.3 |
| |
| |
| if profile["enemy_expanding_fast"]: |
| |
| COMET_VALUE_MULT = 0.85 |
| STATIC_HOSTILE_VALUE_MULT = 1.85 |
| FOUR_PLAYER_TARGET_MARGIN = 1 |
| |
| |
| if profile["late_game"]: |
| LATE_REMAINING_TURNS = 80 |
| ELIMINATION_BONUS = max(ELIMINATION_BONUS, 65.0) |
| FINISHING_HOSTILE_VALUE_MULT = max(FINISHING_HOSTILE_VALUE_MULT, 1.4) |
| |
| world = build_world(obs, inferred_step=_agent_step - 1) |
| if not world.my_planets: |
| return [] |
| act_timeout = _read(config, "actTimeout", 1.0) if config is not None else 1.0 |
| soft_budget = min(SOFT_ACT_DEADLINE, max(0.55, act_timeout * 0.82)) |
| deadline = start_time + soft_budget |
| return plan_moves(world, deadline=deadline) |
|
|
|
|
| __all__ = ["agent", "build_world"] |