FORENSIQ / agents /metadata_agent.py
anky2002's picture
Upload agents/metadata_agent.py with huggingface_hub
f781fd5 verified
"""FORENSIQ β€” Metadata Agent (12 features)"""
import numpy as np, io
from PIL import Image, ImageChops, ImageEnhance
from PIL.ExifTags import TAGS
from scipy.ndimage import maximum_filter, gaussian_filter
from typing import Dict, Any
from agents.optical_agent import AgentEvidence
def _g(img): return np.array(img.convert("L")).astype(np.float64)
def d01_exif_completeness(img):
    """Score how complete the image's EXIF block is.

    EXIF is read with ``img._getexif()``; any ordinary failure (including the
    attribute being absent) is treated as "no EXIF".  Tag ids are mapped to
    names through ``TAGS`` and values are stringified and cut to 200 chars.

    Seven camera fields are counted: Make, Model, lens, FocalLength,
    ExposureTime, ISOSpeedRatings and FNumber.

    Returns a dict with keys ``test``, ``fields``, ``camera_fields``,
    ``exif_data``, ``score`` and ``note``.  Negative scores indicate rich
    metadata, positive scores indicate missing metadata.
    """
    try:
        exif = img._getexif() or {}
    except Exception:  # best effort, but let KeyboardInterrupt/SystemExit through
        exif = {}

    decoded = {}
    for tag_id, value in exif.items():
        name = TAGS.get(tag_id, str(tag_id))
        try:
            decoded[name] = str(value)[:200]
        except Exception:
            decoded[name] = "<binary>"

    # Pillow names tag 0xA432 "LensSpecification"; "LensInfo" is kept so that
    # any input the previous check accepted is still accepted.
    lens_names = ("LensModel", "LensSpecification", "LensInfo")
    camera_checks = (
        "Make" in decoded,
        "Model" in decoded,
        any(name in decoded for name in lens_names),
        "FocalLength" in decoded,
        "ExposureTime" in decoded,
        "ISOSpeedRatings" in decoded,
        "FNumber" in decoded,
    )
    cam = sum(camera_checks)

    flags = []
    if cam == 0:
        flags.append("No camera metadata")
    if not decoded:
        flags.append("Empty EXIF")

    if cam >= 4 and not flags:
        s, n = -0.5, f"Rich plausible EXIF ({cam}/7 fields)"
    elif not decoded or len(flags) >= 2:
        s, n = 0.5, "Missing/suspicious metadata"
    elif flags:
        s, n = 0.2, "Minor metadata concern"
    else:
        s, n = -0.1, "Partial metadata"
    return {
        "test": "EXIF Completeness",
        "fields": len(decoded),
        "camera_fields": cam,
        "exif_data": decoded,
        "score": s,
        "note": n,
    }
def d02_software_check(img):
    """Classify the EXIF ``Software`` tag as AI generator, editor, other or absent.

    The tag (id 0x0131) is read straight from ``img._getexif()``; its value is
    stringified and cut to 200 characters.  Matching is case-insensitive.

    Long generator names are matched as substrings.  The two-letter
    abbreviation "sd" (optionally "sdxl") is only accepted as a stand-alone
    token, because a bare substring test also fires on unrelated strings such
    as firmware build ids ("...XXSDF...") or "sdk".

    Returns a dict with keys ``test``, ``score`` and ``note``.
    """
    import re  # function-scope import: the module does not import ``re``

    try:
        exif = img._getexif() or {}
    except Exception:  # best effort, but let KeyboardInterrupt/SystemExit through
        exif = {}

    software = str(exif.get(0x0131, ""))[:200]
    sw = software.lower()

    ai_names = ("stable diffusion", "midjourney", "dall-e", "comfyui",
                "automatic1111", "invoke", "flux", "novelai")
    editor_names = ("photoshop", "gimp", "lightroom", "capture one", "snapseed")

    # "sd" must not be glued to a preceding letter/digit or a following letter.
    sd_token = re.search(r"(?<![a-z0-9])sd(?:xl)?(?![a-z])", sw) is not None

    if sd_token or any(name in sw for name in ai_names):
        s, n = 0.8, f"AI software: {software}"
    elif any(name in sw for name in editor_names):
        s, n = 0.2, f"Editing software: {software}"
    elif sw:
        s, n = -0.1, f"Software: {software}"
    else:
        s, n = 0.1, "No software tag"
    return {"test": "Software Detection", "score": s, "note": n}
