| """Soci — LLM-powered city population simulator. |
| |
| Usage: |
| python main.py [--ticks N] [--agents N] [--speed SPEED] |
| |
| Controls: |
| Press Ctrl+C to pause and save the simulation. |
| |
| Persistence: |
| The simulation auto-saves every 6 in-game hours and on exit. |
| Next run automatically resumes from the last save. |
| Use --fresh to discard the save and start a new city. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import asyncio |
| import logging |
| import os |
| import sys |
| from pathlib import Path |
|
|
| from dotenv import load_dotenv |
| from rich.console import Console |
| from rich.layout import Layout |
| from rich.live import Live |
| from rich.panel import Panel |
| from rich.table import Table |
| from rich.text import Text |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent / "src")) |
|
|
| from soci.engine.llm import create_llm_client |
| from soci.engine.simulation import Simulation |
| from soci.persistence.database import Database |
| from soci.persistence.snapshots import save_simulation, load_simulation |
| from soci.world.city import City |
| from soci.world.clock import SimClock |
|
|
| load_dotenv() |
|
|
| console = Console() |
| logger = logging.getLogger("soci") |
|
|
|
|
| def build_dashboard(sim: Simulation, recent_events: list[str]) -> Layout: |
| """Build the Rich layout for the live dashboard.""" |
| layout = Layout() |
| layout.split_column( |
| Layout(name="header", size=3), |
| Layout(name="body"), |
| Layout(name="footer", size=5), |
| ) |
| layout["body"].split_row( |
| Layout(name="city", ratio=1), |
| Layout(name="events", ratio=2), |
| ) |
|
|
| |
| clock = sim.clock |
| weather = sim.events.weather.value |
| cost = f"${sim.llm.usage.estimated_cost_usd:.4f}" |
| calls = sim.llm.usage.total_calls |
| header_text = ( |
| f" SOCI CITY | {clock.datetime_str} ({clock.time_of_day.value}) | " |
| f"Weather: {weather} | Agents: {len(sim.agents)} | " |
| f"API calls: {calls} | Cost: {cost}" |
| ) |
| layout["header"].update(Panel(header_text, style="bold white on blue")) |
|
|
| |
| loc_table = Table(title="City Locations", expand=True, show_lines=True) |
| loc_table.add_column("Location", style="cyan", width=20) |
| loc_table.add_column("People", style="green") |
| loc_table.add_column("#", style="yellow", width=3) |
|
|
| for loc in sim.city.locations.values(): |
| occupants = [] |
| for aid in loc.occupants: |
| agent = sim.agents.get(aid) |
| if agent: |
| state_icon = { |
| "idle": ".", |
| "working": "W", |
| "eating": "E", |
| "sleeping": "Z", |
| "socializing": "S", |
| "exercising": "X", |
| "in_conversation": "C", |
| "moving": ">", |
| "shopping": "$", |
| "relaxing": "~", |
| }.get(agent.state.value, "?") |
| occupants.append(f"{agent.name}[{state_icon}]") |
| loc_table.add_row( |
| loc.name, |
| ", ".join(occupants) if occupants else "-", |
| str(len(loc.occupants)), |
| ) |
|
|
| layout["city"].update(Panel(loc_table)) |
|
|
| |
| event_text = "\n".join(recent_events[-25:]) if recent_events else "Simulation starting..." |
| layout["events"].update(Panel(event_text, title="Recent Activity", border_style="green")) |
|
|
| |
| footer_parts = [] |
| for agent in list(sim.agents.values())[:10]: |
| mood_bar = "+" * max(0, int((agent.mood + 1) * 3)) + "-" * max(0, int((1 - agent.mood) * 3)) |
| urgent = agent.needs.most_urgent |
| footer_parts.append(f"{agent.name[:8]}: [{mood_bar}] need:{urgent[:4]}") |
| footer_text = " | ".join(footer_parts) |
| layout["footer"].update(Panel(footer_text, title="Agent Status", border_style="dim")) |
|
|
| return layout |
|
|
|
|
| async def run_simulation( |
| ticks: int = 96, |
| max_agents: int = 100, |
| tick_delay: float = 0.5, |
| fresh: bool = False, |
| generate: bool = False, |
| provider: str = "", |
| model: str = "", |
| ) -> None: |
| """Run the simulation with a live Rich dashboard.""" |
| |
| console.print("[bold blue]Initializing Soci City Simulation...[/]") |
|
|
| try: |
| llm = create_llm_client( |
| provider=provider or None, |
| model=model or None, |
| ) |
| console.print(f"[green]LLM provider: {llm.provider} (model: {llm.default_model})[/]") |
| except (ValueError, ConnectionError) as e: |
| console.print(f"[bold red]Error: {e}[/]") |
| return |
|
|
| db = Database() |
| await db.connect() |
|
|
| sim = None |
| if not fresh: |
| |
| sim = await load_simulation(db, llm) |
| if sim: |
| console.print( |
| f"[green]Resumed simulation: Day {sim.clock.day}, {sim.clock.time_str} " |
| f"(tick {sim.clock.total_ticks}, {len(sim.agents)} agents)[/]" |
| ) |
|
|
| if sim is None: |
| if fresh: |
| console.print("[yellow]Starting fresh simulation (ignoring any previous save).[/]") |
| else: |
| console.print("[dim]No previous save found — starting new simulation.[/]") |
|
|
| config_dir = Path(__file__).parent / "config" |
| city = City.from_yaml(str(config_dir / "city.yaml")) |
| clock = SimClock(tick_minutes=15, hour=6, minute=0) |
| sim = Simulation(city=city, clock=clock, llm=llm) |
|
|
| |
| sim.load_agents_from_yaml(str(config_dir / "personas.yaml")) |
| yaml_count = len(sim.agents) |
| console.print(f"[green]Loaded {yaml_count} YAML agents.[/]") |
|
|
| |
| gen_count = max_agents - yaml_count |
| if (generate or gen_count > 0) and gen_count > 0: |
| sim.generate_agents(gen_count) |
| console.print(f"[green]Generated {gen_count} procedural agents ({len(sim.agents)} total).[/]") |
| else: |
| console.print(f"[green]Created new simulation with {len(sim.agents)} agents.[/]") |
|
|
| |
| if max_agents < len(sim.agents): |
| agent_ids = list(sim.agents.keys())[:max_agents] |
| sim.agents = {aid: sim.agents[aid] for aid in agent_ids} |
| console.print(f"[yellow]Limited to {max_agents} agents.[/]") |
|
|
| |
| all_events: list[str] = [] |
|
|
| def on_event(msg: str): |
| all_events.append(msg) |
|
|
| sim.on_event = on_event |
|
|
| console.print(f"[bold green]Starting simulation: {ticks} ticks ({ticks * 15 // 60} hours)[/]") |
| console.print("[dim]Press Ctrl+C to pause and save.[/]") |
|
|
| try: |
| with Live(build_dashboard(sim, all_events), refresh_per_second=2, console=console) as live: |
| for tick_num in range(ticks): |
| tick_events = await sim.tick() |
|
|
| |
| live.update(build_dashboard(sim, all_events)) |
|
|
| |
| if tick_num > 0 and tick_num % 24 == 0: |
| await save_simulation(sim, db, "autosave") |
|
|
| |
| await asyncio.sleep(tick_delay) |
|
|
| except KeyboardInterrupt: |
| console.print("\n[yellow]Simulation paused.[/]") |
|
|
| |
| await save_simulation(sim, db, "autosave") |
|
|
| |
| console.print("\n[bold blue]Simulation Summary[/]") |
| console.print(f" Time: {sim.clock.datetime_str}") |
| console.print(f" Total ticks: {sim.clock.total_ticks}") |
| console.print(f" {sim.llm.usage.summary()}") |
|
|
| |
| console.print("\n[bold]Agent Status:[/]") |
| for agent in sim.agents.values(): |
| mood_emoji = "+" if agent.mood > 0.2 else ("-" if agent.mood < -0.2 else "~") |
| loc = sim.city.get_location(agent.location) |
| loc_name = loc.name if loc else agent.location |
| console.print( |
| f" [{mood_emoji}] {agent.name} ({agent.persona.occupation}) " |
| f"at {loc_name} — {agent.needs.describe()}" |
| ) |
|
|
| await db.close() |
|
|
|
|
| def main(): |
| parser = argparse.ArgumentParser(description="Soci — City Population Simulator") |
| parser.add_argument("--ticks", type=int, default=96, help="Number of ticks to simulate (default: 96 = 1 day)") |
| parser.add_argument("--agents", type=int, default=100, help="Max number of agents (default: 100)") |
| parser.add_argument("--speed", type=float, default=0.5, help="Delay between ticks in seconds (default: 0.5)") |
| parser.add_argument("--fresh", action="store_true", |
| help="Discard the autosave and start a brand-new simulation") |
| parser.add_argument("--generate", action="store_true", |
| help="Generate procedural agents to fill up to --agents count") |
| parser.add_argument("--provider", type=str, default="", choices=["", "claude", "groq", "ollama"], |
| help="LLM provider: claude, groq, or ollama (default: auto-detect)") |
| parser.add_argument("--model", type=str, default="", |
| help="Model name (e.g. llama3.1:8b, mistral, qwen2.5)") |
| args = parser.parse_args() |
|
|
| Path("data").mkdir(exist_ok=True) |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s %(name)s %(levelname)s %(message)s", |
| handlers=[logging.FileHandler("data/soci.log", mode="a", encoding="utf-8")], |
| ) |
|
|
| asyncio.run(run_simulation( |
| ticks=args.ticks, |
| max_agents=args.agents, |
| tick_delay=args.speed, |
| fresh=args.fresh, |
| generate=args.generate, |
| provider=args.provider, |
| model=args.model, |
| )) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|