# Uploaded by nkshirsa — commit 83050e2 (verified):
#   "Add ECC Harness: phd_research_os/agent_os.py"
"""
PhD Research OS — ECC Harness Orchestrator (agent_os.py)
=========================================================
The meta-system for spawning, managing, and auditing companion AI agents
that improve the core Research OS brain.
Implements the ECC Harness: Principal Architect Edition (V-SINGULARITY)
- §0: Global Objective Function (correctness > blast radius > simplicity > NFRs > no-op)
- §1: Pre-Flight (context loading, knowledge boundaries, assumption logging)
- §2: Planning (obviousness test, reversibility, idempotence, confidence signaling)
- §3: Execution (JIT verification, cognitive budget, failure modes, scope containment)
- §4: Post-Flight (validation, pedagogical handoff, definition of done, meta-learning)
WAKE-UP ROUTINE: Before any task, this module reads ARCHITECTURE.md and AGENTS.md
to ground itself in the project map. This is non-negotiable.
"""
import json
import os
import sqlite3
import uuid
import time
from datetime import datetime, timezone
from typing import Optional, Callable
from dataclasses import dataclass, field, asdict
from pathlib import Path
from enum import Enum
from .db import get_db, init_db, now_iso, gen_id, to_fixed, from_fixed
# ============================================================
# ECC Lifecycle States
# ============================================================
class AgentState(Enum):
    """Lifecycle states a companion agent passes through under the ECC Harness."""

    # Created, not yet active
    SPAWNED = "spawned"
    # §1: Loading context, validating assumptions
    PREFLIGHT = "preflight"
    # §2: Building execution plan
    PLANNING = "planning"
    # §3: Running bounded task
    EXECUTING = "executing"
    # §4: Validating results, logging decisions
    POSTFLIGHT = "postflight"
    # Task done successfully
    COMPLETED = "completed"
    # Kill heuristic triggered or error
    HALTED = "halted"
    # Agent decommissioned
    RETIRED = "retired"
class ProposalStatus(Enum):
    """Review states of a proposal; the values match the strings stored in `proposals.status`."""

    PROPOSED = "proposed"
    APPROVED = "approved"
    REJECTED = "rejected"
    APPLIED = "applied"
class RiskLevel(Enum):
    """Coarse risk classification attached to a proposal (low / medium / high)."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
# ============================================================
# Data Structures
# ============================================================
@dataclass
class Proposal:
    """
    A change to the Research OS suggested by a companion agent.

    ALL companion output goes through proposals — never direct modification.
    """

    proposal_id: str
    agent_id: str
    # prompt_change, training_data, confidence_adjustment, new_claim, architecture_change
    proposal_type: str
    description: str
    changes: dict
    evidence: str
    # {"metric": str, "expected_delta": float}
    estimated_impact: dict
    # low, medium, high
    risk_assessment: str
    reversible: bool
    status: str = "proposed"
    created_at: str = ""
    reviewed_at: str = ""
    reviewed_by: str = ""
    rejection_reason: str = ""

    def to_dict(self) -> dict:
        """Return all fields as a plain dict (recursively, via `dataclasses.asdict`)."""
        as_mapping = asdict(self)
        return as_mapping

    def to_json(self) -> str:
        """Return the `to_dict()` mapping serialized as JSON with a 2-space indent."""
        serialized = json.dumps(self.to_dict(), indent=2)
        return serialized
@dataclass
class AuditEntry:
    """One record of an agent action, intended to be written once and never edited."""

    entry_id: str
    agent_id: str
    # preflight, planning, executing, postflight
    phase: str
    # what was done
    action: str
    # specifics
    details: str
    # agent's self-assessed confidence [0,1]
    confidence: float
    timestamp: str
    # if deviating from rules, document why
    deviation: str = ""
@dataclass
class AgentTask:
    """A bounded unit of work handed to a companion agent."""

    task_id: str
    agent_id: str
    description: str
    state: str = "preflight"
    # JSON execution plan
    plan: str = ""
    # JSON result
    result: str = ""
    iterations_used: int = 0
    # §3 iteration budget
    max_iterations: int = 3
    # default 1 hour
    time_budget_s: int = 3600
    started_at: str = ""
    completed_at: str = ""
    kill_reason: str = ""
# ============================================================
# Database Extension (adds companion agent tables)
# ============================================================
def init_agent_os_db(db_path: Optional[str] = None):
    """
    Extend the Research OS database with the companion-agent tables.

    Calls `init_db(db_path)` first so the base tables exist, then creates
    (if absent) `companion_agents`, `agent_tasks`, `proposals`,
    `agent_audit_log`, `harness_evolution` and `memory_store`.
    Every statement uses CREATE TABLE IF NOT EXISTS, so calling this again
    on an initialised database creates nothing new.

    Args:
        db_path: Database location, passed unchanged to `init_db` and `get_db`.
    """
    # First ensure base tables exist
    init_db(db_path)
    conn = get_db(db_path)
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS companion_agents (
                agent_id TEXT PRIMARY KEY,
                agent_type TEXT NOT NULL,
                purpose TEXT NOT NULL,
                system_prompt TEXT NOT NULL,
                state TEXT NOT NULL DEFAULT 'spawned',
                config TEXT, -- JSON: model, temperature, etc.
                created_at TEXT NOT NULL,
                retired_at TEXT,
                total_tasks_completed INTEGER DEFAULT 0,
                total_proposals_made INTEGER DEFAULT 0,
                schema_version TEXT NOT NULL DEFAULT '1.0'
            );
            CREATE TABLE IF NOT EXISTS agent_tasks (
                task_id TEXT PRIMARY KEY,
                agent_id TEXT NOT NULL,
                description TEXT NOT NULL,
                state TEXT NOT NULL DEFAULT 'preflight',
                plan TEXT, -- JSON execution plan
                result TEXT, -- JSON result
                iterations_used INTEGER DEFAULT 0,
                max_iterations INTEGER DEFAULT 3,
                time_budget_s INTEGER DEFAULT 3600,
                started_at TEXT,
                completed_at TEXT,
                kill_reason TEXT,
                schema_version TEXT NOT NULL DEFAULT '1.0',
                FOREIGN KEY(agent_id) REFERENCES companion_agents(agent_id)
            );
            CREATE TABLE IF NOT EXISTS proposals (
                proposal_id TEXT PRIMARY KEY,
                agent_id TEXT NOT NULL,
                task_id TEXT,
                proposal_type TEXT NOT NULL,
                description TEXT NOT NULL,
                changes TEXT NOT NULL, -- JSON
                evidence TEXT,
                estimated_impact TEXT, -- JSON
                risk_assessment TEXT DEFAULT 'low',
                reversible INTEGER DEFAULT 1,
                status TEXT DEFAULT 'proposed',
                created_at TEXT NOT NULL,
                reviewed_at TEXT,
                reviewed_by TEXT,
                rejection_reason TEXT,
                schema_version TEXT NOT NULL DEFAULT '1.0',
                FOREIGN KEY(agent_id) REFERENCES companion_agents(agent_id),
                FOREIGN KEY(task_id) REFERENCES agent_tasks(task_id)
            );
            CREATE TABLE IF NOT EXISTS agent_audit_log (
                entry_id TEXT PRIMARY KEY,
                agent_id TEXT NOT NULL,
                task_id TEXT,
                phase TEXT NOT NULL,
                action TEXT NOT NULL,
                details TEXT,
                confidence REAL,
                deviation TEXT,
                timestamp TEXT NOT NULL,
                FOREIGN KEY(agent_id) REFERENCES companion_agents(agent_id)
            );
            CREATE TABLE IF NOT EXISTS harness_evolution (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                rule_section TEXT NOT NULL,
                amendment TEXT NOT NULL,
                reason TEXT NOT NULL,
                proposed_by TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                approved INTEGER DEFAULT 0
            );
            CREATE TABLE IF NOT EXISTS memory_store (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL,
                last_validated TEXT NOT NULL,
                category TEXT DEFAULT 'assumption'
            );
        """)
        conn.commit()
    finally:
        # Release the connection even when the DDL or commit raises.
        conn.close()
