TahaFawzyElshrif commited on
Commit
03615c4
·
1 Parent(s): e694bc0

working on queue, finalization

Browse files
Files changed (2) hide show
  1. Consumer.py +42 -20
  2. encryption_utils.py +0 -1
Consumer.py CHANGED
@@ -26,6 +26,22 @@ redis_port = os.environ["REDIS_PORT"]
26
  redis_password = os.environ["REDIS_PASSWORD"]
27
 
28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
 
30
  def model_call(request,token):
31
  # fill with last state
@@ -40,45 +56,47 @@ def model_call(request,token):
40
  print(f"MODEL CALL WITH STATE {state} and PROMPT {request['prompt']} and MEMORY {request['memory']} and HT_TOKEN {token} and USER_EMAIL {request['user_email']} and USER_NAME {request['user_name']}")
41
  answer = get_response(request['prompt'], request['memory'],token,state,request['user_email'],request['user_name'])
42
 
43
- print(f"ANSWER {answer}")
44
  # drop unserlizable keys
45
  for k in ["llm","rag_model"]:
46
  answer[k] = ""
47
 
 
 
48
 
49
- r = redis.Redis(
50
- host=redis_host,
51
- port=redis_port,
52
- decode_responses=True,
53
- username="default",
54
- password=redis_password,
55
- )
56
-
57
- user_id = request["user_id"]
58
- msg_id = request["msg_id"]
59
- print(f"STORING ANSWER IN REDIS FOR USER_ID {user_id} AND MSG_ID {msg_id}")
60
- success = r.set(f'ANSWER_FOR_USER_ID{user_id}_OF_{msg_id}',json.dumps(answer))
61
 
 
 
 
62
 
63
- return {"STATUS": success , "user_id": user_id, "msg_id": msg_id}
64
 
65
  def get_connection():
66
  params = pika.URLParameters(RABBITMQ_URL)
67
  return pika.BlockingConnection(params)
68
 
69
  def callback(ch, method, properties, body):
 
70
  recieved_msg = json.loads(body.decode())
71
  print("-------------------------------------------------")
72
  print(f"MSG AT CONSUMER {consumer_id}" )
73
 
74
- # simulate processing
75
- token = decrypt_token_from_json(json.loads(recieved_msg['ht_token_encrypted_dumped']))
76
- print(f"TYPE {type(recieved_msg)}, CONTENT {recieved_msg} , TOKEN {token}")
77
- model_call(recieved_msg,token)
78
-
79
- # (put your logic here)
80
  print(f"CONSUMER {consumer_id}:::: Processing done")
81
 
 
 
82
  ch.basic_ack(delivery_tag=method.delivery_tag)
83
 
84
  def start_consumer():
@@ -99,6 +117,10 @@ def start_consumer():
99
  print("Waiting for messages...")
100
  channel.start_consuming()
101
 
 
 
 
 
102
  if __name__ == "__main__":
103
  print(f"Starting New Consumer {consumer_id}...")
104
  start_consumer()
 
26
  redis_password = os.environ["REDIS_PASSWORD"]
27
 
28
 
29
+ ##################################################
30
+ # PROCESSING METHODS
31
+ ##################################################
32
+
33
+ def redis_send(user_id,msg_id,answer):
34
+ r = redis.Redis(
35
+ host=redis_host,
36
+ port=redis_port,
37
+ decode_responses=True,
38
+ username="default",
39
+ password=redis_password,
40
+ )
41
+
42
+ print(f"STORING ANSWER IN REDIS FOR USER_ID {user_id} AND MSG_ID {msg_id}")
43
+ success = r.set(f'ANSWER_FOR_USER_ID{user_id}_OF_{msg_id}',json.dumps(answer))
44
+ return success
45
 
46
  def model_call(request,token):
47
  # fill with last state
 
56
  print(f"MODEL CALL WITH STATE {state} and PROMPT {request['prompt']} and MEMORY {request['memory']} and HT_TOKEN {token} and USER_EMAIL {request['user_email']} and USER_NAME {request['user_name']}")
57
  answer = get_response(request['prompt'], request['memory'],token,state,request['user_email'],request['user_name'])
58
 
 
59
  # drop unserlizable keys
60
  for k in ["llm","rag_model"]:
61
  answer[k] = ""
62
 
63
+ print(f"ANSWER {answer}")
64
+ return answer
65
 
66
+ def process_message(recieved_msg):
67
+ # decrypt token
68
+ token = decrypt_token_from_json(json.loads(recieved_msg['ht_token_encrypted_dumped']))
69
+ print(f"TYPE {type(recieved_msg)}, CONTENT {recieved_msg} , TOKEN {token}")
70
+ # call the model
71
+ model_answer = model_call(recieved_msg,token)
72
+ # send answer to redis
73
+ user_id = recieved_msg["user_id"]
74
+ msg_id = recieved_msg["msg_id"]
75
+ redis_send_res = redis_send(user_id,msg_id,model_answer)
76
+ print({"STATUS": redis_send_res , "user_id": user_id, "msg_id": msg_id})
77
+
78
 
79
+ ##################################################
80
+ # CONSUMER METHODS
81
+ ##################################################
82
 
 
83
 
84
  def get_connection():
85
  params = pika.URLParameters(RABBITMQ_URL)
86
  return pika.BlockingConnection(params)
87
 
88
  def callback(ch, method, properties, body):
89
+ ##### Recieve message and process it
90
  recieved_msg = json.loads(body.decode())
91
  print("-------------------------------------------------")
92
  print(f"MSG AT CONSUMER {consumer_id}" )
93
 
94
+ ##### Process Message
95
+ process_message(recieved_msg)
 
 
 
 
96
  print(f"CONSUMER {consumer_id}:::: Processing done")
97
 
98
+
99
+ ###### Finalize
100
  ch.basic_ack(delivery_tag=method.delivery_tag)
101
 
102
  def start_consumer():
 
117
  print("Waiting for messages...")
118
  channel.start_consuming()
119
 
120
+ ##################################################
121
+ # MAIN
122
+ ##################################################
123
+
124
  if __name__ == "__main__":
125
  print(f"Starting New Consumer {consumer_id}...")
126
  start_consumer()
encryption_utils.py CHANGED
@@ -18,7 +18,6 @@ def encrypt_token_to_json(token: str) -> dict:
18
 
19
  encrypted = aesgcm.encrypt(iv, token.encode(), None)
20
 
21
- # في AESGCM في Python: التاج (tag) بيكون في آخر 16 بايت
22
  ciphertext = encrypted[:-16]
23
  tag = encrypted[-16:]
24
 
 
18
 
19
  encrypted = aesgcm.encrypt(iv, token.encode(), None)
20
 
 
21
  ciphertext = encrypted[:-16]
22
  tag = encrypted[-16:]
23