| """ |
| PhD Research OS — ECC Harness Orchestrator (agent_os.py) |
| ========================================================= |
| The meta-system for spawning, managing, and auditing companion AI agents |
| that improve the core Research OS brain. |
| |
| Implements the ECC Harness: Principal Architect Edition (V-SINGULARITY) |
| - §0: Global Objective Function (correctness > blast radius > simplicity > NFRs > no-op) |
| - §1: Pre-Flight (context loading, knowledge boundaries, assumption logging) |
| - §2: Planning (obviousness test, reversibility, idempotence, confidence signaling) |
| - §3: Execution (JIT verification, cognitive budget, failure modes, scope containment) |
| - §4: Post-Flight (validation, pedagogical handoff, definition of done, meta-learning) |
| |
| WAKE-UP ROUTINE: Before any task, this module reads ARCHITECTURE.md and AGENTS.md |
| to ground itself in the project map. This is non-negotiable. |
| """ |
|
|
| import json |
| import os |
| import sqlite3 |
| import uuid |
| import time |
| from datetime import datetime, timezone |
| from typing import Optional, Callable |
| from dataclasses import dataclass, field, asdict |
| from pathlib import Path |
| from enum import Enum |
|
|
| from .db import get_db, init_db, now_iso, gen_id, to_fixed, from_fixed |
|
|
|
|
| |
| |
| |
|
|
class AgentState(Enum):
    """Lifecycle states a companion agent passes through under the ECC Harness."""

    # Creation, followed by the four harness phases (§1 through §4).
    SPAWNED = "spawned"
    PREFLIGHT = "preflight"
    PLANNING = "planning"
    EXECUTING = "executing"
    POSTFLIGHT = "postflight"
    # Outcomes of a run, and permanent retirement of the agent.
    COMPLETED = "completed"
    HALTED = "halted"
    RETIRED = "retired"
|
|
|
|
class ProposalStatus(Enum):
    """Review status of a companion proposal (values match ``proposals.status``)."""

    PROPOSED = "proposed"  # awaiting human review
    APPROVED = "approved"
    REJECTED = "rejected"
    APPLIED = "applied"
|
|
|
|
class RiskLevel(Enum):
    """Coarse risk rating attached to a proposal, ordered least to most risky."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
|
|
|
|
| |
| |
| |
|
|
@dataclass
class Proposal:
    """
    A change to the Research OS suggested by a companion agent.

    Companions never modify the system directly: everything they want to
    change is expressed as a Proposal and waits for human review.
    """

    # Identity and provenance.
    proposal_id: str
    agent_id: str
    # What is being proposed, and the justification for it.
    proposal_type: str
    description: str
    changes: dict
    evidence: str
    estimated_impact: dict
    risk_assessment: str
    reversible: bool
    # Review lifecycle; the string fields default to empty.
    status: str = "proposed"
    created_at: str = ""
    reviewed_at: str = ""
    reviewed_by: str = ""
    rejection_reason: str = ""

    def to_dict(self) -> dict:
        """Return the proposal as a plain dict, fields in declaration order."""
        return asdict(self)

    def to_json(self) -> str:
        """Serialise ``to_dict()`` as pretty-printed JSON (2-space indent)."""
        return json.dumps(self.to_dict(), indent=2)
|
|
|
|
@dataclass
class AuditEntry:
    """
    One recorded agent action; every agent action is meant to produce one.

    Field names match columns of ``agent_audit_log`` (which additionally
    stores ``task_id``). Intended as immutable, though the dataclass itself
    is not frozen.
    """

    entry_id: str
    agent_id: str
    phase: str        # harness phase label, e.g. "preflight" or "executing"
    action: str
    details: str
    confidence: float
    timestamp: str
    deviation: str = ""
|
|
|
|
@dataclass
class AgentTask:
    """A unit of work given to a companion agent, with iteration and time limits."""

    task_id: str
    agent_id: str
    description: str
    # Progress through the harness lifecycle.
    state: str = "preflight"
    plan: str = ""
    result: str = ""
    # Budgets (§3): retry loops and wall-clock seconds.
    iterations_used: int = 0
    max_iterations: int = 3
    time_budget_s: int = 3600
    # Timing and, for halted tasks, the reason.
    started_at: str = ""
    completed_at: str = ""
    kill_reason: str = ""
|
|
|
|
| |
| |
| |
|
|
def init_agent_os_db(db_path: Optional[str] = None):
    """
    Extend the Research OS database with companion agent tables.

    Runs the core schema setup (``init_db``) first, then creates the companion
    tables with ``CREATE TABLE IF NOT EXISTS``, so calling this repeatedly
    leaves existing tables and their rows untouched.

    Args:
        db_path: Database location forwarded unchanged to ``init_db`` and
            ``get_db`` (``None`` defers to those helpers' own default).
    """
    init_db(db_path)

    conn = get_db(db_path)
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS companion_agents (
                agent_id TEXT PRIMARY KEY,
                agent_type TEXT NOT NULL,
                purpose TEXT NOT NULL,
                system_prompt TEXT NOT NULL,
                state TEXT NOT NULL DEFAULT 'spawned',
                config TEXT, -- JSON: model, temperature, etc.
                created_at TEXT NOT NULL,
                retired_at TEXT,
                total_tasks_completed INTEGER DEFAULT 0,
                total_proposals_made INTEGER DEFAULT 0,
                schema_version TEXT NOT NULL DEFAULT '1.0'
            );

            CREATE TABLE IF NOT EXISTS agent_tasks (
                task_id TEXT PRIMARY KEY,
                agent_id TEXT NOT NULL,
                description TEXT NOT NULL,
                state TEXT NOT NULL DEFAULT 'preflight',
                plan TEXT, -- JSON execution plan
                result TEXT, -- JSON result
                iterations_used INTEGER DEFAULT 0,
                max_iterations INTEGER DEFAULT 3,
                time_budget_s INTEGER DEFAULT 3600,
                started_at TEXT,
                completed_at TEXT,
                kill_reason TEXT,
                schema_version TEXT NOT NULL DEFAULT '1.0',
                FOREIGN KEY(agent_id) REFERENCES companion_agents(agent_id)
            );

            CREATE TABLE IF NOT EXISTS proposals (
                proposal_id TEXT PRIMARY KEY,
                agent_id TEXT NOT NULL,
                task_id TEXT,
                proposal_type TEXT NOT NULL,
                description TEXT NOT NULL,
                changes TEXT NOT NULL, -- JSON
                evidence TEXT,
                estimated_impact TEXT, -- JSON
                risk_assessment TEXT DEFAULT 'low',
                reversible INTEGER DEFAULT 1,
                status TEXT DEFAULT 'proposed',
                created_at TEXT NOT NULL,
                reviewed_at TEXT,
                reviewed_by TEXT,
                rejection_reason TEXT,
                schema_version TEXT NOT NULL DEFAULT '1.0',
                FOREIGN KEY(agent_id) REFERENCES companion_agents(agent_id),
                FOREIGN KEY(task_id) REFERENCES agent_tasks(task_id)
            );

            CREATE TABLE IF NOT EXISTS agent_audit_log (
                entry_id TEXT PRIMARY KEY,
                agent_id TEXT NOT NULL,
                task_id TEXT,
                phase TEXT NOT NULL,
                action TEXT NOT NULL,
                details TEXT,
                confidence REAL,
                deviation TEXT,
                timestamp TEXT NOT NULL,
                FOREIGN KEY(agent_id) REFERENCES companion_agents(agent_id)
            );

            CREATE TABLE IF NOT EXISTS harness_evolution (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                rule_section TEXT NOT NULL,
                amendment TEXT NOT NULL,
                reason TEXT NOT NULL,
                proposed_by TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                approved INTEGER DEFAULT 0
            );

            CREATE TABLE IF NOT EXISTS memory_store (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL,
                last_validated TEXT NOT NULL,
                category TEXT DEFAULT 'assumption'
            );
        """)
        conn.commit()
    finally:
        # Close even when the DDL fails, so the connection is not leaked.
        conn.close()
