"""
SOR Parser
----------
Parses BWDB/LGED Schedule of Rates PDFs into SORItem lists.

This version avoids pdfplumber table parsing for BWDB SOR because large scanned
rate tables can be slow or inconsistent. PyMuPDF text extraction is faster and
stable enough for item-code/rate matching.
"""
import json
import pathlib
import re
from typing import Dict, List, Optional

from .sor_models import SORItem

_CACHE_DIR = pathlib.Path(__file__).parent / "_cache"

# Item codes such as "40-110" or "40-110-10", alone on a line.
_CODE_RE = re.compile(r"^\d{1,3}-\d{1,3}(?:-\d{1,3})?$")
# Numbers with optional thousands separators and an optional decimal part.
_RATE_RE = re.compile(r"^\d{1,3}(?:,\d{3})*(?:\.\d+)?$|^\d+(?:\.\d+)?$")
# "Zone A", "Zone-B", "zone - c", ... capturing the zone letter.
_ZONE_RE = re.compile(r"Zone\s*[-]?\s*([ABCD])", re.IGNORECASE)
# Lines of the form "<2-3 digits>. <text>" are taken as the chapter heading.
_CHAPTER_RE = re.compile(r"\d{2,3}\.\s+.+")
# Bare integers or forms like "12(3)"; excluded from descriptions.
_NUMERIC_LABEL_RE = re.compile(r"\d+|\d+\(\s*\d+\s*\)")

_UNIT_WORDS = {
    "sqm", "cum", "cum/km", "m", "no", "nos", "each", "kg", "km", "pmt",
    "pspc", "pltcum", "hr", "day", "job", "ltr", "pcs", "section",
}

_DEFAULT_ZONE_ORDER = ("A", "B", "C", "D")


def _cache_path(label: str) -> pathlib.Path:
    """Return the JSON cache file for *label*, creating the cache directory."""
    _CACHE_DIR.mkdir(exist_ok=True)
    return _CACHE_DIR / f"{label}.json"


