| from typing import Optional, Tuple |
| from fastapi import FastAPI, UploadFile, File, Form |
| from fastapi.responses import FileResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from PIL import Image, ExifTags |
| import io |
| import hashlib |
| import httpx |
| import os |
| import base64 |
| import json |
| import asyncio |
| import cv2 |
| import tempfile |
| import fitz |
| import pypdf |
|
|
| |
# --- External service configuration ---
# API keys come from the environment; every downstream call checks for its
# key and degrades gracefully (returns None / skips) when it is missing.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
HF_API_KEY = os.getenv("HF_API_KEY")
# OpenAI-compatible chat-completions endpoint on Groq (used for both the
# vision and the text models).
GROQ_API_URL = "https://api.groq.com/openai/v1/chat/completions"
# Hugging Face inference endpoints for the two RoBERTa text classifiers.
ROBERTA_FAKE_NEWS_URL = "https://router.huggingface.co/hf-inference/models/hamzab/roberta-fake-news-classification"
ROBERTA_AI_TEXT_URL = "https://router.huggingface.co/hf-inference/models/openai-community/roberta-base-openai-detector"


# FastAPI app with fully permissive CORS (any origin/method/header) so the
# static frontend can call the API from anywhere.
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
@app.get("/")
def read_root():
    """Serve the single-page frontend at the site root."""
    index_page = FileResponse("index.html")
    return index_page
