| """Hybrid retrieval (BM25 + semantic) with cross-encoder reranking.""" |
| import json |
| import math |
| import os |
| import re |
| import sys |
| from collections import Counter, defaultdict |
|
|
| import snowballstemmer |
|
|
| from .config import PROCESSED_DIR |
| from .synonyms import expand_query |
|
|
# BM25 parameters: K1 controls term-frequency saturation, B the strength of
# document-length normalisation (see LegislationIndex._bm25_scores).
K1 = 1.5
B = 0.75
# Reciprocal-rank-fusion constant: a hit at 0-based rank r in a retriever's
# list contributes weight / (RRF_K + r) to the fused score.
RRF_K = 60
# Weight of the semantic ranking in the fusion; the BM25 ranking has weight 1.0.
W_SEM = 2.0
# How many top hits each retriever (BM25, semantic) contributes to the fusion.
CANDIDATES = 80
# How many fused candidates are handed to the cross-encoder reranker.
RERANK_POOL = 50
# Scale of the title-match boost added to a candidate's fused score in
# search(). Overridable through the CANLEX_MN_WEIGHT environment variable.
MN_WEIGHT = float(os.environ.get("CANLEX_MN_WEIGHT", "0.0024"))


# Upper bound on the title-match boost, so one strong title match cannot swamp
# the fused score. Overridable through CANLEX_MN_CAP.
MN_CAP = float(os.environ.get("CANLEX_MN_CAP", "0.012"))


# Amount subtracted from the score of a regulation chunk (act code starting
# with SOR or C.R.C). Overridable through CANLEX_REG_PENALTY.
REG_PENALTY = float(os.environ.get("CANLEX_REG_PENALTY", "0.004"))



# Amount subtracted from the score of collective-agreement back-matter (an
# agreement chunk whose section does not start with a digit). Overridable
# through CANLEX_BACKMATTER_PENALTY.
BACKMATTER_PENALTY = float(os.environ.get("CANLEX_BACKMATTER_PENALTY", "0.004"))



# Maximum chunks one capped source may keep in place before the rest are
# deferred (LegislationIndex._diversify).
SOURCE_CAP = 2
# Maximum number of cited appendices appended to a result list
# (LegislationIndex._cosurface_appendices).
APPENDIX_CAP = 3





# Document types treated as primary instruments: never capped by _diversify
# and the only types _ensure_primary will promote into the top results.
PRIMARY_DOC_TYPES = frozenset({"legislation", "agreement", "directive", "delegation"})
|
|
# A token is a run of lower-case letters and digits; tokenize() lower-cases
# its input before applying this.
_TOKEN = re.compile(r"[a-z0-9]+")
# Explicit section reference in a (lower-cased) query: 's', 'sec' or 'section',
# optionally plural and/or followed by '.', then a number such as 34 or 20.1.
# Captures the number.
_SECTION_REF = re.compile(r"\bs(?:ec(?:tion)?)?s?\.?\s*(\d+(?:\.\d+)?)")


# Cross-reference inside provision text ('section 12', 'subsections 3.1',
# 'paragraph 4', 's. 5', 'ss. 6'), case-insensitive. Captures the number.
# Used by LegislationIndex.related().
_XREF = re.compile(
    r"\b(?:sections?|subsections?|paragraphs?|ss?\.)\s*(\d+(?:\.\d+)?)",
    re.IGNORECASE)




# Parenthesised subdivision marker at the start of a line -- '(1)', '(a)',
# '(2.1)'. Captures the text between the parentheses. Used by
# _provision_units() to split a provision into citable parts.
_MARKER = re.compile(r"(?m)^\(([0-9a-zA-Z]+(?:\.\d+)?)\)")




# Statutory citation such as 'subsection 12(1)(a) of the Customs Act'.
# Captures the section number and which Act was named ('Customs Act',
# 'Customs Tariff' or the bare 'Act'). Used by _build_xref() on memoranda.
_MEMO_CITE = re.compile(
    r"\b(?:sub)?sections?\s+(\d+(?:\.\d+)?)(?:\([^)]+\))*\s+of\s+the\s+"
    r"(Customs Act|Customs Tariff|Act)\b", re.IGNORECASE)





