Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import threading | |
| import tempfile | |
| import zipfile | |
| import time | |
| import rft_flightrecorder as fr | |
| def _append_with_retry(*, tries: int = 40, sleep_s: float = 0.01, **kwargs): | |
| last = "" | |
| for _ in range(tries): | |
| ev, msg = fr.append_event(**kwargs) | |
| if ev is not None: | |
| return ev, msg | |
| last = msg or "" | |
| if "busy" in last.lower() or "lock" in last.lower(): | |
| time.sleep(sleep_s) | |
| continue | |
| break | |
| return None, last | |
def two_tab_spam_test(log_path: str, threads: int = 6, events_per_thread: int = 80):
    """Append to one session from many threads, then verify and seal it.

    The test starts a session, lets ``threads`` workers each append
    ``events_per_thread`` "note" events through ``_append_with_retry``, and
    then checks that:

    * no append failed and no worker crashed,
    * the session verifies and holds ``1 + threads * events_per_thread`` events,
    * the session can be finalised,
    * a further append is refused with a message mentioning "end",
    * the session still verifies and holds exactly one more event.

    Args:
        log_path: Path of the JSONL log file the session is written to.
        threads: Number of concurrent worker threads.
        events_per_thread: Number of events each worker appends.

    Returns:
        The id of the session that was created.

    Raises:
        AssertionError: If any of the checks above fails.
    """
    sk_hex, _pk_hex = fr.gen_keys()
    sid, msg = fr.start_session(
        log_path, "brutal-test", "deterministic", "spam test", False, sk_hex
    )
    assert sid, f"Failed to start session: {msg}"

    errors = []
    errors_lock = threading.Lock()

    def worker(tid: int):
        # An exception raised inside a thread never reaches the main thread,
        # so it is recorded here; otherwise a crash would only show up later
        # as an unexplained event-count mismatch.
        try:
            for i in range(events_per_thread):
                payload = json.dumps({"thread": tid, "i": i}, ensure_ascii=False)
                ev, m = _append_with_retry(
                    log_path=log_path,
                    session_id=sid,
                    event_type="note",
                    payload_text=payload,
                    parent_event_hash="",
                    sign_event=False,
                    sk_hex=sk_hex,
                    model_id="brutal-test",
                    run_mode="deterministic",
                )
                if not ev:
                    with errors_lock:
                        errors.append(m)
        except Exception as exc:  # thread boundary: record, do not lose
            with errors_lock:
                errors.append(f"worker {tid} crashed: {exc!r}")

    ts = [threading.Thread(target=worker, args=(t,)) for t in range(threads)]
    for t in ts:
        t.start()
    for t in ts:
        t.join()

    assert not errors, f"Append errors occurred (first 5): {errors[:5]}"

    status, ok, report = fr.verify_session(log_path, sid, pk_hex="", require_signatures=False)
    assert ok, f"Session failed verification after spam.\n{status}\n{report}"

    all_events, _corrupt = fr.read_jsonl(log_path)
    evs = fr.events_for_session(all_events, sid)
    # One event is expected before any append (presumably the record written
    # by start_session), plus one per successful append.
    expected_before = 1 + (threads * events_per_thread)
    assert len(evs) == expected_before, f"Expected {expected_before} events before finalise, got {len(evs)}"

    anchor, m = fr.finalise_session(log_path, sid, False, sk_hex, "brutal-test", "deterministic")
    assert anchor, f"Finalise failed: {m}"

    # A finalised session must refuse further events.
    ev, m = fr.append_event(
        log_path=log_path,
        session_id=sid,
        event_type="note",
        payload_text='{"post_end": true}',
        parent_event_hash="",
        sign_event=False,
        sk_hex=sk_hex,
        model_id="brutal-test",
        run_mode="deterministic",
    )
    assert ev is None and "end" in (m or "").lower(), f"Expected refused append after end, got: {m}"

    status, ok, report = fr.verify_session(log_path, sid, pk_hex="", require_signatures=False)
    assert ok, f"Session failed verification after finalise.\n{status}\n{report}"

    all_events, _corrupt = fr.read_jsonl(log_path)
    evs = fr.events_for_session(all_events, sid)
    expected_after = expected_before + 1  # session_end
    assert len(evs) == expected_after, f"Expected {expected_after} events after finalise, got {len(evs)}"

    return sid