|
|
|
|
| |
| |
| |
|
|
def calculate_sha256(contents: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of *contents*."""
    digest = hashlib.sha256()
    digest.update(contents)
    return digest.hexdigest()
|
|
|
|
def calculate_metadata_risk(image: Image.Image) -> float:
    """Score an image's EXIF metadata for manipulation hints, in [0.0, 1.0].

    Heuristics:
      - no EXIF at all      -> +0.1 (real cameras normally embed metadata)
      - a "Software" tag    -> +0.2 (file passed through an editing tool)
      - unreadable EXIF     -> +0.1 (treated like missing metadata)
    """
    risk = 0.0
    try:
        # Use the public getexif() API rather than the private _getexif().
        # getexif() returns a (possibly empty) Exif mapping instead of None;
        # the falsy check preserves the original "missing EXIF" behavior.
        exif = image.getexif()
        if not exif:
            risk += 0.1
        else:
            for tag, value in exif.items():
                decoded = ExifTags.TAGS.get(tag, tag)
                if decoded == "Software":
                    risk += 0.2
    except Exception:
        # Corrupt/unparsable EXIF is itself mildly suspicious.
        risk += 0.1
    return min(risk, 1.0)
|
|
|
|
def fusion_score(model_score: float, metadata_risk: float):
    """Blend the model's fake-probability (90%) with the metadata risk (10%)
    and return it as (authenticity %, fake %) percentages."""
    fused = 0.9 * model_score + 0.1 * metadata_risk
    return (1 - fused) * 100, fused * 100
|
|
|
|
def normalize_output(label_prob_dict: dict) -> float:
    """Collapse a classifier's {label: probability} output to one fake-score.

    Labels containing a "fake" keyword count fully toward the score, labels
    containing a "real" keyword count zero, and unrecognized labels count at
    a discounted 40% weight. The result is capped at 1.0.
    """
    fake_terms = ["fake", "ai", "generated", "manipulated", "deepfake", "artificial", "synthetic", "machine"]
    real_terms = ["real", "authentic", "genuine", "human", "original"]

    total_fake = 0.0
    ambiguous = 0.0

    for raw_label, probability in label_prob_dict.items():
        name = raw_label.lower()
        if any(term in name for term in fake_terms):
            total_fake += probability
        elif not any(term in name for term in real_terms):
            # Unknown label: contributes at reduced weight below.
            ambiguous += probability

    total_fake += 0.4 * ambiguous
    return min(total_fake, 1.0)
|
|
|
|
def make_confidence(authenticity, fake):
    """Map the gap between the two percentages to a confidence label."""
    gap = abs(authenticity - fake)
    if gap < 20:
        return "low"
    if gap < 40:
        return "medium"
    return "high"
|
|
|
|
| |
| |
| |
|
|
async def call_groq_vision(contents: bytes) -> Tuple[Optional[float], str]:
    """Ask Groq's Llama-4 vision model whether an image looks AI-generated.

    Args:
        contents: Raw image bytes; sent inline as a base64 data URL. The
            MIME type is always declared as image/jpeg regardless of the
            actual format — TODO confirm Groq tolerates mismatched types.

    Returns:
        (fake_probability, reasoning) with fake_probability in [0, 1], or
        (None, "") when the API key is missing or the call/parse fails.
    """
    if not GROQ_API_KEY:
        print("No GROQ_API_KEY set")
        return None, ""
    try:
        # Groq's OpenAI-compatible vision API accepts base64 data URLs.
        base64_image = base64.b64encode(contents).decode('utf-8')
        payload = {
            "model": "meta-llama/llama-4-scout-17b-16e-instruct",
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            # The prompt pins the response to a strict JSON
                            # shape so it can be parsed directly below.
                            "text": """You are a forensic image analyst expert. Analyze this image for signs of AI generation or manipulation.

Look for:
- Unnatural skin texture or too-perfect features
- Inconsistent lighting or shadows
- Background anomalies or blurring
- Artifacts typical of diffusion models (Midjourney, DALL-E, Stable Diffusion)
- Overly smooth or painterly textures
- Unnatural hair or eye details
- Signs of face swapping or deepfake manipulation
- EXIF/compression patterns typical of AI tools

Respond ONLY in this exact JSON format, nothing else:
{"fake_probability": 0.0, "reasoning": "brief reason"}

fake_probability must be between 0.0 (definitely real) and 1.0 (definitely AI/fake)."""
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{base64_image}"
                            }
                        }
                    ]
                }
            ],
            "max_tokens": 200,
            # Low temperature keeps the JSON output near-deterministic.
            "temperature": 0.1
        }

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                GROQ_API_URL,
                headers={
                    "Authorization": f"Bearer {GROQ_API_KEY}",
                    "Content-Type": "application/json"
                },
                json=payload
            )
            response.raise_for_status()
            data = response.json()
            text = data["choices"][0]["message"]["content"]
            print(f"Groq vision response: {text}")
            # Strip markdown code fences the model sometimes wraps around
            # its JSON before parsing.
            clean = text.strip().replace("```json", "").replace("```", "")
            result = json.loads(clean)
            return float(result["fake_probability"]), result.get("reasoning", "")

    except Exception as e:
        # Best effort: any network/HTTP/parse failure degrades to "no opinion"
        # so the caller can fall back to a neutral score.
        print(f"Groq vision failed: {e}")
        return None, ""
|
|
|
|
| |
| |
| |
|
|
async def call_groq_text(text: str) -> Tuple[Optional[float], str]:
    """Ask Groq's Llama-3.3 model whether *text* is AI-generated or forged.

    Args:
        text: Text to analyze; truncated to the first 4000 characters before
            being embedded into the prompt.

    Returns:
        (fake_probability, reasoning) with fake_probability in [0, 1], or
        (None, "") when the API key is missing or the call/parse fails.
    """
    if not GROQ_API_KEY:
        return None, ""
    try:
        payload = {
            "model": "llama-3.3-70b-versatile",
            "messages": [
                {
                    "role": "user",
                    # f-string prompt: literal braces in the JSON template are
                    # escaped as {{ }}; the analyzed text is interpolated.
                    "content": f"""You are a forensic text analyst. Analyze the following text and determine if it is AI-generated or written by a human. Also check if it could be a forged government document or fake news.

Look for:
- Overly formal or repetitive sentence structure typical of LLMs
- Lack of personal voice or human inconsistencies
- Suspiciously perfect grammar with no natural errors
- Generic phrasing commonly used by AI models
- For government documents: inconsistent terminology, wrong formats, suspicious clauses
- For news: sensational language, lack of credible sources, misleading framing

Text to analyze:
\"\"\"
{text[:4000]}
\"\"\"

Respond ONLY in this exact JSON format, nothing else:
{{"fake_probability": 0.0, "reasoning": "brief reason"}}

fake_probability must be between 0.0 (definitely human/authentic) and 1.0 (definitely AI-generated/forged)."""
                }
            ],
            "max_tokens": 200,
            # Low temperature keeps the JSON output near-deterministic.
            "temperature": 0.1
        }

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                GROQ_API_URL,
                headers={
                    "Authorization": f"Bearer {GROQ_API_KEY}",
                    "Content-Type": "application/json"
                },
                json=payload
            )
            response.raise_for_status()
            data = response.json()
            text_response = data["choices"][0]["message"]["content"]
            print(f"Groq text response: {text_response}")
            # Strip markdown code fences the model sometimes adds around JSON.
            clean = text_response.strip().replace("```json", "").replace("```", "")
            result = json.loads(clean)
            return float(result["fake_probability"]), result.get("reasoning", "")

    except Exception as e:
        # Best effort: failures degrade to "no opinion" for the caller.
        print(f"Groq text failed: {e}")
        return None, ""
|
|
|
|
| |
| |
| |
|
|
async def call_roberta(url: str, text: str, name: str) -> Optional[float]:
    """Classify *text* with a Hugging Face-hosted RoBERTa model.

    Sends the first 512 characters to the given inference endpoint and
    collapses the returned label scores via normalize_output(). Returns
    None when HF_API_KEY is unset or the request/parse fails.
    """
    if not HF_API_KEY:
        print(f"No HF_API_KEY, skipping {name}")
        return None
    try:
        request_headers = {"Authorization": f"Bearer {HF_API_KEY}"}
        request_body = {"inputs": text[:512]}
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(url, headers=request_headers, json=request_body)
            resp.raise_for_status()
            data = resp.json()
        print(f"{name} response: {data}")
        # HF returns a list whose first element is [{label, score}, ...].
        scores_by_label = {entry["label"]: entry["score"] for entry in data[0]}
        return normalize_output(scores_by_label)
    except Exception as e:
        print(f"{name} failed: {e}")
        return None
|
|
|
|
| |
| |
| |
|
|
async def analyze_image(contents: bytes, content_type: str = "image/jpeg"):
    """Analyze one image and return the standard verdict payload.

    Fuses the Groq vision model's fake-probability (neutral 0.5 fallback
    when the model is unavailable) with an EXIF metadata-risk heuristic.
    """
    image = Image.open(io.BytesIO(contents)).convert("RGB")

    # Skip the model call beyond 20 MB — payload too large for the API.
    too_large = len(contents) > 20 * 1024 * 1024
    if too_large:
        print("Image too large for Groq")
        score, reasoning = None, "Image too large for analysis"
    else:
        score, reasoning = await call_groq_vision(contents)

    model_available = score is not None
    combined_model_score = score if model_available else 0.5
    models_used = ["Groq_Llama4"] if model_available else []

    metadata_risk = calculate_metadata_risk(image)
    authenticity, fake = fusion_score(combined_model_score, metadata_risk)

    return {
        "type": "image",
        "authenticity": round(authenticity, 2),
        "fake": round(fake, 2),
        "confidence_level": make_confidence(authenticity, fake),
        "models_used": models_used,
        "details": {
            "groq_score": round(score, 4) if model_available else "unavailable",
            "groq_reasoning": reasoning,
            "metadata_risk": round(metadata_risk, 4),
        }
    }
|
|
|
|
async def analyze_video(contents: bytes):
    """Analyze a video by sampling 5 evenly spaced frames with Groq vision.

    The uploaded bytes are written to a temporary .mp4 so OpenCV can open
    them. Frame scores are averaged into a single fake-probability; no
    metadata risk is computed for video. Always returns the standard verdict
    dict, falling back to a neutral 50/50 result when analysis fails.

    Fix vs. previous version: the cv2.VideoCapture handle and the temp file
    are now released/deleted in a ``finally`` block, so an exception raised
    during frame extraction can no longer leak either resource.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as f:
        f.write(contents)
        tmp_path = f.name

    cap = None
    try:
        cap = cv2.VideoCapture(tmp_path)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        duration = round(frame_count / fps, 1) if fps > 0 else 0

        # Sample 5 evenly spaced frame indices across the whole video.
        sample_indices = [int(frame_count * i / 5) for i in range(5)]
        frames = []

        for idx in sample_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if ret:
                # OpenCV yields BGR; convert before JPEG-encoding for Groq.
                pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                buf = io.BytesIO()
                pil_img.save(buf, format="JPEG", quality=85)
                frames.append(buf.getvalue())

        # Release resources early — the API calls below can take a while.
        # The finally block below is the safety net for exception paths.
        cap.release()
        cap = None
        os.unlink(tmp_path)

        if not frames:
            return {
                "type": "video",
                "authenticity": 50.0,
                "fake": 50.0,
                "confidence_level": "low",
                "models_used": [],
                "details": {
                    "groq_score": "unavailable",
                    "groq_reasoning": "Could not extract frames from video.",
                    "metadata_risk": 0.0,
                    "frames_analyzed": 0,
                    "video_duration": duration
                }
            }

        scores = []
        reasonings = []
        for i, frame_bytes in enumerate(frames):
            print(f"Analyzing frame {i+1}/{len(frames)}")
            score, reasoning = await call_groq_vision(frame_bytes)
            if score is not None:
                scores.append(score)
                reasonings.append(f"Frame {i+1}: {reasoning}")
            # Pause between frames to stay under the API rate limit.
            if i < len(frames) - 1:
                await asyncio.sleep(2)

        combined_model_score = sum(scores) / len(scores) if scores else 0.5
        models_used = ["Groq_Llama4"] if scores else []
        groq_reasoning = " | ".join(reasonings) if reasonings else "All frame analyses failed."

        authenticity = round((1 - combined_model_score) * 100, 2)
        fake = round(combined_model_score * 100, 2)

        return {
            "type": "video",
            "authenticity": authenticity,
            "fake": fake,
            "confidence_level": make_confidence(authenticity, fake),
            "models_used": models_used,
            "details": {
                "groq_score": round(combined_model_score, 4),
                "groq_reasoning": groq_reasoning,
                "metadata_risk": 0.0,
                "frames_analyzed": len(scores),
                "video_duration": duration
            }
        }

    except Exception as e:
        print(f"Video analysis failed: {e}")
        return {
            "type": "video",
            "authenticity": 50.0,
            "fake": 50.0,
            "confidence_level": "low",
            "models_used": [],
            "details": {
                "groq_score": "unavailable",
                "groq_reasoning": f"Analysis failed: {str(e)}",
                "metadata_risk": 0.0,
                "frames_analyzed": 0,
                "video_duration": 0
            }
        }
    finally:
        # Guaranteed cleanup on every path (success, early return, error).
        if cap is not None:
            cap.release()
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
|
|
|
|
async def analyze_text(text: str):
    """Score raw text with two RoBERTa classifiers plus Groq in parallel,
    averaging whichever models respond (neutral 0.5 if none do)."""
    fakenews_score, aidetect_score, (groq_score, reasoning) = await asyncio.gather(
        call_roberta(ROBERTA_FAKE_NEWS_URL, text, "RoBERTa_FakeNews"),
        call_roberta(ROBERTA_AI_TEXT_URL, text, "RoBERTa_AIDetector"),
        call_groq_text(text)
    )

    labelled = [
        (fakenews_score, "RoBERTa_FakeNews"),
        (aidetect_score, "RoBERTa_AIDetector"),
        (groq_score, "Groq_Llama3"),
    ]
    # Keep only the models that actually produced a score.
    available = [(value, model) for value, model in labelled if value is not None]

    combined = sum(value for value, _ in available) / len(available) if available else 0.5
    models_used = [model for _, model in available]

    authenticity = round((1 - combined) * 100, 2)
    fake = round(combined * 100, 2)

    return {
        "type": "text",
        "authenticity": authenticity,
        "fake": fake,
        "confidence_level": make_confidence(authenticity, fake),
        "models_used": models_used,
        "details": {
            "groq_score": round(groq_score, 4) if groq_score is not None else "unavailable",
            "roberta_fakenews_score": round(fakenews_score, 4) if fakenews_score is not None else "unavailable",
            "roberta_aidetector_score": round(aidetect_score, 4) if aidetect_score is not None else "unavailable",
            "groq_reasoning": reasoning,
            "metadata_risk": 0.0,
        }
    }
