# Source: phd_research_os/db.py (commit d86f9d6)
"""
PhD Research OS — Core Data Layer (Phase 0)
============================================
SQLite database with CRUD operations for Claim, Source, Goal, Conflict,
Decision, Override, and Experiment objects.
All objects carry schema_version tags. Fixed-point confidence math.
"""
import sqlite3
import json
import uuid
import os
from datetime import datetime, timezone
from typing import Optional
from dataclasses import dataclass, field, asdict
SCHEMA_VERSION = "1.0"
DB_PATH = os.environ.get("RESEARCH_OS_DB", "data/research_os.db")
def get_db(db_path: str = None) -> sqlite3.Connection:
"""Get database connection with WAL mode and foreign keys."""
path = db_path or DB_PATH
os.makedirs(os.path.dirname(path) if os.path.dirname(path) else ".", exist_ok=True)
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def init_db(db_path: str = None):
"""Initialize all tables."""
conn = get_db(db_path)
conn.executescript("""
CREATE TABLE IF NOT EXISTS claims (
claim_id TEXT PRIMARY KEY,
text TEXT NOT NULL,
status TEXT NOT NULL CHECK(status IN ('Complete', 'Incomplete')),
epistemic_tag TEXT NOT NULL CHECK(epistemic_tag IN ('Fact', 'Interpretation', 'Hypothesis', 'Conflict_Hypothesis')),
confidence INTEGER NOT NULL, -- Fixed-point: actual * 1000
evidence_strength INTEGER, -- Fixed-point: actual * 1000
study_type TEXT,
study_quality_weight INTEGER, -- Fixed-point: actual * 1000
journal_tier_weight INTEGER, -- Fixed-point: actual * 1000
completeness_penalty INTEGER, -- Fixed-point: actual * 1000
source_doi TEXT,
source_type TEXT,
missing_fields TEXT, -- JSON array
parameters TEXT, -- JSON dict
is_canonical INTEGER DEFAULT 0,
expert_override TEXT, -- JSON object if set
schema_version TEXT NOT NULL DEFAULT '1.0',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS sources (
doi TEXT PRIMARY KEY,
title TEXT NOT NULL,
authors TEXT, -- JSON array
year INTEGER,
journal TEXT,
journal_tier INTEGER CHECK(journal_tier IN (1, 2, 3)),
study_type TEXT,
is_canonical INTEGER DEFAULT 0,
source_type TEXT,
schema_version TEXT NOT NULL DEFAULT '1.0',
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS goals (
goal_id TEXT PRIMARY KEY,
description TEXT NOT NULL,
priority TEXT NOT NULL CHECK(priority IN ('high', 'medium', 'low')),
status TEXT NOT NULL CHECK(status IN ('Active', 'Paused', 'Completed')),
linked_claim_ids TEXT, -- JSON array
schema_version TEXT NOT NULL DEFAULT '1.0',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS conflicts (
conflict_id TEXT PRIMARY KEY,
claim_a_id TEXT NOT NULL,
claim_b_id TEXT NOT NULL,
conflict_type TEXT NOT NULL CHECK(conflict_type IN ('value_mismatch', 'methodology_difference', 'scope_difference', 'no_conflict')),
generated_hypothesis TEXT,
hypothesis_confidence TEXT DEFAULT 'low',
resolution_status TEXT NOT NULL DEFAULT 'Unresolved' CHECK(resolution_status IN ('Unresolved', 'Human_Resolved')),
resolution_notes TEXT,
key_differences TEXT, -- JSON array
schema_version TEXT NOT NULL DEFAULT '1.0',
created_at TEXT NOT NULL,
resolved_at TEXT,
FOREIGN KEY(claim_a_id) REFERENCES claims(claim_id),
FOREIGN KEY(claim_b_id) REFERENCES claims(claim_id)
);
CREATE TABLE IF NOT EXISTS decisions (
decision_id TEXT PRIMARY KEY,
linked_claim_ids TEXT, -- JSON array
recommended_action TEXT NOT NULL,
action_description TEXT,
expected_information_gain INTEGER, -- Fixed-point * 1000
linked_goal_id TEXT,
status TEXT NOT NULL DEFAULT 'Proposed' CHECK(status IN ('Proposed', 'Accepted', 'Rejected', 'Completed')),
why_not_log TEXT, -- Reason if rejected
priority TEXT DEFAULT 'medium',
estimated_effort TEXT,
schema_version TEXT NOT NULL DEFAULT '1.0',
created_at TEXT NOT NULL,
FOREIGN KEY(linked_goal_id) REFERENCES goals(goal_id)
);
CREATE TABLE IF NOT EXISTS overrides (
override_id TEXT PRIMARY KEY,
claim_id TEXT NOT NULL,
who TEXT NOT NULL,
rationale TEXT NOT NULL,
authoritative_source TEXT,
original_confidence INTEGER, -- Fixed-point * 1000
new_confidence INTEGER, -- Fixed-point * 1000
status TEXT DEFAULT 'Provisional',
schema_version TEXT NOT NULL DEFAULT '1.0',
created_at TEXT NOT NULL,
FOREIGN KEY(claim_id) REFERENCES claims(claim_id)
);
CREATE TABLE IF NOT EXISTS experiments (
experiment_id TEXT PRIMARY KEY,
date TEXT NOT NULL,
instrument TEXT,
measurement_type TEXT,
raw_file_path TEXT,
summary_statistics TEXT, -- JSON dict
linked_claim_ids TEXT, -- JSON array
approved INTEGER DEFAULT 0,
schema_version TEXT NOT NULL DEFAULT '1.0',
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS api_usage_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
model TEXT NOT NULL,
tokens_in INTEGER NOT NULL,
tokens_out INTEGER NOT NULL,
cost_usd REAL NOT NULL,
task_type TEXT
);
CREATE TABLE IF NOT EXISTS calibration_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
conflict_id TEXT,
system_predicted_confidence INTEGER, -- Fixed-point * 1000
human_actual_judgment TEXT,
timestamp TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS embedding_cache (
text_hash TEXT PRIMARY KEY,
embedding BLOB NOT NULL,
model TEXT NOT NULL,
created_at TEXT NOT NULL
);
""")
conn.commit()
conn.close()
# ============================================================
# Fixed-Point Math Utilities
# ============================================================
def to_fixed(value: float) -> int:
"""Convert float to fixed-point integer (×1000).
Research OS Rule 5: All probability/confidence use scaled integers."""
return round(value * 1000)
def from_fixed(value: int) -> float:
"""Convert fixed-point integer back to float."""
return value / 1000.0
# ============================================================
# CRUD Operations
# ============================================================
def now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def gen_id(prefix: str) -> str:
return f"{prefix}_{uuid.uuid4().hex[:8].upper()}"
# --- Claims ---
def create_claim(conn: sqlite3.Connection, text: str, epistemic_tag: str,
confidence: float, source_doi: str = None,
evidence_strength: float = None, study_type: str = None,
study_quality_weight: float = None, journal_tier_weight: float = None,
completeness_penalty: float = None, missing_fields: list = None,
parameters: dict = None, is_canonical: bool = False) -> str:
"""Create a new Claim Object. Returns claim_id."""
claim_id = gen_id("CLM")
status = "Incomplete" if missing_fields else "Complete"
now = now_iso()
conn.execute("""
INSERT INTO claims (claim_id, text, status, epistemic_tag, confidence,
evidence_strength, study_type, study_quality_weight, journal_tier_weight,
completeness_penalty, source_doi, missing_fields, parameters,
is_canonical, schema_version, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (claim_id, text, status, epistemic_tag, to_fixed(confidence),
to_fixed(evidence_strength) if evidence_strength else None,
study_type,
to_fixed(study_quality_weight) if study_quality_weight else None,
to_fixed(journal_tier_weight) if journal_tier_weight else None,
to_fixed(completeness_penalty) if completeness_penalty else None,
source_doi,
json.dumps(missing_fields or []),
json.dumps(parameters or {}),
int(is_canonical),
SCHEMA_VERSION, now, now))
conn.commit()
return claim_id
def get_claim(conn: sqlite3.Connection, claim_id: str) -> Optional[dict]:
"""Get a single claim by ID."""
row = conn.execute("SELECT * FROM claims WHERE claim_id = ?", (claim_id,)).fetchone()
if row is None:
return None
d = dict(row)
# Convert fixed-point back to float
for key in ['confidence', 'evidence_strength', 'study_quality_weight',
'journal_tier_weight', 'completeness_penalty']:
if d.get(key) is not None:
d[key] = from_fixed(d[key])
d['missing_fields'] = json.loads(d.get('missing_fields', '[]'))
d['parameters'] = json.loads(d.get('parameters', '{}'))
if d.get('expert_override'):
d['expert_override'] = json.loads(d['expert_override'])
return d
def update_claim(conn: sqlite3.Connection, claim_id: str, **kwargs) -> bool:
"""Update claim fields. Fixed-point conversion applied automatically."""
fixed_fields = {'confidence', 'evidence_strength', 'study_quality_weight',
'journal_tier_weight', 'completeness_penalty'}
json_fields = {'missing_fields', 'parameters', 'expert_override'}
updates = []
values = []
for key, val in kwargs.items():
if key in fixed_fields and val is not None:
updates.append(f"{key} = ?")
values.append(to_fixed(val))
elif key in json_fields and val is not None:
updates.append(f"{key} = ?")
values.append(json.dumps(val))
else:
updates.append(f"{key} = ?")
values.append(val)
updates.append("updated_at = ?")
values.append(now_iso())
values.append(claim_id)
if updates:
conn.execute(f"UPDATE claims SET {', '.join(updates)} WHERE claim_id = ?", values)
conn.commit()
return True
return False
def search_claims(conn: sqlite3.Connection, query: str = None,
epistemic_tag: str = None, min_confidence: float = None,
source_doi: str = None, limit: int = 50) -> list:
"""Search claims with optional filters."""
conditions = []
params = []
if query:
conditions.append("text LIKE ?")
params.append(f"%{query}%")
if epistemic_tag:
conditions.append("epistemic_tag = ?")
params.append(epistemic_tag)
if min_confidence is not None:
conditions.append("confidence >= ?")
params.append(to_fixed(min_confidence))
if source_doi:
conditions.append("source_doi = ?")
params.append(source_doi)
where = " AND ".join(conditions) if conditions else "1=1"
rows = conn.execute(
f"SELECT * FROM claims WHERE {where} ORDER BY confidence DESC LIMIT ?",
params + [limit]
).fetchall()
results = []
for row in rows:
d = dict(row)
for key in ['confidence', 'evidence_strength', 'study_quality_weight',
'journal_tier_weight', 'completeness_penalty']:
if d.get(key) is not None:
d[key] = from_fixed(d[key])
d['missing_fields'] = json.loads(d.get('missing_fields', '[]'))
d['parameters'] = json.loads(d.get('parameters', '{}'))
results.append(d)
return results
# --- Sources ---
def create_source(conn: sqlite3.Connection, doi: str, title: str,
authors: list = None, year: int = None, journal: str = None,
journal_tier: int = None, study_type: str = None,
is_canonical: bool = False) -> str:
"""Create a Source Object."""
conn.execute("""
INSERT OR REPLACE INTO sources (doi, title, authors, year, journal, journal_tier,
study_type, is_canonical, schema_version, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (doi, title, json.dumps(authors or []), year, journal, journal_tier,
study_type, int(is_canonical), SCHEMA_VERSION, now_iso()))
conn.commit()
return doi
def get_source(conn: sqlite3.Connection, doi: str) -> Optional[dict]:
"""Get source by DOI."""
row = conn.execute("SELECT * FROM sources WHERE doi = ?", (doi,)).fetchone()
if row is None:
return None
d = dict(row)
d['authors'] = json.loads(d.get('authors', '[]'))
return d
# --- Goals ---
def create_goal(conn: sqlite3.Connection, description: str, priority: str,
linked_claim_ids: list = None) -> str:
"""Create a Goal Object."""
goal_id = gen_id("GOAL")
now = now_iso()
conn.execute("""
INSERT INTO goals (goal_id, description, priority, status, linked_claim_ids,
schema_version, created_at, updated_at)
VALUES (?, ?, ?, 'Active', ?, ?, ?, ?)
""", (goal_id, description, priority, json.dumps(linked_claim_ids or []),
SCHEMA_VERSION, now, now))
conn.commit()
return goal_id
def get_goals_by_priority(conn: sqlite3.Connection, status: str = "Active") -> list:
"""Get goals sorted by priority."""
priority_order = {"high": 1, "medium": 2, "low": 3}
rows = conn.execute(
"SELECT * FROM goals WHERE status = ? ORDER BY priority", (status,)
).fetchall()
results = [dict(r) for r in rows]
for r in results:
r['linked_claim_ids'] = json.loads(r.get('linked_claim_ids', '[]'))
results.sort(key=lambda x: priority_order.get(x['priority'], 99))
return results
# --- Conflicts ---
def create_conflict(conn: sqlite3.Connection, claim_a_id: str, claim_b_id: str,
conflict_type: str, generated_hypothesis: str = None,
key_differences: list = None) -> str:
"""Create a Conflict Resolution Object."""
conflict_id = gen_id("CONF")
conn.execute("""
INSERT INTO conflicts (conflict_id, claim_a_id, claim_b_id, conflict_type,
generated_hypothesis, hypothesis_confidence, key_differences,
schema_version, created_at)
VALUES (?, ?, ?, ?, ?, 'low', ?, ?, ?)
""", (conflict_id, claim_a_id, claim_b_id, conflict_type,
generated_hypothesis, json.dumps(key_differences or []),
SCHEMA_VERSION, now_iso()))
conn.commit()
return conflict_id
# --- Decisions ---
def create_decision(conn: sqlite3.Connection, recommended_action: str,
action_description: str, expected_info_gain: float,
linked_goal_id: str = None, linked_claim_ids: list = None,
priority: str = "medium", estimated_effort: str = None) -> str:
"""Create a Decision Object."""
dec_id = gen_id("DEC")
conn.execute("""
INSERT INTO decisions (decision_id, linked_claim_ids, recommended_action,
action_description, expected_information_gain, linked_goal_id,
priority, estimated_effort, schema_version, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (dec_id, json.dumps(linked_claim_ids or []), recommended_action,
action_description, to_fixed(expected_info_gain), linked_goal_id,
priority, estimated_effort, SCHEMA_VERSION, now_iso()))
conn.commit()
return dec_id
# --- Overrides ---
def create_override(conn: sqlite3.Connection, claim_id: str, who: str,
rationale: str, new_confidence: float,
authoritative_source: str = None) -> str:
"""Create an expert override. Locks claim confidence."""
override_id = gen_id("OVR")
# Get original confidence
claim = get_claim(conn, claim_id)
if not claim:
raise ValueError(f"Claim {claim_id} not found")
conn.execute("""
INSERT INTO overrides (override_id, claim_id, who, rationale,
authoritative_source, original_confidence, new_confidence,
schema_version, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (override_id, claim_id, who, rationale, authoritative_source,
to_fixed(claim['confidence']), to_fixed(new_confidence),
SCHEMA_VERSION, now_iso()))
# Update the claim with override
update_claim(conn, claim_id, confidence=new_confidence,
expert_override={"override_id": override_id, "who": who,
"rationale": rationale})
conn.commit()
return override_id
# --- API Usage Logging ---
def log_api_usage(conn: sqlite3.Connection, model: str, tokens_in: int,
tokens_out: int, cost_usd: float, task_type: str = None):
"""Log API usage for cost tracking."""
conn.execute("""
INSERT INTO api_usage_log (timestamp, model, tokens_in, tokens_out, cost_usd, task_type)
VALUES (?, ?, ?, ?, ?, ?)
""", (now_iso(), model, tokens_in, tokens_out, cost_usd, task_type))
conn.commit()
def get_cost_summary(conn: sqlite3.Connection, days: int = 7) -> dict:
"""Get API cost summary for the last N days."""
row = conn.execute("""
SELECT SUM(cost_usd) as total_cost, SUM(tokens_in) as total_in,
SUM(tokens_out) as total_out, COUNT(*) as num_calls
FROM api_usage_log
WHERE timestamp >= datetime('now', ?)
""", (f"-{days} days",)).fetchone()
return dict(row) if row else {"total_cost": 0, "total_in": 0, "total_out": 0, "num_calls": 0}
if __name__ == "__main__":
# Self-test
init_db("test_research_os.db")
conn = get_db("test_research_os.db")
# Test claim CRUD
cid = create_claim(conn, "Test claim about graphene", "Fact", 0.85,
evidence_strength=0.9, study_quality_weight=1.0,
journal_tier_weight=1.0, completeness_penalty=1.0)
claim = get_claim(conn, cid)
assert claim is not None
assert claim['confidence'] == 0.85
assert claim['status'] == 'Complete'
# Test with missing fields
cid2 = create_claim(conn, "Incomplete claim", "Hypothesis", 0.35,
missing_fields=["temperature", "pH"])
claim2 = get_claim(conn, cid2)
assert claim2['status'] == 'Incomplete'
assert len(claim2['missing_fields']) == 2
# Test source
create_source(conn, "10.1234/test", "Test Paper", ["Author A"], 2024,
"Nature", 1)
src = get_source(conn, "10.1234/test")
assert src is not None
# Test goal
gid = create_goal(conn, "Achieve LOD < 1 fM", "high", [cid])
goals = get_goals_by_priority(conn)
assert len(goals) == 1
# Test conflict
conf_id = create_conflict(conn, cid, cid2, "value_mismatch",
"Different measurement conditions")
# Test override
ovr_id = create_override(conn, cid, "Dr. Smith", "Expert knowledge", 0.95)
claim_updated = get_claim(conn, cid)
assert claim_updated['confidence'] == 0.95
assert claim_updated['expert_override'] is not None
# Test search
results = search_claims(conn, query="graphene")
assert len(results) >= 1
# Test fixed-point math
assert to_fixed(0.85) == 850
assert from_fixed(850) == 0.85
assert to_fixed(0.123) == 123
conn.close()
os.remove("test_research_os.db")
print("All database tests passed! ✓")