| """LREC 2026 LLM-as-Annotator — FastAPI backend. |
| |
| Corpus-centered SPA: the client (static/index.html) is the entire UI. This |
| file exposes a small REST API and a tiny in-memory session store. State is |
| ephemeral and per-process; perfect for a single-user demo or HF Space. |
| """ |
| from __future__ import annotations |
| from copy import deepcopy |
|
|
| import asyncio |
| import os |
| from typing import Any, Optional |
|
|
| from fastapi import FastAPI, HTTPException, Header |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse |
| from fastapi.staticfiles import StaticFiles |
| from pydantic import BaseModel |
|
|
| from paths import APP_DIR, TUTORIAL_HANDOUTS_DIR, read_text |
| from schemas import ( |
| AnnotationSchema, |
| from_preset, list_presets, to_json_schema, |
| validate as schema_validate, schema_from_dict, AGGREGATORS, |
| ) |
| from prompts import ( |
| DEFAULT_SYSTEM_PROMPT, DEFAULT_FEW_SHOT, |
| ICLPool, ICLExample, render_prompt, |
| ) |
| from io_utils import ( |
| tokenize, align_or_warn, |
| export_tsv, export_conllu, export_jsonl_finetune, |
| ) |
| from moe import aggregate |
| from provider import ( |
| LLMClient, |
| PROVIDERS, |
| BASE_URLS, |
| CURATED_MODELS_BY_PROVIDER, |
| test_connection_sync |
| ) |
| from tutorial import EXERCISES, prefill |
|
|
# Directory of the client files; index.html is served from here and the
# whole directory is mounted under /static further down.
STATIC_DIR = APP_DIR / "static"
|
|
|
|
| |
| |
| |
|
|
def _default_schema() -> AnnotationSchema:
    """Return the schema a fresh session starts with (the ``ud_upos_morph`` preset)."""
    preset_key = "ud_upos_morph"
    return from_preset(preset_key)
|
|
|
|
# Provider name -> API key taken from the environment, read once at import
# time. A provider whose variable is unset maps to the empty string.
ENV_API_KEYS = {
    provider: os.environ.get(env_var, "")
    for provider, env_var in (
        ("openrouter", "OPENROUTER_API_KEY"),
        ("mistral", "MISTRAL_API_KEY"),
        ("openai", "OPENAI_API_KEY"),
        ("ilaas", "ILAAS_API_KEY"),
    )
}
|
|
|
|
def _resolve_key(provider: str, header_key: Optional[str]) -> str:
    """Choose the API key to use for ``provider``.

    A non-blank key sent with the request wins (surrounding whitespace is
    stripped); otherwise the environment key for that provider is returned,
    or "" when none is configured.
    """
    from_header = header_key.strip() if header_key else ""
    if from_header:
        return from_header
    return ENV_API_KEYS.get(provider, "")
|
|
|
|
# The single, process-wide session. Every endpoint reads and mutates this
# dict in place; nothing is persisted.
SESSION: dict[str, Any] = {
    # Annotation task
    "schema": _default_schema().to_dict(),
    "language": "",
    # Prompting
    "system_prompt": DEFAULT_SYSTEM_PROMPT,
    "user_template": DEFAULT_FEW_SHOT,
    # Model selection: starts on OpenRouter with its first curated model
    "provider": "openrouter",
    "models": list(CURATED_MODELS_BY_PROVIDER["openrouter"][:1]),
    "priority": [],
    "temperature": 0.0,
    # In-context examples
    "n_icl": 5,
    "icl_pool": ICLPool(),
    # Corpus (list of sentence records, see _new_sentence)
    "sentences": [],
    # Last user prompt rendered for an annotation call; reused by the JSONL export
    "rendered_user_cache": "",
}
|
|
|
|
def _new_sentence(idx: int, surface_tokens: list[str], *, sentence_id: str = "", language: str = "") -> dict:
    """Build the record for a sentence that has not been annotated yet.

    Each token starts with only its ``surface``; the status is "pending" and
    all result fields are empty. When ``sentence_id`` is empty the id defaults
    to ``s<idx + 1>``.
    """
    record: dict = {
        "idx": idx,
        "id": sentence_id if sentence_id else f"s{idx + 1}",
        "language": language,
    }
    record["tokens"] = [{"surface": form} for form in surface_tokens]
    record.update(
        per_model={},
        disagreements=[],
        status="pending",
        error="",
        n_disagreements=0,
        validated=False,
    )
    return record
