from __future__ import annotations import argparse import os from datetime import datetime from pathlib import Path from typing import Any, Dict, Iterable, List, Optional PROJECT_ROOT = Path(__file__).resolve().parent.parent from flask import Flask, abort, jsonify, send_from_directory, url_for, request from openspace.recording.action_recorder import analyze_agent_actions, load_agent_actions from openspace.recording.utils import load_recording_session from openspace.skill_engine import SkillStore from openspace.skill_engine.types import SkillRecord API_PREFIX = "/api/v1" FRONTEND_DIST_DIR = PROJECT_ROOT / "frontend" / "dist" WORKFLOW_ROOTS = [ PROJECT_ROOT / "logs" / "recordings", PROJECT_ROOT / "logs" / "trajectories", PROJECT_ROOT / "gdpval_bench" / "results", ] PIPELINE_STAGES = [ { "id": "initialize", "title": "Initialize", "description": "Load LLM, grounding backends, recording, registry, analyzer, and evolver.", }, { "id": "select-skills", "title": "Skill Selection", "description": "Select candidate skills and write selection metadata before execution.", }, { "id": "phase-1-skill", "title": "Skill Phase", "description": "Run the task with injected skill context whenever matching skills exist.", }, { "id": "phase-2-fallback", "title": "Tool Fallback", "description": "Fallback to tool-only execution when the skill-guided phase fails or no skills match.", }, { "id": "analysis", "title": "Execution Analysis", "description": "Persist metadata, trajectory, and post-run execution judgments.", }, { "id": "evolution", "title": "Skill Evolution", "description": "Trigger fix / derived / captured evolution and periodic quality checks.", }, ] _STORE: SkillStore | None = None def create_app() -> Flask: app = Flask(__name__, static_folder=None) @app.before_request def check_api_key(): # Allow preflight requests (CORS) if request.method == "OPTIONS": return expected_key = os.environ.get("OPENSPACE_API_KEY") if expected_key: auth_header = request.headers.get("Authorization") x_api_key = request.headers.get("X-API-Key") is_valid_auth = auth_header and auth_header == f"Bearer {expected_key}" is_valid_x = x_api_key and x_api_key == expected_key if not (is_valid_auth or is_valid_x): abort(401, description="Unauthorized: Invalid or missing API Key") @app.route(f"{API_PREFIX}/health", methods=["GET"]) def health() -> Any: workflows = _discover_workflow_dirs() store = _get_store() return jsonify( { "status": "ok", "project_root": str(PROJECT_ROOT), "db_path": str(store.db_path), "db_exists": store.db_path.exists(), "frontend_dist_exists": FRONTEND_DIST_DIR.exists(), "workflow_roots": [str(path) for path in WORKFLOW_ROOTS], "workflow_count": len(workflows), } ) @app.route(f"{API_PREFIX}/overview", methods=["GET"]) def overview() -> Any: store = _get_store() skills = list(store.load_all(active_only=False).values()) workflows = [_build_workflow_summary(path) for path in _discover_workflow_dirs()] top_skills = _sort_skills(skills, sort_key="score")[:5] recent_skills = _sort_skills(skills, sort_key="updated")[:5] average_score = round( sum(_skill_score(record) for record in skills) / len(skills), 1 ) if skills else 0.0 average_workflow_success = round( (sum((item.get("success_rate") or 0.0) for item in workflows) / len(workflows)) * 100, 1, ) if workflows else 0.0 return jsonify( { "health": { "status": "ok", "db_path": str(store.db_path), "workflow_count": len(workflows), "frontend_dist_exists": FRONTEND_DIST_DIR.exists(), }, "pipeline": PIPELINE_STAGES, "skills": { "summary": _build_skill_stats(store, skills), "average_score": average_score, "top": [_serialize_skill(item) for item in top_skills], "recent": [_serialize_skill(item) for item in recent_skills], }, "workflows": { "total": len(workflows), "average_success_rate": average_workflow_success, "recent": workflows[:5], }, } ) @app.route(f"{API_PREFIX}/skills/me", methods=["GET"]) def my_skills() -> Any: store = _get_store() # 这里应该获取 user ID。因为前端传了 API KEY,如果没有绑定到具体用户,这里返回全部 skills = list(store.load_all(active_only=True).values()) limit = _int_arg("limit", 100) sort_key = (_str_arg("sort", "updated") or "updated").lower() items = [_serialize_skill(item) for item in _sort_skills(skills, sort_key=sort_key)[:limit]] return jsonify({"items": items, "count": len(items)}) @app.route(f"{API_PREFIX}/skills", methods=["GET"]) def list_skills() -> Any: store = _get_store() active_only = _bool_arg("active_only", True) limit = _int_arg("limit", 100) sort_key = (_str_arg("sort", "score") or "score").lower() skills = list(store.load_all(active_only=active_only).values()) query = (_str_arg("query", "") or "").strip().lower() if query: skills = [ record for record in skills if query in record.name.lower() or query in record.skill_id.lower() or query in record.description.lower() or any(query in tag.lower() for tag in record.tags) ] items = [_serialize_skill(item) for item in _sort_skills(skills, sort_key=sort_key)[:limit]] return jsonify({"items": items, "count": len(items), "active_only": active_only}) @app.route(f"{API_PREFIX}/skills/stats", methods=["GET"]) def skill_stats() -> Any: store = _get_store() skills = list(store.load_all(active_only=False).values()) return jsonify(_build_skill_stats(store, skills)) @app.route(f"{API_PREFIX}/skills/", methods=["GET"]) def skill_detail(skill_id: str) -> Any: store = _get_store() record = store.load_record(skill_id) if not record: abort(404, description=f"Unknown skill_id: {skill_id}") detail = _serialize_skill(record, include_recent_analyses=True) detail["lineage_graph"] = _build_lineage_payload(skill_id, store) detail["recent_analyses"] = [analysis.to_dict() for analysis in store.load_analyses(skill_id=skill_id, limit=10)] detail["source"] = _load_skill_source(record) return jsonify(detail) @app.route(f"{API_PREFIX}/skills//lineage", methods=["GET"]) def skill_lineage(skill_id: str) -> Any: store = _get_store() if not store.load_record(skill_id): abort(404, description=f"Unknown skill_id: {skill_id}") return jsonify(_build_lineage_payload(skill_id, store)) @app.route(f"{API_PREFIX}/skills//source", methods=["GET"]) def skill_source(skill_id: str) -> Any: store = _get_store() record = store.load_record(skill_id) if not record: abort(404, description=f"Unknown skill_id: {skill_id}") return jsonify(_load_skill_source(record)) @app.route(f"{API_PREFIX}/skills/", methods=["PUT"]) def update_skill(skill_id: str) -> Any: store = _get_store() record = store.load_record(skill_id) if not record: abort(404, description=f"Unknown skill_id: {skill_id}") data = request.json if not data: abort(400, description="Invalid JSON payload") # 鉴权:只有 created_by 和 API Key 对应的用户能修改,这里我们先允许有正确 API Key 的人修改 # 如果系统里没有存 user ID,我们可以暂时跳过 created_by 校验,因为之前 global 的 API key 已经验证过了。 from openspace.skill_engine.types import SkillVisibility try: if "visibility" in data: vis_val = data["visibility"].upper() if vis_val == "GROUP_ONLY": vis_val = "PRIVATE" record.visibility = SkillVisibility[vis_val] except KeyError: pass if "tags" in data and isinstance(data["tags"], list): record.tags = set(data["tags"]) if "description" in data: record.description = data["description"] # 注意:这里需要持久化 import asyncio loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(store.save_record(record)) finally: loop.close() return jsonify({"status": "success", "skill_id": skill_id}) @app.route(f"{API_PREFIX}/skills/", methods=["DELETE"]) def delete_skill(skill_id: str) -> Any: store = _get_store() record = store.load_record(skill_id) if not record: abort(404, description=f"Unknown skill_id: {skill_id}") # 删除文件 import shutil skill_dir = Path(record.path) if skill_dir.exists(): shutil.rmtree(skill_dir, ignore_errors=True) # 从数据库删除 import asyncio loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: # 假设 store 提供 delete_record 方法,如果没有我们可以把 is_active 置为 False if hasattr(store, 'delete_record'): loop.run_until_complete(store.delete_record(skill_id)) else: record.is_active = False loop.run_until_complete(store.save_record(record)) finally: loop.close() return jsonify({"status": "success", "skill_id": skill_id, "deleted": True}) @app.route(f"{API_PREFIX}/artifacts/stage", methods=["POST"]) def stage_artifact() -> Any: if 'files' not in request.files: abort(400, description="No files part in the request") files = request.files.getlist('files') if not files: abort(400, description="No files selected for uploading") import uuid artifact_id = f"art_{uuid.uuid4().hex[:8]}" stage_dir = PROJECT_ROOT / "skills" / artifact_id stage_dir.mkdir(parents=True, exist_ok=True) saved_count = 0 for file in files: if file.filename: # Need to handle nested paths like sub/file.txt # request.files filename could contain slashes if using webkitRelativePath # But typically multipart form-data filename is just the basename # However client.py sets: filename="{rel_path}" safe_path = (stage_dir / file.filename).resolve() if stage_dir not in safe_path.parents and safe_path != stage_dir: continue safe_path.parent.mkdir(parents=True, exist_ok=True) file.save(str(safe_path)) saved_count += 1 return jsonify({ "artifact_id": artifact_id, "stats": {"file_count": saved_count} }) @app.route(f"{API_PREFIX}/records", methods=["POST"]) def create_record() -> Any: data = request.json if not data: abort(400, description="Invalid JSON payload") artifact_id = data.get("artifact_id") if not artifact_id: abort(400, description="Missing artifact_id") stage_dir = PROJECT_ROOT / "skills" / artifact_id if not stage_dir.exists(): abort(404, description="Artifact not found") from openspace.skill_engine.skill_utils import parse_frontmatter skill_file = stage_dir / "SKILL.md" if not skill_file.exists(): abort(400, description="Missing SKILL.md") try: content = skill_file.read_text(encoding="utf-8") fm = parse_frontmatter(content) skill_name = fm.get("name", artifact_id) except Exception: skill_name = artifact_id final_dir = PROJECT_ROOT / "skills" / skill_name # If final_dir exists, we might need to overwrite or abort if final_dir.exists() and final_dir != stage_dir: import shutil shutil.rmtree(final_dir, ignore_errors=True) try: stage_dir.rename(final_dir) except OSError: import shutil shutil.copytree(stage_dir, final_dir, dirs_exist_ok=True) shutil.rmtree(stage_dir, ignore_errors=True) # Register the skill using SkillRegistry from openspace.skill_engine import SkillRegistry registry = SkillRegistry([PROJECT_ROOT / "skills"]) meta = registry.register_skill_dir(final_dir) if not meta: abort(400, description="Failed to register skill (missing SKILL.md or invalid)") store = _get_store() from openspace.skill_engine.types import SkillRecord, SkillLineage, SkillOrigin, SkillVisibility from openspace.skill_engine.patch import collect_skill_snapshot, compute_unified_diff try: origin_val = data.get("origin", "imported").upper() origin = SkillOrigin[origin_val] except KeyError: origin = SkillOrigin.IMPORTED try: vis_val = data.get("visibility", "public").upper() if vis_val == "GROUP_ONLY": vis_val = "PRIVATE" visibility = SkillVisibility[vis_val] except KeyError: visibility = SkillVisibility.PUBLIC snapshot = {} content_diff = "" try: snapshot = collect_skill_snapshot(final_dir) content_diff = "\n".join( compute_unified_diff("", text, filename=name) for name, text in sorted(snapshot.items()) if compute_unified_diff("", text, filename=name) ) except Exception: pass record = SkillRecord( skill_id=meta.skill_id, name=meta.name, description=meta.description, path=str(meta.path), is_active=True, visibility=visibility, tags=set(data.get("tags", [])), lineage=SkillLineage( origin=origin, generation=0, parent_skill_ids=data.get("parent_skill_ids", []), content_snapshot=snapshot, content_diff=content_diff, ), ) # Sync to DB import asyncio loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(store.save_record(record)) finally: loop.close() return jsonify({ "status": "success", "record_id": meta.skill_id, "skill_id": meta.skill_id, "name": meta.name, "local_path": str(final_dir) }), 201 @app.route(f"{API_PREFIX}/workflows", methods=["GET"]) def list_workflows() -> Any: items = [_build_workflow_summary(path) for path in _discover_workflow_dirs()] return jsonify({"items": items, "count": len(items)}) @app.route(f"{API_PREFIX}/workflows/", methods=["GET"]) def workflow_detail(workflow_id: str) -> Any: workflow_dir = _get_workflow_dir(workflow_id) if not workflow_dir: abort(404, description=f"Unknown workflow: {workflow_id}") session = load_recording_session(str(workflow_dir)) actions = load_agent_actions(str(workflow_dir)) metadata = session.get("metadata") or {} trajectory = session.get("trajectory") or [] plans = session.get("plans") or [] decisions = session.get("decisions") or [] action_stats = analyze_agent_actions(actions) enriched_trajectory = [] for step in trajectory: step_copy = dict(step) screenshot_rel = step_copy.get("screenshot") if screenshot_rel: step_copy["screenshot_url"] = url_for( "workflow_artifact", workflow_id=workflow_id, artifact_path=screenshot_rel, ) enriched_trajectory.append(step_copy) timeline = _build_timeline(actions, enriched_trajectory) artifacts = _build_workflow_artifacts(workflow_dir, workflow_id, metadata) return jsonify( { **_build_workflow_summary(workflow_dir), "metadata": metadata, "statistics": session.get("statistics") or {}, "trajectory": enriched_trajectory, "plans": plans, "decisions": decisions, "agent_actions": actions, "agent_statistics": action_stats, "timeline": timeline, "artifacts": artifacts, } ) @app.route(f"{API_PREFIX}/workflows//artifacts/", methods=["GET"]) def workflow_artifact(workflow_id: str, artifact_path: str) -> Any: workflow_dir = _get_workflow_dir(workflow_id) if not workflow_dir: abort(404, description=f"Unknown workflow: {workflow_id}") target = (workflow_dir / artifact_path).resolve() root = workflow_dir.resolve() if root not in target.parents and target != root: abort(404) if not target.exists() or not target.is_file(): abort(404) return send_from_directory(str(target.parent), target.name) @app.route("/", defaults={"path": ""}) @app.route("/") def serve_frontend(path: str) -> Any: if path.startswith("api/"): abort(404) if FRONTEND_DIST_DIR.exists(): requested = FRONTEND_DIST_DIR / path if path else FRONTEND_DIST_DIR / "index.html" if path and requested.exists() and requested.is_file(): return send_from_directory(str(FRONTEND_DIST_DIR), path) return send_from_directory(str(FRONTEND_DIST_DIR), "index.html") return jsonify( { "message": "OpenSpace dashboard API is running.", "frontend": "Build frontend/ first or run the Vite dev server.", } ) return app def _get_store() -> SkillStore: global _STORE if _STORE is None: _STORE = SkillStore() return _STORE def _bool_arg(name: str, default: bool) -> bool: from flask import request raw = request.args.get(name) if raw is None: return default return raw.lower() not in {"0", "false", "no", "off"} def _int_arg(name: str, default: int) -> int: from flask import request raw = request.args.get(name) if raw is None: return default try: return int(raw) except ValueError: return default def _str_arg(name: str, default: str) -> str: from flask import request return request.args.get(name, default) def _skill_score(record: SkillRecord) -> float: return round(record.effective_rate * 100, 1) def _serialize_skill(record: SkillRecord, *, include_recent_analyses: bool = False) -> Dict[str, Any]: payload = record.to_dict() if not include_recent_analyses: payload.pop("recent_analyses", None) path = payload.get("path", "") lineage = payload.get("lineage") or {} payload.update( { "skill_dir": str(Path(path).parent) if path else "", "origin": lineage.get("origin", ""), "generation": lineage.get("generation", 0), "parent_skill_ids": lineage.get("parent_skill_ids", []), "applied_rate": round(record.applied_rate, 4), "completion_rate": round(record.completion_rate, 4), "effective_rate": round(record.effective_rate, 4), "fallback_rate": round(record.fallback_rate, 4), "score": _skill_score(record), } ) return payload def _naive_dt(dt: datetime) -> datetime: """Strip tzinfo so naive/aware datetimes can be compared safely.""" return dt.replace(tzinfo=None) if dt.tzinfo else dt def _sort_skills(records: Iterable[SkillRecord], *, sort_key: str) -> List[SkillRecord]: if sort_key == "updated": return sorted(records, key=lambda item: _naive_dt(item.last_updated), reverse=True) if sort_key == "name": return sorted(records, key=lambda item: item.name.lower()) return sorted( records, key=lambda item: (_skill_score(item), item.total_selections, _naive_dt(item.last_updated).timestamp()), reverse=True, ) def _build_skill_stats(store: SkillStore, skills: List[SkillRecord]) -> Dict[str, Any]: stats = store.get_stats(active_only=False) avg_score = round(sum(_skill_score(item) for item in skills) / len(skills), 1) if skills else 0.0 skills_with_recent_analysis = sum(1 for item in skills if item.recent_analyses) return { **stats, "average_score": avg_score, "skills_with_activity": sum(1 for item in skills if item.total_selections > 0), "skills_with_recent_analysis": skills_with_recent_analysis, "top_by_effective_rate": [_serialize_skill(item) for item in _sort_skills(skills, sort_key="score")[:5]], } def _load_skill_source(record: SkillRecord) -> Dict[str, Any]: skill_path = Path(record.path) if not skill_path.exists() or not skill_path.is_file(): return {"exists": False, "path": record.path, "content": None} try: return { "exists": True, "path": str(skill_path), "content": skill_path.read_text(encoding="utf-8"), } except OSError: return {"exists": False, "path": str(skill_path), "content": None} def _build_lineage_payload(skill_id: str, store: SkillStore) -> Dict[str, Any]: records = store.load_all(active_only=False) if skill_id not in records: return {"skill_id": skill_id, "nodes": [], "edges": [], "total_nodes": 0} children_by_parent: Dict[str, set[str]] = {} for item in records.values(): for parent_id in item.lineage.parent_skill_ids: children_by_parent.setdefault(parent_id, set()).add(item.skill_id) related_ids = {skill_id} frontier = [skill_id] while frontier: current = frontier.pop() record = records.get(current) if not record: continue for parent_id in record.lineage.parent_skill_ids: if parent_id not in related_ids: related_ids.add(parent_id) frontier.append(parent_id) for child_id in children_by_parent.get(current, set()): if child_id not in related_ids: related_ids.add(child_id) frontier.append(child_id) nodes = [] edges = [] for related_id in sorted(related_ids): record = records.get(related_id) if not record: continue nodes.append( { "skill_id": record.skill_id, "name": record.name, "description": record.description, "origin": record.lineage.origin.value, "generation": record.lineage.generation, "created_at": record.lineage.created_at.isoformat(), "visibility": record.visibility.value, "is_active": record.is_active, "tags": list(record.tags), "score": _skill_score(record), "effective_rate": round(record.effective_rate, 4), "total_selections": record.total_selections, } ) for parent_id in record.lineage.parent_skill_ids: if parent_id in related_ids: edges.append({"source": parent_id, "target": record.skill_id}) return { "skill_id": skill_id, "nodes": nodes, "edges": edges, "total_nodes": len(nodes), } def _discover_workflow_dirs() -> List[Path]: discovered: Dict[str, Path] = {} for root in WORKFLOW_ROOTS: if not root.exists(): continue _scan_workflow_tree(root, discovered) return sorted(discovered.values(), key=lambda item: item.stat().st_mtime, reverse=True) def _scan_workflow_tree(directory: Path, discovered: Dict[str, Path], *, _depth: int = 0, _max_depth: int = 6) -> None: if _depth > _max_depth: return try: children = list(directory.iterdir()) except OSError: return for child in children: if not child.is_dir(): continue if (child / "metadata.json").exists() or (child / "traj.jsonl").exists(): discovered.setdefault(child.name, child) else: _scan_workflow_tree(child, discovered, _depth=_depth + 1, _max_depth=_max_depth) def _get_workflow_dir(workflow_id: str) -> Optional[Path]: for path in _discover_workflow_dirs(): if path.name == workflow_id: return path return None def _build_workflow_summary(workflow_dir: Path) -> Dict[str, Any]: session = load_recording_session(str(workflow_dir)) metadata = session.get("metadata") or {} statistics = session.get("statistics") or {} actions = load_agent_actions(str(workflow_dir)) screenshots_dir = workflow_dir / "screenshots" screenshot_count = len(list(screenshots_dir.glob("*.png"))) if screenshots_dir.exists() else 0 video_candidates = [workflow_dir / "screen_recording.mp4", workflow_dir / "recording.mp4"] video_url = None for candidate in video_candidates: if candidate.exists(): rel = candidate.relative_to(workflow_dir).as_posix() video_url = url_for("workflow_artifact", workflow_id=workflow_dir.name, artifact_path=rel) break outcome = metadata.get("execution_outcome") or {} # Instruction fallback chain: top-level → retrieved_tools.instruction → skill_selection.task instruction = ( metadata.get("instruction") or (metadata.get("retrieved_tools") or {}).get("instruction") or (metadata.get("skill_selection") or {}).get("task") or "" ) # Resolve start/end times with trajectory fallback start_time = metadata.get("start_time") end_time = metadata.get("end_time") trajectory = session.get("trajectory") or [] # If end_time is missing, infer from last trajectory step if not end_time and trajectory: last_ts = trajectory[-1].get("timestamp") if last_ts: end_time = last_ts # Compute execution_time: prefer outcome, fallback to timestamp diff execution_time = outcome.get("execution_time", 0) if not execution_time and start_time and end_time: try: t0 = datetime.fromisoformat(start_time) t1 = datetime.fromisoformat(end_time) execution_time = round((t1 - t0).total_seconds(), 2) except (ValueError, TypeError): pass # Resolve status: prefer outcome, fallback heuristic status = outcome.get("status", "") if not status: total_steps = statistics.get("total_steps", 0) if total_steps > 0: status = "success" elif trajectory: status = "completed" else: status = "unknown" # Resolve iterations: prefer outcome, fallback to conversation count iterations = outcome.get("iterations", 0) if not iterations and trajectory: iterations = len(trajectory) return { "id": workflow_dir.name, "path": str(workflow_dir), "task_id": metadata.get("task_id") or metadata.get("task_name") or workflow_dir.name, "task_name": metadata.get("task_name") or metadata.get("task_id") or workflow_dir.name, "instruction": instruction, "status": status, "iterations": iterations, "execution_time": execution_time, "start_time": start_time, "end_time": end_time, "total_steps": statistics.get("total_steps", 0), "success_count": statistics.get("success_count", 0), "success_rate": statistics.get("success_rate", 0.0), "backend_counts": statistics.get("backends", {}), "tool_counts": statistics.get("tools", {}), "agent_action_count": len(actions), "has_video": bool(video_url), "video_url": video_url, "screenshot_count": screenshot_count, "selected_skills": (metadata.get("skill_selection") or {}).get("selected", []), } def _build_timeline(actions: List[Dict[str, Any]], trajectory: List[Dict[str, Any]]) -> List[Dict[str, Any]]: events: List[Dict[str, Any]] = [] for action in actions: events.append( { "timestamp": action.get("timestamp", ""), "type": "agent_action", "step": action.get("step"), "label": action.get("action_type", "agent_action"), "agent_name": action.get("agent_name", ""), "agent_type": action.get("agent_type", ""), "details": action, } ) for step in trajectory: events.append( { "timestamp": step.get("timestamp", ""), "type": "tool_execution", "step": step.get("step"), "label": step.get("tool", "tool_execution"), "backend": step.get("backend", ""), "status": (step.get("result") or {}).get("status", "unknown"), "details": step, } ) events.sort(key=lambda item: (item.get("timestamp", ""), item.get("step") or 0)) return events def _build_workflow_artifacts(workflow_dir: Path, workflow_id: str, metadata: Dict[str, Any]) -> Dict[str, Any]: screenshots: List[Dict[str, Any]] = [] screenshots_dir = workflow_dir / "screenshots" if screenshots_dir.exists(): for image in sorted(screenshots_dir.glob("*.png")): rel = image.relative_to(workflow_dir).as_posix() screenshots.append( { "name": image.name, "path": rel, "url": url_for("workflow_artifact", workflow_id=workflow_id, artifact_path=rel), } ) init_screenshot = metadata.get("init_screenshot") init_screenshot_url = ( url_for("workflow_artifact", workflow_id=workflow_id, artifact_path=init_screenshot) if isinstance(init_screenshot, str) else None ) video_url = None for rel in ("screen_recording.mp4", "recording.mp4"): candidate = workflow_dir / rel if candidate.exists(): video_url = url_for("workflow_artifact", workflow_id=workflow_id, artifact_path=rel) break return { "init_screenshot_url": init_screenshot_url, "screenshots": screenshots, "video_url": video_url, } def main() -> None: parser = argparse.ArgumentParser(description="OpenSpace dashboard API server") parser.add_argument("--host", default="127.0.0.1", help="Dashboard API host") parser.add_argument("--port", type=int, default=7788, help="Dashboard API port") parser.add_argument("--debug", action="store_true", help="Enable Flask debug mode") args = parser.parse_args() app = create_app() from werkzeug.serving import run_simple run_simple( args.host, args.port, app, threaded=True, use_debugger=args.debug, use_reloader=args.debug, ) if __name__ == "__main__": main()