"""
semanticmatcher.py
====================
Deterministic semantic string matcher for short strings (10–12 words).

Algorithm: Weighted ensemble of three independent signals
  1. Lexical Jaccard  — lemmatized token overlap                    (weight: 0.20)
  2. Synonym Jaccard  — WordNet-expanded token overlap              (weight: 0.25)
  3. Semantic Cosine  — sentence-transformers embedding similarity  (weight: 0.55)

All three layers are fully deterministic: same inputs → same score, always.

Install dependencies:
    pip install nltk numpy sentence-transformers
    python -m nltk.downloader wordnet omw-1.4 stopwords punkt punkt_tab averaged_perceptron_tagger_eng
"""

import re
import string
from functools import lru_cache

import nltk
import numpy as np
from nltk.corpus import wordnet, stopwords
from nltk.stem import WordNetLemmatizer
from sentence_transformers import SentenceTransformer

# ── Config ────────────────────────────────────────────────────────────────────

WEIGHTS = {
    "lexical": 0.20,   # Plain lemma overlap
    "synonym": 0.25,   # WordNet-expanded overlap
    "semantic": 0.55,  # Embedding cosine similarity
}

MATCH_THRESHOLD = 0.72   # Score ≥ this → strings "mean the same thing"
STRONG_THRESHOLD = 0.88  # Score ≥ this → high-confidence match

# Embedding model: deterministic, no sampling
_EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"

# Names of the layers every weights mapping must provide.
_LAYER_NAMES = ("lexical", "synonym", "semantic")

# Built once at import time instead of on every normalize() call.
_PUNCT_TABLE = str.maketrans("", "", string.punctuation)
_WHITESPACE_RE = re.compile(r"\s+")

# ── Lazy singletons ───────────────────────────────────────────────────────────

_model: SentenceTransformer | None = None
_lemmatizer: WordNetLemmatizer | None = None
_stop_words: set[str] | None = None


def _get_model() -> SentenceTransformer:
    """Return the shared embedding model, loading it on first use."""
    global _model
    if _model is None:
        _model = SentenceTransformer(_EMBEDDING_MODEL_NAME)
    return _model


def _get_lemmatizer() -> WordNetLemmatizer:
    """Return the shared WordNet lemmatizer, creating it on first use."""
    global _lemmatizer
    if _lemmatizer is None:
        _lemmatizer = WordNetLemmatizer()
    return _lemmatizer


def _get_stopwords() -> set[str]:
    """Return the shared English stopword set, loading it on first use."""
    global _stop_words
    if _stop_words is None:
        _stop_words = set(stopwords.words("english"))
    return _stop_words


# ── Text preprocessing ────────────────────────────────────────────────────────

def _get_wordnet_pos(treebank_tag: str) -> str:
    """Map POS treebank tag to WordNet POS constant for better lemmatization.

    Tags starting with J, V and R map to adjective, verb and adverb; every
    other tag falls back to noun.
    """
    if treebank_tag.startswith("J"):
        return wordnet.ADJ
    if treebank_tag.startswith("V"):
        return wordnet.VERB
    if treebank_tag.startswith("R"):
        return wordnet.ADV
    return wordnet.NOUN


def normalize(text: str) -> str:
    """Lowercase, strip punctuation, collapse whitespace."""
    stripped = text.lower().translate(_PUNCT_TABLE)
    return _WHITESPACE_RE.sub(" ", stripped).strip()


def tokenize_and_lemmatize(text: str) -> list[str]:
    """Tokenize, POS-tag, lemmatize, and remove stopwords.

    Tokens that are stopwords or not purely alphabetic are dropped, so the
    result can be empty (e.g. for numeric or stopword-only input).
    """
    lemmatizer = _get_lemmatizer()
    stop_words = _get_stopwords()

    tagged = nltk.pos_tag(nltk.word_tokenize(normalize(text)))
    return [
        lemmatizer.lemmatize(word, _get_wordnet_pos(tag))
        for word, tag in tagged
        if word not in stop_words and word.isalpha()
    ]


# ── WordNet synonym expansion ─────────────────────────────────────────────────

@lru_cache(maxsize=512)
def _synonyms(word: str) -> frozenset[str]:
    """Return all WordNet lemma names for a word (including the word itself).

    Lemma names are lowercased and their underscores replaced by spaces.
    """
    syns: set[str] = {word}
    for synset in wordnet.synsets(word):
        for lemma in synset.lemmas():  # type: ignore
            syns.add(lemma.name().replace("_", " ").lower())
    return frozenset(syns)


def expand_with_synonyms(tokens: list[str]) -> set[str]:
    """Expand a token list to include all WordNet synonyms."""
    expanded: set[str] = set()
    for token in tokens:
        expanded.update(_synonyms(token))
    return expanded


# ── Similarity metrics ────────────────────────────────────────────────────────

def jaccard(set_a: set[str], set_b: set[str]) -> float:
    """Jaccard similarity: |A ∩ B| / |A ∪ B|

    Two empty sets are treated as identical and score 1.0.
    """
    if not set_a and not set_b:
        return 1.0
    return len(set_a & set_b) / len(set_a | set_b)


