"""Ingest Justice Laws XML into structured, section-level JSON chunks."""
import json
import re
import sys
import time
import urllib.request
import xml.etree.ElementTree as ET
from .config import SOURCES, RAW_DIR, PROCESSED_DIR
# "{uri}" prefix for the LIMS XML namespace, in the form ElementTree uses for
# qualified names; prepend it to a local name to read a namespaced attribute,
# e.g. f"{LIMS}id".
LIMS = "{http://justice.gc.ca/lims}"
# Tag names of the nested provision elements compared against child tags when
# rendering a section.
BLOCK_TAGS = {"Subsection", "Paragraph", "Subparagraph", "Clause", "Definition"}
def _norm(text):
"""Collapse all whitespace, including en-spaces and NBSP, to single spaces."""
return re.sub(r"\s+", " ", text or "").strip()
def _itertext(el):
    """Return the text of *el* and all of its descendants, whitespace-normalised."""
    fragments = el.itertext()
    return _norm("".join(fragments))
def fetch_xml(code, force=False):
    """Return the path of the cached raw XML for *code*, downloading it if needed.

    Args:
        code: Key into ``SOURCES``; the entry's ``"xml_url"`` is the download URL.
        force: Download again even when a cached copy already exists.

    Returns:
        ``RAW_DIR / f"{code}.xml"`` (``RAW_DIR`` is used as a ``pathlib.Path``).

    Raises:
        Whatever ``SOURCES[code]`` raises for an unknown code, and any error
        raised by ``urllib.request.urlopen`` or by writing the file.
    """
    src = SOURCES[code]
    RAW_DIR.mkdir(parents=True, exist_ok=True)
    dest = RAW_DIR / f"{code}.xml"
    if dest.exists() and not force:
        return dest
    print(f" downloading {src['xml_url']}")
    req = urllib.request.Request(src["xml_url"], headers={"User-Agent": "CanLex/0.1"})
    # Stage the payload in a sibling file and rename it into place. Writing
    # straight to `dest` could leave a truncated file behind if the write is
    # interrupted, and the exists() check above would then serve that broken
    # file as a valid cached copy on every later run.
    staging = dest.with_name(dest.name + ".part")
    with urllib.request.urlopen(req, timeout=120) as resp:
        staging.write_bytes(resp.read())
    staging.replace(dest)
    time.sleep(1.0)  # be polite to the Justice Laws server between downloads
    return dest
def _heading_text(h):
    """Return a heading as 'LABEL - TITLE', or its full text when it has neither part.

    When a ``Label`` and/or ``TitleText`` child exists, only the non-empty
    texts of those children are joined with " - ".
    """
    found = [node for node in (h.find("Label"), h.find("TitleText")) if node is not None]
    if not found:
        return _itertext(h)
    texts = (_itertext(node) for node in found)
    return " - ".join(piece for piece in texts if piece)
def _render_block(el, is_section=False):
    """Recursively render a provision element into readable, structured text.

    The first line is built from the element's ``Label``, its ``MarginalNote``
    (wrapped in square brackets) and its ``Text`` children, separated by
    spaces. Every other child except ``HistoricalNote`` is rendered
    recursively and placed on its own line below.

    Args:
        el: The element to render.
        is_section: When true, the element's own ``MarginalNote`` is omitted
            from the output.

    Returns:
        The rendered text, or ``""`` when the element yields no text at all.
    """
    label = note = ""
    inline, blocks = [], []
    for child in el:
        tag = child.tag
        if tag == "Label":
            label = _itertext(child)
        elif tag == "MarginalNote":
            if not is_section:  # a section's own marginal note is its title
                note = _itertext(child)
        elif tag == "Text":
            text = _itertext(child)
            if text:  # an empty <Text/> would otherwise inject a doubled space
                inline.append(text)
        elif tag == "HistoricalNote":
            continue
        else:
            # Keep a child only if it rendered to something. Collecting empty
            # renders made the "has blocks" check succeed and left a dangling
            # newline after the head line.
            nested = _render_block(child)
            if nested:
                blocks.append(nested)
    head = " ".join(p for p in (label, f"[{note}]" if note else "", " ".join(inline)) if p)
    return "\n".join(piece for piece in (head, *blocks) if piece)
def _history(section):
    """Return the section's historical-note items joined with "; ".

    Only a ``HistoricalNote`` that is a direct child of *section* is used;
    the result is ``""`` when there is none.
    """
    note = section.find("HistoricalNote")
    if note is not None:
        items = [_itertext(sub) for sub in note.iter("HistoricalNoteSubItem")]
        return "; ".join(items)
    return ""