# In-text reference to a lettered appendix ('Appendix C', 'Appendices B').
# The negative lookahead rejects 'Appendix C of ...' -- presumably a reference
# to another document's appendix; confirm against the corpus.
_APPENDIX_REF = re.compile(r"\bAppendi(?:x|ces)\s+([A-Za-z])\b(?!\s+of\b)",
                           re.IGNORECASE)

# Appendix heading; applied with .match() to a chunk's marginal_note, so it
# only recognises notes that begin with 'Appendix <letter>'.
_APPENDIX_HEAD = re.compile(r"Appendix\s+([A-Za-z])\b", re.IGNORECASE)
|
|
|
|
# Shared English Snowball stemmer, and the memo of word -> final stem that
# _stem() fills in as words are seen.
_STEMMER = snowballstemmer.stemmer("english")
_STEM_CACHE = {}




# Post-stem normalisation applied by _stem(): when the stemmer's output is a
# key here it is replaced by the mapped value, so related verb/noun forms that
# Snowball leaves apart (e.g. 'seize' / 'seizure') share one index term.
# Keys are looked up against the stemmer's output, not the raw word.
_STEM_NORMALIZE = {
    "seizur": "seiz",
    "forfeitur": "forfeit",
    "appel": "appeal",
    "detent": "detain",
    "exclus": "exclud",
    "admiss": "admit",
    "applic": "appli",
    "complianc": "compli",
    "grievanc": "griev",
}
|
|
|
|
def _stem(word):
    """Return the normalised Snowball stem of ``word``, memoised.

    The stemmer's output is passed through _STEM_NORMALIZE, which folds a few
    verb/noun pairs Snowball keeps apart ('seize'/'seizure',
    'forfeit'/'forfeiture') onto one stem. Results are kept in _STEM_CACHE
    because legal text repeats the same terms heavily.
    """
    try:
        return _STEM_CACHE[word]
    except KeyError:
        pass
    root = _STEMMER.stemWord(word)
    root = _STEM_NORMALIZE.get(root, root)
    _STEM_CACHE[word] = root
    return root
|
|
|
|
def tokenize(text):
    """Split ``text`` into stemmed, lower-case tokens.

    The text is lower-cased, cut into runs of letters/digits (_TOKEN) and each
    run is stemmed with _stem(), so differing word forms -- 'possession' vs
    'possess', 'reporting' vs 'report', 'importation' vs 'import' -- map to
    the same term. Returns a list, in order of appearance, duplicates kept.
    """
    words = _TOKEN.findall(text.lower())
    return list(map(_stem, words))
|
|
|
|
def _section_refs(query):
    """Return the set of section numbers named explicitly in ``query``.

    Matching is done on the lower-cased query with _SECTION_REF, so
    'Section 34' and 's. 20.1' yield {'34', '20.1'}. Numbers are returned as
    strings; the set is empty when the query names no section.
    """
    lowered = query.lower()
    return {hit.group(1) for hit in _SECTION_REF.finditer(lowered)}
|
|
|
|
def topical_title(chunk):
    """Return the field of ``chunk`` that carries its topic.

    This is the string treated as the chunk's 'title' for BM25 indexing and
    the title-match boost. Which field that is depends on doc_type:

    * 'memorandum' -> 'part' (its marginal_note is a generic banner);
    * 'caselaw'    -> 'heading' (its marginal_note is only a paragraph range
      such as 'paras 11-13');
    * anything else, including a missing doc_type -> 'marginal_note'.

    When the preferred field is missing or empty, 'marginal_note' is used.
    """
    preferred_field = {"memorandum": "part", "caselaw": "heading"}
    field = preferred_field.get(chunk.get("doc_type"))
    if field is not None:
        preferred = chunk.get(field)
        if preferred:
            return preferred
    return chunk["marginal_note"]
