"""Hybrid retrieval (BM25 + semantic) with cross-encoder reranking."""
import json
import math
import os
import re
import sys
from collections import Counter, defaultdict
import snowballstemmer
from .config import PROCESSED_DIR
from .synonyms import expand_query
# --- Retrieval tuning constants -------------------------------------------
# K1 and B are the two BM25 parameters read by LegislationIndex._bm25_scores:
# K1 scales the term-frequency term, B scales the document-length
# normalisation (dl / avgdl).
K1 = 1.5
B = 0.75
RRF_K = 60  # reciprocal-rank-fusion damping constant
W_SEM = 2.0  # weight on the semantic retriever in the fusion (1.0 = equal; eval-tuned)
CANDIDATES = 80  # hits each retriever contributes to the fusion
RERANK_POOL = 50  # top fused candidates the cross-encoder rescores

# The four values below can be overridden through environment variables; the
# override is parsed with float() at import time, so a non-numeric value
# raises ValueError when this module is imported.

# Title-match boost per unit of idf-weighted overlap between the query and a
# candidate's marginal note (section title).
MN_WEIGHT = float(os.environ.get("CANLEX_MN_WEIGHT", "0.0024"))
# Ceiling on the title-match boost -- it nudges the ranking without
# overriding a strong base score.
MN_CAP = float(os.environ.get("CANLEX_MN_CAP", "0.012"))
# Small fusion penalty on regulation sections, so the Act that creates a duty
# outranks the regulation elaborating it
# (sweep-tuned 2026-05-23 from 0.008 -> 0.004; see sweep.log).
REG_PENALTY = float(os.environ.get("CANLEX_REG_PENALTY", "0.004"))
# Likewise for a collective agreement's back-matter (memoranda, letters of
# understanding) vs its numbered articles
# (sweep-tuned 2026-05-23 from 0.008 -> 0.004).
BACKMATTER_PENALTY = float(os.environ.get("CANLEX_BACKMATTER_PENALTY", "0.004"))

SOURCE_CAP = 2  # max chunks one case or memorandum may contribute
APPENDIX_CAP = 3  # max referenced appendices co-surfaced into a result set

# Primary instruments -- enacted law, collective agreements, the NJC directives
# incorporated into them, and the IRPA delegation instruments. Their sections or
# items are distinct provisions, so (like legislation) they are never collapsed
# under the diversity cap.
PRIMARY_DOC_TYPES = frozenset({"legislation", "agreement", "directive", "delegation"})
# A run of lower-case letters/digits; tokenize() lower-cases its input before
# applying this, so upper-case text is covered there rather than here.
_TOKEN = re.compile(r"[a-z0-9]+")
# An explicit section reference in a (lower-cased) query: a word-initial "s",
# optionally spelled "sec"/"section", optionally pluralised and/or followed by
# a period, then the section number ("34", "20.1"), which is captured.
_SECTION_REF = re.compile(r"\bs(?:ec(?:tion)?)?s?\.?\s*(\d+(?:\.\d+)?)")
# A cross-reference to another provision -- "section 34", "subsection 25(1)",
# "paragraph 36(1)(a)", "s. 34" -- capturing the top-level section number.
_XREF = re.compile(
    r"\b(?:sections?|subsections?|paragraphs?|ss?\.)\s*(\d+(?:\.\d+)?)",
    re.IGNORECASE)
# A line opening with "(1)", "(a)" or "(b.1)" -- a citable subdivision
# (subsection, paragraph or subparagraph) of a provision.
_MARKER = re.compile(r"(?m)^\(([0-9a-zA-Z]+(?:\.\d+)?)\)")
# A D-memorandum's reference to a provision -- "section 32 of the Customs Act",
# or "section 32 of the Act" (the Act a D-memo administers -- the Customs Act).
# Captures (section number, instrument name).
_MEMO_CITE = re.compile(
    r"\b(?:sub)?sections?\s+(\d+(?:\.\d+)?)(?:\([^)]+\))*\s+of\s+the\s+"
    r"(Customs Act|Customs Tariff|Act)\b", re.IGNORECASE)
# A directive section's reference to an appendix of the same directive --
# "as specified in Appendix C". A trailing "of" ("Appendix C of the NJC Travel
# Directive") marks a cross-directive citation and is deliberately left alone.
_APPENDIX_REF = re.compile(r"\bAppendi(?:x|ces)\s+([A-Za-z])\b(?!\s+of\b)",
                           re.IGNORECASE)
