| import os |
| import json |
| import re |
| import asyncio |
| import numpy as np |
| from openai import AsyncOpenAI |
| from dotenv import load_dotenv |
|
|
# Load a .env file first so its values are visible to the os.getenv calls below.
load_dotenv()

# Primary backend: an OpenAI-compatible vLLM endpoint.
VLLM_BASE_URL = os.getenv("VLLM_BASE_URL", "http://localhost:8000/v1")
# Model requested from the primary backend. Overridable through VLLM_MODEL_NAME
# so a different deployment can be targeted without a code change; the default
# is unchanged.
MODEL_NAME = os.getenv(
    "VLLM_MODEL_NAME", "RedHatAI/Qwen2.5-72B-Instruct-FP8-dynamic"
)

# AsyncOpenAI needs some api_key value; this is a placeholder.
# NOTE(review): presumably the vLLM server runs without --api-key -- confirm.
llm = AsyncOpenAI(base_url=VLLM_BASE_URL, api_key="not-needed", timeout=120.0)
|
|
| |
| |
# Fallback backend: Hugging Face's OpenAI-compatible router.
# The token is read from the environment (or the .env file loaded by
# load_dotenv()) and is never embedded in source.
# NOTE(review): a token was previously hard-coded here and remains in VCS
# history -- it must be revoked/rotated on huggingface.co.
HF_TOKEN = os.getenv("HF_TOKEN", "")
hf_client = AsyncOpenAI(
    base_url="https://router.huggingface.co/v1",
    # AsyncOpenAI raises at construction when api_key is None and
    # OPENAI_API_KEY is unset (and would otherwise forward the OpenAI key to
    # Hugging Face). A placeholder keeps module import working; fallback
    # requests then fail with an auth error, which the fallback paths in this
    # module catch.
    api_key=HF_TOKEN or "hf-token-not-configured",
    timeout=120.0,
)
|
|
| |
# Caps how many LLM chat requests may be in flight at once; llm_call holds it
# across both the primary attempt and the fallback attempt.
_semaphore = asyncio.Semaphore(value=8)

# Shared SentenceTransformer instance. It stays None until
# _get_embedding_model() loads it on first use.
_embedding_model = None
|
|
|
|
def _get_embedding_model():
    """Return the shared SentenceTransformer, creating it on the first call.

    The instance is cached in the module-level ``_embedding_model`` and reused
    on later calls. Deferring the import means ``sentence_transformers`` is only
    imported when an embedding is actually requested.
    """
    global _embedding_model
    if _embedding_model is not None:
        return _embedding_model

    from sentence_transformers import SentenceTransformer

    _embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
    return _embedding_model
|
|
|
|
def get_embedding(text: str) -> list:
    """Encode a single string with the shared embedding model.

    The encoder output is converted with ``.tolist()`` so callers receive a
    plain Python list rather than an array object.
    """
    vector = _get_embedding_model().encode(text)
    return vector.tolist()
|
|
|
|
def get_embeddings(texts: list) -> list:
    """Encode a batch of strings, returning one plain-list vector per input.

    An empty batch returns ``[]`` immediately, without loading the embedding
    model.
    """
    if not texts:
        # Nothing to encode: avoid the lazy (potentially slow) model load and
        # don't depend on how the encoder handles an empty batch.
        return []
    model = _get_embedding_model()
    # Iterating the encoder output (instead of one .tolist() on the whole
    # result) works for both a 2-D array and a sequence of 1-D arrays.
    return [vector.tolist() for vector in model.encode(texts)]
|
|
|
|
def cosine_similarity(v1, v2) -> float:
    """Return the cosine of the angle between ``v1`` and ``v2``.

    If either vector has zero norm, the similarity is undefined and 0.0 is
    returned instead of dividing by zero.
    """
    left = np.asarray(v1)
    right = np.asarray(v2)
    norm_product = np.linalg.norm(left) * np.linalg.norm(right)
    if norm_product != 0:
        return float(np.dot(left, right) / norm_product)
    return 0.0
|
|
|
|
async def check_vllm_health() -> dict:
    """Report which LLM backend is currently reachable.

    The primary vLLM server is probed first via its model listing. If that
    fails, the Hugging Face router is probed; on success the status reports
    fallback mode plus the primary's error. Backend errors never propagate:
    when both probes fail, ``{"healthy": False, ...}`` carries both messages.
    """
    try:
        listing = await llm.models.list()
        # Built inside the try so a malformed listing also counts as "primary down".
        model_ids = [model.id for model in listing.data]
    except Exception as primary_err:
        try:
            await hf_client.models.list()
        except Exception as hf_err:
            return {
                "healthy": False,
                "error": f"Primary down: {primary_err}. Fallback down: {hf_err}",
                "url": VLLM_BASE_URL,
            }
        return {
            "healthy": True,
            "models": ["Qwen/Qwen2.5-72B-Instruct"],
            "url": "https://router.huggingface.co/v1",
            "mode": "fallback_hf",
            "primary_error": str(primary_err),
        }

    return {
        "healthy": True,
        "models": model_ids,
        "url": VLLM_BASE_URL,
        "mode": "primary",
    }
