| """ |
| Layer 0: Structural PDF Ingestion |
| =================================== |
| Converts PDF bundles into section-aware, bbox-annotated, quality-scored regions. |
Uses PyMuPDF (fitz) as the primary PDF parser with fallback to pdfplumber.
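
Minimal usage sketch (illustrative paths; assumes PyMuPDF or pdfplumber is
installed and that ..core.database provides the imported helpers):

    parser = StructuralParser()
    summary = parser.ingest_document("paper.pdf", doc_type="main")
    chunks = parser.get_section_chunks(summary["doc_id"])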
| """ |
|
|
| import json |
| import os |
| import re |
| from typing import Optional |
|
|
from ..core.database import (
    get_db, init_db, gen_id, now_iso, from_fixed
)
|
|
|
|
| |
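# Ordered (regex, canonical_section) pairs; more specific headings must
# precede their prefixes (e.g. 'results and discussion' before 'results').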
| SECTION_PATTERNS = [ |
| (r'(?i)^(abstract)\b', 'abstract'), |
| (r'(?i)^(introduction)\b', 'introduction'), |
| (r'(?i)^(background)\b', 'introduction'), |
| (r'(?i)^(related\s+work)\b', 'related_work'), |
| (r'(?i)^(materials?\s+and\s+methods?|methods?|experimental)\b', 'methods'), |
| (r'(?i)^(results?\s+and\s+discussion)\b', 'results_discussion'), |
| (r'(?i)^(results?)\b', 'results'), |
| (r'(?i)^(discussion)\b', 'discussion'), |
| (r'(?i)^(conclusions?|summary)\b', 'conclusion'), |
| (r'(?i)^(acknowledge?ments?)\b', 'acknowledgments'), |
| (r'(?i)^(references?|bibliography)\b', 'references'), |
| (r'(?i)^(supplementary|supporting\s+information|appendix)\b', 'supplement'), |
| ] |
|
|
|
|
| def detect_section(text: str) -> Optional[str]: |
| """Detect which section a text block belongs to.""" |
| first_line = text.strip().split('\n')[0].strip() |
| |
    # Strip leading numbering such as "3", "3.1", or "IV." before matching.
    first_line = re.sub(r'^\d+(\.\d+)*\.?\s*', '', first_line)
    first_line = re.sub(r'^[IVXivx]+\.\s*', '', first_line)
    first_line = first_line.strip()
| |
| for pattern, section in SECTION_PATTERNS: |
| if re.match(pattern, first_line): |
| return section |
| return None |
|
|
|
|
| def classify_region_type(text: str) -> str: |
| """Classify a text block's type based on content patterns.""" |
| stripped = text.strip() |
| |
| |
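    # Tables and their captions: pipe-delimited rows or "Table N" lines.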
    if stripped.count('|') > 3:
| return 'table' |
| if re.match(r'(?i)^table\s+\d', stripped): |
| return 'caption' |
| |
| |
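    # Figure captions.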
| if re.match(r'(?i)^(figure|fig\.?)\s+\d', stripped): |
| return 'caption' |
| |
| |
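    # Equations: TeX markers or a bare equation number like "(3)".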
| if stripped.count('$') >= 2 or '\\frac' in stripped or '\\sum' in stripped: |
| return 'equation' |
| if re.match(r'^\s*\([\d]+\)\s*$', stripped): |
| return 'equation' |
| |
| |
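    # Reference-list headers and numbered bibliography entries.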
| if re.match(r'(?i)^references?\s*$', stripped) or re.match(r'^\[\d+\]', stripped): |
| return 'reference' |
| |
| |
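    # Short lines: all-caps headers or recognized section headings.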
| if len(stripped) < 100 and stripped.isupper(): |
| return 'header' |
| if len(stripped) < 80 and not stripped.endswith('.'): |
| section = detect_section(stripped) |
| if section: |
| return 'header' |
| |
| return 'body_text' |
|
|
|
|
| def extract_cross_references(text: str) -> list: |
| """Extract in-text references to figures, tables, equations.""" |
    patterns = [
        ("figure", r'(?i)(figure|fig\.?)\s+(\d+[a-z]?)', 2),
        ("table", r'(?i)(table)\s+(\d+[a-z]?)', 2),
        ("equation", r'(?i)(eq\.?|equation)\s+\(?(\d+)\)?', 2),
        ("citation", r'\[(\d+(?:[,\-–]\s*\d+)*)\]', 1),
    ]
    refs = []
    for ref_type, pattern, num_group in patterns:
        for m in re.finditer(pattern, text):
            refs.append({
                "ref_text": m.group(0),
                "ref_type": ref_type,
                "ref_number": m.group(num_group),
                "resolved_to": None,
                "verified": False,
            })
    return refs
|
|
|
|
| def score_parse_quality(text: str, method: str) -> int: |
| """Score parsing quality for a text region (fixed-point ×1000).""" |
| score = 1000 |
| |
| if not text or not text.strip(): |
| return 0 |
| |
| |
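    # Penalize garbled output: replacement chars, astral-plane codepoints,
    # and box/bullet glyphs typical of failed font decoding.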
    garbled_chars = sum(
        1 for c in text
        if ord(c) > 65535 or c == '\ufffd' or c in '□■◊▪▫●○◆◇'
    )
| garbled_ratio = garbled_chars / max(len(text), 1) |
| score -= int(garbled_ratio * 3000) |
| if garbled_chars > 0: |
| score -= garbled_chars * 50 |
| |
| |
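    # Mild penalty for space-heavy text (a sign of bad reflow).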
| ws_ratio = text.count(' ') / max(len(text), 1) |
| score -= int(ws_ratio * 200) |
| |
| |
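    # Very short fragments carry little signal.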
| if len(text.strip()) < 20: |
| score -= 200 |
| |
| |
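    # Long text with no sentence breaks suggests a mis-parsed region.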
| if len(text) > 100 and '.' not in text: |
| score -= 300 |
| |
| |
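    # Markdown heading markers in Marker output indicate preserved structure.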
| if method == 'marker' and '#' in text: |
| score += 50 |
| |
| return max(0, min(1000, score)) |
|
|
|
|
| class StructuralParser: |
| """ |
| Layer 0: Parse PDF bundles into annotated regions. |
| |
    Tries PyMuPDF (fitz) first, falls back to pdfplumber.
| Every region gets: section tag, bbox, quality score, cross-references. |
| """ |
| |
    def __init__(self, db_path: Optional[str] = None):
| self.db_path = db_path or os.environ.get("RESEARCH_OS_DB", "data/research_os_v2.db") |
| init_db(self.db_path) |
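        # Parser availability is probed lazily and cached per instance.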
| self._marker_available = None |
| self._fitz_available = None |
| self._pdfplumber_available = None |
| |
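    # NOTE: Marker availability is probed for future use; the current
    # ingest path dispatches only to fitz or pdfplumber.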
| def _check_marker(self) -> bool: |
| if self._marker_available is None: |
| try: |
| import marker |
| self._marker_available = True |
| except ImportError: |
| self._marker_available = False |
| return self._marker_available |
| |
| def _check_fitz(self) -> bool: |
| if self._fitz_available is None: |
| try: |
| import fitz |
| self._fitz_available = True |
| except ImportError: |
| self._fitz_available = False |
| return self._fitz_available |
| |
| def _check_pdfplumber(self) -> bool: |
| if self._pdfplumber_available is None: |
| try: |
| import pdfplumber |
| self._pdfplumber_available = True |
| except ImportError: |
| self._pdfplumber_available = False |
| return self._pdfplumber_available |
| |
    def ingest_document(self, file_path: str, doc_type: str = "main",
                        title: Optional[str] = None,
                        doi: Optional[str] = None) -> dict:
| """ |
| Ingest a single document. Returns ingestion summary. |
| """ |
| if not os.path.exists(file_path): |
| return {"error": f"File not found: {file_path}", "doc_id": None} |
| |
| doc_id = gen_id("DOC") |
| conn = get_db(self.db_path) |
| |
| |
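        # Record the document in 'processing' state before parsing begins.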
| conn.execute(""" |
| INSERT INTO documents (doc_id, file_path, doc_type, title, doi, |
| ingestion_status, schema_version, created_at) |
| VALUES (?, ?, ?, ?, ?, 'processing', '2.0', ?) |
| """, (doc_id, file_path, doc_type, title, doi, now_iso())) |
| conn.commit() |
| |
| |
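        # Dispatch to the best available parser for this file type.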
| regions = [] |
| parse_method = "unknown" |
| |
| if file_path.lower().endswith('.pdf'): |
| if self._check_fitz(): |
| regions, parse_method = self._parse_with_fitz(file_path, doc_id) |
| elif self._check_pdfplumber(): |
| regions, parse_method = self._parse_with_pdfplumber(file_path, doc_id) |
| else: |
| conn.execute( |
| "UPDATE documents SET ingestion_status = 'failed' WHERE doc_id = ?", |
| (doc_id,) |
| ) |
| conn.commit() |
| conn.close() |
| return {"error": "No PDF parser available. Install PyMuPDF: pip install pymupdf", "doc_id": doc_id} |
| elif file_path.lower().endswith(('.csv', '.xlsx', '.xls')): |
| regions, parse_method = self._parse_tabular(file_path, doc_id) |
| elif file_path.lower().endswith(('.md', '.txt')): |
| regions, parse_method = self._parse_text(file_path, doc_id) |
| else: |
| regions, parse_method = self._parse_text(file_path, doc_id) |
| |
| |
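        # Persist every region with its provenance and quality metadata.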
| for region in regions: |
| conn.execute(""" |
| INSERT INTO regions (region_id, doc_id, page, bbox, region_type, |
| section, subsection, content_text, content_markdown, |
| parse_method, parse_confidence, extraction_status, |
| quality_flags, cross_refs, schema_version, created_at) |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, '2.0', ?) |
| """, ( |
| region["region_id"], doc_id, region["page"], |
| json.dumps(region.get("bbox")), |
| region["region_type"], region.get("section"), region.get("subsection"), |
| region["content_text"], region.get("content_markdown"), |
| parse_method, region["parse_confidence"], |
| region["extraction_status"], |
| json.dumps(region.get("quality_flags", [])), |
| json.dumps(region.get("cross_refs", [])), |
| now_iso() |
| )) |
| |
| |
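        # Roll up per-region confidence into a document-level average.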
| avg_quality = sum(r["parse_confidence"] for r in regions) // max(len(regions), 1) |
| conn.execute(""" |
| UPDATE documents SET ingestion_status = 'complete', parse_method = ?, |
| parse_quality_avg = ?, total_regions = ?, created_at = ? |
| WHERE doc_id = ? |
| """, (parse_method, avg_quality, len(regions), now_iso(), doc_id)) |
| conn.commit() |
| conn.close() |
| |
| return { |
| "doc_id": doc_id, |
| "parse_method": parse_method, |
| "total_regions": len(regions), |
| "avg_quality": from_fixed(avg_quality), |
| "regions_by_type": self._count_by_type(regions), |
| "sections_found": list(set(r.get("section") for r in regions if r.get("section"))), |
| } |
| |
| def _parse_with_fitz(self, file_path: str, doc_id: str) -> tuple: |
| """Parse PDF using PyMuPDF (fitz) with section detection.""" |
| import fitz |
| doc = fitz.open(file_path) |
| regions = [] |
| current_section = None |
| |
| for page_num in range(len(doc)): |
| page = doc[page_num] |
| blocks = page.get_text("dict")["blocks"] |
| |
| for block in blocks: |
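                # PyMuPDF block type 0 = text, 1 = image.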
| if block["type"] == 0: |
| text = "" |
| for line in block.get("lines", []): |
| for span in line.get("spans", []): |
| text += span.get("text", "") |
| text += "\n" |
| |
| text = text.strip() |
                    if len(text) < 5:
| continue |
| |
| |
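                    # A detected heading updates the running section for later blocks.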
| detected = detect_section(text) |
| if detected: |
| current_section = detected |
| |
| region_type = classify_region_type(text) |
| quality = score_parse_quality(text, "fitz") |
| cross_refs = extract_cross_references(text) |
| |
| |
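                    # Map quality to extraction status (thresholds on the 0-1000 scale).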
| if quality >= 700: |
| status = "extractable" |
| elif quality >= 400: |
| status = "low_confidence" |
| else: |
| status = "unextractable" |
| |
| bbox = block.get("bbox", [0, 0, 0, 0]) |
| |
| regions.append({ |
| "region_id": gen_id("REG"), |
| "page": page_num + 1, |
| "bbox": list(bbox), |
| "region_type": region_type, |
| "section": current_section, |
| "subsection": None, |
| "content_text": text, |
| "content_markdown": text, |
| "parse_confidence": quality, |
| "extraction_status": status, |
| "quality_flags": [], |
| "cross_refs": cross_refs, |
| }) |
| |
| elif block["type"] == 1: |
| bbox = block.get("bbox", [0, 0, 0, 0]) |
| regions.append({ |
| "region_id": gen_id("REG"), |
| "page": page_num + 1, |
| "bbox": list(bbox), |
| "region_type": "figure", |
| "section": current_section, |
| "subsection": None, |
| "content_text": "[Image detected — requires VLM processing]", |
| "content_markdown": "", |
| "parse_confidence": 500, |
| "extraction_status": "low_confidence", |
| "quality_flags": ["image_region_needs_vlm"], |
| "cross_refs": [], |
| }) |
| |
| doc.close() |
| return regions, "fitz" |
| |
| def _parse_with_pdfplumber(self, file_path: str, doc_id: str) -> tuple: |
| """Fallback parser using pdfplumber.""" |
| import pdfplumber |
| regions = [] |
| current_section = None |
| |
| with pdfplumber.open(file_path) as pdf: |
| for page_num, page in enumerate(pdf.pages): |
| text = page.extract_text() |
| if not text or len(text.strip()) < 10: |
| continue |
| |
| |
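                # pdfplumber joins lines with single newlines, so blank-line
                # splitting is only an approximate paragraph signal.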
                paragraphs = [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()]
| |
| for para in paragraphs: |
| detected = detect_section(para) |
| if detected: |
| current_section = detected |
| |
| region_type = classify_region_type(para) |
| quality = score_parse_quality(para, "pdfplumber") |
| cross_refs = extract_cross_references(para) |
| |
| status = "extractable" if quality >= 700 else "low_confidence" if quality >= 400 else "unextractable" |
| |
| regions.append({ |
| "region_id": gen_id("REG"), |
| "page": page_num + 1, |
| "bbox": None, |
| "region_type": region_type, |
| "section": current_section, |
| "subsection": None, |
| "content_text": para, |
| "content_markdown": para, |
| "parse_confidence": quality, |
| "extraction_status": status, |
| "quality_flags": ["no_bbox_available"], |
| "cross_refs": cross_refs, |
| }) |
| |
| |
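                # Tables come back as cell grids; render them as pipe rows.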
| tables = page.extract_tables() |
| for table in tables: |
| if not table: |
| continue |
                    table_text = "\n".join(
                        " | ".join(str(c) if c is not None else "" for c in row)
                        for row in table
                    )
| regions.append({ |
| "region_id": gen_id("REG"), |
| "page": page_num + 1, |
| "bbox": None, |
| "region_type": "table", |
| "section": current_section, |
| "subsection": None, |
| "content_text": table_text, |
| "content_markdown": table_text, |
| "parse_confidence": 700, |
| "extraction_status": "extractable", |
| "quality_flags": ["table_extracted"], |
| "cross_refs": [], |
| }) |
| |
| return regions, "pdfplumber" |
| |
| def _parse_tabular(self, file_path: str, doc_id: str) -> tuple: |
| """Parse CSV/Excel files as data regions.""" |
| regions = [] |
| try: |
            if file_path.lower().endswith('.csv'):
                with open(file_path, encoding='utf-8', errors='replace') as f:
| text = f.read() |
| else: |
| text = f"[Excel file: {os.path.basename(file_path)} — requires pandas for full parsing]" |
| |
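            # Cap stored text at 10k characters; the full file stays on disk.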
| regions.append({ |
| "region_id": gen_id("REG"), |
| "page": 1, |
| "bbox": None, |
| "region_type": "table", |
| "section": "data", |
| "subsection": None, |
| "content_text": text[:10000], |
| "content_markdown": text[:10000], |
| "parse_confidence": 900, |
| "extraction_status": "extractable", |
| "quality_flags": ["tabular_data"], |
| "cross_refs": [], |
| }) |
| except Exception as e: |
| regions.append({ |
| "region_id": gen_id("REG"), |
| "page": 1, "bbox": None, "region_type": "body_text", |
| "section": None, "subsection": None, |
| "content_text": f"Error reading file: {e}", |
| "content_markdown": "", "parse_confidence": 0, |
| "extraction_status": "unextractable", |
| "quality_flags": ["parse_error"], "cross_refs": [], |
| }) |
| return regions, "tabular" |
| |
| def _parse_text(self, file_path: str, doc_id: str) -> tuple: |
| """Parse plain text or markdown files.""" |
| regions = [] |
| try: |
| with open(file_path, encoding='utf-8', errors='replace') as f: |
| text = f.read() |
| |
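            # Split on blank lines and track the running section, as in the PDF paths.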
| paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()] |
| current_section = None |
| |
| for para in paragraphs: |
| detected = detect_section(para) |
| if detected: |
| current_section = detected |
| |
| regions.append({ |
| "region_id": gen_id("REG"), |
| "page": 1, "bbox": None, |
| "region_type": classify_region_type(para), |
| "section": current_section, "subsection": None, |
| "content_text": para, "content_markdown": para, |
| "parse_confidence": 900, |
| "extraction_status": "extractable", |
| "quality_flags": [], "cross_refs": extract_cross_references(para), |
| }) |
| except Exception as e: |
| regions.append({ |
| "region_id": gen_id("REG"), |
| "page": 1, "bbox": None, "region_type": "body_text", |
| "section": None, "subsection": None, |
| "content_text": f"Error: {e}", "content_markdown": "", |
| "parse_confidence": 0, "extraction_status": "unextractable", |
| "quality_flags": ["parse_error"], "cross_refs": [], |
| }) |
| return regions, "text" |
| |
| def _count_by_type(self, regions: list) -> dict: |
| counts = {} |
| for r in regions: |
| t = r["region_type"] |
| counts[t] = counts.get(t, 0) + 1 |
| return counts |
| |
| def get_extractable_regions(self, doc_id: str) -> list: |
| """Get all extractable regions for a document, ordered by section.""" |
| conn = get_db(self.db_path) |
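        # NOTE: ordering by region_id assumes gen_id values increase in
        # creation order; if IDs are random, add an explicit sequence column.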
| rows = conn.execute(""" |
| SELECT * FROM regions |
| WHERE doc_id = ? AND extraction_status = 'extractable' |
| AND region_type IN ('body_text', 'table', 'caption') |
| ORDER BY page, region_id |
| """, (doc_id,)).fetchall() |
| conn.close() |
| |
| results = [] |
| for r in rows: |
| d = dict(r) |
| d["cross_refs"] = json.loads(d.get("cross_refs", "[]")) |
| d["quality_flags"] = json.loads(d.get("quality_flags", "[]")) |
| d["bbox"] = json.loads(d["bbox"]) if d.get("bbox") else None |
| results.append(d) |
| return results |
| |
| def get_section_chunks(self, doc_id: str) -> list: |
| """ |
| Get section-aware chunks for extraction. |
| Merges consecutive body_text regions in the same section. |
| """ |
| regions = self.get_extractable_regions(doc_id) |
| chunks = [] |
| current_chunk = None |
| |
| for region in regions: |
| section = region.get("section") or "unknown" |
| |
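            # Extend the current chunk while the section matches and the
            # chunk stays under ~3000 characters.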
| if (current_chunk and |
| current_chunk["section"] == section and |
| region["region_type"] == "body_text" and |
| len(current_chunk["text"]) < 3000): |
| |
| current_chunk["text"] += "\n\n" + region["content_text"] |
| current_chunk["region_ids"].append(region["region_id"]) |
| current_chunk["min_confidence"] = min( |
| current_chunk["min_confidence"], region["parse_confidence"] |
| ) |
| else: |
| |
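                # Flush the previous chunk and start a new one.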
| if current_chunk: |
| chunks.append(current_chunk) |
| current_chunk = { |
| "chunk_id": gen_id("CHK"), |
| "doc_id": doc_id, |
| "section": section, |
| "text": region["content_text"], |
| "region_ids": [region["region_id"]], |
| "page": region["page"], |
| "min_confidence": region["parse_confidence"], |
| "region_type": region["region_type"], |
| } |
| |
| if current_chunk: |
| chunks.append(current_chunk) |
| |
| return chunks |
|
|