| """FORENSIQ β Shared utilities for all agents.""" |
|
|
| import numpy as np |
| from typing import List, Dict, Any |
|
|
|
|
| def compute_agent_confidence(scores: List[float]) -> float: |
| """ |
| Compute agent confidence using agreement-vs-cancellation logic. |
| |
| Key fix: consistent weak agreement across MANY tests compounds confidence. |
| 4 tests weakly agreeing is more confident than 1 test strongly agreeing. |
| """ |
| if not scores: |
| return 0.1 |
| |
| avg = float(np.mean(scores)) |
| n = len(scores) |
| |
| |
| signs = [1 if s > 0.05 else (-1 if s < -0.05 else 0) for s in scores] |
| n_pos = sum(1 for s in signs if s > 0) |
| n_neg = sum(1 for s in signs if s < 0) |
| n_neu = sum(1 for s in signs if s == 0) |
| n_directional = n_pos + n_neg |
| |
| if n_directional == 0: |
| |
| return 0.15 |
| |
| if n_pos > 0 and n_neg > 0: |
| |
| majority = max(n_pos, n_neg) |
| minority = min(n_pos, n_neg) |
| agreement_ratio = majority / n_directional |
| |
| |
| return min(1.0, 0.1 + 0.4 * abs(avg) * agreement_ratio) |
| else: |
| |
| |
| n_agreeing = max(n_pos, n_neg) |
| |
| |
| magnitude_conf = abs(avg) |
| |
| |
| agreement_bonus = min(1.0, np.sqrt(n_agreeing / 3.0)) |
| |
| |
| conf = 0.2 + 0.7 * magnitude_conf * agreement_bonus |
| |
| |
| if n_agreeing >= 5: |
| conf += 0.1 |
| if n_agreeing >= 10: |
| conf += 0.1 |
| |
| return min(1.0, conf) |
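
# Illustrative values (rounded); a quick sketch of the behaviour above, with
# numbers that follow directly from the formula:
#
#   compute_agent_confidence([0.3] * 6)      # ~0.51: six weak agreements compound
#   compute_agent_confidence([0.7])          # ~0.48: one strong test, damped by sqrt(1/3)
#   compute_agent_confidence([0.3, -0.3])    # 0.10: opposite signs cancel to the floor
#   compute_agent_confidence([])             # 0.10: no evidence at all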


def compute_failure_prob(n_ran: int, n_total: int, n_insufficient: int = 0) -> float:
    """
    Compute agent failure probability.
    Accounts for both crashed tests AND tests returning insufficient data.

    n_ran: tests that returned a score (including insufficient-data ones)
    n_total: total tests attempted
    n_insufficient: tests that returned score=0 due to insufficient data
    """
    n_effective = n_ran - n_insufficient
    return max(0.0, 1.0 - n_effective / max(n_total, 1))
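
# Worked example: 10 tests attempted, 8 returned a score, and 2 of those
# flagged insufficient data. Effective tests = 8 - 2 = 6, so:
#
#   compute_failure_prob(n_ran=8, n_total=10, n_insufficient=2)   # 1 - 6/10 = 0.4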


def run_agent_tests(tests, img, agent_name, modality_adjustments=None):
    """
    Shared test runner for all signal-processing agents.
    Handles running tests, tagging insufficient-data results, and computing
    confidence properly. Applies modality-aware score reweighting when
    modality_adjustments is provided.
    """
    findings, scores = [], []
    n_insufficient = 0

    for fn in tests:
        try:
            r = fn(img)
            findings.append(r)

            sc = r.get("score", 0)
            note = r.get("note", "")
            test_name = r.get("test", fn.__name__)

            # Detect tests that effectively declined to run: a zero score plus
            # a note explaining why (too little data, unreadable input, etc.).
            is_insufficient = (sc == 0.0 and any(kw in note.lower() for kw in
                ["insufficient", "too small", "no data", "not available", "few ", "no ",
                 "not meaningful", "cannot read", "parse error"]))

            if is_insufficient:
                r["not_applicable"] = True
                n_insufficient += 1

            # Modality-aware reweighting: scale the score unless the test
            # explicitly opted out of suppression.
            has_adjustment = bool(modality_adjustments) and test_name in modality_adjustments
            if has_adjustment and not r.get("override_suppression", False):
                multiplier = modality_adjustments[test_name]
                original_score = sc
                sc = sc * multiplier
                r["score"] = sc
                r["modality_adjusted"] = True
                r["original_score"] = original_score
                r["adjustment_multiplier"] = multiplier
                if multiplier < 0.5:
                    r["note"] = f"[Modality-suppressed ×{multiplier}] {note}"
            elif has_adjustment and r.get("override_suppression"):
                # Note the bypass only when an adjustment actually targeted this test.
                r["note"] = f"[OVERRIDE: suppression bypassed] {note}"

            scores.append(sc)
        except Exception as e:
            # A crashed test yields a finding but no entry in `scores`.
            findings.append({"test": fn.__name__, "error": str(e), "score": 0})

    # Confidence should only consider tests that produced real signal. Derive
    # active scores from the findings themselves: crashed tests appear in
    # `findings` but not in `scores`, so zipping the two lists would misalign
    # after any exception.
    active_scores = [f.get("score", 0) for f in findings
                     if not f.get("not_applicable", False) and "error" not in f]

    avg = float(np.mean(active_scores)) if active_scores else 0.0
    conf = compute_agent_confidence(active_scores)
    fail = compute_failure_prob(len(scores), len(tests), n_insufficient)

    # Partition applicable tests into violations (positive scores) and
    # consistency evidence (negative scores).
    viol = [f["test"] for f in findings if f.get("score", 0) > 0.2 and not f.get("not_applicable")]
    comp = [f["test"] for f in findings if f.get("score", 0) < -0.1 and not f.get("not_applicable")]

    domain = agent_name.replace(" Agent", "")
    if viol:
        rat = f"{domain} violations: {', '.join(viol)}."
    elif comp:
        rat = f"{domain} consistent: {', '.join(comp)}."
    else:
        rat = f"{domain} inconclusive."

    # Append each applicable test's note to the rationale.
    for f in findings:
        if f.get("note") and not f.get("not_applicable"):
            rat += f" [{f['test']}]: {f['note']}."

    return findings, avg, conf, fail, rat
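

if __name__ == "__main__":
    # Minimal smoke demo, illustrative only: `noise_floor_test` and the
    # modality table are hypothetical stand-ins, not tests that ship with
    # FORENSIQ. Each test callable returns a dict with at least "test",
    # "score", and "note" keys.
    def noise_floor_test(img):
        return {"test": "noise_floor", "score": 0.3, "note": "elevated residual"}

    findings, avg, conf, fail, rationale = run_agent_tests(
        tests=[noise_floor_test],
        img=None,  # this toy test ignores the image
        agent_name="Noise Agent",
        modality_adjustments={"noise_floor": 0.4},  # e.g. damp for scanned documents
    )
    print(rationale)
    print(f"avg={avg:.2f} conf={conf:.2f} fail={fail:.2f}")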