|
|
|
|
async def analyze_pdf(contents: bytes):
    """Analyze a PDF's text layer and up to 3 embedded images.

    Text is extracted with pypdf and scored by the two RoBERTa classifiers
    plus Groq; embedded images are extracted with PyMuPDF (fitz) and scored
    by Groq vision. All scores are averaged equally (neutral 0.5 if nothing
    scored).

    Fixes vs. previous version:
      - The 3-image cap now also stops the outer page loop instead of only
        breaking the inner per-page loop (remaining pages were still scanned).
      - ``models_used`` now lists only the models that actually contributed a
        score, consistent with analyze_text(), instead of claiming all three
        whenever any single score existed.
    """
    scores = []
    reasonings = []
    models_used = []
    groq_used = False

    try:
        # --- Text layer ---
        reader = pypdf.PdfReader(io.BytesIO(contents))
        full_text = "".join(page.extract_text() or "" for page in reader.pages)

        if full_text.strip():
            print(f"Extracted {len(full_text)} chars from PDF")
            s1, s2, (s3, text_reasoning) = await asyncio.gather(
                call_roberta(ROBERTA_FAKE_NEWS_URL, full_text, "RoBERTa_FakeNews"),
                call_roberta(ROBERTA_AI_TEXT_URL, full_text, "RoBERTa_AIDetector"),
                call_groq_text(full_text)
            )

            if s1 is not None:
                scores.append(s1)
                reasonings.append(f"RoBERTa FakeNews: {round(s1*100)}% fake")
                models_used.append("RoBERTa_FakeNews")
            if s2 is not None:
                scores.append(s2)
                reasonings.append(f"RoBERTa AI Detector: {round(s2*100)}% AI-generated")
                models_used.append("RoBERTa_AIDetector")
            if s3 is not None:
                scores.append(s3)
                reasonings.append(f"Groq text: {text_reasoning}")
                groq_used = True

        # --- Embedded images (first 3 across the whole document) ---
        doc = fitz.open(stream=contents, filetype="pdf")
        image_count = 0
        for page in doc:
            if image_count >= 3:
                break  # cap reached: stop scanning further pages too
            for img in page.get_images():
                if image_count >= 3:
                    break
                xref = img[0]
                base_image = doc.extract_image(xref)
                img_bytes = base_image["image"]
                # Pause before each vision call to respect the rate limit.
                await asyncio.sleep(2)
                img_score, img_reasoning = await call_groq_vision(img_bytes)
                if img_score is not None:
                    scores.append(img_score)
                    reasonings.append(f"Image {image_count+1}: {img_reasoning}")
                    groq_used = True
                image_count += 1
        doc.close()

    except Exception as e:
        print(f"PDF analysis error: {e}")

    # One combined label for the Groq text+vision contributions (kept for
    # backward compatibility with the frontend's expected label set).
    if groq_used:
        models_used.append("Groq_Llama3+Vision")

    combined = sum(scores) / len(scores) if scores else 0.5

    authenticity = round((1 - combined) * 100, 2)
    fake = round(combined * 100, 2)

    return {
        "type": "pdf",
        "authenticity": authenticity,
        "fake": fake,
        "confidence_level": make_confidence(authenticity, fake),
        "models_used": models_used,
        "details": {
            "groq_score": "see breakdown",
            "groq_reasoning": " | ".join(reasonings) if reasonings else "No content extracted",
            "metadata_risk": 0.0,
        }
    }
