CanLex / canlex /index.py
Beemer
Sweep-tune the regulation and back-matter penalties; revert the failed swap
a7a22f5
"""Hybrid retrieval (BM25 + semantic) with cross-encoder reranking."""
import json
import math
import os
import re
import sys
from collections import Counter, defaultdict
import snowballstemmer
from .config import PROCESSED_DIR
from .synonyms import expand_query
# BM25 parameters read by LegislationIndex._bm25_scores: K1 scales the
# term-frequency saturation term, B the document-length normalisation.
K1 = 1.5
B = 0.75
RRF_K = 60 # reciprocal-rank-fusion damping constant
W_SEM = 2.0 # weight on the semantic retriever in the fusion (1.0 = equal; eval-tuned)
CANDIDATES = 80 # hits each retriever contributes to the fusion
RERANK_POOL = 50 # top fused candidates the cross-encoder rescores

# The four values below can be overridden through the environment; each is
# parsed with float() at import time, so a non-numeric override raises
# ValueError when this module is imported.

# Title-match boost per unit of idf-weighted overlap between the query and a
# candidate's marginal note (section title).
MN_WEIGHT = float(os.environ.get("CANLEX_MN_WEIGHT", "0.0024"))
# Ceiling on the title-match boost -- it nudges the ranking without
# overriding a strong base score.
MN_CAP = float(os.environ.get("CANLEX_MN_CAP", "0.012"))
# Small fusion penalty on regulation sections, so the Act that creates a duty
# outranks the regulation elaborating it
# (sweep-tuned 2026-05-23 from 0.008 -> 0.004; see sweep.log).
REG_PENALTY = float(os.environ.get("CANLEX_REG_PENALTY", "0.004"))
# Likewise for a collective agreement's back-matter (memoranda, letters of
# understanding) vs its numbered articles
# (sweep-tuned 2026-05-23 from 0.008 -> 0.004).
BACKMATTER_PENALTY = float(os.environ.get("CANLEX_BACKMATTER_PENALTY", "0.004"))
SOURCE_CAP = 2 # max chunks one case or memorandum may contribute
APPENDIX_CAP = 3 # max referenced appendices co-surfaced into a result set
# Primary instruments -- enacted law, collective agreements, the NJC directives
# incorporated into them, and the IRPA delegation instruments. Their sections or
# items are distinct provisions, so (like legislation) they are never collapsed
# under the diversity cap.
PRIMARY_DOC_TYPES = frozenset({"legislation", "agreement", "directive", "delegation"})
_TOKEN = re.compile(r"[a-z0-9]+")
_SECTION_REF = re.compile(r"\bs(?:ec(?:tion)?)?s?\.?\s*(\d+(?:\.\d+)?)")
# A cross-reference to another provision -- "section 34", "subsection 25(1)",
# "paragraph 36(1)(a)", "s. 34" -- capturing the top-level section number.
_XREF = re.compile(
r"\b(?:sections?|subsections?|paragraphs?|ss?\.)\s*(\d+(?:\.\d+)?)",
re.IGNORECASE)
# A line opening with "(1)", "(a)" or "(b.1)" -- a citable subdivision
# (subsection, paragraph or subparagraph) of a provision.
_MARKER = re.compile(r"(?m)^\(([0-9a-zA-Z]+(?:\.\d+)?)\)")
# A D-memorandum's reference to a provision -- "section 32 of the Customs Act",
# or "section 32 of the Act" (the Act a D-memo administers -- the Customs Act).
_MEMO_CITE = re.compile(
r"\b(?:sub)?sections?\s+(\d+(?:\.\d+)?)(?:\([^)]+\))*\s+of\s+the\s+"
r"(Customs Act|Customs Tariff|Act)\b", re.IGNORECASE)
# A directive section's reference to an appendix of the same directive --
# "as specified in Appendix C". A trailing "of" ("Appendix C of the NJC Travel
# Directive") marks a cross-directive citation and is deliberately left alone.
_APPENDIX_REF = re.compile(r"\bAppendi(?:x|ces)\s+([A-Za-z])\b(?!\s+of\b)",
re.IGNORECASE)
# A directive chunk that *is* an appendix: its title opens "Appendix C ...".
_APPENDIX_HEAD = re.compile(r"Appendix\s+([A-Za-z])\b", re.IGNORECASE)
# One shared English Snowball stemmer, plus the memo that _stem() fills:
# raw word -> final (normalised) stem. Nothing in this module evicts entries.
_STEMMER = snowballstemmer.stemmer("english")
_STEM_CACHE = {}
# Stem pairs Snowball does not merge but that share a legal meaning, so a
# query naming the verb still matches a provision titled with the noun (or
# vice versa). Mapped to the verb form on both index and query sides, which
# is consistent and arbitrary -- the merge is what matters.
# Keys are Snowball stems (the stemmer's output), not surface words.
_STEM_NORMALIZE = {
    "seizur": "seiz", # seizure -> seize
    "forfeitur": "forfeit", # forfeiture -> forfeit
    "appel": "appeal", # appellate/appellant -> appeal
    "detent": "detain", # detention -> detain
    "exclus": "exclud", # exclusion -> exclude
    "admiss": "admit", # admission/admissibility -> admit
    "applic": "appli", # application -> apply
    "complianc": "compli", # compliance -> comply
    "grievanc": "griev", # grievance -> grieve
}
def _stem(word):
    """Return the normalised Snowball stem of `word`, memoised.

    The raw Snowball stem is passed through _STEM_NORMALIZE so the few
    verb/noun pairs Snowball keeps apart ('seize'/'seizure',
    'forfeit'/'forfeiture') collapse to one stem. The result is cached per
    input word, because legal text repeats its vocabulary heavily.
    """
    cached = _STEM_CACHE.get(word)
    if cached is not None:
        return cached
    raw = _STEMMER.stemWord(word)
    final = _STEM_NORMALIZE.get(raw, raw)
    _STEM_CACHE[word] = final
    return final
