# Hosted-page residue preserved as a comment (uploader: Boka73,
# "Deploy Gradio app", commit dd6303a verified).
"""
SOR Parser
----------
Parses BWDB/LGED Schedule of Rates PDFs into SORItem lists.
This version avoids pdfplumber table parsing for BWDB SOR because large scanned
rate tables can be slow or inconsistent. PyMuPDF text extraction is faster and
stable enough for item-code/rate matching.
"""
import json
import pathlib
import re
from typing import Dict, List, Optional
from .sor_models import SORItem
# Directory next to this module where parsed SOR item lists are cached as JSON.
_CACHE_DIR = pathlib.Path(__file__).parent / "_cache"
# SOR item codes: two or three dash-separated groups of 1-3 digits, e.g. "01-02" or "01-02-03".
_CODE_RE = re.compile(r"^\d{1,3}-\d{1,3}(?:-\d{1,3})?$")
# Rate figures: comma-grouped thousands with optional decimals, or a plain number.
_RATE_RE = re.compile(r"^\d{1,3}(?:,\d{3})*(?:\.\d+)?$|^\d+(?:\.\d+)?$")
# Zone header labels such as "Zone-A" / "zone B"; captures the zone letter.
_ZONE_RE = re.compile(r"Zone\s*[-]?\s*([ABCD])", re.IGNORECASE)
# Lower-case tokens accepted as a unit when scanning the lines around an item row.
_UNIT_WORDS = {
    "sqm", "cum", "cum/km", "m", "no", "nos", "each", "kg", "km", "pmt",
    "pspc", "pltcum", "hr", "day", "job", "ltr", "pcs", "section"
}
def _cache_path(label: str) -> pathlib.Path:
    """Return the JSON cache file for *label*, creating the cache dir if absent."""
    _CACHE_DIR.mkdir(exist_ok=True)
    return _CACHE_DIR / (label + ".json")
def _load_cache(label: str) -> Optional[List[dict]]:
    """Load the cached item list for *label*.

    Returns None when the cache file is missing, unreadable, corrupt, or does
    not contain a non-empty JSON list, so callers fall back to re-parsing.

    Fixes: the original caught only json.JSONDecodeError, so an unreadable
    file (OSError from read_text) crashed, and a cache holding non-list JSON
    was returned as-is and later broke ``SORItem(**d)`` in the callers.
    """
    path = _cache_path(label)
    if not path.exists():
        return None
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        # Unreadable or corrupt cache: treat as a miss rather than crashing.
        return None
    # Callers expand each entry with SORItem(**d), so anything other than a
    # non-empty list is treated as a cache miss.
    if not isinstance(data, list) or not data:
        return None
    return data
def _save_cache(label: str, data: List[dict]):
    """Write *data* to the JSON cache for *label* (pretty-printed, UTF-8)."""
    payload = json.dumps(data, ensure_ascii=False, indent=2)
    _cache_path(label).write_text(payload, encoding="utf-8")
def parse_bwdb_sor(pdf_path: str) -> List[SORItem]:
    """Parse the BWDB SOR PDF into SORItem records, using the JSON cache when present."""
    cached = _load_cache("bwdb_sor")
    if cached:
        return [SORItem(**row) for row in cached]
    parsed = _parse_bwdb_with_pymupdf(pdf_path)
    # Fall back to the pdfplumber parser only when PyMuPDF produced nothing.
    if not parsed:
        parsed = _parse_bwdb_with_pdfplumber(pdf_path)
    unique = _dedupe(parsed)
    _save_cache("bwdb_sor", [item.as_dict() for item in unique])
    print(f" [SOR] BWDB: parsed {len(unique)} items")
    return unique
def parse_lged_sor(pdf_path: str) -> List[SORItem]:
    """Parse LGED SOR PDF. Kept simple; current project mainly uses BWDB.

    Only the cache is consulted; with no cache the result is empty.
    """
    cached = _load_cache("lged_sor")
    if not cached:
        return []
    return [SORItem(**row) for row in cached]
