# omnirouter-api / src/api/server.py
# (uploaded by sumitrwk — "Upload 33 files", commit b534a53, verified)
import json
import asyncio
from fastapi import FastAPI #, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import List
from langchain_core.messages import HumanMessage, AIMessage
# Import our compiled LangGraph agent
from src.agent.graph import app as agent_app
from src.api.cache import check_cache, save_to_cache
# 1. Initialize the FastAPI Server
# FastAPI application instance; FastAPI auto-generates OpenAPI docs from
# the title/description/version metadata below.
app = FastAPI(
    title="OmniRouter Streaming API Agent",
    description="Enterprise RAG Agent powered by LangGraph and FastAPI",
    version="1.0.1"
)
# 2. Define our Request Schema using Pydantic
class ChatRequest(BaseModel):
    """Request body for the /chat/stream endpoint."""
    # The user's natural-language question to route through the agent.
    query: str
async def stream_generator(query: str):
    """
    An async generator that yields tokens from the LangGraph agent
    in a format compatible with Server-Sent Events (SSE).

    Flow: check the cache first — on a hit, stream the cached answer
    word-by-word and return without ever invoking the LLM. On a miss,
    stream tokens from the agent, then cache the answer only if it is
    non-empty and not the fallback failure phrase.

    NOTE(fix): this docstring previously sat *after* the cache-layer
    code, making it a dead string expression instead of the function's
    ``__doc__``; it has been moved to the top of the function.
    """
    # ==========================================
    # 1. THE CACHE LAYER (Lightning Fast)
    # ==========================================
    cached_answer = check_cache(query)
    if cached_answer:
        # Chop the cached string into words and stream them instantly.
        for word in cached_answer.split(" "):
            yield f"data: {json.dumps({'token': word + ' '})}\n\n"
            # Tiny 20ms sleep preserves the "typewriter" feel for the user.
            await asyncio.sleep(0.02)
        return  # EXIT EARLY! The LLM is never triggered.

    # ==========================================
    # 2. THE AGENT LAYER (Heavy Compute)
    # ==========================================
    initial_state = {"messages": [HumanMessage(content=query)]}
    full_answer = ""  # Collect tokens so the complete answer can be cached later.
    # .astream_events is the key to deep-access streaming in LangChain/LangGraph.
    async for event in agent_app.astream_events(initial_state, version="v1"):
        # 'on_chat_model_stream' fires every time the LLM generates a new token.
        if event["event"] == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                full_answer += content
                # SSE format requires the "data: " prefix.
                yield f"data: {json.dumps({'token': content})}\n\n"

    # ==========================================
    # 3. SAVE FOR THE FUTURE
    # ==========================================
    # Only cache if we got an answer, AND the answer isn't our fallback
    # failure phrase — otherwise we would poison the cache with failures.
    failure_phrase = "I do not have enough information"
    if full_answer and failure_phrase not in full_answer:
        save_to_cache(query, full_answer)
    else:
        print("\n⚠️ [CACHE SKIP] Agent failed to answer. Did not poison the cache.")
@app.post("/chat/stream")
async def chat_streaming_endpoint(request: ChatRequest):
    """Stream the agent's answer to ``request.query`` as Server-Sent Events."""
    # Hand the async token generator to StreamingResponse; FastAPI drives
    # it and flushes each SSE "data:" frame to the client as it arrives.
    token_stream = stream_generator(request.query)
    return StreamingResponse(token_stream, media_type="text/event-stream")