# LangGraph Workflow Architecture
Technical documentation for the multi-agent orchestration system.
## System Architecture
```
User Input → Research Agent → Analysis Agent → Writer Agent → Report
                    ↓                ↓               ↓
               Tavily API       SWOT/Matrix      Markdown
```
**State Flow:** LangGraph StateGraph manages shared state across agents with SQLite checkpointing for crash recovery.
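In a LangGraph `StateGraph`, each node returns a partial update that is merged into the shared state. The plain-Python sketch below mimics that merge for the linear Research → Analysis → Writer path, assuming default overwrite semantics (no reducers). It does not use LangGraph itself, and the node functions and their outputs are hypothetical stand-ins for the real agents.

```python
from typing import Any, Callable

State = dict[str, Any]
Node = Callable[[State], State]

# Hypothetical stand-ins for the three agents: each reads the shared
# state and returns only the keys it wants to update.
def research(state: State) -> State:
    return {"research_data": {"competitors": ["CompetitorA"]}, "current_agent": "research"}

def analysis(state: State) -> State:
    competitors = state["research_data"]["competitors"]
    return {"swot": {"threats": competitors}, "current_agent": "analysis"}

def writer(state: State) -> State:
    return {"full_report": f"# Report for {state['company_name']}", "current_agent": "writer"}

def run_pipeline(initial: State, nodes: list[Node]) -> State:
    state = dict(initial)
    for node in nodes:
        # Merge the partial update; keys without a reducer are simply overwritten.
        state.update(node(state))
    return state

final = run_pipeline({"company_name": "Tesla"}, [research, analysis, writer])
print(final["current_agent"], final["full_report"])
```

Because nodes only return what they change, fields set earlier (such as `company_name`) survive untouched through later steps.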
## Agent Responsibilities
| Agent | Input | Output | External Calls |
|-------|-------|--------|----------------|
| Research | Company name, industry | Competitors, market data, sources | Tavily API (3 queries) |
| Analysis | Research data | SWOT, competitive matrix, recommendations | LLM (4-6 calls) |
| Writer | Research + Analysis | Executive summary, full report | LLM (2-3 calls) |
## Conditional Routing
**Research → Analysis:**
- If research produced errors or no data: END
- Otherwise: continue to Analysis
**Human Review → END/Revision:**
- If approved: END
- If the revision limit (2) is reached: END
- If feedback is provided: loop back to Research
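The two routing rules can be sketched as plain routing functions of the kind LangGraph's `add_conditional_edges` accepts: they read the state and return the name of the next node. This is a minimal sketch, not the project's code; the `revision_count` and `human_feedback` fields, the node names, and the fallback when there is neither approval nor feedback are assumptions.

```python
from typing import Any

END = "__end__"  # stand-in for LangGraph's END sentinel in this sketch
MAX_REVISIONS = 2

def route_after_research(state: dict[str, Any]) -> str:
    # Stop if research reported errors or returned nothing usable
    if state.get("errors") or not state.get("research_data"):
        return END
    return "analysis"

def route_after_review(state: dict[str, Any]) -> str:
    if state.get("approved"):
        return END
    # Hypothetical counter field; stops after two revision loops
    if state.get("revision_count", 0) >= MAX_REVISIONS:
        return END
    # Hypothetical feedback field; feedback sends the run back to Research
    if state.get("human_feedback"):
        return "research"
    # Assumption: neither approved nor any feedback, so end the run
    return END

print(route_after_research({"errors": [], "research_data": {"competitors": []}}))
print(route_after_review({"approved": False, "revision_count": 1, "human_feedback": "add pricing"}))
```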
## State Schema
```python
from typing import TypedDict

# Shown as a TypedDict for illustration; the actual definition
# lives in src/workflows/state.py
class IntelligenceState(TypedDict):
    company_name: str
    industry: str | None
    research_data: dict
    swot: dict
    full_report: str
    current_agent: str
    total_cost: float
    approved: bool
    errors: list
    # ... 15 more fields
```
Full schema: `src/workflows/state.py`
## Cost Management
The budget is enforced at three points:
1. Before the Analysis node (the most expensive step)
2. After each LLM call, via `CostTracker`
3. When the limit is exceeded, the workflow raises `BudgetExceededError`
Default budget: $2.00 per run
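A minimal sketch of how a cost tracker can enforce the first two points. `CostTracker` and `BudgetExceededError` are the names used in this document, but the methods shown here (`record`, `check_before`) and their behavior are hypothetical and not the project's actual interface.

```python
class BudgetExceededError(RuntimeError):
    """Raised when a run's accumulated cost exceeds its budget."""

class CostTracker:
    # Hypothetical interface, for illustration only
    def __init__(self, max_budget: float = 2.00) -> None:
        self.max_budget = max_budget
        self.total_cost = 0.0

    def record(self, cost: float) -> None:
        # Called after each LLM call (enforcement point 2)
        self.total_cost += cost
        if self.total_cost > self.max_budget:
            raise BudgetExceededError(
                f"Spent ${self.total_cost:.2f} of ${self.max_budget:.2f} budget"
            )

    def check_before(self, estimated_cost: float) -> None:
        # Pre-flight check before an expensive node such as Analysis (point 1)
        projected = self.total_cost + estimated_cost
        if projected > self.max_budget:
            raise BudgetExceededError(
                f"Projected ${projected:.2f} exceeds ${self.max_budget:.2f} budget"
            )

tracker = CostTracker(max_budget=2.00)
tracker.record(0.40)
tracker.record(0.35)

stopped = False
try:
    tracker.check_before(1.50)  # 0.75 already spent + 1.50 estimated > 2.00
except BudgetExceededError as exc:
    stopped = True
    print("stopped:", exc)
```

Checking before the Analysis node avoids spending money on a step that would only be aborted partway through.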
## Checkpointing
SQLite checkpoints (`./checkpoints.db`) enable:
- Resume after crashes
- Audit trail for compliance
- Debug state at each step
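```python
import asyncio
from src.workflows.market_analysis import MarketIntelligenceWorkflow

async def main():
    workflow = MarketIntelligenceWorkflow()
    # Reusing the same thread_id resumes from that thread's last checkpoint
    return await workflow.run(
        company_name="Tesla",
        thread_id="tesla-analysis-1",
    )

result = asyncio.run(main())
```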
## Error Handling
Errors accumulate in `state["errors"]`:
- Research failure → Workflow stops
- Analysis error → Logged, may continue
- Budget exceeded → Immediate stop
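One way to get this behavior is to wrap each node so ordinary exceptions are appended to `errors` instead of crashing the graph, while budget errors propagate. The router after Research then sees a non-empty `errors` list and ends the run. This is a sketch under assumptions: the `record_errors` decorator and the node bodies are hypothetical, and `BudgetExceededError` is redefined locally so the example is self-contained.

```python
from typing import Any, Callable

State = dict[str, Any]
Node = Callable[[State], State]

class BudgetExceededError(RuntimeError):
    pass

def record_errors(node_name: str) -> Callable[[Node], Node]:
    """Hypothetical wrapper: log ordinary failures into state["errors"]."""
    def decorator(fn: Node) -> Node:
        def wrapper(state: State) -> State:
            try:
                return fn(state)
            except BudgetExceededError:
                raise  # budget overruns stop the run immediately
            except Exception as exc:
                # Return only the updated errors list as a partial state update
                return {"errors": state.get("errors", []) + [f"{node_name}: {exc}"]}
        return wrapper
    return decorator

@record_errors("research")
def research(state: State) -> State:
    raise ConnectionError("search API unavailable")

@record_errors("analysis")
def analysis(state: State) -> State:
    raise BudgetExceededError("over budget")

update = research({"errors": []})
print(update["errors"])

budget_stopped = False
try:
    analysis({"errors": []})
except BudgetExceededError:
    budget_stopped = True
```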
## Usage
**Basic:**
```python
import asyncio
from src.workflows.market_analysis import MarketIntelligenceWorkflow

async def main():
    workflow = MarketIntelligenceWorkflow()
    return await workflow.run(
        company_name="Tesla Model Y",
        industry="Electric Vehicles",
    )

result = asyncio.run(main())
```
**Custom Budget:**
```python
workflow = MarketIntelligenceWorkflow(max_budget=5.0)
```
## Performance Metrics
Typical execution per run:
- **Time:** 3-5 minutes
- **Cost:** $0 (free-tier model) to $1.50 (Claude)
- **API Calls:** 9-14 total (3 search + 6-11 LLM)
- **Tokens:** 50K-100K
## Configuration
Environment variables (`.env`):
```bash
DEFAULT_MODEL=x-ai/grok-4.1-fast:free
MAX_COST_PER_RUN=2.0
LANGCHAIN_TRACING=true
```
## Observability
LangSmith integration provides:
- Full execution traces
- Agent decision debugging
- Cost tracking per call
- Performance bottleneck identification
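Enable: set `LANGCHAIN_TRACING=true` in `.env`. The LangSmith SDK itself reads `LANGCHAIN_TRACING_V2=true` (or `LANGSMITH_TRACING=true` in newer releases) together with an API key (`LANGCHAIN_API_KEY` or `LANGSMITH_API_KEY`), so if no traces appear, confirm those are set as well.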
Dashboard: https://smith.langchain.com
## Testing
```bash
pytest tests/unit/test_workflow.py -v # 11 workflow tests
pytest tests/integration/ -v # Integration tests
python scripts/test_workflow.py # E2E with real APIs
```
## Extending the Workflow
**Add New Agent:**
1. Create agent in `src/agents/new_agent.py`
2. Add node wrapper:
```python
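async def _new_agent_node(self, state):
    # Read what the new agent needs from the shared state
    result = await self.new_agent.run(state["research_data"])
    # Return only the keys to update; add new_field to IntelligenceState too
    return {"new_field": result}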
```
3. Wire into graph:
```python
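graph.add_node("new_agent", self._new_agent_node)
# Runs after "analysis". If "analysis" already has an edge to the writer node,
# this adds a parallel branch; to run new_agent in sequence instead, replace
# that edge with analysis → new_agent → writer.
graph.add_edge("analysis", "new_agent")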
```
**Modify Routing:**
```python
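def _custom_routing(self, state):
    # Return the name of the next node; "deep_analysis" and
    # "standard_analysis" must be nodes registered in the graph
    if state["company_name"].startswith("Enterprise"):
        return "deep_analysis"
    return "standard_analysis"

# Register as a conditional edge out of an existing node, e.g.:
# graph.add_conditional_edges("research", self._custom_routing)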
```
## Troubleshooting
| Issue | Solution |
|-------|----------|
| Workflow stops early | Check `result["errors"]`, verify API keys |
| Budget exceeded | Increase `max_budget` or use cheaper model |
| Slow performance | Check LangSmith traces, consider caching |
| Checkpoint errors | Check file permissions; as a last resort, delete `checkpoints.db` (this discards saved runs) |
## Production Checklist
- [x] Cost tracking and budget enforcement
- [x] State persistence with checkpoints
- [x] Error recovery and graceful degradation
- [x] Observability integration
- [ ] Human-in-the-loop UI integration
- [ ] Rate limiting for API calls
- [ ] Result caching for repeated queries