Buckets:

alucchi's picture
download
raw
3.66 kB
#!/usr/bin/env python3
import json, os, re, subprocess, zipfile
from collections import Counter
from pathlib import Path
# Locate the corpus relative to this script:
# <script dir>/../../shared_resources/enwik8
script_path = Path(__file__).resolve()
root = script_path.parent
src = root.parents[1].joinpath('shared_resources', 'enwik8').resolve()
raw = src.read_bytes()

# latin1 maps each byte to exactly one code point, so `text` lines up
# index-for-index with `raw` and patterns found in it re-encode losslessly.
text = raw.decode('latin1', errors='ignore')

# Byte values that never occur in the corpus are free to serve as
# single-byte substitution codes (kept in ascending order).
used = set(raw)
unused = sorted(set(range(256)) - used)
# Build the pool of candidate byte patterns that may be replaced by a
# single-byte code.
cands = set()

# XML-ish tags/attrs: complete tags ending in ">" and tag openings that are
# followed by a whitespace character.
for tag_regex in (r"</?[a-zA-Z0-9:_-]{1,20}>", r"</?[a-zA-Z0-9:_-]{1,20}\s"):
    cands.update(
        hit.group(0).encode('latin1') for hit in re.finditer(tag_regex, text)
    )

# The 1500 most frequent ASCII-letter words (3-20 letters), wrapped in
# spaces (for safer token boundaries).
word_counts = Counter(re.findall(r"[A-Za-z]{3,20}", text))
cands.update(
    (" " + word + " ").encode('latin1')
    for word, _ in word_counts.most_common(1500)
)

# High-frequency punctuation fragments
fragments = ["[[", "]]", "{{", "}}", "\n\n", "==", "===", "&quot;", "&lt;", "&gt;", "&amp;", "<text", "</text>"]
cands.update(fragment.encode('latin1') for fragment in fragments)

# Filter candidates: keep only patterns that are 3 to 24 bytes long.
# NOTE(review): this also discards the two-byte fragments added just above
# ("[[", "]]", "{{", "}}", "\n\n", "=="), so they are never scored --
# confirm whether that is intended.
cands = [
    candidate
    for candidate in cands
    if not (len(candidate) < 3 or len(candidate) > 24)
]
# Score every candidate by its estimated pre-compression saving: each
# (non-overlapping) occurrence shrinks from len(pattern) bytes to one byte.
# A candidate is kept only if it occurs at least 8 times and its estimated
# saving exceeds 15000 bytes.
scored = []
for pattern in cands:
    occurrences = raw.count(pattern)
    saving = occurrences * (len(pattern) - 1)
    if occurrences >= 8 and saving > 15000:
        scored.append((saving, occurrences, pattern))

# Best first; ties fall back to occurrence count, then the pattern bytes.
scored.sort(reverse=True)

# There can be no more substitutions than there are free byte values.
selected = [entry[2] for entry in scored[:len(unused)]]

# Pair each chosen pattern with a free byte value; zip() stops at the
# shorter sequence, which is `selected`.
subs = list(zip(unused, selected))
# Apply the substitutions to the raw corpus, longest pattern first
# (presumably so a shorter pattern cannot break up an occurrence of a longer
# one that contains it). The stable sort keeps equal-length patterns in
# their `subs` order.
enc = raw
longest_first = sorted(subs, key=lambda pair: -len(pair[1]))
for code, pattern in longest_first:
    enc = enc.replace(pattern, bytes((code,)))
preprocessed = root / 'preprocessed.bin'
preprocessed.write_bytes(enc)

# Compress the preprocessed stream with xz, writing its stdout to archive.xz.
# NOTE(review): per the xz man page, a custom filter chain (--lzma2=...)
# makes earlier preset options be forgotten, so '-9e' likely has no effect
# here -- confirm before relying on it.
archive = root / 'archive.xz'
cmd = [
    'xz', '-k', '-c', '-9e',
    '--lzma2=dict=512MiB,nice=273,mf=bt4,mode=normal,lc=4,lp=0,pb=0',
    str(preprocessed),
]
with archive.open('wb') as archive_file:
    subprocess.run(cmd, check=True, stdout=archive_file)
