"""
FORENSIQ — Main Gradio Application
Physics-Based, Multi-Agent Forensic Framework for Explainable Deepfake Detection
"""
import os
import sys
import time
import numpy as np
import gradio as gr
import plotly.graph_objects as go
import plotly.express as px
from PIL import Image
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple, Any
# Import agents
from agents.optical_agent import run_optical_agent, AgentEvidence
from agents.sensor_agent import run_sensor_agent
from agents.model_agent import run_model_agent
from agents.statistical_agent import run_statistical_agent
from agents.semantic_agent import run_semantic_agent
from agents.metadata_agent import run_metadata_agent
from agents.text_agent import run_text_agent
# Import engine and explanation
from bayesian_engine import bayesian_synthesis, ForensicVerdict
from explanation import generate_forensic_report, generate_reasoning_tree, generate_court_brief
# ─── Agent Orchestrator ──────────────────────────────────────────────
def _prepare_image(img: Image.Image, max_dim: int = 2048) -> Image.Image:
    """Return *img* as RGB, downscaled so its longest side is at most *max_dim*."""
    if img.mode != "RGB":
        img = img.convert("RGB")
    w, h = img.size
    longest = max(w, h)
    if longest > max_dim:
        ratio = max_dim / longest
        # Clamp to 1px: for extreme aspect ratios (e.g. 2x5000) the short
        # side would otherwise truncate to 0, which resize() rejects.
        new_size = (max(1, int(w * ratio)), max(1, int(h * ratio)))
        img = img.resize(new_size, Image.LANCZOS)
    return img


def run_all_agents(img: Image.Image) -> Tuple[List[AgentEvidence], ForensicVerdict, Any]:
    """Run all 7 forensic agents in parallel and synthesize via Bayesian engine.

    Args:
        img: Image to analyse. It is converted to RGB and downscaled to a
            longest side of 2048 px (for speed) before any agent sees it.

    Returns:
        A 3-tuple ``(agents, verdict, reasoning)``:

        * ``agents`` — every agent result, failed ones included, in a fixed
          display order (optical, sensor, model, statistical, semantic,
          metadata, text).
        * ``verdict`` — the result of ``bayesian_synthesis`` over the active
          agents only, with modality info, any failed agents, the forensic
          report and the court brief attached.
        * ``reasoning`` — the value returned by
          ``generate_reasoning_tree(verdict)``.

    Raises:
        ValueError: If *img* is ``None``.
    """
    if img is None:
        raise ValueError("No image provided")

    img = _prepare_image(img)

    # ── Capture Modality Detection (runs BEFORE agents) ───────────────
    from agents.modality_detector import detect_modality
    modality = detect_modality(img)
    adj = modality.score_adjustments
    print(f"[FORENSIQ] Modality: {modality.modality} (conf={modality.confidence})", file=sys.stderr)
    print(f"[FORENSIQ] Adjustments: {len(adj)} tests recalibrated", file=sys.stderr)

    # Signal processing agents receive the modality adjustments.
    signal_agents = [
        ("optical", lambda i: run_optical_agent(i, adj)),
        ("sensor", lambda i: run_sensor_agent(i, adj)),
        ("model", lambda i: run_model_agent(i, adj)),
        ("statistical", lambda i: run_statistical_agent(i, adj)),
        ("metadata", lambda i: run_metadata_agent(i, adj)),
    ]
    # VLM agents (no modality adjustment needed — VLM sees the image directly)
    vlm_agents = [
        ("semantic", run_semantic_agent),
        ("text", run_text_agent),
    ]
    all_agents = signal_agents + vlm_agents

    results = {}
    with ThreadPoolExecutor(max_workers=len(all_agents)) as executor:
        futures = {executor.submit(fn, img): name for name, fn in all_agents}
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as e:
                # Best-effort: one crashing agent must not abort the whole
                # analysis. Record it as a zero-evidence, certainly-failed entry.
                print(f"[FORENSIQ] Agent '{name}' FAILED: {e}", file=sys.stderr)
                results[name] = AgentEvidence(
                    agent_name=f"{name.title()} Agent (Error)",
                    violation_score=0.0,
                    confidence=0.0,
                    failure_prob=1.0,
                    rationale=f"Agent failed: {str(e)}",
                )

    # Order agents consistently (completion order is nondeterministic).
    display_order = (
        "optical", "sensor", "model", "statistical",
        "semantic", "metadata", "text",
    )
    ordered = [results[name] for name in display_order if results.get(name) is not None]

    # Agents at or above this failure probability are treated as failed.
    failure_threshold = 0.8

    # Log agent scores for debugging
    for a in ordered:
        status = "ACTIVE" if a.failure_prob < failure_threshold else "FAILED"
        print(f"[FORENSIQ] {a.agent_name}: score={a.violation_score:+.3f} conf={a.confidence:.3f} fail={a.failure_prob:.2f} [{status}]", file=sys.stderr)

    # Filter out failed agents BEFORE Bayesian synthesis: they contribute no
    # evidence and should not appear in the reasoning tree or inflate the
    # active agent count.
    active_agents = [r for r in ordered if r.failure_prob < failure_threshold]
    failed_agents = [r for r in ordered if r.failure_prob >= failure_threshold]
    print(f"[FORENSIQ] Active agents: {len(active_agents)}/{len(ordered)}", file=sys.stderr)

    # Bayesian synthesis — only pass active agents
    verdict = bayesian_synthesis(active_agents)

    # Attach modality info to verdict for reporting — include ALL indicators
    # for diagnostics, except raw arrays and the separately-listed scores.
    verdict.reasoning_tree["modality"] = {
        "detected": modality.modality,
        "confidence": modality.confidence,
        "indicators": {
            k: v for k, v in modality.indicators.items()
            if not isinstance(v, np.ndarray) and k != "modality_scores"
        },
        "modality_scores": modality.indicators.get("modality_scores", {}),
        "adjustments_applied": len(modality.score_adjustments),
        "adjustments_list": list(modality.score_adjustments.keys())[:10],
    }

    # Track failed agents in the tree for transparency
    if failed_agents:
        verdict.reasoning_tree["failed_agents"] = [
            {"name": a.agent_name, "reason": a.rationale[:200]}
            for a in failed_agents
        ]

    # Generate explanations
    verdict.forensic_report = generate_forensic_report(verdict)
    reasoning = generate_reasoning_tree(verdict)
    verdict.court_brief = generate_court_brief(verdict)

    # Return ALL agents (including failed) for display, but verdict uses only active
    return ordered, verdict, reasoning
