"""Persistent run tracking for backend-owned generation jobs.

Each run is stored as ``<RUNS_DIR>/<run_id>.json`` (the full record including
its event log). A compact summary of every run is kept in
``<RUNS_DIR>/index.json``, sorted newest first and capped at ``_INDEX_LIMIT``
entries. Read-modify-write operations are serialized through a module-level
re-entrant thread lock (this guards threads within one process only).
"""

from __future__ import annotations

import json
import os
import threading
import time
from pathlib import Path
from typing import Any

RUNS_DIR = Path("output") / "runs"
INDEX_PATH = RUNS_DIR / "index.json"
# Maximum number of run summaries retained in the index file.
_INDEX_LIMIT = 500
# Re-entrant so helpers that hold the lock (e.g. mark_interrupted_active_runs)
# can call other locking functions (finish_run) without deadlocking.
_LOCK = threading.RLock()
# Captured once at import; absolute output paths below it are stored relative.
_ROOT_DIR = Path.cwd().resolve()

_ACTIVE_STATUSES = {"queued", "running", "paused"}


def _now() -> float:
    """Return the current wall-clock time as a POSIX timestamp."""
    return time.time()


def _ensure_dir() -> None:
    """Create ``RUNS_DIR`` (and parents) if it does not exist yet."""
    RUNS_DIR.mkdir(parents=True, exist_ok=True)


def _run_path(run_id: str) -> Path:
    """Return the JSON file path for *run_id*, replacing unsafe characters with ``_``."""
    safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in str(run_id))
    return RUNS_DIR / f"{safe}.json"


