# Source: YT-AI-Automation/backend/src/core/run_manager.py
# Synced by github-actions ("Sync Docker Space"), commit 5f3e9f5.
"""Persistent run tracking for backend-owned generation jobs."""
from __future__ import annotations
import json
import os
import threading
import time
from pathlib import Path
from typing import Any
# Directory holding one JSON record per run, relative to the working directory.
RUNS_DIR = Path("output") / "runs"
# Rolling index of run summaries (kept newest-first, capped at 500 entries).
INDEX_PATH = RUNS_DIR / "index.json"
# Re-entrant: mark_interrupted_active_runs() holds the lock while calling
# finish_run(), which acquires it again.
_LOCK = threading.RLock()
# Project root captured at import time; absolute output paths are rewritten
# relative to it so stored records stay portable.
_ROOT_DIR = Path.cwd().resolve()
def _now() -> float:
    """Return the current wall-clock time as a Unix timestamp (seconds)."""
    return time.time()
def _ensure_dir() -> None:
    """Create the runs directory (and parents) if it does not exist yet."""
    RUNS_DIR.mkdir(parents=True, exist_ok=True)
def _run_path(run_id: str) -> Path:
    """Map a run id to its JSON file, replacing unsafe characters with ``_``."""
    sanitized: list[str] = []
    for ch in str(run_id):
        if ch.isalnum() or ch in "-_":
            sanitized.append(ch)
        else:
            sanitized.append("_")
    return RUNS_DIR / ("".join(sanitized) + ".json")
def _read_json(path: Path, default: Any) -> Any:
    """Load JSON from *path*; fall back to *default* if missing or unreadable."""
    if not path.exists():
        return default
    try:
        raw = path.read_text(encoding="utf-8")
        return json.loads(raw)
    except Exception:
        # Best-effort read: corrupted/partial files degrade to the default.
        return default