# A directive chunk that *is* an appendix: its title opens "Appendix C ...".
_APPENDIX_HEAD = re.compile(r"Appendix\s+([A-Za-z])\b", re.IGNORECASE)
# One shared English Snowball stemmer, plus the word -> stem memo that _stem()
# fills in; the memo is never evicted.
_STEMMER = snowballstemmer.stemmer("english")
_STEM_CACHE = {}
# Stem pairs Snowball does not merge but that share a legal meaning, so a
# query naming the verb still matches a provision titled with the noun (or
# vice versa). Mapped to the verb form on both index and query sides, which
# is consistent and arbitrary -- the merge is what matters.
# Keys are Snowball *output* stems (looked up after stemming in _stem()).
_STEM_NORMALIZE = {
    "seizur": "seiz", # seizure -> seize
    "forfeitur": "forfeit", # forfeiture -> forfeit
    "appel": "appeal", # appellate/appellant -> appeal
    "detent": "detain", # detention -> detain
    "exclus": "exclud", # exclusion -> exclude
    "admiss": "admit", # admission/admissibility -> admit
    "applic": "appli", # application -> apply
    "complianc": "compli", # compliance -> comply
    "grievanc": "griev", # grievance -> grieve
}
def _stem(word):
    """Return the normalised Snowball stem of *word*, memoised per word.

    Legal text repeats terms heavily, so each distinct word is stemmed only
    once. After stemming, _STEM_NORMALIZE folds a few verb/noun pairs that
    Snowball leaves apart ('seize'/'seizure', 'forfeit'/'forfeiture') onto a
    single stem.
    """
    try:
        return _STEM_CACHE[word]
    except KeyError:
        pass
    raw = _STEMMER.stemWord(word)
    normalised = _STEM_NORMALIZE.get(raw, raw)
    _STEM_CACHE[word] = normalised
    return normalised
def tokenize(text):
    """Turn *text* into a list of stemmed tokens, in order of appearance.

    The text is lower-cased, split into alphanumeric runs, and each run is
    stemmed, so a query matches a provision even when their word forms
    differ -- 'possession' vs 'possess', 'reporting' vs 'report',
    'importation' vs 'import'. Duplicates are kept.
    """
    words = _TOKEN.findall(text.lower())
    return list(map(_stem, words))
def _section_refs(query):
    """Collect the explicit section numbers a query names, as a set of
    strings -- e.g. 'section 34' -> {'34'}, 's. 20.1' -> {'20.1'}."""
    lowered = query.lower()
    return {hit.group(1) for hit in _SECTION_REF.finditer(lowered)}
def topical_title(chunk):
    """Pick the field of *chunk* that carries its topic.

    This is the string weighted as a section's 'title' for retrieval -- BM25
    indexing, the title-match boost, and the semantic embedding. The field
    depends on doc_type:

    * 'memorandum' -> 'part', because a D-memorandum's marginal_note is a
      generic banner;
    * 'caselaw' -> 'heading', because a case's marginal_note is only the
      paragraph range ('paras 11-13') and the proposition lives in heading;
    * anything else (legislation, agreement, directive, delegation) ->
      'marginal_note', the section heading.

    When the preferred field is missing or empty, marginal_note is used.
    """
    preferred_field = {"memorandum": "part", "caselaw": "heading"}.get(
        chunk.get("doc_type"))
    if preferred_field is not None:
        preferred = chunk.get(preferred_field)
        if preferred:
            return preferred
    return chunk["marginal_note"]
def _provision_units(text):
    """Split a provision into its citable parts for pinpoint scoring.

    Returns a list of (citation_suffix, scoring_text, snippet) tuples: one
    per paragraph (its subsection's opening text is prepended to
    scoring_text for context) and one per subsection that has no paragraphs
    of its own. Subdivisions whose text opens with '[Repealed' are skipped.
    Returns [] when the provision has fewer than two subdivision markers and
    is therefore too flat to pinpoint.
    """
    markers = list(_MARKER.finditer(text))
    if len(markers) < 2:
        return []

    # Each subdivision runs from its own marker up to the next one (or EOF).
    starts = [m.start() for m in markers]
    stops = starts[1:] + [len(text)]
    spans = [(m.group(1), text[a:b].strip())
             for m, a, b in zip(markers, starts, stops)]
    # Token of the subdivision that follows each span ("" after the last).
    following = [tok for tok, _ in spans[1:]] + [""]

    units = []
    cur_sub = cur_intro = ""
    for (token, body), nxt in zip(spans, following):
        is_subsection = token[0].isdigit()
        repealed = "[Repealed" in body[:40]
        if is_subsection:
            # A new subsection always becomes the current context; a
            # repealed one contributes no chapeau text.
            cur_sub = f"({token})"
            cur_intro = "" if repealed else body
        if repealed:
            continue
        if is_subsection:
            # Emit the subsection itself only when no paragraph follows it;
            # otherwise its text is carried by its paragraphs.
            if not nxt or nxt[0].isdigit():
                units.append((cur_sub, body, body))
        else:
            # cur_sub is "" before the first subsection, giving "(a)".
            units.append((f"{cur_sub}({token})",
                          f"{cur_intro} {body}".strip(), body))
    return units