|
|
|
|
def _public_state() -> dict:
    """Assemble the complete session snapshot sent to the client.

    API keys are never included: the only key-related field is the boolean
    ``has_env_key`` for the current provider.
    """
    pool: ICLPool = SESSION["icl_pool"]
    current_provider = SESSION["provider"]
    schema = schema_from_dict(SESSION["schema"])

    # One summary row per pool entry; the preview shows at most 8 tokens.
    pool_rows = []
    for position, entry in enumerate(pool.entries):
        preview = " ".join(entry.tokens[:8])
        if len(entry.tokens) > 8:
            preview += "…"
        pool_rows.append({
            "idx": position,
            "language": entry.language,
            "schema_hash": entry.schema_hash,
            "source": entry.source,
            "preview": preview,
        })

    exercise_rows = []
    for position, ex in enumerate(EXERCISES):
        exercise_rows.append({
            "idx": position,
            "title": ex.title,
            "summary": ex.summary,
            "language": ex.language_code,
            "models": ex.models,
        })

    return {
        "schema": SESSION["schema"],
        "schema_hash": schema.hash(),
        "json_schema": to_json_schema(schema),
        "language": SESSION["language"],
        "system_prompt": SESSION["system_prompt"],
        "user_template": SESSION["user_template"],
        "has_env_key": bool(ENV_API_KEYS.get(current_provider, "")),
        "models": SESSION["models"],
        "priority": SESSION["priority"],
        "temperature": SESSION["temperature"],
        "n_icl": SESSION["n_icl"],
        "icl_pool": {
            "version": pool.version,
            "size": len(pool.entries),
            "entries": pool_rows,
        },
        "sentences": SESSION["sentences"],
        "presets": [{"key": key, "label": label} for key, label in list_presets()],
        "provider": current_provider,
        "providers": list(PROVIDERS),
        "curated_models": CURATED_MODELS_BY_PROVIDER.get(current_provider, []),
        "curated_models_by_provider": CURATED_MODELS_BY_PROVIDER,
        "aggregators": AGGREGATORS,
        "exercises": exercise_rows,
    }
|
|
|
|
| |
| |
| |
|
|
class TaskPresetReq(BaseModel):
    """Body of POST /api/task/preset."""

    # Preset key handed to from_preset().
    key: str
|
|
|
|
class TaskSchemaReq(BaseModel):
    """Body of POST /api/task/schema."""

    # Schema in dict form, parsed with schema_from_dict().
    annotation_schema: dict
|
|
|
|
class LoadPasteReq(BaseModel):
    """Body of POST /api/corpus/paste."""

    # Raw pasted text.
    text: str
    # Passed to tokenize() as its ``strategy``.
    tokenizer: str = "whitespace"
    # Language label for the new sentences; also becomes the session language when non-empty.
    language: str = ""
    # True: each non-blank line is one sentence. False: the whole text is one sentence.
    split_per_line: bool = True
|
|
|
|
class LoadExerciseReq(BaseModel):
    """Body of POST /api/corpus/exercise."""

    # Position of the exercise in EXERCISES.
    idx: int
|
|
|
|
class SettingsReq(BaseModel):
    """Body of POST /api/settings.

    Every field is optional; a field left as ``None`` keeps the current
    session value.
    """

    # Model selection
    provider: Optional[str] = None
    models: Optional[list[str]] = None
    priority: Optional[list[str]] = None
    # Generation / prompting
    temperature: Optional[float] = None
    n_icl: Optional[int] = None
    system_prompt: Optional[str] = None
    user_template: Optional[str] = None
    language: Optional[str] = None
|
|
|
|
class TokenUpdateReq(BaseModel):
    """Body of POST /api/sentence/{idx}/token/{tidx}."""

    # Replacement token annotation; any "surface" it carries is overridden by the stored one.
    token: dict
|
|
|
|
class AnnotateReq(BaseModel):
    """Body of POST /api/annotate."""

    # Sentence positions to annotate; None means every sentence in the session.
    sentence_idxs: Optional[list[int]] = None
|
|
|
|
class TestKeyReq(BaseModel):
    """Body of POST /api/settings/test_key."""

    # Key to try.
    api_key: str
    # Provider to try it against; a missing/empty value is treated as "openrouter".
    provider: Optional[str] = "openrouter"
    # Forwarded to test_connection_sync() as ``model``.
    model: Optional[str] = None
|
|
|
|
| |
| |
| |
|
|
app = FastAPI(title="LREC 2026 — LLM-as-Annotator")

# CORS is left fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
@app.get("/")
def index():
    """Serve the client page (``index.html`` from the static directory)."""
    page = STATIC_DIR / "index.html"
    return FileResponse(page)
|
|
|
|
@app.get("/api/state")
def get_state():
    """Return the current session snapshot."""
    snapshot = _public_state()
    return snapshot
|
|
|
|
@app.get("/api/cheatsheet", response_class=PlainTextResponse)
def cheatsheet():
    """Return the participant cheatsheet as plain text.

    Best effort: any failure while reading the handout yields a fixed
    placeholder string instead of an error response.
    """
    try:
        handout = TUTORIAL_HANDOUTS_DIR / "participant_cheatsheet.md"
        content = read_text(handout)
    except Exception:
        content = "(cheatsheet not found)"
    return content