def d03_ela(img, quality=90):
    """Error Level Analysis: re-save as JPEG and measure the per-block error.

    The image is converted to RGB, saved to an in-memory JPEG at *quality*,
    reloaded, and differenced against the RGB original.  The absolute error is
    averaged over non-overlapping 32x32 blocks; the spread of those block
    means drives the score.

    Args:
        img: PIL image.  ``img.format`` (if present) decides whether the
            source counts as JPEG.
        quality: JPEG quality used for the re-save.

    Returns:
        dict with keys ``test``, ``block_std``, ``score``, ``note`` and
        ``ela_image`` (the difference image, brightness-scaled so its largest
        value maps to 255).
    """
    source_format = getattr(img, "format", None)
    is_jpeg = bool(source_format) and source_format.upper() in ("JPEG", "JPG")

    img_rgb = img.convert("RGB")
    with io.BytesIO() as buf:
        img_rgb.save(buf, "JPEG", quality=quality)
        buf.seek(0)
        # convert() forces the pixel data to load before the buffer closes.
        resaved = Image.open(buf).convert("RGB")
    ela = ImageChops.difference(img_rgb, resaved)

    peak = max(band_max for _, band_max in ela.getextrema()) or 1  # avoid /0
    ela_vis = ImageEnhance.Brightness(ela).enhance(255.0 / peak)

    ea = np.array(ela).astype(float)
    h, w, _ = ea.shape
    bs = 32
    # "+ 1" keeps the last full block when a dimension is a multiple of bs.
    bm = [
        float(np.mean(ea[i:i + bs, j:j + bs]))
        for i in range(0, h - bs + 1, bs)
        for j in range(0, w - bs + 1, bs)
    ]
    if not bm:
        # Image smaller than one block: treat the whole image as one block so
        # the statistics below are defined instead of raising on an empty array.
        bm = [float(np.mean(ea))]
    bm = np.array(bm)
    bstd = float(np.std(bm))
    br = float(np.max(bm) - np.min(bm))
    pixel_std = float(np.std(ea))

    if not is_jpeg and pixel_std < 1:
        s, n = 0.0, "PNG/lossless source — ELA comparison is lossless→JPEG, not meaningful"
    elif bstd > 8 and br > 30:
        s, n = 0.6, f"High ELA variance (σ={bstd:.1f}) — manipulation"
    elif bstd > 4:
        s, n = 0.3, f"Moderate ELA (σ={bstd:.1f})"
    elif pixel_std < 1:
        # Only JPEG sources reach this point; non-JPEG with a flat error map
        # was handled by the first branch.
        s, n = 0.2, "Uniform ELA on JPEG — possible AI"
    else:
        s, n = -0.2, f"Consistent ELA (σ={bstd:.1f})"
    return {
        "test": "Error Level Analysis",
        "block_std": round(bstd, 3),
        "score": s,
        "note": n,
        "ela_image": ela_vis,
    }
def d04_ai_metadata(img):
    """Look for generator fingerprints in ``img.info`` and embedded XMP.

    An ``info`` entry counts as a trace when its lower-cased key, or the
    lower-cased first 500 characters of its value, contains one of the marker
    strings.  The XMP packet (``"XML:com.adobe.xmp"`` or ``"xmp"``) is checked
    separately for AI and C2PA markers.

    Returns a dict with keys ``test``, ``traces``, ``score`` and ``note``;
    the note lists at most the first three traces.
    """
    marker_words = (
        "stable diffusion", "comfyui", "automatic1111", "midjourney", "dall-e",
        "novelai", "parameters", "prompt", "negative_prompt", "steps",
        "sampler", "cfg_scale", "flux", "sd_model",
    )
    metadata = img.info or {}

    def _has_marker(key, value):
        key_text = str(key).lower()
        value_text = str(value)[:500].lower()
        return any(word in key_text or word in value_text for word in marker_words)

    traces = [
        f"{key}: {str(value)[:80]}"
        for key, value in metadata.items()
        if _has_marker(key, value)
    ]

    xmp_packet = str(metadata.get("XML:com.adobe.xmp", "") or metadata.get("xmp", ""))
    xmp_lower = xmp_packet.lower()
    if "generativeAI" in xmp_packet or "ai:" in xmp_lower:
        traces.append("XMP AI markers")
    if "c2pa" in xmp_lower:
        traces.append("C2PA Content Credentials")

    if not traces:
        verdict = (0.0, "No AI metadata")
    else:
        verdict = (0.8, f"AI traces: {'; '.join(traces[:3])}")
    score, note = verdict
    return {"test": "AI Metadata Traces", "traces": traces, "score": score, "note": note}
