File size: 9,984 Bytes
c46e5d1
 
27f7870
c46e5d1
 
 
27f7870
 
c46e5d1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7b95f04
 
 
 
c46e5d1
 
 
 
 
 
 
 
 
7b95f04
 
 
c46e5d1
7b95f04
 
c46e5d1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27f7870
7b95f04
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c46e5d1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f781fd5
7b95f04
f781fd5
7b95f04
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
"""FORENSIQ — Metadata Agent (12 features)"""
import io
import re
from typing import Dict, Any

import numpy as np
from PIL import Image, ImageChops, ImageEnhance
from PIL.ExifTags import TAGS
from scipy.ndimage import maximum_filter, gaussian_filter

from agents.optical_agent import AgentEvidence

def _g(img): return np.array(img.convert("L")).astype(np.float64)

def d01_exif_completeness(img):
    """Score how much camera-related EXIF metadata the image carries.

    EXIF is read with ``img._getexif()``; if that call fails or returns
    nothing the image is treated as having no EXIF. Tag ids are mapped to
    names through ``TAGS`` and seven camera fields are counted: Make, Model,
    lens (LensModel or LensInfo), FocalLength, ExposureTime, ISOSpeedRatings
    and FNumber.

    Args:
        img: image object exposing ``_getexif()`` (optional — its absence is
            handled as "no EXIF").

    Returns:
        dict with keys ``test``, ``fields`` (number of decoded tags),
        ``camera_fields`` (0-7), ``exif_data`` (tag name -> value rendered
        with ``str`` and truncated to 200 characters), ``score`` and ``note``.
        Score is -0.5 for >= 4 camera fields, 0.5 for no EXIF at all, 0.2 for
        EXIF without any camera field and -0.1 otherwise.
    """
    try:
        exif = img._getexif() or {}
    except Exception:
        # Best effort: unreadable/absent EXIF is scored as "no EXIF", but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        exif = {}

    decoded = {}
    for tag_id, value in exif.items():
        tag_name = TAGS.get(tag_id, str(tag_id))
        try:
            decoded[tag_name] = str(value)[:200]
        except Exception:
            # A value whose __str__ fails is kept as a placeholder.
            decoded[tag_name] = "<binary>"

    camera_presence = (
        "Make" in decoded,
        "Model" in decoded,
        "LensModel" in decoded or "LensInfo" in decoded,
        "FocalLength" in decoded,
        "ExposureTime" in decoded,
        "ISOSpeedRatings" in decoded,
        "FNumber" in decoded,
    )
    cam = sum(camera_presence)

    if cam >= 4:
        s, n = -0.5, f"Rich plausible EXIF ({cam}/7 fields)"
    elif not decoded:
        s, n = 0.5, "Missing/suspicious metadata"
    elif cam == 0:
        # Some EXIF exists, but none of it describes the camera.
        s, n = 0.2, "Minor metadata concern"
    else:
        s, n = -0.1, "Partial metadata"
    return {"test": "EXIF Completeness", "fields": len(decoded), "camera_fields": cam,
            "exif_data": decoded, "score": s, "note": n}

# Substrings of the lower-cased EXIF Software value that name an AI generator.
_AI_SOFTWARE_MARKERS = ("stable diffusion", "midjourney", "dall-e", "comfyui",
                        "automatic1111", "invoke", "flux", "novelai")
# "sd" is too short for substring matching (it would hit e.g. "Nikon SDK"),
# so it must stand alone as a token: not preceded by a letter/digit and not
# followed by a letter. "sdxl", "sd-webui" and "sd1.5" still match.
_SD_TOKEN_RE = re.compile(r"(?<![a-z0-9])sd(?:xl)?(?![a-z])")
# Substrings that name a conventional photo editor.
_EDITING_SOFTWARE_MARKERS = ("photoshop", "gimp", "lightroom", "capture one", "snapseed")

_EXIF_SOFTWARE_TAG = 305  # 0x0131, "Software"


