| """Ingest National Joint Council directives (HTML) into section-level chunks. |
| |
| NJC directives are negotiated by employer and bargaining-agent representatives; |
| their provisions form part of collective agreements (and the rate tables in |
| their appendices apply too). Chunks are tagged doc_type="directive". |
| """ |
| import json |
| import re |
| import subprocess |
| import sys |
| import time |
|
|
| from bs4 import BeautifulSoup |
|
|
| from .config import RAW_DIR, PROCESSED_DIR |
|
|
# Index page that lists the current NJC directives; scraped by directive_links().
INDEX_URL = "https://www.njc-cnm.gc.ca/directive/en"
# Site origin, used to turn site-relative hrefs into absolute URLs.
BASE = "https://www.njc-cnm.gc.ca"
# Directory where downloaded directive HTML is cached between runs.
DIRECTIVE_DIR = RAW_DIR / "directives"
# JSON file that main() writes the parsed chunks to.
OUT_FILE = PROCESSED_DIR / "directives.json"
# Browser-style User-Agent string sent with every download.
_UA = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
       "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36")
# Tags whose text _block_text() keeps as section body (ul/ol are handled
# separately there, as bullet lists).
_CONTENT_TAGS = {"p", "dl", "table", "h4", "h5", "h6", "blockquote"}
# Heading text that starts with a dotted number ("1", "1.2.3"), then
# whitespace, then the title. Group 1 = number, group 2 = title.
_NUMBERED = re.compile(r"^(\d+(?:\.\d+)*)\s+(.+)")
# Captures the path segment right after "/directive/" in a URL; that segment
# is used as the directive's code.
_CODE = re.compile(r"/directive/([^/]+)/")
|
|
|
|
def _norm(text):
    """Collapse each whitespace run in *text* to one space and trim both ends.

    A falsy *text* (``None`` or ``""``) is treated as the empty string.
    """
    raw = text or ""
    collapsed = re.sub(r"\s+", " ", raw)
    return collapsed.strip()
|
|
|
|
def _ps_quote(value):
    """Return *value* as a PowerShell single-quoted string literal.

    Inside single quotes PowerShell takes every character literally except
    the quote itself, which is escaped by doubling it. PowerShell also accepts
    the typographic quotes U+2018/2019/201A/201B as single-quote delimiters,
    so those are doubled as well.
    """
    escaped = re.sub("(['\u2018\u2019\u201a\u201b])", r"\1\1", str(value))
    return f"'{escaped}'"


def _fetch(url, dest, force=False):
    """Download *url* to *dest* and return its bytes, using *dest* as a cache.

    Fetches via PowerShell's (.NET) HTTP stack -- some government sites block
    Python's HTTP client at the TLS layer.

    Args:
        url: Address to download.
        dest: Cache file (a ``pathlib.Path``-style object); missing parent
            directories are created.
        force: When true, download again even if *dest* already exists.

    Returns:
        The contents of *dest* as ``bytes``.

    Raises:
        subprocess.CalledProcessError: PowerShell exited with a non-zero code.
        subprocess.TimeoutExpired: The download ran longer than 180 seconds.
    """
    if dest.exists() and not force:
        return dest.read_bytes()
    dest.parent.mkdir(parents=True, exist_ok=True)
    # Download to a sibling temp file so that a failed or partial transfer is
    # never mistaken for a valid cache entry on the next run.
    partial = dest.with_name(dest.name + ".part")
    # URLs come from scraped HTML: quote every interpolated value so an
    # apostrophe cannot terminate the literal and inject PowerShell code.
    command = (f"Invoke-WebRequest -Uri {_ps_quote(url)} "
               f"-OutFile {_ps_quote(partial)} "
               f"-UseBasicParsing -UserAgent {_ps_quote(_UA)}")
    try:
        subprocess.run(
            ["powershell", "-NoProfile", "-NonInteractive", "-Command", command],
            check=True, capture_output=True, timeout=180)
        partial.replace(dest)
    finally:
        # After a successful replace() the temp file is gone; this only
        # removes leftovers from a failed download.
        if partial.exists():
            partial.unlink()
    time.sleep(0.5)  # brief pause between requests to go easy on the server
    return dest.read_bytes()