def cosine_similarity(vec_a: np.ndarray, vec_b: np.ndarray) -> float:
    """Cosine similarity between two vectors.

    The vectors do not need to be pre-normalized: the dot product is divided
    by both norms here. Returns 0.0 if either vector has zero norm.
    """
    norm_a = np.linalg.norm(vec_a)
    norm_b = np.linalg.norm(vec_b)
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return float(np.dot(vec_a, vec_b) / (norm_a * norm_b))


# ── Core matcher ──────────────────────────────────────────────────────────────

class SemanticMatcher:
    """
    Deterministic semantic matcher for short strings.

    Usage:
        matcher = SemanticMatcher()
        result = matcher.match("The cat sat on the mat", "A cat was sitting on the mat")
        print(result)
    """

    def __init__(
        self,
        match_threshold: float = MATCH_THRESHOLD,
        strong_threshold: float = STRONG_THRESHOLD,
        weights: dict[str, float] | None = None,
    ):
        """Configure thresholds and layer weights.

        Args:
            match_threshold: Minimum score for two strings to count as a match.
            strong_threshold: Minimum score for a high-confidence match.
            weights: Mapping with "lexical", "synonym" and "semantic" entries
                summing to 1.0. ``None`` (or an empty mapping) selects the
                module defaults in ``WEIGHTS``.

        Raises:
            ValueError: If a layer weight is missing or the weights do not
                sum to 1.0.
        """
        self.match_threshold = match_threshold
        self.strong_threshold = strong_threshold
        # Copy so later mutation of the caller's dict (or of the module-level
        # defaults) cannot silently change this matcher.
        self.weights = dict(weights or WEIGHTS)
        self.confidence_level: str = "no_match"

        # Explicit raises instead of assert: asserts vanish under `python -O`.
        missing = [name for name in _LAYER_NAMES if name not in self.weights]
        if missing:
            raise ValueError(f"Weights missing layer(s): {', '.join(missing)}")

        total = sum(self.weights.values())
        # Written as `not (... < eps)` so a NaN total is rejected as well.
        if not abs(total - 1.0) < 1e-6:
            raise ValueError(f"Weights must sum to 1.0 (got {total:.4f})")

    # ── Inner Functions ────────────────────────────────────────────────────

    def _layer_lexical(self, tokens_a: list[str], tokens_b: list[str]) -> float:
        """Jaccard overlap of the two lemma lists."""
        return jaccard(set(tokens_a), set(tokens_b))

    def _layer_synonym(self, tokens_a: list[str], tokens_b: list[str]) -> float:
        """Jaccard overlap of the two lemma lists after WordNet expansion."""
        expanded_a = expand_with_synonyms(tokens_a)
        expanded_b = expand_with_synonyms(tokens_b)
        return jaccard(expanded_a, expanded_b)

    def _layer_semantic(self, text_a: str, text_b: str) -> float:
        """Cosine similarity of the sentence embeddings of both texts."""
        model = _get_model()
        # encode() is deterministic: no sampling, fixed weights
        embeddings = model.encode(
            [normalize(text_a), normalize(text_b)],
            convert_to_numpy=True,
            normalize_embeddings=True,
        )
        return cosine_similarity(embeddings[0], embeddings[1])  # type: ignore

    # ── Public Functions ───────────────────────────────────────────────────

    def matchscore(self, text_a: str, text_b: str) -> float:
        """
        Compare two strings and return a score of whether they are matching.

        Returns a float between 0.0 and 1.0, where 1.0 indicates a perfect match.
        Also updates ``confidence_level`` ("strong", "moderate" or "no_match")
        for this comparison.
        """
        # Fast-path: normalized exact match
        if normalize(text_a) == normalize(text_b):
            self.confidence_level = "strong"
            return 1.0

        tokens_a = tokenize_and_lemmatize(text_a)
        tokens_b = tokenize_and_lemmatize(text_b)
        semantic = self._layer_semantic(text_a, text_b)

        if tokens_a or tokens_b:
            layer_scores = {
                "lexical": self._layer_lexical(tokens_a, tokens_b),
                "synonym": self._layer_synonym(tokens_a, tokens_b),
                "semantic": semantic,
            }
            score = sum(self.weights[name] * value for name, value in layer_scores.items())
        else:
            # Neither text has a content token (numbers, stopwords only, ...).
            # jaccard() scores two empty sets as 1.0, which would credit both
            # token layers in full to strings that merely share "nothing".
            # The token layers carry no evidence here, so use the embedding
            # signal alone.
            score = semantic

        # Cosine can be negative, or exceed 1.0 by floating-point rounding;
        # keep the documented [0.0, 1.0] range.
        score = min(1.0, max(0.0, score))

        if score >= self.strong_threshold:
            self.confidence_level = "strong"
        elif score >= self.match_threshold:
            self.confidence_level = "moderate"
        else:
            self.confidence_level = "no_match"

        return score

    def match(self, text_a: str, text_b: str) -> bool:
        """Return True if the two texts are considered a match based on the score."""
        return self.matchscore(text_a, text_b) >= self.match_threshold

    def confidence(self) -> str:
        """Return the confidence level of the most recent comparison.

        'strong' if that score was ≥ strong_threshold, 'moderate' if it was
        ≥ match_threshold, else 'no_match' (also the value before any call).
        """
        return self.confidence_level