Spaces:
Running
Running
File size: 5,516 Bytes
"""Process-wide concurrency / dedup guard for generation runs.
Why this exists
---------------
Two recurring user complaints justified adding a server-side guard:
1. *"Sometimes 2 processes run, one is duplicate of another."* — the React
UI already debounces the Submit button, but a hard refresh, a stuck
SSE stream that hasn't been GC'd by the browser, or a script firing
``/generate-sse`` from two tabs all spawn a parallel run. Two parallel
runs share the Playwright browser pool and the AI quota, double the
wall-clock time, and produce overlapping batch folders.
2. *"It is too slow — sometimes 10–15 min."* — once a stuck run is in
flight, every retry submission piles on top of it because nothing on
the backend serializes them. Fail-fast with HTTP 409 lets the UI tell
the user "another run is already in progress" instead of silently
queuing more load.
Design
------
* **Single-flight:** at most one generation runs at a time per process.
``begin_run()`` reserves the slot atomically; ``end_run()`` (idempotent)
releases it.
* **Dedup window:** if the same fingerprint (route + body hash) arrives
within ``DEDUP_WINDOW_SECS`` of an *accepted* run starting, the second
request is rejected even if the first has completed. This catches
double-submits caused by accidental form re-POSTs.
* **Stale-slot watchdog:** if a slot is held longer than
``MAX_RUN_SECS`` (default 30 min) it is auto-released. This is a safety
net for crashes that don't run ``end_run`` (e.g. SIGKILL of a worker
thread in a debugger). It is **not** a soft cap — the run continues; it
just stops blocking new submissions.
"""
from __future__ import annotations

import hashlib
import json
import os
import threading
import time
from dataclasses import dataclass
from typing import Optional
# Length, in seconds, of the window after an accepted run starts during which
# a request with the same fingerprint is rejected as a duplicate. Read once at
# import time from RUN_DEDUP_WINDOW_SECS (default: 5 seconds).
DEDUP_WINDOW_SECS = float(os.getenv("RUN_DEDUP_WINDOW_SECS", "5"))

# Longest time, in seconds, a run may hold the single-flight slot before the
# slot is considered stale and released automatically. Read once at import
# time from RUN_MAX_SECS (default: 30 minutes).
MAX_RUN_SECS = float(os.getenv("RUN_MAX_SECS", str(60 * 30)))
@dataclass
class _ActiveRun:
    """Record describing the run that currently holds the single-flight slot."""

    # Caller-supplied identifier; ``end_run`` frees the slot only when it is
    # called with this same id.
    operation_id: str
    # Digest returned by ``fingerprint(route, payload)`` for the accepted request.
    fingerprint: str
    # ``time.time()`` reading taken when the slot was reserved.
    started_at: float
class RunRejected(Exception):
    """Signal that a new run was refused by the single-flight / dedup guard.

    The HTTP layer turns this exception into a 409 Conflict response.

    Attributes:
        reason: Either ``"in_flight"`` (another run holds the slot) or
            ``"duplicate"`` (an identical request was accepted moments ago),
            so the client can pick between "wait" and "retry without
            changes" UX.
        message: Human-readable explanation; also passed to ``Exception``
            so ``str(exc)`` returns it.
        operation_id: Id of the run that caused the rejection, when one is
            supplied; ``None`` otherwise.
    """

    def __init__(
        self,
        reason: str,
        message: str,
        operation_id: Optional[str] = None,
    ):
        super().__init__(message)
        self.reason, self.message, self.operation_id = (
            reason,
            message,
            operation_id,
        )
# Mutex taken by the functions below before they read or write ``_active``
# or ``_recent``.
_lock = threading.Lock()

# The run that currently owns the single-flight slot; ``None`` when it is free.
_active: Optional[_ActiveRun] = None

# fingerprint -> ``time.time()`` value recorded when the matching run was
# accepted. Entries older than ``DEDUP_WINDOW_SECS`` are pruned on every
# ``begin_run`` call, so only the few runs accepted inside the window are
# ever retained and the dict cannot grow without bound.
_recent: dict[str, float] = {}
def _gc_locked(now: float) -> None:
    """Prune dedup entries recorded before the start of the dedup window.

    An entry is removed when its timestamp is strictly earlier than
    ``now - DEDUP_WINDOW_SECS``. The caller is expected to hold ``_lock``.

    Args:
        now: Current time, on the same clock as the timestamps stored in
            ``_recent``.
    """
    oldest_kept = now - DEDUP_WINDOW_SECS
    # Iterate over a snapshot so the dict is never mutated mid-iteration.
    for fp, accepted_at in list(_recent.items()):
        if accepted_at < oldest_kept:
            del _recent[fp]
