""" PhD Research OS — Meta-Improver AI ==================================== A continuously running self-improvement intelligence that: 1. Monitors the Research OS for quality degradation, drift, and opportunities 2. Scans external sources (papers, social media, repos) for improvement ideas 3. Proposes improvements to BOTH the Research OS AND to itself 4. Tracks improvement history and learns from what worked The Meta-Improver operates under the ECC Harness and produces Proposals that require human approval — it NEVER self-modifies without authorization. Architecture: MetaImprover ├── InternalMonitor — watches DB metrics, eval scores, error rates ├── ExternalScanner — searches papers, GitHub, social media for ideas ├── SelfReflector — analyzes own performance and proposes self-upgrades └── ImprovementEngine — synthesizes findings into ranked Proposals """ import json import os import time import hashlib from datetime import datetime, timezone from typing import Optional from dataclasses import dataclass, field, asdict from .db import get_db, init_db, now_iso, gen_id, to_fixed, from_fixed from .skills.registry import skill_registry, SUPERPOWERS_WORKFLOW_CONTEXT # ============================================================ # Meta-Improver Configuration # ============================================================ META_IMPROVER_VERSION = "1.0.0" # What the meta-improver monitors internally INTERNAL_MONITORS = { "extraction_quality": { "description": "Track claim extraction precision/recall over time", "metric": "hallucination_rate", "alert_threshold": 0.12, # Alert if hallucination > 12% "check_interval_hours": 24, }, "confidence_calibration": { "description": "Monitor Brier score for systematic miscalibration", "metric": "brier_score", "alert_threshold": 0.25, # Alert if Brier > 0.25 "check_interval_hours": 168, # Weekly }, "conflict_detection_rate": { "description": "Track false positive rate in conflict detection", "metric": "false_positive_rate", "alert_threshold": 0.30, "check_interval_hours": 168, }, "api_cost_efficiency": { "description": "Monitor cost per claim extracted", "metric": "cost_per_claim_usd", "alert_threshold": 0.10, # Alert if > $0.10/claim "check_interval_hours": 24, }, "prompt_drift": { "description": "Detect when prompt performance degrades vs baseline", "metric": "eval_score_delta", "alert_threshold": -0.05, # Alert if 5% below baseline "check_interval_hours": 168, }, "taxonomy_coverage": { "description": "Track study types not in taxonomy that appear in data", "metric": "unmapped_type_count", "alert_threshold": 5, "check_interval_hours": 168, }, "companion_agent_health": { "description": "Track companion agent task success rate", "metric": "task_success_rate", "alert_threshold": 0.70, # Alert if <70% tasks complete "check_interval_hours": 168, }, } # External sources to scan for improvement ideas EXTERNAL_SOURCES = { "arxiv_papers": { "description": "Search arXiv for papers on scientific NLP, claim extraction, epistemic classification", "queries": [ "scientific claim extraction language model", "epistemic classification scientific text", "contradiction detection scientific literature", "confidence calibration language model", "structured output language model fine-tuning", ], "scan_interval_hours": 168, # Weekly }, "huggingface_models": { "description": "Monitor HF Hub for new models suitable as Research OS brain", "queries": [ "scientific text extraction", "structured JSON output", "instruction-tuned 3B 7B", ], "scan_interval_hours": 168, }, "github_repos": { "description": "Monitor GitHub for tools/libraries that could improve the pipeline", "queries": [ "scientific information extraction", "pdf claim extraction", "epistemic tagging NLP", ], "scan_interval_hours": 336, # Bi-weekly }, "social_discourse": { "description": "Monitor discourse for best practices in research AI systems", "queries": [ "research AI assistant best practices", "scientific knowledge base design", "LLM structured output techniques", ], "scan_interval_hours": 168, }, } # ============================================================ # Database Extension # ============================================================ def init_meta_improver_db(db_path: str = None): """Add meta-improver tables to the database.""" init_db(db_path) conn = get_db(db_path) conn.executescript(""" CREATE TABLE IF NOT EXISTS meta_monitor_state ( monitor_name TEXT PRIMARY KEY, last_value REAL, last_checked TEXT, alert_active INTEGER DEFAULT 0, trend TEXT, -- JSON: recent values for trend analysis baseline_value REAL, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS external_scan_results ( scan_id TEXT PRIMARY KEY, source_type TEXT NOT NULL, query TEXT NOT NULL, results TEXT NOT NULL, -- JSON: search results improvement_ideas TEXT, -- JSON: extracted ideas scanned_at TEXT NOT NULL, processed INTEGER DEFAULT 0 ); CREATE TABLE IF NOT EXISTS improvement_history ( improvement_id TEXT PRIMARY KEY, category TEXT NOT NULL, -- internal_monitor, external_scan, self_reflection description TEXT NOT NULL, proposal_id TEXT, -- Links to proposals table impact_measured TEXT, -- JSON: before/after metrics status TEXT DEFAULT 'proposed', -- proposed, applied, measured, reverted created_at TEXT NOT NULL, applied_at TEXT, measured_at TEXT ); CREATE TABLE IF NOT EXISTS self_reflection_log ( reflection_id TEXT PRIMARY KEY, trigger TEXT NOT NULL, -- what triggered this reflection findings TEXT NOT NULL, -- JSON: what was discovered self_improvement_proposal TEXT, -- JSON: how to improve the meta-improver itself confidence REAL, created_at TEXT NOT NULL ); -- Ensure proposals table exists (normally created by agent_os, but meta-improver reads it) CREATE TABLE IF NOT EXISTS proposals ( proposal_id TEXT PRIMARY KEY, agent_id TEXT NOT NULL, task_id TEXT, proposal_type TEXT NOT NULL, description TEXT NOT NULL, changes TEXT NOT NULL, evidence TEXT, estimated_impact TEXT, risk_assessment TEXT DEFAULT 'low', reversible INTEGER DEFAULT 1, status TEXT DEFAULT 'proposed', created_at TEXT NOT NULL, reviewed_at TEXT, reviewed_by TEXT, rejection_reason TEXT, schema_version TEXT NOT NULL DEFAULT '1.0' ); """) conn.commit() conn.close() # ============================================================ # Internal Monitor # ============================================================ class InternalMonitor: """ Watches Research OS database metrics for quality degradation. Runs checks based on configured intervals and thresholds. """ def __init__(self, db_path: str): self.db_path = db_path def run_all_checks(self) -> list: """Run all internal monitoring checks. Returns list of alerts.""" alerts = [] conn = get_db(self.db_path) # Check 1: Hallucination rate proxy (claims with very low evidence strength) total = conn.execute("SELECT COUNT(*) FROM claims").fetchone()[0] if total > 0: low_evidence = conn.execute( "SELECT COUNT(*) FROM claims WHERE evidence_strength IS NOT NULL AND evidence_strength < 200" ).fetchone()[0] rate = low_evidence / total self._update_monitor(conn, "extraction_quality", rate) if rate > INTERNAL_MONITORS["extraction_quality"]["alert_threshold"]: alerts.append({ "monitor": "extraction_quality", "message": f"Low-evidence claim rate at {rate:.1%} (threshold: {INTERNAL_MONITORS['extraction_quality']['alert_threshold']:.0%})", "severity": "high" if rate > 0.20 else "medium", "current_value": rate, }) # Check 2: Confidence calibration (if calibration data exists) cal_count = conn.execute("SELECT COUNT(*) FROM calibration_log").fetchone()[0] if cal_count >= 20: alerts.append({ "monitor": "confidence_calibration", "message": f"Calibration data available ({cal_count} points). Brier score analysis recommended.", "severity": "info", "current_value": cal_count, }) # Check 3: API cost tracking cost_row = conn.execute(""" SELECT SUM(cost_usd) as total_cost, COUNT(DISTINCT task_type) as task_types FROM api_usage_log WHERE timestamp >= datetime('now', '-7 days') """).fetchone() if cost_row and cost_row[0]: weekly_cost = cost_row[0] if weekly_cost > 15.0: # $15/week threshold alerts.append({ "monitor": "api_cost_efficiency", "message": f"Weekly API cost: ${weekly_cost:.2f} (threshold: $15.00)", "severity": "medium", "current_value": weekly_cost, }) # Check 4: Unresolved conflicts accumulation unresolved = conn.execute( "SELECT COUNT(*) FROM conflicts WHERE resolution_status = 'Unresolved'" ).fetchone()[0] if unresolved > 20: alerts.append({ "monitor": "conflict_detection_rate", "message": f"{unresolved} unresolved conflicts accumulating. Review recommended.", "severity": "medium", "current_value": unresolved, }) # Check 5: Incomplete claims ratio if total > 0: incomplete = conn.execute( "SELECT COUNT(*) FROM claims WHERE status = 'Incomplete'" ).fetchone()[0] incomplete_rate = incomplete / total if incomplete_rate > 0.30: alerts.append({ "monitor": "extraction_quality", "message": f"Incomplete claim rate: {incomplete_rate:.1%}. May indicate extraction issues.", "severity": "low", "current_value": incomplete_rate, }) # Check 6: Taxonomy coverage unmapped = conn.execute(""" SELECT DISTINCT study_type FROM claims WHERE study_type IS NOT NULL AND study_type NOT IN ('in_vivo', 'direct_physical_measurement', 'mathematical_proof', 'in_vitro', 'first_principles_simulation', 'phenomenological_simulation', 'review', 'perspective', 'primary_experimental', 'simulation', 'review_non_systematic') """).fetchall() if len(unmapped) > 0: types = [dict(r)["study_type"] for r in unmapped] alerts.append({ "monitor": "taxonomy_coverage", "message": f"Found {len(types)} unmapped study types: {types[:5]}", "severity": "low", "current_value": len(types), }) conn.close() return alerts def _update_monitor(self, conn, name: str, value: float): """Update monitor state with latest value.""" existing = conn.execute( "SELECT trend FROM meta_monitor_state WHERE monitor_name = ?", (name,) ).fetchone() if existing: trend = json.loads(existing[0] or "[]") trend.append({"value": value, "timestamp": now_iso()}) trend = trend[-50:] # Keep last 50 data points conn.execute(""" UPDATE meta_monitor_state SET last_value = ?, last_checked = ?, trend = ? WHERE monitor_name = ? """, (value, now_iso(), json.dumps(trend), name)) else: conn.execute(""" INSERT INTO meta_monitor_state (monitor_name, last_value, last_checked, trend, baseline_value, created_at) VALUES (?, ?, ?, ?, ?, ?) """, (name, value, now_iso(), json.dumps([{"value": value, "timestamp": now_iso()}]), value, now_iso())) conn.commit() # ============================================================ # External Scanner # ============================================================ class ExternalScanner: """ Scans external sources for improvement ideas. Uses the Research OS Brain (API) to search and analyze findings. Sources: arXiv papers, HF Hub models, GitHub repos, social discourse. """ def __init__(self, db_path: str, brain=None): self.db_path = db_path self.brain = brain def scan_all_sources(self) -> list: """ Scan all configured external sources for improvement ideas. Returns list of scan results with extracted ideas. """ results = [] conn = get_db(self.db_path) for source_type, config in EXTERNAL_SOURCES.items(): for query in config["queries"]: # Check if we scanned this recently recent = conn.execute(""" SELECT 1 FROM external_scan_results WHERE source_type = ? AND query = ? AND scanned_at >= datetime('now', ?) """, (source_type, query, f"-{config['scan_interval_hours']} hours")).fetchone() if recent: continue # Perform the scan (using brain if available, otherwise generate structured placeholder) scan_result = self._scan_source(source_type, query) scan_id = gen_id("SCAN") conn.execute(""" INSERT INTO external_scan_results (scan_id, source_type, query, results, improvement_ideas, scanned_at) VALUES (?, ?, ?, ?, ?, ?) """, (scan_id, source_type, query, json.dumps(scan_result.get("results", [])), json.dumps(scan_result.get("ideas", [])), now_iso())) results.append({ "scan_id": scan_id, "source": source_type, "query": query, "results_count": len(scan_result.get("results", [])), "ideas_count": len(scan_result.get("ideas", [])), }) conn.commit() conn.close() return results def _scan_source(self, source_type: str, query: str) -> dict: """Scan a single source. Returns results and extracted ideas.""" if self.brain: return self._scan_with_brain(source_type, query) else: return self._generate_scan_template(source_type, query) def _scan_with_brain(self, source_type: str, query: str) -> dict: """Use the AI brain to analyze a query and generate improvement ideas.""" prompt = f"""You are the External Intelligence Scanner for a PhD Research OS. Search context: {source_type} Query: "{query}" Based on your knowledge, identify: 1. Recent developments (papers, tools, models) relevant to this query 2. Specific improvement ideas for a system that: - Extracts scientific claims from papers - Classifies claims as Fact/Interpretation/Hypothesis/Conflict - Scores confidence using evidence × quality × tier × completeness - Detects contradictions between claims - Uses Qwen2.5-3B fine-tuned with QLoRA Output JSON: {{ "results": [{{"title": "...", "source": "...", "relevance": "high|medium|low", "summary": "..."}}], "ideas": [{{"idea": "...", "expected_impact": "...", "effort": "low|medium|high", "category": "model|data|pipeline|taxonomy|architecture"}}] }}""" try: messages = [ {"role": "system", "content": "You are a research intelligence scanner. Output valid JSON only."}, {"role": "user", "content": prompt} ] if self.brain.backend == "local": raw = self.brain._generate_local(messages) else: raw = self.brain._generate_api(messages) text = raw.strip() if text.startswith("```"): text = text.split("```")[1] if text.startswith("json"): text = text[4:] text = text.strip() return json.loads(text) except Exception: return self._generate_scan_template(source_type, query) def _generate_scan_template(self, source_type: str, query: str) -> dict: """Generate a structured template when no brain is available.""" return { "results": [{ "title": f"[Placeholder] Scan for: {query}", "source": source_type, "relevance": "medium", "summary": "Brain not configured — manual scan recommended" }], "ideas": [{ "idea": f"Investigate: {query}", "expected_impact": "Unknown — requires manual evaluation", "effort": "medium", "category": "research" }] } def get_unprocessed_ideas(self) -> list: """Get all improvement ideas not yet converted to proposals.""" conn = get_db(self.db_path) rows = conn.execute(""" SELECT scan_id, source_type, query, improvement_ideas, scanned_at FROM external_scan_results WHERE processed = 0 ORDER BY scanned_at DESC """).fetchall() conn.close() ideas = [] for row in rows: d = dict(row) d["improvement_ideas"] = json.loads(d.get("improvement_ideas", "[]")) ideas.append(d) return ideas # ============================================================ # Self-Reflector # ============================================================ class SelfReflector: """ Analyzes the Meta-Improver's own performance and proposes self-upgrades. Monitors: - Which improvement proposals were accepted vs rejected (learn preferences) - Which monitoring alerts were actionable vs noise (tune thresholds) - Which external scans produced valuable ideas (focus searches) - Overall system improvement trajectory (are we getting better?) """ def __init__(self, db_path: str): self.db_path = db_path def reflect(self) -> dict: """ Run a self-reflection cycle. Returns findings and self-improvement proposals. """ conn = get_db(self.db_path) findings = {} # 1. Proposal acceptance rate total_proposals = conn.execute("SELECT COUNT(*) FROM proposals").fetchone()[0] approved = conn.execute("SELECT COUNT(*) FROM proposals WHERE status = 'approved'").fetchone()[0] rejected = conn.execute("SELECT COUNT(*) FROM proposals WHERE status = 'rejected'").fetchone()[0] if total_proposals > 0: acceptance_rate = approved / total_proposals findings["proposal_acceptance_rate"] = { "total": total_proposals, "approved": approved, "rejected": rejected, "rate": acceptance_rate, "insight": ( "Low acceptance rate — proposals may be too aggressive or poorly targeted" if acceptance_rate < 0.3 and total_proposals > 10 else "Acceptance rate healthy" if acceptance_rate > 0.5 else "Insufficient data for trend" ) } # 2. Rejection reasons analysis rejections = conn.execute(""" SELECT rejection_reason FROM proposals WHERE status = 'rejected' AND rejection_reason != '' """).fetchall() if rejections: reasons = [dict(r)["rejection_reason"] for r in rejections] findings["rejection_patterns"] = { "total_rejections": len(reasons), "sample_reasons": reasons[:5], "insight": "Analyze rejection reasons to avoid proposing similar changes" } # 3. External scan yield total_scans = conn.execute("SELECT COUNT(*) FROM external_scan_results").fetchone()[0] processed = conn.execute("SELECT COUNT(*) FROM external_scan_results WHERE processed = 1").fetchone()[0] findings["external_scan_yield"] = { "total_scans": total_scans, "processed": processed, "insight": "Track which source types produce the most actionable ideas" } # 4. Improvement trajectory improvements = conn.execute(""" SELECT status, COUNT(*) as cnt FROM improvement_history GROUP BY status """).fetchall() findings["improvement_trajectory"] = { status_counts[0]: status_counts[1] for status_counts in [dict(r).values() for r in improvements] } if improvements else {"no_data": True} # 5. Self-improvement proposals self_proposals = [] if findings.get("proposal_acceptance_rate", {}).get("rate", 1.0) < 0.3: self_proposals.append({ "target": "proposal_generation", "change": "Increase evidence requirements before generating proposals", "reason": "Low acceptance rate suggests proposals are insufficiently grounded" }) if total_scans > 20 and processed < total_scans * 0.5: self_proposals.append({ "target": "external_scanning", "change": "Reduce scan frequency or focus on higher-yield source types", "reason": "Many scans unprocessed — scanning faster than consumption" }) # Log reflection reflection_id = gen_id("REFL") conn.execute(""" INSERT INTO self_reflection_log (reflection_id, trigger, findings, self_improvement_proposal, confidence, created_at) VALUES (?, ?, ?, ?, ?, ?) """, (reflection_id, "scheduled_reflection", json.dumps(findings), json.dumps(self_proposals), 0.6, now_iso())) conn.commit() conn.close() return { "reflection_id": reflection_id, "findings": findings, "self_improvement_proposals": self_proposals, } # ============================================================ # Meta-Improver — The Unified Intelligence # ============================================================ class MetaImprover: """ The Meta-Improver AI: continuously monitors, scans, reflects, and proposes improvements. This is permanently baked into the companion AI system. It: 1. Runs internal quality monitors on every check cycle 2. Scans external sources weekly for new papers, models, tools 3. Reflects on its own performance monthly 4. Produces ranked improvement proposals for human review 5. Tracks what improvements actually worked (closed-loop learning) All improvements go through the ECC Harness proposal system. The Meta-Improver NEVER self-modifies without human approval. """ def __init__(self, db_path: str = None, brain=None): self.db_path = db_path or os.environ.get("RESEARCH_OS_DB", "data/research_os.db") init_meta_improver_db(self.db_path) self.brain = brain self.monitor = InternalMonitor(self.db_path) self.scanner = ExternalScanner(self.db_path, brain) self.reflector = SelfReflector(self.db_path) def run_improvement_cycle(self) -> dict: """ Run a full improvement cycle: 1. Internal monitoring → alerts 2. External scanning → ideas 3. Self-reflection → meta-proposals 4. Synthesis → ranked improvement proposals Returns a comprehensive report. """ report = { "timestamp": now_iso(), "version": META_IMPROVER_VERSION, "alerts": [], "scan_results": [], "reflection": {}, "improvement_proposals": [], } # Phase 1: Internal monitoring print("MetaImprover: Running internal monitors...") report["alerts"] = self.monitor.run_all_checks() # Phase 2: External scanning print("MetaImprover: Scanning external sources...") report["scan_results"] = self.scanner.scan_all_sources() # Phase 3: Self-reflection print("MetaImprover: Running self-reflection...") report["reflection"] = self.reflector.reflect() # Phase 4: Synthesize improvement proposals print("MetaImprover: Synthesizing proposals...") report["improvement_proposals"] = self._synthesize_proposals(report) return report def _synthesize_proposals(self, report: dict) -> list: """ Synthesize findings from all sources into ranked improvement proposals. """ proposals = [] conn = get_db(self.db_path) # From alerts → improvement proposals for alert in report.get("alerts", []): if alert.get("severity") in ["high", "medium"]: imp_id = gen_id("IMP") proposals.append({ "improvement_id": imp_id, "category": "internal_monitor", "source": alert["monitor"], "description": f"Address: {alert['message']}", "priority": "high" if alert["severity"] == "high" else "medium", "suggested_action": self._suggest_action_for_alert(alert), }) conn.execute(""" INSERT INTO improvement_history (improvement_id, category, description, created_at) VALUES (?, 'internal_monitor', ?, ?) """, (imp_id, alert["message"], now_iso())) # From external scans → improvement proposals unprocessed = self.scanner.get_unprocessed_ideas() for scan in unprocessed[:5]: # Process top 5 for idea in scan.get("improvement_ideas", [])[:3]: # Top 3 ideas per scan if isinstance(idea, dict): imp_id = gen_id("IMP") proposals.append({ "improvement_id": imp_id, "category": "external_scan", "source": f"{scan.get('source_type', 'unknown')}: {scan.get('query', '')}", "description": idea.get("idea", ""), "priority": "medium", "expected_impact": idea.get("expected_impact", "unknown"), "effort": idea.get("effort", "medium"), }) # From self-reflection → meta-proposals for sp in report.get("reflection", {}).get("self_improvement_proposals", []): imp_id = gen_id("IMP") proposals.append({ "improvement_id": imp_id, "category": "self_reflection", "source": "meta_improver_self_analysis", "description": f"Self-improve: {sp.get('change', '')} (reason: {sp.get('reason', '')})", "priority": "low", "target": sp.get("target", "unknown"), }) conn.commit() conn.close() # Rank by priority priority_order = {"high": 0, "medium": 1, "low": 2} proposals.sort(key=lambda x: priority_order.get(x.get("priority", "low"), 99)) return proposals def _suggest_action_for_alert(self, alert: dict) -> str: """Suggest a concrete action based on an internal monitoring alert.""" actions = { "extraction_quality": "Run evaluation harness against golden dataset. If degraded, check recent prompt changes and revert if needed.", "confidence_calibration": "Compute Brier score from calibration_log. If overconfident, reduce study_quality_weights by 5%.", "conflict_detection_rate": "Review last 20 detected conflicts manually. If >30% false positives, tighten keyword overlap threshold.", "api_cost_efficiency": "Check which task types consume most tokens. Enable semantic caching for repeat queries.", "prompt_drift": "Run regression gate. Compare current metrics to Phase 2 baseline. Revert prompt if degraded.", "taxonomy_coverage": "Add unmapped study types to domain taxonomy or create alias mappings.", "companion_agent_health": "Check failed tasks for common errors. Increase iteration budgets if tasks are timing out.", } return actions.get(alert.get("monitor", ""), "Investigate the alert and determine appropriate response.") def get_improvement_history(self, limit: int = 20) -> list: """Get improvement history with status.""" conn = get_db(self.db_path) rows = conn.execute(""" SELECT * FROM improvement_history ORDER BY created_at DESC LIMIT ? """, (limit,)).fetchall() conn.close() return [dict(r) for r in rows] def get_self_reflections(self, limit: int = 10) -> list: """Get self-reflection history.""" conn = get_db(self.db_path) rows = conn.execute(""" SELECT * FROM self_reflection_log ORDER BY created_at DESC LIMIT ? """, (limit,)).fetchall() conn.close() results = [] for r in rows: d = dict(r) d["findings"] = json.loads(d.get("findings", "{}")) d["self_improvement_proposal"] = json.loads(d.get("self_improvement_proposal", "[]")) results.append(d) return results def mark_improvement_applied(self, improvement_id: str, proposal_id: str = None): """Mark an improvement as applied (with optional link to proposal).""" conn = get_db(self.db_path) conn.execute(""" UPDATE improvement_history SET status = 'applied', proposal_id = ?, applied_at = ? WHERE improvement_id = ? """, (proposal_id, now_iso(), improvement_id)) conn.commit() conn.close() def mark_improvement_measured(self, improvement_id: str, impact: dict): """Record the measured impact of an applied improvement.""" conn = get_db(self.db_path) conn.execute(""" UPDATE improvement_history SET status = 'measured', impact_measured = ?, measured_at = ? WHERE improvement_id = ? """, (json.dumps(impact), now_iso(), improvement_id)) conn.commit() conn.close()