def tamper_zip_test(log_path: str):
    """Check that a modified export bundle is rejected on import.

    The test records a 25-event session, finalises and exports it, unpacks the
    exported zip, alters the payload of the fifth line of the
    ``*_events.jsonl`` file, repacks every extracted file into a new zip and
    asserts that ``fr.import_bundle_verify`` does not report success.

    Args:
        log_path: Path of the JSONL log file the session is written to.

    Returns:
        ``True`` when the tampered bundle was rejected.

    Raises:
        AssertionError: If any setup step fails or the tampered bundle
            verifies successfully.
    """
    sk_hex, _pk_hex = fr.gen_keys()
    sid, msg = fr.start_session(
        log_path, "brutal-test", "deterministic", "tamper test", False, sk_hex
    )
    assert sid, f"Failed to start session: {msg}"

    for i in range(25):
        ev, m = _append_with_retry(
            log_path=log_path,
            session_id=sid,
            event_type="note",
            payload_text=json.dumps({"i": i}),
            parent_event_hash="",
            sign_event=False,
            sk_hex=sk_hex,
            model_id="brutal-test",
            run_mode="deterministic",
        )
        assert ev, f"Append failed during tamper test: {m}"

    anchor, m = fr.finalise_session(log_path, sid, False, sk_hex, "brutal-test", "deterministic")
    assert anchor, f"Finalise failed during tamper test: {m}"

    zip_name, msg = fr.export_session_bundle(log_path, sid)
    assert zip_name and os.path.exists(zip_name), f"Export failed: {msg}"

    with tempfile.TemporaryDirectory() as td:
        with zipfile.ZipFile(zip_name, "r") as z:
            z.extractall(td)

        events_file = next(
            (
                os.path.join(td, fn)
                for fn in os.listdir(td)
                if fn.endswith("_events.jsonl")
            ),
            None,
        )
        assert events_file, "No events jsonl found inside exported zip"

        with open(events_file, "r", encoding="utf-8") as f:
            lines = f.read().splitlines()
        assert len(lines) > 5, "Not enough events in bundle to tamper"

        obj = json.loads(lines[4])
        payload = obj.get("payload")
        if isinstance(payload, dict):
            payload["tampered"] = True
        else:
            # Missing or non-dict payload: replace it outright so the tamper
            # step can never be a silent no-op.
            obj["payload"] = {"tampered": True}
        lines[4] = json.dumps(obj, ensure_ascii=False)

        with open(events_file, "w", encoding="utf-8") as f:
            f.write("\n".join(lines) + "\n")

        # Collect every extracted file BEFORE creating the new archive (which
        # lives in the same directory) so the archive does not include itself.
        # Repacking all members, not just the events file, keeps the bundle
        # otherwise intact so that a rejection is attributable to the edit
        # rather than to a missing file.
        members = []
        for root, _dirs, files in os.walk(td):
            for fn in files:
                full_path = os.path.join(root, fn)
                members.append((full_path, os.path.relpath(full_path, td)))

        tampered_zip = os.path.join(td, "tampered.zip")
        with zipfile.ZipFile(tampered_zip, "w", compression=zipfile.ZIP_DEFLATED) as z:
            for full_path, arcname in members:
                z.write(full_path, arcname=arcname)

        status, ok, report, _ = fr.import_bundle_verify(
            bundle_path=tampered_zip,
            pk_hex="",
            require_signatures=False,
            store_into_log=False,
            log_path=log_path,
        )
        assert not ok, f"Tampered bundle incorrectly verified PASS.\n{status}\n{report}"

    return True
def main():
    """Run both tests against a log file in a temporary directory.

    Prints one ``[PASS]`` line per test and a final ``[ALL PASS]`` line; a
    failing test surfaces as the ``AssertionError`` it raises.
    """
    with tempfile.TemporaryDirectory() as workdir:
        flightlog = os.path.join(workdir, "flightlog.jsonl")

        session_id = two_tab_spam_test(flightlog, threads=8, events_per_thread=60)
        print(f"[PASS] two_tab_spam_test: session_id={session_id}")

        tamper_result = tamper_zip_test(flightlog)
        print(f"[PASS] tamper_zip_test: {tamper_result}")

        print("[ALL PASS]")
# Run the tests only when executed as a script, not when imported.
if __name__ == "__main__":
    main()