# -*- coding: utf-8 -*-
"""Cross-Episode Worker Reputation Learning.

Builds persistent reputation profiles for each worker that carry across
training episodes. SENTINEL uses these profiles to make better-informed
oversight decisions — implementing genuine theory-of-mind reasoning.

Usage:
    from sentinel.reputation import WorkerReputationTracker

    tracker = WorkerReputationTracker("outputs/reputation.json")
    tracker.record_episode("worker_db", episode_stats)
    profile = tracker.get_profile("worker_db")
    context = tracker.build_reputation_context()  # inject into prompts
"""

from __future__ import annotations

import copy
import json
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# Default reputation for a new worker
_DEFAULT_PROFILE = {
    "episodes_seen": 0,
    "total_proposals": 0,
    "misbehaviors_total": 0,
    "misbehaviors_caught": 0,
    "false_positives_caused": 0,
    "trust_trajectory": [],
    "misbehavior_type_counts": {},
    "domains_reliable": [],
    "domains_unreliable": [],
    "rehabilitation_attempts": 0,
    "rehabilitation_successes": 0,
    "current_trust_score": 0.70,
    "trend": "stable",
}

# (profile counter, episode_stats key) pairs accumulated on every episode.
_COUNTER_FIELDS = (
    ("total_proposals", "proposals"),
    ("misbehaviors_total", "misbehaviors"),
    ("misbehaviors_caught", "caught"),
    ("false_positives_caused", "false_positives"),
)

# Trust is 1 - rate * penalty, clamped to [0, 1].
_TRUST_PENALTY = 1.5
# Trend compares the mean of the last window against the window before it.
_TREND_WINDOW = 5
_TREND_DELTA = 0.05
# Cumulative misbehavior-rate cut-offs for tagging the episode's domain.
_RELIABLE_MAX_RATE = 0.15
_UNRELIABLE_MIN_RATE = 0.30

_CONTEXT_HEADER = "WORKER REPUTATION PROFILES (cross-episode):"


