Spaces:
Running
Running
| """ | |
| generation.py β LLM answer generation for code questions. | |
| Two providers, one interface: | |
| - Groq (primary): Fast inference via their API. Free tier is generous. | |
| Models: llama-3.3-70b-versatile, mixtral-8x7b-32768 | |
| - Anthropic (fallback): claude-haiku-4-5 β used if GROQ_API_KEY is unset. | |
| Why two providers? | |
| Groq is fast and free but can hit rate limits. Having Anthropic as a fallback | |
| means the app keeps working. In production you'd add retry logic, but for a | |
| learning project provider-switching is simpler and teaches the same concept. | |
| Conditional LLM parameters: | |
| Code questions are not all alike. "How does backpropagation work?" needs a | |
| precise, technical answer. "Can you explain this function like I'm ten?" needs | |
| an approachable, creative answer. We detect the query type and switch: | |
| Technical: temperature=0.1 (deterministic, accurate) | |
| Creative: temperature=0.7 (expressive, natural) | |
| This is the same principle as a teacher who writes on a whiteboard vs draws | |
| an analogy β same knowledge, different delivery. | |
| System prompt structure: | |
| The system prompt tells the LLM three things: | |
| 1. What it is (a code assistant with access to retrieved source) | |
| 2. How to format answers (code blocks, citations by source number) | |
| 3. What NOT to do (hallucinate, answer outside the context) | |
| """ | |
| from pathlib import Path | |
| import sys | |
| import threading | |
| from typing import Iterator | |
| sys.path.insert(0, str(Path(__file__).parent.parent.parent)) | |
| from backend.config import settings | |
| # Thread-local storage for the "quality-only" flag. | |
| # When generate_quality() or generate_synthesis() is active on a thread, | |
| # this flag tells _try_fallback() to skip Cerebras 8B for THAT thread only. | |
| # Using threading.local() instead of a shared counter eliminates a race | |
| # condition where concurrent Phase 2 workers could decrement each other's | |
| # counter to zero mid-cascade, accidentally unblocking Cerebras. | |
| _quality_local = threading.local() | |
| # Qwen3-Coder via OpenRouter free tier β strong coding model from Alibaba (Apache 2.0 licence). | |
| # The `:free` suffix means OpenRouter serves it at no cost (rate limited but generous). | |
| _OPENROUTER_MODEL = "qwen/qwen3-coder:free" | |
| def _openrouter_client(api_key: str): | |
| from openai import OpenAI | |
| return OpenAI( | |
| api_key=api_key, | |
| base_url="https://openrouter.ai/api/v1", | |
| timeout=45, max_retries=0, | |
| default_headers={ | |
| "HTTP-Referer": "http://localhost:3000", | |
| "X-Title": "Cartographer", | |
| }, | |
| ) | |
| # ββ Thought-token stripping ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Gemma 4 (and some other reasoning models) wrap their internal chain-of-thought | |
| # in <thought>...</thought> tags before the final answer. These tags are useful | |
| # for the model's reasoning but should NOT be shown to the user β they're noisy | |
| # and often incomplete mid-stream. | |
| # | |
| # We strip them with a simple state machine over the token stream: | |
| # - "outside": normal text, yield tokens as-is | |
| # - "inside": inside a <thought> block, buffer but don't yield | |
| # | |
| # Edge case: the opening/closing tag may be split across multiple tokens | |
| # (e.g. "<" then "thought>"). We keep a small look-ahead buffer to handle this. | |
| def _strip_thought_tokens(tokens: Iterator[str]) -> Iterator[str]: | |
| """Strip <thought>...</thought> blocks from a token stream. | |
| Gemma 4 and some other reasoning models wrap their chain-of-thought in | |
| <thought>...</thought> before (or instead of) the final answer. We strip | |
| the tags but keep the answer. If the entire response is inside a thought | |
| block with nothing after it (Gemma 4 sometimes does this when the answer | |
| IS the reasoning), we fall back to yielding the thought content so the | |
| user always gets a response. | |
| State machine: | |
| outside β yield tokens, detect <thought> opening tag | |
| inside β buffer and discard, detect </thought> closing tag | |
| fallback β at EOF, if no answer tokens were ever yielded, yield thought buf | |
| """ | |
| OPEN = "<thought>" | |
| CLOSE = "</thought>" | |
| buf = "" # look-ahead window for partial tag detection | |
| inside = False | |
| thought_buf = "" # accumulates full thought content (for fallback) | |
| answer_yielded = False | |
| for tok in tokens: | |
| buf += tok | |
| while True: | |
| if inside: | |
| end = buf.find(CLOSE) | |
| if end != -1: | |
| # Found closing tag β everything before it is thought content | |
| thought_buf += buf[:end] | |
| buf = buf[end + len(CLOSE):] | |
| inside = False | |
| else: | |
| # Discard confirmed-thought text, keep tail for partial-tag detection | |
| keep = len(CLOSE) - 1 | |
| if len(buf) > keep: | |
| thought_buf += buf[:-keep] | |
| buf = buf[-keep:] | |
| break | |
| else: | |
| start = buf.find(OPEN) | |
| if start != -1: | |
| # Yield everything before the opening tag as answer text | |
| if start > 0: | |
| pre = buf[:start] | |
| if pre.strip(): | |
| answer_yielded = True | |
| yield pre | |
| buf = buf[start + len(OPEN):] | |
| inside = True | |
| else: | |
| # No opening tag β safe to yield all but the partial-tag tail | |
| keep = len(OPEN) - 1 | |
| if len(buf) > keep: | |
| chunk = buf[:-keep] | |
| if chunk.strip(): | |
| answer_yielded = True | |
| yield chunk | |
| buf = buf[-keep:] | |
| break | |
| # ββ End of stream ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| if inside: | |
| # Stream ended mid-thought (no closing tag found). | |
| # Yield the accumulated thought content β it IS the answer. | |
| yield thought_buf + buf | |
| return # skip the fallback check β we just yielded | |
| if buf: | |
| if buf.strip(): | |
| answer_yielded = True | |
| yield buf | |
| # Fallback: model wrapped everything in <thought>...</thought> with nothing | |
| # substantive after (either empty or only whitespace). Rather than leaving | |
| # the answer blank, yield the thought content so the user always gets output. | |
| if not answer_yielded and thought_buf: | |
| yield thought_buf | |
| # ββ LLM parameters per query type βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Technical queries need precision; creative queries need warmth. | |
| # max_tokens is higher for creative to allow longer explanations/analogies. | |
| _PARAMS = { | |
| "technical": {"temperature": 0.1, "max_tokens": 4096}, | |
| "creative": {"temperature": 0.7, "max_tokens": 4096}, | |
| } | |
| # ββ Query classifier signals βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Weighted keyword lists. Higher weight = stronger signal for that type. | |
| # Technical wins ties β better to be precise than wrong by being too creative. | |
| _CREATIVE_SIGNALS: dict[str, int] = { | |
| "explain": 1, | |
| "intuition": 2, | |
| "intuitively": 3, | |
| "analogy": 3, | |
| "like i": 2, | |
| "simply": 2, | |
| "eli5": 3, | |
| "in plain": 2, | |
| "overview": 1, | |
| "what is": 1, | |
| } | |
| _TECHNICAL_SIGNALS: dict[str, int] = { | |
| "implement": 2, | |
| "debug": 2, | |
| "trace": 2, | |
| "step by step": 2, | |
| "formula": 3, | |
| "algorithm": 2, | |
| "complexity": 2, | |
| "optimize": 2, | |
| "refactor": 2, | |
| "how does": 1, | |
| "what does": 1, | |
| "show me": 1, | |
| } | |
| # ββ System prompts βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _SYSTEM_BASE = """You are an expert code assistant answering questions about a specific codebase. You have access to retrieved source code snippets and answer based strictly on what those snippets contain. | |
| NEVER hallucinate function names, file paths, or behaviours not present in the provided sources. | |
| NEVER say a feature "likely" or "probably" works a certain way β either the source shows it or say it is not in the retrieved context. | |
| NEVER write generic programming advice that ignores the actual code provided. | |
| If the retrieved context does not contain enough information to answer, say so clearly rather than filling gaps with assumptions. | |
| Cite every claim: "According to Source 2 (src/model.py, lines 45β72)..." | |
| Format all code in fenced code blocks with the appropriate language tag.""" | |
| _SYSTEM_TECHNICAL = _SYSTEM_BASE + """ | |
| Be precise and technical. Show exact function signatures, types, and return values when relevant. | |
| Prefer short, accurate answers. NEVER write multi-paragraph explanations for single-sentence questions.""" | |
| _SYSTEM_CREATIVE = _SYSTEM_BASE + """ | |
| Explain clearly and accessibly. Use analogies where they genuinely help understanding. | |
| Build from simple to complex. A diagram described in text is fine. | |
| NEVER sacrifice accuracy for approachability β analogies must match what the code actually does.""" | |
| def classify_query(question: str) -> str: | |
| """ | |
| Classify a question as 'technical' or 'creative' using weighted signals. | |
| Returns 'technical' if technical signals outweigh creative ones, or on tie. | |
| This biases toward precision β better to be technically correct. | |
| Example: | |
| "explain intuitively how attention works" β creative (intuitively=3 > nothing) | |
| "explain what BLEU score was used" β technical (BLEU=technical word; 'explain' alone | |
| scores 1 creative vs 0 technical, but if no technical signals present: creative) | |
| Wait β "explain what BLEU score was used" has 'what' in it which hits "what is" β creative 1. | |
| "explain" β creative 1. "BLEU" is not in signals. Technical=0. Creative=2 β creative. | |
| That's actually correct! It IS a creative "explain what" question. | |
| For "step by step trace of backprop" β technical=4, creative=0 β technical. | |
| """ | |
| q = question.lower() | |
| creative_score = sum(w for s, w in _CREATIVE_SIGNALS.items() if s in q) | |
| technical_score = sum(w for s, w in _TECHNICAL_SIGNALS.items() if s in q) | |
| # Technical wins ties | |
| return "creative" if creative_score > technical_score else "technical" | |
| class GenerationService: | |
| """ | |
| Wraps multiple LLM providers. Chooses the best available free provider on init, | |
| then falls back through the chain if rate limits or quota errors are hit. | |
| Priority: Gemini (gemma-4-31b-it) β Cerebras (llama-3.3-70b) β Anthropic (haiku) | |
| β OpenRouter (qwen3-coder) β Groq (llama-3.3-70b-versatile) | |
| Free providers (Gemini, Cerebras, OpenRouter, Groq) come first. | |
| Anthropic is kept as a paid fallback β only used if all free providers fail. | |
| One shared instance handles all tasks: Q&A streaming, diagram generation, | |
| README generation, and contextual enrichment. Gemini gemma-4-31b-it is the | |
| strongest free model available and handles all task types well. | |
| Usage: | |
| gen = GenerationService() | |
| answer = gen.answer(question, context, query_type) | |
| for token in gen.stream(question, context, query_type): | |
| print(token, end="", flush=True) | |
| """ | |
| def __init__(self): | |
| self.provider = self._init_provider() | |
| self._init_premium() | |
| # Process-global premium override. False at runtime; the prebake CLI | |
| # flips this on so every downstream `gen.generate()` call routes to | |
| # the premium client without needing a `premium=True` kwarg threaded | |
| # through every service layer. | |
| self.premium_mode = False | |
| # ββ Premium-mode cap overrides βββββββββββββββββββββββββββββββββββββββββ | |
| # The artifact-generation pipeline has many small caps β max_tokens for | |
| # ReAct rounds, character limits on chunks shown to the model, README | |
| # length budgets, etc. They were tuned for free-tier providers (Cerebras | |
| # 8B, Gemini Flash) which have shorter context windows and verbose | |
| # reasoning patterns. Applied to a Sonnet 4.6 prebake, those defaults | |
| # leave quality on the table β long classes get truncated, agent | |
| # reasoning is cut mid-thought, READMEs run short. | |
| # | |
| # When premium_mode is True, cap() returns the entry from PREMIUM_CAPS | |
| # below if one exists; otherwise the caller's default. Per-callsite | |
| # invocation looks like: | |
| # max_tokens = self._gen.cap("react_round_tokens", 700) | |
| # Free-tier runtime paths (premium_mode=False) get the default unchanged. | |
| PREMIUM_CAPS = { | |
| # ββ Tour generation (TourAgent) ββ | |
| "react_round_tokens": 1500, # was 700 β let Sonnet reason fully per round | |
| "react_done_tokens": 1200, # was 600 β richer forced-DONE summaries | |
| "phase_map_tokens": 2048, # was 1024 β more concept candidates surfaced | |
| "concept_desc_tokens": 1800, # was 900 β deeper per-concept descriptions | |
| "tour_synthesis_tokens": 16384, # was 8192 β full Sonnet output budget | |
| "tool_result_tokens": 800, # was 400 β give the agent more of each tool result | |
| "phase3_code_chars": 6000, # was 3000 β include more code per concept | |
| # ββ Diagrams ββ | |
| "diagram_tokens": 4096, # was 2048 β JSON output room for richer node lists | |
| # ββ README ββ | |
| "readme_tokens": 4096, # was 1800 β comprehensive README budget | |
| # ββ Contextual retrieval (ingestion) ββ | |
| "context_chunk_tokens": 400, # was 200 β longer contextual descriptions | |
| "context_chunk_chars": 2000, # was 800 β model sees more of each chunk | |
| "context_doc_chars": 12000, # was 6000 β model sees more surrounding file | |
| } | |
| def cap(self, name: str, default: int) -> int: | |
| """Resolve a tunable cap. In premium_mode, returns the override from | |
| PREMIUM_CAPS if present; otherwise returns the caller's default. | |
| Lets every cap site stay readable as a single line: | |
| max_tokens=self._gen.cap("react_round_tokens", 700) | |
| """ | |
| if self.premium_mode and name in self.PREMIUM_CAPS: | |
| return self.PREMIUM_CAPS[name] | |
| return default | |
| def _init_premium(self) -> None: | |
| """Initialise the optional premium client (Claude Sonnet 4.6) used | |
| for one-time generation of cached artifacts (tour, README, diagrams, | |
| contextual descriptions). Default runtime traffic still goes through | |
| the cascade β premium is opt-in per call via `premium=True`. | |
| If ANTHROPIC_API_KEY is not set, premium is disabled and `premium=True` | |
| callers fall through to the cascade with a one-time warning.""" | |
| self._premium_client = None | |
| self._premium_model = None | |
| if not settings.anthropic_api_key: | |
| return | |
| try: | |
| import anthropic | |
| self._premium_client = anthropic.Anthropic( | |
| api_key=settings.anthropic_api_key, | |
| max_retries=0, | |
| timeout=120, # premium artifact generation can be long; allow it | |
| ) | |
| self._premium_model = getattr(settings, "anthropic_premium_model", None) or "claude-sonnet-4-6" | |
| print(f"Generation: premium tier ready ({self._premium_model})") | |
| except Exception as e: | |
| print(f"Generation: premium tier disabled ({e})") | |
| self._premium_client = None | |
| self._premium_model = None | |
| def _init_provider(self) -> str: | |
| """Pick the best available provider. | |
| Priority: Gemini β Cerebras β Anthropic haiku β OpenRouter β Groq | |
| """ | |
| _TIMEOUT = 30 # seconds β prevents any provider from hanging indefinitely | |
| if settings.gemini_api_key: | |
| from openai import OpenAI | |
| self._client = OpenAI( | |
| api_key=settings.gemini_api_key, | |
| base_url="https://generativelanguage.googleapis.com/v1beta/openai/", | |
| timeout=_TIMEOUT, max_retries=0, | |
| ) | |
| self._model = "gemini-2.5-flash" | |
| # gemini-2.0-flash-lite: ~4x faster and lighter than 2.5-flash. | |
| # Used for high-volume enrichment calls (one per chunk at ingest time) | |
| # so the stronger model's quota is preserved for synthesis tasks. | |
| self._fast_model = "gemini-2.0-flash-lite" | |
| print("Generation: using Gemini 2.5 Flash via Google Gemini API") | |
| return "gemini" | |
| elif settings.sambanova_api_key: | |
| from openai import OpenAI | |
| self._client = OpenAI( | |
| api_key=settings.sambanova_api_key, | |
| base_url="https://api.sambanova.ai/v1", | |
| timeout=_TIMEOUT, max_retries=0, | |
| ) | |
| # DeepSeek-V3.1 β best open-source model on SambaNova free tier (200K tok/day). | |
| # Meta-Llama-3.1-405B-Instruct was deprecated Apr 2026; DeepSeek-V3.1 is the | |
| # replacement and outperforms it on coding + reasoning benchmarks. | |
| self._model = "DeepSeek-V3.1" | |
| self._fast_model = self._model # no lighter SambaNova tier available | |
| print("Generation: using SambaNova (DeepSeek-V3.1) β free tier") | |
| return "sambanova" | |
| elif settings.cerebras_api_key: | |
| from openai import OpenAI | |
| self._client = OpenAI( | |
| api_key=settings.cerebras_api_key, | |
| base_url="https://api.cerebras.ai/v1", | |
| timeout=_TIMEOUT, max_retries=0, | |
| ) | |
| # llama3.1-8b: only confirmed-available model on Cerebras free tier (Apr 2026). | |
| # llama3.3-70b and gpt-oss-120b both return 404 on the free tier. | |
| self._model = "llama3.1-8b" | |
| self._fast_model = "llama3.1-8b" | |
| print("Generation: using Cerebras (llama3.1-8b) β free tier") | |
| return "cerebras" | |
| elif settings.anthropic_api_key: | |
| import anthropic | |
| self._client = anthropic.Anthropic(api_key=settings.anthropic_api_key, max_retries=0) | |
| self._model = "claude-haiku-4-5-20251001" | |
| self._fast_model = self._model # haiku is already the fast Anthropic tier | |
| print(f"Generation: using Anthropic ({self._model})") | |
| return "anthropic" | |
| elif settings.openrouter_api_key: | |
| self._client = _openrouter_client(settings.openrouter_api_key) | |
| self._model = _OPENROUTER_MODEL | |
| self._fast_model = self._model | |
| print(f"Generation: using OpenRouter ({self._model})") | |
| return "openrouter" | |
| elif settings.mistral_api_key: | |
| from openai import OpenAI | |
| self._client = OpenAI( | |
| api_key=settings.mistral_api_key, | |
| base_url="https://api.mistral.ai/v1", | |
| timeout=_TIMEOUT, max_retries=0, | |
| ) | |
| # mistral-small-latest: free tier, 1B tok/month, strong at structured output. | |
| self._model = "mistral-small-latest" | |
| self._fast_model = self._model | |
| print("Generation: using Mistral (mistral-small-latest) β free tier") | |
| return "mistral" | |
| elif settings.groq_api_key: | |
| from groq import Groq | |
| self._client = Groq(api_key=settings.groq_api_key) | |
| self._model = "llama-3.3-70b-versatile" | |
| self._fast_model = self._model | |
| print(f"Generation: using Groq ({self._model})") | |
| return "groq" | |
| else: | |
| raise ValueError( | |
| "No LLM API key found. Set GEMINI_API_KEY, CEREBRAS_API_KEY, SAMBANOVA_API_KEY, " | |
| "MISTRAL_API_KEY, OPENROUTER_API_KEY, GROQ_API_KEY, or ANTHROPIC_API_KEY in .env" | |
| ) | |
| def _try_fallback(self) -> bool: | |
| """ | |
| Switch to the next available provider when the current one is exhausted. | |
| Called at runtime when a credits/quota error is caught. | |
| Returns True if a fallback was found and initialized, False if we're already | |
| on the last provider. | |
| Records a 60-second exhaustion window for the failed provider so that | |
| _reset_to_primary() doesn't immediately reset parallel calls back to it. | |
| Without this, 12 parallel ReAct calls all 429 on Gemini simultaneously | |
| and cascade all the way to Groq, exhausting its daily token budget. | |
| Each candidate provider is checked against _exhausted_until before being | |
| selected. This allows generate_synthesis() to block specific providers | |
| (e.g. Cerebras 8B) for a single high-stakes call by writing a far-future | |
| timestamp into the dict β without needing a separate code path. | |
| """ | |
| import time | |
| # Mark the current provider as exhausted for 60 seconds | |
| if not hasattr(self, '_exhausted_until'): | |
| self._exhausted_until = {} | |
| self._exhausted_until[self.provider] = time.monotonic() + 60 | |
| failed = self.provider | |
| print(f"Generation: {failed} rate-limited β trying next provider") | |
| # Cascade order β Google tier first (same key), then SambaNova, then others. | |
| # Gemma 4 is placed before SambaNova because it shares the Gemini API key: | |
| # when Gemini 2.5 Flash exhausts, Gemma 4 31B is a free same-key extension. | |
| # Only when both Google models are exhausted do we spend SambaNova quota. | |
| # _all must match this order exactly β the _all[:_all.index(X)] pattern | |
| # means "allow any provider that comes before X in _all" to fall through to X. | |
| _all = ("gemini", "gemma4", "sambanova", "cerebras", "anthropic", "openrouter", "mistral", "groq") | |
| # Helper: is a provider currently blocked by an exhaustion window? | |
| # This is what allows generate_synthesis() to block Cerebras (or any provider) | |
| # for a single call by writing to _exhausted_until β without this check, | |
| # _try_fallback() blindly switches to it regardless. | |
| now = time.monotonic() | |
| def _is_blocked(name: str) -> bool: | |
| return self._exhausted_until.get(name, 0) > now | |
| # Gemma 4 31B β same GEMINI_API_KEY, separate rate-limit bucket from Gemini 2.5 Flash. | |
| # Skipped when _skip_thinking=True: Gemma 4 is a reasoning model that prepends a THINK | |
| # block to every response β this eats the entire token budget on compact-JSON tasks. | |
| if (self.provider in _all[:_all.index("gemma4")] | |
| and settings.gemini_api_key | |
| and not _is_blocked("gemma4") | |
| and not getattr(self, '_skip_thinking', False)): | |
| from openai import OpenAI | |
| self._client = OpenAI( | |
| api_key=settings.gemini_api_key, | |
| base_url="https://generativelanguage.googleapis.com/v1beta/openai/", | |
| timeout=90, max_retries=0, # Gemma 4 31B is slow; no SDK retries | |
| ) | |
| self._model = "gemma-4-31b-it" | |
| self._fast_model = "gemma-4-31b-it" | |
| self.provider = "gemma4" | |
| print("Generation: switched to Gemma 4 31B (same Gemini key)") | |
| return True | |
| if getattr(self, '_skip_thinking', False) and self.provider in _all[:_all.index("gemma4")] and settings.gemini_api_key: | |
| print("Generation: skipping Gemma 4 (thinking model) β falling through to SambaNova") | |
| # SambaNova DeepSeek-V3.1 β excellent at code, best quality after Google tier (200K tok/day) | |
| if self.provider in _all[:_all.index("sambanova")] and settings.sambanova_api_key and not _is_blocked("sambanova"): | |
| from openai import OpenAI | |
| self._client = OpenAI(api_key=settings.sambanova_api_key, base_url="https://api.sambanova.ai/v1", timeout=30, max_retries=0) | |
| self._model = "DeepSeek-V3.1" | |
| self.provider = "sambanova" | |
| print("Generation: switched to SambaNova (DeepSeek-V3.1)") | |
| return True | |
| # Cerebras llama3.1-8b is blocked during generate_quality() and generate_synthesis() calls. | |
| # _quality_local.quality_only is a per-thread boolean set by those methods. | |
| # Using threading.local() instead of a shared counter eliminates a race condition | |
| # where Worker B finishing its quality call would decrement a shared counter to 0 | |
| # while Worker A is still mid-cascade, accidentally unblocking Cerebras for A. | |
| _in_synthesis = getattr(_quality_local, 'quality_only', False) | |
| if self.provider in _all[:_all.index("cerebras")] and settings.cerebras_api_key and not _is_blocked("cerebras") and not _in_synthesis: | |
| from openai import OpenAI | |
| self._client = OpenAI(api_key=settings.cerebras_api_key, base_url="https://api.cerebras.ai/v1", timeout=30, max_retries=0) | |
| # llama3.1-8b: only confirmed-available model on Cerebras free tier as of Apr 2026 | |
| # (llama3.3-70b and gpt-oss-120b both return 404 on free tier) | |
| self._model = "llama3.1-8b" | |
| self.provider = "cerebras" | |
| print("Generation: switched to Cerebras (llama3.1-8b)") | |
| return True | |
| if (_is_blocked("cerebras") or _in_synthesis) and self.provider in _all[:_all.index("cerebras")]: | |
| print("Generation: skipping Cerebras (blocked for synthesis) β falling through") | |
| if self.provider in _all[:_all.index("anthropic")] and settings.anthropic_api_key and not _is_blocked("anthropic"): | |
| import anthropic | |
| self._client = anthropic.Anthropic(api_key=settings.anthropic_api_key, max_retries=0) | |
| self._model = "claude-haiku-4-5-20251001" | |
| self.provider = "anthropic" | |
| print("Generation: switched to Anthropic (claude-haiku-4-5)") | |
| return True | |
| if self.provider in _all[:_all.index("openrouter")] and settings.openrouter_api_key and not _is_blocked("openrouter"): | |
| self._client = _openrouter_client(settings.openrouter_api_key) | |
| self._model = _OPENROUTER_MODEL | |
| self.provider = "openrouter" | |
| print(f"Generation: switched to OpenRouter ({_OPENROUTER_MODEL})") | |
| return True | |
| if self.provider in _all[:_all.index("mistral")] and settings.mistral_api_key and not _is_blocked("mistral"): | |
| from openai import OpenAI | |
| self._client = OpenAI(api_key=settings.mistral_api_key, base_url="https://api.mistral.ai/v1", timeout=30, max_retries=0) | |
| self._model = "mistral-small-latest" | |
| self.provider = "mistral" | |
| print("Generation: switched to Mistral (mistral-small-latest)") | |
| return True | |
| if self.provider in _all[:_all.index("groq")] and settings.groq_api_key and not _is_blocked("groq"): | |
| from groq import Groq | |
| self._client = Groq(api_key=settings.groq_api_key) | |
| self._model = "llama-3.3-70b-versatile" | |
| self.provider = "groq" | |
| print("Generation: switched to Groq (llama-3.3-70b-versatile) β last resort") | |
| return True | |
| print(f"Generation: all providers exhausted β no fallback available") | |
| return False | |
| # Providers whose architecture emits a THINK/reasoning block before the answer. | |
| # These are incompatible with compact-JSON tasks (forced DONE, short synthesis) | |
| # because the thinking block consumes the entire token budget before the JSON starts. | |
| _THINKING_PROVIDERS = frozenset({"gemma4"}) | |
| def generate_quality(self, system: str, prompt: str, **kwargs) -> str: | |
| """ | |
| Quality-gated generation: blocks Cerebras llama3.1-8b and waits for strong | |
| providers to recover rather than falling through to a weak model. | |
| WHY THIS EXISTS | |
| βββββββββββββββ | |
| Phase 2 investigations ask the model to summarise tool call results (read_file, | |
| grep, search_symbol) into a coherent concept description. An 8B model (Cerebras) | |
| does not faithfully report what the tools returned β it hallucinates file paths, | |
| function names, and behaviours not present in the code. The tour then contains | |
| fabricated content that looks plausible but is completely wrong. | |
| The right response when only a weak model is available is to wait for a strong | |
| model to recover, not to produce fast but hallucinated output. A slow correct | |
| tour is better than a fast wrong one. | |
| BEHAVIOUR | |
| βββββββββ | |
| 1. Waits for Gemini's rate-limit window to expire if it is currently blocked | |
| (same logic as generate_synthesis). | |
| 2. Clears Gemini/SambaNova exhaustion windows so they are retried fresh. | |
| 3. Sets _quality_local.quality_only=True (per-thread) so _try_fallback() skips Cerebras 8B. | |
| 4. Skips thinking models (Gemma 4) since the THINK block wastes budget. | |
| 5. If all quality providers are exhausted (Gemini, SambaNova, Anthropic, | |
| OpenRouter, Mistral, Groq), raises RuntimeError rather than using Cerebras. | |
| The caller should treat this as a skipped investigation, not a crash. | |
| """ | |
| import time | |
| if not hasattr(self, '_exhausted_until'): | |
| self._exhausted_until = {} | |
| # Wait for Gemini to recover if its window is still active. | |
| gemini_wait = self._exhausted_until.get('gemini', 0) - time.monotonic() | |
| if gemini_wait > 0: | |
| wait = min(gemini_wait + 2, 65) | |
| print(f"Generation: quality call waiting {wait:.0f}s for Gemini to recoverβ¦") | |
| time.sleep(wait) | |
| # Clear windows so the cascade retries strong providers. | |
| self._exhausted_until.pop('gemini', None) | |
| self._exhausted_until.pop('gemma4', None) | |
| self._exhausted_until.pop('sambanova', None) | |
| # Block Cerebras for this thread via the thread-local flag. | |
| # Using _quality_local (threading.local) instead of a shared counter ensures | |
| # that each concurrent Phase 2 worker blocks Cerebras independently. | |
| # A shared counter races: Worker B finishing its call decrements to 0 while | |
| # Worker A is mid-cascade, accidentally unblocking Cerebras for Worker A. | |
| old_quality = getattr(_quality_local, 'quality_only', False) | |
| _quality_local.quality_only = True | |
| self._skip_thinking = True | |
| try: | |
| return self.generate(system, prompt, **kwargs) | |
| finally: | |
| self._skip_thinking = False | |
| _quality_local.quality_only = old_quality | |
| def generate_non_thinking(self, system: str, prompt: str, **kwargs) -> str: | |
| """ | |
| Like generate() but skips thinking models (e.g. Gemma 4) in the entire fallback chain. | |
| Gemma 4 unconditionally prepends a THINK block to every response. This is architectural | |
| β prompt instructions cannot suppress it. On compact-JSON tasks (forced DONE, synthesis) | |
| the THINK block consumes the entire token budget before the JSON even starts. | |
| This sets _skip_thinking=True for the duration of the call. _try_fallback() checks | |
| this flag and bypasses Gemma 4, falling directly to SambaNova (or the next available | |
| non-thinking provider). The flag is cleared in a finally block so it never leaks | |
| into subsequent calls. | |
| """ | |
| self._skip_thinking = True | |
| try: | |
| return self.generate(system, prompt, **kwargs) | |
| finally: | |
| self._skip_thinking = False | |
| def generate_synthesis(self, system: str, prompt: str, **kwargs) -> str: | |
| """ | |
| Like generate_non_thinking() but engineered for Phase 3 synthesis quality. | |
| WHY THIS IS NEEDED | |
| ββββββββββββββββββ | |
| Phase 1 MAP runs up to 16 ReAct rounds. Phase 2 INVESTIGATE runs up to 11 | |
| parallel investigations with 3 workers, each up to 4 rounds. Together they | |
| exhaust Gemini's 15 RPM free-tier quota, triggering a 60-second exhaustion | |
| window in _try_fallback(). If Phase 3 synthesis runs while that window is | |
| still active, the call cascades all the way to Cerebras llama3.1-8b β an | |
| 8B model that cannot reliably produce 3000-token structured JSON. | |
| THREE INTERVENTIONS | |
| βββββββββββββββββββ | |
| 1. Wait for Gemini/SambaNova to recover: simply clearing the exhaustion window | |
| doesn't help β Gemini still 429s on the next call if it hasn't recovered. | |
| We sleep until the window expires (max 65s) so the strongest model is | |
| genuinely available when synthesis starts. | |
| 2. Block Cerebras for this call only: llama3.1-8b cannot reliably write 3000- | |
| token JSON. We mark it exhausted for 3600s and restore it in finally so | |
| Phase 1/2 tool calls can still use it. _try_fallback() now checks | |
| _exhausted_until for ALL providers (not just Gemini) so the block takes | |
| effect in the cascade. | |
| 3. Skip thinking models: Gemma 4's THINK block eats the token budget before | |
| the JSON even starts. _skip_thinking=True causes _try_fallback() to jump | |
| over Gemma 4 to SambaNova. | |
| Acceptable synthesis providers in priority order: | |
| Gemini 2.5 Flash β SambaNova DeepSeek-V3.1 β Anthropic Haiku β | |
| OpenRouter Qwen3-Coder β Mistral Small β Groq llama-3.3-70b | |
| (Cerebras llama3.1-8b: blocked; Gemma 4: skipped β thinking model) | |
| """ | |
| import time | |
| if not hasattr(self, '_exhausted_until'): | |
| self._exhausted_until = {} | |
| # 1. Wait for Gemini to recover if it's in a short exhaustion window. | |
| # Phase 1/2 can exhaust Gemini's 15 RPM free tier across parallel calls. | |
| # Synthesis is ONE sequential call β waiting up to 65s here is acceptable | |
| # and ensures we get the strongest model rather than silently falling to 8B. | |
| # We don't simply clear the window: clearing it and immediately retrying | |
| # still 429s if Gemini hasn't actually recovered yet, which sends the call | |
| # down the entire cascade again. | |
| gemini_blocked_until = self._exhausted_until.get('gemini', 0) | |
| wait_secs = gemini_blocked_until - time.monotonic() | |
| if wait_secs > 0: | |
| wait_secs = min(wait_secs + 2, 65) # 2s buffer; cap at 65s | |
| print(f"Generation: synthesis waiting {wait_secs:.0f}s for Gemini to recoverβ¦") | |
| time.sleep(wait_secs) | |
| # Similarly wait for SambaNova if it's our only strong fallback after Gemini. | |
| # SambaNova DeepSeek-V3.1 is the best option after Gemini; waiting for it is | |
| # preferable to falling to an 8B model. | |
| # After the Gemini wait (if any), check SambaNova β only wait if Gemini is | |
| # now clear (its timestamp is in the past) so we don't double-wait. | |
| gemini_still_blocked = self._exhausted_until.get('gemini', 0) > time.monotonic() | |
| sambanova_blocked_until = self._exhausted_until.get('sambanova', 0) | |
| wait_secs = sambanova_blocked_until - time.monotonic() | |
| if wait_secs > 0 and not gemini_still_blocked: | |
| wait_secs = min(wait_secs + 2, 65) | |
| print(f"Generation: synthesis waiting {wait_secs:.0f}s for SambaNova to recoverβ¦") | |
| time.sleep(wait_secs) | |
| # 2. Clear Google exhaustion windows (we either waited them out or they expired). | |
| self._exhausted_until.pop('gemini', None) | |
| self._exhausted_until.pop('gemma4', None) | |
| self._exhausted_until.pop('sambanova', None) | |
| # 3. Block Cerebras for this thread using threading.local(). | |
| # WHY threading.local() instead of a shared counter: Phase 3 synthesis is | |
| # sequential, but generate_quality() calls from Phase 2 workers also set this | |
| # flag concurrently. A shared counter would race: Worker B finishing its call | |
| # decrements to 0 while Worker A is mid-cascade, unblocking Cerebras for A. | |
| # threading.local() is per-thread β each thread's flag is independent. | |
| old_quality = getattr(_quality_local, 'quality_only', False) | |
| _quality_local.quality_only = True | |
| # 4. Skip thinking models β Gemma 4's THINK block eats the 3000-token budget. | |
| self._skip_thinking = True | |
| try: | |
| # 5. Retry loop: if all providers are exhausted (Phase 2 burned through all | |
| # 60s windows right before synthesis runs), wait one full window cycle and | |
| # retry. Synthesis is the ONE place where a 67s wait is acceptable β it's | |
| # a single sequential call and "all providers exhausted" is recoverable. | |
| for attempt in range(3): | |
| try: | |
| return self.generate(system, prompt, **kwargs) | |
| except RuntimeError as e: | |
| if "rate-limited" not in str(e).lower() or attempt >= 2: | |
| raise | |
| wait = 67 # 60s window + 7s buffer β enough for ALL providers to clear | |
| print(f"Generation: synthesis all-exhausted β waiting {wait}s for providers to recover (attempt {attempt + 1}/3)β¦") | |
| time.sleep(wait) | |
| # Clear every exhaustion window; they've all expired by now. | |
| self._exhausted_until.clear() | |
| finally: | |
| self._skip_thinking = False | |
| _quality_local.quality_only = old_quality | |
| def grade_answer(self, question: str, context: str, answer: str, query_type: str = "technical") -> dict: | |
| """ | |
| Model-based grading: ask the LLM to evaluate the answer against the sources. | |
| WHY THIS EXISTS | |
| βββββββββββββββ | |
| RAG answers can silently hallucinate β the LLM sounds confident but makes | |
| claims not backed by any retrieved source. Without grading, the user has no | |
| signal about answer reliability beyond manually cross-checking citations. | |
| Model-based grading turns the LLM into its own fact-checker. A second, short | |
| call reads (question + sources + answer) and outputs a confidence verdict. | |
| This is the same technique used in production RAG evaluation pipelines. | |
| FREE-TIER DESIGN | |
| ββββββββββββββββ | |
| The grading prompt is intentionally tiny: | |
| - Context is truncated to 2000 chars (enough to spot hallucinations) | |
| - Output is capped at 80 tokens (just the JSON verdict) | |
| - Uses the same provider already active β no second model needed | |
| This adds ~300ms after the last token, which is acceptable. | |
| Returns: | |
| {"confidence": "high|medium|low", "faithful": bool, "note": "one sentence"} | |
| On any failure: {"confidence": "unknown", "faithful": True, "note": ""} | |
| """ | |
| # Creative queries (high-level summaries, explanations) require synthesis. | |
| # Grading them on faithfulness penalises correct answers that interpret code. | |
| # Only grade technical queries where every claim should be traceable to source. | |
| if query_type == "creative": | |
| return {"confidence": "unknown", "faithful": True, "note": ""} | |
| # Extract source headers only (file + function name per source) so the grader | |
| # sees ALL retrieved sources without blowing up the prompt with full code text. | |
| # Truncating to 1200 chars caused false "low" grades because sources 3-6 were | |
| # cut off β the answer would cite them but the grader couldn't see them. | |
| import re as _re | |
| source_headers = "\n".join(_re.findall(r"\[Source \d+[^\]]*\]", context)) | |
| if not source_headers: | |
| source_headers = context[:2000] # fallback if format unexpected | |
| _GRADE_SYSTEM = ( | |
| "RAG grader. Does the answer match the sources? Return ONLY JSON." | |
| ) | |
| _GRADE_PROMPT = ( | |
| f"SOURCES RETRIEVED:\n{source_headers}\n\nANSWER:\n{answer[:1000]}\n\n" | |
| 'Return: {"confidence":"high|medium|low","note":"one sentence"}\n' | |
| "high=answer claims match retrieved sources, medium=minor extrapolation, low=claims contradict or aren't in sources" | |
| ) | |
| try: | |
| raw = self.generate(_GRADE_SYSTEM, _GRADE_PROMPT, temperature=0.0, json_mode=True, max_tokens=80) | |
| if not raw or not raw.strip(): | |
| return {"confidence": "unknown", "faithful": True, "note": ""} | |
| import json as _json | |
| grade = _json.loads(raw.strip()) | |
| # Normalise β guard against unexpected LLM field values | |
| grade["confidence"] = grade.get("confidence", "unknown").lower() | |
| if grade["confidence"] not in ("high", "medium", "low"): | |
| grade["confidence"] = "unknown" | |
| grade["faithful"] = bool(grade.get("faithful", True)) | |
| grade["note"] = str(grade.get("note", ""))[:200] | |
| return grade | |
| except Exception as e: | |
| print(f"Grading failed (non-fatal): {e}") | |
| return {"confidence": "unknown", "faithful": True, "note": ""} | |
| def _reset_to_primary(self) -> None: | |
| """Reset to Gemini (primary) so recovered rate limits get used again. | |
| Called at the start of each public method β if Gemini is still in its | |
| 60-second exhaustion window (set by _try_fallback after a 429), we skip | |
| the reset so parallel calls don't all hammer Gemini simultaneously. | |
| Without the exhaustion window, 12 parallel ReAct calls all reset to Gemini, | |
| all 429, all cascade to Groq β exhausting its daily token limit. | |
| """ | |
| import time | |
| now = time.monotonic() | |
| exhausted_until = getattr(self, '_exhausted_until', {}) | |
| if exhausted_until.get("gemini", 0) > now: | |
| # Gemini still rate-limited β stay on current fallback provider | |
| return | |
| if self.provider not in ("gemini", "gemma4") and settings.gemini_api_key: | |
| from openai import OpenAI | |
| self._client = OpenAI( | |
| api_key=settings.gemini_api_key, | |
| base_url="https://generativelanguage.googleapis.com/v1beta/openai/", | |
| timeout=30, max_retries=0, | |
| ) | |
| self._model = "gemini-2.5-flash" | |
| self.provider = "gemini" | |
| print(f"Generation: reset to Gemini (primary)") | |
| def generate(self, system: str, prompt: str, temperature: float = 0.2, | |
| json_mode: bool = False, max_tokens: int = 2048, | |
| fast: bool = False, premium: bool = False) -> str: | |
| """ | |
| One-shot generation with a custom system prompt β no RAG context, no history. | |
| Used for structured tasks like diagram/tour generation where we control | |
| the full prompt ourselves. | |
| temperature=0.0 for factual/structured output (tour, concept graphs). | |
| temperature=0.2 for Mermaid diagrams (slight variation helps layout). | |
| fast=True: use the lightweight model tier (_fast_model) instead of the | |
| primary model. Intended for high-volume enrichment calls (one per chunk | |
| during contextual retrieval) where synthesis quality is secondary and | |
| throughput/quota preservation matters more. | |
| Thread-safe: model selection is passed through the params dict (created | |
| fresh per call) rather than mutating self._model. | |
| premium=True: route to the premium client (Claude Sonnet 4.6) instead | |
| of the cascade. Used when generating artifacts that get cached and | |
| reused indefinitely β paying once for top quality is worth it. If | |
| ANTHROPIC_API_KEY is not configured, falls through to the cascade | |
| with a one-line warning so callers don't fail silently. | |
| json_mode=True: tells the provider to output valid JSON. | |
| - OpenAI-compatible (Groq, Gemini, OpenRouter): sets response_format=json_object | |
| which FORCES the model to output valid JSON β no markdown fences, no preamble. | |
| - Anthropic: no separate flag needed; the system prompt already says "Return ONLY | |
| valid JSON". Prompt caching (see _anthropic_complete) keeps this cheap to repeat. | |
| """ | |
| # Premium path β bypass the cascade and use the dedicated client. | |
| # `premium=True` per call OR self.premium_mode set at process scope | |
| # both qualify; the prebake CLI uses the latter to flip everything | |
| # at once without threading the kwarg through every service layer. | |
| wants_premium = premium or self.premium_mode | |
| if wants_premium and self._premium_client: | |
| print(f" [generate] premium={self._premium_model} max_tokens={max_tokens} json={json_mode}") | |
| messages = [{"role": "user", "content": prompt}] | |
| params = {"temperature": temperature, "max_tokens": max_tokens, | |
| "json_mode": json_mode, "model": self._premium_model} | |
| return self._premium_complete(system, messages, params) | |
| if wants_premium and not self._premium_client: | |
| print(" [generate] premium requested but ANTHROPIC_API_KEY missing β falling back to cascade") | |
| # Only reset to primary on the first call, not on fallback retries. | |
| # Without this guard, the recursive retry call resets back to Groq, | |
| # which 429s again, which retries again β infinite recursion. | |
| if not getattr(self, '_in_fallback', False): | |
| self._reset_to_primary() | |
| model = getattr(self, '_fast_model', self._model) if fast else self._model | |
| print(f" [generate] provider={self.provider} model={model} max_tokens={max_tokens} json={json_mode}") | |
| messages = [{"role": "user", "content": prompt}] | |
| params = {"temperature": temperature, "max_tokens": max_tokens, | |
| "json_mode": json_mode, "model": model} | |
| try: | |
| if self.provider in ("cerebras", "groq", "gemini", "gemma4", "openrouter", "sambanova", "mistral"): | |
| return self._groq_complete(system, messages, params) | |
| else: | |
| return self._anthropic_complete(system, messages, params) | |
| except Exception as e: | |
| print(f" [generate] {self.provider} failed: {str(e)[:120]}") | |
| if _is_exhausted(e) and self._try_fallback(): | |
| self._in_fallback = True | |
| try: | |
| return self.generate(system, prompt, temperature, json_mode, max_tokens, fast) | |
| finally: | |
| self._in_fallback = False | |
| if _is_exhausted(e): | |
| # All providers exhausted β raise a clean message instead of the raw API | |
| # error (which leaks provider names, org IDs, and 429 HTTP details) | |
| raise RuntimeError( | |
| "All LLM providers are currently rate-limited or unavailable. " | |
| "Please wait a few minutes and try again." | |
| ) from None | |
| raise | |
| def current_model(self, premium: bool = False, fast: bool = False) -> str: | |
| """Return the model id that a `generate(premium=..., fast=...)` call | |
| would currently use. Surfaced so callers can persist it as the | |
| `generated_by_model` field on cached artifacts without inspecting | |
| internal state. Honours the process-global premium_mode flag too.""" | |
| if (premium or self.premium_mode) and self._premium_model: | |
| return self._premium_model | |
| if fast and getattr(self, "_fast_model", None): | |
| return self._fast_model | |
| return self._model | |
| def answer(self, question: str, context: str, query_type: str, history: list[dict] | None = None) -> str: | |
| """ | |
| Generate a complete answer (non-streaming). | |
| Args: | |
| question: The user's question | |
| context: Retrieved code chunks formatted as a numbered source list | |
| query_type: 'technical' or 'creative' β selects system prompt + params | |
| history: Prior conversation turns [{role, content}, ...] for follow-up questions | |
| """ | |
| system = _SYSTEM_TECHNICAL if query_type == "technical" else _SYSTEM_CREATIVE | |
| params = _PARAMS[query_type] | |
| messages = _build_messages(question, context, history) | |
| try: | |
| if self.provider in ("cerebras", "groq", "gemini", "gemma4", "openrouter", "sambanova", "mistral"): | |
| return self._groq_complete(system, messages, params) | |
| else: | |
| return self._anthropic_complete(system, messages, params) | |
| except Exception as e: | |
| if _is_exhausted(e) and self._try_fallback(): | |
| return self.answer(question, context, query_type, history) # retry with new provider | |
| raise | |
| def stream(self, question: str, context: str, query_type: str, history: list[dict] | None = None) -> Iterator[str]: | |
| """ | |
| Generate a streaming answer, yielding tokens as they arrive. | |
| Streaming gives the UI token-by-token output β the user sees the answer | |
| being written in real time rather than waiting for the full response. | |
| FastAPI uses Server-Sent Events (SSE) to push these tokens to the browser. | |
| Args: | |
| question: The user's question | |
| context: Formatted context string from RetrievalService.format_context() | |
| query_type: 'technical' or 'creative' | |
| history: Prior conversation turns [{role, content}, ...] for follow-up questions | |
| Yields: | |
| str: Individual text tokens as they are generated | |
| """ | |
| if not getattr(self, '_in_fallback', False): | |
| self._reset_to_primary() | |
| system = _SYSTEM_TECHNICAL if query_type == "technical" else _SYSTEM_CREATIVE | |
| params = _PARAMS[query_type] | |
| messages = _build_messages(question, context, history) | |
| try: | |
| if self.provider in ("cerebras", "groq", "gemini", "gemma4", "openrouter", "sambanova", "mistral"): | |
| raw = self._groq_stream(system, messages, params) | |
| else: | |
| raw = self._anthropic_stream(system, messages, params) | |
| yield from _strip_thought_tokens(raw) | |
| except Exception as e: | |
| if _is_exhausted(e) and self._try_fallback(): | |
| self._in_fallback = True | |
| try: | |
| yield from self.stream(question, context, query_type, history) | |
| finally: | |
| self._in_fallback = False | |
| else: | |
| raise | |
| # ββ Groq / Gemini / OpenRouter implementation βββββββββββββββββββββββββββββ | |
| # All three use the OpenAI SDK interface, so one implementation covers all. | |
| # Hard output-token ceilings per provider. Each provider enforces its own | |
| # limit server-side β exceeding it returns a 400 that _is_exhausted doesn't | |
| # catch. We cap here so fallback providers silently use their actual limit | |
| # rather than erroring out. Callers can request more; we deliver as much as | |
| # the current provider allows. | |
| _MAX_OUTPUT: dict[str, int] = { | |
| "gemini": 65536, | |
| "gemma4": 32768, | |
| "cerebras": 16384, | |
| "sambanova": 4096, # DeepSeek-V3.1 free tier β conservative cap; fits Phase 3 synthesis (max_tokens=3000) | |
| "openrouter": 8192, # conservative; varies by routed model | |
| "mistral": 32768, | |
| "groq": 32768, | |
| } | |
| def _groq_complete(self, system: str, messages: list[dict], params: dict) -> str: | |
| model = params.get("model", self._model) | |
| max_out = min(params["max_tokens"], self._MAX_OUTPUT.get(self.provider, params["max_tokens"])) | |
| kwargs: dict = dict( | |
| model=model, | |
| messages=[{"role": "system", "content": system}] + messages, | |
| temperature=params["temperature"], | |
| max_tokens=max_out, | |
| ) | |
| # Structured JSON output: instructs the model to emit ONLY a valid JSON | |
| # object. No markdown fences, no explanatory text β just the JSON. | |
| # json_object response_format is reliable for Groq, Cerebras, Mistral, SambaNova. | |
| # Gemini's OpenAI-compat endpoint does NOT reliably honour it β when set, Gemini | |
| # truncates the output at ~100 tokens regardless of max_tokens (silent failure). | |
| # For Gemini we rely solely on the system prompt instruction ("Return ONLY JSON"). | |
| if params.get("json_mode") and self.provider not in ("gemini", "gemma4"): | |
| kwargs["response_format"] = {"type": "json_object"} | |
| # Gemini 2.5 Flash has adaptive thinking enabled by default. Thinking tokens count | |
| # against the output token budget (max_tokens). For a synthesis call with | |
| # max_tokens=3000, Gemini can spend ~2500 tokens on thinking and only ~500 on actual | |
| # output β producing a truncated JSON. The simplest fix: request enough tokens that | |
| # thinking + actual output both fit. max_tokens=8192 gives Gemini ~5-6k for output | |
| # even after spending 2-3k on thinking. _MAX_OUTPUT['gemini']=65536 so this is fine. | |
| # NOTE: the OpenAI-compat extra_body field for thinking config is NOT "thinking_config" | |
| # (that is the native REST API name). Until we confirm the compat field name, use | |
| # a generous max_tokens instead. | |
| if getattr(self, '_skip_thinking', False) and self.provider == "gemini" and params.get("json_mode"): | |
| # Bump the token budget so thinking tokens don't crowd out the JSON output. | |
| max_out = min(8192, self._MAX_OUTPUT.get(self.provider, 8192)) | |
| kwargs["max_tokens"] = max_out | |
| response = self._client.chat.completions.create(**kwargs) | |
| choice = response.choices[0] | |
| finish = choice.finish_reason | |
| content = choice.message.content or "" | |
| if finish == "length": | |
| print(f" [gen] finish_reason=length provider={self.provider} model={self._model} " | |
| f"max_tokens={params['max_tokens']} output_len={len(content)}") | |
| # For large JSON budgets (synthesis at 2000+ tokens), truncation is a hard failure: | |
| # the JSON will be incomplete and unparseable. Raise to trigger the fallback cascade. | |
| # For small budgets (<2000 tokens, e.g. Phase 1 map at 1024 or investigation at 900), | |
| # truncation may be benign β the caller handles partial output gracefully. Don't raise. | |
| # The threshold is 2000: synthesis uses max_tokens=3000-8192; small calls use β€1024. | |
| if params.get("json_mode") and params.get("max_tokens", 0) >= 2000: | |
| raise RuntimeError(f"output truncated (finish_reason=length) β trying next provider") | |
| return content | |
| def _groq_stream(self, system: str, messages: list[dict], params: dict) -> Iterator[str]: | |
| stream = self._client.chat.completions.create( | |
| model=self._model, | |
| messages=[{"role": "system", "content": system}] + messages, | |
| temperature=params["temperature"], | |
| max_tokens=params["max_tokens"], | |
| stream=True, | |
| ) | |
| yielded = False | |
| for chunk in stream: | |
| delta = chunk.choices[0].delta.content | |
| if delta: | |
| yielded = True | |
| yield delta | |
| if not yielded: | |
| # Model returned 200 OK but streamed no content (e.g. stepfun free tier). | |
| # Raise so the caller's exception handler can try the next provider. | |
| raise RuntimeError("Stream completed with no content from model") | |
| # ββ Anthropic implementation βββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _anthropic_complete(self, system: str, messages: list[dict], params: dict) -> str: | |
| # Prompt caching: wrapping the system prompt in a cache_control block | |
| # tells Anthropic to cache the compiled KV state for this text. | |
| # On the SECOND call with the same system prompt, Anthropic returns | |
| # the cached result instantly β no re-processing, ~90% cost reduction | |
| # for the cached tokens. | |
| # | |
| # Rules for caching to activate: | |
| # - Content must be β₯ 1024 tokens (for claude-3-5+) | |
| # - Same content must appear in the same position across calls | |
| # - Cache TTL is 5 minutes (ephemeral) β enough for multi-turn chat | |
| # | |
| # Our system prompts are short (~50 tokens), so they won't cache on their | |
| # own. But for large code-context calls (diagram generation), the user | |
| # message is large and we mark the last user turn cacheable there too. | |
| # The real benefit kicks in when the same large prompt is reused across | |
| # multiple diagram types for the same repo. | |
| system_block = [{"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}] | |
| response = self._client.messages.create( | |
| model=self._model, | |
| system=system_block, | |
| messages=messages, | |
| temperature=params["temperature"], | |
| max_tokens=params["max_tokens"], | |
| ) | |
| return response.content[0].text | |
| def _premium_complete(self, system: str, messages: list[dict], params: dict) -> str: | |
| """Run a single Anthropic call against the premium client/model | |
| (kept separate from the cascade's primary `_client` so the regular | |
| runtime path is unaffected). Used by callers that pass `premium=True` | |
| to `generate()` for one-time generation of artifacts that get cached | |
| and reused (tour, README, diagrams). | |
| Includes 429 retry-with-backoff. Anthropic returns the rate-limit | |
| reset time on the response (in `retry-after` seconds or in | |
| `anthropic-ratelimit-output-tokens-reset` ISO timestamp). We sleep | |
| until the bucket refills and retry, capped at MAX_RETRIES so a | |
| broken account state doesn't loop forever. Self-paces the prebake | |
| within whatever Anthropic tier limit applies β no quality loss when | |
| rate-limited, just slower wall time.""" | |
| if not self._premium_client: | |
| raise RuntimeError("premium client not initialised") | |
| MAX_RETRIES = 4 | |
| system_block = [{"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}] | |
| for attempt in range(MAX_RETRIES + 1): | |
| try: | |
| response = self._premium_client.messages.create( | |
| model=self._premium_model, | |
| system=system_block, | |
| messages=messages, | |
| temperature=params["temperature"], | |
| max_tokens=params["max_tokens"], | |
| ) | |
| return response.content[0].text | |
| except Exception as e: | |
| # Anthropic SDK exposes the response on rate-limit / status errors. | |
| # Try several places the wait hint might live: | |
| # 1. response.headers["retry-after"] (seconds) | |
| # 2. response.headers["anthropic-ratelimit-..."] (ISO timestamp) | |
| # 3. exponential backoff fallback | |
| status = getattr(e, "status_code", None) | |
| is_429 = status == 429 or "rate_limit" in str(e).lower() or "429" in str(e) | |
| if not is_429 or attempt >= MAX_RETRIES: | |
| raise | |
| wait_s = self._anthropic_retry_seconds(e, attempt) | |
| # Add a small fudge factor so we land just past the reset boundary | |
| wait_s = max(1.0, wait_s + 1.0) | |
| print(f" [premium] rate limited; sleeping {wait_s:.1f}s before retry {attempt + 1}/{MAX_RETRIES}") | |
| time.sleep(wait_s) | |
| # Defensive β loop above should always either return or raise. | |
| raise RuntimeError("premium retry loop exited without resolution") | |
| def _anthropic_retry_seconds(err: Exception, attempt: int) -> float: | |
| """Best-effort parse of how long to wait before retrying an Anthropic | |
| 429. Falls back to exponential backoff (5, 10, 20, 40 s) if no header | |
| information is available.""" | |
| from datetime import datetime, timezone | |
| headers = {} | |
| # The SDK attaches the underlying response on httpx.HTTPStatusError-like errors. | |
| for attr in ("response", "_response", "http_response"): | |
| r = getattr(err, attr, None) | |
| if r is not None: | |
| headers = dict(getattr(r, "headers", {}) or {}) | |
| if headers: | |
| break | |
| # Normalise to lowercase keys so capitalised variants don't slip past. | |
| headers = {k.lower(): v for k, v in headers.items()} | |
| retry_after = headers.get("retry-after") | |
| if retry_after: | |
| try: | |
| return float(retry_after) | |
| except (TypeError, ValueError): | |
| pass | |
| reset_iso = headers.get("anthropic-ratelimit-output-tokens-reset") | |
| if reset_iso: | |
| try: | |
| reset_dt = datetime.fromisoformat(reset_iso.replace("Z", "+00:00")) | |
| now = datetime.now(timezone.utc) | |
| return max(1.0, (reset_dt - now).total_seconds()) | |
| except Exception: | |
| pass | |
| # Exponential backoff fallback: 5, 10, 20, 40 s | |
| return min(60.0, 5.0 * (2 ** attempt)) | |
| def _anthropic_stream(self, system: str, messages: list[dict], params: dict) -> Iterator[str]: | |
| system_block = [{"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}] | |
| with self._client.messages.stream( | |
| model=self._model, | |
| system=system_block, | |
| messages=messages, | |
| temperature=params["temperature"], | |
| max_tokens=params["max_tokens"], | |
| ) as stream: | |
| for text in stream.text_stream: | |
| yield text | |
| # ββ Helpers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _is_exhausted(e: Exception) -> bool: | |
| """Return True if this error means the provider is unavailable and we should try another. | |
| Includes 429 (rate limit) because each provider's rate limit is independent β | |
| if Gemini is rate-limited, OpenRouter likely isn't. The fallback chain is designed | |
| to only go to safe providers (Groq excluded from agent due to hermes bug), so | |
| triggering fallback on 429 is safe and gives the user an immediate response | |
| instead of "please wait a moment". | |
| """ | |
| msg = str(e).lower() | |
| return any(kw in msg for kw in ( | |
| "credit", "billing", "quota", "rate_limit", "rate limit", | |
| "resource_exhausted", "daily limit", "429", "no content from model", | |
| # Timeouts mean the provider is overloaded β try the next one rather than | |
| # surfacing a confusing error. Confirmed from logs: Gemma 4 times out on | |
| # Phase 3 synthesis (3000 tokens) while SDK retries eat 3+ minutes. | |
| "timed out", "timeout", "read timeout", | |
| # 503 high demand / unavailable β treat same as rate limit, try next provider | |
| "503", "unavailable", "high demand", "overloaded", | |
| # 404 model-not-found: treat as exhausted so we fall through to the next | |
| # provider rather than surfacing the error to the user. Happens when a | |
| # free-tier model is removed or renamed (e.g. Cerebras model slug changes). | |
| "404", "model_not_found", "does not exist", | |
| # 500 internal server error β transient server-side failure, not a caller bug. | |
| # Gemini returns this as {'code': 500, 'status': 'INTERNAL'} during overload. | |
| # Fall through to the next provider rather than crashing the tour. | |
| "500", "internal error", "internal_error", | |
| # finish_reason=length on a JSON call means the output was truncated mid-JSON | |
| # and is unusable. Fall to the next provider (SambaNova) which doesn't have | |
| # Gemini's thinking-token overhead that causes this truncation. | |
| "output truncated (finish_reason=length)", | |
| )) | |
| def _build_messages(question: str, context: str, history: list[dict] | None = None) -> list[dict]: | |
| """ | |
| Build the LLM message list: prior conversation turns + current question with code context. | |
| History turns are bare question/answer pairs β we don't re-attach the retrieved | |
| code context for them (that context is no longer relevant). Only the current | |
| question gets the freshly retrieved context injected. | |
| Context comes first in the current turn because: | |
| - LLMs attend better to information near the start of the prompt | |
| - The question at the end is a clear instruction after reading the context | |
| """ | |
| messages = [{"role": h["role"], "content": h["content"]} for h in (history or [])] | |
| current_prompt = f"""Here is the relevant source code retrieved from the repository: | |
| {context} | |
| --- | |
| Question: {question} | |
| Answer based on the source code above. Cite sources by number (e.g. "Source 1").""" | |
| messages.append({"role": "user", "content": current_prompt}) | |
| return messages | |