phd-research-os-brain / phd_research_os /obsidian_export.py
nkshirsa's picture
Add phd_research_os/obsidian_export.py
c98d2ae verified
"""
PhD Research OS — Obsidian Export (Phase 4)
=============================================
One-directional export: System → Obsidian vault.
Generates Markdown with YAML frontmatter and wiki-links.
"""
import os
import json
from datetime import datetime
from pathlib import Path
from .db import get_db, search_claims, get_goals_by_priority, from_fixed
class ObsidianExporter:
"""
Export Research OS data to Obsidian vault as linked Markdown notes.
Structure:
/vault/
├── Claims/
│ └── CLM_XXXX.md
├── Sources/
│ └── DOI_XXXX.md
├── Goals/
│ └── GOAL_XXXX.md
└── Dashboard.md
"""
def __init__(self, vault_path: str = "vault", db_path: str = None):
self.vault_path = Path(vault_path)
self.db_path = db_path or os.environ.get("RESEARCH_OS_DB", "data/research_os.db")
# Create directories
for d in ["Claims", "Sources", "Goals", "Conflicts", "Decisions"]:
(self.vault_path / d).mkdir(parents=True, exist_ok=True)
def export_all(self):
"""Export everything to Obsidian vault. Idempotent — overwrites existing."""
conn = get_db(self.db_path)
print("Exporting to Obsidian vault...")
# Export claims
claims = conn.execute("SELECT * FROM claims ORDER BY claim_id").fetchall()
for claim in claims:
self._export_claim(dict(claim))
print(f" Claims: {len(claims)}")
# Export sources
sources = conn.execute("SELECT * FROM sources ORDER BY doi").fetchall()
for source in sources:
self._export_source(dict(source))
print(f" Sources: {len(sources)}")
# Export goals
goals = conn.execute("SELECT * FROM goals ORDER BY goal_id").fetchall()
for goal in goals:
self._export_goal(dict(goal))
print(f" Goals: {len(goals)}")
# Export conflicts
conflicts = conn.execute("SELECT * FROM conflicts ORDER BY conflict_id").fetchall()
for conflict in conflicts:
self._export_conflict(dict(conflict))
print(f" Conflicts: {len(conflicts)}")
# Export decisions
decisions = conn.execute("SELECT * FROM decisions ORDER BY decision_id").fetchall()
for decision in decisions:
self._export_decision(dict(decision))
print(f" Decisions: {len(decisions)}")
# Generate dashboard
self._export_dashboard(conn)
print(f" Dashboard: ✓")
conn.close()
print(f"\nVault exported to: {self.vault_path}")
def _export_claim(self, claim: dict):
"""Export a single claim as Markdown with YAML frontmatter."""
cid = claim['claim_id']
confidence = from_fixed(claim['confidence']) if isinstance(claim['confidence'], int) else claim['confidence']
missing = json.loads(claim.get('missing_fields', '[]'))
params = json.loads(claim.get('parameters', '{}'))
content = f"""---
claim_id: {cid}
source_doi: {claim.get('source_doi', 'unknown')}
epistemic_tag: {claim['epistemic_tag']}
confidence: {confidence}
status: {claim['status']}
study_type: {claim.get('study_type', 'unknown')}
is_canonical: {bool(claim.get('is_canonical', 0))}
schema_version: {claim.get('schema_version', '1.0')}
created_at: {claim.get('created_at', '')}
---
## Claim Text
{claim['text']}
## Parameters
{self._format_params(params)}
## Missing Fields
{', '.join(missing) if missing else 'None — claim is complete'}
## Linked Source
{f'[[{claim.get("source_doi", "")}]]' if claim.get('source_doi') else 'No source linked'}
"""
if claim.get('expert_override'):
override = json.loads(claim['expert_override']) if isinstance(claim['expert_override'], str) else claim['expert_override']
content += f"""
## Expert Override
- **By:** {override.get('who', 'unknown')}
- **Rationale:** {override.get('rationale', '')}
- **Override ID:** {override.get('override_id', '')}
"""
filepath = self.vault_path / "Claims" / f"{cid}.md"
filepath.write_text(content)
def _export_source(self, source: dict):
"""Export a source as Markdown."""
doi = source['doi']
authors = json.loads(source.get('authors', '[]'))
safe_doi = doi.replace("/", "_").replace(".", "_")
content = f"""---
doi: {doi}
title: "{source.get('title', '')}"
year: {source.get('year', '')}
journal: "{source.get('journal', '')}"
journal_tier: {source.get('journal_tier', '')}
is_canonical: {bool(source.get('is_canonical', 0))}
---
## {source.get('title', 'Untitled')}
**DOI:** [{doi}](https://doi.org/{doi})
**Authors:** {', '.join(authors)}
**Year:** {source.get('year', 'Unknown')}
**Journal:** {source.get('journal', 'Unknown')} (Tier {source.get('journal_tier', '?')})
**Study Type:** {source.get('study_type', 'Unknown')}
"""
filepath = self.vault_path / "Sources" / f"{safe_doi}.md"
filepath.write_text(content)
def _export_goal(self, goal: dict):
"""Export a goal as Markdown."""
gid = goal['goal_id']
linked = json.loads(goal.get('linked_claim_ids', '[]'))
content = f"""---
goal_id: {gid}
priority: {goal['priority']}
status: {goal['status']}
---
## {goal['description']}
**Priority:** {goal['priority']}
**Status:** {goal['status']}
## Linked Claims
{chr(10).join(f'- [[{cid}]]' for cid in linked) if linked else 'No claims linked yet'}
"""
filepath = self.vault_path / "Goals" / f"{gid}.md"
filepath.write_text(content)
def _export_conflict(self, conflict: dict):
"""Export a conflict as Markdown."""
content = f"""---
conflict_id: {conflict['conflict_id']}
conflict_type: {conflict['conflict_type']}
resolution_status: {conflict['resolution_status']}
hypothesis_confidence: {conflict.get('hypothesis_confidence', 'low')}
---
## Conflict: [[{conflict['claim_a_id']}]] vs [[{conflict['claim_b_id']}]]
**Type:** {conflict['conflict_type']}
**Status:** {conflict['resolution_status']}
## Generated Hypothesis
{conflict.get('generated_hypothesis', 'No hypothesis generated')}
⚠️ **Hypothesis confidence: LOW** — Requires human review
## Key Differences
{chr(10).join(f'- {d}' for d in json.loads(conflict.get('key_differences', '[]')))}
"""
filepath = self.vault_path / "Conflicts" / f"{conflict['conflict_id']}.md"
filepath.write_text(content)
def _export_decision(self, decision: dict):
"""Export a decision as Markdown."""
info_gain = from_fixed(decision['expected_information_gain']) if isinstance(decision['expected_information_gain'], int) else decision['expected_information_gain']
linked = json.loads(decision.get('linked_claim_ids', '[]'))
content = f"""---
decision_id: {decision['decision_id']}
action: {decision['recommended_action']}
priority: {decision.get('priority', 'medium')}
status: {decision['status']}
info_gain: {info_gain}
---
## {decision.get('action_description', 'No description')}
**Action:** {decision['recommended_action']}
**Priority:** {decision.get('priority', 'medium')}
**Status:** {decision['status']}
**Expected Info Gain:** {info_gain}
**Estimated Effort:** {decision.get('estimated_effort', 'Unknown')}
## Linked Goal
{f'[[{decision.get("linked_goal_id", "")}]]' if decision.get('linked_goal_id') else 'No goal linked'}
## Linked Claims
{chr(10).join(f'- [[{cid}]]' for cid in linked) if linked else 'None'}
"""
if decision.get('why_not_log'):
content += f"""
## Why Not Log (Rejection Reason)
{decision['why_not_log']}
"""
filepath = self.vault_path / "Decisions" / f"{decision['decision_id']}.md"
filepath.write_text(content)
def _export_dashboard(self, conn):
"""Generate Dashboard.md with system summary."""
# Gather stats
claim_count = conn.execute("SELECT COUNT(*) FROM claims").fetchone()[0]
source_count = conn.execute("SELECT COUNT(*) FROM sources").fetchone()[0]
goal_count = conn.execute("SELECT COUNT(*) FROM goals WHERE status = 'Active'").fetchone()[0]
conflict_count = conn.execute("SELECT COUNT(*) FROM conflicts WHERE resolution_status = 'Unresolved'").fetchone()[0]
incomplete_count = conn.execute("SELECT COUNT(*) FROM claims WHERE status = 'Incomplete'").fetchone()[0]
override_count = conn.execute("SELECT COUNT(*) FROM overrides").fetchone()[0]
# Epistemic distribution
epist = conn.execute(
"SELECT epistemic_tag, COUNT(*) as cnt FROM claims GROUP BY epistemic_tag"
).fetchall()
# Recent sources
recent = conn.execute(
"SELECT doi, title FROM sources ORDER BY created_at DESC LIMIT 10"
).fetchall()
# High-confidence claims
high_conf = conn.execute(
"SELECT claim_id, text, confidence FROM claims ORDER BY confidence DESC LIMIT 10"
).fetchall()
now = datetime.now().strftime("%Y-%m-%d %H:%M")
content = f"""---
type: dashboard
updated: {now}
---
# PhD Research OS Dashboard
*Last updated: {now}*
## Overview
| Metric | Count |
|--------|-------|
| Total Claims | {claim_count} |
| Sources | {source_count} |
| Active Goals | {goal_count} |
| Unresolved Conflicts | {conflict_count} |
| Incomplete Claims | {incomplete_count} |
| Expert Overrides | {override_count} |
## Epistemic Distribution
| Tag | Count |
|-----|-------|
{chr(10).join(f'| {dict(e)["epistemic_tag"]} | {dict(e)["cnt"]} |' for e in epist)}
## Recent Ingestions
{chr(10).join(f'- [{dict(s)["title"][:60]}...]({dict(s)["doi"]})' for s in recent)}
## Top Confidence Claims
{chr(10).join(f'- [[{dict(c)["claim_id"]}]] ({from_fixed(dict(c)["confidence"]):.3f}): {dict(c)["text"][:80]}...' for c in high_conf)}
## ⚠️ Attention Required
### Incomplete Claims ({incomplete_count})
{'See Claims/ folder — filter by status: Incomplete' if incomplete_count else '✅ All claims complete'}
### Unresolved Conflicts ({conflict_count})
{'See Conflicts/ folder for items requiring human review' if conflict_count else '✅ No unresolved conflicts'}
"""
filepath = self.vault_path / "Dashboard.md"
filepath.write_text(content)
def _format_params(self, params: dict) -> str:
if not params:
return "No parameters recorded"
return "\n".join(f"- **{k}:** {v}" for k, v in params.items())