| import base64 |
| import json |
| import os |
| import secrets |
| import string |
| import time |
| import tempfile |
| import ast |
| from typing import List, Optional, Union, Any |
|
|
| import httpx |
| from dotenv import load_dotenv |
| from fastapi import FastAPI, HTTPException |
| from fastapi.responses import JSONResponse, StreamingResponse |
| from pydantic import BaseModel, Field, model_validator |
|
|
| |
| from gradio_client import Client, handle_file |
|
|
| |
| load_dotenv() |
|
|
| |
| IMAGE_API_URL = os.environ.get("IMAGE_API_URL", "https://image.api.example.com") |
| SNAPZION_UPLOAD_URL = "https://upload.snapzion.com/api/public-upload" |
| SNAPZION_API_KEY = os.environ.get("SNAP", "") |
| CHAT_API_URL = "https://www.chatwithmono.xyz/api/chat" |
| IMAGE_GEN_API_URL = "https://www.chatwithmono.xyz/api/image" |
| MODERATION_API_URL = "https://www.chatwithmono.xyz/api/moderation" |
|
|
| |
| AVAILABLE_MODELS = [ |
| {"id": "gpt-4-turbo", "object": "model", "created": int(time.time()), "owned_by": "system"}, |
| {"id": "gpt-4o", "object": "model", "created": int(time.time()), "owned_by": "system"}, |
| {"id": "gpt-3.5-turbo", "object": "model", "created": int(time.time()), "owned_by": "system"}, |
| {"id": "dall-e-3", "object": "model", "created": int(time.time()), "owned_by": "system"}, |
| {"id": "text-moderation-stable", "object": "model", "created": int(time.time()), "owned_by": "system"}, |
| {"id": "florence-2-ocr", "object": "model", "created": int(time.time()), "owned_by": "system"}, |
| ] |
| MODEL_ALIASES = {} |
|
|
| |
| app = FastAPI( |
| title="OpenAI Compatible API", |
| description="An adapter for various services to be compatible with the OpenAI API specification.", |
| version="1.1.3" |
| ) |
|
|
| |
| try: |
| ocr_client = Client("multimodalart/Florence-2-l4") |
| except Exception as e: |
| print(f"Warning: Could not initialize Gradio client for OCR: {e}") |
| ocr_client = None |
|
|
|
|
| |
| class Message(BaseModel): |
| role: str |
| content: str |
|
|
| class ChatRequest(BaseModel): |
| messages: List[Message] |
| model: str |
| stream: Optional[bool] = False |
| tools: Optional[Any] = None |
|
|
| class ImageGenerationRequest(BaseModel): |
| prompt: str |
| aspect_ratio: Optional[str] = "1:1" |
| n: Optional[int] = 1 |
| user: Optional[str] = None |
| model: Optional[str] = "default" |
|
|
| class ModerationRequest(BaseModel): |
| input: Union[str, List[str]] |
| model: Optional[str] = "text-moderation-stable" |
|
|
| class OcrRequest(BaseModel): |
| image_url: Optional[str] = Field(None, description="URL of the image to process.") |
| image_b64: Optional[str] = Field(None, description="Base64 encoded string of the image to process.") |
|
|
| @model_validator(mode='before') |
| @classmethod |
| def check_sources(cls, data: Any) -> Any: |
| if isinstance(data, dict): |
| if not (data.get('image_url') or data.get('image_b64')): |
| raise ValueError('Either image_url or image_b64 must be provided.') |
| if data.get('image_url') and data.get('image_b64'): |
| raise ValueError('Provide either image_url or image_b64, not both.') |
| return data |
|
|
| class OcrResponse(BaseModel): |
| ocr_text: str |
| raw_response: dict |
|
|
|
|
| |
| def generate_random_id(prefix: str, length: int = 29) -> str: |
| """Generates a cryptographically secure, random alphanumeric ID.""" |
| population = string.ascii_letters + string.digits |
| random_part = "".join(secrets.choice(population) for _ in range(length)) |
| return f"{prefix}{random_part}" |
|
|
|
|
| |
|
|
| @app.get("/v1/models", tags=["Models"]) |
| async def list_models(): |
| """Lists the available models.""" |
| return {"object": "list", "data": AVAILABLE_MODELS} |
|
|
|
|
| @app.post("/v1/chat/completions", tags=["Chat"]) |
| async def chat_completion(request: ChatRequest): |
| """Handles chat completion requests, supporting streaming and non-streaming.""" |
| model_id = MODEL_ALIASES.get(request.model, request.model) |
| chat_id = generate_random_id("chatcmpl-") |
| headers = { |
| 'accept': 'text/event-stream', |
| 'content-type': 'application/json', |
| 'origin': 'https://www.chatwithmono.xyz', |
| 'referer': 'https://www.chatwithmono.xyz/', |
| 'user-agent': 'Mozilla/5.0', |
| } |
|
|
| if request.tools: |
| tool_prompt = f"""You have access to the following tools. To call a tool, please respond with JSON for a tool call within <tool_call></tool_call> XML tags. Respond in the format {{"name": tool name, "parameters": dictionary of argument name and its value}}. Do not use variables. |
| Tools: {";".join(f"<tool>{tool}</tool>" for tool in request.tools)} |
| Response Format for tool call: |
| <tool_call> |
| {{"name": <function-name>, "arguments": <args-json-object>}} |
| </tool_call>""" |
| if request.messages[0].role == "system": |
| request.messages[0].content += "\n\n" + tool_prompt |
| else: |
| request.messages.insert(0, Message(role="system", content=tool_prompt)) |
|
|
| payload = {"messages": [msg.model_dump() for msg in request.messages], "model": model_id} |
|
|
| if request.stream: |
| async def event_stream(): |
| created = int(time.time()) |
| usage_info = None |
| is_first_chunk = True |
| tool_call_buffer = "" |
| in_tool_call = False |
|
|
| try: |
| async with httpx.AsyncClient(timeout=120) as client: |
| async with client.stream("POST", CHAT_API_URL, headers=headers, json=payload) as response: |
| response.raise_for_status() |
| async for line in response.aiter_lines(): |
| if not line: |
| continue |
|
|
| if line.startswith("0:"): |
| try: |
| content_piece = json.loads(line[2:]) |
| except json.JSONDecodeError: |
| continue |
| |
| current_buffer = content_piece |
| if in_tool_call: |
| current_buffer = tool_call_buffer + content_piece |
|
|
| if "</tool_call>" in current_buffer: |
| tool_str = current_buffer.split("<tool_call>")[1].split("</tool_call>")[0] |
| tool_json = json.loads(tool_str.strip()) |
| delta = { |
| "content": None, |
| "tool_calls": [{"index": 0, "id": generate_random_id("call_"), "type": "function", |
| "function": {"name": tool_json["name"], "arguments": json.dumps(tool_json["parameters"])}}] |
| } |
| chunk = {"id": chat_id, "object": "chat.completion.chunk", "created": created, "model": model_id, |
| "choices": [{"index": 0, "delta": delta, "finish_reason": None}], "usage": None} |
| yield f"data: {json.dumps(chunk)}\n\n" |
| |
| in_tool_call = False |
| tool_call_buffer = "" |
| remaining_text = current_buffer.split("</tool_call>", 1)[1] |
| if remaining_text: |
| content_piece = remaining_text |
| else: |
| continue |
|
|
| if "<tool_call>" in content_piece: |
| in_tool_call = True |
| tool_call_buffer += content_piece.split("<tool_call>", 1)[1] |
| text_before = content_piece.split("<tool_call>", 1)[0] |
| if text_before: |
| delta = {"content": text_before, "tool_calls": None} |
| chunk = {"id": chat_id, "object": "chat.completion.chunk", "created": created, "model": model_id, |
| "choices": [{"index": 0, "delta": delta, "finish_reason": None}], "usage": None} |
| yield f"data: {json.dumps(chunk)}\n\n" |
| if "</tool_call>" not in tool_call_buffer: |
| continue |
| |
| if not in_tool_call: |
| delta = {"content": content_piece} |
| if is_first_chunk: |
| delta["role"] = "assistant" |
| is_first_chunk = False |
| chunk = {"id": chat_id, "object": "chat.completion.chunk", "created": created, "model": model_id, |
| "choices": [{"index": 0, "delta": delta, "finish_reason": None}], "usage": None} |
| yield f"data: {json.dumps(chunk)}\n\n" |
|
|
| elif line.startswith(("e:", "d:")): |
| try: |
| usage_info = json.loads(line[2:]).get("usage") |
| except (json.JSONDecodeError, AttributeError): |
| pass |
| break |
| |
| final_usage = None |
| if usage_info: |
| prompt_tokens = usage_info.get("promptTokens", 0) |
| completion_tokens = usage_info.get("completionTokens", 0) |
| final_usage = { |
| "prompt_tokens": prompt_tokens, |
| "completion_tokens": completion_tokens, |
| "total_tokens": prompt_tokens + completion_tokens |
| } |
| |
| finish_reason = "tool_calls" if in_tool_call else "stop" |
| done_chunk = {"id": chat_id, "object": "chat.completion.chunk", "created": created, "model": model_id, |
| "choices": [{"index": 0, "delta": {}, "finish_reason": finish_reason}], "usage": final_usage} |
| yield f"data: {json.dumps(done_chunk)}\n\n" |
|
|
| except httpx.HTTPStatusError as e: |
| error_content = {"error": {"message": f"Upstream API error: {e.response.status_code}. Details: {e.response.text}", "type": "upstream_error", "code": str(e.response.status_code)}} |
| yield f"data: {json.dumps(error_content)}\n\n" |
| finally: |
| yield "data: [DONE]\n\n" |
| |
| return StreamingResponse(event_stream(), media_type="text/event-stream") |
| |
| else: |
| full_response, usage_info = "", {} |
| try: |
| async with httpx.AsyncClient(timeout=120) as client: |
| async with client.stream("POST", CHAT_API_URL, headers=headers, json=payload) as response: |
| response.raise_for_status() |
| async for chunk in response.aiter_lines(): |
| if chunk.startswith("0:"): |
| try: |
| full_response += json.loads(chunk[2:]) |
| except: |
| continue |
| elif chunk.startswith(("e:", "d:")): |
| try: |
| usage_info = json.loads(chunk[2:]).get("usage", {}) |
| except: |
| continue |
|
|
| tool_calls = None |
| content_response = full_response |
| finish_reason = "stop" |
| if "<tool_call>" in full_response and "</tool_call>" in full_response: |
| tool_call_str = full_response.split("<tool_call>")[1].split("</tool_call>")[0] |
| tool_call = json.loads(tool_call_str.strip()) |
| tool_calls = [{ |
| "id": generate_random_id("call_"), |
| "type": "function", |
| "function": { |
| "name": tool_call["name"], |
| "arguments": json.dumps(tool_call["parameters"]) |
| } |
| }] |
| content_response = None |
| finish_reason = "tool_calls" |
|
|
| prompt_tokens = usage_info.get("promptTokens", 0) |
| completion_tokens = usage_info.get("completionTokens", 0) |
| |
| return JSONResponse(content={ |
| "id": chat_id, |
| "object": "chat.completion", |
| "created": int(time.time()), |
| "model": model_id, |
| "choices": [{ |
| "index": 0, |
| "message": { |
| "role": "assistant", |
| "content": content_response, |
| "tool_calls": tool_calls |
| }, |
| "finish_reason": finish_reason |
| }], |
| "usage": { |
| "prompt_tokens": prompt_tokens, |
| "completion_tokens": completion_tokens, |
| "total_tokens": prompt_tokens + completion_tokens |
| } |
| }) |
| except httpx.HTTPStatusError as e: |
| return JSONResponse( |
| status_code=e.response.status_code, |
| content={"error": {"message": f"Upstream API error. Details: {e.response.text}", "type": "upstream_error"}} |
| ) |
|
|
|
|
| @app.post("/v1/images/generations", tags=["Images"]) |
| async def generate_images(request: ImageGenerationRequest): |
| """Handles image generation requests.""" |
| results = [] |
| try: |
| async with httpx.AsyncClient(timeout=120) as client: |
| for _ in range(request.n): |
| model = request.model or "default" |
| if model in ["gpt-image-1", "dall-e-3", "dall-e-2", "nextlm-image-1"]: |
| headers = {'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0', 'Referer': 'https://www.chatwithmono.xyz/'} |
| payload = {"prompt": request.prompt, "model": model} |
| resp = await client.post(IMAGE_GEN_API_URL, headers=headers, json=payload) |
| resp.raise_for_status() |
| data = resp.json() |
| b64_image = data.get("image") |
| if not b64_image: |
| return JSONResponse(status_code=502, content={"error": "Missing base64 image in response"}) |
| |
| image_url = f"data:image/png;base64,{b64_image}" |
| if SNAPZION_API_KEY: |
| upload_headers = {"Authorization": SNAPZION_API_KEY} |
| upload_files = {'file': ('image.png', base64.b64decode(b64_image), 'image/png')} |
| upload_resp = await client.post(SNAPZION_UPLOAD_URL, headers=upload_headers, files=upload_files) |
| if upload_resp.status_code == 200: |
| image_url = upload_resp.json().get("url", image_url) |
| |
| results.append({"url": image_url, "b64_json": b64_image, "revised_prompt": data.get("revised_prompt")}) |
| else: |
| params = {"prompt": request.prompt, "aspect_ratio": request.aspect_ratio, "link": "typegpt.net"} |
| resp = await client.get(IMAGE_API_URL, params=params) |
| resp.raise_for_status() |
| data = resp.json() |
| results.append({"url": data.get("image_link"), "b64_json": data.get("base64_output")}) |
| except httpx.HTTPStatusError as e: |
| return JSONResponse(status_code=502, content={"error": f"Image generation failed. Upstream error: {e.response.status_code}", "details": e.response.text}) |
| except Exception as e: |
| return JSONResponse(status_code=500, content={"error": "An internal error occurred.", "details": str(e)}) |
|
|
| return {"created": int(time.time()), "data": results} |
|
|
|
|
| @app.post("/v1/ocr", response_model=OcrResponse, tags=["OCR"]) |
| async def perform_ocr(request: OcrRequest): |
| """ |
| Performs Optical Character Recognition (OCR) on an image using the Florence-2 model. |
| Provide an image via a URL or a base64 encoded string. |
| """ |
| if not ocr_client: |
| raise HTTPException(status_code=503, detail="OCR service is not available. Gradio client failed to initialize.") |
|
|
| image_path, temp_file_path = None, None |
| try: |
| if request.image_url: |
| image_path = request.image_url |
| elif request.image_b64: |
| image_bytes = base64.b64decode(request.image_b64) |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file: |
| temp_file.write(image_bytes) |
| temp_file_path = temp_file.name |
| image_path = temp_file_path |
|
|
| prediction = ocr_client.predict(image=handle_file(image_path), task_prompt="OCR", api_name="/process_image") |
| |
| if not prediction or not isinstance(prediction, tuple) or len(prediction) == 0: |
| raise HTTPException(status_code=502, detail="Invalid or empty response from OCR service.") |
| |
| raw_output = prediction[0] |
| raw_result_dict = {} |
|
|
| |
| if isinstance(raw_output, str): |
| try: |
| |
| raw_result_dict = json.loads(raw_output) |
| except json.JSONDecodeError: |
| try: |
| |
| parsed_output = ast.literal_eval(raw_output) |
| if isinstance(parsed_output, dict): |
| raw_result_dict = parsed_output |
| else: |
| |
| raw_result_dict = {"result": str(parsed_output)} |
| except (ValueError, SyntaxError): |
| |
| raw_result_dict = {"ocr_text_from_string": raw_output} |
| elif isinstance(raw_output, dict): |
| |
| raw_result_dict = raw_output |
| else: |
| |
| raise HTTPException(status_code=502, detail=f"Unexpected data type from OCR service: {type(raw_output)}") |
|
|
| |
| ocr_text = raw_result_dict.get("OCR", |
| raw_result_dict.get("ocr_text_from_string", |
| str(raw_result_dict))) |
| |
| return OcrResponse(ocr_text=ocr_text, raw_response=raw_result_dict) |
|
|
| except Exception as e: |
| if isinstance(e, HTTPException): |
| raise e |
| raise HTTPException(status_code=500, detail=f"An error occurred during OCR processing: {str(e)}") |
| finally: |
| if temp_file_path and os.path.exists(temp_file_path): |
| os.unlink(temp_file_path) |
|
|
|
|
| @app.post("/v1/moderations", tags=["Moderation"]) |
| async def create_moderation(request: ModerationRequest): |
| """Handles moderation requests, conforming to the OpenAI API specification.""" |
| input_texts = [request.input] if isinstance(request.input, str) else request.input |
| if not input_texts: |
| return JSONResponse(status_code=400, content={"error": {"message": "Request must have at least one input string."}}) |
|
|
| headers = {'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0', 'Referer': 'https://www.chatwithmono.xyz/'} |
| results = [] |
| |
| try: |
| async with httpx.AsyncClient(timeout=30) as client: |
| for text_input in input_texts: |
| payload = {"text": text_input} |
| resp = await client.post(MODERATION_API_URL, headers=headers, json=payload) |
| resp.raise_for_status() |
| |
| upstream_data = resp.json() |
| upstream_categories = upstream_data.get("categories", {}) |
| |
| openai_categories = { |
| "hate": upstream_categories.get("hate", False), |
| "hate/threatening": False, |
| "harassment": False, |
| "harassment/threatening": False, |
| "self-harm": upstream_categories.get("self-harm", False), |
| "self-harm/intent": False, |
| "self-harm/instructions": False, |
| "sexual": upstream_categories.get("sexual", False), |
| "sexual/minors": False, |
| "violence": upstream_categories.get("violence", False), |
| "violence/graphic": False, |
| } |
| |
| result_item = { |
| "flagged": upstream_data.get("overall_sentiment") == "flagged", |
| "categories": openai_categories, |
| "category_scores": {k: 1.0 if v else 0.0 for k, v in openai_categories.items()}, |
| } |
| |
| if reason := upstream_data.get("reason"): |
| result_item["reason"] = reason |
| |
| results.append(result_item) |
|
|
| except httpx.HTTPStatusError as e: |
| return JSONResponse( |
| status_code=502, |
| content={"error": {"message": f"Moderation failed. Upstream error: {e.response.status_code}", "details": e.response.text}} |
| ) |
| except Exception as e: |
| return JSONResponse( |
| status_code=500, |
| content={"error": {"message": "An internal error occurred during moderation.", "details": str(e)}} |
| ) |
| |
| final_response = { |
| "id": generate_random_id("modr-"), |
| "model": request.model, |
| "results": results, |
| } |
| return JSONResponse(content=final_response) |
|
|
|
|
| |
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(app, host="0.0.0.0", port=8000) |