File size: 16,651 Bytes
d6490dc 4d66672 d6490dc 4d66672 d6490dc 4d66672 d6490dc 60d6a7a d6490dc 60d6a7a d6490dc 60d6a7a d6490dc cb24386 d6490dc 4d66672 d6490dc cb24386 d6490dc cb24386 d6490dc cb24386 d6490dc de2543d d6490dc de2543d d6490dc de2543d d6490dc cb24386 e0af913 b8e7248 e0af913 b8e7248 e0af913 b8e7248 e0af913 b8e7248 e0af913 b8e7248 e0af913 cb24386 d6490dc cb24386 d6490dc cb24386 d6490dc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 | """
FORENSIQ — Bayesian Evidence Synthesis Engine
Implements the core fusion algorithm from the paper:
- Likelihood model with calibrated reliability
- Independence correction via pairwise correlation penalty
- Failure mode handling (marginalization over failure states)
- Temperature-scaled calibration
- Posterior probability computation
"""
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

from agents.optical_agent import AgentEvidence
@dataclass
class ForensicVerdict:
    """Result record returned by the Bayesian evidence synthesis.

    Attributes:
        probability_fake: P(Fake | Evidence), in the range 0-1.
        confidence: Qualitative confidence label. ``bayesian_synthesis`` in
            this module emits "Very High", "High", "Moderate", "Low" or
            "Very Low".
        confidence_numeric: Confidence expressed on a 0-1 scale.
        verdict: Categorical outcome. ``bayesian_synthesis`` in this module
            emits "FAKE", "LIKELY FAKE", "SUSPICIOUS", "INCONCLUSIVE",
            "LIKELY AUTHENTIC" or "AUTHENTIC".
        agent_results: The per-agent evidence the verdict was derived from.
        key_evidence: Short human-readable lines describing the strongest
            signals.
        reasoning_tree: Nested dict tracing prior, per-agent values,
            posterior and verdict.
        forensic_report: Free-text report; empty by default and not filled
            in by ``bayesian_synthesis`` in this module.
        court_brief: Free-text brief; empty by default and not filled in by
            ``bayesian_synthesis`` in this module.
    """

    # Required results of the fusion.
    probability_fake: float
    confidence: str
    confidence_numeric: float
    verdict: str

    # Supporting material; every instance gets its own fresh container.
    agent_results: List[AgentEvidence] = field(default_factory=list)
    key_evidence: List[str] = field(default_factory=list)
    reasoning_tree: Dict[str, Any] = field(default_factory=dict)

    # Long-form text outputs.
    forensic_report: str = ""
    court_brief: str = ""
# --- Agent reliability priors -------------------------------------------
# Per-agent reliability r_i, described by the original authors as calibrated
# from the paper's validation (each agent's historical accuracy).
# ``bayesian_synthesis`` falls back to 0.7 for a name that is not listed.
AGENT_RELIABILITY = {
    "Optical Physics Agent": 0.78,
    "Sensor Characteristics Agent": 0.82,
    "Generative Model Agent": 0.85,
    "Statistical Priors Agent": 0.80,
    "Semantic Consistency Agent": 0.88,
    "Metadata Agent": 0.75,
    "Text & Typography Agent": 0.70,
}
# --- Pairwise correlation matrix ----------------------------------------
# Described by the original authors as estimated from validation: how
# correlated are the agents' outputs? Low correlation means the evidence is
# closer to independent and therefore more informative when fused.
#
# AGENT_NAMES fixes the row/column order of CORRELATION_MATRIX: the position
# of a name in this list is the index used to look up its correlations.
AGENT_NAMES = [
    "Optical Physics Agent",
    "Sensor Characteristics Agent",
    "Generative Model Agent",
    "Statistical Priors Agent",
    "Semantic Consistency Agent",
    "Metadata Agent",
    "Text & Typography Agent",
]

# Symmetric, with ones on the diagonal. Columns follow the same order as the
# rows (Optical, Sensor, Model, Statistical, Semantic, Metadata, Text).
CORRELATION_MATRIX = np.array(
    [
        [1.00, 0.45, 0.30, 0.35, 0.15, 0.10, 0.05],  # Optical
        [0.45, 1.00, 0.40, 0.50, 0.10, 0.15, 0.05],  # Sensor
        [0.30, 0.40, 1.00, 0.55, 0.20, 0.15, 0.10],  # Model
        [0.35, 0.50, 0.55, 1.00, 0.15, 0.10, 0.05],  # Statistical
        [0.15, 0.10, 0.20, 0.15, 1.00, 0.20, 0.30],  # Semantic
        [0.10, 0.15, 0.15, 0.10, 0.20, 1.00, 0.10],  # Metadata
        [0.05, 0.05, 0.10, 0.05, 0.30, 0.10, 1.00],  # Text
    ]
)

