Spaces:
Running
Running
| """Single-worker queue for long-running generation workflows.""" | |
| from __future__ import annotations | |
| import threading | |
| import traceback | |
| import sys | |
| from collections import deque | |
| from dataclasses import dataclass | |
| from typing import Callable | |
| from core.ai_client import unregister_operation | |
| from core import run_manager | |
| from core.workflow_runner import WorkflowContext, publish_run_event | |
| from utils.run_guard import RunRejected, begin_run, end_run | |
@dataclass
class QueuedJob:
    """One unit of work submitted through ``enqueue``.

    Instances are created with keyword arguments by ``enqueue``, so the class
    must be a dataclass; without the decorator that construction fails with
    ``TypeError``.
    """

    # Run identifier used for run_manager records and published events.
    run_id: str
    # Operation identifier passed to begin_run/end_run and unregister_operation.
    operation_id: str
    # Workflow body; called with the job's WorkflowContext once it starts.
    fn: Callable[[WorkflowContext], None]
    # Set by cancel_job to request cancellation of this job.
    cancel_event: threading.Event
    # Human-readable name used in the "<label> started" message and snapshots.
    label: str = "workflow"
    # True: skip the global begin_run slot and allow starting on a dedicated
    # thread when no classic job is running and the queue is empty.
    pipeline_enabled: bool = False
# Jobs waiting for the background worker thread, oldest first.
_QUEUE: deque[QueuedJob] = deque()
# Jobs that have been handed off for execution, keyed by run_id.
_CURRENT: dict[str, QueuedJob] = dict()
# Guards _QUEUE, _CURRENT and _STARTED; the worker waits on it for new work.
# Condition() with no argument builds an RLock; it is spelled out here so the
# reentrant behaviour is explicit.
_CONDITION = threading.Condition(threading.RLock())
# Becomes True once _ensure_worker has started the worker thread.
_STARTED = False
def _prevent_sleep() -> bool:
    """Ask Windows to keep the system and display awake for this thread.

    Returns:
        True when the SetThreadExecutionState call was issued without raising;
        False on non-Windows platforms or if importing/calling ctypes raised.
    """
    if sys.platform == "win32":
        try:
            import ctypes

            es_continuous = 0x80000000
            es_system_required = 0x00000001
            es_display_required = 0x00000002
            requested = es_continuous | es_system_required | es_display_required
            ctypes.windll.kernel32.SetThreadExecutionState(requested)
        except Exception:
            return False
        return True
    return False
def _allow_sleep() -> None:
    """Undo the keep-awake request made by ``_prevent_sleep`` (Windows only).

    Per the Win32 docs, the execution state is per-thread, so this is meant to
    run on the same thread that called ``_prevent_sleep``. Failures are ignored
    on purpose: releasing the guard is best effort.
    """
    if sys.platform == "win32":
        try:
            from ctypes import windll

            # ES_CONTINUOUS on its own clears the previously requested flags.
            windll.kernel32.SetThreadExecutionState(0x80000000)
        except Exception:
            pass
def _queue_positions() -> dict[str, int]:
    """Map each waiting job's run_id to its 1-based position in ``_QUEUE``."""
    positions: dict[str, int] = {}
    for place, waiting in enumerate(_QUEUE, start=1):
        positions[waiting.run_id] = place
    return positions
def _publish_queue_positions() -> None:
    """Re-announce the queue position of every job still waiting in ``_QUEUE``.

    For each waiting job this sets its run record back to the "queued"
    status/stage with the current position, records a "queued" event via
    ``run_manager.add_event``, and publishes a "queued" event through
    ``publish_run_event``. It reads ``_QUEUE`` without taking any lock itself,
    so callers should hold ``_CONDITION``.
    """
    position_by_run = _queue_positions()
    for waiting in list(_QUEUE):
        slot = position_by_run.get(waiting.run_id)
        if slot:
            text = f"Queued at position {slot}"
        else:
            text = "Queued"
        run_manager.update_run(
            waiting.run_id,
            status="queued",
            stage="queued",
            message=text,
            progress=0,
            queue_position=slot,
        )
        run_manager.add_event(
            waiting.run_id,
            event_type="queued",
            stage="queued",
            progress=0,
            message=text,
            data={"queue_position": slot},
        )
        live_event = {
            "type": "queued",
            "run_id": waiting.run_id,
            "operation_id": waiting.operation_id,
            "message": text,
            "progress": 0,
            "queue_position": slot,
        }
        publish_run_event(waiting.run_id, live_event)
