"""FORENSIQ β€” Shared utilities for all agents."""

import numpy as np
from typing import List, Dict, Any


def compute_agent_confidence(scores: List[float]) -> float:
    """
    Compute agent confidence using agreement-vs-cancellation logic.
    
    Key fix: consistent weak agreement across MANY tests compounds confidence.
    For example, ten tests each scoring -0.1 come out more confident than a
    single test scoring -0.3 (0.47 vs ~0.32 under the formula below).
    """
    if not scores:
        return 0.1
    
    avg = float(np.mean(scores))
    n = len(scores)
    
    # Classify each score's direction (|s| <= 0.05 counts as neutral)
    signs = [1 if s > 0.05 else (-1 if s < -0.05 else 0) for s in scores]
    n_pos = sum(1 for s in signs if s > 0)
    n_neg = sum(1 for s in signs if s < 0)
    n_directional = n_pos + n_neg
    
    if n_directional == 0:
        # All genuinely neutral: low confidence
        return 0.15
    
    if n_pos > 0 and n_neg > 0:
        # Mixed signals: confidence driven by the strength of the majority
        majority = max(n_pos, n_neg)
        agreement_ratio = majority / n_directional

        # Penalize cancellation but still give credit for the majority direction
        return min(1.0, 0.1 + 0.4 * abs(avg) * agreement_ratio)
    else:
        # All directional scores agree: confidence compounds with count
        # Key insight: 10 tests all saying -0.1 is more confident than 1 test saying -0.3
        n_agreeing = max(n_pos, n_neg)
        
        # Base confidence from magnitude
        magnitude_conf = abs(avg)
        
        # Agreement bonus: sqrt scaling so 4 tests ≈ 2x confidence vs 1 test
        agreement_bonus = min(1.0, np.sqrt(n_agreeing / 3.0))  # normalized: 3 agreeing = 1.0
        
        # Combined: magnitude × agreement, with floor at 0.2 for any agreement
        conf = 0.2 + 0.7 * magnitude_conf * agreement_bonus
        
        # Additional boost for broad agreement (many tests, not just a few)
        if n_agreeing >= 5:
            conf += 0.1
        if n_agreeing >= 10:
            conf += 0.1
        
        return min(1.0, conf)
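
# Illustrative behavior (values hand-computed from the formula above; a sanity
# reference for readers, not an executed test):
#   compute_agent_confidence([0.3, -0.2, 0.1])  -> ~0.12  (mixed signals cancel)
#   compute_agent_confidence([-0.3])            -> ~0.32  (one strong test)
#   compute_agent_confidence([-0.1] * 10)       ->  0.47  (weak but broad agreement)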


def compute_failure_prob(n_ran: int, n_total: int, n_insufficient: int = 0) -> float:
    """
    Compute agent failure probability.
    Accounts for both crashed tests AND tests returning insufficient data.
    
    n_ran: tests that returned a score (including insufficient-data ones)
    n_total: total tests attempted
    n_insufficient: tests that returned score=0 due to insufficient data
    """
    n_effective = n_ran - n_insufficient  # tests that actually produced signal
    return max(0.0, 1.0 - n_effective / max(n_total, 1))
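
# Example (hand-computed): of 10 tests attempted, 8 returned a score and 2 of
# those were insufficient-data, so only 6 produced real signal:
#   compute_failure_prob(n_ran=8, n_total=10, n_insufficient=2)  -> 1 - 6/10 = 0.4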


def run_agent_tests(tests, img, agent_name, modality_adjustments=None):
    """
    Shared test runner for all signal-processing agents.
    Handles: running tests, tagging insufficient-data, computing confidence properly.
    Applies modality-aware score reweighting when modality_adjustments is provided.
    """
    findings, scores = [], []
    n_insufficient = 0
    
    for fn in tests:
        try:
            r = fn(img)
            findings.append(r)
            
            sc = r.get("score", 0)
            note = r.get("note", "")
            test_name = r.get("test", fn.__name__)
            
            # P7: Detect insufficient-data results and tag them as not_applicable
            is_insufficient = (sc == 0.0 and any(kw in note.lower() for kw in 
                ["insufficient", "too small", "no data", "not available", "few ", "no ",
                 "not meaningful", "cannot read", "parse error"]))
            
            if is_insufficient:
                r["not_applicable"] = True
                n_insufficient += 1
            
            # Apply modality adjustment if present, BUT respect override_suppression
            if modality_adjustments and test_name in modality_adjustments and not r.get("override_suppression", False):
                multiplier = modality_adjustments[test_name]
                original_score = sc
                sc = sc * multiplier
                r["score"] = sc
                r["modality_adjusted"] = True
                r["original_score"] = original_score
                r["adjustment_multiplier"] = multiplier
                if multiplier < 0.5:
                    r["note"] = f"[Modality-suppressed Γ—{multiplier}] {note}"
            elif r.get("override_suppression"):
                r["note"] = f"[OVERRIDE: suppression bypassed] {note}"
            
            scores.append(sc)
        except Exception as e:
            findings.append({"test": fn.__name__, "error": str(e), "score": 0})
    
    # Filter out not_applicable scores for averaging
    active_scores = [s for s, f in zip(scores, findings) 
                     if not f.get("not_applicable", False)]
    
    avg = float(np.mean(active_scores)) if active_scores else 0.0
    conf = compute_agent_confidence(active_scores)
    fail = compute_failure_prob(len(scores), len(tests), n_insufficient)
    
    # Build rationale
    viol = [f["test"] for f in findings if f.get("score", 0) > 0.2 and not f.get("not_applicable")]
    comp = [f["test"] for f in findings if f.get("score", 0) < -0.1 and not f.get("not_applicable")]
    
    domain = agent_name.replace(" Agent", "")
    if viol:
        rat = f"{domain} violations: {', '.join(viol)}."
    elif comp:
        rat = f"{domain} consistent: {', '.join(comp)}."
    else:
        rat = f"{domain} inconclusive."
    
    for f in findings:
        if f.get("note") and not f.get("not_applicable"):
            rat += f" [{f['test']}]: {f['note']}."
    
    return findings, avg, conf, fail, rat
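

if __name__ == "__main__":
    # Minimal smoke test with two hypothetical test functions (illustrative
    # only; real agents supply their own tests and image object). It exercises
    # the insufficient-data tagging, confidence, failure probability, and
    # rationale builder above.
    def _mean_check(img):
        return {"test": "mean_check", "score": -0.2,
                "note": "channel means within expected range"}

    def _tiny_region(img):
        return {"test": "tiny_region", "score": 0.0,
                "note": "insufficient data: region too small"}

    findings, avg, conf, fail, rationale = run_agent_tests(
        [_mean_check, _tiny_region], img=None, agent_name="Demo Agent")

    # Expected: avg=-0.200 (tiny_region excluded), conf~0.281, fail=0.500
    print(f"{len(findings)} findings | avg={avg:.3f} conf={conf:.3f} fail={fail:.3f}")
    print(rationale)

    # With a modality table (hypothetical multiplier), mean_check's score is
    # reweighted to -0.2 * 0.4 = -0.08 and its note prefixed as suppressed.
    _, avg2, _, _, _ = run_agent_tests(
        [_mean_check, _tiny_region], img=None, agent_name="Demo Agent",
        modality_adjustments={"mean_check": 0.4})
    print(f"adjusted avg={avg2:.3f}")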