# Weight of the correlation penalty applied during independence correction.
ALPHA = 0.3
def sigmoid(x: float) -> float:
    """Evaluate the logistic function 1 / (1 + e^-x) without overflow.

    The exponential is only ever taken of a non-positive argument
    (``exp(x)`` for negative ``x``, ``exp(-x)`` otherwise), so large
    magnitudes saturate to 0 or 1 instead of overflowing.
    """
    if x < 0:
        exp_x = np.exp(x)
        return exp_x / (1.0 + exp_x)
    return 1.0 / (1.0 + np.exp(-x))
# Agents flagged as VLM-based: ``bayesian_synthesis`` passes
# ``is_vlm_agent=True`` to ``compute_likelihood`` for these names, which
# compresses their reported confidence before fusion.
VLM_AGENT_NAMES = {
    "Semantic Consistency Agent",
    "Text & Typography Agent",
}
def compute_likelihood(score: float, confidence: float, reliability: float,
                       is_vlm_agent: bool = False, *,
                       gain: float = 5.0,
                       vlm_temperature: float = 2.0) -> Tuple[float, float]:
    """
    Compute P(evidence | Fake) and P(evidence | Real) for one agent.

    Implements the reliability-weighted likelihood (paper Eq. 1) as coded
    here, with g = ``gain``:

        P(e_i | Fake) = r_i * sigmoid( g * s_i * c_i) + (1 - r_i) * 0.5
        P(e_i | Real) = r_i * sigmoid(-g * s_i * c_i) + (1 - r_i) * 0.5

    The two values always sum to 1. With ``reliability == 0`` both collapse
    to the uninformative 0.5.

    For VLM agents the confidence is first temperature-scaled in logit space
    (``sigmoid(logit(c) / vlm_temperature)``), pulling it toward 0.5. The
    original authors added this because VLMs systematically overstate
    their confidence. A confidence of exactly 0 or 1, or outside (0, 1),
    has no finite logit and is passed through unchanged.

    Args:
        score: Signed violation score s_i; positive values support "fake".
        confidence: The agent's self-reported confidence c_i.
        reliability: Reliability weight r_i applied to the agent's signal.
        is_vlm_agent: Whether to compress ``confidence`` before use.
        gain: Steepness multiplier applied to ``score * confidence`` inside
            the sigmoid. The default 5.0 is the value previously hard-coded.
        vlm_temperature: Temperature used for VLM confidence compression.
            The default 2.0 is the value previously hard-coded.

    Returns:
        ``(l_fake, l_real)``.

    Raises:
        ValueError: If ``vlm_temperature`` is not positive.
    """
    if vlm_temperature <= 0:
        raise ValueError("vlm_temperature must be positive")

    if is_vlm_agent and 0 < confidence < 1:
        # Temperature-scale the confidence in logit space so extreme VLM
        # values are compressed toward 0.5 before entering Eq. 1.
        logit = np.log(confidence / (1 - confidence))
        confidence = float(sigmoid(logit / vlm_temperature))

    strength = score * confidence * gain
    # Probability mass assigned to "agent is unreliable -> coin flip".
    chance_mass = (1 - reliability) * 0.5
    l_fake = reliability * sigmoid(strength) + chance_mass
    l_real = reliability * sigmoid(-strength) + chance_mass
    return l_fake, l_real
