"""
Layer 3: LLM Layer — All LLM interactions with token tracking
=============================================================
Handles generation, entity extraction, keyword extraction,
query complexity analysis, and graph reasoning explanation.
"""
| import json |
| import logging |
| import time |
| from dataclasses import dataclass, field |
| from typing import Any, Dict, List |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| @dataclass |
| class LLMResponse: |
| """Container for LLM response with usage metadata.""" |
| content: str = "" |
| input_tokens: int = 0 |
| output_tokens: int = 0 |
| total_tokens: int = 0 |
| latency_ms: float = 0.0 |
| cost_usd: float = 0.0 |
| model: str = "" |
|
|
|
|
| @dataclass |
| class TokenTracker: |
| """Tracks cumulative token usage and costs.""" |
| total_input_tokens: int = 0 |
| total_output_tokens: int = 0 |
| total_cost: float = 0.0 |
| call_count: int = 0 |
| calls: List[Dict] = field(default_factory=list) |
|
|
| def record(self, resp: LLMResponse, label: str = ""): |
| self.total_input_tokens += resp.input_tokens |
| self.total_output_tokens += resp.output_tokens |
| self.total_cost += resp.cost_usd |
| self.call_count += 1 |
| self.calls.append({ |
| "label": label, "input_tokens": resp.input_tokens, |
| "output_tokens": resp.output_tokens, "cost_usd": resp.cost_usd, |
| "latency_ms": resp.latency_ms |
| }) |
|
|
| def summary(self): |
| return { |
| "total_input_tokens": self.total_input_tokens, |
| "total_output_tokens": self.total_output_tokens, |
| "total_cost_usd": round(self.total_cost, 6), |
| "call_count": self.call_count |
| } |
|
|
|
|
| class LLMLayer: |
| """ |
| Layer 3: Handles all LLM interactions. |
| Supports OpenAI API with mock fallback for testing. |
| """ |
|
|
| def __init__(self, api_key="", model="gpt-4o-mini", |
| cost_per_1k_input=0.00015, cost_per_1k_output=0.0006): |
| self.model = model |
| self.cost_in = cost_per_1k_input |
| self.cost_out = cost_per_1k_output |
| self.client = None |
| self._api_key = api_key |
|
|
| def initialize(self): |
| """Initialize the OpenAI client.""" |
| try: |
| from openai import OpenAI |
| import os |
| key = self._api_key or os.getenv("OPENAI_API_KEY", "") |
| if key: |
| base_url = os.getenv("OPENAI_BASE_URL", "") |
| self.client = OpenAI(api_key=key, base_url=base_url) if base_url else OpenAI(api_key=key) |
| logger.info(f"LLM initialized: {self.model}") |
| else: |
| logger.warning("No API key β using mock mode") |
| except ImportError: |
| logger.warning("openai not installed β mock mode") |
|
|
| def _cost(self, inp, out): |
| return inp / 1000 * self.cost_in + out / 1000 * self.cost_out |
|
|
| def generate(self, messages, temperature=0.0, max_tokens=1024, json_mode=False): |
| """Generate a response from the LLM.""" |
| start = time.perf_counter() |
|
|
| if self.client is None: |
| return LLMResponse( |
| content="[Mock response β no API key configured]", |
| input_tokens=50, output_tokens=20, total_tokens=70, |
| latency_ms=100.0, cost_usd=self._cost(50, 20), model=self.model |
| ) |
|
|
| try: |
| kwargs = {"model": self.model, "messages": messages, |
| "temperature": temperature, "max_tokens": max_tokens} |
| if json_mode: |
| kwargs["response_format"] = {"type": "json_object"} |
| resp = self.client.chat.completions.create(**kwargs) |
| elapsed = (time.perf_counter() - start) * 1000 |
| u = resp.usage |
| return LLMResponse( |
| content=resp.choices[0].message.content, |
| input_tokens=u.prompt_tokens, output_tokens=u.completion_tokens, |
| total_tokens=u.prompt_tokens + u.completion_tokens, |
| latency_ms=elapsed, |
| cost_usd=self._cost(u.prompt_tokens, u.completion_tokens), |
| model=self.model |
| ) |
| except Exception as e: |
| elapsed = (time.perf_counter() - start) * 1000 |
| logger.error(f"LLM error: {e}") |
| return LLMResponse(content=f"[Error: {e}]", latency_ms=elapsed, model=self.model) |
|
|
| |
|
|
| def generate_answer(self, query, context, system_prompt=None): |
| """Generate an answer given query and context.""" |
| if not system_prompt: |
| system_prompt = ( |
| "You are a helpful assistant. Answer accurately using ONLY the provided context. " |
| "If the context doesn't contain enough info, say so. Be concise and precise." |
| ) |
| return self.generate([ |
| {"role": "system", "content": system_prompt}, |
| {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer:"} |
| ], max_tokens=512) |
|
|
| def extract_entities(self, text, entity_types=None, relation_types=None): |
| """Extract entities and relationships using schema-bounded extraction (novelty).""" |
| if not entity_types: |
| entity_types = ["PERSON", "ORGANIZATION", "LOCATION", "EVENT", |
| "DATE", "CONCEPT", "WORK", "PRODUCT", "TECHNOLOGY"] |
| if not relation_types: |
| relation_types = ["WORKS_FOR", "LOCATED_IN", "FOUNDED_BY", "PART_OF", |
| "RELATED_TO", "CREATED_BY", "HAPPENED_IN", "MEMBER_OF", |
| "COLLABORATES_WITH", "INFLUENCES"] |
|
|
| prompt = f"""Extract all entities and relationships from the text. |
| ALLOWED ENTITY TYPES: {json.dumps(entity_types)} |
| ALLOWED RELATION TYPES: {json.dumps(relation_types)} |
| |
| Return JSON: |
| {{"entities": [{{"name": "exact name", "type": "one of allowed types", "description": "brief 1-sentence"}}], |
| "relations": [{{"source": "source entity name", "target": "target entity name", "type": "one of allowed types", "description": "brief"}}]}} |
| |
| Text: {text}""" |
| return self.generate([{"role": "user", "content": prompt}], max_tokens=4096, json_mode=False) |
|
|
| def extract_keywords(self, query): |
| """Extract dual-level keywords for GraphRAG retrieval (novelty: LightRAG-inspired).""" |
| prompt = """Extract search keywords from this question. Return JSON: |
| {"high_level": ["abstract themes/topics"], "low_level": ["specific entities/names/dates"]} |
| |
| Question: """ + query |
| return self.generate([{"role": "user", "content": prompt}], max_tokens=256, json_mode=True) |
|
|
| def analyze_query_complexity(self, query): |
| """Analyze query complexity for adaptive routing (novelty).""" |
| prompt = """Rate this question's complexity from 0.0 to 1.0. Return JSON: |
| {"complexity_score": 0.0-1.0, "reasoning": "brief", "query_type": "factoid|comparison|bridge|multi_hop", "estimated_hops": 1-4} |
| |
| Score guide: 0.0-0.3 simple factoid, 0.3-0.6 moderate, 0.6-0.8 complex multi-entity, 0.8-1.0 multi-hop reasoning |
| |
| Question: """ + query |
| return self.generate([{"role": "user", "content": prompt}], max_tokens=256, json_mode=True) |
|
|
| def generate_graph_explanation(self, query, entities, relations, answer): |
| """Generate natural language explanation of graph reasoning path (novelty).""" |
| ent_str = "\n".join([f"- {e.get('name','?')} ({e.get('entity_type','?')}): {e.get('description','')}" |
| for e in entities[:10]]) |
| rel_str = "\n".join([f"- {r}" for r in relations[:15]]) |
| prompt = f"""Explain the graph reasoning path for this answer step-by-step. |
| |
| Question: {query} |
| |
| Entities Found: |
| {ent_str} |
| |
| Relationships Traversed: |
| {rel_str} |
| |
| Generated Answer: {answer} |
| |
| Format as: |
| 1. **Entry Points**: [which entities were found first] |
| 2. **Traversal**: [which relationships were followed, use A β B β C notation] |
| 3. **Evidence**: [which facts support the answer] |
| 4. **Conclusion**: [how the answer was derived]""" |
| return self.generate([{"role": "user", "content": prompt}], max_tokens=512) |
|
|