def tokenize(text):
    """Turn text into a list of stemmed tokens.

    The text is lower-cased, split into runs of letters/digits, and each run
    is stemmed, so a query and a provision match even when their word forms
    differ -- 'possession' vs 'possess', 'reporting' vs 'report',
    'importation' vs 'import'. Order and duplicates are preserved.
    """
    words = _TOKEN.findall(text.lower())
    return list(map(_stem, words))
def _section_refs(query):
    """Return the set of section numbers a query names explicitly.

    For example 'section 34' yields {'34'} and 's. 20.1' yields {'20.1'}.
    Matching is done on the lower-cased query; numbers are kept as strings.
    """
    lowered = query.lower()
    return {hit.group(1) for hit in _SECTION_REF.finditer(lowered)}
def topical_title(chunk):
    """Return the string that carries a chunk's topic.

    This is the 'title' wherever a title is weighted for retrieval (BM25
    indexing, the title-match boost, the semantic embedding). Which field
    holds the topic depends on doc_type:

    * memorandum -- 'part', because a D-memo's marginal_note is a generic
      banner;
    * caselaw -- 'heading', because its marginal_note is only the paragraph
      range ('paras 11-13') while the proposition lives in the heading;
    * anything else -- 'marginal_note' (the section heading).

    When the preferred field is missing or empty, marginal_note is used.
    """
    preferred_field = {"memorandum": "part", "caselaw": "heading"}.get(
        chunk.get("doc_type"))
    if preferred_field is not None:
        preferred = chunk.get(preferred_field)
        if preferred:
            return preferred
    return chunk["marginal_note"]
def _provision_units(text):
    """Split a provision into its citable parts for pinpoint scoring.

    Returns a list of (citation_suffix, scoring_text, snippet) tuples:

    * one per lettered paragraph, labelled with the enclosing subsection
      ('(1)(a)') and scored with that subsection's opening text (chapeau)
      prepended for context;
    * one per numbered subsection that has no paragraphs of its own.

    Subdivisions whose text opens with '[Repealed' are skipped; a repealed
    subsection still becomes the current subsection, with an empty chapeau.
    Returns [] when the text has fewer than two subdivision markers.
    """
    markers = list(_MARKER.finditer(text))
    if len(markers) < 2:
        return []

    # Each subdivision runs from its own marker to the next one (or the end).
    starts = [m.start() for m in markers]
    ends = starts[1:] + [len(text)]
    pieces = [(m.group(1), text[lo:hi].strip())
              for m, lo, hi in zip(markers, starts, ends)]
    # Token of the subdivision that follows each piece; "" after the last.
    following_tokens = [token for token, _ in pieces[1:]] + [""]

    units = []
    subsection, chapeau = "", ""
    for (token, body), following in zip(pieces, following_tokens):
        numbered = token[0].isdigit()
        repealed = "[Repealed" in body[:40]
        if numbered:
            subsection = f"({token})"
            chapeau = "" if repealed else body
        if repealed:
            continue
        if not numbered:
            units.append(
                (f"{subsection}({token})", f"{chapeau} {body}".strip(), body))
        elif not following or following[0].isdigit():
            # A subsection with no paragraphs is a unit in its own right;
            # otherwise its chapeau is emitted through its paragraphs.
            units.append((subsection, body, body))
    return units