def apply_independence_correction(
    likelihoods: List[Tuple[float, float]],
    scores: List[float],
    agent_indices: List[int],
    correlation_matrix: Optional[np.ndarray] = None,
    alpha: Optional[float] = None,
) -> List[Tuple[float, float]]:
    """
    Discount each agent's likelihoods for correlation with the other agents.

    For agent i a shrink factor is accumulated over every other agent j
    (paper Eq. 2):

        c_i = prod_{j != i} (1 - alpha * |rho_ij|) ** |s_j|

    Rather than multiplying the likelihood by c_i directly, the code blends
    it toward the uninformative value 0.5:

        P_corr(e_i | .) = c_i * P(e_i | .) + (1 - c_i) * 0.5

    so a strongly correlated, strongly scoring peer makes agent i less
    informative without pushing its likelihood below chance.

    Args:
        likelihoods: ``(l_fake, l_real)`` per agent.
        scores: Signed violation score per agent; only the magnitude is used.
        agent_indices: Row/column index of each agent in the correlation
            matrix.
        correlation_matrix: Square matrix of pairwise correlations. Defaults
            to the module-level ``CORRELATION_MATRIX``.
        alpha: Correlation penalty weight. Defaults to the module-level
            ``ALPHA``.

    Returns:
        A new list of corrected ``(l_fake, l_real)`` tuples, in input order.

    Raises:
        ValueError: If the three input lists differ in length.
    """
    if not (len(likelihoods) == len(scores) == len(agent_indices)):
        raise ValueError(
            "likelihoods, scores and agent_indices must have the same length"
        )

    matrix = np.asarray(
        CORRELATION_MATRIX if correlation_matrix is None else correlation_matrix
    )
    weight = ALPHA if alpha is None else alpha
    magnitudes = [abs(s) for s in scores]

    corrected = []
    for i, (l_fake, l_real) in enumerate(likelihoods):
        row = matrix[agent_indices[i]]
        shrink = 1.0
        for j, idx_j in enumerate(agent_indices):
            if j == i:
                continue
            shrink *= (1 - weight * abs(row[idx_j])) ** magnitudes[j]
        # Whatever weight is removed from the agent goes to the 0.5 baseline.
        corrected.append((
            l_fake * shrink + (1 - shrink) * 0.5,
            l_real * shrink + (1 - shrink) * 0.5,
        ))
    return corrected
def temperature_scaling(prob: float, temperature: float = 1.5) -> float:
    """Calibrate a probability by dividing its logit by ``temperature``.

    Temperatures above 1 pull the result toward 0.5; the original authors
    use this step for calibration. Inputs at or beyond the ends of the unit
    interval (``prob <= 0`` or ``prob >= 1``) have no finite logit and are
    returned unchanged.
    """
    if prob >= 1 or prob <= 0:
        return prob
    odds = prob / (1 - prob)
    return sigmoid(np.log(odds) / temperature)
def _sign_agreement(score_a: float, score_b: float) -> int:
    """Return 1 when both scores fall on the same side of zero, else 0.

    A score of exactly zero counts as "not positive".
    """
    return 1 if (score_a > 0) == (score_b > 0) else 0


def _cross_agent_validation(agent_results: List["AgentEvidence"]) -> List[Dict[str, Any]]:
    """
    Check whether related agents point in the same direction.

    Four consistency checks are run, each only when every agent involved is
    present and has ``failure_prob < 0.5``:

    1. Sensor vs Generative Model (noise evidence)
    2. Optical vs Sensor (lens artifacts vs sensor characteristics)
    3. Statistical vs Generative Model (frequency-domain evidence)
    4. Metadata vs the mean score of the usable signal agents
       (Optical, Sensor, Generative Model, Statistical)

    "Agreement" means the two violation scores have the same sign.

    Args:
        agent_results: Evidence objects exposing ``agent_name``,
            ``violation_score`` and ``failure_prob``. If a name occurs more
            than once, the last occurrence is used for the pairwise checks.

    Returns:
        One dict per executed check, in the order above, with keys
        ``"test"`` (label), ``"agreement"`` (1 or 0) and ``"note"``
        (the two signed scores formatted to two decimals).
    """
    by_name = {a.agent_name: a for a in agent_results}

    def usable(name):
        # An agent takes part in a check only if present and not likely failed.
        agent = by_name.get(name)
        if agent is None or not agent.failure_prob < 0.5:
            return None
        return agent

    # (label, short name A, agent A, short name B, agent B)
    pairwise_checks = (
        ("Cross: Sensor×Model Noise",
         "Sensor", "Sensor Characteristics Agent",
         "Model", "Generative Model Agent"),
        ("Cross: Optical×Sensor",
         "Optical", "Optical Physics Agent",
         "Sensor", "Sensor Characteristics Agent"),
        ("Cross: Statistical×Model",
         "Statistical", "Statistical Priors Agent",
         "Model", "Generative Model Agent"),
    )

    findings = []
    for label, short_a, name_a, short_b, name_b in pairwise_checks:
        first, second = usable(name_a), usable(name_b)
        if first is None or second is None:
            continue
        findings.append({
            "test": label,
            "agreement": _sign_agreement(first.violation_score, second.violation_score),
            "note": (f"{short_a}({first.violation_score:+.2f}) vs "
                     f"{short_b}({second.violation_score:+.2f})"),
        })

    # Metadata is compared against the average of the signal-level agents.
    meta = usable("Metadata Agent")
    if meta is not None:
        signal_names = (
            "Optical Physics Agent",
            "Sensor Characteristics Agent",
            "Generative Model Agent",
            "Statistical Priors Agent",
        )
        signal_scores = [
            a.violation_score
            for a in agent_results
            if a.agent_name in signal_names and a.failure_prob < 0.5
        ]
        if signal_scores:
            avg_signal = np.mean(signal_scores)
            findings.append({
                "test": "Cross: Metadata×Signal",
                "agreement": _sign_agreement(meta.violation_score, avg_signal),
                "note": (f"Metadata({meta.violation_score:+.2f}) vs "
                         f"Signal_avg({avg_signal:+.2f})"),
            })
    return findings
