""" FORENSIQ — Main Gradio Application Physics-Based, Multi-Agent Forensic Framework for Explainable Deepfake Detection """ import os import sys import time import numpy as np import gradio as gr import plotly.graph_objects as go import plotly.express as px from PIL import Image from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Tuple, Any # Import agents from agents.optical_agent import run_optical_agent, AgentEvidence from agents.sensor_agent import run_sensor_agent from agents.model_agent import run_model_agent from agents.statistical_agent import run_statistical_agent from agents.semantic_agent import run_semantic_agent from agents.metadata_agent import run_metadata_agent from agents.text_agent import run_text_agent # Import engine and explanation from bayesian_engine import bayesian_synthesis, ForensicVerdict from explanation import generate_forensic_report, generate_reasoning_tree, generate_court_brief # ─── Agent Orchestrator ────────────────────────────────────────────── def run_all_agents(img: Image.Image) -> Tuple[List[AgentEvidence], ForensicVerdict]: """Run all 7 forensic agents in parallel and synthesize via Bayesian engine.""" if img is None: raise ValueError("No image provided") # Ensure RGB if img.mode != "RGB": img = img.convert("RGB") # Resize if too large (for speed) max_dim = 2048 w, h = img.size if max(w, h) > max_dim: ratio = max_dim / max(w, h) img = img.resize((int(w * ratio), int(h * ratio)), Image.LANCZOS) # ── Capture Modality Detection (runs BEFORE agents) ─────────────── from agents.modality_detector import detect_modality modality = detect_modality(img) adj = modality.score_adjustments print(f"[FORENSIQ] Modality: {modality.modality} (conf={modality.confidence})", file=sys.stderr) print(f"[FORENSIQ] Adjustments: {len(adj)} tests recalibrated", file=sys.stderr) # Signal processing agents (fast, run in parallel with modality adjustments) signal_agents = [ ("optical", lambda i: run_optical_agent(i, adj)), ("sensor", lambda i: run_sensor_agent(i, adj)), ("model", lambda i: run_model_agent(i, adj)), ("statistical", lambda i: run_statistical_agent(i, adj)), ("metadata", lambda i: run_metadata_agent(i, adj)), ] # VLM agents (no modality adjustment needed — VLM sees the image directly) vlm_agents = [ ("semantic", run_semantic_agent), ("text", run_text_agent), ] results = {} with ThreadPoolExecutor(max_workers=7) as executor: futures = {} for name, fn in signal_agents + vlm_agents: futures[executor.submit(fn, img)] = name for future in as_completed(futures): name = futures[future] try: results[name] = future.result() except Exception as e: print(f"[FORENSIQ] Agent '{name}' FAILED: {e}", file=sys.stderr) results[name] = AgentEvidence( agent_name=f"{name.title()} Agent (Error)", violation_score=0.0, confidence=0.0, failure_prob=1.0, rationale=f"Agent failed: {str(e)}", ) # Order agents consistently ordered = [ results.get("optical"), results.get("sensor"), results.get("model"), results.get("statistical"), results.get("semantic"), results.get("metadata"), results.get("text"), ] ordered = [r for r in ordered if r is not None] # Log agent scores for debugging for a in ordered: status = "ACTIVE" if a.failure_prob < 0.8 else "FAILED" print(f"[FORENSIQ] {a.agent_name}: score={a.violation_score:+.3f} conf={a.confidence:.3f} fail={a.failure_prob:.2f} [{status}]", file=sys.stderr) # FIX Bug 3: Filter out failed agents BEFORE Bayesian synthesis # Failed agents (failure_prob >= 0.8) contribute no evidence — they're # ghost entries that shouldn't appear in the reasoning tree or affect # the active agent count display. active_agents = [r for r in ordered if r.failure_prob < 0.8] failed_agents = [r for r in ordered if r.failure_prob >= 0.8] n_active = len(active_agents) n_total = len(ordered) print(f"[FORENSIQ] Active agents: {n_active}/{n_total}", file=sys.stderr) # Bayesian synthesis — only pass active agents verdict = bayesian_synthesis(active_agents) # Attach modality info to verdict for reporting — include ALL indicators for diagnostics verdict.reasoning_tree["modality"] = { "detected": modality.modality, "confidence": modality.confidence, "indicators": {k: v for k, v in modality.indicators.items() if not isinstance(v, np.ndarray) and k != "modality_scores"}, "modality_scores": modality.indicators.get("modality_scores", {}), "adjustments_applied": len(modality.score_adjustments), "adjustments_list": list(modality.score_adjustments.keys())[:10], } # Track failed agents in the tree for transparency if failed_agents: verdict.reasoning_tree["failed_agents"] = [ {"name": a.agent_name, "reason": a.rationale[:200]} for a in failed_agents ] # Generate explanations verdict.forensic_report = generate_forensic_report(verdict) reasoning = generate_reasoning_tree(verdict) verdict.court_brief = generate_court_brief(verdict) # Return ALL agents (including failed) for display, but verdict uses only active return ordered, verdict, reasoning # ─── Visualization Functions ───────────────────────────────────────── def create_gauge_chart(probability: float, verdict: str) -> go.Figure: """Create a gauge chart for the overall probability.""" if probability > 0.65: color = "red" elif probability > 0.45: color = "orange" elif probability > 0.25: color = "gold" else: color = "green" fig = go.Figure(go.Indicator( mode="gauge+number+delta", value=probability * 100, number={"suffix": "%", "font": {"size": 48}}, title={"text": f"Manipulation Probability
{verdict}", "font": {"size": 18}}, gauge={ "axis": {"range": [0, 100], "tickwidth": 2}, "bar": {"color": color, "thickness": 0.3}, "bgcolor": "white", "steps": [ {"range": [0, 25], "color": "rgba(0,180,0,0.15)"}, {"range": [25, 45], "color": "rgba(255,215,0,0.15)"}, {"range": [45, 65], "color": "rgba(255,165,0,0.15)"}, {"range": [65, 100], "color": "rgba(255,0,0,0.15)"}, ], "threshold": { "line": {"color": "black", "width": 3}, "thickness": 0.8, "value": probability * 100, }, }, )) fig.update_layout( height=280, margin=dict(l=30, r=30, t=60, b=20), paper_bgcolor="rgba(0,0,0,0)", font={"family": "Inter, sans-serif"}, ) return fig def create_radar_chart(agent_results: List[AgentEvidence]) -> go.Figure: """Create radar chart showing all agent scores.""" names = [] scores = [] colors = [] for agent in agent_results: short_name = agent.agent_name.replace(" Agent", "").replace(" Characteristics", "") names.append(short_name) display_score = (agent.violation_score + 1) * 50 scores.append(display_score) if agent.violation_score > 0.2: colors.append("red") elif agent.violation_score < -0.1: colors.append("green") else: colors.append("gold") names_closed = names + [names[0]] scores_closed = scores + [scores[0]] fig = go.Figure() fig.add_trace(go.Scatterpolar( r=scores_closed, theta=names_closed, fill="toself", fillcolor="rgba(255, 100, 100, 0.15)", line=dict(color="rgba(255, 50, 50, 0.8)", width=2), name="Violation Score", )) fig.add_trace(go.Scatterpolar( r=[50] * (len(names) + 1), theta=names_closed, line=dict(color="gray", width=1, dash="dash"), name="Neutral (score=0)", showlegend=True, )) fig.update_layout( polar=dict( radialaxis=dict( visible=True, range=[0, 100], tickvals=[0, 25, 50, 75, 100], ticktext=["Authentic", "", "Neutral", "", "Fake"], ), ), height=400, margin=dict(l=60, r=60, t=40, b=40), paper_bgcolor="rgba(0,0,0,0)", font={"family": "Inter, sans-serif", "size": 11}, showlegend=True, legend=dict(x=0, y=-0.15, orientation="h"), ) return fig def create_agent_bar_chart(agent_results: List[AgentEvidence]) -> go.Figure: """Create horizontal bar chart of agent scores.""" names = [] scores = [] colors = [] confidences = [] for agent in sorted(agent_results, key=lambda a: a.violation_score, reverse=True): short = agent.agent_name.replace(" Agent", "") names.append(short) scores.append(agent.violation_score) confidences.append(agent.confidence) if agent.violation_score > 0.2: colors.append("rgba(220, 53, 69, 0.8)") elif agent.violation_score < -0.1: colors.append("rgba(40, 167, 69, 0.8)") else: colors.append("rgba(255, 193, 7, 0.8)") fig = go.Figure() fig.add_trace(go.Bar( y=names, x=scores, orientation="h", marker_color=colors, text=[f"{s:+.2f}" for s in scores], textposition="outside", )) fig.add_vline(x=0, line_dash="dash", line_color="gray") fig.update_layout( xaxis=dict(title="Violation Score (-1=Authentic, +1=Fake)", range=[-1.1, 1.1]), height=350, margin=dict(l=150, r=50, t=20, b=40), paper_bgcolor="rgba(0,0,0,0)", font={"family": "Inter, sans-serif"}, ) return fig def create_ela_display(agent_results: List[AgentEvidence]) -> Image.Image: """Extract ELA image from metadata agent if available.""" for agent in agent_results: if agent.visual_evidence is not None: if isinstance(agent.visual_evidence, Image.Image): return agent.visual_evidence return None def create_fft_display(agent_results: List[AgentEvidence]) -> go.Figure: """Create FFT magnitude spectrum heatmap.""" for agent in agent_results: if agent.agent_name == "Generative Model Agent": for sf in agent.sub_findings: if "magnitude_spectrum" in sf: mag = sf["magnitude_spectrum"] fig = go.Figure(data=go.Heatmap( z=mag, colorscale="Viridis", showscale=True, colorbar=dict(title="Log Magnitude"), )) fig.update_layout( title="2D FFT Magnitude Spectrum", height=400, margin=dict(l=40, r=40, t=50, b=40), paper_bgcolor="rgba(0,0,0,0)", xaxis=dict(showticklabels=False), yaxis=dict(showticklabels=False, scaleanchor="x"), ) return fig fig = go.Figure() fig.update_layout(height=400, title="FFT Spectrum (not available)") return fig def create_noise_map_display(agent_results: List[AgentEvidence]) -> go.Figure: """Create noise residual heatmap.""" for agent in agent_results: if agent.agent_name == "Sensor Characteristics Agent": for sf in agent.sub_findings: if "noise_map" in sf: nm = sf["noise_map"] fig = go.Figure(data=go.Heatmap( z=nm, colorscale="Hot", showscale=True, colorbar=dict(title="Noise Energy"), )) fig.update_layout( title="PRNU Noise Residual Map", height=400, margin=dict(l=40, r=40, t=50, b=40), paper_bgcolor="rgba(0,0,0,0)", xaxis=dict(showticklabels=False), yaxis=dict(showticklabels=False, scaleanchor="x"), ) return fig fig = go.Figure() fig.update_layout(height=400, title="Noise Map (not available)") return fig def create_benford_chart(agent_results: List[AgentEvidence]) -> go.Figure: """Create Benford's Law comparison chart.""" for agent in agent_results: if agent.agent_name == "Statistical Priors Agent": for sf in agent.sub_findings: if "observed" in sf and "benford_expected" in sf: observed = sf["observed"] expected = sf["benford_expected"] digits = list(range(1, 10)) fig = go.Figure() fig.add_trace(go.Bar( x=digits, y=expected, name="Benford's Law (Expected)", marker_color="rgba(100, 150, 255, 0.7)", )) fig.add_trace(go.Bar( x=digits, y=observed, name="Observed Distribution", marker_color="rgba(255, 100, 100, 0.7)", )) fig.update_layout( title=f"Benford's Law Analysis (χ²={sf.get('chi_squared', 0):.5f})", xaxis=dict(title="First Digit", dtick=1), yaxis=dict(title="Proportion"), barmode="group", height=350, margin=dict(l=50, r=30, t=50, b=40), paper_bgcolor="rgba(0,0,0,0)", font={"family": "Inter, sans-serif"}, ) return fig fig = go.Figure() fig.update_layout(height=350, title="Benford's Law (not available)") return fig def format_metadata_table(agent_results: List[AgentEvidence]) -> list: """Extract EXIF data as table rows.""" for agent in agent_results: if agent.agent_name == "Metadata Agent": for sf in agent.sub_findings: if "exif_data" in sf: rows = [[k, v[:100]] for k, v in sf["exif_data"].items()] if not rows: rows = [["(No EXIF data)", "Image has no metadata"]] return rows return [["(Not available)", ""]] # ─── Main Analysis Pipeline ───────────────────────────────────────── def analyze_image(img): """Main entry point for Gradio — runs full FORENSIQ pipeline.""" if img is None: return ( "
Upload an image to begin analysis
", go.Figure(), go.Figure(), go.Figure(), "Upload an image to begin analysis.", "", "", go.Figure(), None, go.Figure(), go.Figure(), [["", ""]], "", ) try: # Convert numpy array to PIL if needed if isinstance(img, np.ndarray): img = Image.fromarray(img) agent_results, verdict, reasoning_tree_md = run_all_agents(img) # Build verdict HTML prob = verdict.probability_fake if prob > 0.65: bg = "linear-gradient(135deg, #dc3545, #c82333)" icon = "🔴" elif prob > 0.52: bg = "linear-gradient(135deg, #fd7e14, #e8590c)" icon = "🟠" elif prob >= 0.48: bg = "linear-gradient(135deg, #6c757d, #495057)" icon = "⚪" elif prob > 0.25: bg = "linear-gradient(135deg, #ffc107, #e0a800)" icon = "🟡" else: bg = "linear-gradient(135deg, #28a745, #218838)" icon = "✅" # Get modality info for display mod_info = verdict.reasoning_tree.get("modality", {}) mod_name = mod_info.get("detected", "UNKNOWN") mod_conf = mod_info.get("confidence", 0) mod_adj = mod_info.get("adjustments_applied", 0) mod_label = { "PORTRAIT_MODE": "📱 Portrait Mode", "MESSAGING": "💬 Messaging App", "SMARTPHONE": "📱 Smartphone", "SCREENSHOT": "🖥️ Screenshot", "DSLR": "📷 DSLR", "MACRO_DSLR": "🔬 Macro DSLR", "UNKNOWN": "❓ Unknown", }.get(mod_name, f"📷 {mod_name}") # Count active agents from verdict (which only has active agents now) n_active = len(verdict.agent_results) verdict_html = f"""
{icon}
{verdict.verdict}
{prob:.1%}
Confidence: {verdict.confidence} | Agents: {n_active}/7 active
Modality: {mod_label} ({mod_conf:.0%}) | {mod_adj} test(s) recalibrated
""" # Create all visualizations gauge = create_gauge_chart(verdict.probability_fake, verdict.verdict) radar = create_radar_chart(agent_results) bar_chart = create_agent_bar_chart(agent_results) ela_img = create_ela_display(agent_results) fft_fig = create_fft_display(agent_results) noise_fig = create_noise_map_display(agent_results) benford_fig = create_benford_chart(agent_results) metadata_rows = format_metadata_table(agent_results) return ( verdict_html, gauge, radar, bar_chart, verdict.forensic_report, reasoning_tree_md, verdict.court_brief, fft_fig, ela_img, noise_fig, benford_fig, metadata_rows, _build_agent_details_md(agent_results), ) except Exception as e: import traceback traceback.print_exc(file=sys.stderr) error_html = f"""
⚠️
Analysis Error
{str(e)}
""" empty_fig = go.Figure() return ( error_html, empty_fig, empty_fig, empty_fig, f"Error during analysis: {str(e)}", "", "", empty_fig, None, empty_fig, empty_fig, [["Error", str(e)]], "", ) def _build_agent_details_md(agent_results: List[AgentEvidence]) -> str: """Build detailed agent findings markdown.""" md = "" for agent in agent_results: if agent.violation_score > 0.2: badge = "🔴 VIOLATED" elif agent.violation_score < -0.1: badge = "🟢 COMPLIANT" elif agent.failure_prob > 0.7: badge = "⚪ SKIPPED" else: badge = "🟡 NEUTRAL" md += f"### {agent.agent_name} — {badge}\n\n" md += f"**Score:** {agent.violation_score:+.3f} | " md += f"**Confidence:** {agent.confidence:.1%} | " md += f"**Failure:** {agent.failure_prob:.1%}\n\n" for sf in agent.sub_findings: test = sf.get("test", "") note = sf.get("note", "") s = sf.get("score", 0) ic = "🔴" if s > 0.2 else "🟢" if s < -0.1 else "🟡" # Show modality adjustment info if sf.get("modality_adjusted"): adj_info = f" [×{sf.get('adjustment_multiplier', '?')} modality]" else: adj_info = "" md += f"- {ic} **{test}** ({s:+.2f}{adj_info}): {note}\n" md += "\n---\n\n" return md # ─── Gradio UI ─────────────────────────────────────────────────────── CUSTOM_CSS = """ .gradio-container { max-width: 1400px !important; font-family: 'Inter', sans-serif !important; } .main-title { text-align: center; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-size: 2.5em !important; font-weight: 800 !important; margin-bottom: 0 !important; } .subtitle { text-align: center; color: #6c757d; font-size: 1.1em; margin-top: 0; } .tab-content { padding: 10px; } footer { display: none !important; } """ HEADER_MD = """

🔬 FORENSIQ

Physics-Based, Multi-Agent Forensic Framework for Explainable Deepfake Detection

7 Independent Forensic Agents • Bayesian Evidence Synthesis • Court-Admissible Reports

""" def build_app(): with gr.Blocks( title="FORENSIQ — Deepfake Detection", theme=gr.themes.Soft( primary_hue="purple", secondary_hue="blue", ), css=CUSTOM_CSS, ) as demo: gr.HTML(HEADER_MD) with gr.Row(equal_height=False): # Left column: input with gr.Column(scale=1, min_width=300): image_input = gr.Image( label="📷 Upload Suspect Image", type="pil", height=350, sources=["upload", "clipboard"], ) analyze_btn = gr.Button( "🔬 Run Forensic Analysis", variant="primary", size="lg", ) gr.Markdown("""
Supported: JPEG, PNG, WebP, BMP, TIFF
Agents: Optical Physics • Sensor • Generative Model • Statistical • Semantic • Metadata • Text
Engine: Bayesian Evidence Synthesis with Independence Correction
""") gr.HTML("""
⚡ VLM Status: Semantic & Text agents require HF Inference credits. Without credits, 5/7 agents run (signal processing only). For full 7/7 analysis including anatomy, lighting, and physics checks, ensure your HF account has active inference credits or a PRO subscription.
""") # Right column: verdict with gr.Column(scale=1, min_width=300): verdict_html = gr.HTML( value="
Upload an image and click Analyze
" ) gauge_plot = gr.Plot(label="Confidence Gauge") # Tabs for detailed results with gr.Tabs(): with gr.Tab("📊 Overview"): with gr.Row(): radar_plot = gr.Plot(label="Agent Scores Radar") bar_plot = gr.Plot(label="Agent Violation Scores") agent_details_md = gr.Markdown(label="Agent Details") with gr.Tab("🔊 Frequency Analysis"): with gr.Row(): fft_plot = gr.Plot(label="FFT Magnitude Spectrum") benford_plot = gr.Plot(label="Benford's Law Analysis") with gr.Tab("🔬 Signal Forensics"): with gr.Row(): noise_plot = gr.Plot(label="PRNU Noise Residual Map") ela_image = gr.Image(label="Error Level Analysis (ELA)", type="pil") with gr.Tab("📋 Metadata"): metadata_table = gr.Dataframe( headers=["Field", "Value"], label="EXIF Metadata", wrap=True, ) with gr.Tab("📄 Forensic Report"): report_md = gr.Markdown(label="Full Forensic Report") with gr.Tab("🌳 Reasoning Tree"): tree_md = gr.Markdown(label="Reasoning Tree") with gr.Tab("⚖️ Court Brief"): court_md = gr.Markdown(label="Court Brief (FRE 702)") with gr.Tab("📥 Export"): gr.Markdown("""### Export Forensic Report Export the complete analysis in your preferred format. Reports are professionally formatted using **Qwen2.5-72B-Instruct** when available. """) with gr.Row(): export_pdf_btn = gr.Button("📄 Export PDF", variant="primary") export_docx_btn = gr.Button("📝 Export DOCX", variant="primary") export_txt_btn = gr.Button("📃 Export TXT", variant="secondary") export_md_btn = gr.Button("📋 Export Markdown", variant="secondary") export_file = gr.File(label="Download Report", visible=True) export_status = gr.Markdown("") # ── Hidden state to store report data for exports ──────────── report_state = gr.State("") court_state = gr.State("") tree_state = gr.State("") # Wire up the analysis — also store report data in state def analyze_and_store(img): results = analyze_image(img) # results[4] = report_md, results[5] = tree_md, results[6] = court_md return list(results) + [results[4], results[6], results[5]] analyze_btn.click( fn=analyze_and_store, inputs=[image_input], outputs=[ verdict_html, gauge_plot, radar_plot, bar_plot, report_md, tree_md, court_md, fft_plot, ela_image, noise_plot, benford_plot, metadata_table, agent_details_md, report_state, court_state, tree_state, ], ) # ── Export handlers ─────────────────────────────────────────── from export import export_pdf, export_docx, export_txt, export_md def do_export_pdf(report, court, tree): if not report: return None, "⚠️ Run analysis first" try: path = export_pdf(report, court, tree) return path, "✅ PDF exported successfully" except Exception as e: return None, f"❌ Export failed: {e}" def do_export_docx(report, court, tree): if not report: return None, "⚠️ Run analysis first" try: path = export_docx(report, court, tree) return path, "✅ DOCX exported successfully" except Exception as e: return None, f"❌ Export failed: {e}" def do_export_txt(report, court, tree): if not report: return None, "⚠️ Run analysis first" try: path = export_txt(report, court, tree) return path, "✅ TXT exported successfully" except Exception as e: return None, f"❌ Export failed: {e}" def do_export_md(report, court, tree): if not report: return None, "⚠️ Run analysis first" try: path = export_md(report, court, tree) return path, "✅ Markdown exported successfully" except Exception as e: return None, f"❌ Export failed: {e}" export_pdf_btn.click(fn=do_export_pdf, inputs=[report_state, court_state, tree_state], outputs=[export_file, export_status]) export_docx_btn.click(fn=do_export_docx, inputs=[report_state, court_state, tree_state], outputs=[export_file, export_status]) export_txt_btn.click(fn=do_export_txt, inputs=[report_state, court_state, tree_state], outputs=[export_file, export_status]) export_md_btn.click(fn=do_export_md, inputs=[report_state, court_state, tree_state], outputs=[export_file, export_status]) # Footer gr.HTML("""
FORENSIQ v1.0 — Physics-Based Multi-Agent Forensic Framework
7 Agents • 127+ Physical Constraints • Bayesian Evidence Synthesis • Court-Admissible Reports
Powered by Qwen2.5-VL for semantic analysis • Signal processing via NumPy/SciPy
""") return demo if __name__ == "__main__": demo = build_app() demo.launch(server_name="0.0.0.0", server_port=7860)