Spaces:
Running
Running
| """Hybrid skill search engine (BM25 + embedding + lexical boost). | |
| Implements the search pipeline: | |
| Phase 1: BM25 rough-rank over all candidates | |
| Phase 2: Vector scoring (embedding cosine similarity) | |
| Phase 3: Hybrid score = vector_score + lexical_boost | |
| Phase 4: Deduplication + limit | |
| Used by MCP ``search_skills`` tool, ``retrieve_skill`` agent tool, | |
| and potentially other search interfaces. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| import re | |
| from typing import Any, Dict, List, Optional | |
| logger = logging.getLogger("openspace.cloud") | |
def _check_safety(text: str) -> list[str]:
    """Return the safety flags raised for *text*.

    The skill_engine import is deferred to call time so that merely
    importing this module does not pull in the skill engine.
    """
    from openspace.skill_engine import skill_utils

    return skill_utils.check_skill_safety(text)
def _is_safe(flags: list[str]) -> bool:
    """Report whether *flags* describes a safe skill (deferred import)."""
    from openspace.skill_engine import skill_utils

    return skill_utils.is_skill_safe(flags)
| _WORD_RE = re.compile(r"[a-z0-9]+") | |
| def _tokenize(value: str) -> list[str]: | |
| return _WORD_RE.findall(value.lower()) if value else [] | |
def _lexical_boost(query_tokens: list[str], name: str, slug: str) -> float:
    """Compute lexical boost score based on exact/prefix token matching.

    A field (slug or name) contributes its exact-match weight when every
    query token equals one of its tokens, otherwise its prefix weight when
    every query token is a prefix of one of its tokens, otherwise nothing.
    Slug matches outweigh name matches.
    """

    def _field_boost(field_tokens: list[str], exact: float, prefix: float) -> float:
        # Empty fields never contribute.
        if not field_tokens:
            return 0.0
        if all(qt in field_tokens for qt in query_tokens):
            return exact
        if all(
            any(tok.startswith(qt) for tok in field_tokens) for qt in query_tokens
        ):
            return prefix
        return 0.0

    return (
        _field_boost(_tokenize(slug), 1.4, 0.8)
        + _field_boost(_tokenize(name), 1.1, 0.6)
    )
class SkillSearchEngine:
    """Hybrid BM25 + embedding search engine for skills.

    Usage::

        engine = SkillSearchEngine()
        results = engine.search(
            query="weather forecast",
            candidates=candidates,
            query_embedding=[...],  # optional
            limit=20,
        )
    """

    def search(
        self,
        query: str,
        candidates: List[Dict[str, Any]],
        *,
        query_embedding: Optional[List[float]] = None,
        limit: int = 20,
    ) -> List[Dict[str, Any]]:
        """Run the full search pipeline on candidates.

        Each candidate dict should have at minimum:

        - ``skill_id``, ``name``, ``description``
        - ``_embedding`` (optional): pre-computed embedding vector
        - ``source``: "openspace-local" | "cloud"

        Args:
            query: Search query text.
            candidates: Candidate dicts to rank.
            query_embedding: Pre-computed query embedding (if available).
            limit: Max results to return.

        Returns:
            Sorted list of result dicts (highest score first).
        """
        q = query.strip()
        if not q or not candidates:
            return []
        query_tokens = _tokenize(q)
        if not query_tokens:
            # Query had no alphanumeric content — nothing to match on.
            return []
        # Phase 1: BM25 rough-rank
        filtered = self._bm25_phase(q, candidates, limit)
        # Phase 2+3: Vector + lexical scoring
        scored = self._score_phase(filtered, query_tokens, query_embedding)
        # Phase 4: Deduplicate and limit
        return self._dedup_and_limit(scored, limit)

    def _bm25_phase(
        self,
        query: str,
        candidates: List[Dict[str, Any]],
        limit: int,
    ) -> List[Dict[str, Any]]:
        """BM25 rough-rank to keep top candidates for embedding stage."""
        # Lazy import keeps skill_engine off the module import path.
        from openspace.skill_engine.skill_ranker import SkillRanker, SkillCandidate

        ranker = SkillRanker(enable_cache=True)
        bm25_candidates = [
            SkillCandidate(
                skill_id=c.get("skill_id", ""),
                name=c.get("name", ""),
                description=c.get("description", ""),
                body="",
                metadata=c,
            )
            for c in candidates
        ]
        # Keep 3x the final limit so the vector stage has room to reorder.
        ranked = ranker.bm25_only(query, bm25_candidates, top_k=min(limit * 3, len(candidates)))
        ranked_ids = {sc.skill_id for sc in ranked}
        filtered = [c for c in candidates if c.get("skill_id") in ranked_ids]
        # If BM25 found nothing, fall back to all candidates
        return filtered if filtered else candidates

    def _score_phase(
        self,
        candidates: List[Dict[str, Any]],
        query_tokens: list[str],
        query_embedding: Optional[List[float]],
    ) -> List[Dict[str, Any]]:
        """Compute hybrid score = vector_score + lexical_boost."""
        from openspace.cloud.embedding import cosine_similarity

        scored = []
        for c in candidates:
            name = c.get("name", "")
            # Slug = skill_id up to any "__" suffix, with ":" normalized to "-".
            slug = c.get("skill_id", name).split("__")[0].replace(":", "-")
            # Vector score: cosine similarity when both embeddings exist.
            vector_score = 0.0
            if query_embedding:
                skill_emb = c.get("_embedding")
                if skill_emb and isinstance(skill_emb, list):
                    vector_score = cosine_similarity(query_embedding, skill_emb)
            # Lexical boost
            lexical = _lexical_boost(query_tokens, name, slug)
            final_score = vector_score + lexical
            entry: Dict[str, Any] = {
                "skill_id": c.get("skill_id", ""),
                "name": name,
                "description": c.get("description", ""),
                "source": c.get("source", ""),
                "score": round(final_score, 4),
            }
            # Expose the raw vector component only when it contributed.
            if vector_score > 0:
                entry["vector_score"] = round(vector_score, 4)
            # Include optional fields (falsy values are intentionally dropped).
            for key in ("path", "visibility", "created_by", "origin", "tags", "quality", "safety_flags"):
                if c.get(key):
                    entry[key] = c[key]
            scored.append(entry)
        scored.sort(key=lambda x: -x["score"])
        return scored

    @staticmethod
    def _dedup_and_limit(
        scored: List[Dict[str, Any]],
        limit: int,
    ) -> List[Dict[str, Any]]:
        """Deduplicate by name (keeping the highest-scored entry) and apply limit.

        Note: this was previously defined without ``self`` or ``@staticmethod``,
        so the ``self._dedup_and_limit(...)`` call in ``search`` raised
        TypeError (three arguments bound to a two-parameter function).
        Declared as a staticmethod since it uses no instance state.
        """
        seen: set[str] = set()
        deduped = []
        for item in scored:
            name = item["name"]
            if name in seen:
                continue
            seen.add(name)
            deduped.append(item)
        return deduped[:limit]
def build_local_candidates(
    skills: list,
    store: Any = None,
) -> List[Dict[str, Any]]:
    """Build search candidate dicts from SkillRegistry skills.

    Args:
        skills: List of ``SkillMeta`` from ``registry.list_skills()``.
        store: Optional ``SkillStore`` instance for quality data enrichment.

    Returns:
        List of candidate dicts ready for ``SkillSearchEngine.search()``.
    """
    from openspace.cloud.embedding import build_skill_embedding_text

    def _readme_body(path: Any) -> str:
        # Best-effort read of SKILL.md with any YAML frontmatter stripped.
        try:
            raw = path.read_text(encoding="utf-8")
            m = re.match(r"^---\n.*?\n---\n?", raw, re.DOTALL)
            return raw[m.end():].strip() if m else raw
        except Exception:
            return ""

    candidates: List[Dict[str, Any]] = []
    for skill in skills:
        embedding_text = build_skill_embedding_text(
            skill.name, skill.description, _readme_body(skill.path)
        )
        # Drop skills that trip the safety checker.
        flags = _check_safety(embedding_text)
        if not _is_safe(flags):
            logger.info(f"BLOCKED local skill {skill.skill_id} — {flags}")
            continue
        candidates.append(
            {
                "skill_id": skill.skill_id,
                "name": skill.name,
                "description": skill.description,
                "source": "openspace-local",
                "path": str(skill.path),
                "is_local": True,
                "safety_flags": flags if flags else None,
                "_embedding_text": embedding_text,
            }
        )

    # Enrich with quality data from the store, if one was provided.
    if store and candidates:
        try:
            records = store.load_all(active_only=True)
            for cand in candidates:
                rec = records.get(cand["skill_id"])
                if not rec:
                    continue
                cand["quality"] = {
                    "total_selections": rec.total_selections,
                    "completion_rate": round(rec.completion_rate, 3),
                    "effective_rate": round(rec.effective_rate, 3),
                }
                cand["tags"] = rec.tags
        except Exception as e:
            logger.warning(f"Quality lookup failed: {e}")
    return candidates
def build_cloud_candidates(
    items: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Build search candidate dicts from cloud metadata items.

    Args:
        items: Items from ``OpenSpaceClient.fetch_metadata()``.

    Returns:
        List of candidate dicts (with safety filtering applied).
    """
    results: List[Dict[str, Any]] = []
    for item in items:
        name = item.get("name", "")
        description = item.get("description", "")
        tags = item.get("tags", [])
        # Safety-screen the visible text; unsafe items are skipped silently.
        flags = _check_safety(f"{name}\n{description}\n{' '.join(tags)}")
        if not _is_safe(flags):
            continue
        candidate: Dict[str, Any] = {
            "skill_id": item.get("record_id", ""),
            "name": name,
            "description": description,
            "source": "cloud",
            "visibility": item.get("visibility", "public"),
            "is_local": False,
            "created_by": item.get("created_by", ""),
            "origin": item.get("origin", ""),
            "tags": tags,
            "safety_flags": flags if flags else None,
        }
        # Carry a pre-computed embedding through to the scoring phase.
        embedding = item.get("embedding")
        if embedding and isinstance(embedding, list):
            candidate["_embedding"] = embedding
        results.append(candidate)
    return results
