""" PhD Research OS — Core Data Layer (Phase 0) ============================================ SQLite database with CRUD operations for Claim, Source, Goal, Conflict, Decision, Override, and Experiment objects. All objects carry schema_version tags. Fixed-point confidence math. """ import sqlite3 import json import uuid import os from datetime import datetime, timezone from typing import Optional from dataclasses import dataclass, field, asdict SCHEMA_VERSION = "1.0" DB_PATH = os.environ.get("RESEARCH_OS_DB", "data/research_os.db") def get_db(db_path: str = None) -> sqlite3.Connection: """Get database connection with WAL mode and foreign keys.""" path = db_path or DB_PATH os.makedirs(os.path.dirname(path) if os.path.dirname(path) else ".", exist_ok=True) conn = sqlite3.connect(path) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") return conn def init_db(db_path: str = None): """Initialize all tables.""" conn = get_db(db_path) conn.executescript(""" CREATE TABLE IF NOT EXISTS claims ( claim_id TEXT PRIMARY KEY, text TEXT NOT NULL, status TEXT NOT NULL CHECK(status IN ('Complete', 'Incomplete')), epistemic_tag TEXT NOT NULL CHECK(epistemic_tag IN ('Fact', 'Interpretation', 'Hypothesis', 'Conflict_Hypothesis')), confidence INTEGER NOT NULL, -- Fixed-point: actual * 1000 evidence_strength INTEGER, -- Fixed-point: actual * 1000 study_type TEXT, study_quality_weight INTEGER, -- Fixed-point: actual * 1000 journal_tier_weight INTEGER, -- Fixed-point: actual * 1000 completeness_penalty INTEGER, -- Fixed-point: actual * 1000 source_doi TEXT, source_type TEXT, missing_fields TEXT, -- JSON array parameters TEXT, -- JSON dict is_canonical INTEGER DEFAULT 0, expert_override TEXT, -- JSON object if set schema_version TEXT NOT NULL DEFAULT '1.0', created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS sources ( doi TEXT PRIMARY KEY, title TEXT NOT NULL, authors TEXT, -- JSON array year INTEGER, journal TEXT, journal_tier INTEGER CHECK(journal_tier IN (1, 2, 3)), study_type TEXT, is_canonical INTEGER DEFAULT 0, source_type TEXT, schema_version TEXT NOT NULL DEFAULT '1.0', created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS goals ( goal_id TEXT PRIMARY KEY, description TEXT NOT NULL, priority TEXT NOT NULL CHECK(priority IN ('high', 'medium', 'low')), status TEXT NOT NULL CHECK(status IN ('Active', 'Paused', 'Completed')), linked_claim_ids TEXT, -- JSON array schema_version TEXT NOT NULL DEFAULT '1.0', created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS conflicts ( conflict_id TEXT PRIMARY KEY, claim_a_id TEXT NOT NULL, claim_b_id TEXT NOT NULL, conflict_type TEXT NOT NULL CHECK(conflict_type IN ('value_mismatch', 'methodology_difference', 'scope_difference', 'no_conflict')), generated_hypothesis TEXT, hypothesis_confidence TEXT DEFAULT 'low', resolution_status TEXT NOT NULL DEFAULT 'Unresolved' CHECK(resolution_status IN ('Unresolved', 'Human_Resolved')), resolution_notes TEXT, key_differences TEXT, -- JSON array schema_version TEXT NOT NULL DEFAULT '1.0', created_at TEXT NOT NULL, resolved_at TEXT, FOREIGN KEY(claim_a_id) REFERENCES claims(claim_id), FOREIGN KEY(claim_b_id) REFERENCES claims(claim_id) ); CREATE TABLE IF NOT EXISTS decisions ( decision_id TEXT PRIMARY KEY, linked_claim_ids TEXT, -- JSON array recommended_action TEXT NOT NULL, action_description TEXT, expected_information_gain INTEGER, -- Fixed-point * 1000 linked_goal_id TEXT, status TEXT NOT NULL DEFAULT 'Proposed' CHECK(status IN ('Proposed', 'Accepted', 'Rejected', 'Completed')), why_not_log TEXT, -- Reason if rejected priority TEXT DEFAULT 'medium', estimated_effort TEXT, schema_version TEXT NOT NULL DEFAULT '1.0', created_at TEXT NOT NULL, FOREIGN KEY(linked_goal_id) REFERENCES goals(goal_id) ); CREATE TABLE IF NOT EXISTS overrides ( override_id TEXT PRIMARY KEY, claim_id TEXT NOT NULL, who TEXT NOT NULL, rationale TEXT NOT NULL, authoritative_source TEXT, original_confidence INTEGER, -- Fixed-point * 1000 new_confidence INTEGER, -- Fixed-point * 1000 status TEXT DEFAULT 'Provisional', schema_version TEXT NOT NULL DEFAULT '1.0', created_at TEXT NOT NULL, FOREIGN KEY(claim_id) REFERENCES claims(claim_id) ); CREATE TABLE IF NOT EXISTS experiments ( experiment_id TEXT PRIMARY KEY, date TEXT NOT NULL, instrument TEXT, measurement_type TEXT, raw_file_path TEXT, summary_statistics TEXT, -- JSON dict linked_claim_ids TEXT, -- JSON array approved INTEGER DEFAULT 0, schema_version TEXT NOT NULL DEFAULT '1.0', created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS api_usage_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, model TEXT NOT NULL, tokens_in INTEGER NOT NULL, tokens_out INTEGER NOT NULL, cost_usd REAL NOT NULL, task_type TEXT ); CREATE TABLE IF NOT EXISTS calibration_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, conflict_id TEXT, system_predicted_confidence INTEGER, -- Fixed-point * 1000 human_actual_judgment TEXT, timestamp TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS embedding_cache ( text_hash TEXT PRIMARY KEY, embedding BLOB NOT NULL, model TEXT NOT NULL, created_at TEXT NOT NULL ); """) conn.commit() conn.close() # ============================================================ # Fixed-Point Math Utilities # ============================================================ def to_fixed(value: float) -> int: """Convert float to fixed-point integer (×1000). Research OS Rule 5: All probability/confidence use scaled integers.""" return round(value * 1000) def from_fixed(value: int) -> float: """Convert fixed-point integer back to float.""" return value / 1000.0 # ============================================================ # CRUD Operations # ============================================================ def now_iso() -> str: return datetime.now(timezone.utc).isoformat() def gen_id(prefix: str) -> str: return f"{prefix}_{uuid.uuid4().hex[:8].upper()}" # --- Claims --- def create_claim(conn: sqlite3.Connection, text: str, epistemic_tag: str, confidence: float, source_doi: str = None, evidence_strength: float = None, study_type: str = None, study_quality_weight: float = None, journal_tier_weight: float = None, completeness_penalty: float = None, missing_fields: list = None, parameters: dict = None, is_canonical: bool = False) -> str: """Create a new Claim Object. Returns claim_id.""" claim_id = gen_id("CLM") status = "Incomplete" if missing_fields else "Complete" now = now_iso() conn.execute(""" INSERT INTO claims (claim_id, text, status, epistemic_tag, confidence, evidence_strength, study_type, study_quality_weight, journal_tier_weight, completeness_penalty, source_doi, missing_fields, parameters, is_canonical, schema_version, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (claim_id, text, status, epistemic_tag, to_fixed(confidence), to_fixed(evidence_strength) if evidence_strength else None, study_type, to_fixed(study_quality_weight) if study_quality_weight else None, to_fixed(journal_tier_weight) if journal_tier_weight else None, to_fixed(completeness_penalty) if completeness_penalty else None, source_doi, json.dumps(missing_fields or []), json.dumps(parameters or {}), int(is_canonical), SCHEMA_VERSION, now, now)) conn.commit() return claim_id def get_claim(conn: sqlite3.Connection, claim_id: str) -> Optional[dict]: """Get a single claim by ID.""" row = conn.execute("SELECT * FROM claims WHERE claim_id = ?", (claim_id,)).fetchone() if row is None: return None d = dict(row) # Convert fixed-point back to float for key in ['confidence', 'evidence_strength', 'study_quality_weight', 'journal_tier_weight', 'completeness_penalty']: if d.get(key) is not None: d[key] = from_fixed(d[key]) d['missing_fields'] = json.loads(d.get('missing_fields', '[]')) d['parameters'] = json.loads(d.get('parameters', '{}')) if d.get('expert_override'): d['expert_override'] = json.loads(d['expert_override']) return d def update_claim(conn: sqlite3.Connection, claim_id: str, **kwargs) -> bool: """Update claim fields. Fixed-point conversion applied automatically.""" fixed_fields = {'confidence', 'evidence_strength', 'study_quality_weight', 'journal_tier_weight', 'completeness_penalty'} json_fields = {'missing_fields', 'parameters', 'expert_override'} updates = [] values = [] for key, val in kwargs.items(): if key in fixed_fields and val is not None: updates.append(f"{key} = ?") values.append(to_fixed(val)) elif key in json_fields and val is not None: updates.append(f"{key} = ?") values.append(json.dumps(val)) else: updates.append(f"{key} = ?") values.append(val) updates.append("updated_at = ?") values.append(now_iso()) values.append(claim_id) if updates: conn.execute(f"UPDATE claims SET {', '.join(updates)} WHERE claim_id = ?", values) conn.commit() return True return False def search_claims(conn: sqlite3.Connection, query: str = None, epistemic_tag: str = None, min_confidence: float = None, source_doi: str = None, limit: int = 50) -> list: """Search claims with optional filters.""" conditions = [] params = [] if query: conditions.append("text LIKE ?") params.append(f"%{query}%") if epistemic_tag: conditions.append("epistemic_tag = ?") params.append(epistemic_tag) if min_confidence is not None: conditions.append("confidence >= ?") params.append(to_fixed(min_confidence)) if source_doi: conditions.append("source_doi = ?") params.append(source_doi) where = " AND ".join(conditions) if conditions else "1=1" rows = conn.execute( f"SELECT * FROM claims WHERE {where} ORDER BY confidence DESC LIMIT ?", params + [limit] ).fetchall() results = [] for row in rows: d = dict(row) for key in ['confidence', 'evidence_strength', 'study_quality_weight', 'journal_tier_weight', 'completeness_penalty']: if d.get(key) is not None: d[key] = from_fixed(d[key]) d['missing_fields'] = json.loads(d.get('missing_fields', '[]')) d['parameters'] = json.loads(d.get('parameters', '{}')) results.append(d) return results # --- Sources --- def create_source(conn: sqlite3.Connection, doi: str, title: str, authors: list = None, year: int = None, journal: str = None, journal_tier: int = None, study_type: str = None, is_canonical: bool = False) -> str: """Create a Source Object.""" conn.execute(""" INSERT OR REPLACE INTO sources (doi, title, authors, year, journal, journal_tier, study_type, is_canonical, schema_version, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (doi, title, json.dumps(authors or []), year, journal, journal_tier, study_type, int(is_canonical), SCHEMA_VERSION, now_iso())) conn.commit() return doi def get_source(conn: sqlite3.Connection, doi: str) -> Optional[dict]: """Get source by DOI.""" row = conn.execute("SELECT * FROM sources WHERE doi = ?", (doi,)).fetchone() if row is None: return None d = dict(row) d['authors'] = json.loads(d.get('authors', '[]')) return d # --- Goals --- def create_goal(conn: sqlite3.Connection, description: str, priority: str, linked_claim_ids: list = None) -> str: """Create a Goal Object.""" goal_id = gen_id("GOAL") now = now_iso() conn.execute(""" INSERT INTO goals (goal_id, description, priority, status, linked_claim_ids, schema_version, created_at, updated_at) VALUES (?, ?, ?, 'Active', ?, ?, ?, ?) """, (goal_id, description, priority, json.dumps(linked_claim_ids or []), SCHEMA_VERSION, now, now)) conn.commit() return goal_id def get_goals_by_priority(conn: sqlite3.Connection, status: str = "Active") -> list: """Get goals sorted by priority.""" priority_order = {"high": 1, "medium": 2, "low": 3} rows = conn.execute( "SELECT * FROM goals WHERE status = ? ORDER BY priority", (status,) ).fetchall() results = [dict(r) for r in rows] for r in results: r['linked_claim_ids'] = json.loads(r.get('linked_claim_ids', '[]')) results.sort(key=lambda x: priority_order.get(x['priority'], 99)) return results # --- Conflicts --- def create_conflict(conn: sqlite3.Connection, claim_a_id: str, claim_b_id: str, conflict_type: str, generated_hypothesis: str = None, key_differences: list = None) -> str: """Create a Conflict Resolution Object.""" conflict_id = gen_id("CONF") conn.execute(""" INSERT INTO conflicts (conflict_id, claim_a_id, claim_b_id, conflict_type, generated_hypothesis, hypothesis_confidence, key_differences, schema_version, created_at) VALUES (?, ?, ?, ?, ?, 'low', ?, ?, ?) """, (conflict_id, claim_a_id, claim_b_id, conflict_type, generated_hypothesis, json.dumps(key_differences or []), SCHEMA_VERSION, now_iso())) conn.commit() return conflict_id # --- Decisions --- def create_decision(conn: sqlite3.Connection, recommended_action: str, action_description: str, expected_info_gain: float, linked_goal_id: str = None, linked_claim_ids: list = None, priority: str = "medium", estimated_effort: str = None) -> str: """Create a Decision Object.""" dec_id = gen_id("DEC") conn.execute(""" INSERT INTO decisions (decision_id, linked_claim_ids, recommended_action, action_description, expected_information_gain, linked_goal_id, priority, estimated_effort, schema_version, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (dec_id, json.dumps(linked_claim_ids or []), recommended_action, action_description, to_fixed(expected_info_gain), linked_goal_id, priority, estimated_effort, SCHEMA_VERSION, now_iso())) conn.commit() return dec_id # --- Overrides --- def create_override(conn: sqlite3.Connection, claim_id: str, who: str, rationale: str, new_confidence: float, authoritative_source: str = None) -> str: """Create an expert override. Locks claim confidence.""" override_id = gen_id("OVR") # Get original confidence claim = get_claim(conn, claim_id) if not claim: raise ValueError(f"Claim {claim_id} not found") conn.execute(""" INSERT INTO overrides (override_id, claim_id, who, rationale, authoritative_source, original_confidence, new_confidence, schema_version, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, (override_id, claim_id, who, rationale, authoritative_source, to_fixed(claim['confidence']), to_fixed(new_confidence), SCHEMA_VERSION, now_iso())) # Update the claim with override update_claim(conn, claim_id, confidence=new_confidence, expert_override={"override_id": override_id, "who": who, "rationale": rationale}) conn.commit() return override_id # --- API Usage Logging --- def log_api_usage(conn: sqlite3.Connection, model: str, tokens_in: int, tokens_out: int, cost_usd: float, task_type: str = None): """Log API usage for cost tracking.""" conn.execute(""" INSERT INTO api_usage_log (timestamp, model, tokens_in, tokens_out, cost_usd, task_type) VALUES (?, ?, ?, ?, ?, ?) """, (now_iso(), model, tokens_in, tokens_out, cost_usd, task_type)) conn.commit() def get_cost_summary(conn: sqlite3.Connection, days: int = 7) -> dict: """Get API cost summary for the last N days.""" row = conn.execute(""" SELECT SUM(cost_usd) as total_cost, SUM(tokens_in) as total_in, SUM(tokens_out) as total_out, COUNT(*) as num_calls FROM api_usage_log WHERE timestamp >= datetime('now', ?) """, (f"-{days} days",)).fetchone() return dict(row) if row else {"total_cost": 0, "total_in": 0, "total_out": 0, "num_calls": 0} if __name__ == "__main__": # Self-test init_db("test_research_os.db") conn = get_db("test_research_os.db") # Test claim CRUD cid = create_claim(conn, "Test claim about graphene", "Fact", 0.85, evidence_strength=0.9, study_quality_weight=1.0, journal_tier_weight=1.0, completeness_penalty=1.0) claim = get_claim(conn, cid) assert claim is not None assert claim['confidence'] == 0.85 assert claim['status'] == 'Complete' # Test with missing fields cid2 = create_claim(conn, "Incomplete claim", "Hypothesis", 0.35, missing_fields=["temperature", "pH"]) claim2 = get_claim(conn, cid2) assert claim2['status'] == 'Incomplete' assert len(claim2['missing_fields']) == 2 # Test source create_source(conn, "10.1234/test", "Test Paper", ["Author A"], 2024, "Nature", 1) src = get_source(conn, "10.1234/test") assert src is not None # Test goal gid = create_goal(conn, "Achieve LOD < 1 fM", "high", [cid]) goals = get_goals_by_priority(conn) assert len(goals) == 1 # Test conflict conf_id = create_conflict(conn, cid, cid2, "value_mismatch", "Different measurement conditions") # Test override ovr_id = create_override(conn, cid, "Dr. Smith", "Expert knowledge", 0.95) claim_updated = get_claim(conn, cid) assert claim_updated['confidence'] == 0.95 assert claim_updated['expert_override'] is not None # Test search results = search_claims(conn, query="graphene") assert len(results) >= 1 # Test fixed-point math assert to_fixed(0.85) == 850 assert from_fixed(850) == 0.85 assert to_fixed(0.123) == 123 conn.close() os.remove("test_research_os.db") print("All database tests passed! ✓")