"""FORENSIQ — Shared utilities for all agents."""

from typing import List, Dict, Any, Callable, Optional, Sequence, Tuple

import numpy as np

# Scores whose magnitude is at or below this threshold count as neutral
# (no direction) when judging whether tests agree with each other.
_DIRECTION_EPS = 0.05

# Substrings that, in the note of a zero-score result, mark the result as
# "insufficient data" rather than a genuine neutral finding.
# NOTE(review): "no " and "few " are very broad and will also match notes such
# as "no anomalies found"; kept as-is because callers may depend on it.
_INSUFFICIENT_KEYWORDS = (
    "insufficient", "too small", "no data", "not available", "few ", "no ",
    "not meaningful", "cannot read", "parse error",
)


def compute_agent_confidence(scores: List[float]) -> float:
    """
    Compute agent confidence using agreement-vs-cancellation logic.

    Consistent weak agreement across many tests compounds confidence:
    10 tests all saying -0.1 is more confident than 1 test saying -0.3.

    Args:
        scores: per-test scores; sign is direction, magnitude is strength.

    Returns:
        Confidence in [0, 1]:
        - 0.1 when there are no scores,
        - 0.15 when every score is neutral (|score| <= 0.05),
        - 0.1 + 0.4 * |mean| * majority_ratio when directions conflict,
        - 0.2 + 0.7 * |mean| * agreement_bonus (+0.1 at >= 5 agreeing tests,
          +0.1 more at >= 10) when all directional scores agree.
    """
    if not scores:
        return 0.1

    avg = float(np.mean(scores))
    n_pos = sum(1 for s in scores if s > _DIRECTION_EPS)
    n_neg = sum(1 for s in scores if s < -_DIRECTION_EPS)
    n_directional = n_pos + n_neg

    if n_directional == 0:
        # All genuinely neutral — low confidence.
        return 0.15

    if n_pos > 0 and n_neg > 0:
        # Mixed signals: penalize cancellation (the mean shrinks) but still
        # give credit in proportion to how dominant the majority direction is.
        agreement_ratio = max(n_pos, n_neg) / n_directional
        return float(min(1.0, 0.1 + 0.4 * abs(avg) * agreement_ratio))

    # All directional scores agree — confidence compounds with count.
    n_agreeing = max(n_pos, n_neg)
    # sqrt scaling that saturates at 3 agreeing tests:
    # 1 test -> ~0.58, 2 tests -> ~0.82, 3 or more -> 1.0.
    agreement_bonus = min(1.0, float(np.sqrt(n_agreeing / 3.0)))
    # Magnitude x agreement, with a floor of 0.2 for any agreement at all.
    conf = 0.2 + 0.7 * abs(avg) * agreement_bonus
    # Additional boost for broad agreement (many tests, not just a few).
    if n_agreeing >= 5:
        conf += 0.1
    if n_agreeing >= 10:
        conf += 0.1
    return float(min(1.0, conf))


def compute_failure_prob(n_ran: int, n_total: int, n_insufficient: int = 0) -> float:
    """
    Compute agent failure probability.

    Accounts for both crashed tests AND tests returning insufficient data.

    Args:
        n_ran: tests that returned a score (including insufficient-data ones).
        n_total: total tests attempted.
        n_insufficient: tests that returned score=0 due to insufficient data.

    Returns:
        Fraction of attempted tests that produced no usable signal, clamped
        to [0, 1]. The clamp matters when the counts are inconsistent
        (e.g. n_insufficient > n_ran), which previously yielded values > 1.
    """
    n_effective = n_ran - n_insufficient  # tests that actually produced signal
    prob = 1.0 - n_effective / max(n_total, 1)
    return min(1.0, max(0.0, prob))


def _test_label(fn: Any) -> str:
    """Return a printable name for a test callable, even if it lacks __name__."""
    return getattr(fn, "__name__", None) or repr(fn)


