File size: 3,053 Bytes
b534a53
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import json
import asyncio
from fastapi import FastAPI  #, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import List
from langchain_core.messages import HumanMessage, AIMessage

# Import our compiled LangGraph agent
from src.agent.graph import app as agent_app
from src.api.cache import check_cache, save_to_cache

# 1. Initialize the FastAPI Server
# The title/description/version metadata below is surfaced in the
# auto-generated OpenAPI docs (/docs and /redoc).
app = FastAPI(
    title="OmniRouter Streaming API Agent",
    description="Enterprise RAG Agent powered by LangGraph and FastAPI",
    version="1.0.1"
)

# 2. Define our Request Schema using Pydantic
class ChatRequest(BaseModel):
    """Request body for the POST /chat/stream endpoint."""
    query: str  # the user's natural-language question for the agent

async def stream_generator(query: str):
    """Yield the agent's answer to *query* as Server-Sent Events (SSE).

    An async generator that first consults the response cache and, on a hit,
    replays the cached answer word-by-word without touching the LLM. On a
    miss it streams tokens from the LangGraph agent as they are generated,
    then saves the completed answer back to the cache.

    Each yielded item is one SSE frame: ``data: {"token": ...}\\n\\n``.

    Args:
        query: The user's natural-language question.
    """
    # ==========================================
    # 1. THE CACHE LAYER (Lightning Fast)
    # ==========================================
    cached_answer = check_cache(query)

    if cached_answer:
        # We chop the cached string into words and stream them instantly
        for word in cached_answer.split(" "):
            yield f"data: {json.dumps({'token': word + ' '})}\n\n"
            # We add a tiny 20ms sleep just to preserve the "typewriter" feel for the user
            await asyncio.sleep(0.02)
        return  # EXIT EARLY! The LLM is never triggered.

    # ==========================================
    # 2. THE AGENT LAYER (Heavy Compute)
    # ==========================================
    initial_state = {"messages": [HumanMessage(content=query)]}
    full_answer = ""  # We need to collect the tokens to save them later

    # .astream_events is the key to deep-access streaming in LangChain/LangGraph
    async for event in agent_app.astream_events(initial_state, version="v1"):
        kind = event["event"]

        # We are looking for the 'on_chat_model_stream' event
        # This triggers every time a new token is generated by the LLM
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                full_answer += content
                # SSE format requires the "data: " prefix
                yield f"data: {json.dumps({'token': content})}\n\n"

    # ==========================================
    # 3. SAVE FOR THE FUTURE
    # ==========================================
    # Only cache if we got an answer, AND the answer isn't our fallback failure phrase
    failure_phrase = "I do not have enough information"

    if full_answer and failure_phrase not in full_answer:
        save_to_cache(query, full_answer)
    else:
        print("\n⚠️ [CACHE SKIP] Agent failed to answer. Did not poison the cache.")

@app.post("/chat/stream")
async def chat_streaming_endpoint(request: ChatRequest):
    """Stream the agent's answer to the posted query as an SSE response."""
    token_stream = stream_generator(request.query)
    return StreamingResponse(token_stream, media_type="text/event-stream")