def _write_json(path: Path, data: Any) -> None:
    """Atomically write *data* as JSON to *path*.

    Writes to a sibling ``*.tmp`` file first, then swaps it into place with
    ``os.replace`` so readers never observe a half-written file.

    Fix: if serialization or the rename fails, the temporary file is removed
    before re-raising — previously a stale ``.tmp`` file was leaked on disk.
    """
    _ensure_dir()
    tmp_path = path.with_suffix(path.suffix + ".tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, path)
    except Exception:
        # Don't leave a partial temp file behind; the original error matters more.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise
def _normalize_path_string(value: str) -> str:
    """Normalize slashes and rewrite absolute paths relative to the project root.

    Relative paths are returned with forward slashes only; absolute paths are
    resolved and, when under the project root, expressed relative to it.
    """
    forward = value.replace("\\", "/")
    try:
        candidate = Path(forward)
        if not candidate.is_absolute():
            return forward
        resolved = candidate.resolve()
        try:
            return resolved.relative_to(_ROOT_DIR).as_posix()
        except ValueError:
            # Outside the project root — keep the absolute form.
            return resolved.as_posix()
    except Exception:
        # Malformed path strings pass through with slashes normalized.
        return forward
def _normalize_output_value(value: Any) -> Any:
    """Recursively normalize every path-like string nested inside *value*."""
    if isinstance(value, dict):
        return {name: _normalize_output_value(child) for name, child in value.items()}
    if isinstance(value, list):
        return [_normalize_output_value(child) for child in value]
    if isinstance(value, str):
        return _normalize_path_string(value)
    return value
def _normalize_outputs(outputs: dict[str, Any] | None) -> dict[str, Any]:
    """Return a copy of *outputs* with all nested path strings normalized."""
    if not outputs:
        return {}
    normalized = {name: _normalize_output_value(val) for name, val in outputs.items()}
    return normalized
def _load_index() -> list[dict[str, Any]]:
    """Read the run index, tolerating a missing or corrupted file."""
    data = _read_json(INDEX_PATH, [])
    if isinstance(data, list):
        return data
    return []
def _save_index(index: list[dict[str, Any]]) -> None:
    """Persist at most the 500 most recent run summaries."""
    capped = index[:500]
    _write_json(INDEX_PATH, capped)
def _summarize(run: dict[str, Any]) -> dict[str, Any]:
    """Project a full run record down to the fields stored in the index.

    Key insertion order mirrors the full record so the serialized index
    stays byte-stable across rewrites.
    """
    summary: dict[str, Any] = {}
    for field in ("run_id", "operation_id", "tool", "title", "status"):
        summary[field] = run.get(field)
    summary["progress"] = run.get("progress", 0)
    for field in (
        "stage",
        "message",
        "model_choice",
        "input_preview",
        "input_length",
        "input_fingerprint",
    ):
        summary[field] = run.get(field)
    # Legacy records may predate "created_at"; fall back to the start time.
    summary["created_at"] = run.get("created_at") or run.get("started_at")
    for field in (
        "queued_at",
        "started_at",
        "updated_at",
        "completed_at",
        "duration_seconds",
        "queued_seconds",
        "queue_position",
    ):
        summary[field] = run.get(field)
    summary["outputs"] = _normalize_outputs(run.get("outputs", {}))
    summary["metrics"] = run.get("metrics", {})
    return summary
def _upsert_index_summary(run: dict[str, Any]) -> None:
    """Insert or refresh this run's summary in the index, newest first."""
    rid = run.get("run_id")
    others = [entry for entry in _load_index() if entry.get("run_id") != rid]
    entries = [_summarize(run)] + others

    def _recency(entry: dict[str, Any]) -> Any:
        return (
            entry.get("created_at")
            or entry.get("started_at")
            or entry.get("queued_at")
            or 0
        )

    entries.sort(key=_recency, reverse=True)
    _save_index(entries)
def create_run(
    *,
    tool: str,
    title: str,
    input_text: str,
    settings: dict[str, Any] | None = None,
    model_choice: str | None = None,
    operation_id: str | None = None,
    run_id: str | None = None,
    status: str = "running",
    input_fingerprint: str | None = None,
) -> dict[str, Any]:
    """Create, persist, and index a new run record.

    Parameters are keyword-only. ``status`` controls the initial stage/message
    and which of ``queued_at``/``started_at`` gets stamped. Returns the full
    run dict as written to disk.
    """
    with _LOCK:
        created_at = _now()
        # Prefer an explicit id, then the operation id, then a ms timestamp.
        run_id = run_id or operation_id or f"run_{int(created_at * 1000)}"
        run = {
            "run_id": run_id,
            "operation_id": operation_id,
            "tool": tool,
            "title": title,
            "status": status,
            "progress": 0,
            "stage": "queued" if status == "queued" else "created",
            "message": "Process queued" if status == "queued" else "Process created",
            "model_choice": model_choice,
            # Full input is stored but stripped from get_run() by default.
            "input": input_text,
            "input_preview": input_text[:240],
            "input_length": len(input_text),
            "input_fingerprint": input_fingerprint,
            "settings": settings or {},
            "outputs": {},
            "metrics": {},
            "events": [],
            "created_at": created_at,
            "queued_at": created_at if status == "queued" else None,
            "started_at": created_at if status == "running" else None,
            "updated_at": created_at,
            "completed_at": None,
            "duration_seconds": None,
            "queued_seconds": None,
            "queue_position": None,
        }
        run["events"].append({
            "time": created_at,
            "type": "created",
            "stage": run["stage"],
            "progress": 0,
            "message": run["message"],
        })
        _write_json(_run_path(run_id), run)
        _upsert_index_summary(run)
        return run
def update_run(
    run_id: str,
    *,
    status: str | None = None,
    stage: str | None = None,
    message: str | None = None,
    progress: int | None = None,
    settings: dict[str, Any] | None = None,
    outputs: dict[str, Any] | None = None,
    metrics: dict[str, Any] | None = None,
    queue_position: int | None = None,
) -> dict[str, Any] | None:
    """Apply a partial update to a persisted run and re-index it.

    Only the keyword arguments that are provided are applied; ``settings``,
    ``outputs`` and ``metrics`` are merged (dict.update), not replaced.
    Returns the updated record, or ``None`` if the run file is missing/invalid.
    """
    with _LOCK:
        run = _read_json(_run_path(run_id), None)
        if not isinstance(run, dict):
            return None
        ts = _now()
        if status is not None:
            run["status"] = status
        if stage is not None:
            run["stage"] = stage
        if message is not None:
            run["message"] = message
        if progress is not None:
            # Clamp to the 0-100 range.
            run["progress"] = max(0, min(100, int(progress)))
        if settings:
            run.setdefault("settings", {}).update(settings)
        if outputs:
            run.setdefault("outputs", {}).update(_normalize_outputs(outputs))
        if metrics:
            run.setdefault("metrics", {}).update(metrics)
        if queue_position is not None:
            run["queue_position"] = queue_position
        # Leaving the queue invalidates any queue position (even one passed
        # in this same call).
        if status is not None and status != "queued":
            run["queue_position"] = None
        if status == "running":
            # First transition to running: stamp started_at and, if the run
            # was queued, record how long it waited.
            if not run.get("started_at"):
                run["started_at"] = ts
                if run.get("queued_at"):
                    try:
                        run["queued_seconds"] = round(float(run["started_at"]) - float(run["queued_at"]), 2)
                    except Exception:
                        pass
        run["updated_at"] = ts
        _write_json(_run_path(run_id), run)
        _upsert_index_summary(run)
        return run
def add_event(
    run_id: str,
    *,
    event_type: str,
    message: str,
    stage: str | None = None,
    progress: int | None = None,
    data: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
    """Append a timeline event to a run and mirror it onto the live fields.

    The event is recorded verbatim; ``stage``/``progress`` (when given) and
    ``message`` also update the run's current state. Returns the updated
    record, or ``None`` if the run file is missing or invalid.
    """
    with _LOCK:
        record = _read_json(_run_path(run_id), None)
        if not isinstance(record, dict):
            return None
        stamp = _now()
        entry = {
            "time": stamp,
            "type": event_type,
            "stage": stage,
            "progress": progress,
            "message": message,
            "data": data or {},
        }
        record.setdefault("events", []).append(entry)
        if stage is not None:
            record["stage"] = stage
        if progress is not None:
            clamped = int(progress)
            record["progress"] = min(100, max(0, clamped))
        record["message"] = message
        record["updated_at"] = stamp
        _write_json(_run_path(run_id), record)
        _upsert_index_summary(record)
        return record
def attach_output(run_id: str, key: str, value: Any) -> None:
    """Record a single output artifact on the run (merged into ``outputs``)."""
    update_run(run_id, outputs={key: value})
def update_metrics(run_id: str, values: dict[str, Any]) -> None:
    """Merge *values* into the run's ``metrics`` dict."""
    update_run(run_id, metrics=values)
def finish_run(
    run_id: str,
    *,
    status: str,
    message: str,
    progress: int | None = 100,
    outputs: dict[str, Any] | None = None,
    metrics: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
    """Move a run to a terminal state and append the terminal event.

    Sets ``status``/``stage``/``message``, stamps ``completed_at``, computes
    ``duration_seconds`` when a start time exists, and merges any final
    outputs/metrics. Returns the updated record, or ``None`` if the run file
    is missing or invalid.
    """
    with _LOCK:
        run = _read_json(_run_path(run_id), None)
        if not isinstance(run, dict):
            return None
        ts = _now()
        run["status"] = status
        run["message"] = message
        # Terminal stage mirrors the terminal status (e.g. "completed", "failed").
        run["stage"] = status
        if progress is not None:
            run["progress"] = max(0, min(100, int(progress)))
        run["completed_at"] = ts
        run["updated_at"] = ts
        if run.get("started_at"):
            try:
                run["duration_seconds"] = round(ts - float(run["started_at"]), 2)
            except Exception:
                pass
        if outputs:
            run.setdefault("outputs", {}).update(_normalize_outputs(outputs))
        if metrics:
            run.setdefault("metrics", {}).update(metrics)
        run.setdefault("events", []).append({
            "time": ts,
            "type": status,
            "stage": status,
            "progress": run.get("progress"),
            "message": message,
        })
        _write_json(_run_path(run_id), run)
        _upsert_index_summary(run)
        return run
def get_run(run_id: str, include_input: bool = False) -> dict[str, Any] | None:
    """Return a shallow copy of the run record, or ``None`` if unknown.

    The raw ``input`` text is stripped unless *include_input* is true;
    outputs are path-normalized before returning.
    """
    with _LOCK:
        stored = _read_json(_run_path(run_id), None)
        if not isinstance(stored, dict):
            return None
        result = dict(stored)
        if not include_input:
            result.pop("input", None)
        result["outputs"] = _normalize_outputs(result.get("outputs"))
        return result
def list_runs(limit: int = 100) -> list[dict[str, Any]]:
    """Return up to *limit* newest run summaries as shallow copies."""
    with _LOCK:
        summaries: list[dict[str, Any]] = []
        for entry in _load_index()[:limit]:
            if isinstance(entry, dict):
                summaries.append(dict(entry))
        return summaries
def _recovered_outputs_for_interrupted_run(run: dict[str, Any]) -> dict[str, Any] | None:
    """Detect outputs already on disk for a run that was interrupted at restart.

    When the backend is restarted while a generation is queued/running, the
    run's status is still queued/running in the index even though the
    workflow's MP4 / PPTX may already be fully written to disk. Mark such
    runs as ``completed`` instead of ``failed`` so the user isn't told the
    run failed when they actually have a working video.

    Returns a dict of output keys (matching the keys ``ctx.complete()`` would
    have written) when the canonical output for the run's tool exists on
    disk with a non-trivial size and a modification time after the run
    started. Returns ``None`` otherwise (caller falls back to the failed
    "interrupted by app restart" behavior).
    """
    if not isinstance(run, dict):
        return None
    tool = str(run.get("tool") or "")
    settings = run.get("settings") or {}
    if not isinstance(settings, dict):
        settings = {}
    input_text = str(run.get("input") or "")
    output_format = str(settings.get("output_format") or "").lower()
    # Lazy imports — ``run_manager`` lives in ``backend/src/core`` and the
    # canonical-stem helper lives under ``backend/routes``. Importing at
    # module load would create a cycle.
    try:
        import sys as _sys
        _here = Path(__file__).resolve()
        # parents[2] is the backend/ directory (src/core -> src -> backend).
        _backend = _here.parents[2]
        for _path in (_backend, _backend / "routes", _backend / "config"):
            if str(_path) not in _sys.path:
                _sys.path.insert(0, str(_path))
        from helpers import youtube_video_stem  # type: ignore
        try:
            from config import (  # type: ignore
                POWERPOINT_OUTPUT_FOLDER,
                POWERPOINT_VIDEO_FOLDER,
            )
        except Exception:
            # Defaults used when the config module isn't importable.
            POWERPOINT_OUTPUT_FOLDER = "output/presentations"
            POWERPOINT_VIDEO_FOLDER = "output/videos"
    except Exception:
        # Without the stem helper we cannot locate outputs; treat as failed.
        return None
    # Fields youtube_video_stem uses to derive the canonical filename stem.
    project_info = {
        "class_name": settings.get("class_name") or "",
        "subject": settings.get("subject") or "",
        "title": settings.get("title") or "",
        "exercise_year": settings.get("exercise_year") or "",
    }
    output_name = str(settings.get("output_name") or settings.get("title") or "")
    try:
        stem = youtube_video_stem(project_info, output_name, input_text)
    except Exception:
        return None
    # Earliest known timestamp for this run, used for the staleness check below.
    started_at = 0.0
    for key in ("started_at", "queued_at", "created_at"):
        try:
            value = float(run.get(key) or 0)
        except (TypeError, ValueError):
            value = 0.0
        if value:
            started_at = value
            break
    def _check(rel: str, min_size: int) -> bool:
        # True when the candidate file exists, is at least min_size bytes,
        # and was written during (not before) this run.
        path = Path(rel)
        if not path.is_absolute():
            path = Path.cwd() / path
        try:
            stat = path.stat()
        except OSError:
            return False
        if stat.st_size < min_size:
            return False
        # Guard against marking a run as completed because of a stale file
        # left over from a *previous* run that wrote the same canonical
        # path. Require the file to have been touched at or after this
        # run's start (with a small clock skew margin).
        if started_at and stat.st_mtime + 5 < started_at:
            return False
        return True
    is_video_tool = (
        tool in {"text-to-video", "html-to-video", "screenshots-to-video", "image-to-video"}
        or output_format == "video"
    )
    is_pptx_tool = (
        tool in {"text-to-pptx", "html-to-pptx"}
        or output_format == "pptx"
    )
    outputs: dict[str, Any] = {}
    if is_video_tool:
        video_rel = f"{POWERPOINT_VIDEO_FOLDER}/{stem}.mp4"
        if _check(video_rel, 100_000):
            # Both keys are written for compatibility with existing consumers.
            outputs["video_file"] = video_rel
            outputs["video_path"] = video_rel
        # A video run may also have produced the intermediate deck.
        pptx_rel = f"{POWERPOINT_OUTPUT_FOLDER}/{stem}.pptx"
        if _check(pptx_rel, 10_000):
            outputs["presentation_file"] = pptx_rel
            outputs["presentation_path"] = pptx_rel
    elif is_pptx_tool:
        pptx_rel = f"{POWERPOINT_OUTPUT_FOLDER}/{stem}.pptx"
        if _check(pptx_rel, 10_000):
            outputs["presentation_file"] = pptx_rel
            outputs["presentation_path"] = pptx_rel
    return outputs or None
def mark_interrupted_active_runs() -> dict[str, int]:
    """Reconcile queued/running/paused runs with what's actually on disk.

    Called once on app startup. Runs whose canonical video/pptx output
    already exists on disk are marked ``completed`` (the workflow ran to
    completion, the user just didn't see it because the app restarted
    before the run JSON reached the ``completed`` state). Everything else
    is marked ``failed`` with the legacy "interrupted" message.

    Returns a count of runs handled each way:
    ``{"interrupted": <failed count>, "recovered": <completed count>}``.
    """
    # _LOCK is an RLock, so the nested finish_run() calls below can
    # re-acquire it safely.
    with _LOCK:
        interrupted = 0
        recovered = 0
        for item in list(_load_index()):
            if str(item.get("status") or "").lower() not in {"queued", "running", "paused"}:
                continue
            run_id = str(item.get("run_id") or "")
            if not run_id:
                continue
            # The index only carries summaries; load the full record for
            # the recovery check (it needs settings/input).
            full_run = _read_json(_run_path(run_id), None)
            recovered_outputs = (
                _recovered_outputs_for_interrupted_run(full_run)
                if isinstance(full_run, dict)
                else None
            )
            if recovered_outputs:
                finish_run(
                    run_id,
                    status="completed",
                    message="Recovered after app restart — output was already written to disk.",
                    progress=100,
                    outputs=recovered_outputs,
                )
                recovered += 1
            else:
                finish_run(
                    run_id,
                    status="failed",
                    message="Interrupted by app restart before the queue could finish.",
                    progress=item.get("progress"),
                )
                interrupted += 1
        return {"interrupted": interrupted, "recovered": recovered}
def find_active_run_by_fingerprint(
    tool: str,
    input_fingerprint: str,
    statuses: set[str] | None = None,
) -> dict[str, Any] | None:
    """Return the first indexed run for *tool* with a matching fingerprint
    in an active status (defaults to queued/running/paused), or ``None``.
    """
    active = statuses or {"queued", "running", "paused"}
    with _LOCK:
        for entry in _load_index():
            hit = (
                entry.get("tool") == tool
                and entry.get("status") in active
                and entry.get("input_fingerprint") == input_fingerprint
            )
            if hit:
                return entry
        return None
def find_recent_run_by_fingerprint(
    tool: str,
    input_fingerprint: str,
    within_seconds: float,
) -> dict[str, Any] | None:
    """Return a recently-finished run with a matching fingerprint, if any.

    Used as defense-in-depth against the client dispatching the same payload
    twice in quick succession. ``find_active_run_by_fingerprint`` already
    blocks simultaneous duplicates; this catches the narrower race where
    the client fires a second identical submission within ``within_seconds``
    of the first completing. Only ``completed`` runs are considered — a
    recently failed/cancelled run should always be allowed to retry.
    """
    if within_seconds <= 0:
        return None
    # Consistency fix: use the module's _now() clock helper like every other
    # function here, instead of calling time.time() directly.
    now = _now()
    with _LOCK:
        for item in _load_index():
            if item.get("tool") != tool:
                continue
            if item.get("status") != "completed":
                continue
            if item.get("input_fingerprint") != input_fingerprint:
                continue
            finished_at = item.get("completed_at") or item.get("updated_at") or 0
            try:
                finished_at = float(finished_at)
            except (TypeError, ValueError):
                # Malformed timestamp — skip this entry rather than fail.
                continue
            if finished_at <= 0:
                continue
            if now - finished_at <= within_seconds:
                return item
        return None