| from fastapi import APIRouter, UploadFile, File, HTTPException, Query |
| from fastapi.responses import StreamingResponse |
| from pydantic import BaseModel |
| from typing import Optional |
| import tempfile |
| import os |
| from pathlib import Path |
| import uuid |
| import io |
|
|
| from services.stt_service import speech_to_text |
| from services.tts_service import text_to_speech |
| from services.chat_service import generate_chat_response |
| from models.voice_chat import TextChatRequest, VoiceChatResponse |
|
|
# Router that groups every route in this module under the /voice-chat prefix.
router = APIRouter(
    prefix="/voice-chat",
    tags=["Voice Chat"],
)


# In-process store of the latest synthesized reply per conversation:
# conversation_id -> {"audio": <bytes>, "text": <str>}.
# Entries are written by the chat endpoints and read by the audio endpoint.
audio_cache = {}
|
|
@router.post("/voice", response_model=VoiceChatResponse)
async def voice_chat_endpoint(
    file: UploadFile = File(...),
    conversation_id: Optional[str] = Query(None)
):
    """
    Complete voice chat endpoint (English only).

    Pipeline:
    1. STT: audio -> text
    2. Chatbot: text -> response
    3. TTS: response -> audio

    The synthesized audio bytes and the reply text are stored in
    ``audio_cache`` under the conversation id, and the returned ``audio_url``
    points at ``/voice-chat/audio/{conversation_id}``.

    Args:
        file: Uploaded audio; its content type must start with ``audio/``.
        conversation_id: Id of the conversation to continue. A fresh UUID4 is
            generated when it is missing or empty.

    Returns:
        VoiceChatResponse carrying the reply text, the audio URL and the
        conversation id.

    Raises:
        HTTPException: 400 when the upload is not audio, is empty, or contains
            no detectable speech; 500 for any other failure in the pipeline.
    """
    if not file.content_type or not file.content_type.startswith('audio/'):
        raise HTTPException(
            status_code=400,
            detail=f"File must be an audio file. Received: {file.content_type}"
        )

    if not conversation_id:
        conversation_id = str(uuid.uuid4())

    # Validate the payload before touching the filesystem so the empty-upload
    # path never has to delete a temp file whose handle is still open.
    content = await file.read()
    if len(content) == 0:
        raise HTTPException(status_code=400, detail="Audio file is empty")

    temp_path = None
    audio_path = None
    try:
        # delete=False: the STT service re-opens the file by path after the
        # handle is closed; removal is handled in the finally block below.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_file:
            temp_path = temp_file.name
            temp_file.write(content)

        user_text = speech_to_text(temp_path)

        if not user_text or user_text.strip() == "":
            raise HTTPException(
                status_code=400,
                detail="No speech detected in audio."
            )

        print(f"π€ STT Result: {user_text}")

        chatbot_response = generate_chat_response(
            user_input=user_text,
            conversation_id=conversation_id
        )

        print(f"π€ Chatbot Response: {chatbot_response}")

        audio_path = text_to_speech(
            text=chatbot_response,
            voice="Aaliyah-PlayAI",
            fmt="wav"
        )

        with open(audio_path, "rb") as audio_file:
            audio_data = audio_file.read()

        audio_cache[conversation_id] = {
            "audio": audio_data,
            "text": chatbot_response
        }

        return VoiceChatResponse(
            text_response=chatbot_response,
            audio_url=f"/voice-chat/audio/{conversation_id}",
            conversation_id=conversation_id
        )

    except HTTPException:
        # Client errors raised above keep their own status code.
        raise
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        print(f"β Error in voice_chat_endpoint: {error_details}")

        raise HTTPException(
            status_code=500,
            detail=f"Error during voice processing: {str(e)}"
        ) from e
    finally:
        # Runs on success, on HTTPException and on unexpected errors alike, so
        # neither the uploaded audio nor the synthesized file is left behind.
        for leftover in (temp_path, audio_path):
            if leftover and os.path.exists(leftover):
                try:
                    os.unlink(leftover)
                except OSError as cleanup_error:
                    # Best-effort cleanup: never mask the real outcome.
                    print(f"Could not remove temporary file {leftover}: {cleanup_error}")
