"""FORENSIQ β€” Shared utilities for all agents."""
import numpy as np
from typing import List, Dict, Any
def compute_agent_confidence(scores: List[float]) -> float:
"""
Compute agent confidence using agreement-vs-cancellation logic.
Key fix: consistent weak agreement across MANY tests compounds confidence.
4 tests weakly agreeing is more confident than 1 test strongly agreeing.
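
    Illustrative values (assumed inputs, rounded; derived from the formulas
    in the body below):
        compute_agent_confidence([-0.1] * 10)      ->  ~0.47  (broad weak agreement)
        compute_agent_confidence([-0.3])           ->  ~0.32  (one strong signal)
        compute_agent_confidence([0.4, 0.4, -0.2]) ->  ~0.15  (mixed signals penalized)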
"""
if not scores:
return 0.1
avg = float(np.mean(scores))
n = len(scores)
    # Classify each score's direction (|score| <= 0.05 counts as neutral)
    signs = [1 if s > 0.05 else (-1 if s < -0.05 else 0) for s in scores]
n_pos = sum(1 for s in signs if s > 0)
n_neg = sum(1 for s in signs if s < 0)
n_neu = sum(1 for s in signs if s == 0)
n_directional = n_pos + n_neg
if n_directional == 0:
        # All genuinely neutral -- low confidence
return 0.15
if n_pos > 0 and n_neg > 0:
        # Mixed signals -- confidence based on majority strength
majority = max(n_pos, n_neg)
minority = min(n_pos, n_neg)
agreement_ratio = majority / n_directional
# Penalize cancellation but still give credit for majority direction
return min(1.0, 0.1 + 0.4 * abs(avg) * agreement_ratio)
else:
        # All directional scores agree -- confidence compounds with count
# Key insight: 10 tests all saying -0.1 is more confident than 1 test saying -0.3
n_agreeing = max(n_pos, n_neg)
# Base confidence from magnitude
magnitude_conf = abs(avg)
        # Agreement bonus: sqrt scaling so 4 tests ~= 2x confidence vs 1 test
        agreement_bonus = min(1.0, np.sqrt(n_agreeing / 3.0))  # normalized: 3 agreeing = 1.0
        # Combined: magnitude x agreement, with floor at 0.2 for any agreement
        conf = 0.2 + 0.7 * magnitude_conf * agreement_bonus
# Additional boost for broad agreement (many tests, not just a few)
if n_agreeing >= 5:
conf += 0.1
if n_agreeing >= 10:
conf += 0.1
        # np.sqrt yields np.float64, so pin the return type to plain float
        return float(min(1.0, conf))


def compute_failure_prob(n_ran: int, n_total: int, n_insufficient: int = 0) -> float:
"""
Compute agent failure probability.
Accounts for both crashed tests AND tests returning insufficient data.
n_ran: tests that returned a score (including insufficient-data ones)
n_total: total tests attempted
n_insufficient: tests that returned score=0 due to insufficient data
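
    Example (illustrative): 10 tests attempted, 8 returned scores, 2 of those
    insufficient -> 6 effective tests, so failure prob = 1 - 6/10 = 0.4:
        compute_failure_prob(8, 10, 2)  ->  0.4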
"""
n_effective = n_ran - n_insufficient # tests that actually produced signal
return max(0.0, 1.0 - n_effective / max(n_total, 1))


def run_agent_tests(tests, img, agent_name, modality_adjustments=None):
"""
Shared test runner for all signal-processing agents.
Handles: running tests, tagging insufficient-data, computing confidence properly.
Applies modality-aware score reweighting when modality_adjustments is provided.
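
    Usage sketch (illustrative; the test functions and adjustment value here
    are hypothetical, not part of this module):
        findings, avg, conf, fail, rationale = run_agent_tests(
            [noise_floor_test, jpeg_grid_test], img, "Noise Agent",
            modality_adjustments={"jpeg_grid_test": 0.3},
        )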
"""
findings, scores = [], []
n_insufficient = 0
for fn in tests:
try:
r = fn(img)
findings.append(r)
sc = r.get("score", 0)
note = r.get("note", "")
test_name = r.get("test", fn.__name__)
            # P7: Detect insufficient-data results -- tag as not_applicable
is_insufficient = (sc == 0.0 and any(kw in note.lower() for kw in
["insufficient", "too small", "no data", "not available", "few ", "no ",
"not meaningful", "cannot read", "parse error"]))
if is_insufficient:
r["not_applicable"] = True
n_insufficient += 1
            # Apply modality adjustment if present -- BUT respect override_suppression
            if modality_adjustments and test_name in modality_adjustments:
                if not r.get("override_suppression", False):
                    multiplier = modality_adjustments[test_name]
                    original_score = sc
                    sc = sc * multiplier
                    r["score"] = sc
                    r["modality_adjusted"] = True
                    r["original_score"] = original_score
                    r["adjustment_multiplier"] = multiplier
                    if multiplier < 0.5:
                        r["note"] = f"[Modality-suppressed x{multiplier}] {note}"
                else:
                    # An adjustment exists but this test opted out of suppression
                    r["note"] = f"[OVERRIDE: suppression bypassed] {note}"
scores.append(sc)
        except Exception as e:
            # Crashed tests record an error finding but append no score,
            # so they raise the failure probability below
            findings.append({"test": fn.__name__, "error": str(e), "score": 0})
    # Filter out errored and not_applicable results before averaging.
    # Errored tests never append to `scores`, so pair scores with the
    # successful findings only (zipping against all findings would misalign).
    ok_findings = [f for f in findings if "error" not in f]
    active_scores = [s for s, f in zip(scores, ok_findings)
                     if not f.get("not_applicable", False)]
    avg = float(np.mean(active_scores)) if active_scores else 0.0
    conf = compute_agent_confidence(active_scores)
    fail = compute_failure_prob(len(scores), len(tests), n_insufficient)
# Build rationale
viol = [f["test"] for f in findings if f.get("score", 0) > 0.2 and not f.get("not_applicable")]
comp = [f["test"] for f in findings if f.get("score", 0) < -0.1 and not f.get("not_applicable")]
domain = agent_name.replace(" Agent", "")
if viol:
rat = f"{domain} violations: {', '.join(viol)}."
elif comp:
rat = f"{domain} consistent: {', '.join(comp)}."
else:
rat = f"{domain} inconclusive."
for f in findings:
if f.get("note") and not f.get("not_applicable"):
rat += f" [{f['test']}]: {f['note']}."
return findings, avg, conf, fail, rat
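

if __name__ == "__main__":
    # Minimal smoke test -- illustrative only: the dummy test functions and
    # the all-zeros "image" below are stand-ins, not real FORENSIQ agents.
    def looks_natural(img):
        return {"test": "looks_natural", "score": -0.2, "note": "statistics look consistent"}

    def periodic_artifact(img):
        return {"test": "periodic_artifact", "score": 0.4, "note": "periodic pattern detected"}

    demo_img = np.zeros((64, 64), dtype=np.float32)
    findings, avg, conf, fail, rationale = run_agent_tests(
        [looks_natural, periodic_artifact], demo_img, "Demo Agent")
    print(f"avg={avg:+.3f}  conf={conf:.3f}  fail={fail:.3f}")
    print(rationale)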