| """Parser for structured policy decisions. |
| |
| Provides strict XML parsing, soft parsing fallback, and repair-backed parsing. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import xml.etree.ElementTree as ET |
|
|
| from app.common.exceptions import ParserError |
| from app.models.policy.output_schema import DecisionSchema |
| from app.models.policy.repair import repair_partial_decision |
|
|
| _REQUIRED_XML_FIELDS = { |
| "mode", |
| "action_type", |
| "target_drug", |
| "replacement_drug", |
| "dose_bucket", |
| "taper_days", |
| "candidate_id", |
| "confidence", |
| } |
|
|
| _ALLOWED_XML_FIELDS = _REQUIRED_XML_FIELDS | {"monitoring_plan", "rationale_brief"} |
|
|
|
|
| def _normalize_scalar(value: object) -> object: |
| if value is None: |
| return None |
| if isinstance(value, str) and value.strip().lower() in {"", "none", "null", "na"}: |
| return None |
| return value |
|
|
|
|
def _coerce_payload_types(payload: dict[str, object]) -> dict[str, object]:
    """Return a shallow copy of *payload* with loosely typed values normalised.

    The nullable fields are passed through ``_normalize_scalar``; because the
    result is always written back, each of those keys is present in the
    returned dict (as ``None`` when it was absent from *payload*).
    ``taper_days`` is then converted with ``int`` when it is not ``None``, and
    ``confidence`` with ``float`` whenever the key is present.

    Args:
        payload: Raw field mapping taken from an XML or JSON decision.

    Returns:
        A new dict; *payload* itself is not mutated.

    Raises:
        ParserError: If ``taper_days`` or ``confidence`` holds a value that
            cannot be converted to a number (for example ``"abc"`` or a
            ``None`` confidence).
    """
    coerced = dict(payload)
    for key in (
        "target_drug",
        "replacement_drug",
        "monitoring_plan",
        "taper_days",
        "rationale_brief",
    ):
        coerced[key] = _normalize_scalar(coerced.get(key))

    if coerced["taper_days"] is not None:
        coerced["taper_days"] = _cast_numeric_field("taper_days", coerced["taper_days"], int)
    # ``confidence`` is not null-normalised above, so a present-but-empty
    # value is reported as an error rather than silently becoming ``None``.
    if "confidence" in coerced:
        coerced["confidence"] = _cast_numeric_field("confidence", coerced["confidence"], float)
    return coerced


def _cast_numeric_field(name: str, value: object, caster: type[int] | type[float]) -> int | float:
    """Apply *caster* to *value*, reporting conversion failures as ``ParserError``."""
    try:
        return caster(value)
    except (TypeError, ValueError, OverflowError) as exc:
        # OverflowError covers int(float("inf")), reachable via JSON "Infinity".
        raise ParserError(f"Invalid value for {name}: {value!r}") from exc
|
|
|
|
def parse_decision_strict_xml(raw: str) -> DecisionSchema:
    """Parse a ``<decision>`` XML document into a ``DecisionSchema``.

    The whitespace-stripped input must be a single well-formed XML document
    whose root is ``<decision>``. Every direct child must be one of the
    allowed field tags, and all required field tags must be present. Only the
    text of each child element is read; if a tag is repeated, the last
    occurrence wins.

    Raises:
        ParserError: If the XML cannot be parsed, the root tag is wrong, an
            unknown child tag is found, or a required tag is missing.
    """
    try:
        document = ET.fromstring(raw.strip())
    except Exception as exc:
        raise ParserError(f"Invalid XML: {exc}") from exc

    if document.tag != "decision":
        raise ParserError("XML decision root must be <decision>.")

    fields: dict[str, object] = {}
    for element in document:
        tag = element.tag
        if tag not in _ALLOWED_XML_FIELDS:
            raise ParserError(f"Unknown XML field: {tag}")
        fields[tag] = element.text

    absent = sorted(name for name in _REQUIRED_XML_FIELDS if name not in fields)
    if absent:
        raise ParserError(f"Missing required XML fields: {absent}")

    normalised = _coerce_payload_types(fields)
    return DecisionSchema.model_validate(normalised)
|
|
|
|
def parse_decision_soft(raw: str) -> DecisionSchema:
    """Best-effort parsing across XML or JSON decision payloads.

    If the stripped input contains ``<decision`` it is treated as XML: the
    whole text is tried with the strict parser first, and on failure the span
    from the first ``<decision`` to the last ``</decision>`` is extracted and
    tried again (this tolerates text surrounding the XML document). Otherwise
    the input is parsed as a JSON object.

    Raises:
        ParserError: If the input is empty, the XML attempts fail, the text
            is not valid JSON, or the JSON value is not an object.
    """
    stripped = raw.strip()
    if not stripped:
        raise ParserError("Empty decision payload.")

    if "<decision" in stripped:
        try:
            return parse_decision_strict_xml(stripped)
        except ParserError:
            start = stripped.find("<decision")
            end = stripped.rfind("</decision>")
            if start >= 0 and end > start:
                end += len("</decision>")
                candidate = stripped[start:end]
                # Re-parsing the identical text can only fail the same way,
                # so retry only when the extraction actually removed something.
                if candidate != stripped:
                    return parse_decision_strict_xml(candidate)
            raise

    try:
        payload = json.loads(stripped)
    except (ValueError, RecursionError) as exc:
        # json.JSONDecodeError is a ValueError; RecursionError covers
        # pathologically nested input.
        raise ParserError(f"Unable to parse JSON decision: {exc}") from exc

    # json.loads may legitimately return a list, string, number, bool or None;
    # only an object can be a decision payload.
    if not isinstance(payload, dict):
        raise ParserError(
            f"JSON decision must be an object, got {type(payload).__name__}."
        )
    return DecisionSchema.model_validate(_coerce_payload_types(payload))
|
|
|
|
def parse_decision_with_repair(raw: str) -> DecisionSchema:
    """Soft-parse and deterministically repair malformed decision payloads.

    The first attempt runs ``parse_decision_soft``, passes the JSON-mode dump
    of the result through ``repair_partial_decision`` and validates the
    output. If any step of that attempt fails, the fallback loads *raw* as
    JSON when it starts with ``{`` (using an empty payload otherwise, or when
    loading fails), repairs that payload and validates it.

    Raises:
        ParserError: If the fallback repair or its validation fails.
    """
    try:
        parsed = parse_decision_soft(raw)
        repaired = repair_partial_decision(parsed.model_dump(mode="json"))
        return DecisionSchema.model_validate(repaired)
    except Exception:
        # Deliberate best-effort: any failure above (parse, repair or
        # validation) drops through to repairing the raw payload instead.
        stripped = raw.strip()
        payload: dict[str, object] = {}
        if stripped.startswith("{"):
            try:
                payload = json.loads(stripped)
            except (ValueError, RecursionError):
                payload = {}
        # NOTE(review): for XML input the payload stays empty here, so no
        # field from the original document reaches the repair step — confirm
        # this is intended.
        try:
            # The repair call is guarded together with validation so that a
            # failure in either surfaces as ParserError.
            fallback = repair_partial_decision(payload)
            return DecisionSchema.model_validate(fallback)
        except Exception as exc:
            raise ParserError(f"Unable to parse decision after repair: {exc}") from exc
|
|
|
|
def parse_decision(raw: str) -> DecisionSchema:
    """Primary parse entrypoint: dispatch *raw* to the strict or soft parser.

    Input whose stripped text begins with ``<decision`` is handed to
    ``parse_decision_strict_xml`` only (a strict failure is not retried with
    the soft parser). All other input goes to ``parse_decision_soft``.
    """
    looks_like_decision_xml = raw.strip().startswith("<decision")
    parser = parse_decision_strict_xml if looks_like_decision_xml else parse_decision_soft
    return parser(raw)
|
|