def d02_software_check(img):
    """Classify the EXIF Software tag as AI generator, editor, other or absent.

    Args:
        img: image object exposing ``_getexif()``; a failing or missing
            ``_getexif`` is treated as "no EXIF".

    Returns:
        dict with keys ``test``, ``score`` and ``note``. Score is 0.8 when the
        tag names an AI tool, 0.2 for a known editor, -0.1 for any other
        software string and 0.1 when the tag is missing or empty.
    """
    try:
        exif = img._getexif() or {}
    except Exception:
        # Best effort, without swallowing KeyboardInterrupt/SystemExit.
        exif = {}

    # Read the tag by id instead of decoding every tag name first.
    software = str(exif[_EXIF_SOFTWARE_TAG])[:200] if _EXIF_SOFTWARE_TAG in exif else ""
    sw = software.lower()

    names_ai_tool = any(marker in sw for marker in _AI_SOFTWARE_MARKERS) \
        or _SD_TOKEN_RE.search(sw) is not None
    if names_ai_tool:
        s, n = 0.8, f"AI software: {software}"
    elif any(marker in sw for marker in _EDITING_SOFTWARE_MARKERS):
        s, n = 0.2, f"Editing software: {software}"
    elif sw:
        s, n = -0.1, f"Software: {software}"
    else:
        s, n = 0.1, "No software tag"
    return {"test": "Software Detection", "score": s, "note": n}

def d03_ela(img, quality=90):
    """Run Error Level Analysis: re-save as JPEG and inspect the difference.

    The image is converted to RGB, re-encoded in memory as JPEG at *quality*
    and subtracted from the RGB original. The mean difference of every full
    32x32 block is computed and the spread of those block means drives the
    score.

    Args:
        img: PIL image. ``img.format`` (if present) decides whether the source
            counts as JPEG; ELA is only meaningful for JPEG inputs.
        quality: JPEG quality used for the re-save.

    Returns:
        dict with keys ``test``, ``block_std`` (std of block means, rounded to
        3 places), ``score``, ``note`` and ``ela_image`` (the difference image
        brightened so its largest value maps to 255).
    """
    source_format = getattr(img, "format", None)
    is_jpeg = bool(source_format) and source_format.upper() in ("JPEG", "JPG")

    img_rgb = img.convert("RGB")
    buf = io.BytesIO()
    img_rgb.save(buf, "JPEG", quality=quality)
    buf.seek(0)
    resaved = Image.open(buf).convert("RGB")
    ela = ImageChops.difference(img_rgb, resaved)

    # Largest per-band difference; "or 1" avoids dividing by zero when the
    # re-save is pixel-identical.
    max_diff = max(band_max for _, band_max in ela.getextrema()) or 1
    ela_vis = ImageEnhance.Brightness(ela).enhance(255.0 / max_diff)

    ea = np.array(ela).astype(float)
    h, w, _ = ea.shape
    bs = 32
    # "+ 1" keeps the last full block when a dimension is an exact multiple
    # of the block size (the previous bound dropped it).
    block_means = np.array([
        float(np.mean(ea[i:i + bs, j:j + bs]))
        for i in range(0, h - bs + 1, bs)
        for j in range(0, w - bs + 1, bs)
    ])
    if block_means.size:
        bstd = float(np.std(block_means))
        br = float(np.max(block_means) - np.min(block_means))
    else:
        # Image smaller than one block: no block statistics to compare.
        bstd = br = 0.0
    pixel_std = float(np.std(ea))

    if not is_jpeg and pixel_std < 1:
        s, n = 0.0, "PNG/lossless source — ELA comparison is lossless→JPEG, not meaningful"
    elif bstd > 8 and br > 30:
        s, n = 0.6, f"High ELA variance (σ={bstd:.1f}) — manipulation"
    elif bstd > 4:
        s, n = 0.3, f"Moderate ELA (σ={bstd:.1f})"
    elif pixel_std < 1:
        # Only JPEG sources reach this point; non-JPEG uniform ELA was
        # handled by the first branch.
        s, n = 0.2, "Uniform ELA on JPEG — possible AI"
    else:
        s, n = -0.2, f"Consistent ELA (σ={bstd:.1f})"
    return {"test": "Error Level Analysis", "block_std": round(bstd, 3),
            "score": s, "note": n, "ela_image": ela_vis}

