| """Safety and legality verifier.""" |
|
|
| from __future__ import annotations |
|
|
| from urllib.parse import urlparse |
|
|
| from app.common.enums import ActionType, DoseBucket |
| from app.common.types import PolyGuardAction, PolyGuardState, SafetyReport |
| from app.knowledge.ddi_knowledge import is_contraindicated_pair |
| from app.knowledge.duplicate_therapy_rules import has_duplicate_therapy |
| from app.knowledge.drug_catalog import DRUG_CLASSES |
| from app.knowledge.renal_rules import is_renal_unsafe |
| from app.knowledge.hepatic_rules import is_hepatic_unsafe |
| from app.knowledge.substitution_rules import get_substitutions |
| from app.knowledge.taper_rules import requires_taper |
|
|
|
|
# Domains (and their subdomains) that FETCH_EXTERNAL_EVIDENCE URLs may point to.
_EVIDENCE_DOMAIN_ALLOWLIST = frozenset({"nih.gov", "fda.gov", "who.int", "ema.europa.eu"})


def _evidence_url_violation(query: str) -> str | None:
    """Return a violation code if *query* is a URL on a non-allowlisted host.

    Queries that do not start with ``http`` (case-insensitive) are treated as
    free text and are not domain-checked.  A URL without a host component is
    also left alone, matching the previous behaviour.
    """
    if not query.lower().startswith("http"):
        return None
    try:
        # ``hostname`` is lower-cased and excludes userinfo and port, unlike
        # ``netloc``; comparing against netloc rejected "nih.gov:443".
        host = urlparse(query).hostname or ""
    except ValueError:
        # Malformed netloc (e.g. unbalanced IPv6 brackets): fail closed
        # instead of letting the exception escape the verifier.
        return "evidence_domain_not_allowlisted"
    if not host:
        return None
    # Match the domain exactly or as a dot-separated parent.  A bare
    # ``endswith(domain)`` would also accept look-alikes such as "evilnih.gov".
    allowed = any(
        host == domain or host.endswith("." + domain)
        for domain in _EVIDENCE_DOMAIN_ALLOWLIST
    )
    return None if allowed else "evidence_domain_not_allowlisted"


def _current_dose_bucket(medications, med_names, drug):
    """Return the dose bucket of the first regimen entry for *drug*, or None."""
    if drug not in med_names:
        return None
    return medications[med_names.index(drug)].dose_bucket