# ============================================================
# Companion Agent Definition
# ============================================================
# Pre-built companion types with their system prompts.
# Maps an agent-type name to its default "purpose" and "system_prompt".
# AgentOS.spawn_companion looks the type up here and uses these values unless
# the caller overrides them; AgentOS._plan branches on the same type names.
# The prompt bodies are runtime text, so their continuation lines
# deliberately stay unindented.
COMPANION_TYPES = {
    # Audits extracted claims; emits "confidence_adjustment" proposals.
    "DataQualityAuditor": {
        "purpose": "Audit claim extraction quality, detect drift, flag hallucination patterns",
        "system_prompt": """You are a Data Quality Auditor for a PhD Research OS. Your job is to:
1. Compare extracted claims against source text to detect hallucinations
2. Monitor extraction quality metrics over time for drift
3. Flag claims with suspicious confidence scores (too high for weak evidence)
4. Propose corrections as Proposal objects — NEVER modify data directly
Output JSON proposals: {"proposal_type": "confidence_adjustment", "changes": {...}, "evidence": "..."}
You operate at Provenance Level 5. All your findings require human verification."""
    },
    # Suggests prompt edits; emits "prompt_change" proposals.
    "PromptOptimizer": {
        "purpose": "Improve system prompts via evaluation against golden dataset",
        "system_prompt": """You are a Prompt Optimizer for a PhD Research OS. Your job is to:
1. Analyze current extraction/classification prompts and their metrics
2. Propose specific prompt modifications with expected impact
3. Design A/B test criteria for prompt changes
4. Ensure any change is regression-tested before deployment
Output JSON proposals: {"proposal_type": "prompt_change", "changes": {"prompt_name": "...", "old": "...", "new": "..."}, "evidence": "..."}
CRITICAL: Every prompt change MUST pass the regression gate (recall ≥70%, hallucination ≤10%, epistemic accuracy ≥60%)."""
    },
    # Generates synthetic examples; emits "training_data" proposals.
    "DomainExpander": {
        "purpose": "Generate training examples for new STEM domains",
        "system_prompt": """You are a Domain Expander for a PhD Research OS. Your job is to:
1. Identify STEM domains not well-covered by current training data
2. Generate high-quality synthetic training examples in TRL conversational format
3. Include realistic claim extraction, epistemic tagging, and confidence scoring examples
4. Ensure examples follow the exact JSON schema used by the core system
Output JSON proposals: {"proposal_type": "training_data", "changes": {"examples": [...]}, "evidence": "..."}
Quality requirement: All generated JSON must be valid. Include diverse epistemic tags and study types."""
    },
    # Reviews confidence calibration; emits "confidence_adjustment" proposals.
    "CalibrationAnalyst": {
        "purpose": "Analyze confidence calibration and recommend scoring adjustments",
        "system_prompt": """You are a Calibration Analyst for a PhD Research OS. Your job is to:
1. Analyze the calibration_log table for systematic over/under-confidence
2. Compute Brier scores when sufficient data exists (≥50 data points)
3. Propose adjustments to study_quality_weight or journal_tier_weight values
4. Flag specific claim categories where confidence is poorly calibrated
Output JSON proposals: {"proposal_type": "confidence_adjustment", "changes": {"parameter": "...", "old_value": N, "new_value": N}, "evidence": "Brier score analysis..."}
Use fixed-point math (×1000) for all proposed values."""
    },
    # Follows citation chains; emits "new_claim" proposals.
    "CitationChaser": {
        "purpose": "Find papers that cite or contradict current claims in the knowledge base",
        "system_prompt": """You are a Citation Chaser for a PhD Research OS. Your job is to:
1. Identify high-impact claims that may have newer supporting or contradicting evidence
2. Propose new papers for ingestion based on citation chains
3. Flag claims whose source papers have been retracted or corrected
4. Suggest claims that need confidence updates based on new evidence
Output JSON proposals: {"proposal_type": "new_claim", "changes": {"suggested_papers": [...], "reason": "..."}, "evidence": "..."}
All suggestions are proposals. You cannot add papers to the database directly."""
    }
}
# ============================================================
# The Agent OS — ECC Harness Orchestrator
# ============================================================
class AgentOS:
    """
    The meta-system for creating and managing companion AI agents.

    Implements the full ECC Harness lifecycle:
        spawn → preflight → plan → execute → postflight → retire

    Every companion agent:
    - Reads ARCHITECTURE.md and AGENTS.md before acting (Wake-Up Routine)
    - Produces Proposals that require human approval
    - Has bounded iteration budgets (Kill Heuristic)
    - Logs every action to the audit trail

    Usage:
        agent_os = AgentOS()
        agent_id = agent_os.spawn_companion("DataQualityAuditor")
        task_id = agent_os.assign_task(agent_id, "Audit last 50 claims for hallucination patterns")
        agent_os.run_task(task_id)                               # Executes full ECC lifecycle
        proposals = agent_os.get_proposals(agent_id)             # Review what the agent found
        agent_os.approve_proposal(proposals[0]["proposal_id"])   # Human approves
    """

    # Plan steps per built-in agent type; _plan copies the matching list.
    _PLAN_STEPS = {
        "DataQualityAuditor": [
            "Query recent claims from database",
            "Check each claim's evidence_strength vs epistemic_tag consistency",
            "Flag claims where confidence > 0.8 but evidence is indirect",
            "Generate proposals for flagged claims",
        ],
        "PromptOptimizer": [
            "Load current prompts from AGENTS.md",
            "Run baseline evaluation against golden dataset",
            "Identify weakest-performing task (lowest metric)",
            "Generate 2-3 prompt variants",
            "Propose A/B test with regression gate",
        ],
        "DomainExpander": [
            "Analyze current training data domain coverage",
            "Identify underrepresented STEM fields",
            "Generate 50-100 synthetic examples per field",
            "Validate all examples produce valid JSON",
            "Propose training data addition",
        ],
        "CalibrationAnalyst": [
            "Query calibration_log for all entries",
            "Compute Brier scores per claim category",
            "Identify systematic miscalibration patterns",
            "Propose weight adjustments with evidence",
        ],
        "CitationChaser": [
            "Identify high-confidence canonical claims",
            "Search for recent papers citing the same DOIs",
            "Flag any new contradicting evidence",
            "Propose new papers for ingestion",
        ],
    }

    # Fallback plan for custom agent types.
    _GENERIC_PLAN_STEPS = [
        "Analyze task requirements",
        "Gather relevant data from database",
        "Generate proposals based on findings",
        "Self-validate proposals for consistency",
    ]

    def __init__(self, db_path: Optional[str] = None, brain=None):
        """
        Resolve the database path and make sure the companion tables exist.

        Args:
            db_path: Database location; falls back to the RESEARCH_OS_DB
                environment variable, then "data/research_os.db".
            brain: Optional inference backend. _execute reads `brain.backend`
                and calls `brain._generate_local` or `brain._generate_api`.
        """
        self.db_path = db_path or os.environ.get("RESEARCH_OS_DB", "data/research_os.db")
        init_agent_os_db(self.db_path)
        self.brain = brain  # ResearchOSBrain instance for API calls
        self._architecture = None
        self._agents_doc = None
        self._missing_docs = set()

    # ============================================================
    # Database helpers — always release the connection
    # ============================================================
    def _db_write(self, sql: str, params: tuple = ()) -> tuple:
        """
        Execute one statement, commit, and close the connection.

        Returns:
            (rowcount, lastrowid) captured from the cursor before closing.
        """
        conn = get_db(self.db_path)
        try:
            cursor = conn.execute(sql, params)
            conn.commit()
            return cursor.rowcount, cursor.lastrowid
        finally:
            conn.close()

    def _db_read(self, sql: str, params: tuple = ()) -> list:
        """Run one query and return every row converted with dict(row)."""
        conn = get_db(self.db_path)
        try:
            return [dict(row) for row in conn.execute(sql, params).fetchall()]
        finally:
            conn.close()

    # ============================================================
    # Value coercion for model-produced proposal fields
    # ============================================================
    @staticmethod
    def _coerce_bool(value, default: bool = True) -> bool:
        """
        Interpret a loosely-typed flag from model output.

        Real booleans pass through, None yields `default`, and strings count
        as True only for "true"/"yes"/"y"/"1" (case-insensitive). Any other
        string is False, which is the conservative reading for `reversible`.
        """
        if isinstance(value, bool):
            return value
        if value is None:
            return default
        if isinstance(value, str):
            return value.strip().lower() in {"true", "yes", "y", "1"}
        return bool(value)

    @staticmethod
    def _as_text(value, default: str = "") -> str:
        """Return `value` as a string; non-strings are JSON-encoded, None becomes `default`."""
        if value is None:
            return default
        if isinstance(value, str):
            return value
        return json.dumps(value, default=str)

    # ============================================================
    # Wake-Up Routine — ALWAYS read the map first
    # ============================================================
    def _wake_up(self) -> dict:
        """
        CRITICAL: Read ARCHITECTURE.md and AGENTS.md before any operation.

        Each document is searched for in this module's directory, its parent,
        and the current working directory, in that order; the first regular
        file found wins. A document that is not found gets a "[WARNING: ...]"
        placeholder and its name is recorded in `self._missing_docs`.

        Returns:
            Dict mapping each document name to its text or the placeholder.
        """
        module_dir = Path(__file__).parent
        search_dirs = [module_dir, module_dir.parent, Path.cwd()]
        context = {}
        missing = set()
        for doc_name in ["ARCHITECTURE.md", "AGENTS.md"]:
            for search_dir in search_dirs:
                doc_path = search_dir / doc_name
                if doc_path.is_file():
                    # Explicit encoding: do not depend on the platform locale.
                    context[doc_name] = doc_path.read_text(encoding="utf-8", errors="replace")
                    break
            else:
                missing.add(doc_name)
                context[doc_name] = f"[WARNING: {doc_name} not found — operating without map]"
        self._missing_docs = missing
        self._architecture = context.get("ARCHITECTURE.md", "")
        self._agents_doc = context.get("AGENTS.md", "")
        return context

    # ============================================================
    # Spawn: Create a new companion agent
    # ============================================================
    def spawn_companion(self, agent_type: str, purpose: str = None,
                        system_prompt: str = None, config: dict = None) -> str:
        """
        Spawn a new companion AI agent.

        Args:
            agent_type: One of COMPANION_TYPES keys, or "custom"
            purpose: Override default purpose (required for "custom")
            system_prompt: Override default prompt (required for "custom")
            config: Optional config dict (model, temperature, etc.)

        Returns:
            agent_id: Unique identifier for the companion agent

        Raises:
            ValueError: Unknown agent type, or a "custom" agent without
                both purpose and system_prompt.
        """
        # Wake up first
        self._wake_up()
        # Resolve agent definition
        if agent_type in COMPANION_TYPES:
            defn = COMPANION_TYPES[agent_type]
            purpose = purpose or defn["purpose"]
            system_prompt = system_prompt or defn["system_prompt"]
        elif agent_type == "custom":
            if not purpose or not system_prompt:
                raise ValueError("Custom agents require both 'purpose' and 'system_prompt'")
        else:
            raise ValueError(f"Unknown agent type: {agent_type}. "
                             f"Available: {list(COMPANION_TYPES.keys())} + 'custom'")
        agent_id = gen_id("COMP")
        self._db_write("""
            INSERT INTO companion_agents (agent_id, agent_type, purpose, system_prompt,
                                          state, config, created_at, schema_version)
            VALUES (?, ?, ?, ?, 'spawned', ?, ?, '1.0')
        """, (agent_id, agent_type, purpose, system_prompt,
              json.dumps(config or {}), now_iso()))
        self._audit(agent_id, None, "spawn", "Agent created",
                    f"type={agent_type}, purpose={purpose[:100]}")
        return agent_id

    # ============================================================
    # Task Assignment & Lifecycle
    # ============================================================
    def assign_task(self, agent_id: str, description: str,
                    max_iterations: int = 3, time_budget_s: int = 3600) -> str:
        """
        Assign a bounded task to a companion agent.

        Args:
            agent_id: The companion agent to assign to
            description: What the agent should do
            max_iterations: Max retry loops (§3 iteration budget)
            time_budget_s: Max time in seconds (Kill Heuristic)

        Returns:
            task_id: Unique identifier for this task
        """
        task_id = gen_id("TASK")
        self._db_write("""
            INSERT INTO agent_tasks (task_id, agent_id, description, state,
                                     max_iterations, time_budget_s, started_at, schema_version)
            VALUES (?, ?, ?, 'preflight', ?, ?, ?, '1.0')
        """, (task_id, agent_id, description, max_iterations, time_budget_s, now_iso()))
        self._audit(agent_id, task_id, "preflight", "Task assigned", description)
        return task_id

    def run_task(self, task_id: str) -> dict:
        """
        Execute the full ECC lifecycle for a task.

        Lifecycle: preflight → planning → executing → postflight → completed/halted

        Returns:
            Dict with "task_id", "proposals", "status" ("completed" or
            "halted"), "audit", plus "validation" on success or "error"
            when the task was halted by an exception or the kill heuristic.

        Raises:
            ValueError: The task or its agent does not exist.
        """
        task_rows = self._db_read("SELECT * FROM agent_tasks WHERE task_id = ?", (task_id,))
        if not task_rows:
            raise ValueError(f"Task {task_id} not found")
        task = task_rows[0]
        agent_rows = self._db_read("SELECT * FROM companion_agents WHERE agent_id = ?",
                                   (task["agent_id"],))
        if not agent_rows:
            raise ValueError(f"Agent {task['agent_id']} not found")
        agent = agent_rows[0]

        start_time = time.time()
        result = {"task_id": task_id, "proposals": [], "status": "unknown", "audit": []}
        try:
            # §1: PRE-FLIGHT
            self._update_task_state(task_id, "preflight")
            if not self._preflight(task, agent):
                self._halt_task(task_id, "Preflight checks failed")
                result["status"] = "halted"
                return result
            # §2: PLANNING
            self._update_task_state(task_id, "planning")
            plan = self._plan(task, agent)
            # The Obviousness Test (§2): Is there a simple direct solution?
            if plan.get("obvious_solution"):
                self._audit(task["agent_id"], task_id, "planning",
                            "Obviousness test passed", plan["obvious_solution"])
            # §3: EXECUTION (with iteration budget)
            self._update_task_state(task_id, "executing")
            proposals = self._execute(task, agent, plan, start_time)
            result["proposals"] = proposals
            # §4: POST-FLIGHT
            self._update_task_state(task_id, "postflight")
            result["validation"] = self._postflight(task, agent, proposals)
            # Mark completed
            self._update_task_state(task_id, "completed")
            result["status"] = "completed"
            # Update agent stats
            self._db_write("""
                UPDATE companion_agents
                SET total_tasks_completed = total_tasks_completed + 1,
                    total_proposals_made = total_proposals_made + ?
                WHERE agent_id = ?
            """, (len(proposals), task["agent_id"]))
        except TimeoutError as e:
            # Raised by _execute when the kill heuristic fires: halt, don't complete.
            self._halt_task(task_id, str(e))
            result["status"] = "halted"
            result["error"] = str(e)
        except Exception as e:
            # Lifecycle boundary: record any failure on the task instead of crashing.
            self._halt_task(task_id, f"Execution error: {str(e)}")
            result["status"] = "halted"
            result["error"] = str(e)
        return result

    # ============================================================
    # §1: Pre-Flight Implementation
    # ============================================================
    def _preflight(self, task: dict, agent: dict) -> bool:
        """
        ECC §1: Context loading, reality validation, assumption logging.

        Checks:
        - Architecture docs loaded (Wake-Up Routine)
        - Database is accessible
        - Agent is not retired
        - Task description is non-empty

        Returns:
            True only if every check passed. The per-check outcome is
            written to the audit log either way.
        """
        # Wake-Up: Read architecture docs (also refreshes self._missing_docs)
        self._wake_up()
        checks = []
        # Use the explicit missing-set rather than searching the text for
        # "WARNING": a real document may legitimately contain that word.
        checks.append(("ARCHITECTURE.md loaded", "ARCHITECTURE.md" not in self._missing_docs))
        checks.append(("AGENTS.md loaded", "AGENTS.md" not in self._missing_docs))
        # Check DB accessible
        try:
            conn = get_db(self.db_path)
            try:
                conn.execute("SELECT 1").fetchone()
            finally:
                conn.close()
            db_ok = True
        except Exception:
            # Deliberately broad: any failure here just means "not accessible".
            db_ok = False
        checks.append(("Database accessible", db_ok))
        # Check agent state
        checks.append(("Agent not retired", agent["state"] != "retired"))
        # Check task has content
        checks.append(("Task description non-empty",
                       bool((task.get("description") or "").strip())))
        # Log all checks
        all_passed = all(passed for _, passed in checks)
        details = json.dumps([{"check": name, "passed": passed} for name, passed in checks])
        self._audit(task["agent_id"], task["task_id"], "preflight",
                    "Preflight checks" + (" PASSED" if all_passed else " FAILED"), details)
        return all_passed

    # ============================================================
    # §2: Planning Implementation
    # ============================================================
    def _plan(self, task: dict, agent: dict) -> dict:
        """
        ECC §2: Build the execution plan, store it on the task row, audit it.

        The plan holds the task description, agent type, a step list chosen
        by agent type (generic steps for custom agents), the obviousness-test
        outcome, a `reversible` flag (always True here) and a confidence
        (0.8 when the obviousness test matches, otherwise 0.5).

        Returns:
            The plan dict that was saved as JSON in `agent_tasks.plan`.
        """
        plan = {
            "task_description": task["description"],
            "agent_type": agent["agent_type"],
            "steps": [],
            "obvious_solution": None,
            "reversible": True,
            "confidence": 0.5,
        }
        # Obviousness Test: Can we solve this without a complex plan?
        # NOTE: substring match, so e.g. "account" also matches "count".
        simple_tasks = ["audit", "check", "list", "count", "summarize"]
        description = task["description"].lower()
        if any(word in description for word in simple_tasks):
            plan["obvious_solution"] = "Direct query against database — no complex planning needed"
            plan["confidence"] = 0.8
        # Build step list based on agent type; copy so callers can mutate it freely.
        plan["steps"] = list(
            self._PLAN_STEPS.get(agent["agent_type"], self._GENERIC_PLAN_STEPS)
        )
        # Save plan to DB
        self._db_write("UPDATE agent_tasks SET plan = ? WHERE task_id = ?",
                       (json.dumps(plan), task["task_id"]))
        self._audit(task["agent_id"], task["task_id"], "planning",
                    "Plan created", f"{len(plan['steps'])} steps, confidence={plan['confidence']}")
        return plan

    # ============================================================
    # §3: Execution Implementation
    # ============================================================
    def _execute(self, task: dict, agent: dict, plan: dict,
                 start_time: float) -> list:
        """
        ECC §3: Bounded execution with iteration budget and kill heuristic.

        With a brain configured, each iteration sends the agent's system
        prompt plus an execution prompt to the brain and parses proposals
        from the reply; the loop stops at the first iteration that yields
        proposals. Brain/parsing errors are audited and the iteration is
        retried until the budget runs out. Without a brain, one placeholder
        proposal is stored.

        Args:
            start_time: `time.time()` value taken when the task started.

        Returns:
            List of proposal dicts as returned by `_create_proposal`.

        Raises:
            TimeoutError: Elapsed time exceeded 1.5 × the task's time budget
                (checked at the start of each iteration).
        """
        proposals = []
        iteration = 0
        max_iter = task.get("max_iterations", 3)
        time_budget = task.get("time_budget_s", 3600)
        agent_id, task_id = task["agent_id"], task["task_id"]
        kill_message = None
        while iteration < max_iter:
            # Kill Heuristic: check time budget before starting an iteration
            elapsed = time.time() - start_time
            if elapsed > time_budget * 1.5:  # 50% over budget = HALT
                self._audit(agent_id, task_id, "executing",
                            "KILL HEURISTIC TRIGGERED",
                            f"Elapsed {elapsed:.0f}s > budget {time_budget}s × 1.5")
                kill_message = (f"Kill heuristic triggered: elapsed {elapsed:.0f}s "
                                f"> budget {time_budget}s × 1.5")
                break
            iteration += 1
            # JIT State Verification (§3): Check DB hasn't been modified externally
            # (In a full system, this would check file hashes / row versions)
            if not self.brain:
                # No brain available — generate placeholder proposals from plan
                self._audit(agent_id, task_id, "executing",
                            "No brain configured",
                            "Generating structural proposals without AI inference")
                proposals.append(self._create_proposal(
                    agent_id, task_id,
                    proposal_type="architecture_change",
                    description=f"[Placeholder] Agent {agent['agent_type']}: {task['description']}",
                    changes={"note": "Brain not configured — this is a structural placeholder"},
                    evidence="Companion agent spawned but requires API key or local model",
                    estimated_impact={"metric": "system_coverage", "expected_delta": 0.0},
                    risk="low",
                    reversible=True,
                ))
                break
            # Use the AI brain to execute the agent's task
            messages = [
                {"role": "system", "content": agent["system_prompt"]},
                {"role": "user", "content": self._build_execution_prompt(task, plan, iteration)},
            ]
            try:
                if self.brain.backend == "local":
                    raw = self.brain._generate_local(messages)
                else:
                    raw = self.brain._generate_api(messages)
                # Parse proposals from response
                parsed = self._parse_proposals(raw, agent_id, task_id)
                proposals.extend(parsed)
                self._audit(agent_id, task_id, "executing",
                            f"Iteration {iteration}: generated {len(parsed)} proposals",
                            f"Total proposals: {len(proposals)}")
                # If we got results, we can stop iterating
                if parsed:
                    break
            except Exception as e:
                # Audit and fall through: the while-condition enforces the budget.
                self._audit(agent_id, task_id, "executing",
                            f"Iteration {iteration}: error", str(e))
        # Update iteration count (only iterations that actually started)
        self._db_write("UPDATE agent_tasks SET iterations_used = ? WHERE task_id = ?",
                       (iteration, task_id))
        if kill_message is not None:
            raise TimeoutError(kill_message)
        return proposals

    def _build_execution_prompt(self, task: dict, plan: dict, iteration: int) -> str:
        """
        Build the user prompt for the agent's execution phase.

        Reads the total claim count, the number of conflicts whose
        resolution_status is 'Unresolved', and the 10 most recently created
        claims, and embeds them with the task description, iteration number
        and plan steps.
        """
        # Gather relevant DB context
        conn = get_db(self.db_path)
        try:
            claim_count = conn.execute("SELECT COUNT(*) FROM claims").fetchone()[0]
            conflict_count = conn.execute(
                "SELECT COUNT(*) FROM conflicts WHERE resolution_status = 'Unresolved'"
            ).fetchone()[0]
            # Get sample claims for context
            recent_claims = [dict(row) for row in conn.execute(
                "SELECT claim_id, text, epistemic_tag, confidence FROM claims "
                "ORDER BY created_at DESC LIMIT 10"
            ).fetchall()]
        finally:
            conn.close()
        claim_lines = []
        for claim in recent_claims:
            confidence = claim["confidence"]
            # Tolerate NULL columns instead of crashing the whole task.
            conf_text = f"{from_fixed(confidence):.3f}" if confidence is not None else "n/a"
            snippet = (claim["text"] or "")[:100]
            claim_lines.append(
                f" - [{claim['claim_id']}] ({claim['epistemic_tag']}, conf={conf_text}): {snippet}..."
            )
        claims_context = "\n".join(claim_lines)
        return f"""TASK: {task['description']}
ITERATION: {iteration}
PLAN STEPS: {json.dumps(plan.get('steps', []))}
CURRENT DATABASE STATE:
- Total claims: {claim_count}
- Unresolved conflicts: {conflict_count}
- Recent claims:
{claims_context}
Based on your role and the above context, execute your task and output your findings
as JSON proposals. Each proposal must include: proposal_type, description, changes, evidence,
estimated_impact, risk_assessment, and reversible flag."""

    # ============================================================
    # §4: Post-Flight Implementation
    # ============================================================
    def _postflight(self, task: dict, agent: dict, proposals: list) -> dict:
        """
        ECC §4: Validate results, check definition of done, log meta-learning.

        For every dict proposal: `all_valid_json` becomes False if
        proposal_type, description or changes is missing, and
        `invariants_preserved` becomes False if the text "direct_db_write"
        appears in its changes. Non-dict entries are not inspected.

        Returns:
            The validation dict, which is also written to the audit log.
        """
        validation = {
            "proposals_count": len(proposals),
            "all_valid_json": True,
            "invariants_preserved": True,
            "expert_intuition_check": "pending_human_review",
            "definition_of_done": {
                "aligns_with_intent": True,
                "invariants_hold": True,
                "no_nfr_regression": True,
            }
        }
        required = ("proposal_type", "description", "changes")
        # Validate each proposal
        for p in proposals:
            if not isinstance(p, dict):
                continue
            # Check required fields
            if not all(k in p for k in required):
                validation["all_valid_json"] = False
            # Check no proposal directly modifies claims (invariant)
            if "direct_db_write" in str(p.get("changes", {})).lower():
                validation["invariants_preserved"] = False
        self._audit(task["agent_id"], task["task_id"], "postflight",
                    "Validation complete", json.dumps(validation))
        return validation

    # ============================================================
    # Proposal Management
    # ============================================================
    def _create_proposal(self, agent_id: str, task_id: str,
                         proposal_type: str, description: str,
                         changes: dict, evidence: str,
                         estimated_impact: dict, risk: str,
                         reversible: bool) -> dict:
        """
        Create and store a proposal with status 'proposed'.

        `changes` and `estimated_impact` are stored as JSON text and
        `reversible` as 0/1.

        Returns:
            Dict describing the stored proposal, including its new id.
        """
        proposal_id = gen_id("PROP")
        self._db_write("""
            INSERT INTO proposals (proposal_id, agent_id, task_id, proposal_type,
                                   description, changes, evidence, estimated_impact,
                                   risk_assessment, reversible, status, created_at,
                                   schema_version)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'proposed', ?, '1.0')
        """, (proposal_id, agent_id, task_id, proposal_type, description,
              json.dumps(changes), evidence, json.dumps(estimated_impact),
              # bool() first: int("false") would raise on a non-boolean flag.
              risk, int(bool(reversible)), now_iso()))
        return {
            "proposal_id": proposal_id,
            "agent_id": agent_id,
            "proposal_type": proposal_type,
            "description": description,
            "changes": changes,
            "evidence": evidence,
            "estimated_impact": estimated_impact,
            "risk_assessment": risk,
            "reversible": reversible,
            "status": "proposed",
        }

    def _parse_proposals(self, raw_output: str, agent_id: str, task_id: str) -> list:
        """
        Parse proposals from the agent's raw output and store each one.

        Accepts a JSON object or list, optionally wrapped in a ``` fence.
        Items that are dicts containing "proposal_type" become proposals;
        their fields are coerced to storable types first. Output that is
        not valid JSON is stored as a single "raw_finding" proposal. Empty
        output yields no proposal so the caller can retry.

        Returns:
            List of proposal dicts as returned by `_create_proposal`.
        """
        raw_output = self._as_text(raw_output)
        # Try to extract JSON
        text = raw_output.strip()
        if text.startswith("```"):
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]
            text = text.strip()
        if not text:
            return []
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            # If JSON parsing fails, create a raw-text proposal
            return [self._create_proposal(
                agent_id, task_id,
                "raw_finding",
                raw_output[:500],
                {"raw_output": raw_output},
                "Agent output was not parseable JSON",
                {"metric": "unknown", "expected_delta": 0},
                "low", True,
            )]
        # Handle single proposal or list
        items = data if isinstance(data, list) else [data]
        proposals = []
        for item in items:
            if not (isinstance(item, dict) and "proposal_type" in item):
                continue
            proposals.append(self._create_proposal(
                agent_id, task_id,
                self._as_text(item.get("proposal_type"), "unknown"),
                self._as_text(item.get("description")),
                item.get("changes") or {},
                self._as_text(item.get("evidence")),
                item.get("estimated_impact") or {},
                self._as_text(item.get("risk_assessment"), "low"),
                self._coerce_bool(item.get("reversible"), True),
            ))
        return proposals

    def get_proposals(self, agent_id: str = None, status: str = None) -> list:
        """
        Get proposals, optionally filtered by agent and/or status.

        Returns:
            Rows as dicts, newest first, with `changes` and
            `estimated_impact` decoded from JSON (NULL decodes to {}).
        """
        conditions = []
        params = []
        if agent_id:
            conditions.append("agent_id = ?")
            params.append(agent_id)
        if status:
            conditions.append("status = ?")
            params.append(status)
        # Only fixed condition fragments are interpolated; values stay parameterized.
        where = " AND ".join(conditions) if conditions else "1=1"
        rows = self._db_read(
            f"SELECT * FROM proposals WHERE {where} ORDER BY created_at DESC",
            tuple(params),
        )
        for row in rows:
            # The key always exists; a NULL column comes back as None.
            row["changes"] = json.loads(row.get("changes") or "{}")
            row["estimated_impact"] = json.loads(row.get("estimated_impact") or "{}")
        return rows

    def approve_proposal(self, proposal_id: str, reviewed_by: str = "human") -> bool:
        """
        Human approves a proposal.

        Returns:
            True if a proposal with this id was updated, False otherwise.
        """
        rowcount, _ = self._db_write("""
            UPDATE proposals SET status = 'approved', reviewed_at = ?, reviewed_by = ?
            WHERE proposal_id = ?
        """, (now_iso(), reviewed_by, proposal_id))
        return rowcount > 0

    def reject_proposal(self, proposal_id: str, reason: str,
                        reviewed_by: str = "human") -> bool:
        """
        Human rejects a proposal with documented reason.

        Returns:
            True if a proposal with this id was updated, False otherwise.
        """
        rowcount, _ = self._db_write("""
            UPDATE proposals SET status = 'rejected', reviewed_at = ?,
                                 reviewed_by = ?, rejection_reason = ?
            WHERE proposal_id = ?
        """, (now_iso(), reviewed_by, reason, proposal_id))
        return rowcount > 0

    # ============================================================
    # Agent Management
    # ============================================================
    def list_companions(self, include_retired: bool = False) -> list:
        """List companion agents, newest first; retired ones only on request."""
        if include_retired:
            return self._db_read("SELECT * FROM companion_agents ORDER BY created_at DESC")
        return self._db_read(
            "SELECT * FROM companion_agents WHERE state != 'retired' ORDER BY created_at DESC"
        )

    def retire_companion(self, agent_id: str) -> bool:
        """
        Retire a companion agent by setting its state to 'retired'.

        Returns:
            True if the agent exists and was marked retired (an audit entry
            is written), False if no such agent exists.
        """
        rowcount, _ = self._db_write("""
            UPDATE companion_agents SET state = 'retired', retired_at = ?
            WHERE agent_id = ?
        """, (now_iso(), agent_id))
        if rowcount <= 0:
            # Do not record a retirement for an agent that does not exist.
            return False
        self._audit(agent_id, None, "postflight", "Agent retired", "")
        return True

    def get_audit_log(self, agent_id: str = None, limit: int = 50) -> list:
        """Get up to `limit` audit log entries, newest first, optionally for one agent."""
        if agent_id:
            return self._db_read(
                "SELECT * FROM agent_audit_log WHERE agent_id = ? ORDER BY timestamp DESC LIMIT ?",
                (agent_id, limit),
            )
        return self._db_read(
            "SELECT * FROM agent_audit_log ORDER BY timestamp DESC LIMIT ?",
            (limit,),
        )

    # ============================================================
    # Memory & Harness Evolution
    # ============================================================
    def set_memory(self, key: str, value: str, category: str = "assumption"):
        """Store (or overwrite) a memory/assumption, stamping it with the current time."""
        self._db_write("""
            INSERT OR REPLACE INTO memory_store (key, value, last_validated, category)
            VALUES (?, ?, ?, ?)
        """, (key, value, now_iso(), category))

    def get_memory(self, key: str) -> Optional[dict]:
        """Retrieve a stored memory/assumption as a dict, or None if the key is unknown."""
        rows = self._db_read("SELECT * FROM memory_store WHERE key = ?", (key,))
        return rows[0] if rows else None

    def propose_harness_evolution(self, rule_section: str, amendment: str,
                                  reason: str, proposed_by: str) -> int:
        """
        §4 Meta-Learning: Propose an amendment to the ECC Harness rules.

        The row is inserted with approved = 0.

        Returns:
            The id of the new `harness_evolution` row.
        """
        _, evo_id = self._db_write("""
            INSERT INTO harness_evolution (rule_section, amendment, reason,
                                           proposed_by, timestamp, approved)
            VALUES (?, ?, ?, ?, ?, 0)
        """, (rule_section, amendment, reason, proposed_by, now_iso()))
        return evo_id

    # ============================================================
    # Internal Utilities
    # ============================================================
    def _update_task_state(self, task_id: str, state: str):
        """Update task lifecycle state; 'completed' also stamps completed_at."""
        if state == "completed":
            self._db_write(
                "UPDATE agent_tasks SET state = ?, completed_at = ? WHERE task_id = ?",
                (state, now_iso(), task_id),
            )
        else:
            self._db_write("UPDATE agent_tasks SET state = ? WHERE task_id = ?",
                           (state, task_id))

    def _halt_task(self, task_id: str, reason: str):
        """Halt a task (kill heuristic or error), recording the reason and time."""
        self._db_write("""
            UPDATE agent_tasks SET state = 'halted', kill_reason = ?, completed_at = ?
            WHERE task_id = ?
        """, (reason, now_iso(), task_id))

    def _audit(self, agent_id: str, task_id: Optional[str], phase: str,
               action: str, details: str, confidence: float = 0.5,
               deviation: str = ""):
        """Append one entry to agent_audit_log with a fresh id and timestamp."""
        self._db_write("""
            INSERT INTO agent_audit_log (entry_id, agent_id, task_id, phase,
                                         action, details, confidence, deviation, timestamp)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (gen_id("AUDIT"), agent_id, task_id, phase, action, details,
              confidence, deviation, now_iso()))