Spaces:
Running
Running
"""Process-wide concurrency / dedup guard for generation runs.

Why this exists
---------------
Two recurring user complaints justified adding a server-side guard:

1. *"Sometimes 2 processes run, one is duplicate of another."* — the React
   UI already debounces the Submit button, but a hard refresh, a stuck
   SSE stream that hasn't been GC'd by the browser, or a script firing
   ``/generate-sse`` from two tabs can each spawn a parallel run. Two parallel
   runs share the Playwright browser pool and the AI quota, double the
   wall-clock time, and produce overlapping batch folders.
2. *"It is too slow — sometimes 10–15 min."* — once a stuck run is in
   flight, every retry submission piles on top of it because nothing on
   the backend serializes them. Failing fast with HTTP 409 lets the UI tell
   the user "another run is already in progress" instead of silently
   queuing more load.

Design
------
* **Single-flight:** at most one generation runs at a time per process.
  ``begin_run()`` reserves the slot atomically; ``end_run()`` (idempotent)
  releases it.
* **Dedup window:** if the same fingerprint (route + body hash) arrives
  within ``DEDUP_WINDOW_SECS`` of an *accepted* run starting, the second
  request is rejected even if the first has completed. This catches
  double-submits caused by accidental form re-POSTs.
* **Stale-slot watchdog:** if a slot is held longer than
  ``MAX_RUN_SECS`` (default 30 min) it is auto-released. The check runs
  lazily, on the next ``begin_run()`` call. This is a safety net for
  crashes that don't run ``end_run`` (e.g. SIGKILL of a worker thread in a
  debugger). It is **not** a soft cap — the run continues; it just stops
  blocking new submissions.
"""
from __future__ import annotations

import hashlib
import json
import os
import threading
import time
from dataclasses import dataclass
from typing import Optional
def _env_seconds(name: str, fallback: str) -> float:
    """Read a duration in seconds from environment variable *name*, else *fallback*."""
    return float(os.environ.get(name, fallback))


# Identical submissions arriving within this many seconds of an accepted run's
# start are rejected as duplicates (env: RUN_DEDUP_WINDOW_SECS, default 5).
DEDUP_WINDOW_SECS = _env_seconds("RUN_DEDUP_WINDOW_SECS", "5")
# A slot held longer than this is force-released by the stale-slot watchdog
# (env: RUN_MAX_SECS, default 30 minutes).
MAX_RUN_SECS = _env_seconds("RUN_MAX_SECS", str(30 * 60))
@dataclass
class _ActiveRun:
    """Record of the run that currently holds the single-flight slot.

    ``@dataclass`` generates the keyword ``__init__`` that ``begin_run`` relies
    on; without it, constructing the record raises ``TypeError``.
    """

    # Caller-supplied id; ``end_run`` only releases a slot whose id matches.
    operation_id: str
    # ``fingerprint(route, payload)`` of the request that started the run.
    fingerprint: str
    # ``time.time()`` when ``begin_run`` reserved the slot; the stale-slot
    # watchdog measures the hold time against this.
    started_at: float
class RunRejected(Exception):
    """Signal that ``begin_run`` refused to start a new generation.

    The HTTP layer maps this to ``409 Conflict``. ``reason`` is either
    ``"in_flight"`` (another run currently holds the slot, so the client should
    wait) or ``"duplicate"`` (the identical request was accepted moments ago,
    so the client may retry without changes). ``operation_id`` names the
    blocking run when one is known, otherwise it is ``None``.
    """

    def __init__(self, reason: str, message: str, operation_id: Optional[str] = None):
        # The message is the exception's only arg, so ``str(exc)`` yields it.
        super().__init__(message)
        self.operation_id = operation_id
        self.message = message
        self.reason = reason
# Serialises every read and write of ``_active`` and ``_recent``.
_lock = threading.Lock()

# Run currently occupying the single-flight slot, or ``None`` when idle.
_active: Optional[_ActiveRun] = None

# fingerprint -> ``time.time()`` at which the matching accepted run started.
# ``_gc_locked`` evicts entries older than ``DEDUP_WINDOW_SECS`` on every
# ``begin_run`` call, so the map only holds runs accepted within the last
# window and cannot grow without bound.
_recent: dict[str, float] = dict()
def _gc_locked(now: float) -> None:
    """Evict dedup entries whose run started before ``now - DEDUP_WINDOW_SECS``.

    An entry stamped exactly at the cutoff is kept. Caller holds ``_lock``.
    """
    oldest_kept = now - DEDUP_WINDOW_SECS
    # Snapshot the keys first: a dict must not change size while iterated.
    expired = tuple(key for key, started in _recent.items() if started < oldest_kept)
    for key in expired:
        _recent.pop(key, None)
