"""Safety and legality verifier."""

from __future__ import annotations

from urllib.parse import urlparse

from app.common.enums import ActionType, DoseBucket
from app.common.types import PolyGuardAction, PolyGuardState, SafetyReport
from app.knowledge.ddi_knowledge import is_contraindicated_pair
from app.knowledge.duplicate_therapy_rules import has_duplicate_therapy
from app.knowledge.drug_catalog import DRUG_CLASSES
from app.knowledge.renal_rules import is_renal_unsafe
from app.knowledge.hepatic_rules import is_hepatic_unsafe
from app.knowledge.substitution_rules import get_substitutions
from app.knowledge.taper_rules import requires_taper

# Registrable domains an evidence URL may point at (the domain itself or any
# of its subdomains).
_EVIDENCE_DOMAIN_ALLOWLIST = frozenset({"nih.gov", "fda.gov", "who.int", "ema.europa.eu"})


def _evidence_url_violates_allowlist(url: str) -> bool:
    """Return True when *url* names a host outside the evidence allowlist.

    A URL without any host component is not treated as a violation. A URL
    that cannot be parsed at all is treated as a violation, because its
    destination cannot be verified.
    """
    try:
        # ``hostname`` (unlike ``netloc``) excludes userinfo and port and is
        # already lower-cased, so "https://nih.gov:443/x" matches "nih.gov".
        host = urlparse(url).hostname
    except ValueError:
        return True
    if not host:
        return False
    # Match on a label boundary: "pubmed.nih.gov" is fine, "evilnih.gov" is not.
    return not any(
        host == domain or host.endswith("." + domain)
        for domain in _EVIDENCE_DOMAIN_ALLOWLIST
    )


def verify_action_legality(state: PolyGuardState, action: PolyGuardAction) -> SafetyReport:
    """Check a proposed action against the patient's regimen and safety rules.

    Every rule that fails appends a violation code; the rules are independent,
    so a single action can accumulate several codes.

    Args:
        state: Current state; only ``state.patient`` (medications, labs,
            frailty score, comorbidities) is read.
        action: The proposed action to validate.

    Returns:
        A ``SafetyReport`` where ``legal`` is True only when no rule fired.
        ``severity`` is "none" for a legal action, "medium" for exactly one
        violation and "high" for more than one. Illegal actions recommend
        ``REQUEST_SPECIALIST_REVIEW`` as fallback and carry a
        "manual_review_recommended" note; legal ones recommend
        ``KEEP_REGIMEN`` and carry no notes.
    """
    violations: list[str] = []
    patient = state.patient
    kind = action.action_type
    target = action.target_drug
    replacement = action.replacement_drug

    med_names = [m.drug for m in patient.medications]
    # First regimen entry for the target drug, or None when it is not prescribed.
    target_med = next((m for m in patient.medications if m.drug == target), None)

    # --- Target / replacement sanity -------------------------------------
    if target and target not in med_names and kind != ActionType.SUBSTITUTE_WITHIN_CLASS:
        violations.append("target_drug_not_in_regimen")

    if kind == ActionType.SUBSTITUTE_WITHIN_CLASS:
        if not replacement:
            violations.append("replacement_drug_required")
        elif target and replacement not in set(get_substitutions(target)):
            violations.append("unsafe_substitution")

    if kind == ActionType.RECOMMEND_ALTERNATIVE:
        if not target or not replacement:
            violations.append("alternative_requires_target_and_replacement")
        elif replacement not in set(get_substitutions(target)):
            violations.append("unsafe_alternative_recommendation")

    # --- External evidence -----------------------------------------------
    if kind == ActionType.FETCH_EXTERNAL_EVIDENCE:
        if not action.evidence_query:
            violations.append("missing_evidence_query")
        else:
            query = action.evidence_query.strip()
            # Only URL-shaped queries are subject to the domain allowlist; the
            # scheme test is case-insensitive so "HTTP://..." cannot skip it.
            if query.lower().startswith("http") and _evidence_url_violates_allowlist(query):
                violations.append("evidence_domain_not_allowlisted")

    # --- New-drug decomposition ------------------------------------------
    if kind == ActionType.DECOMPOSE_NEW_DRUG:
        if not action.new_drug_name:
            violations.append("missing_new_drug_name")
        if not action.candidate_components:
            violations.append("missing_candidate_components")

    # --- Stopping and tapering -------------------------------------------
    if kind == ActionType.STOP_DRUG and target and requires_taper(target):
        if action.taper_days is None:
            violations.append("abrupt_stop_requires_taper")

    if kind in {ActionType.TAPER_INITIATE, ActionType.TAPER_CONTINUE} and target:
        if not requires_taper(target):
            violations.append("invalid_taper_target")

    # --- Dose changes ----------------------------------------------------
    if kind == ActionType.INCREASE_DOSE_BUCKET and action.dose_bucket == DoseBucket.HIGH:
        if target and is_renal_unsafe(target, patient.labs.egfr):
            violations.append("renal_unsafe_dose")
        if target and is_hepatic_unsafe(target, patient.labs.ast, patient.labs.alt):
            violations.append("hepatic_unsafe_dose")

    if kind == ActionType.REDUCE_DOSE_BUCKET and target:
        if target_med is not None and target_med.dose_bucket in {DoseBucket.LOW, DoseBucket.HOLD}:
            violations.append("dose_already_minimized")

    if kind == ActionType.INCREASE_DOSE_BUCKET and target:
        if target_med is not None and target_med.dose_bucket == DoseBucket.HIGH:
            violations.append("dose_overshoot_risk")
        if patient.frailty_score > 0.7 and DRUG_CLASSES.get(target) == "sedative":
            violations.append("invalid_class_escalation")

    # --- Duplicate therapy check after substitutions/increases -----------
    if kind in {
        ActionType.SUBSTITUTE_WITHIN_CLASS,
        ActionType.RECOMMEND_ALTERNATIVE,
        ActionType.INCREASE_DOSE_BUCKET,
    } and has_duplicate_therapy(patient.medications, target, replacement):
        violations.append("duplicate_therapy")

    # --- Contraindications introduced by the replacement -----------------
    if replacement and target and is_contraindicated_pair(target, replacement):
        violations.append("unsafe_substitution_contraindication")

    if replacement:
        contraindication_hits = 0
        for med in patient.medications:
            # The target itself is handled by the pair check above.
            if med.drug != target and is_contraindicated_pair(med.drug, replacement):
                # One entry per conflicting medication (may repeat).
                violations.append("creates_severe_contraindicated_pair")
                contraindication_hits += 1
        if contraindication_hits >= 2:
            violations.append("dangerous_triple_risk_creation")

    # --- Monitoring and destabilising deprescribing ----------------------
    if kind in {ActionType.ORDER_MONITORING_AND_WAIT, ActionType.DOSE_HOLD} and not action.monitoring_plan:
        violations.append("invalid_monitoring_gap")

    if (
        kind == ActionType.STOP_DRUG
        and target == "warfarin_like"
        and "afib" in patient.comorbidities
        and not replacement
    ):
        violations.append("destabilizing_deprescribing")

    # --- Verdict ---------------------------------------------------------
    legal = not violations
    if legal:
        severity = "none"
    else:
        severity = "high" if len(violations) > 1 else "medium"
    fallback = ActionType.KEEP_REGIMEN if legal else ActionType.REQUEST_SPECIALIST_REVIEW
    return SafetyReport(
        legal=legal,
        violations=violations,
        severity=severity,
        recommended_fallback=fallback,
        uncertainty_notes=[] if legal else ["manual_review_recommended"],
    )