| import argparse |
| import json |
| import os |
| import shlex |
| import subprocess |
| from contextlib import asynccontextmanager |
| from typing import Any, Dict |
|
|
| import aiohttp |
| from bot_constants import ( |
| MAX_SESSION_TIME, |
| REQUIRED_ENV_VARS, |
| ) |
| from bot_registry import BotRegistry |
| from bot_runner_helpers import ( |
| determine_room_capabilities, |
| ensure_prompt_config, |
| process_dialin_request, |
| ) |
| from fastapi import FastAPI, HTTPException, Request |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse |
|
|
| from pipecat.transports.services.helpers.daily_rest import ( |
| DailyRESTHelper, |
| DailyRoomParams, |
| DailyRoomProperties, |
| DailyRoomSipParams, |
| ) |
|
|
# Shared service handles; the "rest" entry is filled in by ``lifespan`` at startup.
daily_helpers = dict()
# Registry used by the /start handler to configure, detect and look up bot types.
bot_registry = BotRegistry()
|
|
async def create_daily_room(room_url: str = None, config_body: Dict[str, Any] = None):
    """Create a new Daily room, or look up an existing one, and mint a token.

    Args:
        room_url: URL of an existing room to reuse. When falsy, a new room is
            created with capabilities derived from ``config_body``.
        config_body: Bot configuration handed to
            ``determine_room_capabilities`` when a new room is needed.

    Returns:
        A dict with the keys ``room`` (the room URL), ``token`` and
        ``sip_endpoint``.

    Raises:
        HTTPException: 500 when the existing room cannot be fetched, or when
            no room or no token could be obtained.
    """
    if not room_url:
        capabilities = determine_room_capabilities(config_body)

        # SIP parameters are only attached when dial-in is requested.
        sip_params = None
        if capabilities["enable_dialin"]:
            sip_params = DailyRoomSipParams(
                display_name="dialin-user", video=False, sip_mode="dial-in", num_endpoints=2
            )

        properties = DailyRoomProperties(sip=sip_params)
        if capabilities["enable_dialout"]:
            properties.enable_dialout = True

        capability_str = ", ".join(f"{k}={v}" for k, v in capabilities.items())
        print(f"Creating room with capabilities: {capability_str}")

        params = DailyRoomParams(properties=properties)
        print("Creating new room...")
        room = await daily_helpers["rest"].create_room(params=params)
    else:
        try:
            room = await daily_helpers["rest"].get_room_from_url(room_url)
        except Exception as exc:
            # Chain the cause so the underlying lookup failure stays visible.
            raise HTTPException(status_code=500, detail=f"Room not found: {room_url}") from exc

    # Validate the room BEFORE touching its attributes; previously this check
    # ran only after room.url / room.config had already been dereferenced.
    if not room:
        raise HTTPException(status_code=500, detail="Failed to get room or token")

    print(f"Daily room: {room.url} {room.config.sip_endpoint}")

    token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
    if not token:
        raise HTTPException(status_code=500, detail="Failed to get room or token")

    return {"room": room.url, "token": token, "sip_endpoint": room.config.sip_endpoint}
