Spaces:
Sleeping
Sleeping
File size: 1,099 Bytes
#!/usr/bin/env python3
"""Verbose WebSocket dump — handy when wire-format changes.
Sends ``reset`` then a single ``query_policy`` step and prints both the
raw frame and a pretty-printed parse of each response.
"""
from __future__ import annotations
import asyncio
import json
import os
import websockets
WS_URL = os.environ.get("CLAIMS_ENV_WS", "ws://127.0.0.1:7860/ws")
def _show(label: str, raw: str) -> None:
print(f"\n=== RAW {label} ===")
print(raw)
print(f"\n=== PARSED {label} ===")
print(json.dumps(json.loads(raw), indent=2))
async def main() -> None:
    """Run one reset/step exchange against ``WS_URL`` and dump each reply.

    Opens a single WebSocket connection, sends a ``reset`` message followed
    by a ``step`` message carrying a ``query_policy`` action, and passes the
    frame received after each send to ``_show``.
    """
    exchanges = (
        ("RESET", {"type": "reset", "data": {}}),
        (
            "STEP",
            {
                "type": "step",
                "data": {"action_type": "query_policy", "parameters": {}},
            },
        ),
    )

    print(f"connecting to {WS_URL} …")
    async with websockets.connect(WS_URL) as ws:
        # Strictly request/response: wait for each reply before sending
        # the next message.
        for label, message in exchanges:
            await ws.send(json.dumps(message))
            reply = await ws.recv()
            _show(label, reply)
# Script entry point: drive the async dump to completion when run directly.
if __name__ == "__main__":
    asyncio.run(main())
|