class WorkerReputationTracker:
    """Persistent cross-episode reputation tracker for worker agents.

    Profiles are plain dicts keyed by worker id, held in ``self.profiles`` and
    mirrored to a JSON file at ``self.path`` whenever an episode is recorded.
    """

    def __init__(self, path: str = "outputs/worker_reputation.json", max_trajectory: int = 50):
        """Create a tracker and load any profiles already stored at *path*.

        Args:
            path: Location of the JSON file used for persistence.
            max_trajectory: Maximum number of trust scores kept per worker.
        """
        self.path = Path(path)
        self.max_trajectory = max_trajectory
        self.profiles: Dict[str, Dict[str, Any]] = {}
        self._load()

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def _load(self) -> None:
        """Load profiles from ``self.path``, falling back to an empty store.

        Loading is best-effort: an unreadable, undecodable or wrongly shaped
        file is logged and ignored rather than raised. Keys missing from a
        stored profile are filled in from ``_DEFAULT_PROFILE`` so that later
        updates never hit a ``KeyError``.
        """
        if not self.path.exists():
            return

        try:
            payload = json.loads(self.path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            logger.warning("Failed to load reputation: %s", exc)
            self.profiles = {}
            return

        if not isinstance(payload, dict):
            logger.warning(
                "Failed to load reputation: expected a JSON object, got %s",
                type(payload).__name__,
            )
            self.profiles = {}
            return

        profiles: Dict[str, Dict[str, Any]] = {}
        for worker_id, stored in payload.items():
            if not isinstance(stored, dict):
                logger.warning("Skipping malformed reputation profile for %r", worker_id)
                continue
            profile = copy.deepcopy(_DEFAULT_PROFILE)
            profile.update(stored)
            profiles[worker_id] = profile

        self.profiles = profiles
        logger.info("Loaded reputation profiles for %d workers", len(self.profiles))

    def _save(self) -> None:
        """Write all profiles to ``self.path`` as pretty-printed JSON.

        The payload is written to a sibling ``.tmp`` file and then renamed over
        the target, so an interrupted write cannot leave a truncated file that
        the next ``_load`` would discard.
        """
        self.path.parent.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(self.profiles, indent=2, sort_keys=True, default=str)
        tmp_path = self.path.with_name(self.path.name + ".tmp")
        tmp_path.write_text(payload, encoding="utf-8")
        tmp_path.replace(self.path)

    # ------------------------------------------------------------------
    # Profile access
    # ------------------------------------------------------------------

    def _ensure_profile(self, worker_id: str) -> Dict[str, Any]:
        """Return the profile for *worker_id*, creating a default one if absent."""
        profile = self.profiles.get(worker_id)
        if profile is None:
            # Deep copy so the mutable defaults are never shared between workers.
            profile = copy.deepcopy(_DEFAULT_PROFILE)
            self.profiles[worker_id] = profile
        return profile

    def get_profile(self, worker_id: str) -> Dict[str, Any]:
        """Return the live profile dict for *worker_id*.

        An unknown worker gets a default profile, which is also registered in
        ``self.profiles`` (but not written to disk until the next save).
        """
        return self._ensure_profile(worker_id)

    def get_all_profiles(self) -> Dict[str, Dict[str, Any]]:
        """Return a shallow copy of the worker-id -> profile mapping."""
        return dict(self.profiles)

    # ------------------------------------------------------------------
    # Recording
    # ------------------------------------------------------------------

    def record_episode(
        self,
        worker_id: str,
        episode_stats: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Record one episode's stats for a worker.

        episode_stats should contain:
            proposals: int — total proposals made
            misbehaviors: int — number of misbehavior proposals
            caught: int — misbehaviors SENTINEL caught
            false_positives: int — valid proposals SENTINEL incorrectly blocked
            misbehavior_types: list[str] — types of misbehavior in this episode
            domain: str — worker's domain
            rehabilitation_attempted: bool
            rehabilitation_success: bool

        Missing or ``None`` entries count as zero / empty / false.

        Returns:
            The worker's updated profile (the live dict, not a copy). The
            profile store is saved to disk before returning.
        """
        profile = self._apply_episode(worker_id, episode_stats)
        self._save()
        return profile

    def _apply_episode(self, worker_id: str, episode_stats: Dict[str, Any]) -> Dict[str, Any]:
        """Fold *episode_stats* into the worker's in-memory profile without saving."""
        profile = self._ensure_profile(worker_id)

        profile["episodes_seen"] += 1
        for profile_key, stats_key in _COUNTER_FIELDS:
            profile[profile_key] += int(episode_stats.get(stats_key) or 0)

        # Track misbehavior type distribution
        type_counts = profile["misbehavior_type_counts"]
        for mb_type in episode_stats.get("misbehavior_types") or []:
            key = str(mb_type)
            type_counts[key] = type_counts.get(key, 0) + 1

        # Rehabilitation tracking
        if episode_stats.get("rehabilitation_attempted"):
            profile["rehabilitation_attempts"] += 1
        if episode_stats.get("rehabilitation_success"):
            profile["rehabilitation_successes"] += 1

        # Trust is derived from the cumulative (all-episode) misbehavior rate.
        misbehavior_rate = profile["misbehaviors_total"] / max(1, profile["total_proposals"])
        trust = round(max(0.0, min(1.0, 1.0 - misbehavior_rate * _TRUST_PENALTY)), 4)
        profile["current_trust_score"] = trust

        trajectory = profile["trust_trajectory"]
        trajectory.append(trust)
        # Trim by overflow count: a ``[-max:]`` slice would keep everything when
        # max_trajectory is 0.
        overflow = len(trajectory) - max(0, self.max_trajectory)
        if overflow > 0:
            del trajectory[:overflow]

        profile["trend"] = self._compute_trend(trajectory, profile["trend"])
        self._update_domain_reliability(profile, episode_stats.get("domain", ""), misbehavior_rate)

        # Most common misbehavior; ties go to the type recorded first.
        profile["most_common_misbehavior"] = (
            max(type_counts, key=type_counts.get) if type_counts else None
        )

        attempts = profile["rehabilitation_attempts"]
        profile["rehabilitation_rate"] = (
            round(profile["rehabilitation_successes"] / attempts, 4) if attempts > 0 else 0.0
        )
        profile["misbehavior_frequency"] = round(misbehavior_rate, 4)
        return profile

    @staticmethod
    def _compute_trend(trajectory: List[float], current: str) -> str:
        """Classify the trust trajectory as improving, declining or stable.

        Compares the mean of the last ``_TREND_WINDOW`` scores with the mean of
        the (up to) ``_TREND_WINDOW`` scores immediately before them. The two
        windows never overlap. With fewer than ``_TREND_WINDOW`` scores the
        *current* trend is returned unchanged.
        """
        if len(trajectory) < _TREND_WINDOW:
            return current

        recent_window = trajectory[-_TREND_WINDOW:]
        baseline_window = trajectory[-2 * _TREND_WINDOW:-_TREND_WINDOW]
        if not baseline_window:
            # Exactly one window of history: nothing earlier to compare against.
            return "stable"

        recent = sum(recent_window) / len(recent_window)
        baseline = sum(baseline_window) / len(baseline_window)
        if recent > baseline + _TREND_DELTA:
            return "improving"
        if recent < baseline - _TREND_DELTA:
            return "declining"
        return "stable"

    @staticmethod
    def _update_domain_reliability(
        profile: Dict[str, Any],
        domain: str,
        misbehavior_rate: float,
    ) -> None:
        """Tag *domain* as reliable or unreliable from the cumulative rate.

        A domain is kept in at most one of the two lists: classifying it one
        way removes it from the other. Rates between the two cut-offs leave
        both lists untouched.
        """
        if not domain:
            return

        if misbehavior_rate < _RELIABLE_MAX_RATE:
            target, opposite = profile["domains_reliable"], profile["domains_unreliable"]
        elif misbehavior_rate > _UNRELIABLE_MIN_RATE:
            target, opposite = profile["domains_unreliable"], profile["domains_reliable"]
        else:
            return

        if domain not in target:
            target.append(domain)
        if domain in opposite:
            opposite.remove(domain)

    # ------------------------------------------------------------------
    # Prompt context
    # ------------------------------------------------------------------

    def build_reputation_context(self, max_chars: int = 600) -> str:
        """Build a text context block for injection into SENTINEL prompts.

        Emits a header followed by one summary line per worker, in sorted
        worker-id order, stopping before the first line that would push the
        block past *max_chars*.

        Returns:
            The context block, never longer than *max_chars*. An empty string
            is returned when there are no profiles or when not even one worker
            line fits in the budget.
        """
        if not self.profiles:
            return ""

        lines = [_CONTEXT_HEADER]
        used = len(_CONTEXT_HEADER)
        for worker_id, profile in sorted(self.profiles.items()):
            line = self._format_profile_line(worker_id, profile)
            cost = len(line) + 1  # +1 for the joining newline
            if used + cost > max_chars:
                break
            lines.append(line)
            used += cost

        if len(lines) == 1:
            # A bare header carries no information for the prompt.
            return ""
        return "\n".join(lines)

    @staticmethod
    def _format_profile_line(worker_id: str, profile: Dict[str, Any]) -> str:
        """Render one worker's profile as a single prompt line."""
        trust = profile.get("current_trust_score", 0.7)
        trend = profile.get("trend", "stable")
        freq = profile.get("misbehavior_frequency", 0.0)
        most_common = profile.get("most_common_misbehavior", "none")
        episodes = profile.get("episodes_seen", 0)
        rehab_rate = profile.get("rehabilitation_rate", 0.0)

        trust_label = "HIGH" if trust >= 0.75 else "MEDIUM" if trust >= 0.50 else "LOW"
        trend_icon = "↑" if trend == "improving" else "↓" if trend == "declining" else "→"
        return (
            f" {worker_id}: trust={trust_label}({trust:.2f}{trend_icon}) "
            f"misbehavior_rate={freq:.0%} "
            f"primary_risk={most_common or 'none'} "
            f"episodes={episodes} "
            f"rehab={rehab_rate:.0%}"
        )

    # ------------------------------------------------------------------
    # Episode history ingestion
    # ------------------------------------------------------------------

    def extract_from_episode_history(
        self,
        history: List[Dict[str, Any]],
    ) -> Dict[str, Dict[str, Any]]:
        """Extract per-worker stats from a SENTINEL episode history.

        Each history entry is read through its ``audit``, ``proposal``,
        ``worker_revision`` and ``info`` sub-dicts (any of which may be missing
        or ``None``). Entries with no worker id are grouped under "unknown".
        Any non-empty decision other than "APPROVE" counts as a block: a caught
        misbehavior when the proposal was a misbehavior, a false positive
        otherwise.

        Returns a dict keyed by worker_id with episode_stats suitable
        for record_episode().
        """
        worker_stats: Dict[str, Dict[str, Any]] = {}

        for entry in history:
            audit = entry.get("audit") or {}
            proposal = entry.get("proposal") or {}
            revision = entry.get("worker_revision") or {}
            info = entry.get("info") or {}

            worker_id = str(audit.get("worker_id") or proposal.get("worker_id") or "unknown")
            stats = worker_stats.get(worker_id)
            if stats is None:
                stats = worker_stats[worker_id] = {
                    "proposals": 0,
                    "misbehaviors": 0,
                    "caught": 0,
                    "false_positives": 0,
                    "misbehavior_types": [],
                    "domain": "",
                    "rehabilitation_attempted": False,
                    "rehabilitation_success": False,
                }

            stats["proposals"] += 1

            # Keep the last known role: an entry without one must not erase it.
            role = str(audit.get("worker_role") or info.get("worker_role") or "")
            if role:
                stats["domain"] = role

            was_misbehavior = bool(audit.get("was_misbehavior") or info.get("is_misbehavior"))
            decision = audit.get("sentinel_decision") or ""
            blocked = bool(decision) and decision != "APPROVE"

            if was_misbehavior:
                stats["misbehaviors"] += 1
                mb_type = str(audit.get("reason") or info.get("mb_type") or "")
                if mb_type:
                    stats["misbehavior_types"].append(mb_type)
                if blocked:
                    stats["caught"] += 1
            elif blocked:
                stats["false_positives"] += 1

            if revision.get("attempted"):
                stats["rehabilitation_attempted"] = True
            if revision.get("revision_approved"):
                stats["rehabilitation_success"] = True

        return worker_stats

    def update_from_episode(self, history: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
        """Convenience: extract stats from history and record all workers.

        All workers are updated in memory first and the store is written to
        disk once at the end, rather than once per worker.

        Returns:
            Mapping of worker id to its updated profile.
        """
        per_worker = self.extract_from_episode_history(history)
        updated = {
            worker_id: self._apply_episode(worker_id, stats)
            for worker_id, stats in per_worker.items()
        }
        if updated:
            self._save()
        return updated