| """FORENSIQ β Metadata Agent (12 features)""" |
| import numpy as np, io |
| from PIL import Image, ImageChops, ImageEnhance |
| from PIL.ExifTags import TAGS |
| from scipy.ndimage import maximum_filter, gaussian_filter |
| from typing import Dict, Any |
| from agents.optical_agent import AgentEvidence |
|
|
| def _g(img): return np.array(img.convert("L")).astype(np.float64) |
|
|
def d01_exif_completeness(img):
    """Score how complete the image's EXIF block is.

    Tag ids returned by ``img._getexif()`` are translated to names through
    ``TAGS`` and each value is stringified (truncated to 200 characters).
    Seven camera-related fields are then counted: Make, Model, lens
    (LensModel or LensInfo), FocalLength, ExposureTime, ISOSpeedRatings and
    FNumber.

    Returns:
        dict with keys ``test``, ``fields`` (number of decoded tags),
        ``camera_fields`` (0-7), ``exif_data`` (the decoded mapping),
        ``score`` and ``note``. The score is -0.5 for four or more camera
        fields, 0.5 when EXIF is empty, 0.2 when tags exist but none of the
        camera fields do, and -0.1 otherwise.
    """
    # Best-effort read: images without ``_getexif`` or with unparsable EXIF
    # are treated as having none. ``Exception`` (not a bare ``except``) so
    # KeyboardInterrupt/SystemExit still propagate.
    try:
        exif = img._getexif() or {}
    except Exception:
        exif = {}

    decoded = {}
    for tag_id, value in exif.items():
        name = TAGS.get(tag_id, str(tag_id))
        try:
            decoded[name] = str(value)[:200]
        except Exception:
            decoded[name] = "<binary>"

    has_lens = "LensModel" in decoded or "LensInfo" in decoded
    camera_keys = ("Make", "Model", "FocalLength", "ExposureTime",
                   "ISOSpeedRatings", "FNumber")
    cam = sum(key in decoded for key in camera_keys) + int(has_lens)

    flags = []
    if cam == 0:
        flags.append("No camera metadata")
    if not decoded:
        flags.append("Empty EXIF")

    if cam >= 4 and not flags:
        s, n = -0.5, f"Rich plausible EXIF ({cam}/7 fields)"
    elif not decoded or len(flags) >= 2:
        s, n = 0.5, "Missing/suspicious metadata"
    elif flags:
        s, n = 0.2, "Minor metadata concern"
    else:
        s, n = -0.1, "Partial metadata"
    return {"test": "EXIF Completeness", "fields": len(decoded), "camera_fields": cam,
            "exif_data": decoded, "score": s, "note": n}
|
|
def d02_software_check(img):
    """Classify the EXIF ``Software`` tag as AI generator, editor, or other.

    Only tag id 305 (0x0131, "Software") is read, so the rest of the EXIF
    block is not decoded. Matching is case-insensitive on the first 200
    characters of the tag value.

    Returns:
        dict with keys ``test``, ``score`` and ``note``. The score is 0.8
        for a known AI generator, 0.2 for a known editor, -0.1 for any
        other non-empty value and 0.1 when the tag is absent or empty.
    """
    import re  # function-scope import: only this check needs regex matching

    # Best-effort read; ``Exception`` keeps KeyboardInterrupt/SystemExit alive.
    try:
        exif = img._getexif() or {}
    except Exception:
        exif = {}

    software_tag = 305
    software = str(exif[software_tag])[:200] if software_tag in exif else ""
    sw = software.lower()

    ai_keywords = ("stable diffusion", "midjourney", "dall-e", "comfyui",
                   "automatic1111", "invoke", "flux", "novelai", "sdxl")
    edit_keywords = ("photoshop", "gimp", "lightroom", "capture one", "snapseed")

    # "sd" is matched only as a standalone token ("sd 1.5", "sd-webui",
    # "sd1.5"). A plain substring test also fires inside unrelated values
    # such as firmware build ids or "... SDK ...".
    standalone_sd = re.search(r"(?<![a-z0-9])sd(?![a-z])", sw) is not None

    if standalone_sd or any(k in sw for k in ai_keywords):
        s, n = 0.8, f"AI software: {software}"
    elif any(k in sw for k in edit_keywords):
        s, n = 0.2, f"Editing software: {software}"
    elif sw:
        s, n = -0.1, f"Software: {software}"
    else:
        s, n = 0.1, "No software tag"
    return {"test": "Software Detection", "score": s, "note": n}