|
|
|
|
| |
|
|
@app.post("/api/task/preset")
def set_task_preset(req: TaskPresetReq):
    """Switch the session to one of the built-in schema presets.

    Returns the updated session snapshot.

    Raises:
        HTTPException: 400 when the preset key cannot be resolved.
    """
    try:
        schema = from_preset(req.key)
    except (KeyError, ValueError) as e:
        # NOTE(review): assumes from_preset signals an unknown key with
        # KeyError/ValueError — confirm against schemas.from_preset. Reported
        # as a client error, like an invalid schema in /api/task/schema.
        raise HTTPException(400, f"Unknown preset {req.key!r}") from e
    SESSION["schema"] = schema.to_dict()
    return _public_state()
|
|
|
|
@app.post("/api/task/schema")
def set_task_schema(req: TaskSchemaReq):
    """Install a client-supplied schema and return the updated session snapshot.

    The schema must both parse and convert to a JSON schema; anything raised
    by either step is reported as a 400 "Invalid schema" error.
    """
    try:
        parsed = schema_from_dict(req.annotation_schema)
        # Result unused: the conversion is run only as a validity check.
        to_json_schema(parsed)
    except Exception as e:
        raise HTTPException(400, f"Invalid schema: {e}")
    else:
        SESSION["schema"] = parsed.to_dict()
    return _public_state()
|
|
|
|
| |
|
|
@app.post("/api/settings")
def set_settings(req: SettingsReq):
    """Apply the settings present in the request; ``None`` fields are left alone.

    Switching to a different provider resets the model list to that
    provider's first curated model (an explicit ``models`` in the same
    request is applied afterwards). Providers other than OpenRouter keep at
    most one model.

    Raises:
        HTTPException: 400 for a provider that is not in PROVIDERS.
    """
    new_provider = req.provider
    if new_provider is not None:
        if new_provider not in PROVIDERS:
            raise HTTPException(400, f"Unknown provider {req.provider!r}; expected one of {list(PROVIDERS)}")
        if new_provider != SESSION["provider"]:
            SESSION["provider"] = new_provider
            curated_for_new = CURATED_MODELS_BY_PROVIDER.get(new_provider) or []
            SESSION["models"] = list(curated_for_new[:1])

    if req.models is not None:
        chosen = list(req.models)
        keep_all = SESSION["provider"] == "openrouter" or len(chosen) <= 1
        SESSION["models"] = chosen if keep_all else chosen[:1]

    # Remaining fields are copied as-is, optionally through a converter.
    plain_fields = (
        ("priority", list),
        ("temperature", float),
        ("n_icl", int),
        ("system_prompt", None),
        ("user_template", None),
        ("language", None),
    )
    for name, convert in plain_fields:
        value = getattr(req, name)
        if value is None:
            continue
        SESSION[name] = value if convert is None else convert(value)

    return _public_state()
|
|
|
|
@app.post("/api/settings/test_key")
def test_key(req: TestKeyReq):
    """Try an API key against a provider and report ``{"ok", "message"}``.

    A missing or empty provider is treated as "openrouter".
    """
    provider = req.provider if req.provider else "openrouter"
    outcome = test_connection_sync(req.api_key, provider=provider, model=req.model)
    ok, msg = outcome
    return {"ok": ok, "message": msg}
|
|
|
|
| |
|
|
@app.post("/api/corpus/paste")
def load_paste(req: LoadPasteReq):
    """Replace the corpus with sentences tokenized from pasted text.

    With ``split_per_line`` every non-blank line becomes one sentence;
    otherwise the whole text is a single sentence. Lines that produce no
    tokens are dropped. A non-empty ``language`` also becomes the session
    language.

    Returns the updated session snapshot.
    """
    text = req.text or ""
    if req.split_per_line:
        lines = [ln for ln in text.splitlines() if ln.strip()]
    else:
        lines = [text]

    sentences: list[dict] = []
    for line in lines:
        toks = tokenize(line, strategy=req.tokenizer)
        if not toks:
            continue
        # Number by position in the stored list, not by input line: every
        # endpoint addresses sentences by list position, so a skipped line
        # must not leave a gap in idx (or in the derived "s<n>" id).
        sentences.append(_new_sentence(len(sentences), toks, language=req.language))

    SESSION["sentences"] = sentences
    if req.language:
        SESSION["language"] = req.language
    return _public_state()
|
|
|
|
def _default_models_for_provider(provider: str) -> list[str]:
    """Return a new list with the provider's first curated model.

    The list is empty when the provider is unknown or has no curated models.
    """
    curated = CURATED_MODELS_BY_PROVIDER.get(provider)
    if not curated:
        return []
    return list(curated[:1])
