File size: 11,257 Bytes
7681765
 
 
2d43b8b
74e887d
7681765
c895509
7681765
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8ac8a9d
 
 
 
7681765
 
 
 
 
 
 
8ac8a9d
7681765
 
 
 
8ac8a9d
7681765
 
8ac8a9d
 
 
 
 
 
 
 
 
7681765
a745005
 
7681765
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a745005
7681765
 
 
 
 
 
 
 
 
 
8ac8a9d
7681765
 
 
 
 
 
74e887d
 
7681765
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74e887d
 
 
7681765
74e887d
7681765
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74e887d
 
 
 
7681765
74e887d
7681765
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8ac8a9d
c895509
7681765
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8ac8a9d
c895509
7681765
 
8ac8a9d
74e887d
 
 
 
 
 
 
 
 
 
7681765
74e887d
 
 
 
7681765
 
 
 
 
 
 
 
 
 
 
 
 
a745005
7681765
 
 
74e887d
 
 
8ac8a9d
74e887d
 
 
 
 
 
a745005
7681765
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
"""Main LangGraph workflow for market intelligence."""

import uuid

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langgraph.checkpoint.memory import MemorySaver

from src.workflows.types import IntelligenceState, ResearchType
from src.agents.researcher import ResearchAgent
from src.agents.analyst import AnalysisAgent
from src.agents.writer import WriterAgent
from src.utils.cost_tracker import CostTracker, BudgetExceededError
from src.utils.logging import setup_logger

# Module-level logger named after this module, used by the workflow below.
logger = setup_logger(__name__)


