briefing-32 / rank.py
mukunda1729's picture
Upload 9 files
9884451 verified
"""Two-pass ranker on a ≤32B open-weight model via HF Inference Providers.
Pass 1: cheap relevance filter — for each item, "is this AI news worth a
senior engineer's two minutes?" Yes/no.
Pass 2: structured 0-10 ranking on the survivors. Surfaces the top items.
The down-port story for Build Small: the production ai-news-agent runs a
single 70B-Groq scoring pass over the full batch. That works but it spends
70B-class budget on items that are obviously noise (HN posts about
non-AI scams that hit the AI keyword set). At 32B we split the work — a
cheap binary filter first to drop obvious junk, then a graded score on the
real candidates. Same end signal, half the prompt tokens at the expensive
step.
"""
from __future__ import annotations

import json
import os
import time
from dataclasses import dataclass, field

import httpx

from config import DEFAULT_BASE_URL, DEFAULT_MODEL, MIN_RELEVANCE
# ---------------------------------------------------------------------------
# Provider client
# ---------------------------------------------------------------------------
@dataclass
class RankerConfig:
    """Connection settings for the chat-completions provider.

    Attributes:
        base_url: Root URL the HTTP client is pointed at.
        model: Model identifier sent in every request payload.
        api_key: Bearer token. When left blank, the client falls back to the
            ``HF_TOKEN`` / ``HUGGINGFACE_TOKEN`` environment variables at
            call time.
        timeout: Timeout handed to the HTTP client (httpx reads a float as
            seconds).
    """

    base_url: str = DEFAULT_BASE_URL
    model: str = DEFAULT_MODEL
    # repr=False keeps the secret out of the generated __repr__, so printing
    # or logging a config (or a traceback that shows it) never leaks the token.
    api_key: str = field(default="", repr=False)
    timeout: float = 90.0
def _client(cfg: RankerConfig) -> httpx.Client:
    """Build an authenticated ``httpx.Client`` for the configured provider.

    The bearer token is looked up in this order: ``cfg.api_key``, then the
    ``HF_TOKEN`` environment variable, then ``HUGGINGFACE_TOKEN``. The first
    non-empty value wins.

    Raises:
        RuntimeError: if none of the three sources yields a token.
    """
    token = cfg.api_key
    if not token:
        token = os.environ.get("HF_TOKEN")
    if not token:
        token = os.environ.get("HUGGINGFACE_TOKEN", "")
    if not token:
        raise RuntimeError(
            "HF_TOKEN missing — set it in the environment or pass api_key= explicitly."
        )

    auth_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    return httpx.Client(base_url=cfg.base_url, timeout=cfg.timeout, headers=auth_headers)
