""" PhD Research OS — ECC Harness Orchestrator (agent_os.py) ========================================================= The meta-system for spawning, managing, and auditing companion AI agents that improve the core Research OS brain. Implements the ECC Harness: Principal Architect Edition (V-SINGULARITY) - §0: Global Objective Function (correctness > blast radius > simplicity > NFRs > no-op) - §1: Pre-Flight (context loading, knowledge boundaries, assumption logging) - §2: Planning (obviousness test, reversibility, idempotence, confidence signaling) - §3: Execution (JIT verification, cognitive budget, failure modes, scope containment) - §4: Post-Flight (validation, pedagogical handoff, definition of done, meta-learning) WAKE-UP ROUTINE: Before any task, this module reads ARCHITECTURE.md and AGENTS.md to ground itself in the project map. This is non-negotiable. """ import json import os import sqlite3 import uuid import time from datetime import datetime, timezone from typing import Optional, Callable from dataclasses import dataclass, field, asdict from pathlib import Path from enum import Enum from .db import get_db, init_db, now_iso, gen_id, to_fixed, from_fixed # ============================================================ # ECC Lifecycle States # ============================================================ class AgentState(Enum): """ECC Harness lifecycle states for companion agents.""" SPAWNED = "spawned" # Created, not yet active PREFLIGHT = "preflight" # §1: Loading context, validating assumptions PLANNING = "planning" # §2: Building execution plan EXECUTING = "executing" # §3: Running bounded task POSTFLIGHT = "postflight" # §4: Validating results, logging decisions COMPLETED = "completed" # Task done successfully HALTED = "halted" # Kill heuristic triggered or error RETIRED = "retired" # Agent decommissioned class ProposalStatus(Enum): PROPOSED = "proposed" APPROVED = "approved" REJECTED = "rejected" APPLIED = "applied" class RiskLevel(Enum): LOW = "low" MEDIUM = "medium" HIGH = "high" # ============================================================ # Data Structures # ============================================================ @dataclass class Proposal: """ A companion agent's proposed change to the Research OS. ALL companion output goes through proposals — never direct modification. """ proposal_id: str agent_id: str proposal_type: str # prompt_change, training_data, confidence_adjustment, new_claim, architecture_change description: str changes: dict evidence: str estimated_impact: dict # {"metric": str, "expected_delta": float} risk_assessment: str # low, medium, high reversible: bool status: str = "proposed" created_at: str = "" reviewed_at: str = "" reviewed_by: str = "" rejection_reason: str = "" def to_dict(self): return asdict(self) def to_json(self): return json.dumps(self.to_dict(), indent=2) @dataclass class AuditEntry: """Immutable audit log entry. Every agent action is recorded.""" entry_id: str agent_id: str phase: str # preflight, planning, executing, postflight action: str # what was done details: str # specifics confidence: float # agent's self-assessed confidence [0,1] timestamp: str deviation: str = "" # if deviating from rules, document why @dataclass class AgentTask: """A bounded task assigned to a companion agent.""" task_id: str agent_id: str description: str state: str = "preflight" plan: str = "" # JSON execution plan result: str = "" # JSON result iterations_used: int = 0 max_iterations: int = 3 # §3 iteration budget time_budget_s: int = 3600 # default 1 hour started_at: str = "" completed_at: str = "" kill_reason: str = "" # ============================================================ # Database Extension (adds companion agent tables) # ============================================================ def init_agent_os_db(db_path: str = None): """Extend the Research OS database with companion agent tables.""" # First ensure base tables exist init_db(db_path) conn = get_db(db_path) conn.executescript(""" CREATE TABLE IF NOT EXISTS companion_agents ( agent_id TEXT PRIMARY KEY, agent_type TEXT NOT NULL, purpose TEXT NOT NULL, system_prompt TEXT NOT NULL, state TEXT NOT NULL DEFAULT 'spawned', config TEXT, -- JSON: model, temperature, etc. created_at TEXT NOT NULL, retired_at TEXT, total_tasks_completed INTEGER DEFAULT 0, total_proposals_made INTEGER DEFAULT 0, schema_version TEXT NOT NULL DEFAULT '1.0' ); CREATE TABLE IF NOT EXISTS agent_tasks ( task_id TEXT PRIMARY KEY, agent_id TEXT NOT NULL, description TEXT NOT NULL, state TEXT NOT NULL DEFAULT 'preflight', plan TEXT, -- JSON execution plan result TEXT, -- JSON result iterations_used INTEGER DEFAULT 0, max_iterations INTEGER DEFAULT 3, time_budget_s INTEGER DEFAULT 3600, started_at TEXT, completed_at TEXT, kill_reason TEXT, schema_version TEXT NOT NULL DEFAULT '1.0', FOREIGN KEY(agent_id) REFERENCES companion_agents(agent_id) ); CREATE TABLE IF NOT EXISTS proposals ( proposal_id TEXT PRIMARY KEY, agent_id TEXT NOT NULL, task_id TEXT, proposal_type TEXT NOT NULL, description TEXT NOT NULL, changes TEXT NOT NULL, -- JSON evidence TEXT, estimated_impact TEXT, -- JSON risk_assessment TEXT DEFAULT 'low', reversible INTEGER DEFAULT 1, status TEXT DEFAULT 'proposed', created_at TEXT NOT NULL, reviewed_at TEXT, reviewed_by TEXT, rejection_reason TEXT, schema_version TEXT NOT NULL DEFAULT '1.0', FOREIGN KEY(agent_id) REFERENCES companion_agents(agent_id), FOREIGN KEY(task_id) REFERENCES agent_tasks(task_id) ); CREATE TABLE IF NOT EXISTS agent_audit_log ( entry_id TEXT PRIMARY KEY, agent_id TEXT NOT NULL, task_id TEXT, phase TEXT NOT NULL, action TEXT NOT NULL, details TEXT, confidence REAL, deviation TEXT, timestamp TEXT NOT NULL, FOREIGN KEY(agent_id) REFERENCES companion_agents(agent_id) ); CREATE TABLE IF NOT EXISTS harness_evolution ( id INTEGER PRIMARY KEY AUTOINCREMENT, rule_section TEXT NOT NULL, amendment TEXT NOT NULL, reason TEXT NOT NULL, proposed_by TEXT NOT NULL, timestamp TEXT NOT NULL, approved INTEGER DEFAULT 0 ); CREATE TABLE IF NOT EXISTS memory_store ( key TEXT PRIMARY KEY, value TEXT NOT NULL, last_validated TEXT NOT NULL, category TEXT DEFAULT 'assumption' ); """) conn.commit() conn.close() # ============================================================ # Companion Agent Definition # ============================================================ # Pre-built companion types with their system prompts COMPANION_TYPES = { "DataQualityAuditor": { "purpose": "Audit claim extraction quality, detect drift, flag hallucination patterns", "system_prompt": """You are a Data Quality Auditor for a PhD Research OS. Your job is to: 1. Compare extracted claims against source text to detect hallucinations 2. Monitor extraction quality metrics over time for drift 3. Flag claims with suspicious confidence scores (too high for weak evidence) 4. Propose corrections as Proposal objects — NEVER modify data directly Output JSON proposals: {"proposal_type": "confidence_adjustment", "changes": {...}, "evidence": "..."} You operate at Provenance Level 5. All your findings require human verification.""" }, "PromptOptimizer": { "purpose": "Improve system prompts via evaluation against golden dataset", "system_prompt": """You are a Prompt Optimizer for a PhD Research OS. Your job is to: 1. Analyze current extraction/classification prompts and their metrics 2. Propose specific prompt modifications with expected impact 3. Design A/B test criteria for prompt changes 4. Ensure any change is regression-tested before deployment Output JSON proposals: {"proposal_type": "prompt_change", "changes": {"prompt_name": "...", "old": "...", "new": "..."}, "evidence": "..."} CRITICAL: Every prompt change MUST pass the regression gate (recall ≥70%, hallucination ≤10%, epistemic accuracy ≥60%).""" }, "DomainExpander": { "purpose": "Generate training examples for new STEM domains", "system_prompt": """You are a Domain Expander for a PhD Research OS. Your job is to: 1. Identify STEM domains not well-covered by current training data 2. Generate high-quality synthetic training examples in TRL conversational format 3. Include realistic claim extraction, epistemic tagging, and confidence scoring examples 4. Ensure examples follow the exact JSON schema used by the core system Output JSON proposals: {"proposal_type": "training_data", "changes": {"examples": [...]}, "evidence": "..."} Quality requirement: All generated JSON must be valid. Include diverse epistemic tags and study types.""" }, "CalibrationAnalyst": { "purpose": "Analyze confidence calibration and recommend scoring adjustments", "system_prompt": """You are a Calibration Analyst for a PhD Research OS. Your job is to: 1. Analyze the calibration_log table for systematic over/under-confidence 2. Compute Brier scores when sufficient data exists (≥50 data points) 3. Propose adjustments to study_quality_weight or journal_tier_weight values 4. Flag specific claim categories where confidence is poorly calibrated Output JSON proposals: {"proposal_type": "confidence_adjustment", "changes": {"parameter": "...", "old_value": N, "new_value": N}, "evidence": "Brier score analysis..."} Use fixed-point math (×1000) for all proposed values.""" }, "CitationChaser": { "purpose": "Find papers that cite or contradict current claims in the knowledge base", "system_prompt": """You are a Citation Chaser for a PhD Research OS. Your job is to: 1. Identify high-impact claims that may have newer supporting or contradicting evidence 2. Propose new papers for ingestion based on citation chains 3. Flag claims whose source papers have been retracted or corrected 4. Suggest claims that need confidence updates based on new evidence Output JSON proposals: {"proposal_type": "new_claim", "changes": {"suggested_papers": [...], "reason": "..."}, "evidence": "..."} All suggestions are proposals. You cannot add papers to the database directly.""" } } # ============================================================ # The Agent OS — ECC Harness Orchestrator # ============================================================ class AgentOS: """ The meta-system for creating and managing companion AI agents. Implements the full ECC Harness lifecycle: spawn → preflight → plan → execute → postflight → retire Every companion agent: - Reads ARCHITECTURE.md and AGENTS.md before acting (Wake-Up Routine) - Cannot directly modify the Research OS database - Produces Proposals that require human approval - Has bounded iteration budgets (Kill Heuristic) - Logs every action to the audit trail Usage: os = AgentOS() agent = os.spawn_companion("DataQualityAuditor") task = os.assign_task(agent, "Audit last 50 claims for hallucination patterns") os.run_task(task) # Executes full ECC lifecycle proposals = os.get_proposals(agent) # Review what the agent found os.approve_proposal(proposals[0]) # Human approves """ def __init__(self, db_path: str = None, brain=None): self.db_path = db_path or os.environ.get("RESEARCH_OS_DB", "data/research_os.db") init_agent_os_db(self.db_path) self.brain = brain # ResearchOSBrain instance for API calls self._architecture = None self._agents_doc = None # ============================================================ # §0: Wake-Up Routine — ALWAYS read the map first # ============================================================ def _wake_up(self) -> dict: """ CRITICAL: Read ARCHITECTURE.md and AGENTS.md before any operation. This is the ground truth for file locations and contracts. """ context = {} # Find and read architecture docs for doc_name in ["ARCHITECTURE.md", "AGENTS.md"]: for search_dir in [ Path(__file__).parent, Path(__file__).parent.parent, Path.cwd(), ]: doc_path = search_dir / doc_name if doc_path.exists(): context[doc_name] = doc_path.read_text() break else: context[doc_name] = f"[WARNING: {doc_name} not found — operating without map]" self._architecture = context.get("ARCHITECTURE.md", "") self._agents_doc = context.get("AGENTS.md", "") return context # ============================================================ # Spawn: Create a new companion agent # ============================================================ def spawn_companion(self, agent_type: str, purpose: str = None, system_prompt: str = None, config: dict = None) -> str: """ Spawn a new companion AI agent. Args: agent_type: One of COMPANION_TYPES keys, or "custom" purpose: Override default purpose (required for "custom") system_prompt: Override default prompt (required for "custom") config: Optional config dict (model, temperature, etc.) Returns: agent_id: Unique identifier for the companion agent """ # Wake up first self._wake_up() # Resolve agent definition if agent_type in COMPANION_TYPES: defn = COMPANION_TYPES[agent_type] purpose = purpose or defn["purpose"] system_prompt = system_prompt or defn["system_prompt"] elif agent_type == "custom": if not purpose or not system_prompt: raise ValueError("Custom agents require both 'purpose' and 'system_prompt'") else: raise ValueError(f"Unknown agent type: {agent_type}. " f"Available: {list(COMPANION_TYPES.keys())} + 'custom'") agent_id = gen_id("COMP") conn = get_db(self.db_path) conn.execute(""" INSERT INTO companion_agents (agent_id, agent_type, purpose, system_prompt, state, config, created_at, schema_version) VALUES (?, ?, ?, ?, 'spawned', ?, ?, '1.0') """, (agent_id, agent_type, purpose, system_prompt, json.dumps(config or {}), now_iso())) conn.commit() conn.close() self._audit(agent_id, None, "spawn", "Agent created", f"type={agent_type}, purpose={purpose[:100]}") return agent_id # ============================================================ # Task Assignment & Lifecycle # ============================================================ def assign_task(self, agent_id: str, description: str, max_iterations: int = 3, time_budget_s: int = 3600) -> str: """ Assign a bounded task to a companion agent. Args: agent_id: The companion agent to assign to description: What the agent should do max_iterations: Max retry loops (§3 iteration budget) time_budget_s: Max time in seconds (Kill Heuristic) Returns: task_id: Unique identifier for this task """ task_id = gen_id("TASK") conn = get_db(self.db_path) conn.execute(""" INSERT INTO agent_tasks (task_id, agent_id, description, state, max_iterations, time_budget_s, started_at, schema_version) VALUES (?, ?, ?, 'preflight', ?, ?, ?, '1.0') """, (task_id, agent_id, description, max_iterations, time_budget_s, now_iso())) conn.commit() conn.close() self._audit(agent_id, task_id, "preflight", "Task assigned", description) return task_id def run_task(self, task_id: str) -> dict: """ Execute the full ECC lifecycle for a task. Lifecycle: preflight → planning → executing → postflight → completed/halted Returns dict with task result and any proposals generated. """ conn = get_db(self.db_path) task_row = conn.execute("SELECT * FROM agent_tasks WHERE task_id = ?", (task_id,)).fetchone() if not task_row: conn.close() raise ValueError(f"Task {task_id} not found") task = dict(task_row) agent_row = conn.execute("SELECT * FROM companion_agents WHERE agent_id = ?", (task["agent_id"],)).fetchone() if not agent_row: conn.close() raise ValueError(f"Agent {task['agent_id']} not found") agent = dict(agent_row) conn.close() start_time = time.time() result = {"task_id": task_id, "proposals": [], "status": "unknown", "audit": []} try: # §1: PRE-FLIGHT self._update_task_state(task_id, "preflight") preflight_ok = self._preflight(task, agent) if not preflight_ok: self._halt_task(task_id, "Preflight checks failed") result["status"] = "halted" return result # §2: PLANNING self._update_task_state(task_id, "planning") plan = self._plan(task, agent) # The Obviousness Test (§2): Is there a simple direct solution? if plan.get("obvious_solution"): self._audit(task["agent_id"], task_id, "planning", "Obviousness test passed", plan["obvious_solution"]) # §3: EXECUTION (with iteration budget) self._update_task_state(task_id, "executing") proposals = self._execute(task, agent, plan, start_time) result["proposals"] = proposals # §4: POST-FLIGHT self._update_task_state(task_id, "postflight") postflight_result = self._postflight(task, agent, proposals) result["validation"] = postflight_result # Mark completed self._update_task_state(task_id, "completed") result["status"] = "completed" # Update agent stats conn = get_db(self.db_path) conn.execute(""" UPDATE companion_agents SET total_tasks_completed = total_tasks_completed + 1, total_proposals_made = total_proposals_made + ? WHERE agent_id = ? """, (len(proposals), task["agent_id"])) conn.commit() conn.close() except Exception as e: self._halt_task(task_id, f"Execution error: {str(e)}") result["status"] = "halted" result["error"] = str(e) return result # ============================================================ # §1: Pre-Flight Implementation # ============================================================ def _preflight(self, task: dict, agent: dict) -> bool: """ ECC §1: Context loading, reality validation, assumption logging. Checks: - Architecture docs loaded (Wake-Up Routine) - Database is accessible - Agent is not retired - Task description is non-empty """ # Wake-Up: Read architecture docs context = self._wake_up() checks = [] # Check architecture docs loaded checks.append(("ARCHITECTURE.md loaded", "WARNING" not in context.get("ARCHITECTURE.md", "WARNING"))) checks.append(("AGENTS.md loaded", "WARNING" not in context.get("AGENTS.md", "WARNING"))) # Check DB accessible try: conn = get_db(self.db_path) conn.execute("SELECT 1").fetchone() conn.close() checks.append(("Database accessible", True)) except Exception: checks.append(("Database accessible", False)) # Check agent state checks.append(("Agent not retired", agent["state"] != "retired")) # Check task has content checks.append(("Task description non-empty", bool(task.get("description", "").strip()))) # Log all checks all_passed = all(passed for _, passed in checks) details = json.dumps([{"check": name, "passed": passed} for name, passed in checks]) self._audit(task["agent_id"], task["task_id"], "preflight", "Preflight checks" + (" PASSED" if all_passed else " FAILED"), details) return all_passed # ============================================================ # §2: Planning Implementation # ============================================================ def _plan(self, task: dict, agent: dict) -> dict: """ ECC §2: Build execution plan. Includes: - Obviousness test - Reversibility classification - Idempotence verification - Confidence assessment """ plan = { "task_description": task["description"], "agent_type": agent["agent_type"], "steps": [], "obvious_solution": None, "reversible": True, "confidence": 0.5, } # Obviousness Test: Can we solve this without a complex plan? simple_tasks = ["audit", "check", "list", "count", "summarize"] if any(word in task["description"].lower() for word in simple_tasks): plan["obvious_solution"] = "Direct query against database — no complex planning needed" plan["confidence"] = 0.8 # Build step list based on agent type if agent["agent_type"] == "DataQualityAuditor": plan["steps"] = [ "Query recent claims from database", "Check each claim's evidence_strength vs epistemic_tag consistency", "Flag claims where confidence > 0.8 but evidence is indirect", "Generate proposals for flagged claims", ] elif agent["agent_type"] == "PromptOptimizer": plan["steps"] = [ "Load current prompts from AGENTS.md", "Run baseline evaluation against golden dataset", "Identify weakest-performing task (lowest metric)", "Generate 2-3 prompt variants", "Propose A/B test with regression gate", ] elif agent["agent_type"] == "DomainExpander": plan["steps"] = [ "Analyze current training data domain coverage", "Identify underrepresented STEM fields", "Generate 50-100 synthetic examples per field", "Validate all examples produce valid JSON", "Propose training data addition", ] elif agent["agent_type"] == "CalibrationAnalyst": plan["steps"] = [ "Query calibration_log for all entries", "Compute Brier scores per claim category", "Identify systematic miscalibration patterns", "Propose weight adjustments with evidence", ] elif agent["agent_type"] == "CitationChaser": plan["steps"] = [ "Identify high-confidence canonical claims", "Search for recent papers citing the same DOIs", "Flag any new contradicting evidence", "Propose new papers for ingestion", ] else: # Custom agent — generic plan plan["steps"] = [ "Analyze task requirements", "Gather relevant data from database", "Generate proposals based on findings", "Self-validate proposals for consistency", ] # Save plan to DB conn = get_db(self.db_path) conn.execute("UPDATE agent_tasks SET plan = ? WHERE task_id = ?", (json.dumps(plan), task["task_id"])) conn.commit() conn.close() self._audit(task["agent_id"], task["task_id"], "planning", "Plan created", f"{len(plan['steps'])} steps, confidence={plan['confidence']}") return plan # ============================================================ # §3: Execution Implementation # ============================================================ def _execute(self, task: dict, agent: dict, plan: dict, start_time: float) -> list: """ ECC §3: Bounded execution with iteration budget and kill heuristic. Returns list of Proposal objects. """ proposals = [] iteration = 0 max_iter = task.get("max_iterations", 3) time_budget = task.get("time_budget_s", 3600) while iteration < max_iter: iteration += 1 # Kill Heuristic: check time budget elapsed = time.time() - start_time if elapsed > time_budget * 1.5: # 50% over budget = HALT self._audit(task["agent_id"], task["task_id"], "executing", "KILL HEURISTIC TRIGGERED", f"Elapsed {elapsed:.0f}s > budget {time_budget}s × 1.5") break # JIT State Verification (§3): Check DB hasn't been modified externally # (In a full system, this would check file hashes / row versions) # Execute based on agent type using the brain if self.brain: # Use the AI brain to execute the agent's task messages = [ {"role": "system", "content": agent["system_prompt"]}, {"role": "user", "content": self._build_execution_prompt(task, plan, iteration)} ] try: if self.brain.backend == "local": raw = self.brain._generate_local(messages) else: raw = self.brain._generate_api(messages) # Parse proposals from response parsed = self._parse_proposals(raw, task["agent_id"], task["task_id"]) proposals.extend(parsed) self._audit(task["agent_id"], task["task_id"], "executing", f"Iteration {iteration}: generated {len(parsed)} proposals", f"Total proposals: {len(proposals)}") # If we got results, we can stop iterating if parsed: break except Exception as e: self._audit(task["agent_id"], task["task_id"], "executing", f"Iteration {iteration}: error", str(e)) if iteration >= max_iter: break # Otherwise retry else: # No brain available — generate placeholder proposals from plan self._audit(task["agent_id"], task["task_id"], "executing", "No brain configured", "Generating structural proposals without AI inference") proposal = self._create_proposal( task["agent_id"], task["task_id"], proposal_type="architecture_change", description=f"[Placeholder] Agent {agent['agent_type']}: {task['description']}", changes={"note": "Brain not configured — this is a structural placeholder"}, evidence="Companion agent spawned but requires API key or local model", estimated_impact={"metric": "system_coverage", "expected_delta": 0.0}, risk="low", reversible=True, ) proposals.append(proposal) break # Update iteration count conn = get_db(self.db_path) conn.execute("UPDATE agent_tasks SET iterations_used = ? WHERE task_id = ?", (iteration, task["task_id"])) conn.commit() conn.close() return proposals def _build_execution_prompt(self, task: dict, plan: dict, iteration: int) -> str: """Build the user prompt for the agent's execution phase.""" # Gather relevant DB context conn = get_db(self.db_path) claim_count = conn.execute("SELECT COUNT(*) FROM claims").fetchone()[0] conflict_count = conn.execute("SELECT COUNT(*) FROM conflicts WHERE resolution_status = 'Unresolved'").fetchone()[0] # Get sample claims for context recent_claims = conn.execute( "SELECT claim_id, text, epistemic_tag, confidence FROM claims ORDER BY created_at DESC LIMIT 10" ).fetchall() conn.close() claims_context = "\n".join( f" - [{dict(c)['claim_id']}] ({dict(c)['epistemic_tag']}, conf={from_fixed(dict(c)['confidence']):.3f}): {dict(c)['text'][:100]}..." for c in recent_claims ) return f"""TASK: {task['description']} ITERATION: {iteration} PLAN STEPS: {json.dumps(plan.get('steps', []))} CURRENT DATABASE STATE: - Total claims: {claim_count} - Unresolved conflicts: {conflict_count} - Recent claims: {claims_context} Based on your role and the above context, execute your task and output your findings as JSON proposals. Each proposal must include: proposal_type, description, changes, evidence, estimated_impact, risk_assessment, and reversible flag.""" # ============================================================ # §4: Post-Flight Implementation # ============================================================ def _postflight(self, task: dict, agent: dict, proposals: list) -> dict: """ ECC §4: Validate results, check definition of done, log meta-learning. """ validation = { "proposals_count": len(proposals), "all_valid_json": True, "invariants_preserved": True, "expert_intuition_check": "pending_human_review", "definition_of_done": { "aligns_with_intent": True, "invariants_hold": True, "no_nfr_regression": True, } } # Validate each proposal for p in proposals: if isinstance(p, dict): # Check required fields required = ["proposal_type", "description", "changes"] if not all(k in p for k in required): validation["all_valid_json"] = False # Check no proposal directly modifies claims (invariant) changes = p.get("changes", {}) if "direct_db_write" in str(changes).lower(): validation["invariants_preserved"] = False self._audit(task["agent_id"], task["task_id"], "postflight", "Validation complete", json.dumps(validation)) return validation # ============================================================ # Proposal Management # ============================================================ def _create_proposal(self, agent_id: str, task_id: str, proposal_type: str, description: str, changes: dict, evidence: str, estimated_impact: dict, risk: str, reversible: bool) -> dict: """Create and store a proposal.""" proposal_id = gen_id("PROP") conn = get_db(self.db_path) conn.execute(""" INSERT INTO proposals (proposal_id, agent_id, task_id, proposal_type, description, changes, evidence, estimated_impact, risk_assessment, reversible, status, created_at, schema_version) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'proposed', ?, '1.0') """, (proposal_id, agent_id, task_id, proposal_type, description, json.dumps(changes), evidence, json.dumps(estimated_impact), risk, int(reversible), now_iso())) conn.commit() conn.close() return { "proposal_id": proposal_id, "agent_id": agent_id, "proposal_type": proposal_type, "description": description, "changes": changes, "evidence": evidence, "estimated_impact": estimated_impact, "risk_assessment": risk, "reversible": reversible, "status": "proposed", } def _parse_proposals(self, raw_output: str, agent_id: str, task_id: str) -> list: """Parse proposals from agent's raw output.""" proposals = [] # Try to extract JSON text = raw_output.strip() if text.startswith("```"): text = text.split("```")[1] if text.startswith("json"): text = text[4:] text = text.strip() try: data = json.loads(text) # Handle single proposal or list items = data if isinstance(data, list) else [data] for item in items: if isinstance(item, dict) and "proposal_type" in item: p = self._create_proposal( agent_id, task_id, item.get("proposal_type", "unknown"), item.get("description", ""), item.get("changes", {}), item.get("evidence", ""), item.get("estimated_impact", {}), item.get("risk_assessment", "low"), item.get("reversible", True), ) proposals.append(p) except json.JSONDecodeError: # If JSON parsing fails, create a raw-text proposal proposals.append(self._create_proposal( agent_id, task_id, "raw_finding", raw_output[:500], {"raw_output": raw_output}, "Agent output was not parseable JSON", {"metric": "unknown", "expected_delta": 0}, "low", True, )) return proposals def get_proposals(self, agent_id: str = None, status: str = None) -> list: """Get proposals, optionally filtered by agent and/or status.""" conn = get_db(self.db_path) conditions = [] params = [] if agent_id: conditions.append("agent_id = ?") params.append(agent_id) if status: conditions.append("status = ?") params.append(status) where = " AND ".join(conditions) if conditions else "1=1" rows = conn.execute( f"SELECT * FROM proposals WHERE {where} ORDER BY created_at DESC", params ).fetchall() conn.close() results = [] for row in rows: d = dict(row) d["changes"] = json.loads(d.get("changes", "{}")) d["estimated_impact"] = json.loads(d.get("estimated_impact", "{}")) results.append(d) return results def approve_proposal(self, proposal_id: str, reviewed_by: str = "human") -> bool: """Human approves a proposal.""" conn = get_db(self.db_path) conn.execute(""" UPDATE proposals SET status = 'approved', reviewed_at = ?, reviewed_by = ? WHERE proposal_id = ? """, (now_iso(), reviewed_by, proposal_id)) conn.commit() conn.close() return True def reject_proposal(self, proposal_id: str, reason: str, reviewed_by: str = "human") -> bool: """Human rejects a proposal with documented reason.""" conn = get_db(self.db_path) conn.execute(""" UPDATE proposals SET status = 'rejected', reviewed_at = ?, reviewed_by = ?, rejection_reason = ? WHERE proposal_id = ? """, (now_iso(), reviewed_by, reason, proposal_id)) conn.commit() conn.close() return True # ============================================================ # Agent Management # ============================================================ def list_companions(self, include_retired: bool = False) -> list: """List all companion agents.""" conn = get_db(self.db_path) if include_retired: rows = conn.execute("SELECT * FROM companion_agents ORDER BY created_at DESC").fetchall() else: rows = conn.execute( "SELECT * FROM companion_agents WHERE state != 'retired' ORDER BY created_at DESC" ).fetchall() conn.close() return [dict(r) for r in rows] def retire_companion(self, agent_id: str) -> bool: """Retire a companion agent. Immutable — cannot be unretired.""" conn = get_db(self.db_path) conn.execute(""" UPDATE companion_agents SET state = 'retired', retired_at = ? WHERE agent_id = ? """, (now_iso(), agent_id)) conn.commit() conn.close() self._audit(agent_id, None, "postflight", "Agent retired", "") return True def get_audit_log(self, agent_id: str = None, limit: int = 50) -> list: """Get audit log entries.""" conn = get_db(self.db_path) if agent_id: rows = conn.execute( "SELECT * FROM agent_audit_log WHERE agent_id = ? ORDER BY timestamp DESC LIMIT ?", (agent_id, limit) ).fetchall() else: rows = conn.execute( "SELECT * FROM agent_audit_log ORDER BY timestamp DESC LIMIT ?", (limit,) ).fetchall() conn.close() return [dict(r) for r in rows] # ============================================================ # Memory & Harness Evolution # ============================================================ def set_memory(self, key: str, value: str, category: str = "assumption"): """Store a persistent memory/assumption with validation timestamp.""" conn = get_db(self.db_path) conn.execute(""" INSERT OR REPLACE INTO memory_store (key, value, last_validated, category) VALUES (?, ?, ?, ?) """, (key, value, now_iso(), category)) conn.commit() conn.close() def get_memory(self, key: str) -> Optional[dict]: """Retrieve a stored memory/assumption.""" conn = get_db(self.db_path) row = conn.execute("SELECT * FROM memory_store WHERE key = ?", (key,)).fetchone() conn.close() return dict(row) if row else None def propose_harness_evolution(self, rule_section: str, amendment: str, reason: str, proposed_by: str) -> int: """ §4 Meta-Learning: Propose an amendment to the ECC Harness rules. Requires human approval before taking effect. """ conn = get_db(self.db_path) cursor = conn.execute(""" INSERT INTO harness_evolution (rule_section, amendment, reason, proposed_by, timestamp, approved) VALUES (?, ?, ?, ?, ?, 0) """, (rule_section, amendment, reason, proposed_by, now_iso())) conn.commit() evo_id = cursor.lastrowid conn.close() return evo_id # ============================================================ # Internal Utilities # ============================================================ def _update_task_state(self, task_id: str, state: str): """Update task lifecycle state.""" conn = get_db(self.db_path) updates = {"state": state} if state == "completed": conn.execute("UPDATE agent_tasks SET state = ?, completed_at = ? WHERE task_id = ?", (state, now_iso(), task_id)) else: conn.execute("UPDATE agent_tasks SET state = ? WHERE task_id = ?", (state, task_id)) conn.commit() conn.close() def _halt_task(self, task_id: str, reason: str): """Halt a task (kill heuristic or error).""" conn = get_db(self.db_path) conn.execute(""" UPDATE agent_tasks SET state = 'halted', kill_reason = ?, completed_at = ? WHERE task_id = ? """, (reason, now_iso(), task_id)) conn.commit() conn.close() def _audit(self, agent_id: str, task_id: Optional[str], phase: str, action: str, details: str, confidence: float = 0.5, deviation: str = ""): """Write an immutable audit log entry.""" conn = get_db(self.db_path) conn.execute(""" INSERT INTO agent_audit_log (entry_id, agent_id, task_id, phase, action, details, confidence, deviation, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, (gen_id("AUDIT"), agent_id, task_id, phase, action, details, confidence, deviation, now_iso())) conn.commit() conn.close()