class LegislationIndex:
def __init__(self):
    """Load every processed chunk file and build the retrieval structures.

    Chunks are read from each *.json file in PROCESSED_DIR, in sorted path
    order, and concatenated into self.chunks. The lexical index, title
    tokens, cross-reference tables and appendix index are then built, and
    the semantic embeddings and reranker are loaded.

    Raises:
        RuntimeError: if no chunks were found.
    """
    loaded = []
    for json_path in sorted(PROCESSED_DIR.glob("*.json")):
        loaded += json.loads(json_path.read_text(encoding="utf-8"))
    self.chunks = loaded
    if not loaded:
        raise RuntimeError(
            f"No processed legislation in {PROCESSED_DIR}. Run 'canlex.ingest' first.")
    build_steps = (
        self._build_bm25,
        self._build_note_tokens,
        self._build_xref,
        self._build_appendix_index,
        self._load_semantic,
        self._load_reranker,
    )
    for step in build_steps:
        step()
def _build_bm25(self):
    """Build the BM25 inverted index over every chunk.

    Sets:
        self.doc_len: token count of each chunk's indexed text.
        self.postings: term -> [(doc_idx, term_frequency), ...].
        self.avgdl: mean of self.doc_len.
        self.idf: term -> log(1 + (n - df + 0.5) / (df + 0.5)).
    """
    doc_len = []
    postings = defaultdict(list)
    for idx, chunk in enumerate(self.chunks):
        # The topical title appears twice to weight it above body text. The
        # Act name, code and section number are indexed as well, so an Act's
        # own terminology and its codes/numbers are searchable even when a
        # section's text omits them. topical_title is doc_type-aware: for
        # case law it yields the case proposition (heading), not the
        # paragraph range held in marginal_note.
        title = topical_title(chunk)
        fields = (title, title, chunk["heading"],
                  chunk["part"], chunk["division"], chunk["act_name"],
                  chunk["act_code"], chunk["section"], chunk["text"])
        term_counts = Counter(tokenize(" ".join(fields)))
        doc_len.append(sum(term_counts.values()))
        for term, tf in term_counts.items():
            postings[term].append((idx, tf))
    self.doc_len = doc_len
    self.postings = postings

    total_docs = len(self.chunks)
    self.avgdl = sum(doc_len) / total_docs
    idf = {}
    for term, plist in postings.items():
        # Every chunk contributes at most one posting per term, so the
        # posting-list length is the term's document frequency.
        doc_freq = len(plist)
        idf[term] = math.log(1 + (total_docs - doc_freq + 0.5) / (doc_freq + 0.5))
    self.idf = idf
def _build_note_tokens(self):
    """Precompute per-chunk title tokens and hierarchy flags.

    Sets three lists aligned with self.chunks:

    * self._note_tokens -- the set of stemmed tokens of the chunk's topical
      title (see topical_title), used by the title-match boost;
    * self._is_regulation -- True for legislation whose act_code starts
      with SOR or C.R.C (a chunk with no doc_type counts as legislation);
    * self._is_backmatter -- True for agreement chunks whose section does
      not start with a digit (memoranda and letters with no article number).
    """
    regulation_prefixes = ("SOR", "C.R.C")
    note_tokens, regulation_flags, backmatter_flags = [], [], []
    for chunk in self.chunks:
        note_tokens.append(set(tokenize(topical_title(chunk))))

        if chunk.get("doc_type", "legislation") == "legislation":
            is_regulation = chunk["act_code"].startswith(regulation_prefixes)
        else:
            is_regulation = False
        regulation_flags.append(is_regulation)

        if chunk.get("doc_type") == "agreement":
            leading = str(chunk["section"])[:1]
            is_backmatter = not leading.isdigit()
        else:
            is_backmatter = False
        backmatter_flags.append(is_backmatter)
    self._note_tokens = note_tokens
    self._is_regulation = regulation_flags
    self._is_backmatter = backmatter_flags
