"""Token-aware operations: counting + priority-based truncation.
Uses tiktoken when available (cl100k_base encoder approximates Qwen tokens
within ~3% on natural code/prose). Falls back to a 3.6 chars-per-token
estimator otherwise.
"""
from __future__ import annotations
from typing import List, Sequence

_ENC = None


def _get_encoder():
    """Return a cached tiktoken encoder, or False when tiktoken is unavailable."""
    global _ENC
    if _ENC is not None:
        return _ENC
    try:
        import tiktoken
        _ENC = tiktoken.get_encoding("cl100k_base")
    except Exception:
        # Cache the failure so the import is not retried on every call.
        _ENC = False
    return _ENC


def count_tokens(text: str) -> int:
    """Best-effort token count for the configured encoder."""
    if not text:
        return 0
    enc = _get_encoder()
    if enc:
        return len(enc.encode(text, disallowed_special=()))
    # Heuristic: ~3.6 chars/token on mixed code+prose.
    return max(1, int(len(text) / 3.6))


def truncate_to(text: str, max_tokens: int) -> str:
    """Drop trailing tokens to fit a budget."""
    if max_tokens <= 0:
        return ""
    enc = _get_encoder()
    if enc:
        toks = enc.encode(text, disallowed_special=())
        if len(toks) <= max_tokens:
            return text
        return enc.decode(toks[:max_tokens])
    # Heuristic fallback: ~3.6 chars/token.
    chars = int(max_tokens * 3.6)
    return text[:chars]


def fit_priority(
    items: Sequence[tuple[str, int]],  # (text, priority); lower priority value is included first
    max_tokens: int,
) -> str:
    """Pack texts in priority order until the budget is exhausted; truncate the last fitting one.

    Note: the blank-line separators inserted between packed texts are not
    counted against the budget, so output can run a few tokens over.
    """
    out: List[str] = []
    used = 0
    sorted_items = sorted(items, key=lambda t: t[1])
    for text, _prio in sorted_items:
        n = count_tokens(text)
        if used + n <= max_tokens:
            out.append(text)
            used += n
        else:
            remaining = max_tokens - used
            # Truncate only when a meaningful chunk (>32 tokens) still fits.
            if remaining > 32:
                out.append(truncate_to(text, remaining))
                used = max_tokens
            break
    return "\n\n".join(out)
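

# --- Usage sketch (illustrative; not part of the original module) ---
# The snippet texts and the 64-token budget below are made-up values,
# chosen so the low-priority item overflows and gets truncated.
if __name__ == "__main__":
    snippets = [
        ("import os\nimport sys", 0),      # highest priority: packed first
        ("def main() -> None: ...", 1),
        ("# appendix notes " * 200, 5),    # large, low priority: truncated or dropped
    ]
    packed = fit_priority(snippets, max_tokens=64)
    print(f"packed {count_tokens(packed)} tokens (budget 64)")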