"""FORENSIQ - Shared utilities for all agents."""
from typing import Any, Dict, List, Optional

import numpy as np


def compute_agent_confidence(scores: List[float]) -> float:
    """
    Compute agent confidence using agreement-vs-cancellation logic.

    Key fix: consistent weak agreement across MANY tests compounds confidence,
    so many tests weakly agreeing can outrank one test strongly agreeing.
    """
    if not scores:
        return 0.1
    avg = float(np.mean(scores))
    # Classify each score's direction; |score| <= 0.05 counts as neutral.
    signs = [1 if s > 0.05 else (-1 if s < -0.05 else 0) for s in scores]
    n_pos = sum(1 for s in signs if s > 0)
    n_neg = sum(1 for s in signs if s < 0)
    n_directional = n_pos + n_neg
    if n_directional == 0:
        # All genuinely neutral -> low confidence.
        return 0.15
    if n_pos > 0 and n_neg > 0:
        # Mixed signals -> confidence based on majority strength.
        majority = max(n_pos, n_neg)
        agreement_ratio = majority / n_directional
        # Penalize cancellation but still give credit for the majority direction.
        return min(1.0, 0.1 + 0.4 * abs(avg) * agreement_ratio)
    else:
        # All directional scores agree -> confidence compounds with count.
        # Key insight: 10 tests all saying -0.1 is more confident than
        # 1 test saying -0.3.
        n_agreeing = max(n_pos, n_neg)
        # Base confidence from magnitude.
        magnitude_conf = abs(avg)
        # Agreement bonus: sqrt scaling in agreeing-test count, saturating
        # at 3 agreeing tests (sqrt(n/3) capped at 1.0).
        agreement_bonus = min(1.0, np.sqrt(n_agreeing / 3.0))
        # Combined: magnitude × agreement, with a floor of 0.2 for any agreement.
        conf = 0.2 + 0.7 * magnitude_conf * agreement_bonus
        # Additional boost for broad agreement (many tests, not just a few).
        if n_agreeing >= 5:
            conf += 0.1
        if n_agreeing >= 10:
            conf += 0.1
        return min(1.0, conf)
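
# Illustrative sketch (not part of the original module): how agreement compounds
# versus cancellation. Approximate outputs of the function above:
#
#     compute_agent_confidence([-0.1] * 10)   # ~0.47: broad weak agreement
#     compute_agent_confidence([-0.3])        # ~0.32: single strong signal
#     compute_agent_confidence([0.3, -0.3])   # 0.10: full cancellation -> floor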


def compute_failure_prob(n_ran: int, n_total: int, n_insufficient: int = 0) -> float:
    """
    Compute agent failure probability.

    Accounts for both crashed tests AND tests returning insufficient data.

    n_ran: tests that returned a score (including insufficient-data ones)
    n_total: total tests attempted
    n_insufficient: tests that returned score=0 due to insufficient data
    """
    n_effective = n_ran - n_insufficient  # tests that actually produced signal
    return max(0.0, 1.0 - n_effective / max(n_total, 1))
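
# Worked example (illustrative): if 10 tests were attempted, 8 returned a
# score, and 2 of those were insufficient-data results, then
# n_effective = 8 - 2 = 6 and the failure probability is 1.0 - 6/10 = 0.4:
#
#     compute_failure_prob(n_ran=8, n_total=10, n_insufficient=2)  # -> 0.4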


def run_agent_tests(
    tests: List,
    img: Any,
    agent_name: str,
    modality_adjustments: Optional[Dict[str, float]] = None,
):
    """
    Shared test runner for all signal-processing agents.

    Handles: running tests, tagging insufficient-data results, and computing
    confidence properly. Applies modality-aware score reweighting when
    modality_adjustments is provided.
    """
    findings, scores = [], []
    n_insufficient = 0
    for fn in tests:
        try:
            r = fn(img)
            findings.append(r)
            sc = r.get("score", 0)
            note = r.get("note", "")
            test_name = r.get("test", fn.__name__)
            # P7: Detect insufficient-data results -> tag as not_applicable.
            is_insufficient = (sc == 0.0 and any(kw in note.lower() for kw in
                ["insufficient", "too small", "no data", "not available", "few ", "no ",
                 "not meaningful", "cannot read", "parse error"]))
            if is_insufficient:
                r["not_applicable"] = True
                n_insufficient += 1
            # Apply modality adjustment if present, BUT respect override_suppression.
            if (modality_adjustments and test_name in modality_adjustments
                    and not r.get("override_suppression", False)):
                multiplier = modality_adjustments[test_name]
                original_score = sc
                sc = sc * multiplier
                r["score"] = sc
                r["modality_adjusted"] = True
                r["original_score"] = original_score
                r["adjustment_multiplier"] = multiplier
                if multiplier < 0.5:
                    r["note"] = f"[Modality-suppressed ×{multiplier}] {note}"
            elif r.get("override_suppression"):
                r["note"] = f"[OVERRIDE: suppression bypassed] {note}"
            scores.append(sc)
        except Exception as e:
            findings.append({"test": fn.__name__, "error": str(e), "score": 0})
    # Pair scores with the findings that produced them (crashed tests append a
    # finding but no score, so zipping against all findings would misalign),
    # then drop not_applicable scores before averaging.
    ran = [f for f in findings if "error" not in f]
    active_scores = [s for s, f in zip(scores, ran)
                     if not f.get("not_applicable", False)]
    avg = float(np.mean(active_scores)) if active_scores else 0.0
    conf = compute_agent_confidence(active_scores)
    fail = compute_failure_prob(len(scores), len(tests), n_insufficient)
    # Build rationale from violating (score > 0.2) and consistent (score < -0.1) tests.
    viol = [f["test"] for f in findings if f.get("score", 0) > 0.2 and not f.get("not_applicable")]
    comp = [f["test"] for f in findings if f.get("score", 0) < -0.1 and not f.get("not_applicable")]
    domain = agent_name.replace(" Agent", "")
    if viol:
        rat = f"{domain} violations: {', '.join(viol)}."
    elif comp:
        rat = f"{domain} consistent: {', '.join(comp)}."
    else:
        rat = f"{domain} inconclusive."
    for f in findings:
        if f.get("note") and not f.get("not_applicable"):
            rat += f" [{f['test']}]: {f['note']}."
    return findings, avg, conf, fail, rat
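
# Minimal usage sketch (hypothetical test functions and agent name, not part of
# the original module). Each test takes the image and returns a dict with
# "test", "score", and "note" keys, which is the shape run_agent_tests reads:
#
#     def noise_floor_test(img):
#         return {"test": "noise_floor", "score": 0.3, "note": "elevated noise"}
#
#     def ela_test(img):
#         return {"test": "ela", "score": 0.25, "note": "compression mismatch"}
#
#     findings, avg, conf, fail, rationale = run_agent_tests(
#         tests=[noise_floor_test, ela_test],
#         img=img,
#         agent_name="Noise Agent",
#         modality_adjustments={"ela": 0.4},  # suppress ela for this modality
#     )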