def _build_appendix_index(self):
    """Index directive appendices by (act_code, upper-cased letter).

    A directive chunk counts as an appendix when its marginal_note opens
    with 'Appendix <letter>'. The index lets a directive section that cites
    'Appendix C' pull that appendix into the result set: rate-table
    appendices are bare numbers and rank poorly on a natural-language
    query, yet the section citing them is of little use without them.

    Sets self._appendix: (act_code, letter) -> [chunk indices].
    """
    index = defaultdict(list)
    directives = ((i, c) for i, c in enumerate(self.chunks)
                  if c.get("doc_type") == "directive")
    for idx, chunk in directives:
        head = _APPENDIX_HEAD.match(chunk["marginal_note"])
        if head is None:
            continue
        letter = head.group(1).upper()
        index[(chunk["act_code"], letter)].append(idx)
    self._appendix = index
def _load_semantic(self):
    """Load precomputed embeddings and the query embedder.

    Any failure (missing embeddings, or numpy/model unavailable) degrades the
    index to BM25-only rather than breaking retrieval; the reason is printed
    to stderr.

    Sets self.semantic. On success also sets self._np (the numpy module),
    self.vectors (one row per chunk, in self.chunks order, with an all-zero
    row for any chunk that has no stored embedding) and self.embedder.
    """
    self.semantic = False
    emb_path = PROCESSED_DIR / "embeddings.npz"
    if not emb_path.exists():
        print("CanLex index: no embeddings.npz; using BM25 only "
              "(run 'canlex.embed' to enable semantic search).", file=sys.stderr)
        return
    try:
        # Imported here so a missing numpy or embedding model only disables
        # semantic search instead of breaking the module import.
        import numpy as np
        from .embed import Embedder
        with np.load(emb_path) as data:
            # An .npz archive loads lazily: every data[...] lookup reads and
            # decodes that member again. Bind each array once so the
            # embedding matrix is not read from disk twice.
            ids = data["ids"].tolist()
            vectors = data["vectors"]
        id_to_vec = dict(zip(ids, vectors))
        dim = int(vectors.shape[1])
        missing = 0
        rows = []
        for c in self.chunks:
            vec = id_to_vec.get(c["id"])
            if vec is None:
                # Keep row positions aligned with self.chunks.
                missing += 1
                rows.append(np.zeros(dim, dtype=np.float32))
            else:
                rows.append(vec)
        self._np = np
        self.vectors = np.vstack(rows)
        self.embedder = Embedder()
        self.semantic = True
        if missing:
            print(f"CanLex index: {missing}/{len(self.chunks)} sections lack "
                  f"embeddings; re-run 'canlex.embed' to refresh.", file=sys.stderr)
    except Exception as exc:
        # Deliberate best-effort boundary: report and fall back to BM25.
        print(f"CanLex index: semantic search disabled ({type(exc).__name__}: "
              f"{exc}); using BM25 only.", file=sys.stderr)
        self.semantic = False
def _load_reranker(self):
"""Load the cross-encoder reranker; degrade to the fusion order on failure."""
self.reranker = None
try:
from .rerank import Reranker
self.reranker = Reranker()
except Exception as exc:
print(f"CanLex index: reranker disabled ({type(exc).__name__}: {exc}); "
f"using hybrid fusion order.", file=sys.stderr)
def _bm25_scores(self, query):
    """Score chunks against the query with BM25.

    Each distinct query term counts once. Terms absent from the index are
    ignored. Returns a defaultdict(float) mapping chunk index -> score that
    holds only chunks containing at least one query term.
    """
    scores = defaultdict(float)
    avgdl, doc_len = self.avgdl, self.doc_len
    for term in set(tokenize(query)):
        weight = self.idf.get(term)
        if weight is None:
            continue
        for idx, tf in self.postings[term]:
            length_norm = K1 * (1 - B + B * doc_len[idx] / avgdl)
            scores[idx] += weight * tf * (K1 + 1) / (tf + length_norm)
    return scores
def _semantic_ranking(self, query):
    """Rank chunks by embedding similarity to the query.

    Returns (indices, top_similarity): the CANDIDATES chunk indices with the
    highest similarity, best first, and the highest similarity itself. The
    latter doubles as a corpus-coverage signal -- a query the corpus cannot
    answer has no passage close to it.
    """
    query_vec = self.embedder.encode_query(query)
    similarities = self.vectors @ query_vec
    best_first = self._np.argsort(similarities)[::-1]
    nearest = list(map(int, best_first[:CANDIDATES]))
    return nearest, float(similarities.max())
