"""FORENSIQ — Metadata Agent (12 features)""" import numpy as np, io from PIL import Image, ImageChops, ImageEnhance from PIL.ExifTags import TAGS from scipy.ndimage import maximum_filter, gaussian_filter from typing import Dict, Any from agents.optical_agent import AgentEvidence def _g(img): return np.array(img.convert("L")).astype(np.float64) def d01_exif_completeness(img): try: exif=img._getexif() or {} except: exif={} decoded={} for tid,v in exif.items(): t=TAGS.get(tid,str(tid)) try: decoded[t]=str(v)[:200] except: decoded[t]="" flags,auth=[],[] has_make="Make" in decoded; has_model="Model" in decoded; has_lens="LensModel" in decoded or "LensInfo" in decoded has_focal="FocalLength" in decoded; has_exp="ExposureTime" in decoded; has_iso="ISOSpeedRatings" in decoded has_f="FNumber" in decoded; cam=sum([has_make,has_model,has_lens,has_focal,has_exp,has_iso,has_f]) if cam==0: flags.append("No camera metadata") elif cam>=4: auth.append(f"Rich EXIF ({cam}/7)") if not decoded: flags.append("Empty EXIF") if "GPSInfo" in decoded: auth.append("GPS present") if cam>=4 and not flags: s,n=-0.5,f"Rich plausible EXIF ({cam}/7 fields)" elif not decoded or len(flags)>=2: s,n=0.5,"Missing/suspicious metadata" elif flags: s,n=0.2,"Minor metadata concern" else: s,n=-0.1,"Partial metadata" return {"test":"EXIF Completeness","fields":len(decoded),"camera_fields":cam,"exif_data":decoded,"score":s,"note":n} def d02_software_check(img): try: exif=img._getexif() or {} except: exif={} decoded={TAGS.get(tid,str(tid)):str(v)[:200] for tid,v in exif.items()} sw=decoded.get("Software","").lower() ai=["stable diffusion","midjourney","dall-e","comfyui","automatic1111","invoke","flux","novelai","sd"] edit=["photoshop","gimp","lightroom","capture one","snapseed"] if any(k in sw for k in ai): s,n=0.8,f"AI software: {decoded.get('Software','')}" elif any(k in sw for k in edit): s,n=0.2,f"Editing software: {decoded.get('Software','')}" elif sw: s,n=-0.1,f"Software: {decoded.get('Software','')}" else: s,n=0.1,"No software tag" return {"test":"Software Detection","score":s,"note":n} def d03_ela(img, quality=90): # P4: Detect source format — ELA is only meaningful for JPEG inputs source_format = getattr(img, 'format', None) is_jpeg = source_format and source_format.upper() in ("JPEG", "JPG") buf=io.BytesIO(); img_rgb=img.convert("RGB"); img_rgb.save(buf,"JPEG",quality=quality); buf.seek(0) resaved=Image.open(buf).convert("RGB"); ela=ImageChops.difference(img_rgb,resaved) ext=ela.getextrema(); mx=max(e[1] for e in ext) or 1 ela_vis=ImageEnhance.Brightness(ela).enhance(255.0/mx) ea=np.array(ela).astype(float); bs=32; bm=[] h,w,_=ea.shape for i in range(0,h-bs,bs): for j in range(0,w-bs,bs): bm.append(float(np.mean(ea[i:i+bs,j:j+bs]))) bm=np.array(bm); bstd=float(np.std(bm)); br=float(np.max(bm)-np.min(bm)) if not is_jpeg and float(np.std(ea))<1: s,n=0.0,"PNG/lossless source — ELA comparison is lossless→JPEG, not meaningful" elif bstd>8 and br>30: s,n=0.6,f"High ELA variance (σ={bstd:.1f}) — manipulation" elif bstd>4: s,n=0.3,f"Moderate ELA (σ={bstd:.1f})" elif float(np.std(ea))<1 and is_jpeg: s,n=0.2,"Uniform ELA on JPEG — possible AI" elif float(np.std(ea))<1: s,n=0.0,"Uniform ELA (non-JPEG source)" else: s,n=-0.2,f"Consistent ELA (σ={bstd:.1f})" return {"test":"Error Level Analysis","block_std":round(bstd,3),"score":s,"note":n,"ela_image":ela_vis} def d04_ai_metadata(img): info=img.info or {}; traces=[] markers=["stable diffusion","comfyui","automatic1111","midjourney","dall-e","novelai","parameters","prompt","negative_prompt","steps","sampler","cfg_scale","flux","sd_model"] for k in info: ks=str(k).lower(); vs=str(info[k])[:500].lower() if any(m in ks or m in vs for m in markers): traces.append(f"{k}: {str(info[k])[:80]}") xmp=str(info.get("XML:com.adobe.xmp","") or info.get("xmp","")) if "generativeAI" in xmp or "ai:" in xmp.lower(): traces.append("XMP AI markers") if "c2pa" in xmp.lower(): traces.append("C2PA Content Credentials") if traces: s,n=0.8,f"AI traces: {'; '.join(traces[:3])}" else: s,n=0.0,"No AI metadata" return {"test":"AI Metadata Traces","traces":traces,"score":s,"note":n} def d05_thumbnail(img): try: exif=img._getexif() or {} except: exif={} has_thumb=513 in exif or 514 in exif if has_thumb: s,n=-0.1,"Thumbnail present — camera" else: s,n=0.0,"No thumbnail" return {"test":"Thumbnail Check","has_thumbnail":has_thumb,"score":s,"note":n} def d06_watermark(img): gray=_g(img); fft=np.fft.fftshift(np.fft.fft2(gray)); mag=np.log(np.abs(fft)+1) h,w=mag.shape; cy,cx=h//2,w//2; mc=mag.copy(); mc[cy-3:cy+3,cx-3:cx+3]=0 lm=maximum_filter(mc,10); peaks=(mc==lm)&(mc>np.percentile(mc,99.5)) np_=int(np.sum(peaks)) if np_>20: s,n=0.2,f"Frequency peaks ({np_}) — watermark?" else: s,n=0.0,f"No watermark ({np_} peaks)" return {"test":"Watermark Detection","peaks":np_,"score":s,"note":n} def d07_compression_ghost(img): gray=_g(img); h,w=gray.shape; hc,wc=(h//8)*8,(w//8)*8; gray=gray[:hc,:wc] bd,it=[],[] for i in range(1,hc): rd=np.abs(gray[i,:]-gray[i-1,:]) if i%8==0: bd.extend(rd.tolist()) else: it.extend(rd.tolist()) bk=float(np.mean(bd))/(float(np.mean(it))+1e-9) if it else 1 if bk>1.5: s,n=0.3,f"Double JPEG (blockiness={bk:.3f})" elif bk>1.2: s,n=-0.1,f"JPEG blocks ({bk:.3f})" elif bk<1.02: s,n=0.1,f"No blocks ({bk:.3f})" else: s,n=0.0,f"Blockiness={bk:.3f}" return {"test":"Compression Ghosts","blockiness":round(bk,4),"score":s,"note":n} def d08_icc_profile(img): icc=img.info.get("icc_profile",None) if icc: size=len(icc) if size>100: s,n=-0.2,f"ICC profile ({size}B) — camera/editor" else: s,n=0.0,f"Small ICC ({size}B)" else: s,n=0.1,"No ICC profile" return {"test":"ICC Color Profile","has_icc":icc is not None,"score":s,"note":n} def d09_color_space(img): mode=img.mode try: exif=img._getexif() or {} except: exif={} cs=str(exif.get(40961,"")) # ColorSpace tag if cs=="1": s,n=-0.1,"sRGB color space — standard" elif cs=="65535": s,n=-0.1,"Uncalibrated (wide gamut)" elif mode=="CMYK": s,n=-0.2,"CMYK — professional source" else: s,n=0.0,f"Color mode={mode}" return {"test":"Color Space","mode":mode,"score":s,"note":n} def d10_gps_plausibility(img): try: exif=img._getexif() or {} except: exif={} gps=exif.get(34853) if not gps: return {"test":"GPS Plausibility","score":0.0,"note":"No GPS data"} try: def _dms_to_dd(dms): """Convert (degrees, minutes, seconds) tuple to decimal degrees.""" if isinstance(dms, (list, tuple)) and len(dms) == 3: d = float(dms[0]) if not hasattr(dms[0], 'numerator') else float(dms[0]) m = float(dms[1]) if not hasattr(dms[1], 'numerator') else float(dms[1]) s_val = float(dms[2]) if not hasattr(dms[2], 'numerator') else float(dms[2]) return d + m/60.0 + s_val/3600.0 return float(dms) lat_ref = gps.get(1, "N") lon_ref = gps.get(3, "E") lat_raw = gps.get(2, (0,0,0)) lon_raw = gps.get(4, (0,0,0)) lat = _dms_to_dd(lat_raw) lon = _dms_to_dd(lon_raw) if lat_ref == "S": lat = -lat if lon_ref == "W": lon = -lon # Bounds check if not (-90 <= lat <= 90) or not (-180 <= lon <= 180): s,n = 0.4, f"Impossible GPS coordinates (lat={lat:.4f}, lon={lon:.4f})" elif abs(lat) < 0.001 and abs(lon) < 0.001: s,n = 0.3, f"GPS at Null Island (0,0) — likely fabricated" elif abs(lat) < 0.1 and abs(lon) < 0.1: s,n = 0.2, f"GPS near (0,0) — suspicious (lat={lat:.4f}, lon={lon:.4f})" else: s,n = -0.2, f"Plausible GPS ({lat:.4f}°{'N' if lat>=0 else 'S'}, {lon:.4f}°{'E' if lon>=0 else 'W'})" except Exception as e: s,n = 0.0, f"GPS parse error: {str(e)[:50]}" return {"test":"GPS Plausibility","score":s,"note":n} def d11_maker_note(img): try: exif=img._getexif() or {} except: exif={} mn=exif.get(37500) # MakerNote tag if mn: size=len(mn) if isinstance(mn,bytes) else len(str(mn)) if size>100: s,n=-0.3,f"MakerNote ({size}B) — camera firmware" else: s,n=-0.1,f"Small MakerNote ({size}B)" else: s,n=0.1,"No MakerNote" return {"test":"Maker Note","score":s,"note":n} def d12_file_structure(img): fmt=img.format or "unknown"; w,h=img.size mp=w*h/1e6 standard_mp=[0.3,0.8,1,2,3,4,5,8,10,12,16,20,24,36,45,50,61,100,108,150,200] closest=min(standard_mp,key=lambda x:abs(mp-x)); diff=abs(mp-closest)/closest if closest>0 else 1 if diff<0.05: s,n=-0.1,f"Standard resolution ({mp:.1f}MP ≈ {closest}MP)" elif diff>0.3: s,n=0.1,f"Non-standard resolution ({mp:.1f}MP)" else: s,n=0.0,f"{mp:.1f}MP, format={fmt}" return {"test":"File Structure","format":fmt,"megapixels":round(mp,2),"score":s,"note":n} ALL_TESTS=[d01_exif_completeness,d02_software_check,d03_ela,d04_ai_metadata,d05_thumbnail, d06_watermark,d07_compression_ghost,d08_icc_profile,d09_color_space, d10_gps_plausibility,d11_maker_note,d12_file_structure] def run_metadata_agent(img, modality_adjustments=None): from agents.utils import run_agent_tests findings_raw, avg, conf, fail, rat = run_agent_tests(ALL_TESTS, img, "Metadata Agent", modality_adjustments) ela_img = None for f in findings_raw: if "ela_image" in f: ela_img = f.pop("ela_image") return AgentEvidence("Metadata Agent", np.clip(avg,-1,1), conf, fail, rat, findings_raw, ela_img)