| """Ingest CBSA D-Memoranda (HTML) into structured, section-level chunks. |
| |
| D-Memoranda are CBSA's administrative guidance on how it applies the Customs Act |
| and related law. They are persuasive, not binding -- every chunk is tagged |
| doc_type="memorandum" so the rest of CanLex can keep them distinct from statute. |
| """ |
| import io |
| import json |
| import re |
| import sys |
| import time |
| import urllib.request |
| from urllib.parse import urljoin |
|
|
| from bs4 import BeautifulSoup |
| from pypdf import PdfReader |
|
|
| from .config import RAW_DIR, PROCESSED_DIR |
|
|
# CBSA index page from which memo_urls() collects the individual memo links.
INDEX_URL = "https://www.cbsa-asfc.gc.ca/publications/dm-md/d1-d23-eng.html"

# On-disk cache for downloaded pages; fallback PDFs are cached in its "pdf" subfolder.
DMEMO_DIR = RAW_DIR / "dmemos"

# JSON file that ingest() writes every parsed chunk to.
OUT_FILE = PROCESSED_DIR / "dmemos.json"
|
|
| |
| _SKIP_HEADINGS = {"contact us", "related links"} |
| _MEMO_HREF = re.compile(r"/dm-md/d\d+/d[\d-]+-eng\.html") |
| _URL_NUMBER = re.compile(r"/(d\d+-[\d-]+)-eng\.html") |
|
|
|
|
| def _norm(text): |
| return re.sub(r"\s+", " ", text or "").strip() |
|
|
|
|
| def _fetch(url, dest, force=False): |
| if dest.exists() and not force: |
| return dest.read_bytes() |
| req = urllib.request.Request(url, headers={"User-Agent": "CanLex/0.1"}) |
| with urllib.request.urlopen(req, timeout=60) as resp: |
| data = resp.read() |
| dest.parent.mkdir(parents=True, exist_ok=True) |
| dest.write_bytes(data) |
| time.sleep(0.5) |
| return data |
|
|
|
|
def memo_urls(force=False):
    """Return every individual D-memo URL linked from the CBSA index page.

    The index page is fetched through the cache (re-downloaded when *force*
    is true). Links are made absolute against INDEX_URL and returned in the
    order they first appear, without duplicates.
    """
    index_html = _fetch(INDEX_URL, DMEMO_DIR / "_index.html", force=force)
    anchors = BeautifulSoup(index_html, "html.parser").find_all("a", href=True)
    absolute = (
        urljoin(INDEX_URL, anchor["href"])
        for anchor in anchors
        if _MEMO_HREF.search(anchor["href"])
    )
    # dict keys keep insertion order, so this de-duplicates while preserving order.
    return list(dict.fromkeys(absolute))
|
|
|
|
def _render_section(h2):
    """Render the content following an <h2> as plain text, one item per line.

    Walks the siblings after *h2* and stops at the next <h2> or at the element
    with id "wb-dtmd". Direct <li> children of <ul>/<ol> become "- item" lines;
    any other element contributes its whitespace-normalised text as one line.
    Empty items are dropped. Assumes enclosing <section> wrappers were already
    unwrapped so the section's content is a run of siblings.
    """
    rendered = []
    for node in h2.find_next_siblings():
        if node.name == "h2" or node.get("id") == "wb-dtmd":
            break
        if node.name not in ("ul", "ol"):
            paragraph = _norm(node.get_text(" ", strip=True))
            if paragraph:
                rendered.append(paragraph)
            continue
        bullets = (
            _norm(entry.get_text(" ", strip=True))
            for entry in node.find_all("li", recursive=False)
        )
        rendered.extend(f"- {bullet}" for bullet in bullets if bullet)
    return "\n".join(rendered)