def _force_release_if_stale_locked(now: float) -> None:
    """Free the single-flight slot when its holder has exceeded ``MAX_RUN_SECS``.

    Only the bookkeeping is cleared: ``_active`` is reset to ``None`` so new
    submissions are no longer blocked. Nothing here stops or signals the run
    that held the slot. Called from ``begin_run`` while ``_lock`` is held.

    Args:
        now: Current time, on the same clock as ``_ActiveRun.started_at``.
    """
    global _active
    if _active is None:
        return
    held_for = now - _active.started_at
    if held_for > MAX_RUN_SECS:
        _active = None
def fingerprint(route: str, payload: object) -> str:
    """Return a stable 32-hex-character digest of ``(route, payload)``.

    The digest is used to match requests inside the dedup window, so two
    payloads that are equal must hash identically. The payload is therefore
    canonicalized as JSON with sorted keys: dicts that differ only in key
    insertion order produce the same fingerprint, which plain ``repr`` does
    not guarantee.

    Values JSON cannot encode are rendered with ``repr``. If the payload
    cannot be canonicalized at all (for example mixed-type dict keys that
    cannot be sorted, or a circular reference) the whole payload falls back
    to ``repr``.

    Args:
        route: Route the request arrived on.
        payload: Request body; any object.

    Returns:
        The first 32 hex characters of the SHA-256 digest of
        ``"<route>|<canonical payload>"``.
    """
    try:
        canonical = json.dumps(
            payload,
            sort_keys=True,
            default=repr,
            ensure_ascii=False,
            separators=(",", ":"),
        )
    except (TypeError, ValueError):
        # Unsortable keys or circular structures: best-effort textual form.
        canonical = repr(payload)
    # errors="replace" keeps lone surrogates from raising during encoding.
    raw = f"{route}|{canonical}".encode("utf-8", errors="replace")
    return hashlib.sha256(raw).hexdigest()[:32]
def begin_run(operation_id: str, route: str, payload: object) -> _ActiveRun:
    """Reserve the single-flight slot for a new generation run.

    Under ``_lock`` the function first prunes expired dedup entries and
    releases a slot held longer than ``MAX_RUN_SECS``. It then rejects the
    request if a run is still in flight, or if the same fingerprint was
    accepted less than ``DEDUP_WINDOW_SECS`` ago. The in-flight check runs
    first, so an identical request arriving while the first run is still
    active is reported as ``"in_flight"``, not ``"duplicate"``.

    Args:
        operation_id: Identifier for the new run; pass the same value to
            ``end_run`` to release the slot.
        route: Route the request arrived on (part of the fingerprint).
        payload: Request body (part of the fingerprint).

    Returns:
        The ``_ActiveRun`` record now occupying the slot.

    Raises:
        RunRejected: With ``reason="in_flight"`` (and the holder's
            ``operation_id``) when another run holds the slot, or with
            ``reason="duplicate"`` when an identical submission was accepted
            inside the dedup window.
    """
    global _active
    # Hash outside the lock; it does not touch shared state.
    fp = fingerprint(route, payload)
    now = time.time()
    with _lock:
        _gc_locked(now)
        _force_release_if_stale_locked(now)

        if _active is not None:
            raise RunRejected(
                reason="in_flight",
                message=(
                    "Another generation is already running "
                    f"(operation_id={_active.operation_id}). "
                    "Cancel it first or wait for it to finish."
                ),
                operation_id=_active.operation_id,
            )

        recent_ts = _recent.get(fp)
        if recent_ts is not None and (now - recent_ts) < DEDUP_WINDOW_SECS:
            raise RunRejected(
                reason="duplicate",
                message=(
                    "Duplicate request — an identical submission was just "
                    f"accepted {now - recent_ts:.1f}s ago. Wait a moment "
                    "and try again if this was intentional."
                ),
            )

        run = _ActiveRun(operation_id=operation_id, fingerprint=fp, started_at=now)
        _active = run
        # Remember when this fingerprint was accepted so re-POSTs inside the
        # window are rejected even after the run has finished.
        _recent[fp] = now
    return run
def end_run(operation_id: str) -> None:
    """Release the single-flight slot if ``operation_id`` is its current holder.

    When the slot is free or held by a different operation, the call does
    nothing. It is therefore safe to call more than once, for example from
    a ``finally`` block.

    Args:
        operation_id: Identifier that was passed to ``begin_run``.
    """
    global _active
    with _lock:
        holder = _active
        if holder is None or holder.operation_id != operation_id:
            return
        _active = None
def current_run() -> Optional[str]:
    """Report which operation currently holds the single-flight slot.

    Returns:
        The ``operation_id`` of the in-flight run, or ``None`` when the slot
        is free. The value is read while holding ``_lock``.
    """
    with _lock:
        if _active is None:
            return None
        return _active.operation_id
|