|
|
|
|
| |
| |
| |
|
|
@app.post("/analyze")
async def analyze(
    file: Optional[UploadFile] = File(None),
    text: Optional[str] = Form(None)
):
    """Unified analysis endpoint.

    Accepts either a multipart ``file`` (image, video, or PDF) or a form
    ``text`` field. If both are supplied, the file wins. Every successful
    result carries a ``sha256`` of the analyzed content for integrity /
    deduplication on the client side.

    Fix vs. previous version: ``file.content_type`` can be ``None`` when the
    client omits the header; it is now coalesced to "" so the request yields
    the "Unsupported file type" error instead of an unhandled 500.
    """
    if text and not file:
        result = await analyze_text(text)
        result["sha256"] = hashlib.sha256(text.encode()).hexdigest()
        return result

    if not file:
        return {"error": "No file or text provided"}

    contents = await file.read()
    sha256 = calculate_sha256(contents)

    # content_type may be None if the client omitted it; treat as unsupported.
    content_type = file.content_type or ""
    if content_type.startswith("image/"):
        result = await analyze_image(contents, content_type)
    elif content_type.startswith("video/"):
        result = await analyze_video(contents)
    elif content_type == "application/pdf":
        result = await analyze_pdf(contents)
    else:
        return {"error": "Unsupported file type"}

    result["sha256"] = sha256
    return result
|
|
|
|
|
|