"""Minimal local rollout: 1 episode × 1 day × 1 agent (Ollama). No HuggingFace download required — uses your local Ollama model. Usage: cd viral-posts-env .venv/bin/python training/train_local.py Override via env: TASK_HORIZON=1 # days per episode NUM_EPISODES=1 # episodes per round NUM_ROUNDS=1 # outer loop OLLAMA_MODEL=qwen2.5:3b-instruct-q4_K_M TASK=monthly_engage # or monthly_strategic / monthly_competitive """ from __future__ import annotations import json import os import sys import textwrap import time from pathlib import Path os.environ.setdefault("TASK_HORIZON", "1") REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(REPO_ROOT)) import httpx # noqa: E402 from models import ScheduledAction, ToolCall, ViraltestAction # noqa: E402 from server.viraltest_environment import ( # noqa: E402 TASK_HORIZON, ViraltestEnvironment, get_peak_hours, ) NUM_EPISODES = int(os.environ.get("NUM_EPISODES", "1")) NUM_ROUNDS = int(os.environ.get("NUM_ROUNDS", "1")) OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5:3b-instruct-q4_K_M") OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434") TASK = os.environ.get("TASK", "monthly_engage") SEED = int(os.environ.get("SEED", "42")) TEMPERATURE = float(os.environ.get("TEMPERATURE", "0.7")) OUT_DIR = REPO_ROOT / "plots" OUT_DIR.mkdir(parents=True, exist_ok=True) LOG_PATH = OUT_DIR / "train_local_log.jsonl" print(f"[config] task_horizon={TASK_HORIZON} episodes={NUM_EPISODES} rounds={NUM_ROUNDS} " f"task={TASK} model={OLLAMA_MODEL}") SYSTEM_PROMPT = textwrap.dedent("""\ You are an Instagram content strategy agent. Each step is one day. RESPONSE FORMAT — return ONLY valid JSON, no markdown: { "tool_calls": [{"name": "", "arguments": {...}}], "scheduled_actions": [ {"hour": 0-23, "action_type": "post|create_content", "content_type": "reel|story|carousel|text_post", "topic": "", "tags": ["..."], "intent": "send_bait|save_bait|watch_bait|like_bait"} ], "notes": "strategy notes" } VALID TOOL ARGS: - niche: tech | lifestyle | fitness | business | food | travel | fashion | beauty | photography | education - segment_id: young_professionals | students | parents | global_night_owls | passive_scrollers - competitor_id: niche_expert | viral_chaser | lifestyle_blogger | b2b_thought_leader | food_creator | fitness_coach | travel_creator POSTING RULES: - Active day: 2-3 `post` actions at peak hours. - Vary `intent` and `content_type`.""") _DAY_NAMES = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] def format_obs(obs, hint_hours: str | None = None) -> str: day_name = _DAY_NAMES[obs.day_of_week] if 0 <= obs.day_of_week < 7 else "?" sig = getattr(obs, "engagement_signals", None) sig_str = "" if sig: sig_str = (f"Signals: watch={sig.watch_time:.3f} " f"sends={sig.sends_per_reach:.3f} saves={sig.saves:.3f}\n") hint = "" if hint_hours: hint = (f"COACH HINT: post 2-3 times today at hours {hint_hours}. " "Set scheduled_actions[i].hour to one of these.\n") return (f"Day: {day_name} | days_elapsed={obs.days_elapsed}\n" f"Energy: {obs.creator_energy:.2f} | Followers: {obs.follower_count}\n" f"Engagement: {obs.engagement_rate:.3f} | Queue: {obs.content_queue_size}\n" f"{sig_str}{hint}Plan today's actions (JSON only):") def parse_model_output(text: str) -> ViraltestAction: text = text.strip() if "```" in text: text = "\n".join(l for l in text.split("\n") if not l.strip().startswith("```")).strip() s, e = text.find("{"), text.rfind("}") + 1 if s >= 0 and e > s: text = text[s:e] try: data = json.loads(text) except Exception: return ViraltestAction(scheduled_actions=[]) tool_calls = [] for tc in data.get("tool_calls", []): if not isinstance(tc, dict) or "name" not in tc: continue args = tc.get("arguments", {}) if isinstance(args, list) and args and isinstance(args[0], dict): args = args[0] if isinstance(args, dict): try: tool_calls.append(ToolCall(name=tc["name"], arguments=args)) except Exception: pass scheduled = [] for a in data.get("scheduled_actions", []): try: scheduled.append(ScheduledAction(**a)) except Exception: pass return ViraltestAction(tool_calls=tool_calls, scheduled_actions=scheduled, notes=data.get("notes")) def ollama_generate(prompt: str, temperature: float = 0.7, num_predict: int = 384) -> str: try: resp = httpx.post( f"{OLLAMA_URL}/api/generate", json={ "model": OLLAMA_MODEL, "prompt": prompt, "system": SYSTEM_PROMPT, "stream": False, "options": {"temperature": temperature, "num_predict": num_predict}, }, timeout=120.0, ) resp.raise_for_status() return resp.json().get("response", "") except Exception as e: print(f" [ollama-error] {type(e).__name__}: {e}") return '{"scheduled_actions": []}' def run_one_episode(task: str, seed: int, log_fp) -> dict: env = ViraltestEnvironment() obs = env.reset(task=task, seed=seed) rewards: list[float] = [] pairs: list[dict] = [] for day in range(1, TASK_HORIZON + 1): if obs.done: break peak = get_peak_hours(obs.day_of_week, top_k=3) hint = ", ".join(f"{h:02d}:00" for h in peak) if peak else None prompt = format_obs(obs, hint_hours=hint) t = time.time() response = ollama_generate(prompt, temperature=TEMPERATURE) gen_s = time.time() - t action = parse_model_output(response) log_fp.write(json.dumps({ "day": day, "task": task, "seed": seed, "prompt": prompt, "response": response, }) + "\n") log_fp.flush() obs = env.step(action) r = obs.reward or 0.0 rewards.append(r) n_posts = sum(1 for sa in action.scheduled_actions if sa.action_type == "post") n_tools = len(action.tool_calls) print(f" day {day}: gen={gen_s:.1f}s posts={n_posts} tools={n_tools} " f"reward={r:.4f} energy={obs.creator_energy:.2f}") pairs.append({"prompt": prompt, "response": response, "reward": r}) grader = (obs.metadata or {}).get("grader_score", 0.0) return { "task": task, "seed": seed, "grader_score": grader, "total_reward": sum(rewards), "rewards": rewards, "final_energy": obs.creator_energy, "follower_delta": obs.follower_count - 10000, "pairs": pairs, } def main() -> None: t_start = time.time() try: info = httpx.get(f"{OLLAMA_URL}/api/tags", timeout=5).json() names = [m["name"] for m in info.get("models", [])] print(f"[ollama] reachable. models: {names}") if OLLAMA_MODEL not in names: print(f" WARNING: {OLLAMA_MODEL} not in {names}. " f"Run: ollama pull {OLLAMA_MODEL}") except Exception as e: print(f"[ollama] NOT reachable at {OLLAMA_URL}: {e}\n Start it with: ollama serve") sys.exit(1) LOG_PATH.write_text("") log_fp = LOG_PATH.open("a") all_results: list[dict] = [] for round_idx in range(NUM_ROUNDS): print(f"\n[round] {round_idx + 1}/{NUM_ROUNDS}") for ep in range(NUM_EPISODES): seed = SEED + ep + round_idx * 100 print(f" [episode] {ep + 1}/{NUM_EPISODES} task={TASK} seed={seed}") t_ep = time.time() result = run_one_episode(TASK, seed, log_fp) all_results.append({"round": round_idx + 1, "ep": ep + 1, **result}) print(f" -> grader={result['grader_score']:.4f} " f"reward={result['total_reward']:.3f} " f"energy={result['final_energy']:.2f} " f"({time.time() - t_ep:.1f}s)") log_fp.close() summary = { "config": { "task_horizon": TASK_HORIZON, "num_episodes": NUM_EPISODES, "num_rounds": NUM_ROUNDS, "model": OLLAMA_MODEL, "task": TASK, "temperature": TEMPERATURE, }, "results": [ {k: v for k, v in r.items() if k != "pairs"} for r in all_results ], "elapsed_seconds": round(time.time() - t_start, 1), } summary_path = OUT_DIR / "train_local_summary.json" summary_path.write_text(json.dumps(summary, indent=2)) print(f"\n[summary] -> {summary_path}") print(f"[log] -> {LOG_PATH}") print(f"[done] {time.time() - t_start:.1f}s total") print("\nResults:") for r in all_results: print(f" round={r['round']} ep={r['ep']} task={r['task']} " f"grader={r['grader_score']:.4f} reward={r['total_reward']:.3f}") if __name__ == "__main__": main()