File size: 6,329 Bytes
27760c8 661eb14 27760c8 b14fc84 27760c8 ed344be 27760c8 b14fc84 27760c8 661eb14 27760c8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 | import json
from datetime import datetime, timezone
import streamlit as st
from core import audit, bidder_processor, fallback
from core.config import CONFIDENCE_HIGH, CONFIDENCE_REVIEW, MODEL_VERSION
from core.llm_client import LLM, LLMUnavailable
from core.prompts import EVALUATE_CRITERION_PROMPT_SYSTEM
from core.schemas import Criterion, Source, Verdict
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
@st.cache_resource
def _get_llm() -> LLM:
    """Construct the LLM client used for criterion evaluation.

    The function is wrapped in Streamlit's ``st.cache_resource`` so the
    returned client object is cached by Streamlit instead of being rebuilt
    on every call.
    """
    client = LLM()
    return client
def _combined_confidence(
llm_confidence: float, source_type: str, ocr_confidence: float | None
) -> float:
if source_type == "text_pdf":
return llm_confidence
elif source_type == "vision_llm":
return 0.7 * llm_confidence + 0.3 * 0.95
elif source_type == "tesseract":
tc = ocr_confidence if ocr_confidence and ocr_confidence >= 0 else 0.3
return 0.6 * llm_confidence + 0.4 * tc
return llm_confidence
def _apply_thresholds(verdict: str, combined: float) -> str:
if verdict == "needs_review":
return "needs_review"
if combined >= CONFIDENCE_HIGH:
return verdict
if CONFIDENCE_REVIEW <= combined < CONFIDENCE_HIGH and verdict == "not_eligible":
return "needs_review"
if combined < CONFIDENCE_REVIEW:
return "needs_review"
return verdict
# Verdict strings the evaluation prompt asks the LLM to choose from.
_VALID_VERDICTS = ("eligible", "not_eligible", "needs_review")


def _coerce_confidence(raw, default: float = 0.5) -> float:
    """Turn an LLM-reported confidence into a float clamped to [0.0, 1.0].

    Returns ``default`` when ``raw`` is ``None``, not numeric, or NaN, so a
    malformed LLM reply cannot crash the evaluation.
    """
    try:
        value = float(raw)
    except (TypeError, ValueError, OverflowError):
        return default
    if value != value:  # NaN is the only float that is unequal to itself
        return default
    return min(1.0, max(0.0, value))


def _coerce_page(raw) -> int | None:
    """Turn an LLM-reported page into an int, or ``None`` if unusable."""
    try:
        return int(raw)
    except (TypeError, ValueError, OverflowError):
        return None


