| import logging |
|
|
| import requests |
| import os, shutil |
| from flask import Flask, send_from_directory, request, jsonify |
|
|
# In-memory list of chat messages; route handlers append to it and serve it
# back as JSON.
messages = []

# Flask application whose static assets live under ``online_log/static``.
app = Flask(__name__, static_folder="online_log/static")

# Only report errors from the application's own logger ...
app.logger.setLevel(logging.ERROR)

# ... and from werkzeug's logger as well.
log = logging.getLogger("werkzeug")
log.setLevel(logging.ERROR)
| import threading |
| from urllib.parse import parse_qs |
|
|
# Absolute path of the directory that contains this module.
FILE_DIR = os.path.abspath(os.path.dirname(__file__))
# Output directory ("WareHouse"), located next to this module.
OUTPUT_DIR = os.path.join(FILE_DIR, "WareHouse")
def check_outdir():
    """Reset ``OUTPUT_DIR`` to an empty directory.

    When the path already exists it is removed together with its contents;
    in every case the directory is then created afresh.
    """
    if os.path.exists(OUTPUT_DIR):
        shutil.rmtree(OUTPUT_DIR)
    os.mkdir(OUTPUT_DIR)
|
|
|
|
def zip_all_files():
    """Pack the contents of ``OUTPUT_DIR`` into ``Outputs.zip``.

    The archive is written to ``online_log/static`` next to this module.
    The path is anchored to ``FILE_DIR`` (like ``OUTPUT_DIR``) instead of the
    current working directory, so the archive is created in the same place
    regardless of where the process was started from.
    """
    # make_archive appends the ".zip" suffix to the base name itself.
    archive_base = os.path.join(FILE_DIR, "online_log", "static", "Outputs")
    shutil.make_archive(archive_base, "zip", OUTPUT_DIR)
|
|
|
|
def clear_all_files():
    """Leave ``OUTPUT_DIR`` as an empty directory.

    Existing contents are removed. If the directory does not exist yet it is
    simply created, rather than failing with ``FileNotFoundError``.
    """
    if os.path.exists(OUTPUT_DIR):
        shutil.rmtree(OUTPUT_DIR)
    os.mkdir(OUTPUT_DIR)
|
|
|
|
def send_msg(role, text, timeout=10):
    """Post a chat message to the local online-log server (best effort).

    Args:
        role: Value sent as the ``"role"`` field of the JSON payload.
        text: Value sent as the ``"text"`` field of the JSON payload.
        timeout: Seconds to wait for the server before giving up, so that a
            hung server cannot block the caller forever.

    Returns:
        None. A failure to deliver the message is logged, never raised.
    """
    payload = {"role": role, "text": text}
    try:
        response = requests.post(
            "http://127.0.0.1:7860/send_message", json=payload, timeout=timeout
        )
    except Exception:
        # Deliberately broad: the online log is optional and must never break
        # the caller. Unlike a bare ``except:``, this still lets
        # KeyboardInterrupt and SystemExit propagate.
        logging.info("flask app.py did not start for online log")
        return

    if response.status_code == 200:
        print("Message sent successfully!")
    else:
        print("Failed to send message.")
|
|
|
|
@app.route("/")
def index():
    """Serve ``index.html`` from the static folder at the site root."""
    page = "index.html"
    return send_from_directory("online_log/static", page)
|
|
@app.route("/Outputs.zip")
def Outputs():
    """Serve the ``Outputs.zip`` file stored in the static folder."""
    archive_name = "Outputs.zip"
    return send_from_directory("online_log/static", archive_name)
|
|
@app.route("/chain_visualizer")
def chain_visualizer():
    """Serve ``chain_visualizer.html`` from the static folder."""
    page = "chain_visualizer.html"
    return send_from_directory("online_log/static", page)
|
|
@app.route("/replay")
def replay():
    """Serve ``replay.html`` from the static folder."""
    page = "replay.html"
    return send_from_directory("online_log/static", page)
|
|
@app.route("/download")
def download():
    """Handle ``GET /download`` by serving ``index.html`` from the static folder."""
    page = "index.html"
    return send_from_directory("online_log/static", page)
|
|
@app.route("/get_messages")
def get_messages():
    """Return all messages recorded so far, serialized as JSON."""
    history = messages
    return jsonify(history)
|
|
|
|
@app.route("/send_message", methods=["POST"])
def send_message():
    """Record a chat message posted as JSON and echo it back.

    Expects a JSON object with a string ``"role"`` and an optional ``"text"``.
    The stored message additionally carries the ``"avatarUrl"`` computed from
    the role.

    Returns:
        The stored message as JSON, or a JSON error with status 400 when the
        body is not a JSON object or ``"role"`` is not a string.
    """
    # silent=True yields None for a missing/invalid JSON body so that it can
    # be rejected below with a uniform 400 response.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "request body must be a JSON object"}), 400

    role = data.get("role")
    if not isinstance(role, str):
        # find_avatar_url needs a string; without this check a missing role
        # surfaced as an unhandled AttributeError (HTTP 500).
        return jsonify({"error": "'role' must be a string"}), 400
    text = data.get("text")

    message = {"role": role, "text": text, "avatarUrl": find_avatar_url(role)}
    messages.append(message)
    return jsonify(message)
|
|
|
|
@app.post("/download")
def run():
    """Run a ChatDev task described by a URL-encoded form body.

    Reads the ``task``, ``config`` and ``api_key`` fields from the request
    body, exports the key as ``OPENAI_API_KEY``, resets the output directory,
    runs ``runchatdev(task, config)`` and zips the results.

    Returns:
        ``index.html`` from the static folder on success, or a JSON error
        with status 400 when a required form field is missing or empty.
    """
    form = parse_qs(request.get_data().decode("utf-8"))
    try:
        # parse_qs has already turned "+" into spaces and decoded %XX
        # escapes, so the value is used as-is. Replacing "+" again would
        # destroy literal plus signs (sent as %2B), e.g. in "C++".
        task = form["task"][0]
        config = form["config"][0]
        api_key = form["api_key"][0]
    except KeyError as missing:
        # parse_qs omits absent and blank fields alike.
        return jsonify({"error": f"missing form field: {missing.args[0]}"}), 400

    # NOTE(review): this sets the key for the whole process, so it is shared
    # by all requests handled by this server.
    os.environ["OPENAI_API_KEY"] = api_key
    check_outdir()

    # Imported here, after the API key is exported, as in the original code.
    from run import runchatdev

    runchatdev(task, config)
    zip_all_files()
    return send_from_directory("online_log/static", "index.html")
|
|
def find_avatar_url(role):
    """Return the static URL of the avatar image for ``role``.

    The role name is percent-encoded so that spaces become ``%20`` and
    reserved characters (``#``, ``?``, ``&``, ``/`` ...) cannot break or
    redirect the URL.

    Args:
        role: Role name as a string, e.g. ``"Chief Executive Officer"``.

    Returns:
        A URL path of the form ``/static/avatars/<encoded role>.png``.
    """
    from urllib.parse import quote

    # safe="" also encodes "/", keeping the role a single path segment.
    encoded_role = quote(role, safe="")
    return f"/static/avatars/{encoded_role}.png"
|
|
|
|
if __name__ == "__main__":
    # Start the Flask server on all network interfaces, port 7860.
    app.run(port=7860, host="0.0.0.0")