| import io |
| from fastapi import FastAPI, File, UploadFile |
|
|
| import subprocess |
| import os |
| import requests |
| import random |
|
|
| import shutil |
| import json |
| |
| from pydantic import BaseModel |
| from typing import Annotated |
|
|
| from fastapi import Form |
|
|
|
|
| import selenium |
|
|
| from selenium import webdriver |
| from selenium.webdriver import ChromeOptions |
| from selenium.webdriver.chrome.service import Service |
| import threading |
| import random |
| import string |
| import time |
|
|
|
|
| |
|
|
| |
| |
|
|
| |
|
|
|
|
| |
| |
|
|
|
|
|
|
|
|
| |
|
|
|
|
| class Query(BaseModel): |
| text: str |
| host:str |
|
|
| |
|
|
|
|
| from fastapi import FastAPI, Request, Depends, UploadFile, File |
| from fastapi.exceptions import HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse |
|
|
|
|
| app = FastAPI() |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=['*'], |
| allow_credentials=True, |
| allow_methods=['*'], |
| allow_headers=['*'], |
| ) |
|
|
|
|
| |
| |
| |
| |
|
|
| from selenium.webdriver.common.by import By |
| from pymongo.mongo_client import MongoClient |
|
|
| @app.on_event("startup") |
| async def startup_event(): |
| print("on startup") |
| |
| |
| |
| |
|
|
|
|
|
|
|
|
|
|
| mycol = None |
|
|
|
|
| @app.post("/url") |
| async def get_url(request: Request ): |
| return "k" |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
|
|
|
|
|
|
| def extract(): |
| options = ChromeOptions() |
| options.add_argument('--no-sandbox') |
| options.add_argument('-headless') |
| service = Service() |
| driver = webdriver.Chrome(options= options,service=service) |
| |
| |
| global mycol |
| |
| |
| if True: |
| |
| try: |
| driver.get("https://talkai.info/chat/") |
| element = driver.find_element(By.CSS_SELECTOR,".chat") |
| api_key = element.get_attribute("data-api-key") |
| dict={"key":"open-ai"} |
| mycol.delete_one(dict) |
| dict={"key":"open-ai","value":api_key} |
| mycol.insert_one(dict) |
| print(api_key) |
| driver.delete_all_cookies() |
| driver.quit() |
| |
| except Exception as e: |
| print('error in extract ',e) |
| pass |
| |
|
|
| |
|
|
| from queue import Queue |
| chatq = Queue() |
| imgq= Queue() |
|
|
|
|
| def makeqchat(): |
|
|
| while chatq.qsize()<2: |
| print("appending in chat queue") |
| options = ChromeOptions() |
| options.add_argument('--no-sandbox') |
| options.add_argument('-headless') |
| service = Service() |
| driver = webdriver.Chrome(options= options,service=service) |
| driver.get("https://talkai.info/chat/") |
| chatq.put(driver) |
| |
| |
|
|
| def makeqimg(): |
|
|
| while imgq.qsize()<2: |
| print("appending in img queue") |
| options = ChromeOptions() |
| options.add_argument('--no-sandbox') |
| options.add_argument('-headless') |
| service = Service() |
| driver = webdriver.Chrome(options= options,service=service) |
| driver.get("https://talkai.info/image/") |
| imgq.put(driver) |
| |
| |
|
|
|
|
|
|
|
|
|
|
| @app.post("/") |
| async def get_answer(request: Request ): |
| data = await request.json() |
| |
| text = data['text'] |
| host= '' |
|
|
| temperature=-1 |
|
|
| try: |
| temperature= data['temperature'] |
| temperature= float(temperature) |
| temperature= round(temperature,1) |
| |
| except: |
| print("No temperature") |
| |
|
|
| |
| |
| |
| |
|
|
| id= '' |
| |
| |
| |
| |
|
|
| res= do_ML(id,text,host,0,temperature) |
| |
|
|
| dict={"ChatGPT":res} |
| |
| |
| |
| return JSONResponse(dict) |
|
|
|
|
|
|
|
|
|
|
|
|
| def do_ML(id:str,text:str,host:str, trycount:int,temperature:float): |
| |
| try: |
| starttime=time.time() |
| options = ChromeOptions() |
| options.add_argument('--no-sandbox') |
| options.add_argument('-headless') |
| service = Service() |
| driver = webdriver.Chrome(options= options,service=service) |
| driver.get("https://talkai.info/chat/") |
| if temperature>=0 and temperature<=2: |
| try: |
| print("setting temperature ",temperature) |
| while True: |
| currtime= time.time() |
| if(currtime>starttime+10): |
| return "Requested Could not be proceed" |
| |
| try: |
| setting_button = driver.find_element(By.ID, "openSettings") |
| setting_button.click() |
| break |
| except: |
| time.sleep(0.2) |
|
|
| while True: |
| currtime= time.time() |
| if(currtime>starttime+10): |
| return "Requested Could not be proceed" |
| try: |
| input_element = driver.find_element(By.CLASS_NAME,"styled-slider") |
| new_value = temperature |
| driver.execute_script("arguments[0].value = arguments[1]", input_element, new_value) |
| break |
| except: |
| time.sleep(0.2) |
| while True: |
| currtime= time.time() |
| if(currtime>starttime+10): |
| return "Requested Could not be proceed" |
| try: |
| confirm_button = driver.find_element(By.CLASS_NAME, "settingsButtonConfirm") |
| confirm_button.click() |
| break |
| except: |
| time.sleep(0.2) |
| except: |
| print("could not set temperature") |
|
|
|
|
| while True: |
| currtime= time.time() |
| if(currtime>starttime+10): |
| return "Requested Could not be proceed" |
| try: |
| textarea = driver.find_element(By.CSS_SELECTOR, "textarea") |
| textarea.send_keys(text) |
| |
| button = driver.find_element(By.CLASS_NAME, "sectionChatFormButton") |
| button.click() |
| break |
| except: |
| time.sleep(0.2) |
|
|
| |
| prev ="" |
| |
| |
| while True: |
| time.sleep(0.2) |
| currtime= time.time() |
| if(currtime>starttime+18.5): |
| |
| return "Requested Could not be proceed" |
| |
| value="" |
| try: |
| messages = driver.find_elements(By.CLASS_NAME, 'messageContain') |
| last_message_contain = messages[len(messages)-2] |
| value = last_message_contain.text |
| value = value[8:len(value)] |
| print(value) |
| if value=="Please, wait...": |
| continue |
| except: |
| continue |
| |
| |
| |
| driver.delete_all_cookies() |
| driver.quit() |
| return value |
| |
| |
|
|
| except: |
| print("Error") |
| driver.delete_all_cookies() |
| if trycount>3: |
| |
| return |
| driver.quit() |
| return do_ML(id,text,host,trycount+1) |
| |
| |
|
|
|
|
|
|
|
|
| @app.post("/image") |
| async def get_answer(q: Query ): |
| |
| text = q.text |
| host= q.host |
|
|
| |
| |
| |
| |
|
|
| id= '' |
| |
| |
| |
|
|
| url = do_ML2(id,text,host,0) |
| |
| dict= {"url":url} |
|
|
| |
| |
| |
| |
| return JSONResponse(dict) |
|
|
| |
| |
|
|
|
|
| def do_ML2(id:str,text:str,host:str, trycount:int): |
| |
| try: |
| starttime=time.time() |
| |
| options = ChromeOptions() |
| options.add_argument('--no-sandbox') |
| options.add_argument('-headless') |
| service = Service() |
| driver = webdriver.Chrome(options= options,service=service) |
| driver.get("https://talkai.info/image/") |
| while True: |
| currtime= time.time() |
| if(currtime>starttime+10): |
| return "Requested Could not be proceed" |
| try: |
| textarea = driver.find_element(By.CSS_SELECTOR, "textarea") |
| textarea.send_keys(text) |
| time.sleep(0.1) |
| button = driver.find_element(By.CLASS_NAME, "sectionChatFormButton") |
| button.click() |
| break |
| except: |
| time.sleep(0.2) |
| |
| |
| while True: |
| currtime= time.time() |
| if(currtime>starttime+10): |
| return "Requested Could not be proceed" |
| |
| time.sleep(0.2) |
| currtime= time.time() |
| if(currtime>starttime+18.5): |
| |
| return "Request Could not be proceed" |
| try: |
| messages = driver.find_elements(By.XPATH, "//div[@class='messageContain']/p/img") |
| last_message_contain = messages[len(messages)-2] |
| src = last_message_contain.get_attribute("src") |
| print(src) |
|
|
| driver.delete_all_cookies() |
| driver.quit() |
| |
| return src |
| break |
| except: |
| continue |
| |
| except: |
| print("Error") |
| driver.delete_all_cookies() |
| if trycount>1: |
| |
| return "Request Could not be proceed" |
| driver.quit() |
| return do_ML2(id,text,host,trycount+1) |
| |
| |
|
|
| |
| |
|
|
|
|
|
|