def evaluate(bidder_id: str, criterion: Criterion) -> Verdict:
    """Evaluate one criterion for one bidder and return the resulting verdict.

    Steps:
      1. Gather evidence via ``bidder_processor.gather_evidence``. With no
         evidence, a ``needs_review`` verdict is logged and returned at once.
      2. Ask the LLM for a JSON judgement over the evidence (each chunk's
         text is truncated to 1500 characters). If the LLM raises
         ``LLMUnavailable``, the result of ``fallback.load_evaluation`` is
         returned instead.
      3. Parse the reply defensively, blend the LLM confidence with the OCR
         confidence of the cited evidence, and apply the confidence
         thresholds, which may escalate the verdict to ``needs_review``.
      4. Write a ``criterion_evaluated`` audit entry and return the verdict.

    Args:
        bidder_id: Identifier of the bidder whose documents are evaluated.
        criterion: The criterion to evaluate.

    Returns:
        The verdict for this bidder/criterion pair.
    """
    evidence = bidder_processor.gather_evidence(bidder_id, criterion)
    if not evidence:
        v = Verdict(
            bidder_id=bidder_id,
            criterion_id=criterion.id,
            verdict="needs_review",
            reason="No matching evidence found in submitted documents.",
            llm_confidence=0.0,
            combined_confidence=0.0,
            model_version=MODEL_VERSION,
            timestamp=_now_iso(),
        )
        audit.log("criterion_evaluated", bidder_id=bidder_id,
                  criterion_id=criterion.id, verdict="needs_review",
                  llm_verdict="needs_review", extracted_value="",
                  llm_confidence=0.0, combined_confidence=0.0,
                  ocr_tier="", escalation_reason="no evidence found", reason=v.reason)
        return v

    evidence_dicts = [
        {
            "doc_name": e.doc_name,
            "page": e.page,
            "ocr_confidence": e.ocr_confidence,
            "source_type": e.source_type,
            "text": e.text[:1500],
        }
        for e in evidence
    ]
    # The prompt body is kept flush-left so the text sent to the LLM is
    # exactly the literal below, with no indentation leaking into it.
    user_prompt = f"""CRITERION:
{criterion.model_dump_json(indent=2)}
RETRIEVED EVIDENCE (top-k chunks from bidder {bidder_id}):
{json.dumps(evidence_dicts, indent=2)}
Return JSON:
{{
"verdict": "eligible" | "not_eligible" | "needs_review",
"extracted_value": "<short string as found in evidence>",
"normalized_value": <number or null>,
"chosen_source": {{"doc_name": "...", "page": <int>, "snippet": "<= 200 chars", "source_type": "..."}},
"llm_confidence": <0.0 to 1.0>,
"reason": "<one or two sentences>"
}}
Rules:
- If evidence directly contains a value satisfying the rule, verdict=eligible with high llm_confidence.
- If evidence directly contradicts the rule, verdict=not_eligible.
- If no relevant evidence retrieved, verdict=needs_review, llm_confidence<=0.4.
- If the source is OCR with low confidence and the value is borderline, lean to needs_review.
"""
    try:
        llm = _get_llm()
        result = llm.chat_json(EVALUATE_CRITERION_PROMPT_SYSTEM, user_prompt)
    except LLMUnavailable:
        audit.log("precomputed_fallback_used", bidder_id=bidder_id,
                  criterion_id=criterion.id, reason="LLMUnavailable in evaluate")
        # NOTE(review): the flag is only written when the key is absent; if
        # another module pre-initialises it to False it will never flip to
        # True here - confirm that is intended.
        if "fallback_active" not in st.session_state:
            st.session_state["fallback_active"] = True
        return fallback.load_evaluation(bidder_id, criterion.id)

    # --- Defensive parsing: the reply is model-generated JSON, so any field
    # may be missing, null, or of the wrong type. ---
    raw_verdict = result.get("verdict", "needs_review")
    verdict_recognized = raw_verdict in _VALID_VERDICTS
    # An unrecognised verdict is routed to human review rather than trusted.
    llm_verdict = raw_verdict if verdict_recognized else "needs_review"
    extracted_value = result.get("extracted_value")
    normalized_value = result.get("normalized_value")
    chosen_src = result.get("chosen_source")
    if not isinstance(chosen_src, dict):
        chosen_src = {}
    llm_confidence = _coerce_confidence(result.get("llm_confidence", 0.5))
    reason = result.get("reason") or ""
    source_type = chosen_src.get("source_type") or "text_pdf"
    doc_name = chosen_src.get("doc_name")
    page = _coerce_page(chosen_src.get("page"))

    # Pick the evidence chunk the LLM cited so its OCR confidence can be used.
    # Prefer an exact document + page match, then any chunk from the same
    # document, then the first chunk (evidence is non-empty at this point).
    same_doc = [e for e in evidence if e.doc_name == doc_name]
    best_evidence = next(
        (e for e in same_doc if page is not None and e.page == page),
        same_doc[0] if same_doc else evidence[0],
    )
    ocr_confidence = best_evidence.ocr_confidence
    # A negative OCR confidence is treated as "not available".
    if ocr_confidence is not None and ocr_confidence < 0:
        ocr_confidence = None

    source = (
        Source(
            doc_name=doc_name or "",
            page=page if page is not None else 1,
            snippet=str(chosen_src.get("snippet") or "")[:200],
            source_type=source_type,
        )
        if chosen_src
        else None
    )

    combined = _combined_confidence(llm_confidence, source_type, ocr_confidence)
    final_verdict = _apply_thresholds(llm_verdict, combined)
    v = Verdict(
        bidder_id=bidder_id,
        criterion_id=criterion.id,
        verdict=final_verdict,
        extracted_value=extracted_value,
        normalized_value=normalized_value,
        source=source,
        llm_confidence=llm_confidence,
        ocr_confidence=ocr_confidence,
        combined_confidence=round(combined, 4),
        reason=reason,
        model_version=MODEL_VERSION,
        timestamp=_now_iso(),
        review_status="pending",
    )

    # Record why the final verdict differs from what the LLM proposed.
    escalation_reason = ""
    if not verdict_recognized:
        escalation_reason = f"auto-escalated: unrecognized LLM verdict {raw_verdict!r}"
    elif llm_verdict != final_verdict:
        if combined < CONFIDENCE_REVIEW:
            escalation_reason = f"auto-escalated: combined confidence {combined:.0%} below threshold"
        elif combined < CONFIDENCE_HIGH and llm_verdict == "not_eligible":
            escalation_reason = f"auto-escalated: borderline confidence {combined:.0%} on disqualification"
    audit.log(
        "criterion_evaluated",
        bidder_id=bidder_id,
        criterion_id=criterion.id,
        verdict=final_verdict,
        llm_verdict=llm_verdict,
        extracted_value=extracted_value or "",
        llm_confidence=round(llm_confidence, 4),
        combined_confidence=round(combined, 4),
        ocr_tier=source_type,
        escalation_reason=escalation_reason,
        reason=reason,
    )
    return v
def evaluate_bidder(bidder_id: str, criteria: list[Criterion]) -> list[Verdict]:
    """Evaluate every criterion for one bidder.

    Args:
        bidder_id: Identifier of the bidder to evaluate.
        criteria: Criteria to check, evaluated one at a time in the given order.

    Returns:
        One verdict per criterion, in the same order as ``criteria``.
    """
    verdicts: list[Verdict] = []
    for criterion in criteria:
        verdicts.append(evaluate(bidder_id, criterion))
    return verdicts
|