def _load_cache(label: str) -> Optional[List[dict]]:
    """Load the cached item dicts for *label*.

    Returns ``None`` (a cache miss) when the file is absent, unreadable,
    not valid JSON, not a JSON list, or an empty list.
    """
    p = _cache_path(label)
    if not p.exists():
        return None
    try:
        data = json.loads(p.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError;
        # a damaged cache is treated as a miss so callers can re-parse.
        return None
    if not isinstance(data, list) or not data:
        return None
    return data


def _save_cache(label: str, data: List[dict]):
    """Write *data* to the JSON cache file for *label* (UTF-8, indented)."""
    _cache_path(label).write_text(
        json.dumps(data, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )


def parse_bwdb_sor(pdf_path: str) -> List[SORItem]:
    """Parse BWDB SOR PDF and return list of SORItem.

    A non-empty "bwdb_sor" cache takes precedence over *pdf_path*. Otherwise
    the PDF is parsed with PyMuPDF, falling back to pdfplumber when that
    yields nothing; the de-duplicated result is cached and returned.
    """
    cache = _load_cache("bwdb_sor")
    if cache:
        return [SORItem(**d) for d in cache]

    items = _parse_bwdb_with_pymupdf(pdf_path)
    if not items:
        items = _parse_bwdb_with_pdfplumber(pdf_path)

    unique = _dedupe(items)
    _save_cache("bwdb_sor", [i.as_dict() for i in unique])
    print(f" [SOR] BWDB: parsed {len(unique)} items")
    return unique


def parse_lged_sor(pdf_path: str) -> List[SORItem]:
    """Parse LGED SOR PDF. Kept simple; current project mainly uses BWDB.

    Only the "lged_sor" cache is consulted; *pdf_path* is currently unused
    and an empty list is returned when no cache is available.
    """
    cache = _load_cache("lged_sor")
    if cache:
        return [SORItem(**d) for d in cache]
    return []


def build_sor_lookup(items: List[SORItem]) -> Dict[str, SORItem]:
    """Build a dict with exact and relaxed keys for item-code lookup.

    When several items produce the same key, the first item wins.
    """
    lookup: Dict[str, SORItem] = {}
    for item in items:
        for key in _code_keys(item.item_code):
            lookup.setdefault(key, item)
    return lookup


def _make_bwdb_item(
    code: str,
    description: str,
    unit: str,
    zone_rates: Dict[str, float],
    chapter: str,
    serial: int,
) -> SORItem:
    """Build a BWDB SORItem; zones missing from *zone_rates* default to 0.0."""
    return SORItem(
        item_code=code,
        description=description,
        unit=unit,
        zone_a=zone_rates.get("A", 0.0),
        zone_b=zone_rates.get("B", 0.0),
        zone_c=zone_rates.get("C", 0.0),
        zone_d=zone_rates.get("D", 0.0),
        source="BWDB_2023",
        chapter=chapter,
        sl_no=str(serial),
    )


def _parse_bwdb_with_pymupdf(pdf_path: str) -> List[SORItem]:
    """Extract items from *pdf_path* using PyMuPDF text extraction.

    Returns an empty list when PyMuPDF (``fitz``) is not installed. Each
    line that is an item code is paired with four rates found either after
    it (up to the next code) or before it (back to the previous code).
    """
    try:
        import fitz
    except ImportError:
        return []

    items: List[SORItem] = []
    doc = fitz.open(pdf_path)
    try:
        for page in doc:
            text = page.get_text() or ""
            lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
            zone_order = _detect_zone_order(lines)
            chapter = _detect_chapter(lines)
            code_indexes = [
                idx for idx, value in enumerate(lines)
                if _CODE_RE.fullmatch(value)
            ]

            for pos, i in enumerate(code_indexes):
                line = lines[i]
                # Without a neighbouring code on the page, look at most
                # 45 lines in that direction.
                prev_i = code_indexes[pos - 1] if pos > 0 else max(0, i - 45)
                if pos + 1 < len(code_indexes):
                    next_i = code_indexes[pos + 1]
                else:
                    next_i = min(len(lines), i + 45)
                after_segment = lines[i:next_i]
                before_segment = lines[prev_i:i + 1]

                # BWDB SOR pages are inconsistent: some rows put item code
                # before description/rates, others put the item code after
                # the rates.
                use_after = _count_rates(after_segment) >= 4
                segment = after_segment if use_after else before_segment
                rates = [_to_float(x) for x in segment if _is_rate(x)]
                if len(rates) < 4:
                    continue
                # Take the four rates nearest the code: the first four when
                # reading forwards, the last four when reading backwards.
                rates = rates[:4] if use_after else rates[-4:]

                items.append(_make_bwdb_item(
                    code=_normalise_sor_code(line),
                    description=_find_description_from_segment(segment, line),
                    unit=_find_unit(segment),
                    zone_rates=_assign_zone_rates(zone_order, rates),
                    chapter=chapter,
                    serial=len(items) + 1,
                ))
    finally:
        # Release the document even if a page fails to parse.
        doc.close()
    return items


def _parse_bwdb_with_pdfplumber(pdf_path: str) -> List[SORItem]:
    """Extract items from *pdf_path* using pdfplumber text extraction.

    Returns an empty list when pdfplumber is not installed. For every line
    that is an item code, the first four rates within the following 35
    lines are used.
    """
    try:
        import pdfplumber
    except ImportError:
        return []

    items: List[SORItem] = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            text = page.extract_text() or ""
            lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
            zone_order = _detect_zone_order(lines)
            # Same for every item on the page, so compute it once.
            chapter = _detect_chapter(lines)

            for i, line in enumerate(lines):
                if not _CODE_RE.fullmatch(line):
                    continue
                window = lines[i:i + 35]
                rates = [_to_float(x) for x in window if _is_rate(x)]
                if len(rates) < 4:
                    continue
                items.append(_make_bwdb_item(
                    code=_normalise_sor_code(line),
                    description=_find_description(lines, i),
                    unit=_find_unit(window),
                    zone_rates=_assign_zone_rates(zone_order, rates[:4]),
                    chapter=chapter,
                    serial=len(items) + 1,
                ))
    return items


def _detect_zone_order(lines: List[str]) -> List[str]:
    """Return the order in which zones A-D appear in the first 60 lines.

    Repeated mentions of a zone are ignored. Falls back to A, B, C, D
    unless all four distinct zones are found, so that every zone always
    receives a rate.
    """
    head = "\n".join(lines[:60])
    # dict.fromkeys de-duplicates while preserving first-seen order.
    zones = list(dict.fromkeys(
        m.group(1).upper() for m in _ZONE_RE.finditer(head)
    ))
    if len(zones) == len(_DEFAULT_ZONE_ORDER):
        return zones
    return list(_DEFAULT_ZONE_ORDER)


def _assign_zone_rates(zone_order: List[str], rates: List[float]) -> Dict[str, float]:
    """Pair each zone in *zone_order* with the rate at the same position."""
    return dict(zip(zone_order, rates))


def _detect_chapter(lines: List[str]) -> str:
    """Return the first chapter-heading line among the first 80 lines, or ""."""
    for line in lines[:80]:
        if _CHAPTER_RE.fullmatch(line):
            return line
    return ""


def _find_unit(window: List[str]) -> str:
    """Return the first entry of *window* that is a known unit word, or ""."""
    for value in window:
        v = value.strip().lower()
        if v in _UNIT_WORDS:
            return v
    return ""


def _find_description(lines: List[str], code_index: int) -> str:
    """Build a description from up to 18 lines preceding the code line."""
    start = max(0, code_index - 18)
    return _find_description_from_segment(
        lines[start:code_index + 1],
        lines[code_index],
    )


def _find_description_from_segment(segment: List[str], code: str) -> str:
    """Join the descriptive lines of *segment* into one description string.

    Codes, rates, unit words, numeric labels and header/note lines are
    skipped. The last 12 remaining lines are joined, whitespace is
    collapsed, and the result is truncated to 250 characters.
    """
    pieces = []
    for line in segment:
        low = line.lower()
        if (
            line == code
            or _is_rate(line)
            or _CODE_RE.fullmatch(line)
            or low in _UNIT_WORDS
        ):
            continue
        if _NUMERIC_LABEL_RE.fullmatch(line):
            continue
        if "bwdb standard" in low or "item rate" in low or "sl. no" in low:
            continue
        if "zone" in low or "note:" in low:
            continue
        pieces.append(line)
    desc = " ".join(pieces[-12:]).strip()
    return re.sub(r"\s+", " ", desc)[:250]


def _count_rates(values: List[str]) -> int:
    """Count the entries of *values* that look like a rate."""
    return sum(1 for value in values if _is_rate(value))


def _is_rate(value: str) -> bool:
    """Return True when *value* is a positive number in rate format.

    NOTE: interior spaces are removed for the pattern check but not by
    ``_to_float``, so a value containing interior spaces converts to 0.0
    and is rejected. This is kept as-is; it stops a line of several
    space-separated integers from being read as one number.
    """
    return (
        bool(_RATE_RE.fullmatch(str(value).replace(" ", "")))
        and _to_float(value) > 0
    )


def _to_float(value) -> float:
    """Convert *value* to float, ignoring commas; return 0.0 on failure."""
    try:
        return float(str(value).replace(",", "").strip())
    except (ValueError, TypeError):
        return 0.0


def _normalise_sor_code(code: str) -> str:
    """Normalise an item code: zero-pad the first part to two digits and
    append "-00" to two-part codes. Codes with no parts are returned as-is.
    """
    parts = [p for p in code.replace(" ", "").split("-") if p]
    if not parts:
        return code
    parts[0] = parts[0].zfill(2)
    if len(parts) == 2:
        parts.append("00")
    return "-".join(parts)


def _code_keys(code: str) -> List[str]:
    """Return the lookup keys for *code* (order unspecified).

    Covers the normalised code, the variant without the trailing "-00",
    the variants without leading zeros on the first part, and each of
    those with the hyphens removed.
    """
    norm = _normalise_sor_code(code).lower()
    parts = norm.split("-")
    has_default_suffix = len(parts) == 3 and parts[2] == "00"

    keys = {norm, norm.replace("-", "")}

    if has_default_suffix:
        short = "-".join(parts[:2])
        keys.add(short)
        keys.add(short.replace("-", ""))

    if parts and parts[0].startswith("0"):
        head = parts[0].lstrip("0") or "0"
        no_zero = "-".join([head] + parts[1:])
        keys.add(no_zero)
        keys.add(no_zero.replace("-", ""))
        if has_default_suffix:
            short_no_zero = "-".join([head] + parts[1:2])
            keys.add(short_no_zero)
            keys.add(short_no_zero.replace("-", ""))

    return list(keys)


def _dedupe(items: List[SORItem]) -> List[SORItem]:
    """Keep the first item for each normalised item code, preserving order."""
    seen = set()
    unique = []
    for item in items:
        key = _normalise_sor_code(item.item_code)
        if key in seen:
            continue
        seen.add(key)
        unique.append(item)
    return unique