agentic-market-research / tests /integration /test_workflow_integration.py
pkgprateek's picture
refactor: align codebase with separated agent logic
74e887d
"""Integration tests for workflow error handling and cost limits."""
import pytest
from unittest.mock import AsyncMock
from src.workflows.market_analysis import MarketIntelligenceWorkflow
@pytest.mark.asyncio
class TestWorkflowErrorRecovery:
    """Workflow behavior when an agent fails or the budget is exhausted."""

    async def test_research_error_ends_workflow(self):
        """A raising research agent should leave errors in the returned state."""
        # Stand-in for the research step that raises on every call.
        failing_research = AsyncMock(side_effect=Exception("Research API failed"))

        wf = MarketIntelligenceWorkflow(checkpoint_path=":memory:")
        wf.research_agent.run = failing_research

        outcome = await wf.run(company_name="Test Company", thread_id="test-error-1")

        # At least one error is reported and the state still names "research".
        error_count = len(outcome["errors"])
        assert error_count > 0
        assert outcome["current_agent"] == "research"

    async def test_budget_exceeded_stops_workflow(self):
        """A 0.001 budget plus a research step that records usage should trip."""
        wf = MarketIntelligenceWorkflow(max_budget=0.001, checkpoint_path=":memory:")

        async def costly_research(*args, **kwargs):
            # Register usage on the workflow's own cost tracker, then answer
            # with a canned research payload (built fresh on every call).
            wf.cost_tracker.track_usage("openai/gpt-5-mini", 10000, 5000)
            payload = {
                "company_name": "Mock Company",
                "competitors": "Competitor A, Competitor B",
                "market_trends": "Market is growing",
                "raw_sources": [],
                "industry": "Tech",
                "company_overview": "Overview",
            }
            return payload

        wf.research_agent.run = AsyncMock(side_effect=costly_research)

        outcome = await wf.run(company_name="Test Company", thread_id="test-budget-1")

        # Passes when errors were reported, or otherwise when the reported
        # total cost is below 0.001.
        # NOTE(review): the second alternative lets the test pass without any
        # budget error being raised - confirm that this is intended.
        has_errors = len(outcome.get("errors", [])) > 0
        assert has_errors or outcome["total_cost"] < 0.001
@pytest.mark.asyncio
class TestWorkflowIntegration:
    """End-to-end run of the workflow with every agent replaced by a mock."""

    async def test_workflow_with_mocked_agents(self):
        """Run the whole workflow on canned agent outputs and check the result."""
        # Canned outputs, one per agent.
        research_output = {
            "company_name": "Test Company",
            "competitors": "Competitor A",
            "market_trends": "Market growing",
            "raw_sources": [{"url": "test.com"}],
            "industry": "Tech",
            "company_overview": "Overview",
        }
        analysis_output = {
            "company_name": "Test Company",
            "swot": "Strengths: Good",
            "competitive_matrix": "Matrix data",
            "positioning": "Leader",
            "strategic_recommendations": "Buy low sell high",
        }
        writer_output = {
            "executive_summary": "Test summary",
            "full_report": "# Test Report",
            "metadata": {},
        }

        wf = MarketIntelligenceWorkflow(checkpoint_path=":memory:")

        # Swap each agent's run coroutine for a mock returning its payload.
        wf.research_agent.run = AsyncMock(return_value=research_output)
        wf.analysis_agent.run = AsyncMock(return_value=analysis_output)
        wf.writer_agent.run = AsyncMock(return_value=writer_output)

        outcome = await wf.run(
            company_name="Test Company",
            thread_id="test-integration-1",
        )

        # The run is approved, carries the writer's summary, and reports a
        # total cost of exactly zero.
        assert outcome["approved"] is True
        assert "Test summary" in outcome["executive_summary"]
        assert outcome["total_cost"] == 0.0
class TestWorkflowCheckpointing:
    """Test checkpoint persistence and recovery."""

    def test_checkpoint_file_created(self):
        """Build a workflow against an on-disk checkpoint path.

        The test constructs ``MarketIntelligenceWorkflow`` with a file-based
        ``checkpoint_path`` and asserts that ``graph_builder`` is set. The
        checkpoint file is removed before and after the test; the final
        cleanup runs even when construction or the assertion fails, so a
        failing run does not leave ``test_checkpoint.db`` behind.
        """
        import contextlib
        import os

        checkpoint_path = "./test_checkpoint.db"

        def remove_checkpoint():
            # EAFP: delete the file if present; a missing file is not an error.
            with contextlib.suppress(FileNotFoundError):
                os.remove(checkpoint_path)

        # Start from a clean slate in case an earlier run left the file behind.
        remove_checkpoint()
        try:
            workflow = MarketIntelligenceWorkflow(checkpoint_path=checkpoint_path)

            # NOTE(review): despite the test name, only graph_builder is
            # checked here, not the existence of the checkpoint file -
            # confirm whether the file is expected to exist at this point.
            assert workflow.graph_builder is not None
        finally:
            # Always clean up, including when the assertion above fails.
            remove_checkpoint()