def _rerank_doc(self, idx):
c = self.chunks[idx]
return f"{c['citation']}{c['marginal_note']}\n{c['text']}"
def _source_key(self, idx):
    """Return the parent-document key used for diversity capping.

    Primary instruments (every doc_type in PRIMARY_DOC_TYPES: legislation,
    agreements, directives, delegation instruments) yield None, because
    their sections are distinct provisions and are never capped. A
    memorandum is keyed by its memo number (its act_code is a shared
    constant); any other doc_type, such as case law, by its act_code -- one
    decision, keyed by citation. A chunk with no doc_type counts as
    legislation.
    """
    chunk = self.chunks[idx]
    kind = chunk.get("doc_type", "legislation")
    if kind in PRIMARY_DOC_TYPES:
        return None
    parent = chunk["section"] if kind == "memorandum" else chunk["act_code"]
    return (kind, parent)
def _diversify(self, ordered):
    """Reorder so no single secondary source monopolises the results.

    Once a source (one case, one memorandum -- see _source_key) has
    contributed SOURCE_CAP chunks, its further chunks are moved below every
    other candidate, keeping their relative order. This stops a heavily
    paragraph-chunked decision from crowding out the statute it interprets.
    Chunks of primary instruments (key None) are never capped.
    """
    within_cap, overflow = [], []
    used = {}
    for idx in ordered:
        source = self._source_key(idx)
        if source is None:
            within_cap.append(idx)
            continue
        used[source] = used.get(source, 0) + 1
        bucket = overflow if used[source] > SOURCE_CAP else within_cap
        bucket.append(idx)
    return within_cap + overflow
def _ensure_primary(self, ordered, top_k, q_tokens):
"""Guarantee the governing primary instrument is surfaced: when the
natural top_k is monopolised by case law or D-memoranda that interpret
a statute, pull the most topically-on-target Act/agreement/directive/
delegation section into the top_k, displacing the lowest-ranked
secondary sources. The single best match is always kept in place.
Two changes from the older 'ensure_legislation' guarantee: (i) all
primary instruments count, not only legislation -- so an FB-Agreement
query that surfaces only FPSLREB case law gets the agreement article
pulled in too; (ii) the candidate to promote is chosen by title-match
against the query (the section whose marginal_note covers the most of
the query's distinctive vocabulary), not by raw fusion rank. The
fusion rank surfaces tangentially-on-topic sections that share the
Act's general vocabulary; the title-match scorer surfaces the section
actually about the topic ('Seizure and forfeiture' over 'Report to
President' for a 'seize currency' query)."""
if top_k < 3:
return ordered
def is_primary(i):
return self.chunks[i].get("doc_type", "legislation") in PRIMARY_DOC_TYPES
top, rest = ordered[:top_k], ordered[top_k:]
need = 2 - sum(1 for i in top if is_primary(i))
if need <= 0:
return ordered
primary_in_rest = [i for i in rest if is_primary(i)]
if not primary_in_rest:
return ordered
if q_tokens:
def title_score(idx):
note_tokens = self._note_tokens[idx]
if not note_tokens:
return 0.0
matched = sum(self.idf.get(t, 0.0)
for t in note_tokens if t in q_tokens)
total = sum(self.idf.get(t, 0.0) for t in note_tokens) or 1.0
score = matched * matched / total
# Mirror the fusion-stage hierarchy preferences for tiebreaks:
# the governing Act beats its regulation, and numbered
# agreement articles beat their back-matter, when both have
# identical titles (e.g. IRPA s. 112 and IRPR s. 160 both
# marginal-noted 'Application for protection').
if self._is_regulation[idx]:
score -= REG_PENALTY
if self._is_backmatter[idx]:
score -= BACKMATTER_PENALTY
return score
# Sort by title-match descending, then by original fusion order as
# a tiebreak (stable sort: keep the original rest order).
primary_in_rest.sort(key=title_score, reverse=True)
promote = primary_in_rest[:need]
drop = [i for i in reversed(top) if not is_primary(i)][:len(promote)]
if not drop:
return ordered
promote = promote[:len(drop)]
dropped, promoted = set(drop), set(promote)
kept = [i for i in top if i not in dropped]
return kept[:1] + promote + kept[1:] + drop + [
i for i in rest if i not in promoted]
def _cosurface_appendices(self, top):
    """Append appendices that the directive results cite but retrieval missed.

    A directive's rate tables ('Appendix C') rank poorly on a
    natural-language query, yet a section that cites them is of little use
    without them -- so the appendix travels with it. An appendix gets one
    vote per result that cites it. When more appendices are cited than
    APPENDIX_CAP allows, the most-cited win, so a lone off-topic result
    cannot outvote the relevant ones.

    Returns a new list: `top` followed by up to APPENDIX_CAP appendix chunk
    indices not already in `top`.
    """
    present = set(top)
    votes = Counter()
    for idx in top:
        chunk = self.chunks[idx]
        if chunk.get("doc_type") != "directive":
            continue
        counted = set()  # appendices this one result has already voted for
        for ref in _APPENDIX_REF.finditer(chunk["text"]):
            letter = ref.group(1).upper()
            for app in self._appendix.get((chunk["act_code"], letter), ()):
                if app in present or app in counted:
                    continue
                counted.add(app)
                votes[app] += 1
    extras = [app for app, _votes in votes.most_common(APPENDIX_CAP)]
    return top + extras
