"""Ingest Justice Laws XML into structured, section-level JSON chunks.""" import json import re import sys import time import urllib.request import xml.etree.ElementTree as ET from .config import SOURCES, RAW_DIR, PROCESSED_DIR LIMS = "{http://justice.gc.ca/lims}" BLOCK_TAGS = {"Subsection", "Paragraph", "Subparagraph", "Clause", "Definition"} def _norm(text): """Collapse all whitespace, including en-spaces and NBSP, to single spaces.""" return re.sub(r"\s+", " ", text or "").strip() def _itertext(el): return _norm("".join(el.itertext())) def fetch_xml(code, force=False): src = SOURCES[code] RAW_DIR.mkdir(parents=True, exist_ok=True) dest = RAW_DIR / f"{code}.xml" if dest.exists() and not force: return dest print(f" downloading {src['xml_url']}") req = urllib.request.Request(src["xml_url"], headers={"User-Agent": "CanLex/0.1"}) with urllib.request.urlopen(req, timeout=120) as resp: dest.write_bytes(resp.read()) time.sleep(1.0) # be polite to the Justice Laws server between downloads return dest def _heading_text(h): """Render a as 'LABEL - TITLE' when both parts are present.""" label, title = h.find("Label"), h.find("TitleText") if label is not None or title is not None: parts = [_itertext(e) for e in (label, title) if e is not None] return " - ".join(p for p in parts if p) return _itertext(h) def _render_block(el, is_section=False): """Recursively render a provision element into readable, structured text.""" label = note = "" inline, blocks = [], [] for child in el: tag = child.tag if tag == "Label": label = _itertext(child) elif tag == "MarginalNote": if not is_section: # a section's own marginal note is its title note = _itertext(child) elif tag == "Text": inline.append(_itertext(child)) elif tag == "HistoricalNote": continue elif tag in BLOCK_TAGS: blocks.append(_render_block(child)) else: nested = _render_block(child) if nested: blocks.append(nested) head = " ".join(p for p in (label, f"[{note}]" if note else "", " ".join(inline)) if p) if blocks: return (head + "\n" if head else "") + "\n".join(b for b in blocks if b) return head def _history(section): note = section.find("HistoricalNote") if note is None: return "" return "; ".join(_itertext(i) for i in note.iter("HistoricalNoteSubItem")) def parse_legislation(xml_path, code): src = SOURCES[code] data = xml_path.read_bytes() if data[:3] == b"\xef\xbb\xbf": # some Justice Laws XML files carry a UTF-8 BOM data = data[3:] root = ET.fromstring(data) current_to = root.get(f"{LIMS}current-date", "") body = root.find("Body") if body is None: raise ValueError(f"{code}: no element (root <{root.tag}>)") headings, chunks = {}, [] for el in body: if el.tag == "Heading": try: level = int(el.get("level", "0")) except ValueError: level = 0 headings = {k: v for k, v in headings.items() if k < level} headings[level] = _heading_text(el) continue if el.tag != "Section": continue label_el = el.find("Label") number = _itertext(label_el) if label_el is not None else "" note_el = el.find("MarginalNote") marginal = _itertext(note_el) if note_el is not None else "" body_text = _render_block(el, is_section=True) if number: chunk_id = f"{code}-s{number}" citation = f"{src['short']}, s. {number}" source_url = f"{src['web_base']}section-{number}.html" else: # An unnumbered
is a preamble or enacting recital. Keep it # unless it has no text; the LIMS id keeps the chunk id unique when # an Act contains more than one such block. if not body_text: continue number = marginal or "Preamble" marginal = marginal or "Preamble" lims_id = el.get(f"{LIMS}id") or str(len(chunks)) chunk_id = f"{code}-pre-{lims_id}" citation = f"{src['short']}, {number}" source_url = src["web_base"] part = next((v for _, v in sorted(headings.items()) if v.upper().startswith("PART")), "") division = next((v for _, v in sorted(headings.items()) if v.upper().startswith("DIVISION")), "") nearest = headings[max(headings)] if headings else "" chunks.append({ "id": chunk_id, "act_code": code, "act_short": src["short"], "act_name": src["name"], "section": number, "marginal_note": marginal, "part": part, "division": division, "heading": nearest, "text": body_text, "history": _history(el), "last_amended": el.get(f"{LIMS}lastAmendedDate", ""), "current_to": current_to, "citation": citation, "source_url": source_url, }) return chunks def ingest(code, force=False): print(f"Ingesting {code} ({SOURCES[code]['short']})...") xml_path = fetch_xml(code, force=force) chunks = parse_legislation(xml_path, code) PROCESSED_DIR.mkdir(parents=True, exist_ok=True) out = PROCESSED_DIR / f"{code}.json" out.write_text(json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8") print(f" {len(chunks)} sections -> {out.name}") return chunks def main(): force = "--force" in sys.argv only = [a for a in sys.argv[1:] if not a.startswith("-")] codes = only or list(SOURCES) failures = [] for code in codes: if code not in SOURCES: print(f" SKIP {code}: not a known source") continue try: ingest(code, force=force) except Exception as exc: failures.append((code, exc)) print(f" FAILED {code}: {type(exc).__name__}: {exc}") print(f"\nDone: {len(codes) - len(failures)}/{len(codes)} ingested.") for code, exc in failures: print(f" FAILED {code}: {exc}") if __name__ == "__main__": main()