anky2002 committed on
Commit
69446e8
·
verified ·
1 Parent(s): 37025de

Upload agents/metadata_agent.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. agents/metadata_agent.py +156 -2
agents/metadata_agent.py CHANGED
@@ -241,13 +241,167 @@ def analyze_ai_metadata(img: Image.Image) -> Dict[str, Any]:
241
  }
242
 
243
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
244
  # ─── Main Agent Entry Point ─────────────────────────────────────────
245
  def run_metadata_agent(img: Image.Image) -> AgentEvidence:
246
  """Run all metadata analysis tests."""
247
  findings = []
248
  scores = []
249
 
250
- for fn in [analyze_exif, analyze_ela, analyze_ai_metadata]:
 
251
  try:
252
  result = fn(img)
253
  findings.append(result)
@@ -283,7 +437,7 @@ def run_metadata_agent(img: Image.Image) -> AgentEvidence:
283
  agent_name="Metadata Agent",
284
  violation_score=np.clip(avg_score, -1, 1),
285
  confidence=confidence,
286
- failure_prob=max(0.0, 1.0 - len(scores) / 3),
287
  rationale=rationale,
288
  sub_findings=findings,
289
  visual_evidence=ela_img,
 
241
  }
242
 
243
 
244
+ # ─── Thumbnail Consistency ───────────────────────────────────────────
245
def analyze_thumbnail_consistency(img: Image.Image) -> Dict[str, Any]:
    """
    Report whether the image carries an embedded thumbnail.

    JPEG files often embed a thumbnail. If the main image was manipulated
    but the thumbnail wasn't updated, the two will differ.

    NOTE: this function only detects the *presence* of a thumbnail; it does
    not decode the thumbnail or compare it against the main image.

    Args:
        img: Image to inspect. Only ``_getexif()`` and ``applist`` are read,
            and both are optional (they are looked up defensively).

    Returns:
        A finding dict with keys ``test``, ``has_thumbnail``, ``score`` and
        ``note``. ``score`` is ``-0.1`` when a thumbnail is present and
        ``0.0`` otherwise.
    """
    # Best effort: _getexif() is not available on every image object and EXIF
    # parsing can fail in many ways on damaged files; missing or unreadable
    # EXIF is treated as "no EXIF".
    try:
        exif_data = img._getexif() or {}
    except Exception:
        exif_data = {}

    # Raw APPn segments, expected as (marker, payload) pairs when present.
    try:
        segments = list(getattr(img, "applist", None) or [])
    except TypeError:
        segments = []

    # Malformed entries are skipped instead of aborting the whole scan.
    has_thumbnail = any(
        isinstance(segment, (tuple, list))
        and len(segment) > 1
        and isinstance(segment[1], bytes)
        and b"thumbnail" in segment[1].lower()
        for segment in segments
    )

    # EXIF thumbnail tags: 513 = JPEGInterchangeFormat (offset),
    # 514 = JPEGInterchangeFormatLength.
    if 513 in exif_data or 514 in exif_data:
        has_thumbnail = True

    if not has_thumbnail:
        return {
            "test": "Thumbnail Consistency",
            "has_thumbnail": False,
            "score": 0.0,
            "note": "No embedded thumbnail found for comparison",
        }

    return {
        "test": "Thumbnail Consistency",
        "has_thumbnail": True,
        "score": -0.1,
        "note": "Embedded thumbnail present (consistent with real camera output)",
    }
282
+
283
+
284
+ # ─── Watermark Detection ────────────────────────────────────────────
285
def analyze_watermarks(img: Image.Image) -> Dict[str, Any]:
    """
    Look for watermark signatures in the frequency domain and in metadata.

    Two independent checks are performed:

    1. The log-magnitude spectrum of the grayscale image is searched for
       isolated bright peaks, which may be carriers of an embedded periodic
       watermark.
    2. ``img.info`` keys and values are searched for C2PA / Content
       Credentials markers.

    Args:
        img: Image to inspect. ``convert("L")`` and ``info`` are used.

    Returns:
        A finding dict with keys ``test``, ``isolated_peaks``, ``c2pa_found``,
        ``score`` and ``note``. ``score`` is ``0.2`` when more than 20
        isolated peaks are found and no C2PA marker is present, else ``0.0``.
    """
    # Kept as a function-scope import, as in the original code.
    from scipy.ndimage import maximum_filter

    dc_radius = 3            # half-width of the low-frequency region to ignore
    peak_window = 10         # neighbourhood size for the local-maximum test
    peak_percentile = 99.5   # a peak must exceed this percentile of the spectrum
    peak_threshold = 20      # more peaks than this is considered suspicious

    gray = np.array(img.convert("L")).astype(np.float64)

    n_isolated_peaks = 0
    if gray.ndim == 2 and gray.size > 0:
        # Check for periodic watermark patterns in the FFT.
        magnitude = np.log(np.abs(np.fft.fftshift(np.fft.fft2(gray))) + 1)

        h, w = magnitude.shape
        cy, cx = h // 2, w // 2

        # Remove the DC component and its immediate neighbourhood. The window
        # is symmetric about (cy, cx) because the spectrum of a real image is
        # conjugate-symmetric about that bin; the start is clamped so small
        # images do not produce a negative (wrap-around) slice index.
        magnitude_clean = magnitude.copy()
        magnitude_clean[
            max(0, cy - dc_radius):cy + dc_radius + 1,
            max(0, cx - dc_radius):cx + dc_radius + 1,
        ] = 0

        # Isolated bright spots: local maxima that are also in the top tail
        # of the spectrum (potential watermark carriers).
        local_max = maximum_filter(magnitude_clean, size=peak_window)
        cutoff = np.percentile(magnitude_clean, peak_percentile)
        peaks = (magnitude_clean == local_max) & (magnitude_clean > cutoff)
        n_isolated_peaks = int(np.sum(peaks))

    # Check image info for C2PA / Content Credentials. Values are truncated
    # to 500 characters before matching to bound the cost on large payloads.
    info = img.info or {}
    markers = ("c2pa", "contentcredentials", "content_authenticity")
    c2pa_found = any(
        marker in str(key).lower() or marker in str(value)[:500].lower()
        for key, value in info.items()
        for marker in markers
    )

    if c2pa_found:
        score = 0.0
        note = "C2PA Content Credentials watermark detected (provenance tracking)"
    elif n_isolated_peaks > peak_threshold:
        score = 0.2
        note = f"Suspicious frequency-domain peaks ({n_isolated_peaks}, possible embedded watermark)"
    else:
        score = 0.0
        note = f"No watermark signatures detected ({n_isolated_peaks} peaks)"

    return {
        "test": "Watermark Detection",
        "isolated_peaks": n_isolated_peaks,
        "c2pa_found": c2pa_found,
        "score": score,
        "note": note,
    }
