# YT-AI-Automation / backend/src/utils/run_guard.py
# (GitHub page header: commit 5f3e9f5 "Sync Docker Space" by github-actions)
"""Process-wide concurrency / dedup guard for generation runs.
Why this exists
---------------
Two recurring user complaints justified adding a server-side guard:
1. *"Sometimes 2 processes run, one is duplicate of another."* — the React
UI already debounces the Submit button, but a hard refresh, a stuck
SSE stream that hasn't been GC'd by the browser, or a script firing
``/generate-sse`` from two tabs all spawn a parallel run. Two parallel
runs share the Playwright browser pool and the AI quota, double the
wall-clock time, and produce overlapping batch folders.
2. *"It is too slow — sometimes 10–15 min."* — once a stuck run is in
flight, every retry submission piles on top of it because nothing on
the backend serializes them. Fail-fast with HTTP 409 lets the UI tell
the user "another run is already in progress" instead of silently
queuing more load.
Design
------
* **Single-flight:** at most one generation runs at a time per process.
``begin_run()`` reserves the slot atomically; ``end_run()`` (idempotent)
releases it.
* **Dedup window:** if the same fingerprint (route + body hash) arrives
within ``DEDUP_WINDOW_SECS`` of an *accepted* run starting, the second
request is rejected even if the first has completed. This catches
double-submits caused by accidental form re-POSTs.
* **Stale-slot watchdog:** if a slot is held longer than
``MAX_RUN_SECS`` (default 30 min) it is auto-released the next time
``begin_run()`` is called. This is a safety net for runs that never reach
``end_run`` (e.g. a worker thread that is killed or frozen in a debugger
before its ``finally`` block runs). It is **not** a soft cap — the run
continues; it just stops blocking new submissions.
"""
from __future__ import annotations
import hashlib
import os
import threading
import time
from dataclasses import dataclass
from typing import Optional
# Tunables in seconds, overridable through the environment (parsed as float).
# Window during which an identical (route, payload) resubmission is refused.
DEDUP_WINDOW_SECS = float(os.getenv("RUN_DEDUP_WINDOW_SECS", "5"))
# Age after which begin_run treats a held slot as abandoned and frees it.
MAX_RUN_SECS = float(os.getenv("RUN_MAX_SECS", str(30 * 60)))
@dataclass
class _ActiveRun:
    """Bookkeeping record for the run that currently owns the slot."""

    # Caller-supplied id; end_run only frees the slot when given this value.
    operation_id: str
    # fingerprint(route, payload) of the request that was accepted.
    fingerprint: str
    # Timestamp recorded by begin_run on acceptance; the stale-slot watchdog
    # compares its age against MAX_RUN_SECS.
    started_at: float
class RunRejected(Exception):
    """Signals that ``begin_run`` refused to start a new run.

    The HTTP layer maps this to a 409 Conflict. ``reason`` tells the client
    which UX to show:

    * ``"in_flight"`` — another run holds the slot; wait for it.
    * ``"duplicate"`` — an identical request was just accepted; resubmit
      later only if that was intentional.

    ``operation_id`` names the blocking run when one is known, else ``None``.
    """

    def __init__(self, reason: str, message: str, operation_id: Optional[str] = None):
        # Exception.__init__ does not touch these attributes, so assigning
        # them before delegating is equivalent to assigning them after.
        self.reason, self.message, self.operation_id = reason, message, operation_id
        super().__init__(message)
# Serialises every read/write of the two globals below (begin_run, end_run
# and current_run all take it; the *_locked helpers expect it held).
_lock = threading.Lock()
# Record of the run owning the single-flight slot, or None when it is free.
_active: Optional[_ActiveRun] = None
# fingerprint -> timestamp (from the same clock begin_run uses) at which the
# matching run was accepted. _gc_locked prunes entries older than
# DEDUP_WINDOW_SECS on every begin_run call, so the map only ever holds
# fingerprints that are still inside the dedup window.
_recent: dict[str, float] = dict()
def _gc_locked(now: float) -> None:
    """Forget dedup fingerprints recorded before the current window.

    ``now`` must come from the same clock as the stored timestamps.
    The caller must already hold ``_lock``.
    """
    oldest_allowed = now - DEDUP_WINDOW_SECS
    # Iterate over a snapshot so entries can be removed while looping.
    for key, stamp in list(_recent.items()):
        if stamp < oldest_allowed:
            _recent.pop(key, None)
def _force_release_if_stale_locked(now: float) -> None:
    """Free the slot if its holder has kept it for more than ``MAX_RUN_SECS``.

    Only the slot is cleared; the abandoned run itself is not stopped.
    The caller must already hold ``_lock``.
    """
    global _active
    holder = _active
    if holder is None:
        return
    held_for = now - holder.started_at
    if held_for > MAX_RUN_SECS:
        _active = None