|
|
|
|
def directive_links(force=False):
    """Return [(url, title, date), ...] for the current NJC directives.

    Downloads (or reads from cache) the index page and scans the direct
    ``<li>`` children of ``<ul class="directive-list">``.

    Args:
        force: Passed to ``_fetch``; when true the index page is re-downloaded.

    Returns:
        One tuple per list item that contains a link: the absolute URL, the
        whitespace-normalised link text, and the text of the item's
        ``<span class="date">`` (``""`` when absent). An empty list is
        returned when the page has no ``directive-list`` element.
    """
    from urllib.parse import urljoin  # local import keeps this block self-contained

    html = _fetch(INDEX_URL, DIRECTIVE_DIR / "_index.html", force=force)
    soup = BeautifulSoup(html, "html.parser")
    listing = soup.find("ul", class_="directive-list")
    if listing is None:
        return []
    links = []
    for item in listing.find_all("li", recursive=False):
        anchor = item.find("a", href=True)
        if anchor is None:
            continue
        # Resolve against the index page URL: absolute hrefs pass through,
        # "/path" hrefs get the site origin, and path-relative or
        # protocol-relative hrefs (which plain string concatenation would
        # mangle) resolve correctly too.
        url = urljoin(INDEX_URL, anchor["href"].strip())
        date_span = item.find("span", class_="date")
        date = _norm(date_span.get_text()) if date_span else ""
        links.append((url, _norm(anchor.get_text(" ", strip=True)), date))
    return links
|
|
|
|
def _block_text(heading):
    """Gather the readable text that follows *heading* up to the next h2/h3.

    Only siblings of *heading* are examined, so the caller is expected to
    have unwrapped any section containers first. Direct ``<li>`` children of
    ``ul``/``ol`` become ``"- item"`` lines; every tag listed in
    ``_CONTENT_TAGS`` contributes one line of normalised text; other tags are
    ignored. Lines are joined with newlines.
    """
    collected = []
    for node in heading.find_next_siblings():
        tag = node.name
        if tag in ("h2", "h3"):
            # The next section starts here.
            break
        if tag in ("ul", "ol"):
            bullets = (_norm(entry.get_text(" ", strip=True))
                       for entry in node.find_all("li", recursive=False))
            collected.extend(f"- {bullet}" for bullet in bullets if bullet)
            continue
        if tag not in _CONTENT_TAGS:
            continue
        paragraph = _norm(node.get_text(" ", strip=True))
        if paragraph:
            collected.append(paragraph)
    return "\n".join(collected)
|
|
|
|
def parse_directive(html, url, title, date):
    """Split one NJC directive page into one chunk dict per h2/h3 section.

    Args:
        html: Page markup handed to BeautifulSoup.
        url: Page address; stored as ``source_url`` and used to derive the
            directive code (the whole URL is used when ``_CODE`` finds none).
        title: Directive title, used for ``act_short``, ``act_name`` and the
            citation.
        date: Stored verbatim as ``current_to``.

    Returns:
        A list of chunk dicts in document order. Headings with no text or no
        body are skipped. The list is empty when the page has no ``<main>``.
    """
    soup = BeautifulSoup(html, "html.parser")
    root = soup.find("main")
    if root is None:
        return []
    # Flatten wrappers so each heading's content becomes its direct siblings.
    for wrapper in root.find_all(["section", "div"]):
        wrapper.unwrap()

    found = _CODE.search(url)
    code = url if found is None else found.group(1)

    chunks = []
    part = ""
    for heading in root.find_all(["h2", "h3"]):
        label = _norm(heading.get_text(" ", strip=True))
        if not label:
            continue
        if heading.name == "h2":
            # Remember the enclosing part even if this h2 has no body itself.
            part = label
        body = _block_text(heading)
        if not body:
            continue

        numbered = _NUMBERED.match(label)
        if numbered is None:
            # Unnumbered heading: cite the directive as a whole.
            number, note, citation = "", label, title
        else:
            number = numbered.group(1)
            note = numbered.group(2).strip()
            citation = f"{title}, s. {number}"

        sequence = len(chunks) + 1
        chunk = {
            "id": f"directive-{code}-{sequence}",
            "doc_type": "directive",
            "act_code": code,
            "act_short": title,
            "act_name": f"NJC {title}",
            "section": number or label,
            "marginal_note": note,
            # Only h3 sections sit under a part; an h2 is the part itself.
            "part": part if heading.name == "h3" else "",
            "division": "",
            "heading": "",
            "text": body,
            "history": "",
            "last_amended": "",
            "current_to": date,
            "citation": citation,
            "source_url": url,
        }
        chunks.append(chunk)
    return chunks