# ─── Visualization Functions ─────────────────────────────────────────
def create_gauge_chart(probability: float, verdict: str) -> go.Figure:
    """Create a gauge chart for the overall manipulation probability.

    Args:
        probability: Manipulation probability as a fraction; it is multiplied
            by 100 for display on a 0–100 axis.
        verdict: Verdict label shown on the second line of the chart title.

    Returns:
        A Plotly figure containing a single gauge indicator.
    """
    # Bar colour follows the same 25/45/65 % bands as the background steps.
    if probability > 0.65:
        color = "red"
    elif probability > 0.45:
        color = "orange"
    elif probability > 0.25:
        color = "gold"
    else:
        color = "green"

    percent = probability * 100
    fig = go.Figure(go.Indicator(
        mode="gauge+number+delta",
        value=percent,
        number={"suffix": "%", "font": {"size": 48}},
        # Plotly renders "<br>" (not a raw newline) as a line break in titles.
        title={"text": f"Manipulation Probability<br>{verdict}",
               "font": {"size": 18}},
        gauge={
            "axis": {"range": [0, 100], "tickwidth": 2},
            "bar": {"color": color, "thickness": 0.3},
            "bgcolor": "white",
            "steps": [
                {"range": [0, 25], "color": "rgba(0,180,0,0.15)"},
                {"range": [25, 45], "color": "rgba(255,215,0,0.15)"},
                {"range": [45, 65], "color": "rgba(255,165,0,0.15)"},
                {"range": [65, 100], "color": "rgba(255,0,0,0.15)"},
            ],
            "threshold": {
                "line": {"color": "black", "width": 3},
                "thickness": 0.8,
                "value": percent,
            },
        },
    ))
    fig.update_layout(
        height=280,
        margin=dict(l=30, r=30, t=60, b=20),
        paper_bgcolor="rgba(0,0,0,0)",
        font={"family": "Inter, sans-serif"},
    )
    return fig
