""" Layer 0: Structural PDF Ingestion =================================== Converts PDF bundles into section-aware, bbox-annotated, quality-scored regions. Uses Marker as primary parser with fallback to pdfplumber. """ import json import os import re from pathlib import Path from typing import Optional from ..core.database import ( get_db, init_db, gen_id, now_iso, to_fixed, from_fixed, hash_text ) # Section detection patterns for scientific papers SECTION_PATTERNS = [ (r'(?i)^(abstract)\b', 'abstract'), (r'(?i)^(introduction)\b', 'introduction'), (r'(?i)^(background)\b', 'introduction'), (r'(?i)^(related\s+work)\b', 'related_work'), (r'(?i)^(materials?\s+and\s+methods?|methods?|experimental)\b', 'methods'), (r'(?i)^(results?\s+and\s+discussion)\b', 'results_discussion'), (r'(?i)^(results?)\b', 'results'), (r'(?i)^(discussion)\b', 'discussion'), (r'(?i)^(conclusions?|summary)\b', 'conclusion'), (r'(?i)^(acknowledge?ments?)\b', 'acknowledgments'), (r'(?i)^(references?|bibliography)\b', 'references'), (r'(?i)^(supplementary|supporting\s+information|appendix)\b', 'supplement'), ] def detect_section(text: str) -> Optional[str]: """Detect which section a text block belongs to.""" first_line = text.strip().split('\n')[0].strip() # Remove numbering like "2.1", "3.", "III.", "2.1 ", etc. first_line = re.sub(r'^[\d]+\.[\d]*\s*', '', first_line) first_line = re.sub(r'^[\d]+\.\s*', '', first_line) first_line = re.sub(r'^[IVXivx]+\.\s*', '', first_line) first_line = first_line.strip() for pattern, section in SECTION_PATTERNS: if re.match(pattern, first_line): return section return None def classify_region_type(text: str) -> str: """Classify a text block's type based on content patterns.""" stripped = text.strip() # Table detection if '|' in stripped and stripped.count('|') > 3: return 'table' if re.match(r'(?i)^table\s+\d', stripped): return 'caption' # Figure detection if re.match(r'(?i)^(figure|fig\.?)\s+\d', stripped): return 'caption' # Equation detection (LaTeX or heavy math symbols) if stripped.count('$') >= 2 or '\\frac' in stripped or '\\sum' in stripped: return 'equation' if re.match(r'^\s*\([\d]+\)\s*$', stripped): return 'equation' # Reference detection if re.match(r'(?i)^references?\s*$', stripped) or re.match(r'^\[\d+\]', stripped): return 'reference' # Header detection (short, possibly bold/caps) if len(stripped) < 100 and stripped.isupper(): return 'header' if len(stripped) < 80 and not stripped.endswith('.'): section = detect_section(stripped) if section: return 'header' return 'body_text' def extract_cross_references(text: str) -> list: """Extract in-text references to figures, tables, equations.""" refs = [] # Figure references for m in re.finditer(r'(?i)(figure|fig\.?)\s+(\d+[a-z]?)', text): refs.append({ "ref_text": m.group(0), "ref_type": "figure", "ref_number": m.group(2), "resolved_to": None, "verified": False, }) # Table references for m in re.finditer(r'(?i)(table)\s+(\d+[a-z]?)', text): refs.append({ "ref_text": m.group(0), "ref_type": "table", "ref_number": m.group(2), "resolved_to": None, "verified": False, }) # Equation references for m in re.finditer(r'(?i)(eq\.?|equation)\s+\(?(\d+)\)?', text): refs.append({ "ref_text": m.group(0), "ref_type": "equation", "ref_number": m.group(2), "resolved_to": None, "verified": False, }) # Citation references [1], [2,3], [1-5] for m in re.finditer(r'\[(\d+(?:[,\-–]\s*\d+)*)\]', text): refs.append({ "ref_text": m.group(0), "ref_type": "citation", "ref_number": m.group(1), "resolved_to": None, "verified": False, }) return refs def 


def score_parse_quality(text: str, method: str) -> int:
    """Score parsing quality for a text region (fixed-point ×1000)."""
    score = 1000  # Start at perfect
    if not text or not text.strip():
        return 0
    # Penalize: garbled characters (common in bad OCR/parsing)
    garbled_chars = sum(1 for c in text if ord(c) > 65535 or c in '□■◊▪▫●○◆◇')
    garbled_ratio = garbled_chars / max(len(text), 1)
    score -= int(garbled_ratio * 3000)  # Heavy penalty: even 10% garbled → -300
    if garbled_chars > 0:
        score -= garbled_chars * 50  # Additional per-character penalty
    # Penalize: excessive whitespace (column merge artifact)
    ws_ratio = text.count(' ') / max(len(text), 1)
    score -= int(ws_ratio * 200)
    # Penalize: very short fragments (likely parsing artifact)
    if len(text.strip()) < 20:
        score -= 200
    # Penalize: no sentence structure (no periods, likely garbled)
    if len(text) > 100 and '.' not in text:
        score -= 300
    # Bonus: markdown structure preserved (Marker output)
    if method == 'marker' and '#' in text:
        score += 50
    return max(0, min(1000, score))
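
# Worked example (illustrative): a 100-character block with 10 garbled glyphs
# loses int(0.10 * 3000) = 300 for the garbled ratio plus 10 * 50 = 500 in
# per-character penalties, so it scores at most 1000 - 800 = 200, below the
# 400 cutoff the parsers use, and its region is marked "unextractable".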


class StructuralParser:
    """
    Layer 0: Parse PDF bundles into annotated regions.

    Prefers PyMuPDF (fitz), falling back to pdfplumber; Marker availability
    is detected for callers that want it. Every region gets: section tag,
    bbox, quality score, cross-references.
    """

    def __init__(self, db_path: Optional[str] = None):
        self.db_path = db_path or os.environ.get(
            "RESEARCH_OS_DB", "data/research_os_v2.db"
        )
        init_db(self.db_path)
        self._marker_available = None
        self._fitz_available = None
        self._pdfplumber_available = None

    def _check_marker(self) -> bool:
        if self._marker_available is None:
            try:
                import marker  # noqa: F401
                self._marker_available = True
            except ImportError:
                self._marker_available = False
        return self._marker_available

    def _check_fitz(self) -> bool:
        if self._fitz_available is None:
            try:
                import fitz  # noqa: F401
                self._fitz_available = True
            except ImportError:
                self._fitz_available = False
        return self._fitz_available

    def _check_pdfplumber(self) -> bool:
        if self._pdfplumber_available is None:
            try:
                import pdfplumber  # noqa: F401
                self._pdfplumber_available = True
            except ImportError:
                self._pdfplumber_available = False
        return self._pdfplumber_available

    def ingest_document(self, file_path: str, doc_type: str = "main",
                        title: Optional[str] = None,
                        doi: Optional[str] = None) -> dict:
        """Ingest a single document. Returns an ingestion summary."""
        if not os.path.exists(file_path):
            return {"error": f"File not found: {file_path}", "doc_id": None}

        doc_id = gen_id("DOC")
        conn = get_db(self.db_path)

        # Create document record
        conn.execute("""
            INSERT INTO documents (doc_id, file_path, doc_type, title, doi,
                ingestion_status, schema_version, created_at)
            VALUES (?, ?, ?, ?, ?, 'processing', '2.0', ?)
        """, (doc_id, file_path, doc_type, title, doi, now_iso()))
        conn.commit()

        # Parse based on available tools
        regions = []
        parse_method = "unknown"

        if file_path.lower().endswith('.pdf'):
            if self._check_fitz():
                regions, parse_method = self._parse_with_fitz(file_path, doc_id)
            elif self._check_pdfplumber():
                regions, parse_method = self._parse_with_pdfplumber(file_path, doc_id)
            else:
                conn.execute(
                    "UPDATE documents SET ingestion_status = 'failed' WHERE doc_id = ?",
                    (doc_id,)
                )
                conn.commit()
                conn.close()
                return {"error": "No PDF parser available. Install PyMuPDF: pip install pymupdf",
                        "doc_id": doc_id}
        elif file_path.lower().endswith(('.csv', '.xlsx', '.xls')):
            regions, parse_method = self._parse_tabular(file_path, doc_id)
        elif file_path.lower().endswith(('.md', '.txt')):
            regions, parse_method = self._parse_text(file_path, doc_id)
        else:
            regions, parse_method = self._parse_text(file_path, doc_id)

        # Store regions
        for region in regions:
            conn.execute("""
                INSERT INTO regions (region_id, doc_id, page, bbox, region_type,
                    section, subsection, content_text, content_markdown,
                    parse_method, parse_confidence, extraction_status,
                    quality_flags, cross_refs, schema_version, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, '2.0', ?)
            """, (
                region["region_id"], doc_id, region["page"],
                json.dumps(region.get("bbox")), region["region_type"],
                region.get("section"), region.get("subsection"),
                region["content_text"], region.get("content_markdown"),
                parse_method, region["parse_confidence"],
                region["extraction_status"],
                json.dumps(region.get("quality_flags", [])),
                json.dumps(region.get("cross_refs", [])),
                now_iso()
            ))

        # Update document status
        avg_quality = sum(r["parse_confidence"] for r in regions) // max(len(regions), 1)
        conn.execute("""
            UPDATE documents
            SET ingestion_status = 'complete', parse_method = ?,
                parse_quality_avg = ?, total_regions = ?, created_at = ?
            WHERE doc_id = ?
        """, (parse_method, avg_quality, len(regions), now_iso(), doc_id))
        conn.commit()
        conn.close()

        return {
            "doc_id": doc_id,
            "parse_method": parse_method,
            "total_regions": len(regions),
            "avg_quality": from_fixed(avg_quality),
            "regions_by_type": self._count_by_type(regions),
            "sections_found": list(set(
                r.get("section") for r in regions if r.get("section")
            )),
        }
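
    # Usage sketch (illustrative; the PDF path is a placeholder):
    #
    #     parser = StructuralParser(db_path="data/research_os_v2.db")
    #     summary = parser.ingest_document("paper.pdf", doc_type="main")
    #     # -> {"doc_id": "DOC...", "parse_method": "fitz", "total_regions": ...,
    #     #     "avg_quality": ..., "regions_by_type": {...}, "sections_found": [...]}
    #
    # Each _parse_* method below returns (regions, method_name), where every
    # region dict carries: region_id, page, bbox, region_type, section,
    # subsection, content_text, content_markdown, parse_confidence
    # (fixed-point ×1000), extraction_status, quality_flags, and cross_refs.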

    def _parse_with_fitz(self, file_path: str, doc_id: str) -> tuple:
        """Parse PDF using PyMuPDF (fitz) with section detection."""
        import fitz
        doc = fitz.open(file_path)
        regions = []
        current_section = None

        for page_num in range(len(doc)):
            page = doc[page_num]
            blocks = page.get_text("dict")["blocks"]

            for block in blocks:
                if block["type"] == 0:  # Text block
                    text = ""
                    for line in block.get("lines", []):
                        for span in line.get("spans", []):
                            text += span.get("text", "")
                        text += "\n"
                    text = text.strip()
                    if not text or len(text) < 5:
                        continue

                    # Detect section from headers
                    detected = detect_section(text)
                    if detected:
                        current_section = detected

                    region_type = classify_region_type(text)
                    quality = score_parse_quality(text, "fitz")
                    cross_refs = extract_cross_references(text)

                    # Extraction status based on quality
                    if quality >= 700:
                        status = "extractable"
                    elif quality >= 400:
                        status = "low_confidence"
                    else:
                        status = "unextractable"

                    bbox = block.get("bbox", [0, 0, 0, 0])
                    regions.append({
                        "region_id": gen_id("REG"),
                        "page": page_num + 1,
                        "bbox": list(bbox),
                        "region_type": region_type,
                        "section": current_section,
                        "subsection": None,
                        "content_text": text,
                        "content_markdown": text,
                        "parse_confidence": quality,
                        "extraction_status": status,
                        "quality_flags": [],
                        "cross_refs": cross_refs,
                    })
                elif block["type"] == 1:  # Image block
                    bbox = block.get("bbox", [0, 0, 0, 0])
                    regions.append({
                        "region_id": gen_id("REG"),
                        "page": page_num + 1,
                        "bbox": list(bbox),
                        "region_type": "figure",
                        "section": current_section,
                        "subsection": None,
                        "content_text": "[Image detected — requires VLM processing]",
                        "content_markdown": "![Figure](image)",
                        "parse_confidence": 500,
                        "extraction_status": "low_confidence",
                        "quality_flags": ["image_region_needs_vlm"],
                        "cross_refs": [],
                    })

        doc.close()
        return regions, "fitz"

    def _parse_with_pdfplumber(self, file_path: str, doc_id: str) -> tuple:
        """Fallback parser using pdfplumber."""
        import pdfplumber
        regions = []
        current_section = None

        with pdfplumber.open(file_path) as pdf:
            for page_num, page in enumerate(pdf.pages):
                text = page.extract_text()
                if not text or len(text.strip()) < 10:
                    continue

                # Split into paragraphs
                paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
                for para in paragraphs:
                    detected = detect_section(para)
                    if detected:
                        current_section = detected

                    region_type = classify_region_type(para)
                    quality = score_parse_quality(para, "pdfplumber")
                    cross_refs = extract_cross_references(para)
                    status = ("extractable" if quality >= 700
                              else "low_confidence" if quality >= 400
                              else "unextractable")

                    regions.append({
                        "region_id": gen_id("REG"),
                        "page": page_num + 1,
                        "bbox": None,
                        "region_type": region_type,
                        "section": current_section,
                        "subsection": None,
                        "content_text": para,
                        "content_markdown": para,
                        "parse_confidence": quality,
                        "extraction_status": status,
                        "quality_flags": ["no_bbox_available"],
                        "cross_refs": cross_refs,
                    })

                # Extract tables
                tables = page.extract_tables()
                for table in tables:
                    if not table:
                        continue
                    table_text = "\n".join(
                        [" | ".join([str(c) if c else "" for c in row])
                         for row in table]
                    )
                    regions.append({
                        "region_id": gen_id("REG"),
                        "page": page_num + 1,
                        "bbox": None,
                        "region_type": "table",
                        "section": current_section,
                        "subsection": None,
                        "content_text": table_text,
                        "content_markdown": table_text,
                        "parse_confidence": 700,
                        "extraction_status": "extractable",
                        "quality_flags": ["table_extracted"],
                        "cross_refs": [],
                    })

        return regions, "pdfplumber"

    def _parse_tabular(self, file_path: str, doc_id: str) -> tuple:
        """Parse CSV/Excel files as data regions."""
        regions = []
        try:
            if file_path.lower().endswith('.csv'):
                with open(file_path, encoding='utf-8', errors='replace') as f:
                    text = f.read()
            else:
                text = (f"[Excel file: {os.path.basename(file_path)} — "
                        f"requires pandas for full parsing]")
            regions.append({
                "region_id": gen_id("REG"),
                "page": 1,
                "bbox": None,
                "region_type": "table",
                "section": "data",
                "subsection": None,
                "content_text": text[:10000],  # Cap stored text at 10k chars
                "content_markdown": text[:10000],
                "parse_confidence": 900,
                "extraction_status": "extractable",
                "quality_flags": ["tabular_data"],
                "cross_refs": [],
            })
        except Exception as e:
            regions.append({
                "region_id": gen_id("REG"),
                "page": 1,
                "bbox": None,
                "region_type": "body_text",
                "section": None,
                "subsection": None,
                "content_text": f"Error reading file: {e}",
                "content_markdown": "",
                "parse_confidence": 0,
                "extraction_status": "unextractable",
                "quality_flags": ["parse_error"],
                "cross_refs": [],
            })
        return regions, "tabular"

    def _parse_text(self, file_path: str, doc_id: str) -> tuple:
        """Parse plain text or markdown files."""
        regions = []
        try:
            with open(file_path, encoding='utf-8', errors='replace') as f:
                text = f.read()
            paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
            current_section = None
            for para in paragraphs:
                detected = detect_section(para)
                if detected:
                    current_section = detected
                regions.append({
                    "region_id": gen_id("REG"),
                    "page": 1,
                    "bbox": None,
                    "region_type": classify_region_type(para),
                    "section": current_section,
                    "subsection": None,
                    "content_text": para,
                    "content_markdown": para,
                    "parse_confidence": 900,
                    "extraction_status": "extractable",
                    "quality_flags": [],
                    "cross_refs": extract_cross_references(para),
                })
        except Exception as e:
            regions.append({
                "region_id": gen_id("REG"),
                "page": 1,
                "bbox": None,
                "region_type": "body_text",
                "section": None,
                "subsection": None,
                "content_text": f"Error: {e}",
                "content_markdown": "",
                "parse_confidence": 0,
                "extraction_status": "unextractable",
                "quality_flags": ["parse_error"],
                "cross_refs": [],
            })
        return regions, "text"

    def _count_by_type(self, regions: list) -> dict:
        counts = {}
        for r in regions:
            t = r["region_type"]
            counts[t] = counts.get(t, 0) + 1
        return counts

    def get_extractable_regions(self, doc_id: str) -> list:
        """Get all extractable regions for a document, ordered by page."""
        conn = get_db(self.db_path)
        rows = conn.execute("""
            SELECT * FROM regions
            WHERE doc_id = ?
              AND extraction_status = 'extractable'
              AND region_type IN ('body_text', 'table', 'caption')
            ORDER BY page, region_id
        """, (doc_id,)).fetchall()
        conn.close()

        results = []
        for r in rows:
            d = dict(r)
            d["cross_refs"] = json.loads(d.get("cross_refs") or "[]")
            d["quality_flags"] = json.loads(d.get("quality_flags") or "[]")
            d["bbox"] = json.loads(d["bbox"]) if d.get("bbox") else None
            results.append(d)
        return results

    def get_section_chunks(self, doc_id: str) -> list:
        """
        Get section-aware chunks for extraction.
        Merges consecutive body_text regions in the same section.
        """
        regions = self.get_extractable_regions(doc_id)
        chunks = []
        current_chunk = None

        for region in regions:
            section = region.get("section") or "unknown"
            if (current_chunk
                    and current_chunk["section"] == section
                    and region["region_type"] == "body_text"
                    and len(current_chunk["text"]) < 3000):
                # Merge into current chunk
                current_chunk["text"] += "\n\n" + region["content_text"]
                current_chunk["region_ids"].append(region["region_id"])
                current_chunk["min_confidence"] = min(
                    current_chunk["min_confidence"],
                    region["parse_confidence"]
                )
            else:
                # Start new chunk
                if current_chunk:
                    chunks.append(current_chunk)
                current_chunk = {
                    "chunk_id": gen_id("CHK"),
                    "doc_id": doc_id,
                    "section": section,
                    "text": region["content_text"],
                    "region_ids": [region["region_id"]],
                    "page": region["page"],
                    "min_confidence": region["parse_confidence"],
                    "region_type": region["region_type"],
                }
        if current_chunk:
            chunks.append(current_chunk)
        return chunks