CanLex / canlex /ingest.py
Beemer
CanLex MCP server
21626e7
"""Ingest Justice Laws XML into structured, section-level JSON chunks."""
import json
import re
import sys
import time
import urllib.request
import xml.etree.ElementTree as ET
from .config import SOURCES, RAW_DIR, PROCESSED_DIR
# Namespace prefix, in ElementTree's "{uri}name" form, prepended to attribute
# names when reading LIMS metadata such as the current-to date or element id.
LIMS = "{http://justice.gc.ca/lims}"
# Tag names of the nested provision elements that make up a section's body.
BLOCK_TAGS = {"Subsection", "Paragraph", "Subparagraph", "Clause", "Definition"}
def _norm(text):
"""Collapse all whitespace, including en-spaces and NBSP, to single spaces."""
return re.sub(r"\s+", " ", text or "").strip()
def _itertext(el):
    """Return all text inside *el* (descendants included) as one normalised string."""
    fragments = el.itertext()
    joined = "".join(fragments)
    return _norm(joined)
def fetch_xml(code, force=False):
    """Return the path of the cached raw XML for *code*, downloading it if needed.

    The file lives at ``RAW_DIR/<code>.xml``.  When it already exists and
    *force* is false, the cached path is returned without any network access.
    Otherwise the document at ``SOURCES[code]["xml_url"]`` is downloaded.

    The payload is first written to a sibling ``.part`` file and then renamed
    over the destination, so an interrupted or failed write can never leave a
    truncated ``<code>.xml`` behind that later runs would treat as a valid
    cache hit.

    Raises:
        KeyError: if *code* is not a key of ``SOURCES``.
        urllib.error.URLError / OSError: on network or filesystem failure.
    """
    src = SOURCES[code]
    RAW_DIR.mkdir(parents=True, exist_ok=True)
    dest = RAW_DIR / f"{code}.xml"
    if dest.exists() and not force:
        return dest
    print(f" downloading {src['xml_url']}")
    req = urllib.request.Request(src["xml_url"], headers={"User-Agent": "CanLex/0.1"})
    with urllib.request.urlopen(req, timeout=120) as resp:
        payload = resp.read()
    partial = dest.with_name(dest.name + ".part")
    try:
        partial.write_bytes(payload)
        # Rename only once the whole payload is on disk.
        partial.replace(dest)
    except BaseException:
        # Do not leave a stray partial file behind; the cache path itself is
        # untouched at this point, so a previous good copy (if any) survives.
        if partial.exists():
            partial.unlink()
        raise
    time.sleep(1.0)  # be polite to the Justice Laws server between downloads
    return dest
def _heading_text(h):
    """Format a <Heading> element as 'LABEL - TITLE'.

    When the heading has a <Label> and/or <TitleText> child, the non-empty
    texts of those children are joined with ' - '.  A heading with neither
    child falls back to its full normalised text.
    """
    label_el = h.find("Label")
    title_el = h.find("TitleText")
    if label_el is None and title_el is None:
        return _itertext(h)
    rendered = []
    for node in (label_el, title_el):
        if node is None:
            continue
        text = _itertext(node)
        if text:
            rendered.append(text)
    return " - ".join(rendered)
def _render_block(el, is_section=False):
    """Recursively render a provision element into readable, structured text.

    The first line ("head") is built from the element's <Label>, its
    <MarginalNote> in square brackets, and its <Text> children, separated by
    single spaces.  Each nested element is rendered recursively and placed on
    its own line below the head.  <HistoricalNote> children are skipped.

    Args:
        el: the element to render.
        is_section: when true, the element's own <MarginalNote> is left out of
            the head (the caller stores it separately as the section title).

    Returns:
        The rendered text, or "" when the element yields nothing.  Empty
        pieces are dropped before joining, so an empty <Text> or an empty
        nested provision never produces stray spaces or a trailing newline.
    """
    label = note = ""
    inline, blocks = [], []
    for child in el:
        tag = child.tag
        if tag == "Label":
            label = _itertext(child)
        elif tag == "MarginalNote":
            if not is_section:  # a section's own marginal note is its title
                note = _itertext(child)
        elif tag == "Text":
            inline.append(_itertext(child))
        elif tag == "HistoricalNote":
            continue
        elif tag in BLOCK_TAGS:
            blocks.append(_render_block(child))
        else:
            # Unrecognised wrapper: recurse so provisions nested inside it are kept.
            blocks.append(_render_block(child))
    # Drop empty renderings up front so they cannot leak separators into the output.
    inline = [piece for piece in inline if piece]
    blocks = [piece for piece in blocks if piece]
    head = " ".join(p for p in (label, f"[{note}]" if note else "", " ".join(inline)) if p)
    if blocks:
        return (head + "\n" if head else "") + "\n".join(blocks)
    return head
def _history(section):
    """Return the section's amendment history as one '; '-separated string.

    Looks at the <HistoricalNote> that is a direct child of *section* and joins
    the text of every <HistoricalNoteSubItem> found under it.  Returns "" when
    the section has no such note.
    """
    hist = section.find("HistoricalNote")
    if hist is not None:
        entries = [_itertext(item) for item in hist.iter("HistoricalNoteSubItem")]
        return "; ".join(entries)
    return ""
def _heading_starting_with(headings, keyword):
    """Return the outermost active heading that begins with the whole word *keyword*, else ""."""
    # The lookahead rejects a following letter, so "PART" matches "PART I - General"
    # but not a topical heading such as "Parties to Offences" or "Partnerships".
    whole_word = re.compile(re.escape(keyword) + r"(?![^\W\d_])", re.IGNORECASE)
    for _, text in sorted(headings.items()):
        if whole_word.match(text):
            return text
    return ""


