Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """Verbose WebSocket dump — handy when wire-format changes. | |
| Sends ``reset`` then a single ``query_policy`` step and prints both the | |
| raw frame and a pretty-printed parse of each response. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import os | |
| import websockets | |
| WS_URL = os.environ.get("CLAIMS_ENV_WS", "ws://127.0.0.1:7860/ws") | |
| def _show(label: str, raw: str) -> None: | |
| print(f"\n=== RAW {label} ===") | |
| print(raw) | |
| print(f"\n=== PARSED {label} ===") | |
| print(json.dumps(json.loads(raw), indent=2)) | |
| async def main() -> None: | |
| print(f"connecting to {WS_URL} …") | |
| async with websockets.connect(WS_URL) as ws: | |
| await ws.send(json.dumps({"type": "reset", "data": {}})) | |
| _show("RESET", await ws.recv()) | |
| await ws.send( | |
| json.dumps( | |
| { | |
| "type": "step", | |
| "data": {"action_type": "query_policy", "parameters": {}}, | |
| } | |
| ) | |
| ) | |
| _show("STEP", await ws.recv()) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |