final-iteration / training /train_local.py
anuragredbus's picture
training: add local run script and generated local summaries
dda4654
"""Minimal local rollout: 1 episode × 1 day × 1 agent (Ollama).
No HuggingFace download required — uses your local Ollama model.
Usage:
cd viral-posts-env
.venv/bin/python training/train_local.py
Override via env:
TASK_HORIZON=1 # days per episode
NUM_EPISODES=1 # episodes per round
NUM_ROUNDS=1 # outer loop
OLLAMA_MODEL=qwen2.5:3b-instruct-q4_K_M
TASK=monthly_engage # or monthly_strategic / monthly_competitive
"""
from __future__ import annotations
import json
import os
import sys
import textwrap
import time
from pathlib import Path
os.environ.setdefault("TASK_HORIZON", "1")
REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT))
import httpx # noqa: E402
from models import ScheduledAction, ToolCall, ViraltestAction # noqa: E402
from server.viraltest_environment import ( # noqa: E402
TASK_HORIZON,
ViraltestEnvironment,
get_peak_hours,
)
NUM_EPISODES = int(os.environ.get("NUM_EPISODES", "1"))
NUM_ROUNDS = int(os.environ.get("NUM_ROUNDS", "1"))
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5:3b-instruct-q4_K_M")
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
TASK = os.environ.get("TASK", "monthly_engage")
SEED = int(os.environ.get("SEED", "42"))
TEMPERATURE = float(os.environ.get("TEMPERATURE", "0.7"))
OUT_DIR = REPO_ROOT / "plots"
OUT_DIR.mkdir(parents=True, exist_ok=True)
LOG_PATH = OUT_DIR / "train_local_log.jsonl"
print(f"[config] task_horizon={TASK_HORIZON} episodes={NUM_EPISODES} rounds={NUM_ROUNDS} "
f"task={TASK} model={OLLAMA_MODEL}")
SYSTEM_PROMPT = textwrap.dedent("""\
You are an Instagram content strategy agent. Each step is one day.
RESPONSE FORMAT — return ONLY valid JSON, no markdown:
{
"tool_calls": [{"name": "<tool>", "arguments": {...}}],
"scheduled_actions": [
{"hour": 0-23, "action_type": "post|create_content",
"content_type": "reel|story|carousel|text_post",
"topic": "<string>", "tags": ["..."],
"intent": "send_bait|save_bait|watch_bait|like_bait"}
],
"notes": "strategy notes"
}
VALID TOOL ARGS:
- niche: tech | lifestyle | fitness | business | food | travel | fashion | beauty | photography | education
- segment_id: young_professionals | students | parents | global_night_owls | passive_scrollers
- competitor_id: niche_expert | viral_chaser | lifestyle_blogger | b2b_thought_leader | food_creator | fitness_coach | travel_creator
POSTING RULES:
- Active day: 2-3 `post` actions at peak hours.
- Vary `intent` and `content_type`.""")
_DAY_NAMES = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
def format_obs(obs, hint_hours: str | None = None) -> str:
day_name = _DAY_NAMES[obs.day_of_week] if 0 <= obs.day_of_week < 7 else "?"
sig = getattr(obs, "engagement_signals", None)
sig_str = ""
if sig:
sig_str = (f"Signals: watch={sig.watch_time:.3f} "
f"sends={sig.sends_per_reach:.3f} saves={sig.saves:.3f}\n")
hint = ""
if hint_hours:
hint = (f"COACH HINT: post 2-3 times today at hours {hint_hours}. "
"Set scheduled_actions[i].hour to one of these.\n")
return (f"Day: {day_name} | days_elapsed={obs.days_elapsed}\n"
f"Energy: {obs.creator_energy:.2f} | Followers: {obs.follower_count}\n"
f"Engagement: {obs.engagement_rate:.3f} | Queue: {obs.content_queue_size}\n"
f"{sig_str}{hint}Plan today's actions (JSON only):")
def parse_model_output(text: str) -> ViraltestAction:
text = text.strip()
if "```" in text:
text = "\n".join(l for l in text.split("\n") if not l.strip().startswith("```")).strip()
s, e = text.find("{"), text.rfind("}") + 1
if s >= 0 and e > s:
text = text[s:e]
try:
data = json.loads(text)
except Exception:
return ViraltestAction(scheduled_actions=[])
tool_calls = []
for tc in data.get("tool_calls", []):
if not isinstance(tc, dict) or "name" not in tc:
continue
args = tc.get("arguments", {})
if isinstance(args, list) and args and isinstance(args[0], dict):
args = args[0]
if isinstance(args, dict):
try:
tool_calls.append(ToolCall(name=tc["name"], arguments=args))
except Exception:
pass
scheduled = []
for a in data.get("scheduled_actions", []):
try:
scheduled.append(ScheduledAction(**a))
except Exception:
pass
return ViraltestAction(tool_calls=tool_calls, scheduled_actions=scheduled,
notes=data.get("notes"))
def ollama_generate(prompt: str, temperature: float = 0.7, num_predict: int = 384) -> str:
try:
resp = httpx.post(
f"{OLLAMA_URL}/api/generate",
json={
"model": OLLAMA_MODEL,
"prompt": prompt,
"system": SYSTEM_PROMPT,
"stream": False,
"options": {"temperature": temperature, "num_predict": num_predict},
},
timeout=120.0,
)
resp.raise_for_status()
return resp.json().get("response", "")
except Exception as e:
print(f" [ollama-error] {type(e).__name__}: {e}")
return '{"scheduled_actions": []}'
def run_one_episode(task: str, seed: int, log_fp) -> dict:
env = ViraltestEnvironment()
obs = env.reset(task=task, seed=seed)
rewards: list[float] = []
pairs: list[dict] = []
for day in range(1, TASK_HORIZON + 1):
if obs.done:
break
peak = get_peak_hours(obs.day_of_week, top_k=3)
hint = ", ".join(f"{h:02d}:00" for h in peak) if peak else None
prompt = format_obs(obs, hint_hours=hint)
t = time.time()
response = ollama_generate(prompt, temperature=TEMPERATURE)
gen_s = time.time() - t
action = parse_model_output(response)
log_fp.write(json.dumps({
"day": day, "task": task, "seed": seed,
"prompt": prompt, "response": response,
}) + "\n")
log_fp.flush()
obs = env.step(action)
r = obs.reward or 0.0
rewards.append(r)
n_posts = sum(1 for sa in action.scheduled_actions if sa.action_type == "post")
n_tools = len(action.tool_calls)
print(f" day {day}: gen={gen_s:.1f}s posts={n_posts} tools={n_tools} "
f"reward={r:.4f} energy={obs.creator_energy:.2f}")
pairs.append({"prompt": prompt, "response": response, "reward": r})
grader = (obs.metadata or {}).get("grader_score", 0.0)
return {
"task": task, "seed": seed,
"grader_score": grader,
"total_reward": sum(rewards),
"rewards": rewards,
"final_energy": obs.creator_energy,
"follower_delta": obs.follower_count - 10000,
"pairs": pairs,
}
def main() -> None:
t_start = time.time()
try:
info = httpx.get(f"{OLLAMA_URL}/api/tags", timeout=5).json()
names = [m["name"] for m in info.get("models", [])]
print(f"[ollama] reachable. models: {names}")
if OLLAMA_MODEL not in names:
print(f" WARNING: {OLLAMA_MODEL} not in {names}. "
f"Run: ollama pull {OLLAMA_MODEL}")
except Exception as e:
print(f"[ollama] NOT reachable at {OLLAMA_URL}: {e}\n Start it with: ollama serve")
sys.exit(1)
LOG_PATH.write_text("")
log_fp = LOG_PATH.open("a")
all_results: list[dict] = []
for round_idx in range(NUM_ROUNDS):
print(f"\n[round] {round_idx + 1}/{NUM_ROUNDS}")
for ep in range(NUM_EPISODES):
seed = SEED + ep + round_idx * 100
print(f" [episode] {ep + 1}/{NUM_EPISODES} task={TASK} seed={seed}")
t_ep = time.time()
result = run_one_episode(TASK, seed, log_fp)
all_results.append({"round": round_idx + 1, "ep": ep + 1, **result})
print(f" -> grader={result['grader_score']:.4f} "
f"reward={result['total_reward']:.3f} "
f"energy={result['final_energy']:.2f} "
f"({time.time() - t_ep:.1f}s)")
log_fp.close()
summary = {
"config": {
"task_horizon": TASK_HORIZON,
"num_episodes": NUM_EPISODES,
"num_rounds": NUM_ROUNDS,
"model": OLLAMA_MODEL,
"task": TASK,
"temperature": TEMPERATURE,
},
"results": [
{k: v for k, v in r.items() if k != "pairs"} for r in all_results
],
"elapsed_seconds": round(time.time() - t_start, 1),
}
summary_path = OUT_DIR / "train_local_summary.json"
summary_path.write_text(json.dumps(summary, indent=2))
print(f"\n[summary] -> {summary_path}")
print(f"[log] -> {LOG_PATH}")
print(f"[done] {time.time() - t_start:.1f}s total")
print("\nResults:")
for r in all_results:
print(f" round={r['round']} ep={r['ep']} task={r['task']} "
f"grader={r['grader_score']:.4f} reward={r['total_reward']:.3f}")
if __name__ == "__main__":
main()