def _highlight(self, query, indices):
"""For each result chunk, the subsection or paragraph most on point for
the query: {result_position: (citation_suffix, snippet)}. Uses the
cross-encoder; returns {} if it is unavailable or nothing is structured.
Only the first results are scored -- a pinpoint deep in the list is not
worth the cross-encoder cost."""
if not self.reranker:
return {}
jobs = [] # (result_position, label, scoring_text, snippet)
for pos, idx in enumerate(indices[:8]):
c = self.chunks[idx]
if c.get("doc_type", "legislation") != "legislation":
continue
note = c["marginal_note"]
for label, scoring, snippet in _provision_units(c["text"]):
jobs.append((pos, label, f"{note}. {scoring}", snippet))
if not jobs:
return {}
best = {} # result_position -> (score, label, snippet)
for (pos, label, _, snippet), score in zip(
jobs, self.reranker.score(query, [j[2] for j in jobs])):
if pos not in best or score > best[pos][0]:
best[pos] = (score, label, snippet)
return {pos: (label, " ".join(snippet[:240].split()))
for pos, (score, label, snippet) in best.items()}
def search(self, query, top_k=6, act=None, doc_type=None):
    """Hybrid candidate fusion (BM25 + semantic), then cross-encoder rerank.

    Stages, in order: reciprocal-rank fusion of the BM25 and semantic
    rankings; recall of explicitly cited sections; title-match boost;
    hierarchy penalties; act/doc_type filtering; promote-only reranking;
    pinning of explicitly cited sections; source diversification; the
    primary-instrument guarantee; appendix co-surfacing; pinpoint highlights.

    Args:
        query: the user's question, as typed.
        top_k: number of ranked results to keep; co-surfaced appendices
            (up to APPENDIX_CAP) are appended beyond this.
        act: optional filter -- keep only chunks whose act_short or act_code
            equals it, ignoring case.
        doc_type: optional filter -- keep only chunks of this doc_type (a
            chunk with no doc_type counts as "legislation").

    Returns:
        A list of dicts, each a copy of a chunk plus "score" (its fusion
        score after boosts and penalties, rounded to 4 places; 0.0 for a
        co-surfaced appendix that was never a candidate), "confidence" (the
        top semantic similarity, or None when semantic search is off) and,
        where a pinpoint was found, "highlight" as (citation_suffix,
        snippet). Empty when no candidate passes the filters.
    """
    # Expand legal abbreviations (PRRA, H&C, ...) into statutory wording for
    # the recall stages; the reranker still sees the user's original query.
    expanded = expand_query(query)
    confidence = None
    fused = defaultdict(float)
    bm25 = self._bm25_scores(expanded)
    # Reciprocal-rank fusion; ranks count from 0.
    for rank, idx in enumerate(sorted(bm25, key=bm25.get, reverse=True)[:CANDIDATES]):
        fused[idx] += 1.0 / (RRF_K + rank)
    if self.semantic:
        sem_order, confidence = self._semantic_ranking(expanded)
        for rank, idx in enumerate(sem_order):
            fused[idx] += W_SEM / (RRF_K + rank)
    # Ensure explicitly-referenced sections are retrieved even if recall
    # missed them -- but only for Acts the query actually names, so a cited
    # section number does not pull the same-numbered section of every other
    # Act into the pool and drown out the case law that interprets the
    # section the user meant. Substring
    # check rather than token-overlap because act_codes split into trivial
    # tokens ("A-8.8" -> {a, 8}) that spuriously match common query words.
    refs = _section_refs(query)
    q_lc = query.lower()
    def _act_in_query(c):
        # True when the chunk's Act short name, or its code (3+ characters),
        # occurs verbatim in the lower-cased query.
        short = c["act_short"].lower()
        code = c["act_code"].lower()
        return ((short and short in q_lc)
                or (code and len(code) >= 3 and code in q_lc))
    if refs:
        for idx, c in enumerate(self.chunks):
            if (c["section"] in refs and idx not in fused
                    and _act_in_query(c)):
                # Enters the pool with a zero base score; boosts and the
                # pinning step below decide where it lands.
                fused[idx] = 0.0
    # Title-match boost: the marginal note is a section's canonical subject.
    # Reward a candidate by how completely and how specifically the query
    # matches its marginal note. The overlap is idf-weighted (matching a
    # distinctive title like "hours of work" counts far more than a generic
    # one like "Decision"), scaled by coverage, and capped -- so it nudges
    # ranking toward the provision a question names by topic without
    # overriding a strong base score.
    q_tokens = set(tokenize(expanded))
    for idx in list(fused):
        note_tokens = self._note_tokens[idx]
        total = sum(self.idf.get(t, 0.0) for t in note_tokens)
        if total <= 0:
            continue
        matched = sum(self.idf.get(t, 0.0)
                      for t in note_tokens if t in q_tokens)
        if matched > 0:
            fused[idx] += min(MN_WEIGHT * matched * matched / total, MN_CAP)
    # Hierarchy penalties: a topical question should surface the governing
    # provision, not the supplementary material around it. An Act creates a
    # duty while a regulation only elaborates procedure; a collective
    # agreement's numbered articles are its substance while its memoranda
    # and letters of understanding are back-matter. Both take a small
    # fusion penalty so the governing provision wins a close contest.
    for idx in list(fused):
        if self._is_regulation[idx]:
            fused[idx] -= REG_PENALTY
        elif self._is_backmatter[idx]:
            fused[idx] -= BACKMATTER_PENALTY
    def allowed(idx):
        # Apply the caller's optional act / doc_type filters.
        c = self.chunks[idx]
        if act and act.lower() not in (c["act_short"].lower(), c["act_code"].lower()):
            return False
        if doc_type and c.get("doc_type", "legislation") != doc_type:
            return False
        return True
    candidates = [i for i in sorted(fused, key=fused.get, reverse=True) if allowed(i)]
    if not candidates:
        return []
    # Snapshot of the fusion scores; this is what results report as "score".
    scores = {i: fused[i] for i in candidates}
    # Precision stage: the cross-encoder rescores the top candidate pool, but
    # may only PROMOTE -- each pooled candidate is sorted by the better of its
    # fusion rank and its rerank rank, with fusion rank breaking ties. The
    # reranker reliably surfaces a strong answer the fusion ranked low, yet is
    # unreliable on long statutory text (it can score the right section
    # negative), so its power to demote a candidate is deliberately removed.
    if self.reranker:
        pool = candidates[:RERANK_POOL]
        ce = dict(zip(pool, self.reranker.score(
            query, [self._rerank_doc(i) for i in pool])))
        fusion_rank = {idx: r for r, idx in enumerate(pool)}
        rerank_rank = {idx: r for r, idx in enumerate(
            sorted(pool, key=ce.get, reverse=True))}
        pool.sort(key=lambda i: (min(fusion_rank[i], rerank_rank[i]),
                                 fusion_rank[i]))
        candidates = pool + candidates[RERANK_POOL:]
    # Explicit section references are pinned to the very top -- using the
    # same Act-mentioned constraint as the recall step above: a section is
    # pinned only when the query also names its Act, because a bare section
    # number is usually topical rather than a lookup.
    if refs:
        pinned = [i for i in candidates
                  if self.chunks[i]["section"] in refs
                  and _act_in_query(self.chunks[i])]
        if pinned:
            pinned_set = set(pinned)
            candidates = pinned + [i for i in candidates if i not in pinned_set]
    # Cap one-source monopolies, then guarantee a primary instrument on
    # the topic is represented. The guarantee operates on a fixed visible
    # window of min(top_k, 5), not the full top_k -- with top_k=20 (the
    # eval default) the larger window almost always contains incidental
    # legislation, so the guarantee never fires even when the governing
    # provision is buried at rank 10+.
    candidates = self._diversify(candidates)
    candidates = self._ensure_primary(candidates, min(top_k, 5), q_tokens)
    top = self._cosurface_appendices(candidates[:top_k])
    highlights = self._highlight(query, top)
    results = []
    for pos, i in enumerate(top):
        result = {**self.chunks[i], "score": round(scores.get(i, 0.0), 4),
                  "confidence": confidence}
        if pos in highlights:
            result["highlight"] = highlights[pos]
        results.append(result)
    return results
