| """Ingest Justice Laws XML into structured, section-level JSON chunks.""" |
| import json |
| import re |
| import sys |
| import time |
| import urllib.request |
| import xml.etree.ElementTree as ET |
|
|
| from .config import SOURCES, RAW_DIR, PROCESSED_DIR |
|
|
# Clark-notation prefix for attributes in the Justice Laws "lims" XML
# namespace; used as f"{LIMS}<attribute-name>" when reading element attributes.
LIMS = "{http://justice.gc.ca/lims}"

# Child tags that _render_block always renders as their own nested block.
BLOCK_TAGS = {
    "Subsection",
    "Paragraph",
    "Subparagraph",
    "Clause",
    "Definition",
}
|
|
|
|
| def _norm(text): |
| """Collapse all whitespace, including en-spaces and NBSP, to single spaces.""" |
| return re.sub(r"\s+", " ", text or "").strip() |
|
|
|
|
def _itertext(el):
    """Return all text inside *el* (descendants included), whitespace-normalised."""
    raw = "".join(el.itertext())
    return _norm(raw)
|
|
|
|
def fetch_xml(code, force=False):
    """Return the local path of the raw XML for *code*, downloading it if needed.

    Args:
        code: Key into ``SOURCES``; the entry must provide ``xml_url``.
        force: When true, download again even if a cached copy exists.

    Returns:
        The path ``RAW_DIR / f"{code}.xml"``.

    Raises:
        KeyError: If *code* is not a key of ``SOURCES``.
        OSError: If the download or the write to disk fails
            (``urllib.error.URLError`` is a subclass).
    """
    src = SOURCES[code]
    RAW_DIR.mkdir(parents=True, exist_ok=True)
    dest = RAW_DIR / f"{code}.xml"
    if dest.exists() and not force:
        return dest
    print(f" downloading {src['xml_url']}")
    req = urllib.request.Request(src["xml_url"], headers={"User-Agent": "CanLex/0.1"})
    # Download into a sibling temp file and rename it into place only once it
    # is complete. Writing straight to ``dest`` would leave a truncated file
    # behind on an interrupted write, and the cache check above would then
    # keep returning that broken file until --force is used.
    tmp = dest.with_name(dest.name + ".part")
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            tmp.write_bytes(resp.read())
        tmp.replace(dest)
    finally:
        # After a successful rename the temp file is gone; this only cleans up
        # after a failure.
        if tmp.exists():
            tmp.unlink()
    # Pause after each real download so consecutive fetches are spaced out.
    time.sleep(1.0)
    return dest
|
|
|
|
def _heading_text(h):
    """Render a <Heading> as 'LABEL - TITLE'.

    Uses the <Label> and <TitleText> children when at least one exists,
    joining their non-empty texts with " - ". When neither child exists,
    falls back to the full text of the heading element.
    """
    label_el = h.find("Label")
    title_el = h.find("TitleText")
    if label_el is None and title_el is None:
        return _itertext(h)

    rendered = []
    for part in (label_el, title_el):
        if part is None:
            continue
        text = _itertext(part)
        if text:
            rendered.append(text)
    return " - ".join(rendered)
|
|
|
|
def _render_block(el, is_section=False):
    """Recursively render a provision element into readable, structured text.

    The first line ("head") is the element's label, its marginal note in
    square brackets, and its inline text; each nested block follows on its
    own line.

    Args:
        el: The element to render.
        is_section: When true, the element's own <MarginalNote> is left out
            of the rendered text.

    Returns:
        The rendered text, or ``""`` when the element yields no content.
    """
    label = note = ""
    inline, blocks = [], []
    for child in el:
        tag = child.tag
        if tag == "Label":
            label = _itertext(child)
        elif tag == "MarginalNote":
            if not is_section:
                note = _itertext(child)
        elif tag == "Text":
            inline.append(_itertext(child))
        elif tag == "HistoricalNote":
            # Amendment history is never part of the rendered provision text.
            continue
        elif tag in BLOCK_TAGS:
            blocks.append(_render_block(child))
        elif len(child) == 0:
            # Unknown leaf element: its content is plain character data, which
            # recursing would drop because recursion only looks at child
            # elements. Keep document order: before the first nested block the
            # text joins the head line, afterwards it becomes its own line.
            text = _itertext(child)
            if text:
                (blocks if blocks else inline).append(text)
        else:
            # Unknown container: render whatever known structure it holds.
            nested = _render_block(child)
            if nested:
                blocks.append(nested)

    # Drop empty pieces so an empty <Text> cannot introduce stray spaces.
    inline_text = " ".join(t for t in inline if t)
    head = " ".join(p for p in (label, f"[{note}]" if note else "", inline_text) if p)
    # Filter before testing so blocks that all rendered empty do not leave a
    # dangling newline after the head.
    rendered = [b for b in blocks if b]
    if rendered:
        return (head + "\n" if head else "") + "\n".join(rendered)
    return head
|
|
|
|
def _history(section):
    """Return the section's historical-note items joined with "; ".

    Only the first direct <HistoricalNote> child is consulted; every
    <HistoricalNoteSubItem> beneath it contributes one entry. Returns ``""``
    when the section has no <HistoricalNote>.
    """
    hist = section.find("HistoricalNote")
    if hist is not None:
        items = [_itertext(item) for item in hist.iter("HistoricalNoteSubItem")]
        return "; ".join(items)
    return ""