337
+
338
+
339
+ # ─── Compression Ghost Detection ────────────────────────────────────
340
def analyze_compression_ghosts(img: Image.Image) -> Dict[str, Any]:
    """
    Estimate JPEG blockiness from 8x8 block-boundary discontinuities.

    Double JPEG compression leaves 'ghosts' -- periodic artifacts at block
    boundaries that differ from single compression. This is approximated by
    comparing the mean absolute difference between adjacent pixels that
    straddle an 8x8 block boundary with the same measure inside blocks.

    Args:
        img: Image to inspect. Only ``convert("L")`` is used.

    Returns:
        A finding dict with keys ``test``, ``blockiness_ratio`` (rounded to
        4 decimals), ``score`` and ``note``. When the image is too small to
        contain any measurable block boundary the ratio is reported as
        ``1.0`` with a neutral ``0.0`` score.
    """
    block = 8

    gray = np.array(img.convert("L")).astype(np.float64)
    h, w = gray.shape
    # Keep only whole blocks so every boundary is a true multiple of 8.
    h_crop, w_crop = (h // block) * block, (w // block) * block
    gray = gray[:h_crop, :w_crop]

    boundary_sum, boundary_count = 0.0, 0
    interior_sum, interior_count = 0.0, 0

    if h_crop and w_crop:
        # axis 0: differences between consecutive rows; axis 1: columns.
        for axis, length in ((0, h_crop), (1, w_crop)):
            diffs = np.abs(np.diff(gray, axis=axis))
            # diffs[k] compares pixel k+1 with pixel k, so it crosses a block
            # boundary exactly when (k + 1) is a multiple of the block size.
            crosses_boundary = (np.arange(1, length) % block) == 0
            at_boundary = np.compress(crosses_boundary, diffs, axis=axis)
            inside_block = np.compress(~crosses_boundary, diffs, axis=axis)

            boundary_sum += float(at_boundary.sum())
            boundary_count += at_boundary.size
            interior_sum += float(inside_block.sum())
            interior_count += inside_block.size

    if not boundary_count or not interior_count:
        # Nothing was measured, so do not report an absence of blockiness.
        return {
            "test": "Compression Ghost Detection",
            "blockiness_ratio": 1.0,
            "score": 0.0,
            "note": "Image too small for block boundary analysis",
        }

    boundary_mean = boundary_sum / boundary_count
    interior_mean = interior_sum / interior_count
    # Epsilon avoids division by zero on perfectly flat block interiors.
    blockiness = boundary_mean / (interior_mean + 1e-9)

    # Blockiness > 1.2 suggests JPEG compression; > 1.5 suggests double compression
    if blockiness > 1.5:
        score = 0.3
        note = f"Strong block boundary artifacts (blockiness={blockiness:.3f}, possible double JPEG)"
    elif blockiness > 1.2:
        score = -0.1
        note = f"Normal JPEG blockiness ({blockiness:.3f})"
    elif blockiness < 1.02:
        score = 0.1
        note = f"No block boundaries (blockiness={blockiness:.3f}, non-JPEG or AI)"
    else:
        score = 0.0
        note = f"Mild blockiness ({blockiness:.3f})"

    return {
        "test": "Compression Ghost Detection",
        "blockiness_ratio": round(blockiness, 4),
        "score": score,
        "note": note,
    }
395
+
396
+
397
  # ─── Main Agent Entry Point ─────────────────────────────────────────
398
  def run_metadata_agent(img: Image.Image) -> AgentEvidence:
399
  """Run all metadata analysis tests."""
400
  findings = []
401
  scores = []
402
 
403
+ for fn in [analyze_exif, analyze_ela, analyze_ai_metadata,
404
+ analyze_thumbnail_consistency, analyze_watermarks, analyze_compression_ghosts]:
405
  try:
406
  result = fn(img)
407
  findings.append(result)
 
437
  agent_name="Metadata Agent",
438
  violation_score=np.clip(avg_score, -1, 1),
439
  confidence=confidence,
440
+ failure_prob=max(0.0, 1.0 - len(scores) / 6),
441
  rationale=rationale,
442
  sub_findings=findings,
443
  visual_evidence=ela_img,