def _worker_loop() -> None:
    """Body of the single background worker thread: run queued jobs in order.

    Blocks on ``_CONDITION`` until ``_QUEUE`` is non-empty, moves the oldest
    job into ``_CURRENT``, and runs it with the lock released. This thread is
    the only consumer of ``_QUEUE``, and ``_ensure_worker`` never restarts it.
    Any exception escaping one iteration is therefore logged instead of
    propagated; letting it kill the thread would strand every later job.
    """
    while True:
        with _CONDITION:
            while not _QUEUE:
                _CONDITION.wait()
            job = _QUEUE.popleft()
            _CURRENT[job.run_id] = job
            try:
                _publish_queue_positions()
            except Exception:
                # Position updates are informational; they must not prevent
                # the job that was just dequeued from running.
                traceback.print_exc()
        try:
            _run_job(job)
        except Exception:
            # _run_job reports workflow failures itself; anything reaching
            # here came from bookkeeping. Make sure the job does not linger
            # in _CURRENT, then keep serving the queue.
            traceback.print_exc()
            with _CONDITION:
                _CURRENT.pop(job.run_id, None)
def _run_job(job: QueuedJob) -> None:
    """Execute one job and always release what it acquired.

    Keeps the machine awake (Windows) for the job's duration. For classic
    jobs (``pipeline_enabled`` False) it also holds the global ``begin_run``
    slot. A rejected slot fails the run with "Backend busy: ...". A job whose
    cancel event is already set is marked cancelled without running ``fn``.
    Exceptions from ``fn`` are printed and, if the run is not already in a
    terminal status, reported through ``ctx.fail``.

    The job is always removed from ``_CURRENT`` afterwards, even if
    releasing the sleep guard or the run slot raises.
    """
    ctx = WorkflowContext(job.run_id, job.operation_id, job.cancel_event)
    # In classic mode, hold the global single-flight slot for the
    # duration of the job. Pipeline mode deliberately releases that
    # coarse lock and lets the workflow's own phase locks protect
    # screenshots / PowerPoint while AI can overlap.
    slot_held = False
    sleep_guard = False
    try:
        sleep_guard = _prevent_sleep()
        if not job.pipeline_enabled:
            try:
                begin_run(job.operation_id, "/job-queue", {"run_id": job.run_id})
                slot_held = True
            except RunRejected as rr:
                ctx.fail(f"Backend busy: {rr.message}")
                return
        if job.cancel_event.is_set():
            ctx.cancel()
        else:
            ctx.started(f"{job.label} started")
            job.fn(ctx)
    except Exception as exc:
        traceback.print_exc()
        run = run_manager.get_run(job.run_id)
        # Don't overwrite a status the workflow already finalized.
        if run and run.get("status") not in {"completed", "failed", "cancelled"}:
            ctx.fail(f"Error: {exc}")
    finally:
        try:
            if sleep_guard:
                _allow_sleep()
            if slot_held:
                end_run(job.operation_id)
        finally:
            # Must run even if end_run raised. A stale classic job left in
            # _CURRENT would make enqueue() believe a classic job is still
            # running and permanently disable the pipeline fast path.
            with _CONDITION:
                _CURRENT.pop(job.run_id, None)
                _publish_queue_positions()
def _ensure_worker() -> None:
    """Start the background worker thread on first use; later calls do nothing.

    The check and the start happen under ``_CONDITION``, so concurrent
    callers start at most one worker.
    """
    global _STARTED
    with _CONDITION:
        if not _STARTED:
            worker = threading.Thread(
                target=_worker_loop,
                name="workflow-job-queue",
                daemon=True,
            )
            worker.start()
            _STARTED = True
def enqueue(
    run_id: str,
    operation_id: str,
    fn: Callable[[WorkflowContext], None],
    cancel_event: threading.Event,
    label: str = "workflow",
    pipeline_enabled: bool = False,
) -> int:
    """Submit a workflow for execution.

    A pipeline job is started right away on its own thread when nothing is
    waiting in ``_QUEUE`` and no classic (non-pipeline) job is in
    ``_CURRENT``. In every other case the job is appended to ``_QUEUE`` for
    the background worker, which is started on demand.

    Returns:
        The job's 1-based queue position, or 1 when the job was started
        immediately.
    """
    job = QueuedJob(
        run_id=run_id,
        operation_id=operation_id,
        fn=fn,
        cancel_event=cancel_event,
        label=label,
        pipeline_enabled=pipeline_enabled,
    )
    _ensure_worker()
    with _CONDITION:
        must_wait = (
            not pipeline_enabled
            or _QUEUE
            or any(not active.pipeline_enabled for active in _CURRENT.values())
        )
        if must_wait:
            _QUEUE.append(job)
            position = len(_QUEUE)
            _publish_queue_positions()
            _CONDITION.notify()
            return position

        # Fast path: run this pipeline job concurrently, outside the worker.
        _CURRENT[job.run_id] = job
        _publish_queue_positions()
        runner = threading.Thread(
            target=_run_job,
            args=(job,),
            daemon=True,
            name=f"workflow-{run_id}",
        )
        runner.start()
        return 1
