| """Token-aware operations: counting + priority-based truncation. |
| |
| Uses tiktoken when available (cl100k_base encoder approximates Qwen tokens |
| within ~3 % on natural code/prose). Falls back to a 3.6 chars-per-token |
| estimator otherwise. |
| """ |
| from __future__ import annotations |
| from typing import Iterable, List, Sequence |
|
|
|
|
# Memoized tiktoken encoder: None = not yet initialized, False = unavailable.
_ENC = None


def _get_encoder():
    """Return the shared cl100k_base encoder, or False when tiktoken is absent.

    The result is cached in the module-level ``_ENC`` so the import and
    encoder construction happen at most once per process.
    """
    global _ENC
    if _ENC is None:
        try:
            import tiktoken
            _ENC = tiktoken.get_encoding("cl100k_base")
        except Exception:
            # Any failure (missing package, bad data files) disables tiktoken
            # for the rest of the process; callers fall back to estimation.
            _ENC = False
    return _ENC
|
|
|
|
def count_tokens(text: str) -> int:
    """Best-effort token count for the configured encoder."""
    if not text:
        return 0
    encoder = _get_encoder()
    if not encoder:
        # Heuristic fallback: ~3.6 characters per token, never less than one.
        return max(1, int(len(text) / 3.6))
    return len(encoder.encode(text, disallowed_special=()))
|
|
|
|
| def truncate_to(text: str, max_tokens: int) -> str: |
| """Drop trailing tokens to fit a budget.""" |
| if max_tokens <= 0: |
| return "" |
| enc = _get_encoder() |
| if enc: |
| toks = enc.encode(text, disallowed_special=()) |
| if len(toks) <= max_tokens: |
| return text |
| return enc.decode(toks[:max_tokens]) |
| |
| chars = int(max_tokens * 3.6) |
| return text[:chars] |
|
|
|
|
def fit_priority(
    items: Sequence[tuple[str, int]],
    max_tokens: int,
) -> str:
    """Pack texts in priority order until the budget is exhausted.

    Items are ``(text, priority)`` pairs; lower priority values are packed
    first (ties keep their input order, since ``sorted`` is stable). The
    first item that no longer fits is truncated into the remaining budget
    when more than 32 tokens remain; packing then stops so lower-priority
    items never jump ahead of a dropped higher-priority one.

    Bug fixed: the original implementation never counted the ``"\\n\\n"``
    join separators, so the returned string could exceed ``max_tokens`` by
    one token per seam. Separator cost is now charged to the budget.

    Args:
        items: Sequence of ``(text, priority)`` pairs.
        max_tokens: Total token budget for the joined result.

    Returns:
        The selected texts joined with blank lines, within ``max_tokens``
        tokens including separators.
    """
    if max_tokens <= 0:
        return ""
    # Glue between items costs tokens too; charge it per appended item
    # after the first so the joined result cannot overshoot the budget.
    sep_cost = count_tokens("\n\n")
    out: List[str] = []
    used = 0
    for text, _prio in sorted(items, key=lambda t: t[1]):
        n = count_tokens(text) + (sep_cost if out else 0)
        if used + n <= max_tokens:
            out.append(text)
            used += n
        else:
            remaining = max_tokens - used - (sep_cost if out else 0)
            if remaining > 32:
                out.append(truncate_to(text, remaining))
            break
    return "\n\n".join(out)
|
|