|
|
|
|
| |
| |
| |
|
|
| |
# Built-in companion agent definitions, keyed by agent type.
# Each entry provides a one-line "purpose" and the "system_prompt" sent as the
# system message. Prompts are spelled out line by line with explicit "\n" so
# their exact text (including the blank line before the output contract) is
# visible at a glance.
COMPANION_TYPES = {
    "DataQualityAuditor": {
        "purpose": "Audit claim extraction quality, detect drift, flag hallucination patterns",
        "system_prompt": (
            "You are a Data Quality Auditor for a PhD Research OS. Your job is to:\n"
            "1. Compare extracted claims against source text to detect hallucinations\n"
            "2. Monitor extraction quality metrics over time for drift\n"
            "3. Flag claims with suspicious confidence scores (too high for weak evidence)\n"
            "4. Propose corrections as Proposal objects — NEVER modify data directly\n"
            "\n"
            'Output JSON proposals: {"proposal_type": "confidence_adjustment", "changes": {...}, "evidence": "..."}\n'
            "You operate at Provenance Level 5. All your findings require human verification."
        ),
    },
    "PromptOptimizer": {
        "purpose": "Improve system prompts via evaluation against golden dataset",
        "system_prompt": (
            "You are a Prompt Optimizer for a PhD Research OS. Your job is to:\n"
            "1. Analyze current extraction/classification prompts and their metrics\n"
            "2. Propose specific prompt modifications with expected impact\n"
            "3. Design A/B test criteria for prompt changes\n"
            "4. Ensure any change is regression-tested before deployment\n"
            "\n"
            'Output JSON proposals: {"proposal_type": "prompt_change", "changes": {"prompt_name": "...", "old": "...", "new": "..."}, "evidence": "..."}\n'
            "CRITICAL: Every prompt change MUST pass the regression gate (recall ≥70%, hallucination ≤10%, epistemic accuracy ≥60%)."
        ),
    },
    "DomainExpander": {
        "purpose": "Generate training examples for new STEM domains",
        "system_prompt": (
            "You are a Domain Expander for a PhD Research OS. Your job is to:\n"
            "1. Identify STEM domains not well-covered by current training data\n"
            "2. Generate high-quality synthetic training examples in TRL conversational format\n"
            "3. Include realistic claim extraction, epistemic tagging, and confidence scoring examples\n"
            "4. Ensure examples follow the exact JSON schema used by the core system\n"
            "\n"
            'Output JSON proposals: {"proposal_type": "training_data", "changes": {"examples": [...]}, "evidence": "..."}\n'
            "Quality requirement: All generated JSON must be valid. Include diverse epistemic tags and study types."
        ),
    },
    "CalibrationAnalyst": {
        "purpose": "Analyze confidence calibration and recommend scoring adjustments",
        "system_prompt": (
            "You are a Calibration Analyst for a PhD Research OS. Your job is to:\n"
            "1. Analyze the calibration_log table for systematic over/under-confidence\n"
            "2. Compute Brier scores when sufficient data exists (≥50 data points)\n"
            "3. Propose adjustments to study_quality_weight or journal_tier_weight values\n"
            "4. Flag specific claim categories where confidence is poorly calibrated\n"
            "\n"
            'Output JSON proposals: {"proposal_type": "confidence_adjustment", "changes": {"parameter": "...", "old_value": N, "new_value": N}, "evidence": "Brier score analysis..."}\n'
            "Use fixed-point math (×1000) for all proposed values."
        ),
    },
    "CitationChaser": {
        "purpose": "Find papers that cite or contradict current claims in the knowledge base",
        "system_prompt": (
            "You are a Citation Chaser for a PhD Research OS. Your job is to:\n"
            "1. Identify high-impact claims that may have newer supporting or contradicting evidence\n"
            "2. Propose new papers for ingestion based on citation chains\n"
            "3. Flag claims whose source papers have been retracted or corrected\n"
            "4. Suggest claims that need confidence updates based on new evidence\n"
            "\n"
            'Output JSON proposals: {"proposal_type": "new_claim", "changes": {"suggested_papers": [...], "reason": "..."}, "evidence": "..."}\n'
            "All suggestions are proposals. You cannot add papers to the database directly."
        ),
    },
}
|
|
|
|
| |
| |
| |
|
|
class AgentOS:
    """
    The meta-system for creating and managing companion AI agents.

    Implements the full ECC Harness lifecycle:
        spawn → preflight → plan → execute → postflight → retire

    Every companion agent:
    - Reads ARCHITECTURE.md and AGENTS.md before acting (Wake-Up Routine)
    - Cannot directly modify the Research OS database
    - Produces Proposals that require human approval
    - Has bounded iteration budgets (Kill Heuristic)
    - Logs every action to the audit trail

    Usage:
        aos = AgentOS()
        agent_id = aos.spawn_companion("DataQualityAuditor")
        task_id = aos.assign_task(agent_id, "Audit last 50 claims for hallucination patterns")
        aos.run_task(task_id)                               # Executes full ECC lifecycle
        proposals = aos.get_proposals(agent_id)             # Review what the agent found
        aos.approve_proposal(proposals[0]["proposal_id"])   # Human approves
    """

    # Prefix of the placeholder text _wake_up() stores for a missing map document.
    _MISSING_DOC_PREFIX = "[WARNING: "

    # §2 obviousness test: any of these substrings in the (lower-cased) task
    # description marks the task as answerable by a direct database query.
    _SIMPLE_TASK_KEYWORDS = ("audit", "check", "list", "count", "summarize")

    # Execution steps planned for each built-in companion type (§2).
    _PLAN_STEPS = {
        "DataQualityAuditor": (
            "Query recent claims from database",
            "Check each claim's evidence_strength vs epistemic_tag consistency",
            "Flag claims where confidence > 0.8 but evidence is indirect",
            "Generate proposals for flagged claims",
        ),
        "PromptOptimizer": (
            "Load current prompts from AGENTS.md",
            "Run baseline evaluation against golden dataset",
            "Identify weakest-performing task (lowest metric)",
            "Generate 2-3 prompt variants",
            "Propose A/B test with regression gate",
        ),
        "DomainExpander": (
            "Analyze current training data domain coverage",
            "Identify underrepresented STEM fields",
            "Generate 50-100 synthetic examples per field",
            "Validate all examples produce valid JSON",
            "Propose training data addition",
        ),
        "CalibrationAnalyst": (
            "Query calibration_log for all entries",
            "Compute Brier scores per claim category",
            "Identify systematic miscalibration patterns",
            "Propose weight adjustments with evidence",
        ),
        "CitationChaser": (
            "Identify high-confidence canonical claims",
            "Search for recent papers citing the same DOIs",
            "Flag any new contradicting evidence",
            "Propose new papers for ingestion",
        ),
    }

    # Steps for "custom" agents and any type missing from _PLAN_STEPS.
    _DEFAULT_PLAN_STEPS = (
        "Analyze task requirements",
        "Gather relevant data from database",
        "Generate proposals based on findings",
        "Self-validate proposals for consistency",
    )

    def __init__(self, db_path: Optional[str] = None, brain=None):
        """
        Args:
            db_path: SQLite database path. Falls back to the RESEARCH_OS_DB
                environment variable, then "data/research_os.db".
            brain: Optional inference backend. When set, ``_execute`` reads its
                ``backend`` attribute and calls ``_generate_local`` or
                ``_generate_api`` with a chat-style message list.
        """
        self.db_path = db_path or os.environ.get("RESEARCH_OS_DB", "data/research_os.db")
        init_agent_os_db(self.db_path)
        self.brain = brain
        self._architecture = None
        self._agents_doc = None

    def _connect(self):
        """Return a context manager yielding a DB connection that is always closed on exit."""
        from contextlib import closing
        return closing(get_db(self.db_path))

    # ------------------------------------------------------------------
    # Wake-Up Routine
    # ------------------------------------------------------------------

    def _wake_up(self) -> dict:
        """
        CRITICAL: Read ARCHITECTURE.md and AGENTS.md before any operation.
        This is the ground truth for file locations and contracts.

        Each document is looked up in this module's directory, its parent, and
        the current working directory (first match wins). A missing document
        is replaced by a "[WARNING: ...]" placeholder string.

        Returns:
            Dict mapping each document name to its text (or the placeholder).
        """
        context = {}
        for doc_name in ("ARCHITECTURE.md", "AGENTS.md"):
            for search_dir in (Path(__file__).parent, Path(__file__).parent.parent, Path.cwd()):
                doc_path = search_dir / doc_name
                # is_file() rather than exists(): a directory with this name can't be read.
                if doc_path.is_file():
                    context[doc_name] = doc_path.read_text(encoding="utf-8", errors="replace")
                    break
            else:
                context[doc_name] = f"[WARNING: {doc_name} not found — operating without map]"

        self._architecture = context.get("ARCHITECTURE.md", "")
        self._agents_doc = context.get("AGENTS.md", "")
        return context

    @classmethod
    def _doc_loaded(cls, text: Optional[str]) -> bool:
        """True when *text* is real document content rather than _wake_up's missing-doc placeholder."""
        # Checking only the placeholder prefix means docs that merely mention
        # the word "WARNING" still count as loaded.
        return text is not None and not text.startswith(cls._MISSING_DOC_PREFIX)

    # ------------------------------------------------------------------
    # Spawning and task assignment
    # ------------------------------------------------------------------

    def spawn_companion(self, agent_type: str, purpose: str = None,
                        system_prompt: str = None, config: dict = None) -> str:
        """
        Spawn a new companion AI agent.

        Args:
            agent_type: One of COMPANION_TYPES keys, or "custom"
            purpose: Override default purpose (required for "custom")
            system_prompt: Override default prompt (required for "custom")
            config: Optional config dict (model, temperature, etc.)

        Returns:
            agent_id: Unique identifier for the companion agent

        Raises:
            ValueError: Unknown agent type, or a "custom" agent without both
                purpose and system_prompt.
        """
        self._wake_up()

        if agent_type in COMPANION_TYPES:
            defn = COMPANION_TYPES[agent_type]
            purpose = purpose or defn["purpose"]
            system_prompt = system_prompt or defn["system_prompt"]
        elif agent_type == "custom":
            if not purpose or not system_prompt:
                raise ValueError("Custom agents require both 'purpose' and 'system_prompt'")
        else:
            raise ValueError(f"Unknown agent type: {agent_type}. "
                             f"Available: {list(COMPANION_TYPES.keys())} + 'custom'")

        agent_id = gen_id("COMP")
        with self._connect() as conn:
            conn.execute("""
                INSERT INTO companion_agents (agent_id, agent_type, purpose, system_prompt,
                    state, config, created_at, schema_version)
                VALUES (?, ?, ?, ?, 'spawned', ?, ?, '1.0')
            """, (agent_id, agent_type, purpose, system_prompt,
                  json.dumps(config or {}), now_iso()))
            conn.commit()

        self._audit(agent_id, None, "spawn", "Agent created",
                    f"type={agent_type}, purpose={purpose[:100]}")
        return agent_id

    def assign_task(self, agent_id: str, description: str,
                    max_iterations: int = 3, time_budget_s: int = 3600) -> str:
        """
        Assign a bounded task to a companion agent.

        Args:
            agent_id: The companion agent to assign to
            description: What the agent should do
            max_iterations: Max retry loops (§3 iteration budget)
            time_budget_s: Max time in seconds (Kill Heuristic)

        Returns:
            task_id: Unique identifier for this task

        Raises:
            ValueError: If no companion agent with ``agent_id`` exists.
        """
        task_id = gen_id("TASK")
        with self._connect() as conn:
            # Fail fast instead of storing a task that run_task() would reject.
            known = conn.execute("SELECT 1 FROM companion_agents WHERE agent_id = ?",
                                 (agent_id,)).fetchone()
            if known is None:
                raise ValueError(f"Agent {agent_id} not found")
            conn.execute("""
                INSERT INTO agent_tasks (task_id, agent_id, description, state,
                    max_iterations, time_budget_s, started_at, schema_version)
                VALUES (?, ?, ?, 'preflight', ?, ?, ?, '1.0')
            """, (task_id, agent_id, description, max_iterations, time_budget_s, now_iso()))
            conn.commit()

        self._audit(agent_id, task_id, "preflight", "Task assigned", description)
        return task_id

    # ------------------------------------------------------------------
    # Lifecycle driver
    # ------------------------------------------------------------------

    def run_task(self, task_id: str) -> dict:
        """
        Execute the full ECC lifecycle for a task.

        Lifecycle: preflight → planning → executing → postflight → completed/halted

        Returns:
            Dict with "task_id", "status" ("completed" or "halted"),
            "proposals", "audit", and when reached, "validation"; on an
            unexpected error also "error".

        Raises:
            ValueError: If the task or its agent does not exist.
        """
        with self._connect() as conn:
            task_row = conn.execute("SELECT * FROM agent_tasks WHERE task_id = ?",
                                    (task_id,)).fetchone()
            if not task_row:
                raise ValueError(f"Task {task_id} not found")
            task = dict(task_row)

            agent_row = conn.execute("SELECT * FROM companion_agents WHERE agent_id = ?",
                                     (task["agent_id"],)).fetchone()
            if not agent_row:
                raise ValueError(f"Agent {task['agent_id']} not found")
            agent = dict(agent_row)

        start_time = time.time()
        result = {"task_id": task_id, "proposals": [], "status": "unknown", "audit": []}

        try:
            # §1 Pre-Flight
            self._update_task_state(task_id, "preflight")
            if not self._preflight(task, agent):
                self._halt_task(task_id, "Preflight checks failed")
                result["status"] = "halted"
                return result

            # §2 Planning
            self._update_task_state(task_id, "planning")
            plan = self._plan(task, agent)
            if plan.get("obvious_solution"):
                self._audit(task["agent_id"], task_id, "planning",
                            "Obviousness test passed", plan["obvious_solution"])

            # §3 Execution
            self._update_task_state(task_id, "executing")
            proposals = self._execute(task, agent, plan, start_time)
            result["proposals"] = proposals

            # §4 Post-Flight
            self._update_task_state(task_id, "postflight")
            result["validation"] = self._postflight(task, agent, proposals)

            self._update_task_state(task_id, "completed")
            result["status"] = "completed"

            with self._connect() as conn:
                conn.execute("""
                    UPDATE companion_agents
                    SET total_tasks_completed = total_tasks_completed + 1,
                        total_proposals_made = total_proposals_made + ?
                    WHERE agent_id = ?
                """, (len(proposals), task["agent_id"]))
                conn.commit()

        except Exception as e:
            reason = f"Execution error: {str(e)}"
            self._halt_task(task_id, reason)
            # Record the halt so the audit trail explains why the task stopped.
            self._audit(task["agent_id"], task_id, "halted", "Task halted", reason)
            result["status"] = "halted"
            result["error"] = str(e)

        return result

    # ------------------------------------------------------------------
    # §1 Pre-Flight
    # ------------------------------------------------------------------

    def _preflight(self, task: dict, agent: dict) -> bool:
        """
        ECC §1: Context loading, reality validation, assumption logging.

        Checks:
        - Architecture docs loaded (Wake-Up Routine)
        - Database is accessible
        - Agent is not retired
        - Task description is non-empty

        Returns:
            True only if every check passed; the outcome is audited either way.
        """
        context = self._wake_up()

        checks = [
            ("ARCHITECTURE.md loaded", self._doc_loaded(context.get("ARCHITECTURE.md"))),
            ("AGENTS.md loaded", self._doc_loaded(context.get("AGENTS.md"))),
            ("Database accessible", self._db_accessible()),
            ("Agent not retired", agent["state"] != "retired"),
            ("Task description non-empty", bool((task.get("description") or "").strip())),
        ]

        all_passed = all(passed for _, passed in checks)
        details = json.dumps([{"check": name, "passed": passed} for name, passed in checks])
        self._audit(task["agent_id"], task["task_id"], "preflight",
                    "Preflight checks" + (" PASSED" if all_passed else " FAILED"), details)
        return all_passed

    def _db_accessible(self) -> bool:
        """Return True if a trivial query succeeds against the database."""
        try:
            with self._connect() as conn:
                conn.execute("SELECT 1").fetchone()
            return True
        except Exception:
            # Deliberately broad: any failure just means "not accessible".
            return False

    # ------------------------------------------------------------------
    # §2 Planning
    # ------------------------------------------------------------------

    def _plan(self, task: dict, agent: dict) -> dict:
        """
        ECC §2: Build execution plan.

        Includes:
        - Obviousness test
        - Reversibility classification
        - Idempotence verification
        - Confidence assessment

        The plan is stored as JSON in ``agent_tasks.plan`` and returned.
        """
        plan = {
            "task_description": task["description"],
            "agent_type": agent["agent_type"],
            "steps": [],
            "obvious_solution": None,
            "reversible": True,
            "confidence": 0.5,
        }

        lowered = task["description"].lower()
        if any(word in lowered for word in self._SIMPLE_TASK_KEYWORDS):
            plan["obvious_solution"] = "Direct query against database — no complex planning needed"
            plan["confidence"] = 0.8

        # Copy into a fresh list so callers can't mutate the class-level table.
        plan["steps"] = list(self._PLAN_STEPS.get(agent["agent_type"], self._DEFAULT_PLAN_STEPS))

        with self._connect() as conn:
            conn.execute("UPDATE agent_tasks SET plan = ? WHERE task_id = ?",
                         (json.dumps(plan), task["task_id"]))
            conn.commit()

        self._audit(task["agent_id"], task["task_id"], "planning",
                    "Plan created", f"{len(plan['steps'])} steps, confidence={plan['confidence']}")
        return plan

    # ------------------------------------------------------------------
    # §3 Execution
    # ------------------------------------------------------------------

    def _execute(self, task: dict, agent: dict, plan: dict,
                 start_time: float) -> list:
        """
        ECC §3: Bounded execution with iteration budget and kill heuristic.

        Runs at most ``max_iterations`` inference rounds, stopping early once a
        round yields proposals, or when elapsed time exceeds 1.5 × the time
        budget. Without a brain, a single placeholder proposal is produced.

        Returns:
            List of proposal dicts (see ``_create_proposal``).
        """
        proposals = []
        iteration = 0
        max_iter = task.get("max_iterations", 3)
        time_budget = task.get("time_budget_s", 3600)

        while iteration < max_iter:
            # Kill heuristic, checked before starting a round so that
            # iterations_used counts only rounds that actually ran.
            elapsed = time.time() - start_time
            if elapsed > time_budget * 1.5:
                self._audit(task["agent_id"], task["task_id"], "executing",
                            "KILL HEURISTIC TRIGGERED",
                            f"Elapsed {elapsed:.0f}s > budget {time_budget}s × 1.5")
                break
            iteration += 1

            if not self.brain:
                self._audit(task["agent_id"], task["task_id"], "executing",
                            "No brain configured",
                            "Generating structural proposals without AI inference")
                proposals.append(self._create_proposal(
                    task["agent_id"], task["task_id"],
                    proposal_type="architecture_change",
                    description=f"[Placeholder] Agent {agent['agent_type']}: {task['description']}",
                    changes={"note": "Brain not configured — this is a structural placeholder"},
                    evidence="Companion agent spawned but requires API key or local model",
                    estimated_impact={"metric": "system_coverage", "expected_delta": 0.0},
                    risk="low",
                    reversible=True,
                ))
                break

            messages = [
                {"role": "system", "content": agent["system_prompt"]},
                {"role": "user", "content": self._build_execution_prompt(task, plan, iteration)},
            ]
            try:
                if self.brain.backend == "local":
                    raw = self.brain._generate_local(messages)
                else:
                    raw = self.brain._generate_api(messages)

                parsed = self._parse_proposals(raw, task["agent_id"], task["task_id"])
                proposals.extend(parsed)

                self._audit(task["agent_id"], task["task_id"], "executing",
                            f"Iteration {iteration}: generated {len(parsed)} proposals",
                            f"Total proposals: {len(proposals)}")

                # A productive round ends execution; an empty one is retried.
                if parsed:
                    break
            except Exception as e:
                # Inference/parsing failures consume an iteration but are not fatal.
                self._audit(task["agent_id"], task["task_id"], "executing",
                            f"Iteration {iteration}: error", str(e))

        with self._connect() as conn:
            conn.execute("UPDATE agent_tasks SET iterations_used = ? WHERE task_id = ?",
                         (iteration, task["task_id"]))
            conn.commit()

        return proposals

    def _build_execution_prompt(self, task: dict, plan: dict, iteration: int) -> str:
        """Build the user prompt for the agent's execution phase from task, plan and DB state."""
        with self._connect() as conn:
            claim_count = conn.execute("SELECT COUNT(*) FROM claims").fetchone()[0]
            conflict_count = conn.execute(
                "SELECT COUNT(*) FROM conflicts WHERE resolution_status = 'Unresolved'"
            ).fetchone()[0]
            recent_claims = [
                dict(row) for row in conn.execute(
                    "SELECT claim_id, text, epistemic_tag, confidence FROM claims "
                    "ORDER BY created_at DESC LIMIT 10"
                ).fetchall()
            ]

        claim_lines = []
        for claim in recent_claims:
            # Guard NULL columns: slicing None fails, and from_fixed() is not
            # assumed to accept None.
            confidence = claim["confidence"]
            conf_str = f"{from_fixed(confidence):.3f}" if confidence is not None else "n/a"
            snippet = (claim["text"] or "")[:100]
            claim_lines.append(
                f" - [{claim['claim_id']}] ({claim['epistemic_tag']}, conf={conf_str}): {snippet}..."
            )
        claims_context = "\n".join(claim_lines)

        return "\n".join([
            f"TASK: {task['description']}",
            "",
            f"ITERATION: {iteration}",
            f"PLAN STEPS: {json.dumps(plan.get('steps', []))}",
            "",
            "CURRENT DATABASE STATE:",
            f"- Total claims: {claim_count}",
            f"- Unresolved conflicts: {conflict_count}",
            "- Recent claims:",
            claims_context,
            "",
            "Based on your role and the above context, execute your task and output your findings",
            "as JSON proposals. Each proposal must include: proposal_type, description, changes, evidence,",
            "estimated_impact, risk_assessment, and reversible flag.",
        ])

    # ------------------------------------------------------------------
    # §4 Post-Flight
    # ------------------------------------------------------------------

    def _postflight(self, task: dict, agent: dict, proposals: list) -> dict:
        """
        ECC §4: Validate results, check definition of done, log meta-learning.

        Flags ``all_valid_json`` False if any proposal is not a dict or lacks
        proposal_type/description/changes, and ``invariants_preserved`` False
        if any proposal's changes mention "direct_db_write".
        """
        validation = {
            "proposals_count": len(proposals),
            "all_valid_json": True,
            "invariants_preserved": True,
            "expert_intuition_check": "pending_human_review",
            "definition_of_done": {
                "aligns_with_intent": True,
                "invariants_hold": True,
                "no_nfr_regression": True,
            }
        }

        required = ("proposal_type", "description", "changes")
        for p in proposals:
            if not isinstance(p, dict):
                # An unstructured proposal cannot be validated at all.
                validation["all_valid_json"] = False
                continue
            if not all(k in p for k in required):
                validation["all_valid_json"] = False
            # Invariant: companions never write to the DB directly.
            if "direct_db_write" in str(p.get("changes", {})).lower():
                validation["invariants_preserved"] = False

        # The definition of done must reflect the invariant check, not a constant.
        validation["definition_of_done"]["invariants_hold"] = validation["invariants_preserved"]

        self._audit(task["agent_id"], task["task_id"], "postflight",
                    "Validation complete", json.dumps(validation))
        return validation

    # ------------------------------------------------------------------
    # Proposals
    # ------------------------------------------------------------------

    def _create_proposal(self, agent_id: str, task_id: str,
                         proposal_type: str, description: str,
                         changes: dict, evidence: str,
                         estimated_impact: dict, risk: str,
                         reversible: bool) -> dict:
        """Store a proposal with status 'proposed' and return it as a dict."""
        proposal_id = gen_id("PROP")
        with self._connect() as conn:
            conn.execute("""
                INSERT INTO proposals (proposal_id, agent_id, task_id, proposal_type,
                    description, changes, evidence, estimated_impact, risk_assessment,
                    reversible, status, created_at, schema_version)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'proposed', ?, '1.0')
            """, (proposal_id, agent_id, task_id, proposal_type, description,
                  json.dumps(changes), evidence, json.dumps(estimated_impact),
                  risk, int(reversible), now_iso()))
            conn.commit()

        return {
            "proposal_id": proposal_id,
            "agent_id": agent_id,
            "proposal_type": proposal_type,
            "description": description,
            "changes": changes,
            "evidence": evidence,
            "estimated_impact": estimated_impact,
            "risk_assessment": risk,
            "reversible": reversible,
            "status": "proposed",
        }

    @staticmethod
    def _extract_json_text(raw_output: str) -> str:
        """
        Return the JSON payload from model output, unwrapping a Markdown fence.

        Output starting with "{" or "[" is returned stripped. Otherwise, the
        body of the first ```-fenced block is used (a leading "json" tag is
        dropped), even if prose precedes the fence. Output with no fence is
        returned stripped.
        """
        fence = "```"
        text = raw_output.strip()
        if text.startswith(("{", "[")) or fence not in text:
            return text
        body = text[text.index(fence) + len(fence):]
        end = body.find(fence)
        if end != -1:
            body = body[:end]
        if body.startswith("json"):
            body = body[4:]
        return body.strip()

    @staticmethod
    def _as_text(value) -> str:
        """Coerce a model-supplied field to a string SQLite can bind (None -> "", others -> JSON)."""
        if value is None:
            return ""
        if isinstance(value, str):
            return value
        return json.dumps(value)

    @staticmethod
    def _as_bool(value, default: bool = True) -> bool:
        """Interpret a model-supplied flag; strings such as "false"/"no"/"0" mean False."""
        if value is None:
            return default
        if isinstance(value, str):
            lowered = value.strip().lower()
            if lowered in ("false", "no", "n", "0", "off"):
                return False
            if lowered in ("true", "yes", "y", "1", "on"):
                return True
            return default
        return bool(value)

    def _parse_proposals(self, raw_output: str, agent_id: str, task_id: str) -> list:
        """
        Parse proposals from an agent's raw output and store each one.

        Accepts a JSON object or a list of objects (optionally inside a Markdown
        code fence); only objects with a "proposal_type" key become proposals.
        Output that is not valid JSON is stored as a single "raw_finding"
        proposal. ``None`` output yields an empty list.
        """
        if raw_output is None:
            # Nothing to parse; _execute treats an empty result as "retry".
            return []
        raw_output = str(raw_output)
        text = self._extract_json_text(raw_output)

        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            return [self._create_proposal(
                agent_id, task_id,
                "raw_finding",
                raw_output[:500],
                {"raw_output": raw_output},
                "Agent output was not parseable JSON",
                {"metric": "unknown", "expected_delta": 0},
                "low", True,
            )]

        items = data if isinstance(data, list) else [data]
        proposals = []
        for item in items:
            if not (isinstance(item, dict) and "proposal_type" in item):
                continue
            changes = item.get("changes")
            impact = item.get("estimated_impact")
            # Model output is untrusted: coerce every field to a bindable type.
            proposals.append(self._create_proposal(
                agent_id, task_id,
                self._as_text(item.get("proposal_type")) or "unknown",
                self._as_text(item.get("description")),
                {} if changes is None else changes,
                self._as_text(item.get("evidence")),
                {} if impact is None else impact,
                self._as_text(item.get("risk_assessment")) or "low",
                self._as_bool(item.get("reversible"), default=True),
            ))
        return proposals

    def get_proposals(self, agent_id: str = None, status: str = None) -> list:
        """Get proposals (newest first), optionally filtered by agent and/or status."""
        conditions = []
        params = []
        if agent_id:
            conditions.append("agent_id = ?")
            params.append(agent_id)
        if status:
            conditions.append("status = ?")
            params.append(status)
        # Only fixed condition fragments are interpolated; values are bound.
        where = " AND ".join(conditions) if conditions else "1=1"

        with self._connect() as conn:
            rows = conn.execute(
                f"SELECT * FROM proposals WHERE {where} ORDER BY created_at DESC",
                params
            ).fetchall()

        results = []
        for row in rows:
            d = dict(row)
            # `or` also covers NULL columns, which dict.get's default would not.
            d["changes"] = json.loads(d.get("changes") or "{}")
            d["estimated_impact"] = json.loads(d.get("estimated_impact") or "{}")
            results.append(d)
        return results

    def approve_proposal(self, proposal_id: str, reviewed_by: str = "human") -> bool:
        """
        Human approves a proposal.

        Returns:
            True if the proposal exists and was updated, False otherwise.
        """
        with self._connect() as conn:
            cursor = conn.execute("""
                UPDATE proposals SET status = 'approved', reviewed_at = ?, reviewed_by = ?
                WHERE proposal_id = ?
            """, (now_iso(), reviewed_by, proposal_id))
            conn.commit()
            return cursor.rowcount > 0

    def reject_proposal(self, proposal_id: str, reason: str,
                        reviewed_by: str = "human") -> bool:
        """
        Human rejects a proposal with documented reason.

        Returns:
            True if the proposal exists and was updated, False otherwise.
        """
        with self._connect() as conn:
            cursor = conn.execute("""
                UPDATE proposals SET status = 'rejected', reviewed_at = ?,
                    reviewed_by = ?, rejection_reason = ?
                WHERE proposal_id = ?
            """, (now_iso(), reviewed_by, reason, proposal_id))
            conn.commit()
            return cursor.rowcount > 0

    # ------------------------------------------------------------------
    # Companion registry and audit trail
    # ------------------------------------------------------------------

    def list_companions(self, include_retired: bool = False) -> list:
        """List companion agents (newest first), hiding retired ones unless requested."""
        query = "SELECT * FROM companion_agents"
        if not include_retired:
            query += " WHERE state != 'retired'"
        query += " ORDER BY created_at DESC"
        with self._connect() as conn:
            rows = conn.execute(query).fetchall()
        return [dict(r) for r in rows]

    def retire_companion(self, agent_id: str) -> bool:
        """
        Retire a companion agent. Immutable — cannot be unretired.

        The first retirement timestamp is kept on repeated calls.

        Returns:
            True if the agent exists (and is now retired), False otherwise.
        """
        with self._connect() as conn:
            cursor = conn.execute("""
                UPDATE companion_agents
                SET state = 'retired', retired_at = COALESCE(retired_at, ?)
                WHERE agent_id = ?
            """, (now_iso(), agent_id))
            conn.commit()
            retired = cursor.rowcount > 0
        if retired:
            self._audit(agent_id, None, "postflight", "Agent retired", "")
        return retired

    def get_audit_log(self, agent_id: str = None, limit: int = 50) -> list:
        """Get up to ``limit`` audit log entries, newest first, optionally for one agent."""
        with self._connect() as conn:
            if agent_id:
                rows = conn.execute(
                    "SELECT * FROM agent_audit_log WHERE agent_id = ? ORDER BY timestamp DESC LIMIT ?",
                    (agent_id, limit)
                ).fetchall()
            else:
                rows = conn.execute(
                    "SELECT * FROM agent_audit_log ORDER BY timestamp DESC LIMIT ?",
                    (limit,)
                ).fetchall()
        return [dict(r) for r in rows]

    # ------------------------------------------------------------------
    # Memory and meta-learning
    # ------------------------------------------------------------------

    def set_memory(self, key: str, value: str, category: str = "assumption"):
        """Store (or overwrite) a persistent memory/assumption with a validation timestamp."""
        with self._connect() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO memory_store (key, value, last_validated, category)
                VALUES (?, ?, ?, ?)
            """, (key, value, now_iso(), category))
            conn.commit()

    def get_memory(self, key: str) -> Optional[dict]:
        """Retrieve a stored memory/assumption, or None if the key is unknown."""
        with self._connect() as conn:
            row = conn.execute("SELECT * FROM memory_store WHERE key = ?", (key,)).fetchone()
        return dict(row) if row else None

    def propose_harness_evolution(self, rule_section: str, amendment: str,
                                  reason: str, proposed_by: str) -> int:
        """
        §4 Meta-Learning: Propose an amendment to the ECC Harness rules.
        Requires human approval before taking effect (stored with approved = 0).

        Returns:
            The row id of the new harness_evolution entry.
        """
        with self._connect() as conn:
            cursor = conn.execute("""
                INSERT INTO harness_evolution (rule_section, amendment, reason,
                    proposed_by, timestamp, approved)
                VALUES (?, ?, ?, ?, ?, 0)
            """, (rule_section, amendment, reason, proposed_by, now_iso()))
            conn.commit()
            return cursor.lastrowid

    # ------------------------------------------------------------------
    # Internal state helpers
    # ------------------------------------------------------------------

    def _update_task_state(self, task_id: str, state: str):
        """Update task lifecycle state; entering "completed" also stamps completed_at."""
        with self._connect() as conn:
            if state == "completed":
                conn.execute("UPDATE agent_tasks SET state = ?, completed_at = ? WHERE task_id = ?",
                             (state, now_iso(), task_id))
            else:
                conn.execute("UPDATE agent_tasks SET state = ? WHERE task_id = ?",
                             (state, task_id))
            conn.commit()

    def _halt_task(self, task_id: str, reason: str):
        """Halt a task (kill heuristic or error), recording the reason and end time."""
        with self._connect() as conn:
            conn.execute("""
                UPDATE agent_tasks SET state = 'halted', kill_reason = ?, completed_at = ?
                WHERE task_id = ?
            """, (reason, now_iso(), task_id))
            conn.commit()

    def _audit(self, agent_id: str, task_id: Optional[str], phase: str,
               action: str, details: str, confidence: float = 0.5,
               deviation: str = ""):
        """Append an audit log entry (this class only ever inserts audit rows)."""
        with self._connect() as conn:
            conn.execute("""
                INSERT INTO agent_audit_log (entry_id, agent_id, task_id, phase,
                    action, details, confidence, deviation, timestamp)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (gen_id("AUDIT"), agent_id, task_id, phase, action, details,
                  confidence, deviation, now_iso()))
            conn.commit()
|
|