incognitolm commited on
Commit
47416d1
·
1 Parent(s): c0cb659

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +72 -83
app.py CHANGED
@@ -1093,104 +1093,93 @@ async def websocket_chat(ws: WebSocket):
1093
  # Internal endpoint (avoids Hugging Face proxy redirect)
1094
  internal_url = "http://127.0.0.1:7860/gen/chat/completions"
1095
 
1096
- # Queue to ensure sequential request processing
1097
- import asyncio
1098
- request_queue = asyncio.Queue()
1099
-
1100
- async def process_requests():
1101
- # Persistent HTTP client for streaming
1102
- async with httpx.AsyncClient(
1103
- timeout=None,
1104
- follow_redirects=False
1105
- ) as client:
1106
  while True:
1107
  try:
1108
- request_id, body, headers = await request_queue.get()
1109
- except asyncio.CancelledError:
1110
  break
1111
 
1112
  try:
1113
- async with client.stream(
1114
- "POST",
1115
- internal_url,
1116
- json=body,
1117
- headers=headers
1118
- ) as response:
1119
-
1120
- # Handle upstream errors
1121
- if response.status_code >= 400:
1122
- error_text = ""
1123
- try:
1124
- error_text = (await response.aread()).decode(
1125
- "utf-8", errors="replace"
1126
- )[:500]
1127
- except Exception:
1128
- pass
1129
-
1130
- error_payload = json.dumps({
1131
- "error": "Upstream request failed",
1132
- "status": response.status_code,
1133
- "detail": error_text
1134
- })
1135
- try:
1136
- await ws.send_text(f"{request_id}:{error_payload}")
1137
- except RuntimeError:
1138
- return
1139
- continue
1140
-
1141
- # Stream tokens/lines to the websocket
1142
- async for line in response.aiter_lines():
1143
- if not line:
1144
- continue
1145
 
1146
- try:
1147
- await ws.send_text(f"{request_id}:{line}")
1148
- except RuntimeError:
1149
- return
1150
 
1151
- except Exception as stream_error:
1152
- try:
1153
- error_payload = json.dumps({
1154
- "error": "Streaming error",
1155
- "detail": str(stream_error)
1156
- })
1157
- await ws.send_text(f"{request_id}:{error_payload}")
1158
- except Exception:
1159
- pass
1160
 
1161
- # Start the request processor task
1162
- processor_task = asyncio.create_task(process_requests())
 
 
 
1163
 
1164
- try:
1165
- request_counter = 0
1166
- while True:
1167
  try:
1168
- msg = await ws.receive_text()
1169
- except WebSocketDisconnect:
1170
- break
 
 
 
1171
 
1172
- try:
1173
- data = json.loads(msg)
1174
- except Exception:
1175
- await ws.send_json({"error": "Invalid JSON"})
1176
- continue
 
 
 
 
1177
 
1178
- body = data.get("body")
1179
- headers = data.get("headers") or {}
 
 
 
 
 
 
 
 
1180
 
1181
- if not body:
1182
- await ws.send_json({"error": "Missing body"})
1183
- continue
 
1184
 
1185
- request_counter += 1
1186
- await request_queue.put((request_counter, body, headers))
 
 
1187
 
1188
- finally:
1189
- processor_task.cancel()
1190
- try:
1191
- await processor_task
1192
- except asyncio.CancelledError:
1193
- pass
 
 
 
 
 
 
1194
 
1195
  except WebSocketDisconnect:
1196
  return
 
1093
  # Internal endpoint (avoids Hugging Face proxy redirect)
1094
  internal_url = "http://127.0.0.1:7860/gen/chat/completions"
1095
 
1096
+ # Persistent HTTP client for streaming
1097
+ async with httpx.AsyncClient(
1098
+ timeout=None,
1099
+ follow_redirects=False
1100
+ ) as client:
1101
+ request_counter = 0
1102
+
1103
+ # Background task to handle incoming requests
1104
+ async def handle_incoming_requests():
1105
+ nonlocal request_counter
1106
  while True:
1107
  try:
1108
+ msg = await ws.receive_text()
1109
+ except WebSocketDisconnect:
1110
  break
1111
 
1112
  try:
1113
+ data = json.loads(msg)
1114
+ except Exception:
1115
+ await ws.send_json({"error": "Invalid JSON"})
1116
+ continue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1117
 
1118
+ body = data.get("body")
1119
+ headers = data.get("headers") or {}
 
 
1120
 
1121
+ if not body:
1122
+ await ws.send_json({"error": "Missing body"})
1123
+ continue
 
 
 
 
 
 
1124
 
1125
+ request_counter += 1
1126
+ request_id = request_counter
1127
+
1128
+ # Process this request concurrently
1129
+ asyncio.create_task(process_request(client, request_id, body, headers))
1130
 
1131
+ async def process_request(client, request_id, body, headers):
 
 
1132
  try:
1133
+ async with client.stream(
1134
+ "POST",
1135
+ internal_url,
1136
+ json=body,
1137
+ headers=headers
1138
+ ) as response:
1139
 
1140
+ # Handle upstream errors
1141
+ if response.status_code >= 400:
1142
+ error_text = ""
1143
+ try:
1144
+ error_text = (await response.aread()).decode(
1145
+ "utf-8", errors="replace"
1146
+ )[:500]
1147
+ except Exception:
1148
+ pass
1149
 
1150
+ error_payload = json.dumps({
1151
+ "error": "Upstream request failed",
1152
+ "status": response.status_code,
1153
+ "detail": error_text
1154
+ })
1155
+ try:
1156
+ await ws.send_text(f"{request_id}:{error_payload}")
1157
+ except RuntimeError:
1158
+ return
1159
+ return
1160
 
1161
+ # Stream tokens/lines to the websocket
1162
+ async for line in response.aiter_lines():
1163
+ if not line:
1164
+ continue
1165
 
1166
+ try:
1167
+ await ws.send_text(f"{request_id}:{line}")
1168
+ except RuntimeError:
1169
+ return
1170
 
1171
+ except Exception as stream_error:
1172
+ try:
1173
+ error_payload = json.dumps({
1174
+ "error": "Streaming error",
1175
+ "detail": str(stream_error)
1176
+ })
1177
+ await ws.send_text(f"{request_id}:{error_payload}")
1178
+ except Exception:
1179
+ pass
1180
+
1181
+ # Start handling incoming requests
1182
+ await handle_incoming_requests()
1183
 
1184
  except WebSocketDisconnect:
1185
  return