| """ |
| Tool-backed fact verification module for the multi-role AI workflow. |
| |
| Provides: |
| - EvidenceItem / EvidenceResult: structured evidence objects |
| - ResearchToolAdapter: abstract interface for search tools |
| - WebSearchAdapter, WikipediaAdapter, ArxivAdapter: concrete implementations |
| - gather_evidence(): orchestrates multi-tool evidence retrieval |
| - check_claims_against_evidence(): validates factual claims |
| """ |
|
|
| import json |
| import re |
| from abc import ABC, abstractmethod |
| from dataclasses import dataclass, field |
| from typing import Any, Callable, Dict, List, Optional |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class EvidenceItem: |
| """A single piece of retrieved evidence.""" |
| title: str |
| source: str |
| snippet: str |
| url: str = "" |
|
|
| def to_dict(self) -> dict: |
| return { |
| "title": self.title, |
| "source": self.source, |
| "snippet": self.snippet, |
| "url": self.url, |
| } |
|
|
|
|
| @dataclass |
| class EvidenceResult: |
| """Aggregated evidence for a research query.""" |
| query: str |
| results: List[EvidenceItem] = field(default_factory=list) |
| summary: str = "" |
| confidence: str = "low" |
|
|
| def to_dict(self) -> dict: |
| return { |
| "query": self.query, |
| "results": [r.to_dict() for r in self.results], |
| "summary": self.summary, |
| "confidence": self.confidence, |
| } |
|
|
| @property |
| def has_evidence(self) -> bool: |
| return len(self.results) > 0 |
|
|
| def merge(self, other: "EvidenceResult"): |
| """Merge another EvidenceResult into this one.""" |
| self.results.extend(other.results) |
| if other.confidence == "high" or (other.confidence == "medium" and self.confidence == "low"): |
| self.confidence = other.confidence |
|
|
|
|
| |
| |
| |
|
|
| class ResearchToolAdapter(ABC): |
| """Abstract interface for a tool that retrieves evidence.""" |
|
|
| @property |
| @abstractmethod |
| def name(self) -> str: |
| """Human-readable name of this tool.""" |
|
|
| @property |
| @abstractmethod |
| def source_type(self) -> str: |
| """Source identifier for EvidenceItem.source.""" |
|
|
| @abstractmethod |
| def search(self, query: str) -> List[EvidenceItem]: |
| """Execute a search and return evidence items.""" |
|
|
|
|
| class WebSearchAdapter(ResearchToolAdapter): |
| """Adapter wrapping a DuckDuckGo-style web search function.""" |
|
|
| def __init__(self, search_fn: Callable[[str], str]): |
| self._search_fn = search_fn |
|
|
| @property |
| def name(self) -> str: |
| return "Web Search" |
|
|
| @property |
| def source_type(self) -> str: |
| return "web_search" |
|
|
| def search(self, query: str) -> List[EvidenceItem]: |
| try: |
| raw = self._search_fn(query) |
| except Exception: |
| return [] |
| if not raw or "unavailable" in raw.lower(): |
| return [] |
| return self._parse_ddg_results(raw) |
|
|
| @staticmethod |
| def _parse_ddg_results(text: str) -> List[EvidenceItem]: |
| """Parse DuckDuckGo search text into evidence items.""" |
| items = [] |
| |
| |
| chunks = re.split(r'\n(?=[A-Z])', text) |
| if len(chunks) <= 1: |
| |
| items.append(EvidenceItem( |
| title="Web search result", |
| source="web_search", |
| snippet=text[:500], |
| )) |
| else: |
| for chunk in chunks[:5]: |
| chunk = chunk.strip() |
| if not chunk: |
| continue |
| |
| lines = chunk.split("\n", 1) |
| title = lines[0][:100] if lines else "Web result" |
| snippet = lines[1][:300] if len(lines) > 1 else chunk[:300] |
| items.append(EvidenceItem( |
| title=title, |
| source="web_search", |
| snippet=snippet, |
| )) |
| return items |
|
|
|
|
| class WikipediaAdapter(ResearchToolAdapter): |
| """Adapter wrapping a Wikipedia search function.""" |
|
|
| def __init__(self, search_fn: Callable[[str], str]): |
| self._search_fn = search_fn |
|
|
| @property |
| def name(self) -> str: |
| return "Wikipedia" |
|
|
| @property |
| def source_type(self) -> str: |
| return "wikipedia" |
|
|
| def search(self, query: str) -> List[EvidenceItem]: |
| try: |
| raw = self._search_fn(query) |
| except Exception: |
| return [] |
| if not raw: |
| return [] |
| |
| return [EvidenceItem( |
| title=f"Wikipedia: {query}", |
| source="wikipedia", |
| snippet=raw[:600], |
| )] |
|
|
|
|
| class ArxivAdapter(ResearchToolAdapter): |
| """Adapter wrapping an arXiv search function.""" |
|
|
| def __init__(self, search_fn: Callable[[str], str]): |
| self._search_fn = search_fn |
|
|
| @property |
| def name(self) -> str: |
| return "arXiv" |
|
|
| @property |
| def source_type(self) -> str: |
| return "arxiv" |
|
|
| def search(self, query: str) -> List[EvidenceItem]: |
| try: |
| raw = self._search_fn(query) |
| except Exception: |
| return [] |
| if not raw: |
| return [] |
| items = [] |
| |
| papers = re.split(r'(?:Published:|Title:)', raw) |
| for paper in papers[:3]: |
| paper = paper.strip() |
| if not paper: |
| continue |
| items.append(EvidenceItem( |
| title=paper[:100], |
| source="arxiv", |
| snippet=paper[:400], |
| )) |
| if not items: |
| items.append(EvidenceItem( |
| title=f"arXiv: {query}", |
| source="arxiv", |
| snippet=raw[:500], |
| )) |
| return items |
|
|
|
|
| |
| |
| |
|
|
| def gather_evidence( |
| queries: List[str], |
| adapters: List[ResearchToolAdapter], |
| max_results_per_query: int = 5, |
| ) -> EvidenceResult: |
| """Run queries across all available adapters and aggregate results. |
| |
| Args: |
| queries: Search queries to run. |
| adapters: Available research tool adapters. |
| max_results_per_query: Max evidence items to keep per query. |
| |
| Returns: |
| Aggregated EvidenceResult with confidence assessment. |
| """ |
| combined = EvidenceResult(query="; ".join(queries)) |
|
|
| for query in queries: |
| for adapter in adapters: |
| try: |
| items = adapter.search(query) |
| except Exception: |
| continue |
| for item in items[:max_results_per_query]: |
| combined.results.append(item) |
|
|
| |
| sources = {r.source for r in combined.results} |
| if len(combined.results) >= 3 and len(sources) >= 2: |
| combined.confidence = "high" |
| elif len(combined.results) >= 1: |
| combined.confidence = "medium" |
| else: |
| combined.confidence = "low" |
|
|
| return combined |
|
|
|
|
| def extract_search_queries(user_request: str, plan_text: str = "") -> List[str]: |
| """Extract meaningful search queries from the user request and plan. |
| |
| Returns a list of 1-3 focused queries for evidence retrieval. |
| """ |
| queries = [] |
|
|
| |
| cleaned = re.sub(r'\b(please|could you|can you|i want|give me|write)\b', '', user_request, flags=re.IGNORECASE) |
| cleaned = cleaned.strip() |
| if cleaned: |
| queries.append(cleaned[:200]) |
|
|
| |
| if plan_text: |
| |
| for section in ("KEY FINDINGS:", "TASK BREAKDOWN:"): |
| if section in plan_text: |
| content = plan_text.split(section, 1)[1] |
| |
| for line in content.split("\n"): |
| line = line.strip().lstrip("•-*1234567890. ") |
| if len(line) > 20 and line not in queries: |
| queries.append(line[:200]) |
| break |
|
|
| return queries[:3] |
|
|
|
|
| |
| |
| |
|
|
| _SPECIFIC_CLAIM_PATTERNS = [ |
| r'(?:according to|cited in|published in|reported by)\s+["\']?[A-Z]', |
| r'\b(?:study|research|paper|article|report)\s+(?:by|from|in)\s+[A-Z]', |
| r'\b(?:in|published)\s+\d{4}\b', |
| r'\b(?:Dr\.|Prof\.|Professor)\s+[A-Z][a-z]+', |
| r'"[^"]{10,}"', |
| r'\b\d+(?:\.\d+)?%\s+(?:of|increase|decrease|growth|decline)', |
| ] |
|
|
|
|
| def detect_unsupported_claims(text: str, evidence: EvidenceResult) -> List[str]: |
| """Detect specific factual claims in text that aren't backed by evidence. |
| |
| Returns a list of potentially unsupported claim excerpts. |
| """ |
| unsupported = [] |
| evidence_text = " ".join( |
| f"{r.title} {r.snippet}" for r in evidence.results |
| ).lower() |
|
|
| for pattern in _SPECIFIC_CLAIM_PATTERNS: |
| for match in re.finditer(pattern, text): |
| claim_context = text[max(0, match.start() - 30):match.end() + 30] |
| |
| claim_words = set(re.findall(r'\b[A-Z][a-z]{3,}\b', claim_context)) |
| if claim_words: |
| matched = sum(1 for w in claim_words if w.lower() in evidence_text) |
| if matched < len(claim_words) * 0.3: |
| unsupported.append(claim_context.strip()) |
|
|
| return unsupported[:5] |
|
|
|
|
| def format_evidence_for_prompt(evidence: EvidenceResult) -> str: |
| """Format evidence into a concise string for injection into LLM prompts.""" |
| if not evidence.has_evidence: |
| return "No evidence was retrieved. Avoid specific claims, citations, or examples." |
|
|
| lines = [f"RETRIEVED EVIDENCE (confidence: {evidence.confidence}):"] |
| for i, item in enumerate(evidence.results[:8], 1): |
| lines.append(f" [{i}] ({item.source}) {item.title}") |
| if item.snippet: |
| lines.append(f" {item.snippet[:200]}") |
| lines.append("") |
| lines.append( |
| "RULE: Only cite facts, examples, or statistics that appear in the evidence above. " |
| "If evidence is insufficient, give a general answer without fabricated specifics." |
| ) |
| return "\n".join(lines) |
|
|
|
|
| def format_evidence_for_qa(evidence: EvidenceResult) -> str: |
| """Format evidence context for QA validation.""" |
| if not evidence.has_evidence: |
| return ( |
| "EVIDENCE VALIDATION: No evidence was retrieved.\n" |
| "FAIL any answer that includes specific factual claims, named examples, " |
| "citations, case studies, or statistics not backed by retrieved evidence.\n" |
| "General reasoning and widely-known facts are acceptable." |
| ) |
|
|
| lines = [ |
| f"EVIDENCE VALIDATION: {len(evidence.results)} items retrieved " |
| f"(confidence: {evidence.confidence}).", |
| "Evidence sources: " + ", ".join( |
| sorted({r.source for r in evidence.results}) |
| ), |
| "RULE: FAIL if the answer includes specific claims, names, dates, or statistics " |
| "not supported by the retrieved evidence. General knowledge is acceptable.", |
| ] |
| return "\n".join(lines) |
|
|