"""
ReportAgent — Full ReACT Analytical Agent

A complete ReACT (Reasoning + Acting) loop powered by three specialized
tools (InsightForge, PanoramaSearch, QuickSearch).

Architecture:
- InsightForgeTool: Hybrid broad-spectrum retriever (vector + graph + community)
- PanoramaSearchTool: Entity-type sweep for macro-level statistics
- QuickSearchTool: Fast single-entity lookup with direct relationships
"""

# ``from __future__`` imports must be the first statement after the module
# docstring; placing them after other statements is a SyntaxError.
from __future__ import annotations

import asyncio
import json
import logging
import math
from datetime import datetime, timezone
from itertools import zip_longest
from typing import Any, Dict, List, Literal, Optional

from pydantic import BaseModel, Field

from ..core.neo4j_store import Neo4jStore
from ..core.llm_factory import UnifiedLLMProvider
from ..config import settings  # noqa: F401  (not referenced in this module)

logger = logging.getLogger(__name__)


# ── Helpers ─────────────────────────────────────────────────────────────────

# Characters with special meaning in Lucene query syntax, which Neo4j
# full-text indexes use. Escaping them lets natural-language questions such
# as "Who is Alice?" be searched literally: otherwise a trailing "?" is a
# single-character wildcard, ":" a field prefix, "(" grouping, "/" a regex.
_LUCENE_SPECIAL_CHARS = frozenset('+-&|!(){}[]^"~*?:\\/')


def _escape_lucene(text: str) -> str:
    """Backslash-escape every Lucene query-syntax character in *text*."""
    return "".join(f"\\{ch}" if ch in _LUCENE_SPECIAL_CHARS else ch for ch in text)


def _clamp_confidence(value: Any) -> float:
    """Coerce an LLM-reported confidence into a finite float in [0.0, 1.0].

    Non-numeric, NaN and infinite values map to 0.0; out-of-range values are
    clipped to the nearest bound.
    """
    try:
        score = float(value)
    except (TypeError, ValueError):
        return 0.0
    if not math.isfinite(score):
        return 0.0
    return min(max(score, 0.0), 1.0)


# ── Result models ───────────────────────────────────────────────────────────

class ReportSection(BaseModel):
    title: str
    content: str


class ReportResult(BaseModel):
    topic: str
    executive_summary: str
    sections: Dict[str, str] = Field(default_factory=dict)
    key_entities: List[str] = Field(default_factory=list)
    confidence: float = 0.0
    tool_calls_made: int = 0
    # Naive datetime holding UTC wall-clock time.
    generated_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )
    markdown: str = ""


# Structured output models requested from the LLM. They are defined once at
# module level instead of on every call. Their class names are unchanged so
# the generated JSON-schema titles stay the same. They intentionally have no
# docstrings, because pydantic would copy a docstring into the schema
# description sent to the LLM.

class AgentAction(BaseModel):
    thought: str = Field(description="Step by step reasoning analyzing the observations and deciding what to do next.")
    tool_name: Literal["InsightForge", "PanoramaSearch", "QuickSearch", "DONE"] = Field(description="The chosen tool.")
    tool_arg: str = Field(description="The argument to pass to the tool. Empty if DONE.")


class SectionResult(BaseModel):
    content: str = Field(description="The 2-4 sentence report section or state data is insufficient.")
    confidence: float = Field(description="Confidence score 0.0 to 1.0 based on data sufficiency")


# ── Specialized analytical tools ────────────────────────────────────────────

