LangGraph Workflow Architecture
Technical documentation for the multi-agent orchestration system.
System Architecture
User Input → Research Agent → Analysis Agent → Writer Agent → Report
                    ↓                ↓               ↓
               Tavily API       SWOT/Matrix       Markdown
State Flow: LangGraph StateGraph manages shared state across agents with SQLite checkpointing for crash recovery.
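To illustrate the shared-state idea, here is a minimal plain-Python sketch. It is not the project's code and does not use the LangGraph API: each node receives the current state and returns a partial update that is merged back in. The node bodies and sample values are placeholders. In the real workflow, StateGraph does the merging and checkpoints each step.

```python
from typing import Any, Callable, Dict, List

State = Dict[str, Any]
Node = Callable[[State], State]


def research(state: State) -> State:
    # Placeholder: the real agent would query Tavily here.
    return {"research_data": {"competitors": ["ExampleCo"]}, "current_agent": "research"}


def analysis(state: State) -> State:
    # Placeholder: the real agent would call an LLM to build the SWOT.
    return {"swot": {"strengths": [], "weaknesses": []}, "current_agent": "analysis"}


def writer(state: State) -> State:
    # Placeholder: the real agent would draft the Markdown report.
    return {"full_report": f"# Report: {state['company_name']}", "current_agent": "writer"}


def run_pipeline(state: State, nodes: List[Node]) -> State:
    """Run nodes in order, merging each partial update into the shared state."""
    for node in nodes:
        state = {**state, **node(state)}
    return state


final_state = run_pipeline({"company_name": "Tesla"}, [research, analysis, writer])
```

Because every node only returns the keys it changed, later agents can read what earlier agents wrote without any agent owning the whole state.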
Agent Responsibilities
| Agent | Input | Output | External Calls |
|---|---|---|---|
| Research | Company name, industry | Competitors, market data, sources | Tavily API (3 queries) |
| Analysis | Research data | SWOT, competitive matrix, recommendations | LLM (4-6 calls) |
| Writer | Research + Analysis | Executive summary, full report | LLM (2-3 calls) |
Conditional Routing
Research → Analysis:
- If errors or no data: END
- Else: Continue to Analysis
Human Review → END/Revision:
- If approved: END
- If max revisions (2): END
- If feedback provided: Loop to Research
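The routing rules above could be written as two plain functions of the kind LangGraph accepts as conditional-edge callbacks. This is a sketch under assumptions: `revision_count` and `feedback` are hypothetical field names (the documented schema elides most fields), and `END` is a local placeholder for the constant you would normally import from `langgraph.graph`.

```python
from typing import Any, Dict

MAX_REVISIONS = 2  # mirrors the "max revisions (2)" rule
END = "__end__"    # local placeholder for LangGraph's END constant


def route_after_research(state: Dict[str, Any]) -> str:
    """Stop when research failed or returned nothing; otherwise go to analysis."""
    if state.get("errors") or not state.get("research_data"):
        return END
    return "analysis"


def route_after_review(state: Dict[str, Any]) -> str:
    """Finish on approval or at the revision cap; loop back only on feedback."""
    if state.get("approved"):
        return END
    if state.get("revision_count", 0) >= MAX_REVISIONS:  # hypothetical field
        return END
    if state.get("feedback"):  # hypothetical field
        return "research"
    return END
```

Checking the revision cap before the feedback means a third round of feedback ends the run instead of looping again.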
State Schema
IntelligenceState = {
    "company_name": str,
    "industry": str | None,
    "research_data": dict,
    "swot": dict,
    "full_report": str,
    "current_agent": str,
    "total_cost": float,
    "approved": bool,
    "errors": list,
    # ... 15 more fields
}
Full schema: src/workflows/state.py
Cost Management
Budget enforcement at 3 points:
- Before Analysis node (most expensive)
- After each LLM call via CostTracker
- Workflow raises BudgetExceededError if exceeded
Default: $2.00 per run
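A minimal sketch of how such a tracker might behave. Only the names `CostTracker` and `BudgetExceededError` come from this document. The methods `record` and `check`, and the attributes, are assumptions made for illustration, not the project's actual interface.

```python
class BudgetExceededError(RuntimeError):
    """Raised when accumulated spend goes past the configured budget."""


class CostTracker:
    def __init__(self, max_budget: float = 2.00) -> None:
        self.max_budget = max_budget
        self.total_cost = 0.0

    def check(self) -> None:
        """Guard to call before an expensive node such as Analysis."""
        if self.total_cost > self.max_budget:
            raise BudgetExceededError(
                f"spent ${self.total_cost:.2f} of ${self.max_budget:.2f} budget"
            )

    def record(self, cost: float) -> None:
        """Add the cost of one LLM call, then enforce the budget."""
        self.total_cost += cost
        self.check()
```

Calling `record` after every LLM call and `check` before the Analysis node would cover both enforcement points listed above.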
Checkpointing
SQLite checkpoints (./checkpoints.db) enable:
- Resume after crashes
- Audit trail for compliance
- Debug state at each step
# Resume from checkpoint
workflow = MarketIntelligenceWorkflow()
result = await workflow.run(
    company_name="Tesla",
    thread_id="tesla-analysis-1"  # Same ID = resume
)
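To show why reusing a thread ID lets a run resume, here is a simplified sketch using only the standard library. The table layout and function names are hypothetical. LangGraph's SQLite checkpointer uses its own schema and serialization, so treat this purely as an illustration of the idea.

```python
import json
import sqlite3
from typing import Any, Dict, Optional


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) a checkpoint store; ':memory:' keeps the demo self-contained."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "thread_id TEXT, step INTEGER, state TEXT, "
        "PRIMARY KEY (thread_id, step))"
    )
    return conn


def save_checkpoint(conn: sqlite3.Connection, thread_id: str, step: int,
                    state: Dict[str, Any]) -> None:
    """Persist the state reached after a given step of a given thread."""
    conn.execute(
        "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)",
        (thread_id, step, json.dumps(state)),
    )
    conn.commit()


def load_latest(conn: sqlite3.Connection, thread_id: str) -> Optional[Dict[str, Any]]:
    """Return the most recent state for a thread, or None if it never ran."""
    row = conn.execute(
        "SELECT state FROM checkpoints WHERE thread_id = ? "
        "ORDER BY step DESC LIMIT 1",
        (thread_id,),
    ).fetchone()
    return json.loads(row[0]) if row else None
```

Keeping one row per step, instead of overwriting a single row, is what makes the audit trail and step-by-step debugging possible.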
Error Handling
Errors accumulate in state["errors"]:
- Research failure → Workflow stops
- Analysis error → Logged, may continue
- Budget exceeded → Immediate stop
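One way to get this accumulate-then-decide behaviour is a list reducer plus a small severity check. The `Annotated[..., operator.add]` annotation is the usual LangGraph convention for appending to a list field. The `research:` / `budget:` / `analysis:` prefixes and both helper functions are assumptions made for this sketch, not something the project is known to use.

```python
import operator
from typing import Annotated, Any, Dict, List, TypedDict


class ErrorState(TypedDict):
    # Reducer annotation: updates to this field are concatenated, not overwritten.
    errors: Annotated[List[str], operator.add]


def merge_errors(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Apply a node's partial update the way an append reducer would."""
    merged = operator.add(state.get("errors", []), update.get("errors", []))
    return {**state, **update, "errors": merged}


def should_stop(errors: List[str]) -> bool:
    """Research and budget errors are fatal; analysis errors are only logged."""
    return any(e.startswith(("research:", "budget:")) for e in errors)


state: Dict[str, Any] = {"errors": []}
state = merge_errors(state, {"errors": ["analysis: SWOT call timed out"]})
continue_after_analysis = not should_stop(state["errors"])
state = merge_errors(state, {"errors": ["research: no results returned"]})
```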
Usage
Basic:
from src.workflows.market_analysis import MarketIntelligenceWorkflow

workflow = MarketIntelligenceWorkflow()
result = await workflow.run(
    company_name="Tesla Model Y",
    industry="Electric Vehicles"
)
Custom Budget:
workflow = MarketIntelligenceWorkflow(max_budget=5.0)
Performance Metrics
Typical execution:
- Time: 3-5 minutes
- Cost: $0 (free model) to $1.50 (Claude)
- API Calls: 9-14 total (3 search + 6-11 LLM)
- Tokens: 50K-100K
Configuration
Environment variables (.env):
DEFAULT_MODEL=x-ai/grok-4.1-fast:free
MAX_COST_PER_RUN=2.0
LANGCHAIN_TRACING=true
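A sketch of how these variables might be read at startup. The `Settings` class and `load_settings` function are hypothetical, and the fallback values when a variable is unset are assumptions (tracing is assumed off by default). Python does not read `.env` files on its own, so this assumes the variables have already been exported or loaded by a tool such as python-dotenv.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    default_model: str
    max_cost_per_run: float
    tracing_enabled: bool


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Build settings from the environment, falling back to assumed defaults."""
    if env is None:
        env = os.environ
    return Settings(
        default_model=env.get("DEFAULT_MODEL", "x-ai/grok-4.1-fast:free"),
        max_cost_per_run=float(env.get("MAX_COST_PER_RUN", "2.0")),
        # Treat any casing of "true" as enabled; everything else is off.
        tracing_enabled=env.get("LANGCHAIN_TRACING", "false").strip().lower() == "true",
    )
```

Passing a plain dict instead of `os.environ` keeps the parsing easy to unit test.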
Observability
LangSmith integration provides:
- Full execution traces
- Agent decision debugging
- Cost tracking per call
- Performance bottleneck identification
Enable: Set LANGCHAIN_TRACING=true in .env
Dashboard: https://smith.langchain.com
Testing
pytest tests/unit/test_workflow.py -v # 11 workflow tests
pytest tests/integration/ -v # Integration tests
python scripts/test_workflow.py # E2E with real APIs
Extending the Workflow
Add New Agent:
- Create agent in src/agents/new_agent.py
- Add node wrapper:
async def _new_agent_node(self, state):
    result = await self.new_agent.run(state["research_data"])
    return {"new_field": result}
- Wire into graph:
graph.add_node("new_agent", self._new_agent_node)
graph.add_edge("analysis", "new_agent")
Modify Routing:
def _custom_routing(self, state):
    if state["company_name"].startswith("Enterprise"):
        return "deep_analysis"
    return "standard_analysis"
Troubleshooting
| Issue | Solution |
|---|---|
| Workflow stops early | Check result["errors"], verify API keys |
| Budget exceeded | Increase max_budget or use cheaper model |
| Slow performance | Check LangSmith traces, consider caching |
| Checkpoint errors | Delete checkpoints.db, check permissions |
Production Checklist
- Cost tracking and budget enforcement
- State persistence with checkpoints
- Error recovery and graceful degradation
- Observability integration
- Human-in-the-loop UI integration
- Rate limiting for API calls
- Result caching for repeated queries