# Provenance (web-scrape residue converted to comments): uploaded by muthuk1,
# commit 577adc4 — "Add .gitignore, dataset metadata, retrieval layer, and
# latest web/graphrag updates".
"""
Layer 3: LLM Layer β€” All LLM interactions with token tracking
=============================================================
Handles generation, entity extraction, keyword extraction,
query complexity analysis, and graph reasoning explanation.
"""
import json
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
@dataclass
class LLMResponse:
    """Container for LLM response with usage metadata."""
    content: str = ""        # generated text; "[Error: ...]" on failure, mock marker in mock mode
    input_tokens: int = 0    # prompt tokens reported by the API (0 on the error path)
    output_tokens: int = 0   # completion tokens reported by the API (0 on the error path)
    total_tokens: int = 0    # input_tokens + output_tokens
    latency_ms: float = 0.0  # wall-clock duration of the call in milliseconds
    cost_usd: float = 0.0    # estimated dollar cost from the layer's per-1k-token rates
    model: str = ""          # model identifier the call was made with
@dataclass
class TokenTracker:
    """Accumulates token usage and dollar cost across a sequence of LLM calls."""
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_cost: float = 0.0
    call_count: int = 0
    calls: List[Dict] = field(default_factory=list)

    def record(self, resp: "LLMResponse", label: str = ""):
        """Fold one response into the running totals and log a per-call entry."""
        entry = {
            "label": label,
            "input_tokens": resp.input_tokens,
            "output_tokens": resp.output_tokens,
            "cost_usd": resp.cost_usd,
            "latency_ms": resp.latency_ms,
        }
        self.calls.append(entry)
        self.call_count += 1
        self.total_input_tokens += entry["input_tokens"]
        self.total_output_tokens += entry["output_tokens"]
        self.total_cost += entry["cost_usd"]

    def summary(self):
        """Return aggregate usage as a plain dict (cost rounded to 6 decimals)."""
        return dict(
            total_input_tokens=self.total_input_tokens,
            total_output_tokens=self.total_output_tokens,
            total_cost_usd=round(self.total_cost, 6),
            call_count=self.call_count,
        )