def d05_thumbnail(img):
    """Report whether the EXIF data carries an embedded-thumbnail pointer.

    A thumbnail is considered present when tag 513 or tag 514 appears in the
    dict returned by ``img._getexif()``.

    Returns a dict with keys ``test``, ``has_thumbnail``, ``score``, ``note``.
    """
    # NOTE(review): 513/514 normally live in IFD1 -- confirm that _getexif()
    # actually exposes them for the Pillow version in use.
    try:
        exif = img._getexif() or {}
    except Exception:  # best effort, but let KeyboardInterrupt/SystemExit through
        exif = {}
    has_thumb = 513 in exif or 514 in exif
    if has_thumb:
        s, n = -0.1, "Thumbnail present — camera"
    else:
        s, n = 0.0, "No thumbnail"
    return {"test": "Thumbnail Check", "has_thumbnail": has_thumb, "score": s, "note": n}
def d06_watermark(img):
    """Count isolated strong peaks in the image's log-magnitude spectrum.

    The grayscale image is Fourier-transformed and shifted so the DC term is
    central.  A 6x6 window around the centre is zeroed, then a pixel counts
    as a peak when it equals the maximum of its 10x10 neighbourhood and lies
    above the 99.5th percentile of the spectrum.  More than 20 peaks is
    reported as a possible watermark.

    Returns a dict with keys ``test``, ``peaks``, ``score`` and ``note``.
    """
    gray = _g(img)
    spectrum = np.fft.fftshift(np.fft.fft2(gray))
    mag = np.log(np.abs(spectrum) + 1)
    h, w = mag.shape
    cy, cx = h // 2, w // 2

    mc = mag.copy()
    # Clamp at 0: on images under 6 px a negative start would wrap round and
    # blank the wrong rows/columns instead of the centre.
    mc[max(cy - 3, 0):cy + 3, max(cx - 3, 0):cx + 3] = 0

    lm = maximum_filter(mc, 10)
    peaks = (mc == lm) & (mc > np.percentile(mc, 99.5))
    np_ = int(np.sum(peaks))
    if np_ > 20:
        s, n = 0.2, f"Frequency peaks ({np_}) — watermark?"
    else:
        s, n = 0.0, f"No watermark ({np_} peaks)"
    return {"test": "Watermark Detection", "peaks": np_, "score": s, "note": n}
