| import re
|
| import json
|
| import argparse
|
| from typing import List, Dict
|
| import bisect
|
|
|
| class ClaudeTokenizer:
|
| def __init__(self, config_file: str, algorithm: str = "trie"):
|
| with open(config_file, "r") as f:
|
| config = json.load(f)
|
|
|
| self.vocab = sorted(config["vocab"])
|
| self.vocab_size = config["n_vocab_size"]
|
| self.pat_str = config["pat_str"]
|
| self.special_tokens = config["special_tokens"]
|
|
|
| self.token_to_id = {token: i for i, token in enumerate(self.vocab)}
|
| self.id_to_token = {i: token for token, i in self.token_to_id.items()}
|
|
|
| for token, id in self.special_tokens.items():
|
| self.token_to_id[token] = id
|
| self.id_to_token[id] = token
|
|
|
| self.pat = re.compile(self.pat_str)
|
| self.vocab_trie = self._build_trie(self.vocab)
|
|
|
| self.algorithm = algorithm
|
| if algorithm not in ["trie", "linear"]:
|
| raise ValueError("Invalid algorithm. Choose 'trie' or 'linear'.")
|
|
|
| def _build_trie(self, vocab: List[str]) -> Dict:
|
| trie = {}
|
| for token in vocab:
|
| current = trie
|
| for char in token:
|
| if isinstance(current, str):
|
| break
|
| if char not in current:
|
| current[char] = {}
|
| current = current[char]
|
| if isinstance(current, dict):
|
| current["*"] = token
|
| return trie
|
|
|
| def tokenize(self, text: str) -> List[str]:
|
| if self.algorithm == "trie":
|
| tokens = []
|
| for part in self.pat.findall(text):
|
| tokens.extend(self._tokenize_part_trie(part))
|
| return tokens
|
| else:
|
| return self._tokenize_part_linear(text)
|
|
|
| def encode(self, text: str) -> List[int]:
|
| tokens = self.tokenize(text)
|
| return [
|
| self.token_to_id.get(token, self.special_tokens["<META>"])
|
| for token in tokens
|
| ]
|
|
|
| def decode(self, ids: List[int]) -> str:
|
| return "".join(self.id_to_token.get(id, "") for id in ids)
|
|
|
| def _tokenize_part_trie(self, text: str) -> List[str]:
|
| tokens = []
|
| while text:
|
| current = self.vocab_trie
|
| longest_match = ""
|
| for i, char in enumerate(text):
|
| if char not in current:
|
| break
|
| current = current[char]
|
| if "*" in current:
|
| longest_match = current["*"]
|
| if longest_match:
|
| tokens.append(longest_match)
|
| text = text[len(longest_match):]
|
| else:
|
| tokens.append(text[0])
|
| text = text[1:]
|
| return tokens
|
|
|
| def _tokenize_part_linear(self, text: str) -> List[str]:
|
| tokens = []
|
| while text:
|
| longest_match = self._binary_search_prefix(text)
|
| if longest_match:
|
| tokens.append(longest_match)
|
| text = text[len(longest_match):]
|
| else:
|
| tokens.append(text[0])
|
| text = text[1:]
|
| return tokens
|
|
|
| def _binary_search_prefix(self, text: str) -> str:
|
| left, right = 0, len(self.vocab) - 1
|
| longest_match = ""
|
|
|
| while left <= right:
|
| mid = (left + right) // 2
|
| if text.startswith(self.vocab[mid]):
|
| longest_match = self.vocab[mid]
|
| left = mid + 1
|
| elif self.vocab[mid] < text:
|
| left = mid + 1
|
| else:
|
| right = mid - 1
|
|
|
| return longest_match
|
|
|
| def process_file(file_path: str, tokenizer: ClaudeTokenizer) -> List[Dict]:
|
| encodings = ['utf-8', 'utf-16', 'latin-1', 'iso-8859-1']
|
|
|
| for encoding in encodings:
|
| try:
|
| with open(file_path, 'r', encoding=encoding) as f:
|
| text = f.read()
|
| break
|
| except UnicodeDecodeError:
|
| continue
|
| else:
|
| raise ValueError(f"Unable to decode the file {file_path} with any of the attempted encodings.")
|
|
|
| tokens = tokenizer.tokenize(text)
|
| encoded = tokenizer.encode(text)
|
|
|
| result = [{"token": token, "id": id} for token, id in zip(tokens, encoded)]
|
| result.append({"total": len(tokens)})
|
|
|
| return result
|
|
|
| def main():
|
| parser = argparse.ArgumentParser(description="Tokenize text using Claude Tokenizer")
|
| parser.add_argument("--text", type=str, help="Text to tokenize")
|
| parser.add_argument("--file", type=str, help="File to tokenize")
|
| parser.add_argument("--algo", type=str, choices=["linear", "trie"], required=True, help="Tokenization algorithm")
|
| args = parser.parse_args()
|
|
|
| if not args.text and not args.file:
|
| parser.error("Either --text or --file must be specified")
|
|
|
| try:
|
| tokenizer = ClaudeTokenizer("tokenizer_config.json", algorithm=args.algo)
|
|
|
| if args.file:
|
| result = process_file(args.file, tokenizer)
|
| output_file = args.file + ".tokens"
|
| with open(output_file, 'w', encoding='utf-8') as f:
|
| json.dump(result, f, indent=2, ensure_ascii=False)
|
| print(f"Tokenization results saved to {output_file}")
|
| else:
|
| tokens = tokenizer.tokenize(args.text)
|
| encoded = tokenizer.encode(args.text)
|
| result = [{"token": token, "id": id} for token, id in zip(tokens, encoded)]
|
| result.append({"total": len(tokens)})
|
| print(json.dumps(result, indent=2, ensure_ascii=False))
|
| except Exception as e:
|
| print(f"An error occurred: {str(e)}")
|
| import traceback
|
| traceback.print_exc()
|
|
|
| if __name__ == "__main__":
|
| main()
|
|
|