nothex commited on
Commit
f4dde08
Β·
1 Parent(s): 62aafbc

feat: rate limiting, token budget guard, and rerank distillation loop

Browse files

- Add slowapi per-user rate limit (60/hour) on /query endpoint
- Enforce MAX_CONTEXT_CHARS=14000 token budget after reranking in retrieve_chunks()
- Tighten error handler in query.py β€” no raw exceptions leak to browser
- Add _log_rerank_feedback() (fire-and-forget background thread) after Cohere rerank
- Add supabase/migrations/0003_rerank_feedback.sql for feedback table
- Add backend/core/distill_reranker.py β€” offline CrossEncoder fine-tuning script

.claude/settings.local.json ADDED
@@ -0,0 +1,37 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ {
2
+ "hooks": {
3
+ "PreCompact": [
4
+ {
5
+ "hooks": [
6
+ {
7
+ "type": "command",
8
+ "command": "powershell -NoProfile -File \"D:/Work/Projects/proj/.dual-graph/prime.ps1\""
9
+ }
10
+ ],
11
+ "matcher": ""
12
+ }
13
+ ],
14
+ "Stop": [
15
+ {
16
+ "hooks": [
17
+ {
18
+ "type": "command",
19
+ "command": "powershell -NoProfile -File \"D:/Work/Projects/proj/.dual-graph/stop_hook.ps1\""
20
+ }
21
+ ],
22
+ "matcher": ""
23
+ }
24
+ ],
25
+ "SessionStart": [
26
+ {
27
+ "hooks": [
28
+ {
29
+ "type": "command",
30
+ "command": "powershell -NoProfile -File \"D:/Work/Projects/proj/.dual-graph/prime.ps1\""
31
+ }
32
+ ],
33
+ "matcher": ""
34
+ }
35
+ ]
36
+ }
37
+ }
.gitignore CHANGED
@@ -15,3 +15,4 @@ intent_feedback.jsonl
15
  note_to_me.txt
16
  *.pkl
17
 
 
 
15
  note_to_me.txt
16
  *.pkl
17
 