def _chat(cfg: RankerConfig, system: str, user: str, *, json_mode: bool = True,
          temperature: float = 0.2, max_tokens: int = 4000) -> str:
    """Send one system + user exchange to ``/chat/completions`` and return the reply.

    Args:
        cfg: Provider settings (model name, base URL, credentials, timeout).
        system: Content of the system message.
        user: Content of the user message.
        json_mode: When true, ask the provider for a JSON object by setting
            ``response_format`` to ``{"type": "json_object"}``.
        temperature: Sampling temperature forwarded in the payload.
        max_tokens: Completion token cap forwarded in the payload.

    Returns:
        The ``content`` of the first choice's message.

    Raises:
        Whatever ``raise_for_status`` raises for an error HTTP status, plus
        ``RuntimeError`` from the client factory when no token is available.
    """
    conversation = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
    body = {
        "model": cfg.model,
        "messages": conversation,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    if json_mode:
        body["response_format"] = {"type": "json_object"}

    # A fresh client per call; the context manager closes it on the way out.
    with _client(cfg) as http:
        response = http.post("/chat/completions", json=body)
        response.raise_for_status()
        reply = response.json()
    return reply["choices"][0]["message"]["content"]
# ---------------------------------------------------------------------------
# Pass 1 — binary relevance filter
# ---------------------------------------------------------------------------
# System message for pass 1: pins the model to JSON-only classification output.
_FILTER_SYSTEM = "You are a precise JSON-only classifier. No prose."
# User prompt template for pass 1. It is rendered with
# str.format(items_json=...), so the literal JSON braces in the example are
# doubled ({{ }}) and {items_json} is the only real placeholder.
_FILTER_PROMPT = """You are pre-filtering items for a 2-hour AI-news briefing for a senior AI engineer.
Mark each item KEEP if it is AI/ML news that a senior engineer would care about (model releases, capability shifts, key research, important industry moves, notable benchmarks, infrastructure changes). Mark DROP if it is noise, off-topic, hype-with-no-substance, repeat news from earlier today, or non-AI items.
Return JSON only:
{{"verdicts": [{{"i": 0, "v": "KEEP"}}, {{"i": 1, "v": "DROP"}}, ...]}}
Items:
{items_json}
"""
def filter_relevant(items: list[dict], cfg: RankerConfig | None = None) -> list[dict]:
    """Pass 1 — drop obvious noise and return the items that survived.

    Each item is reduced to its position, its ``source`` and the first 200
    characters of its ``title``; the whole batch goes to the model in a single
    call, and the items whose verdict is ``KEEP`` come back in their original
    order.

    The filter fails open: when the reply cannot be parsed, or parses but
    carries no usable verdict list, every item is kept instead of the whole
    batch being silently discarded.

    Args:
        items: Candidate items; only the ``source`` and ``title`` keys are read.
        cfg: Provider settings; a default ``RankerConfig`` is used when omitted.

    Returns:
        The subset of ``items`` marked KEEP (the same dict objects, not copies).
    """
    if not items:
        return []
    cfg = cfg or RankerConfig()
    indexed = [
        {"i": i, "source": it.get("source", ""), "title": (it.get("title") or "")[:200]}
        for i, it in enumerate(items)
    ]
    raw = _chat(
        cfg,
        _FILTER_SYSTEM,
        _FILTER_PROMPT.format(items_json=json.dumps(indexed, ensure_ascii=False)),
    )
    try:
        verdicts = json.loads(raw)["verdicts"]
        # A reply that parses but holds no verdicts is as unusable as one that
        # does not parse at all; route it through the same keep-all fallback.
        if not isinstance(verdicts, list) or not verdicts:
            raise ValueError("response carried no verdicts")
        keep = {
            # int() tolerates models that echo the index back as a string.
            int(entry["i"])
            for entry in verdicts
            # Normalise case/whitespace so "keep" or "KEEP " still counts.
            if str(entry.get("v", "")).strip().upper() == "KEEP"
        }
    except Exception as e:
        # Deliberately broad: this is a best-effort boundary around untrusted
        # model output, and any malformed shape must fall back to keep-all.
        print(f"[filter] parse failed, keeping all: {e}")
        keep = set(range(len(items)))
    return [item for i, item in enumerate(items) if i in keep]
# ---------------------------------------------------------------------------
# Pass 2 — graded ranker
# ---------------------------------------------------------------------------
# System message for pass 2: pins the model to JSON-only scoring output.
_RANKER_SYSTEM = "You are a precise JSON-only scorer. No prose."
# User prompt template for pass 2. It is rendered with
# str.format(items_json=...), so the literal JSON braces in the example are
# doubled ({{ }}) and {items_json} is the only real placeholder.
_RANKER_PROMPT = """You are an AI-news editor scoring items for a 2-hour briefing for a senior AI engineer.
Score each item 0-10 on importance and novelty. High scores (8-10) = major model releases, significant research breakthroughs, capability shifts, key industry moves, notable benchmarks. Medium (5-7) = relevant but smaller updates, useful tools, interesting research. Low (0-4) = noise, hype with no substance, repackaged news, off-topic.
Return JSON only:
{{"scores": [{{"i": 0, "score": 8, "reason": "short why"}}, ...]}}
Items:
{items_json}
"""
def rank_items(items: list[dict], cfg: RankerConfig | None = None) -> list[dict]:
    """Pass 2 — graded score 0-10. Items below MIN_RELEVANCE are dropped.

    Each item is reduced to its position, its ``source`` and the first 200
    characters of its ``title``; the batch is scored by the model in a single
    call. Scores are coerced to integers and clamped to the 0-10 range the
    prompt asks for.

    Fallbacks:
        * If the reply as a whole cannot be parsed, every item gets score 5
          with reason ``"parse error"``.
        * If a single entry is malformed (missing key, non-numeric score,
          bad index) only that entry is skipped; its item, like any item the
          model did not mention, gets score 5 with an empty reason.

    Args:
        items: Candidate items; only the ``source`` and ``title`` keys are read.
        cfg: Provider settings; a default ``RankerConfig`` is used when omitted.

    Returns:
        Copies of the surviving items with ``score`` and ``reason`` added,
        sorted by score in descending order (ties keep input order). The
        input dicts are not mutated.
    """
    if not items:
        return []
    cfg = cfg or RankerConfig()
    indexed = [
        {"i": i, "source": it.get("source", ""), "title": (it.get("title") or "")[:200]}
        for i, it in enumerate(items)
    ]
    raw = _chat(
        cfg,
        _RANKER_SYSTEM,
        _RANKER_PROMPT.format(items_json=json.dumps(indexed, ensure_ascii=False)),
    )
    try:
        entries = json.loads(raw).get("scores", [])
        if not isinstance(entries, list):
            raise TypeError(f"'scores' is {type(entries).__name__}, expected a list")
    except Exception as e:
        # Deliberately broad: best-effort boundary around untrusted model output.
        print(f"[rank] parse failed, defaulting all to 5: {e}")
        score_map = {i: (5, "parse error") for i in range(len(items))}
    else:
        score_map = {}
        for entry in entries:
            try:
                # int() tolerates an index echoed back as a string.
                idx = int(entry["i"])
                # float() first so "8.5" or 8.5 truncates to 8 instead of
                # raising; then clamp to the documented 0-10 scale.
                score = max(0, min(10, int(float(entry["score"]))))
                reason = entry.get("reason", "")
            except (KeyError, TypeError, ValueError, OverflowError, AttributeError):
                # One bad entry must not throw away every other valid score.
                continue
            score_map[idx] = (score, reason)

    out: list[dict] = []
    for i, item in enumerate(items):
        score, reason = score_map.get(i, (5, ""))
        if score >= MIN_RELEVANCE:
            out.append({**item, "score": score, "reason": reason})
    out.sort(key=lambda x: x["score"], reverse=True)
    return out
# ---------------------------------------------------------------------------
# Combined pipeline
# ---------------------------------------------------------------------------
@dataclass
class RankResult:
    """Outcome of one filter-then-rank run: stage counts, final items, timings."""

    # How many items were present at each stage of the pipeline.
    raw_count: int
    after_filter: int
    after_rank: int

    # The items returned by the ranking pass.
    items: list[dict]

    # Elapsed time of each stage, as differences of time.perf_counter() readings.
    filter_latency: float
    rank_latency: float
def rank_pipeline(items: list[dict], cfg: RankerConfig | None = None) -> RankResult:
    """Run the relevance filter, then the graded ranker, timing each stage.

    Args:
        items: Raw candidate items.
        cfg: Provider settings shared by both passes; a default
            ``RankerConfig`` is created when omitted.

    Returns:
        A ``RankResult`` holding the item count before filtering, after
        filtering and after ranking, the ranked items, and the elapsed
        ``time.perf_counter()`` time of each pass.
    """
    settings = cfg or RankerConfig()

    started = time.perf_counter()
    survivors = filter_relevant(items, settings)
    filter_done = time.perf_counter()
    scored = rank_items(survivors, settings)
    rank_done = time.perf_counter()

    return RankResult(
        raw_count=len(items),
        after_filter=len(survivors),
        after_rank=len(scored),
        items=scored,
        filter_latency=filter_done - started,
        rank_latency=rank_done - filter_done,
    )