# LangGraph Workflow Architecture

Technical documentation for the multi-agent orchestration system.

## System Architecture

```
User Input → Research Agent → Analysis Agent → Writer Agent → Report
                ↓                ↓                ↓
            Tavily API       SWOT/Matrix      Markdown
```

**State Flow:** A LangGraph `StateGraph` manages the state shared across agents, and SQLite checkpointing provides crash recovery.
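
The shared-state idea can be illustrated without LangGraph itself. The sketch below is a conceptual model only: each node receives the current state and returns a partial update that is merged back in. The three node bodies, `run_pipeline`, and the sample values such as `"ExampleCo"` are hypothetical stand-ins, not the project's agents.

```python
import asyncio
from typing import Any, Awaitable, Callable

State = dict[str, Any]
Node = Callable[[State], Awaitable[State]]


async def research(state: State) -> State:
    # Stand-in for the Research agent: returns only the keys it changes.
    return {"research_data": {"competitors": ["ExampleCo"]}, "current_agent": "research"}


async def analysis(state: State) -> State:
    # Reads what the previous node wrote into the shared state.
    competitors = state["research_data"]["competitors"]
    return {"swot": {"threats": competitors}, "current_agent": "analysis"}


async def writer(state: State) -> State:
    return {"full_report": f"# Report: {state['company_name']}", "current_agent": "writer"}


async def run_pipeline(state: State, nodes: list[Node]) -> State:
    """Run nodes in order, merging each partial update into the shared state."""
    for node in nodes:
        update = await node(state)
        state = {**state, **update}
    return state


final_state = asyncio.run(run_pipeline({"company_name": "Tesla"}, [research, analysis, writer]))
```

In the real workflow the graph runtime performs the merge and persists each intermediate state; the loop here only shows why every agent can see the output of the agents before it.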

## Agent Responsibilities

| Agent | Input | Output | External Calls |
|-------|-------|--------|----------------|
| Research | Company name, industry | Competitors, market data, sources | Tavily API (3 queries) |
| Analysis | Research data | SWOT, competitive matrix, recommendations | LLM (4-6 calls) |
| Writer | Research + Analysis | Executive summary, full report | LLM (2-3 calls) |

## Conditional Routing

**Research → Analysis:**
- If errors or no data: END
- Else: Continue to Analysis

**Human Review → END/Revision:**
- If approved: END
- If the maximum of 2 revisions has been reached: END
- If feedback provided: Loop to Research
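
These rules could be written as two plain routing functions, sketched below. The state keys `errors`, `research_data`, and `approved` come from the schema in this document; `revision_count`, `human_feedback`, the node names, and the `"END"` string are assumptions made for illustration (LangGraph exposes its own `END` sentinel).

```python
from typing import Any

MAX_REVISIONS = 2  # mirrors the revision cap described above
END = "END"        # placeholder for the graph's terminal marker


def route_after_research(state: dict[str, Any]) -> str:
    """Stop if research failed or returned nothing, otherwise go to analysis."""
    if state.get("errors") or not state.get("research_data"):
        return END
    return "analysis"


def route_after_review(state: dict[str, Any]) -> str:
    """Finish on approval or at the revision cap, otherwise loop back to research."""
    if state.get("approved"):
        return END
    if state.get("revision_count", 0) >= MAX_REVISIONS:  # hypothetical field
        return END
    if state.get("human_feedback"):  # hypothetical field
        return "research"
    return END  # assumption: with no feedback there is nothing to revise
```

Keeping the routers as pure functions of the state makes them easy to unit test without building the graph.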

## State Schema

```python
IntelligenceState = {
    "company_name": str,
    "industry": str | None,
    "research_data": dict,
    "swot": dict,
    "full_report": str,
    "current_agent": str,
    "total_cost": float,
    "approved": bool,
    "errors": list,
    # ... 15 more fields
}
```

Full schema: `src/workflows/state.py`

## Cost Management

The budget is enforced at three points:
1. Before the Analysis node (the most expensive step)
2. After each LLM call, via `CostTracker`
3. At the workflow level, which raises `BudgetExceededError` once the budget is exceeded

Default: $2.00 per run
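
A minimal sketch of how such a tracker might look is shown below. The names `CostTracker` and `BudgetExceededError` appear in this document, but the method names `record` and `check` and the attribute layout are assumptions, not the project's actual implementation.

```python
class BudgetExceededError(RuntimeError):
    """Raised when accumulated spend passes the configured budget."""


class CostTracker:
    def __init__(self, max_budget: float = 2.0) -> None:
        self.max_budget = max_budget
        self.total_cost = 0.0

    def check(self) -> None:
        """Raise if spend is over budget (e.g. called before an expensive node)."""
        if self.total_cost > self.max_budget:
            raise BudgetExceededError(
                f"spent ${self.total_cost:.2f} of ${self.max_budget:.2f} budget"
            )

    def record(self, cost: float) -> None:
        """Add the cost of one LLM call, then re-check the budget."""
        self.total_cost += cost
        self.check()
```

Calling `record()` after every LLM call and `check()` before the Analysis node would cover the first two enforcement points; the exception propagating out of the workflow covers the third.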

## Checkpointing

SQLite checkpoints (`./checkpoints.db`) enable:
- Resume after crashes
- Audit trail for compliance
- Debug state at each step

```python
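import asyncio
from src.workflows.market_analysis import MarketIntelligenceWorkflow

async def main():
    # Resume from checkpoint
    workflow = MarketIntelligenceWorkflow()
    return await workflow.run(
        company_name="Tesla",
        thread_id="tesla-analysis-1",  # Same ID = resume
    )

result = asyncio.run(main())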
```
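
To make the resume-by-`thread_id` idea concrete, here is a small standard-library sketch of a checkpoint store. It is not LangGraph's checkpointer or its on-disk schema; the table layout and the helper names `open_store`, `save_checkpoint`, and `load_latest` are hypothetical.

```python
import json
import sqlite3
from typing import Any, Optional


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) a checkpoint table keyed by thread and step."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "thread_id TEXT, step INTEGER, state TEXT, "
        "PRIMARY KEY (thread_id, step))"
    )
    return conn


def save_checkpoint(conn: sqlite3.Connection, thread_id: str, step: int,
                    state: dict[str, Any]) -> None:
    """Persist the state reached after a given step."""
    conn.execute(
        "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)",
        (thread_id, step, json.dumps(state)),
    )
    conn.commit()


def load_latest(conn: sqlite3.Connection,
                thread_id: str) -> Optional[tuple[int, dict[str, Any]]]:
    """Return (step, state) for the newest checkpoint, or None for a new thread."""
    row = conn.execute(
        "SELECT step, state FROM checkpoints WHERE thread_id = ? "
        "ORDER BY step DESC LIMIT 1",
        (thread_id,),
    ).fetchone()
    return None if row is None else (row[0], json.loads(row[1]))
```

Keeping every step rather than overwriting a single row is what allows the same table to double as an audit trail and a step-by-step debugging aid.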

## Error Handling

Errors accumulate in `state["errors"]`:
- Research failure → Workflow stops
- Analysis error → Logged, may continue
- Budget exceeded → Immediate stop
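
One way to get this behaviour is to wrap each node so that exceptions become entries in `state["errors"]`, while selected exception types are allowed to propagate and stop the run. The sketch below assumes that approach; `record_errors`, its `reraise` parameter, and `flaky_research` are hypothetical names, not part of the project.

```python
import asyncio
from typing import Any, Awaitable, Callable

State = dict[str, Any]
Node = Callable[[State], Awaitable[State]]


def record_errors(name: str, node: Node,
                  reraise: tuple[type[BaseException], ...] = ()) -> Node:
    """Wrap a node so failures are appended to state["errors"]."""

    async def wrapped(state: State) -> State:
        try:
            return await node(state)
        except Exception as exc:
            if isinstance(exc, reraise):
                raise  # e.g. a budget error that must stop the run immediately
            # Return a partial update; the router decides whether to continue.
            return {"errors": [*state.get("errors", []), f"{name}: {exc}"]}

    return wrapped


async def flaky_research(state: State) -> State:
    raise ValueError("search returned no results")  # stand-in failure


update = asyncio.run(record_errors("research", flaky_research)({"errors": []}))
print(update)
```

With this shape, the Research router can end the workflow when `errors` is non-empty, while an Analysis failure is recorded without necessarily halting later nodes.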

## Usage

**Basic:**
```python
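import asyncio

from src.workflows.market_analysis import MarketIntelligenceWorkflow

async def main():
    workflow = MarketIntelligenceWorkflow()
    # run() is a coroutine, so it has to be awaited inside an event loop
    return await workflow.run(
        company_name="Tesla Model Y",
        industry="Electric Vehicles",
    )

result = asyncio.run(main())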
```

**Custom Budget:**
```python
workflow = MarketIntelligenceWorkflow(max_budget=5.0)
```

## Performance Metrics

Typical execution:
- **Time:** 3-5 minutes
- **Cost:** $0 (free model) to $1.50 (Claude)
- **API Calls:** 9-14 total (3 search + 6-11 LLM)
- **Tokens:** 50K-100K

## Configuration

Environment variables (`.env`):
```bash
DEFAULT_MODEL=x-ai/grok-4.1-fast:free
MAX_COST_PER_RUN=2.0
LANGCHAIN_TRACING=true
```
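
As an illustration of how these variables might be read at startup, the sketch below parses them into a typed settings object. `Settings` and `load_settings` are hypothetical names, and the fallback values simply mirror the sample `.env` above rather than any confirmed project defaults.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    default_model: str
    max_cost_per_run: float
    tracing: bool


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Build settings from a mapping (defaults to the process environment)."""
    env = os.environ if env is None else env
    return Settings(
        default_model=env.get("DEFAULT_MODEL", "x-ai/grok-4.1-fast:free"),
        max_cost_per_run=float(env.get("MAX_COST_PER_RUN", "2.0")),
        # Treat only a case-insensitive "true" as enabled.
        tracing=env.get("LANGCHAIN_TRACING", "false").strip().lower() == "true",
    )
```

Accepting an explicit mapping keeps the function testable without mutating `os.environ`.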

## Observability

LangSmith integration provides:
- Full execution traces
- Agent decision debugging
- Cost tracking per call
- Performance bottleneck identification

Enable: Set `LANGCHAIN_TRACING=true` in `.env`

Dashboard: https://smith.langchain.com

## Testing

```bash
pytest tests/unit/test_workflow.py -v        # 11 workflow tests
pytest tests/integration/ -v                  # Integration tests
python scripts/test_workflow.py              # E2E with real APIs
```

## Extending the Workflow

**Add New Agent:**

1. Create agent in `src/agents/new_agent.py`
2. Add node wrapper:
```python
async def _new_agent_node(self, state):
    result = await self.new_agent.run(state["research_data"])
    return {"new_field": result}
```
3. Wire into graph:
```python
graph.add_node("new_agent", self._new_agent_node)
graph.add_edge("analysis", "new_agent")
```

**Modify Routing:**
```python
def _custom_routing(self, state):
    if state["company_name"].startswith("Enterprise"):
        return "deep_analysis"
    return "standard_analysis"
```

## Troubleshooting

| Issue | Solution |
|-------|----------|
| Workflow stops early | Check `result["errors"]`, verify API keys |
| Budget exceeded | Increase `max_budget` or use cheaper model |
| Slow performance | Check LangSmith traces, consider caching |
| Checkpoint errors | Delete `checkpoints.db`, check permissions |

## Production Checklist

- [x] Cost tracking and budget enforcement
- [x] State persistence with checkpoints
- [x] Error recovery and graceful degradation
- [x] Observability integration
- [ ] Human-in-the-loop UI integration
- [ ] Rate limiting for API calls
- [ ] Result caching for repeated queries