|
|
|
|
@app.post("/api/corpus/exercise")
def load_exercise(req: LoadExerciseReq):
    """Load a tutorial exercise into the session and return the new snapshot.

    Schema, language, prompts and ICL pool are replaced by the exercise's
    prefill. The exercise's model list is used only on OpenRouter; other
    providers fall back to their default model. The corpus becomes the
    exercise sentence plus up to two more taken from the training file.

    Raises:
        HTTPException: 404 when ``req.idx`` is not a valid exercise position.
    """
    if not 0 <= req.idx < len(EXERCISES):
        raise HTTPException(404, "Unknown exercise idx")

    data = prefill(req.idx)
    preset = from_preset(data["preset_key"])
    language_name = data["language_name"]
    SESSION["schema"] = preset.to_dict()
    SESSION["language"] = language_name
    SESSION["user_template"] = data["user_template"]
    SESSION["system_prompt"] = data["system_prompt"]

    on_openrouter = SESSION["provider"] == "openrouter"
    SESSION["models"] = (
        list(data["models"]) if on_openrouter
        else _default_models_for_provider(SESSION["provider"])
    )

    fresh_pool = ICLPool()
    for example in data["icl_examples"]:
        fresh_pool.add(example)
    SESSION["icl_pool"] = fresh_pool

    from io_utils import read_sandbox_tsv, sandbox_sentence
    from paths import corpus_file
    rows = read_sandbox_tsv(corpus_file(data["language_code"], "train"), max_rows=2000)

    first_tokens = data["tokens"]
    width = len(first_tokens)
    sentences = [_new_sentence(0, first_tokens, language=language_name)]
    for k in (1, 2):
        # Extra sentences are requested at fixed offsets, with the same
        # length as the exercise sentence.
        offset = k * (width + 5) + 500
        extra_tokens, _gold = sandbox_sentence(rows, offset, width)
        if extra_tokens:
            sentences.append(_new_sentence(len(sentences), extra_tokens, language=language_name))
    SESSION["sentences"] = sentences
    return _public_state()
|
|
|
|
@app.post("/api/corpus/clear")
def clear_corpus():
    """Drop every sentence from the session; all other settings are kept."""
    SESSION.update(sentences=[])
    return _public_state()
|
|
|
|
@app.post("/api/reset")
def reset_all():
    """Put every session field back to its start-up default.

    The API key is not affected: it is held by the client, not in the session.
    """
    defaults = {
        "schema": _default_schema().to_dict(),
        "language": "",
        "provider": "openrouter",
        "models": _default_models_for_provider("openrouter"),
        "priority": [],
        "temperature": 0.0,
        "n_icl": 5,
        "icl_pool": ICLPool(),
        "sentences": [],
        "system_prompt": DEFAULT_SYSTEM_PROMPT,
        "user_template": DEFAULT_FEW_SHOT,
        "rendered_user_cache": "",
    }
    SESSION.update(defaults)
    return _public_state()
|
|
|
|
| |
def _add_or_update_sentence_in_icl(idx: int) -> str:
    """Push sentence ``idx`` into the ICL pool as a corrected example.

    A deep copy of the tokens is stored, so later edits to the sentence do
    not alter the pool entry. The sentence is marked validated afterwards.

    Returns whatever ``pool.add`` reports; callers compare it against
    "inserted", "updated" and "unchanged".

    Raises:
        HTTPException: 404 when ``idx`` is out of range.
    """
    sents = SESSION["sentences"]
    if not 0 <= idx < len(sents):
        raise HTTPException(404, "Bad sentence idx")
    sent = sents[idx]

    schema_obj = schema_from_dict(SESSION["schema"])
    pool: ICLPool = SESSION["icl_pool"]
    frozen_tokens = deepcopy(sent["tokens"])
    # The sentence's own language wins over the session language.
    language = sent["language"] or SESSION["language"]

    example = ICLExample(
        language=language or "",
        schema_hash=schema_obj.hash(),
        tokens=[tok["surface"] for tok in frozen_tokens],
        gold_annotation={
            "sentence_id": sent["id"],
            "language": language,
            "tokens": frozen_tokens,
        },
        source="corrected",
    )
    outcome = pool.add(example)

    sent["validated"] = True
    return outcome
|
|
|
|
@app.post("/api/sentence/{idx}/token/{tidx}")
def update_token(idx: int, tidx: int, req: TokenUpdateReq):
    """Replace one token's annotation with the client's edit.

    The stored surface form is kept whatever the client sent, and every
    recorded disagreement for that token is dropped. If the sentence was
    already validated it is pushed to the ICL pool again.

    Returns the session snapshot plus ``updated_sentence_idx`` and the
    ``icl_*`` fields describing what happened to the pool (None / False when
    the sentence was not validated).

    Raises:
        HTTPException: 404 when ``idx`` or ``tidx`` is out of range.
    """
    sents = SESSION["sentences"]
    if not 0 <= idx < len(sents):
        raise HTTPException(404, "Bad sentence idx")
    sent = sents[idx]
    if not 0 <= tidx < len(sent["tokens"]):
        raise HTTPException(404, "Bad token idx")

    # Read before the edit: the ICL helper below sets the flag itself.
    was_validated = bool(sent.get("validated"))

    replacement = dict(req.token)
    replacement["surface"] = sent["tokens"][tidx]["surface"]
    sent["tokens"][tidx] = replacement

    remaining = [d for d in sent["disagreements"] if d["token_idx"] != tidx]
    sent["disagreements"] = remaining
    sent["n_disagreements"] = len(remaining)

    icl_result = _add_or_update_sentence_in_icl(idx) if was_validated else None

    state = _public_state()
    state.update({
        "updated_sentence_idx": idx,
        "icl_add_result": icl_result,
        "icl_duplicate": icl_result == "unchanged",
        "icl_updated": icl_result == "updated",
        "icl_inserted": icl_result == "inserted",
    })
    return state
