| """ |
| PlannerAgent for SPARKNET - LangChain Version |
| Breaks down complex VISTA scenarios into executable workflows |
| Uses LangChain chains for structured task decomposition |
| """ |
|
|
| from typing import List, Dict, Optional, Any |
| from dataclasses import dataclass, field |
| from loguru import logger |
| import json |
| import networkx as nx |
| from pydantic import BaseModel, Field |
|
|
| from langchain_core.prompts import ChatPromptTemplate |
| from langchain_core.output_parsers import JsonOutputParser |
| from langchain_core.messages import HumanMessage, SystemMessage |
|
|
| from .base_agent import BaseAgent, Task, Message |
| from ..llm.langchain_ollama_client import LangChainOllamaClient |
| from ..workflow.langgraph_state import SubTask as SubTaskModel, TaskStatus |
|
|
|
|
| |
| class TaskDecomposition(BaseModel): |
| """Structured output from planning chain""" |
| subtasks: List[Dict[str, Any]] = Field(description="List of subtasks with dependencies") |
| reasoning: str = Field(description="Explanation of the planning strategy") |
| estimated_total_duration: float = Field(description="Total estimated duration in seconds") |
|
|
|
|
| @dataclass |
| class TaskGraph: |
| """Directed acyclic graph of tasks with dependencies.""" |
| subtasks: Dict[str, SubTaskModel] = field(default_factory=dict) |
| graph: nx.DiGraph = field(default_factory=nx.DiGraph) |
|
|
| def add_subtask(self, subtask: SubTaskModel): |
| """Add a subtask to the graph.""" |
| self.subtasks[subtask.id] = subtask |
| self.graph.add_node(subtask.id, task=subtask) |
|
|
| |
| for dep_id in subtask.dependencies: |
| if dep_id in self.subtasks: |
| self.graph.add_edge(dep_id, subtask.id) |
|
|
| def get_execution_order(self) -> List[List[str]]: |
| """ |
| Get tasks in execution order (topological sort). |
| Returns list of lists - inner lists can be executed in parallel. |
| """ |
| try: |
| generations = list(nx.topological_generations(self.graph)) |
| return generations |
| except nx.NetworkXError as e: |
| logger.error(f"Error in topological sort: {e}") |
| return [] |
|
|
| def validate(self) -> bool: |
| """Validate graph has no cycles.""" |
| return nx.is_directed_acyclic_graph(self.graph) |
|
|
|
|
| class PlannerAgent(BaseAgent): |
| """ |
| Agent specialized in task decomposition and workflow planning. |
| Uses LangChain chains with qwen2.5:14b for complex reasoning. |
| """ |
|
|
| |
| SCENARIO_TEMPLATES = { |
| 'patent_wakeup': { |
| 'description': 'Analyze dormant patent and create valorization roadmap', |
| 'stages': [ |
| { |
| 'name': 'document_analysis', |
| 'agent': 'DocumentAnalysisAgent', |
| 'description': 'Extract and analyze patent content', |
| 'dependencies': [], |
| }, |
| { |
| 'name': 'market_analysis', |
| 'agent': 'MarketAnalysisAgent', |
| 'description': 'Identify market opportunities for patent', |
| 'dependencies': ['document_analysis'], |
| }, |
| { |
| 'name': 'matchmaking', |
| 'agent': 'MatchmakingAgent', |
| 'description': 'Match patent with potential licensees', |
| 'dependencies': ['document_analysis', 'market_analysis'], |
| }, |
| { |
| 'name': 'outreach', |
| 'agent': 'OutreachAgent', |
| 'description': 'Generate valorization brief and outreach materials', |
| 'dependencies': ['matchmaking'], |
| }, |
| ], |
| }, |
| 'agreement_safety': { |
| 'description': 'Review legal agreement for risks and compliance', |
| 'stages': [ |
| { |
| 'name': 'document_parsing', |
| 'agent': 'LegalAnalysisAgent', |
| 'description': 'Parse agreement and extract clauses', |
| 'dependencies': [], |
| }, |
| { |
| 'name': 'compliance_check', |
| 'agent': 'ComplianceAgent', |
| 'description': 'Check GDPR and Law 25 compliance', |
| 'dependencies': ['document_parsing'], |
| }, |
| { |
| 'name': 'risk_assessment', |
| 'agent': 'RiskAssessmentAgent', |
| 'description': 'Identify problematic clauses and risks', |
| 'dependencies': ['document_parsing'], |
| }, |
| { |
| 'name': 'recommendations', |
| 'agent': 'RecommendationAgent', |
| 'description': 'Generate improvement suggestions', |
| 'dependencies': ['compliance_check', 'risk_assessment'], |
| }, |
| ], |
| }, |
| 'partner_matching': { |
| 'description': 'Match stakeholders based on complementary capabilities', |
| 'stages': [ |
| { |
| 'name': 'profiling', |
| 'agent': 'ProfilingAgent', |
| 'description': 'Extract stakeholder capabilities and needs', |
| 'dependencies': [], |
| }, |
| { |
| 'name': 'semantic_matching', |
| 'agent': 'SemanticMatchingAgent', |
| 'description': 'Find complementary partners using embeddings', |
| 'dependencies': ['profiling'], |
| }, |
| { |
| 'name': 'network_analysis', |
| 'agent': 'NetworkAnalysisAgent', |
| 'description': 'Identify strategic network connections', |
| 'dependencies': ['profiling'], |
| }, |
| { |
| 'name': 'facilitation', |
| 'agent': 'ConnectionFacilitatorAgent', |
| 'description': 'Generate introduction materials', |
| 'dependencies': ['semantic_matching', 'network_analysis'], |
| }, |
| ], |
| }, |
| } |
|
|
| def __init__( |
| self, |
| llm_client: LangChainOllamaClient, |
| memory_agent: Optional['MemoryAgent'] = None, |
| temperature: float = 0.7, |
| ): |
| """ |
| Initialize PlannerAgent with LangChain client. |
| |
| Args: |
| llm_client: LangChain Ollama client |
| memory_agent: Optional memory agent for context |
| temperature: LLM temperature for planning |
| """ |
| self.llm_client = llm_client |
| self.memory_agent = memory_agent |
| self.temperature = temperature |
|
|
| |
| self.planning_chain = self._create_planning_chain() |
| self.refinement_chain = self._create_refinement_chain() |
|
|
| |
| self.name = "PlannerAgent" |
| self.description = "Task decomposition and workflow planning" |
|
|
| logger.info(f"Initialized PlannerAgent with LangChain (complexity: complex)") |
|
|
| def _create_planning_chain(self): |
| """ |
| Create LangChain chain for task decomposition. |
| |
| Returns: |
| Runnable chain: prompt | llm | parser |
| """ |
| system_template = """You are a strategic planning agent for research valorization tasks. |
| |
| Your role is to: |
| 1. Analyze complex tasks and break them into manageable subtasks |
| 2. Identify dependencies between subtasks |
| 3. Assign appropriate agents to each subtask |
| 4. Estimate task complexity and duration |
| 5. Create optimal execution plans |
| |
| Available agent types: |
| - ExecutorAgent: General task execution |
| - DocumentAnalysisAgent: Analyze patents and documents |
| - MarketAnalysisAgent: Market research and opportunity identification |
| - MatchmakingAgent: Stakeholder matching and connections |
| - OutreachAgent: Generate outreach materials and briefs |
| - LegalAnalysisAgent: Legal document analysis |
| - ComplianceAgent: Compliance checking (GDPR, Law 25) |
| - RiskAssessmentAgent: Risk identification |
| - ProfilingAgent: Stakeholder profiling |
| - SemanticMatchingAgent: Semantic similarity matching |
| - NetworkAnalysisAgent: Network and relationship analysis |
| |
| Output your plan as a structured JSON object with: |
| - subtasks: List of subtask objects with id, description, agent_type, dependencies, estimated_duration, priority |
| - reasoning: Your strategic reasoning for this decomposition |
| - estimated_total_duration: Total estimated time in seconds""" |
|
|
| human_template = """Given the following task, create a detailed execution plan: |
| |
| Task: {task_description} |
| |
| {context_section} |
| |
| Break this down into specific subtasks. For each subtask: |
| - Give it a unique ID (use snake_case) |
| - Describe what needs to be done |
| - Specify which agent type should handle it |
| - List any dependencies (IDs of tasks that must complete first) |
| - Estimate duration in seconds |
| - Set priority (1=highest) |
| |
| Think step-by-step about: |
| - What is the ultimate goal? |
| - What information is needed? |
| - What are the logical stages? |
| - Which subtasks can run in parallel? |
| - What are the critical dependencies? |
| |
| Output JSON only.""" |
|
|
| prompt = ChatPromptTemplate.from_messages([ |
| ("system", system_template), |
| ("human", human_template) |
| ]) |
|
|
| |
| llm = self.llm_client.get_llm(complexity="complex", temperature=self.temperature) |
|
|
| |
| parser = JsonOutputParser(pydantic_object=TaskDecomposition) |
|
|
| |
| chain = prompt | llm | parser |
|
|
| return chain |
|
|
| def _create_refinement_chain(self): |
| """ |
| Create LangChain chain for replanning based on feedback. |
| |
| Returns: |
| Runnable chain for refinement |
| """ |
| system_template = """You are refining an existing task plan based on feedback. |
| |
| Your role is to: |
| 1. Review the original plan and feedback |
| 2. Identify what went wrong or could be improved |
| 3. Create an improved plan that addresses the issues |
| 4. Maintain successful elements from the original plan |
| |
| Be thoughtful about what to change and what to keep.""" |
|
|
| human_template = """Refine the following plan based on feedback: |
| |
| Original Task: {task_description} |
| |
| Original Plan: |
| {original_plan} |
| |
| Feedback from execution: |
| {feedback} |
| |
| Issues encountered: |
| {issues} |
| |
| Create an improved plan that addresses these issues while maintaining what worked well. |
| Output JSON in the same format as before.""" |
|
|
| prompt = ChatPromptTemplate.from_messages([ |
| ("system", system_template), |
| ("human", human_template) |
| ]) |
|
|
| llm = self.llm_client.get_llm(complexity="complex", temperature=self.temperature) |
| parser = JsonOutputParser(pydantic_object=TaskDecomposition) |
|
|
| chain = prompt | llm | parser |
|
|
| return chain |
|
|
| async def process_task(self, task: Task) -> Task: |
| """ |
| Process planning task by decomposing into workflow. |
| |
| Args: |
| task: High-level task to plan |
| |
| Returns: |
| Updated task with plan in result |
| """ |
| logger.info(f"PlannerAgent planning task: {task.id}") |
| task.status = "in_progress" |
|
|
| try: |
| |
| scenario = task.metadata.get('scenario') if task.metadata else None |
|
|
| if scenario and scenario in self.SCENARIO_TEMPLATES: |
| |
| logger.info(f"Using template for scenario: {scenario}") |
| task_graph = await self._plan_from_template(task, scenario) |
| else: |
| |
| logger.info("Using LangChain planning for custom task") |
| task_graph = await self._plan_with_langchain(task) |
|
|
| |
| if not task_graph.validate(): |
| raise ValueError("Generated task graph contains cycles!") |
|
|
| |
| task.result = { |
| 'task_graph': task_graph, |
| 'execution_order': task_graph.get_execution_order(), |
| 'total_subtasks': len(task_graph.subtasks), |
| } |
| task.status = "completed" |
|
|
| logger.info(f"Planning completed: {len(task_graph.subtasks)} subtasks") |
|
|
| except Exception as e: |
| logger.error(f"Planning failed: {e}") |
| task.status = "failed" |
| task.error = str(e) |
|
|
| return task |
|
|
| async def _plan_from_template(self, task: Task, scenario: str) -> TaskGraph: |
| """ |
| Create task graph from scenario template. |
| |
| Args: |
| task: Original task |
| scenario: Scenario identifier |
| |
| Returns: |
| TaskGraph based on template |
| """ |
| template = self.SCENARIO_TEMPLATES[scenario] |
| task_graph = TaskGraph() |
|
|
| |
| params = task.metadata.get('parameters', {}) if task.metadata else {} |
|
|
| |
| for i, stage in enumerate(template['stages']): |
| subtask = SubTaskModel( |
| id=f"{task.id}_{stage['name']}", |
| description=stage['description'], |
| agent_type=stage['agent'], |
| dependencies=[f"{task.id}_{dep}" for dep in stage['dependencies']], |
| estimated_duration=30.0, |
| priority=i + 1, |
| parameters=params, |
| status=TaskStatus.PENDING |
| ) |
| task_graph.add_subtask(subtask) |
|
|
| logger.debug(f"Created task graph with {len(task_graph.subtasks)} subtasks from template") |
|
|
| return task_graph |
|
|
| async def _plan_with_langchain(self, task: Task, context: Optional[List[Any]] = None) -> TaskGraph: |
| """ |
| Create task graph using LangChain planning chain. |
| |
| Args: |
| task: Original task |
| context: Optional context from memory |
| |
| Returns: |
| TaskGraph generated by LangChain |
| """ |
| |
| context_section = "" |
| if context and len(context) > 0: |
| context_section = "Relevant past experiences:\n" |
| for i, ctx in enumerate(context[:3], 1): |
| context_section += f"{i}. {ctx.page_content[:200]}...\n" |
|
|
| |
| try: |
| result = await self.planning_chain.ainvoke({ |
| "task_description": task.description, |
| "context_section": context_section |
| }) |
|
|
| |
| task_graph = TaskGraph() |
|
|
| for subtask_data in result.get('subtasks', []): |
| subtask = SubTaskModel( |
| id=f"{task.id}_{subtask_data.get('id', f'subtask_{len(task_graph.subtasks)}')}", |
| description=subtask_data.get('description', ''), |
| agent_type=subtask_data.get('agent_type', 'ExecutorAgent'), |
| dependencies=[f"{task.id}_{dep}" for dep in subtask_data.get('dependencies', [])], |
| estimated_duration=subtask_data.get('estimated_duration', 30.0), |
| priority=subtask_data.get('priority', 0), |
| parameters=subtask_data.get('parameters', {}), |
| status=TaskStatus.PENDING |
| ) |
| task_graph.add_subtask(subtask) |
|
|
| logger.debug(f"Created task graph with {len(task_graph.subtasks)} subtasks from LangChain") |
|
|
| return task_graph |
|
|
| except Exception as e: |
| logger.error(f"Failed to parse LangChain planning response: {e}") |
| raise ValueError(f"Failed to generate plan: {e}") |
|
|
| async def decompose_task( |
| self, |
| task_description: str, |
| scenario: Optional[str] = None, |
| context: Optional[List[Any]] = None |
| ) -> TaskGraph: |
| """ |
| Decompose a high-level task into subtasks. |
| |
| Args: |
| task_description: Natural language description |
| scenario: Optional scenario identifier |
| context: Optional context from memory |
| |
| Returns: |
| TaskGraph with subtasks and dependencies |
| """ |
| |
| task = Task( |
| id=f"plan_{hash(task_description) % 10000}", |
| description=task_description, |
| metadata={'scenario': scenario} if scenario else {}, |
| ) |
|
|
| |
| result_task = await self.process_task(task) |
|
|
| if result_task.status == "completed" and result_task.result: |
| return result_task.result['task_graph'] |
| else: |
| raise RuntimeError(f"Planning failed: {result_task.error}") |
|
|
| async def adapt_plan( |
| self, |
| task_graph: TaskGraph, |
| feedback: str, |
| issues: List[str] |
| ) -> TaskGraph: |
| """ |
| Adapt an existing plan based on execution feedback. |
| |
| Args: |
| task_graph: Original task graph |
| feedback: Feedback from execution |
| issues: List of issues encountered |
| |
| Returns: |
| Updated task graph |
| """ |
| logger.info("Adapting plan based on feedback") |
|
|
| |
| original_plan = { |
| "subtasks": [ |
| { |
| "id": st.id, |
| "description": st.description, |
| "agent_type": st.agent_type, |
| "dependencies": st.dependencies |
| } |
| for st in task_graph.subtasks.values() |
| ] |
| } |
|
|
| try: |
| |
| result = await self.refinement_chain.ainvoke({ |
| "task_description": "Refine task decomposition", |
| "original_plan": json.dumps(original_plan, indent=2), |
| "feedback": feedback, |
| "issues": "\n".join(f"- {issue}" for issue in issues) |
| }) |
|
|
| |
| new_task_graph = TaskGraph() |
|
|
| for subtask_data in result.get('subtasks', []): |
| subtask = SubTaskModel( |
| id=subtask_data.get('id', f'subtask_{len(new_task_graph.subtasks)}'), |
| description=subtask_data.get('description', ''), |
| agent_type=subtask_data.get('agent_type', 'ExecutorAgent'), |
| dependencies=subtask_data.get('dependencies', []), |
| estimated_duration=subtask_data.get('estimated_duration', 30.0), |
| priority=subtask_data.get('priority', 0), |
| parameters=subtask_data.get('parameters', {}), |
| status=TaskStatus.PENDING |
| ) |
| new_task_graph.add_subtask(subtask) |
|
|
| logger.info(f"Plan adapted: {len(new_task_graph.subtasks)} subtasks") |
| return new_task_graph |
|
|
| except Exception as e: |
| logger.error(f"Plan adaptation failed: {e}, returning original plan") |
| return task_graph |
|
|
| def get_parallel_tasks(self, task_graph: TaskGraph) -> List[List[SubTaskModel]]: |
| """ |
| Get tasks that can be executed in parallel. |
| |
| Args: |
| task_graph: Task graph |
| |
| Returns: |
| List of parallel task groups |
| """ |
| execution_order = task_graph.get_execution_order() |
| parallel_groups = [] |
|
|
| for task_ids in execution_order: |
| group = [task_graph.subtasks[task_id] for task_id in task_ids] |
| parallel_groups.append(group) |
|
|
| return parallel_groups |
|
|