def _evaluate_test(
    fn: Callable[[Any], Dict[str, Any]],
    img: Any,
    modality_adjustments: Optional[Dict[str, float]],
) -> Tuple[Dict[str, Any], Any, bool]:
    """Run one test and post-process its result; raises if the result is unusable.

    Returns (finding, score, insufficient) where `finding` is the test's own
    result dict (annotated in place), `score` is the possibly modality-adjusted
    score and `insufficient` says whether the insufficient-data heuristic fired.
    """
    result = fn(img)
    if not isinstance(result, dict):
        raise TypeError(f"test returned {type(result).__name__}, expected dict")

    score = result.get("score", 0)
    float(score)  # validation only: raises for None / non-numeric scores

    raw_note = result.get("note", "")
    note = "" if raw_note is None else str(raw_note)
    test_name = result.get("test", _test_label(fn))

    # P7: Detect insufficient-data results — tag as not_applicable.
    lowered = note.lower()
    insufficient = bool(
        score == 0.0 and any(kw in lowered for kw in _INSUFFICIENT_KEYWORDS)
    )
    if insufficient:
        result["not_applicable"] = True

    # Apply modality adjustment if present — BUT respect override_suppression.
    overridden = result.get("override_suppression", False)
    if modality_adjustments and test_name in modality_adjustments and not overridden:
        multiplier = modality_adjustments[test_name]
        original_score = score
        score = score * multiplier
        result["score"] = score
        result["modality_adjusted"] = True
        result["original_score"] = original_score
        result["adjustment_multiplier"] = multiplier
        if multiplier < 0.5:
            result["note"] = f"[Modality-suppressed ×{multiplier}] {note}"
    elif overridden:
        result["note"] = f"[OVERRIDE: suppression bypassed] {note}"

    return result, score, insufficient


def run_agent_tests(
    tests: Sequence[Callable[[Any], Dict[str, Any]]],
    img: Any,
    agent_name: str,
    modality_adjustments: Optional[Dict[str, float]] = None,
) -> Tuple[List[Dict[str, Any]], float, float, float, str]:
    """
    Shared test runner for all signal-processing agents.

    Handles: running tests, tagging insufficient-data, computing confidence
    properly. Applies modality-aware score reweighting when
    modality_adjustments is provided.

    Args:
        tests: callables taking `img` and returning a result dict with
            optional keys "test", "score", "note", "override_suppression".
        img: opaque input handed unchanged to every test.
        agent_name: display name; a " Agent" suffix is stripped for the rationale.
        modality_adjustments: optional map of test name -> score multiplier.

    Returns:
        (findings, avg, conf, fail, rationale): exactly one finding per test
        (an error entry {"test", "error", "score": 0} when the test failed),
        the mean of the applicable scores, the agent confidence, the failure
        probability, and a human-readable rationale string.
    """
    findings: List[Dict[str, Any]] = []
    labels: List[str] = []          # display name per finding, parallel to findings
    scores: List[Any] = []          # every test that returned a usable result
    active_scores: List[Any] = []   # scores that are not tagged not_applicable
    n_insufficient = 0

    for fn in tests:
        label = _test_label(fn)
        try:
            finding, score, insufficient = _evaluate_test(
                fn, img, modality_adjustments
            )
        except Exception as exc:
            # Best-effort by design: one broken test must not abort the agent.
            # It is recorded as a single error finding and contributes no score,
            # so findings and scores can never drift out of step.
            findings.append({"test": label, "error": str(exc), "score": 0})
            labels.append(label)
            continue

        findings.append(finding)
        labels.append(str(finding.get("test", label)))
        scores.append(score)
        if insufficient:
            n_insufficient += 1
        # Decide applicability from the finding itself so a test that sets
        # not_applicable on its own result is excluded from the average too.
        if not finding.get("not_applicable", False):
            active_scores.append(score)

    avg = float(np.mean(active_scores)) if active_scores else 0.0
    conf = compute_agent_confidence(active_scores)
    fail = compute_failure_prob(len(scores), len(tests), n_insufficient)

    # Build rationale.
    applicable = [
        (name, f) for name, f in zip(labels, findings)
        if not f.get("not_applicable")
    ]
    violations = [name for name, f in applicable if f.get("score", 0) > 0.2]
    consistent = [name for name, f in applicable if f.get("score", 0) < -0.1]
    domain = agent_name.replace(" Agent", "")
    if violations:
        headline = f"{domain} violations: {', '.join(violations)}."
    elif consistent:
        headline = f"{domain} consistent: {', '.join(consistent)}."
    else:
        headline = f"{domain} inconclusive."

    details = [
        f" [{name}]: {f['note']}." for name, f in applicable if f.get("note")
    ]
    rat = headline + "".join(details)

    return findings, avg, conf, fail, rat