|
|
|
|
def _first_heading_for(headings, keyword):
    """Return the outermost active heading whose first word is *keyword*, else ''."""
    # Match a whole word, not a bare prefix: "PART I - General" is a Part
    # heading, but "Parties to Offences" or "Participation" is not.
    pattern = rf"{re.escape(keyword)}\b"
    for _, text in sorted(headings.items()):
        if re.match(pattern, text, re.IGNORECASE):
            return text
    return ""


def parse_legislation(xml_path, code):
    """Parse one downloaded XML file into a list of section-level chunk dicts.

    Walks the direct children of <Body>: <Heading> elements update the stack
    of active headings (keyed by their ``level`` attribute) and each <Section>
    becomes one chunk. Other children are ignored.

    Args:
        xml_path: Path-like object providing ``read_bytes()``.
        code: Key into ``SOURCES``; the entry must provide ``short``, ``name``
            and ``web_base``.

    Returns:
        A list of dicts with the keys ``id``, ``act_code``, ``act_short``,
        ``act_name``, ``section``, ``marginal_note``, ``part``, ``division``,
        ``heading``, ``text``, ``history``, ``last_amended``, ``current_to``,
        ``citation`` and ``source_url``.

    Raises:
        KeyError: If *code* is not a key of ``SOURCES``.
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
        ValueError: If the document has no <Body> child under its root.
    """
    src = SOURCES[code]
    data = xml_path.read_bytes()
    # Strip a UTF-8 byte-order mark before handing the bytes to the parser.
    if data[:3] == b"\xef\xbb\xbf":
        data = data[3:]
    root = ET.fromstring(data)
    current_to = root.get(f"{LIMS}current-date", "")
    body = root.find("Body")
    if body is None:
        raise ValueError(f"{code}: no <Body> element (root <{root.tag}>)")

    headings, chunks = {}, []
    for el in body:
        if el.tag == "Heading":
            try:
                level = int(el.get("level", "0"))
            except ValueError:
                level = 0
            # A new heading closes every heading at the same or a deeper level.
            headings = {k: v for k, v in headings.items() if k < level}
            headings[level] = _heading_text(el)
            continue
        if el.tag != "Section":
            continue

        label_el = el.find("Label")
        number = _itertext(label_el) if label_el is not None else ""
        note_el = el.find("MarginalNote")
        marginal = _itertext(note_el) if note_el is not None else ""
        body_text = _render_block(el, is_section=True)

        if number:
            chunk_id = f"{code}-s{number}"
            citation = f"{src['short']}, s. {number}"
            source_url = f"{src['web_base']}section-{number}.html"
        else:
            # Unnumbered section (no label text). Skip it when it renders to
            # nothing; otherwise file it under its marginal note, or under
            # "Preamble" when it has none.
            if not body_text:
                continue
            number = marginal or "Preamble"
            marginal = marginal or "Preamble"
            # Prefer the element's lims id for a stable chunk id; fall back to
            # the running chunk count when the attribute is missing.
            lims_id = el.get(f"{LIMS}id") or str(len(chunks))
            chunk_id = f"{code}-pre-{lims_id}"
            citation = f"{src['short']}, {number}"
            source_url = src["web_base"]

        part = _first_heading_for(headings, "PART")
        division = _first_heading_for(headings, "DIVISION")
        # The deepest active heading is the one closest above this section.
        nearest = headings[max(headings)] if headings else ""

        chunks.append({
            "id": chunk_id,
            "act_code": code,
            "act_short": src["short"],
            "act_name": src["name"],
            "section": number,
            "marginal_note": marginal,
            "part": part,
            "division": division,
            "heading": nearest,
            "text": body_text,
            "history": _history(el),
            "last_amended": el.get(f"{LIMS}lastAmendedDate", ""),
            "current_to": current_to,
            "citation": citation,
            "source_url": source_url,
        })

    return chunks
|
|
|
|
def ingest(code, force=False):
    """Fetch, parse and persist one source; return its list of chunks.

    The chunks are written as pretty-printed UTF-8 JSON to
    ``PROCESSED_DIR / f"{code}.json"``. *force* is passed through to
    ``fetch_xml``.
    """
    print(f"Ingesting {code} ({SOURCES[code]['short']})...")
    raw_file = fetch_xml(code, force=force)
    chunks = parse_legislation(raw_file, code)

    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    target = PROCESSED_DIR / f"{code}.json"
    payload = json.dumps(chunks, ensure_ascii=False, indent=2)
    target.write_text(payload, encoding="utf-8")

    print(f" {len(chunks)} sections -> {target.name}")
    return chunks
|
|
|
|
def main():
    """Command-line entry point: ingest the requested sources and print a summary.

    Positional arguments name the source codes to ingest; with none given,
    every key of ``SOURCES`` is ingested. ``--force`` is passed through to
    ``ingest``. A failure in one source is reported and does not stop the
    remaining ones.
    """
    force = "--force" in sys.argv
    only = [a for a in sys.argv[1:] if not a.startswith("-")]
    codes = only or list(SOURCES)
    ingested = 0
    failures = []
    for code in codes:
        if code not in SOURCES:
            print(f" SKIP {code}: not a known source")
            continue
        try:
            ingest(code, force=force)
        except Exception as exc:
            # Top-level boundary: record the failure and carry on with the
            # remaining sources.
            failures.append((code, exc))
            print(f" FAILED {code}: {type(exc).__name__}: {exc}")
        else:
            ingested += 1
    # Count real successes. Deriving the figure from len(codes) - len(failures)
    # would report skipped (unknown) codes as ingested.
    print(f"\nDone: {ingested}/{len(codes)} ingested.")
    for code, exc in failures:
        print(f" FAILED {code}: {exc}")
|
|
|
|
# Script entry point: run the CLI only when this module is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()
|
|