Spaces:
Running
Running
| """Parser for structured policy decisions. | |
| Provides strict XML parsing, soft parsing fallback, and repair-backed parsing. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import xml.etree.ElementTree as ET | |
| from app.common.exceptions import ParserError | |
| from app.models.policy.output_schema import DecisionSchema | |
| from app.models.policy.repair import repair_partial_decision | |
# Child elements that every <decision> document must contain; the strict XML
# parser raises ParserError listing whichever of these are absent.
_REQUIRED_XML_FIELDS = set(
    (
        "mode",
        "action_type",
        "target_drug",
        "replacement_drug",
        "dose_bucket",
        "taper_days",
        "candidate_id",
        "confidence",
    )
)

# Full whitelist of child element names: the required fields plus two optional
# ones. Any other tag is rejected by the strict XML parser as unknown.
_ALLOWED_XML_FIELDS = _REQUIRED_XML_FIELDS.union(("monitoring_plan", "rationale_brief"))
def _normalize_scalar(value: object) -> object:
    """Collapse "no value" placeholders to ``None``; pass anything else through.

    A string counts as a placeholder when, after stripping surrounding
    whitespace and lower-casing, it is empty or one of ``none``, ``null``
    or ``na``. Non-string values (including ``None`` itself) are returned
    unchanged, and a non-placeholder string is returned exactly as given
    (it is *not* stripped).
    """
    if not isinstance(value, str):
        # None, numbers, containers, ...: nothing to normalize.
        return value
    token = value.strip().lower()
    return None if token in ("", "none", "null", "na") else value
def _coerce_payload_types(payload: dict[str, object]) -> dict[str, object]:
    """Return a shallow copy of *payload* with nullable and numeric fields normalized.

    * ``target_drug``, ``replacement_drug``, ``monitoring_plan``,
      ``taper_days`` and ``rationale_brief`` are passed through
      ``_normalize_scalar``; each of these keys is always present in the
      result (``None`` when it was absent from *payload*).
    * ``taper_days`` is converted with ``int()`` when it is not ``None``.
    * ``confidence`` is converted with ``float()`` when the key is present.

    The input mapping is not mutated.

    Raises:
        ParserError: if ``taper_days`` or ``confidence`` cannot be converted
            to a number (for example an empty ``<confidence/>`` element,
            whose text is ``None``).
    """
    coerced = dict(payload)
    nullable_keys = (
        "target_drug",
        "replacement_drug",
        "monitoring_plan",
        "taper_days",
        "rationale_brief",
    )
    for key in nullable_keys:
        coerced[key] = _normalize_scalar(coerced.get(key))

    taper_days = coerced["taper_days"]
    if taper_days is not None:
        try:
            coerced["taper_days"] = int(taper_days)
        except (TypeError, ValueError, OverflowError) as exc:
            # Surface malformed numbers as the parser's own error type so
            # callers that handle ParserError also see this failure.
            raise ParserError(f"Invalid taper_days value: {taper_days!r}") from exc

    if "confidence" in coerced:
        confidence = coerced["confidence"]
        try:
            coerced["confidence"] = float(confidence)
        except (TypeError, ValueError, OverflowError) as exc:
            raise ParserError(f"Invalid confidence value: {confidence!r}") from exc

    return coerced
def parse_decision_strict_xml(raw: str) -> DecisionSchema:
    """Strictly parse the required XML decision schema.

    The document (after stripping surrounding whitespace) must be a single
    ``<decision>`` element whose direct children are all listed in
    ``_ALLOWED_XML_FIELDS``, include every name in ``_REQUIRED_XML_FIELDS``,
    and appear at most once each. Each child's text becomes the field value;
    the collected mapping is type-coerced and validated into a
    ``DecisionSchema``.

    Raises:
        ParserError: if the XML is not well formed, the root tag is not
            ``decision``, a child tag is unknown or repeated, or a required
            field is missing.

    Errors raised by ``_coerce_payload_types`` or
    ``DecisionSchema.model_validate`` propagate unchanged.
    """
    # NOTE(review): the Python docs warn that xml.etree is not hardened
    # against maliciously constructed documents -- confirm whether `raw` can
    # come from an untrusted source.
    try:
        root = ET.fromstring(raw.strip())
    except Exception as exc:  # noqa: BLE001
        raise ParserError(f"Invalid XML: {exc}") from exc

    if root.tag != "decision":
        raise ParserError("XML decision root must be <decision>.")

    payload: dict[str, object] = {}
    for child in root:
        if child.tag not in _ALLOWED_XML_FIELDS:
            raise ParserError(f"Unknown XML field: {child.tag}")
        if child.tag in payload:
            # A repeated element would silently overwrite the earlier value;
            # a strict parse must not pick one of two conflicting values.
            raise ParserError(f"Duplicate XML field: {child.tag}")
        payload[child.tag] = child.text

    missing = sorted(_REQUIRED_XML_FIELDS - payload.keys())
    if missing:
        raise ParserError(f"Missing required XML fields: {missing}")

    return DecisionSchema.model_validate(_coerce_payload_types(payload))
def parse_decision_soft(raw: str) -> DecisionSchema:
    """Best-effort parsing across XML or JSON decision payloads.

    * If the stripped text contains ``<decision`` it is treated as XML: the
      whole buffer is tried with ``parse_decision_strict_xml`` first; when
      that raises ``ParserError``, the span from the first ``<decision`` to
      the last ``</decision>`` is extracted and parsed strictly instead
      (this drops any wrapper text before or after the element).
    * Otherwise the text is parsed as JSON, which must decode to an object.

    Raises:
        ParserError: if the payload is empty, the XML cannot be parsed even
            after trimming, the JSON is invalid, or the JSON value is not an
            object.

    Errors raised by ``_coerce_payload_types`` or
    ``DecisionSchema.model_validate`` propagate unchanged.
    """
    stripped = raw.strip()
    if not stripped:
        raise ParserError("Empty decision payload.")

    if "<decision" in stripped:
        # Try exact parse first.
        try:
            return parse_decision_strict_xml(stripped)
        except ParserError:
            # Attempt to recover a wrapped/trailing buffer.
            start = stripped.find("<decision")
            end = stripped.rfind("</decision>")
            if start >= 0 and end > start:
                candidate = stripped[start : end + len("</decision>")]
                # Re-parsing the identical buffer can only fail the same way.
                if candidate != stripped:
                    return parse_decision_strict_xml(candidate)
            raise

    try:
        payload = json.loads(stripped)
    except Exception as exc:  # noqa: BLE001
        raise ParserError(f"Unable to parse JSON decision: {exc}") from exc

    if not isinstance(payload, dict):
        # json.loads also returns lists, strings, numbers, booleans and None;
        # none of those can be a decision payload.
        raise ParserError(
            f"JSON decision must be an object, got {type(payload).__name__}."
        )

    return DecisionSchema.model_validate(_coerce_payload_types(payload))
def parse_decision_with_repair(raw: str) -> DecisionSchema:
    """Soft-parse and deterministically repair malformed decision payloads.

    First attempt: soft-parse *raw*, dump the result to a JSON-compatible
    dict, run it through ``repair_partial_decision`` and validate the
    outcome.

    Fallback (any exception in the first attempt): if the stripped text
    starts with ``{`` and decodes as JSON, that object is repaired;
    otherwise an empty payload is repaired. The repaired dict is then
    validated.

    Raises:
        ParserError: if the fallback repair or the validation of its result
            fails.
    """
    try:
        parsed = parse_decision_soft(raw)
        repaired = repair_partial_decision(parsed.model_dump(mode="json"))
        return DecisionSchema.model_validate(repaired)
    except Exception:  # noqa: BLE001
        stripped = raw.strip()
        payload: dict[str, object] = {}
        if stripped.startswith("{"):
            try:
                payload = json.loads(stripped)
            except Exception:  # noqa: BLE001
                # Undecodable JSON: fall back to repairing an empty payload.
                payload = {}
        try:
            # Keep the repair call inside the try so a failure in the repair
            # helper is reported the same way as a validation failure.
            repaired = repair_partial_decision(payload)
            return DecisionSchema.model_validate(repaired)
        except Exception as exc:  # noqa: BLE001
            raise ParserError(f"Unable to parse decision after repair: {exc}") from exc
def parse_decision(raw: str) -> DecisionSchema:
    """Primary parse entrypoint: route *raw* to the strict or the soft parser.

    Input whose stripped text begins with ``<decision`` goes to
    ``parse_decision_strict_xml``; everything else goes to
    ``parse_decision_soft``. There is no fallback from the strict parser to
    the soft one: a strict failure propagates to the caller.
    """
    looks_like_xml = raw.strip().startswith("<decision")
    parser = parse_decision_strict_xml if looks_like_xml else parse_decision_soft
    return parser(raw)