def d04_ai_metadata(img):
    """Look for generator-related markers in ``img.info`` and embedded XMP.

    Every ``img.info`` entry is checked: the lower-cased key and the first 500
    characters of the lower-cased value are searched for a fixed list of
    marker substrings. The XMP packet (``"XML:com.adobe.xmp"`` or ``"xmp"``)
    is additionally searched for ``generativeAI`` / ``ai:`` and ``c2pa``.

    Returns:
        dict with keys ``test``, ``traces`` (list of hit descriptions),
        ``score`` (0.8 if any trace was found, else 0.0) and ``note`` (lists
        at most the first three traces).
    """
    metadata = img.info or {}
    marker_words = (
        "stable diffusion", "comfyui", "automatic1111", "midjourney", "dall-e",
        "novelai", "parameters", "prompt", "negative_prompt", "steps", "sampler",
        "cfg_scale", "flux", "sd_model",
    )

    traces = []
    for key, value in metadata.items():
        value_text = str(value)
        haystacks = (str(key).lower(), value_text[:500].lower())
        if any(word in hay for word in marker_words for hay in haystacks):
            traces.append(f"{key}: {value_text[:80]}")

    xmp_raw = metadata.get("XML:com.adobe.xmp", "")
    if not xmp_raw:
        xmp_raw = metadata.get("xmp", "")
    xmp_text = str(xmp_raw)
    xmp_lower = xmp_text.lower()
    if "generativeAI" in xmp_text or "ai:" in xmp_lower:
        traces.append("XMP AI markers")
    if "c2pa" in xmp_lower:
        traces.append("C2PA Content Credentials")

    if not traces:
        return {"test": "AI Metadata Traces", "traces": traces, "score": 0.0, "note": "No AI metadata"}
    summary = "; ".join(traces[:3])
    return {"test": "AI Metadata Traces", "traces": traces, "score": 0.8, "note": f"AI traces: {summary}"}

def d05_thumbnail(img):
    """Report whether the EXIF data contains thumbnail pointer tags.

    Looks for tag 513 or 514 in the dict returned by ``img._getexif()``
    (treated as empty when the call fails or returns nothing).

    Returns:
        dict with keys ``test``, ``has_thumbnail``, ``score`` (-0.1 when a
        thumbnail tag is present, else 0.0) and ``note``.
    """
    try:
        exif = img._getexif() or {}
    except Exception:
        # Best effort, without swallowing KeyboardInterrupt/SystemExit.
        exif = {}

    has_thumb = 513 in exif or 514 in exif
    if has_thumb:
        s, n = -0.1, "Thumbnail present — camera"
    else:
        s, n = 0.0, "No thumbnail"
    return {"test": "Thumbnail Check", "has_thumbnail": has_thumb, "score": s, "note": n}

def d06_watermark(img):
    """Count strong isolated peaks in the image's log-magnitude spectrum.

    The grayscale image is Fourier-transformed and shifted so the DC term is
    centred; a 6x6 window around the centre is zeroed. A point counts as a
    peak when it equals the maximum of its size-10 neighbourhood and exceeds
    the 99.5th percentile of the masked spectrum.

    Returns:
        dict with keys ``test``, ``peaks``, ``score`` (0.2 when more than 20
        peaks are found, else 0.0) and ``note``.
    """
    spectrum = np.fft.fftshift(np.fft.fft2(_g(img)))
    log_magnitude = np.log(np.abs(spectrum) + 1)

    rows, cols = log_magnitude.shape
    mid_row, mid_col = rows // 2, cols // 2
    masked = log_magnitude.copy()
    masked[mid_row - 3:mid_row + 3, mid_col - 3:mid_col + 3] = 0  # drop the DC neighbourhood

    neighbourhood_max = maximum_filter(masked, 10)
    strong = masked > np.percentile(masked, 99.5)
    is_peak = (masked == neighbourhood_max) & strong
    peak_count = int(np.sum(is_peak))

    if peak_count > 20:
        score, note = 0.2, f"Frequency peaks ({peak_count}) — watermark?"
    else:
        score, note = 0.0, f"No watermark ({peak_count} peaks)"
    return {"test": "Watermark Detection", "peaks": peak_count, "score": score, "note": note}