# Soft-cancel modes accepted by cancel_job: mode -> (stage stored as the run's
# "cancel_after_stage" setting, message shown while the job keeps running).
_SOFT_CANCEL_STAGES: dict[str, tuple[str, str]] = dict(
    after_html=(
        "html_saved",
        "Cancellation requested. The process will stop after the HTML file is saved.",
    ),
    after_screenshots=(
        "screenshots_done",
        "Cancellation requested. The process will stop after screenshots are captured.",
    ),
    after_pptx=(
        "pptx_built",
        "Cancellation requested. The process will stop after the PowerPoint deck is saved.",
    ),
    after_video=(
        "video_export_done",
        "Cancellation requested. The process will stop after the MP4 export finishes.",
    ),
)
def cancel_job(run_id: str, mode: str = "now", delete_outputs: bool = False) -> bool:
    """Cancel a queued or running job.

    Args:
        run_id: Compared against both ``QueuedJob.run_id`` and
            ``QueuedJob.operation_id``.
        mode: ``"now"`` (the default) signals the job's cancel event
            immediately. A key of ``_SOFT_CANCEL_STAGES`` (e.g.
            ``"after_html"``) leaves a running job going and records the
            stage after which it should stop. The value is normalized with
            ``strip().lower()``. Any value that is not a soft-cancel key is
            handled like ``"now"``. Queued jobs are always dropped at once,
            whatever the mode.
        delete_outputs: Stored as ``delete_outputs_on_cancel`` in the run
            settings. Soft cancels always store ``False`` instead.

    Returns:
        True if a matching queued or running job was found, else False.
    """
    requested_mode = str(mode or "now").strip().lower()
    wipe_outputs = bool(delete_outputs)

    def _is_target(candidate: QueuedJob) -> bool:
        return candidate.run_id == run_id or candidate.operation_id == run_id

    with _CONDITION:
        waiting = next((job for job in list(_QUEUE) if _is_target(job)), None)
        if waiting is not None:
            # Not started yet: remove it and finish the run as cancelled.
            waiting.cancel_event.set()
            _QUEUE.remove(waiting)
            run_manager.update_run(waiting.run_id, settings={"delete_outputs_on_cancel": wipe_outputs})
            run_manager.finish_run(waiting.run_id, status="cancelled", message="Operation cancelled before start", progress=0)
            unregister_operation(waiting.operation_id)
            publish_run_event(waiting.run_id, {
                "type": "cancelled",
                "run_id": waiting.run_id,
                "operation_id": waiting.operation_id,
                "message": "Operation cancelled before start",
            })
            _publish_queue_positions()
            return True

        active = next(
            (job for job in _CURRENT.values() if _is_target(job)),
            None,
        )
        if active is None:
            return False

        soft_target = _SOFT_CANCEL_STAGES.get(requested_mode)
        if soft_target is not None:
            # Soft cancel: the cancel event is NOT set here. Presumably the
            # workflow checks the "cancel_after_stage" setting; confirm in
            # WorkflowContext / workflow code.
            stop_stage, notice = soft_target
            snapshot = run_manager.get_run(active.run_id) or {}
            run_manager.update_run(
                active.run_id,
                status="running",
                stage="cancelling",
                message=notice,
                progress=None,
                settings={
                    "cancel_after_stage": stop_stage,
                    "delete_outputs_on_cancel": False,
                },
            )
            publish_run_event(active.run_id, {
                "type": "progress",
                "run_id": active.run_id,
                "operation_id": active.operation_id,
                "message": notice,
                "stage": "cancelling",
                "progress": snapshot.get("progress", 0),
            })
            return True

        # Hard cancel: signal the job, then mark the run as cancelling.
        active.cancel_event.set()
        snapshot = run_manager.get_run(active.run_id) or {}
        run_manager.update_run(
            active.run_id,
            status="running",
            stage="cancelling",
            message="Cancellation requested. Waiting for the running step to stop.",
            progress=None,
            settings={"delete_outputs_on_cancel": wipe_outputs},
        )
        publish_run_event(active.run_id, {
            "type": "progress",
            "run_id": active.run_id,
            "operation_id": active.operation_id,
            "message": "Cancellation requested",
            "stage": "cancelling",
            "progress": snapshot.get("progress", 0),
        })
        return True
def get_queue_snapshot() -> dict:
    """Return a point-in-time view of running and waiting jobs.

    Keys:
        running: run_id/operation_id/label of the first job in ``_CURRENT``
            (insertion order), or None when nothing is running.
        running_all: one entry per job in ``_CURRENT``, including its
            ``pipeline_enabled`` flag.
        queued: one entry per job in ``_QUEUE`` with its 1-based position.
    """
    with _CONDITION:
        active = list(_CURRENT.values())
        head = active[0] if active else None
        head_summary = None
        if head is not None:
            head_summary = {
                "run_id": head.run_id,
                "operation_id": head.operation_id,
                "label": head.label,
            }
        running_all = [
            {
                "run_id": item.run_id,
                "operation_id": item.operation_id,
                "label": item.label,
                "pipeline_enabled": item.pipeline_enabled,
            }
            for item in active
        ]
        queued = [
            {
                "run_id": item.run_id,
                "operation_id": item.operation_id,
                "label": item.label,
                "position": place,
            }
            for place, item in enumerate(_QUEUE, start=1)
        ]
        return {
            "running": head_summary,
            "running_all": running_all,
            "queued": queued,
        }