class InsightForgeTool:
    """
    Broad-spectrum hybrid retriever: merges vector similarity + graph
    neighborhood + community summaries for cross-entity insights.
    Use for open-ended analytical questions.
    """

    name = "InsightForge"
    description = (
        "Hybrid broad-spectrum retriever combining vector similarity, graph "
        "neighborhood, and community summaries. Best for open-ended analysis."
    )

    def __init__(self, store: Neo4jStore, llm: UnifiedLLMProvider) -> None:
        self.store = store
        self.llm = llm

    async def run(self, query: str, k: int = 8, tenant_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Retrieve up to ``k`` de-duplicated context snippets for ``query``.

        The three retrieval strategies (chunk search, entity neighborhood,
        community summaries) run concurrently. Each is best-effort: a failure
        is logged and contributes no results.

        Results are interleaved round-robin across sources before truncation.
        This keeps graph-derived context from being crowded out, since chunk
        search alone can return up to ``2 * k`` hits.

        Args:
            query: Natural-language search text. Blank input returns ``[]``.
            k: Maximum number of snippets to return.
            tenant_id: Optional tenant filter; ``None`` searches all tenants.

        Returns:
            Dicts carrying at least ``text`` and ``source`` keys.
        """
        if not query or not query.strip() or k <= 0:
            return []

        (bm25_hits, vector_hits), entity_hits, community_hits = await asyncio.gather(
            self._chunk_hits(query, k, tenant_id),
            self._entity_hits(query, tenant_id),
            self._community_hits(tenant_id),
        )

        # Round-robin merge + de-duplication on the first 80 characters.
        seen: set = set()
        unique: List[Dict[str, Any]] = []
        for tier in zip_longest(bm25_hits, vector_hits, entity_hits, community_hits):
            for hit in tier:
                if hit is None:
                    continue
                key = (hit.get("text") or "")[:80]
                if key and key not in seen:
                    seen.add(key)
                    unique.append(hit)
                    if len(unique) >= k:
                        return unique
        return unique

    async def _chunk_hits(
        self, query: str, k: int, tenant_id: Optional[str]
    ) -> tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
        """Step 1: hybrid BM25 + vector chunk retrieval, returned as (bm25, vector)."""

        async def _vector_search() -> Any:
            # Embedding lives inside this coroutine so an embedding failure
            # only loses vector hits, not the BM25 hits as well.
            embedding = await self.llm.embed(query)
            return await self.store.search(query_vector=embedding, k=k, tenant_id=tenant_id)

        def _tag(raw: Any, source: str) -> List[Dict[str, Any]]:
            # gather(return_exceptions=True) may return CancelledError, which
            # is a BaseException but not an Exception.
            if isinstance(raw, BaseException):
                logger.warning("[InsightForge] %s retrieval failed: %s", source, raw)
                return []
            # Copy each hit so dicts owned by the store are not mutated.
            return [dict(hit, source=source) for hit in raw or []]

        try:
            bm25_raw, vector_raw = await asyncio.gather(
                self.store.bm25_search(query, k=k, tenant_id=tenant_id),
                _vector_search(),
                return_exceptions=True,
            )
            return _tag(bm25_raw, "bm25"), _tag(vector_raw, "vector")
        except Exception as exc:
            logger.warning("[InsightForge] chunk retrieval failed: %s", exc)
            return [], []

    async def _entity_hits(self, query: str, tenant_id: Optional[str]) -> List[Dict[str, Any]]:
        """Step 2: summaries of entities mentioned by full-text-matching chunks."""
        entity_query = """
            CALL db.index.fulltext.queryNodes('chunk_text_index', $q)
            YIELD node, score
            WHERE ($tenant_id IS NULL OR node.tenant_id = $tenant_id)
            MATCH (node)-[:MENTIONS]->(e:Entity)
            WHERE ($tenant_id IS NULL OR e.tenant_id = $tenant_id)
            RETURN DISTINCT e.name AS name, e.summary AS summary
            LIMIT 10
        """
        try:
            rows = await self.store.execute_query(
                entity_query, {"q": _escape_lucene(query), "tenant_id": tenant_id}
            )
        except Exception as exc:
            logger.warning("[InsightForge] entity neighborhood lookup failed: %s", exc)
            return []
        return [
            {
                "text": f"[Entity Profile] {row['name']}: {row['summary']}",
                "source": "entity_summary",
                "retrieval_method": "insight_forge",
            }
            for row in rows or []
            if row.get("summary")
        ]

    async def _community_hits(self, tenant_id: Optional[str]) -> List[Dict[str, Any]]:
        """Step 3: the three largest communities with up to five sample members.

        Communities are ranked by their true member count. The sample list is
        truncated only after ranking, so large communities do not all tie at 5.
        """
        community_query = """
            MATCH (e:Entity)
            WHERE e.community_id IS NOT NULL
              AND ($tenant_id IS NULL OR e.tenant_id = $tenant_id)
            WITH e.community_id AS cid, collect(e.name) AS names, count(e) AS member_count
            RETURN cid, names[..5] AS members, member_count
            ORDER BY member_count DESC
            LIMIT 3
        """
        try:
            communities = await self.store.execute_query(community_query, {"tenant_id": tenant_id})
        except Exception as exc:
            logger.warning("[InsightForge] community lookup failed: %s", exc)
            return []

        hits: List[Dict[str, Any]] = []
        for comm in communities or []:
            members = [str(m) for m in comm.get("members") or [] if m]
            member_count = comm.get("member_count", len(members))
            hits.append({
                "text": (
                    f"[Community {comm['cid']} — "
                    f"{member_count} entities]: "
                    f"{', '.join(members)}"
                ),
                "source": "community",
                "retrieval_method": "insight_forge",
            })
        return hits


class PanoramaSearchTool:
    """
    Macro-level entity sweep: returns all entities of a given type with
    statistics. Useful for 'How many X?', 'List all Y', 'What types of Z?'
    """

    name = "PanoramaSearch"
    description = (
        "Broad entity sweep returning all nodes of a specified type. "
        "Best for: counting, listing, macro-level statistics."
    )

    def __init__(self, store: Neo4jStore) -> None:
        self.store = store

    async def run(
        self, entity_type: str = "Entity", limit: int = 30, tenant_id: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Return entities of the given type with their summaries, highest degree first.

        Args:
            entity_type: Value matched against ``e.type``. ``"Entity"``, or a
                blank value, sweeps every entity.
            limit: Maximum rows; a non-positive value returns ``[]``.
            tenant_id: Optional tenant filter.

        Returns:
            Dicts with ``text``, ``name``, ``type``, ``degree`` and
            ``retrieval_method`` keys. Returns ``[]`` on query failure.
        """
        entity_type = (entity_type or "").strip() or "Entity"
        if limit <= 0:
            return []

        # Degree uses a pattern comprehension. The older ``size((e)--())``
        # pattern expression is not supported in Cypher 5; this form works
        # on both 4.x and 5.x.
        query = """
            MATCH (e:Entity)
            WHERE (e.type = $type OR $type = 'Entity')
              AND ($tenant_id IS NULL OR e.tenant_id = $tenant_id)
            RETURN e.name AS name, e.type AS type, e.summary AS summary,
                   size([(e)--() | 1]) AS degree
            ORDER BY degree DESC
            LIMIT $limit
        """
        try:
            rows = await self.store.execute_query(
                query, {"type": entity_type, "limit": limit, "tenant_id": tenant_id}
            )
        except Exception as exc:
            logger.warning("[PanoramaSearch] Error: %s", exc)
            return []

        results: List[Dict[str, Any]] = []
        for r in rows or []:
            text = f"[{r.get('type', 'Entity')}] {r.get('name', '')}"
            if r.get("summary"):
                text += f": {r['summary']}"
            results.append({
                "text": text,
                "name": r.get("name"),
                "type": r.get("type"),
                "degree": r.get("degree", 0),
                "retrieval_method": "panorama_search",
            })
        return results


class QuickSearchTool:
    """
    Fast single-entity lookup by name with direct 1-hop relationships.
    Useful for 'Who is X?', 'What does Y do?', 'Tell me about Z'.
    """

    name = "QuickSearch"
    description = (
        "Fast entity lookup by name. Returns entity summary + direct "
        "relationships. Best for specific entity questions."
    )

    def __init__(self, store: Neo4jStore, llm: UnifiedLLMProvider) -> None:
        self.store = store
        self.llm = llm

    async def run(self, entity_name: str, tenant_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Look up up to three entities by name and return each profile with its connections.

        Matching is a case-insensitive substring match. Results are ranked so
        that an exact match comes first, then a case-insensitive exact match,
        then shorter names. A blank ``entity_name`` returns ``[]``, because
        ``CONTAINS ''`` would otherwise match every entity.
        """
        needle = (entity_name or "").strip()
        if not needle:
            return []

        entity_query = """
            MATCH (e:Entity)
            WHERE (e.name = $name OR toLower(e.name) CONTAINS toLower($name))
              AND ($tenant_id IS NULL OR e.tenant_id = $tenant_id)
            RETURN e.name AS name, e.type AS type, e.summary AS summary
            ORDER BY CASE
                       WHEN name = $name THEN 0
                       WHEN toLower(name) = toLower($name) THEN 1
                       ELSE 2
                     END,
                     size(name)
            LIMIT 3
        """
        try:
            entities = await self.store.execute_query(
                entity_query, {"name": needle, "tenant_id": tenant_id}
            )
        except Exception as exc:
            logger.warning("[QuickSearch] entity lookup failed: %s", exc)
            entities = []

        results: List[Dict[str, Any]] = []
        for entity in entities or []:
            name = entity.get("name") or needle
            summary = entity.get("summary", "")
            rel_text = await self._connections(name, tenant_id)

            text_parts = [f"[Entity] {name}"]
            if summary:
                text_parts.append(summary)
            text_parts.append(f"Connections: {rel_text}")

            results.append({
                "name": name,
                "type": entity.get("type", "Entity"),
                "retrieval_method": "quick_search",
                "text": " | ".join(text_parts),
            })
        return results

    async def _connections(self, name: str, tenant_id: Optional[str]) -> str:
        """Describe up to 20 direct relationships of entity ``name`` as one line.

        Returns the text "no connections" when none are found and
        "unavailable" when the query fails.
        """
        rel_query = """
            MATCH (e:Entity {name: $name})-[r]-(other:Entity)
            WHERE ($tenant_id IS NULL OR
                   (e.tenant_id = $tenant_id AND other.tenant_id = $tenant_id
                    AND r.tenant_id = $tenant_id))
            RETURN type(r) AS rel_type, other.name AS other_name, other.type AS other_type
            LIMIT 20
        """
        try:
            rels = await self.store.execute_query(
                rel_query, {"name": name, "tenant_id": tenant_id}
            )
        except Exception as exc:
            logger.warning("[QuickSearch] relationship lookup for %r failed: %s", name, exc)
            return "unavailable"
        rel_lines = [
            f"{r['rel_type']} → {r['other_name']} ({r['other_type']})"
            for r in rels or []
        ]
        return "; ".join(rel_lines) if rel_lines else "no connections"


# ── Main ReportAgent ────────────────────────────────────────────────────────

class ReportAgent:
    """
    Full ReACT analytical reporting agent.

    Workflow:
        DECOMPOSE  → Break topic into 3-5 sub-questions
        REACT LOOP → For each sub-question:
            THINK  → pick best tool
            ACT    → run InsightForge / PanoramaSearch / QuickSearch
            OBS    → record retrieved context
        WRITE      → draft answer section
        COMPILE    → Assemble all sections into a structured markdown report
    """

    MAX_REACT_LOOPS = 6

    TOOLS_DESC = """
Available tools:
- InsightForge(query): Hybrid broad-spectrum retriever. Best for analytical questions.
- PanoramaSearch(entity_type): Sweep all entities of a type. Best for counting/listing.
- QuickSearch(entity_name): Fast entity lookup + connections. Best for "Who is X?" questions.
"""

    def __init__(self, store: Neo4jStore, llm: UnifiedLLMProvider) -> None:
        self.store = store
        self.llm = llm
        self.insight_forge = InsightForgeTool(store, llm)
        self.panorama = PanoramaSearchTool(store)
        self.quick_search = QuickSearchTool(store, llm)
        # Tool-call counter for the report currently being generated; reset
        # at the start of each generate_report() call.
        self._tool_calls = 0

    # ── Public ─────────────────────────────────────────────────────────────

    async def generate_report(
        self,
        topic: str,
        report_type: Literal["executive", "detailed", "entity_focus"] = "detailed",
        target_entity: Optional[str] = None,
        tenant_id: Optional[str] = None,
    ) -> ReportResult:
        """
        Generate an analytical report on the given topic.

        Args:
            topic: High-level topic or question.
            report_type: "executive" (3 sub-questions) or "detailed" /
                "entity_focus" (5 sub-questions).
            target_entity: For entity_focus — name of the entity to focus on.
            tenant_id: Optional tenant filter forwarded to every tool call.

        Returns:
            ReportResult with sections, summary, and compiled markdown.
            ``confidence`` is the mean of the per-section LLM confidence
            scores over all sub-questions.
        """
        self._tool_calls = 0
        # One timestamp shared by the result model and the markdown header.
        generated_at = datetime.now(timezone.utc).replace(tzinfo=None)

        # 1. Decompose topic into sub-questions
        sub_questions = await self._decompose_topic(topic, report_type, target_entity)

        # 2. ReACT loop for each sub-question
        sections: Dict[str, str] = {}
        key_entities: List[str] = []
        total_confidence = 0.0
        for question in sub_questions:
            content, _contexts, entities, confidence = await self._react_loop(
                question, tenant_id=tenant_id
            )
            if content:
                sections[question] = content
            key_entities.extend(entities)
            total_confidence += confidence

        # 3. Executive summary
        exec_summary = await self._write_executive_summary(topic, sections)

        # 4. Key entities: order-preserving de-duplication, capped at 10
        key_entities = list(dict.fromkeys(key_entities))[:10]

        # 5. Overall confidence = mean of the per-section relevancy scores
        overall_confidence = round(total_confidence / max(len(sub_questions), 1), 2)

        # 6. Compile markdown
        markdown = self._compile_markdown(
            topic, exec_summary, sections, key_entities, generated_at=generated_at
        )

        return ReportResult(
            topic=topic,
            executive_summary=exec_summary,
            sections=sections,
            key_entities=key_entities,
            confidence=overall_confidence,
            tool_calls_made=self._tool_calls,
            generated_at=generated_at,
            markdown=markdown,
        )

    # ── Internal steps ─────────────────────────────────────────────────────

    async def _decompose_topic(
        self,
        topic: str,
        report_type: str,
        target_entity: Optional[str],
    ) -> List[str]:
        """Ask the LLM to decompose the topic into sub-questions.

        Returns up to 3 questions for "executive" reports and up to 5
        otherwise. Falls back to three generic questions when the LLM call
        fails or its reply contains no usable JSON list.
        """
        n = 3 if report_type == "executive" else 5
        focus = (
            f"Focus specifically on the entity '{target_entity}'."
            if target_entity else ""
        )
        prompt = f"""You are planning an analytical report about: "{topic}"
{focus}

Generate {n} specific sub-questions that would together create a complete report.
Each sub-question should be answerable from a knowledge graph.

Return ONLY a JSON list of strings: ["question 1", "question 2", ...]"""

        try:
            response = await self.llm.complete(prompt, temperature=0.3)
        except Exception as exc:
            logger.warning("[ReportAgent] topic decomposition failed: %s", exc)
            response = ""

        questions = self._parse_question_list(response)
        if questions:
            return questions[:n]

        # Fallback
        return [
            f"What are the main entities related to {topic}?",
            f"What are the key relationships in {topic}?",
            f"What are the most important findings about {topic}?",
        ]

    @staticmethod
    def _parse_question_list(response: Optional[str]) -> List[str]:
        """Extract non-empty question strings from a JSON list in an LLM reply.

        Markdown code fences and surrounding prose are tolerated by parsing
        only the span from the first '[' to the last ']'. Returns ``[]`` when
        no JSON list can be decoded.
        """
        text = (response or "").strip()
        start, end = text.find("["), text.rfind("]")
        if start == -1 or end <= start:
            return []
        try:
            parsed = json.loads(text[start:end + 1])
        except ValueError:  # json.JSONDecodeError subclasses ValueError
            return []
        if not isinstance(parsed, list):
            return []
        return [str(q).strip() for q in parsed if str(q).strip()]

    async def _react_loop(
        self, question: str, tenant_id: Optional[str] = None
    ) -> tuple[str, List[str], List[str], float]:
        """
        Run a ReACT iteration for one sub-question.

        Each step asks _think() for an action, runs it, and records the
        observation. This repeats until the LLM chooses DONE or
        MAX_REACT_LOOPS steps have run. Every executed tool call increments
        ``self._tool_calls``.

        Returns (section_content, context_texts, entity_names, confidence_score).
        """
        collected_contexts: List[str] = []
        entity_names: List[str] = []
        observations: List[str] = []

        for _ in range(self.MAX_REACT_LOOPS):
            # THINK: which tool next?
            thought, tool_name, tool_arg = await self._think(question, observations)
            observations.append(f"[Thought]\n{thought}")
            if tool_name == "DONE":
                break

            # ACT: run the chosen tool
            tool_results = await self._act(tool_name, tool_arg, tenant_id=tenant_id)
            self._tool_calls += 1

            if not tool_results:
                # Record the miss and keep going. The next THINK step sees it
                # and can try another tool or argument, or choose DONE. The
                # loop stays bounded by MAX_REACT_LOOPS.
                observations.append(f"[{tool_name}({tool_arg})] No results.")
                continue

            # OBS: record retrieved context (each snippet capped at 300 chars)
            obs_texts = [r.get("text", str(r))[:300] for r in tool_results]
            obs_summary = "\n".join(f"• {t}" for t in obs_texts[:5])
            observations.append(f"[{tool_name}({tool_arg})]\n{obs_summary}")
            collected_contexts.extend(obs_texts)
            entity_names.extend(r["name"] for r in tool_results if r.get("name"))

        # WRITE: draft the section answer
        if collected_contexts:
            section, confidence = await self._write_section_with_confidence(
                question, collected_contexts
            )
        else:
            section = "Insufficient data found in the knowledge graph."
            confidence = 0.0

        return section, collected_contexts, entity_names, confidence

    async def _think(
        self, question: str, observations: List[str]
    ) -> tuple[str, str, str]:
        """Decide which tool to call next, or return DONE.

        Only the last three observation entries are shown to the LLM. On any
        LLM or parsing error, the error is logged and the result is
        ``("Failed to parse or error: ...", "DONE", "")``.
        """
        obs_text = "\n".join(observations[-3:]) if observations else "None yet."

        prompt = f"""You are deciding the next action to answer: "{question}"
{self.TOOLS_DESC}
Observations so far:
{obs_text}

If you have enough information to write a good answer, choose the DONE tool.
Otherwise, deeply analyze the observations in a step-by-step thought and pick the most appropriate tool."""

        try:
            action: AgentAction = await self.llm.complete_structured(
                prompt=prompt,
                response_model=AgentAction,
                system_prompt="You are a meticulous investigative agent generating JSON.",
            )
            return action.thought, action.tool_name, action.tool_arg
        except Exception as exc:
            logger.warning("[ReportAgent] THINK step failed: %s", exc)
            return f"Failed to parse or error: {exc}", "DONE", ""

    async def _act(
        self, tool_name: str, tool_arg: str, tenant_id: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Dispatch a tool call and return its results.

        Unknown tool names and tool failures both yield ``[]``; failures are
        logged.
        """
        try:
            if tool_name == "InsightForge":
                return await self.insight_forge.run(tool_arg, tenant_id=tenant_id)
            if tool_name == "PanoramaSearch":
                return await self.panorama.run(tool_arg, tenant_id=tenant_id)
            if tool_name == "QuickSearch":
                return await self.quick_search.run(tool_arg, tenant_id=tenant_id)
        except Exception as exc:
            logger.warning("[ReportAgent] Tool %s failed: %s", tool_name, exc)
        return []

    async def _write_section_with_confidence(
        self, question: str, contexts: List[str]
    ) -> tuple[str, float]:
        """Generate a report section from retrieved contexts, with a confidence score.

        Contexts are de-duplicated in order, because repeated tool calls often
        return the same snippets. At most 8 are included in the prompt. The
        LLM's confidence is clamped to [0.0, 1.0]. On an LLM error, returns a
        fixed message and 0.0.
        """
        unique_contexts = list(dict.fromkeys(contexts))[:8]
        context_text = "\n\n".join(
            f"[Source {i}]: {c}" for i, c in enumerate(unique_contexts, start=1)
        )
        prompt = f"""Write a factual, well-structured paragraph answering: "{question}"

Based ONLY on the following knowledge graph data:
{context_text}

Instructions:
- Be specific and cite entities by name
- Do not hallucinate or add information not in the sources
- 2-4 sentences is ideal
- Also output a confidence score from 0.0 to 1.0 reflecting how fully the data answers the question.
"""
        try:
            res: SectionResult = await self.llm.complete_structured(
                prompt=prompt,
                response_model=SectionResult,
                system_prompt="You are an analytical writer crafting a knowledge-graph report section.",
            )
            return res.content, _clamp_confidence(res.confidence)
        except Exception as exc:
            logger.warning("[ReportAgent] section writing failed: %s", exc)
            return "Unable to generate section due to LLM error.", 0.0

    async def _write_executive_summary(
        self, topic: str, sections: Dict[str, str]
    ) -> str:
        """Synthesize the first five sections into a short executive summary.

        With no sections, or an empty or failed LLM reply, a generic fallback
        sentence is returned. This avoids asking the LLM to summarize nothing.
        """
        fallback = f"Analysis of {topic} based on knowledge graph data."
        if not sections:
            return fallback

        section_text = "\n\n".join(
            f"## {q}\n{a}" for q, a in list(sections.items())[:5]
        )
        prompt = f"""Write a 2-3 sentence executive summary for a report on: "{topic}"

Report findings:
{section_text[:2000]}

Executive summary (concise, factual, highlight the most important finding):"""
        try:
            summary = await self.llm.complete(prompt, temperature=0.3)
        except Exception as exc:
            logger.warning("[ReportAgent] executive summary failed: %s", exc)
            return fallback
        return (summary or "").strip() or fallback

    def _compile_markdown(
        self,
        topic: str,
        exec_summary: str,
        sections: Dict[str, str],
        key_entities: List[str],
        generated_at: Optional[datetime] = None,
    ) -> str:
        """Assemble the report as markdown.

        Args:
            generated_at: Naive UTC timestamp for the header. Defaults to the
                current time when omitted.
        """
        stamp = generated_at or datetime.now(timezone.utc).replace(tzinfo=None)
        lines = [
            f"# Report: {topic}",
            f"*Generated: {stamp.strftime('%Y-%m-%d %H:%M UTC')}*\n",
            "## Executive Summary",
            exec_summary,
            "",
        ]
        for question, content in sections.items():
            lines.append(f"## {question}")
            lines.append(content)
            lines.append("")
        if key_entities:
            lines.append("## Key Entities Referenced")
            lines.append(", ".join(key_entities))
        return "\n".join(lines)