"""
ReportAgent β Full ReACT Analytical Agent
Replaces the 72-line stub with a complete ReACT (Reasoning + Acting) loop
powered by three specialized tools (InsightForge, PanoramaSearch, QuickSearch).

Architecture:
- InsightForgeTool: Hybrid broad-spectrum retriever (vector + graph + community)
- PanoramaSearchTool: Entity-type sweep for macro-level statistics
- QuickSearchTool: Fast single-entity lookup with direct relationships
"""

from __future__ import annotations

import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Literal, Optional

from pydantic import BaseModel, Field

from ..config import settings
from ..core.llm_factory import UnifiedLLMProvider
from ..core.neo4j_store import Neo4jStore

logger = logging.getLogger(__name__)
|
|
|
| |
|
|
class ReportSection(BaseModel):
    """A single titled section of a report.

    NOTE(review): not referenced by any code visible in this module;
    verify external callers before removing.
    """
    # Section heading.
    title: str
    # Section body text.
    content: str
|
|
|
|
class ReportResult(BaseModel):
    """Structured result of ReportAgent.generate_report, including compiled markdown."""
    # High-level topic the report was generated for.
    topic: str
    # Short LLM-written synthesis of all sections.
    executive_summary: str
    # Mapping of sub-question -> drafted section text.
    sections: Dict[str, str] = Field(default_factory=dict)
    # Unique entity names encountered while researching (capped at 10 by the agent).
    key_entities: List[str] = Field(default_factory=list)
    # Mean per-section confidence, rounded to 2 decimals by the agent.
    confidence: float = 0.0
    # Number of ReACT tool invocations made while researching.
    tool_calls_made: int = 0
    # Naive UTC timestamp (tzinfo deliberately stripped) β presumably for a
    # datetime-naive sink such as Neo4j; TODO confirm downstream expectation.
    generated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None))
    # Fully compiled markdown document.
    markdown: str = ""