|
|
def d03_ela(img, quality=90):
    """Error Level Analysis: re-save as JPEG and measure the difference.

    The image is converted to RGB, saved to an in-memory JPEG at
    ``quality``, re-opened, and the per-pixel absolute difference is taken.
    The difference is averaged over non-overlapping 32x32 blocks; the spread
    of those block means drives the score.

    Args:
        img: PIL image; ``img.format`` (if present) decides whether the
            source is treated as JPEG.
        quality: JPEG quality used for the re-save.

    Returns:
        dict with keys ``test``, ``block_std`` (std of block means, rounded
        to 3 places), ``score``, ``note`` and ``ela_image`` (the difference
        image brightened so that its peak maps to 255).
    """
    source_format = getattr(img, "format", None)
    is_jpeg = bool(source_format) and source_format.upper() in ("JPEG", "JPG")

    img_rgb = img.convert("RGB")
    buf = io.BytesIO()
    img_rgb.save(buf, "JPEG", quality=quality)
    buf.seek(0)
    # convert() returns a fully loaded copy, so the reopened handle can close.
    with Image.open(buf) as reopened:
        resaved = reopened.convert("RGB")
    ela = ImageChops.difference(img_rgb, resaved)

    # ``or 1`` avoids dividing by zero when the re-save is pixel-identical.
    peak = max(high for _, high in ela.getextrema()) or 1
    ela_vis = ImageEnhance.Brightness(ela).enhance(255.0 / peak)

    ea = np.array(ela).astype(float)
    bs = 32
    h, w, channels = ea.shape
    rows, cols = h // bs, w // bs
    if rows and cols:
        # Every full block is included, also the last one when a dimension
        # is an exact multiple of the block size.
        blocks = ea[:rows * bs, :cols * bs].reshape(rows, bs, cols, bs, channels)
        bm = blocks.mean(axis=(1, 3, 4)).ravel()
    else:
        # Image smaller than one block: use the whole image as a single
        # block instead of reducing over an empty array.
        bm = np.array([float(np.mean(ea))])

    bstd = float(np.std(bm))
    br = float(np.max(bm) - np.min(bm))
    ela_std = float(np.std(ea))

    if not is_jpeg and ela_std < 1:
        s, n = 0.0, "PNG/lossless source β ELA comparison is losslessβJPEG, not meaningful"
    elif bstd > 8 and br > 30:
        s, n = 0.6, f"High ELA variance (Ο={bstd:.1f}) β manipulation"
    elif bstd > 4:
        s, n = 0.3, f"Moderate ELA (Ο={bstd:.1f})"
    elif ela_std < 1:
        # Only JPEG sources reach this point; non-JPEG was handled above.
        s, n = 0.2, "Uniform ELA on JPEG β possible AI"
    else:
        s, n = -0.2, f"Consistent ELA (Ο={bstd:.1f})"
    return {"test": "Error Level Analysis", "block_std": round(bstd, 3),
            "score": s, "note": n, "ela_image": ela_vis}
