CodeBuddyAI / app.py
TahaFawzyElshrif
working on queue, finalization
14faaa0
from urllib import request
from fastapi import FastAPI
import uvicorn
import sys
import os
import json
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
from agent.agent_graph.StateTasks import ProblemState
import subprocess
from Queue_Producer import send_message
import redis
from utils import RequestModel, RequestAnswer
from Consumer import redis_send
##################################################
# VARIABLES
##################################################
redis_host = os.environ["REDIS_HOST"]
redis_port = os.environ["REDIS_PORT"]
redis_password = os.environ["REDIS_PASSWORD"]
##################################################
# START CONSUMERS in a separate process
##################################################
for i in range(1,4): # Start 3 consumers
subprocess.Popen(['python','-u','Consumer.py', '--id', str(i)])
##################################################
# START API and METHODS
##################################################
# Create Redis connection (global to make the get very light)
redis_conn = redis.Redis(
host=redis_host,
port=redis_port,
decode_responses=True,
username="default",
password=redis_password,
)
# model and rag are not global for better security ,at least for this version
# Create app instance
app = FastAPI()
print("Starting API Server...")
##################################################
# ROUTES
##################################################
@app.get("/")
def read_root():
return {"message": "Hello From CodeBuddyAI!"}
"""
def old_call(request: RequestModel):
# fill with last state
try:
state = json.loads(request.last_state)
except Exception:
state: ProblemState = {
"question": request.prompt,
"memory": request.memory
}
answer = get_response(request.prompt, request.memory,request.ht_token,state,request.user_email,request.user_name)
# drop unserlizable keys
for k in ["llm","rag_model"]:
answer[k] = ""
return {"Data": answer}
"""
@app.post("/Message/Send/")
def call(request: RequestModel):
redis_send(request.user_id,request.msg_id,{"status": "pending"})
return send_message(json.dumps(request.model_dump()))
@app.post("/Message/Answer/")
def call(request: RequestAnswer):
## MUST BE LIGHTWEIGHT, JUST CHECK IF ANSWER IS READY IN REDIS, IF YES RETURN IT, ELSE RETURN PENDING
try:
answer = redis_conn.get(f'ANSWER_FOR_USER_ID{request.user_id}_OF_{request.msg_id}')
if answer is None:
return {"status": "error"}
elif "status" in answer and json.loads(answer)["status"] == "pending":
return {"status": "pending"}
else:
redis_conn.delete(f'ANSWER_FOR_USER_ID{request.user_id}_OF_{request.msg_id}') # Clean up after fetching for memory and better secure as double call is wrong
return {"status": "ready", "data": json.loads(answer)}
except Exception as e:
print(f"Error fetching answer from Redis: {e}")
return {"status": "error", "message": str(e)}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)