def d07_compression_ghost(img):
    """Measure blockiness across horizontal 8-pixel row boundaries.

    The grayscale image is cropped to multiples of 8 in both dimensions.
    Absolute differences between vertically adjacent rows are split into
    those that cross a row index divisible by 8 and all others; the
    blockiness is the ratio of the two means.

    Returns:
        dict with keys ``test``, ``blockiness`` (rounded to 4 places),
        ``score`` and ``note``. When either group of row differences is empty
        (image too small) the blockiness is reported as 1.
    """
    gray = _g(img)
    h, w = gray.shape
    hc, wc = (h // 8) * 8, (w // 8) * 8
    gray = gray[:hc, :wc]

    # row_diffs[k] is |row k+1 - row k|, i.e. the step into row index k+1.
    row_diffs = np.abs(np.diff(gray, axis=0))
    crosses_boundary = (np.arange(1, hc) % 8) == 0
    boundary = row_diffs[crosses_boundary]
    interior = row_diffs[~crosses_boundary]

    if boundary.size and interior.size:
        bk = float(np.mean(boundary)) / (float(np.mean(interior)) + 1e-9)
    else:
        # Fewer than 16 usable rows (or zero usable columns): no ratio can be
        # formed; previously an 8-row image produced NaN here.
        bk = 1.0

    if bk > 1.5:
        s, n = 0.3, f"Double JPEG (blockiness={bk:.3f})"
    elif bk > 1.2:
        s, n = -0.1, f"JPEG blocks ({bk:.3f})"
    elif bk < 1.02:
        s, n = 0.1, f"No blocks ({bk:.3f})"
    else:
        s, n = 0.0, f"Blockiness={bk:.3f}"
    return {"test": "Compression Ghosts", "blockiness": round(bk, 4), "score": s, "note": n}

def d08_icc_profile(img):
    """Score the presence and size of an embedded ICC colour profile.

    Reads ``img.info["icc_profile"]``. A missing or empty value counts as
    "no profile", and ``has_icc`` now agrees with that decision (an empty
    value used to be reported as ``has_icc=True`` next to "No ICC profile").

    Returns:
        dict with keys ``test``, ``has_icc``, ``score`` (-0.2 for a profile
        larger than 100 bytes, 0.0 for a smaller one, 0.1 for none) and
        ``note``.
    """
    info = img.info or {}
    icc = info.get("icc_profile")
    has_icc = bool(icc)

    if not has_icc:
        s, n = 0.1, "No ICC profile"
    else:
        size = len(icc)
        if size > 100:
            s, n = -0.2, f"ICC profile ({size}B) — camera/editor"
        else:
            s, n = 0.0, f"Small ICC ({size}B)"
    return {"test": "ICC Color Profile", "has_icc": has_icc, "score": s, "note": n}

def d09_color_space(img):
    """Score the image's colour space from EXIF tag 40961 and ``img.mode``.

    The EXIF ColorSpace value (tag 40961) is compared as a string: "1" is
    reported as sRGB and "65535" as uncalibrated. Without either value, a
    CMYK mode gets its own score; anything else is neutral.

    Returns:
        dict with keys ``test``, ``mode``, ``score`` and ``note``.
    """
    mode = img.mode
    try:
        exif = img._getexif() or {}
    except Exception:
        # Best effort, without swallowing KeyboardInterrupt/SystemExit.
        exif = {}

    cs = str(exif.get(40961, ""))  # ColorSpace tag
    if cs == "1":
        s, n = -0.1, "sRGB color space — standard"
    elif cs == "65535":
        s, n = -0.1, "Uncalibrated (wide gamut)"
    elif mode == "CMYK":
        s, n = -0.2, "CMYK — professional source"
    else:
        s, n = 0.0, f"Color mode={mode}"
    return {"test": "Color Space", "mode": mode, "score": s, "note": n}

def _gps_dms_to_dd(dms):
    """Convert a (degrees, minutes, seconds) triple to decimal degrees.

    Anything that is not a 3-item list/tuple is passed to ``float`` as is.
    """
    if isinstance(dms, (list, tuple)) and len(dms) == 3:
        degrees, minutes, seconds = (float(part) for part in dms)
        return degrees + minutes / 60.0 + seconds / 3600.0
    return float(dms)


def _gps_hemisphere(ref, default):
    """Normalise a hemisphere reference (str or bytes) to one upper-case letter."""
    if isinstance(ref, bytes):
        ref = ref.decode("ascii", "ignore")
    letter = str(ref).strip("\x00 \t\r\n").upper()[:1]
    return letter or default


def d10_gps_plausibility(img):
    """Check whether the EXIF GPS coordinates are physically plausible.

    Reads the GPS block from EXIF tag 34853 and, inside it, tags 1-4
    (latitude ref, latitude, longitude ref, longitude). A GPS block that has
    no latitude or longitude is reported as neutral instead of being read as
    (0, 0), which used to flag it as a fabricated "Null Island" location.

    Returns:
        dict with keys ``test``, ``score`` and ``note``. Score is 0.0 without
        usable GPS data or on a parse error, 0.4 for out-of-range values,
        0.3 / 0.2 for coordinates at / near (0, 0) and -0.2 otherwise.
    """
    try:
        exif = img._getexif() or {}
    except Exception:
        # Best effort, without swallowing KeyboardInterrupt/SystemExit.
        exif = {}

    gps = exif.get(34853)
    if not gps:
        return {"test": "GPS Plausibility", "score": 0.0, "note": "No GPS data"}

    try:
        lat_raw = gps.get(2)
        lon_raw = gps.get(4)
        if lat_raw is None or lon_raw is None:
            return {"test": "GPS Plausibility", "score": 0.0,
                    "note": "GPS tag present but no coordinates"}

        lat = _gps_dms_to_dd(lat_raw)
        lon = _gps_dms_to_dd(lon_raw)
        if _gps_hemisphere(gps.get(1, "N"), "N") == "S":
            lat = -lat
        if _gps_hemisphere(gps.get(3, "E"), "E") == "W":
            lon = -lon

        if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
            s, n = 0.4, f"Impossible GPS coordinates (lat={lat:.4f}, lon={lon:.4f})"
        elif abs(lat) < 0.001 and abs(lon) < 0.001:
            s, n = 0.3, "GPS at Null Island (0,0) — likely fabricated"
        elif abs(lat) < 0.1 and abs(lon) < 0.1:
            s, n = 0.2, f"GPS near (0,0) — suspicious (lat={lat:.4f}, lon={lon:.4f})"
        else:
            s, n = -0.2, f"Plausible GPS ({lat:.4f}°{'N' if lat>=0 else 'S'}, {lon:.4f}°{'E' if lon>=0 else 'W'})"
    except Exception as e:
        # Metadata is untrusted input: any malformed structure is reported as
        # a neutral parse error rather than failing the check.
        s, n = 0.0, f"GPS parse error: {str(e)[:50]}"
    return {"test": "GPS Plausibility", "score": s, "note": n}

def d11_maker_note(img):
    """Score the presence and size of the EXIF MakerNote (tag 37500).

    The size is the byte length for ``bytes`` values and the length of the
    ``str`` rendering for anything else.

    Returns:
        dict with keys ``test``, ``score`` (-0.3 for a MakerNote larger than
        100, -0.1 for a smaller one, 0.1 when absent or empty) and ``note``.
    """
    try:
        exif = img._getexif() or {}
    except Exception:
        # Best effort, without swallowing KeyboardInterrupt/SystemExit.
        exif = {}

    mn = exif.get(37500)  # MakerNote tag
    if not mn:
        return {"test": "Maker Note", "score": 0.1, "note": "No MakerNote"}

    size = len(mn) if isinstance(mn, bytes) else len(str(mn))
    if size > 100:
        s, n = -0.3, f"MakerNote ({size}B) — camera firmware"
    else:
        s, n = -0.1, f"Small MakerNote ({size}B)"
    return {"test": "Maker Note", "score": s, "note": n}

def d12_file_structure(img):
    """Compare the image's megapixel count with a list of common sizes.

    The pixel count (in millions) is matched to the nearest entry of a fixed
    megapixel list; the relative distance to that entry decides the score.

    Returns:
        dict with keys ``test``, ``format`` (``img.format`` or "unknown"),
        ``megapixels`` (rounded to 2 places), ``score`` (-0.1 within 5% of a
        listed size, 0.1 when more than 30% away, else 0.0) and ``note``.
    """
    image_format = img.format or "unknown"
    width, height = img.size
    megapixels = width * height / 1e6

    common_sizes = [0.3, 0.8, 1, 2, 3, 4, 5, 8, 10, 12, 16, 20, 24, 36, 45, 50, 61, 100, 108, 150, 200]
    nearest = min(common_sizes, key=lambda size: abs(megapixels - size))
    # Every listed size is positive, so the division is always defined.
    relative_gap = abs(megapixels - nearest) / nearest

    result = {"test": "File Structure", "format": image_format, "megapixels": round(megapixels, 2)}
    if relative_gap < 0.05:
        result["score"] = -0.1
        result["note"] = f"Standard resolution ({megapixels:.1f}MP ≈ {nearest}MP)"
    elif relative_gap > 0.3:
        result["score"] = 0.1
        result["note"] = f"Non-standard resolution ({megapixels:.1f}MP)"
    else:
        result["score"] = 0.0
        result["note"] = f"{megapixels:.1f}MP, format={image_format}"
    return result

# Ordered list of the twelve metadata checks; run_metadata_agent passes it to
# run_agent_tests. Every entry accepts the image as its first argument.
ALL_TESTS = [
    d01_exif_completeness,
    d02_software_check,
    d03_ela,
    d04_ai_metadata,
    d05_thumbnail,
    d06_watermark,
    d07_compression_ghost,
    d08_icc_profile,
    d09_color_space,
    d10_gps_plausibility,
    d11_maker_note,
    d12_file_structure,
]

def run_metadata_agent(img, modality_adjustments=None):
    """Run every check in ``ALL_TESTS`` and wrap the outcome in ``AgentEvidence``.

    The work is delegated to ``agents.utils.run_agent_tests`` (imported
    lazily inside the function). Any ``"ela_image"`` entry is removed from the
    findings and handed to ``AgentEvidence`` separately; if several findings
    carry one, the last wins.

    Args:
        img: image passed through to each check.
        modality_adjustments: forwarded unchanged to ``run_agent_tests``.
            NOTE(review): its expected shape is defined by that helper —
            confirm there.

    Returns:
        ``AgentEvidence`` built from the helper's five return values, with the
        second one clipped to [-1, 1].
    """
    from agents.utils import run_agent_tests

    agent_name = "Metadata Agent"
    findings_raw, avg, conf, fail, rat = run_agent_tests(
        ALL_TESTS, img, agent_name, modality_adjustments
    )

    ela_img = None
    for finding in findings_raw:
        # pop() with a default leaves ela_img untouched when the key is absent.
        ela_img = finding.pop("ela_image", ela_img)

    clipped_score = np.clip(avg, -1, 1)
    return AgentEvidence(agent_name, clipped_score, conf, fail, rat, findings_raw, ela_img)