|
|
|
|
def parse_memo(html, url):
    """Parse one D-memo HTML page into one chunk per <h2> content section.

    Args:
        html: Raw HTML of the memo page.
        url: URL of the page; the memo number is derived from it and it is
            stored as each chunk's ``source_url``.

    Returns:
        A list of chunk dicts, one per <h2> whose heading is not in
        _SKIP_HEADINGS and whose rendered body is non-empty. Returns [] when
        the page has no <main> element.
    """
    page = BeautifulSoup(html, "html.parser")
    main = page.find("main")
    if main is None:
        return []

    # Flatten <section> wrappers so each <h2> and its content become siblings.
    for wrapper in main.find_all("section"):
        wrapper.unwrap()

    # Memo number from the URL, falling back to the whole URL if it doesn't match.
    found = _URL_NUMBER.search(url)
    number = found.group(1).upper() if found else url

    # Topic: the <small> inside the <h1> when present, otherwise the whole
    # <h1>, with any leading "Memorandum Dx-y-z:" prefix removed.
    topic = ""
    title = main.find("h1")
    if title:
        small = title.find("small")
        if small:
            raw_title = small.get_text(" ", strip=True)
        else:
            raw_title = title.get_text(" ", strip=True)
        topic = re.sub(r"^Memorandum\s+D[\w-]+\s*[:–-]\s*", "",
                       _norm(raw_title), flags=re.I)

    # Date modified: prefer the machine-readable attribute over the visible text.
    stamp = main.find("time", attrs={"property": "dateModified"})
    date = _norm(stamp.get("datetime") or stamp.get_text()) if stamp else ""

    chunks = []
    for h2 in main.find_all("h2"):
        heading = _norm(h2.get_text(" ", strip=True))
        if not heading:
            continue
        if heading.lower() in _SKIP_HEADINGS:
            continue
        body = _render_section(h2)
        if not body:
            continue
        # Chunk ids are numbered by emitted chunks only, not by <h2> position.
        ordinal = len(chunks) + 1
        chunks.append({
            "id": f"dmemo-{number}-{ordinal}",
            "doc_type": "memorandum",
            "act_code": "D-Memo",
            "act_short": "D-Memo",
            "act_name": "CBSA D-Memoranda",
            "section": number,
            "marginal_note": heading,
            "part": topic,
            "division": "",
            "heading": "",
            "text": body,
            "history": "",
            "last_amended": date,
            "current_to": date,
            "citation": f"Memorandum {number}",
            "source_url": url,
        })
    return chunks