|
|
|
|
@app.post("/api/bulk_similar")
def bulk_similar(payload: dict):
    """Propagate field updates to every token with the same `surface` across the corpus.

    payload = {
        "surface": "τῆς",
        "updates": {"pos": "DET", "lemma": "ὁ"},
        "exclude": [{"s": sidx, "t": tidx}, ...]   # optional, e.g. the source token
    }
    Returns:
        {
            "affected": [{"s": sidx, "t": tidx}, ...],
            "sentences": [{"idx": sidx, "sentence": {...}}, ...]
        }

    ``surface`` and keys starting with "_" are never written. Each updated
    token is flagged ``_corrected`` and loses its disagreements on the fields
    that were written.

    Raises:
        HTTPException: 400 when ``surface``/``updates`` are missing, when
            ``updates`` is not an object or holds no writable field, or when
            ``exclude`` is malformed.
    """
    surface = payload.get("surface")
    updates = payload.get("updates") or {}
    if not surface or not updates:
        raise HTTPException(400, "Missing 'surface' or 'updates'.")
    if not isinstance(updates, dict):
        raise HTTPException(400, "'updates' must be an object.")

    # Protected keys are filtered once, up front, so a request that contains
    # only protected keys cannot flag tokens as corrected without changing them.
    writable = {
        k: v for k, v in updates.items()
        if k != "surface" and not k.startswith("_")
    }
    if not writable:
        raise HTTPException(400, "No writable field in 'updates'.")

    try:
        # `or []` also covers an explicit `"exclude": null`.
        exclude = {(int(d["s"]), int(d["t"])) for d in payload.get("exclude") or []}
    except (KeyError, TypeError, ValueError) as e:
        raise HTTPException(400, "'exclude' must be a list of {s, t} objects.") from e

    affected = []
    for sidx, sent in enumerate(SESSION["sentences"]):
        for tidx, tok in enumerate(sent["tokens"]):
            if (sidx, tidx) in exclude or tok.get("surface") != surface:
                continue
            tok.update(writable)
            tok["_corrected"] = True
            # Only disagreements on fields that were actually written are settled.
            sent["disagreements"] = [
                d for d in sent["disagreements"]
                if not (d["token_idx"] == tidx and d["field_path"] in writable)
            ]
            sent["n_disagreements"] = len(sent["disagreements"])
            affected.append({"s": sidx, "t": tidx})

    return {
        "affected": affected,
        "sentences": [
            {"idx": i, "sentence": SESSION["sentences"][i]}
            for i in sorted({a["s"] for a in affected})
        ],
    }
|
|
|
|
@app.post("/api/sentence/{idx}/bulk")
def bulk_update(idx: int, payload: dict):
    """Set one field to the same value on several tokens of a sentence.

    payload = {"token_idxs": [..], "field": "pos", "value": "..."}

    Entries of ``token_idxs`` that are not integers or are out of range are
    ignored. Disagreements on ``field`` for the updated tokens are dropped.
    Returns the updated sentence record.

    Raises:
        HTTPException: 404 when ``idx`` is out of range; 400 when ``field``
            is missing, not a string, or is ``surface``.
    """
    sents = SESSION["sentences"]
    if idx < 0 or idx >= len(sents):
        raise HTTPException(404, "Bad sentence idx")
    field = payload.get("field")
    if not field:
        raise HTTPException(400, "Missing 'field'")
    if not isinstance(field, str):
        raise HTTPException(400, "'field' must be a string")
    if field == "surface":
        # The surface form is the token's identity: the single-token and
        # bulk-similar endpoints never overwrite it, so neither does this one.
        raise HTTPException(400, "'surface' cannot be edited")
    value = payload.get("value")

    sent = sents[idx]
    n_tokens = len(sent["tokens"])
    # `or []` covers an explicit null; non-integer entries are skipped.
    targets = {
        ti for ti in payload.get("token_idxs") or []
        if isinstance(ti, int) and 0 <= ti < n_tokens
    }
    for ti in targets:
        sent["tokens"][ti][field] = value

    # One filtering pass for all updated tokens.
    sent["disagreements"] = [
        d for d in sent["disagreements"]
        if not (d["token_idx"] in targets and d["field_path"] == field)
    ]
    sent["n_disagreements"] = len(sent["disagreements"])
    return sent