|
|
def d04_ai_metadata(img):
    """Scan ``img.info`` for text that looks like AI-generator metadata.

    Each key and the first 500 characters of its value are lowercased and
    checked against a fixed marker list. The XMP packet (``info`` key
    ``"XML:com.adobe.xmp"`` or ``"xmp"``) is additionally checked for
    generative-AI and C2PA markers.

    Returns:
        dict with keys ``test``, ``traces`` (list of matches), ``score``
        (0.8 if anything matched, else 0.0) and ``note`` (first three
        traces joined by ``"; "``).
    """
    markers = (
        "stable diffusion", "comfyui", "automatic1111", "midjourney",
        "dall-e", "novelai", "parameters", "prompt", "negative_prompt",
        "steps", "sampler", "cfg_scale", "flux", "sd_model",
    )
    info = img.info or {}
    traces = []

    for key in info:
        key_text = str(key).lower()
        value_text = str(info[key])[:500].lower()
        for marker in markers:
            if marker in key_text or marker in value_text:
                # One trace per entry, however many markers it matches.
                traces.append(f"{key}: {str(info[key])[:80]}")
                break

    xmp = str(info.get("XML:com.adobe.xmp", "") or info.get("xmp", ""))
    xmp_lower = xmp.lower()
    if "generativeAI" in xmp or "ai:" in xmp_lower:
        traces.append("XMP AI markers")
    if "c2pa" in xmp_lower:
        traces.append("C2PA Content Credentials")

    if not traces:
        score, note = 0.0, "No AI metadata"
    else:
        score, note = 0.8, f"AI traces: {'; '.join(traces[:3])}"
    return {"test": "AI Metadata Traces", "traces": traces, "score": score, "note": note}
|
|
def d05_thumbnail(img):
    """Report whether the EXIF dict carries embedded-thumbnail tags.

    Looks for tag ids 513 and 514 in the dict returned by
    ``img._getexif()``.

    Returns:
        dict with keys ``test``, ``has_thumbnail``, ``score`` (-0.1 when
        present, else 0.0) and ``note``.
    """
    # Best-effort read; ``Exception`` (not bare ``except``) so that
    # KeyboardInterrupt/SystemExit are not swallowed.
    try:
        exif = img._getexif() or {}
    except Exception:
        exif = {}

    has_thumb = 513 in exif or 514 in exif
    if has_thumb:
        s, n = -0.1, "Thumbnail present β camera"
    else:
        s, n = 0.0, "No thumbnail"
    return {"test": "Thumbnail Check", "has_thumbnail": has_thumb, "score": s, "note": n}
|
|
def d06_watermark(img):
    """Count isolated strong peaks in the image's log-magnitude spectrum.

    The grayscale image is Fourier-transformed and shifted so the zero
    frequency sits at the centre; a 6x6 patch around the centre is zeroed.
    A bin counts as a peak when it equals the maximum of its size-10
    neighbourhood and exceeds the 99.5th percentile of the masked spectrum.

    Returns:
        dict with keys ``test``, ``peaks``, ``score`` (0.2 when more than
        20 peaks are found, else 0.0) and ``note``.
    """
    spectrum = np.fft.fftshift(np.fft.fft2(_g(img)))
    log_mag = np.log(np.abs(spectrum) + 1)

    rows, cols = log_mag.shape
    mid_r, mid_c = rows // 2, cols // 2
    masked = log_mag.copy()
    masked[mid_r - 3:mid_r + 3, mid_c - 3:mid_c + 3] = 0

    neighbourhood_max = maximum_filter(masked, 10)
    threshold = np.percentile(masked, 99.5)
    is_peak = (masked == neighbourhood_max) & (masked > threshold)
    peak_count = int(np.sum(is_peak))

    if peak_count > 20:
        score, note = 0.2, f"Frequency peaks ({peak_count}) β watermark?"
    else:
        score, note = 0.0, f"No watermark ({peak_count} peaks)"
    return {"test": "Watermark Detection", "peaks": peak_count, "score": score, "note": note}
