anhkhoiphan commited on
Commit
8b5cd07
·
verified ·
1 Parent(s): 4927c50

Sửa lỗi requests.post chặn event loop khiến chainlit không xử lý được tín hiệu WebSocket gây disconnect

Browse files
Files changed (1) hide show
  1. app.py +119 -111
app.py CHANGED
@@ -2,13 +2,12 @@ import re
2
  import time
3
  import chainlit as cl
4
  import pandas as pd
5
- import requests
6
  import asyncio
7
  from typing import Dict, List, Any, Optional, Callable
8
  from dataclasses import dataclass, field
9
  import os
10
  import uuid
11
- import threading
12
  from datetime import datetime, timedelta
13
 
14
 
@@ -25,9 +24,9 @@ class ConversationState:
25
  outputs: Optional[Dict[str, Any]] = None
26
  selected_model: str = "Gemini 2.0 Flash"
27
  product_model_search: bool = False
28
- # New fields for delayed cleanup
29
  pending_cleanup: bool = False
30
- cleanup_timer: Optional[threading.Timer] = None
31
  last_activity: datetime = field(default_factory=datetime.now)
32
 
33
  def reset(self):
@@ -39,16 +38,16 @@ class ConversationState:
39
  self.outputs = None
40
  self.selected_model = "Gemini 2.0 Flash"
41
  self.product_model_search = False
42
- # Reset cleanup fields but don't touch timers
43
  self.pending_cleanup = False
44
  self.last_activity = datetime.now()
45
 
46
- def cancel_cleanup_timer(self):
47
- """Cancel pending cleanup timer if exists"""
48
- if self.cleanup_timer:
49
- self.cleanup_timer.cancel()
50
- self.cleanup_timer = None
51
- print(f"🚫 Cancelled cleanup timer for session: {self.session_id}")
52
 
53
 
54
  class StateManager:
@@ -56,12 +55,12 @@ class StateManager:
56
 
57
  # CLASS-LEVEL session storage for isolation between different browser sessions
58
  _session_states: Dict[str, ConversationState] = {}
59
- _lock = threading.Lock() # Thread safety for timer operations
60
 
61
  @staticmethod
62
- def get_or_create_session_state(session_id: str) -> ConversationState:
63
  """Get existing session state or create new one"""
64
- with StateManager._lock:
65
  if session_id not in StateManager._session_states:
66
  state = ConversationState()
67
  state.session_id = session_id
@@ -73,7 +72,7 @@ class StateManager:
73
 
74
  # CRITICAL: If session was pending cleanup, cancel it because user is active again
75
  if state.pending_cleanup:
76
- state.cancel_cleanup_timer()
77
  state.pending_cleanup = False
78
  print(f"♻️ User activity detected! Cancelled pending cleanup for: {session_id}")
79
 
@@ -82,35 +81,41 @@ class StateManager:
82
  return state
83
 
84
  @staticmethod
85
- def schedule_delayed_cleanup(session_id: str, delay_seconds: int = 3600):
86
- """Schedule delayed cleanup for a session (default 1 hours for disconnect tolerance)"""
87
- with StateManager._lock:
88
  if session_id not in StateManager._session_states:
89
  print(f"⚠️ Cannot schedule cleanup for non-existent session: {session_id}")
90
  return
91
 
92
  state = StateManager._session_states[session_id]
93
 
94
- # Cancel existing timer if any
95
- state.cancel_cleanup_timer()
96
 
97
  # Mark as pending cleanup
98
  state.pending_cleanup = True
99
 
100
- # Schedule new cleanup
101
- def delayed_cleanup():
102
- print(f"⏰ Executing delayed cleanup for session: {session_id}")
103
- StateManager._perform_actual_cleanup(session_id)
 
 
 
 
 
 
 
104
 
105
- state.cleanup_timer = threading.Timer(delay_seconds, delayed_cleanup)
106
- state.cleanup_timer.start()
107
 
108
  print(f"⏱️ Scheduled cleanup in {delay_seconds}s for session: {session_id} (likely disconnect)")
109
 
110
  @staticmethod
111
- def _perform_actual_cleanup(session_id: str):
112
  """Perform the actual cleanup after delay"""
113
- with StateManager._lock:
114
  if session_id not in StateManager._session_states:
115
  print(f"⚠️ Session already cleaned or doesn't exist: {session_id}")
116
  return
@@ -122,7 +127,7 @@ class StateManager:
122
  print(f"🚫 Cleanup cancelled - user activity detected for: {session_id}")
123
  return
124
 
125
- # Perform API cleanup
126
  try:
127
  if API_BASE_URL:
128
  payload = {
@@ -130,8 +135,9 @@ class StateManager:
130
  "reset_model": False,
131
  "session_id": session_id
132
  }
133
- response = requests.post(f"{API_BASE_URL}/clear_memory", json=payload, timeout=30)
134
- print(f"Clear memory response for {session_id}: {response.status_code}")
 