def d07_compression_ghost(img):
    """Estimate JPEG blockiness from row-to-row luminance differences.

    The grayscale image is cropped to a multiple of 8 in each dimension.
    Absolute differences between consecutive rows are split into those that
    cross an 8-row boundary (row index divisible by 8) and all others; the
    ratio of their means is the blockiness figure.

    When either group is empty (image too small to contain both kinds of
    row pair) the ratio falls back to 1.0 instead of producing NaN.

    Returns a dict with keys ``test``, ``blockiness``, ``score``, ``note``.
    """
    gray = _g(img)
    h, w = gray.shape
    hc, wc = (h // 8) * 8, (w // 8) * 8
    gray = gray[:hc, :wc]

    bk = 1.0
    if hc >= 2 and wc > 0:
        # row_diffs[k] is |row k+1 - row k|, i.e. the pair ending at row k+1.
        row_diffs = np.abs(np.diff(gray, axis=0))
        crosses_boundary = (np.arange(1, hc) % 8) == 0
        boundary = row_diffs[crosses_boundary]
        interior = row_diffs[~crosses_boundary]
        if boundary.size and interior.size:
            bk = float(boundary.mean()) / (float(interior.mean()) + 1e-9)

    if bk > 1.5:
        s, n = 0.3, f"Double JPEG (blockiness={bk:.3f})"
    elif bk > 1.2:
        s, n = -0.1, f"JPEG blocks ({bk:.3f})"
    elif bk < 1.02:
        s, n = 0.1, f"No blocks ({bk:.3f})"
    else:
        s, n = 0.0, f"Blockiness={bk:.3f}"
    return {"test": "Compression Ghosts", "blockiness": round(bk, 4), "score": s, "note": n}
def d08_icc_profile(img):
    """Score the presence and size of an embedded ICC colour profile.

    The profile is read from ``img.info["icc_profile"]``.  A profile longer
    than 100 bytes scores -0.2, a shorter non-empty one 0.0, and a missing or
    empty one 0.1.

    Returns a dict with keys ``test``, ``has_icc``, ``score`` and ``note``.
    ``has_icc`` is False for an empty profile, matching the note.
    """
    info = img.info or {}
    icc = info.get("icc_profile", None)
    has_icc = bool(icc)
    if has_icc:
        size = len(icc)
        if size > 100:
            s, n = -0.2, f"ICC profile ({size}B) — camera/editor"
        else:
            s, n = 0.0, f"Small ICC ({size}B)"
    else:
        s, n = 0.1, "No ICC profile"
    return {"test": "ICC Color Profile", "has_icc": has_icc, "score": s, "note": n}
def d09_color_space(img):
    """Score the image's declared colour space.

    Looks at the EXIF ColorSpace tag (id 40961) first: value 1 is reported as
    sRGB and 65535 as uncalibrated.  If neither matches, a PIL mode of
    ``"CMYK"`` is reported as a professional source; anything else is neutral.

    Returns a dict with keys ``test``, ``mode``, ``score`` and ``note``.
    """
    mode = img.mode
    try:
        exif = img._getexif() or {}
    except Exception:  # best effort, but let KeyboardInterrupt/SystemExit through
        exif = {}
    cs = str(exif.get(40961, ""))  # ColorSpace tag
    if cs == "1":
        s, n = -0.1, "sRGB color space — standard"
    elif cs == "65535":
        s, n = -0.1, "Uncalibrated (wide gamut)"
    elif mode == "CMYK":
        s, n = -0.2, "CMYK — professional source"
    else:
        s, n = 0.0, f"Color mode={mode}"
    return {"test": "Color Space", "mode": mode, "score": s, "note": n}
def d10_gps_plausibility(img):
    """Check that EXIF GPS coordinates are present, in range and not at (0, 0).

    The GPS block is read from tag 34853 of ``img._getexif()``.  Inside it,
    keys 1/2 hold the latitude reference and value and keys 3/4 the longitude
    reference and value.  Values may be a 3-item (degrees, minutes, seconds)
    sequence or a single number.

    A GPS block that has no latitude or no longitude is reported as neutral
    rather than being defaulted to (0, 0) and flagged as fabricated.

    Returns a dict with keys ``test``, ``score`` and ``note``.  Any ordinary
    error while parsing yields score 0.0 and a "GPS parse error" note.
    """
    try:
        exif = img._getexif() or {}
    except Exception:  # best effort, but let KeyboardInterrupt/SystemExit through
        exif = {}
    gps = exif.get(34853)
    if not gps:
        return {"test": "GPS Plausibility", "score": 0.0, "note": "No GPS data"}

    def _dms_to_dd(dms):
        """Convert (degrees, minutes, seconds) or a plain number to decimal degrees."""
        if isinstance(dms, (list, tuple)) and len(dms) == 3:
            degrees, minutes, seconds = (float(part) for part in dms)
            return degrees + minutes / 60.0 + seconds / 3600.0
        return float(dms)

    def _hemisphere(ref, default):
        """Normalise a reference letter; tolerate bytes, padding NULs and case."""
        if ref is None:
            return default
        if isinstance(ref, bytes):
            ref = ref.decode("ascii", "ignore")
        return str(ref).strip(" \x00").upper() or default

    try:
        lat_raw = gps.get(2)
        lon_raw = gps.get(4)
        if lat_raw is None or lon_raw is None:
            s, n = 0.0, "GPS block present but no coordinates"
        else:
            lat = _dms_to_dd(lat_raw)
            lon = _dms_to_dd(lon_raw)
            if _hemisphere(gps.get(1), "N") == "S":
                lat = -lat
            if _hemisphere(gps.get(3), "E") == "W":
                lon = -lon

            if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
                s, n = 0.4, f"Impossible GPS coordinates (lat={lat:.4f}, lon={lon:.4f})"
            elif abs(lat) < 0.001 and abs(lon) < 0.001:
                s, n = 0.3, "GPS at Null Island (0,0) — likely fabricated"
            elif abs(lat) < 0.1 and abs(lon) < 0.1:
                s, n = 0.2, f"GPS near (0,0) — suspicious (lat={lat:.4f}, lon={lon:.4f})"
            else:
                ns = "N" if lat >= 0 else "S"
                ew = "E" if lon >= 0 else "W"
                # Magnitude plus hemisphere letter; a signed value next to the
                # letter would read as a double negative.
                s, n = -0.2, f"Plausible GPS ({abs(lat):.4f}°{ns}, {abs(lon):.4f}°{ew})"
    except Exception as e:
        s, n = 0.0, f"GPS parse error: {str(e)[:50]}"
    return {"test": "GPS Plausibility", "score": s, "note": n}
def d11_maker_note(img):
    """Score the presence and size of the EXIF MakerNote tag (id 37500).

    Size is the byte length for ``bytes`` values and the length of ``str()``
    of the value otherwise.  More than 100 scores -0.3, a smaller non-empty
    note -0.1, and a missing or empty one 0.1.

    Returns a dict with keys ``test``, ``score`` and ``note``.
    """
    try:
        exif = img._getexif() or {}
    except Exception:  # best effort, but let KeyboardInterrupt/SystemExit through
        exif = {}
    mn = exif.get(37500)  # MakerNote tag
    if mn:
        size = len(mn) if isinstance(mn, bytes) else len(str(mn))
        if size > 100:
            s, n = -0.3, f"MakerNote ({size}B) — camera firmware"
        else:
            s, n = -0.1, f"Small MakerNote ({size}B)"
    else:
        s, n = 0.1, "No MakerNote"
    return {"test": "Maker Note", "score": s, "note": n}
def d12_file_structure(img):
    """Compare the image's pixel count with a list of common megapixel sizes.

    The megapixel count (width * height / 1e6) is matched to the nearest entry
    of a fixed list.  A relative distance under 5% scores -0.1, over 30%
    scores 0.1, anything in between is neutral.

    Returns a dict with keys ``test``, ``format``, ``megapixels``, ``score``
    and ``note``.  ``format`` is ``"unknown"`` when ``img.format`` is falsy.
    """
    fmt = img.format or "unknown"
    w, h = img.size
    mp = w * h / 1e6
    # Every entry is positive, so dividing by the nearest one is always safe.
    standard_mp = [0.3, 0.8, 1, 2, 3, 4, 5, 8, 10, 12, 16, 20, 24, 36, 45,
                   50, 61, 100, 108, 150, 200]
    closest = min(standard_mp, key=lambda x: abs(mp - x))
    diff = abs(mp - closest) / closest
    if diff < 0.05:
        s, n = -0.1, f"Standard resolution ({mp:.1f}MP ≈ {closest}MP)"
    elif diff > 0.3:
        s, n = 0.1, f"Non-standard resolution ({mp:.1f}MP)"
    else:
        s, n = 0.0, f"{mp:.1f}MP, format={fmt}"
    return {"test": "File Structure", "format": fmt, "megapixels": round(mp, 2), "score": s, "note": n}
# The twelve metadata checks, in the order they are handed to the test runner.
ALL_TESTS = [
    d01_exif_completeness,
    d02_software_check,
    d03_ela,
    d04_ai_metadata,
    d05_thumbnail,
    d06_watermark,
    d07_compression_ghost,
    d08_icc_profile,
    d09_color_space,
    d10_gps_plausibility,
    d11_maker_note,
    d12_file_structure,
]
def run_metadata_agent(img, modality_adjustments=None):
    """Run every check in ``ALL_TESTS`` on *img* and wrap the outcome.

    The checks are executed by ``agents.utils.run_agent_tests`` under the
    agent name "Metadata Agent"; *modality_adjustments* is forwarded to it
    unchanged.  Any ``"ela_image"`` entry is removed from the findings, and
    the last one found (or ``None``) is passed as the final ``AgentEvidence``
    argument.  The second value returned by the runner is clipped to
    [-1, 1] before being passed on.
    """
    # Imported here rather than at module top, as in the original code.
    from agents.utils import run_agent_tests

    agent_name = "Metadata Agent"
    # NOTE(review): the meaning of the five returned values is defined by
    # run_agent_tests, which is not visible here -- names below are guesses.
    findings, mean_score, confidence, failures, rationale = run_agent_tests(
        ALL_TESTS, img, agent_name, modality_adjustments
    )

    ela_picture = None
    for finding in findings:
        # pop() with a default keeps the previous picture when the key is absent.
        ela_picture = finding.pop("ela_image", ela_picture)

    clipped_score = np.clip(mean_score, -1, 1)
    return AgentEvidence(
        agent_name, clipped_score, confidence, failures, rationale, findings, ela_picture
    )