def verify_action_legality(state: PolyGuardState, action: PolyGuardAction) -> SafetyReport:
    """Check a proposed action against the safety rules and report violations.

    Every rule is evaluated independently; each failing rule appends a
    violation code to the report (the per-medication contraindication rule may
    append the same code more than once).

    Args:
        state: Current environment state; only ``state.patient`` is read.
        action: The proposed action to validate.

    Returns:
        A ``SafetyReport`` where ``legal`` is True iff no rule was violated.
        ``severity`` is ``"none"`` for no violations, ``"medium"`` for exactly
        one and ``"high"`` for more than one.  Illegal actions recommend
        ``REQUEST_SPECIALIST_REVIEW`` as fallback and carry a
        ``manual_review_recommended`` note; legal ones recommend
        ``KEEP_REGIMEN``.
    """
    violations: list[str] = []
    patient = state.patient
    medications = patient.medications
    med_names = [m.drug for m in medications]
    kind = action.action_type
    target = action.target_drug
    replacement = action.replacement_drug

    # --- Target must be part of the regimen (substitutions are exempt). ---
    if target and target not in med_names and kind != ActionType.SUBSTITUTE_WITHIN_CLASS:
        violations.append("target_drug_not_in_regimen")

    # --- Within-class substitution. ---
    if kind == ActionType.SUBSTITUTE_WITHIN_CLASS:
        if not replacement:
            violations.append("replacement_drug_required")
        elif target and replacement not in set(get_substitutions(target)):
            violations.append("unsafe_substitution")

    # --- Alternative recommendation. ---
    if kind == ActionType.RECOMMEND_ALTERNATIVE:
        if not target or not replacement:
            violations.append("alternative_requires_target_and_replacement")
        elif replacement not in set(get_substitutions(target)):
            violations.append("unsafe_alternative_recommendation")

    # --- External evidence fetch: needs a query; URLs must be allowlisted. ---
    if kind == ActionType.FETCH_EXTERNAL_EVIDENCE:
        if not action.evidence_query:
            violations.append("missing_evidence_query")
        else:
            url_violation = _evidence_url_violation(action.evidence_query.strip())
            if url_violation is not None:
                violations.append(url_violation)

    # --- New-drug decomposition needs both a name and components. ---
    if kind == ActionType.DECOMPOSE_NEW_DRUG:
        if not action.new_drug_name:
            violations.append("missing_new_drug_name")
        if not action.candidate_components:
            violations.append("missing_candidate_components")

    # --- Taper rules. ---
    if kind == ActionType.STOP_DRUG and target and requires_taper(target):
        if action.taper_days is None:
            violations.append("abrupt_stop_requires_taper")
    if kind in {ActionType.TAPER_INITIATE, ActionType.TAPER_CONTINUE} and target:
        if not requires_taper(target):
            violations.append("invalid_taper_target")

    # --- Dose changes (increase and reduce are mutually exclusive). ---
    if kind == ActionType.INCREASE_DOSE_BUCKET and target:
        if action.dose_bucket == DoseBucket.HIGH:
            if is_renal_unsafe(target, patient.labs.egfr):
                violations.append("renal_unsafe_dose")
            if is_hepatic_unsafe(target, patient.labs.ast, patient.labs.alt):
                violations.append("hepatic_unsafe_dose")
        if _current_dose_bucket(medications, med_names, target) == DoseBucket.HIGH:
            violations.append("dose_overshoot_risk")
        # NOTE(review): applied to every dose increase of a sedative in a
        # frail patient, not only increases from HIGH - confirm intent.
        if patient.frailty_score > 0.7 and DRUG_CLASSES.get(target) == "sedative":
            violations.append("invalid_class_escalation")
    elif kind == ActionType.REDUCE_DOSE_BUCKET and target:
        if _current_dose_bucket(medications, med_names, target) in {DoseBucket.LOW, DoseBucket.HOLD}:
            violations.append("dose_already_minimized")

    # --- Duplicate therapy. ---
    if kind in {
        ActionType.SUBSTITUTE_WITHIN_CLASS,
        ActionType.RECOMMEND_ALTERNATIVE,
        ActionType.INCREASE_DOSE_BUCKET,
    } and has_duplicate_therapy(medications, target, replacement):
        violations.append("duplicate_therapy")

    # --- Contraindications introduced by the replacement drug. ---
    if replacement and target and is_contraindicated_pair(target, replacement):
        violations.append("unsafe_substitution_contraindication")
    if replacement:
        contraindication_hits = 0
        for med in medications:
            # The target itself is skipped; it is covered by the check above.
            if med.drug != target and is_contraindicated_pair(med.drug, replacement):
                violations.append("creates_severe_contraindicated_pair")
                contraindication_hits += 1
        if contraindication_hits >= 2:
            violations.append("dangerous_triple_risk_creation")

    # --- Monitoring / hold actions must carry a monitoring plan. ---
    if kind in {ActionType.ORDER_MONITORING_AND_WAIT, ActionType.DOSE_HOLD} and not action.monitoring_plan:
        violations.append("invalid_monitoring_gap")

    # --- Stopping warfarin_like in an afib patient without a replacement. ---
    if (
        kind == ActionType.STOP_DRUG
        and target == "warfarin_like"
        and "afib" in patient.comorbidities
        and not replacement
    ):
        violations.append("destabilizing_deprescribing")

    legal = not violations
    if legal:
        severity = "none"
    else:
        severity = "high" if len(violations) > 1 else "medium"
    fallback = ActionType.KEEP_REGIMEN if legal else ActionType.REQUEST_SPECIALIST_REVIEW
    return SafetyReport(
        legal=legal,
        violations=violations,
        severity=severity,
        recommended_fallback=fallback,
        uncertainty_notes=[] if legal else ["manual_review_recommended"],
    )
|
|