|
|
|
|
| |
@app.post("/api/sentence/{idx}/add_to_icl")
def add_sentence_to_icl(idx: int):
    """Add sentence ``idx`` to the ICL pool and report the outcome.

    Returns the session snapshot extended with ``icl_add_result`` and one
    boolean per possible outcome.
    """
    outcome = _add_or_update_sentence_in_icl(idx)
    state = _public_state()
    state.update({
        "icl_add_result": outcome,
        "icl_duplicate": outcome == "unchanged",
        "icl_updated": outcome == "updated",
        "icl_inserted": outcome == "inserted",
    })
    return state
|
|
|
|
@app.post("/api/sentence/{idx}/sent_score")
def set_validated(idx: int, payload: dict):
    """Set the user-validation flag of a sentence.

    payload = {value: bool}; a missing ``value`` counts as True. Returns the
    updated sentence record.

    Raises:
        HTTPException: 404 when ``idx`` is out of range.
    """
    sents = SESSION["sentences"]
    if not 0 <= idx < len(sents):
        raise HTTPException(404, "Bad sentence idx")
    sent = sents[idx]
    sent["validated"] = bool(payload.get("value", True))
    return sent
|
|
|
|
@app.post("/api/icl/clear")
def clear_icl():
    """Replace the ICL pool with an empty one and return the session snapshot."""
    SESSION.update(icl_pool=ICLPool())
    return _public_state()
|
|
|
|
@app.get("/api/icl/download")
def icl_download():
    """Return the ICL pool as JSONL text.

    A trailing newline is appended only when the pool has entries.
    """
    pool: ICLPool = SESSION["icl_pool"]
    body = pool.to_jsonl()
    if pool.entries:
        body += "\n"
    return PlainTextResponse(body, media_type="application/jsonl")
|
|
|
|
| |
|
|
async def _annotate_sentence(sent: dict, client: LLMClient,
                             schema: AnnotationSchema, sys_prompt: str,
                             user_template: str, language: str,
                             pool: ICLPool, n_icl: int, temperature: float,
                             priority: list[str], models: list[str]) -> dict:
    """Annotate one sentence with every selected model and merge the results.

    The sentence record is updated in place and returned. Its status ends as
    "done" when at least one model produced a usable annotation, otherwise
    "error". ``sent["error"]`` collects per-model failures and, on success,
    also the surface-drift warnings. The rendered user prompt is stored in
    ``SESSION["rendered_user_cache"]``.
    """
    surfaces = [tok["surface"] for tok in sent["tokens"]]
    few_shot = pool.sample(
        n=int(n_icl), schema_hash=schema.hash(),
        strategy="most_recent_corrections",
    )
    prompt = render_prompt(
        user_template, schema=schema, tokens=surfaces,
        language=language or sent["language"], sentence_id=sent["id"],
        few_shot_examples=few_shot,
    )
    SESSION["rendered_user_cache"] = prompt
    sent["status"] = "annotating"
    # A new annotation run invalidates any earlier user validation.
    sent["validated"] = False

    results = await client.annotate_many(
        models=models, system=sys_prompt, user=prompt,
        schema=schema, temperature=float(temperature),
    )

    per_model: dict = {}
    errors: list[str] = []
    warnings: list[str] = []
    for res in results:
        if not (res.ok and res.annotation):
            errors.append(f"{res.model}: {(res.error or 'unknown')[:200]}")
            continue
        returned = [t.get("surface", "") for t in res.annotation.get("tokens", [])]
        status, msgs = align_or_warn(surfaces, returned)
        if status == "length_mismatch":
            # Unusable: tokens cannot be paired with the input.
            errors.append(f"{res.model}: {msgs[0]}")
            continue
        if status == "drift":
            # Same length but altered surfaces: restore the input forms.
            for pos, tok in enumerate(res.annotation["tokens"]):
                tok["surface"] = surfaces[pos]
            warnings.append(f"{res.model}: surface drift on {len(msgs)} token(s) — repaired")
        per_model[res.model] = res.annotation

    if not per_model:
        sent["status"] = "error"
        sent["error"] = " | ".join(errors)
        return sent

    # Without an explicit priority, the models that answered are used in order.
    consensus, disagreements = aggregate(per_model, schema, priority=priority or list(per_model.keys()))
    dis_dicts = [d.to_dict() for d in disagreements]
    sent.update(
        tokens=consensus["tokens"],
        per_model=per_model,
        disagreements=dis_dicts,
        n_disagreements=len(dis_dicts),
        status="done",
    )

    notes = []
    if errors:
        notes.append(" | ".join(errors))
    if warnings:
        notes.append("warnings: " + " | ".join(warnings))
    sent["error"] = " · ".join(notes)
    return sent
