File size: 4,221 Bytes
2d43b8b
 
 
8ac8a9d
2d43b8b
a745005
2d43b8b
 
 
 
 
 
 
 
74e887d
2d43b8b
 
8ac8a9d
 
 
 
2d43b8b
8ac8a9d
 
 
2d43b8b
 
 
 
 
 
74e887d
 
 
2d43b8b
 
8ac8a9d
 
2d43b8b
 
8ac8a9d
74e887d
 
2d43b8b
74e887d
 
2d43b8b
 
8ac8a9d
2d43b8b
8ac8a9d
 
 
2d43b8b
 
 
 
 
 
 
 
 
 
 
74e887d
2d43b8b
 
8ac8a9d
 
 
 
74e887d
 
2d43b8b
74e887d
 
2d43b8b
8ac8a9d
2d43b8b
8ac8a9d
 
74e887d
 
 
 
 
2d43b8b
8ac8a9d
2d43b8b
8ac8a9d
 
2d43b8b
 
8ac8a9d
2d43b8b
8ac8a9d
2d43b8b
 
8ac8a9d
2d43b8b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a745005
2d43b8b
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""Integration tests for workflow error handling and cost limits."""

import pytest
from unittest.mock import AsyncMock

from src.workflows.market_analysis import MarketIntelligenceWorkflow


@pytest.mark.asyncio
class TestWorkflowErrorRecovery:
    """Test workflow error handling and recovery."""

    async def test_research_error_ends_workflow(self):
        """Test workflow ends gracefully when research fails."""
        workflow = MarketIntelligenceWorkflow(checkpoint_path=":memory:")

        # Mock research to fail
        # Mock research to fail
        workflow.research_agent.run = AsyncMock(
            side_effect=Exception("Research API failed")
        )

        result = await workflow.run(
            company_name="Test Company", thread_id="test-error-1"
        )

        assert len(result["errors"]) > 0
        assert result["current_agent"] == "research"

    async def test_budget_exceeded_stops_workflow(self):
        """Test workflow stops when budget is exceeded."""
        workflow = MarketIntelligenceWorkflow(
            max_budget=0.001, checkpoint_path=":memory:"
        )

        # Mock research to succeed with some cost
        # Mock research to succeed with some cost
        async def mock_run(*args, **kwargs):
            workflow.cost_tracker.track_usage("openai/gpt-5-mini", 10000, 5000)
            return {
                "company_name": "Mock Company",
                "competitors": "Competitor A, Competitor B",
                "market_trends": "Market is growing",
                "raw_sources": [],
                "industry": "Tech",
                "company_overview": "Overview",
            }

        workflow.research_agent.run = AsyncMock(side_effect=mock_run)

        result = await workflow.run(
            company_name="Test Company", thread_id="test-budget-1"
        )

        # Should have errors about budget
        assert len(result.get("errors", [])) > 0 or result["total_cost"] < 0.001


@pytest.mark.asyncio
class TestWorkflowIntegration:
    """Integration tests for full workflow."""

    async def test_workflow_with_mocked_agents(self):
        """Test complete workflow with mocked agent responses."""
        workflow = MarketIntelligenceWorkflow(checkpoint_path=":memory:")

        # Mock all agents
        # Mock all agents
        workflow.research_agent.run = AsyncMock(
            return_value={
                "company_name": "Test Company",
                "competitors": "Competitor A",
                "market_trends": "Market growing",
                "raw_sources": [{"url": "test.com"}],
                "industry": "Tech",
                "company_overview": "Overview",
            }
        )

        workflow.analysis_agent.run = AsyncMock(
            return_value={
                "company_name": "Test Company",
                "swot": "Strengths: Good",
                "competitive_matrix": "Matrix data",
                "positioning": "Leader",
                "strategic_recommendations": "Buy low sell high",
            }
        )

        workflow.writer_agent.run = AsyncMock(
            return_value={
                "executive_summary": "Test summary",
                "full_report": "# Test Report",
                "metadata": {},
            }
        )

        result = await workflow.run(
            company_name="Test Company", thread_id="test-integration-1"
        )

        assert result["approved"] is True
        assert "Test summary" in result["executive_summary"]
        assert result["total_cost"] == 0.0


class TestWorkflowCheckpointing:
    """Test checkpoint persistence and recovery."""

    def test_checkpoint_file_created(self):
        """Test that checkpoint database is created."""
        import os

        checkpoint_path = "./test_checkpoint.db"

        # Clean up first
        if os.path.exists(checkpoint_path):
            os.remove(checkpoint_path)

        workflow = MarketIntelligenceWorkflow(checkpoint_path=checkpoint_path)

        # Checkpoint file should be created when workflow is compiled
        assert workflow.graph_builder is not None

        # Clean up
        if os.path.exists(checkpoint_path):
            os.remove(checkpoint_path)