| """ |
| Financial Document Analysis Workflow - Agno Workflow 2.0 Implementation (Fixed) |
| |
| This workflow processes financial documents through a multi-agent system using the new |
| step-based architecture introduced in Agno Workflow 2.0: |
| 1. Data Extractor Agent: Extracts structured financial data |
| 2. Data Arrangement Function: Organizes data into Excel-ready format |
| 3. Code Generator Agent: Creates professional Excel reports |
| |
| Built according to Agno Workflow 2.0 standards with simple sequential execution. |
| """ |
|
|
| import json |
| import time |
| from pathlib import Path |
| from typing import Optional, Dict, Any |
| from textwrap import dedent |
| import os |
|
|
| from agno.agent import Agent |
| from agno.models.google import Gemini |
| from agno.tools.file import FileTools |
| from agno.tools.shell import ShellTools |
| from agno.tools.python import PythonTools |
| from agno.workflow.v2.workflow import Workflow |
| from agno.workflow.v2.types import StepInput, StepOutput |
| from agno.workflow.v2.step import Step |
| from agno.storage.sqlite import SqliteStorage |
| from agno.utils.log import logger |
| from pydantic import BaseModel, Field |
|
|
| from config.settings import settings |
| from utils.prompt_loader import prompt_loader |
| from utils.shell_toolkit import RestrictedShellTools |
| from utils.restricted_python_tools import RestrictedPythonTools |
|
|
|
|
class DataPoint(BaseModel):
    """A single financial value extracted from a document.

    Only ``field_name``, ``value`` and ``category`` are required; the remaining
    fields fall back to their defaults when the extractor omits them.
    """

    # Required identification of the value (the value itself is kept as text).
    field_name: str = Field(description="Name of the financial data field")
    value: str = Field(description="Value of the field")
    category: str = Field(description="Financial category (revenue, expenses, assets, etc.)")

    # Optional context; both default to an empty string when omitted.
    period: str = Field(description="Time period if applicable", default="")
    unit: str = Field(description="Currency or measurement unit", default="")

    # Described as a 0-1 score, but the range is not validated here; defaults to 0.9.
    confidence: float = Field(description="Confidence score 0-1", default=0.9)
|
|
|
|
class Metadata(BaseModel):
    """Document-level context attached to a batch of extracted data points.

    Every field is optional: ``company_name`` and ``document_type`` fall back to
    placeholder labels, while the period and currency default to empty strings.
    """

    # Placeholder labels are used when the document does not reveal these.
    company_name: str = Field(description="Company name", default="Unknown Company")
    document_type: str = Field(description="Type of financial document", default="Unknown")

    # Empty string when not supplied.
    reporting_period: str = Field(description="Reporting period", default="")
    currency: str = Field(description="Primary currency used", default="")
|
|
|
|
class ExtractedFinancialData(BaseModel):
    """Structured response model for the data-extraction agent.

    ``data_points`` and ``summary`` are required; ``metadata`` is built from
    ``Metadata``'s own defaults when the agent does not provide it.
    """

    # The individual values pulled from the document.
    data_points: list[DataPoint] = Field(description="List of extracted financial data points")
    # Free-text overview of what was extracted.
    summary: str = Field(description="Summary of the extracted data")
    # default_factory gives every instance its own Metadata object.
    metadata: Metadata = Field(description="Additional metadata", default_factory=Metadata)