def _status_label(score: float) -> str:
    """Map a violation score to VIOLATED (> 0.1), COMPLIANT (< -0.1) or NEUTRAL."""
    if score > 0.1:
        return "VIOLATED"
    if score < -0.1:
        return "COMPLIANT"
    return "NEUTRAL"


def _classify_posterior(posterior: float) -> Tuple[str, str]:
    """Translate a calibrated P(Fake | E) into ``(verdict, confidence label)``.

    The 0.48-0.52 band is INCONCLUSIVE: a posterior that close to the 0.5
    prior means the evidence contributed essentially nothing.
    """
    if posterior > 0.85:
        return "FAKE", "Very High"
    if posterior > 0.65:
        return "LIKELY FAKE", "High"
    if posterior > 0.52:
        return "SUSPICIOUS", "Moderate"
    if posterior >= 0.48:
        return "INCONCLUSIVE", "Low"
    if posterior > 0.25:
        return "LIKELY AUTHENTIC", "Moderate"
    return "AUTHENTIC", "High"


def _agreement_confidence(scores: List[float], failure_probs: List[float]) -> float:
    """Score (0-1) how strongly the non-failed agents agree with each other.

    Agents with ``failure_prob >= 0.5`` are ignored. Agents whose score
    magnitude is at most 0.02 are treated as "I don't know": they count
    toward coverage but do not drag the confidence down.
    """
    near_zero = 0.02
    usable = [s for s, f in zip(scores, failure_probs) if f < 0.5]
    informative = [s for s in usable if abs(s) > near_zero]

    if not informative:
        # Either every usable agent is near zero, or none is usable at all.
        return 0.12 if usable else 0.1

    n_pos = sum(1 for s in informative if s > 0)
    n_neg = len(informative) - n_pos
    n_agreeing = max(n_pos, n_neg)

    avg_informative = float(np.mean(informative))
    # Saturates once two agents agree.
    agent_bonus = min(1.0, np.sqrt(n_agreeing / 2.0))
    # Fraction of the known agents that delivered usable evidence.
    coverage = len(usable) / len(AGENT_NAMES)

    if n_pos == 0 or n_neg == 0:
        # Unanimous: compound with the number of agreeing agents.
        confidence = min(1.0, 0.15 + 0.6 * abs(avg_informative) * agent_bonus * coverage)
        if n_agreeing >= 3:
            confidence = min(1.0, confidence + 0.08)
        if n_agreeing >= 5:
            confidence = min(1.0, confidence + 0.08)
        return confidence

    majority_ratio = n_agreeing / len(informative)
    if majority_ratio < 0.75:
        # Close to 50:50 -> genuinely ambiguous.
        return min(1.0, 0.1 + 0.2 * abs(avg_informative) * majority_ratio)

    # 3:1 or better: real agreement with a dissenter. The base uses only the
    # majority's magnitude; the size of the dissent is ignored.
    majority_is_positive = n_pos > n_neg
    majority_scores = [s for s in informative if (s > 0) == majority_is_positive]
    majority_avg = abs(float(np.mean(majority_scores)))
    confidence = min(1.0, 0.12 + 0.5 * majority_avg * agent_bonus * coverage)
    if n_agreeing >= 3:
        confidence = min(1.0, confidence + 0.06)
    return confidence


