"""
Layer 0: Structural PDF Ingestion
===================================
Converts PDF bundles into section-aware, bbox-annotated, quality-scored regions.
Uses Marker as primary parser with fallback to pdfplumber.
"""
import importlib
import json
import os
import re
from pathlib import Path
from typing import Optional

from ..core.database import (
    get_db, init_db, gen_id, now_iso, to_fixed, from_fixed, hash_text
)

# Section detection patterns for scientific papers
SECTION_PATTERNS = [
    (r'(?i)^(abstract)\b', 'abstract'),
    (r'(?i)^(introduction)\b', 'introduction'),
    (r'(?i)^(background)\b', 'introduction'),
    (r'(?i)^(related\s+work)\b', 'related_work'),
    (r'(?i)^(materials?\s+and\s+methods?|methods?|experimental)\b', 'methods'),
    (r'(?i)^(results?\s+and\s+discussion)\b', 'results_discussion'),
    (r'(?i)^(results?)\b', 'results'),
    (r'(?i)^(discussion)\b', 'discussion'),
    (r'(?i)^(conclusions?|summary)\b', 'conclusion'),
    (r'(?i)^(acknowledge?ments?)\b', 'acknowledgments'),
    (r'(?i)^(references?|bibliography)\b', 'references'),
    (r'(?i)^(supplementary|supporting\s+information|appendix)\b', 'supplement'),
]


def detect_section(text: str) -> Optional[str]:
    """Detect which section a text block belongs to, or None if no match."""
    first_line = text.strip().split('\n')[0].strip()
    # Strip leading numbering such as "3.", "2.1", "2.1.3", or "III." before matching
    first_line = re.sub(r'^\d+(?:\.\d+)*\.?\s*', '', first_line)
    first_line = re.sub(r'^[IVXivx]+\.\s*', '', first_line)
    first_line = first_line.strip()
    for pattern, section in SECTION_PATTERNS:
        if re.match(pattern, first_line):
            return section
    return None
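

# A minimal, hedged sanity check for the matcher above. The headings are
# invented examples (not from any specific paper); nothing in this module
# calls it, so it is safe at import time and can be run from a test harness.
def _demo_detect_section() -> None:
    for heading, expected in [
        ("2.1 Materials and Methods", "methods"),
        ("III. RESULTS", "results"),
        ("Acknowledgements", "acknowledgments"),
        ("Funding", None),  # no pattern matches, so detect_section returns None
    ]:
        assert detect_section(heading) == expected, heading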


def classify_region_type(text: str) -> str:
    """Classify a text block's type based on content patterns."""
    stripped = text.strip()
    # Table detection (pipe-delimited rows)
    if '|' in stripped and stripped.count('|') > 3:
        return 'table'
    if re.match(r'(?i)^table\s+\d', stripped):
        return 'caption'
    # Figure detection
    if re.match(r'(?i)^(figure|fig\.?)\s+\d', stripped):
        return 'caption'
    # Equation detection (LaTeX or heavy math symbols)
    if stripped.count('$') >= 2 or '\\frac' in stripped or '\\sum' in stripped:
        return 'equation'
    if re.match(r'^\s*\([\d]+\)\s*$', stripped):
        return 'equation'
    # Reference detection
    if re.match(r'(?i)^references?\s*$', stripped) or re.match(r'^\[\d+\]', stripped):
        return 'reference'
    # Header detection (short, all-caps, or a short line matching a section name)
    if len(stripped) < 100 and stripped.isupper():
        return 'header'
    if len(stripped) < 80 and not stripped.endswith('.'):
        section = detect_section(stripped)
        if section:
            return 'header'
    return 'body_text'
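

# Hedged examples of the classifier above; the snippets are invented and only
# exercise the obvious branches. Not called anywhere in this module.
def _demo_classify_region_type() -> None:
    assert classify_region_type("Table 2. Kinetic parameters") == 'caption'
    assert classify_region_type("Fig. 3 SEM images of the catalyst") == 'caption'
    assert classify_region_type("$E = mc^2$ where $m$ is mass") == 'equation'
    assert classify_region_type("[1] A. Author, J. Chem. 2020.") == 'reference'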


# In-text reference patterns: (regex, ref_type, group holding the number).
# Citations cover forms like [1], [2,3], [1-5].
_REF_PATTERNS = [
    (r'(?i)(figure|fig\.?)\s+(\d+[a-z]?)', 'figure', 2),
    (r'(?i)(table)\s+(\d+[a-z]?)', 'table', 2),
    (r'(?i)(eq\.?|equation)\s+\(?(\d+)\)?', 'equation', 2),
    (r'\[(\d+(?:[,\-–]\s*\d+)*)\]', 'citation', 1),
]


def extract_cross_references(text: str) -> list:
    """Extract in-text references to figures, tables, equations, citations."""
    refs = []
    for pattern, ref_type, num_group in _REF_PATTERNS:
        for m in re.finditer(pattern, text):
            refs.append({
                "ref_text": m.group(0),
                "ref_type": ref_type,
                "ref_number": m.group(num_group),
                "resolved_to": None,
                "verified": False,
            })
    return refs
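

# Hedged illustration of the extractor above on an invented sentence; the
# expected tuples mirror the pattern order in _REF_PATTERNS. Not called
# anywhere in this module.
def _demo_extract_cross_references() -> None:
    refs = extract_cross_references(
        "As shown in Fig. 2a and Table 1 [3,4], Eq. (5) holds."
    )
    assert [(r["ref_type"], r["ref_number"]) for r in refs] == [
        ("figure", "2a"), ("table", "1"), ("equation", "5"), ("citation", "3,4"),
    ]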


def score_parse_quality(text: str, method: str) -> int:
    """Score parsing quality for a text region (fixed-point ×1000)."""
    score = 1000  # Start at perfect
    if not text or not text.strip():
        return 0
    # Penalize: garbled characters (common in bad OCR/parsing)
    garbled_chars = sum(1 for c in text if ord(c) > 65535 or c in '□■◊▪▫●○◆◇')
    garbled_ratio = garbled_chars / max(len(text), 1)
    score -= int(garbled_ratio * 3000)  # Heavy penalty: even 10% garbled → -300
    if garbled_chars > 0:
        score -= garbled_chars * 50  # Additional per-character penalty
    # Penalize: excessive whitespace (column merge artifact)
    ws_ratio = text.count(' ') / max(len(text), 1)
    score -= int(ws_ratio * 200)
    # Penalize: very short fragments (likely parsing artifacts)
    if len(text.strip()) < 20:
        score -= 200
    # Penalize: no sentence structure (no periods in a long block suggests garbling)
    if len(text) > 100 and '.' not in text:
        score -= 300
    # Bonus: markdown structure preserved (Marker output)
    if method == 'marker' and '#' in text:
        score += 50
    return max(0, min(1000, score))
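

# Hedged sketch of how these fixed-point scores map onto the extraction
# statuses assigned later in this file (>= 700 extractable, >= 400
# low_confidence, below that unextractable). The sample strings are invented.
def _demo_score_parse_quality() -> None:
    clean = "We measured the reaction rate at three temperatures. " * 3
    garbled = "□■◊ ▪▫● " * 10
    assert score_parse_quality(clean, "fitz") >= 700   # extractable
    assert score_parse_quality(garbled, "fitz") < 400  # unextractable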


class StructuralParser:
    """
    Layer 0: parse PDF bundles into annotated regions.
    Tries PyMuPDF (fitz) first, falling back to pdfplumber; Marker is probed
    for availability but is not yet used as a parsing path.
    Every region gets: section tag, bbox, quality score, cross-references.
    """

    def __init__(self, db_path: Optional[str] = None):
        self.db_path = db_path or os.environ.get("RESEARCH_OS_DB", "data/research_os_v2.db")
        init_db(self.db_path)
        self._marker_available = None
        self._fitz_available = None
        self._pdfplumber_available = None

    @staticmethod
    def _try_import(module_name: str) -> bool:
        """Return True if the named optional dependency is importable."""
        try:
            importlib.import_module(module_name)
            return True
        except ImportError:
            return False

    def _check_marker(self) -> bool:
        if self._marker_available is None:
            self._marker_available = self._try_import("marker")
        return self._marker_available

    def _check_fitz(self) -> bool:
        if self._fitz_available is None:
            self._fitz_available = self._try_import("fitz")
        return self._fitz_available

    def _check_pdfplumber(self) -> bool:
        if self._pdfplumber_available is None:
            self._pdfplumber_available = self._try_import("pdfplumber")
        return self._pdfplumber_available

    def ingest_document(self, file_path: str, doc_type: str = "main",
                        title: Optional[str] = None, doi: Optional[str] = None) -> dict:
        """
        Ingest a single document and persist its regions. Returns an
        ingestion summary (doc_id, parse method, region counts, sections).
        """
        if not os.path.exists(file_path):
            return {"error": f"File not found: {file_path}", "doc_id": None}
        doc_id = gen_id("DOC")
        conn = get_db(self.db_path)
        # Create document record
        conn.execute("""
            INSERT INTO documents (doc_id, file_path, doc_type, title, doi,
                                   ingestion_status, schema_version, created_at)
            VALUES (?, ?, ?, ?, ?, 'processing', '2.0', ?)
        """, (doc_id, file_path, doc_type, title, doi, now_iso()))
        conn.commit()
        # Parse with whichever backend is available
        regions = []
        parse_method = "unknown"
        if file_path.lower().endswith('.pdf'):
            if self._check_fitz():
                regions, parse_method = self._parse_with_fitz(file_path, doc_id)
            elif self._check_pdfplumber():
                regions, parse_method = self._parse_with_pdfplumber(file_path, doc_id)
            else:
                conn.execute(
                    "UPDATE documents SET ingestion_status = 'failed' WHERE doc_id = ?",
                    (doc_id,)
                )
                conn.commit()
                conn.close()
                return {"error": "No PDF parser available. Install PyMuPDF: pip install pymupdf",
                        "doc_id": doc_id}
        elif file_path.lower().endswith(('.csv', '.xlsx', '.xls')):
            regions, parse_method = self._parse_tabular(file_path, doc_id)
        else:
            # .md, .txt, and any other extension falls back to plain-text parsing
            regions, parse_method = self._parse_text(file_path, doc_id)
        # Store regions
        for region in regions:
            conn.execute("""
                INSERT INTO regions (region_id, doc_id, page, bbox, region_type,
                                     section, subsection, content_text, content_markdown,
                                     parse_method, parse_confidence, extraction_status,
                                     quality_flags, cross_refs, schema_version, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, '2.0', ?)
            """, (
                region["region_id"], doc_id, region["page"],
                json.dumps(region.get("bbox")),
                region["region_type"], region.get("section"), region.get("subsection"),
                region["content_text"], region.get("content_markdown"),
                parse_method, region["parse_confidence"],
                region["extraction_status"],
                json.dumps(region.get("quality_flags", [])),
                json.dumps(region.get("cross_refs", [])),
                now_iso()
            ))
        # Update document status (scores are fixed-point ×1000, hence integer division)
        avg_quality = sum(r["parse_confidence"] for r in regions) // max(len(regions), 1)
        conn.execute("""
            UPDATE documents SET ingestion_status = 'complete', parse_method = ?,
                                 parse_quality_avg = ?, total_regions = ?, created_at = ?
            WHERE doc_id = ?
        """, (parse_method, avg_quality, len(regions), now_iso(), doc_id))
        conn.commit()
        conn.close()
        return {
            "doc_id": doc_id,
            "parse_method": parse_method,
            "total_regions": len(regions),
            "avg_quality": from_fixed(avg_quality),
            "regions_by_type": self._count_by_type(regions),
            "sections_found": list(set(r.get("section") for r in regions if r.get("section"))),
        }

    def _parse_with_fitz(self, file_path: str, doc_id: str) -> tuple:
        """Parse PDF using PyMuPDF (fitz) with section detection."""
        import fitz
        doc = fitz.open(file_path)
        regions = []
        current_section = None
        for page_num in range(len(doc)):
            page = doc[page_num]
            blocks = page.get_text("dict")["blocks"]
            for block in blocks:
                if block["type"] == 0:  # Text block
                    text = ""
                    for line in block.get("lines", []):
                        for span in line.get("spans", []):
                            text += span.get("text", "")
                        text += "\n"
                    text = text.strip()
                    if not text or len(text) < 5:
                        continue
                    # Headers update the running section for subsequent blocks
                    detected = detect_section(text)
                    if detected:
                        current_section = detected
                    region_type = classify_region_type(text)
                    quality = score_parse_quality(text, "fitz")
                    cross_refs = extract_cross_references(text)
                    # Extraction status based on quality
                    if quality >= 700:
                        status = "extractable"
                    elif quality >= 400:
                        status = "low_confidence"
                    else:
                        status = "unextractable"
                    bbox = block.get("bbox", [0, 0, 0, 0])
                    regions.append({
                        "region_id": gen_id("REG"),
                        "page": page_num + 1,
                        "bbox": list(bbox),
                        "region_type": region_type,
                        "section": current_section,
                        "subsection": None,
                        "content_text": text,
                        "content_markdown": text,
                        "parse_confidence": quality,
                        "extraction_status": status,
                        "quality_flags": [],
                        "cross_refs": cross_refs,
                    })
                elif block["type"] == 1:  # Image block
                    bbox = block.get("bbox", [0, 0, 0, 0])
                    regions.append({
                        "region_id": gen_id("REG"),
                        "page": page_num + 1,
                        "bbox": list(bbox),
                        "region_type": "figure",
                        "section": current_section,
                        "subsection": None,
                        "content_text": "[Image detected — requires VLM processing]",
                        "content_markdown": "![Figure](image)",
                        "parse_confidence": 500,
                        "extraction_status": "low_confidence",
                        "quality_flags": ["image_region_needs_vlm"],
                        "cross_refs": [],
                    })
        doc.close()
        return regions, "fitz"

    def _parse_with_pdfplumber(self, file_path: str, doc_id: str) -> tuple:
        """Fallback parser using pdfplumber (no per-region bbox information)."""
        import pdfplumber
        regions = []
        current_section = None
        with pdfplumber.open(file_path) as pdf:
            for page_num, page in enumerate(pdf.pages):
                text = page.extract_text()
                if not text or len(text.strip()) < 10:
                    continue
                # Split into paragraphs. Note: pdfplumber joins lines with single
                # newlines, so this split is coarse and may yield page-sized blocks.
                paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
                for para in paragraphs:
                    detected = detect_section(para)
                    if detected:
                        current_section = detected
                    region_type = classify_region_type(para)
                    quality = score_parse_quality(para, "pdfplumber")
                    cross_refs = extract_cross_references(para)
                    if quality >= 700:
                        status = "extractable"
                    elif quality >= 400:
                        status = "low_confidence"
                    else:
                        status = "unextractable"
                    regions.append({
                        "region_id": gen_id("REG"),
                        "page": page_num + 1,
                        "bbox": None,
                        "region_type": region_type,
                        "section": current_section,
                        "subsection": None,
                        "content_text": para,
                        "content_markdown": para,
                        "parse_confidence": quality,
                        "extraction_status": status,
                        "quality_flags": ["no_bbox_available"],
                        "cross_refs": cross_refs,
                    })
                # Extract tables
                tables = page.extract_tables()
                for table in tables:
                    if not table:
                        continue
                    table_text = "\n".join(
                        " | ".join(str(c) if c else "" for c in row) for row in table
                    )
                    regions.append({
                        "region_id": gen_id("REG"),
                        "page": page_num + 1,
                        "bbox": None,
                        "region_type": "table",
                        "section": current_section,
                        "subsection": None,
                        "content_text": table_text,
                        "content_markdown": table_text,
                        "parse_confidence": 700,
                        "extraction_status": "extractable",
                        "quality_flags": ["table_extracted"],
                        "cross_refs": [],
                    })
        return regions, "pdfplumber"

    def _parse_tabular(self, file_path: str, doc_id: str) -> tuple:
        """Parse CSV/Excel files as data regions."""
        regions = []
        try:
            if file_path.endswith('.csv'):
                with open(file_path, encoding='utf-8', errors='replace') as f:
                    text = f.read()
            else:
                text = f"[Excel file: {os.path.basename(file_path)} — requires pandas for full parsing]"
            regions.append({
                "region_id": gen_id("REG"),
                "page": 1,
                "bbox": None,
                "region_type": "table",
                "section": "data",
                "subsection": None,
                "content_text": text[:10000],      # Cap stored text at 10k characters
                "content_markdown": text[:10000],
                "parse_confidence": 900,
                "extraction_status": "extractable",
                "quality_flags": ["tabular_data"],
                "cross_refs": [],
            })
        except Exception as e:
            regions.append({
                "region_id": gen_id("REG"),
                "page": 1, "bbox": None, "region_type": "body_text",
                "section": None, "subsection": None,
                "content_text": f"Error reading file: {e}",
                "content_markdown": "", "parse_confidence": 0,
                "extraction_status": "unextractable",
                "quality_flags": ["parse_error"], "cross_refs": [],
            })
        return regions, "tabular"

    def _parse_text(self, file_path: str, doc_id: str) -> tuple:
        """Parse plain text or markdown files."""
        regions = []
        try:
            with open(file_path, encoding='utf-8', errors='replace') as f:
                text = f.read()
            paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
            current_section = None
            for para in paragraphs:
                detected = detect_section(para)
                if detected:
                    current_section = detected
                regions.append({
                    "region_id": gen_id("REG"),
                    "page": 1, "bbox": None,
                    "region_type": classify_region_type(para),
                    "section": current_section, "subsection": None,
                    "content_text": para, "content_markdown": para,
                    "parse_confidence": 900,
                    "extraction_status": "extractable",
                    "quality_flags": [], "cross_refs": extract_cross_references(para),
                })
        except Exception as e:
            regions.append({
                "region_id": gen_id("REG"),
                "page": 1, "bbox": None, "region_type": "body_text",
                "section": None, "subsection": None,
                "content_text": f"Error: {e}", "content_markdown": "",
                "parse_confidence": 0, "extraction_status": "unextractable",
                "quality_flags": ["parse_error"], "cross_refs": [],
            })
        return regions, "text"

    def _count_by_type(self, regions: list) -> dict:
        """Count regions by region_type."""
        counts = {}
        for r in regions:
            t = r["region_type"]
            counts[t] = counts.get(t, 0) + 1
        return counts

    def get_extractable_regions(self, doc_id: str) -> list:
        """Get all extractable regions for a document, ordered by page."""
        conn = get_db(self.db_path)
        rows = conn.execute("""
            SELECT * FROM regions
            WHERE doc_id = ? AND extraction_status = 'extractable'
              AND region_type IN ('body_text', 'table', 'caption')
            ORDER BY page, region_id
        """, (doc_id,)).fetchall()
        conn.close()
        results = []
        for r in rows:
            d = dict(r)
            # Guard against NULL columns before deserializing the JSON fields
            d["cross_refs"] = json.loads(d.get("cross_refs") or "[]")
            d["quality_flags"] = json.loads(d.get("quality_flags") or "[]")
            d["bbox"] = json.loads(d["bbox"]) if d.get("bbox") else None
            results.append(d)
        return results

    def get_section_chunks(self, doc_id: str) -> list:
        """
        Get section-aware chunks for extraction.
        Merges consecutive body_text regions in the same section, capping
        merged chunks at roughly 3000 characters.
        """
        regions = self.get_extractable_regions(doc_id)
        chunks = []
        current_chunk = None
        for region in regions:
            section = region.get("section") or "unknown"
            if (current_chunk and
                    current_chunk["region_type"] == "body_text" and
                    current_chunk["section"] == section and
                    region["region_type"] == "body_text" and
                    len(current_chunk["text"]) < 3000):
                # Merge into current chunk
                current_chunk["text"] += "\n\n" + region["content_text"]
                current_chunk["region_ids"].append(region["region_id"])
                current_chunk["min_confidence"] = min(
                    current_chunk["min_confidence"], region["parse_confidence"]
                )
            else:
                # Start a new chunk
                if current_chunk:
                    chunks.append(current_chunk)
                current_chunk = {
                    "chunk_id": gen_id("CHK"),
                    "doc_id": doc_id,
                    "section": section,
                    "text": region["content_text"],
                    "region_ids": [region["region_id"]],
                    "page": region["page"],
                    "min_confidence": region["parse_confidence"],
                    "region_type": region["region_type"],
                }
        if current_chunk:
            chunks.append(current_chunk)
        return chunks
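

# Hedged end-to-end sketch, guarded so it only runs when the module is executed
# directly. The PDF path is a placeholder; because of the relative import at
# the top, run it as a package module (python -m phd_research_os_v2.layer0.parser)
# rather than as a bare script. It assumes the ..core.database helpers behave
# as their names suggest (init_db creates the schema on first use).
if __name__ == "__main__":
    import sys

    pdf_path = sys.argv[1] if len(sys.argv) > 1 else "paper.pdf"
    parser = StructuralParser()
    summary = parser.ingest_document(pdf_path, doc_type="main")
    print("ingested:", summary)
    if summary.get("doc_id") and not summary.get("error"):
        for chunk in parser.get_section_chunks(summary["doc_id"]):
            print(chunk["section"], "page", chunk["page"],
                  len(chunk["text"]), "chars")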