|
|
|
|
def _provision_units(text):
    """Break a provision into its citable parts for pinpoint scoring.

    Returns a list of ``(citation_suffix, scoring_text, snippet)`` tuples:

    * one per paragraph (a marker that does not start with a digit), labelled
      with its enclosing subsection, e.g. '(2)(a)'; the subsection's own text
      is prepended to ``scoring_text`` for context, ``snippet`` is the
      paragraph alone;
    * one per subsection (a marker that starts with a digit) that is followed
      directly by another subsection or by the end of the text.

    Parts whose first 40 characters contain '[Repealed' are skipped; a
    repealed subsection still becomes the current subsection for labelling,
    with an empty chapeau. Returns [] when the text has fewer than two
    markers.
    """
    markers = list(_MARKER.finditer(text))
    if len(markers) < 2:
        return []

    # Each part runs from its marker to the start of the next one (or to the
    # end of the text for the last marker).
    starts = [m.start() for m in markers]
    stops = starts[1:] + [len(text)]
    pieces = [(m.group(1), text[lo:hi].strip())
              for m, lo, hi in zip(markers, starts, stops)]
    # Marker that follows each piece; "" after the last one.
    following = [tok for tok, _ in pieces[1:]] + [""]

    units = []
    subsection = chapeau = ""
    for (token, body), next_token in zip(pieces, following):
        is_subsection = token[0].isdigit()
        if "[Repealed" in body[:40]:
            if is_subsection:
                subsection, chapeau = f"({token})", ""
            continue
        if not is_subsection:
            units.append((f"{subsection}({token})",
                          f"{chapeau} {body}".strip(), body))
            continue
        subsection, chapeau = f"({token})", body
        # A subsection is its own unit only when no paragraph follows it.
        if not next_token or next_token[0].isdigit():
            units.append((subsection, body, body))
    return units
