Spaces:
Paused
Paused
| """ | |
| Viraltest Environment v2 — Theme #3.1 World-Modeling Simulation. | |
| Multi-day creator optimization with: | |
| - Mosseri-aligned engagement signals (watch_time, sends, saves, likes) | |
| - Discoverable tool catalog (partial observability) | |
| - Piecewise-linear sleep model (Van Dongen 2003) | |
| - Data-driven hour heatmap (Buffer 9.6M + Sprout 2B) | |
| - Tiered audience fatigue (Buffer 2.1M) | |
| - Multi-episode brand persistence | |
| - Counterfactual coach feedback | |
| """ | |
| import json | |
| import math | |
| import random | |
| from collections import defaultdict | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from uuid import uuid4 | |
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import State | |
| try: | |
| from ..models import ( | |
| CollabProposal, | |
| DailyInteractions, | |
| EngagementSignals, | |
| HeadlineMetrics, | |
| JudgeReport, | |
| ScheduledAction, | |
| ToolCall, | |
| ToolResult, | |
| ViraltestAction, | |
| ViraltestObservation, | |
| ) | |
| except ImportError: | |
| from models import ( | |
| CollabProposal, | |
| DailyInteractions, | |
| EngagementSignals, | |
| HeadlineMetrics, | |
| JudgeReport, | |
| ScheduledAction, | |
| ToolCall, | |
| ToolResult, | |
| ViraltestAction, | |
| ViraltestObservation, | |
| ) | |
| _DATA_DIR = Path(__file__).parent / "data" | |
| def _load_json(name: str) -> Any: | |
| return json.loads((_DATA_DIR / name).read_text()) | |
| # --------------------------------------------------------------------------- | |
| # Data files (loaded once at module level) | |
| # --------------------------------------------------------------------------- | |
| _TAGS_DATA = _load_json("tags.json") | |
| _TOPICS_DATA = _load_json("topics.json") | |
| _COMPETITORS_DATA = _load_json("competitors.json") | |
| _HEATMAP_DATA = _load_json("hour_heatmap.json") | |
| _AUDIENCE_DATA = _load_json("audience_segments.json") | |
| _OVERLAP_DATA = _load_json("audience_overlap_matrix.json") | |
| # Flatten tag pool for validation | |
| TAG_POOL: List[str] = [] | |
| for t in _TAGS_DATA.get("broad", []): | |
| TAG_POOL.append(t["tag"]) | |
| for _cat, tags in _TAGS_DATA.get("niche", {}).items(): | |
| for t in tags: | |
| TAG_POOL.append(t["tag"]) | |
| for t in _TAGS_DATA.get("trending", []): | |
| TAG_POOL.append(t["tag"]) | |
| for t in _TAGS_DATA.get("seasonal", []): | |
| TAG_POOL.append(t["tag"]) | |
| TOPIC_CATEGORIES: Dict[str, List[str]] = {} | |
| for niche_name, niche_data in _TOPICS_DATA.get("niches", {}).items(): | |
| TOPIC_CATEGORIES[niche_name] = niche_data["topics"] | |
| _NICHE_MULTIPLIERS: Dict[str, float] = {} | |
| for niche_name, niche_data in _TOPICS_DATA.get("niches", {}).items(): | |
| _NICHE_MULTIPLIERS[niche_name] = niche_data["engagement_multiplier"] | |
| _HEATMAP_GRID: Dict[int, List[float]] = { | |
| int(k): v for k, v in _HEATMAP_DATA.get("grid", {}).items() | |
| } | |
| # Mocked niche + follower-count lookups for the collab system. Live in the overlap matrix file | |
| # so the same source-of-truth carries (a) Jaccard overlap, (b) niche label, (c) follower size. | |
| _NICHE_BY_ARCHETYPE: Dict[str, str] = dict(_OVERLAP_DATA.get("niche_by_archetype", {})) | |
| _FOLLOWERS_BY_ARCHETYPE: Dict[str, int] = { | |
| k: int(v) for k, v in _OVERLAP_DATA.get("mock_followers_by_archetype", {}).items() | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Constants (research-backed, Tier 1-3 sources) | |
| # --------------------------------------------------------------------------- | |
| # Episode length in daily env steps. Graders and UI should stay consistent with this value. | |
| TASK_HORIZON = 15 | |
| # Distinct positive tags for full tag_discovery score in strategic/competitive graders. | |
| # Caps at 30 (original month-scale bar); scales down only for very short horizons. | |
| TAG_DISCOVERY_POSITIVE_TARGET = float(max(6, min(30, TASK_HORIZON * 2))) | |
| # Socialinsider 2026 (31M posts) | |
| CONTENT_ENERGY_COST = { | |
| "reel": 0.25, | |
| "carousel": 0.20, | |
| "story": 0.08, | |
| "text_post": 0.06, | |
| } | |
| BASE_ENGAGEMENT = { | |
| "reel": 0.52, | |
| "carousel": 0.55, | |
| "story": 0.30, | |
| "text_post": 0.45, | |
| } | |
| # Socialinsider 2026 + CreatorsJet 10K study | |
| REACH_MULT = { | |
| "reel": 2.25, | |
| "carousel": 1.0, | |
| "story": 0.5, | |
| "text_post": 0.91, | |
| } | |
| # Mosseri Jan-2025: format→signal affinity (which signal each format naturally excels at) | |
| FORMAT_SIGNAL_WEIGHTS = { | |
| "reel": {"watch_time": 0.50, "sends_per_reach": 0.25, "saves": 0.10, "likes_per_reach": 0.15}, | |
| "carousel": {"watch_time": 0.10, "sends_per_reach": 0.15, "saves": 0.50, "likes_per_reach": 0.25}, | |
| "story": {"watch_time": 0.20, "sends_per_reach": 0.40, "saves": 0.05, "likes_per_reach": 0.35}, | |
| "text_post": {"watch_time": 0.05, "sends_per_reach": 0.10, "saves": 0.30, "likes_per_reach": 0.55}, | |
| } | |
| # Intent multiplier matrix: when intent matches format's strong signal, boost that signal | |
| INTENT_MULTIPLIER = { | |
| "send_bait": {"sends_per_reach": 1.6}, | |
| "save_bait": {"saves": 1.7}, | |
| "watch_bait": {"watch_time": 1.5}, | |
| "like_bait": {"likes_per_reach": 1.3}, | |
| } | |
| VALID_TASKS = ("monthly_engage", "monthly_strategic", "monthly_competitive") | |
| INITIAL_FOLLOWERS = 10000 | |
| REST_RECOVERY = 0.12 | |
| CREATE_CONTENT_COST = 0.05 | |
| REPETITION_ENERGY_PENALTY = 0.05 | |
| FOLLOWER_DECAY_HOURS = 72 | |
| ALGORITHM_PENALTY_MULT = 0.6 | |
| ALGORITHM_PENALTY_BASE_DURATION = 2 | |
| # Van Dongen 2003 *Sleep* PMID 12683469: lapses linear above 15.84h | |
| SLEEP_OPTIMAL_AWAKE = 16 | |
| SLEEP_LINEAR_DECAY_PER_HOUR = 0.0625 # reaches ~50% at 24h awake (8h × 0.0625 = 0.5) | |
| SLEEP_MIN_QUALITY = 0.30 | |
| SLEEP_ENERGY_DRAIN_START = 16 | |
| SLEEP_ENERGY_DRAIN_RATE = 0.015 | |
| SLEEP_RECOVERY_PER_REST = 2 | |
| # Buffer 2.1M study + arxiv:2410.13108: tiered fatigue | |
| FATIGUE_TIERS = {2: 1.0, 3: 0.75, 4: 0.50, 5: 0.25} | |
| WEEKLY_FATIGUE_THRESHOLD = 7 | |
| WEEKLY_FATIGUE_MULT = 0.75 | |
| SATURATION_PENALTY_K = 0.25 | |
| TREND_DEFAULT_HALFLIFE_HOURS = 60 | |
| TREND_MATCH_STOPWORDS = {"tips", "guide", "review", "routine", "ideas", "hacks", "tutorial", "the", "a", "an", "and", "of", "for", "to"} | |
| # Collab reward shaping (Later 2023 reach study, HypeAuditor 2024 niche affinity, Rival IQ 2025 overlap patterns, | |
| # Cen et al. 2024 disengagement model for diminishing returns instead of a hard cap). | |
| # Per-partner exhaustion: each collab with the SAME partner re-exposes the user to the same set of followers, | |
| # so spillover and reward multipliers should decay sharply. First 1-2 collabs deliver most of the gain. | |
| # Floor at 0.05 keeps a tiny residual signal so the curve is monotonic but effectively saturates. | |
| COLLAB_PARTNER_REPEAT_DECAY = {0: 1.0, 1: 0.70, 2: 0.35, 3: 0.15} | |
| COLLAB_PARTNER_REPEAT_FLOOR = 0.05 | |
| COLLAB_FATIGUE_K = 0.3 # per-collab diminishing-returns factor: 1/(1+K*prior_collabs_this_episode) | |
| # Niche-aware tiered shaping (overlap = Jaccard intersection fraction). | |
| # Hard rule: any diff-niche multiplier must be < the minimum same-niche-low multiplier | |
| # so the env never recommends a diff-niche collab over an equal-overlap same-niche one. | |
| COLLAB_LOW_OVERLAP_THRESHOLD = 0.20 # < this counts as "low intersection" | |
| COLLAB_HIGH_OVERLAP_THRESHOLD = 0.40 # >= this counts as "high intersection" | |
| COLLAB_GUARDRAIL_OVERLAP_MIN = 0.10 # below this -> recommended=False (intersection-too-low guardrail) | |
| COLLAB_GUARDRAIL_FOLLOWER_GAP_MAX = 0.25 # |partner - user| / max > this -> follower-size mismatch | |
| COLLAB_FORCED_PENALTY_ENG = 0.7 # eng_mult applied if agent ignores guardrail | |
| COLLAB_FORCED_PENALTY_GROWTH = 0.6 # growth_mult applied if agent ignores guardrail | |
| # Same niche, LOW overlap -> HIGH reward (best case). Smoothly interpolated by overlap (low->high uplift as overlap->0). | |
| COLLAB_SAME_LOW_ENG = (1.50, 1.80) | |
| COLLAB_SAME_LOW_GROWTH = (1.60, 2.00) | |
| # Same niche, HIGH overlap -> LOW reward (no point, audience already shared). | |
| COLLAB_SAME_HIGH_ENG = 0.85 | |
| COLLAB_SAME_HIGH_GROWTH = 0.90 | |
| # Diff niche, LOW overlap -> MED reward (cross-pollination, capped < SAME_LOW min). | |
| COLLAB_DIFF_LOW_ENG = (1.20, 1.40) | |
| COLLAB_DIFF_LOW_GROWTH = (1.30, 1.55) | |
| # Diff niche, HIGH overlap -> LOW reward (mismatch). | |
| COLLAB_DIFF_HIGH_ENG = 0.75 | |
| COLLAB_DIFF_HIGH_GROWTH = 0.80 | |
| # Collab is the canonical "wider reach in one shot" lever. On top of the engagement | |
| # multiplier (which only affects the collab-day post), apply two extra mechanisms: | |
| # 1) One-shot follower spillover: partner_followers x (1 - overlap) x growth_mult x K_SPILLOVER. | |
| # Models the partner's audience getting exposed to the user — net new followers, not just engagement. | |
| # 2) Sustained reach buff: 2-3 days post-collab, all posts get a small algorithm boost | |
| # because the collab signal lifts the user's overall recommendability. | |
| COLLAB_SPILLOVER_K = 0.05 # follower spike fraction (0.05 = 5% of partner audience reach if overlap=0) | |
| COLLAB_REACH_CARRYOVER_DAYS = 2 # days the post-collab reach buff persists | |
| COLLAB_REACH_CARRYOVER_MULT = 1.20 # +20% engagement on each post during carryover window | |
| COLLAB_BLOCKED_SPILLOVER_K = 0.01 # forced/guardrail-blocked collabs get only ~20% of normal spillover | |
| # Interaction (likes/comments/replies) tunables | |
| INTERACT_ENERGY_LIKE = 0.005 | |
| INTERACT_ENERGY_COMMENT = 0.012 | |
| INTERACT_ENERGY_REPLY = 0.018 | |
| INTERACT_HEALTHY_LIKES = (5, 20) | |
| INTERACT_HEALTHY_COMMENTS = (3, 10) | |
| INTERACT_LIKE_REACH_BUFF = 0.02 # was 0.04 — interactions are a sustaining lever, not a growth lever | |
| INTERACT_COMMENT_REACH_BUFF = 0.04 # was 0.08 | |
| INTERACT_REPLY_REWARD_PER = 0.005 # was 0.01 | |
| INTERACT_REPLY_REWARD_CAP = 0.08 | |
| INTERACT_DAILY_REWARD_CAP = 0.10 | |
| INTERACT_SPAM_LIKES = 30 | |
| INTERACT_SPAM_COMMENTS = 20 | |
| INTERACT_SPAM_REACH_PENALTY = 0.85 | |
| INTERACT_SPAM_SHADOWBAN_BUMP = 0.20 | |
| INTERACT_IGNORE_THRESHOLD_K = 0.05 | |
| INTERACT_IGNORE_LOYALTY_DECAY = 0.97 | |
| INTERACT_OFFNICHE_THRESHOLD = 0.60 | |
| INTERACT_OFFNICHE_REACH_PENALTY = 0.90 | |
| INTERACT_LOWQ_THRESHOLD = 0.30 | |
| INTERACT_LOWQ_WEIGHT = 0.4 | |
| INTERACT_VERY_LOWQ_THRESHOLD = 0.10 | |
| INTERACT_VERY_LOWQ_PENALTY = -0.03 | |
| API_BUDGET_INITIAL = 10**9 # effectively unlimited; rate-limit removed | |
| # Heuristic baselines for headline metric `vs_baseline_pct`. | |
| # Data-driven: loaded from `plots/training_summary.json["smart_heuristic"]` recorded by | |
| # `training/run_training_evidence.py`. Falls back to conservative calibration constants | |
| # if the file is missing (audit trail: see RESEARCH.md for the rule-based policy spec). | |
| def _load_heuristic_baselines() -> Dict[str, float]: | |
| summary = Path(__file__).parent.parent / "plots" / "training_summary.json" | |
| try: | |
| data = json.loads(summary.read_text()) | |
| empirical = data.get("smart_heuristic") or {} | |
| return {k: float(v) for k, v in empirical.items() if k in VALID_TASKS} | |
| except Exception: | |
| return {} | |
| HEURISTIC_BASELINE_SCORES: Dict[str, float] = _load_heuristic_baselines() or { | |
| "monthly_engage": 0.43, | |
| "monthly_strategic": 0.77, | |
| "monthly_competitive": 0.81, | |
| } | |
| # Cross-episode store for distribution-shift retention. Keyed by episode_chain_id, stores | |
| # {"baseline": score, "shifted": score} so the second run can compute retention_under_shift. | |
| _SHIFT_HISTORY: Dict[str, Dict[str, float]] = {} | |
| # --------------------------------------------------------------------------- | |
| # Brand state for multi-episode persistence | |
| # --------------------------------------------------------------------------- | |
| _BRAND_STORE: Dict[str, Dict[str, Any]] = {} | |
| class CompetitorState: | |
| id: str | |
| name: str | |
| niche: str | |
| niche_topics: List[str] | |
| preferred_types: List[str] | |
| posts_per_week: float | |
| base_engagement_rate: float | |
| tag_preferences: List[str] | |
| style: str | |
| recent_posts: List[Dict[str, Any]] = field(default_factory=list) | |
| # --------------------------------------------------------------------------- | |
| # Tool catalog (schemas for GET /tools) | |
| # --------------------------------------------------------------------------- | |
| TOOL_CATALOG = { | |
| "query_audience": { | |
| "description": "Query a specific audience segment to learn its topic affinities, content preferences, and active hours.", | |
| "parameters": {"segment_id": {"type": "string", "enum": [s["id"] for s in _AUDIENCE_DATA.get("segments", [])]}}, | |
| }, | |
| "query_competitor": { | |
| "description": "Get recent posts and strategy of a competitor archetype within a time window.", | |
| "parameters": { | |
| "competitor_id": {"type": "string", "enum": [a["id"] for a in _COMPETITORS_DATA.get("archetypes", [])]}, | |
| "window_days": {"type": "integer", "default": 7, "minimum": 1, "maximum": 30}, | |
| }, | |
| }, | |
| "query_tag_history": { | |
| "description": "Get your historical engagement signals (watch, sends, saves, likes) for a specific tag.", | |
| "parameters": {"tag": {"type": "string"}}, | |
| }, | |
| "query_trends": { | |
| "description": "Get currently trending topics and tags for a niche, with decay-adjusted strength.", | |
| "parameters": {"niche": {"type": "string", "enum": list(TOPIC_CATEGORIES.keys())}}, | |
| }, | |
| "predict_engagement": { | |
| "description": "Simulate engagement signals for a hypothetical daily plan WITHOUT committing it. Returns predicted watch/sends/saves/likes.", | |
| "parameters": {"scheduled_actions": {"type": "array", "description": "Same format as ViraltestAction.scheduled_actions"}}, | |
| }, | |
| "draft_review": { | |
| "description": "Get AI review of a draft plan: strengths, weaknesses, suggested improvements.", | |
| "parameters": {"scheduled_actions": {"type": "array"}}, | |
| }, | |
| "query_creator_pool": { | |
| "description": "List available competitor archetypes for potential collaboration with audience overlap %, niche match, mocked follower counts, intersection size, and a recommendation flag (recommended=False when guardrails block: zero followers, intersection<10%, or follower-size gap>25%).", | |
| "parameters": {}, | |
| }, | |
| "propose_collab": { | |
| "description": "Propose a collab post with a competitor at a specific hour. The post you schedule at that hour will be co-authored (or auto-injected if absent — collab always pays a post's energy + counts as a post). Reward shaping: same-niche + low overlap = HIGH; same-niche + high overlap = LOW; diff-niche always capped below same-niche-low. Per-partner exhaustion: 1st collab full reward, 2nd 0.70x, 3rd 0.35x, 4th 0.15x, 5th+ 0.05x — partner audiences are tapped once. Guardrail violations apply a 0.7x engagement / 0.6x growth penalty AND surface in the JudgeReport.", | |
| "parameters": { | |
| "partner_id": {"type": "string"}, | |
| "content_type": {"type": "string", "enum": ["reel", "story", "carousel", "text_post"]}, | |
| "hour": {"type": "integer", "minimum": 0, "maximum": 23}, | |
| }, | |
| }, | |
| "query_interaction_norms": { | |
| "description": "Discover healthy daily ranges for likes/comments/replies and the current shadowban_risk. Use before submitting ViraltestAction.interactions.", | |
| "parameters": {}, | |
| }, | |
| } | |
| class ViraltestEnvironment(Environment): | |
| """Monthly creator optimization simulation (Theme #3.1 World Modeling).""" | |
| SUPPORTS_CONCURRENT_SESSIONS: bool = True | |
| def __init__(self) -> None: | |
| self._state = State(episode_id=str(uuid4()), step_count=0) | |
| self._task = "monthly_engage" | |
| self._rng = random.Random(42) | |
| self._init_state() | |
| def _init_state(self) -> None: | |
| self._energy = 1.0 | |
| self._followers = INITIAL_FOLLOWERS | |
| self._initial_followers = INITIAL_FOLLOWERS | |
| self._hour = 9 | |
| self._day = 0 | |
| self._posts_today = 0 | |
| self._last_post_types: List[str] = [] | |
| self._time_since_last_post = 0 | |
| self._engagement_history: List[float] = [] | |
| self._tag_history: Dict[str, List[Dict[str, float]]] = defaultdict(list) | |
| self._content_queue = 0 | |
| self._unique_tags_used: set = set() | |
| self._unique_content_types: set = set() | |
| self._energy_history: List[float] = [1.0] | |
| self._posting_steps = 0 | |
| self._episode_done = False | |
| self._last_topic: Optional[str] = None | |
| self._final_observation: Optional[ViraltestObservation] = None | |
| self._unique_topic_steps = 0 | |
| self._days_with_good_posts: set = set() | |
| self._total_engagement = 0.0 | |
| self._posts_per_day: Dict[int, int] = defaultdict(int) | |
| self._algorithm_penalty_remaining = 0 | |
| self._agent_notes: Optional[str] = None | |
| self._api_budget = API_BUDGET_INITIAL | |
| self._collabs_this_month = 0 | |
| self._collab_history: List[str] = [] | |
| self._active_collab: Optional[CollabProposal] = None | |
| self._collab_violations: List[str] = [] # collab guardrail breaches this step | |
| self._collab_carryover_days_remaining: int = 0 | |
| self._last_collab_spillover: int = 0 # diagnostic: followers gained from the most recent collab | |
| self._user_niche: str = _NICHE_BY_ARCHETYPE.get("user_creator", "generic") | |
| # Interaction state | |
| self._pending_reach_mult: float = 1.0 # applied to next day's posts (one-shot) | |
| self._shadowban_risk: float = 0.0 | |
| self._engagement_rate_loyalty_mult: float = 1.0 # compounding loyalty drop from ignoring audience | |
| self._interaction_violations: List[str] = [] | |
| self._last_interaction_summary: Optional[Dict[str, Any]] = None | |
| self._low_energy_days = 0 | |
| self._total_posts_this_week = 0 | |
| self._week_start_day = 0 | |
| self._daily_signals = EngagementSignals() | |
| self._total_tool_calls = 0 | |
| self._total_action_chars = 0 | |
| self._shift_label: Optional[str] = None | |
| self._chain_id: Optional[str] = None | |
| self._trending_topics = self._pick_trending_topics() | |
| self._trending_tags = self._pick_trending_tags() | |
| self._competitors = self._load_competitors() | |
| self._hours_since_sleep = 2 | |
| self._sleep_debt = 0.0 | |
| self._reward_mode = "combined" | |
| def _load_competitors(self) -> List[CompetitorState]: | |
| archetypes = _COMPETITORS_DATA.get("archetypes", []) | |
| return [ | |
| CompetitorState( | |
| id=a["id"], | |
| name=a["name"], | |
| niche=a["niche"], | |
| niche_topics=a["niche_topics"], | |
| preferred_types=a["preferred_types"], | |
| posts_per_week=a["posts_per_week"], | |
| base_engagement_rate=a["base_engagement_rate"], | |
| tag_preferences=a["tag_preferences"], | |
| style=a.get("style", "consistent_moderate"), | |
| ) | |
| for a in archetypes | |
| ] | |
| def _pick_trending_topics(self) -> List[str]: | |
| all_topics = [] | |
| for niche_data in _TOPICS_DATA.get("niches", {}).values(): | |
| all_topics.extend(niche_data["topics"]) | |
| return self._rng.sample(all_topics, min(3, len(all_topics))) | |
| def _pick_trending_tags(self) -> List[str]: | |
| return self._rng.sample(TAG_POOL, min(5, len(TAG_POOL))) | |
| def _rotate_trends(self) -> None: | |
| self._trending_topics = self._pick_trending_topics() | |
| self._trending_tags = self._pick_trending_tags() | |
| # ----- hour multiplier (heatmap-based) ----- | |
| def _get_hour_multiplier(self) -> float: | |
| dow = self._day % 7 | |
| h = self._hour | |
| row = _HEATMAP_GRID.get(dow) | |
| if row and 0 <= h < len(row): | |
| return row[h] | |
| return 0.8 | |
| # ----- quality (piecewise-linear sleep, Van Dongen 2003) ----- | |
| def _get_quality_modifier(self) -> float: | |
| if self._energy > 0.5: | |
| energy_factor = 1.0 | |
| else: | |
| energy_factor = max(0.48, self._energy * 1.5) | |
| if self._hours_since_sleep <= SLEEP_OPTIMAL_AWAKE: | |
| sleep_factor = 1.0 | |
| else: | |
| hours_over = self._hours_since_sleep - SLEEP_OPTIMAL_AWAKE | |
| sleep_factor = max(SLEEP_MIN_QUALITY, 1.0 - SLEEP_LINEAR_DECAY_PER_HOUR * hours_over) | |
| return energy_factor * sleep_factor | |
| # ----- niche multiplier ----- | |
| def _get_niche_multiplier(self, topic: Optional[str]) -> float: | |
| if not topic: | |
| return 1.0 | |
| topic_lower = topic.lower() | |
| for niche_name, niche_data in _TOPICS_DATA.get("niches", {}).items(): | |
| for t in niche_data["topics"]: | |
| if t.lower() == topic_lower: | |
| return _NICHE_MULTIPLIERS.get(niche_name, 1.0) | |
| return 1.0 | |
| # ----- tags ----- | |
| def _calc_tag_boost(self, tags: Optional[List[str]]) -> float: | |
| if not tags: | |
| return 1.0 | |
| trending_count = sum(1 for t in tags if t in self._trending_tags) | |
| perf_values = [self._tag_performance_avg(t) for t in tags if self._tag_performance_avg(t) > 0] | |
| perf_avg = sum(perf_values) / len(perf_values) if perf_values else 0.0 | |
| return 1.0 + 0.1 * trending_count + 0.05 * perf_avg | |
| def _tag_performance_avg(self, tag: str) -> float: | |
| history = self._tag_history.get(tag, []) | |
| if not history: | |
| return 0.0 | |
| window = history[-5:] | |
| totals = [h.get("total", 0.0) for h in window] | |
| return sum(totals) / len(totals) if totals else 0.0 | |
| # ----- competitors ----- | |
| def _advance_competitors(self) -> None: | |
| for comp in self._competitors: | |
| for p in comp.recent_posts: | |
| p["hours_ago"] += 1 | |
| comp.recent_posts = [p for p in comp.recent_posts if p["hours_ago"] < 72] | |
| daily_prob = comp.posts_per_week / (7.0 * 24.0) | |
| if self._rng.random() < daily_prob: | |
| ct = self._rng.choice(comp.preferred_types) | |
| topic = self._rng.choice(comp.niche_topics) | |
| tags = self._rng.sample(comp.tag_preferences, min(3, len(comp.tag_preferences))) | |
| eng = comp.base_engagement_rate + self._rng.uniform(-0.1, 0.1) | |
| eng = max(0.0, min(1.0, eng)) | |
| comp.recent_posts.append({ | |
| "content_type": ct, "topic": topic, "tags": tags, | |
| "engagement": round(eng, 3), "hours_ago": 0, | |
| }) | |
| def _get_competitor_avg_engagement(self) -> float: | |
| engagements = [p["engagement"] for comp in self._competitors for p in comp.recent_posts] | |
| return sum(engagements) / len(engagements) if engagements else 0.0 | |
| def _calc_niche_saturation(self, topic: Optional[str]) -> float: | |
| if not topic: | |
| return 0.0 | |
| recent_topics = [] | |
| for comp in self._competitors: | |
| for p in comp.recent_posts: | |
| if p["hours_ago"] < 12: | |
| recent_topics.append(p["topic"].lower()) | |
| if not recent_topics: | |
| return 0.0 | |
| topic_lower = topic.lower() | |
| overlap = sum(1 for t in recent_topics if _topic_overlap(topic_lower, t)) | |
| return min(1.0, overlap / max(1, len(recent_topics))) | |
| def _calc_competitor_diff(self, topic: Optional[str]) -> float: | |
| if not topic: | |
| return 1.0 | |
| saturation = self._calc_niche_saturation(topic) | |
| recent_topics = [ | |
| p["topic"].lower() | |
| for comp in self._competitors | |
| for p in comp.recent_posts | |
| if p["hours_ago"] < 12 | |
| ] | |
| has_overlap = any(_topic_overlap(topic.lower(), t) for t in recent_topics) | |
| if not has_overlap: | |
| return 1.3 | |
| if saturation > 0.7: | |
| return 0.6 | |
| return 1.0 | |
| def _count_competitors_same_hour(self) -> int: | |
| count = 0 | |
| for comp in self._competitors: | |
| for p in comp.recent_posts: | |
| if p["hours_ago"] <= 1: | |
| count += 1 | |
| return count | |
| # ----- fatigue (tiered, Buffer 2.1M) ----- | |
| def _get_fatigue_multiplier(self) -> float: | |
| if self._posts_today <= 2: | |
| daily_fatigue = 1.0 | |
| elif self._posts_today in FATIGUE_TIERS: | |
| daily_fatigue = FATIGUE_TIERS[self._posts_today] | |
| else: | |
| daily_fatigue = 0.25 | |
| weekly_mult = 1.0 | |
| if self._total_posts_this_week >= WEEKLY_FATIGUE_THRESHOLD: | |
| weekly_mult = WEEKLY_FATIGUE_MULT | |
| return daily_fatigue * weekly_mult | |
| # ----- collab evaluation (niche-aware, overlap-tiered) ----- | |
| def _user_partner_overlap(self, partner_id: str) -> Optional[float]: | |
| ids = _OVERLAP_DATA.get("archetype_ids", []) | |
| if "user_creator" not in ids or partner_id not in ids: | |
| return None | |
| u = ids.index("user_creator") | |
| p = ids.index(partner_id) | |
| return _OVERLAP_DATA["matrix"][u][p] | |
| def _partner_niche(self, partner_id: str) -> str: | |
| return _NICHE_BY_ARCHETYPE.get(partner_id, "generic") | |
| def _partner_followers(self, partner_id: str) -> int: | |
| return _FOLLOWERS_BY_ARCHETYPE.get(partner_id, 0) | |
| def _partner_prior_count(self, partner_id: str) -> int: | |
| """How many times we've already collabed with this partner THIS episode (excludes current entry).""" | |
| if not self._collab_history: | |
| return 0 | |
| return self._collab_history[:-1].count(partner_id) | |
| def _partner_repeat_decay(self, partner_id: str) -> float: | |
| """Multiplier in [floor, 1.0] that scales spillover + tier multipliers based on prior count. | |
| First collab = 1.0x (no decay), 2nd = 0.70, 3rd = 0.35, 4th = 0.15, 5th+ = 0.05. | |
| """ | |
| n = self._partner_prior_count(partner_id) | |
| return COLLAB_PARTNER_REPEAT_DECAY.get(n, COLLAB_PARTNER_REPEAT_FLOOR) | |
| def _collab_default_topic(self, partner_niche: str) -> str: | |
| """Pick a topic for an auto-injected collab post. | |
| Prefer the user's niche (so the post lands in user-niche distribution); fall back to | |
| the partner's niche; final fallback is the first known topic. | |
| """ | |
| for niche in (self._user_niche, partner_niche): | |
| topics = TOPIC_CATEGORIES.get(niche) | |
| if topics: | |
| return topics[0] | |
| # Fallback: first topic of any known niche. | |
| for topics in TOPIC_CATEGORIES.values(): | |
| if topics: | |
| return topics[0] | |
| return "collaboration" | |
| def _collab_default_tags(self, partner_niche: str) -> List[str]: | |
| # Pull a couple of trending tags so the auto-post benefits from the existing tag boost. | |
| return list(self._trending_tags[:2]) if self._trending_tags else [] | |
| def _interp(span: Tuple[float, float], t: float) -> float: | |
| """Linear interp from span[0] (t=0) to span[1] (t=1).""" | |
| t = max(0.0, min(1.0, t)) | |
| return span[0] + (span[1] - span[0]) * t | |
| def _collab_tier_multipliers(self, same_niche: bool, overlap: float) -> Tuple[float, float]: | |
| """Pure 2x2 tier shaping (no fatigue/repeat/guardrail effects yet).""" | |
| # Smooth interp factor: how "low" is this overlap on the [0, LOW_THRESHOLD] scale. | |
| low_t = 1.0 - min(1.0, overlap / COLLAB_LOW_OVERLAP_THRESHOLD) # 1 at overlap=0, 0 at threshold | |
| if same_niche: | |
| if overlap < COLLAB_LOW_OVERLAP_THRESHOLD: | |
| eng = self._interp(COLLAB_SAME_LOW_ENG, low_t) | |
| growth = self._interp(COLLAB_SAME_LOW_GROWTH, low_t) | |
| elif overlap >= COLLAB_HIGH_OVERLAP_THRESHOLD: | |
| eng = COLLAB_SAME_HIGH_ENG | |
| growth = COLLAB_SAME_HIGH_GROWTH | |
| else: | |
| # Mid-band linear interpolation between LOW endpoint (overlap=LOW_TH) and HIGH endpoint (overlap=HIGH_TH). | |
| mid_t = (overlap - COLLAB_LOW_OVERLAP_THRESHOLD) / (COLLAB_HIGH_OVERLAP_THRESHOLD - COLLAB_LOW_OVERLAP_THRESHOLD) | |
| eng = self._interp((COLLAB_SAME_LOW_ENG[0], COLLAB_SAME_HIGH_ENG), mid_t) | |
| growth = self._interp((COLLAB_SAME_LOW_GROWTH[0], COLLAB_SAME_HIGH_GROWTH), mid_t) | |
| else: | |
| if overlap < COLLAB_LOW_OVERLAP_THRESHOLD: | |
| eng = self._interp(COLLAB_DIFF_LOW_ENG, low_t) | |
| growth = self._interp(COLLAB_DIFF_LOW_GROWTH, low_t) | |
| elif overlap >= COLLAB_HIGH_OVERLAP_THRESHOLD: | |
| eng = COLLAB_DIFF_HIGH_ENG | |
| growth = COLLAB_DIFF_HIGH_GROWTH | |
| else: | |
| mid_t = (overlap - COLLAB_LOW_OVERLAP_THRESHOLD) / (COLLAB_HIGH_OVERLAP_THRESHOLD - COLLAB_LOW_OVERLAP_THRESHOLD) | |
| eng = self._interp((COLLAB_DIFF_LOW_ENG[0], COLLAB_DIFF_HIGH_ENG), mid_t) | |
| growth = self._interp((COLLAB_DIFF_LOW_GROWTH[0], COLLAB_DIFF_HIGH_GROWTH), mid_t) | |
| # Hard rule: diff-niche must always be < same-niche-low minimum (cap just below). | |
| eng = min(eng, COLLAB_SAME_LOW_ENG[0] - 0.01) | |
| growth = min(growth, COLLAB_SAME_LOW_GROWTH[0] - 0.01) | |
| return eng, growth | |
| def _collab_evaluation(self, partner_id: str) -> Dict[str, Any]: | |
| """Single source of truth: tier reward + guardrails + final multipliers (after fatigue/repeat). | |
| Returns a dict consumable by both query_creator_pool (for recommendation surface) | |
| and _process_hour_action (for applied multipliers). | |
| """ | |
| overlap = self._user_partner_overlap(partner_id) | |
| if overlap is None: | |
| return { | |
| "partner_id": partner_id, | |
| "overlap": None, | |
| "same_niche": False, | |
| "partner_followers": 0, | |
| "user_followers": self._followers, | |
| "follower_gap_pct": 1.0, | |
| "intersection_size": 0, | |
| "recommended": False, | |
| "reason": "unknown_partner", | |
| "tier_eng_mult": 1.0, | |
| "tier_growth_mult": 1.0, | |
| "eng_mult": 1.0, | |
| "growth_mult": 1.0, | |
| } | |
| partner_niche = self._partner_niche(partner_id) | |
| same_niche = partner_niche == self._user_niche | |
| partner_followers = self._partner_followers(partner_id) | |
| user_followers = max(0, int(self._followers)) | |
| denom = max(1, max(partner_followers, user_followers)) | |
| gap_pct = abs(partner_followers - user_followers) / denom if denom else 1.0 | |
| # Mock intersection size via Jaccard inversion: union ≈ (|A|+|B|)/(1+overlap), intersection = overlap*union. | |
| union_approx = (partner_followers + user_followers) / (1.0 + overlap) if overlap >= 0 else 0.0 | |
| intersection_size = int(round(overlap * union_approx)) | |
| # Guardrails (in priority order) | |
| recommended = True | |
| reason: Optional[str] = None | |
| if partner_followers <= 0: | |
| recommended = False | |
| reason = "partner_zero_followers" | |
| elif overlap < COLLAB_GUARDRAIL_OVERLAP_MIN: | |
| recommended = False | |
| reason = "intersection_below_10pct" | |
| elif gap_pct > COLLAB_GUARDRAIL_FOLLOWER_GAP_MAX: | |
| recommended = False | |
| reason = "follower_size_mismatch" | |
| tier_eng, tier_growth = self._collab_tier_multipliers(same_niche, overlap) | |
| eng_mult = tier_eng | |
| growth_mult = tier_growth | |
| # Per-partner exhaustion: spillover + tier mults degrade per repeat with this same partner. | |
| # 1st = 1.0, 2nd = 0.70, 3rd = 0.35, 4th = 0.15, 5th+ = 0.05. | |
| repeat_decay = self._partner_repeat_decay(partner_id) | |
| eng_mult *= repeat_decay | |
| growth_mult *= repeat_decay | |
| # Diminishing returns across the episode (Cen 2024) — applies regardless of partner. | |
| prior = max(0, self._collabs_this_month - 1) | |
| fatigue = 1.0 / (1.0 + COLLAB_FATIGUE_K * prior) | |
| eng_mult *= fatigue | |
| growth_mult *= fatigue | |
| return { | |
| "partner_id": partner_id, | |
| "overlap": round(overlap, 3), | |
| "same_niche": same_niche, | |
| "partner_niche": partner_niche, | |
| "user_niche": self._user_niche, | |
| "partner_followers": partner_followers, | |
| "user_followers": user_followers, | |
| "follower_gap_pct": round(gap_pct, 3), | |
| "intersection_size": intersection_size, | |
| "recommended": recommended, | |
| "reason": reason, | |
| "tier_eng_mult": round(tier_eng, 3), | |
| "tier_growth_mult": round(tier_growth, 3), | |
| "eng_mult": round(eng_mult, 3), | |
| "growth_mult": round(growth_mult, 3), | |
| "prior_count_with_partner": self._partner_prior_count(partner_id), | |
| "repeat_decay": round(repeat_decay, 3), | |
| } | |
| def _collab_multipliers(self, partner_id: str) -> Tuple[float, float]: | |
| """Returns (engagement_multiplier, follower_growth_multiplier). | |
| Applies guardrail penalties when the agent forces a non-recommended collab. | |
| Side effect: appends to self._collab_violations for the JudgeReport. | |
| """ | |
| ev = self._collab_evaluation(partner_id) | |
| eng = ev["eng_mult"] | |
| growth = ev["growth_mult"] | |
| if not ev["recommended"]: | |
| eng *= COLLAB_FORCED_PENALTY_ENG | |
| growth *= COLLAB_FORCED_PENALTY_GROWTH | |
| self._collab_violations.append( | |
| f"collab_guardrail:{ev.get('reason', 'blocked')}@{partner_id}" | |
| ) | |
| return eng, growth | |
| # ----- interactions (likes/comments/replies) ----- | |
| def _process_interactions( | |
| self, interactions: Optional[DailyInteractions] | |
| ) -> Tuple[float, Dict[str, Any]]: | |
| """Apply daily interaction effects: energy cost, reach buffs (next post), and 5 penalty paths. | |
| Returns (reward_delta, summary_dict). The reward_delta is added to today's averaged reward; | |
| reach effects propagate via self._pending_reach_mult (consumed at next _process_hour_action). | |
| Loyalty effects propagate via self._engagement_rate_loyalty_mult (compounding). | |
| """ | |
| # Reset reach mult for the day (default neutral); we accumulate per-day, then it's consumed | |
| # by today's posts and any leftover carries over by simply staying at 1.0 next step. | |
| self._pending_reach_mult = 1.0 | |
| self._interaction_violations = [] | |
| summary: Dict[str, Any] = { | |
| "likes_on_others": 0, | |
| "comments_on_others": 0, | |
| "replies_to_audience": 0, | |
| "energy_cost": 0.0, | |
| "reach_modifier": 1.0, | |
| "shadowban_risk": round(self._shadowban_risk, 3), | |
| "loyalty_mult": round(self._engagement_rate_loyalty_mult, 3), | |
| "reward_delta": 0.0, | |
| "violations": [], | |
| "summary": "no_interactions", | |
| } | |
| if interactions is None: | |
| return 0.0, summary | |
| likes = int(interactions.likes_on_others) | |
| comments = int(interactions.comments_on_others) | |
| replies = int(interactions.replies_to_audience) | |
| targets = list(interactions.target_partner_ids or []) | |
| quality = float(interactions.avg_reply_quality) | |
| # 1) Energy cost (paid up front; can push creator below 0.2 -> burnout track). | |
| energy_cost = ( | |
| INTERACT_ENERGY_LIKE * likes | |
| + INTERACT_ENERGY_COMMENT * comments | |
| + INTERACT_ENERGY_REPLY * replies | |
| ) | |
| self._energy = max(0.0, self._energy - energy_cost) | |
| # Determine off-niche share among interaction targets. | |
| off_niche_share = 0.0 | |
| if targets: | |
| off = 0 | |
| for tid in targets: | |
| if self._partner_niche(tid) != self._user_niche: | |
| off += 1 | |
| off_niche_share = off / len(targets) | |
| # 2) Reach buffs (next post engagement multiplier) — only when on-niche and within healthy band. | |
| on_niche_share = 1.0 - off_niche_share | |
| reach_mult = 1.0 | |
| if on_niche_share > 0: | |
| if INTERACT_HEALTHY_LIKES[0] <= likes <= INTERACT_HEALTHY_LIKES[1]: | |
| reach_mult *= 1.0 + INTERACT_LIKE_REACH_BUFF * on_niche_share | |
| if INTERACT_HEALTHY_COMMENTS[0] <= comments <= INTERACT_HEALTHY_COMMENTS[1]: | |
| reach_mult *= 1.0 + INTERACT_COMMENT_REACH_BUFF * on_niche_share | |
| reward_delta = 0.0 | |
| # 3) Reply reward (audience loyalty), scaled by quality. | |
| reply_weight = INTERACT_LOWQ_WEIGHT if quality < INTERACT_LOWQ_THRESHOLD else 1.0 | |
| reply_reward = min( | |
| INTERACT_REPLY_REWARD_CAP, | |
| INTERACT_REPLY_REWARD_PER * replies * quality * reply_weight, | |
| ) | |
| reward_delta += reply_reward | |
| # 4) Penalties — each surfaces a violation string. | |
| # 4a) Spam volume. | |
| if likes > INTERACT_SPAM_LIKES or comments > INTERACT_SPAM_COMMENTS: | |
| reach_mult *= INTERACT_SPAM_REACH_PENALTY | |
| self._shadowban_risk = min(1.0, self._shadowban_risk + INTERACT_SPAM_SHADOWBAN_BUMP) | |
| self._interaction_violations.append( | |
| f"interaction_spam:likes={likes},comments={comments}" | |
| ) | |
| # 4b) Off-niche heavy interaction. | |
| if off_niche_share >= INTERACT_OFFNICHE_THRESHOLD and len(targets) >= 3: | |
| reach_mult *= INTERACT_OFFNICHE_REACH_PENALTY | |
| self._interaction_violations.append( | |
| f"interaction_off_niche:share={off_niche_share:.2f}" | |
| ) | |
| # 4c) Ignoring own audience: expected_replies = K * recent_engagement_proxy (use last day's posts) | |
| prev_day = max(0, self._day - 1) | |
| expected_signal = self._posts_per_day.get(prev_day, 0) # # posts yesterday as a proxy | |
| # Multiply by a small constant so 1 post = 1 expected reply unit floor. | |
| expected_replies = expected_signal * 1.0 | |
| if expected_replies > 0 and replies < INTERACT_IGNORE_THRESHOLD_K * expected_replies * 20: | |
| # Compounding loyalty drop on engagement_rate, capped at 0.5x floor. | |
| self._engagement_rate_loyalty_mult = max( | |
| 0.5, self._engagement_rate_loyalty_mult * INTERACT_IGNORE_LOYALTY_DECAY | |
| ) | |
| self._interaction_violations.append( | |
| f"interaction_ignoring_own:replies={replies}" | |
| ) | |
| # 4d) Low quality replies — already weighted; if extremely low quality, additional penalty. | |
| if replies > 0 and quality < INTERACT_VERY_LOWQ_THRESHOLD: | |
| reward_delta += INTERACT_VERY_LOWQ_PENALTY | |
| self._interaction_violations.append( | |
| f"interaction_low_quality:q={quality:.2f}" | |
| ) | |
| # 4e) Energy: covered upstream; just record if it pushed creator into low-energy zone. | |
| if energy_cost > 0 and self._energy < 0.2: | |
| self._interaction_violations.append( | |
| f"interaction_energy_drain:residual_energy={self._energy:.2f}" | |
| ) | |
| # Cap daily reward_delta to avoid blowing past the per-step [0,1] reward envelope. | |
| reward_delta = max(-INTERACT_DAILY_REWARD_CAP, min(INTERACT_DAILY_REWARD_CAP, reward_delta)) | |
| # Persist computed reach_mult so today's hourly posts pick it up. | |
| self._pending_reach_mult = max(0.5, reach_mult) | |
| # Decay shadowban_risk slightly on quiet days (0 likes & 0 comments). | |
| if likes == 0 and comments == 0: | |
| self._shadowban_risk = max(0.0, self._shadowban_risk - 0.05) | |
| summary.update({ | |
| "likes_on_others": likes, | |
| "comments_on_others": comments, | |
| "replies_to_audience": replies, | |
| "energy_cost": round(energy_cost, 4), | |
| "reach_modifier": round(self._pending_reach_mult, 3), | |
| "shadowban_risk": round(self._shadowban_risk, 3), | |
| "loyalty_mult": round(self._engagement_rate_loyalty_mult, 3), | |
| "off_niche_share": round(off_niche_share, 2), | |
| "reward_delta": round(reward_delta, 4), | |
| "violations": list(self._interaction_violations), | |
| "summary": ( | |
| "spam" if likes > INTERACT_SPAM_LIKES or comments > INTERACT_SPAM_COMMENTS | |
| else "off_niche" if off_niche_share >= INTERACT_OFFNICHE_THRESHOLD and len(targets) >= 3 | |
| else "low_quality" if replies > 0 and quality < INTERACT_VERY_LOWQ_THRESHOLD | |
| else "ignoring_own" if expected_replies > 0 and replies < INTERACT_IGNORE_THRESHOLD_K * expected_replies * 20 | |
| else "healthy" if reward_delta > 0 or reach_mult > 1.0 | |
| else "neutral" | |
| ), | |
| }) | |
| return reward_delta, summary | |
| # ----- engagement signals (Mosseri-aligned) ----- | |
| def _compute_engagement_signals( | |
| self, content_type: str, base_eng: float, intent: Optional[str] | |
| ) -> EngagementSignals: | |
| weights = FORMAT_SIGNAL_WEIGHTS.get(content_type, FORMAT_SIGNAL_WEIGHTS["text_post"]) | |
| signals = {k: base_eng * v for k, v in weights.items()} | |
| if intent and intent in INTENT_MULTIPLIER: | |
| for signal_name, mult in INTENT_MULTIPLIER[intent].items(): | |
| if signal_name in signals: | |
| signals[signal_name] *= mult | |
| return EngagementSignals(**signals) | |
| # ----- tool dispatcher ----- | |
| def _dispatch_tool(self, tool: ToolCall) -> ToolResult: | |
| if tool.name == "query_audience": | |
| seg_id = tool.arguments.get("segment_id", "") | |
| for seg in _AUDIENCE_DATA.get("segments", []): | |
| if seg["id"] == seg_id: | |
| return ToolResult(name=tool.name, data=seg, budget_remaining=self._api_budget) | |
| return ToolResult(name=tool.name, success=False, error=f"unknown segment: {seg_id}", budget_remaining=self._api_budget) | |
| elif tool.name == "query_competitor": | |
| comp_id = tool.arguments.get("competitor_id", "") | |
| window = tool.arguments.get("window_days", 7) | |
| for comp in self._competitors: | |
| if comp.id == comp_id: | |
| posts = [p for p in comp.recent_posts if p["hours_ago"] < window * 24] | |
| return ToolResult(name=tool.name, data={ | |
| "id": comp.id, "name": comp.name, "niche": comp.niche, | |
| "posts_per_week": comp.posts_per_week, | |
| "recent_posts": posts[:10], | |
| "avg_engagement": round(sum(p["engagement"] for p in posts) / max(1, len(posts)), 3), | |
| }, budget_remaining=self._api_budget) | |
| return ToolResult(name=tool.name, success=False, error=f"unknown competitor: {comp_id}", budget_remaining=self._api_budget) | |
| elif tool.name == "query_tag_history": | |
| tag = tool.arguments.get("tag", "").lower() | |
| history = self._tag_history.get(tag, []) | |
| return ToolResult(name=tool.name, data={ | |
| "tag": tag, "uses": len(history), | |
| "avg_signals": _avg_signal_dicts(history[-10:]) if history else {}, | |
| }, budget_remaining=self._api_budget) | |
| elif tool.name == "query_trends": | |
| niche = tool.arguments.get("niche", "tech") | |
| return ToolResult(name=tool.name, data={ | |
| "trending_topics": self._trending_topics, | |
| "trending_tags": self._trending_tags, | |
| "niche_saturation": round(self._calc_niche_saturation(self._last_topic), 3), | |
| }, budget_remaining=self._api_budget) | |
| elif tool.name == "predict_engagement": | |
| raw_actions = tool.arguments.get("scheduled_actions", []) | |
| predicted_total = 0.0 | |
| for sa_dict in raw_actions[:5]: | |
| try: | |
| sa = ScheduledAction(**sa_dict) if isinstance(sa_dict, dict) else sa_dict | |
| except Exception: | |
| continue | |
| if sa.action_type == "post" and sa.content_type: | |
| base = BASE_ENGAGEMENT.get(sa.content_type, 0.3) | |
| reach = REACH_MULT.get(sa.content_type, 1.0) | |
| niche_m = self._get_niche_multiplier(sa.topic) | |
| predicted_total += base * reach * niche_m * self._get_hour_multiplier() | |
| return ToolResult(name=tool.name, data={"predicted_daily_engagement": round(predicted_total, 4)}, budget_remaining=self._api_budget) | |
| elif tool.name == "draft_review": | |
| raw_actions = tool.arguments.get("scheduled_actions", []) | |
| n_posts = sum(1 for a in raw_actions if (a.get("action_type") if isinstance(a, dict) else getattr(a, "action_type", "")) == "post") | |
| feedback = [] | |
| if n_posts == 0: | |
| feedback.append("No posts planned — you'll lose algorithmic momentum.") | |
| elif n_posts > 3: | |
| feedback.append(f"{n_posts} posts in one day risks audience fatigue (optimal: 1-2).") | |
| if n_posts >= 1 and n_posts <= 2: | |
| feedback.append("Good posting frequency for today.") | |
| return ToolResult(name=tool.name, data={"feedback": feedback, "post_count": n_posts}, budget_remaining=self._api_budget) | |
| elif tool.name == "query_creator_pool": | |
| pool = [] | |
| for comp in self._competitors: | |
| ev = self._collab_evaluation(comp.id) | |
| pool.append({ | |
| "id": comp.id, | |
| "name": comp.name, | |
| "niche": comp.niche, | |
| "audience_overlap": ev.get("overlap"), | |
| "mock_followers": ev.get("partner_followers"), | |
| "intersection_size": ev.get("intersection_size"), | |
| "same_niche": ev.get("same_niche"), | |
| "follower_gap_pct": ev.get("follower_gap_pct"), | |
| "recommended": ev.get("recommended"), | |
| "reason": ev.get("reason"), | |
| "expected_eng_mult": ev.get("eng_mult"), | |
| "expected_growth_mult": ev.get("growth_mult"), | |
| "prior_collabs_with_partner": ev.get("prior_count_with_partner"), | |
| "repeat_decay": ev.get("repeat_decay"), | |
| }) | |
| return ToolResult( | |
| name=tool.name, | |
| data={ | |
| "user_niche": self._user_niche, | |
| "user_followers": int(self._followers), | |
| "pool": pool, | |
| }, | |
| budget_remaining=self._api_budget, | |
| ) | |
| elif tool.name == "propose_collab": | |
| partner_id = tool.arguments.get("partner_id", "") | |
| if partner_id not in [c.id for c in self._competitors]: | |
| return ToolResult(name=tool.name, success=False, error=f"unknown partner: {partner_id}", budget_remaining=self._api_budget) | |
| ev = self._collab_evaluation(partner_id) | |
| return ToolResult( | |
| name=tool.name, | |
| data={ | |
| "status": "proposal_accepted" if ev["recommended"] else "proposal_accepted_with_warning", | |
| "partner_id": partner_id, | |
| "recommended": ev["recommended"], | |
| "reason": ev["reason"], | |
| "same_niche": ev["same_niche"], | |
| "audience_overlap": ev["overlap"], | |
| "intersection_size": ev["intersection_size"], | |
| "expected_eng_mult": ev["eng_mult"], | |
| "expected_growth_mult": ev["growth_mult"], | |
| "prior_collabs_with_partner": ev["prior_count_with_partner"], | |
| "repeat_decay": ev["repeat_decay"], | |
| }, | |
| budget_remaining=self._api_budget, | |
| ) | |
| elif tool.name == "query_interaction_norms": | |
| return ToolResult( | |
| name=tool.name, | |
| data={ | |
| "healthy_likes_per_day": list(INTERACT_HEALTHY_LIKES), | |
| "healthy_comments_per_day": list(INTERACT_HEALTHY_COMMENTS), | |
| "spam_threshold_likes": INTERACT_SPAM_LIKES, | |
| "spam_threshold_comments": INTERACT_SPAM_COMMENTS, | |
| "off_niche_share_max": INTERACT_OFFNICHE_THRESHOLD, | |
| "min_reply_quality": INTERACT_LOWQ_THRESHOLD, | |
| "current_shadowban_risk": round(self._shadowban_risk, 3), | |
| "user_niche": self._user_niche, | |
| "expected_replies_per_unit_engagement": INTERACT_IGNORE_THRESHOLD_K, | |
| }, | |
| budget_remaining=self._api_budget, | |
| ) | |
| return ToolResult(name=tool.name, success=False, error=f"unknown tool: {tool.name}", budget_remaining=self._api_budget) | |
| # ----- counterfactual coach ----- | |
| def _compute_coach_feedback(self, agent_engagement: float) -> Dict[str, Any]: | |
| # World-modeling discipline: emit a SCALAR delta only (no optimal_hours leak). | |
| # Agents must use `query_trends` / `predict_engagement` to discover *which* hours | |
| # are optimal — coach only signals "you're above/below the heatmap optimum today". | |
| dow = self._day % 7 | |
| row = _HEATMAP_GRID.get(dow, [1.0] * 24) | |
| best_hours = sorted(range(24), key=lambda h: row[h] if h < len(row) else 0, reverse=True)[:2] | |
| best_base = max(BASE_ENGAGEMENT.values()) | |
| best_reach = max(REACH_MULT.values()) | |
| optimal_eng = sum(row[h] * best_base * best_reach for h in best_hours) | |
| delta = agent_engagement - optimal_eng | |
| return { | |
| "delta": round(delta, 4), | |
| "suggestion": ( | |
| "Above heatmap optimum today." | |
| if delta >= 0 | |
| else "Below heatmap optimum — try `query_trends` / `predict_engagement` to find peak hours." | |
| ), | |
| } | |
| # ----- regulator / judge mode (deterministic, explainable) ----- | |
| def _compute_judge_report( | |
| self, | |
| action: ViraltestAction, | |
| daily_engagement: float, | |
| daily_posts: int, | |
| energy_min: float, | |
| errors: List[str], | |
| ) -> JudgeReport: | |
| violations: List[str] = [] | |
| pc = 1.0 | |
| if daily_posts > 5: | |
| violations.append(f"posts_today={daily_posts} exceeds tier-4 fatigue cliff (Buffer 2.1M)") | |
| pc -= 0.30 | |
| elif daily_posts > 2: | |
| violations.append(f"posts_today={daily_posts} enters fatigue tier (>2/day)") | |
| pc -= 0.10 | |
| if self._total_posts_this_week > WEEKLY_FATIGUE_THRESHOLD: | |
| violations.append(f"weekly posts={self._total_posts_this_week} > {WEEKLY_FATIGUE_THRESHOLD} (Buffer 2.1M cap)") | |
| pc -= 0.20 | |
| if self._collabs_this_month >= 4: | |
| violations.append(f"collab cadence={self._collabs_this_month} net-negative beyond 3 (Cen 2024)") | |
| pc -= 0.20 | |
| if errors: | |
| violations.append(f"plan_errors={len(errors)}") | |
| pc -= 0.05 * len(errors) | |
| if self._hours_since_sleep > 22: | |
| violations.append(f"sleep_debt: {self._hours_since_sleep}h awake (Van Dongen 2003)") | |
| pc -= 0.10 | |
| # Collab guardrail breaches surfaced by _collab_multipliers (forced past block). | |
| for v in self._collab_violations: | |
| violations.append(v) | |
| pc -= 0.10 | |
| # Interaction system violations (spam/off-niche/ignoring/low-quality/energy-drain). | |
| for v in self._interaction_violations: | |
| violations.append(v) | |
| pc -= 0.10 | |
| burnout_pressure = (1.0 - energy_min) * 0.4 + self._sleep_debt * 0.3 + (self._low_energy_days / 5.0) * 0.3 | |
| sustainability_risk = max(0.0, min(1.0, burnout_pressure)) | |
| intents_used = {sa.intent for sa in action.scheduled_actions if sa.intent} | |
| formats_used = {sa.content_type for sa in action.scheduled_actions if sa.action_type == "post" and sa.content_type} | |
| eng_per_post = daily_engagement / max(1, daily_posts) | |
| sq = ( | |
| 0.40 * min(1.0, eng_per_post / 1.2) | |
| + 0.30 * min(1.0, len(intents_used) / 2.0) | |
| + 0.30 * min(1.0, len(formats_used) / 2.0) | |
| ) | |
| explanation = ( | |
| f"compliance={max(0.0, pc):.2f} risk={sustainability_risk:.2f} strategy={sq:.2f} | " | |
| + (("violations: " + "; ".join(violations)) if violations else "no policy violations") | |
| ) | |
| return JudgeReport( | |
| policy_compliance=max(0.0, min(1.0, pc)), | |
| sustainability_risk=sustainability_risk, | |
| strategic_quality=max(0.0, min(1.0, sq)), | |
| explanation=explanation, | |
| violations=violations, | |
| ) | |
| def _compute_headline_metrics(self, grader_score: float) -> HeadlineMetrics: | |
| baseline = HEURISTIC_BASELINE_SCORES.get(self._task, 0.30) | |
| vs_pct = (grader_score - baseline) / baseline if baseline > 0 else 0.0 | |
| spt = grader_score / max(1, self._total_tool_calls) | |
| sp1k = grader_score / max(1.0, self._total_action_chars / 1000.0) | |
| retention: Optional[float] = None | |
| if self._chain_id: | |
| entry = _SHIFT_HISTORY.setdefault(self._chain_id, {}) | |
| label = self._shift_label or "baseline" | |
| entry[label] = grader_score | |
| base = entry.get("baseline") | |
| shifted = entry.get("shifted") | |
| if base is not None and shifted is not None and base > 0: | |
| retention = shifted / base | |
| return HeadlineMetrics( | |
| vs_baseline_pct=round(vs_pct, 4), | |
| score_per_tool_call=round(spt, 4), | |
| score_per_1k_chars=round(sp1k, 4), | |
| retention_under_shift=round(retention, 4) if retention is not None else None, | |
| heuristic_baseline_score=round(baseline, 4), | |
| agent_score=round(grader_score, 4), | |
| total_tool_calls=self._total_tool_calls, | |
| total_action_chars=self._total_action_chars, | |
| ) | |
| # ----- core API ----- | |
| def reset(self, seed: Optional[int] = None, episode_id: Optional[str] = None, **kwargs: Any) -> ViraltestObservation: | |
| self._task = kwargs.get("task", "monthly_engage") | |
| if self._task not in VALID_TASKS: | |
| self._task = "monthly_engage" | |
| self._rng = random.Random(seed if seed is not None else 42) | |
| self._state = State(episode_id=episode_id or str(uuid4()), step_count=0) | |
| self._init_state() | |
| # Optional user-niche override (for collab same/diff niche scenarios). | |
| user_niche_override = kwargs.get("user_niche") | |
| if user_niche_override: | |
| self._user_niche = str(user_niche_override) | |
| self._shift_label = kwargs.get("shift_label") | |
| self._chain_id = kwargs.get("episode_chain_id") | |
| mode = kwargs.get("reward_mode", "combined") | |
| self._reward_mode = mode if mode in ("timing", "content", "combined") else "combined" | |
| if self._chain_id and self._chain_id in _BRAND_STORE: | |
| brand = _BRAND_STORE[self._chain_id] | |
| self._unique_tags_used = set(brand.get("top_tags", [])) | |
| self._unique_content_types = set(brand.get("dominant_types", [])) | |
| self._collab_history = brand.get("collab_history", []) | |
| self._followers = brand.get("followers", INITIAL_FOLLOWERS) | |
| self._initial_followers = self._followers | |
| return self._build_observation(reward=0.0, error=None) | |
| def step(self, action: ViraltestAction, **kwargs: Any) -> ViraltestObservation: | |
| if self._episode_done and self._final_observation is not None: | |
| return self._final_observation | |
| self._state.step_count += 1 | |
| # Store agent notes for echo | |
| if action.notes: | |
| self._agent_notes = action.notes | |
| try: | |
| self._total_action_chars += len(action.model_dump_json()) | |
| except Exception: | |
| pass | |
| tool_results: List[ToolResult] = [] | |
| for tc in action.tool_calls: | |
| result = self._dispatch_tool(tc) | |
| tool_results.append(result) | |
| if result.success: | |
| self._total_tool_calls += 1 | |
| # Process collab proposal (no hard cap; diminishing returns enforced via _collab_multipliers) | |
| self._active_collab = None | |
| self._collab_violations = [] | |
| if action.collab: | |
| self._collabs_this_month += 1 | |
| self._collab_history.append(action.collab.partner_id) | |
| self._active_collab = action.collab | |
| # Process interactions BEFORE the day's hourly loop so energy cost and reach buffs/penalties | |
| # influence the same day's posts. | |
| interaction_reward, interaction_summary = self._process_interactions(action.interactions) | |
| # Validate scheduled actions | |
| schedule: Dict[int, ScheduledAction] = {} | |
| errors: List[str] = [] | |
| for sa in action.scheduled_actions: | |
| if sa.hour < 0 or sa.hour > 23: | |
| errors.append(f"Invalid hour: {sa.hour}") | |
| continue | |
| err = self._validate_scheduled_action(sa) | |
| if err: | |
| errors.append(f"hour {sa.hour}: {err}") | |
| continue | |
| schedule[sa.hour] = sa | |
| # Collab requires a post at collab.hour (energy + post-count come from posting itself). | |
| # If the agent didn't schedule one, auto-inject a co-authored post using the collab's | |
| # content_type. This guarantees collab pays energy and counts as a post — no free multiplier. | |
| if self._active_collab is not None: | |
| chour = self._active_collab.hour | |
| existing = schedule.get(chour) | |
| if existing is None or existing.action_type != "post": | |
| ct = self._active_collab.content_type or "reel" | |
| partner_niche = self._partner_niche(self._active_collab.partner_id) | |
| topic = self._collab_default_topic(partner_niche) | |
| schedule[chour] = ScheduledAction( | |
| hour=chour, action_type="post", content_type=ct, | |
| topic=topic, tags=self._collab_default_tags(partner_niche), | |
| intent="watch_bait" if ct in ("reel", "story") else "save_bait", | |
| ) | |
| daily_engagement = 0.0 | |
| daily_reward = 0.0 | |
| daily_posts = 0 | |
| energy_min = self._energy | |
| burned_out = False | |
| daily_signals = EngagementSignals() | |
| for hour in range(24): | |
| if burned_out: | |
| break | |
| self._hour = hour | |
| if hour in schedule: | |
| sa = schedule[hour] | |
| hourly_eng, hourly_reward, hourly_signals = self._process_hour_action(sa) | |
| else: | |
| hourly_eng, hourly_reward = self._process_hour_rest() | |
| hourly_signals = None | |
| daily_engagement += hourly_eng | |
| daily_reward += hourly_reward | |
| if hourly_eng > 0: | |
| daily_posts += 1 | |
| if hourly_signals: | |
| daily_signals = EngagementSignals( | |
| watch_time=daily_signals.watch_time + hourly_signals.watch_time, | |
| sends_per_reach=daily_signals.sends_per_reach + hourly_signals.sends_per_reach, | |
| saves=daily_signals.saves + hourly_signals.saves, | |
| likes_per_reach=daily_signals.likes_per_reach + hourly_signals.likes_per_reach, | |
| ) | |
| energy_min = min(energy_min, self._energy) | |
| self._advance_competitors() | |
| self._advance_time() | |
| self._energy_history.append(self._energy) | |
| if self._energy <= 0.0: | |
| burned_out = True | |
| # Weekly tracking | |
| self._total_posts_this_week += daily_posts | |
| if self._day % 7 == 0 and self._day > 0: | |
| self._total_posts_this_week = 0 | |
| # Tick down the post-collab algorithm carryover (one day per step). | |
| if self._collab_carryover_days_remaining > 0: | |
| self._collab_carryover_days_remaining -= 1 | |
| # Burnout risk tracking | |
| if energy_min < 0.2: | |
| self._low_energy_days += 1 | |
| else: | |
| self._low_energy_days = max(0, self._low_energy_days - 1) | |
| prev_day = max(0, self._day - 1) | |
| if 1 <= self._posts_per_day.get(prev_day, 0) <= 2: | |
| self._days_with_good_posts.add(prev_day) | |
| # Apply ignored-audience compounding loyalty multiplier into the per-day reward. | |
| avg_reward = (daily_reward / 24.0) + interaction_reward | |
| avg_reward = max(0.0, min(1.0, avg_reward)) | |
| error_str = "; ".join(errors) if errors else None | |
| # Finalize this step's interaction summary on the obs. | |
| self._last_interaction_summary = interaction_summary | |
| done = self._state.step_count >= TASK_HORIZON or self._energy <= 0.0 | |
| coach = self._compute_coach_feedback(daily_engagement) | |
| judge = self._compute_judge_report(action, daily_engagement, daily_posts, energy_min, errors) | |
| if done: | |
| self._episode_done = True | |
| grader_score = self._run_grader() | |
| headline = self._compute_headline_metrics(grader_score) | |
| if self._chain_id: | |
| top_tags = sorted(self._unique_tags_used, key=lambda t: self._tag_performance_avg(t), reverse=True)[:3] | |
| _BRAND_STORE[self._chain_id] = { | |
| "top_tags": list(top_tags), | |
| "dominant_types": list(self._unique_content_types), | |
| "collab_history": self._collab_history[-3:], | |
| "followers": self._followers, | |
| } | |
| self._final_observation = self._build_observation( | |
| reward=round(avg_reward, 4), error=error_str, done=True, | |
| grader_score=grader_score, daily_total_engagement=daily_engagement, | |
| daily_posts_made=daily_posts, daily_energy_min=energy_min, | |
| tool_results=tool_results, engagement_signals=daily_signals, | |
| coach_feedback=coach, judge_report=judge, headline_metrics=headline, | |
| interaction_metrics=interaction_summary, | |
| ) | |
| return self._final_observation | |
| return self._build_observation( | |
| reward=round(avg_reward, 4), error=error_str, | |
| daily_total_engagement=daily_engagement, | |
| daily_posts_made=daily_posts, daily_energy_min=energy_min, | |
| tool_results=tool_results, engagement_signals=daily_signals, | |
| coach_feedback=coach, judge_report=judge, | |
| interaction_metrics=interaction_summary, | |
| ) | |
| def _process_hour_action(self, sa: ScheduledAction) -> Tuple[float, float, Optional[EngagementSignals]]: | |
| engagement = 0.0 | |
| signals = None | |
| collab_growth_mult = 1.0 | |
| collab_spillover_followers = 0 | |
| if sa.action_type == "post": | |
| cost = CONTENT_ENERGY_COST.get(sa.content_type, 0.1) | |
| if self._content_queue > 0: | |
| cost *= 0.5 | |
| self._content_queue -= 1 | |
| if len(self._last_post_types) >= 3 and all(t == sa.content_type for t in self._last_post_types[-3:]): | |
| cost += REPETITION_ENERGY_PENALTY | |
| self._energy = max(0.0, self._energy - cost) | |
| self._unique_content_types.add(sa.content_type) | |
| if self._energy <= 0.0: | |
| engagement = 0.0 | |
| else: | |
| base = BASE_ENGAGEMENT.get(sa.content_type, 0.3) | |
| reach = REACH_MULT.get(sa.content_type, 1.0) | |
| hour_mult = self._get_hour_multiplier() | |
| quality = self._get_quality_modifier() | |
| tag_boost = self._calc_tag_boost(sa.tags) | |
| trending_bonus = 1.5 if self._is_topic_trending(sa.topic) else 1.0 | |
| comp_diff = self._calc_competitor_diff(sa.topic) | |
| fatigue = self._get_fatigue_multiplier() | |
| niche_mult = self._get_niche_multiplier(sa.topic) | |
| n_comp_same_hour = self._count_competitors_same_hour() | |
| saturation_factor = 1.0 / (1.0 + SATURATION_PENALTY_K * n_comp_same_hour) | |
| algo_mult = 1.0 | |
| if self._algorithm_penalty_remaining > 0: | |
| algo_mult = ALGORITHM_PENALTY_MULT | |
| self._algorithm_penalty_remaining -= 1 | |
| engagement = ( | |
| base * reach * hour_mult * quality * tag_boost | |
| * trending_bonus * comp_diff * fatigue * algo_mult | |
| * niche_mult * saturation_factor | |
| ) | |
| # Interaction-driven reach modifier (set by _process_interactions earlier this step). | |
| # Multiplicative on engagement; capped at 0.5 floor inside _process_interactions. | |
| engagement *= getattr(self, "_pending_reach_mult", 1.0) | |
| # Sustained post-collab algorithm reach buff (applies to every post in the carryover window). | |
| if getattr(self, "_collab_carryover_days_remaining", 0) > 0: | |
| engagement *= COLLAB_REACH_CARRYOVER_MULT | |
| collab_spillover_followers = 0 | |
| if self._active_collab is not None and self._active_collab.hour == sa.hour: | |
| ev = self._collab_evaluation(self._active_collab.partner_id) | |
| eng_m, growth_m = self._collab_multipliers(self._active_collab.partner_id) | |
| engagement *= eng_m | |
| collab_growth_mult = growth_m | |
| # One-shot follower spillover: partner audience gets exposed to user. | |
| # Scales with (1 - overlap) — disjoint audiences = more new followers. | |
| # repeat_decay flows through growth_m (already baked in by _collab_evaluation), | |
| # so we don't multiply by it again here. | |
| overlap = ev.get("overlap") or 0.0 | |
| partner_followers = ev.get("partner_followers") or 0 | |
| spill_k = COLLAB_BLOCKED_SPILLOVER_K if not ev.get("recommended") else COLLAB_SPILLOVER_K | |
| collab_spillover_followers = int( | |
| partner_followers * (1.0 - overlap) * growth_m * spill_k | |
| ) | |
| self._last_collab_spillover = collab_spillover_followers | |
| # Arm the post-collab carryover window for the next N days. | |
| self._collab_carryover_days_remaining = COLLAB_REACH_CARRYOVER_DAYS | |
| engagement = min(engagement, 5.0) | |
| signals = self._compute_engagement_signals(sa.content_type, engagement, sa.intent) | |
| self._last_topic = sa.topic | |
| if sa.tags and engagement > 0: | |
| signal_dict = signals.model_dump() if signals else {"total": engagement} | |
| signal_dict["total"] = engagement | |
| for tag in sa.tags: | |
| tag_lower = tag.lower() | |
| self._tag_history[tag_lower].append(signal_dict) | |
| self._unique_tags_used.add(tag_lower) | |
| self._engagement_history.append(engagement) | |
| self._total_engagement += engagement | |
| self._posting_steps += 1 | |
| if self._calc_competitor_diff(sa.topic) >= 1.3: | |
| self._unique_topic_steps += 1 | |
| self._last_post_types.append(sa.content_type) | |
| if len(self._last_post_types) > 3: | |
| self._last_post_types = self._last_post_types[-3:] | |
| self._posts_today += 1 | |
| self._posts_per_day[self._day] += 1 | |
| self._time_since_last_post = 0 | |
| if engagement > 0: | |
| self._followers += int(engagement * 100 * collab_growth_mult) | |
| if collab_spillover_followers > 0: | |
| self._followers += collab_spillover_followers | |
| elif sa.action_type == "create_content": | |
| self._energy = max(0.0, self._energy - CREATE_CONTENT_COST) | |
| self._content_queue += 1 | |
| self._time_since_last_post += 1 | |
| if self._time_since_last_post >= FOLLOWER_DECAY_HOURS: | |
| self._followers = max(0, self._followers - int(self._followers * 0.005)) | |
| if self._algorithm_penalty_remaining == 0: | |
| gap_days = self._time_since_last_post // 24 | |
| self._algorithm_penalty_remaining = ALGORITHM_PENALTY_BASE_DURATION + gap_days | |
| reward = 0.0 if self._energy <= 0.0 else self._compute_hourly_reward(sa, engagement) | |
| return engagement, reward, signals | |
| def _process_hour_rest(self) -> Tuple[float, float]: | |
| self._energy = min(1.0, self._energy + REST_RECOVERY) | |
| self._hours_since_sleep = max(0, self._hours_since_sleep - SLEEP_RECOVERY_PER_REST) | |
| self._sleep_debt = max(0.0, self._sleep_debt - 0.1) | |
| self._time_since_last_post += 1 | |
| if self._time_since_last_post >= FOLLOWER_DECAY_HOURS: | |
| self._followers = max(0, self._followers - int(self._followers * 0.005)) | |
| if self._algorithm_penalty_remaining == 0: | |
| gap_days = self._time_since_last_post // 24 | |
| self._algorithm_penalty_remaining = ALGORITHM_PENALTY_BASE_DURATION + gap_days | |
| reward = 0.0 if self._energy <= 0.0 else self._compute_rest_reward() | |
| return 0.0, reward | |
| def state(self) -> State: | |
| return self._state | |
| def _validate_scheduled_action(self, sa: ScheduledAction) -> Optional[str]: | |
| if sa.action_type not in ("post", "create_content"): | |
| return f"Invalid action_type: {sa.action_type}" | |
| if sa.action_type == "post": | |
| if not sa.content_type: | |
| return "content_type is required when posting" | |
| if sa.content_type not in CONTENT_ENERGY_COST: | |
| return f"Invalid content_type: {sa.content_type}" | |
| if not sa.topic or not sa.topic.strip(): | |
| return "topic is required when posting" | |
| if len(sa.topic) > 200: | |
| return "topic must be <= 200 characters" | |
| if sa.tags: | |
| valid = [t for t in sa.tags if t.lower() in [tp.lower() for tp in TAG_POOL]] | |
| sa.tags = valid if valid else None | |
| return None | |
| def _is_topic_trending(self, topic: Optional[str]) -> bool: | |
| if not topic: | |
| return False | |
| t_words = set(topic.lower().split()) - TREND_MATCH_STOPWORDS | |
| if not t_words: | |
| return False | |
| for trend in self._trending_topics: | |
| if t_words & (set(trend.lower().split()) - TREND_MATCH_STOPWORDS): | |
| return True | |
| return False | |
| # ----- reward ----- | |
| def _compute_hourly_reward(self, sa: ScheduledAction, engagement: float) -> float: | |
| if self._reward_mode == "timing": | |
| return self._compute_timing_reward(sa, engagement) | |
| if self._reward_mode == "content": | |
| return self._compute_content_reward(sa, engagement) | |
| return self._compute_combined_reward(sa, engagement) | |
| def _energy_component(self) -> float: | |
| prev_energy = self._energy_history[-2] if len(self._energy_history) >= 2 else 1.0 | |
| energy_delta = self._energy - prev_energy | |
| return max(0.0, min(1.0, (energy_delta + 0.3) / 0.6)) | |
| def _consistency_score(self) -> float: | |
| day_posts = self._posts_per_day.get(self._day, 0) | |
| if 1 <= day_posts <= 2: | |
| return 1.0 | |
| if day_posts == 0 or day_posts == 3: | |
| return 0.5 | |
| return 0.0 | |
| def _compute_combined_reward(self, sa: ScheduledAction, engagement: float) -> float: | |
| eng_component = min(1.0, engagement / 2.0) * 0.3 | |
| energy_component = self._energy_component() * 0.15 | |
| consistency_component = self._consistency_score() * 0.15 | |
| tag_component = 0.0 | |
| if sa.action_type == "post" and sa.tags: | |
| trending_match = sum(1 for t in sa.tags if t.lower() in self._trending_tags) / 5.0 | |
| tag_component = min(1.0, trending_match + 0.3) * 0.15 | |
| comp_component = 0.0 | |
| if sa.action_type == "post": | |
| diff = self._calc_competitor_diff(sa.topic) | |
| comp_component = min(1.0, diff / 1.3) * 0.15 | |
| burnout_penalty = 0.1 if self._energy < 0.2 else 0.0 | |
| is_post = sa.action_type == "post" | |
| trending_topic_mult = 1.5 if is_post and self._is_topic_trending(sa.topic) else 1.0 | |
| peak_hour_mult = 1.3 if is_post and self._get_hour_multiplier() >= 1.2 else 1.0 | |
| raw = ( | |
| (eng_component + tag_component + comp_component) * trending_topic_mult * peak_hour_mult | |
| + energy_component + consistency_component - burnout_penalty | |
| ) | |
| return max(0.0, min(1.0, raw)) | |
| def _compute_timing_reward(self, sa: ScheduledAction, engagement: float) -> float: | |
| is_post = sa.action_type == "post" | |
| peak_hour_mult = 1.3 if is_post and self._get_hour_multiplier() >= 1.2 else 1.0 | |
| trending_topic_mult = 1.5 if is_post and self._is_topic_trending(sa.topic) else 1.0 | |
| eng_component = min(1.0, engagement / 2.0) * 0.40 * trending_topic_mult * peak_hour_mult | |
| peak_bonus = min(1.0, self._get_hour_multiplier() / 1.3) if is_post else 0.0 | |
| peak_component = peak_bonus * 0.20 | |
| energy_component = self._energy_component() * 0.20 | |
| consistency_component = self._consistency_score() * 0.20 | |
| burnout_penalty = 0.1 if self._energy < 0.2 else 0.0 | |
| raw = eng_component + peak_component + energy_component + consistency_component - burnout_penalty | |
| return max(0.0, min(1.0, raw)) | |
| def _compute_content_reward(self, sa: ScheduledAction, engagement: float) -> float: | |
| is_post = sa.action_type == "post" | |
| trending_topic_mult = 1.5 if is_post and self._is_topic_trending(sa.topic) else 1.0 | |
| eng_component = min(1.0, engagement / 2.0) * 0.20 * trending_topic_mult | |
| tag_component = 0.0 | |
| if is_post and sa.tags: | |
| trending_match = sum(1 for t in sa.tags if t.lower() in self._trending_tags) / 5.0 | |
| tag_component = min(1.0, trending_match + 0.3) * 0.25 | |
| comp_component = 0.0 | |
| if is_post: | |
| diff = self._calc_competitor_diff(sa.topic) | |
| comp_component = min(1.0, diff / 1.3) * 0.25 | |
| variety_component = 0.0 | |
| intent_component = 0.0 | |
| if is_post: | |
| variety_component = min(1.0, len(self._unique_content_types) / 4.0) * 0.15 | |
| intent_component = (0.15 if sa.intent in INTENT_MULTIPLIER else 0.0) | |
| burnout_penalty = 0.05 if self._energy < 0.2 else 0.0 | |
| raw = eng_component + tag_component + comp_component + variety_component + intent_component - burnout_penalty | |
| return max(0.0, min(1.0, raw)) | |
| def _compute_rest_reward(self) -> float: | |
| energy_component = self._energy_component() * 0.15 | |
| consistency_component = self._consistency_score() * 0.15 | |
| burnout_penalty = 0.1 if self._energy < 0.2 else 0.0 | |
| raw = energy_component + consistency_component - burnout_penalty | |
| if self._reward_mode == "content": | |
| raw *= 0.5 | |
| return max(0.0, min(1.0, raw)) | |
| def _advance_time(self) -> None: | |
| self._hour += 1 | |
| self._hours_since_sleep += 1 | |
| if self._hours_since_sleep > SLEEP_ENERGY_DRAIN_START: | |
| hours_over = self._hours_since_sleep - SLEEP_ENERGY_DRAIN_START | |
| drain = SLEEP_ENERGY_DRAIN_RATE * (1 + hours_over * 0.1) | |
| self._energy = max(0.0, self._energy - drain) | |
| if self._hours_since_sleep > SLEEP_OPTIMAL_AWAKE: | |
| hours_over = self._hours_since_sleep - SLEEP_OPTIMAL_AWAKE | |
| debt_rate = 0.01 * (1 + hours_over * 0.05) | |
| self._sleep_debt = min(1.0, self._sleep_debt + debt_rate) | |
| if self._hour >= 24: | |
| self._hour = 0 | |
| self._day += 1 | |
| self._posts_today = 0 | |
| self._rotate_trends() | |
| def _build_observation( | |
| self, reward: float, error: Optional[str], done: bool = False, | |
| grader_score: Optional[float] = None, | |
| daily_total_engagement: float = 0.0, daily_posts_made: int = 0, | |
| daily_energy_min: float = 1.0, | |
| tool_results: Optional[List[ToolResult]] = None, | |
| engagement_signals: Optional[EngagementSignals] = None, | |
| coach_feedback: Optional[Dict[str, Any]] = None, | |
| judge_report: Optional[JudgeReport] = None, | |
| headline_metrics: Optional[HeadlineMetrics] = None, | |
| interaction_metrics: Optional[Dict[str, Any]] = None, | |
| ) -> ViraltestObservation: | |
| recent_eng = self._engagement_history[-10:] if self._engagement_history else [] | |
| eng_rate = sum(recent_eng) / len(recent_eng) if recent_eng else 0.0 | |
| eng_rate *= getattr(self, "_engagement_rate_loyalty_mult", 1.0) | |
| meta: Dict[str, Any] = {"step": self._state.step_count, "task": self._task} | |
| if grader_score is not None: | |
| meta["grader_score"] = round(grader_score, 4) | |
| audience_hours: set = set() | |
| for seg in _AUDIENCE_DATA.get("segments", []): | |
| audience_hours.update(seg.get("active_hours", [])) | |
| meta["audience_active_hours"] = sorted(audience_hours) | |
| comp_hours = [ | |
| (self._hour - p["hours_ago"]) % 24 | |
| for comp in self._competitors | |
| for p in comp.recent_posts | |
| if p["hours_ago"] < 48 | |
| ] | |
| meta["competitor_recent_post_hours"] = sorted(comp_hours) | |
| burnout_risk = min(1.0, self._low_energy_days / 5.0) | |
| return ViraltestObservation( | |
| current_hour=self._hour, | |
| day_of_week=self._day % 7, | |
| days_elapsed=self._day, | |
| creator_energy=round(self._energy, 3), | |
| hours_since_sleep=self._hours_since_sleep, | |
| sleep_debt=round(self._sleep_debt, 3), | |
| follower_count=self._followers, | |
| engagement_rate=round(eng_rate, 4), | |
| posts_today=self._posts_today, | |
| time_since_last_post=self._time_since_last_post, | |
| content_queue_size=self._content_queue, | |
| last_post_type=self._last_post_types[-1] if self._last_post_types else "none", | |
| burnout_risk=round(burnout_risk, 3), | |
| daily_total_engagement=round(daily_total_engagement, 4), | |
| daily_posts_made=daily_posts_made, | |
| daily_energy_min=round(daily_energy_min, 3), | |
| engagement_signals=engagement_signals, | |
| coach_feedback=coach_feedback, | |
| judge_report=judge_report, | |
| headline_metrics=headline_metrics, | |
| tool_results=tool_results or [], | |
| agent_notes=self._agent_notes, | |
| api_budget_remaining=self._api_budget, | |
| grader_score=round(grader_score, 4) if grader_score is not None else None, | |
| error=error, | |
| done=done, | |
| reward=round(reward, 4), | |
| metadata=meta, | |
| interaction_metrics=interaction_metrics, | |
| ) | |
| # ----- graders (monthly) ----- | |
| def _run_grader(self) -> float: | |
| if self._task == "monthly_engage": | |
| return self._grade_monthly_engage() | |
| elif self._task == "monthly_strategic": | |
| return self._grade_monthly_strategic() | |
| elif self._task == "monthly_competitive": | |
| return self._grade_monthly_competitive() | |
| return 0.0 | |
| def _theoretical_max_engagement(self) -> float: | |
| # Buffer 2.1M (RESEARCH.md): 3–5 posts/week doubles follower growth vs 1–2, | |
| # diminishing returns above 5/week, 20–35% engagement drop per post above 7/week. | |
| # Cap at 5 posts/week × 4 weeks = 20 posts/month (sweet-spot, no fatigue penalty). | |
| best_base = max(BASE_ENGAGEMENT.values()) | |
| best_reach = max(REACH_MULT.values()) | |
| best_niche = max(_NICHE_MULTIPLIERS.values()) if _NICHE_MULTIPLIERS else 1.0 | |
| posts_per_week = 5 | |
| weeks_in_horizon = TASK_HORIZON / 7.0 | |
| total_posts = int(round(posts_per_week * weeks_in_horizon)) | |
| avg_heatmap_peak = 1.0 | |
| if _HEATMAP_GRID: | |
| day_peaks = [ | |
| max(row) if row else 1.0 | |
| for row in _HEATMAP_GRID.values() | |
| ] | |
| avg_heatmap_peak = sum(day_peaks) / len(day_peaks) if day_peaks else 1.0 | |
| # Trending + tag uplifts: tier-1 industry data shows ~1.2-1.3x for trending topics | |
| # and ~1.05-1.15x for high-performance tags. Mid-range used to avoid headroom inflation. | |
| trending_bonus = 1.25 | |
| tag_boost = 1.1 | |
| per_post = ( | |
| best_base * best_reach * best_niche | |
| * avg_heatmap_peak * trending_bonus * tag_boost | |
| ) | |
| return per_post * total_posts | |
| def _grade_monthly_engage(self) -> float: | |
| theoretical_max = self._theoretical_max_engagement() | |
| if theoretical_max <= 0: | |
| return 0.0 | |
| raw = min(1.0, self._total_engagement / theoretical_max) | |
| if self._energy <= 0.0: | |
| raw *= 0.3 | |
| return raw | |
| def _grade_monthly_strategic(self) -> float: | |
| if self._energy <= 0.0: | |
| return max(0.0, min(0.15, self._total_engagement * 0.01)) | |
| theoretical_max = self._theoretical_max_engagement() | |
| norm_eng = min(1.0, self._total_engagement / theoretical_max) if theoretical_max > 0 else 0.0 | |
| positive_tags = sum(1 for t in self._unique_tags_used if self._tag_performance_avg(t) > 0) | |
| tag_discovery = min(1.0, positive_tags / TAG_DISCOVERY_POSITIVE_TARGET) | |
| top_perfs = sorted([self._tag_performance_avg(t) for t in self._unique_tags_used], reverse=True)[:3] | |
| tag_exploitation = (sum(top_perfs) / len(top_perfs)) if top_perfs else 0.0 | |
| tag_exploitation = min(1.0, tag_exploitation / 2.0) | |
| tag_score = 0.4 * tag_discovery + 0.6 * tag_exploitation | |
| avg_energy = sum(self._energy_history) / len(self._energy_history) if self._energy_history else 0.0 | |
| consistency = len(self._days_with_good_posts) / float(max(1, TASK_HORIZON)) | |
| raw = 0.35 * norm_eng + 0.25 * tag_score + 0.25 * avg_energy + 0.15 * consistency | |
| min_energy = min(self._energy_history) if self._energy_history else 0.0 | |
| if min_energy < 0.2: | |
| raw *= 0.4 | |
| elif min_energy < 0.3: | |
| raw = min(raw, 0.45) | |
| if len(self._unique_tags_used) < 5: | |
| raw *= 0.7 | |
| return max(0.0, min(1.0, raw)) | |
| def _grade_monthly_competitive(self) -> float: | |
| if self._energy <= 0.0: | |
| return 0.0 | |
| theoretical_max = self._theoretical_max_engagement() | |
| norm_eng = min(1.0, self._total_engagement / theoretical_max) if theoretical_max > 0 else 0.0 | |
| positive_tags = sum(1 for t in self._unique_tags_used if self._tag_performance_avg(t) > 0) | |
| tag_discovery = min(1.0, positive_tags / TAG_DISCOVERY_POSITIVE_TARGET) | |
| top_perfs = sorted([self._tag_performance_avg(t) for t in self._unique_tags_used], reverse=True)[:3] | |
| tag_exploitation = (sum(top_perfs) / len(top_perfs)) if top_perfs else 0.0 | |
| tag_exploitation = min(1.0, tag_exploitation / 2.0) | |
| tag_score = 0.4 * tag_discovery + 0.6 * tag_exploitation | |
| growth = (self._followers - self._initial_followers) / self._initial_followers if self._initial_followers > 0 else 0.0 | |
| target_growth = 0.04 | |
| norm_growth = min(1.0, max(0.0, growth / target_growth)) | |
| comp_avg = self._get_competitor_avg_engagement() | |
| my_avg = self._total_engagement / self._posting_steps if self._posting_steps > 0 else 0.0 | |
| outperformance = my_avg / comp_avg if comp_avg > 0 else 1.0 | |
| norm_outperformance = min(1.0, outperformance / 1.5) | |
| differentiation = self._unique_topic_steps / self._posting_steps if self._posting_steps > 0 else 0.0 | |
| min_energy = min(self._energy_history) if self._energy_history else 0.0 | |
| energy_floor = min(1.0, max(0.0, min_energy)) | |
| raw = ( | |
| 0.25 * norm_eng + 0.20 * tag_score + 0.20 * norm_growth | |
| + 0.15 * norm_outperformance + 0.10 * differentiation + 0.10 * energy_floor | |
| ) | |
| if len(self._unique_content_types) < 3: | |
| raw *= 0.5 | |
| if len(self._unique_tags_used) < 8: | |
| raw *= 0.7 | |
| return max(0.0, min(1.0, raw)) | |
| def get_peak_hours(day_of_week: int, top_k: int = 2) -> List[int]: | |
| row = _HEATMAP_GRID.get(day_of_week % 7, []) | |
| if not row: | |
| return [] | |
| return sorted(range(len(row)), key=lambda h: row[h], reverse=True)[:top_k] | |
| def _topic_overlap(topic_a: str, topic_b: str) -> bool: | |
| words_a = set(topic_a.split()) | |
| words_b = set(topic_b.split()) | |
| if not words_a or not words_b: | |
| return False | |
| common = words_a & words_b | |
| return len(common) / min(len(words_a), len(words_b)) >= 0.5 | |
| def _avg_signal_dicts(dicts: List[Dict[str, float]]) -> Dict[str, float]: | |
| if not dicts: | |
| return {} | |
| keys = set() | |
| for d in dicts: | |
| keys.update(d.keys()) | |
| result = {} | |
| for k in keys: | |
| vals = [d.get(k, 0.0) for d in dicts] | |
| result[k] = round(sum(vals) / len(vals), 4) | |
| return result | |