|
|
def d07_compression_ghost(img):
    """Measure 8-row blockiness in the grayscale image.

    The image is cropped to multiples of 8 in both dimensions. Absolute
    differences between vertically adjacent rows are split into those that
    cross a multiple-of-8 row boundary and those that do not; ``blockiness``
    is the ratio of the two means.

    Returns:
        dict with keys ``test``, ``blockiness`` (rounded to 4 places),
        ``score`` and ``note``. When either group is empty (fewer than 16
        usable rows, or no usable columns) the ratio is 1.0.
    """
    gray = _g(img)
    h, w = gray.shape
    hc, wc = (h // 8) * 8, (w // 8) * 8
    gray = gray[:hc, :wc]

    # Row ``k`` of ``row_diffs`` is |gray[k+1] - gray[k]|, i.e. the step into
    # image row k+1. Computed in one pass rather than building per-pixel
    # Python lists.
    row_diffs = np.abs(np.diff(gray, axis=0))
    target_rows = np.arange(1, hc)
    on_boundary = target_rows % 8 == 0
    boundary = row_diffs[on_boundary]
    interior = row_diffs[~on_boundary]

    if boundary.size and interior.size:
        bk = float(boundary.mean()) / (float(interior.mean()) + 1e-9)
    else:
        # Nothing to compare; avoid the NaN that a mean over an empty group
        # would produce.
        bk = 1.0

    if bk > 1.5:
        s, n = 0.3, f"Double JPEG (blockiness={bk:.3f})"
    elif bk > 1.2:
        s, n = -0.1, f"JPEG blocks ({bk:.3f})"
    elif bk < 1.02:
        s, n = 0.1, f"No blocks ({bk:.3f})"
    else:
        s, n = 0.0, f"Blockiness={bk:.3f}"
    return {"test": "Compression Ghosts", "blockiness": round(bk, 4), "score": s, "note": n}
|
|
def d08_icc_profile(img):
    """Report whether the image carries an embedded ICC colour profile.

    Reads ``img.info["icc_profile"]``. A missing ``info`` mapping, a missing
    key and an empty profile are all treated as "no profile", so
    ``has_icc`` always agrees with the note.

    Returns:
        dict with keys ``test``, ``has_icc``, ``score`` (-0.2 for a profile
        larger than 100 bytes, 0.0 for a smaller one, 0.1 for none) and
        ``note``.
    """
    info = getattr(img, "info", None) or {}
    icc = info.get("icc_profile")
    has_icc = bool(icc)

    if not has_icc:
        s, n = 0.1, "No ICC profile"
    else:
        size = len(icc)
        if size > 100:
            s, n = -0.2, f"ICC profile ({size}B) β camera/editor"
        else:
            s, n = 0.0, f"Small ICC ({size}B)"
    return {"test": "ICC Color Profile", "has_icc": has_icc, "score": s, "note": n}
|
|
def d09_color_space(img):
    """Score the image's colour space from EXIF tag 40961 and ``img.mode``.

    The EXIF value is compared as a string: ``"1"`` and ``"65535"`` each
    score -0.1; otherwise a ``CMYK`` mode scores -0.2; anything else is
    neutral.

    Returns:
        dict with keys ``test``, ``mode``, ``score`` and ``note``.
    """
    mode = img.mode
    # Best-effort read; ``Exception`` (not bare ``except``) so that
    # KeyboardInterrupt/SystemExit are not swallowed.
    try:
        exif = img._getexif() or {}
    except Exception:
        exif = {}

    cs = str(exif.get(40961, ""))
    if cs == "1":
        s, n = -0.1, "sRGB color space β standard"
    elif cs == "65535":
        s, n = -0.1, "Uncalibrated (wide gamut)"
    elif mode == "CMYK":
        s, n = -0.2, "CMYK β professional source"
    else:
        s, n = 0.0, f"Color mode={mode}"
    return {"test": "Color Space", "mode": mode, "score": s, "note": n}
|
|
def d10_gps_plausibility(img):
    """Check whether EXIF GPS coordinates are physically plausible.

    Reads the GPS mapping stored under EXIF tag 34853 and uses its entries
    1/2 (latitude ref/value) and 3/4 (longitude ref/value). Values may be a
    3-item degrees/minutes/seconds sequence or a single number.

    Returns:
        dict with keys ``test``, ``score`` and ``note``:
          * 0.0  - no GPS block, no coordinates in it, non-finite values, or
                   a parse error
          * 0.4  - latitude/longitude outside [-90, 90] / [-180, 180]
          * 0.3  - within 0.001 degrees of (0, 0)
          * 0.2  - within 0.1 degrees of (0, 0)
          * -0.2 - anything else
    """
    # Best-effort read; ``Exception`` (not bare ``except``) so that
    # KeyboardInterrupt/SystemExit are not swallowed.
    try:
        exif = img._getexif() or {}
    except Exception:
        exif = {}
    gps = exif.get(34853)
    if not gps:
        return {"test": "GPS Plausibility", "score": 0.0, "note": "No GPS data"}

    def _component(part):
        """Convert one D/M/S component; accepts (numerator, denominator) pairs."""
        if isinstance(part, (list, tuple)) and len(part) == 2:
            num, den = part
            return float(num) / float(den) if den else float("nan")
        return float(part)

    def _dms_to_dd(dms):
        """Convert (degrees, minutes, seconds) to decimal degrees."""
        if isinstance(dms, (list, tuple)) and len(dms) == 3:
            degrees, minutes, seconds = (_component(p) for p in dms)
            return degrees + minutes / 60.0 + seconds / 3600.0
        return float(dms)

    def _hemisphere(value, default):
        """Normalise a hemisphere ref (str or bytes, maybe NUL-padded) to one letter."""
        if isinstance(value, bytes):
            value = value.decode("ascii", "ignore")
        text = str(value).strip("\x00 ").upper()
        return text[:1] or default

    try:
        lat_raw = gps.get(2)
        lon_raw = gps.get(4)
        if lat_raw is None or lon_raw is None:
            # A GPS block holding only e.g. version/timestamp has no position;
            # defaulting to (0, 0) would misreport it as fabricated.
            s, n = 0.0, "GPS block present but no coordinates"
        else:
            lat = _dms_to_dd(lat_raw)
            lon = _dms_to_dd(lon_raw)
            if _hemisphere(gps.get(1, "N"), "N") == "S":
                lat = -lat
            if _hemisphere(gps.get(3, "E"), "E") == "W":
                lon = -lon

            if not (np.isfinite(lat) and np.isfinite(lon)):
                # e.g. 0/0 rationals; unreadable rather than "impossible".
                s, n = 0.0, "GPS coordinates unreadable (non-finite values)"
            elif not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
                s, n = 0.4, f"Impossible GPS coordinates (lat={lat:.4f}, lon={lon:.4f})"
            elif abs(lat) < 0.001 and abs(lon) < 0.001:
                s, n = 0.3, "GPS at Null Island (0,0) β likely fabricated"
            elif abs(lat) < 0.1 and abs(lon) < 0.1:
                s, n = 0.2, f"GPS near (0,0) β suspicious (lat={lat:.4f}, lon={lon:.4f})"
            else:
                s, n = -0.2, f"Plausible GPS ({lat:.4f}Β°{'N' if lat>=0 else 'S'}, {lon:.4f}Β°{'E' if lon>=0 else 'W'})"
    except Exception as e:
        s, n = 0.0, f"GPS parse error: {str(e)[:50]}"
    return {"test": "GPS Plausibility", "score": s, "note": n}
|
|
def d11_maker_note(img):
    """Score the presence and size of the EXIF MakerNote (tag 37500).

    Size is ``len()`` of the value when it is ``bytes``, otherwise the
    length of its string form.

    Returns:
        dict with keys ``test``, ``score`` (-0.3 for more than 100 bytes,
        -0.1 for a smaller note, 0.1 when absent or empty) and ``note``.
    """
    # Best-effort read; ``Exception`` (not bare ``except``) so that
    # KeyboardInterrupt/SystemExit are not swallowed.
    try:
        exif = img._getexif() or {}
    except Exception:
        exif = {}

    mn = exif.get(37500)
    if not mn:
        s, n = 0.1, "No MakerNote"
    else:
        size = len(mn) if isinstance(mn, bytes) else len(str(mn))
        if size > 100:
            s, n = -0.3, f"MakerNote ({size}B) β camera firmware"
        else:
            s, n = -0.1, f"Small MakerNote ({size}B)"
    return {"test": "Maker Note", "score": s, "note": n}
|
|
def d12_file_structure(img):
    """Compare the image's megapixel count with a list of common sizes.

    Megapixels are ``width * height / 1e6``. The relative distance to the
    nearest entry of the fixed size list decides the score.

    Returns:
        dict with keys ``test``, ``format`` (``img.format`` or
        ``"unknown"``), ``megapixels`` (rounded to 2 places), ``score``
        (-0.1 within 5% of a listed size, 0.1 beyond 30%, else 0.0) and
        ``note``.
    """
    file_format = img.format or "unknown"
    width, height = img.size
    megapixels = width * height / 1e6

    common_sizes = [0.3, 0.8, 1, 2, 3, 4, 5, 8, 10, 12, 16, 20, 24, 36, 45,
                    50, 61, 100, 108, 150, 200]
    nearest = min(common_sizes, key=lambda size: abs(megapixels - size))
    if nearest > 0:
        relative_gap = abs(megapixels - nearest) / nearest
    else:
        relative_gap = 1

    if relative_gap < 0.05:
        score = -0.1
        note = f"Standard resolution ({megapixels:.1f}MP β {nearest}MP)"
    elif relative_gap > 0.3:
        score = 0.1
        note = f"Non-standard resolution ({megapixels:.1f}MP)"
    else:
        score = 0.0
        note = f"{megapixels:.1f}MP, format={file_format}"
    return {
        "test": "File Structure",
        "format": file_format,
        "megapixels": round(megapixels, 2),
        "score": score,
        "note": note,
    }
|
|
# Registry of the twelve metadata checks, in the order they are handed to
# the test runner.
ALL_TESTS = [
    d01_exif_completeness,
    d02_software_check,
    d03_ela,
    d04_ai_metadata,
    d05_thumbnail,
    d06_watermark,
    d07_compression_ghost,
    d08_icc_profile,
    d09_color_space,
    d10_gps_plausibility,
    d11_maker_note,
    d12_file_structure,
]
|
|
def run_metadata_agent(img, modality_adjustments=None):
    """Run every check in ``ALL_TESTS`` and package the result.

    Delegates execution to ``agents.utils.run_agent_tests`` and wraps its
    five return values in an ``AgentEvidence``. Any ``"ela_image"`` entry is
    removed from the findings and passed separately; if several findings
    carry one, the last wins.

    Args:
        img: image object forwarded unchanged to the test runner.
        modality_adjustments: forwarded unchanged to the test runner
            (semantics are defined there - not visible from this file).

    Returns:
        AgentEvidence built from the agent name, the runner's second value
        clipped to [-1, 1], its remaining three values, the findings list,
        and the extracted ELA image (or None).
    """
    # Imported here rather than at module level, as in the original.
    from agents.utils import run_agent_tests

    findings, mean_score, confidence, failures, rationale = run_agent_tests(
        ALL_TESTS, img, "Metadata Agent", modality_adjustments
    )

    ela_image = None
    for finding in findings:
        if "ela_image" not in finding:
            continue
        # pop() strips the image out of the finding in place.
        ela_image = finding.pop("ela_image")

    clipped_score = np.clip(mean_score, -1, 1)
    return AgentEvidence(
        "Metadata Agent", clipped_score, confidence, failures, rationale,
        findings, ela_image,
    )
|
|