def _read_json(path: Path, default: Any) -> Any:
    """Load JSON from *path*; return *default* if it is missing or unreadable."""
    if not path.exists():
        return default
    try:
        with path.open("r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # Best effort: a corrupt or unreadable file is treated as absent.
        # (JSONDecodeError and UnicodeDecodeError are ValueError subclasses.)
        return default


def _write_json(path: Path, data: Any) -> None:
    """Write *data* as JSON to *path* via a temp file and ``os.replace``.

    Readers never observe a half-written *path*. If serialization or the
    write fails, the temporary file is removed and the exception propagates.
    """
    _ensure_dir()
    tmp_path = path.with_suffix(path.suffix + ".tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, path)
    except BaseException:
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise


def _clamp_progress(progress: Any) -> int:
    """Coerce *progress* to an ``int`` percentage clamped to 0..100."""
    return max(0, min(100, int(progress)))


def _normalize_path_string(value: str) -> str:
    """Normalize a string that may be a filesystem path.

    Backslashes are converted to forward slashes. Absolute paths are resolved
    and, when they lie under ``_ROOT_DIR``, rewritten relative to it.
    Note that this is applied to every string output value, not only paths.
    """
    normalized = value.replace("\\", "/")
    try:
        path = Path(normalized)
        if path.is_absolute():
            resolved = path.resolve()
            try:
                return resolved.relative_to(_ROOT_DIR).as_posix()
            except ValueError:
                return resolved.as_posix()
    except (OSError, ValueError, RuntimeError):
        # e.g. embedded NUL bytes or a symlink loop during resolve().
        pass
    return normalized


def _normalize_output_value(value: Any) -> Any:
    """Recursively normalize path-like strings inside lists/dicts."""
    if isinstance(value, str):
        return _normalize_path_string(value)
    if isinstance(value, list):
        return [_normalize_output_value(item) for item in value]
    if isinstance(value, dict):
        return {key: _normalize_output_value(item) for key, item in value.items()}
    return value


def _normalize_outputs(outputs: dict[str, Any] | None) -> dict[str, Any]:
    """Return a normalized copy of *outputs* (``{}`` for ``None``/empty)."""
    if not outputs:
        return {}
    return {key: _normalize_output_value(value) for key, value in outputs.items()}


def _load_index() -> list[dict[str, Any]]:
    """Load the run index; return ``[]`` if it is missing or not a list."""
    index = _read_json(INDEX_PATH, [])
    return index if isinstance(index, list) else []


def _save_index(index: list[dict[str, Any]]) -> None:
    """Persist the first ``_INDEX_LIMIT`` entries of *index*."""
    _write_json(INDEX_PATH, index[:_INDEX_LIMIT])


def _summarize(run: dict[str, Any]) -> dict[str, Any]:
    """Build the compact index summary for a full run record."""
    return {
        "run_id": run.get("run_id"),
        "operation_id": run.get("operation_id"),
        "tool": run.get("tool"),
        "title": run.get("title"),
        "status": run.get("status"),
        "progress": run.get("progress", 0),
        "stage": run.get("stage"),
        "message": run.get("message"),
        "model_choice": run.get("model_choice"),
        "input_preview": run.get("input_preview"),
        "input_length": run.get("input_length"),
        "input_fingerprint": run.get("input_fingerprint"),
        "created_at": run.get("created_at") or run.get("started_at"),
        "queued_at": run.get("queued_at"),
        "started_at": run.get("started_at"),
        "updated_at": run.get("updated_at"),
        "completed_at": run.get("completed_at"),
        "duration_seconds": run.get("duration_seconds"),
        "queued_seconds": run.get("queued_seconds"),
        "queue_position": run.get("queue_position"),
        "outputs": _normalize_outputs(run.get("outputs", {})),
        "metrics": run.get("metrics", {}),
    }


def _upsert_index_summary(run: dict[str, Any]) -> None:
    """Replace (or insert) *run*'s summary in the index, keeping it newest first."""
    summary = _summarize(run)
    index = [item for item in _load_index() if item.get("run_id") != run.get("run_id")]
    index.insert(0, summary)
    index.sort(
        key=lambda item: item.get("created_at") or item.get("started_at") or item.get("queued_at") or 0,
        reverse=True,
    )
    _save_index(index)


def _generate_run_id(created_at: float) -> str:
    """Return a ``run_<epoch-ms>`` id that does not clash with an existing run file.

    Two runs created within the same clock tick would otherwise share an id
    and the second would overwrite the first run's file.
    """
    base = f"run_{int(created_at * 1000)}"
    candidate = base
    suffix = 1
    while _run_path(candidate).exists():
        candidate = f"{base}_{suffix}"
        suffix += 1
    return candidate


def create_run(
    *,
    tool: str,
    title: str,
    input_text: str,
    settings: dict[str, Any] | None = None,
    model_choice: str | None = None,
    operation_id: str | None = None,
    run_id: str | None = None,
    status: str = "running",
    input_fingerprint: str | None = None,
) -> dict[str, Any]:
    """Create, persist and index a new run record, then return it.

    The id is *run_id*, else *operation_id*, else a freshly generated
    ``run_<epoch-ms>`` id. An explicitly supplied id overwrites any existing
    run with the same id. ``queued_at``/``started_at`` are set to the creation
    time when *status* is ``"queued"``/``"running"`` respectively.
    """
    with _LOCK:
        created_at = _now()
        run_id = run_id or operation_id or _generate_run_id(created_at)
        run = {
            "run_id": run_id,
            "operation_id": operation_id,
            "tool": tool,
            "title": title,
            "status": status,
            "progress": 0,
            "stage": "queued" if status == "queued" else "created",
            "message": "Process queued" if status == "queued" else "Process created",
            "model_choice": model_choice,
            "input": input_text,
            "input_preview": input_text[:240],
            "input_length": len(input_text),
            "input_fingerprint": input_fingerprint,
            "settings": settings or {},
            "outputs": {},
            "metrics": {},
            "events": [],
            "created_at": created_at,
            "queued_at": created_at if status == "queued" else None,
            "started_at": created_at if status == "running" else None,
            "updated_at": created_at,
            "completed_at": None,
            "duration_seconds": None,
            "queued_seconds": None,
            "queue_position": None,
        }
        run["events"].append({
            "time": created_at,
            "type": "created",
            "stage": run["stage"],
            "progress": 0,
            "message": run["message"],
        })
        _write_json(_run_path(run_id), run)
        _upsert_index_summary(run)
        return run


def update_run(
    run_id: str,
    *,
    status: str | None = None,
    stage: str | None = None,
    message: str | None = None,
    progress: int | None = None,
    settings: dict[str, Any] | None = None,
    outputs: dict[str, Any] | None = None,
    metrics: dict[str, Any] | None = None,
    queue_position: int | None = None,
) -> dict[str, Any] | None:
    """Apply the given field updates to a run; return the updated record.

    Returns ``None`` if the run file does not exist or is unreadable.
    *settings*, *outputs* and *metrics* are merged into the existing dicts.
    Any non-``"queued"`` status clears ``queue_position``. Transitioning to
    ``"running"`` records ``started_at`` (once) and ``queued_seconds``.
    """
    with _LOCK:
        run = _read_json(_run_path(run_id), None)
        if not isinstance(run, dict):
            return None
        ts = _now()
        if status is not None:
            run["status"] = status
        if stage is not None:
            run["stage"] = stage
        if message is not None:
            run["message"] = message
        if progress is not None:
            run["progress"] = _clamp_progress(progress)
        if settings:
            run.setdefault("settings", {}).update(settings)
        if outputs:
            run.setdefault("outputs", {}).update(_normalize_outputs(outputs))
        if metrics:
            run.setdefault("metrics", {}).update(metrics)
        if queue_position is not None:
            run["queue_position"] = queue_position
        if status is not None and status != "queued":
            run["queue_position"] = None
        if status == "running":
            if not run.get("started_at"):
                run["started_at"] = ts
            if run.get("queued_at"):
                try:
                    run["queued_seconds"] = round(float(run["started_at"]) - float(run["queued_at"]), 2)
                except (TypeError, ValueError):
                    pass
        run["updated_at"] = ts
        _write_json(_run_path(run_id), run)
        _upsert_index_summary(run)
        return run


def add_event(
    run_id: str,
    *,
    event_type: str,
    message: str,
    stage: str | None = None,
    progress: int | None = None,
    data: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
    """Append an event to a run's log and mirror stage/progress/message onto it.

    Returns the updated run, or ``None`` if the run does not exist.
    Progress is clamped to 0..100 both on the event and on the run.
    """
    with _LOCK:
        run = _read_json(_run_path(run_id), None)
        if not isinstance(run, dict):
            return None
        ts = _now()
        clamped = _clamp_progress(progress) if progress is not None else None
        event = {
            "time": ts,
            "type": event_type,
            "stage": stage,
            "progress": clamped,
            "message": message,
            "data": data or {},
        }
        run.setdefault("events", []).append(event)
        if stage is not None:
            run["stage"] = stage
        if clamped is not None:
            run["progress"] = clamped
        run["message"] = message
        run["updated_at"] = ts
        _write_json(_run_path(run_id), run)
        _upsert_index_summary(run)
        return run


def attach_output(run_id: str, key: str, value: Any) -> None:
    """Record a single output value on a run (no-op if the run is missing)."""
    update_run(run_id, outputs={key: value})


def update_metrics(run_id: str, values: dict[str, Any]) -> None:
    """Merge *values* into a run's metrics (no-op if the run is missing)."""
    update_run(run_id, metrics=values)


def finish_run(
    run_id: str,
    *,
    status: str,
    message: str,
    progress: int | None = 100,
    outputs: dict[str, Any] | None = None,
    metrics: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
    """Mark a run as finished with *status*; return the updated record.

    Sets ``stage`` to *status*, stamps ``completed_at``, computes
    ``duration_seconds`` from ``started_at`` when available, clears any
    ``queue_position`` and appends a terminal event. Returns ``None`` if the
    run file does not exist or is unreadable.
    """
    with _LOCK:
        run = _read_json(_run_path(run_id), None)
        if not isinstance(run, dict):
            return None
        ts = _now()
        run["status"] = status
        run["message"] = message
        run["stage"] = status
        if progress is not None:
            run["progress"] = _clamp_progress(progress)
        run["completed_at"] = ts
        run["updated_at"] = ts
        # A finished run is no longer waiting in any queue.
        run["queue_position"] = None
        if run.get("started_at"):
            try:
                run["duration_seconds"] = round(ts - float(run["started_at"]), 2)
            except (TypeError, ValueError):
                pass
        if outputs:
            run.setdefault("outputs", {}).update(_normalize_outputs(outputs))
        if metrics:
            run.setdefault("metrics", {}).update(metrics)
        run.setdefault("events", []).append({
            "time": ts,
            "type": status,
            "stage": status,
            "progress": run.get("progress"),
            "message": message,
        })
        _write_json(_run_path(run_id), run)
        _upsert_index_summary(run)
        return run


def get_run(run_id: str, include_input: bool = False) -> dict[str, Any] | None:
    """Return a shallow copy of a run record, or ``None`` if it does not exist.

    The full ``input`` text is dropped unless *include_input* is true, and
    outputs are re-normalized on the way out.
    """
    with _LOCK:
        run = _read_json(_run_path(run_id), None)
        if not isinstance(run, dict):
            return None
        run = dict(run)
        if not include_input:
            run.pop("input", None)
        run["outputs"] = _normalize_outputs(run.get("outputs"))
        return run


def list_runs(limit: int = 100) -> list[dict[str, Any]]:
    """Return copies of the first *limit* index summaries (newest first)."""
    with _LOCK:
        return [dict(item) for item in _load_index()[:limit] if isinstance(item, dict)]


def _load_recovery_helpers() -> tuple[Any, Any, Any] | None:
    """Lazily import the canonical-stem helper and output folder settings.

    ``run_manager`` lives in ``backend/src/core`` and the canonical-stem helper
    lives under ``backend/routes``; importing at module load would create a
    cycle. Adds the backend, routes and config directories to ``sys.path``.
    Returns ``(youtube_video_stem, pptx_folder, video_folder)``, or ``None``
    when the helper cannot be imported.
    """
    try:
        import sys as _sys

        backend_dir = Path(__file__).resolve().parents[2]
        for extra_dir in (backend_dir, backend_dir / "routes", backend_dir / "config"):
            if str(extra_dir) not in _sys.path:
                _sys.path.insert(0, str(extra_dir))
        from helpers import youtube_video_stem  # type: ignore
        try:
            from config import (  # type: ignore
                POWERPOINT_OUTPUT_FOLDER,
                POWERPOINT_VIDEO_FOLDER,
            )
        except Exception:
            POWERPOINT_OUTPUT_FOLDER = "output/presentations"
            POWERPOINT_VIDEO_FOLDER = "output/videos"
    except Exception:
        return None
    return youtube_video_stem, POWERPOINT_OUTPUT_FOLDER, POWERPOINT_VIDEO_FOLDER


def _run_start_time(run: dict[str, Any]) -> float:
    """Return the first non-zero of started_at/queued_at/created_at, else 0.0."""
    for key in ("started_at", "queued_at", "created_at"):
        try:
            value = float(run.get(key) or 0)
        except (TypeError, ValueError):
            value = 0.0
        if value:
            return value
    return 0.0


def _output_file_ok(rel: str, min_size: int, started_at: float) -> bool:
    """Check that an output file exists, has at least *min_size* bytes and is fresh.

    Relative paths are resolved against the current working directory.
    """
    path = Path(rel)
    if not path.is_absolute():
        path = Path.cwd() / path
    try:
        stat = path.stat()
    except OSError:
        return False
    if stat.st_size < min_size:
        return False
    # Guard against marking a run as completed because of a stale file left
    # over from a *previous* run that wrote the same canonical path. Require
    # the file to have been touched at or after this run's start (with a
    # small clock skew margin).
    if started_at and stat.st_mtime + 5 < started_at:
        return False
    return True


def _recovered_outputs_for_interrupted_run(run: dict[str, Any]) -> dict[str, Any] | None:
    """Detect outputs already on disk for a run that was interrupted at restart.

    When the backend is restarted while a generation is queued/running, the
    run's status is still queued/running in the index even though the
    workflow's MP4 / PPTX may already be fully written to disk. Such runs are
    marked ``completed`` instead of ``failed`` so the user isn't told the run
    failed when they actually have a working result.

    Returns a dict of output keys (matching the keys ``ctx.complete()`` would
    have written) when the canonical output for the run's tool exists on disk
    with a non-trivial size and a modification time after the run started.
    For video tools the canonical output is the MP4; a PPTX alone (an
    intermediate artifact) is not enough. Returns ``None`` otherwise, and the
    caller falls back to the failed "interrupted by app restart" behavior.
    """
    if not isinstance(run, dict):
        return None
    tool = str(run.get("tool") or "")
    settings = run.get("settings") or {}
    if not isinstance(settings, dict):
        settings = {}
    input_text = str(run.get("input") or "")
    output_format = str(settings.get("output_format") or "").lower()

    helpers = _load_recovery_helpers()
    if helpers is None:
        return None
    youtube_video_stem, pptx_folder, video_folder = helpers

    project_info = {
        "class_name": settings.get("class_name") or "",
        "subject": settings.get("subject") or "",
        "title": settings.get("title") or "",
        "exercise_year": settings.get("exercise_year") or "",
    }
    output_name = str(settings.get("output_name") or settings.get("title") or "")
    try:
        stem = youtube_video_stem(project_info, output_name, input_text)
    except Exception:
        return None

    started_at = _run_start_time(run)

    is_video_tool = (
        tool in {"text-to-video", "html-to-video", "screenshots-to-video", "image-to-video"}
        or output_format == "video"
    )
    is_pptx_tool = (
        tool in {"text-to-pptx", "html-to-pptx"}
        or output_format == "pptx"
    )

    pptx_rel = f"{pptx_folder}/{stem}.pptx"
    outputs: dict[str, Any] = {}
    if is_video_tool:
        video_rel = f"{video_folder}/{stem}.mp4"
        if not _output_file_ok(video_rel, 100_000, started_at):
            # Without the MP4 the run did not finish, even if the PPTX exists.
            return None
        outputs["video_file"] = video_rel
        outputs["video_path"] = video_rel
        if _output_file_ok(pptx_rel, 10_000, started_at):
            outputs["presentation_file"] = pptx_rel
            outputs["presentation_path"] = pptx_rel
    elif is_pptx_tool:
        if _output_file_ok(pptx_rel, 10_000, started_at):
            outputs["presentation_file"] = pptx_rel
            outputs["presentation_path"] = pptx_rel
    return outputs or None


def _fail_orphaned_index_entry(run_id: str, message: str) -> None:
    """Mark index summaries for *run_id* as failed when the run file is gone.

    ``finish_run`` cannot update such a run because it has nothing to load.
    Without this, the summary would stay active forever and keep matching
    ``find_active_run_by_fingerprint``.
    """
    ts = _now()
    index = _load_index()
    for item in index:
        if isinstance(item, dict) and str(item.get("run_id") or "") == run_id:
            item.update(
                status="failed",
                stage="failed",
                message=message,
                completed_at=ts,
                updated_at=ts,
                queue_position=None,
            )
    _save_index(index)


def mark_interrupted_active_runs() -> dict[str, int]:
    """Reconcile queued/running/paused runs with what's actually on disk.

    Intended to be called once on app startup. Runs whose canonical video/pptx
    output already exists on disk are marked ``completed``: the workflow ran
    to completion, but the app restarted before the run JSON reached the
    ``completed`` state. Everything else is marked ``failed`` with the legacy
    "interrupted" message. This includes index entries whose run file no
    longer exists.

    Returns ``{"interrupted": <count>, "recovered": <count>}``.
    """
    interrupted_message = "Interrupted by app restart before the queue could finish."
    with _LOCK:
        interrupted = 0
        recovered = 0
        for item in _load_index():
            if not isinstance(item, dict):
                continue
            if str(item.get("status") or "").lower() not in _ACTIVE_STATUSES:
                continue
            run_id = str(item.get("run_id") or "")
            if not run_id:
                continue
            full_run = _read_json(_run_path(run_id), None)
            recovered_outputs = (
                _recovered_outputs_for_interrupted_run(full_run)
                if isinstance(full_run, dict)
                else None
            )
            if recovered_outputs:
                finish_run(
                    run_id,
                    status="completed",
                    message="Recovered after app restart — output was already written to disk.",
                    progress=100,
                    outputs=recovered_outputs,
                )
                recovered += 1
                continue
            finished = finish_run(
                run_id,
                status="failed",
                message=interrupted_message,
                progress=item.get("progress"),
            )
            if finished is None:
                _fail_orphaned_index_entry(run_id, interrupted_message)
            interrupted += 1
        return {"interrupted": interrupted, "recovered": recovered}


def find_active_run_by_fingerprint(
    tool: str,
    input_fingerprint: str,
    statuses: set[str] | None = None,
) -> dict[str, Any] | None:
    """Return the newest index summary for *tool* with a matching fingerprint and status.

    *statuses* defaults to queued/running/paused; an empty set also means the
    default.
    """
    statuses = statuses or set(_ACTIVE_STATUSES)
    with _LOCK:
        for item in _load_index():
            if item.get("tool") != tool:
                continue
            if item.get("status") not in statuses:
                continue
            if item.get("input_fingerprint") == input_fingerprint:
                return item
    return None


def find_recent_run_by_fingerprint(
    tool: str,
    input_fingerprint: str,
    within_seconds: float,
) -> dict[str, Any] | None:
    """Return a recently-finished run with a matching fingerprint, if any.

    Used as defense-in-depth against the client dispatching the same payload
    twice in quick succession. ``find_active_run_by_fingerprint`` already
    blocks simultaneous duplicates. This catches the narrower race where the
    client fires a second identical submission within ``within_seconds`` of
    the first completing.

    Only ``completed`` runs are considered — a recently failed/cancelled run
    should always be allowed to retry.
    """
    if within_seconds <= 0:
        return None
    now = _now()
    with _LOCK:
        for item in _load_index():
            if item.get("tool") != tool:
                continue
            if item.get("status") != "completed":
                continue
            if item.get("input_fingerprint") != input_fingerprint:
                continue
            finished_at = item.get("completed_at") or item.get("updated_at") or 0
            try:
                finished_at = float(finished_at)
            except (TypeError, ValueError):
                continue
            if finished_at <= 0:
                continue
            if now - finished_at <= within_seconds:
                return item
    return None