Spaces:
Sleeping
Sleeping
File size: 3,950 Bytes
745f62a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | """Smoke test: POST a real recording with metadata override to /api/process-audio-stream,
print the final extracted form.child / patient / metadata envelope so we can confirm
apply_metadata kicked in (i.e. names/age/sex came from header, not the LLM).
Usage: python scripts/_smoke_metadata.py <audio_path> [--visit-type child_health]
"""
from __future__ import annotations
import argparse
import json
import sys
import time
import urllib.request
import urllib.error
import uuid
try:
sys.stdout.reconfigure(encoding="utf-8")
except Exception:
pass
def post_multipart(url: str, audio_path: str, fields: dict) -> str:
boundary = f"----sakhi-smoke-{uuid.uuid4().hex}"
body = bytearray()
for k, v in fields.items():
body += f"--{boundary}\r\n".encode()
body += f'Content-Disposition: form-data; name="{k}"\r\n\r\n'.encode()
body += str(v).encode("utf-8") + b"\r\n"
with open(audio_path, "rb") as f:
audio_bytes = f.read()
fname = audio_path.replace("\\", "/").rsplit("/", 1)[-1]
body += f"--{boundary}\r\n".encode()
body += f'Content-Disposition: form-data; name="audio"; filename="{fname}"\r\n'.encode()
body += b"Content-Type: application/octet-stream\r\n\r\n"
body += audio_bytes + b"\r\n"
body += f"--{boundary}--\r\n".encode()
req = urllib.request.Request(
url, data=bytes(body),
headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
)
last_complete = None
t0 = time.time()
with urllib.request.urlopen(req, timeout=600) as resp:
# Stream SSE
for raw in resp:
line = raw.decode("utf-8", errors="replace").rstrip("\r\n")
if not line.startswith("data: "):
continue
try:
evt = json.loads(line[6:])
except json.JSONDecodeError:
continue
if evt.get("stage") == "complete":
last_complete = evt
print(f"[smoke] complete at {time.time()-t0:.1f}s")
break
else:
stage = evt.get("stage") or evt.get("error") or "?"
status = evt.get("status") or ""
print(f"[smoke] {time.time()-t0:5.1f}s {stage:<10} {status}")
return last_complete
def main():
ap = argparse.ArgumentParser()
ap.add_argument("audio")
ap.add_argument("--visit-type", default="child_health")
ap.add_argument("--patient-name", default="आरव")
ap.add_argument("--patient-age", default="14")
ap.add_argument("--age-unit", default="months")
ap.add_argument("--patient-sex", default="male")
ap.add_argument("--asha-id", default="ASHA-9999")
ap.add_argument("--visit-date", default="2026-05-13")
ap.add_argument("--url", default="http://localhost:8000/api/process-audio-stream")
args = ap.parse_args()
fields = {
"visit_type": args.visit_type,
"patient_name": args.patient_name,
"patient_age": args.patient_age,
"age_unit": args.age_unit,
"patient_sex": args.patient_sex,
"asha_id": args.asha_id,
"visit_date": args.visit_date,
}
print(f"[smoke] POST {args.url} audio={args.audio}")
print(f"[smoke] fields={fields}")
evt = post_multipart(args.url, args.audio, fields)
if not evt:
print("[smoke] no complete event received")
sys.exit(2)
print("\n=== detected visit_type:", evt.get("visit_type"))
print("=== metadata envelope:")
print(json.dumps(evt.get("metadata"), ensure_ascii=False, indent=2))
form = evt.get("form") or {}
print("=== form.child:")
print(json.dumps(form.get("child"), ensure_ascii=False, indent=2))
print("=== form.patient:")
print(json.dumps(form.get("patient"), ensure_ascii=False, indent=2))
print("=== timing:")
print(json.dumps(evt.get("timing"), ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()
|