|
|
async def start_bot(room_details: Dict[str, str], body: Dict[str, Any], example: str) -> bool:
    """Launch a bot module as a background subprocess for the given room.

    The bot is started as ``python3 -m <example> -u <room> -t <token> -b <json>``
    with the working directory set to this file's directory.

    Args:
        room_details: Dict providing the ``room`` URL and the ``token``.
        body: Bot configuration; serialized to JSON and passed via ``-b``.
        example: Name of the Python module to run with ``python3 -m``.

    Returns:
        True once the subprocess has been spawned.

    Raises:
        HTTPException: 500 when the subprocess cannot be started.
    """
    room_url = room_details["room"]
    token = room_details["token"]

    body_json = json.dumps(body)
    print(f"++++ Body JSON: {body_json}")

    # Build argv directly instead of formatting a shell-like string and
    # re-splitting it: hand-escaped quotes broke (or silently corrupted) any
    # body containing quotes or backslashes, and whitespace in the URL/token
    # would have been split into separate arguments.
    command_parts = ["python3", "-m", example, "-u", room_url, "-t", token, "-b", body_json]
    print(f"Starting bot. Example: {example}, Room: {room_url}")

    try:
        subprocess.Popen(command_parts, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__)))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}") from e
    return True
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the shared aiohttp session and Daily REST helper for the app.

    On startup an ``aiohttp.ClientSession`` is opened and a ``DailyRESTHelper``
    bound to it is stored in ``daily_helpers["rest"]``. The API key is read
    from ``HF_DAILY_API_KEY`` and the API URL from ``DAILY_API_URL``
    (default ``https://api.daily.co/v1``).

    Args:
        app: The FastAPI application (unused; required by the lifespan protocol).
    """
    # ``async with`` guarantees the session is closed even when helper
    # construction fails or an exception is thrown into the generator at
    # ``yield``; the previous manual close() was skipped in those cases.
    async with aiohttp.ClientSession() as aiohttp_session:
        daily_helpers["rest"] = DailyRESTHelper(
            daily_api_key=os.environ.get("HF_DAILY_API_KEY", ""),
            daily_api_url=os.environ.get("DAILY_API_URL", "https://api.daily.co/v1"),
            aiohttp_session=aiohttp_session,
        )
        yield
|
|
# ASGI application; startup/shutdown resources are handled by ``lifespan``.
app = FastAPI(lifespan=lifespan)

# CORS is fully open: any origin, method and header, with credentials allowed.
# NOTE(review): confirm wildcard origins combined with credentials is intended.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
|
|
@app.post("/start")
async def handle_start_request(request: Request) -> JSONResponse:
    """Handle ``POST /start``: configure a bot, create a room and launch it.

    The JSON request body is interpreted as one of:
      * a payload containing ``test`` -> answered with ``{"test": true}``;
      * a payload containing ``From``, ``To``, ``callId`` and ``callDomain``
        -> handed to ``process_dialin_request``;
      * a payload containing ``config`` -> handed to
        ``bot_registry.setup_configuration``.

    The room URL from ``HF_DAILY_SAMPLE_ROOM_URL`` is reused when set;
    otherwise a new room is created.

    Returns:
        JSONResponse with ``status`` and ``bot_type``, plus ``room_url``
        (test mode only), ``llm_provider`` and ``dialing_to`` when applicable.

    Raises:
        HTTPException: 400 for malformed or unsupported requests and for
            unexpected processing errors; HTTP errors raised by the room and
            subprocess helpers are propagated with their original status.
    """
    room_url = os.environ.get("HF_DAILY_SAMPLE_ROOM_URL", None)

    try:
        data = await request.json()

        if "test" in data:
            return JSONResponse({"test": True})

        if all(key in data for key in ["From", "To", "callId", "callDomain"]):
            body = await process_dialin_request(data)
        elif "config" in data:
            body = bot_registry.setup_configuration(data["config"])
        else:
            raise HTTPException(status_code=400, detail="Invalid request format")

        body = ensure_prompt_config(body)

        bot_type_name = bot_registry.detect_bot_type(body)
        if not bot_type_name:
            raise HTTPException(status_code=400, detail="Configuration doesn't match any supported scenario")

        room_details = await create_daily_room(room_url, body)
        await start_bot(room_details, body, bot_type_name)

        bot_type = bot_registry.get_bot(bot_type_name)
        response = {"status": "Bot started", "bot_type": bot_type_name}

        # The room URL is only exposed when the bot runs in test mode.
        if bot_type.has_test_mode(body):
            response["room_url"] = room_details["room"]

        if "llm" in body:
            response["llm_provider"] = body["llm"]

        # Report the first dial-out target, preferring a phone number over a SIP URI.
        if "dialout_settings" in body and body["dialout_settings"]:
            first_setting = body["dialout_settings"][0]
            if "phoneNumber" in first_setting:
                response["dialing_to"] = f"phone:{first_setting['phoneNumber']}"
            elif "sipUri" in first_setting:
                response["dialing_to"] = f"sip:{first_setting['sipUri']}"

        return JSONResponse(response)
    except HTTPException:
        # Deliberate HTTP errors (including 500s from the room/subprocess
        # helpers) must keep their status and detail; without this clause the
        # generic handler below rewrapped every one of them as a 400.
        raise
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=400, detail="Invalid JSON in request body") from e
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Request processing error: {str(e)}") from e
|
|
if __name__ == "__main__":
    # Fail fast when a required HF_-prefixed environment variable is missing.
    for env_var in REQUIRED_ENV_VARS:
        hf_env_var = f"HF_{env_var}"
        if hf_env_var not in os.environ:
            raise Exception(f"Missing environment variable: {hf_env_var}.")

    parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
    parser.add_argument("--host", type=str, default=os.environ.get("HOST", "0.0.0.0"), help="Host address")
    # argparse applies ``type`` to string defaults, so a PORT env value is converted to int.
    parser.add_argument("--port", type=int, default=os.environ.get("PORT", 7860), help="Port number")
    # Reload stays on by default (as before). Because ``--reload`` is a
    # store_true flag defaulting to True it could never be switched off, so
    # ``--no-reload`` is provided as the way to disable it.
    parser.add_argument("--reload", action="store_true", default=True, help="Reload code on change")
    parser.add_argument(
        "--no-reload", dest="reload", action="store_false", help="Disable reloading code on change"
    )
    config = parser.parse_args()

    try:
        import uvicorn

        uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
    except KeyboardInterrupt:
        print("Pipecat runner shutting down...")