|
|
|
|
@app.post("/api/annotate")
async def annotate(
    req: AnnotateReq,
    x_api_key: Optional[str] = Header(default=None),
    x_openrouter_key: Optional[str] = Header(default=None),
    x_llm_provider: Optional[str] = Header(default=None),
):
    """Annotate the requested sentences (all of them by default) concurrently.

    The provider comes from the ``x_llm_provider`` header or the session; the
    key from ``x_api_key`` / ``x_openrouter_key`` or the environment.
    Out-of-range and repeated sentence indices are ignored. A sentence whose
    annotation raises is marked with status "error" instead of failing the
    whole request.

    Returns the updated session snapshot.

    Raises:
        HTTPException: 400 for an unknown provider, a missing key, an empty
            model list, several models on a non-OpenRouter provider, or a
            model outside that provider's curated list.
    """
    sess = SESSION
    provider = (x_llm_provider or sess["provider"]).strip()
    if provider not in PROVIDERS:
        raise HTTPException(400, f"Unknown provider {provider!r}")
    api_key = _resolve_key(provider, x_api_key or x_openrouter_key)
    if not api_key:
        raise HTTPException(400, f"Set your {provider} API key first.")
    if not sess["models"]:
        raise HTTPException(400, "Select at least one model.")
    if provider != "openrouter" and len(sess["models"]) > 1:
        raise HTTPException(400,
                            f"MoE (multiple models) is only supported on OpenRouter. Pick one model for {provider}.")
    schema_obj = schema_from_dict(sess["schema"])
    if provider != "openrouter":
        allowed = set(CURATED_MODELS_BY_PROVIDER.get(provider) or [])
        unknown = [m for m in sess["models"] if m not in allowed]
        if unknown:
            raise HTTPException(
                400,
                f"Model(s) not available for provider {provider}: {unknown}. "
                f"Pick one of: {sorted(allowed)}"
            )

    async with LLMClient(provider=provider, api_key=api_key) as client:
        pool: ICLPool = sess["icl_pool"]
        sents = sess["sentences"]
        requested = req.sentence_idxs if req.sentence_idxs is not None else range(len(sents))
        # dict.fromkeys drops repeated indices but keeps their order, so the
        # same sentence record is never mutated by two coroutines at once.
        targets = [sents[i] for i in dict.fromkeys(requested) if 0 <= i < len(sents)]
        for sent in targets:
            sent["status"] = "annotating"

        # return_exceptions=True: one failing sentence must not abort the
        # request and leave the other sentences stuck at "annotating".
        outcomes = await asyncio.gather(
            *(
                _annotate_sentence(
                    sent, client, schema_obj, sess["system_prompt"], sess["user_template"],
                    sess["language"], pool, sess["n_icl"], sess["temperature"],
                    sess["priority"], sess["models"],
                )
                for sent in targets
            ),
            return_exceptions=True,
        )
        for sent, outcome in zip(targets, outcomes):
            if isinstance(outcome, Exception):
                sent["status"] = "error"
                sent["error"] = f"{type(outcome).__name__}: {outcome}"[:200]
            elif isinstance(outcome, BaseException):
                # Cancellation and similar are not per-sentence failures.
                raise outcome
    return _public_state()
|
|
|
|
@app.post("/api/annotate/token")
async def annotate_one_token(
    payload: dict,
    x_api_key: Optional[str] = Header(default=None),
    x_openrouter_key: Optional[str] = Header(default=None),
    x_llm_provider: Optional[str] = Header(default=None),
):
    """Re-ask a specific model for a specific token. payload = {sent: int, tok: int, model: str}

    The whole sentence is sent to the model with a note to focus on the
    token; only that token's annotation is taken from the answer. The stored
    surface form is kept and the token's disagreements are dropped.

    Returns the updated sentence record.

    Raises:
        HTTPException: 400 for an unknown provider, a missing key, a
            malformed payload or a model outside the provider's curated
            list; 404 for an out-of-range sentence or token index; 502 when
            the model call fails or its answer cannot be used.
    """
    sess = SESSION
    provider = (x_llm_provider or sess["provider"]).strip()
    if provider not in PROVIDERS:
        raise HTTPException(400, f"Unknown provider {provider!r}")
    api_key = _resolve_key(provider, x_api_key or x_openrouter_key)
    if not api_key:
        raise HTTPException(400, f"Set your {provider} API key first.")

    # A missing or non-numeric field is a client error, not a server crash.
    try:
        idx = int(payload["sent"])
        tidx = int(payload["tok"])
        model = str(payload["model"])
    except (KeyError, TypeError, ValueError) as e:
        raise HTTPException(400, "Expected payload {sent: int, tok: int, model: str}.") from e

    if idx < 0 or idx >= len(sess["sentences"]):
        raise HTTPException(404, "Bad sentence idx")
    sent = sess["sentences"][idx]
    if tidx < 0 or tidx >= len(sent["tokens"]):
        raise HTTPException(404, "Bad token idx")

    # Reject an unavailable model before spending work on the prompt.
    if provider != "openrouter":
        allowed = set(CURATED_MODELS_BY_PROVIDER.get(provider) or [])
        if model not in allowed:
            raise HTTPException(
                400,
                f"Model {model!r} is not available for provider {provider}. "
                f"Pick one of: {sorted(allowed)}"
            )

    schema = schema_from_dict(sess["schema"])
    tokens = [t["surface"] for t in sent["tokens"]]
    pool: ICLPool = sess["icl_pool"]
    examples = pool.sample(n=int(sess["n_icl"]), schema_hash=schema.hash(), strategy="most_recent_corrections")
    rendered_user = render_prompt(
        sess["user_template"], schema=schema, tokens=tokens,
        language=sess["language"] or sent["language"], sentence_id=sent["id"],
        few_shot_examples=examples,
    ) + f"\n\nFocus especially on token index {tidx} (surface={tokens[tidx]!r}). Return JSON for all tokens; preserve the order."

    async with LLMClient(provider=provider, api_key=api_key) as client:
        result = await client.annotate_one(
            system=sess["system_prompt"],
            user=rendered_user,
            schema=schema,
            model=model,
            temperature=float(sess["temperature"]),
        )
    if not result.ok or not result.annotation:
        raise HTTPException(502, f"{model} failed: {result.error}")
    if tidx >= len(result.annotation.get("tokens", [])):
        raise HTTPException(502, f"{model} returned too few tokens.")

    new_tok = result.annotation["tokens"][tidx]
    if not isinstance(new_tok, dict):
        raise HTTPException(502, f"{model} returned a malformed token.")
    # Whatever the model echoed, the stored surface form is authoritative.
    new_tok["surface"] = tokens[tidx]
    sent["tokens"][tidx] = new_tok
    sent["disagreements"] = [d for d in sent["disagreements"] if d["token_idx"] != tidx]
    sent["n_disagreements"] = len(sent["disagreements"])
    return sent