class MarketIntelligenceWorkflow:
    """
    LangGraph workflow orchestrating research, analysis, and writing agents.

    Graph shape:
        research -> analysis -> writing -> human_review
        - research ends the run when it recorded errors or produced no data.
        - human_review either ends the run (approved / max revisions
          reached) or loops back to research ("revise").

    Features:
    - Multi-agent coordination (all agents share one CostTracker)
    - State persistence with checkpointing (SQLite file, or an in-process
      MemorySaver when ``checkpoint_path == ":memory:"``)
    - Cost tracking and budget enforcement before analysis and writing
    - Human-in-the-loop approval (currently an auto-approving placeholder)
    - Error recovery: node failures are logged and recorded in
      ``state["errors"]`` instead of being raised
    """

    def __init__(
        self,
        checkpoint_path: str = "./checkpoints.db",
        max_budget: float = 2.0,
        model_name: str | None = None,
        max_revisions: int = 2,
    ):
        """
        Initialize workflow.

        Args:
            checkpoint_path: Path to SQLite checkpoint database, or ":memory:"
                to use an in-process MemorySaver instead
            max_budget: Maximum cost per run in USD
            model_name: Name of the LLM model to use (passed to every agent)
            max_revisions: Number of review -> research revision loops after
                which the workflow ends regardless of approval
        """
        self.max_budget = max_budget
        self.max_revisions = max_revisions
        # NOTE(review): this tracker lives as long as the workflow instance
        # and is never reset in run(), so costs (and therefore budget checks)
        # appear to accumulate across runs even though max_budget is
        # documented as per-run. Confirm CostTracker semantics.
        self.cost_tracker = CostTracker()
        self.checkpoint_path = checkpoint_path
        self.model_name = model_name

        # Initialize agents (shared cost tracker)
        self.research_agent = ResearchAgent(
            cost_tracker=self.cost_tracker, model=model_name
        )
        self.analysis_agent = AnalysisAgent(
            cost_tracker=self.cost_tracker, model=model_name
        )
        self.writer_agent = WriterAgent(
            cost_tracker=self.cost_tracker, model=model_name
        )

        # Build workflow graph blueprint; it is compiled per run in run()
        # because the checkpointer is opened per run.
        self.graph_builder = self._build_graph()

        logger.info("Market Intelligence Workflow initialized")

    def _build_graph(self) -> StateGraph:
        """Build the (uncompiled) LangGraph workflow."""
        # Initialize graph
        graph = StateGraph(IntelligenceState)

        # Add nodes (agent wrappers)
        graph.add_node("research", self._research_node)
        graph.add_node("analysis", self._analysis_node)
        graph.add_node("writing", self._writing_node)
        graph.add_node("human_review", self._human_review_node)

        # Set entry point
        graph.set_entry_point("research")

        # Add edges
        graph.add_conditional_edges(
            "research",
            self._should_continue_to_analysis,
            {"analysis": "analysis", "end": END},
        )

        graph.add_edge("analysis", "writing")
        graph.add_edge("writing", "human_review")

        graph.add_conditional_edges(
            "human_review",
            self._check_approval,
            {"approved": END, "revise": "research", "max_revisions": END},
        )

        return graph

    def _cost_fields(self) -> dict:
        """Return the tracker's running cost/token totals as state updates."""
        cost_summary = self.cost_tracker.get_summary()
        return {
            "total_cost": cost_summary["total_cost"],
            "total_tokens": cost_summary["total_tokens"],
        }

    async def _research_node(self, state: IntelligenceState) -> dict:
        """
        Run the research agent and merge its results into state.

        Increments ``iteration`` on success, so it counts completed research
        passes. On failure the exception is logged and recorded in ``errors``;
        ``_should_continue_to_analysis`` then ends the run.
        """
        logger.info(f"Research node: {state['company_name']}")

        try:
            # Run research agent
            research_results = await self.research_agent.run(
                company_name=state["company_name"],
                industry=state.get("industry"),
                research_depth=state.get("research_depth", "comprehensive"),
            )

            # Update state
            return {
                "current_agent": "research",
                "research_data": research_results,
                "competitors": research_results.get("competitors", ""),
                "market_trends": research_results.get("market_trends", ""),
                "raw_sources": research_results.get("raw_sources", []),
                "iteration": state.get("iteration", 0) + 1,
            }

        except Exception as e:
            logger.error(f"Research node failed: {e}")
            return {
                "errors": [f"Research failed: {str(e)}"],
                "current_agent": "research",
            }

    async def _analysis_node(self, state: IntelligenceState) -> dict:
        """
        Run the analysis agent on ``state["research_data"]``.

        The budget is checked first. Budget and other failures are logged
        and recorded in ``errors`` rather than raised.
        """
        logger.info(f"Analysis node: {state['company_name']}")

        try:
            # Check budget before expensive analysis
            self.cost_tracker.check_budget(self.max_budget)

            # Run analysis agent
            analysis_results = await self.analysis_agent.run(
                research_data=state["research_data"]
            )

            # Update state
            return {
                "current_agent": "analysis",
                "swot": analysis_results.get("swot", ""),
                "competitive_matrix": analysis_results.get("competitive_matrix", ""),
                "positioning": analysis_results.get("positioning", ""),
                "strategic_recommendations": analysis_results.get(
                    "strategic_recommendations", ""
                ),
            }

        except BudgetExceededError as e:
            logger.error(f"Budget exceeded: {e}")
            return {
                "errors": [f"Budget exceeded: {str(e)}"],
                "current_agent": "analysis",
            }
        except Exception as e:
            logger.error(f"Analysis node failed: {e}")
            return {
                "errors": [f"Analysis failed: {str(e)}"],
                "current_agent": "analysis",
            }

    async def _writing_node(self, state: IntelligenceState) -> dict:
        """
        Run the writer agent to produce the executive summary and full report.

        The budget is checked first, mirroring the analysis node. The
        analysis -> writing edge is unconditional, so without this check a
        run whose analysis already failed with BudgetExceededError would
        still pay for a report. Cost/token totals are recorded on every
        return path so the final state reflects what was actually spent.
        """
        logger.info(f"Writing node: {state['company_name']}")

        try:
            # Check budget before the (expensive) report generation
            self.cost_tracker.check_budget(self.max_budget)

            # Run writer agent
            report_results = await self.writer_agent.run(
                research_data=state["research_data"],
                analysis_data={
                    "company_name": state["company_name"],
                    "swot": state.get("swot", ""),
                    "competitive_matrix": state.get("competitive_matrix", ""),
                    "positioning": state.get("positioning", ""),
                    "strategic_recommendations": state.get(
                        "strategic_recommendations", ""
                    ),
                },
            )

            # Update state
            return {
                "current_agent": "writing",
                "executive_summary": report_results.get("executive_summary", ""),
                "full_report": report_results.get("full_report", ""),
                "report_metadata": report_results.get("metadata", {}),
                **self._cost_fields(),
            }

        except BudgetExceededError as e:
            logger.error(f"Budget exceeded: {e}")
            return {
                "errors": [f"Budget exceeded: {str(e)}"],
                "current_agent": "writing",
                **self._cost_fields(),
            }
        except Exception as e:
            logger.error(f"Writing node failed: {e}")
            return {
                "errors": [f"Writing failed: {str(e)}"],
                "current_agent": "writing",
                **self._cost_fields(),
            }

    async def _human_review_node(self, state: IntelligenceState) -> dict:
        """
        Human review node (placeholder for now).

        Always approves and clears feedback, so ``_check_approval`` routes
        to END on the first pass.
        """
        logger.info(f"Human review node: {state['company_name']}")

        # For now, auto-approve
        # In Phase 5, this will connect to the Gradio UI
        return {
            "current_agent": "human_review",
            "approved": True,  # Auto-approve for testing
            "human_feedback": None,
        }

    def _should_continue_to_analysis(self, state: IntelligenceState) -> str:
        """Route after research: "analysis" on success, otherwise "end"."""
        # Check if research was successful
        if state.get("errors"):
            logger.warning("Research had errors, ending workflow")
            return "end"

        if not state.get("research_data"):
            logger.warning("No research data, ending workflow")
            return "end"

        return "analysis"

    def _check_approval(self, state: IntelligenceState) -> str:
        """
        Route after human review.

        Returns:
            "approved" if the report was approved (or no decision/feedback
            was given), "max_revisions" once ``self.max_revisions`` revision
            loops have already run, otherwise "revise" when reviewer
            feedback requests another research pass.
        """
        # Check approval
        if state.get("approved"):
            return "approved"

        # Nothing in this class increments revision_count, so also derive
        # completed revisions from the research pass counter: every research
        # pass after the first was reached via the "revise" edge. Relying on
        # revision_count alone lets a rejecting reviewer loop until LangGraph's
        # recursion limit aborts the run.
        revisions_done = max(
            state.get("revision_count", 0), state.get("iteration", 1) - 1
        )
        if revisions_done >= self.max_revisions:
            logger.warning("Max revisions reached")
            return "max_revisions"

        # Revision requested
        if state.get("human_feedback"):
            return "revise"

        # Default to approved
        return "approved"

    async def run(
        self,
        company_name: str,
        industry: str | None = None,
        thread_id: str | None = None,
        research_depth: str = "comprehensive",
        research_type: ResearchType = ResearchType.COMPANY_ANALYSIS,
    ) -> dict:
        """
        Run the complete workflow.

        Args:
            company_name: Target company name
            industry: Optional industry context
            thread_id: Optional thread ID for checkpointing. When omitted, a
                fresh unique ID is generated so unrelated runs do not share
                (and resume) the same checkpoint thread.
            research_depth: Depth hint forwarded to the research agent
            research_type: Kind of research, stored in the initial state

        Returns:
            Final state dictionary

        Raises:
            Any exception raised while compiling or invoking the graph
            (logged, then re-raised).
        """
        logger.info(f"Starting workflow for: {company_name}")

        # Initial state
        initial_state: IntelligenceState = {
            "research_type": research_type,
            "company_name": company_name,
            "industry": industry,
            "research_depth": research_depth,
            "research_data": {
                "company_name": company_name,
                "industry": industry,
                "company_overview": "",
                "competitors": "",
                "market_trends": "",
                "raw_sources": [],
            },
            "competitors": "",
            "market_trends": "",
            "raw_sources": [],
            "swot": "",
            "competitive_matrix": "",
            "positioning": "",
            "strategic_recommendations": "",
            "executive_summary": "",
            "full_report": "",
            "report_metadata": {},
            "current_agent": "research",
            "iteration": 0,
            "total_cost": 0.0,
            "total_tokens": 0,
            "errors": [],
            "human_feedback": None,
            "approved": False,
            "revision_count": 0,
        }

        # A fixed shared thread id (previously "default") would make every
        # anonymous run continue the same checkpoint thread. State channels
        # with reducers (presumably `errors`, given nodes return one-item
        # lists) would then leak from one run into the next.
        run_thread_id = thread_id or f"run-{uuid.uuid4()}"
        config = {"configurable": {"thread_id": run_thread_id}}

        try:
            if self.checkpoint_path == ":memory:":
                memory_checkpointer = MemorySaver()
                workflow = self.graph_builder.compile(checkpointer=memory_checkpointer)
                final_state = await workflow.ainvoke(initial_state, config)  # type: ignore[arg-type]
            else:
                async with AsyncSqliteSaver.from_conn_string(
                    self.checkpoint_path
                ) as checkpointer:
                    workflow = self.graph_builder.compile(checkpointer=checkpointer)
                    final_state = await workflow.ainvoke(initial_state, config)  # type: ignore[arg-type]

            logger.info(f"Workflow complete. Cost: ${final_state['total_cost']:.4f}")
            return final_state

        except Exception as e:
            logger.error(f"Workflow failed: {e}")
            raise