class LegislationIndex:
    """In-memory hybrid retrieval index over the processed chunks.

    Construction loads every ``*.json`` file in PROCESSED_DIR, builds a BM25
    inverted index plus the title, cross-reference and appendix lookups, and
    then tries to load the semantic embeddings and the cross-encoder
    reranker. Either of the last two may be unavailable, in which case the
    index degrades (``self.semantic`` is False / ``self.reranker`` is None)
    instead of failing.
    """

    # act_code prefixes that mark a regulation rather than an Act.
    _REGULATION_PREFIXES = ("SOR", "C.R.C")

    def __init__(self):
        """Load all processed chunks and build every retrieval structure.

        Raises:
            RuntimeError: if PROCESSED_DIR yields no chunks at all.
        """
        self.chunks = []
        for path in sorted(PROCESSED_DIR.glob("*.json")):
            self.chunks.extend(json.loads(path.read_text(encoding="utf-8")))
        if not self.chunks:
            raise RuntimeError(
                f"No processed legislation in {PROCESSED_DIR}. Run 'canlex.ingest' first.")
        self._build_bm25()
        self._build_note_tokens()
        self._build_xref()
        self._build_appendix_index()
        self._load_semantic()
        self._load_reranker()

    def _build_bm25(self):
        """Build the BM25 structures: ``doc_len`` (token count per chunk),
        ``postings`` (term -> [(chunk index, term frequency)]), ``avgdl``
        (mean document length) and ``idf`` (term -> inverse document
        frequency)."""
        self.doc_len = []
        self.postings = defaultdict(list)  # term -> [(doc_idx, term_frequency), ...]
        df = defaultdict(int)
        for idx, c in enumerate(self.chunks):
            # The topical title is repeated to weight it above body text;
            # the Act name, code and section are indexed too, so an Act's own
            # terminology (e.g. "controlled substance") and its codes/numbers
            # are searchable even when a section's text omits them. The title
            # is doc_type-aware via topical_title -- for case-law it picks
            # the case proposition (heading), not the paragraph range
            # (marginal_note), so a leading case surfaces on a topical query.
            title = topical_title(c)
            blob = " ".join((title, title, c["heading"],
                             c["part"], c["division"], c["act_name"], c["act_code"],
                             c["section"], c["text"]))
            counts = Counter(tokenize(blob))
            self.doc_len.append(sum(counts.values()))
            for term, tf in counts.items():
                self.postings[term].append((idx, tf))
                df[term] += 1
        n = len(self.chunks)
        self.avgdl = sum(self.doc_len) / n
        self.idf = {t: math.log(1 + (n - d + 0.5) / (d + 0.5)) for t, d in df.items()}

    def _build_note_tokens(self):
        """Pre-tokenise each chunk's topical title (see topical_title) for the
        title-match boost in search(). Each chunk is also flagged as a
        regulation (act codes beginning SOR/C.R.C.) for the Act-over-regulation
        preference, and as collective-agreement back-matter (memoranda and
        letters with no article number) for the back-matter penalty."""
        self._note_tokens = []
        self._is_regulation = []
        self._is_backmatter = []
        for c in self.chunks:
            self._note_tokens.append(set(tokenize(topical_title(c))))
            self._is_regulation.append(
                c.get("doc_type", "legislation") == "legislation"
                and c["act_code"].startswith(self._REGULATION_PREFIXES))
            self._is_backmatter.append(
                c.get("doc_type") == "agreement"
                and not str(c["section"])[:1].isdigit())

    def _build_appendix_index(self):
        """Index directive appendices by (act_code, letter), so a directive
        section that cites 'Appendix C' can pull that appendix into the result
        set -- a directive's rate-table appendices are bare numbers and rank
        poorly on a natural-language query, yet the section citing them is of
        little use without them."""
        self._appendix = defaultdict(list)
        for idx, c in enumerate(self.chunks):
            if c.get("doc_type") != "directive":
                continue
            m = _APPENDIX_HEAD.match(c["marginal_note"])
            if m:
                self._appendix[(c["act_code"], m.group(1).upper())].append(idx)

    def _load_semantic(self):
        """Load precomputed embeddings and the query embedder.

        Any failure (missing embeddings, or numpy/model unavailable) degrades the
        index to BM25-only rather than breaking retrieval. On success sets
        ``self.vectors`` (one row per chunk, in chunk order), ``self.embedder``
        and ``self.semantic = True``; chunks without a stored embedding get an
        all-zero row and are reported on stderr.
        """
        self.semantic = False
        emb_path = PROCESSED_DIR / "embeddings.npz"
        if not emb_path.exists():
            print("CanLex index: no embeddings.npz; using BM25 only "
                  "(run 'canlex.embed' to enable semantic search).", file=sys.stderr)
            return
        try:
            import numpy as np
            from .embed import Embedder
            with np.load(emb_path) as data:
                # An .npz archive is lazily loaded: every data[...] access
                # reads the member from disk again, so fetch the (large)
                # embedding matrix exactly once.
                ids = data["ids"].tolist()
                vectors = data["vectors"]
            id_to_vec = dict(zip(ids, vectors))
            dim = int(vectors.shape[1])
            missing = 0
            rows = []
            for c in self.chunks:
                vec = id_to_vec.get(c["id"])
                if vec is None:
                    missing += 1
                    rows.append(np.zeros(dim, dtype=np.float32))
                else:
                    rows.append(vec)
            self._np = np
            self.vectors = np.vstack(rows)
            self.embedder = Embedder()
            self.semantic = True
            if missing:
                print(f"CanLex index: {missing}/{len(self.chunks)} sections lack "
                      f"embeddings; re-run 'canlex.embed' to refresh.", file=sys.stderr)
        except Exception as exc:
            # Deliberate best-effort boundary: report and fall back to BM25.
            print(f"CanLex index: semantic search disabled ({type(exc).__name__}: "
                  f"{exc}); using BM25 only.", file=sys.stderr)
            self.semantic = False

    def _load_reranker(self):
        """Load the cross-encoder reranker; degrade to the fusion order on failure."""
        self.reranker = None
        try:
            from .rerank import Reranker
            self.reranker = Reranker()
        except Exception as exc:
            # Deliberate best-effort boundary: report and carry on without it.
            print(f"CanLex index: reranker disabled ({type(exc).__name__}: {exc}); "
                  f"using hybrid fusion order.", file=sys.stderr)

    def _bm25_scores(self, query):
        """Score chunks against *query* with BM25.

        Each distinct query term counts once. Returns a dict of
        chunk index -> score containing only chunks that share at least one
        indexed term with the query.
        """
        scores = defaultdict(float)
        for term in set(tokenize(query)):
            idf = self.idf.get(term)
            if idf is None:
                continue  # term never appears in the corpus
            for idx, tf in self.postings[term]:
                dl = self.doc_len[idx]
                denom = tf + K1 * (1 - B + B * dl / self.avgdl)
                scores[idx] += idf * tf * (K1 + 1) / denom
        return scores

    def _semantic_ranking(self, query):
        """Rank chunks by the dot product of their embedding with the query
        embedding. Returns (the CANDIDATES best chunk indices, best first;
        the highest similarity as a float)."""
        qv = self.embedder.encode_query(query)
        sims = self.vectors @ qv
        order = self._np.argsort(sims)[::-1][:CANDIDATES]
        # The top cosine similarity doubles as a corpus-coverage signal: a query
        # the corpus cannot answer has no passage close to it.
        # NOTE(review): "cosine" presumes unit-normalised embeddings -- confirm
        # against canlex.embed.
        return [int(i) for i in order], float(sims.max())

    def _rerank_doc(self, idx):
        """The text handed to the cross-encoder for chunk *idx*: a
        'citation — marginal note' header line followed by the chunk text."""
        c = self.chunks[idx]
        return f"{c['citation']} — {c['marginal_note']}\n{c['text']}"

    def _source_key(self, idx):
        """The parent document a chunk belongs to, for diversity capping.

        Returns None for primary instruments (every doc_type in
        PRIMARY_DOC_TYPES -- legislation, collective agreements, directives
        and delegation instruments), whose sections are distinct provisions
        and are never capped. Memoranda are keyed by memo number (their
        'section'); every other doc_type, e.g. case law, by
        (doc_type, act_code)."""
        c = self.chunks[idx]
        doc_type = c.get("doc_type", "legislation")
        if doc_type in PRIMARY_DOC_TYPES:
            return None
        if doc_type == "memorandum":
            return ("memorandum", c["section"])  # act_code is a shared constant
        return (doc_type, c["act_code"])  # one decision, keyed by citation

    def _diversify(self, ordered):
        """Reorder so no single case or memorandum can monopolise the
        results: once a source has contributed SOURCE_CAP chunks, its
        remaining chunks are deferred below every other candidate. This stops
        a heavily paragraph-chunked decision from crowding out the statute it
        interprets. Primary instruments (see _source_key) are never capped.
        Relative order is otherwise preserved."""
        kept, deferred, counts = [], [], defaultdict(int)
        for idx in ordered:
            key = self._source_key(idx)
            if key is None:
                kept.append(idx)
                continue
            counts[key] += 1
            (kept if counts[key] <= SOURCE_CAP else deferred).append(idx)
        return kept + deferred

    def _ensure_primary(self, ordered, top_k, q_tokens):
        """Guarantee the governing primary instrument is surfaced: when the
        natural top_k is monopolised by case law or D-memoranda that interpret
        a statute, pull the most topically-on-target Act/agreement/directive/
        delegation section into the top_k, displacing the lowest-ranked
        secondary sources. The single best match is always kept in place.

        Two changes from the older 'ensure_legislation' guarantee: (i) all
        primary instruments count, not only legislation -- so an FB-Agreement
        query that surfaces only FPSLREB case law gets the agreement article
        pulled in too; (ii) the candidate to promote is chosen by title-match
        against the query (the section whose marginal_note covers the most of
        the query's distinctive vocabulary), not by raw fusion rank. The
        fusion rank surfaces tangentially-on-topic sections that share the
        Act's general vocabulary; the title-match scorer surfaces the section
        actually about the topic ('Seizure and forfeiture' over 'Report to
        President' for a 'seize currency' query).

        The window aims to hold two primary chunks. Returns a reordering of
        *ordered* of the same length; *ordered* is returned unchanged when
        top_k < 3, when the window already holds two primary chunks, or when
        no primary chunk lies outside the window."""
        if top_k < 3:
            return ordered

        def is_primary(i):
            return self.chunks[i].get("doc_type", "legislation") in PRIMARY_DOC_TYPES

        top, rest = ordered[:top_k], ordered[top_k:]
        need = 2 - sum(1 for i in top if is_primary(i))
        if need <= 0:
            return ordered
        primary_in_rest = [i for i in rest if is_primary(i)]
        if not primary_in_rest:
            return ordered
        if q_tokens:
            def title_score(idx):
                note_tokens = self._note_tokens[idx]
                if not note_tokens:
                    return 0.0
                matched = sum(self.idf.get(t, 0.0)
                              for t in note_tokens if t in q_tokens)
                total = sum(self.idf.get(t, 0.0) for t in note_tokens) or 1.0
                score = matched * matched / total
                # Mirror the fusion-stage hierarchy preferences for tiebreaks:
                # the governing Act beats its regulation, and numbered
                # agreement articles beat their back-matter, when both have
                # identical titles (e.g. IRPA s. 112 and IRPR s. 160 both
                # marginal-noted 'Application for protection').
                if self._is_regulation[idx]:
                    score -= REG_PENALTY
                if self._is_backmatter[idx]:
                    score -= BACKMATTER_PENALTY
                return score
            # Sort by title-match descending, then by original fusion order as
            # a tiebreak (stable sort: keep the original rest order).
            primary_in_rest.sort(key=title_score, reverse=True)
        promote = primary_in_rest[:need]
        # Displace the lowest-ranked secondary sources in the window.
        drop = [i for i in reversed(top) if not is_primary(i)][:len(promote)]
        if not drop:
            return ordered
        promote = promote[:len(drop)]
        dropped, promoted = set(drop), set(promote)
        kept = [i for i in top if i not in dropped]
        return kept[:1] + promote + kept[1:] + drop + [
            i for i in rest if i not in promoted]

    def _cosurface_appendices(self, top):
        """Append the appendices the directive results cite but that retrieval
        missed. A directive's rate tables ('Appendix C') rank poorly on a
        natural-language query, yet a section that cites them is of little use
        without them -- so the appendix travels with it. When more appendices
        are cited than APPENDIX_CAP allows, the ones cited by the most results
        win, so a lone off-topic result cannot outvote the relevant ones.
        Returns `top` extended by up to APPENDIX_CAP appendix chunks."""
        have = set(top)
        cited = Counter()
        for idx in top:
            c = self.chunks[idx]
            if c.get("doc_type") != "directive":
                continue
            seen = set()  # count an appendix once per citing result
            for m in _APPENDIX_REF.finditer(c["text"]):
                key = (c["act_code"], m.group(1).upper())
                for app in self._appendix.get(key, ()):
                    if app not in have and app not in seen:
                        seen.add(app)
                        cited[app] += 1
        return top + [app for app, _ in cited.most_common(APPENDIX_CAP)]

    def _highlight(self, query, indices):
        """For each result chunk, the subsection or paragraph most on point for
        the query: {result_position: (citation_suffix, snippet)}. Uses the
        cross-encoder; returns {} if it is unavailable or nothing is structured.
        Only the first results are scored -- a pinpoint deep in the list is not
        worth the cross-encoder cost. Only legislation chunks are pinpointed;
        each snippet is whitespace-collapsed and cut from the first 240
        characters of the unit."""
        if not self.reranker:
            return {}
        jobs = []  # (result_position, label, scoring_text, snippet)
        for pos, idx in enumerate(indices[:8]):
            c = self.chunks[idx]
            if c.get("doc_type", "legislation") != "legislation":
                continue
            note = c["marginal_note"]
            for label, scoring, snippet in _provision_units(c["text"]):
                jobs.append((pos, label, f"{note}. {scoring}", snippet))
        if not jobs:
            return {}
        best = {}  # result_position -> (score, label, snippet)
        for (pos, label, _, snippet), score in zip(
                jobs, self.reranker.score(query, [j[2] for j in jobs])):
            if pos not in best or score > best[pos][0]:
                best[pos] = (score, label, snippet)
        return {pos: (label, " ".join(snippet[:240].split()))
                for pos, (score, label, snippet) in best.items()}

    def search(self, query, top_k=6, act=None, doc_type=None):
        """Hybrid candidate fusion (BM25 + semantic), then cross-encoder rerank.

        Args:
            query: the user's question.
            top_k: maximum number of ranked results; up to APPENDIX_CAP cited
                directive appendices may be appended beyond it. A value <= 0
                yields no results.
            act: optional filter; matched case-insensitively against a
                chunk's act_short or act_code.
            doc_type: optional filter on the chunk's doc_type (a chunk with
                no doc_type counts as "legislation").

        Returns:
            A list of result dicts, best first. Each is the chunk's own
            fields plus "score" (fusion score rounded to 4 places; 0.0 for a
            co-surfaced appendix), "confidence" (top semantic similarity, or
            None when semantic search is off) and, where available,
            "highlight" as (citation_suffix, snippet).
        """
        # A negative top_k would otherwise slice candidates[:top_k] and hand
        # back almost the whole candidate list.
        if top_k <= 0:
            return []
        # Expand legal abbreviations (PRRA, H&C, ...) into statutory wording for
        # the recall stages; the reranker still sees the user's original query.
        expanded = expand_query(query)
        confidence = None
        fused = defaultdict(float)
        bm25 = self._bm25_scores(expanded)
        for rank, idx in enumerate(sorted(bm25, key=bm25.get, reverse=True)[:CANDIDATES]):
            fused[idx] += 1.0 / (RRF_K + rank)
        if self.semantic:
            sem_order, confidence = self._semantic_ranking(expanded)
            for rank, idx in enumerate(sem_order):
                fused[idx] += W_SEM / (RRF_K + rank)
        # Ensure explicitly-referenced sections are retrieved even if recall
        # missed them -- but only for Acts the query actually names. A query
        # like "IRPA s. 40 misrepresentation defence" uses the section number
        # topically; pulling every Act's s. 40 into the pool would drown out
        # the case law that interprets the section the user meant. Substring
        # check rather than token-overlap because act_codes split into trivial
        # tokens ("A-8.8" -> {a, 8}) that spuriously match common query words.
        refs = _section_refs(query)
        q_lc = query.lower()

        def _act_in_query(c):
            short = c["act_short"].lower()
            code = c["act_code"].lower()
            return ((short and short in q_lc)
                    or (code and len(code) >= 3 and code in q_lc))

        if refs:
            for idx, c in enumerate(self.chunks):
                if (c["section"] in refs and idx not in fused
                        and _act_in_query(c)):
                    fused[idx] = 0.0
        # Title-match boost: the marginal note is a section's canonical subject.
        # Reward a candidate by how completely and how specifically the query
        # matches its marginal note. The overlap is idf-weighted (matching a
        # distinctive title like "hours of work" counts far more than a generic
        # one like "Decision"), scaled by coverage, and capped -- so it nudges
        # ranking toward the provision a question names by topic without
        # overriding a strong base score.
        q_tokens = set(tokenize(expanded))
        for idx in list(fused):
            note_tokens = self._note_tokens[idx]
            total = sum(self.idf.get(t, 0.0) for t in note_tokens)
            if total <= 0:
                continue
            matched = sum(self.idf.get(t, 0.0)
                          for t in note_tokens if t in q_tokens)
            if matched > 0:
                fused[idx] += min(MN_WEIGHT * matched * matched / total, MN_CAP)
        # Hierarchy penalties: a topical question should surface the governing
        # provision, not the supplementary material around it. An Act creates a
        # duty while a regulation only elaborates procedure; a collective
        # agreement's numbered articles are its substance while its memoranda
        # and letters of understanding are back-matter. Both take a small
        # fusion penalty so the governing provision wins a close contest.
        for idx in list(fused):
            if self._is_regulation[idx]:
                fused[idx] -= REG_PENALTY
            elif self._is_backmatter[idx]:
                fused[idx] -= BACKMATTER_PENALTY

        # Lower-case the act filter once rather than once per candidate.
        act_lc = act.lower() if act else None

        def allowed(idx):
            c = self.chunks[idx]
            if act_lc and act_lc not in (c["act_short"].lower(), c["act_code"].lower()):
                return False
            if doc_type and c.get("doc_type", "legislation") != doc_type:
                return False
            return True

        candidates = [i for i in sorted(fused, key=fused.get, reverse=True) if allowed(i)]
        if not candidates:
            return []
        scores = {i: fused[i] for i in candidates}
        # Precision stage: the cross-encoder rescores the top candidate pool, but
        # may only PROMOTE -- each pooled candidate is placed at the better of its
        # fusion rank and its rerank rank, never below its fusion rank. The
        # reranker reliably surfaces a strong answer the fusion ranked low, yet is
        # unreliable on long statutory text (it can score the right section
        # negative), so its power to demote a candidate is deliberately removed.
        if self.reranker:
            pool = candidates[:RERANK_POOL]
            ce = dict(zip(pool, self.reranker.score(
                query, [self._rerank_doc(i) for i in pool])))
            fusion_rank = {idx: r for r, idx in enumerate(pool)}
            rerank_rank = {idx: r for r, idx in enumerate(
                sorted(pool, key=ce.get, reverse=True))}
            pool.sort(key=lambda i: (min(fusion_rank[i], rerank_rank[i]),
                                     fusion_rank[i]))
            candidates = pool + candidates[RERANK_POOL:]
        # Explicit section references are pinned to the very top -- using the
        # same Act-mentioned constraint as the recall step above, for the same
        # reason: a bare "s. 40" without an Act name is usually topical
        # (e.g. "the IRPA s. 40 misrepresentation defence"), not a lookup.
        if refs:
            pinned = [i for i in candidates
                      if self.chunks[i]["section"] in refs
                      and _act_in_query(self.chunks[i])]
            if pinned:
                pinned_set = set(pinned)
                candidates = pinned + [i for i in candidates if i not in pinned_set]
        # Cap one-source monopolies, then guarantee a primary instrument on
        # the topic is represented. The guarantee operates on a fixed visible
        # window of min(top_k, 5), not the full top_k -- with top_k=20 (the
        # eval default) the larger window almost always contains incidental
        # legislation, so the guarantee never fires even when the governing
        # provision is buried at rank 10+.
        candidates = self._diversify(candidates)
        candidates = self._ensure_primary(candidates, min(top_k, 5), q_tokens)
        top = self._cosurface_appendices(candidates[:top_k])
        highlights = self._highlight(query, top)
        results = []
        for pos, i in enumerate(top):
            result = {**self.chunks[i], "score": round(scores.get(i, 0.0), 4),
                      "confidence": confidence}
            if pos in highlights:
                result["highlight"] = highlights[pos]
            results.append(result)
        return results

    def get_section(self, act, section):
        """Return the first chunk whose 'section' equals *section* and whose
        act_short or act_code equals *act* (case-insensitive), or None."""
        act = act.lower()
        for c in self.chunks:
            if c["section"] == section and act in (c["act_short"].lower(), c["act_code"].lower()):
                return c
        return None

    def _build_xref(self):
        """Index legislation by (act, section); find each Act's definitions
        section; link every regulation to its enabling Act and every
        D-memorandum to the provisions it cites -- all for cross-referencing."""
        self._by_section = {}
        self._defs_section = {}
        acts, regs = {}, {}  # act_code -> (act_short, act_name)
        for c in self.chunks:
            if c.get("doc_type", "legislation") != "legislation":
                continue
            self._by_section[(c["act_code"], c["section"])] = c
            # The first section titled as definitions wins for each Act.
            if c["act_code"] not in self._defs_section and (
                    c["marginal_note"].strip().lower() in (
                        "definitions", "definition", "interpretation")):
                self._defs_section[c["act_code"]] = c
            bucket = regs if c["act_code"].startswith(self._REGULATION_PREFIXES) else acts
            bucket.setdefault(c["act_code"], (c["act_short"], c["act_name"]))
        # Link a regulation to the Act it is made under by matching their names
        # ("X Regulations" <-> "X Act").
        self._enabling_act = {}  # reg code -> (act_short, act_name)
        self._regulations = defaultdict(list)  # act code -> [(reg_short, reg_name)]

        def base(name):
            return re.sub(r"\b(?:Act|Regulations)\b", "", name).strip().lower()

        act_by_base = {base(n): (code, s, n) for code, (s, n) in acts.items()}
        for rcode, (rshort, rname) in regs.items():
            hit = act_by_base.get(base(rname))
            if hit:
                self._enabling_act[rcode] = (hit[1], hit[2])
                self._regulations[hit[0]].append((rshort, rname))
        # Link D-memoranda to the Customs Act / Customs Tariff provisions they
        # cite; an unqualified "the Act" in a D-memo means the Customs Act.
        by_short = {s.lower(): code for code, (s, _) in acts.items()}
        customs, tariff = by_short.get("customs act"), by_short.get("customs tariff")
        self._memos_for_section = defaultdict(set)  # (act_code, section) -> memos
        for c in self.chunks:
            if c.get("doc_type") != "memorandum":
                continue
            for num, which in _MEMO_CITE.findall(c["text"]):
                code = tariff if which.lower() == "customs tariff" else customs
                if code:  # skip when the cited instrument is not in the corpus
                    self._memos_for_section[(code, num)].add(c["section"])

    def related(self, chunk):
        """Cross-references for a legislation result, as a dict: 'provisions'
        (intra-Act sections it cites, plus the definitions section),
        'regulations' (made under this Act), 'enabling_act' (for a regulation,
        the Act it is made under) and 'memoranda' (D-memo numbers citing this
        section). Empty dict for case law, memoranda, etc.

        'provisions' is a list of (section, marginal_note) pairs that stops
        growing once it holds 8 entries; 'memoranda' is sorted and limited
        to 6."""
        if chunk.get("doc_type", "legislation") != "legislation":
            return {}
        act = chunk["act_code"]
        provisions, seen = [], {chunk["section"]}
        defs = self._defs_section.get(act)
        if defs and defs["section"] not in seen:
            provisions.append((defs["section"], defs["marginal_note"]))
            seen.add(defs["section"])
        for match in _XREF.finditer(chunk["text"]):
            sec = match.group(1)
            if sec in seen:
                continue
            target = self._by_section.get((act, sec))
            if target:  # only sections that exist in this same Act
                provisions.append((sec, target["marginal_note"]))
                seen.add(sec)
                if len(provisions) >= 8:
                    break
        return {
            "provisions": provisions,
            "regulations": self._regulations.get(act, []),
            "enabling_act": self._enabling_act.get(act),
            "memoranda": sorted(self._memos_for_section.get(
                (act, chunk["section"]), []))[:6],
        }
def main():
    """Command-line entry point: build the index, run the query given on the
    command line (all arguments joined with spaces) and print each hit's
    score, citation, marginal note and a one-line text preview. Prints a
    usage line and returns when no query is given."""
    words = sys.argv[1:]
    if not words:
        print('usage: python -m canlex.index "your query"')
        return
    query = " ".join(words)
    index = LegislationIndex()
    # Describe the richest retrieval stage that loaded successfully.
    mode = ("hybrid + cross-encoder rerank" if index.reranker
            else "hybrid (BM25 + semantic)" if index.semantic
            else "BM25 only")
    print(f"{len(index.chunks)} sections indexed - {mode}. Query: {query!r}\n")
    for hit in index.search(query):
        preview = hit["text"].replace("\n", " ")[:160]
        print(f"[{hit['score']:.3f}] {hit['citation']} - {hit['marginal_note']}")
        print(f" {preview}")
        print()


if __name__ == "__main__":
    main()
|