File size: 3,950 Bytes
745f62a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""Smoke test: POST a real recording with metadata override to /api/process-audio-stream,
print the final extracted form.child / patient / metadata envelope so we can confirm
apply_metadata kicked in (i.e. names/age/sex came from header, not the LLM).

Usage: python scripts/_smoke_metadata.py <audio_path> [--visit-type child_health]
"""
from __future__ import annotations
import argparse
import json
import sys
import time
import urllib.request
import urllib.error
import uuid

try:
    sys.stdout.reconfigure(encoding="utf-8")
except Exception:
    pass


def post_multipart(url: str, audio_path: str, fields: dict) -> str:
    boundary = f"----sakhi-smoke-{uuid.uuid4().hex}"
    body = bytearray()
    for k, v in fields.items():
        body += f"--{boundary}\r\n".encode()
        body += f'Content-Disposition: form-data; name="{k}"\r\n\r\n'.encode()
        body += str(v).encode("utf-8") + b"\r\n"
    with open(audio_path, "rb") as f:
        audio_bytes = f.read()
    fname = audio_path.replace("\\", "/").rsplit("/", 1)[-1]
    body += f"--{boundary}\r\n".encode()
    body += f'Content-Disposition: form-data; name="audio"; filename="{fname}"\r\n'.encode()
    body += b"Content-Type: application/octet-stream\r\n\r\n"
    body += audio_bytes + b"\r\n"
    body += f"--{boundary}--\r\n".encode()
    req = urllib.request.Request(
        url, data=bytes(body),
        headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
    )
    last_complete = None
    t0 = time.time()
    with urllib.request.urlopen(req, timeout=600) as resp:
        # Stream SSE
        for raw in resp:
            line = raw.decode("utf-8", errors="replace").rstrip("\r\n")
            if not line.startswith("data: "):
                continue
            try:
                evt = json.loads(line[6:])
            except json.JSONDecodeError:
                continue
            if evt.get("stage") == "complete":
                last_complete = evt
                print(f"[smoke] complete at {time.time()-t0:.1f}s")
                break
            else:
                stage = evt.get("stage") or evt.get("error") or "?"
                status = evt.get("status") or ""
                print(f"[smoke] {time.time()-t0:5.1f}s  {stage:<10} {status}")
    return last_complete


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("audio")
    ap.add_argument("--visit-type", default="child_health")
    ap.add_argument("--patient-name", default="आरव")
    ap.add_argument("--patient-age", default="14")
    ap.add_argument("--age-unit", default="months")
    ap.add_argument("--patient-sex", default="male")
    ap.add_argument("--asha-id", default="ASHA-9999")
    ap.add_argument("--visit-date", default="2026-05-13")
    ap.add_argument("--url", default="http://localhost:8000/api/process-audio-stream")
    args = ap.parse_args()

    fields = {
        "visit_type": args.visit_type,
        "patient_name": args.patient_name,
        "patient_age": args.patient_age,
        "age_unit": args.age_unit,
        "patient_sex": args.patient_sex,
        "asha_id": args.asha_id,
        "visit_date": args.visit_date,
    }
    print(f"[smoke] POST {args.url}  audio={args.audio}")
    print(f"[smoke] fields={fields}")
    evt = post_multipart(args.url, args.audio, fields)
    if not evt:
        print("[smoke] no complete event received")
        sys.exit(2)

    print("\n=== detected visit_type:", evt.get("visit_type"))
    print("=== metadata envelope:")
    print(json.dumps(evt.get("metadata"), ensure_ascii=False, indent=2))
    form = evt.get("form") or {}
    print("=== form.child:")
    print(json.dumps(form.get("child"), ensure_ascii=False, indent=2))
    print("=== form.patient:")
    print(json.dumps(form.get("patient"), ensure_ascii=False, indent=2))
    print("=== timing:")
    print(json.dumps(evt.get("timing"), ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()