135
  except Exception as e:
136
  print(f"Warning: clear_memory failed for {session_id}: {e}")
137
 
@@ -140,13 +146,13 @@ class StateManager:
140
  print(f"🗑️ Successfully cleaned up session: {session_id}")
141
 
142
  @staticmethod
143
- def cleanup_session_immediate(session_id: str):
144
  """Immediate cleanup (for testing or forced cleanup)"""
145
- with StateManager._lock:
146
  if session_id in StateManager._session_states:
147
  state = StateManager._session_states[session_id]
148
- state.cancel_cleanup_timer()
149
- StateManager._perform_actual_cleanup(session_id)
150
 
151
  @staticmethod
152
  async def clear_chat_state(state: ConversationState):
@@ -158,8 +164,9 @@ class StateManager:
158
  "reset_model": False,
159
  "session_id": state.session_id
160
  }
161
- response = requests.post(f"{API_BASE_URL}/clear_memory", json=payload, timeout=30)
162
- print(f"Clear memory response: {response.status_code}")
 
163
  except Exception as e:
164
  print(f"Warning: clear_memory failed: {e}")
165
 
@@ -169,26 +176,26 @@ class StateManager:
169
  state.session_id = session_id
170
 
171
  @staticmethod
172
- def change_model(state: ConversationState, model_name: str):
173
  """Change the selected model"""
174
  state.selected_model = model_name
175
  state.last_activity = datetime.now()
176
 
177
  @staticmethod
178
- def toggle_product_model_search(state: ConversationState):
179
  """Toggle product model search mode"""
180
  state.product_model_search = not state.product_model_search
181
  state.last_activity = datetime.now()
182
 
183
  @staticmethod
184
- def get_session_status() -> Dict[str, Dict[str, Any]]:
185
  """Get status of all sessions (for debugging)"""
186
- with StateManager._lock:
187
  status = {}
188
  for session_id, state in StateManager._session_states.items():