def create_radar_chart(agent_results: List[AgentEvidence]) -> go.Figure:
    """Create radar chart showing all agent scores.

    Each agent's ``violation_score`` is mapped with ``(score + 1) * 50`` so a
    score of -1 plots at 0 ("Authentic"), 0 at 50 ("Neutral") and +1 at 100
    ("Fake").

    Args:
        agent_results: Agent results to plot; one spoke per agent.

    Returns:
        A Plotly figure with the score polygon and a dashed neutral ring, or
        an empty placeholder figure when *agent_results* is empty.
    """
    if not agent_results:
        # Closing the polygon below needs at least one point.
        fig = go.Figure()
        fig.update_layout(height=400, title="Agent Radar (not available)")
        return fig

    names = [
        agent.agent_name.replace(" Agent", "").replace(" Characteristics", "")
        for agent in agent_results
    ]
    scores = [(agent.violation_score + 1) * 50 for agent in agent_results]

    # Repeat the first point so the traces close into a loop.
    names_closed = names + [names[0]]
    scores_closed = scores + [scores[0]]

    fig = go.Figure()
    fig.add_trace(go.Scatterpolar(
        r=scores_closed,
        theta=names_closed,
        fill="toself",
        fillcolor="rgba(255, 100, 100, 0.15)",
        line=dict(color="rgba(255, 50, 50, 0.8)", width=2),
        name="Violation Score",
    ))
    fig.add_trace(go.Scatterpolar(
        r=[50] * (len(names) + 1),
        theta=names_closed,
        line=dict(color="gray", width=1, dash="dash"),
        name="Neutral (score=0)",
        showlegend=True,
    ))
    fig.update_layout(
        polar=dict(
            radialaxis=dict(
                visible=True, range=[0, 100],
                tickvals=[0, 25, 50, 75, 100],
                ticktext=["Authentic", "", "Neutral", "", "Fake"],
            ),
        ),
        height=400,
        margin=dict(l=60, r=60, t=40, b=40),
        paper_bgcolor="rgba(0,0,0,0)",
        font={"family": "Inter, sans-serif", "size": 11},
        showlegend=True,
        legend=dict(x=0, y=-0.15, orientation="h"),
    )
    return fig
def create_agent_bar_chart(agent_results: List[AgentEvidence]) -> go.Figure:
    """Create horizontal bar chart of agent scores.

    Agents are listed in descending order of ``violation_score``. Bars are
    red above +0.2, green below -0.1 and amber in between.

    Args:
        agent_results: Agent results to plot; one bar per agent.

    Returns:
        A Plotly figure with one horizontal bar per agent and a dashed
        vertical line at zero.
    """
    names = []
    scores = []
    colors = []
    for agent in sorted(agent_results, key=lambda a: a.violation_score, reverse=True):
        names.append(agent.agent_name.replace(" Agent", ""))
        scores.append(agent.violation_score)
        if agent.violation_score > 0.2:
            colors.append("rgba(220, 53, 69, 0.8)")
        elif agent.violation_score < -0.1:
            colors.append("rgba(40, 167, 69, 0.8)")
        else:
            colors.append("rgba(255, 193, 7, 0.8)")

    fig = go.Figure()
    fig.add_trace(go.Bar(
        y=names,
        x=scores,
        orientation="h",
        marker_color=colors,
        text=[f"{s:+.2f}" for s in scores],
        textposition="outside",
    ))
    fig.add_vline(x=0, line_dash="dash", line_color="gray")
    fig.update_layout(
        xaxis=dict(title="Violation Score (-1=Authentic, +1=Fake)", range=[-1.1, 1.1]),
        height=350,
        margin=dict(l=150, r=50, t=20, b=40),
        paper_bgcolor="rgba(0,0,0,0)",
        font={"family": "Inter, sans-serif"},
    )
    return fig
def create_ela_display(agent_results: List[AgentEvidence]) -> Image.Image:
    """Return the first PIL image found in any agent's ``visual_evidence``.

    Agents are scanned in list order; evidence that is ``None`` or not a
    ``PIL.Image.Image`` is skipped. Returns ``None`` when no agent carries
    an image.
    """
    for entry in agent_results:
        evidence = entry.visual_evidence
        if evidence is None:
            continue
        if isinstance(evidence, Image.Image):
            return evidence
    return None
def create_fft_display(agent_results: List[AgentEvidence]) -> go.Figure:
    """Build a heatmap of the FFT magnitude spectrum.

    Looks through the sub-findings of every agent named
    "Generative Model Agent" and plots the first one that has a
    ``"magnitude_spectrum"`` entry. If none is found, an empty figure titled
    "FFT Spectrum (not available)" is returned instead.
    """
    for candidate in agent_results:
        if candidate.agent_name != "Generative Model Agent":
            continue
        for finding in candidate.sub_findings:
            if "magnitude_spectrum" not in finding:
                continue
            heatmap = go.Heatmap(
                z=finding["magnitude_spectrum"],
                colorscale="Viridis",
                showscale=True,
                colorbar={"title": "Log Magnitude"},
            )
            spectrum_fig = go.Figure(data=heatmap)
            spectrum_fig.update_layout(
                title="2D FFT Magnitude Spectrum",
                height=400,
                margin={"l": 40, "r": 40, "t": 50, "b": 40},
                paper_bgcolor="rgba(0,0,0,0)",
                xaxis={"showticklabels": False},
                # Anchor y to x so the heatmap cells stay square.
                yaxis={"showticklabels": False, "scaleanchor": "x"},
            )
            return spectrum_fig

    placeholder = go.Figure()
    placeholder.update_layout(height=400, title="FFT Spectrum (not available)")
    return placeholder