|
|
|
|
class LegislationIndex:
    """Hybrid (BM25 + semantic) search index over every processed chunk.

    Construction loads all chunk files from PROCESSED_DIR and builds the BM25
    postings, the per-chunk title tokens, the cross-reference maps and the
    directive-appendix index. It then tries to load the embedding matrix and
    the cross-encoder reranker; either may be unavailable, in which case
    ``self.semantic`` is False and/or ``self.reranker`` is None and search()
    falls back accordingly.
    """

    def __init__(self):
        """Load every ``*.json`` file under PROCESSED_DIR and build all indexes.

        Raises:
            RuntimeError: if the files yield no chunks at all.
        """
        self.chunks = []
        for path in sorted(PROCESSED_DIR.glob("*.json")):
            self.chunks.extend(json.loads(path.read_text(encoding="utf-8")))
        if not self.chunks:
            raise RuntimeError(
                f"No processed legislation in {PROCESSED_DIR}. Run 'canlex.ingest' first.")
        self._build_bm25()
        self._build_note_tokens()
        self._build_xref()
        self._build_appendix_index()
        self._load_semantic()
        self._load_reranker()

    def _build_bm25(self):
        """Build the BM25 structures: postings, document lengths, avgdl, idf.

        Each chunk is indexed as one blob made of its topical title (included
        twice, so title terms carry double term frequency), heading, part,
        division, act name, act code, section and text.
        """
        self.doc_len = []
        self.postings = defaultdict(list)
        df = defaultdict(int)
        for idx, c in enumerate(self.chunks):
            title = topical_title(c)
            blob = " ".join((title, title, c["heading"],
                             c["part"], c["division"], c["act_name"], c["act_code"],
                             c["section"], c["text"]))
            counts = Counter(tokenize(blob))
            self.doc_len.append(sum(counts.values()))
            for term, tf in counts.items():
                self.postings[term].append((idx, tf))
                df[term] += 1
        n = len(self.chunks)
        self.avgdl = sum(self.doc_len) / n
        # The "1 +" inside the log keeps every idf positive, even for a term
        # that occurs in most chunks.
        self.idf = {t: math.log(1 + (n - d + 0.5) / (d + 0.5)) for t, d in df.items()}

    def _build_note_tokens(self):
        """Pre-compute per-chunk title tokens and penalty flags.

        For every chunk this stores, at the chunk's index:

        * ``_note_tokens``: the set of stemmed tokens of its topical title
          (see topical_title), used by the title-match boost;
        * ``_is_regulation``: True for legislation whose act code starts with
          'SOR' or 'C.R.C';
        * ``_is_backmatter``: True for agreement chunks whose section does
          not start with a digit (memoranda and letters with no article
          number).
        """
        self._note_tokens = []
        self._is_regulation = []
        self._is_backmatter = []
        for c in self.chunks:
            self._note_tokens.append(set(tokenize(topical_title(c))))
            self._is_regulation.append(
                c.get("doc_type", "legislation") == "legislation"
                and c["act_code"].startswith(("SOR", "C.R.C")))
            self._is_backmatter.append(
                c.get("doc_type") == "agreement"
                and not str(c["section"])[:1].isdigit())

    def _build_appendix_index(self):
        """Index directive appendices by ``(act_code, LETTER)``.

        A directive chunk counts as an appendix when its marginal_note begins
        with 'Appendix <letter>'. The map lets _cosurface_appendices() pull
        in an appendix that a result cites: rate-table appendices are mostly
        bare numbers and rank poorly on a natural-language query, yet the
        section citing them is of little use without them.
        """
        self._appendix = defaultdict(list)
        for idx, c in enumerate(self.chunks):
            if c.get("doc_type") != "directive":
                continue
            m = _APPENDIX_HEAD.match(c["marginal_note"])
            if m:
                self._appendix[(c["act_code"], m.group(1).upper())].append(idx)

    def _load_semantic(self):
        """Load precomputed embeddings and the query embedder.

        On success ``self.vectors`` holds one row per chunk, in chunk order
        (a zero row for any chunk with no stored embedding), and
        ``self.semantic`` is True. Any failure -- missing embeddings file,
        numpy or the embedding model unavailable -- is reported on stderr and
        leaves the index BM25-only rather than breaking retrieval.
        """
        self.semantic = False
        emb_path = PROCESSED_DIR / "embeddings.npz"
        if not emb_path.exists():
            print("CanLex index: no embeddings.npz; using BM25 only "
                  "(run 'canlex.embed' to enable semantic search).", file=sys.stderr)
            return
        try:
            import numpy as np
            from .embed import Embedder
            with np.load(emb_path) as data:
                # An .npz archive re-reads a member array on every []
                # access, so bind each array exactly once.
                ids = data["ids"].tolist()
                vectors = data["vectors"]
                id_to_vec = dict(zip(ids, vectors))
                dim = int(vectors.shape[1])
            missing = 0
            rows = []
            for c in self.chunks:
                vec = id_to_vec.get(c["id"])
                if vec is None:
                    missing += 1
                    rows.append(np.zeros(dim, dtype=np.float32))
                else:
                    rows.append(vec)
            self._np = np
            self.vectors = np.vstack(rows)
            self.embedder = Embedder()
            self.semantic = True
            if missing:
                print(f"CanLex index: {missing}/{len(self.chunks)} sections lack "
                      f"embeddings; re-run 'canlex.embed' to refresh.", file=sys.stderr)
        except Exception as exc:
            # Deliberate best-effort boundary: semantic search is optional.
            print(f"CanLex index: semantic search disabled ({type(exc).__name__}: "
                  f"{exc}); using BM25 only.", file=sys.stderr)
            self.semantic = False

    def _load_reranker(self):
        """Load the cross-encoder reranker; degrade to the fusion order on failure."""
        self.reranker = None
        try:
            from .rerank import Reranker
            self.reranker = Reranker()
        except Exception as exc:
            # Deliberate best-effort boundary: the reranker is optional.
            print(f"CanLex index: reranker disabled ({type(exc).__name__}: {exc}); "
                  f"using hybrid fusion order.", file=sys.stderr)

    def _bm25_scores(self, query):
        """Return ``{chunk_index: BM25 score}`` for every chunk matching ``query``.

        Each distinct query term counts once, however often it is repeated in
        the query. Terms absent from the index are ignored.
        """
        scores = defaultdict(float)
        for term in set(tokenize(query)):
            idf = self.idf.get(term)
            if idf is None:
                continue
            for idx, tf in self.postings[term]:
                dl = self.doc_len[idx]
                denom = tf + K1 * (1 - B + B * dl / self.avgdl)
                scores[idx] += idf * tf * (K1 + 1) / denom
        return scores

    def _semantic_ranking(self, query):
        """Rank chunks by embedding similarity to ``query``.

        Returns ``(indices, best)``: the indices of the CANDIDATES most
        similar chunks in descending order of dot product with the query
        vector, and the highest similarity over all chunks as a float.
        Only valid when ``self.semantic`` is True.
        """
        qv = self.embedder.encode_query(query)
        sims = self.vectors @ qv
        order = self._np.argsort(sims)[::-1][:CANDIDATES]
        return [int(i) for i in order], float(sims.max())

    def _rerank_doc(self, idx):
        """Return the text shown to the cross-encoder for chunk ``idx``."""
        c = self.chunks[idx]
        return f"{c['citation']} — {c['marginal_note']}\n{c['text']}"

    def _source_key(self, idx):
        """Return the parent-document key of chunk ``idx`` for diversity capping.

        Returns None for every primary instrument (PRIMARY_DOC_TYPES:
        legislation, agreements, directives, delegations), whose sections are
        distinct provisions and are never capped. Memoranda are keyed by
        their section field (the memo number); every other doc type, e.g.
        case law, is keyed by its act_code.
        """
        c = self.chunks[idx]
        doc_type = c.get("doc_type", "legislation")
        if doc_type in PRIMARY_DOC_TYPES:
            return None
        if doc_type == "memorandum":
            return ("memorandum", c["section"])
        return (doc_type, c["act_code"])

    def _diversify(self, ordered):
        """Reorder so no single secondary source monopolises the results.

        Once a source (see _source_key -- one decision, one memorandum) has
        contributed SOURCE_CAP chunks, its remaining chunks are deferred
        below every other candidate, keeping their relative order. This stops
        a heavily paragraph-chunked decision from crowding out the statute it
        interprets. Primary instruments are never capped.
        """
        kept, deferred, counts = [], [], defaultdict(int)
        for idx in ordered:
            key = self._source_key(idx)
            if key is None:
                kept.append(idx)
                continue
            counts[key] += 1
            (kept if counts[key] <= SOURCE_CAP else deferred).append(idx)
        return kept + deferred

    def _ensure_primary(self, ordered, top_k, q_tokens):
        """Guarantee primary instruments appear among the first ``top_k``.

        When fewer than two of the first ``top_k`` entries are primary
        instruments (PRIMARY_DOC_TYPES), the shortfall is made up by
        promoting primary chunks from further down, each displacing one of
        the lowest-ranked secondary entries of the window. Promoted chunks
        are inserted directly after the first entry, so the single best match
        always stays in place; displaced entries move to just below the
        window.

        The chunks to promote are chosen by title match against the query
        rather than by fusion rank: fusion rank surfaces sections that merely
        share the Act's general vocabulary, while the title score favours the
        section actually about the topic. The score is the idf-weighted
        overlap ``matched**2 / total`` between the chunk's title tokens and
        ``q_tokens``, less the regulation/back-matter penalties. With empty
        ``q_tokens`` the existing order of the remainder is used.

        Returns ``ordered`` unchanged when ``top_k < 3``, when the window
        already has two primary entries, or when there is nothing to promote.
        """
        if top_k < 3:
            return ordered

        def is_primary(i):
            return self.chunks[i].get("doc_type", "legislation") in PRIMARY_DOC_TYPES

        top, rest = ordered[:top_k], ordered[top_k:]
        need = 2 - sum(1 for i in top if is_primary(i))
        if need <= 0:
            return ordered
        primary_in_rest = [i for i in rest if is_primary(i)]
        if not primary_in_rest:
            return ordered
        if q_tokens:
            def title_score(idx):
                note_tokens = self._note_tokens[idx]
                if not note_tokens:
                    return 0.0
                matched = sum(self.idf.get(t, 0.0)
                              for t in note_tokens if t in q_tokens)
                total = sum(self.idf.get(t, 0.0) for t in note_tokens) or 1.0
                score = matched * matched / total
                if self._is_regulation[idx]:
                    score -= REG_PENALTY
                if self._is_backmatter[idx]:
                    score -= BACKMATTER_PENALTY
                return score

            # Stable sort: equal title scores keep their fusion order.
            primary_in_rest.sort(key=title_score, reverse=True)
        promote = primary_in_rest[:need]
        # Displace secondary entries from the bottom of the window upwards.
        drop = [i for i in reversed(top) if not is_primary(i)][:len(promote)]
        if not drop:
            return ordered
        promote = promote[:len(drop)]
        dropped, promoted = set(drop), set(promote)
        kept = [i for i in top if i not in dropped]
        return kept[:1] + promote + kept[1:] + drop + [
            i for i in rest if i not in promoted]

    def _cosurface_appendices(self, top):
        """Append the appendices that directive results cite but retrieval missed.

        For each directive chunk in ``top``, every 'Appendix X' reference in
        its text is resolved through the appendix index for the same
        act_code. Appendices already in ``top`` are ignored, and each result
        counts a given appendix at most once. When more appendices are cited
        than APPENDIX_CAP allows, those cited by the most results win, so a
        lone off-topic result cannot outvote the relevant ones.

        Returns a new list: ``top`` followed by up to APPENDIX_CAP appendix
        chunk indices.
        """
        have = set(top)
        cited = Counter()
        for idx in top:
            c = self.chunks[idx]
            if c.get("doc_type") != "directive":
                continue
            seen = set()
            for m in _APPENDIX_REF.finditer(c["text"]):
                key = (c["act_code"], m.group(1).upper())
                for app in self._appendix.get(key, ()):
                    if app not in have and app not in seen:
                        seen.add(app)
                        cited[app] += 1
        return top + [app for app, _ in cited.most_common(APPENDIX_CAP)]

    def _highlight(self, query, indices):
        """Find, per result, the subsection or paragraph most on point.

        Returns ``{result_position: (citation_suffix, snippet)}`` where the
        snippet is at most the first 240 characters of the winning part with
        whitespace collapsed. Only legislation chunks among the first eight
        results are scored -- a pinpoint deep in the list is not worth the
        cross-encoder cost. Returns {} when the reranker is unavailable or no
        scored chunk has enough structure (see _provision_units).
        """
        if not self.reranker:
            return {}
        jobs = []
        for pos, idx in enumerate(indices[:8]):
            c = self.chunks[idx]
            if c.get("doc_type", "legislation") != "legislation":
                continue
            note = c["marginal_note"]
            for label, scoring, snippet in _provision_units(c["text"]):
                jobs.append((pos, label, f"{note}. {scoring}", snippet))
        if not jobs:
            return {}
        best = {}
        # One batched cross-encoder call for every part of every result.
        for (pos, label, _, snippet), score in zip(
                jobs, self.reranker.score(query, [j[2] for j in jobs])):
            if pos not in best or score > best[pos][0]:
                best[pos] = (score, label, snippet)
        return {pos: (label, " ".join(snippet[:240].split()))
                for pos, (score, label, snippet) in best.items()}

    def search(self, query, top_k=6, act=None, doc_type=None):
        """Hybrid candidate fusion (BM25 + semantic), then cross-encoder rerank.

        Args:
            query: the user's natural-language query.
            top_k: number of ranked results wanted; up to APPENDIX_CAP cited
                appendices may be appended on top of these. A value of zero
                or less yields no results.
            act: optional filter; keeps only chunks whose act_short or
                act_code equals it, case-insensitively.
            doc_type: optional filter on the chunk's doc_type (a chunk with
                no doc_type counts as 'legislation').

        Returns:
            A list of result dicts: each is the chunk's own fields plus
            'score' (fused score rounded to 4 places; 0.0 for an appended
            appendix), 'confidence' (highest query/chunk similarity, or None
            when semantic search is off) and, where available, 'highlight'
            (see _highlight).
        """
        # A non-positive top_k asks for nothing. Without this guard a
        # negative value would reach ``candidates[:top_k]`` below and slice
        # from the end, returning almost every candidate.
        if top_k <= 0:
            return []

        # Retrieval runs on the expanded query; the raw query is kept for
        # section-reference parsing and for the cross-encoder.
        expanded = expand_query(query)
        confidence = None
        fused = defaultdict(float)
        bm25 = self._bm25_scores(expanded)
        for rank, idx in enumerate(sorted(bm25, key=bm25.get, reverse=True)[:CANDIDATES]):
            fused[idx] += 1.0 / (RRF_K + rank)
        if self.semantic:
            sem_order, confidence = self._semantic_ranking(expanded)
            for rank, idx in enumerate(sem_order):
                fused[idx] += W_SEM / (RRF_K + rank)

        # Explicit citations: a chunk whose section number and Act are both
        # named in the query joins the candidate set (with score 0.0) even
        # if neither retriever found it, so it can be pinned further down.
        refs = _section_refs(query)
        q_lc = query.lower()

        def _act_in_query(c):
            short = c["act_short"].lower()
            code = c["act_code"].lower()
            return ((short and short in q_lc)
                    or (code and len(code) >= 3 and code in q_lc))

        if refs:
            for idx, c in enumerate(self.chunks):
                if (c["section"] in refs and idx not in fused
                        and _act_in_query(c)):
                    fused[idx] = 0.0

        # Title-match boost: idf-weighted overlap between the chunk's title
        # tokens and the query, matched**2 / total, scaled by MN_WEIGHT and
        # capped at MN_CAP.
        q_tokens = set(tokenize(expanded))
        for idx in list(fused):
            note_tokens = self._note_tokens[idx]
            total = sum(self.idf.get(t, 0.0) for t in note_tokens)
            if total <= 0:
                continue
            matched = sum(self.idf.get(t, 0.0)
                          for t in note_tokens if t in q_tokens)
            if matched > 0:
                fused[idx] += min(MN_WEIGHT * matched * matched / total, MN_CAP)

        # Prefer an Act over its regulations, and agreement articles over
        # agreement back-matter.
        for idx in list(fused):
            if self._is_regulation[idx]:
                fused[idx] -= REG_PENALTY
            elif self._is_backmatter[idx]:
                fused[idx] -= BACKMATTER_PENALTY

        def allowed(idx):
            c = self.chunks[idx]
            if act and act.lower() not in (c["act_short"].lower(), c["act_code"].lower()):
                return False
            if doc_type and c.get("doc_type", "legislation") != doc_type:
                return False
            return True

        candidates = [i for i in sorted(fused, key=fused.get, reverse=True) if allowed(i)]
        if not candidates:
            return []
        scores = {i: fused[i] for i in candidates}

        # Rerank the head of the list. Each pool member is placed by the
        # better of its fusion rank and its cross-encoder rank (ties broken
        # by fusion rank), so the reranker can promote a candidate but
        # cannot bury one the fusion ranked highly.
        if self.reranker:
            pool = candidates[:RERANK_POOL]
            ce = dict(zip(pool, self.reranker.score(
                query, [self._rerank_doc(i) for i in pool])))
            fusion_rank = {idx: r for r, idx in enumerate(pool)}
            rerank_rank = {idx: r for r, idx in enumerate(
                sorted(pool, key=ce.get, reverse=True))}
            pool.sort(key=lambda i: (min(fusion_rank[i], rerank_rank[i]),
                                     fusion_rank[i]))
            candidates = pool + candidates[RERANK_POOL:]

        # Sections cited explicitly in the query go first, ahead of
        # everything else, in their current relative order.
        if refs:
            pinned = [i for i in candidates
                      if self.chunks[i]["section"] in refs
                      and _act_in_query(self.chunks[i])]
            if pinned:
                pinned_set = set(pinned)
                candidates = pinned + [i for i in candidates if i not in pinned_set]

        # Cap over-represented secondary sources, then make sure primary
        # instruments appear within the first min(top_k, 5) results.
        candidates = self._diversify(candidates)
        candidates = self._ensure_primary(candidates, min(top_k, 5), q_tokens)

        top = self._cosurface_appendices(candidates[:top_k])
        highlights = self._highlight(query, top)
        results = []
        for pos, i in enumerate(top):
            result = {**self.chunks[i], "score": round(scores.get(i, 0.0), 4),
                      "confidence": confidence}
            if pos in highlights:
                result["highlight"] = highlights[pos]
            results.append(result)
        return results

    def get_section(self, act, section):
        """Return the first chunk for ``section`` of ``act``, or None.

        ``act`` is matched case-insensitively against each chunk's act_short
        and act_code; ``section`` must equal the chunk's section exactly.
        """
        act = act.lower()
        for c in self.chunks:
            if c["section"] == section and act in (c["act_short"].lower(), c["act_code"].lower()):
                return c
        return None

    def _build_xref(self):
        """Build the lookup tables behind related().

        * ``_by_section``: legislation chunk by ``(act_code, section)``.
        * ``_defs_section``: per act_code, the first chunk whose marginal
          note is 'definitions', 'definition' or 'interpretation'.
        * ``_enabling_act`` / ``_regulations``: each regulation linked to the
          Act it is made under, and the reverse.
        * ``_memos_for_section``: per ``(act_code, section)``, the numbers of
          the D-memoranda whose text cites that provision.
        """
        self._by_section = {}
        self._defs_section = {}
        acts, regs = {}, {}
        for c in self.chunks:
            if c.get("doc_type", "legislation") != "legislation":
                continue
            self._by_section[(c["act_code"], c["section"])] = c
            if c["act_code"] not in self._defs_section and (
                    c["marginal_note"].strip().lower() in (
                        "definitions", "definition", "interpretation")):
                self._defs_section[c["act_code"]] = c
            bucket = regs if c["act_code"].startswith(("SOR", "C.R.C")) else acts
            bucket.setdefault(c["act_code"], (c["act_short"], c["act_name"]))

        # A regulation is paired with the Act whose name is the same once
        # the words 'Act' / 'Regulations' are removed.
        self._enabling_act = {}
        self._regulations = defaultdict(list)

        def base(name):
            return re.sub(r"\b(?:Act|Regulations)\b", "", name).strip().lower()

        act_by_base = {base(n): (code, s, n) for code, (s, n) in acts.items()}
        for rcode, (rshort, rname) in regs.items():
            hit = act_by_base.get(base(rname))
            if hit:
                self._enabling_act[rcode] = (hit[1], hit[2])
                self._regulations[hit[0]].append((rshort, rname))

        # Memoranda citations: 'Customs Tariff' resolves to the Customs
        # Tariff; 'Customs Act' and the bare 'Act' both resolve to the
        # Customs Act. Citations to an Act that is not indexed are dropped.
        by_short = {s.lower(): code for code, (s, n) in acts.items()}
        customs, tariff = by_short.get("customs act"), by_short.get("customs tariff")
        self._memos_for_section = defaultdict(set)
        for c in self.chunks:
            if c.get("doc_type") != "memorandum":
                continue
            for num, which in _MEMO_CITE.findall(c["text"]):
                code = tariff if which.lower() == "customs tariff" else customs
                if code:
                    self._memos_for_section[(code, num)].add(c["section"])

    def related(self, chunk):
        """Return the cross-references of a legislation result.

        The returned dict has:

        * 'provisions': up to eight ``(section, marginal_note)`` pairs -- the
          Act's definitions section first (unless ``chunk`` is that section),
          then the same-Act sections cited in the chunk's text, in order of
          first mention;
        * 'regulations': ``(short, name)`` pairs of regulations made under
          this Act;
        * 'enabling_act': for a regulation, the ``(short, name)`` of the Act
          it is made under, else None;
        * 'memoranda': up to six D-memo numbers citing this section, sorted.

        Returns an empty dict for anything that is not legislation.
        """
        if chunk.get("doc_type", "legislation") != "legislation":
            return {}
        act = chunk["act_code"]
        provisions, seen = [], {chunk["section"]}
        defs = self._defs_section.get(act)
        if defs and defs["section"] not in seen:
            provisions.append((defs["section"], defs["marginal_note"]))
            seen.add(defs["section"])
        for match in _XREF.finditer(chunk["text"]):
            sec = match.group(1)
            if sec in seen:
                continue
            target = self._by_section.get((act, sec))
            if target:
                provisions.append((sec, target["marginal_note"]))
                seen.add(sec)
                if len(provisions) >= 8:
                    break
        return {
            "provisions": provisions,
            # Copied so a caller cannot mutate the index's own list.
            "regulations": list(self._regulations.get(act, [])),
            "enabling_act": self._enabling_act.get(act),
            "memoranda": sorted(self._memos_for_section.get(
                (act, chunk["section"]), []))[:6],
        }
|
|
|
|
def main():
    """Command-line entry point: run one query and print the results.

    The query is all command-line arguments joined by spaces. With no
    arguments a usage line is printed and nothing else happens. Otherwise the
    index is built, a header reports the chunk count and the retrieval mode
    in effect, and each result is printed as its score, citation and
    marginal note followed by the first 160 characters of its text with
    newlines replaced by spaces.
    """
    args = sys.argv[1:]
    if not args:
        print('usage: python -m canlex.index "your query"')
        return

    query = " ".join(args)
    index = LegislationIndex()
    mode = ("hybrid + cross-encoder rerank" if index.reranker
            else "hybrid (BM25 + semantic)" if index.semantic
            else "BM25 only")
    print(f"{len(index.chunks)} sections indexed - {mode}. Query: {query!r}\n")

    for hit in index.search(query):
        preview = hit["text"].replace("\n", " ")[:160]
        print(f"[{hit['score']:.3f}] {hit['citation']} - {hit['marginal_note']}")
        print(f" {preview}")
        print()


if __name__ == "__main__":
    main()
|
|