| """ |
| Layer 2: Qualified Extraction with AI Council |
| ================================================ |
| Extracts claims from parsed regions using the parallel-then-merge council. |
| Applies section-aware confidence modifiers. |
| All output constrained to valid schema. |
| """ |
|
|
| import json |
| import os |
| import re |
| from typing import Optional |
|
|
| from ..core.database import ( |
| get_db, init_db, gen_id, now_iso, to_fixed, from_fixed |
| ) |
|
|
| |
# Per-section confidence modifiers on the same 0-1000 fixed-point scale used
# for claim confidence (1000 = no penalty). Methods/results are taken at face
# value; summary-style sections such as the abstract are damped.
SECTION_MODIFIERS = {
    "abstract": 700,
    "introduction": 800,
    "related_work": 800,
    "methods": 1000,
    "results": 1000,
    "results_discussion": 900,
    "discussion": 750,
    "conclusion": 800,
    "supplement": 1000,
    "unknown": 850,
    None: 850,
}
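
# Worked example of how these modifiers combine in extract_from_chunk
# (assuming to_fixed() maps 0.0-1.0 onto 0-1000): a claim from the discussion
# section with model confidence 0.8, no missing fields, and one qualifier:
#   800 * 750 // 1000   -> 600   (section modifier for "discussion")
#   600 * 1000 // 1000  -> 600   (completeness: nothing missing, no penalty)
#   600 * 900 // 1000   -> 540   (qualifier penalty: 1000 - 1 * 100)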


VALID_TAGS = ["Fact", "Interpretation", "Hypothesis", "Conflict_Hypothesis"]


EXTRACTOR_PROMPT = """You are a scientific claim extractor for a PhD Research OS.

Extract precise, atomic claims from the text. For EACH claim provide:
- text: The exact claim statement (preserve qualifiers like "may", "suggests", "not significant")
- epistemic_tag: One of [Fact, Interpretation, Hypothesis, Conflict_Hypothesis]
  * Fact: Directly supported by quantitative data in THIS paper
  * Interpretation: Author's explanation that goes beyond the raw data
  * Hypothesis: Untested proposal using "may", "could", "we propose"
  * Conflict_Hypothesis: Explicitly contradicts another established finding
- confidence: Float 0.0-1.0 (how strong the evidence is FOR this specific claim)
- missing_fields: What would make this claim more complete (empty list if complete)
- status: "Complete" or "Incomplete" (Incomplete if missing_fields is non-empty)
- qualifiers: List of hedging words or conditions (e.g., ["in 10 mM PBS", "n=5", "not statistically significant"])
- is_null_result: true if the claim reports a negative/null finding
- is_inherited_citation: true if the claim restates a cited finding rather than a result of THIS paper
- causal_direction: One of ["causal_claim", "observed_correlation", "unspecified"]
- source_quote: The EXACT sentence from the text that supports this claim

CRITICAL RULES:
1. PRESERVE all qualifiers: "may", "suggests", "under these conditions", "not significant"
2. If a result is NOT statistically significant, mark is_null_result=true
3. If the text says "X causes Y", mark causal_direction as "causal_claim"
4. If the text says "X is associated with Y", mark causal_direction as "observed_correlation"

Output MUST be a valid JSON array. No markdown, no explanations."""
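
# Illustrative shape of a single element of the extractor's JSON output. The
# values below are invented purely as a schema example, not taken from any
# real paper:
#
#   {
#     "text": "Annealing at 300 C may increase film conductivity under these conditions.",
#     "epistemic_tag": "Hypothesis",
#     "confidence": 0.4,
#     "missing_fields": ["sample size"],
#     "status": "Incomplete",
#     "qualifiers": ["may", "under these conditions"],
#     "is_null_result": false,
#     "is_inherited_citation": false,
#     "causal_direction": "causal_claim",
#     "source_quote": "..."
#   }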


CRITIC_PROMPT = """You are a critical reviewer for a PhD Research OS.

Review the extracted claims against the original text. Check for:
1. Missing important claims the extractor overlooked
2. Incorrect epistemic tags (e.g., Interpretation tagged as Fact)
3. Overly confident claims that should be Incomplete
4. Dropped qualifiers (hedging words removed from claim text)
5. Null results not flagged as is_null_result=true
6. Causal claims drawn from correlational data

Output JSON: {
  "feedback": "overall critique",
  "missing_claims": ["claim text 1", ...],
  "tag_corrections": {"0": "Interpretation", ...},
  "confidence_adjustments": {"0": 0.5, ...},
  "qualifier_additions": {"0": ["qualifier1"], ...},
  "null_result_flags": [0, 2]
}"""


CHAIRMAN_PROMPT = """You are the chairman of a scientific claim extraction council.

You receive the original text, the extracted claims, and the critic's feedback.
Synthesize them into final claims, applying these rules:
1. Apply the critic's tag corrections where justified
2. Apply the critic's confidence adjustments
3. Add any missing claims the critic identified
4. Apply a 0.7 completeness penalty to claims with significant missing fields
5. Ensure ALL qualifiers from the source text are preserved
6. Flag null results appropriately

Output MUST be a valid JSON array of claims. No markdown."""
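
# Sketch of the intended council flow. This is hypothetical orchestration for
# illustration only; QualifiedExtractor currently runs just the extractor
# pass, and `generate` stands in for whatever brain call is used:
#
#   claims   = generate(EXTRACTOR_PROMPT, text)                    # draft claims
#   feedback = generate(CRITIC_PROMPT, text + claims)              # structured critique
#   final    = generate(CHAIRMAN_PROMPT, text + claims + feedback) # synthesis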


class QualifiedExtractor:
    """
    Layer 2: Extract claims using the AI Model Council.

    Pipeline: Extractor -> Critic -> Chairman (sequential for now;
    upgrade to parallel-then-merge when multi-model serving is available).
    """

    def __init__(self, db_path: Optional[str] = None, brain=None):
        self.db_path = db_path or os.environ.get("RESEARCH_OS_DB", "data/research_os_v2.db")
        self.brain = brain

    def extract_from_chunk(self, chunk: dict, source_doi: Optional[str] = None) -> list:
        """
        Extract claims from a single section-aware chunk.
        Returns a list of claim dicts ready for DB insertion.
        """
        text = chunk.get("text", "")
        section = chunk.get("section", "unknown")
        page = chunk.get("page", 0)
        parse_confidence = chunk.get("min_confidence", 1000)

        if not text or len(text.strip()) < 50:
            return []

        if self.brain:
            raw_claims = self._extract_with_brain(text, section)
        else:
            raw_claims = self._extract_mock(text, section)

        claims = []
        section_mod = SECTION_MODIFIERS.get(section, 850)

        for raw in raw_claims:
            if not isinstance(raw, dict) or not raw.get("text"):
                continue

            tag = raw.get("epistemic_tag", "Interpretation")
            if tag not in VALID_TAGS:
                tag = "Interpretation"

            # Abstracts summarize rather than evidence results: demote Fact.
            if section == "abstract" and tag == "Fact":
                tag = "Interpretation"

            # Clamp model-reported confidence to [0, 1] and convert to the
            # 0-1000 fixed-point scale; fall back to 0.5 on malformed values.
            try:
                raw_confidence = float(raw.get("confidence", 0.5))
            except (TypeError, ValueError):
                raw_confidence = 0.5
            evidence_strength = to_fixed(min(1.0, max(0.0, raw_confidence)))

            missing = raw.get("missing_fields", [])
            if not isinstance(missing, list):
                missing = []
            completeness = 700 if missing else 1000

            qualifiers = raw.get("qualifiers", [])
            if not isinstance(qualifiers, list):
                qualifiers = []
            qualifier_penalty = max(500, 1000 - len(qualifiers) * 100)

            is_null = bool(raw.get("is_null_result", False))
            status = "Complete" if not missing else "Incomplete"

            # Composite confidence: a chain of fixed-point products, each
            # stage rescaled back to the 0-1000 range by // 1000.
            composite = (evidence_strength * section_mod // 1000
                         * completeness // 1000
                         * qualifier_penalty // 1000)

            # A claim can never be more confident than the parse it came from.
            composite = min(composite, parse_confidence)

            claim = {
                "claim_id": gen_id("CLM"),
                "text": str(raw.get("text", "")),
                "epistemic_tag": tag,
                "evidence_strength": evidence_strength,
                "section_modifier": section_mod,
                "completeness_penalty": completeness,
                "qualifier_penalty": qualifier_penalty,
                "composite_confidence": composite,
                "status": status,
                "is_null_result": is_null,
                "is_inherited_citation": bool(raw.get("is_inherited_citation", False)),
                "causal_direction": raw.get("causal_direction", "unspecified"),
                "qualifiers": qualifiers,
                "missing_fields": missing,
                "source_quote": raw.get("source_quote", ""),
                "source_page": page,
                "source_section": section,
                "source_doc_id": chunk.get("doc_id"),
                "source_doi": source_doi,
                "source_region_id": (chunk.get("region_ids") or [None])[0],
                "extraction_timestamp": now_iso(),
            }
            claims.append(claim)

        return claims

    def extract_from_document(self, doc_id: str, source_doi: Optional[str] = None) -> dict:
        """
        Extract claims from all chunks of a document.
        Uses Layer 0's section-aware chunking.
        """
        from ..layer0.parser import StructuralParser
        parser = StructuralParser(self.db_path)
        chunks = parser.get_section_chunks(doc_id)

        all_claims = []
        section_stats = {}

        for chunk in chunks:
            claims = self.extract_from_chunk(chunk, source_doi)
            all_claims.extend(claims)

            section = chunk.get("section", "unknown")
            section_stats[section] = section_stats.get(section, 0) + len(claims)

        # Persist every claim; pipeline_version "2.1.0" and schema_version "2.0"
        # are stamped onto each row.
        conn = get_db(self.db_path)
        for claim in all_claims:
            conn.execute("""
                INSERT INTO claims (claim_id, text, epistemic_tag,
                    evidence_strength, section_modifier, completeness_penalty,
                    qualifier_penalty, composite_confidence,
                    status, is_null_result, is_inherited_citation, causal_direction,
                    qualifiers, missing_fields, source_quote, source_page,
                    source_section, source_doc_id, source_doi, source_region_id,
                    extraction_timestamp, pipeline_version,
                    schema_version, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
                        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                claim["claim_id"], claim["text"], claim["epistemic_tag"],
                claim["evidence_strength"], claim["section_modifier"],
                claim["completeness_penalty"], claim["qualifier_penalty"],
                claim["composite_confidence"],
                claim["status"], claim["is_null_result"],
                claim["is_inherited_citation"], claim["causal_direction"],
                json.dumps(claim["qualifiers"]), json.dumps(claim["missing_fields"]),
                claim.get("source_quote"), claim.get("source_page"),
                claim.get("source_section"), claim.get("source_doc_id"),
                claim.get("source_doi"), claim.get("source_region_id"),
                claim.get("extraction_timestamp"), "2.1.0", "2.0",
                now_iso(), now_iso()
            ))
        conn.commit()
        conn.close()

        return {
            "doc_id": doc_id,
            "total_claims": len(all_claims),
            "section_distribution": section_stats,
            "epistemic_distribution": self._count_tags(all_claims),
            "null_results": sum(1 for c in all_claims if c["is_null_result"]),
            "incomplete": sum(1 for c in all_claims if c["status"] == "Incomplete"),
            "avg_confidence": (sum(c["composite_confidence"] for c in all_claims)
                               // max(len(all_claims), 1)),
        }

    def _extract_with_brain(self, text: str, section: str) -> list:
        """Extract using the AI brain (local or API model)."""
        messages = [
            {"role": "system", "content": EXTRACTOR_PROMPT},
            {"role": "user", "content": f"Section: {section}\n\nText:\n{text}"}
        ]

        try:
            if hasattr(self.brain, '_generate_local') and self.brain.backend == "local":
                raw = self.brain._generate_local(messages)
            elif hasattr(self.brain, '_generate_api'):
                raw = self.brain._generate_api(messages)
            else:
                return self._extract_mock(text, section)

            # Strip a markdown code fence if the model wrapped its JSON in one.
            text_clean = raw.strip()
            if text_clean.startswith("```"):
                parts = text_clean.split("```")
                text_clean = parts[1] if len(parts) > 1 else text_clean
                if text_clean.startswith("json"):
                    text_clean = text_clean[4:]
                text_clean = text_clean.strip()

            data = json.loads(text_clean)
            return data if isinstance(data, list) else [data]
        except Exception:
            # Any model or parsing failure degrades gracefully to the mock extractor.
            return self._extract_mock(text, section)

    def _extract_mock(self, text: str, section: str) -> list:
        """Mock extraction when no brain is available. Produces structurally valid output."""
        # Naive sentence split; good enough for a structural stand-in.
        sentences = [s.strip() for s in re.split(r'[.!?]\s+', text) if len(s.strip()) > 30]

        claims = []
        for sent in sentences[:5]:
            lower = sent.lower()

            # Keyword heuristics stand in for the extractor model's epistemic tagging.
            if any(w in lower for w in ["measured", "found", "detected", "achieved", "showed"]):
                tag = "Fact"
                confidence = 0.7
            elif any(w in lower for w in ["suggest", "indicate", "consistent with", "interpret"]):
                tag = "Interpretation"
                confidence = 0.5
            elif any(w in lower for w in ["may", "could", "hypothesize", "propose", "possible"]):
                tag = "Hypothesis"
                confidence = 0.3
            elif any(w in lower for w in ["contradict", "unlike", "contrary"]):
                tag = "Conflict_Hypothesis"
                confidence = 0.4
            else:
                tag = "Interpretation"
                confidence = 0.5

            qualifiers = []
            for q in ["may", "might", "could", "suggests", "possibly", "not significant",
                      "under these conditions", "in vitro", "preliminary"]:
                if q in lower:
                    qualifiers.append(q)

            is_null = any(w in lower for w in ["not significant", "no effect", "no difference",
                                               "failed to", "did not"])

            claims.append({
                "text": sent + ".",
                "epistemic_tag": tag,
                "confidence": confidence,
                "missing_fields": [],
                "status": "Complete",
                "qualifiers": qualifiers,
                "is_null_result": is_null,
                "is_inherited_citation": "[" in sent and "]" in sent,
                "causal_direction": ("causal_claim" if "cause" in lower
                                     else "observed_correlation" if "correlat" in lower
                                     else "unspecified"),
                "source_quote": sent + ".",
            })

        return claims

    def _count_tags(self, claims: list) -> dict:
        counts = {}
        for c in claims:
            tag = c.get("epistemic_tag", "unknown")
            counts[tag] = counts.get(tag, 0) + 1
        return counts
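

# Minimal smoke-test sketch, for illustration only. It exercises the mock
# extraction path, so no AI brain and no database are needed. Because this
# module uses relative imports, run it with its package on the path, e.g.
# `python -m <package>.layer2.<this_module>` (package and module names assumed).
if __name__ == "__main__":
    sample_chunk = {
        "text": ("We measured a 40% increase in conductivity after annealing. "
                 "This suggests that grain boundary density may limit transport. "
                 "The difference between batches was not significant."),
        "section": "results",
        "page": 3,
        "min_confidence": 950,
        "doc_id": "DOC-EXAMPLE",
        "region_ids": ["REG-EXAMPLE"],
    }
    extractor = QualifiedExtractor()
    for claim in extractor.extract_from_chunk(sample_chunk, source_doi="10.0000/example"):
        print(claim["epistemic_tag"], claim["composite_confidence"], "-", claim["text"])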