def create_noise_map_display(agent_results: List[AgentEvidence]) -> go.Figure:
    """Build a heatmap of the noise residual map.

    Looks through the sub-findings of every agent named
    "Sensor Characteristics Agent" and plots the first one that has a
    ``"noise_map"`` entry. If none is found, an empty figure titled
    "Noise Map (not available)" is returned instead.
    """
    for candidate in agent_results:
        if candidate.agent_name != "Sensor Characteristics Agent":
            continue
        for finding in candidate.sub_findings:
            if "noise_map" not in finding:
                continue
            heatmap = go.Heatmap(
                z=finding["noise_map"],
                colorscale="Hot",
                showscale=True,
                colorbar={"title": "Noise Energy"},
            )
            noise_fig = go.Figure(data=heatmap)
            noise_fig.update_layout(
                title="PRNU Noise Residual Map",
                height=400,
                margin={"l": 40, "r": 40, "t": 50, "b": 40},
                paper_bgcolor="rgba(0,0,0,0)",
                xaxis={"showticklabels": False},
                # Anchor y to x so the heatmap cells stay square.
                yaxis={"showticklabels": False, "scaleanchor": "x"},
            )
            return noise_fig

    placeholder = go.Figure()
    placeholder.update_layout(height=400, title="Noise Map (not available)")
    return placeholder
def create_benford_chart(agent_results: List[AgentEvidence]) -> go.Figure:
    """Build a grouped bar chart comparing observed digits to Benford's Law.

    Looks through the sub-findings of every agent named
    "Statistical Priors Agent" and plots the first one that has both an
    ``"observed"`` and a ``"benford_expected"`` entry against the digits 1–9.
    The title shows the finding's ``"chi_squared"`` value (0 if absent). If no
    such finding exists, an empty figure titled
    "Benford's Law (not available)" is returned instead.
    """
    for candidate in agent_results:
        if candidate.agent_name != "Statistical Priors Agent":
            continue
        for sf in candidate.sub_findings:
            if not ("observed" in sf and "benford_expected" in sf):
                continue
            first_digits = list(range(1, 10))
            expected_bar = go.Bar(
                x=first_digits, y=sf["benford_expected"],
                name="Benford's Law (Expected)",
                marker_color="rgba(100, 150, 255, 0.7)",
            )
            observed_bar = go.Bar(
                x=first_digits, y=sf["observed"],
                name="Observed Distribution",
                marker_color="rgba(255, 100, 100, 0.7)",
            )
            chart = go.Figure()
            chart.add_trace(expected_bar)
            chart.add_trace(observed_bar)
            chart.update_layout(
                title=f"Benford's Law Analysis (χ²={sf.get('chi_squared', 0):.5f})",
                xaxis={"title": "First Digit", "dtick": 1},
                yaxis={"title": "Proportion"},
                barmode="group",
                height=350,
                margin={"l": 50, "r": 30, "t": 50, "b": 40},
                paper_bgcolor="rgba(0,0,0,0)",
                font={"family": "Inter, sans-serif"},
            )
            return chart

    placeholder = go.Figure()
    placeholder.update_layout(height=350, title="Benford's Law (not available)")
    return placeholder
def format_metadata_table(agent_results: List[AgentEvidence]) -> list:
    """Extract EXIF data as table rows.

    Uses the first sub-finding containing an ``"exif_data"`` mapping among
    agents named "Metadata Agent".

    Args:
        agent_results: Agent results to search.

    Returns:
        A list of ``[key, value]`` rows, with each value rendered as text and
        truncated to 100 characters. If the mapping is empty, a single
        "(No EXIF data)" row is returned; if no metadata finding exists at
        all, a single "(Not available)" row is returned.
    """
    for agent in agent_results:
        if agent.agent_name != "Metadata Agent":
            continue
        for sf in agent.sub_findings:
            if "exif_data" not in sf:
                continue
            # Coerce to str before truncating: non-string EXIF values
            # (e.g. ints) are not sliceable and would raise TypeError.
            rows = [[k, str(v)[:100]] for k, v in sf["exif_data"].items()]
            if not rows:
                rows = [["(No EXIF data)", "Image has no metadata"]]
            return rows
    return [["(Not available)", ""]]
# ─── Main Analysis Pipeline ─────────────────────────────────────────
def analyze_image(img):
"""Main entry point for Gradio — runs full FORENSIQ pipeline."""
if img is None:
return (
"
Physics-Based, Multi-Agent Forensic Framework for Explainable Deepfake Detection
7 Independent Forensic Agents • Bayesian Evidence Synthesis • Court-Admissible Reports