Spaces:
Runtime error
Runtime error
| # LangGraph Workflow Architecture | |
| Technical documentation for the multi-agent orchestration system. | |
| ## System Architecture | |
| ``` | |
| User Input → Research Agent → Analysis Agent → Writer Agent → Report | |
| ↓ ↓ ↓ | |
| Tavily API SWOT/Matrix Markdown | |
| ``` | |
| **State Flow:** LangGraph StateGraph manages shared state across agents with SQLite checkpointing for crash recovery. | |
| ## Agent Responsibilities | |
| | Agent | Input | Output | External Calls | | |
| |-------|-------|--------|----------------| | |
| | Research | Company name, industry | Competitors, market data, sources | Tavily API (3 queries) | | |
| | Analysis | Research data | SWOT, competitive matrix, recommendations | LLM (4-6 calls) | | |
| | Writer | Research + Analysis | Executive summary, full report | LLM (2-3 calls) | | |
| ## Conditional Routing | |
| **Research → Analysis:** | |
| - If errors or no data: END | |
| - Else: Continue to Analysis | |
| **Human Review → END/Revision:** | |
| - If approved: END | |
| - If max revisions (2): END | |
| - If feedback provided: Loop to Research | |
| ## State Schema | |
| ```python | |
| IntelligenceState = { | |
| "company_name": str, | |
| "industry": str | None, | |
| "research_data": dict, | |
| "swot": dict, | |
| "full_report": str, | |
| "current_agent": str, | |
| "total_cost": float, | |
| "approved": bool, | |
| "errors": list, | |
| # ... 15 more fields | |
| } | |
| ``` | |
| Full schema: `src/workflows/state.py` | |
| ## Cost Management | |
| Budget enforcement at 3 points: | |
| 1. Before Analysis node (most expensive) | |
| 2. After each LLM call via CostTracker | |
| 3. Workflow raises `BudgetExceededError` if exceeded | |
| Default: $2.00 per run | |
| ## Checkpointing | |
| SQLite checkpoints (`./checkpoints.db`) enable: | |
| - Resume after crashes | |
| - Audit trail for compliance | |
| - Debug state at each step | |
| ```python | |
| # Resume from checkpoint | |
| workflow = MarketIntelligenceWorkflow() | |
| result = await workflow.run( | |
| company_name="Tesla", | |
| thread_id="tesla-analysis-1" # Same ID = resume | |
| ) | |
| ``` | |
| ## Error Handling | |
| Errors accumulate in `state["errors"]`: | |
| - Research failure → Workflow stops | |
| - Analysis error → Logged, may continue | |
| - Budget exceeded → Immediate stop | |
| ## Usage | |
| **Basic:** | |
| ```python | |
| from src.workflows.market_analysis import MarketIntelligenceWorkflow | |
| workflow = MarketIntelligenceWorkflow() | |
| result = await workflow.run( | |
| company_name="Tesla Model Y", | |
| industry="Electric Vehicles" | |
| ) | |
| ``` | |
| **Custom Budget:** | |
| ```python | |
| workflow = MarketIntelligenceWorkflow(max_budget=5.0) | |
| ``` | |
| ## Performance Metrics | |
| Typical execution: | |
| - **Time:** 3-5 minutes | |
| - **Cost:** $0 (free) to $1.50 (Claude) | |
| - **API Calls:** 9-14 total (3 search + 6-11 LLM) | |
| - **Tokens:** 50K-100K | |
| ## Configuration | |
| Environment variables (`.env`): | |
| ```bash | |
| DEFAULT_MODEL=x-ai/grok-4.1-fast:free | |
| MAX_COST_PER_RUN=2.0 | |
| LANGCHAIN_TRACING=true | |
| ``` | |
| ## Observability | |
| LangSmith integration provides: | |
| - Full execution traces | |
| - Agent decision debugging | |
| - Cost tracking per call | |
| - Performance bottleneck identification | |
| Enable: Set `LANGCHAIN_TRACING=true` in `.env` | |
| Dashboard: https://smith.langchain.com | |
| ## Testing | |
| ```bash | |
| pytest tests/unit/test_workflow.py -v # 11 workflow tests | |
| pytest tests/integration/ -v # Integration tests | |
| python scripts/test_workflow.py # E2E with real APIs | |
| ``` | |
| ## Extending the Workflow | |
| **Add New Agent:** | |
| 1. Create agent in `src/agents/new_agent.py` | |
| 2. Add node wrapper: | |
| ```python | |
| async def _new_agent_node(self, state): | |
| result = await self.new_agent.run(state["research_data"]) | |
| return {"new_field": result} | |
| ``` | |
| 3. Wire into graph: | |
| ```python | |
| graph.add_node("new_agent", self._new_agent_node) | |
| graph.add_edge("analysis", "new_agent") | |
| ``` | |
| **Modify Routing:** | |
| ```python | |
| def _custom_routing(self, state): | |
| if state["company_name"].startswith("Enterprise"): | |
| return "deep_analysis" | |
| return "standard_analysis" | |
| ``` | |
| ## Troubleshooting | |
| | Issue | Solution | | |
| |-------|----------| | |
| | Workflow stops early | Check `result["errors"]`, verify API keys | | |
| | Budget exceeded | Increase `max_budget` or use cheaper model | | |
| | Slow performance | Check LangSmith traces, consider caching | | |
| | Checkpoint errors | Delete `checkpoints.db`, check permissions | | |
| ## Production Checklist | |
| - [x] Cost tracking and budget enforcement | |
| - [x] State persistence with checkpoints | |
| - [x] Error recovery and graceful degradation | |
| - [x] Observability integration | |
| - [ ] Human-in-the-loop UI integration | |
| - [ ] Rate limiting for API calls | |
| - [ ] Result caching for repeated queries | |