Spaces:
Sleeping
Sleeping
| # Two-tier LLM client — primary / fallback, both Ollama Cloud over OpenAI-compatible HTTP. | |
| import re | |
| from collections.abc import Iterator | |
| from functools import lru_cache | |
| from typing import Any | |
| from openai import OpenAI | |
| from backend.config.settings import settings | |
@lru_cache(maxsize=8)
def _build_client(base_url: str, api_key: str) -> OpenAI:
    """Return an OpenAI-compatible client for one endpoint/key pair.

    The result is memoized per ``(base_url, api_key)``, so repeated calls
    with the same arguments return the same client object instead of
    constructing a fresh one for every request. A different URL or key
    produces (and caches) a separate client.

    Args:
        base_url: Base URL of the OpenAI-compatible HTTP endpoint.
        api_key: API key sent with requests to that endpoint.

    Returns:
        The shared ``OpenAI`` client for this endpoint/key pair.
    """
    return OpenAI(base_url=base_url, api_key=api_key)
def get_client(tier: str | None = None) -> OpenAI:
    """Return the client for the requested LLM tier.

    Args:
        tier: ``"primary"`` or ``"fallback"``. When falsy (``None`` or
            empty), ``settings.active_llm_tier`` is used instead.

    Returns:
        A client built from that tier's base URL and API key in settings.

    Raises:
        ValueError: If the resolved tier is neither ``"primary"`` nor
            ``"fallback"``.
    """
    resolved = tier or settings.active_llm_tier
    if resolved == "fallback":
        return _build_client(settings.fallback_base_url, settings.fallback_api_key)
    if resolved == "primary":
        return _build_client(settings.primary_base_url, settings.primary_api_key)
    # Reject unknown tiers with the same error active_model() raises, rather
    # than silently handing back the primary client for a mistyped tier.
    raise ValueError(f"Unknown LLM tier: '{resolved}'. Must be primary/fallback.")
def active_model(tier: str | None = None) -> str:
    """Look up the model name configured for an LLM tier.

    Args:
        tier: ``"primary"`` or ``"fallback"``. A falsy value selects
            ``settings.active_llm_tier``.

    Returns:
        ``settings.primary_model`` or ``settings.fallback_model``.

    Raises:
        ValueError: If the resolved tier is not one of the two known tiers.
    """
    tier_name = tier or settings.active_llm_tier
    model_by_tier = {
        "primary": settings.primary_model,
        "fallback": settings.fallback_model,
    }
    if tier_name in model_by_tier:
        return model_by_tier[tier_name]
    raise ValueError(f"Unknown LLM tier: '{tier_name}'. Must be primary/fallback.")
| def _apply_no_think(messages: list[dict]) -> list[dict]: | |
| # Prepend /no_think to first user message (Ollama thinking suppression). | |
| result = list(messages) | |
| for i, msg in enumerate(result): | |
| if msg.get("role") == "user": | |
| result[i] = {**msg, "content": f"/no_think\n\n{msg['content']}"} | |
| break | |
| return result | |
| def _strip_think_tags(text: str) -> str: | |
| return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip() | |
def chat_complete(
    messages: list[dict],
    max_tokens: int,
    tier: str | None = None,
    temperature: float = 0.7,
    **kwargs: Any,
) -> str:
    """Run one non-streaming chat completion and return the cleaned text.

    Behaviour depends on ``settings.thinking_mode``:
      * ``"suppress"`` sends the messages through ``_apply_no_think``;
      * ``"strip"`` / ``"full"`` add ``settings.thinking_token_budget`` to
        the token limit;
      * ``"off"`` / ``"strip"`` pass the reply through ``_strip_think_tags``.

    Args:
        messages: Chat messages in OpenAI format.
        max_tokens: Token limit for the answer, before any thinking budget.
        tier: LLM tier; a falsy value selects ``settings.active_llm_tier``.
        temperature: Sampling temperature forwarded to the API.
        **kwargs: Extra arguments forwarded to ``chat.completions.create``.
            An ``extra_body`` entry is taken out and passed explicitly.

    Returns:
        The reply text with surrounding whitespace removed; may be empty.
    """
    tier_name = tier or settings.active_llm_tier
    model_name = active_model(tier_name)
    llm = get_client(tier_name)

    body_overrides: dict[str, Any] = kwargs.pop("extra_body", {})

    outgoing = (
        _apply_no_think(messages)
        if settings.thinking_mode == "suppress"
        else messages
    )

    # Leave room for reasoning tokens when the model is allowed to think.
    token_cap = max_tokens
    if settings.thinking_mode in ("strip", "full"):
        token_cap = max_tokens + settings.thinking_token_budget

    response = llm.chat.completions.create(
        model=model_name,
        messages=outgoing,
        max_tokens=token_cap,
        temperature=temperature,
        extra_body=body_overrides or None,
        **kwargs,
    )

    # No choices or a None content both collapse to an empty string.
    text = ""
    if response.choices:
        text = response.choices[0].message.content or ""

    print(
        f"[llm_client] tier={tier_name} model={model_name} raw_len={len(text)} raw={text[:200]!r}"
    )

    if settings.thinking_mode in ("off", "strip"):
        text = _strip_think_tags(text)

    answer = text.strip()
    if answer:
        return answer

    print(
        f"[llm_client] WARNING: empty response after strip. finish_reason={response.choices[0].finish_reason if response.choices else 'none'}"
    )
    return answer
