"""Token-aware operations: counting + priority-based truncation.

Uses tiktoken when available (cl100k_base encoder approximates Qwen tokens
within ~3 % on natural code/prose). Falls back to a 3.6 chars-per-token
estimator otherwise.
"""
from __future__ import annotations
from typing import List, Sequence


# Encoder cache: None = not yet initialized; False = tiktoken unavailable.
_ENC = None


def _get_encoder():
    """Return a cached tiktoken encoder, or False if tiktoken is unavailable."""
    global _ENC
    if _ENC is not None:
        return _ENC
    try:
        import tiktoken
        _ENC = tiktoken.get_encoding("cl100k_base")
    except Exception:
        # Remember the failure so the import is not retried on every call.
        _ENC = False
    return _ENC


def count_tokens(text: str) -> int:
    """Best-effort token count for the configured encoder."""
    if not text:
        return 0
    enc = _get_encoder()
    if enc:
        return len(enc.encode(text, disallowed_special=()))
    # Heuristic: ~3.6 chars/token on mixed code+prose
    return max(1, int(len(text) / 3.6))
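# Example of the fallback arithmetic: an 11-char string such as "hello world"
# maps to int(11 / 3.6) == 3 tokens (tiktoken's cl100k_base typically reports 2).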


def truncate_to(text: str, max_tokens: int) -> str:
    """Drop trailing tokens to fit a budget."""
    if max_tokens <= 0:
        return ""
    enc = _get_encoder()
    if enc:
        toks = enc.encode(text, disallowed_special=())
        if len(toks) <= max_tokens:
            return text
        return enc.decode(toks[:max_tokens])
    # Fallback heuristic: ~3.6 chars per token (may over- or under-shoot).
    chars = int(max_tokens * 3.6)
    return text[:chars]
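# Example (illustrative; exact cut depends on the encoder): truncate_to("a b c
# d e f", 2) likely keeps "a b" under cl100k_base, while the fallback keeps the
# first int(2 * 3.6) == 7 characters, i.e. "a b c d".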


def fit_priority(
    items: Sequence[tuple[str, int]],  # (text, priority); lower priority packs first
    max_tokens: int,
) -> str:
    """Pack texts in ascending priority order until the budget is exhausted.

    The first item that does not fully fit is truncated to the remaining
    budget (when more than 32 tokens remain); everything after it is dropped.
    Note: the blank-line separators joined between items are not counted
    against the budget, so the result can run a few tokens over.
    """
    out: List[str] = []
    used = 0
    sorted_items = sorted(items, key=lambda t: t[1])
    for text, _prio in sorted_items:
        n = count_tokens(text)
        if used + n <= max_tokens:
            out.append(text)
            used += n
        else:
            remaining = max_tokens - used
            # Only truncate when a meaningful tail (> 32 tokens) remains;
            # a tiny fragment is rarely worth including.
            if remaining > 32:
                out.append(truncate_to(text, remaining))
                used = max_tokens
            break
    return "\n\n".join(out)