|
|
|
|
async def llm_call(
    system_prompt: str,
    user_content: str,
    temperature: float = 0.1,
    max_tokens: int = 4096,
) -> str:
    """Send one system + user chat turn and return the assistant's text.

    The primary vLLM server (``MODEL_NAME``) is tried first. On any exception,
    the same request is replayed against the Hugging Face router with
    ``Qwen/Qwen2.5-72B-Instruct``. Both attempts run while holding the
    module-level semaphore, which bounds concurrent requests.

    Returns:
        The reply text, or ``""`` when the backend returns no content (the
        OpenAI SDK types ``message.content`` as optional). This keeps the
        ``-> str`` contract so callers can safely call string methods.

    Raises:
        RuntimeError: if both backends fail. It is chained to the fallback
            error, and its message includes both errors.
    """
    # Built once and shared by both attempts; the SDK does not mutate it.
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_content},
    ]
    async with _semaphore:
        try:
            response = await llm.chat.completions.create(
                model=MODEL_NAME,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
            )
            return response.choices[0].message.content or ""
        except Exception as primary_error:
            try:
                response = await hf_client.chat.completions.create(
                    model="Qwen/Qwen2.5-72B-Instruct",
                    messages=messages,
                    temperature=temperature,
                    max_tokens=max_tokens,
                )
                return response.choices[0].message.content or ""
            except Exception as hf_error:
                raise RuntimeError(
                    f"Both primary vLLM and fallback HF failed.\n"
                    f"Primary error ({VLLM_BASE_URL}): {primary_error}\n"
                    f"Fallback error (router.huggingface.co): {hf_error}"
                ) from hf_error
|
|
|
|
| |
| |
| |
|
|
|
|
def _strip_fences(raw: str) -> str:
    """Remove a surrounding markdown code fence from LLM output.

    Strips an opening ```json fence (in any letter case, e.g. ```JSON) or a
    bare ``` fence, then a closing ``` fence, and trims surrounding whitespace.
    ``None`` is treated as an empty string, because chat responses may carry
    no content.
    """
    clean = (raw or "").strip()
    # Match the tag case-insensitively: a model may emit ```JSON, and the bare
    # ``` branch would then leave "JSON" glued to the front of the payload.
    if clean[:7].lower() == "```json":
        clean = clean[7:]
    elif clean.startswith("```"):
        clean = clean[3:]
    if clean.endswith("```"):
        clean = clean[:-3]
    return clean.strip()
|
|
|
|
def _repair_json(raw: str) -> str:
    """Drop trailing commas from LLM-produced JSON text.

    A comma is removed, together with the whitespace after it, when what
    follows is only whitespace and then a closing ``}`` / ``]`` or the end of
    the input. Commas inside JSON string literals are never touched, so a value
    such as ``"a, ]"`` survives intact. Backslash escapes are honoured when
    tracking where strings begin and end.
    """
    out = []
    in_string = False
    escaped = False
    i = 0
    n = len(raw)
    while i < n:
        ch = raw[i]
        if in_string:
            out.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            i += 1
            continue
        if ch == '"':
            in_string = True
        elif ch == ",":
            # Look past whitespace to decide whether this comma is trailing.
            j = i + 1
            while j < n and raw[j].isspace():
                j += 1
            if j == n or raw[j] in "}]":
                # Skip the comma and the whitespace after it; the closing
                # bracket (if any) is emitted on the next iteration.
                i = j
                continue
        out.append(ch)
        i += 1
    return "".join(out)
|
|
|
|
def _coerce_to_list(parsed) -> list:
    """Normalise a decoded JSON value to the list shape callers expect.

    A list passes through unchanged. A dict holding a list under "skills",
    "items", "results" or "data" (checked in that order) is unwrapped. Any
    other dict becomes a one-element list, and any scalar becomes ``[]``.
    """
    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict):
        for key in ("skills", "items", "results", "data"):
            if isinstance(parsed.get(key), list):
                return parsed[key]
        return [parsed]
    return []


def _parse_llm_json(raw) -> list:
    """Strip code fences from ``raw``, decode it as JSON and normalise it to a list.

    The text is parsed as-is first. The ``_repair_json`` heuristics are applied
    only if that fails, so valid JSON is never rewritten.

    Raises:
        json.JSONDecodeError: if neither the cleaned nor the repaired text parses.
    """
    cleaned = _strip_fences(raw or "")
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError:
        parsed = json.loads(_repair_json(cleaned))
    return _coerce_to_list(parsed)


async def safe_llm_json_call(
    system_prompt: str,
    user_content: str,
    temperature: float = 0.1,
    max_tokens: int = 4096,
) -> list:
    """
    Call the LLM expecting a JSON array response.

    The reply is de-fenced, decoded and normalised to a list. A dict wrapping a
    list under "skills"/"items"/"results"/"data" is unwrapped, any other dict
    becomes a one-element list, and a scalar yields []. If the reply is not
    valid JSON, the call is retried once with a stricter system prompt that
    also shows the model its failed output.

    Returns [] on final failure -- including when both LLM backends are
    unreachable -- so it never crashes the pipeline. Backend and retry
    failures are logged at WARNING level with the traceback.
    """
    import logging

    logger = logging.getLogger(__name__)

    try:
        raw = await llm_call(system_prompt, user_content, temperature, max_tokens)
    except Exception:
        # llm_call raises once both backends fail; returning [] here honours
        # the "never crashes" contract, which this first call used to break.
        logger.warning(
            "safe_llm_json_call: LLM call failed; returning []", exc_info=True
        )
        return []

    try:
        return _parse_llm_json(raw)
    except json.JSONDecodeError:
        pass  # fall through to the single repair-prompt retry below

    retry_prompt = (
        system_prompt
        + "\n\nCRITICAL: Your previous response was not valid JSON. Return ONLY a valid JSON array. No markdown. No text outside the JSON."
    )
    retry_user = f"The raw string that failed to parse was:\n\n{raw}\n\n---\n\nPlease redo the extraction correctly:\n{user_content}"
    try:
        raw2 = await llm_call(retry_prompt, retry_user, temperature, max_tokens)
        return _parse_llm_json(raw2)
    except Exception:
        # Best-effort by design: any failure on the retry yields [], but is logged.
        logger.warning(
            "safe_llm_json_call: retry failed; returning []", exc_info=True
        )
        return []
|
|