Spaces:
Sleeping
Sleeping
| import json | |
| import asyncio | |
| from fastapi import FastAPI #, HTTPException | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel | |
| from typing import List | |
| from langchain_core.messages import HumanMessage, AIMessage | |
| # Import our compiled LangGraph agent | |
| from src.agent.graph import app as agent_app | |
| from src.api.cache import check_cache, save_to_cache | |
# 1. Initialize the FastAPI Server
# Module-level singleton; route handlers below register themselves against it.
app = FastAPI(
    title="OmniRouter Streaming API Agent",
    description="Enterprise RAG Agent powered by LangGraph and FastAPI",
    version="1.0.1"
)
# 2. Define our Request Schema using Pydantic
class ChatRequest(BaseModel):
    """Request body for the chat endpoint: a single user query string."""
    query: str
async def stream_generator(query: str):
    """
    An async generator that yields tokens in a format compatible with
    Server-Sent Events (SSE).

    Flow: (1) serve a cached answer instantly if one exists, (2) otherwise
    stream tokens from the LangGraph agent, (3) cache successful answers
    for future requests.

    Args:
        query: The user's question, forwarded verbatim to the cache and agent.

    Yields:
        str: SSE frames of the form ``data: {"token": "..."}\\n\\n``.
    """
    # ==========================================
    # 1. THE CACHE LAYER (Lightning Fast)
    # ==========================================
    cached_answer = check_cache(query)
    if cached_answer:
        # We chop the cached string into words and stream them instantly
        for word in cached_answer.split(" "):
            yield f"data: {json.dumps({'token': word + ' '})}\n\n"
            # We add a tiny 20ms sleep just to preserve the "typewriter" feel for the user
            await asyncio.sleep(0.02)
        return  # EXIT EARLY! The LLM is never triggered.

    # ==========================================
    # 2. THE AGENT LAYER (Heavy Compute)
    # ==========================================
    initial_state = {"messages": [HumanMessage(content=query)]}
    full_answer = ""  # We need to collect the tokens to save them later

    # .astream_events is the key to deep-access streaming in LangChain/LangGraph
    async for event in agent_app.astream_events(initial_state, version="v1"):
        kind = event["event"]
        # We are looking for the 'on_chat_model_stream' event.
        # This triggers every time a new token is generated by the LLM.
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                full_answer += content
                # SSE format requires the "data: " prefix
                yield f"data: {json.dumps({'token': content})}\n\n"

    # ==========================================
    # 3. SAVE FOR THE FUTURE
    # ==========================================
    # Only cache if we got an answer, AND the answer isn't our fallback failure phrase
    failure_phrase = "I do not have enough information"
    if full_answer and failure_phrase not in full_answer:
        save_to_cache(query, full_answer)
    else:
        print("\n⚠️ [CACHE SKIP] Agent failed to answer. Did not poison the cache.")
# NOTE(review): the handler had no route decorator, so FastAPI never
# registered it and the endpoint was unreachable. Register it as a POST
# route (the request carries a JSON body).
@app.post("/chat")
async def chat_streaming_endpoint(request: ChatRequest):
    """
    Stream the agent's answer for ``request.query`` as Server-Sent Events.

    Args:
        request: Validated chat request containing the user's query.

    Returns:
        StreamingResponse: SSE stream (``text/event-stream``) of token frames
        produced by ``stream_generator``.
    """
    return StreamingResponse(
        stream_generator(request.query),
        media_type="text/event-stream"
    )