def parse_legislation(xml_path, code):
    """Parse one Act's XML file into a list of section-level chunk dicts.

    Walks the direct children of the document's <Body>.  <Heading> elements
    update a level -> text map of the currently active headings (a new heading
    discards every heading at the same or a deeper level).  Each <Section>
    becomes one chunk; other elements are ignored.

    Args:
        xml_path: path object of the raw XML file (read with ``read_bytes``).
        code: key into ``SOURCES`` identifying the Act.

    Returns:
        A list of dicts with the keys ``id``, ``act_code``, ``act_short``,
        ``act_name``, ``section``, ``marginal_note``, ``part``, ``division``,
        ``heading``, ``text``, ``history``, ``last_amended``, ``current_to``,
        ``citation`` and ``source_url``, in document order.

    Raises:
        ValueError: if the document has no <Body> element.
        KeyError: if *code* is not a key of ``SOURCES``.
    """
    src = SOURCES[code]
    data = xml_path.read_bytes()
    if data[:3] == b"\xef\xbb\xbf":  # some Justice Laws XML files carry a UTF-8 BOM
        data = data[3:]
    root = ET.fromstring(data)
    current_to = root.get(f"{LIMS}current-date", "")
    body = root.find("Body")
    if body is None:
        raise ValueError(f"{code}: no <Body> element (root <{root.tag}>)")
    headings, chunks = {}, []
    for el in body:
        if el.tag == "Heading":
            try:
                level = int(el.get("level", "0"))
            except ValueError:
                level = 0
            # A new heading closes every heading at its own level or deeper.
            headings = {k: v for k, v in headings.items() if k < level}
            headings[level] = _heading_text(el)
            continue
        if el.tag != "Section":
            continue
        label_el = el.find("Label")
        number = _itertext(label_el) if label_el is not None else ""
        note_el = el.find("MarginalNote")
        marginal = _itertext(note_el) if note_el is not None else ""
        body_text = _render_block(el, is_section=True)
        if number:
            chunk_id = f"{code}-s{number}"
            citation = f"{src['short']}, s. {number}"
            source_url = f"{src['web_base']}section-{number}.html"
        else:
            # An unnumbered <Section> is a preamble or enacting recital. Keep it
            # unless it has no text; the LIMS id keeps the chunk id unique when
            # an Act contains more than one such block.
            if not body_text:
                continue
            number = marginal or "Preamble"
            marginal = marginal or "Preamble"
            lims_id = el.get(f"{LIMS}id") or str(len(chunks))
            chunk_id = f"{code}-pre-{lims_id}"
            citation = f"{src['short']}, {number}"
            source_url = src["web_base"]
        # Whole-word matching keeps topical headings that merely start with the
        # same letters from being recorded as the Part or Division.
        part = _heading_starting_with(headings, "PART")
        division = _heading_starting_with(headings, "DIVISION")
        # The deepest active heading is the one closest to this section.
        nearest = headings[max(headings)] if headings else ""
        chunks.append({
            "id": chunk_id,
            "act_code": code,
            "act_short": src["short"],
            "act_name": src["name"],
            "section": number,
            "marginal_note": marginal,
            "part": part,
            "division": division,
            "heading": nearest,
            "text": body_text,
            "history": _history(el),
            "last_amended": el.get(f"{LIMS}lastAmendedDate", ""),
            "current_to": current_to,
            "citation": citation,
            "source_url": source_url,
        })
    return chunks
def ingest(code, force=False):
    """Fetch, parse and persist one Act; return its list of chunks.

    The raw XML is obtained through ``fetch_xml`` (re-downloaded when *force*
    is true), parsed with ``parse_legislation`` and written as indented UTF-8
    JSON to ``PROCESSED_DIR/<code>.json``.  Progress is reported on stdout.
    """
    print(f"Ingesting {code} ({SOURCES[code]['short']})...")
    chunks = parse_legislation(fetch_xml(code, force=force), code)
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    out_path = PROCESSED_DIR / f"{code}.json"
    serialised = json.dumps(chunks, ensure_ascii=False, indent=2)
    out_path.write_text(serialised, encoding="utf-8")
    print(f" {len(chunks)} sections -> {out_path.name}")
    return chunks
def main():
    """Command-line entry point: ingest the requested Acts, or all of them.

    Positional arguments are source codes; with none given, every key of
    ``SOURCES`` is ingested.  ``--force`` re-downloads cached XML.  Unknown
    codes are reported and skipped.  A failure in one Act is reported and does
    not stop the remaining ones; all failures are listed again at the end.

    The final tally counts only Acts that were actually ingested, so skipped
    (unknown) codes are not reported as successes.
    """
    force = "--force" in sys.argv
    only = [a for a in sys.argv[1:] if not a.startswith("-")]
    codes = only or list(SOURCES)
    failures = []
    ingested = 0
    for code in codes:
        if code not in SOURCES:
            print(f" SKIP {code}: not a known source")
            continue
        try:
            ingest(code, force=force)
        except Exception as exc:  # top-level boundary: report and keep going
            failures.append((code, exc))
            print(f" FAILED {code}: {type(exc).__name__}: {exc}")
        else:
            ingested += 1
    print(f"\nDone: {ingested}/{len(codes)} ingested.")
    for code, exc in failures:
        print(f" FAILED {code}: {exc}")
# Run the ingest command line only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()