def bayesian_synthesis(agent_results: List[AgentEvidence]) -> ForensicVerdict:
    """
    Fuse all agents' evidence into a single verdict (Algorithm 1 from paper).

    Steps:
      1. Start from the uninformative prior P(Fake) = 0.5.
      2. Compute each agent's likelihood pair with ``compute_likelihood``,
         using reliability discounted by the agent's failure probability.
      3. Apply the pairwise independence correction.
      4. Approximate failure-mode marginalization (Eq. 3) by mixing each
         likelihood with 0.5 in proportion to the failure probability,
         instead of enumerating all 2^N failure subsets.
      5. Normalize in log space and temperature-scale the posterior (T=1.3).
      6. Derive the verdict band and an agreement-based confidence score.
      7. Collect the three strongest non-failed signals as key evidence.
      8. Build the reasoning tree, including the cross-agent validation
         findings under ``"cross_validation"``.

    Args:
        agent_results: Evidence from each agent. An empty list yields an
            INCONCLUSIVE verdict with probability 0.5.

    Returns:
        A populated ``ForensicVerdict``. ``forensic_report`` and
        ``court_brief`` are left at their defaults.
    """
    cross_findings = _cross_agent_validation(agent_results)

    prior_fake = 0.5

    # Step 2: per-agent likelihoods.
    evidences = []
    likelihoods = []
    scores = []
    agent_indices = []
    failure_probs = []
    for evidence in agent_results:
        name = evidence.agent_name
        try:
            idx = AGENT_NAMES.index(name)
        except ValueError:
            # NOTE(review): unknown agents borrow row 0 (Optical Physics) of
            # the correlation matrix - confirm this fallback is intended.
            idx = 0
        reliability = AGENT_RELIABILITY.get(name, 0.7)
        # NOTE(review): failure_prob discounts reliability here and is
        # applied again in the marginalization below - confirm the double
        # discount is intended.
        effective_reliability = reliability * (1 - evidence.failure_prob)
        likelihoods.append(compute_likelihood(
            evidence.violation_score,
            evidence.confidence,
            effective_reliability,
            is_vlm_agent=name in VLM_AGENT_NAMES,
        ))
        evidences.append(evidence)
        scores.append(evidence.violation_score)
        agent_indices.append(idx)
        failure_probs.append(evidence.failure_prob)

    if not likelihoods:
        return ForensicVerdict(
            probability_fake=0.5,
            confidence="Very Low",
            confidence_numeric=0.1,
            verdict="INCONCLUSIVE",
            agent_results=agent_results,
            key_evidence=["No active agents produced valid evidence"],
        )

    # Step 3: independence correction.
    corrected = apply_independence_correction(likelihoods, scores, agent_indices)

    # Step 4: failure-mode marginalization, accumulated in log space.
    log_p_fake = np.log(prior_fake + 1e-15)
    log_p_real = np.log((1 - prior_fake) + 1e-15)
    for (l_fake, l_real), fi in zip(corrected, failure_probs):
        # (1 - fi) * likelihood + fi * 0.5: a failed agent is uninformative.
        log_p_fake += np.log(max((1 - fi) * l_fake + fi * 0.5, 1e-15))
        log_p_real += np.log(max((1 - fi) * l_real + fi * 0.5, 1e-15))

    # Subtract the max before exponentiating for numerical stability.
    log_max = max(log_p_fake, log_p_real)
    weight_fake = np.exp(log_p_fake - log_max)
    weight_real = np.exp(log_p_real - log_max)
    posterior = weight_fake / (weight_fake + weight_real + 1e-15)

    # Step 5: temperature-scaling calibration.
    posterior_calibrated = temperature_scaling(posterior, temperature=1.3)

    # Step 6: verdict band and agreement-based confidence.
    verdict, conf_label = _classify_posterior(posterior_calibrated)
    confidence_numeric = _agreement_confidence(scores, failure_probs)

    # Step 7: top three strongest signals among non-failed agents.
    strongest = sorted(
        (agent for agent in evidences if agent.failure_prob < 0.5),
        key=lambda agent: abs(agent.violation_score),
        reverse=True,
    )[:3]
    key_evidence = [
        f"{agent.agent_name}: {_status_label(agent.violation_score)} "
        f"(score={agent.violation_score:.2f}, conf={agent.confidence:.2f})"
        for agent in strongest
    ]

    # Step 8: reasoning tree.
    reasoning_tree = {
        "prior": {"P(Fake)": 0.5, "P(Real)": 0.5},
        "agents": {},
        "posterior": {
            "P(Fake|E)": round(posterior_calibrated, 4),
            "P(Real|E)": round(1 - posterior_calibrated, 4),
        },
        "verdict": verdict,
        # Previously computed and discarded; surfaced so the consistency
        # checks are visible to report consumers.
        "cross_validation": cross_findings,
    }
    for agent, (l_fake_corr, l_real_corr) in zip(evidences, corrected):
        reasoning_tree["agents"][agent.agent_name] = {
            "violation_score": round(agent.violation_score, 4),
            "confidence": round(agent.confidence, 4),
            "failure_prob": round(agent.failure_prob, 4),
            "likelihood_fake": round(l_fake_corr, 4),
            "likelihood_real": round(l_real_corr, 4),
            "status": _status_label(agent.violation_score),
        }

    return ForensicVerdict(
        probability_fake=round(posterior_calibrated, 4),
        confidence=conf_label,
        confidence_numeric=round(confidence_numeric, 4),
        verdict=verdict,
        agent_results=agent_results,
        key_evidence=key_evidence,
        reasoning_tree=reasoning_tree,
    )
|