def chat_complete_stream(
    messages: list[dict],
    max_tokens: int,
    tier: str | None = None,
    temperature: float = 0.7,
    **kwargs: Any,
) -> Iterator[str]:
    """Stream a chat completion, yielding each non-empty text delta.

    No think-tag stripping is done here; the caller is expected to buffer
    the pieces and post-process the full text afterwards (see
    ``finalize_streamed``). Because this is a generator, the request is
    only issued once iteration starts.

    Args:
        messages: Chat messages in OpenAI format.
        max_tokens: Token limit for the answer, before any thinking budget.
        tier: LLM tier; a falsy value selects ``settings.active_llm_tier``.
        temperature: Sampling temperature forwarded to the API.
        **kwargs: Extra arguments forwarded to ``chat.completions.create``.
            An ``extra_body`` entry is taken out and passed explicitly.

    Yields:
        Content fragments in arrival order; empty deltas are skipped.
    """
    tier_name = tier or settings.active_llm_tier
    model_name = active_model(tier_name)
    llm = get_client(tier_name)

    body_overrides: dict[str, Any] = kwargs.pop("extra_body", {})

    outgoing = (
        _apply_no_think(messages)
        if settings.thinking_mode == "suppress"
        else messages
    )

    # Leave room for reasoning tokens when the model is allowed to think.
    token_cap = max_tokens
    if settings.thinking_mode in ("strip", "full"):
        token_cap = max_tokens + settings.thinking_token_budget

    chunks = llm.chat.completions.create(
        model=model_name,
        messages=outgoing,
        max_tokens=token_cap,
        temperature=temperature,
        stream=True,
        extra_body=body_overrides or None,
        **kwargs,
    )

    for chunk in chunks:
        if chunk.choices:
            # A delta may carry no content attribute or a None/empty one.
            fragment = getattr(chunk.choices[0].delta, "content", None)
            if fragment:
                yield fragment
def finalize_streamed(text: str) -> str:
    """Post-process fully buffered stream output the way chat_complete does.

    Think tags are removed when ``settings.thinking_mode`` is ``"off"`` or
    ``"strip"``; the result is always stripped of surrounding whitespace.
    """
    should_strip = settings.thinking_mode in ("off", "strip")
    cleaned = _strip_think_tags(text) if should_strip else text
    return cleaned.strip()
def warmup(tier: str | None = None) -> None:
    """Send a minimal throwaway completion to the given tier.

    Issues a one-word prompt with a 5-token limit at temperature 0 and
    discards the reply. Any exception from ``chat_complete`` propagates.

    Args:
        tier: LLM tier passed through to ``chat_complete``.
    """
    ping = [{"role": "user", "content": "hi"}]
    chat_complete(messages=ping, max_tokens=5, tier=tier, temperature=0.0)