|
|
|
|
| |
|
|
def _all_annotations() -> list[dict]:
    """Return one export record (id, language, tokens) per session sentence.

    A sentence without its own language falls back to the session language.
    The token lists are the session's own objects, not copies.
    """
    session_language = SESSION["language"]
    return [
        {
            "sentence_id": sent["id"],
            "language": sent["language"] or session_language,
            "tokens": sent["tokens"],
        }
        for sent in SESSION["sentences"]
    ]
|
|
|
|
@app.get("/api/export/{fmt}")
def export(fmt: str):
    """Download all annotations as ``tsv``, ``json``, ``conllu`` or ``jsonl``.

    The format name is case-insensitive. The ``jsonl`` export pairs each
    annotation with the session system prompt and the last rendered user
    prompt.

    Raises:
        HTTPException: 400 for any other format name.
    """
    schema = schema_from_dict(SESSION["schema"])
    records = _all_annotations()
    kind = fmt.lower()

    if kind == "json":
        return JSONResponse(records, headers={"Content-Disposition": "attachment; filename=annotation.json"})

    if kind == "tsv":
        body = "\n".join(export_tsv(rec, schema) for rec in records)
        return PlainTextResponse(body, media_type="text/tab-separated-values",
                                 headers={"Content-Disposition": "attachment; filename=annotation.tsv"})

    if kind == "conllu":
        body = "".join([export_conllu(rec, schema) for rec in records])
        return PlainTextResponse(body, media_type="text/plain",
                                 headers={"Content-Disposition": "attachment; filename=annotation.conllu"})

    if kind == "jsonl":
        last_prompt = SESSION.get("rendered_user_cache", "")
        system_prompt = SESSION["system_prompt"]
        body = "".join([export_jsonl_finetune(rec, system_prompt, last_prompt) for rec in records])
        return PlainTextResponse(body, media_type="application/jsonl",
                                 headers={"Content-Disposition": "attachment; filename=annotation.jsonl"})

    raise HTTPException(400, f"Unknown format: {kind}")
|
|
|
|
| |
|
|
@app.post("/api/sentence/{idx}/validate")
def validate_sentence(idx: int):
    """Check one sentence's current annotation against the session schema.

    Returns ``{"ok": bool, "errors": ...}`` as reported by the schema validator.

    Raises:
        HTTPException: 404 when ``idx`` is out of range.
    """
    sents = SESSION["sentences"]
    # Same guard as the other sentence endpoints: without it a bad index is a
    # 500, and a negative one would silently address from the end of the list.
    if idx < 0 or idx >= len(sents):
        raise HTTPException(404, "Bad sentence idx")
    schema = schema_from_dict(SESSION["schema"])
    sent = sents[idx]
    ann = {
        "sentence_id": sent["id"],
        "language": sent["language"] or SESSION["language"],
        "tokens": sent["tokens"],
    }
    ok, errs = schema_validate(schema, ann)
    return {"ok": ok, "errors": errs}
|
|
|
|
| |
| |
| |
|
|
# Serve the client's files under /static.
app.mount(
    "/static",
    StaticFiles(directory=str(STATIC_DIR)),
    name="static",
)


if __name__ == "__main__":
    # Direct launch: listen on all interfaces, port 7860, without auto-reload.
    import uvicorn

    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=7860,
        reload=False,
    )
|
|