claims-env / test_websocket_debug.py
akhiilll's picture
Deploy ClaimSense adjudication gym
1cfeb15 verified
#!/usr/bin/env python3
"""Verbose WebSocket dump — handy when the wire format changes.
Sends ``reset`` then a single ``query_policy`` step and prints both the
raw frame and a pretty-printed parse of each response.
"""
from __future__ import annotations
import asyncio
import json
import os
import websockets
# WebSocket endpoint to connect to; the CLAIMS_ENV_WS environment variable
# overrides the local default.
WS_URL = os.getenv("CLAIMS_ENV_WS", "ws://127.0.0.1:7860/ws")
def _show(label: str, raw: str) -> None:
    """Print one received frame verbatim, then as indented JSON.

    Args:
        label: Tag placed in the ``RAW`` / ``PARSED`` section banners.
        raw: Frame payload exactly as it was received.

    A payload that is not valid JSON is reported under the ``PARSED``
    banner instead of raising, so the remainder of the exchange is still
    dumped.
    """
    print(f"\n=== RAW {label} ===")
    print(raw)
    print(f"\n=== PARSED {label} ===")
    try:
        parsed = json.loads(raw)
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError (undecodable bytes
        # payload) are both ValueError subclasses.
        print(f"<not valid JSON: {exc}>")
        return
    print(json.dumps(parsed, indent=2))
async def main() -> None:
    """Run one reset/step exchange against ``WS_URL`` and dump each reply.

    Opens a WebSocket connection, sends a ``reset`` message followed by a
    single ``step`` message carrying a ``query_policy`` action, and hands
    the frame received after each send to ``_show``.
    """
    # (banner label, JSON-encoded request) pairs, in the order they are sent.
    exchanges = (
        ("RESET", json.dumps({"type": "reset", "data": {}})),
        (
            "STEP",
            json.dumps(
                {
                    "type": "step",
                    "data": {"action_type": "query_policy", "parameters": {}},
                }
            ),
        ),
    )

    print(f"connecting to {WS_URL} …")
    async with websockets.connect(WS_URL) as conn:
        for banner, payload in exchanges:
            await conn.send(payload)
            reply = await conn.recv()
            _show(banner, reply)
# Script entry point: only when executed directly (not on import), run the
# main() coroutine to completion via asyncio.run.
if __name__ == "__main__":
    asyncio.run(main())