|
|
|
|
| |
|
|
class InsightForgeTool:
    """
    Broad-spectrum hybrid retriever: merges vector similarity + graph
    neighborhood + community summaries for cross-entity insights.
    Use for open-ended analytical questions.
    """
    name = "InsightForge"
    description = (
        "Hybrid broad-spectrum retriever combining vector similarity, graph "
        "neighborhood, and community summaries. Best for open-ended analysis."
    )

    def __init__(self, store: Neo4jStore, llm: UnifiedLLMProvider) -> None:
        self.store = store
        self.llm = llm

    async def run(self, query: str, k: int = 8, tenant_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Run three retrieval channels and return up to *k* deduplicated hits.

        Channels (each best-effort β a failure in one must not kill the others):
          1. BM25 full-text + vector similarity, fetched concurrently.
          2. Entity summaries reachable from matching chunks via MENTIONS.
          3. The three largest community membership digests.

        Args:
            query: Free-text analytical question.
            k: Maximum number of results returned after dedup.
            tenant_id: Optional tenant scoping applied to every query.

        Returns:
            List of result dicts; each carries a "text" payload plus a
            "source" / "retrieval_method" provenance tag. Results without
            text, or whose first 80 chars duplicate an earlier hit, are
            dropped.
        """
        results: List[Dict[str, Any]] = []

        # --- Channel 1: lexical + vector search, run concurrently ---------
        try:
            embedding = await self.llm.embed(query)
            bm25_task = self.store.bm25_search(query, k=k, tenant_id=tenant_id)
            vector_task = self.store.search(query_vector=embedding, k=k, tenant_id=tenant_id)
            # return_exceptions=True lets one leg fail without losing the other.
            bm25_r, vector_r = await asyncio.gather(
                bm25_task, vector_task, return_exceptions=True
            )
            for r in (bm25_r if not isinstance(bm25_r, Exception) else []):
                r["source"] = "bm25"
                results.append(r)
            for r in (vector_r if not isinstance(vector_r, Exception) else []):
                r["source"] = "vector"
                results.append(r)
        except Exception as exc:
            # Best-effort: degrade to the graph-only channels below.
            logger.debug("[InsightForge] text/vector retrieval failed: %s", exc)

        # --- Channel 2: entity summaries linked from matching chunks ------
        try:
            entity_query = """
            CALL db.index.fulltext.queryNodes('chunk_text_index', $q)
            YIELD node, score
            WHERE ($tenant_id IS NULL OR node.tenant_id = $tenant_id)
            MATCH (node)-[:MENTIONS]->(e:Entity)
            WHERE ($tenant_id IS NULL OR e.tenant_id = $tenant_id)
            RETURN DISTINCT e.name as name, e.summary as summary
            LIMIT 10
            """
            entity_rows = await self.store.execute_query(
                entity_query, {"q": query, "tenant_id": tenant_id}
            )
            for row in entity_rows:
                if row.get("summary"):
                    results.append({
                        "text": f"[Entity Profile] {row['name']}: {row['summary']}",
                        "source": "entity_summary",
                        "retrieval_method": "insight_forge",
                    })
        except Exception as exc:
            logger.debug("[InsightForge] entity-summary retrieval failed: %s", exc)

        # --- Channel 3: largest community digests (query-independent) -----
        try:
            community_query = """
            MATCH (e:Entity)
            WHERE e.community_id IS NOT NULL
              AND ($tenant_id IS NULL OR e.tenant_id = $tenant_id)
            WITH e.community_id as cid, collect(e.name)[..5] as members
            RETURN cid, members
            ORDER BY size(members) DESC
            LIMIT 3
            """
            communities = await self.store.execute_query(community_query, {"tenant_id": tenant_id})
            for comm in communities:
                member_summary = ", ".join(comm.get("members", []))
                results.append({
                    "text": (
                        f"[Community {comm['cid']} β "
                        f"{len(comm.get('members', []))} entities]: "
                        f"{member_summary}"
                    ),
                    "source": "community",
                    "retrieval_method": "insight_forge",
                })
        except Exception as exc:
            logger.debug("[InsightForge] community retrieval failed: %s", exc)

        # Deduplicate on the first 80 chars of text; first occurrence wins,
        # so bm25 hits take precedence over identical vector hits.
        seen: set[str] = set()
        unique: List[Dict[str, Any]] = []
        for r in results:
            key = r.get("text", "")[:80]
            if key and key not in seen:
                seen.add(key)
                unique.append(r)

        return unique[:k]
|
|
|
|
class PanoramaSearchTool:
    """
    Macro-level entity sweep: returns all entities of a given type with
    statistics. Useful for 'How many X?', 'List all Y', 'What types of Z?'
    """
    name = "PanoramaSearch"
    description = (
        "Broad entity sweep returning all nodes of a specified type. "
        "Best for: counting, listing, macro-level statistics."
    )

    def __init__(self, store: Neo4jStore) -> None:
        self.store = store

    async def run(
        self, entity_type: str = "Entity", limit: int = 30, tenant_id: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Return up to *limit* entities of *entity_type*, highest-degree first.

        Args:
            entity_type: Entity type to sweep; the sentinel 'Entity' matches
                every type (see the WHERE clause).
            limit: Maximum number of rows returned.
            tenant_id: Optional tenant scoping.

        Returns:
            List of dicts with a formatted "text" payload plus name/type/degree;
            an empty list if the query fails (degrades rather than aborting).
        """
        query = """
        MATCH (e:Entity)
        WHERE (e.type = $type OR $type = 'Entity')
          AND ($tenant_id IS NULL OR e.tenant_id = $tenant_id)
        RETURN e.name as name, e.type as type,
               e.summary as summary,
               size((e)--()) as degree
        ORDER BY degree DESC
        LIMIT $limit
        """
        try:
            rows = await self.store.execute_query(
                query, {"type": entity_type, "limit": limit, "tenant_id": tenant_id}
            )
        except Exception as exc:
            # Warning (not info): a failed sweep silently shrinks the report's evidence.
            logger.warning("[PanoramaSearch] Error: %s", exc)
            return []

        results: List[Dict[str, Any]] = []
        for r in rows:
            text = f"[{r.get('type', 'Entity')}] {r.get('name', '')}"
            if r.get("summary"):
                text += f": {r['summary']}"
            results.append({
                "text": text,
                "name": r.get("name"),
                "type": r.get("type"),
                "degree": r.get("degree", 0),
                "retrieval_method": "panorama_search",
            })
        return results
|
|
|
|
class QuickSearchTool:
    """
    Fast single-entity lookup by name with direct 1-hop relationships.
    Useful for 'Who is X?', 'What does Y do?', 'Tell me about Z'.
    """
    name = "QuickSearch"
    description = (
        "Fast entity lookup by name. Returns entity summary + direct "
        "relationships. Best for specific entity questions."
    )

    def __init__(self, store: Neo4jStore, llm: UnifiedLLMProvider) -> None:
        self.store = store
        # NOTE(review): llm is stored for interface parity with the other
        # tools but is not used by run() in the visible code.
        self.llm = llm

    async def run(self, entity_name: str, tenant_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Look up an entity (exact or substring match) and return its profile + 1-hop connections.

        Args:
            entity_name: Name (or fragment) to match against Entity.name.
            tenant_id: Optional tenant scoping applied to nodes and relationships.

        Returns:
            Up to 3 entity dicts, each with name/type, a provenance tag, and a
            "text" payload combining summary and relationship digest. Lookup
            failures degrade to an empty list; relationship failures degrade
            to the text "unavailable".
        """
        entity_query = """
        MATCH (e:Entity)
        WHERE (e.name = $name OR toLower(e.name) CONTAINS toLower($name))
          AND ($tenant_id IS NULL OR e.tenant_id = $tenant_id)
        RETURN e.name as name, e.type as type, e.summary as summary
        LIMIT 3
        """
        try:
            entities = await self.store.execute_query(
                entity_query, {"name": entity_name, "tenant_id": tenant_id}
            )
        except Exception as exc:
            logger.debug("[QuickSearch] entity lookup failed: %s", exc)
            entities = []

        # Loop-invariant: same relationship query for every matched entity.
        rel_query = """
        MATCH (e:Entity {name: $name})-[r]-(other:Entity)
        WHERE ($tenant_id IS NULL OR (e.tenant_id = $tenant_id AND other.tenant_id = $tenant_id AND r.tenant_id = $tenant_id))
        RETURN type(r) as rel_type,
               other.name as other_name,
               other.type as other_type
        LIMIT 20
        """

        results: List[Dict[str, Any]] = []
        for entity in entities:
            name = entity.get("name", entity_name)
            summary = entity.get("summary", "")
            entry: Dict[str, Any] = {
                "name": name,
                "type": entity.get("type", "Entity"),
                "retrieval_method": "quick_search",
            }

            try:
                rels = await self.store.execute_query(
                    rel_query, {"name": name, "tenant_id": tenant_id}
                )
                rel_lines = [
                    f"{r['rel_type']} β {r['other_name']} ({r['other_type']})"
                    for r in rels
                ]
                rel_text = "; ".join(rel_lines) if rel_lines else "no connections"
            except Exception as exc:
                logger.debug("[QuickSearch] relationship lookup failed for %s: %s", name, exc)
                rel_text = "unavailable"

            text_parts = [f"[Entity] {name}"]
            if summary:
                text_parts.append(summary)
            text_parts.append(f"Connections: {rel_text}")
            entry["text"] = " | ".join(text_parts)
            results.append(entry)

        return results
|
|
|
|
| |
|
|
class ReportAgent:
    """
    Full ReACT analytical reporting agent.

    Workflow:
        DECOMPOSE  -> break the topic into 3-5 sub-questions
        REACT LOOP -> for each sub-question:
                        THINK -> pick the best tool
                        ACT   -> run InsightForge / PanoramaSearch / QuickSearch
                        OBS   -> record retrieved context
                        WRITE -> draft the answer section
        COMPILE    -> assemble all sections into a structured markdown report
    """

    # Upper bound on THINK/ACT iterations per sub-question; each iteration
    # costs at least one LLM call, so this also caps latency.
    MAX_REACT_LOOPS = 6
    # Injected verbatim into the _think prompt (runtime string; keep stable).
    TOOLS_DESC = """
Available tools:
- InsightForge(query): Hybrid broad-spectrum retriever. Best for analytical questions.
- PanoramaSearch(entity_type): Sweep all entities of a type. Best for counting/listing.
- QuickSearch(entity_name): Fast entity lookup + connections. Best for "Who is X?" questions.
"""

    def __init__(self, store: Neo4jStore, llm: UnifiedLLMProvider) -> None:
        self.store = store
        self.llm = llm
        self.insight_forge = InsightForgeTool(store, llm)
        self.panorama = PanoramaSearchTool(store)
        self.quick_search = QuickSearchTool(store, llm)
        self._tool_calls = 0  # running counter, reset per generate_report()

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #

    async def generate_report(
        self,
        topic: str,
        report_type: Literal["executive", "detailed", "entity_focus"] = "detailed",
        target_entity: Optional[str] = None,
        tenant_id: Optional[str] = None,
    ) -> ReportResult:
        """
        Generate an analytical report on the given topic.

        Args:
            topic: High-level topic or question
            report_type: "executive" (short), "detailed" (full), "entity_focus" (scoped)
            target_entity: For entity_focus, name of the entity to focus on
            tenant_id: Optional tenant scoping passed through to every tool

        Returns:
            ReportResult with sections, summary, and compiled markdown
        """
        self._tool_calls = 0

        # 1) DECOMPOSE: plan the report as a list of sub-questions.
        sub_questions = await self._decompose_topic(
            topic, report_type, target_entity
        )

        # 2) REACT: research and draft one section per sub-question.
        sections: Dict[str, str] = {}
        all_contexts: List[str] = []  # NOTE(review): accumulated but not read afterwards
        key_entities: List[str] = []
        total_confidence_score = 0.0

        for question in sub_questions:
            section_content, contexts, entities, confidence = await self._react_loop(
                question, tenant_id=tenant_id
            )
            if section_content:
                sections[question] = section_content
                all_contexts.extend(contexts)
                key_entities.extend(entities)
                total_confidence_score += confidence

        # 3) Synthesize the summary; dedupe entities preserving first-seen order.
        exec_summary = await self._write_executive_summary(topic, sections)
        key_entities = list(dict.fromkeys(key_entities))[:10]

        # Mean over planned questions (not only answered ones), so skipped
        # sections drag confidence down.
        overall_confidence = round(total_confidence_score / max(len(sub_questions), 1), 2)

        # 4) COMPILE the final markdown document.
        markdown = self._compile_markdown(
            topic, exec_summary, sections, key_entities
        )

        return ReportResult(
            topic=topic,
            executive_summary=exec_summary,
            sections=sections,
            key_entities=key_entities,
            confidence=overall_confidence,
            tool_calls_made=self._tool_calls,
            markdown=markdown,
        )

    # ------------------------------------------------------------------ #
    # Internal steps
    # ------------------------------------------------------------------ #

    async def _decompose_topic(
        self,
        topic: str,
        report_type: str,
        target_entity: Optional[str],
    ) -> List[str]:
        """Ask the LLM to decompose the topic into sub-questions.

        Executive reports get 3 questions, everything else 5. Any LLM or
        JSON-parsing failure falls back to three generic questions so report
        generation can always proceed.
        """
        n = 3 if report_type == "executive" else 5
        focus = (
            f"Focus specifically on the entity '{target_entity}'."
            if target_entity
            else ""
        )
        prompt = f"""You are planning an analytical report about: "{topic}"
{focus}

Generate {n} specific sub-questions that would together create a complete report.
Each sub-question should be answerable from a knowledge graph.

Return ONLY a JSON list of strings:
["question 1", "question 2", ...]"""

        try:
            response = await self.llm.complete(prompt, temperature=0.3)
            cleaned = response.strip()
            # Strip a markdown code fence if the model wrapped its JSON in one.
            for marker in ("```json", "```"):
                if marker in cleaned:
                    cleaned = cleaned.split(marker)[1].split("```")[0]
            questions = json.loads(cleaned.strip())
            if isinstance(questions, list) and questions:
                return [str(q) for q in questions[:n]]
        except Exception:
            # Deliberate best-effort: fall through to the generic plan below.
            pass

        return [
            f"What are the main entities related to {topic}?",
            f"What are the key relationships in {topic}?",
            f"What are the most important findings about {topic}?",
        ]

    async def _react_loop(
        self, question: str, tenant_id: Optional[str] = None
    ) -> tuple[str, List[str], List[str], float]:
        """
        Run a ReACT iteration for one sub-question.
        Returns (section_content, context_texts, entity_names, confidence_score).
        """
        collected_contexts: List[str] = []
        entity_names: List[str] = []
        observations: List[str] = []

        for _ in range(self.MAX_REACT_LOOPS):
            # THINK: let the LLM pick the next tool (or declare itself DONE).
            thought, tool_name, tool_arg = await self._think(
                question, observations
            )
            observations.append(f"[Thought]\n{thought}")

            if tool_name == "DONE":
                break

            # ACT: dispatch the chosen tool.
            tool_results = await self._act(tool_name, tool_arg, tenant_id=tenant_id)
            self._tool_calls += 1

            # OBSERVE: keep truncated snippets as context; an empty result
            # ends the loop early rather than burning further iterations.
            if tool_results:
                obs_texts = [r.get("text", str(r))[:300] for r in tool_results]
                obs_summary = "\n".join(f"β’ {t}" for t in obs_texts[:5])
                observations.append(f"[{tool_name}({tool_arg})]\n{obs_summary}")
                collected_contexts.extend(obs_texts)

                for r in tool_results:
                    if r.get("name"):
                        entity_names.append(r["name"])
            else:
                observations.append(f"[{tool_name}({tool_arg})] No results.")
                break

        # WRITE: draft the section from everything gathered.
        if collected_contexts:
            section, confidence = await self._write_section_with_confidence(
                question, collected_contexts
            )
        else:
            section = "Insufficient data found in the knowledge graph."
            confidence = 0.0

        return section, collected_contexts, entity_names, confidence

    async def _think(
        self, question: str, observations: List[str]
    ) -> tuple[str, str, str]:
        """Decide which tool to call next, or return DONE.

        Only the last three observations are shown to keep the prompt small.
        A structured-output failure fails closed as DONE.
        """
        obs_text = "\n".join(observations[-3:]) if observations else "None yet."

        prompt = f"""You are deciding the next action to answer:
"{question}"

{self.TOOLS_DESC}
Observations so far:
{obs_text}

If you have enough information to write a good answer, choose the DONE tool.
Otherwise, deeply analyze the observations in a step-by-step thought and pick the most appropriate tool."""

        class AgentAction(BaseModel):
            thought: str = Field(description="Step by step reasoning analyzing the observations and deciding what to do next.")
            tool_name: Literal["InsightForge", "PanoramaSearch", "QuickSearch", "DONE"] = Field(description="The chosen tool.")
            tool_arg: str = Field(description="The argument to pass to the tool. Empty if DONE.")

        try:
            action: AgentAction = await self.llm.complete_structured(
                prompt=prompt,
                response_model=AgentAction,
                system_prompt="You are a meticulous investigative agent generating JSON."
            )
            return action.thought, action.tool_name, action.tool_arg
        except Exception as exc:
            # Fail closed: treat an unparsable decision as "stop researching".
            return f"Failed to parse or error: {exc}", "DONE", ""

    async def _act(
        self, tool_name: str, tool_arg: str, tenant_id: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Dispatch the tool call; any tool failure degrades to an empty list."""
        try:
            if tool_name == "InsightForge":
                return await self.insight_forge.run(tool_arg, tenant_id=tenant_id)
            elif tool_name == "PanoramaSearch":
                return await self.panorama.run(tool_arg, tenant_id=tenant_id)
            elif tool_name == "QuickSearch":
                return await self.quick_search.run(tool_arg, tenant_id=tenant_id)
        except Exception as exc:
            # Warning (not info): a failing tool directly degrades report quality.
            logger.warning("[ReportAgent] Tool %s failed: %s", tool_name, exc)
        return []

    async def _write_section_with_confidence(
        self, question: str, contexts: List[str]
    ) -> tuple[str, float]:
        """Generate a report section from retrieved contexts and provide a structured confidence score.

        Only the first 8 contexts are passed to the LLM. On LLM failure the
        section is an explicit error sentence with 0.0 confidence.
        """
        context_text = "\n\n".join(f"[Source {i+1}]: {c}" for i, c in enumerate(contexts[:8]))

        prompt = f"""Write a factual, well-structured paragraph answering:
"{question}"

Based ONLY on the following knowledge graph data:
{context_text}

Instructions:
- Be specific and cite entities by name
- Do not hallucinate or add information not in the sources
- 2-4 sentences is ideal
- Also output a confidence score from 0.0 to 1.0 reflecting how fully the data answers the question.
"""

        class SectionResult(BaseModel):
            content: str = Field(description="The 2-4 sentence report section or state data is insufficient.")
            confidence: float = Field(description="Confidence score 0.0 to 1.0 based on data sufficiency")

        try:
            res: SectionResult = await self.llm.complete_structured(
                prompt=prompt,
                response_model=SectionResult,
                system_prompt="You are an analytical writer crafting a knowledge-graph report section.",
            )
            return res.content, res.confidence
        except Exception:
            return "Unable to generate section due to LLM error.", 0.0

    async def _write_executive_summary(
        self, topic: str, sections: Dict[str, str]
    ) -> str:
        """Synthesize all sections into a 2-3 sentence executive summary.

        At most 5 sections / 2000 characters of findings are shown to the LLM;
        on failure a generic one-liner is returned so the report still compiles.
        """
        section_text = "\n\n".join(
            f"## {q}\n{a}" for q, a in list(sections.items())[:5]
        )
        prompt = f"""Write a 2-3 sentence executive summary for a report on: "{topic}"

Report findings:
{section_text[:2000]}

Executive summary (concise, factual, highlight the most important finding):"""
        try:
            return await self.llm.complete(prompt, temperature=0.3)
        except Exception:
            return f"Analysis of {topic} based on knowledge graph data."

    def _compile_markdown(
        self,
        topic: str,
        exec_summary: str,
        sections: Dict[str, str],
        key_entities: List[str],
    ) -> str:
        """Assemble title, timestamp, summary, sections, and entity list into markdown."""
        lines = [
            f"# Report: {topic}",
            f"*Generated: {datetime.now(timezone.utc).replace(tzinfo=None).strftime('%Y-%m-%d %H:%M UTC')}*\n",
            "## Executive Summary",
            exec_summary,
            "",
        ]
        for question, content in sections.items():
            lines.append(f"## {question}")
            lines.append(content)
            lines.append("")

        if key_entities:
            lines.append("## Key Entities Referenced")
            lines.append(", ".join(key_entities))

        return "\n".join(lines)
|
|