# Emit the standalone decompressor: a small Python script plus the
# substitution table it needs, then bundle both into decompressor.zip.
dec_dir = root / 'decompressor'
dec_dir.mkdir(exist_ok=True)
decompress_script = dec_dir / 'decompress.py'
subs_table = dec_dir / 'subs.json'

# The generated script un-xz's the archive into a temporary file next to it,
# expands every code byte back into its pattern, writes the result and
# removes the temporary file. Expansion order is irrelevant because the
# patterns consist only of bytes present in the original corpus and therefore
# never contain a code byte.
# Fix: the temporary file is now opened in a `with` block so its handle is
# closed before it is read back and unlinked (previously the handle passed
# as stdout was never closed).
decompress_script.write_text('''#!/usr/bin/env python3
import json, subprocess, sys
from pathlib import Path
inp = Path(sys.argv[1]) if len(sys.argv) > 1 else Path('archive.xz')
out = Path(sys.argv[2]) if len(sys.argv) > 2 else Path('enwik8.out')
pp = inp.parent / 'tmp.preprocessed.bin'
with pp.open('wb') as f:
    subprocess.run(['xz','-d','-c',str(inp)], check=True, stdout=f)
subs = json.loads((Path(__file__).with_name('subs.json')).read_text())
data = pp.read_bytes()
for code, pat_hex in subs:
    data = data.replace(bytes([code]), bytes.fromhex(pat_hex))
out.write_bytes(data)
pp.unlink(missing_ok=True)
''')

# Patterns are arbitrary bytes, so they are stored hex-encoded as
# [code, hex_pattern] pairs.
subs_table.write_text(json.dumps([(c, p.hex()) for c, p in subs]))
os.chmod(decompress_script, 0o755)

# The zipped decompressor is what gets counted towards the total size.
zip_path = root / 'decompressor.zip'
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED, compresslevel=9) as z:
    z.write(decompress_script, arcname='decompress.py')
    z.write(subs_table, arcname='subs.json')
# Round-trip check: run the generated decompressor on the fresh archive and
# require its output to match the original corpus byte for byte.
out = root / 'enwik8.out'
verify_cmd = ['python3', str(dec_dir / 'decompress.py'), str(archive), str(out)]
subprocess.run(verify_cmd, check=True)
restored = out.read_bytes()
if restored != raw:
    raise SystemExit('roundtrip failed')
# Final metrics: the total is the xz archive plus the zipped decompressor.
archive_b = archive.stat().st_size
dec_b = zip_path.stat().st_size
total = archive_b + dec_b
# Bits per input byte; 1e8 is the hard-coded input size
# (presumably enwik8's length in bytes -- confirm).
bpc = round(8 * total / 1e8, 3)

res = dict(
    agent_id='AutoZip',
    experiment='Mined substitutions + tuned xz',
    method='dict-mined-xz',
    archive_bytes=archive_b,
    decompressor_zip_bytes=dec_b,
    total_bytes=total,
    bpc=bpc,
    num_subs=len(subs),
)

# Persist a pretty-printed report and a numbered listing of the chosen
# patterns, then echo the compact report on stdout.
report = json.dumps(res, indent=2)
(root / 'results.json').write_text(report + '\n')
pattern_lines = [f"{i:03d} {repr(p)}" for i, p in enumerate(selected)]
(root / 'selected_patterns.txt').write_text('\n'.join(pattern_lines))
print(json.dumps(res))

Xet Storage Details

Size:
3.66 kB
·
Xet hash:
bfa67a9c3a685398aaf4b0c6b1b917ebeb634287b0a64cf76f9c4123cb2caab6

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.