def _force_release_if_stale_locked(now: float) -> None:
    """Watchdog: free the slot if its holder has held it longer than ``MAX_RUN_SECS``.

    Only the slot is cleared; the run itself is not touched. Caller holds
    ``_lock``.
    """
    global _active
    holder = _active
    if holder is None:
        return
    held_for = now - holder.started_at
    if held_for > MAX_RUN_SECS:
        _active = None
def fingerprint(route: str, payload: object) -> str:
    """Return a stable 32-hex-char hash of ``(route, payload)`` for dedup matching.

    JSON-like payloads (dicts, lists, scalars) are canonicalised with
    ``json.dumps(sort_keys=True)``. Two dicts that differ only in key
    insertion order therefore map to the same fingerprint; plain ``repr``
    would not. Values that ``json`` cannot encode natively are folded in via
    ``repr``. If canonicalisation fails (e.g. mixed-type dict keys or a
    circular reference), the hash falls back to ``repr(payload)``.

    Known, acceptable collapses within the short dedup window: tuples hash
    like lists, and non-string dict keys hash like their JSON string form.

    Args:
        route: Request route; it is part of the hash, so the same body on
            different routes never collides.
        payload: Request payload to hash.

    Returns:
        The first 32 hex characters of the SHA-256 digest.
    """
    try:
        body = json.dumps(payload, sort_keys=True, default=repr)
    except (TypeError, ValueError):
        # TypeError: unsortable/unsupported keys; ValueError: circular reference.
        body = repr(payload)
    raw = f"{route}|{body}".encode("utf-8", errors="replace")
    return hashlib.sha256(raw).hexdigest()[:32]
def begin_run(operation_id: str, route: str, payload: object) -> _ActiveRun:
    """Reserve the single-flight slot for a new generation run.

    Args:
        operation_id: Identifier of the new run; pass the same value to
            ``end_run`` to release the slot.
        route: Request route, hashed with ``payload`` via ``fingerprint``.
        payload: Request payload, hashed with ``route`` via ``fingerprint``.

    Returns:
        The ``_ActiveRun`` record that now occupies the slot.

    Raises:
        RunRejected: ``reason="in_flight"`` if another run holds the slot
            (the exception's ``operation_id`` is that run's id), or
            ``reason="duplicate"`` if the same route+payload fingerprint was
            accepted less than ``DEDUP_WINDOW_SECS`` ago.
    """
    global _active
    # Hashing only reads the arguments, so keep it outside the critical section.
    fp = fingerprint(route, payload)
    now = time.time()
    with _lock:
        # Housekeeping first: drop expired dedup entries and reclaim a slot
        # held past MAX_RUN_SECS, so neither causes a spurious rejection.
        _gc_locked(now)
        _force_release_if_stale_locked(now)

        if _active is not None:
            raise RunRejected(
                reason="in_flight",
                message=(
                    f"Another generation is already running "
                    f"(operation_id={_active.operation_id}). "
                    "Cancel it first or wait for it to finish."
                ),
                operation_id=_active.operation_id,
            )

        recent_ts = _recent.get(fp)
        if recent_ts is not None and (now - recent_ts) < DEDUP_WINDOW_SECS:
            raise RunRejected(
                reason="duplicate",
                message=(
                    # Em dash written as an escape so it survives re-encoding
                    # of the source file (it had been mangled into "β").
                    "Duplicate request \u2014 an identical submission was just "
                    f"accepted {now - recent_ts:.1f}s ago. Wait a moment "
                    "and try again if this was intentional."
                ),
            )

        run = _ActiveRun(operation_id=operation_id, fingerprint=fp, started_at=now)
        _active = run
        # Stamped at acceptance, so the dedup window also covers runs that
        # have already finished.
        _recent[fp] = now
        return run
def end_run(operation_id: str) -> None:
    """Free the single-flight slot if ``operation_id`` still owns it.

    Safe to call more than once (e.g. from a ``finally`` block). A repeat
    call is a no-op, as is a call made after the watchdog reclaimed the slot
    or after a different run took it over.
    """
    global _active
    with _lock:
        holder = _active
        if holder is None or holder.operation_id != operation_id:
            return
        _active = None
def current_run() -> Optional[str]:
    """Report which run, if any, currently holds the slot.

    Returns the in-flight run's ``operation_id``, or ``None`` when idle. This
    does not run the stale-slot watchdog, so a slot held past ``MAX_RUN_SECS``
    is still reported until the next ``begin_run``.
    """
    with _lock:
        holder = _active
        if holder is None:
            return None
        return holder.operation_id