|
|
|
|
class FinancialDocumentWorkflow(Workflow):
    """
    Financial document analysis workflow using Agno Workflow 2.0 step-based architecture.

    The workflow runs three steps in order:

    1. ``FinancialDataExtractor`` - a Gemini agent whose response model is
       ``ExtractedFinancialData``.
    2. ``DataArrangement`` - a custom callable step that forwards the extracted
       data to the data-arranger agent (restricted shell + file tools rooted at
       the session output directory).
    3. ``ExcelReportGenerator`` - a Gemini agent with restricted shell, Python
       and file tools, instructed to save its output in the session output
       directory.

    Each instance owns per-session ``input``/``output``/``temp``/``cache``
    directories created via ``settings.create_session_directories`` and
    persists workflow runs with ``SqliteStorage``.
    """

    def __init__(self, session_id: Optional[str] = None, **kwargs):
        """
        Build the agents and steps, then initialise the underlying Workflow.

        Args:
            session_id: Id used both for the workflow session and for the
                per-session directory layout. When omitted, a random UUID is
                generated so both use the same id.
            **kwargs: Forwarded to ``Workflow.__init__``. ``debug_mode``
                defaults to True and ``storage`` defaults to a SQLite store in
                ``tmp/agno_workflows.db``; either may be overridden here.
        """
        import uuid

        # Resolve the id before creating directories so the directory layout
        # and the workflow session are keyed by the same value.
        if session_id is None:
            session_id = str(uuid.uuid4())

        # Directories must exist before the agents are built: their tools are
        # rooted at the session output directory.
        self._setup_session_directories(session_id)

        storage = kwargs.pop("storage", None)
        if storage is None:
            storage = SqliteStorage(
                table_name="financial_workflows",
                db_file="tmp/agno_workflows.db",
                mode="workflow_v2",
                auto_upgrade_schema=True,
            )

        self.data_extractor = self._create_data_extractor()
        self.data_arranger = self._create_data_arranger()
        self.code_generator = self._create_code_generator()

        data_extraction_step = Step(
            name="FinancialDataExtractor",
            agent=self.data_extractor,
            description="Expert financial data extraction specialist optimized for Gemini",
        )

        # The arrangement step is a plain callable so it can post-process the
        # extractor output before handing it to the arranger agent.
        data_arrangement_step = Step(
            name="DataArrangement",
            executor=self._arrangement_function,
            description="User-defined callable step for data arrangement",
        )

        excel_generation_step = Step(
            name="ExcelReportGenerator",
            agent=self.code_generator,
            description="Excel report generator optimized for Gemini with cross-platform support",
        )

        # Let callers override debug_mode without a duplicate-keyword TypeError.
        kwargs.setdefault("debug_mode", True)

        super().__init__(
            name="FinancialDocumentWorkflow",
            description=dedent("""\
            Financial document analysis workflow using Agno Workflow 2.0 with step-based execution.
            Processes financial documents through extraction, arrangement, and Excel report generation.
            Uses session state for caching and proper error recovery mechanisms.
        """),
            steps=[
                data_extraction_step,
                data_arrangement_step,
                excel_generation_step,
            ],
            session_id=session_id,
            storage=storage,
            **kwargs,
        )

        logger.info(f"FinancialDocumentWorkflow v2.0 initialized with session: {self.session_id}")
        logger.info(f"Session directories: {list(self.session_directories.keys())}")

    def _setup_session_directories(self, session_id: Optional[str] = None):
        """Create the session directories and expose each one as an attribute.

        Sets ``session_id``, ``session_directories`` (the mapping returned by
        ``settings.create_session_directories``) and the ``session_output_dir``,
        ``session_input_dir``, ``session_temp_dir`` and ``session_cache_dir``
        shortcuts.
        """
        self.session_id = session_id
        self.session_directories = settings.create_session_directories(self.session_id)
        self.session_output_dir = self.session_directories["output"]
        self.session_input_dir = self.session_directories["input"]
        self.session_temp_dir = self.session_directories["temp"]
        self.session_cache_dir = self.session_directories["cache"]

    def _create_data_extractor(self) -> Agent:
        """Create the Gemini extraction agent with an ``ExtractedFinancialData`` response model."""
        return Agent(
            model=Gemini(
                id=settings.DATA_EXTRACTOR_MODEL,
                thinking_budget=settings.DATA_EXTRACTOR_MODEL_THINKING_BUDGET,
                api_key=settings.GOOGLE_API_KEY,
            ),
            name="FinancialDataExtractor",
            description="Expert financial data extraction specialist optimized for Gemini",
            instructions=prompt_loader.load_instructions_as_list("agents/data_extractor"),
            response_model=ExtractedFinancialData,
            structured_outputs=True,
            debug_mode=True,
            retries=10,
            delay_between_retries=10,
            exponential_backoff=True,
        )

    def _create_data_arranger(self) -> Agent:
        """Create the arrangement agent with shell/file tools confined to the session output directory."""
        # Diagnostics for the directory the arranger's tools are rooted at.
        logger.info(f"Data arranger base directory: {self.session_output_dir}")
        logger.info(f"Directory exists: {self.session_output_dir.exists()}")
        logger.info(f"Directory is writable: {os.access(self.session_output_dir, os.W_OK)}")
        return Agent(
            model=Gemini(
                id=settings.DATA_ARRANGER_MODEL,
                thinking_budget=settings.DATA_ARRANGER_MODEL_THINKING_BUDGET,
                api_key=settings.GOOGLE_API_KEY,
            ),
            name="FinancialDataArranger",
            description="Financial data organization specialist optimized for Gemini",
            instructions=prompt_loader.load_instructions_as_list("agents/data_arranger"),
            tools=[
                RestrictedShellTools(base_dir=self.session_output_dir),
                FileTools(base_dir=self.session_output_dir, save_files=True, read_files=True, list_files=True),
            ],
            markdown=False,
            debug_mode=True,
            add_memory_references=True,
            add_session_summary_references=True,
            retries=10,
            delay_between_retries=10,
            exponential_backoff=True,
        )

    def _create_code_generator(self) -> Agent:
        """Create the Excel-report agent with shell/Python/file tools confined to the session output directory."""
        return Agent(
            model=Gemini(
                id=settings.CODE_GENERATOR_MODEL,
                thinking_budget=settings.CODE_GENERATOR_MODEL_THINKING_BUDGET,
                api_key=settings.GOOGLE_API_KEY,
            ),
            name="ExcelReportGenerator",
            description="Excel report generator optimized for Gemini with cross-platform support",
            goal="Generate professional Excel reports from arranged financial data with multiple worksheets and formatting",
            instructions=prompt_loader.load_instructions_as_list("agents/code_generator"),
            expected_output="A professionally formatted Excel file with multiple worksheets, charts, and proper styling",
            additional_context=f"Working directory: {self.session_output_dir}. All files must be saved in this directory only.",
            tools=[
                RestrictedShellTools(base_dir=self.session_output_dir),
                RestrictedPythonTools(base_dir=self.session_output_dir),
                FileTools(base_dir=self.session_output_dir, save_files=True, read_files=True, list_files=True),
            ],
            markdown=False,
            show_tool_calls=True,
            debug_mode=True,
            add_datetime_to_instructions=True,
            retries=10,
            delay_between_retries=10,
            exponential_backoff=True,
        )

    @staticmethod
    def _format_step_content(content: Any) -> str:
        """Render a previous step's content as text for the arranger prompt.

        Pydantic models (the extractor's response model) become indented JSON
        instead of their ``str()`` form; dicts and lists are JSON-encoded;
        anything else falls back to ``str()``.
        """
        if isinstance(content, BaseModel):
            return content.model_dump_json(indent=2)
        if isinstance(content, (dict, list)):
            # default=str keeps non-JSON values (dates, decimals) readable
            # instead of failing the whole step.
            return json.dumps(content, indent=2, ensure_ascii=False, default=str)
        return str(content)

    def _cache_arrangement(self, content: Any) -> None:
        """Store *content* in the workflow's session-state dict, if one exists."""
        state = getattr(self, "session_state", None)
        if state is None:
            # NOTE(review): Agno's v2 workflow may expose its state as
            # ``workflow_session_state`` rather than ``session_state`` - confirm.
            state = getattr(self, "workflow_session_state", None)
        # An empty dict is still a usable cache; only skip when there is none.
        if state is None:
            return
        cache_key = f"arrangement_{int(time.time())}"
        state[cache_key] = content
        logger.info(f"Cached arrangement results with key: {cache_key}")

    def _arrangement_function(self, step_input: StepInput) -> StepOutput:
        """
        Run the data-arranger agent on the extraction step's output.

        The ``workflow/data_arrangement`` prompt is combined with the previous
        step's content and sent to ``self.data_arranger``. The arranger's
        response content is cached in session state when available.

        Args:
            step_input: Workflow-provided input; only ``previous_step_content``
                is used.

        Returns:
            A ``StepOutput`` with ``success=True`` carrying the arranger's
            content and response, or ``success=False`` with an error message if
            there was nothing to arrange or any exception occurred.
        """
        try:
            logger.info("Starting data arrangement step")

            previous_step_content = step_input.previous_step_content
            if previous_step_content is None or (
                isinstance(previous_step_content, str) and not previous_step_content.strip()
            ):
                # Sending "None"/"" to the arranger would only invite a hallucinated result.
                logger.error("Data arrangement failed: no extracted data from the previous step")
                return StepOutput(
                    content="Data arrangement failed: no extracted data was received from the previous step",
                    success=False,
                )

            arrangement_prompt = prompt_loader.load_prompt("workflow/data_arrangement")
            extracted_data = self._format_step_content(previous_step_content)
            full_arrangement_prompt = (
                f"{arrangement_prompt}\n\n"
                f"Here is the extracted financial data to arrange:\n\n{extracted_data}"
            )

            response = self.data_arranger.run(full_arrangement_prompt)

            self._cache_arrangement(response.content)

            logger.info("Data arrangement completed successfully")
            return StepOutput(
                content=response.content,
                response=response,
                success=True,
            )

        except Exception as e:
            # Report failure through the step output rather than aborting the workflow here.
            logger.error(f"Data arrangement failed: {str(e)}")
            return StepOutput(
                content=f"Data arrangement failed: {str(e)}",
                success=False,
            )

    def _stage_input_file(self, source: Path) -> Path:
        """Copy *source* into the session input directory and return the copy's path."""
        import shutil

        target = self.session_input_dir / source.name
        # copyfile streams instead of loading the whole document into memory;
        # skip it when the caller already points at the staged copy, because
        # copyfile raises SameFileError in that case.
        if target.resolve() != source:
            shutil.copyfile(source, target)
        return target

    def run(self, file_path: Optional[str] = None, **kwargs):
        """
        Process one financial document through the three workflow steps.

        The document is copied into the session input directory, the
        ``workflow/data_extraction`` prompt is rendered, and the base
        ``Workflow.run`` is invoked with the document attached as a file.

        Args:
            file_path: Path to the financial document to process.
            **kwargs: Forwarded to ``Workflow.run``. Any ``files`` are attached
                after the document. A ``message`` is ignored (with a warning)
                because the message is always the rendered extraction prompt.

        Returns:
            Whatever ``Workflow.run`` returns for this execution.

        Raises:
            ValueError: If ``file_path`` is not provided.
            FileNotFoundError: If ``file_path`` is not an existing regular file.
        """
        if file_path is None:
            logger.error("file_path is required but not provided")
            raise ValueError("file_path is required but not provided")

        source_path = Path(file_path).resolve()
        # is_file() rather than exists(): a directory would otherwise fail
        # later with a less helpful IsADirectoryError.
        if not source_path.is_file():
            logger.error(f"File not found: {source_path}")
            raise FileNotFoundError(f"File not found: {source_path}")

        # These are passed explicitly to super().run below; leaving them in
        # kwargs would raise "got multiple values for keyword argument".
        if "message" in kwargs:
            kwargs.pop("message")
            logger.warning("Ignoring 'message' argument; the extraction prompt is used instead")
        extra_files = list(kwargs.pop("files", None) or [])

        start_time = time.perf_counter()

        try:
            self._stage_input_file(source_path)

            logger.info(f"Starting financial document analysis for: {source_path.name}")

            from agno.media import File
            document = File(filepath=str(source_path))

            extraction_prompt = prompt_loader.load_prompt(
                "workflow/data_extraction",
                file_path=str(source_path),
                output_directory=str(self.session_output_dir),
            )

            result = super().run(
                message=extraction_prompt,
                files=[document, *extra_files],
                **kwargs,
            )

            execution_time = time.perf_counter() - start_time
            status = self._get_workflow_status()

            logger.info(f"Workflow completed successfully in {execution_time:.2f} seconds")
            logger.info(f"Results: {status}")

            return result

        except Exception as e:
            logger.error(f"Workflow execution failed: {str(e)}")
            raise

    def _get_workflow_status(self) -> Dict[str, Any]:
        """
        Summarise the session's output directory.

        Returns:
            A dict with the session id, the output directory path, and the
            number of ``*.json`` and ``*.xlsx`` files directly inside the output
            directory (0 if it does not exist). ``data_points`` is always 0; it
            is not computed here.
        """
        output_dir = self.session_output_dir
        status: Dict[str, Any] = {
            "session_id": self.session_id,
            "output_directory": str(output_dir),
            "json_files": 0,
            "excel_files": 0,
            # NOTE(review): never populated - confirm whether any caller relies on it.
            "data_points": 0,
        }

        if output_dir.exists():
            status["json_files"] = sum(1 for _ in output_dir.glob("*.json"))
            status["excel_files"] = sum(1 for _ in output_dir.glob("*.xlsx"))

        return status
|
|
|
|
| |
def create_financial_workflow(session_id: Optional[str] = None, **kwargs) -> FinancialDocumentWorkflow:
    """
    Factory for ``FinancialDocumentWorkflow``.

    Args:
        session_id: Optional session identifier, passed through unchanged.
        **kwargs: Extra keyword arguments forwarded to the workflow constructor.

    Returns:
        FinancialDocumentWorkflow: A freshly constructed workflow instance.
    """
    workflow = FinancialDocumentWorkflow(session_id=session_id, **kwargs)
    return workflow