|
|
|
|
| def _pdf_clean(text): |
| text = re.sub(r"[ \t]+", " ", text) |
| text = re.sub(r"\n[ \t]+", "\n", text) |
| return re.sub(r"\n{3,}", "\n\n", text).strip() |
|
|
|
|
| def _pdf_text(pdf_bytes): |
| try: |
| reader = PdfReader(io.BytesIO(pdf_bytes)) |
| return _pdf_clean("\n".join((p.extract_text() or "") for p in reader.pages)) |
| except Exception: |
| return "" |
|
|
|
|
| def _split(text, target=3000): |
| """Split long PDF text into ~target-sized pieces at line boundaries.""" |
| if len(text) <= target: |
| return [text] |
| parts, buf, size = [], [], 0 |
| for line in text.split("\n"): |
| if size + len(line) > target and buf: |
| parts.append("\n".join(buf)) |
| buf, size = [], 0 |
| buf.append(line) |
| size += len(line) + 1 |
| if buf: |
| parts.append("\n".join(buf)) |
| return parts |
|
|
|
|
def parse_pdf_memo(html, url, force=False):
    """Fallback for memos whose HTML page is only a stub linking to a PDF.

    Takes the first link inside <main> whose href ends in ".pdf", downloads it
    through the cache, extracts its text and emits one chunk per piece
    produced by _split().

    Args:
        html: Raw HTML of the memo's stub page.
        url: URL of the stub page; the memo number is derived from it and it
            is stored as each chunk's ``source_url``.
        force: When true, re-download the PDF even if a cached copy exists.
            Defaults to false, which reuses the cache.

    Returns:
        A list of chunk dicts, or [] when the page has no <main>, links to no
        PDF, or the PDF yields no text.
    """
    soup = BeautifulSoup(html, "html.parser")
    main = soup.find("main")
    if main is None:
        return []
    pdf_href = next((a["href"] for a in main.find_all("a", href=True)
                     if a["href"].lower().endswith(".pdf")), None)
    if not pdf_href:
        return []
    pdf_url = urljoin(url, pdf_href)

    # Memo number from the URL, falling back to the whole URL if it doesn't match.
    match = _URL_NUMBER.search(url)
    number = match.group(1).upper() if match else url
    h1 = main.find("h1")
    topic = _norm(h1.get_text(" ", strip=True)) if h1 else ""
    topic = re.sub(r"^Memorandum\s+D[\w-]+\s*[:–-]\s*", "", topic, flags=re.I)
    dm = main.find("time", attrs={"property": "dateModified"})
    date = _norm(dm.get("datetime") or dm.get_text()) if dm else ""

    # Honour `force` here as well; otherwise a forced re-ingest refreshes the
    # HTML pages but keeps serving stale cached PDFs.
    pdf_bytes = _fetch(pdf_url, DMEMO_DIR / "pdf" / pdf_url.rsplit("/", 1)[-1],
                       force=force)
    text = _pdf_text(pdf_bytes)
    if not text:
        return []

    parts = _split(text)
    chunks = []
    for i, part in enumerate(parts, 1):
        label = topic or number
        if len(parts) > 1:
            label = f"{label} (part {i})"
        chunks.append({
            "id": f"dmemo-{number}-pdf{i}",
            "doc_type": "memorandum",
            "act_code": "D-Memo",
            "act_short": "D-Memo",
            "act_name": "CBSA D-Memoranda",
            "section": number,
            "marginal_note": label,
            "part": topic,
            "division": "",
            "heading": "",
            "text": part,
            "history": "",
            "last_amended": date,
            "current_to": date,
            "citation": f"Memorandum {number}",
            "source_url": url,
        })
    return chunks


def ingest(force=False, limit=None):
    """Download and parse every D-memo, then write all chunks to OUT_FILE.

    Each memo page is parsed as HTML first; if that yields no chunks the PDF
    fallback is tried. Per-memo errors are collected and summarised rather
    than aborting the run.

    Args:
        force: When true, bypass the on-disk cache for the index page, the
            memo pages and any fallback PDFs.
        limit: If truthy, only the first *limit* memo URLs are processed.
    """
    urls = memo_urls(force=force)
    if limit:
        urls = urls[:limit]
    print(f"Ingesting {len(urls)} D-Memoranda...")
    all_chunks, failures = [], []
    for i, url in enumerate(urls, 1):
        try:
            html = _fetch(url, DMEMO_DIR / url.rsplit("/", 1)[-1], force=force)
            chunks = parse_memo(html, url) or parse_pdf_memo(html, url, force=force)
            if chunks:
                all_chunks.extend(chunks)
            else:
                failures.append((url, "no content parsed"))
        except Exception as exc:
            # Top-level boundary: record the failure and keep going.
            failures.append((url, f"{type(exc).__name__}: {exc}"))
        if i % 50 == 0:
            print(f" {i}/{len(urls)} ...")
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    OUT_FILE.write_text(json.dumps(all_chunks, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f" {len(all_chunks)} section-chunks from {len(urls) - len(failures)} memos "
          f"-> {OUT_FILE.name}")
    if failures:
        print(f" {len(failures)} memos with no content / errors:")
        # Only the first 15 failures are listed.
        for url, why in failures[:15]:
            print(f" - {url.rsplit('/', 1)[-1]}: {why}")
|
|
|
|
def main():
    """Command-line entry point.

    Recognised arguments:
        --force      bypass the download cache
        --limit=N    process only the first N memos (first occurrence wins)
    """
    force = "--force" in sys.argv
    limit = None
    for arg in sys.argv[1:]:
        if arg.startswith("--limit="):
            limit = int(arg.split("=", 1)[1])
            break
    ingest(force=force, limit=limit)


if __name__ == "__main__":
    main()
|
|