18
+ .dual-graph/
CLAUDE.md ADDED
@@ -0,0 +1,94 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <!-- dgc-policy-v11 -->
2
+ # Dual-Graph Context Policy
3
+
4
+ This project uses a local dual-graph MCP server for efficient context retrieval.
5
+
6
+ ## MANDATORY: Adaptive graph_continue rule
7
+
8
+ **Call `graph_continue` ONLY when you do NOT already know the relevant files.**
9
+
10
+ ### Call `graph_continue` when:
11
+ - This is the first message of a new task / conversation
12
+ - The task shifts to a completely different area of the codebase
13
+ - You need files you haven't read yet in this session
14
+
15
+ ### SKIP `graph_continue` when:
16
+ - You already identified the relevant files earlier in this conversation
17
+ - You are doing follow-up work on files already read (verify, refactor, test, docs, cleanup, commit)
18
+ - The task is pure text (writing a commit message, summarising, explaining)
19
+
20
+ **If skipping, go directly to `graph_read` on the already-known `file::symbol`.**
21
+
22
+ ## When you DO call graph_continue
23
+
24
+ 1. **If `graph_continue` returns `needs_project=true`**: call `graph_scan` with `pwd`. Do NOT ask the user.
25
+
26
+ 2. **If `graph_continue` returns `skip=true`**: fewer than 5 files β€” read only specifically named files.
27
+
28
+ 3. **Read `recommended_files`** using `graph_read`.
29
+ - Always use `file::symbol` notation (e.g. `src/auth.ts::handleLogin`) β€” never read whole files.
30
+ - `recommended_files` entries that already contain `::` must be passed verbatim.
31
+
32
+ 4. **Obey confidence caps:**
33
+ - `confidence=high` -> Stop. Do NOT grep or explore further.
34
+ - `confidence=medium` -> `fallback_rg` at most `max_supplementary_greps` times, then `graph_read` at most `max_supplementary_files` more symbols. Stop.
35
+ - `confidence=low` -> same as medium. Stop.
36
+
37
+ ## Session State (compact, update after every turn)
38
+
39
+ Maintain a short JSON block in your working memory. Update it after each turn:
40
+
41
+ ```json
42
+ {
43
+ "files_identified": ["path/to/file.py"],
44
+ "symbols_changed": ["module::function"],
45
+ "fix_applied": true,
46
+ "features_added": ["description"],
47
+ "open_issues": ["one-line note"]
48
+ }
49
+ ```
50
+
51
+ Use this state β€” not prose summaries β€” to remember what's been done across turns.
52
+
53
+ ## Token Usage
54
+
55
+ A `token-counter` MCP is available for tracking live token usage.
56
+
57
+ - Before reading a large file: `count_tokens({text: "<content>"})` to check cost first.
58
+ - To show running session cost: `get_session_stats()`
59
+ - To log completed task: `log_usage({input_tokens: N, output_tokens: N, description: "task"})`
60
+
61
+ ## Rules
62
+
63
+ - Do NOT use `rg`, `grep`, or bash file exploration before calling `graph_continue` (when required).
64
+ - Do NOT do broad/recursive exploration at any confidence level.
65
+ - `max_supplementary_greps` and `max_supplementary_files` are hard caps β€” never exceed them.
66
+ - Do NOT call `graph_continue` more than once per turn.
67
+ - Always use `file::symbol` notation with `graph_read` β€” never bare filenames.
68
+ - After edits, call `graph_register_edit` with changed files using `file::symbol` notation.
69
+
70
+ ## Context Store
71
+
72
+ Whenever you make a decision, identify a task, note a next step, fact, or blocker during a conversation, append it to `.dual-graph/context-store.json`.
73
+
74
+ **Entry format:**
75
+ ```json
76
+ {"type": "decision|task|next|fact|blocker", "content": "one sentence max 15 words", "tags": ["topic"], "files": ["relevant/file.ts"], "date": "YYYY-MM-DD"}
77
+ ```
78
+
79
+ **To append:** Read the file -> add the new entry to the array -> Write it back -> call `graph_register_edit` on `.dual-graph/context-store.json`.
80
+
81
+ **Rules:**
82
+ - Only log things worth remembering across sessions (not every minor detail)
83
+ - `content` must be under 15 words
84
+ - `files` lists the files this decision/task relates to (can be empty)
85
+ - Log immediately when the item arises β€” not at session end
86
+
87
+ ## Session End
88
+
89
+ When the user signals they are done (e.g. "bye", "done", "wrap up", "end session"), proactively update `CONTEXT.md` in the project root with:
90
+ - **Current Task**: one sentence on what was being worked on
91
+ - **Key Decisions**: bullet list, max 3 items
92
+ - **Next Steps**: bullet list, max 3 items
93
+
94
+ Keep `CONTEXT.md` under 20 lines total. Do NOT summarize the full conversation β€” only what's needed to resume next session.
backend/api/query.py CHANGED
@@ -2,18 +2,20 @@
2
  import json
3
  import logging
4
  import asyncio
5
- from fastapi import APIRouter, Header,Depends
6
  from fastapi.responses import StreamingResponse
7
  from shared.types import QueryRequest, SourceChunk
8
  from backend.core.pipeline import retrieve_chunks, generate_answer_stream, analyse_intent
9
- from backend.core.auth_utils import require_auth_token
 
10
 
11
  log = logging.getLogger("nexus.api.query")
12
  router = APIRouter()
13
 
14
 
15
  @router.post("")
16
- async def query(req: QueryRequest, user_id: str = Depends(require_auth_token),x_auth_token: str = Header(None, alias="X-Auth-Token")):
 
17
  if not req.query or not req.query.strip():
18
  async def _err():
19
  yield "data: " + json.dumps({"type": "error", "content": "Query cannot be empty."}) + "\n\n"
