| """ |
| PhD Research OS — Obsidian Export (Phase 4) |
| ============================================= |
| One-directional export: System → Obsidian vault. |
| Generates Markdown with YAML frontmatter and wiki-links. |
| """ |
|
|
| import os |
| import json |
| from datetime import datetime |
| from pathlib import Path |
|
|
| from .db import get_db, search_claims, get_goals_by_priority, from_fixed |
|
|
|
|
class ObsidianExporter:
    """
    Export Research OS data to an Obsidian vault as linked Markdown notes.

    Structure:
    /vault/
    ├── Claims/
    │   └── <claim_id>.md
    ├── Sources/
    │   └── <doi with "/" and "." replaced by "_">.md
    ├── Goals/
    │   └── <goal_id>.md
    ├── Conflicts/
    │   └── <conflict_id>.md
    ├── Decisions/
    │   └── <decision_id>.md
    └── Dashboard.md

    All notes are written as UTF-8. Each export overwrites the note of the
    same name, so re-running the export refreshes the vault in place.
    """

    # Sub-folders created under the vault root, one per exported record type.
    _FOLDERS = ("Claims", "Sources", "Goals", "Conflicts", "Decisions")

    def __init__(self, vault_path: str = "vault", db_path: str = None):
        """
        Remember the vault and database locations and create the vault folders.

        Args:
            vault_path: Root directory of the Obsidian vault.
            db_path: Database path. When falsy, the RESEARCH_OS_DB environment
                variable is used, then "data/research_os.db".
        """
        self.vault_path = Path(vault_path)
        self.db_path = db_path or os.environ.get("RESEARCH_OS_DB", "data/research_os.db")

        for folder in self._FOLDERS:
            (self.vault_path / folder).mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _load_json(raw, default):
        """
        Decode a JSON-encoded column value.

        Returns *default* when the value is NULL/empty or decodes to JSON
        ``null``. Values that are not text are assumed to be decoded already
        and are returned unchanged. Malformed JSON still raises
        ``json.JSONDecodeError``.
        """
        if raw is None or raw == "":
            return default
        decoded = json.loads(raw) if isinstance(raw, (str, bytes, bytearray)) else raw
        return default if decoded is None else decoded

    @staticmethod
    def _field(record: dict, key: str, default=""):
        """
        Return ``record[key]``, or *default* when the key is missing or None.

        Rows converted with ``dict(row)`` always contain every column, so a
        plain ``.get(key, default)`` would render NULL columns as "None".
        """
        value = record.get(key)
        return default if value is None else value

    @staticmethod
    def _to_float(value):
        """
        Pass ints through ``from_fixed``; return any other value unchanged.

        NOTE(review): ``from_fixed`` lives in ``.db``; presumably it converts
        the fixed-point integer storage format to a float — confirm there.
        """
        return from_fixed(value) if isinstance(value, int) else value

    @staticmethod
    def _truncate(text, limit: int) -> str:
        """Shorten *text* to *limit* characters, adding "..." only if cut."""
        text = "" if text is None else str(text)
        return text if len(text) <= limit else f"{text[:limit]}..."

    @staticmethod
    def _yaml_str(value) -> str:
        """
        Render *value* as a double-quoted YAML scalar.

        JSON string syntax is valid YAML, so ``json.dumps`` takes care of
        escaping embedded quotes and backslashes.
        """
        return json.dumps("" if value is None else str(value), ensure_ascii=False)

    @staticmethod
    def _safe_doi(doi: str) -> str:
        """Return the file-name-safe form of a DOI used for source notes."""
        return doi.replace("/", "_").replace(".", "_")

    def _source_link(self, doi: str) -> str:
        """
        Build a wiki-link that resolves to the exported source note.

        The note is stored under the sanitized DOI, so the link targets that
        name and shows the real DOI as its alias.
        """
        return f"[[{self._safe_doi(doi)}|{doi}]]"

    def _write_note(self, folder: str, name: str, content: str):
        """Write *content* to ``<vault>/<folder>/<name>.md`` as UTF-8."""
        # Explicit encoding: notes contain non-ASCII symbols and the platform
        # default encoding is not guaranteed to be able to represent them.
        (self.vault_path / folder / f"{name}.md").write_text(content, encoding="utf-8")

    # ------------------------------------------------------------------
    # Export entry point
    # ------------------------------------------------------------------

    def export_all(self):
        """Export everything to Obsidian vault. Idempotent — overwrites existing."""
        conn = get_db(self.db_path)
        # (label printed in the summary, query, per-row exporter)
        sections = (
            ("Claims", "SELECT * FROM claims ORDER BY claim_id", self._export_claim),
            ("Sources", "SELECT * FROM sources ORDER BY doi", self._export_source),
            ("Goals", "SELECT * FROM goals ORDER BY goal_id", self._export_goal),
            ("Conflicts", "SELECT * FROM conflicts ORDER BY conflict_id", self._export_conflict),
            ("Decisions", "SELECT * FROM decisions ORDER BY decision_id", self._export_decision),
        )

        try:
            print("Exporting to Obsidian vault...")

            for label, query, export_row in sections:
                rows = conn.execute(query).fetchall()
                for row in rows:
                    export_row(dict(row))
                print(f" {label}: {len(rows)}")

            self._export_dashboard(conn)
            print(" Dashboard: ✓")
        finally:
            # Release the connection even when one of the exports fails.
            conn.close()

        print(f"\nVault exported to: {self.vault_path}")

    # ------------------------------------------------------------------
    # Per-record exporters
    # ------------------------------------------------------------------

    def _export_claim(self, claim: dict):
        """
        Export a single claim as Markdown with YAML frontmatter.

        Writes ``Claims/<claim_id>.md``. Requires the keys ``claim_id``,
        ``confidence``, ``epistemic_tag``, ``status`` and ``text``; all other
        keys are optional and may be None.
        """
        cid = claim['claim_id']
        confidence = self._to_float(claim['confidence'])
        missing = self._load_json(claim.get('missing_fields'), [])
        params = self._load_json(claim.get('parameters'), {})

        source_doi = claim.get('source_doi')
        source_link = self._source_link(source_doi) if source_doi else 'No source linked'
        missing_text = ', '.join(map(str, missing)) if missing else 'None — claim is complete'

        content = f"""---
claim_id: {cid}
source_doi: {self._field(claim, 'source_doi', 'unknown')}
epistemic_tag: {claim['epistemic_tag']}
confidence: {confidence}
status: {claim['status']}
study_type: {self._field(claim, 'study_type', 'unknown')}
is_canonical: {bool(claim.get('is_canonical', 0))}
schema_version: {self._field(claim, 'schema_version', '1.0')}
created_at: {self._field(claim, 'created_at', '')}
---

## Claim Text

{claim['text']}

## Parameters

{self._format_params(params)}

## Missing Fields

{missing_text}

## Linked Source

{source_link}
"""

        if claim.get('expert_override'):
            override = self._load_json(claim['expert_override'], {})
            content += f"""
## Expert Override

- **By:** {override.get('who', 'unknown')}
- **Rationale:** {override.get('rationale', '')}
- **Override ID:** {override.get('override_id', '')}
"""

        self._write_note("Claims", cid, content)

    def _export_source(self, source: dict):
        """
        Export a source as Markdown.

        Writes ``Sources/<sanitized doi>.md``. Requires the key ``doi``;
        ``authors`` is a JSON-encoded list and may be None.
        """
        doi = source['doi']
        authors = self._load_json(source.get('authors'), [])

        content = f"""---
doi: {doi}
title: {self._yaml_str(source.get('title'))}
year: {self._field(source, 'year', '')}
journal: {self._yaml_str(source.get('journal'))}
journal_tier: {self._field(source, 'journal_tier', '')}
is_canonical: {bool(source.get('is_canonical', 0))}
---

## {self._field(source, 'title', 'Untitled')}

**DOI:** [{doi}](https://doi.org/{doi})
**Authors:** {', '.join(map(str, authors))}
**Year:** {self._field(source, 'year', 'Unknown')}
**Journal:** {self._field(source, 'journal', 'Unknown')} (Tier {self._field(source, 'journal_tier', '?')})
**Study Type:** {self._field(source, 'study_type', 'Unknown')}
"""

        self._write_note("Sources", self._safe_doi(doi), content)

    def _export_goal(self, goal: dict):
        """
        Export a goal as Markdown.

        Writes ``Goals/<goal_id>.md``. Requires the keys ``goal_id``,
        ``priority``, ``status`` and ``description``; ``linked_claim_ids`` is
        a JSON-encoded list and may be None.
        """
        gid = goal['goal_id']
        linked = self._load_json(goal.get('linked_claim_ids'), [])
        linked_text = (
            "\n".join(f"- [[{cid}]]" for cid in linked) if linked else 'No claims linked yet'
        )

        content = f"""---
goal_id: {gid}
priority: {goal['priority']}
status: {goal['status']}
---

## {goal['description']}

**Priority:** {goal['priority']}
**Status:** {goal['status']}

## Linked Claims

{linked_text}
"""

        self._write_note("Goals", gid, content)

    def _export_conflict(self, conflict: dict):
        """
        Export a conflict as Markdown.

        Writes ``Conflicts/<conflict_id>.md``. Requires the keys
        ``conflict_id``, ``conflict_type``, ``resolution_status``,
        ``claim_a_id`` and ``claim_b_id``; ``key_differences`` is a
        JSON-encoded list and may be None.
        """
        differences = self._load_json(conflict.get('key_differences'), [])
        differences_text = "\n".join(f"- {item}" for item in differences)

        # NOTE(review): the warning line below always says LOW, whatever the
        # hypothesis_confidence value in the frontmatter — confirm intended.
        content = f"""---
conflict_id: {conflict['conflict_id']}
conflict_type: {conflict['conflict_type']}
resolution_status: {conflict['resolution_status']}
hypothesis_confidence: {self._field(conflict, 'hypothesis_confidence', 'low')}
---

## Conflict: [[{conflict['claim_a_id']}]] vs [[{conflict['claim_b_id']}]]

**Type:** {conflict['conflict_type']}
**Status:** {conflict['resolution_status']}

## Generated Hypothesis

{self._field(conflict, 'generated_hypothesis', 'No hypothesis generated')}

⚠️ **Hypothesis confidence: LOW** — Requires human review

## Key Differences

{differences_text}
"""

        self._write_note("Conflicts", conflict['conflict_id'], content)

    def _export_decision(self, decision: dict):
        """
        Export a decision as Markdown.

        Writes ``Decisions/<decision_id>.md``. Requires the keys
        ``decision_id``, ``recommended_action``, ``status`` and
        ``expected_information_gain``; other keys are optional and may be None.
        """
        info_gain = self._to_float(decision['expected_information_gain'])
        linked = self._load_json(decision.get('linked_claim_ids'), [])
        priority = self._field(decision, 'priority', 'medium')

        goal_id = decision.get('linked_goal_id')
        goal_text = f"[[{goal_id}]]" if goal_id else 'No goal linked'
        claims_text = "\n".join(f"- [[{cid}]]" for cid in linked) if linked else 'None'

        content = f"""---
decision_id: {decision['decision_id']}
action: {decision['recommended_action']}
priority: {priority}
status: {decision['status']}
info_gain: {info_gain}
---

## {self._field(decision, 'action_description', 'No description')}

**Action:** {decision['recommended_action']}
**Priority:** {priority}
**Status:** {decision['status']}
**Expected Info Gain:** {info_gain}
**Estimated Effort:** {self._field(decision, 'estimated_effort', 'Unknown')}

## Linked Goal

{goal_text}

## Linked Claims

{claims_text}
"""

        if decision.get('why_not_log'):
            content += f"""
## Why Not Log (Rejection Reason)

{decision['why_not_log']}
"""

        self._write_note("Decisions", decision['decision_id'], content)

    # ------------------------------------------------------------------
    # Dashboard
    # ------------------------------------------------------------------

    def _export_dashboard(self, conn):
        """
        Generate Dashboard.md with system summary.

        Args:
            conn: Open database connection; it is only queried, not closed.
        """

        def count(query: str) -> int:
            return conn.execute(query).fetchone()[0]

        def top_claim_line(row: dict) -> str:
            # Same int-only conversion the claim notes use; NULL-safe.
            confidence = self._to_float(row["confidence"])
            shown = "n/a" if confidence is None else f"{confidence:.3f}"
            return f"- [[{row['claim_id']}]] ({shown}): {self._truncate(row['text'], 80)}"

        claim_count = count("SELECT COUNT(*) FROM claims")
        source_count = count("SELECT COUNT(*) FROM sources")
        goal_count = count("SELECT COUNT(*) FROM goals WHERE status = 'Active'")
        conflict_count = count(
            "SELECT COUNT(*) FROM conflicts WHERE resolution_status = 'Unresolved'"
        )
        incomplete_count = count("SELECT COUNT(*) FROM claims WHERE status = 'Incomplete'")
        override_count = count("SELECT COUNT(*) FROM overrides")

        epist = conn.execute(
            "SELECT epistemic_tag, COUNT(*) as cnt FROM claims GROUP BY epistemic_tag"
        ).fetchall()

        recent = conn.execute(
            "SELECT doi, title FROM sources ORDER BY created_at DESC LIMIT 10"
        ).fetchall()

        high_conf = conn.execute(
            "SELECT claim_id, text, confidence FROM claims ORDER BY confidence DESC LIMIT 10"
        ).fetchall()

        epistemic_rows = "\n".join(
            f"| {row['epistemic_tag']} | {row['cnt']} |" for row in map(dict, epist)
        )
        # Link to doi.org, as the source notes do; a bare DOI is not a URL.
        recent_lines = "\n".join(
            f"- [{self._truncate(row['title'], 60)}](https://doi.org/{row['doi']})"
            for row in map(dict, recent)
        )
        top_lines = "\n".join(top_claim_line(dict(row)) for row in high_conf)

        incomplete_note = (
            'See Claims/ folder — filter by status: Incomplete'
            if incomplete_count
            else '✅ All claims complete'
        )
        conflict_note = (
            'See Conflicts/ folder for items requiring human review'
            if conflict_count
            else '✅ No unresolved conflicts'
        )

        now = datetime.now().strftime("%Y-%m-%d %H:%M")

        content = f"""---
type: dashboard
updated: {now}
---

# PhD Research OS Dashboard

*Last updated: {now}*

## Overview

| Metric | Count |
|--------|-------|
| Total Claims | {claim_count} |
| Sources | {source_count} |
| Active Goals | {goal_count} |
| Unresolved Conflicts | {conflict_count} |
| Incomplete Claims | {incomplete_count} |
| Expert Overrides | {override_count} |

## Epistemic Distribution

| Tag | Count |
|-----|-------|
{epistemic_rows}

## Recent Ingestions

{recent_lines}

## Top Confidence Claims

{top_lines}

## ⚠️ Attention Required

### Incomplete Claims ({incomplete_count})
{incomplete_note}

### Unresolved Conflicts ({conflict_count})
{conflict_note}
"""

        (self.vault_path / "Dashboard.md").write_text(content, encoding="utf-8")

    def _format_params(self, params: dict) -> str:
        """Render *params* as a Markdown bullet list, or a placeholder if empty."""
        if not params:
            return "No parameters recorded"
        return "\n".join(f"- **{k}:** {v}" for k, v in params.items())
|
|