|
|
|
|
def _print_link(html):
    """Find a 'Print Full Directive' / 'Print all FSDs' link on a TOC page.

    Args:
        html: Page markup handed to BeautifulSoup.

    Returns:
        The absolute URL of the first link whose text contains either phrase
        (case-insensitive), or ``None`` when no such link exists.
    """
    from urllib.parse import urljoin  # local import keeps this block self-contained

    soup = BeautifulSoup(html, "html.parser")
    for anchor in soup.find_all("a", href=True):
        label = _norm(anchor.get_text(" ", strip=True)).lower()
        if "print full directive" in label or "print all fsd" in label:
            # urljoin leaves absolute hrefs alone and gives "/path" hrefs the
            # site origin, but unlike plain concatenation it also yields a
            # valid URL for hrefs without a leading slash.
            return urljoin(BASE, anchor["href"].strip())
    return None
|
|
|
|
def _url_slug(url):
    """Return a filesystem-safe name derived from *url* (scheme dropped)."""
    without_scheme = re.sub(r"^[A-Za-z][A-Za-z0-9+.-]*://", "", url)
    return re.sub(r"[^A-Za-z0-9]+", "-", without_scheme).strip("-") or "x"


def main():
    """Command-line entry point: download, parse and save every NJC directive.

    Recognised ``sys.argv`` flags:
        --force     re-download pages even when a cached copy exists.
        --limit=N   only process the first N directives from the index
                    (``--limit=0`` is treated as "no limit").

    Writes all chunks to ``OUT_FILE`` as JSON and prints a per-directive
    summary followed by the list of failures.
    """
    force = "--force" in sys.argv
    limit = next((int(arg.split("=", 1)[1]) for arg in sys.argv[1:]
                  if arg.startswith("--limit=")), None)
    directives = directive_links(force=force)
    if limit:
        directives = directives[:limit]
    print(f"Ingesting {len(directives)} NJC directives...")

    all_chunks, failures = [], []
    for url, title, date in directives:
        code_match = _CODE.search(url)
        # The code doubles as the cache file name. A fixed fallback would
        # make every non-matching URL share one cache file (and so reuse the
        # first one's HTML); derive a per-URL name instead.
        code = code_match.group(1) if code_match else _url_slug(url)
        # Deliberately broad: one broken directive must not abort the whole
        # run; the error is recorded and reported at the end.
        try:
            html = _fetch(url, DIRECTIVE_DIR / f"{code}.html", force=force)
            chunks = parse_directive(html, url, title, date)
            if not chunks:
                # Nothing parsed from the landing page (e.g. it is only a
                # table of contents): fall back to its print-everything link.
                print_url = _print_link(html)
                if print_url:
                    full = _fetch(print_url, DIRECTIVE_DIR / f"{code}-full.html",
                                  force=force)
                    chunks = parse_directive(full, url, title, date)
            if chunks:
                all_chunks.extend(chunks)
                print(f" {title}: {len(chunks)} chunks")
            else:
                failures.append((title, "no content parsed"))
        except Exception as exc:
            failures.append((title, f"{type(exc).__name__}: {exc}"))

    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    OUT_FILE.write_text(json.dumps(all_chunks, ensure_ascii=False, indent=2),
                        encoding="utf-8")
    print(f"\n{len(all_chunks)} chunks from {len(directives) - len(failures)} "
          f"directives -> {OUT_FILE.name}")
    for title, why in failures:
        print(f" FAILED {title}: {why}")
|
|
|
|
# Script entry point: run the ingest only when this module is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()
|
|