189
  status[session_id] = {
190
  "pending_cleanup": state.pending_cleanup,
191
- "has_timer": state.cleanup_timer is not None,
192
  "last_activity": state.last_activity.isoformat(),
193
  "selected_model": state.selected_model,
194
  "product_model_search": state.product_model_search
@@ -197,7 +204,7 @@ class StateManager:
197
 
198
 
199
  class ChatService:
200
- """Handles chat-related operations"""
201
 
202
  @staticmethod
203
  async def respond_to_chat(
@@ -205,7 +212,7 @@ class ChatService:
205
  message: str,
206
  image_path: Optional[str] = None
207
  ) -> str:
208
- """Handle chat responses with image support"""
209
  print(f"🔄 === DEBUG STATE ===\n Chat request with model: {state.selected_model}, Product Model Search: {state.product_model_search}, Session ID: {state.session_id}")
210
 
211
  # Update activity timestamp - this is KEY to prevent cleanup during active use
@@ -219,12 +226,14 @@ class ChatService:
219
  if not state.session_id:
220
  return "Error: Session ID not initialized"
221
 
222
- # Call API
223
  try:
224
- if image_path:
225
- # For image uploads, use form-data format as expected by API
226
- with open(image_path, 'rb') as f:
227
- files = {"image": f}
 
 
228
  data = {
229
  "message": message,
230
  "product_model_search": str(state.product_model_search).lower(),
@@ -232,26 +241,27 @@ class ChatService:
232
  "llm_model": state.selected_model,
233
  "debug": "Normal"
234
  }
235
- resp = requests.post(
 
 
 
236
  f"{API_BASE_URL}/chat_with_image",
237
- files=files,
238
- data=data,
239
- timeout=600
 
 
 
 
 
 
 
 
 
 
 
 
240
  )
241
- else:
242
- # For text messages, use form-data format as expected by API
243
- data = {
244
- "message": message,
245
- "session_id": state.session_id,
246
- "debug": "Normal",
247
- "product_model_search": state.product_model_search,
248
- "llm_model": state.selected_model
249
- }
250
- resp = requests.post(
251
- f"{API_BASE_URL}/chat",
252
- data=data, # Changed from json to data for form format
253
- timeout=600
254
- )
255
 
256
  if resp.status_code == 200:
257
  j = resp.json()
@@ -283,12 +293,12 @@ class ChatService:
283
 
284
  # Filter products based on query
285
  if state.specs_advantages is not None:
286
- ChatService.get_specific_product_from_query(message, state)
287
 
288
  return response + f"\n\n*Thời gian xử lí: {end - start:.6f}s*"
289
 
290
  @staticmethod
291
- def get_specific_product_from_query(query, state):
292
  """Filter specs_advantages based on models found in query"""
293
  specs_map = state.specs_advantages or {}
294
  product_model_list = []
@@ -314,10 +324,10 @@ class ChatService:
314
 
315
 
316
  class DisplayService:
317
- """Handles display-related operations"""
318
 
319
  @staticmethod
320
- def show_specs(state: ConversationState) -> str:
321
  """Generate specifications table"""
322
  specs_map = state.specs_advantages
323
  columns = ["Thông số"]
@@ -420,7 +430,7 @@ class DisplayService:
420
  return f"📄 **Thông số kỹ thuật**\n\n{markdown_table}"
421
 
422
  @staticmethod
423
- def show_advantages(state: ConversationState) -> str:
424
  """Generate advantages as bullet list instead of table"""
425
  specs_map = state.specs_advantages
426
 
@@ -452,7 +462,7 @@ class DisplayService:
452
  return content
453
 
454
  @staticmethod
455
- def show_solution_packages(state: ConversationState) -> str:
456
  """Show solution packages in a structured format"""
457
  packages = state.solution_packages
458
 
@@ -463,7 +473,8 @@ class DisplayService:
463
  return markdown_table
464
 
465
  @staticmethod
466
- def show_all_products_table(state: ConversationState):
 
467
  outputs = state.outputs or {}
468
 
469
  if not outputs:
@@ -472,8 +483,8 @@ class DisplayService:
472
  try:
473
  # Updated to match API format - send outputs in request body
474
  payload = {"outputs": outputs}
475
- resp = requests.post(f"{API_BASE_URL}/products_by_category",
476
- json=payload, timeout=60)
477
 
478
  if resp.status_code == 200:
479
  data = resp.json()
@@ -542,8 +553,8 @@ class UIService:
542
  return msg
543
 
544
 
545
- # HELPER FUNCTIONS: Session management with proper error handling
546
- def ensure_session_state() -> Optional[ConversationState]:
547
  """Ensure session state exists, create if not"""
548
  try:
549
  session_id = cl.user_session.get("session_id")
@@ -552,21 +563,21 @@ def ensure_session_state() -> Optional[ConversationState]:
552
  print(f"Lỗi: Không lấy được session id ở ensure_session_state")
553
  return None
554
 
555
- return StateManager.get_or_create_session_state(session_id)
556
 
557
  except Exception as e:
558
  print(f"⚠️ Error ensuring session state: {e}")
559
  return None
560
 
561
 
562
- def get_current_session_state() -> Optional[ConversationState]:
563
  """Get current session state using Chainlit's session system"""
564
  try:
565
  # Use Chainlit's user session to get unique session ID
566
  chainlit_session_id = cl.user_session.get("session_id")
567
 
568
  if chainlit_session_id:
569
- return StateManager.get_or_create_session_state(chainlit_session_id)
570
  else:
571
  print("⚠️ No Chainlit session ID found")
572
  return None
@@ -586,7 +597,7 @@ async def on_chat_start():
586
  else:
587
  print(f"🔄 Reusing existing session_id: {session_id}")
588
 
589
- app_state = StateManager.get_or_create_session_state(session_id)
590
 
591
  await cl.Message(
592
  content=f"🛍️ **RangDong Sales Agent** (Session: {session_id[:8]}...)\n\n"
@@ -607,7 +618,7 @@ async def on_chat_start():
607
 
608
  @cl.on_chat_end
609
  async def on_chat_end():
610
- """Handle chat session end with delayed cleanup mechanism"""
611
  try:
612
  session_id = cl.user_session.get("session_id")
613
  print(f"📤 on_chat_end triggered for session {session_id}")
@@ -615,7 +626,7 @@ async def on_chat_end():
615
  if session_id:
616
  # Schedule delayed cleanup instead of immediate cleanup
617
  # Use shorter delay (30s) since this is likely just a temporary disconnect
618
- StateManager.schedule_delayed_cleanup(session_id, delay_seconds=3600)
619
  print(f"⏳ Scheduled delayed cleanup for session {session_id} (1h delay for disconnect tolerance)")
620
  else:
621
  print("⚠️ No session_id found in on_chat_end")
@@ -627,58 +638,58 @@ async def on_chat_end():
627
  @cl.action_callback("show_specs")
628
  async def on_show_specs(action):
629
  """Handle show specifications action"""
630
- app_state = ensure_session_state()
631
  if app_state is None:
632
  await cl.Message(content="Error: Session state not found", author="assistant").send()
633
  return
634
 
635
- specs_content = DisplayService.show_specs(app_state)
636
  await UIService.send_message_with_buttons(specs_content, app_state, author="assistant")
637
 
638
 
639
  @cl.action_callback("show_advantages")
640
  async def on_show_advantages(action):
641
  """Handle show advantages action"""
642
- app_state = ensure_session_state()
643
  if app_state is None:
644
  await cl.Message(content="Error: Session state not found", author="assistant").send()
645
  return
646
 
647
- adv_content = DisplayService.show_advantages(app_state)
648
  await UIService.send_message_with_buttons(adv_content, app_state, author="assistant")
649
 
650
 
651
  @cl.action_callback("show_packages")
652
  async def on_show_packages(action):
653
  """Handle show packages action"""
654
- app_state = ensure_session_state()
655
  if app_state is None:
656
  await cl.Message(content="Error: Session state not found", author="assistant").send()
657
  return
658
 
659
- pkg_content = DisplayService.show_solution_packages(app_state)
660
  await UIService.send_message_with_buttons(pkg_content, app_state, author="assistant")
661
 
662
  @cl.action_callback("show_all_products")
663
  async def on_show_all_products(action):
664
  """Handle show all products action"""
665
- app_state = ensure_session_state()
666
  if app_state is None:
667
  await cl.Message(content="Error: Session state not found", author="assistant").send()
668
  return
669
 
670
- all_products_content = DisplayService.show_all_products_table(app_state)
671
  await UIService.send_message_with_buttons(all_products_content, app_state, author="assistant")
672
 
673
  @cl.action_callback("toggle_product_search")
674
  async def on_toggle_product_search(action):
675
  """Handle toggle product model search action"""
676
- app_state = ensure_session_state()
677
  if app_state is None:
678
  await cl.Message(content="Error: Session state not found", author="assistant").send()
679
  return
680
 
681
- StateManager.toggle_product_model_search(app_state)
682
 
683
  status_message = (
684
  "✅ **Đã bật tìm kiếm theo mã sản phẩm**\n\n"
@@ -694,7 +705,7 @@ async def on_toggle_product_search(action):
694
  @cl.action_callback("change_model")
695
  async def on_change_model(action):
696
  """Handle model change action"""
697
- app_state = ensure_session_state()
698
  if app_state is None:
699
  await cl.Message(content="Error: Session state not found", author="assistant").send()
700
  return
@@ -720,7 +731,7 @@ async def on_change_model(action):
720
  @cl.action_callback("back_to_main")
721
  async def on_back_to_main(action):
722
  """Handle back to main menu action"""
723
- app_state = ensure_session_state()
724
  if app_state is None:
725
  await cl.Message(content="Error: Session state not found", author="assistant").send()
726
  return
@@ -735,34 +746,34 @@ async def on_back_to_main(action):
735
 
736
  @cl.action_callback("select_model_0")
737
  async def on_select_model_0(action):
738
- app_state = ensure_session_state()
739
  if app_state is None:
740
  await cl.Message(content="Error: Session state not found", author="assistant").send()
741
  return
742
 
743
- StateManager.change_model(app_state, "Gemini 2.0 Flash")
744
  await UIService.send_message_with_buttons("✅ Đã chuyển sang **Gemini 2.0 Flash**", app_state, author="assistant")
745
 
746
 
747
  @cl.action_callback("select_model_1")
748
  async def on_select_model_1(action):
749
- app_state = ensure_session_state()
750
  if app_state is None:
751
  await cl.Message(content="Error: Session state not found", author="assistant").send()
752
  return
753
 
754
- StateManager.change_model(app_state, "Gemini 2.5 Flash Lite")
755
  await UIService.send_message_with_buttons("✅ Đã chuyển sang **Gemini 2.5 Flash Lite**", app_state, author="assistant")
756
 
757
 
758
  @cl.action_callback("select_model_2")
759
  async def on_select_model_2(action):
760
- app_state = ensure_session_state()
761
  if app_state is None:
762
  await cl.Message(content="Error: Session state not found", author="assistant").send()
763
  return
764
 
765
- StateManager.change_model(app_state, "Gemini 2.0 Flash Lite")
766
  await UIService.send_message_with_buttons("✅ Đã chuyển sang **Gemini 2.0 Flash Lite**", app_state, author="assistant")
767
 
768
 
@@ -771,7 +782,7 @@ async def on_select_model_2(action):
771
  async def on_debug_sessions(action):
772
  """Debug action to show session status (can be added to debug builds)"""
773
  try:
774
- status = StateManager.get_session_status()
775
  debug_content = "🔍 **Debug: Session Status**\n\n"
776
 
777
  if not status:
@@ -780,7 +791,7 @@ async def on_debug_sessions(action):
780
  for session_id, info in status.items():
781
  debug_content += f"**Session: {session_id[:8]}...**\n"
782
  debug_content += f"- Pending cleanup: {info['pending_cleanup']}\n"
783
- debug_content += f"- Has timer: {info['has_timer']}\n"
784
  debug_content += f"- Last activity: {info['last_activity']}\n"
785
  debug_content += f"- Model: {info['selected_model']}\n"
786
  debug_content += f"- Product search: {info['product_model_search']}\n\n"
@@ -792,8 +803,8 @@ async def on_debug_sessions(action):
792
 
793
  @cl.on_message
794
  async def main(message: cl.Message):
795
- """Main message handler"""
796
- app_state = ensure_session_state()
797
  if app_state is None:
798
  await cl.Message(content="Error: Session state not found", author="assistant").send()
799
  return
@@ -809,14 +820,11 @@ async def main(message: cl.Message):
809
  # Show typing animation
810
  typing_msg = await UIService.create_typing_animation()
811
 
812
- # Get response from API
813
  response = await ChatService.respond_to_chat(app_state, message.content, image_path)
814
 
815
  # Update the typing message with final response and buttons
816
  typing_msg.content = response
817
  typing_msg.actions = UIService.create_action_buttons(app_state)
818
  typing_msg.author = "assistant"
819
- await typing_msg.update()
820
-
821
-
822
-
 
2
  import time
3
  import chainlit as cl
4
  import pandas as pd
5
+ import httpx
6
  import asyncio
7
  from typing import Dict, List, Any, Optional, Callable
8
  from dataclasses import dataclass, field
9
  import os
10
  import uuid
 
11
  from datetime import datetime, timedelta
12
 
13
 
 
24
  outputs: Optional[Dict[str, Any]] = None
25
  selected_model: str = "Gemini 2.0 Flash"
26
  product_model_search: bool = False
27
+ # New fields for delayed cleanup - now using asyncio
28
  pending_cleanup: bool = False
29
+ cleanup_task: Optional[asyncio.Task] = None
30
  last_activity: datetime = field(default_factory=datetime.now)
31
 
32
  def reset(self):
 
38
  self.outputs = None
39
  self.selected_model = "Gemini 2.0 Flash"
40
  self.product_model_search = False
41
+ # Reset cleanup fields but don't touch tasks
42
  self.pending_cleanup = False
43
  self.last_activity = datetime.now()
44
 
45
+ def cancel_cleanup_task(self):
46
+ """Cancel pending cleanup task if exists"""
47
+ if self.cleanup_task and not self.cleanup_task.done():
48
+ self.cleanup_task.cancel()
49
+ self.cleanup_task = None
50
+ print(f"🚫 Cancelled cleanup task for session: {self.session_id}")
51
 
52
 
53
  class StateManager:
 
55
 
56
  # CLASS-LEVEL session storage for isolation between different browser sessions
57
  _session_states: Dict[str, ConversationState] = {}
58
+ _lock = asyncio.Lock() # Async lock for consistency
59
 
60
  @staticmethod
61
+ async def get_or_create_session_state(session_id: str) -> ConversationState:
62
  """Get existing session state or create new one"""
63
+ async with StateManager._lock:
64
  if session_id not in StateManager._session_states:
65
  state = ConversationState()
66
  state.session_id = session_id
 
72
 
73
  # CRITICAL: If session was pending cleanup, cancel it because user is active again
74
  if state.pending_cleanup:
75
+ state.cancel_cleanup_task()
76
  state.pending_cleanup = False
77
  print(f"♻️ User activity detected! Cancelled pending cleanup for: {session_id}")
78
 
 
81
  return state
82
 
83
  @staticmethod
84
+ async def schedule_delayed_cleanup(session_id: str, delay_seconds: int = 3600):
85
+ """Schedule delayed cleanup for a session using asyncio (default 1 hour for disconnect tolerance)"""
86
+ async with StateManager._lock:
87
  if session_id not in StateManager._session_states:
88
  print(f"⚠️ Cannot schedule cleanup for non-existent session: {session_id}")
89
  return
90
 
91
  state = StateManager._session_states[session_id]
92
 
93
+ # Cancel existing task if any
94
+ state.cancel_cleanup_task()
95
 
96
  # Mark as pending cleanup
97
  state.pending_cleanup = True
98
 
99
+ # Schedule new cleanup using asyncio
100
+ async def delayed_cleanup():
101
+ try:
102
+ await asyncio.sleep(delay_seconds)
103
+ print(f"⏰ Executing delayed cleanup for session: {session_id}")
104
+ await StateManager._perform_actual_cleanup(session_id)
105
+ except asyncio.CancelledError:
106
+ print(f"🚫 Cleanup task cancelled for session: {session_id}")
107
+ raise
108
+ except Exception as e:
109
+ print(f"❌ Error in delayed cleanup for {session_id}: {e}")
110
 
111
+ state.cleanup_task = asyncio.create_task(delayed_cleanup())
 
112
 
113
  print(f"⏱️ Scheduled cleanup in {delay_seconds}s for session: {session_id} (likely disconnect)")
114
 
115
  @staticmethod
116
+ async def _perform_actual_cleanup(session_id: str):
117
  """Perform the actual cleanup after delay"""
118
+ async with StateManager._lock:
119
  if session_id not in StateManager._session_states:
120
  print(f"⚠️ Session already cleaned or doesn't exist: {session_id}")
121
  return
 
127
  print(f"🚫 Cleanup cancelled - user activity detected for: {session_id}")
128
  return
129
 
130
+ # Perform API cleanup using httpx
131
  try:
132
  if API_BASE_URL:
133
  payload = {
 
135
  "reset_model": False,
136
  "session_id": session_id
137
  }
138
+ async with httpx.AsyncClient(timeout=30.0) as client:
139
+ response = await client.post(f"{API_BASE_URL}/clear_memory", json=payload)
140
+ print(f"Clear memory response for {session_id}: {response.status_code}")
141
  except Exception as e:
142
  print(f"Warning: clear_memory failed for {session_id}: {e}")
143
 
 
146
  print(f"🗑️ Successfully cleaned up session: {session_id}")
147
 
148
  @staticmethod
149
+ async def cleanup_session_immediate(session_id: str):
150
  """Immediate cleanup (for testing or forced cleanup)"""
151
+ async with StateManager._lock:
152
  if session_id in StateManager._session_states:
153
  state = StateManager._session_states[session_id]
154
+ state.cancel_cleanup_task()
155
+ await StateManager._perform_actual_cleanup(session_id)
156
 
157
  @staticmethod
158
  async def clear_chat_state(state: ConversationState):
 
164
  "reset_model": False,
165
  "session_id": state.session_id
166
  }
167
+ async with httpx.AsyncClient(timeout=30.0) as client:
168
+ response = await client.post(f"{API_BASE_URL}/clear_memory", json=payload)
169
+ print(f"Clear memory response: {response.status_code}")
170
  except Exception as e:
171
  print(f"Warning: clear_memory failed: {e}")
172
 
 
176
  state.session_id = session_id
177
 
178
  @staticmethod
179
+ async def change_model(state: ConversationState, model_name: str):
180
  """Change the selected model"""
181
  state.selected_model = model_name
182
  state.last_activity = datetime.now()
183
 
184
  @staticmethod
185
+ async def toggle_product_model_search(state: ConversationState):
186
  """Toggle product model search mode"""
187
  state.product_model_search = not state.product_model_search
188
  state.last_activity = datetime.now()
189
 
190
  @staticmethod
191
+ async def get_session_status() -> Dict[str, Dict[str, Any]]:
192
  """Get status of all sessions (for debugging)"""
193
+ async with StateManager._lock:
194
  status = {}
195
  for session_id, state in StateManager._session_states.items():
196
  status[session_id] = {
197
  "pending_cleanup": state.pending_cleanup,
198
+ "has_task": state.cleanup_task is not None and not state.cleanup_task.done(),
199
  "last_activity": state.last_activity.isoformat(),
200
  "selected_model": state.selected_model,
201
  "product_model_search": state.product_model_search
 
204
 
205
 
206
  class ChatService:
207
+ """Handles chat-related operations with async HTTP calls"""
208
 
209
  @staticmethod
210
  async def respond_to_chat(
 
212
  message: str,
213
  image_path: Optional[str] = None
214
  ) -> str:
215
+ """Handle chat responses with image support using async HTTP"""
216
  print(f"🔄 === DEBUG STATE ===\n Chat request with model: {state.selected_model}, Product Model Search: {state.product_model_search}, Session ID: {state.session_id}")
217
 
218
  # Update activity timestamp - this is KEY to prevent cleanup during active use
 
226
  if not state.session_id:
227
  return "Error: Session ID not initialized"
228
 
229
+ # Call API using httpx for async HTTP
230
  try:
231
+ async with httpx.AsyncClient(timeout=600.0) as client:
232
+ if image_path:
233
+ # For image uploads, use form-data format as expected by API
234
+ with open(image_path, 'rb') as f:
235
+ files = {"image": f.read()}
236
+
237
  data = {
238
  "message": message,
239
  "product_model_search": str(state.product_model_search).lower(),
 
241
  "llm_model": state.selected_model,
242
  "debug": "Normal"
243
  }
244
+
245
+ # Use multipart form data for image upload
246
+ files_dict = {"image": ("image.jpg", files["image"], "image/jpeg")}
247
+ resp = await client.post(
248
  f"{API_BASE_URL}/chat_with_image",
249
+ files=files_dict,
250
+ data=data
251
+ )
252
+ else:
253
+ # For text messages, use form-data format as expected by API
254
+ data = {
255
+ "message": message,
256
+ "session_id": state.session_id,
257
+ "debug": "Normal",
258
+ "product_model_search": str(state.product_model_search).lower(),
259
+ "llm_model": state.selected_model
260
+ }
261
+ resp = await client.post(
262
+ f"{API_BASE_URL}/chat",
263
+ data=data # Form data format
264
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
265
 
266
  if resp.status_code == 200:
267
  j = resp.json()
 
293
 
294
  # Filter products based on query
295
  if state.specs_advantages is not None:
296
+ await ChatService.get_specific_product_from_query(message, state)
297
 
298
  return response + f"\n\n*Thời gian xử lí: {end - start:.6f}s*"
299
 
300
  @staticmethod
301
+ async def get_specific_product_from_query(query, state):
302
  """Filter specs_advantages based on models found in query"""
303
  specs_map = state.specs_advantages or {}
304
  product_model_list = []
 
324
 
325
 
326
  class DisplayService:
327
+ """Handles display-related operations with async HTTP calls"""
328
 
329
  @staticmethod
330
+ async def show_specs(state: ConversationState) -> str:
331
  """Generate specifications table"""
332
  specs_map = state.specs_advantages
333
  columns = ["Thông số"]
 
430
  return f"📄 **Thông số kỹ thuật**\n\n{markdown_table}"
431
 
432
  @staticmethod
433
+ async def show_advantages(state: ConversationState) -> str:
434
  """Generate advantages as bullet list instead of table"""
435
  specs_map = state.specs_advantages
436
 
 
462
  return content
463
 
464
  @staticmethod
465
+ async def show_solution_packages(state: ConversationState) -> str:
466
  """Show solution packages in a structured format"""
467
  packages = state.solution_packages
468
 
 
473
  return markdown_table
474
 
475
  @staticmethod
476
+ async def show_all_products_table(state: ConversationState):
477
+ """Show all products table using async HTTP"""
478
  outputs = state.outputs or {}
479
 
480
  if not outputs:
 
483
  try:
484
  # Updated to match API format - send outputs in request body
485
  payload = {"outputs": outputs}
486
+ async with httpx.AsyncClient(timeout=60.0) as client:
487
+ resp = await client.post(f"{API_BASE_URL}/products_by_category", json=payload)
488
 
489
  if resp.status_code == 200:
490
  data = resp.json()
 
553
  return msg
554
 
555
 
556
+ # HELPER FUNCTIONS: Session management with proper async error handling
557
+ async def ensure_session_state() -> Optional[ConversationState]:
558
  """Ensure session state exists, create if not"""
559
  try:
560
  session_id = cl.user_session.get("session_id")
 
563
  print(f"Lỗi: Không lấy được session id ở ensure_session_state")
564
  return None
565
 
566
+ return await StateManager.get_or_create_session_state(session_id)
567
 
568
  except Exception as e:
569
  print(f"⚠️ Error ensuring session state: {e}")
570
  return None
571
 
572
 
573
+ async def get_current_session_state() -> Optional[ConversationState]:
574
  """Get current session state using Chainlit's session system"""
575
  try:
576
  # Use Chainlit's user session to get unique session ID
577
  chainlit_session_id = cl.user_session.get("session_id")
578
 
579
  if chainlit_session_id:
580
+ return await StateManager.get_or_create_session_state(chainlit_session_id)
581
  else:
582
  print("⚠️ No Chainlit session ID found")
583
  return None
 
597
  else:
598
  print(f"🔄 Reusing existing session_id: {session_id}")
599
 
600
+ app_state = await StateManager.get_or_create_session_state(session_id)
601
 
602
  await cl.Message(
603
  content=f"🛍️ **RangDong Sales Agent** (Session: {session_id[:8]}...)\n\n"
 
618
 
619
  @cl.on_chat_end
620
  async def on_chat_end():
621
+ """Handle chat session end with delayed cleanup mechanism using asyncio"""
622
  try:
623
  session_id = cl.user_session.get("session_id")
624
  print(f"📤 on_chat_end triggered for session {session_id}")
 
626
  if session_id:
627
  # Schedule delayed cleanup instead of immediate cleanup
628
  # Use shorter delay (30s) since this is likely just a temporary disconnect
629
+ await StateManager.schedule_delayed_cleanup(session_id, delay_seconds=3600)
630
  print(f"⏳ Scheduled delayed cleanup for session {session_id} (1h delay for disconnect tolerance)")
631
  else:
632
  print("⚠️ No session_id found in on_chat_end")
 
638
  @cl.action_callback("show_specs")
639
  async def on_show_specs(action):
640
  """Handle show specifications action"""
641
+ app_state = await ensure_session_state()
642
  if app_state is None:
643
  await cl.Message(content="Error: Session state not found", author="assistant").send()
644
  return
645
 
646
+ specs_content = await DisplayService.show_specs(app_state)
647
  await UIService.send_message_with_buttons(specs_content, app_state, author="assistant")
648
 
649
 
650
  @cl.action_callback("show_advantages")
651
  async def on_show_advantages(action):
652
  """Handle show advantages action"""
653
+ app_state = await ensure_session_state()
654
  if app_state is None:
655
  await cl.Message(content="Error: Session state not found", author="assistant").send()
656
  return
657
 
658
+ adv_content = await DisplayService.show_advantages(app_state)
659
  await UIService.send_message_with_buttons(adv_content, app_state, author="assistant")
660
 
661
 
662
  @cl.action_callback("show_packages")
663
  async def on_show_packages(action):
664
  """Handle show packages action"""
665
+ app_state = await ensure_session_state()
666
  if app_state is None:
667
  await cl.Message(content="Error: Session state not found", author="assistant").send()
668
  return
669
 
670
+ pkg_content = await DisplayService.show_solution_packages(app_state)
671
  await UIService.send_message_with_buttons(pkg_content, app_state, author="assistant")
672
 
673
  @cl.action_callback("show_all_products")
674
  async def on_show_all_products(action):
675
  """Handle show all products action"""
676
+ app_state = await ensure_session_state()
677
  if app_state is None:
678
  await cl.Message(content="Error: Session state not found", author="assistant").send()
679
  return
680
 
681
+ all_products_content = await DisplayService.show_all_products_table(app_state)
682
  await UIService.send_message_with_buttons(all_products_content, app_state, author="assistant")
683
 
684
  @cl.action_callback("toggle_product_search")
685
  async def on_toggle_product_search(action):
686
  """Handle toggle product model search action"""
687
+ app_state = await ensure_session_state()
688
  if app_state is None:
689
  await cl.Message(content="Error: Session state not found", author="assistant").send()
690
  return
691
 
692
+ await StateManager.toggle_product_model_search(app_state)
693
 
694
  status_message = (
695
  "✅ **Đã bật tìm kiếm theo mã sản phẩm**\n\n"
 
705
  @cl.action_callback("change_model")
706
  async def on_change_model(action):
707
  """Handle model change action"""
708
+ app_state = await ensure_session_state()
709
  if app_state is None:
710
  await cl.Message(content="Error: Session state not found", author="assistant").send()
711
  return
 
731
  @cl.action_callback("back_to_main")
732
  async def on_back_to_main(action):
733
  """Handle back to main menu action"""
734
+ app_state = await ensure_session_state()
735
  if app_state is None:
736
  await cl.Message(content="Error: Session state not found", author="assistant").send()
737
  return
 
746
 
747
  @cl.action_callback("select_model_0")
748
  async def on_select_model_0(action):
749
+ app_state = await ensure_session_state()
750
  if app_state is None:
751
  await cl.Message(content="Error: Session state not found", author="assistant").send()
752
  return
753
 
754
+ await StateManager.change_model(app_state, "Gemini 2.0 Flash")
755
  await UIService.send_message_with_buttons("✅ Đã chuyển sang **Gemini 2.0 Flash**", app_state, author="assistant")
756
 
757
 
758
  @cl.action_callback("select_model_1")
759
  async def on_select_model_1(action):
760
+ app_state = await ensure_session_state()
761
  if app_state is None:
762
  await cl.Message(content="Error: Session state not found", author="assistant").send()
763
  return
764
 
765
+ await StateManager.change_model(app_state, "Gemini 2.5 Flash Lite")
766
  await UIService.send_message_with_buttons("✅ Đã chuyển sang **Gemini 2.5 Flash Lite**", app_state, author="assistant")
767
 
768
 
769
  @cl.action_callback("select_model_2")
770
  async def on_select_model_2(action):
771
+ app_state = await ensure_session_state()
772
  if app_state is None:
773
  await cl.Message(content="Error: Session state not found", author="assistant").send()
774
  return
775
 
776
+ await StateManager.change_model(app_state, "Gemini 2.0 Flash Lite")
777
  await UIService.send_message_with_buttons("✅ Đã chuyển sang **Gemini 2.0 Flash Lite**", app_state, author="assistant")
778
 
779
 
 
782
  async def on_debug_sessions(action):
783
  """Debug action to show session status (can be added to debug builds)"""
784
  try:
785
+ status = await StateManager.get_session_status()
786
  debug_content = "🔍 **Debug: Session Status**\n\n"
787
 
788
  if not status:
 
791
  for session_id, info in status.items():
792
  debug_content += f"**Session: {session_id[:8]}...**\n"
793
  debug_content += f"- Pending cleanup: {info['pending_cleanup']}\n"
794
+ debug_content += f"- Has task: {info['has_task']}\n"
795
  debug_content += f"- Last activity: {info['last_activity']}\n"
796
  debug_content += f"- Model: {info['selected_model']}\n"
797
  debug_content += f"- Product search: {info['product_model_search']}\n\n"
 
803
 
804
  @cl.on_message
805
  async def main(message: cl.Message):
806
+ """Main message handler with proper async flow"""
807
+ app_state = await ensure_session_state()
808
  if app_state is None:
809
  await cl.Message(content="Error: Session state not found", author="assistant").send()
810
  return
 
820
  # Show typing animation
821
  typing_msg = await UIService.create_typing_animation()
822
 
823
+ # Get response from API - now fully async, won't block event loop
824
  response = await ChatService.respond_to_chat(app_state, message.content, image_path)
825
 
826
  # Update the typing message with final response and buttons
827
  typing_msg.content = response
828
  typing_msg.actions = UIService.create_action_buttons(app_state)
829
  typing_msg.author = "assistant"
830
+ await typing_msg.update()