nkshirsa committed on
Commit 4bca821 · verified · 1 Parent(s): b20f4c2

Add phd_research_os/pipeline.py

Files changed (1)
  1. phd_research_os/pipeline.py +276 -0
phd_research_os/pipeline.py ADDED
@@ -0,0 +1,276 @@
"""
PhD Research OS — Pipeline Orchestrator (Phase 1 + Phase 6)
============================================================
End-to-end pipeline: PDF → Text → Claims → Conflicts → Obsidian Export

Implements:
- PDF text extraction (PyMuPDF / pdfplumber fallback)
- Source metadata via CrossRef API
- Claim extraction + confidence scoring via AI brain
- Batch inbox processing
- Cost tracking
"""

import os
import json
import glob
import shutil
import time
from datetime import datetime
from typing import Optional
from pathlib import Path

from .db import (get_db, init_db, create_claim, create_source,
                 get_claim, search_claims, log_api_usage, get_cost_summary)
from .agents import ResearchOSBrain


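# The .db helpers imported above are assumed (from how they are called below) to look
# roughly like this; the authoritative signatures live in phd_research_os/db.py:
#   create_source(conn, doi, title, authors, year, journal, journal_tier, is_canonical=...)
#   create_claim(conn, text=..., epistemic_tag=..., confidence=..., source_doi=...,
#                evidence_strength=..., study_type=..., missing_fields=..., parameters=...,
#                is_canonical=...) -> claim id
#   get_claim(conn, claim_id) -> dict with at least "status", "epistemic_tag", "confidence"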
class Pipeline:
    """
    PhD Research OS Paper Ingestion Pipeline.

    Usage:
        pipeline = Pipeline()
        result = pipeline.process_paper("path/to/paper.pdf")
        pipeline.process_inbox()  # Batch process /inbox/ folder
    """

    def __init__(self, db_path: Optional[str] = None, brain: Optional[ResearchOSBrain] = None):
        self.db_path = db_path or os.environ.get("RESEARCH_OS_DB", "data/research_os.db")
        init_db(self.db_path)
        self.conn = get_db(self.db_path)
        self.brain = brain

        # Ensure directories exist
        for d in ["inbox", "processed", "lab_data", "data"]:
            os.makedirs(d, exist_ok=True)

    def extract_text(self, pdf_path: str) -> list[str]:
        """
        Extract text from a PDF, page by page.
        Primary: PyMuPDF (fitz). Fallback: pdfplumber.
        """
        pages = []

        try:
            import fitz  # PyMuPDF
            doc = fitz.open(pdf_path)
            for page in doc:
                text = page.get_text()
                if text.strip():
                    pages.append(text)
            doc.close()
            if pages:
                return pages
        except ImportError:
            pass
        except Exception as e:
            print(f" PyMuPDF failed: {e}, trying pdfplumber...")

        try:
            import pdfplumber
            with pdfplumber.open(pdf_path) as pdf:
                for page in pdf.pages:
                    text = page.extract_text()
                    if text and text.strip():
                        pages.append(text)
            if pages:
                return pages
        except ImportError:
            pass
        except Exception as e:
            print(f" pdfplumber failed: {e}")

        raise RuntimeError(f"Could not extract text from {pdf_path}. Install PyMuPDF or pdfplumber.")

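    # A quick sketch of extract_text used on its own (assumes a local "paper.pdf" exists
    # and that PyMuPDF or pdfplumber is installed):
    #
    #   pages = Pipeline().extract_text("paper.pdf")
    #   print(f"{len(pages)} pages, first page starts: {pages[0][:80]!r}")
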
    def lookup_doi(self, text: str) -> Optional[dict]:
        """
        Look up DOI metadata via CrossRef API.
        Searches for DOI in text, then fetches metadata.
        """
        import re

        # Try to find DOI in text
        doi_pattern = r'10\.\d{4,}/[^\s]+'
        match = re.search(doi_pattern, text)

        if not match:
            return None

        doi = match.group(0).rstrip('.,;)')

        try:
            import httpx
            response = httpx.get(
                f"https://api.crossref.org/works/{doi}",
                timeout=10,
                headers={"User-Agent": "PhDResearchOS/1.0 (mailto:research@example.com)"}
            )
            if response.status_code == 200:
                data = response.json()["message"]
                return {
                    "doi": doi,
                    "title": data.get("title", [""])[0],
                    "authors": [f"{a.get('given', '')} {a.get('family', '')}"
                                for a in data.get("author", [])],
                    "year": data.get("published-print", {}).get("date-parts", [[None]])[0][0],
                    "journal": data.get("container-title", [""])[0],
                }
        except Exception as e:
            print(f" DOI lookup failed: {e}")

        return {"doi": doi, "title": "", "authors": [], "year": None, "journal": ""}

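    # For orientation, the CrossRef payload parsed above is assumed to look roughly like
    # the following (only the fields accessed are shown; values are illustrative):
    #
    #   {"message": {"title": ["Paper title"],
    #                "author": [{"given": "Ada", "family": "Lovelace"}],
    #                "published-print": {"date-parts": [[2021, 6, 1]]},
    #                "container-title": ["Journal name"]}}
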
    def process_paper(self, pdf_path: str, journal_tier: int = 2,
                      is_canonical: bool = False) -> dict:
        """
        Process a single paper end-to-end.

        Returns a summary dict with claim counts and metrics.
        """
        print(f"\n{'='*60}")
        print(f"Processing: {pdf_path}")
        print(f"{'='*60}")

        start_time = time.time()
        result = {
            "file": pdf_path,
            "claims_extracted": 0,
            "claims_complete": 0,
            "claims_incomplete": 0,
            "epistemic_distribution": {},
            "avg_confidence": 0.0,
            "doi": None,
            "errors": [],
            "processing_time_s": 0,
        }

        # Step 1: Extract text
        print(" Step 1: Extracting text...")
        try:
            pages = self.extract_text(pdf_path)
            print(f" Extracted {len(pages)} pages")
        except Exception as e:
            result["errors"].append(f"Text extraction failed: {e}")
            return result

        # Step 2: DOI lookup
        print(" Step 2: Looking up DOI...")
        full_text = " ".join(pages[:3])  # Search first 3 pages for DOI
        doi_info = self.lookup_doi(full_text)
        if doi_info:
            result["doi"] = doi_info["doi"]
            create_source(self.conn, doi_info["doi"], doi_info["title"],
                          doi_info["authors"], doi_info["year"], doi_info["journal"],
                          journal_tier, is_canonical=is_canonical)
            print(f" Found DOI: {doi_info['doi']}")
        else:
            print(" No DOI found")

        # Step 3: Extract claims via AI brain
        if self.brain is None:
            print(" Step 3: Skipping claim extraction (no brain configured)")
            print(" To enable: set brain=ResearchOSBrain(...) or use API mode")
            result["processing_time_s"] = time.time() - start_time
            return result

        print(" Step 3: Extracting claims via AI brain...")
        all_claims = []

        for i, page_text in enumerate(pages):
            if len(page_text.strip()) < 100:  # Skip near-empty pages
                continue

            print(f" Processing page {i+1}/{len(pages)}...")
            response = self.brain.extract_claims(page_text)

            if response.success and "claims" in response.data:
                for claim_data in response.data["claims"]:
                    try:
                        cid = create_claim(
                            self.conn,
                            text=claim_data.get("text", ""),
                            epistemic_tag=claim_data.get("epistemic_tag", "Interpretation"),
                            confidence=float(claim_data.get("confidence", 0.5)),
                            source_doi=doi_info["doi"] if doi_info else None,
                            evidence_strength=float(claim_data.get("evidence_strength", 0.5)),
                            study_type=claim_data.get("study_type"),
                            missing_fields=claim_data.get("missing_fields", []),
                            parameters=claim_data.get("parameters", {}),
                            is_canonical=is_canonical,
                        )
                        all_claims.append(get_claim(self.conn, cid))
                    except Exception as e:
                        result["errors"].append(f"Claim storage error: {e}")
            else:
                result["errors"].append(f"Page {i+1}: Extraction failed")

        # Step 4: Compile statistics
        result["claims_extracted"] = len(all_claims)
        result["claims_complete"] = sum(1 for c in all_claims if c["status"] == "Complete")
        result["claims_incomplete"] = sum(1 for c in all_claims if c["status"] == "Incomplete")

        for c in all_claims:
            tag = c["epistemic_tag"]
            result["epistemic_distribution"][tag] = result["epistemic_distribution"].get(tag, 0) + 1

        if all_claims:
            result["avg_confidence"] = sum(c["confidence"] for c in all_claims) / len(all_claims)

        result["processing_time_s"] = time.time() - start_time

        # Print summary
        print("\n Summary:")
        print(f" Claims extracted: {result['claims_extracted']}")
        print(f" Complete/Incomplete: {result['claims_complete']}/{result['claims_incomplete']}")
        print(f" Epistemic distribution: {result['epistemic_distribution']}")
        print(f" Average confidence: {result['avg_confidence']:.3f}")
        print(f" Processing time: {result['processing_time_s']:.1f}s")
        if result["errors"]:
            print(f" Errors: {len(result['errors'])}")

        return result

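    # For illustration, a successful process_paper call returns a summary shaped like
    # the following (all values hypothetical):
    #
    #   {"file": "inbox/smith2021.pdf", "claims_extracted": 12, "claims_complete": 9,
    #    "claims_incomplete": 3, "epistemic_distribution": {"Finding": 7, "Interpretation": 5},
    #    "avg_confidence": 0.71, "doi": "10.1234/example.2021", "errors": [],
    #    "processing_time_s": 42.3}
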
    def process_inbox(self, rate_limit: int = 5) -> list:
        """
        Batch process all PDFs in /inbox/.
        Moves processed files to /processed/ with timestamp.

        Args:
            rate_limit: Max papers per hour (default 5)
        """
        pdf_files = sorted(glob.glob("inbox/*.pdf"))

        if not pdf_files:
            print("No PDFs in inbox/")
            return []

        print(f"Found {len(pdf_files)} PDFs in inbox/")
        results = []

        delay = 3600 / rate_limit  # seconds between papers

        for i, pdf_path in enumerate(pdf_files):
            print(f"\n[{i+1}/{len(pdf_files)}] ", end="")
            result = self.process_paper(pdf_path)
            results.append(result)

            # Move to processed
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            dest = f"processed/{timestamp}_{os.path.basename(pdf_path)}"
            shutil.move(pdf_path, dest)
            print(f" Moved to {dest}")

            # Rate limiting
            if i < len(pdf_files) - 1:
                print(f" Waiting {delay:.0f}s (rate limit: {rate_limit}/hr)...")
                time.sleep(min(delay, 5))  # Cap at 5s in practice

        # Print batch summary
        total_claims = sum(r["claims_extracted"] for r in results)
        total_errors = sum(len(r["errors"]) for r in results)
        print(f"\n{'='*60}")
        print(f"Batch complete: {len(results)} papers, {total_claims} claims, {total_errors} errors")
        print(f"{'='*60}")

        return results
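

# A minimal usage sketch, assuming the module is invoked as
# `python -m phd_research_os.pipeline [paper.pdf]` from the repo root: process a single
# PDF passed on the command line, otherwise sweep inbox/. Claim extraction stays off
# unless a ResearchOSBrain is passed to Pipeline().
if __name__ == "__main__":
    import sys

    pipeline = Pipeline()
    if len(sys.argv) > 1:
        print(json.dumps(pipeline.process_paper(sys.argv[1]), indent=2))
    else:
        pipeline.process_inbox()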