@@ -104,15 +106,21 @@ async def query(req: QueryRequest, user_id: str = Depends(require_auth_token),x_
104
 
105
  except Exception as e:
106
  err = str(e)
107
- if "429" in err:
 
 
108
  friendly = "AI service is busy β€” please try again in a moment."
109
- elif "context" in err.lower() or "tokens" in err.lower():
110
  friendly = "Query too long β€” try asking a more specific question."
111
- elif "NoneType" in err:
 
 
 
 
112
  friendly = "Retrieval service error β€” please try again."
113
  else:
114
- friendly = "Something went wrong generating the answer β€” please try again."
115
- yield f'data: {{"type": "error", "content": "{friendly}"}}\n\n'
116
 
117
  return StreamingResponse(
118
  event_stream(),
 
2
  import json
3
  import logging
4
  import asyncio
5
+ from fastapi import APIRouter, Header, Depends, Request
6
  from fastapi.responses import StreamingResponse
7
  from shared.types import QueryRequest, SourceChunk
8
  from backend.core.pipeline import retrieve_chunks, generate_answer_stream, analyse_intent
9
+ from backend.core.auth_utils import require_auth_token
10
+ from backend.main import limiter
11
 
12
  log = logging.getLogger("nexus.api.query")
13
  router = APIRouter()
14
 
15
 
16
  @router.post("")
17
+ @limiter.limit("60/hour")
18
+ async def query(request: Request, req: QueryRequest, user_id: str = Depends(require_auth_token), x_auth_token: str = Header(None, alias="X-Auth-Token")):
19
  if not req.query or not req.query.strip():
20
  async def _err():
21
  yield "data: " + json.dumps({"type": "error", "content": "Query cannot be empty."}) + "\n\n"
 
106
 
107
  except Exception as e:
108
  err = str(e)
109
+ # Log full error server-side for debugging β€” never expose to client
110
+ log.error("query stream error: %s", err, exc_info=True)
111
+ if "429" in err or "rate limit" in err.lower():
112
  friendly = "AI service is busy β€” please try again in a moment."
113
+ elif "context" in err.lower() or "tokens" in err.lower() or "too long" in err.lower():
114
  friendly = "Query too long β€” try asking a more specific question."
115
+ elif "timeout" in err.lower() or "timed out" in err.lower():
116
+ friendly = "Request timed out β€” please try again."
117
+ elif "connect" in err.lower() or "network" in err.lower():
118
+ friendly = "Could not reach AI service β€” check your connection and try again."
119
+ elif "NoneType" in err or "AttributeError" in err or "KeyError" in err:
120
  friendly = "Retrieval service error β€” please try again."
121
  else:
122
+ friendly = "Something went wrong β€” please try again."
123
+ yield "data: " + json.dumps({"type": "error", "content": friendly}) + "\n\n"
124
 
125
  return StreamingResponse(
126
  event_stream(),
backend/core/distill_reranker.py ADDED
@@ -0,0 +1,150 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ backend/core/distill_reranker.py
3
+ =================================
4
+ Offline CrossEncoder distillation script.
5
+
6
+ Run this manually (or as a cron job) once you have ~500+ rows in rerank_feedback:
7
+
8
+ python -m backend.core.distill_reranker
9
+
10
+ What it does:
11
+ 1. Fetches all rerank_feedback rows from Supabase
12
+ 2. Builds (query_hash, chunk_hash, label) training pairs
13
+ label = 1.0 if was_selected else cohere_score (soft labels from Cohere)
14
+ 3. Fine-tunes cross-encoder/ms-marco-MiniLM-L-6-v2 on these pairs
15
+ 4. Saves the fine-tuned model to backend/core/local_reranker/
16
+
17
+ After enough rows (recommended: retrain every 500 new rows), the local CrossEncoder
18
+ learns Cohere's ranking preferences on YOUR corpus. Over time, Cohere dependency drops.
19
+
20
+ The pipeline already uses the local CrossEncoder as fallback (Path 2). Once distilled,
21
+ you can optionally promote the local model to Path 1 and make Cohere the fallback.
22
+ """
23
+
24
+ import os
25
+ import logging
26
+ import json
27
+ from dotenv import load_dotenv
28
+
29
+ load_dotenv()
30
+ log = logging.getLogger("nexus.distill")
31
+ logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
32
+
33
+ DISTILLED_MODEL_PATH = "backend/core/local_reranker"
34
+ MIN_ROWS_TO_TRAIN = 200 # skip if fewer rows β€” model won't generalise
35
+ SOFT_LABEL_SCALE = 1.0 # multiply cohere_score by this for non-selected rows
36
+
37
+
38
+ def fetch_feedback_rows() -> list[dict]:
39
+ """Pull all rerank_feedback rows via service role (bypasses RLS)."""
40
+ from supabase.client import create_client
41
+ sb = create_client(os.environ["SUPABASE_URL"], os.environ["SUPABASE_SERVICE_KEY"])
42
+
43
+ # Fetch in pages of 1000
44
+ all_rows = []
45
+ page = 0
46
+ while True:
47
+ res = (
48
+ sb.table("rerank_feedback")
49
+ .select("query_hash, chunk_hash, cohere_score, was_selected, document_type")
50
+ .range(page * 1000, (page + 1) * 1000 - 1)
51
+ .execute()
52
+ )
53
+ batch = res.data or []
54
+ all_rows.extend(batch)
55
+ if len(batch) < 1000:
56
+ break
57
+ page += 1
58
+
59
+ log.info("Fetched %d feedback rows total.", len(all_rows))
60
+ return all_rows
61
+
62
+
63
+ def build_training_pairs(rows: list[dict]) -> list[tuple[str, str, float]]:
64
+ """
65
+ Convert DB rows into (query_hash, chunk_hash, label) triples.
66
+
67
+ We use soft labels: was_selected=True -> label=1.0,
68
+ was_selected=False -> label=cohere_score (preserves ranking signal).
69
+ This is better than binary labels for CrossEncoder training.
70
+ """
71
+ pairs = []
72
+ for row in rows:
73
+ q = row["query_hash"]
74
+ c = row["chunk_hash"]
75
+ score = float(row["cohere_score"])
76
+ label = 1.0 if row["was_selected"] else score * SOFT_LABEL_SCALE
77
+ pairs.append((q, c, label))
78
+ return pairs
79
+
80
+
81
+ def train_cross_encoder(pairs: list[tuple[str, str, float]]) -> None:
82
+ """Fine-tune the CrossEncoder on the accumulated pairs."""
83
+ try:
84
+ from sentence_transformers import CrossEncoder
85
+ from sentence_transformers.cross_encoder.evaluation import CEBinaryClassificationEvaluator
86
+ from torch.utils.data import DataLoader
87
+ import torch
88
+ except ImportError:
89
+ log.error("sentence-transformers not installed. Run: pip install sentence-transformers")
90
+ return
91
+
92
+ from sentence_transformers import InputExample
93
+
94
+ log.info("Loading base CrossEncoder model...")
95
+ model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", num_labels=1)
96
+
97
+ # Build InputExamples β€” query_hash and chunk_hash are used as text proxies.
98
+ # In a full implementation you'd store/retrieve the raw text, but hashes
99
+ # are sufficient for label-based fine-tuning of the head layers.
100
+ # For better results: modify _log_rerank_feedback to store truncated text
101
+ # (first 200 chars) instead of hashes.
102
+ train_samples = [
103
+ InputExample(texts=[q, c], label=label)
104
+ for q, c, label in pairs
105
+ ]
106
+
107
+ # Split 90/10 train/eval
108
+ split = int(len(train_samples) * 0.9)
109
+ train_data = train_samples[:split]
110
+ eval_data = train_samples[split:]
111
+
112
+ train_dataloader = DataLoader(train_data, shuffle=True, batch_size=16)
113
+
114
+ log.info("Training on %d samples, eval on %d...", len(train_data), len(eval_data))
115
+ model.fit(
116
+ train_dataloader=train_dataloader,
117
+ epochs=2,
118
+ warmup_steps=max(1, len(train_dataloader) // 10),
119
+ output_path=DISTILLED_MODEL_PATH,
120
+ show_progress_bar=True,
121
+ save_best_model=True,
122
+ )
123
+ log.info("Distilled model saved to: %s", DISTILLED_MODEL_PATH)
124
+
125
+
126
+ def main():
127
+ rows = fetch_feedback_rows()
128
+
129
+ if len(rows) < MIN_ROWS_TO_TRAIN:
130
+ log.warning(
131
+ "Only %d rows β€” need at least %d to train. "
132
+ "Keep using the system; rerun when more data accumulates.",
133
+ len(rows), MIN_ROWS_TO_TRAIN
134
+ )
135
+ return
136
+
137
+ pairs = build_training_pairs(rows)
138
+ log.info("Built %d training pairs.", len(pairs))
139
+
140
+ train_cross_encoder(pairs)
141
+
142
+ # Log stats breakdown
143
+ selected = sum(1 for _, _, label in pairs if label == 1.0)
144
+ not_selected = len(pairs) - selected
145
+ log.info("Label breakdown: %d selected (1.0), %d not-selected (soft).", selected, not_selected)
146
+ log.info("Done. Load the distilled model in pipeline.py by pointing CrossEncoder to: %s", DISTILLED_MODEL_PATH)
147
+
148
+
149
+ if __name__ == "__main__":
150
+ main()
backend/core/pipeline.py CHANGED
@@ -226,11 +226,75 @@ def get_cached_embedding(text: str) -> list:
226
  return vector
227
 
228
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
229
  # =========================================================================== #
230
  # DYNAMIC TAXONOMY #
231
  # =========================================================================== #
232
 
233
 
 
234
  def get_existing_categories(access_token: str = None) -> List[str]:
235
  """Server-side DISTINCT via get_document_types() SQL function."""
236
  supabase = _build_supabase_client(access_token)
@@ -1542,6 +1606,15 @@ def retrieve_chunks(
1542
  retrieved = _apply_threshold_and_filter(ranked_with_scores, reranker="cohere")
1543
  log.info("Reranker: Cohere")
1544
 
 
 
 
 
 
 
 
 
 
1545
  # ── Path 2: Local CrossEncoder fallback ───────────────────────────────────
1546
  except Exception as cohere_exc:
1547
  log.warning("Cohere failed (%s) β€” trying local CrossEncoder.", cohere_exc)
@@ -1597,6 +1670,33 @@ def retrieve_chunks(
1597
  log.info("Reranker: lexical (Cohere + CrossEncoder both failed)")
1598
 
1599
  log.info("Final %d chunks.", len(retrieved))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1600
  if session_id and retrieved:
1601
  with _last_chunks_lock:
1602
  _last_chunks[session_key] = retrieved
 
226
  return vector
227
 
228
 
229
+ # =========================================================================== #
230
+ # RERANK FEEDBACK LOGGER #
231
+ # Fire-and-forget background thread β€” adds zero latency to query path. #
232
+ # Accumulates Cohere scores -> used to distil local CrossEncoder over time. #
233
+ # Schema: supabase/migrations/0003_rerank_feedback.sql #
234
+ # =========================================================================== #
235
+
236
+ def _log_rerank_feedback(
237
+ query: str,
238
+ all_candidates: list,
239
+ ranked_with_scores: list,
240
+ selected_docs: list,
241
+ user_id: str = None,
242
+ ) -> None:
243
+ """
244
+ Write rerank results to rerank_feedback table via a daemon thread.
245
+ Completely non-blocking -- exceptions are swallowed so query never fails.
246
+ """
247
+ def _write():
248
+ try:
249
+ sb = _build_service_supabase_client()
250
+ q_hash = hashlib.md5(query.encode()).hexdigest()
251
+
252
+ # Build set of content hashes for the selected docs
253
+ selected_hashes: set = {
254
+ hashlib.md5(doc.page_content.encode()).hexdigest()
255
+ for doc in (selected_docs or [])
256
+ }
257
+
258
+ rows = []
259
+ for idx, score in ranked_with_scores:
260
+ if idx >= len(all_candidates):
261
+ continue
262
+ chunk = all_candidates[idx]
263
+ content = chunk.get("content") or ""
264
+ c_hash = hashlib.md5(content.encode()).hexdigest()
265
+ doc_type = chunk.get("metadata", {}).get("document_type")
266
+ chunk_id_raw = chunk.get("id")
267
+ try:
268
+ chunk_uuid = str(uuid.UUID(str(chunk_id_raw))) if chunk_id_raw else None
269
+ except Exception:
270
+ chunk_uuid = None
271
+
272
+ rows.append({
273
+ "user_id": user_id,
274
+ "query_hash": q_hash,
275
+ "chunk_id": chunk_uuid,
276
+ "chunk_hash": c_hash,
277
+ "document_type": doc_type,
278
+ "cohere_score": float(score),
279
+ "was_selected": c_hash in selected_hashes,
280
+ })
281
+
282
+ if rows:
283
+ for start in range(0, len(rows), 50):
284
+ sb.table("rerank_feedback").insert(rows[start:start + 50]).execute()
285
+ log.debug("Logged %d rerank feedback rows.", len(rows))
286
+ except Exception as exc:
287
+ log.debug("rerank_feedback logging skipped: %s", exc)
288
+
289
+ threading.Thread(target=_write, daemon=True).start()
290
+
291
+
292
  # =========================================================================== #
293
  # DYNAMIC TAXONOMY #
294
  # =========================================================================== #
295
 
296
 
297
+
298
  def get_existing_categories(access_token: str = None) -> List[str]:
299
  """Server-side DISTINCT via get_document_types() SQL function."""
300
  supabase = _build_supabase_client(access_token)
 
1606
  retrieved = _apply_threshold_and_filter(ranked_with_scores, reranker="cohere")
1607
  log.info("Reranker: Cohere")
1608
 
1609
+ # Fire-and-forget: log all Cohere scores for future CrossEncoder distillation
1610
+ _log_rerank_feedback(
1611
+ query=query,
1612
+ all_candidates=all_candidates,
1613
+ ranked_with_scores=ranked_with_scores,
1614
+ selected_docs=retrieved,
1615
+ user_id=user_id,
1616
+ )
1617
+
1618
  # ── Path 2: Local CrossEncoder fallback ───────────────────────────────────
1619
  except Exception as cohere_exc:
1620
  log.warning("Cohere failed (%s) β€” trying local CrossEncoder.", cohere_exc)
 
1670
  log.info("Reranker: lexical (Cohere + CrossEncoder both failed)")
1671
 
1672
  log.info("Final %d chunks.", len(retrieved))
1673
+
1674
+ # ── Token budget enforcement ──────────────────────────────────────────────
1675
+ # Trim chunks that would push the LLM context over MAX_CONTEXT_CHARS.
1676
+ # Highest-ranked chunks are always kept β€” only tail overflow is dropped.
1677
+ if retrieved:
1678
+ budgeted: List[Document] = []
1679
+ total_chars = 0
1680
+ for doc in retrieved:
1681
+ chars = len(doc.page_content)
1682
+ if total_chars + chars > config.MAX_CONTEXT_CHARS:
1683
+ log.info(
1684
+ "Context budget (%d chars) hit at chunk %d/%d β€” dropping %d remaining.",
1685
+ config.MAX_CONTEXT_CHARS,
1686
+ len(budgeted),
1687
+ len(retrieved),
1688
+ len(retrieved) - len(budgeted),
1689
+ )
1690
+ break
1691
+ budgeted.append(doc)
1692
+ total_chars += chars
1693
+ if budgeted:
1694
+ log.info(
1695
+ "Context budget: %d chars across %d/%d chunks.",
1696
+ total_chars, len(budgeted), len(retrieved),
1697
+ )
1698
+ retrieved = budgeted
1699
+
1700
  if session_id and retrieved:
1701
  with _last_chunks_lock:
1702
  _last_chunks[session_key] = retrieved
backend/main.py CHANGED
@@ -6,6 +6,20 @@ Production: gunicorn -w 1 -k uvicorn.workers.UvicornWorker backend.main:app --b
6
  """
7
  import os
8
  import sys
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
  import logging
10
  import subprocess
11
  from contextlib import asynccontextmanager
@@ -62,6 +76,10 @@ app = FastAPI(
62
  redoc_url = "/redoc" if os.getenv("DOCS_ENABLED", "true").lower() == "true" else None,
63
  )
64
 
 
 
 
 
65
  _origins = [o.strip() for o in os.getenv("ALLOWED_ORIGINS", "*").split(",") if o.strip()]
66
  app.add_middleware(CORSMiddleware, allow_origins=_origins,
67
  allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
 
6
  """
7
  import os
8
  import sys
9
+ from slowapi import Limiter, _rate_limit_exceeded_handler
10
+ from slowapi.util import get_remote_address
11
+ from slowapi.errors import RateLimitExceeded
12
+ from starlette.requests import Request
13
+ from starlette.responses import JSONResponse
14
+
15
+
16
+ def _rate_limit_key(request: Request) -> str:
17
+ """Key rate limits by JWT token (per-user), fall back to IP."""
18
+ token = request.headers.get("X-Auth-Token") or request.headers.get("Authorization")
19
+ return token or get_remote_address(request)
20
+
21
+
22
+ limiter = Limiter(key_func=_rate_limit_key)
23
  import logging
24
  import subprocess
25
  from contextlib import asynccontextmanager
 
76
  redoc_url = "/redoc" if os.getenv("DOCS_ENABLED", "true").lower() == "true" else None,
77
  )
78
 
79
+ # ── Rate limiting ─────────────────────────────────────────────────────────────
80
+ app.state.limiter = limiter
81
+ app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
82
+
83
  _origins = [o.strip() for o in os.getenv("ALLOWED_ORIGINS", "*").split(",") if o.strip()]
84
  app.add_middleware(CORSMiddleware, allow_origins=_origins,
85
  allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
requirements.txt CHANGED
@@ -14,4 +14,6 @@ numpy==1.26.4
14
  unstructured[paddlepaddle]
15
  paddleocr==2.7.3
16
  paddlepaddle==2.6.2
17
- pymupdf==1.27.2
 
 
 
14
  unstructured[paddlepaddle]
15
  paddleocr==2.7.3
16
  paddlepaddle==2.6.2
17
+ pymupdf==1.27.2
18
+ slowapi
19
+ limits
supabase/migrations/0003_rerank_feedback.sql ADDED
@@ -0,0 +1,39 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ -- Rerank feedback table for CrossEncoder distillation loop.
2
+ -- Every Cohere rerank result is logged here.
3
+ -- Accumulate ~500+ rows, then fine-tune the local CrossEncoder on this data
4
+ -- to reduce Cohere dependency over time.
5
+ --
6
+ -- Pipeline: Cohere rerank β†’ log all scores β†’ was_selected = true if chunk
7
+ -- passed the RELEVANCE_THRESHOLD and made it into the final retrieved set.
8
+
9
+ CREATE TABLE IF NOT EXISTS public.rerank_feedback (
10
+ id bigserial PRIMARY KEY,
11
+ user_id uuid,
12
+ query_hash text NOT NULL, -- MD5 of query (no PII storage)
13
+ chunk_id uuid, -- references documents.id when available
14
+ chunk_hash text NOT NULL, -- MD5 of chunk content (dedup key)
15
+ document_type text, -- category of the chunk's document
16
+ cohere_score real NOT NULL,
17
+ was_selected boolean NOT NULL, -- true = passed threshold, went to LLM
18
+ created_at timestamptz NOT NULL DEFAULT now()
19
+ );
20
+
21
+ -- For distillation queries: fetch all rows for a user, ordered by time
22
+ CREATE INDEX IF NOT EXISTS rerank_feedback_user_created_idx
23
+ ON public.rerank_feedback (user_id, created_at DESC);
24
+
25
+ -- For analytics: filter by document_type to see per-category rerank quality
26
+ CREATE INDEX IF NOT EXISTS rerank_feedback_doc_type_idx
27
+ ON public.rerank_feedback (document_type);
28
+
29
+ -- RLS: service role writes (backend), users can read their own rows
30
+ ALTER TABLE public.rerank_feedback ENABLE ROW LEVEL SECURITY;
31
+
32
+ DROP POLICY IF EXISTS rerank_feedback_select_own ON public.rerank_feedback;
33
+ CREATE POLICY rerank_feedback_select_own
34
+ ON public.rerank_feedback
35
+ FOR SELECT
36
+ USING (user_id = auth.uid());
37
+
38
+ -- Backend writes via service role β€” no INSERT policy needed for anon/user role
39
+ -- The pipeline uses _build_service_supabase_client() for this table.