Agent1 / evidence.py
CatoG
rework 2
61b5f58
"""
Tool-backed fact verification module for the multi-role AI workflow.
Provides:
- EvidenceItem / EvidenceResult: structured evidence objects
- ResearchToolAdapter: abstract interface for search tools
- WebSearchAdapter, WikipediaAdapter, ArxivAdapter: concrete implementations
- gather_evidence(): orchestrates multi-tool evidence retrieval
- check_claims_against_evidence(): validates factual claims
"""
import json
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
# ============================================================
# Structured Evidence Objects
# ============================================================
@dataclass
class EvidenceItem:
"""A single piece of retrieved evidence."""
title: str
source: str # "wikipedia" | "web_search" | "arxiv" | "internal"
snippet: str
url: str = ""
def to_dict(self) -> dict:
return {
"title": self.title,
"source": self.source,
"snippet": self.snippet,
"url": self.url,
}
@dataclass
class EvidenceResult:
"""Aggregated evidence for a research query."""
query: str
results: List[EvidenceItem] = field(default_factory=list)
summary: str = ""
confidence: str = "low" # "high" | "medium" | "low"
def to_dict(self) -> dict:
return {
"query": self.query,
"results": [r.to_dict() for r in self.results],
"summary": self.summary,
"confidence": self.confidence,
}
@property
def has_evidence(self) -> bool:
return len(self.results) > 0
def merge(self, other: "EvidenceResult"):
"""Merge another EvidenceResult into this one."""
self.results.extend(other.results)
if other.confidence == "high" or (other.confidence == "medium" and self.confidence == "low"):
self.confidence = other.confidence
# ============================================================
# Research Tool Adapter Interface
# ============================================================
class ResearchToolAdapter(ABC):
"""Abstract interface for a tool that retrieves evidence."""
@property
@abstractmethod
def name(self) -> str:
"""Human-readable name of this tool."""
@property
@abstractmethod
def source_type(self) -> str:
"""Source identifier for EvidenceItem.source."""
@abstractmethod
def search(self, query: str) -> List[EvidenceItem]:
"""Execute a search and return evidence items."""
class WebSearchAdapter(ResearchToolAdapter):
"""Adapter wrapping a DuckDuckGo-style web search function."""
def __init__(self, search_fn: Callable[[str], str]):
self._search_fn = search_fn
@property
def name(self) -> str:
return "Web Search"
@property
def source_type(self) -> str:
return "web_search"
def search(self, query: str) -> List[EvidenceItem]:
try:
raw = self._search_fn(query)
except Exception:
return []
if not raw or "unavailable" in raw.lower():
return []
return self._parse_ddg_results(raw)
@staticmethod
def _parse_ddg_results(text: str) -> List[EvidenceItem]:
"""Parse DuckDuckGo search text into evidence items."""
items = []
# DDG results typically come as a continuous text blob
# Split by common separators
chunks = re.split(r'\n(?=[A-Z])', text)
if len(chunks) <= 1:
# Single blob — treat whole thing as one result
items.append(EvidenceItem(
title="Web search result",
source="web_search",
snippet=text[:500],
))
else:
for chunk in chunks[:5]:
chunk = chunk.strip()
if not chunk:
continue
# Try to extract a title from the first line
lines = chunk.split("\n", 1)
title = lines[0][:100] if lines else "Web result"
snippet = lines[1][:300] if len(lines) > 1 else chunk[:300]
items.append(EvidenceItem(
title=title,
source="web_search",
snippet=snippet,
))
return items
class WikipediaAdapter(ResearchToolAdapter):
"""Adapter wrapping a Wikipedia search function."""
def __init__(self, search_fn: Callable[[str], str]):
self._search_fn = search_fn
@property
def name(self) -> str:
return "Wikipedia"
@property
def source_type(self) -> str:
return "wikipedia"
def search(self, query: str) -> List[EvidenceItem]:
try:
raw = self._search_fn(query)
except Exception:
return []
if not raw:
return []
# Wikipedia wrapper returns article content
return [EvidenceItem(
title=f"Wikipedia: {query}",
source="wikipedia",
snippet=raw[:600],
)]
class ArxivAdapter(ResearchToolAdapter):
"""Adapter wrapping an arXiv search function."""
def __init__(self, search_fn: Callable[[str], str]):
self._search_fn = search_fn
@property
def name(self) -> str:
return "arXiv"
@property
def source_type(self) -> str:
return "arxiv"
def search(self, query: str) -> List[EvidenceItem]:
try:
raw = self._search_fn(query)
except Exception:
return []
if not raw:
return []
items = []
# arXiv results typically list papers with titles and summaries
papers = re.split(r'(?:Published:|Title:)', raw)
for paper in papers[:3]:
paper = paper.strip()
if not paper:
continue
items.append(EvidenceItem(
title=paper[:100],
source="arxiv",
snippet=paper[:400],
))
if not items:
items.append(EvidenceItem(
title=f"arXiv: {query}",
source="arxiv",
snippet=raw[:500],
))
return items
# ============================================================
# Evidence Gathering Orchestrator
# ============================================================
def gather_evidence(
queries: List[str],
adapters: List[ResearchToolAdapter],
max_results_per_query: int = 5,
) -> EvidenceResult:
"""Run queries across all available adapters and aggregate results.
Args:
queries: Search queries to run.
adapters: Available research tool adapters.
max_results_per_query: Max evidence items to keep per query.
Returns:
Aggregated EvidenceResult with confidence assessment.
"""
combined = EvidenceResult(query="; ".join(queries))
for query in queries:
for adapter in adapters:
try:
items = adapter.search(query)
except Exception:
continue
for item in items[:max_results_per_query]:
combined.results.append(item)
# Assess confidence based on evidence quantity and diversity
sources = {r.source for r in combined.results}
if len(combined.results) >= 3 and len(sources) >= 2:
combined.confidence = "high"
elif len(combined.results) >= 1:
combined.confidence = "medium"
else:
combined.confidence = "low"
return combined
def extract_search_queries(user_request: str, plan_text: str = "") -> List[str]:
"""Extract meaningful search queries from the user request and plan.
Returns a list of 1-3 focused queries for evidence retrieval.
"""
queries = []
# The main user request is always a query (cleaned up)
cleaned = re.sub(r'\b(please|could you|can you|i want|give me|write)\b', '', user_request, flags=re.IGNORECASE)
cleaned = cleaned.strip()
if cleaned:
queries.append(cleaned[:200])
# Extract key noun phrases from the plan for additional queries
if plan_text:
# Look for KEY FINDINGS or TASK BREAKDOWN sections
for section in ("KEY FINDINGS:", "TASK BREAKDOWN:"):
if section in plan_text:
content = plan_text.split(section, 1)[1]
# Take the first substantial line
for line in content.split("\n"):
line = line.strip().lstrip("•-*1234567890. ")
if len(line) > 20 and line not in queries:
queries.append(line[:200])
break
return queries[:3]
# ============================================================
# Evidence-backed Claim Checking
# ============================================================
_SPECIFIC_CLAIM_PATTERNS = [
r'(?:according to|cited in|published in|reported by)\s+["\']?[A-Z]',
r'\b(?:study|research|paper|article|report)\s+(?:by|from|in)\s+[A-Z]',
r'\b(?:in|published)\s+\d{4}\b',
r'\b(?:Dr\.|Prof\.|Professor)\s+[A-Z][a-z]+',
r'"[^"]{10,}"', # Quoted text that might be a fake citation
r'\b\d+(?:\.\d+)?%\s+(?:of|increase|decrease|growth|decline)', # Specific statistics
]
def detect_unsupported_claims(text: str, evidence: EvidenceResult) -> List[str]:
"""Detect specific factual claims in text that aren't backed by evidence.
Returns a list of potentially unsupported claim excerpts.
"""
unsupported = []
evidence_text = " ".join(
f"{r.title} {r.snippet}" for r in evidence.results
).lower()
for pattern in _SPECIFIC_CLAIM_PATTERNS:
for match in re.finditer(pattern, text):
claim_context = text[max(0, match.start() - 30):match.end() + 30]
# Check if any key terms from the claim appear in evidence
claim_words = set(re.findall(r'\b[A-Z][a-z]{3,}\b', claim_context))
if claim_words:
matched = sum(1 for w in claim_words if w.lower() in evidence_text)
if matched < len(claim_words) * 0.3:
unsupported.append(claim_context.strip())
return unsupported[:5] # Limit to 5 most concerning
def format_evidence_for_prompt(evidence: EvidenceResult) -> str:
"""Format evidence into a concise string for injection into LLM prompts."""
if not evidence.has_evidence:
return "No evidence was retrieved. Avoid specific claims, citations, or examples."
lines = [f"RETRIEVED EVIDENCE (confidence: {evidence.confidence}):"]
for i, item in enumerate(evidence.results[:8], 1):
lines.append(f" [{i}] ({item.source}) {item.title}")
if item.snippet:
lines.append(f" {item.snippet[:200]}")
lines.append("")
lines.append(
"RULE: Only cite facts, examples, or statistics that appear in the evidence above. "
"If evidence is insufficient, give a general answer without fabricated specifics."
)
return "\n".join(lines)
def format_evidence_for_qa(evidence: EvidenceResult) -> str:
"""Format evidence context for QA validation."""
if not evidence.has_evidence:
return (
"EVIDENCE VALIDATION: No evidence was retrieved.\n"
"FAIL any answer that includes specific factual claims, named examples, "
"citations, case studies, or statistics not backed by retrieved evidence.\n"
"General reasoning and widely-known facts are acceptable."
)
lines = [
f"EVIDENCE VALIDATION: {len(evidence.results)} items retrieved "
f"(confidence: {evidence.confidence}).",
"Evidence sources: " + ", ".join(
sorted({r.source for r in evidence.results})
),
"RULE: FAIL if the answer includes specific claims, names, dates, or statistics "
"not supported by the retrieved evidence. General knowledge is acceptable.",
]
return "\n".join(lines)