def build_sor_lookup(items: List[SORItem]) -> Dict[str, SORItem]:
    """Map every exact and relaxed code key to its item (first occurrence wins)."""
    lookup: Dict[str, SORItem] = {}
    for entry in items:
        for key in _code_keys(entry.item_code):
            if key not in lookup:
                lookup[key] = entry
    return lookup
def _parse_bwdb_with_pymupdf(pdf_path: str) -> List[SORItem]:
    """Extract SOR items from *pdf_path* via PyMuPDF plain-text extraction.

    Returns [] when PyMuPDF (fitz) is not installed so the caller can fall
    back to the pdfplumber parser.
    """
    try:
        import fitz
    except ImportError:
        return []
    items: List[SORItem] = []
    doc = fitz.open(pdf_path)
    for page_index in range(len(doc)):
        text = doc[page_index].get_text() or ""
        lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
        zone_order = _detect_zone_order(lines)
        chapter = _detect_chapter(lines)
        # Positions of lines that look like item codes (e.g. "01-02-03").
        code_indexes = [idx for idx, value in enumerate(lines) if _CODE_RE.fullmatch(value)]
        for pos, i in enumerate(code_indexes):
            line = lines[i]
            code = _normalise_sor_code(line)
            # Search window bounded by the neighbouring code lines, or by a
            # fixed 45-line radius at the first/last code of the page.
            prev_i = code_indexes[pos - 1] if pos > 0 else max(0, i - 45)
            next_i = code_indexes[pos + 1] if pos + 1 < len(code_indexes) else min(len(lines), i + 45)
            after_segment = lines[i:next_i]
            before_segment = lines[prev_i:i + 1]
            # BWDB SOR pages are inconsistent: some rows put item code before
            # description/rates, others put the item code after the rates.
            segment = after_segment if _count_rates(after_segment) >= 4 else before_segment
            rates = [_to_float(x) for x in segment if _is_rate(x)]
            if len(rates) < 4:
                continue
            # When the code trails its row (before_segment chosen), the four
            # zone rates are the ones closest to the code line; otherwise take
            # the first four after it. Identity check, not equality.
            rates = rates[-4:] if segment is before_segment else rates[:4]
            unit = _find_unit(segment)
            desc = _find_description_from_segment(segment, line)
            zone_rates = _assign_zone_rates(zone_order, rates)
            items.append(SORItem(
                item_code=code,
                description=desc,
                unit=unit,
                zone_a=zone_rates.get("A", 0.0),
                zone_b=zone_rates.get("B", 0.0),
                zone_c=zone_rates.get("C", 0.0),
                zone_d=zone_rates.get("D", 0.0),
                source="BWDB_2023",
                chapter=chapter,
                sl_no=str(len(items) + 1),
            ))
    doc.close()
    return items
def _parse_bwdb_with_pdfplumber(pdf_path: str) -> List[SORItem]:
    """Fallback parser: extract SOR items via pdfplumber text extraction.

    Used when the PyMuPDF parser yields no items. Returns [] when pdfplumber
    is not installed.
    """
    try:
        import pdfplumber
    except ImportError:
        return []
    items = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            text = page.extract_text() or ""
            lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
            zone_order = _detect_zone_order(lines)
            for i, line in enumerate(lines):
                if not _CODE_RE.fullmatch(line):
                    continue
                # Simpler heuristic than the PyMuPDF path: scan the 35 lines
                # after the code for the unit and the first four rate figures.
                window = lines[i:i + 35]
                rates = [_to_float(x) for x in window if _is_rate(x)]
                if len(rates) < 4:
                    continue
                zone_rates = _assign_zone_rates(zone_order, rates[:4])
                items.append(SORItem(
                    item_code=_normalise_sor_code(line),
                    description=_find_description(lines, i),
                    unit=_find_unit(window),
                    zone_a=zone_rates.get("A", 0.0),
                    zone_b=zone_rates.get("B", 0.0),
                    zone_c=zone_rates.get("C", 0.0),
                    zone_d=zone_rates.get("D", 0.0),
                    source="BWDB_2023",
                    chapter=_detect_chapter(lines),
                    sl_no=str(len(items) + 1),
                ))
    return items
