nkshirsa's picture
Add Superpowers Skill Tree + Meta-Improver: phd_research_os/meta_improver.py
3044b2e verified
"""
PhD Research OS — Meta-Improver AI
====================================
A continuously running self-improvement intelligence that:
1. Monitors the Research OS for quality degradation, drift, and opportunities
2. Scans external sources (papers, social media, repos) for improvement ideas
3. Proposes improvements to BOTH the Research OS AND to itself
4. Tracks improvement history and learns from what worked
The Meta-Improver operates under the ECC Harness and produces Proposals
that require human approval — it NEVER self-modifies without authorization.
Architecture:
MetaImprover
├── InternalMonitor — watches DB metrics, eval scores, error rates
├── ExternalScanner — searches papers, GitHub, social media for ideas
├── SelfReflector — analyzes own performance and proposes self-upgrades
└── ImprovementEngine — synthesizes findings into ranked Proposals
"""
import json
import os
import time
import hashlib
from datetime import datetime, timezone
from typing import Optional
from dataclasses import dataclass, field, asdict
from .db import get_db, init_db, now_iso, gen_id, to_fixed, from_fixed
from .skills.registry import skill_registry, SUPERPOWERS_WORKFLOW_CONTEXT
# ============================================================
# Meta-Improver Configuration
# ============================================================
META_IMPROVER_VERSION = "1.0.0"
# What the meta-improver monitors internally
INTERNAL_MONITORS = {
"extraction_quality": {
"description": "Track claim extraction precision/recall over time",
"metric": "hallucination_rate",
"alert_threshold": 0.12, # Alert if hallucination > 12%
"check_interval_hours": 24,
},
"confidence_calibration": {
"description": "Monitor Brier score for systematic miscalibration",
"metric": "brier_score",
"alert_threshold": 0.25, # Alert if Brier > 0.25
"check_interval_hours": 168, # Weekly
},
"conflict_detection_rate": {
"description": "Track false positive rate in conflict detection",
"metric": "false_positive_rate",
"alert_threshold": 0.30,
"check_interval_hours": 168,
},
"api_cost_efficiency": {
"description": "Monitor cost per claim extracted",
"metric": "cost_per_claim_usd",
"alert_threshold": 0.10, # Alert if > $0.10/claim
"check_interval_hours": 24,
},
"prompt_drift": {
"description": "Detect when prompt performance degrades vs baseline",
"metric": "eval_score_delta",
"alert_threshold": -0.05, # Alert if 5% below baseline
"check_interval_hours": 168,
},
"taxonomy_coverage": {
"description": "Track study types not in taxonomy that appear in data",
"metric": "unmapped_type_count",
"alert_threshold": 5,
"check_interval_hours": 168,
},
"companion_agent_health": {
"description": "Track companion agent task success rate",
"metric": "task_success_rate",
"alert_threshold": 0.70, # Alert if <70% tasks complete
"check_interval_hours": 168,
},
}
# External sources to scan for improvement ideas
EXTERNAL_SOURCES = {
"arxiv_papers": {
"description": "Search arXiv for papers on scientific NLP, claim extraction, epistemic classification",
"queries": [
"scientific claim extraction language model",
"epistemic classification scientific text",
"contradiction detection scientific literature",
"confidence calibration language model",
"structured output language model fine-tuning",
],
"scan_interval_hours": 168, # Weekly
},
"huggingface_models": {
"description": "Monitor HF Hub for new models suitable as Research OS brain",
"queries": [
"scientific text extraction",
"structured JSON output",
"instruction-tuned 3B 7B",
],
"scan_interval_hours": 168,
},
"github_repos": {
"description": "Monitor GitHub for tools/libraries that could improve the pipeline",
"queries": [
"scientific information extraction",
"pdf claim extraction",
"epistemic tagging NLP",
],
"scan_interval_hours": 336, # Bi-weekly
},
"social_discourse": {
"description": "Monitor discourse for best practices in research AI systems",
"queries": [
"research AI assistant best practices",
"scientific knowledge base design",
"LLM structured output techniques",
],
"scan_interval_hours": 168,
},
}
# ============================================================
# Database Extension
# ============================================================
def init_meta_improver_db(db_path: str = None):
"""Add meta-improver tables to the database."""
init_db(db_path)
conn = get_db(db_path)
conn.executescript("""
CREATE TABLE IF NOT EXISTS meta_monitor_state (
monitor_name TEXT PRIMARY KEY,
last_value REAL,
last_checked TEXT,
alert_active INTEGER DEFAULT 0,
trend TEXT, -- JSON: recent values for trend analysis
baseline_value REAL,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS external_scan_results (
scan_id TEXT PRIMARY KEY,
source_type TEXT NOT NULL,
query TEXT NOT NULL,
results TEXT NOT NULL, -- JSON: search results
improvement_ideas TEXT, -- JSON: extracted ideas
scanned_at TEXT NOT NULL,
processed INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS improvement_history (
improvement_id TEXT PRIMARY KEY,
category TEXT NOT NULL, -- internal_monitor, external_scan, self_reflection
description TEXT NOT NULL,
proposal_id TEXT, -- Links to proposals table
impact_measured TEXT, -- JSON: before/after metrics
status TEXT DEFAULT 'proposed', -- proposed, applied, measured, reverted
created_at TEXT NOT NULL,
applied_at TEXT,
measured_at TEXT
);
CREATE TABLE IF NOT EXISTS self_reflection_log (
reflection_id TEXT PRIMARY KEY,
trigger TEXT NOT NULL, -- what triggered this reflection
findings TEXT NOT NULL, -- JSON: what was discovered
self_improvement_proposal TEXT, -- JSON: how to improve the meta-improver itself
confidence REAL,
created_at TEXT NOT NULL
);
-- Ensure proposals table exists (normally created by agent_os, but meta-improver reads it)
CREATE TABLE IF NOT EXISTS proposals (
proposal_id TEXT PRIMARY KEY,
agent_id TEXT NOT NULL,
task_id TEXT,
proposal_type TEXT NOT NULL,
description TEXT NOT NULL,
changes TEXT NOT NULL,
evidence TEXT,
estimated_impact TEXT,
risk_assessment TEXT DEFAULT 'low',
reversible INTEGER DEFAULT 1,
status TEXT DEFAULT 'proposed',
created_at TEXT NOT NULL,
reviewed_at TEXT,
reviewed_by TEXT,
rejection_reason TEXT,
schema_version TEXT NOT NULL DEFAULT '1.0'
);
""")
conn.commit()
conn.close()
# ============================================================
# Internal Monitor
# ============================================================
class InternalMonitor:
"""
Watches Research OS database metrics for quality degradation.
Runs checks based on configured intervals and thresholds.
"""
def __init__(self, db_path: str):
self.db_path = db_path
def run_all_checks(self) -> list:
"""Run all internal monitoring checks. Returns list of alerts."""
alerts = []
conn = get_db(self.db_path)
# Check 1: Hallucination rate proxy (claims with very low evidence strength)
total = conn.execute("SELECT COUNT(*) FROM claims").fetchone()[0]
if total > 0:
low_evidence = conn.execute(
"SELECT COUNT(*) FROM claims WHERE evidence_strength IS NOT NULL AND evidence_strength < 200"
).fetchone()[0]
rate = low_evidence / total
self._update_monitor(conn, "extraction_quality", rate)
if rate > INTERNAL_MONITORS["extraction_quality"]["alert_threshold"]:
alerts.append({
"monitor": "extraction_quality",
"message": f"Low-evidence claim rate at {rate:.1%} (threshold: {INTERNAL_MONITORS['extraction_quality']['alert_threshold']:.0%})",
"severity": "high" if rate > 0.20 else "medium",
"current_value": rate,
})
# Check 2: Confidence calibration (if calibration data exists)
cal_count = conn.execute("SELECT COUNT(*) FROM calibration_log").fetchone()[0]
if cal_count >= 20:
alerts.append({
"monitor": "confidence_calibration",
"message": f"Calibration data available ({cal_count} points). Brier score analysis recommended.",
"severity": "info",
"current_value": cal_count,
})
# Check 3: API cost tracking
cost_row = conn.execute("""
SELECT SUM(cost_usd) as total_cost, COUNT(DISTINCT task_type) as task_types
FROM api_usage_log WHERE timestamp >= datetime('now', '-7 days')
""").fetchone()
if cost_row and cost_row[0]:
weekly_cost = cost_row[0]
if weekly_cost > 15.0: # $15/week threshold
alerts.append({
"monitor": "api_cost_efficiency",
"message": f"Weekly API cost: ${weekly_cost:.2f} (threshold: $15.00)",
"severity": "medium",
"current_value": weekly_cost,
})
# Check 4: Unresolved conflicts accumulation
unresolved = conn.execute(
"SELECT COUNT(*) FROM conflicts WHERE resolution_status = 'Unresolved'"
).fetchone()[0]
if unresolved > 20:
alerts.append({
"monitor": "conflict_detection_rate",
"message": f"{unresolved} unresolved conflicts accumulating. Review recommended.",
"severity": "medium",
"current_value": unresolved,
})
# Check 5: Incomplete claims ratio
if total > 0:
incomplete = conn.execute(
"SELECT COUNT(*) FROM claims WHERE status = 'Incomplete'"
).fetchone()[0]
incomplete_rate = incomplete / total
if incomplete_rate > 0.30:
alerts.append({
"monitor": "extraction_quality",
"message": f"Incomplete claim rate: {incomplete_rate:.1%}. May indicate extraction issues.",
"severity": "low",
"current_value": incomplete_rate,
})
# Check 6: Taxonomy coverage
unmapped = conn.execute("""
SELECT DISTINCT study_type FROM claims
WHERE study_type IS NOT NULL
AND study_type NOT IN ('in_vivo', 'direct_physical_measurement', 'mathematical_proof',
'in_vitro', 'first_principles_simulation', 'phenomenological_simulation',
'review', 'perspective', 'primary_experimental', 'simulation', 'review_non_systematic')
""").fetchall()
if len(unmapped) > 0:
types = [dict(r)["study_type"] for r in unmapped]
alerts.append({
"monitor": "taxonomy_coverage",
"message": f"Found {len(types)} unmapped study types: {types[:5]}",
"severity": "low",
"current_value": len(types),
})
conn.close()
return alerts
def _update_monitor(self, conn, name: str, value: float):
"""Update monitor state with latest value."""
existing = conn.execute(
"SELECT trend FROM meta_monitor_state WHERE monitor_name = ?", (name,)
).fetchone()
if existing:
trend = json.loads(existing[0] or "[]")
trend.append({"value": value, "timestamp": now_iso()})
trend = trend[-50:] # Keep last 50 data points
conn.execute("""
UPDATE meta_monitor_state SET last_value = ?, last_checked = ?, trend = ?
WHERE monitor_name = ?
""", (value, now_iso(), json.dumps(trend), name))
else:
conn.execute("""
INSERT INTO meta_monitor_state (monitor_name, last_value, last_checked,
trend, baseline_value, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (name, value, now_iso(), json.dumps([{"value": value, "timestamp": now_iso()}]),
value, now_iso()))
conn.commit()
# ============================================================
# External Scanner
# ============================================================
class ExternalScanner:
"""
Scans external sources for improvement ideas.
Uses the Research OS Brain (API) to search and analyze findings.
Sources: arXiv papers, HF Hub models, GitHub repos, social discourse.
"""
def __init__(self, db_path: str, brain=None):
self.db_path = db_path
self.brain = brain
def scan_all_sources(self) -> list:
"""
Scan all configured external sources for improvement ideas.
Returns list of scan results with extracted ideas.
"""
results = []
conn = get_db(self.db_path)
for source_type, config in EXTERNAL_SOURCES.items():
for query in config["queries"]:
# Check if we scanned this recently
recent = conn.execute("""
SELECT 1 FROM external_scan_results
WHERE source_type = ? AND query = ?
AND scanned_at >= datetime('now', ?)
""", (source_type, query, f"-{config['scan_interval_hours']} hours")).fetchone()
if recent:
continue
# Perform the scan (using brain if available, otherwise generate structured placeholder)
scan_result = self._scan_source(source_type, query)
scan_id = gen_id("SCAN")
conn.execute("""
INSERT INTO external_scan_results (scan_id, source_type, query,
results, improvement_ideas, scanned_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (scan_id, source_type, query,
json.dumps(scan_result.get("results", [])),
json.dumps(scan_result.get("ideas", [])),
now_iso()))
results.append({
"scan_id": scan_id,
"source": source_type,
"query": query,
"results_count": len(scan_result.get("results", [])),
"ideas_count": len(scan_result.get("ideas", [])),
})
conn.commit()
conn.close()
return results
def _scan_source(self, source_type: str, query: str) -> dict:
"""Scan a single source. Returns results and extracted ideas."""
if self.brain:
return self._scan_with_brain(source_type, query)
else:
return self._generate_scan_template(source_type, query)
def _scan_with_brain(self, source_type: str, query: str) -> dict:
"""Use the AI brain to analyze a query and generate improvement ideas."""
prompt = f"""You are the External Intelligence Scanner for a PhD Research OS.
Search context: {source_type}
Query: "{query}"
Based on your knowledge, identify:
1. Recent developments (papers, tools, models) relevant to this query
2. Specific improvement ideas for a system that:
- Extracts scientific claims from papers
- Classifies claims as Fact/Interpretation/Hypothesis/Conflict
- Scores confidence using evidence × quality × tier × completeness
- Detects contradictions between claims
- Uses Qwen2.5-3B fine-tuned with QLoRA
Output JSON:
{{
"results": [{{"title": "...", "source": "...", "relevance": "high|medium|low", "summary": "..."}}],
"ideas": [{{"idea": "...", "expected_impact": "...", "effort": "low|medium|high", "category": "model|data|pipeline|taxonomy|architecture"}}]
}}"""
try:
messages = [
{"role": "system", "content": "You are a research intelligence scanner. Output valid JSON only."},
{"role": "user", "content": prompt}
]
if self.brain.backend == "local":
raw = self.brain._generate_local(messages)
else:
raw = self.brain._generate_api(messages)
text = raw.strip()
if text.startswith("```"):
text = text.split("```")[1]
if text.startswith("json"):
text = text[4:]
text = text.strip()
return json.loads(text)
except Exception:
return self._generate_scan_template(source_type, query)
def _generate_scan_template(self, source_type: str, query: str) -> dict:
"""Generate a structured template when no brain is available."""
return {
"results": [{
"title": f"[Placeholder] Scan for: {query}",
"source": source_type,
"relevance": "medium",
"summary": "Brain not configured — manual scan recommended"
}],
"ideas": [{
"idea": f"Investigate: {query}",
"expected_impact": "Unknown — requires manual evaluation",
"effort": "medium",
"category": "research"
}]
}
def get_unprocessed_ideas(self) -> list:
"""Get all improvement ideas not yet converted to proposals."""
conn = get_db(self.db_path)
rows = conn.execute("""
SELECT scan_id, source_type, query, improvement_ideas, scanned_at
FROM external_scan_results WHERE processed = 0
ORDER BY scanned_at DESC
""").fetchall()
conn.close()
ideas = []
for row in rows:
d = dict(row)
d["improvement_ideas"] = json.loads(d.get("improvement_ideas", "[]"))
ideas.append(d)
return ideas
# ============================================================
# Self-Reflector
# ============================================================
class SelfReflector:
"""
Analyzes the Meta-Improver's own performance and proposes self-upgrades.
Monitors:
- Which improvement proposals were accepted vs rejected (learn preferences)
- Which monitoring alerts were actionable vs noise (tune thresholds)
- Which external scans produced valuable ideas (focus searches)
- Overall system improvement trajectory (are we getting better?)
"""
def __init__(self, db_path: str):
self.db_path = db_path
def reflect(self) -> dict:
"""
Run a self-reflection cycle.
Returns findings and self-improvement proposals.
"""
conn = get_db(self.db_path)
findings = {}
# 1. Proposal acceptance rate
total_proposals = conn.execute("SELECT COUNT(*) FROM proposals").fetchone()[0]
approved = conn.execute("SELECT COUNT(*) FROM proposals WHERE status = 'approved'").fetchone()[0]
rejected = conn.execute("SELECT COUNT(*) FROM proposals WHERE status = 'rejected'").fetchone()[0]
if total_proposals > 0:
acceptance_rate = approved / total_proposals
findings["proposal_acceptance_rate"] = {
"total": total_proposals,
"approved": approved,
"rejected": rejected,
"rate": acceptance_rate,
"insight": (
"Low acceptance rate — proposals may be too aggressive or poorly targeted"
if acceptance_rate < 0.3 and total_proposals > 10
else "Acceptance rate healthy" if acceptance_rate > 0.5
else "Insufficient data for trend"
)
}
# 2. Rejection reasons analysis
rejections = conn.execute("""
SELECT rejection_reason FROM proposals WHERE status = 'rejected' AND rejection_reason != ''
""").fetchall()
if rejections:
reasons = [dict(r)["rejection_reason"] for r in rejections]
findings["rejection_patterns"] = {
"total_rejections": len(reasons),
"sample_reasons": reasons[:5],
"insight": "Analyze rejection reasons to avoid proposing similar changes"
}
# 3. External scan yield
total_scans = conn.execute("SELECT COUNT(*) FROM external_scan_results").fetchone()[0]
processed = conn.execute("SELECT COUNT(*) FROM external_scan_results WHERE processed = 1").fetchone()[0]
findings["external_scan_yield"] = {
"total_scans": total_scans,
"processed": processed,
"insight": "Track which source types produce the most actionable ideas"
}
# 4. Improvement trajectory
improvements = conn.execute("""
SELECT status, COUNT(*) as cnt FROM improvement_history GROUP BY status
""").fetchall()
findings["improvement_trajectory"] = {
status_counts[0]: status_counts[1]
for status_counts in [dict(r).values() for r in improvements]
} if improvements else {"no_data": True}
# 5. Self-improvement proposals
self_proposals = []
if findings.get("proposal_acceptance_rate", {}).get("rate", 1.0) < 0.3:
self_proposals.append({
"target": "proposal_generation",
"change": "Increase evidence requirements before generating proposals",
"reason": "Low acceptance rate suggests proposals are insufficiently grounded"
})
if total_scans > 20 and processed < total_scans * 0.5:
self_proposals.append({
"target": "external_scanning",
"change": "Reduce scan frequency or focus on higher-yield source types",
"reason": "Many scans unprocessed — scanning faster than consumption"
})
# Log reflection
reflection_id = gen_id("REFL")
conn.execute("""
INSERT INTO self_reflection_log (reflection_id, trigger, findings,
self_improvement_proposal, confidence, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (reflection_id, "scheduled_reflection", json.dumps(findings),
json.dumps(self_proposals), 0.6, now_iso()))
conn.commit()
conn.close()
return {
"reflection_id": reflection_id,
"findings": findings,
"self_improvement_proposals": self_proposals,
}
# ============================================================
# Meta-Improver — The Unified Intelligence
# ============================================================
class MetaImprover:
"""
The Meta-Improver AI: continuously monitors, scans, reflects, and proposes improvements.
This is permanently baked into the companion AI system. It:
1. Runs internal quality monitors on every check cycle
2. Scans external sources weekly for new papers, models, tools
3. Reflects on its own performance monthly
4. Produces ranked improvement proposals for human review
5. Tracks what improvements actually worked (closed-loop learning)
All improvements go through the ECC Harness proposal system.
The Meta-Improver NEVER self-modifies without human approval.
"""
def __init__(self, db_path: str = None, brain=None):
self.db_path = db_path or os.environ.get("RESEARCH_OS_DB", "data/research_os.db")
init_meta_improver_db(self.db_path)
self.brain = brain
self.monitor = InternalMonitor(self.db_path)
self.scanner = ExternalScanner(self.db_path, brain)
self.reflector = SelfReflector(self.db_path)
def run_improvement_cycle(self) -> dict:
"""
Run a full improvement cycle:
1. Internal monitoring → alerts
2. External scanning → ideas
3. Self-reflection → meta-proposals
4. Synthesis → ranked improvement proposals
Returns a comprehensive report.
"""
report = {
"timestamp": now_iso(),
"version": META_IMPROVER_VERSION,
"alerts": [],
"scan_results": [],
"reflection": {},
"improvement_proposals": [],
}
# Phase 1: Internal monitoring
print("MetaImprover: Running internal monitors...")
report["alerts"] = self.monitor.run_all_checks()
# Phase 2: External scanning
print("MetaImprover: Scanning external sources...")
report["scan_results"] = self.scanner.scan_all_sources()
# Phase 3: Self-reflection
print("MetaImprover: Running self-reflection...")
report["reflection"] = self.reflector.reflect()
# Phase 4: Synthesize improvement proposals
print("MetaImprover: Synthesizing proposals...")
report["improvement_proposals"] = self._synthesize_proposals(report)
return report
def _synthesize_proposals(self, report: dict) -> list:
"""
Synthesize findings from all sources into ranked improvement proposals.
"""
proposals = []
conn = get_db(self.db_path)
# From alerts → improvement proposals
for alert in report.get("alerts", []):
if alert.get("severity") in ["high", "medium"]:
imp_id = gen_id("IMP")
proposals.append({
"improvement_id": imp_id,
"category": "internal_monitor",
"source": alert["monitor"],
"description": f"Address: {alert['message']}",
"priority": "high" if alert["severity"] == "high" else "medium",
"suggested_action": self._suggest_action_for_alert(alert),
})
conn.execute("""
INSERT INTO improvement_history (improvement_id, category, description, created_at)
VALUES (?, 'internal_monitor', ?, ?)
""", (imp_id, alert["message"], now_iso()))
# From external scans → improvement proposals
unprocessed = self.scanner.get_unprocessed_ideas()
for scan in unprocessed[:5]: # Process top 5
for idea in scan.get("improvement_ideas", [])[:3]: # Top 3 ideas per scan
if isinstance(idea, dict):
imp_id = gen_id("IMP")
proposals.append({
"improvement_id": imp_id,
"category": "external_scan",
"source": f"{scan.get('source_type', 'unknown')}: {scan.get('query', '')}",
"description": idea.get("idea", ""),
"priority": "medium",
"expected_impact": idea.get("expected_impact", "unknown"),
"effort": idea.get("effort", "medium"),
})
# From self-reflection → meta-proposals
for sp in report.get("reflection", {}).get("self_improvement_proposals", []):
imp_id = gen_id("IMP")
proposals.append({
"improvement_id": imp_id,
"category": "self_reflection",
"source": "meta_improver_self_analysis",
"description": f"Self-improve: {sp.get('change', '')} (reason: {sp.get('reason', '')})",
"priority": "low",
"target": sp.get("target", "unknown"),
})
conn.commit()
conn.close()
# Rank by priority
priority_order = {"high": 0, "medium": 1, "low": 2}
proposals.sort(key=lambda x: priority_order.get(x.get("priority", "low"), 99))
return proposals
def _suggest_action_for_alert(self, alert: dict) -> str:
"""Suggest a concrete action based on an internal monitoring alert."""
actions = {
"extraction_quality": "Run evaluation harness against golden dataset. If degraded, check recent prompt changes and revert if needed.",
"confidence_calibration": "Compute Brier score from calibration_log. If overconfident, reduce study_quality_weights by 5%.",
"conflict_detection_rate": "Review last 20 detected conflicts manually. If >30% false positives, tighten keyword overlap threshold.",
"api_cost_efficiency": "Check which task types consume most tokens. Enable semantic caching for repeat queries.",
"prompt_drift": "Run regression gate. Compare current metrics to Phase 2 baseline. Revert prompt if degraded.",
"taxonomy_coverage": "Add unmapped study types to domain taxonomy or create alias mappings.",
"companion_agent_health": "Check failed tasks for common errors. Increase iteration budgets if tasks are timing out.",
}
return actions.get(alert.get("monitor", ""), "Investigate the alert and determine appropriate response.")
def get_improvement_history(self, limit: int = 20) -> list:
"""Get improvement history with status."""
conn = get_db(self.db_path)
rows = conn.execute("""
SELECT * FROM improvement_history ORDER BY created_at DESC LIMIT ?
""", (limit,)).fetchall()
conn.close()
return [dict(r) for r in rows]
def get_self_reflections(self, limit: int = 10) -> list:
"""Get self-reflection history."""
conn = get_db(self.db_path)
rows = conn.execute("""
SELECT * FROM self_reflection_log ORDER BY created_at DESC LIMIT ?
""", (limit,)).fetchall()
conn.close()
results = []
for r in rows:
d = dict(r)
d["findings"] = json.loads(d.get("findings", "{}"))
d["self_improvement_proposal"] = json.loads(d.get("self_improvement_proposal", "[]"))
results.append(d)
return results
def mark_improvement_applied(self, improvement_id: str, proposal_id: str = None):
"""Mark an improvement as applied (with optional link to proposal)."""
conn = get_db(self.db_path)
conn.execute("""
UPDATE improvement_history SET status = 'applied', proposal_id = ?, applied_at = ?
WHERE improvement_id = ?
""", (proposal_id, now_iso(), improvement_id))
conn.commit()
conn.close()
def mark_improvement_measured(self, improvement_id: str, impact: dict):
"""Record the measured impact of an applied improvement."""
conn = get_db(self.db_path)
conn.execute("""
UPDATE improvement_history SET status = 'measured', impact_measured = ?, measured_at = ?
WHERE improvement_id = ?
""", (json.dumps(impact), now_iso(), improvement_id))
conn.commit()
conn.close()