class LLMLayer:
    """
    Layer 3: Handles all LLM interactions.

    Wraps the OpenAI chat-completions API with a mock fallback so the rest
    of the pipeline can run (and be tested) without an API key or the SDK
    installed. Every call returns an LLMResponse carrying token usage,
    latency, and an estimated cost computed from the per-1k-token rates
    supplied at construction.
    """

    def __init__(self, api_key="", model="gpt-4o-mini",
                 cost_per_1k_input=0.00015, cost_per_1k_output=0.0006):
        # Default pricing matches gpt-4o-mini; override for other models.
        self.model = model
        self.cost_in = cost_per_1k_input
        self.cost_out = cost_per_1k_output
        self.client = None        # remains None in mock mode
        self._api_key = api_key

    def initialize(self):
        """Initialize the OpenAI client; fall back to mock mode when unavailable."""
        try:
            from openai import OpenAI
            import os
            # An explicitly supplied key wins; otherwise read the environment.
            key = self._api_key or os.getenv("OPENAI_API_KEY", "")
            if key:
                base_url = os.getenv("OPENAI_BASE_URL", "")
                # Only pass base_url when set, so the SDK default endpoint applies otherwise.
                self.client = OpenAI(api_key=key, base_url=base_url) if base_url else OpenAI(api_key=key)
                logger.info(f"LLM initialized: {self.model}")
            else:
                logger.warning("No API key — using mock mode")
        except ImportError:
            logger.warning("openai not installed — mock mode")

    def _cost(self, inp, out):
        """Estimated USD cost for `inp` input and `out` output tokens."""
        return inp / 1000 * self.cost_in + out / 1000 * self.cost_out

    def generate(self, messages, temperature=0.0, max_tokens=1024, json_mode=False):
        """Generate a response from the LLM.

        Args:
            messages: OpenAI-style list of {"role": ..., "content": ...} dicts.
            temperature: sampling temperature.
            max_tokens: completion-token cap.
            json_mode: when True, request response_format={"type": "json_object"}.

        Returns:
            LLMResponse. On API failure the content is "[Error: ...]" with
            zero token usage; the layer never raises to callers.
        """
        start = time.perf_counter()
        if self.client is None:
            # Mock path: fixed token counts / latency keep downstream
            # cost accounting deterministic and testable.
            return LLMResponse(
                content="[Mock response — no API key configured]",
                input_tokens=50, output_tokens=20, total_tokens=70,
                latency_ms=100.0, cost_usd=self._cost(50, 20), model=self.model
            )
        try:
            kwargs = {"model": self.model, "messages": messages,
                      "temperature": temperature, "max_tokens": max_tokens}
            if json_mode:
                kwargs["response_format"] = {"type": "json_object"}
            resp = self.client.chat.completions.create(**kwargs)
            elapsed = (time.perf_counter() - start) * 1000
            u = resp.usage
            return LLMResponse(
                content=resp.choices[0].message.content,
                input_tokens=u.prompt_tokens, output_tokens=u.completion_tokens,
                total_tokens=u.prompt_tokens + u.completion_tokens,
                latency_ms=elapsed,
                cost_usd=self._cost(u.prompt_tokens, u.completion_tokens),
                model=self.model
            )
        except Exception as e:
            # Degrade gracefully: callers always receive an LLMResponse.
            elapsed = (time.perf_counter() - start) * 1000
            logger.error(f"LLM error: {e}")
            return LLMResponse(content=f"[Error: {e}]", latency_ms=elapsed, model=self.model)

    # ── Specialized Functions ─────────────────────────────

    def generate_answer(self, query, context, system_prompt=None):
        """Generate an answer given query and retrieved context."""
        if not system_prompt:
            system_prompt = (
                "You are a helpful assistant. Answer accurately using ONLY the provided context. "
                "If the context doesn't contain enough info, say so. Be concise and precise."
            )
        return self.generate([
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer:"}
        ], max_tokens=512)

    def extract_entities(self, text, entity_types=None, relation_types=None):
        """Extract entities and relationships using schema-bounded extraction (novelty).

        The allowed type lists constrain the model's output schema; callers
        parse the returned content as JSON.
        """
        if not entity_types:
            entity_types = ["PERSON", "ORGANIZATION", "LOCATION", "EVENT",
                            "DATE", "CONCEPT", "WORK", "PRODUCT", "TECHNOLOGY"]
        if not relation_types:
            relation_types = ["WORKS_FOR", "LOCATED_IN", "FOUNDED_BY", "PART_OF",
                              "RELATED_TO", "CREATED_BY", "HAPPENED_IN", "MEMBER_OF",
                              "COLLABORATES_WITH", "INFLUENCES"]
        prompt = f"""Extract all entities and relationships from the text.
ALLOWED ENTITY TYPES: {json.dumps(entity_types)}
ALLOWED RELATION TYPES: {json.dumps(relation_types)}
Return JSON:
{{"entities": [{{"name": "exact name", "type": "one of allowed types", "description": "brief 1-sentence"}}],
"relations": [{{"source": "source entity name", "target": "target entity name", "type": "one of allowed types", "description": "brief"}}]}}
Text: {text}"""
        # BUGFIX: was json_mode=False despite the JSON-only prompt; enable
        # JSON mode for consistency with extract_keywords/analyze_query_complexity
        # so the response is guaranteed to be parseable JSON.
        return self.generate([{"role": "user", "content": prompt}], max_tokens=4096, json_mode=True)

    def extract_keywords(self, query):
        """Extract dual-level keywords for GraphRAG retrieval (novelty: LightRAG-inspired)."""
        prompt = """Extract search keywords from this question. Return JSON:
{"high_level": ["abstract themes/topics"], "low_level": ["specific entities/names/dates"]}
Question: """ + query
        return self.generate([{"role": "user", "content": prompt}], max_tokens=256, json_mode=True)

    def analyze_query_complexity(self, query):
        """Analyze query complexity for adaptive routing (novelty)."""
        prompt = """Rate this question's complexity from 0.0 to 1.0. Return JSON:
{"complexity_score": 0.0-1.0, "reasoning": "brief", "query_type": "factoid|comparison|bridge|multi_hop", "estimated_hops": 1-4}
Score guide: 0.0-0.3 simple factoid, 0.3-0.6 moderate, 0.6-0.8 complex multi-entity, 0.8-1.0 multi-hop reasoning
Question: """ + query
        return self.generate([{"role": "user", "content": prompt}], max_tokens=256, json_mode=True)

    def generate_graph_explanation(self, query, entities, relations, answer):
        """Generate natural language explanation of graph reasoning path (novelty).

        Entities are truncated to 10 and relations to 15 to bound prompt size.
        """
        ent_str = "\n".join([f"- {e.get('name','?')} ({e.get('entity_type','?')}): {e.get('description','')}"
                             for e in entities[:10]])
        rel_str = "\n".join([f"- {r}" for r in relations[:15]])
        prompt = f"""Explain the graph reasoning path for this answer step-by-step.
Question: {query}
Entities Found:
{ent_str}
Relationships Traversed:
{rel_str}
Generated Answer: {answer}
Format as:
1. **Entry Points**: [which entities were found first]
2. **Traversal**: [which relationships were followed, use A → B → C notation]
3. **Evidence**: [which facts support the answer]
4. **Conclusion**: [how the answer was derived]"""
        return self.generate([{"role": "user", "content": prompt}], max_tokens=512)