# Two-tier LLM client — primary / fallback, both Ollama Cloud over OpenAI-compatible HTTP. import re from collections.abc import Iterator from functools import lru_cache from typing import Any from openai import OpenAI from backend.config.settings import settings @lru_cache(maxsize=2) def _build_client(base_url: str, api_key: str) -> OpenAI: return OpenAI(base_url=base_url, api_key=api_key) def get_client(tier: str | None = None) -> OpenAI: resolved = tier or settings.active_llm_tier if resolved == "fallback": return _build_client(settings.fallback_base_url, settings.fallback_api_key) return _build_client(settings.primary_base_url, settings.primary_api_key) def active_model(tier: str | None = None) -> str: resolved = tier or settings.active_llm_tier models = {"primary": settings.primary_model, "fallback": settings.fallback_model} if resolved not in models: raise ValueError(f"Unknown LLM tier: '{resolved}'. Must be primary/fallback.") return models[resolved] def _apply_no_think(messages: list[dict]) -> list[dict]: # Prepend /no_think to first user message (Ollama thinking suppression). result = list(messages) for i, msg in enumerate(result): if msg.get("role") == "user": result[i] = {**msg, "content": f"/no_think\n\n{msg['content']}"} break return result def _strip_think_tags(text: str) -> str: return re.sub(r".*?", "", text, flags=re.DOTALL).strip() def chat_complete( messages: list[dict], max_tokens: int, tier: str | None = None, temperature: float = 0.7, **kwargs: Any, ) -> str: resolved_tier = tier or settings.active_llm_tier model = active_model(resolved_tier) client = get_client(resolved_tier) patched_messages = messages extra_body: dict[str, Any] = kwargs.pop("extra_body", {}) if settings.thinking_mode == "suppress": patched_messages = _apply_no_think(messages) effective_max_tokens = max_tokens if settings.thinking_mode in ("strip", "full"): effective_max_tokens = max_tokens + settings.thinking_token_budget resp = client.chat.completions.create( model=model, messages=patched_messages, max_tokens=effective_max_tokens, temperature=temperature, extra_body=extra_body or None, **kwargs, ) raw = (resp.choices[0].message.content if resp.choices else "") or "" print( f"[llm_client] tier={resolved_tier} model={model} raw_len={len(raw)} raw={raw[:200]!r}" ) if settings.thinking_mode in ("off", "strip"): raw = _strip_think_tags(raw) stripped = raw.strip() if not stripped: print( f"[llm_client] WARNING: empty response after strip. finish_reason={resp.choices[0].finish_reason if resp.choices else 'none'}" ) return stripped def chat_complete_stream( messages: list[dict], max_tokens: int, tier: str | None = None, temperature: float = 0.7, **kwargs: Any, ) -> Iterator[str]: """Yield token deltas as they arrive. Thinking-mode stripping is applied post-hoc on the buffered text by the caller — streaming into the UI would confuse the picker anyway. """ resolved_tier = tier or settings.active_llm_tier model = active_model(resolved_tier) client = get_client(resolved_tier) patched_messages = messages extra_body: dict[str, Any] = kwargs.pop("extra_body", {}) if settings.thinking_mode == "suppress": patched_messages = _apply_no_think(messages) effective_max_tokens = max_tokens if settings.thinking_mode in ("strip", "full"): effective_max_tokens = max_tokens + settings.thinking_token_budget stream = client.chat.completions.create( model=model, messages=patched_messages, max_tokens=effective_max_tokens, temperature=temperature, stream=True, extra_body=extra_body or None, **kwargs, ) for chunk in stream: if not chunk.choices: continue delta = chunk.choices[0].delta piece = getattr(delta, "content", None) or "" if piece: yield piece def finalize_streamed(text: str) -> str: """Apply the same post-processing chat_complete does once a stream is done.""" if settings.thinking_mode in ("off", "strip"): text = _strip_think_tags(text) return text.strip() def warmup(tier: str | None = None) -> None: chat_complete( messages=[{"role": "user", "content": "hi"}], max_tokens=5, tier=tier, temperature=0.0, )