Spaces:
Runtime error
Runtime error
File size: 4,221 Bytes
2d43b8b 8ac8a9d 2d43b8b a745005 2d43b8b 74e887d 2d43b8b 8ac8a9d 2d43b8b 8ac8a9d 2d43b8b 74e887d 2d43b8b 8ac8a9d 2d43b8b 8ac8a9d 74e887d 2d43b8b 74e887d 2d43b8b 8ac8a9d 2d43b8b 8ac8a9d 2d43b8b 74e887d 2d43b8b 8ac8a9d 74e887d 2d43b8b 74e887d 2d43b8b 8ac8a9d 2d43b8b 8ac8a9d 74e887d 2d43b8b 8ac8a9d 2d43b8b 8ac8a9d 2d43b8b 8ac8a9d 2d43b8b 8ac8a9d 2d43b8b 8ac8a9d 2d43b8b a745005 2d43b8b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 | """Integration tests for workflow error handling and cost limits."""
import pytest
from unittest.mock import AsyncMock
from src.workflows.market_analysis import MarketIntelligenceWorkflow
@pytest.mark.asyncio
class TestWorkflowErrorRecovery:
"""Test workflow error handling and recovery."""
async def test_research_error_ends_workflow(self):
"""Test workflow ends gracefully when research fails."""
workflow = MarketIntelligenceWorkflow(checkpoint_path=":memory:")
# Mock research to fail
# Mock research to fail
workflow.research_agent.run = AsyncMock(
side_effect=Exception("Research API failed")
)
result = await workflow.run(
company_name="Test Company", thread_id="test-error-1"
)
assert len(result["errors"]) > 0
assert result["current_agent"] == "research"
async def test_budget_exceeded_stops_workflow(self):
"""Test workflow stops when budget is exceeded."""
workflow = MarketIntelligenceWorkflow(
max_budget=0.001, checkpoint_path=":memory:"
)
# Mock research to succeed with some cost
# Mock research to succeed with some cost
async def mock_run(*args, **kwargs):
workflow.cost_tracker.track_usage("openai/gpt-5-mini", 10000, 5000)
return {
"company_name": "Mock Company",
"competitors": "Competitor A, Competitor B",
"market_trends": "Market is growing",
"raw_sources": [],
"industry": "Tech",
"company_overview": "Overview",
}
workflow.research_agent.run = AsyncMock(side_effect=mock_run)
result = await workflow.run(
company_name="Test Company", thread_id="test-budget-1"
)
# Should have errors about budget
assert len(result.get("errors", [])) > 0 or result["total_cost"] < 0.001
@pytest.mark.asyncio
class TestWorkflowIntegration:
"""Integration tests for full workflow."""
async def test_workflow_with_mocked_agents(self):
"""Test complete workflow with mocked agent responses."""
workflow = MarketIntelligenceWorkflow(checkpoint_path=":memory:")
# Mock all agents
# Mock all agents
workflow.research_agent.run = AsyncMock(
return_value={
"company_name": "Test Company",
"competitors": "Competitor A",
"market_trends": "Market growing",
"raw_sources": [{"url": "test.com"}],
"industry": "Tech",
"company_overview": "Overview",
}
)
workflow.analysis_agent.run = AsyncMock(
return_value={
"company_name": "Test Company",
"swot": "Strengths: Good",
"competitive_matrix": "Matrix data",
"positioning": "Leader",
"strategic_recommendations": "Buy low sell high",
}
)
workflow.writer_agent.run = AsyncMock(
return_value={
"executive_summary": "Test summary",
"full_report": "# Test Report",
"metadata": {},
}
)
result = await workflow.run(
company_name="Test Company", thread_id="test-integration-1"
)
assert result["approved"] is True
assert "Test summary" in result["executive_summary"]
assert result["total_cost"] == 0.0
class TestWorkflowCheckpointing:
"""Test checkpoint persistence and recovery."""
def test_checkpoint_file_created(self):
"""Test that checkpoint database is created."""
import os
checkpoint_path = "./test_checkpoint.db"
# Clean up first
if os.path.exists(checkpoint_path):
os.remove(checkpoint_path)
workflow = MarketIntelligenceWorkflow(checkpoint_path=checkpoint_path)
# Checkpoint file should be created when workflow is compiled
assert workflow.graph_builder is not None
# Clean up
if os.path.exists(checkpoint_path):
os.remove(checkpoint_path)
|