|
|
@router.post("/text", response_model=VoiceChatResponse)
async def text_chat_endpoint(request: TextChatRequest):
    """
    Text chat with audio response (English only).

    For users who prefer to type but hear the response. The synthesized audio
    bytes and the reply text are stored in ``audio_cache`` under the
    conversation id, and the returned ``audio_url`` points at
    ``/voice-chat/audio/{conversation_id}``.

    Args:
        request: Body providing ``text`` and an optional ``conversation_id``.
            A fresh UUID4 is generated when the id is missing or empty.

    Returns:
        VoiceChatResponse carrying the reply text, the audio URL and the
        conversation id.

    Raises:
        HTTPException: 400 when the text is empty or whitespace only; 500 for
            any other failure while generating or synthesizing the reply.
    """
    audio_path = None
    try:
        if not request.conversation_id:
            conversation_id = str(uuid.uuid4())
        else:
            conversation_id = request.conversation_id

        if not request.text or request.text.strip() == "":
            raise HTTPException(status_code=400, detail="Text cannot be empty")

        print(f"π Text received: {request.text}")

        chatbot_response = generate_chat_response(
            user_input=request.text,
            conversation_id=conversation_id
        )

        print(f"π€ Chatbot Response: {chatbot_response}")

        audio_path = text_to_speech(
            text=chatbot_response,
            voice="Aaliyah-PlayAI",
            fmt="wav"
        )

        with open(audio_path, "rb") as audio_file:
            audio_data = audio_file.read()

        audio_cache[conversation_id] = {
            "audio": audio_data,
            "text": chatbot_response
        }

        return VoiceChatResponse(
            text_response=chatbot_response,
            audio_url=f"/voice-chat/audio/{conversation_id}",
            conversation_id=conversation_id
        )

    except HTTPException:
        # Client errors raised above keep their own status code.
        raise
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        print(f"β Error in text_chat_endpoint: {error_details}")

        raise HTTPException(
            status_code=500,
            detail=f"Error during chat: {str(e)}"
        ) from e
    finally:
        # The synthesized file is only a staging area for the cache; remove it
        # even when reading or caching it failed.
        if audio_path and os.path.exists(audio_path):
            try:
                os.unlink(audio_path)
            except OSError as cleanup_error:
                # Best-effort cleanup: never mask the real outcome.
                print(f"Could not remove temporary file {audio_path}: {cleanup_error}")
|
|
@router.get("/audio/{conversation_id}")
async def get_audio_stream(conversation_id: str):
    """
    Stream audio of the last response stored for a conversation.

    Args:
        conversation_id: Key under which the chat endpoints cached the audio.

    Returns:
        StreamingResponse with media type ``audio/wav`` served as an
        attachment named ``response_<id-prefix>.wav``.

    Raises:
        HTTPException: 404 when nothing is cached for ``conversation_id``.
    """
    cached_entry = audio_cache.get(conversation_id)
    if cached_entry is None:
        raise HTTPException(
            status_code=404,
            detail=f"No audio found for conversation {conversation_id}"
        )

    # The id can be chosen by the client, so only header-safe characters are
    # allowed into the filename; UUID-style ids pass through unchanged.
    filename_fragment = "".join(
        ch for ch in conversation_id[:8]
        if ch.isascii() and (ch.isalnum() or ch in "-_")
    ) or "audio"

    return StreamingResponse(
        io.BytesIO(cached_entry["audio"]),
        media_type="audio/wav",
        headers={
            "Content-Disposition": f"attachment; filename=response_{filename_fragment}.wav"
        }
    )
|
|
@router.get("/test")
async def test_endpoint():
    """
    Health check: report that the API is up and list the advertised routes.
    """
    # Route summary returned verbatim to the caller.
    advertised_routes = {
        "POST /voice-chat/voice": "Voice input β Voice response",
        "POST /voice-chat/text": "Text input β Voice response",
        "GET /voice-chat/audio/{id}": "Get audio response",
        "POST /stt/": "Speech to text",
        "POST /tts/": "Text to speech",
    }

    payload = {
        "status": "ok",
        "message": "Voice Chat API is working (English only)",
        "endpoints": advertised_routes,
    }
    return payload