def get_section(self, act, section):
act = act.lower()
for c in self.chunks:
if c["section"] == section and act in (c["act_short"].lower(), c["act_code"].lower()):
return c
return None
def _build_xref(self):
"""Index legislation by (act, section); find each Act's definitions
section; link every regulation to its enabling Act and every
D-memorandum to the provisions it cites -- all for cross-referencing."""
self._by_section = {}
self._defs_section = {}
acts, regs = {}, {} # act_code -> (act_short, act_name)
for c in self.chunks:
if c.get("doc_type", "legislation") != "legislation":
continue
self._by_section[(c["act_code"], c["section"])] = c
if c["act_code"] not in self._defs_section and (
c["marginal_note"].strip().lower() in (
"definitions", "definition", "interpretation")):
self._defs_section[c["act_code"]] = c
bucket = regs if c["act_code"].startswith(("SOR", "C.R.C")) else acts
bucket.setdefault(c["act_code"], (c["act_short"], c["act_name"]))
# Link a regulation to the Act it is made under by matching their names
# ("X Regulations" <-> "X Act").
self._enabling_act = {} # reg code -> (act_short, act_name)
self._regulations = defaultdict(list) # act code -> [(reg_short, reg_name)]
def base(name):
return re.sub(r"\b(?:Act|Regulations)\b", "", name).strip().lower()
act_by_base = {base(n): (code, s, n) for code, (s, n) in acts.items()}
for rcode, (rshort, rname) in regs.items():
hit = act_by_base.get(base(rname))
if hit:
self._enabling_act[rcode] = (hit[1], hit[2])
self._regulations[hit[0]].append((rshort, rname))
# Link D-memoranda to the Customs Act / Customs Tariff provisions they
# cite; an unqualified "the Act" in a D-memo means the Customs Act.
by_short = {s.lower(): code for code, (s, n) in acts.items()}
customs, tariff = by_short.get("customs act"), by_short.get("customs tariff")
self._memos_for_section = defaultdict(set) # (act_code, section) -> memos
for c in self.chunks:
if c.get("doc_type") != "memorandum":
continue
for num, which in _MEMO_CITE.findall(c["text"]):
code = tariff if which.lower() == "customs tariff" else customs
if code:
self._memos_for_section[(code, num)].add(c["section"])
def related(self, chunk):
"""Cross-references for a legislation result, as a dict: 'provisions'
(intra-Act sections it cites, plus the definitions section),
'regulations' (made under this Act), 'enabling_act' (for a regulation,
the Act it is made under) and 'memoranda' (D-memo numbers citing this
section). Empty dict for case law, memoranda, etc."""
if chunk.get("doc_type", "legislation") != "legislation":
return {}
act = chunk["act_code"]
provisions, seen = [], {chunk["section"]}
defs = self._defs_section.get(act)
if defs and defs["section"] not in seen:
provisions.append((defs["section"], defs["marginal_note"]))
seen.add(defs["section"])
for match in _XREF.finditer(chunk["text"]):
sec = match.group(1)
if sec in seen:
continue
target = self._by_section.get((act, sec))
if target:
provisions.append((sec, target["marginal_note"]))
seen.add(sec)
if len(provisions) >= 8:
break
return {
"provisions": provisions,
"regulations": self._regulations.get(act, []),
"enabling_act": self._enabling_act.get(act),
"memoranda": sorted(self._memos_for_section.get(
(act, chunk["section"]), []))[:6],
}
def main():
    """Command-line entry point: run one query and print the ranked results.

    All command-line arguments are joined with spaces to form the query.
    With no arguments, a usage line is printed and nothing is loaded.
    """
    args = sys.argv[1:]
    if not args:
        print('usage: python -m canlex.index "your query"')
        return
    query = " ".join(args)
    index = LegislationIndex()
    # Report the strongest retrieval mode that actually loaded.
    mode = ("hybrid + cross-encoder rerank" if index.reranker
            else "hybrid (BM25 + semantic)" if index.semantic
            else "BM25 only")
    print(f"{len(index.chunks)} sections indexed - {mode}. Query: {query!r}\n")
    for hit in index.search(query):
        preview = hit['text'].replace(chr(10), ' ')[:160]
        print(f"[{hit['score']:.3f}] {hit['citation']} - {hit['marginal_note']}")
        print(f" {preview}")
        print()


if __name__ == "__main__":
    main()