def _detect_zone_order(lines: List[str]) -> List[str]:
head = "\n".join(lines[:60])
zones = [m.group(1).upper() for m in _ZONE_RE.finditer(head)]
zones = [z for z in zones if z in {"A", "B", "C", "D"}]
if len(zones) >= 4:
return zones[:4]
return ["A", "B", "C", "D"]
def _assign_zone_rates(zone_order: List[str], rates: List[float]) -> Dict[str, float]:
assigned = {}
for zone, rate in zip(zone_order, rates):
assigned[zone] = rate
return assigned
def _detect_chapter(lines: List[str]) -> str:
for line in lines[:80]:
if re.fullmatch(r"\d{2,3}\.\s+.+", line):
return line
return ""
def _find_unit(window: List[str]) -> str:
    """Return the first token in *window* recognised as a unit word, lower-cased."""
    tokens = (entry.strip().lower() for entry in window)
    return next((tok for tok in tokens if tok in _UNIT_WORDS), "")
def _find_description(lines: List[str], code_index: int) -> str:
    """Build the description from up to 18 lines preceding (and including) the code line."""
    start = code_index - 18
    if start < 0:
        start = 0
    segment = lines[start:code_index + 1]
    return _find_description_from_segment(segment, lines[code_index])
def _find_description_from_segment(segment: List[str], code: str) -> str:
    """Join the descriptive lines of *segment*, skipping codes, rates, units,
    serial numbers and boilerplate; whitespace-collapsed and capped at 250 chars."""
    boilerplate = ("bwdb standard", "item rate", "sl. no", "zone", "note:")
    serial = re.compile(r"\d+|\d+\(\s*\d+\s*\)")
    kept = []
    for raw in segment:
        lowered = raw.lower()
        if raw == code or _is_rate(raw) or _CODE_RE.fullmatch(raw) or lowered in _UNIT_WORDS:
            continue
        if serial.fullmatch(raw):
            continue
        if any(marker in lowered for marker in boilerplate):
            continue
        kept.append(raw)
    joined = " ".join(kept[-12:]).strip()
    return re.sub(r"\s+", " ", joined)[:250]
def _count_rates(values: List[str]) -> int:
    """Number of entries in *values* that look like positive rate figures."""
    return len([entry for entry in values if _is_rate(entry)])
def _is_rate(value: str) -> bool:
    """True when *value* (spaces removed) matches the rate pattern and parses positive."""
    compact = str(value).replace(" ", "")
    return _RATE_RE.fullmatch(compact) is not None and _to_float(value) > 0
def _to_float(value) -> float:
try:
return float(str(value).replace(",", "").strip())
except (ValueError, TypeError):
return 0.0
def _normalise_sor_code(code: str) -> str:
parts = [p for p in code.replace(" ", "").split("-") if p]
if not parts:
return code
parts[0] = parts[0].zfill(2)
if len(parts) == 2:
parts.append("00")
return "-".join(parts)
def _code_keys(code: str) -> List[str]:
    """All lookup-key variants for *code*: dashed and dashless forms, with and
    without the trailing "-00" sub-item and the leading chapter zero."""
    norm = _normalise_sor_code(code).lower()
    parts = norm.split("-")

    def _both(dashed: str) -> set:
        # Every variant is stored dashed and dash-free.
        return {dashed, dashed.replace("-", "")}

    keys = _both(norm)
    has_trailing_00 = len(parts) == 3 and parts[2] == "00"
    if has_trailing_00:
        keys |= _both("-".join(parts[:2]))
    if parts and parts[0].startswith("0"):
        bare_first = parts[0].lstrip("0") or "0"
        keys |= _both("-".join([bare_first] + parts[1:]))
        if has_trailing_00:
            keys |= _both("-".join([bare_first] + parts[1:2]))
    return list(keys)
def _dedupe(items: List[SORItem]) -> List[SORItem]:
    """Drop items whose normalised code was already seen, keeping the first occurrence."""
    kept: List[SORItem] = []
    seen_codes = set()
    for entry in items:
        norm = _normalise_sor_code(entry.item_code)
        if norm not in seen_codes:
            seen_codes.add(norm)
            kept.append(entry)
    return kept