def _canonical_repr(obj: object) -> str:
    """Return a ``repr``-like string that ignores dict/set ordering.

    Dict items and set/frozenset members are sorted by their canonical text,
    so payloads that compare equal but were built in a different insertion
    order produce the same string. List and tuple elements keep their order,
    because order is significant there. Any other object uses its own
    ``repr`` unchanged.
    """
    if isinstance(obj, dict):
        items = sorted(
            f"{_canonical_repr(key)}: {_canonical_repr(value)}"
            for key, value in obj.items()
        )
        return "{" + ", ".join(items) + "}"
    if isinstance(obj, list):
        return "[" + ", ".join(_canonical_repr(item) for item in obj) + "]"
    if isinstance(obj, tuple):
        return "(" + ", ".join(_canonical_repr(item) for item in obj) + ")"
    if isinstance(obj, (set, frozenset)):
        members = sorted(_canonical_repr(item) for item in obj)
        # Keep the type name so that an empty set can never collide with "{}".
        return type(obj).__name__ + "({" + ", ".join(members) + "})"
    return repr(obj)


def fingerprint(route: str, payload: object) -> str:
    """Stable hash of ``(route, payload)``, used for dedup-window matching.

    The payload is rendered with ``_canonical_repr``. A plain ``repr`` of a
    dict depends on key insertion order, so the same request body with keys
    in a different order would slip past the dedup window. Payloads that are
    not dicts, lists, tuples or sets hash exactly as ``repr(payload)`` did.

    Returns:
        The first 32 hex characters of the SHA-256 digest.
    """
    try:
        body = _canonical_repr(payload)
    except RecursionError:
        # Self-referential (cyclic) or extremely deep payload: fall back to
        # the built-in repr, which renders cycles as "{...}" / "[...]".
        body = repr(payload)
    # errors="replace" keeps lone surrogates from raising during encoding.
    raw = f"{route}|{body}".encode("utf-8", errors="replace")
    return hashlib.sha256(raw).hexdigest()[:32]
def begin_run(operation_id: str, route: str, payload: object) -> _ActiveRun:
    """Reserve the single-flight slot for a new generation run.

    Args:
        operation_id: Identifier of the run being started. ``end_run`` only
            releases the slot when given this same id.
        route: Route the request arrived on; part of the dedup fingerprint.
        payload: Request payload; hashed together with ``route`` via
            ``fingerprint`` for duplicate detection.

    Returns:
        The ``_ActiveRun`` record that now holds the slot. Its ``started_at``
        is a ``time.monotonic()`` reading: use it for elapsed-time math
        inside this process, not as a calendar timestamp.

    Raises:
        RunRejected: ``reason="in_flight"`` when another run already holds
            the slot (``operation_id`` names the holder), or
            ``reason="duplicate"`` when the same fingerprint was accepted
            less than ``DEDUP_WINDOW_SECS`` ago.
    """
    global _active
    # Hashing only reads the arguments, so it is done outside the lock.
    fp = fingerprint(route, payload)
    # The timestamps stored in _active/_recent come from here and are only
    # used for elapsed-time comparisons. A monotonic clock never steps
    # backwards. With time.time(), an NTP/manual backwards jump made
    # ``now - recent_ts`` negative: identical resubmissions were refused as
    # "duplicate", and the stale-slot watchdog was delayed, until the wall
    # clock caught up.
    now = time.monotonic()
    with _lock:
        # Housekeeping first, so expired fingerprints and abandoned slots
        # never block a legitimate request.
        _gc_locked(now)
        _force_release_if_stale_locked(now)
        if _active is not None:
            raise RunRejected(
                reason="in_flight",
                message=(
                    f"Another generation is already running "
                    f"(operation_id={_active.operation_id}). "
                    "Cancel it first or wait for it to finish."
                ),
                operation_id=_active.operation_id,
            )
        recent_ts = _recent.get(fp)
        if recent_ts is not None and (now - recent_ts) < DEDUP_WINDOW_SECS:
            raise RunRejected(
                reason="duplicate",
                message=(
                    "Duplicate request — an identical submission was just "
                    f"accepted {now - recent_ts:.1f}s ago. Wait a moment "
                    "and try again if this was intentional."
                ),
            )
        run = _ActiveRun(operation_id=operation_id, fingerprint=fp, started_at=now)
        _active = run
        # Recorded only for accepted runs; rejected requests never extend
        # the dedup window.
        _recent[fp] = now
        return run
def end_run(operation_id: str) -> None:
    """Give up the single-flight slot held by ``operation_id``.

    Safe to call more than once, e.g. from a ``finally`` block. It does
    nothing when the slot is already free, or when the slot now belongs to a
    different run (for instance after the stale-slot watchdog freed it and a
    newer submission took it).
    """
    global _active
    with _lock:
        holder = _active
        if holder is None or holder.operation_id != operation_id:
            return
        _active = None
def current_run() -> Optional[str]:
    """Report which run owns the slot.

    Returns:
        The ``operation_id`` of the in-flight run, or ``None`` if the slot
        is free.
    """
    with _lock:
        holder = _active
        if holder is None:
            return None
        return holder.operation_id