import json
import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import List

from langchain_core.messages import HumanMessage, AIMessage

# Import our compiled LangGraph agent
from src.agent.graph import app as agent_app
from src.api.cache import check_cache, save_to_cache

# 1. Initialize the FastAPI server
app = FastAPI(
    title="OmniRouter Streaming API Agent",
    description="Enterprise RAG Agent powered by LangGraph and FastAPI",
    version="1.0.1",
)


# 2. Define our request schema using Pydantic
class ChatRequest(BaseModel):
    # The user's natural-language question to route through the agent.
    query: str


async def stream_generator(query: str):
    """Yield answer tokens for *query* as Server-Sent Events (SSE).

    Fast path: if the query is cached, replay the stored answer word by
    word without ever invoking the LLM. Slow path: run the LangGraph
    agent, forward each generated token as it arrives, then cache the
    full answer for future requests (unless the agent failed to answer).

    Each yielded item is one SSE frame: ``data: {"token": ...}\\n\\n``.
    """
    # ==========================================
    # 1. THE CACHE LAYER (lightning fast)
    # ==========================================
    cached_answer = check_cache(query)
    if cached_answer:
        # Chop the cached string into words and stream them instantly.
        # Re-attach the separator to every word except the last so the
        # replayed tokens concatenate back to exactly the cached text.
        words = cached_answer.split(" ")
        last_index = len(words) - 1
        for i, word in enumerate(words):
            token = word if i == last_index else word + " "
            yield f"data: {json.dumps({'token': token})}\n\n"
            # Tiny 20 ms sleep to preserve the "typewriter" feel.
            await asyncio.sleep(0.02)
        return  # EXIT EARLY! The LLM is never triggered.

    # ==========================================
    # 2. THE AGENT LAYER (heavy compute)
    # ==========================================
    initial_state = {"messages": [HumanMessage(content=query)]}
    full_answer = ""  # Collect tokens so the answer can be cached later.

    # .astream_events is the key to deep-access streaming in LangChain/LangGraph.
    async for event in agent_app.astream_events(initial_state, version="v1"):
        kind = event["event"]
        # 'on_chat_model_stream' fires for every new token the LLM emits.
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                full_answer += content
                # SSE format requires the "data: " prefix.
                yield f"data: {json.dumps({'token': content})}\n\n"

    # ==========================================
    # 3. SAVE FOR THE FUTURE
    # ==========================================
    # Only cache if we got an answer AND it isn't the fallback failure
    # phrase — otherwise a bad run would poison the cache.
    failure_phrase = "I do not have enough information"
    if full_answer and failure_phrase not in full_answer:
        save_to_cache(query, full_answer)
    else:
        print("\n⚠️ [CACHE SKIP] Agent failed to answer. Did not poison the cache.")


@app.post("/chat/stream")
async def chat_streaming_endpoint(request: ChatRequest):
    """Stream the agent's answer to *request.query* as an SSE response."""
    return StreamingResponse(
        stream_generator(request.query),
        media_type="text/event-stream",
    )