""" PhD Research OS — Obsidian Export (Phase 4) ============================================= One-directional export: System → Obsidian vault. Generates Markdown with YAML frontmatter and wiki-links. """ import os import json from datetime import datetime from pathlib import Path from .db import get_db, search_claims, get_goals_by_priority, from_fixed class ObsidianExporter: """ Export Research OS data to Obsidian vault as linked Markdown notes. Structure: /vault/ ├── Claims/ │ └── CLM_XXXX.md ├── Sources/ │ └── DOI_XXXX.md ├── Goals/ │ └── GOAL_XXXX.md └── Dashboard.md """ def __init__(self, vault_path: str = "vault", db_path: str = None): self.vault_path = Path(vault_path) self.db_path = db_path or os.environ.get("RESEARCH_OS_DB", "data/research_os.db") # Create directories for d in ["Claims", "Sources", "Goals", "Conflicts", "Decisions"]: (self.vault_path / d).mkdir(parents=True, exist_ok=True) def export_all(self): """Export everything to Obsidian vault. Idempotent — overwrites existing.""" conn = get_db(self.db_path) print("Exporting to Obsidian vault...") # Export claims claims = conn.execute("SELECT * FROM claims ORDER BY claim_id").fetchall() for claim in claims: self._export_claim(dict(claim)) print(f" Claims: {len(claims)}") # Export sources sources = conn.execute("SELECT * FROM sources ORDER BY doi").fetchall() for source in sources: self._export_source(dict(source)) print(f" Sources: {len(sources)}") # Export goals goals = conn.execute("SELECT * FROM goals ORDER BY goal_id").fetchall() for goal in goals: self._export_goal(dict(goal)) print(f" Goals: {len(goals)}") # Export conflicts conflicts = conn.execute("SELECT * FROM conflicts ORDER BY conflict_id").fetchall() for conflict in conflicts: self._export_conflict(dict(conflict)) print(f" Conflicts: {len(conflicts)}") # Export decisions decisions = conn.execute("SELECT * FROM decisions ORDER BY decision_id").fetchall() for decision in decisions: self._export_decision(dict(decision)) print(f" Decisions: {len(decisions)}") # Generate dashboard self._export_dashboard(conn) print(f" Dashboard: ✓") conn.close() print(f"\nVault exported to: {self.vault_path}") def _export_claim(self, claim: dict): """Export a single claim as Markdown with YAML frontmatter.""" cid = claim['claim_id'] confidence = from_fixed(claim['confidence']) if isinstance(claim['confidence'], int) else claim['confidence'] missing = json.loads(claim.get('missing_fields', '[]')) params = json.loads(claim.get('parameters', '{}')) content = f"""--- claim_id: {cid} source_doi: {claim.get('source_doi', 'unknown')} epistemic_tag: {claim['epistemic_tag']} confidence: {confidence} status: {claim['status']} study_type: {claim.get('study_type', 'unknown')} is_canonical: {bool(claim.get('is_canonical', 0))} schema_version: {claim.get('schema_version', '1.0')} created_at: {claim.get('created_at', '')} --- ## Claim Text {claim['text']} ## Parameters {self._format_params(params)} ## Missing Fields {', '.join(missing) if missing else 'None — claim is complete'} ## Linked Source {f'[[{claim.get("source_doi", "")}]]' if claim.get('source_doi') else 'No source linked'} """ if claim.get('expert_override'): override = json.loads(claim['expert_override']) if isinstance(claim['expert_override'], str) else claim['expert_override'] content += f""" ## Expert Override - **By:** {override.get('who', 'unknown')} - **Rationale:** {override.get('rationale', '')} - **Override ID:** {override.get('override_id', '')} """ filepath = self.vault_path / "Claims" / f"{cid}.md" filepath.write_text(content) def _export_source(self, source: dict): """Export a source as Markdown.""" doi = source['doi'] authors = json.loads(source.get('authors', '[]')) safe_doi = doi.replace("/", "_").replace(".", "_") content = f"""--- doi: {doi} title: "{source.get('title', '')}" year: {source.get('year', '')} journal: "{source.get('journal', '')}" journal_tier: {source.get('journal_tier', '')} is_canonical: {bool(source.get('is_canonical', 0))} --- ## {source.get('title', 'Untitled')} **DOI:** [{doi}](https://doi.org/{doi}) **Authors:** {', '.join(authors)} **Year:** {source.get('year', 'Unknown')} **Journal:** {source.get('journal', 'Unknown')} (Tier {source.get('journal_tier', '?')}) **Study Type:** {source.get('study_type', 'Unknown')} """ filepath = self.vault_path / "Sources" / f"{safe_doi}.md" filepath.write_text(content) def _export_goal(self, goal: dict): """Export a goal as Markdown.""" gid = goal['goal_id'] linked = json.loads(goal.get('linked_claim_ids', '[]')) content = f"""--- goal_id: {gid} priority: {goal['priority']} status: {goal['status']} --- ## {goal['description']} **Priority:** {goal['priority']} **Status:** {goal['status']} ## Linked Claims {chr(10).join(f'- [[{cid}]]' for cid in linked) if linked else 'No claims linked yet'} """ filepath = self.vault_path / "Goals" / f"{gid}.md" filepath.write_text(content) def _export_conflict(self, conflict: dict): """Export a conflict as Markdown.""" content = f"""--- conflict_id: {conflict['conflict_id']} conflict_type: {conflict['conflict_type']} resolution_status: {conflict['resolution_status']} hypothesis_confidence: {conflict.get('hypothesis_confidence', 'low')} --- ## Conflict: [[{conflict['claim_a_id']}]] vs [[{conflict['claim_b_id']}]] **Type:** {conflict['conflict_type']} **Status:** {conflict['resolution_status']} ## Generated Hypothesis {conflict.get('generated_hypothesis', 'No hypothesis generated')} ⚠️ **Hypothesis confidence: LOW** — Requires human review ## Key Differences {chr(10).join(f'- {d}' for d in json.loads(conflict.get('key_differences', '[]')))} """ filepath = self.vault_path / "Conflicts" / f"{conflict['conflict_id']}.md" filepath.write_text(content) def _export_decision(self, decision: dict): """Export a decision as Markdown.""" info_gain = from_fixed(decision['expected_information_gain']) if isinstance(decision['expected_information_gain'], int) else decision['expected_information_gain'] linked = json.loads(decision.get('linked_claim_ids', '[]')) content = f"""--- decision_id: {decision['decision_id']} action: {decision['recommended_action']} priority: {decision.get('priority', 'medium')} status: {decision['status']} info_gain: {info_gain} --- ## {decision.get('action_description', 'No description')} **Action:** {decision['recommended_action']} **Priority:** {decision.get('priority', 'medium')} **Status:** {decision['status']} **Expected Info Gain:** {info_gain} **Estimated Effort:** {decision.get('estimated_effort', 'Unknown')} ## Linked Goal {f'[[{decision.get("linked_goal_id", "")}]]' if decision.get('linked_goal_id') else 'No goal linked'} ## Linked Claims {chr(10).join(f'- [[{cid}]]' for cid in linked) if linked else 'None'} """ if decision.get('why_not_log'): content += f""" ## Why Not Log (Rejection Reason) {decision['why_not_log']} """ filepath = self.vault_path / "Decisions" / f"{decision['decision_id']}.md" filepath.write_text(content) def _export_dashboard(self, conn): """Generate Dashboard.md with system summary.""" # Gather stats claim_count = conn.execute("SELECT COUNT(*) FROM claims").fetchone()[0] source_count = conn.execute("SELECT COUNT(*) FROM sources").fetchone()[0] goal_count = conn.execute("SELECT COUNT(*) FROM goals WHERE status = 'Active'").fetchone()[0] conflict_count = conn.execute("SELECT COUNT(*) FROM conflicts WHERE resolution_status = 'Unresolved'").fetchone()[0] incomplete_count = conn.execute("SELECT COUNT(*) FROM claims WHERE status = 'Incomplete'").fetchone()[0] override_count = conn.execute("SELECT COUNT(*) FROM overrides").fetchone()[0] # Epistemic distribution epist = conn.execute( "SELECT epistemic_tag, COUNT(*) as cnt FROM claims GROUP BY epistemic_tag" ).fetchall() # Recent sources recent = conn.execute( "SELECT doi, title FROM sources ORDER BY created_at DESC LIMIT 10" ).fetchall() # High-confidence claims high_conf = conn.execute( "SELECT claim_id, text, confidence FROM claims ORDER BY confidence DESC LIMIT 10" ).fetchall() now = datetime.now().strftime("%Y-%m-%d %H:%M") content = f"""--- type: dashboard updated: {now} --- # PhD Research OS Dashboard *Last updated: {now}* ## Overview | Metric | Count | |--------|-------| | Total Claims | {claim_count} | | Sources | {source_count} | | Active Goals | {goal_count} | | Unresolved Conflicts | {conflict_count} | | Incomplete Claims | {incomplete_count} | | Expert Overrides | {override_count} | ## Epistemic Distribution | Tag | Count | |-----|-------| {chr(10).join(f'| {dict(e)["epistemic_tag"]} | {dict(e)["cnt"]} |' for e in epist)} ## Recent Ingestions {chr(10).join(f'- [{dict(s)["title"][:60]}...]({dict(s)["doi"]})' for s in recent)} ## Top Confidence Claims {chr(10).join(f'- [[{dict(c)["claim_id"]}]] ({from_fixed(dict(c)["confidence"]):.3f}): {dict(c)["text"][:80]}...' for c in high_conf)} ## ⚠️ Attention Required ### Incomplete Claims ({incomplete_count}) {'See Claims/ folder — filter by status: Incomplete' if incomplete_count else '✅ All claims complete'} ### Unresolved Conflicts ({conflict_count}) {'See Conflicts/ folder for items requiring human review' if conflict_count else '✅ No unresolved conflicts'} """ filepath = self.vault_path / "Dashboard.md" filepath.write_text(content) def _format_params(self, params: dict) -> str: if not params: return "No parameters recorded" return "\n".join(f"- **{k}:** {v}" for k, v in params.items())