def parse_legislation(xml_path, code):
    """Parse one raw XML file into a list of section-level chunk dicts.

    The direct children of the root's ``<Body>`` are visited in document
    order. Each ``<Heading>`` updates a level-keyed record of the headings
    currently in effect, each ``<Section>`` becomes one chunk, and any other
    element is ignored.

    Args:
        xml_path: Object whose ``read_bytes()`` returns the raw XML.
        code: Key into ``SOURCES``; the entry supplies ``"short"``,
            ``"name"`` and ``"web_base"``.

    Returns:
        A list of dicts, one per kept section, in document order.

    Raises:
        ValueError: If the root element has no ``<Body>`` child.
    """
    meta = SOURCES[code]
    raw = xml_path.read_bytes()
    if raw.startswith(b"\xef\xbb\xbf"):  # some Justice Laws XML files carry a UTF-8 BOM
        raw = raw[3:]
    root = ET.fromstring(raw)
    current_to = root.get(f"{LIMS}current-date", "")
    body = root.find("Body")
    if body is None:
        raise ValueError(f"{code}: no <Body> element (root <{root.tag}>)")

    def first_with_prefix(titles, prefix):
        # First title (outermost level first) whose upper-cased text starts with prefix.
        for title in titles:
            if title.upper().startswith(prefix):
                return title
        return ""

    stack = {}  # heading level -> heading text currently in effect
    chunks = []
    for node in body:
        if node.tag == "Heading":
            try:
                depth = int(node.get("level", "0"))
            except ValueError:
                depth = 0
            # A new heading supersedes every heading at its own level or deeper.
            for stale in [lvl for lvl in stack if lvl >= depth]:
                del stack[stale]
            stack[depth] = _heading_text(node)
            continue
        if node.tag != "Section":
            continue

        label_node = node.find("Label")
        note_node = node.find("MarginalNote")
        number = "" if label_node is None else _itertext(label_node)
        marginal = "" if note_node is None else _itertext(note_node)
        rendered = _render_block(node, is_section=True)

        if not number:
            # An unnumbered <Section> is a preamble or enacting recital. It is
            # kept unless it has no text; the LIMS id keeps the chunk id
            # unique when an Act contains more than one such block.
            if not rendered:
                continue
            marginal = marginal or "Preamble"
            number = marginal
            lims_id = node.get(f"{LIMS}id") or str(len(chunks))
            chunk_id = f"{code}-pre-{lims_id}"
            citation = f"{meta['short']}, {number}"
            source_url = meta["web_base"]
        else:
            chunk_id = f"{code}-s{number}"
            citation = f"{meta['short']}, s. {number}"
            source_url = f"{meta['web_base']}section-{number}.html"

        # Heading texts ordered from the lowest level number to the highest.
        titles = [stack[lvl] for lvl in sorted(stack)]
        chunks.append({
            "id": chunk_id,
            "act_code": code,
            "act_short": meta["short"],
            "act_name": meta["name"],
            "section": number,
            "marginal_note": marginal,
            "part": first_with_prefix(titles, "PART"),
            "division": first_with_prefix(titles, "DIVISION"),
            "heading": titles[-1] if titles else "",
            "text": rendered,
            "history": _history(node),
            "last_amended": node.get(f"{LIMS}lastAmendedDate", ""),
            "current_to": current_to,
            "citation": citation,
            "source_url": source_url,
        })
    return chunks
def ingest(code, force=False):
    """Fetch, parse and save one source, returning its parsed chunks.

    The chunks are written as indented UTF-8 JSON to
    ``PROCESSED_DIR / f"{code}.json"``; *force* is passed on to ``fetch_xml``.
    """
    print(f"Ingesting {code} ({SOURCES[code]['short']})...")
    chunks = parse_legislation(fetch_xml(code, force=force), code)
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    target = PROCESSED_DIR / f"{code}.json"
    payload = json.dumps(chunks, ensure_ascii=False, indent=2)
    target.write_text(payload, encoding="utf-8")
    print(f" {len(chunks)} sections -> {target.name}")
    return chunks
def main():
    """Command-line entry point: ingest the named sources, or all of them.

    Arguments that do not start with ``-`` are taken as source codes; when
    none are given, every key of ``SOURCES`` is ingested. ``--force`` is
    passed through to ``ingest``. Unknown codes are skipped with a message,
    and a failure in one source is reported without stopping the others.
    """
    force = "--force" in sys.argv
    only = [a for a in sys.argv[1:] if not a.startswith("-")]
    codes = only or list(SOURCES)
    ingested = 0
    failures = []
    for code in codes:
        if code not in SOURCES:
            print(f" SKIP {code}: not a known source")
            continue
        try:
            ingest(code, force=force)
        except Exception as exc:  # top-level boundary: report and keep going
            failures.append((code, exc))
            print(f" FAILED {code}: {type(exc).__name__}: {exc}")
        else:
            ingested += 1
    # Report real successes: deriving the figure from the failure count
    # would wrongly count skipped (unknown) codes as ingested.
    print(f"\nDone: {ingested}/{len(codes)} ingested.")
    for code, exc in failures:
        print(f" FAILED {code}: {exc}")
# Run the ingester when this module is executed as a script.
if __name__ == "__main__":
    main()