async def hybrid_search_skills(
    query: str,
    local_skills: Optional[list] = None,
    store: Any = None,
    source: str = "all",
    limit: int = 20,
) -> List[Dict[str, Any]]:
    """Shared cloud+local skill search with graceful fallback.

    Builds candidates, generates embeddings, runs ``SkillSearchEngine``.
    Cloud is attempted when *source* includes it; failures are silently
    skipped so the caller always gets local results at minimum.

    Args:
        query: Free-text search query.
        local_skills: ``SkillMeta`` list (from ``registry.list_skills()``).
        store: Optional ``SkillStore`` for quality enrichment.
        source: ``"all"`` | ``"local"`` | ``"cloud"``.
        limit: Maximum results.

    Returns:
        Ranked result dicts (same format as ``SkillSearchEngine.search()``).
    """
    # Deferred import: embedding backend resolution happens at call time.
    from openspace.cloud.embedding import generate_embedding

    q = query.strip()
    if not q:
        return []
    candidates: List[Dict[str, Any]] = []
    if source in ("all", "local") and local_skills:
        candidates.extend(build_local_candidates(local_skills, store))
    if source in ("all", "cloud"):
        # Cloud side is best-effort: any failure here is logged and the
        # function proceeds with whatever local candidates were collected.
        try:
            from openspace.cloud.auth import get_openspace_auth
            from openspace.cloud.client import OpenSpaceClient

            auth_headers, api_base = get_openspace_auth()
            if auth_headers:
                client = OpenSpaceClient(auth_headers, api_base)
                # Only request embeddings from the API when a local embedding
                # backend is resolvable; otherwise skip the extra payload.
                try:
                    from openspace.cloud.embedding import resolve_embedding_api

                    has_emb = bool(resolve_embedding_api()[0])
                except Exception:
                    has_emb = False
                # fetch_metadata is synchronous — run it in a worker thread
                # so the event loop is not blocked.
                items = await asyncio.to_thread(
                    client.fetch_metadata, include_embedding=has_emb, limit=200,
                )
                candidates.extend(build_cloud_candidates(items))
        except Exception as e:
            logger.warning(f"hybrid_search_skills: cloud unavailable: {e}")
    if not candidates:
        return []
    # query embedding (optional — key/URL resolved inside generate_embedding)
    query_embedding: Optional[List[float]] = None
    try:
        query_embedding = await asyncio.to_thread(generate_embedding, q)
        if query_embedding:
            # Backfill embeddings for candidates that only carry raw text
            # (local skills); cloud items may already have "_embedding".
            # NOTE(review): these calls run sequentially, one thread at a
            # time — acceptable for small candidate sets.
            for c in candidates:
                if not c.get("_embedding") and c.get("_embedding_text"):
                    emb = await asyncio.to_thread(
                        generate_embedding, c["_embedding_text"],
                    )
                    if emb:
                        c["_embedding"] = emb
    except Exception:
        # Embeddings are optional — on failure the engine falls back to
        # BM25 + lexical scoring only.
        pass
    engine = SkillSearchEngine()
    return engine.search(q, candidates, query_embedding=query_embedding, limit=limit)