| """ |
| Workflow execution and monitoring endpoints |
| """ |
|
|
| from fastapi import APIRouter, BackgroundTasks, HTTPException, WebSocket, WebSocketDisconnect |
| from pydantic import BaseModel |
| from typing import Dict, List |
| import uuid |
| from datetime import datetime |
| import asyncio |
| from loguru import logger |
|
|
| router = APIRouter() |
|
|
| class WorkflowRequest(BaseModel): |
| """Request to start a workflow""" |
| patent_id: str |
| scenario: str = "patent_wakeup" |
|
|
| class WorkflowResponse(BaseModel): |
| """Workflow execution response""" |
| workflow_id: str |
| status: str |
| message: str |
|
|
| @router.post("/execute", response_model=WorkflowResponse) |
| async def execute_workflow( |
| request: WorkflowRequest, |
| background_tasks: BackgroundTasks |
| ): |
| """ |
| Start Patent Wake-Up workflow execution. |
| |
| Args: |
| request: Workflow execution request |
| |
| Returns: |
| Workflow ID for tracking progress |
| """ |
| from api.main import app_state |
|
|
| |
| if request.patent_id not in app_state["patents"]: |
| raise HTTPException( |
| status_code=404, |
| detail=f"Patent not found: {request.patent_id}" |
| ) |
|
|
| |
| workflow_id = str(uuid.uuid4()) |
|
|
| |
| workflow_state = { |
| "id": workflow_id, |
| "patent_id": request.patent_id, |
| "scenario": request.scenario, |
| "status": "queued", |
| "progress": 0, |
| "current_step": None, |
| "started_at": datetime.utcnow().isoformat(), |
| "completed_at": None, |
| "execution_time_seconds": None, |
| "result": None, |
| "error": None, |
| "steps": [] |
| } |
|
|
| app_state["workflows"][workflow_id] = workflow_state |
|
|
| |
| app_state["patents"][request.patent_id]["status"] = "analyzing" |
| app_state["patents"][request.patent_id]["workflow_id"] = workflow_id |
|
|
| logger.info(f"🚀 Starting workflow {workflow_id} for patent {request.patent_id}") |
|
|
| |
| background_tasks.add_task( |
| run_workflow, |
| workflow_id, |
| request.patent_id, |
| request.scenario |
| ) |
|
|
| return WorkflowResponse( |
| workflow_id=workflow_id, |
| status="queued", |
| message="Workflow started successfully" |
| ) |
|
|
| async def run_workflow(workflow_id: str, patent_id: str, scenario: str): |
| """ |
| Background task to execute workflow. |
| |
| Args: |
| workflow_id: Unique workflow identifier |
| patent_id: Patent to analyze |
| scenario: Workflow scenario type |
| """ |
| from api.main import app_state |
| from src.workflow.langgraph_state import ScenarioType |
|
|
| workflow_state = app_state["workflows"][workflow_id] |
| patent = app_state["patents"][patent_id] |
|
|
| start_time = datetime.utcnow() |
|
|
| try: |
| logger.info(f"📊 Executing workflow {workflow_id}...") |
|
|
| |
| workflow_state["status"] = "running" |
| workflow_state["progress"] = 10 |
| workflow_state["current_step"] = "initializing" |
|
|
| |
| scenario_map = { |
| "patent_wakeup": ScenarioType.PATENT_WAKEUP |
| } |
| scenario_type = scenario_map.get(scenario, ScenarioType.PATENT_WAKEUP) |
|
|
| |
| logger.info(f"Analyzing patent: {patent['filename']}") |
|
|
| workflow_state["current_step"] = "document_analysis" |
| workflow_state["progress"] = 25 |
|
|
| result = await app_state["workflow"].run( |
| task_description=f"Analyze patent: {patent['filename']} and create valorization roadmap", |
| scenario=scenario_type, |
| input_data={"patent_path": patent["path"]}, |
| task_id=workflow_id |
| ) |
|
|
| |
| end_time = datetime.utcnow() |
| execution_time = (end_time - start_time).total_seconds() |
|
|
| |
| workflow_state["status"] = "completed" |
| workflow_state["progress"] = 100 |
| workflow_state["current_step"] = "completed" |
| workflow_state["completed_at"] = end_time.isoformat() |
| workflow_state["execution_time_seconds"] = execution_time |
|
|
| |
| workflow_state["result"] = { |
| "success": result.success, |
| "quality_score": result.quality_score, |
| "iterations_used": result.iterations_used, |
| "status_value": result.status.value, |
|
|
| |
| "document_analysis": result.agent_outputs.get("document_analysis"), |
|
|
| |
| "market_analysis": result.agent_outputs.get("market_analysis"), |
|
|
| |
| "matches": result.agent_outputs.get("matches", []), |
|
|
| |
| "brief": result.agent_outputs.get("brief"), |
|
|
| |
| "executor_output": result.agent_outputs.get("executor", {}) |
| } |
|
|
| |
| patent["status"] = "analyzed" |
|
|
| logger.success(f"✅ Workflow {workflow_id} completed in {execution_time:.1f}s") |
|
|
| except Exception as e: |
| logger.error(f"❌ Workflow {workflow_id} failed: {e}") |
|
|
| workflow_state["status"] = "failed" |
| workflow_state["error"] = str(e) |
| workflow_state["completed_at"] = datetime.utcnow().isoformat() |
|
|
| |
| patent["status"] = "failed" |
|
|
| import traceback |
| traceback.print_exc() |
|
|
| @router.get("/{workflow_id}", response_model=Dict) |
| async def get_workflow(workflow_id: str): |
| """ |
| Get workflow status and results. |
| |
| Args: |
| workflow_id: Unique workflow identifier |
| |
| Returns: |
| Workflow state including results if completed |
| """ |
| from api.main import app_state |
|
|
| if workflow_id not in app_state["workflows"]: |
| raise HTTPException( |
| status_code=404, |
| detail=f"Workflow not found: {workflow_id}" |
| ) |
|
|
| return app_state["workflows"][workflow_id] |
|
|
| @router.get("/", response_model=List[Dict]) |
| async def list_workflows( |
| status: str = None, |
| limit: int = 100, |
| offset: int = 0 |
| ): |
| """ |
| List all workflows. |
| |
| Args: |
| status: Filter by status (queued, running, completed, failed) |
| limit: Maximum number of results |
| offset: Pagination offset |
| |
| Returns: |
| List of workflow states |
| """ |
| from api.main import app_state |
|
|
| workflows = list(app_state["workflows"].values()) |
|
|
| |
| if status: |
| workflows = [w for w in workflows if w["status"] == status] |
|
|
| |
| workflows.sort(key=lambda x: x["started_at"], reverse=True) |
|
|
| |
| workflows = workflows[offset:offset + limit] |
|
|
| return workflows |
|
|
| @router.websocket("/{workflow_id}/stream") |
| async def stream_workflow(websocket: WebSocket, workflow_id: str): |
| """ |
| WebSocket endpoint for real-time workflow updates. |
| |
| Args: |
| websocket: WebSocket connection |
| workflow_id: Workflow to stream |
| """ |
| from api.main import app_state |
|
|
| await websocket.accept() |
|
|
| logger.info(f"📡 WebSocket connected for workflow {workflow_id}") |
|
|
| if workflow_id not in app_state["workflows"]: |
| await websocket.send_json({"error": "Workflow not found"}) |
| await websocket.close() |
| return |
|
|
| try: |
| |
| while True: |
| workflow_state = app_state["workflows"].get(workflow_id) |
|
|
| if not workflow_state: |
| await websocket.send_json({"error": "Workflow removed"}) |
| break |
|
|
| |
| await websocket.send_json(workflow_state) |
|
|
| |
| if workflow_state["status"] in ["completed", "failed"]: |
| logger.info(f"Workflow {workflow_id} finished, closing WebSocket") |
| break |
|
|
| |
| await asyncio.sleep(1) |
|
|
| except WebSocketDisconnect: |
| logger.info(f"WebSocket disconnected for workflow {workflow_id}") |
| except Exception as e: |
| logger.error(f"WebSocket error: {e}") |
| finally: |
| await websocket.close() |
|
|
| @router.get("/{workflow_id}/brief/download") |
| async def download_brief(workflow_id: str): |
| """ |
| Download the generated valorization brief. |
| |
| Args: |
| workflow_id: Workflow identifier |
| |
| Returns: |
| PDF file |
| """ |
| from api.main import app_state |
| from fastapi.responses import FileResponse |
| from pathlib import Path |
|
|
| if workflow_id not in app_state["workflows"]: |
| raise HTTPException( |
| status_code=404, |
| detail="Workflow not found" |
| ) |
|
|
| workflow = app_state["workflows"][workflow_id] |
|
|
| if workflow["status"] != "completed": |
| raise HTTPException( |
| status_code=400, |
| detail="Workflow not yet completed" |
| ) |
|
|
| |
| result = workflow.get("result") or {} |
| brief = result.get("brief") or {} |
| pdf_path = brief.get("pdf_path") if isinstance(brief, dict) else None |
|
|
| if not pdf_path: |
| raise HTTPException( |
| status_code=404, |
| detail="Valorization brief not found" |
| ) |
|
|
| file_path = Path(pdf_path) |
|
|
| if not file_path.exists(): |
| raise HTTPException( |
| status_code=404, |
| detail="Brief file not found on disk" |
| ) |
|
|
| return FileResponse( |
| path=file_path, |
| media_type="application/pdf", |
| filename=file_path.name |
| ) |
|
|