import { Router, type Response as ExpressResponse, type Request } from "express";
import { db, videosTable, usersTable, configTable, creditTransactionsTable } from "@workspace/db";
import { desc, eq, and, or, sql } from "drizzle-orm";
import { randomUUID } from "crypto";
import {
  getValidBearerToken,
  refreshAccessToken,
  getPoolToken,
  tryRefreshPoolAccount,
} from "./config";
import { getTurnstileToken, invalidateTurnstileToken } from "../captcha";
import { generateGuardId } from "../guardId";
import { optionalJwtAuth } from "./auth";
import { downloadAndStoreVideo, streamStoredVideo, isStorageReady } from "../lib/videoStorage";

// Express router that the video routes in this file are registered on.
const router = Router();

// Base URL of the upstream geminigen.ai API; the two endpoints below derive from it.
const GEMINIGEN_BASE = "https://api.geminigen.ai";
// Grok-3 generation endpoint (POSTed to by callGrokEndpoint).
const GROK_ENDPOINT = `${GEMINIGEN_BASE}/api/video-gen/grok-stream`;
// Veo generation endpoint (POSTed to by callVeoEndpoint).
const VEO_ENDPOINT = `${GEMINIGEN_BASE}/api/video-gen/veo`;
// Browser-like User-Agent header value sent with the upstream requests made in this file.
const USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36";

// ── In-memory task store ─────────────────────────────────────────────────────
// Tasks live here between POST /generate (which creates them) and the
// SSE /progress/:taskId endpoint which a client subscribes to.
/** Lifecycle state of a background generation task. */
type TaskStatus = "pending" | "complete" | "failed";

/** One event delivered to the listeners of a task. */
interface ProgressEvent {
  type: "start" | "progress" | "complete" | "error";
  message?: string;
  /** Completion percentage when known; `null` when the runner cannot tell. */
  progress?: number | null;
  status?: number;
  uuid?: string;
  video?: unknown;
  errorCode?: string;
}

interface Task {
  status: TaskStatus;
  createdAt: number;
  buffered: ProgressEvent[]; // events buffered before client connects
  clients: Set<(e: ProgressEvent) => void>; // active SSE listeners
}

/** All known tasks, keyed by the id returned from `createTask`. */
const tasks = new Map<string, Task>();

// Clean up tasks older than 30 min. The timer is unref'd so that merely
// importing this module (e.g. for `parseVideoOptions`) never keeps a process alive.
setInterval(() => {
  const cutoff = Date.now() - 30 * 60 * 1000;
  for (const [id, t] of tasks) {
    if (t.createdAt < cutoff) tasks.delete(id);
  }
}, 5 * 60 * 1000).unref();

/** Registers a new pending task and returns its id. */
function createTask(): string {
  const id = randomUUID();
  tasks.set(id, { status: "pending", createdAt: Date.now(), buffered: [], clients: new Set() });
  return id;
}

/**
 * Delivers an intermediate event to every connected listener.
 * While the task is still pending the event is also appended to
 * `task.buffered`.
 */
function broadcast(task: Task, event: ProgressEvent): void {
  if (task.status === "pending") task.buffered.push(event);
  for (const client of task.clients) {
    try {
      client(event);
    } catch {
      /* ignore broken pipe */
    }
  }
}

/**
 * Moves a task to a terminal state, delivers the final event and detaches
 * all listeners.
 *
 * Only the first call has any effect: once a task has left "pending", later
 * calls are ignored so a late failure cannot overwrite a delivered result or
 * append a second final event.
 */
function finishTask(task: Task, status: TaskStatus, final: ProgressEvent): void {
  if (task.status !== "pending") return;
  task.status = status;
  task.buffered.push(final);
  for (const client of task.clients) {
    try {
      client(final);
    } catch {
      /* ignore broken pipe */
    }
  }
  task.clients.clear();
}

// ── Video generation options ──────────────────────────────────────────────────
export type VideoModel = "grok-3" | "veo-3-fast";

/** Normalised generation request handed to the model-specific runners. */
export interface VideoGenOptions {
  model: VideoModel;
  prompt: string;
  negativePrompt?: string;
  aspectRatio: "16:9" | "9:16" | "1:1" | "3:4" | "4:3";
  resolution: "480p" | "720p" | "1080p";
  duration: 5 | 6 | 8 | 10;
  enhancePrompt: boolean;
  refImageBase64?: string;
  refImageMime?: string;
}

// ── Model constraints ─────────────────────────────────────────────────────────
// Grok-3 limits (API confirmed)
const GROK3_MAX_DURATION = 6;
const GROK3_MAX_RESOLUTION = "720p";

// Veo 3.1 Fast limits (API confirmed)
// duration: 8s only, aspect ratios: 16:9 / 9:16 only
const VEO_VALID_ASPECT_RATIOS = new Set(["16:9", "9:16"]);
/** Veo only generates clips of this length (seconds). */
const VEO_DURATION = 8 as const;

// Grok-3 aspect ratio mapping: raw user ratio → API keyword
const GROK_ASPECT_RATIO_MAP: Record<string, string> = {
  "16:9": "landscape",
  "4:3": "landscape",
  "3:2": "3:2",
  "9:16": "portrait",
  "3:4": "portrait",
  "2:3": "2:3",
  "1:1": "square",
};

const VALID_ASPECT_RATIOS = new Set(["16:9", "9:16", "1:1", "3:4", "4:3"]);
const VALID_RESOLUTIONS = new Set(["480p", "720p", "1080p"]);
const VALID_DURATIONS = new Set([5, 6, 8, 10]);
const RESOLUTION_RANK: Record<VideoGenOptions["resolution"], number> = { "480p": 0, "720p": 1, "1080p": 2 };

/**
 * Builds a normalised {@link VideoGenOptions} from an untrusted request body.
 *
 * Unknown or missing values fall back to defaults (model "grok-3", "16:9",
 * "480p", 6 s, enhancePrompt on). Model limits are then applied:
 * Grok-3 is clamped to 720p / 6 s; Veo is forced to 8 s and to 16:9 unless
 * 9:16 was requested. Reference-image fields are not set here.
 *
 * @param body   Raw request body.
 * @param prompt Already validated prompt text; copied through unchanged.
 */
export function parseVideoOptions(body: Record<string, unknown>, prompt: string): VideoGenOptions {
  const model: VideoModel = body.model === "veo-3-fast" ? "veo-3-fast" : "grok-3";

  let aspectRatio: VideoGenOptions["aspectRatio"] = VALID_ASPECT_RATIOS.has(body.aspectRatio as string)
    ? (body.aspectRatio as VideoGenOptions["aspectRatio"])
    : "16:9";
  let resolution: VideoGenOptions["resolution"] = VALID_RESOLUTIONS.has(body.resolution as string)
    ? (body.resolution as VideoGenOptions["resolution"])
    : "480p";
  let duration: VideoGenOptions["duration"] = VALID_DURATIONS.has(Number(body.duration))
    ? (Number(body.duration) as VideoGenOptions["duration"])
    : 6;

  if (model === "grok-3") {
    // Clamp resolution to max 720p
    if (RESOLUTION_RANK[resolution] > RESOLUTION_RANK[GROK3_MAX_RESOLUTION]) {
      resolution = GROK3_MAX_RESOLUTION;
    }
    // Clamp duration to max 6s
    if (duration > GROK3_MAX_DURATION) {
      duration = GROK3_MAX_DURATION as VideoGenOptions["duration"];
    }
  } else {
    // Veo: force duration to 8s and restrict to 16:9 / 9:16
    duration = VEO_DURATION;
    if (!VEO_VALID_ASPECT_RATIOS.has(aspectRatio)) {
      aspectRatio = "16:9";
    }
  }

  const rawNegative = body.negativePrompt;
  const negativePrompt =
    typeof rawNegative === "string" && rawNegative.trim() ? rawNegative.trim() : undefined;
  // Enhancement is on unless the client explicitly sends `false`.
  const enhancePrompt = body.enhancePrompt !== false;

  return { model, prompt, negativePrompt, aspectRatio, resolution, duration, enhancePrompt };
}

// ── Helper: build FormData ───────────────────────────────────────────────────
/** Decodes a base64 payload into a Blob carrying the given MIME type. */
function base64ToBlob(base64: string, mime: string): Blob {
  const binary = Buffer.from(base64, "base64");
  return new Blob([binary], { type: mime });
}

// ── Grok-3 form builder ───────────────────────────────────────────────────────
/** Builds the multipart form sent to the Grok-3 endpoint. */
function buildGrokForm(turnstileToken: string, opts: VideoGenOptions): FormData {
  const form = new FormData();
  form.append("prompt", opts.prompt);
  form.append("model", "grok-3");
  form.append("model_name", "grok-3");
  // Aspect ratio: API expects 'landscape'|'portrait'|'square'|'3:2'|'2:3'
  const apiAspectRatio = GROK_ASPECT_RATIO_MAP[opts.aspectRatio] ?? "landscape";
  form.append("aspect_ratio", apiAspectRatio);
  form.append("resolution", opts.resolution);
  form.append("duration", opts.duration.toString());
  form.append("num_result", "1");
  form.append("enhance_prompt", opts.enhancePrompt ? "true" : "false");
  form.append("turnstile_token", turnstileToken);
  if (opts.negativePrompt) form.append("negative_prompt", opts.negativePrompt);
  if (opts.refImageBase64 && opts.refImageMime) {
    // Presence of 'files' tells the API this is image-to-video.
    // Do NOT set mode='image_to_video' — the 'mode' field is a creativity level
    // (normal | custom | extremely-crazy | extremely-spicy-or-crazy).
    form.append("files", base64ToBlob(opts.refImageBase64, opts.refImageMime), "reference.jpg");
    form.append("mode", "custom");
  }
  return form;
}

// ── Veo form builder ──────────────────────────────────────────────────────────
/** Builds the multipart form sent to the Veo endpoint. */
function buildVeoForm(turnstileToken: string, opts: VideoGenOptions): FormData {
  const form = new FormData();
  form.append("prompt", opts.prompt);
  form.append("model", "veo-3-fast");
  // Veo uses raw ratio strings directly ('16:9' or '9:16')
  form.append("aspect_ratio", opts.aspectRatio);
  form.append("duration", opts.duration.toString());
  form.append("enhance_prompt", opts.enhancePrompt ? "true" : "false");
  form.append("turnstile_token", turnstileToken);
  if (opts.negativePrompt) form.append("negative_prompt", opts.negativePrompt);
  if (opts.refImageBase64 && opts.refImageMime) {
    form.append("ref_images", base64ToBlob(opts.refImageBase64, opts.refImageMime), "reference.jpg");
    form.append("mode_image", "image_to_video");
  }
  return form;
}

// ── Grok-3 SSE endpoint ───────────────────────────────────────────────────────
/** POSTs a Grok-3 generation request and returns the raw (streaming) response. */
async function callGrokEndpoint(
  bearerToken: string,
  turnstileToken: string,
  opts: VideoGenOptions,
): Promise<Response> {
  const guardId = generateGuardId("/api/video-gen/grok-stream", "post");
  return fetch(GROK_ENDPOINT, {
    method: "POST",
    headers: {
      Authorization: `Bearer ${bearerToken}`,
      "x-guard-id": guardId,
      "User-Agent": USER_AGENT,
      Accept: "text/event-stream, application/json",
    },
    body: buildGrokForm(turnstileToken, opts),
  });
}

// ── Veo POST endpoint (returns {uuid}, then poll) ─────────────────────────────
/**
 * Submits a Veo generation request.
 *
 * @returns The id under which the job can be polled.
 * @throws Error carrying extra `code`, `status` and `raw` properties when the
 *         HTTP status is not OK; a plain Error when the response has no id.
 */
async function callVeoEndpoint(
  bearerToken: string,
  turnstileToken: string,
  opts: VideoGenOptions,
): Promise<{ uuid: string }> {
  const guardId = generateGuardId("/api/video-gen/veo", "post");
  const resp = await fetch(VEO_ENDPOINT, {
    method: "POST",
    headers: {
      Authorization: `Bearer ${bearerToken}`,
      "x-guard-id": guardId,
      "User-Agent": USER_AGENT,
      Accept: "application/json",
    },
    body: buildVeoForm(turnstileToken, opts),
  });

  if (!resp.ok) {
    const raw = await resp.text().catch(() => "");
    const { code, msg } = parseErrBody(raw);
    throw Object.assign(new Error(msg || `HTTP ${resp.status}`), { code, status: resp.status, raw });
  }

  const data = await resp.json() as { uuid?: string; history_uuid?: string; task_id?: string; id?: string };
  console.log("[veo-submit] response:", JSON.stringify(data).slice(0, 300));
  // The id has been seen under several field names; take the first one present.
  const uuid = data.uuid || data.history_uuid || data.task_id || data.id;
  if (!uuid) throw new Error(`Veo API did not return a UUID. Response: ${JSON.stringify(data).slice(0, 200)}`);
  return { uuid };
}

/**
 * Issues one GET /api/history/{uuid} request.
 * Every history request goes through here so that none can hang forever:
 * the request is aborted after `timeoutMs`.
 */
async function fetchHistoryResponse(uuid: string, bearerToken: string, timeoutMs = 15_000): Promise<Response> {
  const guardId = generateGuardId("/api/history/" + uuid, "get");
  return fetch(`${GEMINIGEN_BASE}/api/history/${uuid}`, {
    headers: {
      Authorization: `Bearer ${bearerToken}`,
      "x-guard-id": guardId,
      "User-Agent": USER_AGENT,
      Accept: "application/json",
    },
    signal: AbortSignal.timeout(timeoutMs),
  });
}

// ── One-shot history fetch: get the R2 pre-signed URL after generation ────────
// geminigen.ai stores completed videos to Cloudflare R2 and returns a 7-day
// pre-signed URL in generated_video[0].video_url. This works without any
// additional auth, unlike the intermediate assets.grok.com CDN URL.
/**
 * Best-effort lookup of the stored video URL for a finished generation.
 * Never throws: returns `null` on HTTP errors, timeouts, network errors or
 * when the history entry has no video URL.
 */
async function fetchHistoryVideoUrl(
  uuid: string,
  bearerToken: string,
): Promise<{ videoUrl: string; thumbnailUrl: string | null } | null> {
  try {
    const resp = await fetchHistoryResponse(uuid, bearerToken);
    if (!resp.ok) {
      console.warn("[history-fetch] HTTP", resp.status, "for uuid", uuid);
      return null;
    }
    const data = await resp.json() as {
      status?: number;
      generated_video?: Array<{ video_url?: string; thumbnail_url?: string }>;
    } | null;
    const vid = data?.generated_video?.[0];
    const videoUrl = vid?.video_url ?? null;
    const thumbnailUrl = vid?.thumbnail_url ?? null;
    if (videoUrl) {
      console.log("[history-fetch] got R2 URL:", videoUrl.slice(0, 120));
      return { videoUrl, thumbnailUrl };
    }
    return null;
  } catch (err) {
    console.warn("[history-fetch] error:", err instanceof Error ? err.message : err);
    return null;
  }
}

// ── Veo polling: GET /api/history/{uuid} until status 2/3 ────────────────────
// Veo generation typically takes 2–8 minutes on geminigen.ai; we poll up to
// 12 minutes with a 20s interval (first check after 15s to catch fast results).
/**
 * Polls the history API until the job completes (status 2), fails (status 3)
 * or `maxWaitMs` elapses.
 *
 * Transient problems (network error, non-OK status, unparsable body) are
 * logged and the loop simply continues with the next attempt.
 *
 * @param onProgress Called once per attempt with a human-readable status line.
 * @returns The video/thumbnail URLs, or `null` on failure, timeout, or when
 *          the job reports completion without a URL (after one extra retry).
 */
async function pollVeoHistory(
  uuid: string,
  bearerToken: string,
  onProgress: (msg: string) => void,
  maxWaitMs = 720_000, // 12 minutes
): Promise<{ videoUrl: string; thumbnailUrl: string | null } | null> {
  type HistoryPayload = {
    status?: number;
    generated_video?: Array<{ video_url?: string; thumbnail_url?: string; media_url?: string }>;
    video_url?: string;
  };
  const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));
  // Try multiple possible URL fields.
  const pickVideo = (payload: HistoryPayload) => {
    const vid = payload.generated_video?.[0];
    const videoUrl = vid?.video_url || vid?.media_url || payload.video_url || null;
    return videoUrl ? { videoUrl, thumbnailUrl: vid?.thumbnail_url ?? null } : null;
  };

  const start = Date.now();
  let attempt = 0;
  let waitMs = 15_000; // first check sooner; subsequent checks every 20s

  while (Date.now() - start < maxWaitMs) {
    await sleep(waitMs);
    waitMs = 20_000; // steady-state interval
    attempt++;

    const elapsed = Math.round((Date.now() - start) / 1000);
    onProgress(`Veo 生成中,第 ${attempt} 次確認... (已等待 ${elapsed}s)`);

    let resp: Response;
    try {
      resp = await fetchHistoryResponse(uuid, bearerToken);
    } catch (fetchErr) {
      console.warn("[veo-poll] fetch error on attempt", attempt, ":", fetchErr instanceof Error ? fetchErr.message : fetchErr);
      continue;
    }

    if (!resp.ok) {
      console.warn("[veo-poll] history HTTP", resp.status, "for uuid", uuid, "attempt", attempt);
      continue;
    }

    let data: HistoryPayload | null;
    try {
      data = (await resp.json()) as HistoryPayload | null;
    } catch {
      console.warn("[veo-poll] JSON parse failed on attempt", attempt);
      continue;
    }
    // A literal `null` body is valid JSON; treat it like an unparsable response.
    if (data === null || typeof data !== "object") {
      console.warn("[veo-poll] JSON parse failed on attempt", attempt);
      continue;
    }

    console.log("[veo-poll] attempt", attempt, "status=", data.status, "has_video=", !!(data.generated_video?.[0]?.video_url || data.video_url));

    if (data.status === 2) {
      const found = pickVideo(data);
      if (found) {
        console.log("[veo-poll] completed with video URL:", found.videoUrl.slice(0, 120));
        return found;
      }
      // Status 2 but no URL — wait a bit and try once more
      console.warn("[veo-poll] status=2 but no video URL, retrying in 10s...");
      await sleep(10_000);
      const resp2 = await fetchHistoryResponse(uuid, bearerToken).catch(() => null);
      if (resp2?.ok) {
        const data2 = (await resp2.json().catch(() => ({}))) as HistoryPayload | null;
        const retried = data2 ? pickVideo(data2) : null;
        if (retried) return retried;
      }
      return null; // complete but no URL
    }

    if (data.status === 3) {
      console.warn("[veo-poll] status=3 (failed) for uuid", uuid);
      return null; // failed
    }
    // status 1 = still generating; any other status: continue polling
  }

  console.warn("[veo-poll] timed out after", maxWaitMs / 1000, "s for uuid", uuid);
  return null; // timeout
}

// ── Helper: parse error body ─────────────────────────────────────────────────
/**
 * Extracts `detail.error_code` / `detail.error_message` from an upstream
 * error body. The message is lower-cased so callers can match substrings.
 * When the body is not JSON (or has an unexpected shape that throws), the
 * whole lower-cased text is returned as the message with an empty code.
 */
function parseErrBody(text: string): { code: string; msg: string } {
  try {
    const d = JSON.parse(text) as { detail?: { error_code?: string; error_message?: string } };
    return {
      code: d?.detail?.error_code || "",
      msg: (d?.detail?.error_message || "").toLowerCase(),
    };
  } catch {
    return { code: "", msg: text.toLowerCase() };
  }
}

// ── SSE stream reader ─────────────────────────────────────────────────────────
/** Outcome of reading one generation stream. */
interface StreamResult {
  videoUrl: string | null;
  thumbnailUrl: string | null;
  uuid: string | null;
  lastEvent: unknown;
  errorCode: string | null;
  errorMsg: string | null;
}

/**
 * Resolve a potentially-relative video URL to an absolute URL.
 *
 * geminigen.ai's SSE stream returns a field like:
 *   "videoUrl": "users/{userId}/generated/{videoId}/gen_xxx.mp4?signed=..."
 * which needs to be prefixed with the R2 CDN base, or it may already be a
 * full https:// signed URL (if the JSON was not truncated in our logs).
 *
 * NOTE: confirmed from production logs — video files are served from
 *   https://assets.grok.com/  NOT from https://api.geminigen.ai/
 *
 * Empty input is returned unchanged. Leading slashes on a relative path are
 * dropped so the result never contains "//" right after the host.
 */
const VIDEO_CDN_BASE = "https://assets.grok.com/";
function resolveVideoUrl(rawUrl: string): string {
  if (!rawUrl) return rawUrl;
  if (rawUrl.startsWith("http://") || rawUrl.startsWith("https://")) return rawUrl;
  // Relative path → prepend CDN base
  return VIDEO_CDN_BASE + rawUrl.replace(/^\/+/, "");
}

/**
 * Extract the geminigen.ai streaming video generation response from a parsed
 * SSE event.
The structure (confirmed from production logs) is: * * parsed.data.result.response.streamingVideoGenerationResponse * .progress : 0–100 * .videoUrl : relative or absolute URL (only when progress === 100) */ function extractSVGR(parsed: Record): Record | null { try { const data = parsed.data as Record | undefined; const result = data?.result as Record | undefined; const response = result?.response as Record | undefined; const svgr = response?.streamingVideoGenerationResponse as Record | undefined; return svgr ?? null; } catch { return null; } } async function readVideoStream( resp: Response, onEvent: (parsed: Record) => void, maxWaitMs = 300_000, ): Promise { const decoder = new TextDecoder(); const reader = resp.body?.getReader(); const empty: StreamResult = { videoUrl: null, thumbnailUrl: null, uuid: null, lastEvent: null, errorCode: null, errorMsg: null, }; if (!reader) return empty; let videoUrl: string | null = null; let thumbnailUrl: string | null = null; let uuid: string | null = null; let lastEvent: unknown = null; let errorCode: string | null = null; let errorMsg: string | null = null; let buffer = ""; let historyUuid: string | null = null; const deadline = Date.now() + maxWaitMs; try { while (Date.now() < deadline) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const lines = buffer.split("\n"); buffer = lines.pop() ?? ""; for (const line of lines) { const trimmed = line.trim(); if (!trimmed || trimmed === "data: [DONE]") continue; const jsonStr = trimmed.startsWith("data:") ? 
trimmed.slice(5).trim() : trimmed; let parsed: Record; try { parsed = JSON.parse(jsonStr); } catch { continue; } lastEvent = parsed; // Log full event (no truncation — we need the complete video URL) console.log("[video-stream]", JSON.stringify(parsed)); // ── Error detection ─────────────────────────────────────────────────── // Pattern A: {"detail":{"error_code":"..."}} const detail = parsed.detail as Record | undefined; if (detail?.error_code) { errorCode = detail.error_code; errorMsg = detail.error_message || detail.error_code; console.error(`[video-stream] error: ${errorCode} — ${errorMsg}`); return { videoUrl: null, thumbnailUrl: null, uuid, lastEvent, errorCode, errorMsg }; } // Pattern B: top-level error_code if (typeof parsed.error_code === "string" && parsed.error_code) { errorCode = parsed.error_code; errorMsg = (parsed.error_message as string) || parsed.error_code; return { videoUrl: null, thumbnailUrl: null, uuid, lastEvent, errorCode, errorMsg }; } // Pattern C: top-level error string (no status field) if (typeof parsed.error === "string" && parsed.error && typeof parsed.status === "undefined") { errorCode = "STREAM_ERROR"; errorMsg = parsed.error; return { videoUrl: null, thumbnailUrl: null, uuid, lastEvent, errorCode, errorMsg }; } // ── Initial event: {"success":true,"message":"Video generation started", // "history_id":...,"history_uuid":"...","status":1} ── if (parsed.success === true && typeof parsed.history_uuid === "string") { historyUuid = parsed.history_uuid; uuid = historyUuid; if (parsed.message === "Video generation started") { onEvent({ type: "started", historyUuid }); continue; } } // ── Completion signal: {"success":true,"message":"Video generation complete..."} ── if (parsed.success === true && typeof parsed.message === "string" && parsed.message.includes("Video generation complete")) { console.log("[video-stream] generation complete signal received"); return { videoUrl, thumbnailUrl, uuid, lastEvent, errorCode: null, errorMsg: null 
}; } // ── HD URL event: {"success":true,"data":{"video_id":"...","hdMediaUrl":"https://assets.grok.com/..."}} ── // Prefer this over the relative videoUrl from SVGR since it's already absolute and higher quality. { const d = parsed.data as Record | undefined; if (d && typeof d.hdMediaUrl === "string" && d.hdMediaUrl.startsWith("https://")) { console.log("[video-url] using hdMediaUrl:", d.hdMediaUrl.slice(0, 120)); videoUrl = d.hdMediaUrl; } } // ── Progress events: data.result.response.streamingVideoGenerationResponse ── const svgr = extractSVGR(parsed); if (svgr) { const progress = typeof svgr.progress === "number" ? svgr.progress : null; const rawUrl = typeof svgr.videoUrl === "string" ? svgr.videoUrl : null; const videoId = typeof svgr.videoId === "string" ? svgr.videoId : undefined; if (rawUrl) { videoUrl = resolveVideoUrl(rawUrl); try { const u = new URL(videoUrl); console.log("[video-url] origin:", u.origin); console.log("[video-url] path:", u.pathname); const qs = u.search; for (let i = 0; i < qs.length; i += 200) { console.log("[video-url] qs[" + Math.floor(i / 200) + "]:", qs.slice(i, i + 200)); } console.log("[video-url] total_length:", videoUrl.length, "| raw_length:", rawUrl.length); } catch { console.log("[video-url] raw (non-URL):", rawUrl.slice(0, 200)); } } // Extract thumbnail URL (only present at progress=100) const rawThumb = typeof svgr.thumbnailImageUrl === "string" ? svgr.thumbnailImageUrl : null; if (rawThumb) thumbnailUrl = resolveVideoUrl(rawThumb); onEvent({ type: "progress", progress, videoId, videoUrl }); if (progress === 100 && videoUrl) { console.log("[video-stream] progress=100, video ready"); // Don't return yet — wait for the "Video generation complete" signal // which ensures the backend has finished processing. // But if no more events come, videoUrl is set so we'll return at stream end. 
} continue; } // ── Legacy format fallback ───────────────────────────────────────────── // In case geminigen.ai changes format back, still check old fields if (!uuid && typeof parsed.uuid === "string") uuid = parsed.uuid; const legacyCandidate = (parsed.video_url as string | undefined) || (parsed.generated_video as Array<{ video_url?: string }> | undefined)?.[0]?.video_url || null; if (legacyCandidate) videoUrl = resolveVideoUrl(legacyCandidate); if (typeof parsed.status === "number" && (parsed.status === 2 || parsed.status === 3)) { return { videoUrl, thumbnailUrl, uuid, lastEvent, errorCode: null, errorMsg: null }; } if (typeof parsed.status === "number" && parsed.status > 3) { errorCode = "GEN_FAILED"; errorMsg = (parsed.message as string) || `status=${parsed.status}`; return { videoUrl: null, thumbnailUrl: null, uuid, lastEvent, errorCode, errorMsg }; } } } } finally { reader.cancel().catch(() => {}); } // Stream ended naturally — if we have a videoUrl it's a success return { videoUrl, thumbnailUrl, uuid, lastEvent, errorCode, errorMsg }; } // ── Shared token helpers ────────────────────────────────────────────────────── function makeTokenHelpers() { const failedPoolIds: number[] = []; let currentAccountId: number | null = null; async function pickToken(): Promise { const poolEntry = await getPoolToken(failedPoolIds); if (poolEntry) { currentAccountId = poolEntry.accountId; return poolEntry.token; } currentAccountId = null; return getValidBearerToken(); } async function handleTokenExpiry(): Promise { if (currentAccountId !== null) { const refreshed = await tryRefreshPoolAccount(currentAccountId); if (refreshed) return refreshed; failedPoolIds.push(currentAccountId); const next = await getPoolToken(failedPoolIds); if (next) { currentAccountId = next.accountId; return next.token; } } return refreshAccessToken(); } return { pickToken, handleTokenExpiry }; } // ── Grok-3 background task runner (SSE streaming) ──────────────────────────── async function 
runGrokTask( taskId: string, opts: VideoGenOptions, isPrivate: boolean, userId: number | null, ): Promise { const task = tasks.get(taskId); if (!task) return; const { pickToken, handleTokenExpiry } = makeTokenHelpers(); try { broadcast(task, { type: "start", message: "正在取得 Turnstile 驗證碼..." }); let turnstileToken = await getTurnstileToken(); broadcast(task, { type: "start", message: "正在取得 Bearer Token..." }); let token = await pickToken(); if (!token) { finishTask(task, "failed", { type: "error", errorCode: "NO_TOKEN", message: "未設定 API Token,請到管理後台設定" }); return; } broadcast(task, { type: "start", message: "正在連接 geminigen.ai (Grok-3)..." }); let resp = await callGrokEndpoint(token, turnstileToken, opts); // ── Handle HTTP-level errors ───────────────────────────────────────────── if (!resp.ok) { const rawText = await resp.text().catch(() => ""); const { code, msg } = parseErrBody(rawText); const isCaptcha = msg.includes("captcha") || msg.includes("turnstile") || code === "CAPTCHA_ERROR" || code === "INVALID_CAPTCHA"; if (isCaptcha) { broadcast(task, { type: "start", message: "Turnstile 拒絕,正在重新取得..." }); invalidateTurnstileToken(); turnstileToken = await getTurnstileToken(); resp = await callGrokEndpoint(token, turnstileToken, opts); } else { const isExpired = resp.status === 401 || resp.status === 403 || code === "TOKEN_EXPIRED" || code === "INVALID_CREDENTIALS" || msg.includes("token") || msg.includes("expired") || msg.includes("credential"); if (isExpired) { broadcast(task, { type: "start", message: "Token 過期,正在刷新..." 
}); const newToken = await handleTokenExpiry(); if (!newToken) { finishTask(task, "failed", { type: "error", errorCode: "TOKEN_EXPIRED", message: "Token 已過期且無法自動刷新,請到管理後台更新 Token", }); return; } token = newToken; resp = await callGrokEndpoint(token, turnstileToken, opts); } } if (!resp.ok) { const raw2 = await resp.text().catch(() => ""); const { code: c2, msg: m2 } = parseErrBody(raw2); finishTask(task, "failed", { type: "error", errorCode: c2 || "API_ERROR", message: m2 || `HTTP ${resp.status}`, }); return; } } broadcast(task, { type: "start", message: "AI 正在生成影片,這可能需要 1–5 分鐘..." }); // ── Read SSE stream ────────────────────────────────────────────────────── const makeProgressHandler = (label: string) => (parsed: Record) => { // New format: { type: "progress", progress: 0-100 } if (parsed.type === "progress") { const pct = typeof parsed.progress === "number" ? parsed.progress : null; const pctStr = pct !== null ? ` (${pct}%)` : ""; broadcast(task, { type: "progress", progress: pct, message: `${label}${pctStr}` }); } else if (parsed.type === "started") { broadcast(task, { type: "progress", message: "AI 開始生成..." }); } else { // Legacy / fallback const u = typeof parsed.uuid === "string" ? 
parsed.uuid : undefined; broadcast(task, { type: "progress", uuid: u, message: label }); } }; let streamResult = await readVideoStream(resp, makeProgressHandler("AI 生成中")); // ── Retry logic for stream-level errors ────────────────────────────────── if (!streamResult.videoUrl && streamResult.errorCode) { const ec = streamResult.errorCode; const em = (streamResult.errorMsg || "").toLowerCase(); const isStreamCaptcha = ec === "CAPTCHA_ERROR" || ec === "INVALID_CAPTCHA" || em.includes("captcha") || em.includes("turnstile"); const isStreamToken = ec === "TOKEN_EXPIRED" || ec === "INVALID_CREDENTIALS" || ec === "UNAUTHORIZED" || em.includes("token") || em.includes("expired") || em.includes("credentials") || em.includes("invalid credential"); if (isStreamCaptcha) { broadcast(task, { type: "start", message: "Turnstile 在串流中拒絕,重新取得並重試..." }); invalidateTurnstileToken(); turnstileToken = await getTurnstileToken(); const retryResp = await callGrokEndpoint(token, turnstileToken, opts); if (retryResp.ok) { streamResult = await readVideoStream(retryResp, makeProgressHandler("AI 生成中(重試)")); } } else if (isStreamToken) { broadcast(task, { type: "start", message: `Token 錯誤 (${ec}),重新刷新並重試...` }); const newToken = await handleTokenExpiry(); if (!newToken) { finishTask(task, "failed", { type: "error", errorCode: "TOKEN_EXPIRED", message: "Token 已過期且無法自動刷新,請到管理後台更新 Token", }); return; } token = newToken; const retryResp = await callGrokEndpoint(token, turnstileToken, opts); if (retryResp.ok) { streamResult = await readVideoStream(retryResp, makeProgressHandler("AI 生成中(重試)")); } } } let { videoUrl, thumbnailUrl, uuid, errorCode: finalCode, errorMsg: finalMsg } = streamResult; if (!videoUrl && !uuid) { const msg = finalMsg ? 
`生成失敗:${finalMsg}` : "未取得影片 URL,生成可能已失敗或超時"; finishTask(task, "failed", { type: "error", errorCode: finalCode || "NO_VIDEO", message: msg }); return; } // ── Step 2: Fetch R2 pre-signed URL from history API ───────────────────── // The SSE stream gives us an assets.grok.com URL (requires auth + Cloudflare cookies). // The history API returns a Cloudflare R2 pre-signed URL (7-day, publicly accessible). // Always try to get the R2 URL regardless of whether SSE already gave us a URL. if (uuid) { broadcast(task, { type: "progress", progress: 100, message: "正在取得影片下載連結..." }); const historyResult = await fetchHistoryVideoUrl(uuid, token); if (historyResult?.videoUrl) { videoUrl = historyResult.videoUrl; if (historyResult.thumbnailUrl) thumbnailUrl = historyResult.thumbnailUrl; console.log("[grok-task] using R2 URL from history API"); } } if (!videoUrl) { const msg = finalMsg ? `生成失敗:${finalMsg}` : "未取得影片 URL,生成可能已失敗或超時"; finishTask(task, "failed", { type: "error", errorCode: finalCode || "NO_VIDEO", message: msg }); return; } // Store the URL (R2 pre-signed if available, falls back to assets.grok.com). const [saved] = await db .insert(videosTable) .values({ videoUrl, thumbnailUrl: thumbnailUrl ?? null, prompt: opts.prompt, negativePrompt: opts.negativePrompt ?? null, model: opts.model, aspectRatio: opts.aspectRatio, resolution: opts.resolution, duration: opts.duration, hasRefImage: !!(opts.refImageBase64 && opts.refImageMime), isPrivate, userId, }) .returning(); finishTask(task, "complete", { type: "complete", video: saved, uuid: uuid ?? undefined }); // ── Background: download video to S3 for permanent storage ─────────────── // R2 pre-signed URLs expire in 7 days; S3 copy is permanent. 
if (isStorageReady()) { (async () => { try { const storedPath = await downloadAndStoreVideo(videoUrl!, token); if (storedPath) { await db.update(videosTable).set({ videoUrl: storedPath }).where(eq(videosTable.id, saved.id)); console.log(`[grok-task] video cached in storage: ${storedPath}`); } } catch (e) { console.warn("[grok-task] background storage failed:", e instanceof Error ? e.message : e); } })(); } } catch (err: unknown) { const msg = err instanceof Error ? err.message : String(err); console.error("[grok-task] unexpected error:", msg); finishTask(task, "failed", { type: "error", errorCode: "INTERNAL_ERROR", message: msg }); } }

// ── Veo 3.1 Fast background task runner (POST + polling) ─────────────────────

/**
 * Runs one Veo generation task in the background and reports progress through
 * the in-memory task identified by `taskId`.
 *
 * Steps: obtain a Turnstile token and a bearer token, submit the job to the Veo
 * endpoint (retrying once after a captcha rejection or a token expiry), poll
 * for the result, insert a row into `videosTable`, finish the task, and then,
 * when storage is ready, copy the video into storage without blocking.
 *
 * Never rejects for errors raised inside the main `try`: they are converted
 * into a failed task with `errorCode: "INTERNAL_ERROR"`.
 *
 * @param taskId    Id returned by `createTask()`; the function is a no-op if the task is gone.
 * @param opts      Generation options forwarded to the Veo endpoint and persisted with the video.
 * @param isPrivate Stored on the video row.
 * @param userId    Stored on the video row; `null` for anonymous requests.
 */
async function runVeoTask(
  taskId: string,
  opts: VideoGenOptions,
  isPrivate: boolean,
  userId: number | null,
): Promise<void> {
  const task = tasks.get(taskId);
  if (!task) return;

  const { pickToken, handleTokenExpiry } = makeTokenHelpers();

  try {
    broadcast(task, { type: "start", message: "正在取得 Turnstile 驗證碼..." });
    let turnstileToken = await getTurnstileToken();

    broadcast(task, { type: "start", message: "正在取得 Bearer Token..." });
    let token = await pickToken();
    if (!token) {
      finishTask(task, "failed", { type: "error", errorCode: "NO_TOKEN", message: "未設定 API Token,請到管理後台設定" });
      return;
    }

    broadcast(task, { type: "start", message: "正在提交 Veo 3.1 Fast 任務..." });

    // Submit to Veo endpoint (returns UUID immediately)
    let veoResult: { uuid: string };
    try {
      veoResult = await callVeoEndpoint(token, turnstileToken, opts);
    } catch (err: unknown) {
      const e = err as Error & { code?: string; status?: number };
      const code = e.code ?? "";
      const msg = (e.message ?? "").toLowerCase();
      // The captcha check runs first, so a message mentioning both "turnstile"
      // and "token" is treated as a captcha rejection.
      const isCaptcha = msg.includes("captcha") || msg.includes("turnstile") || code === "CAPTCHA_ERROR" || code === "INVALID_CAPTCHA";
      const isExpired = e.status === 401 || e.status === 403 || code === "TOKEN_EXPIRED" || msg.includes("token") || msg.includes("expired");

      if (isCaptcha) {
        // One retry with a fresh Turnstile token; a second failure propagates
        // to the outer catch.
        broadcast(task, { type: "start", message: "Turnstile 拒絕,重新取得並重試..." });
        invalidateTurnstileToken();
        turnstileToken = await getTurnstileToken();
        veoResult = await callVeoEndpoint(token, turnstileToken, opts);
      } else if (isExpired) {
        // One retry with a refreshed bearer token.
        broadcast(task, { type: "start", message: "Token 過期,正在刷新..." });
        const newToken = await handleTokenExpiry();
        if (!newToken) {
          finishTask(task, "failed", { type: "error", errorCode: "TOKEN_EXPIRED", message: "Token 過期且無法自動刷新,請到管理後台更新 Token" });
          return;
        }
        token = newToken;
        veoResult = await callVeoEndpoint(token, turnstileToken, opts);
      } else {
        throw err;
      }
    }

    const { uuid } = veoResult;
    // NOTE(review): `progress` is not among the fields of the ProgressEvent
    // interface declared at the top of this file — confirm the interface.
    broadcast(task, {
      type: "progress",
      progress: 5,
      message: `Veo 任務已提交 (UUID: ${uuid.slice(0, 8)}...),等待生成(每 30 秒確認一次)...`,
    });

    // Poll until done (max 5 minutes, every 30s)
    const pollResult = await pollVeoHistory(
      uuid,
      token,
      (msg) => broadcast(task, { type: "progress", progress: null, message: msg }),
    );

    if (!pollResult || !pollResult.videoUrl) {
      finishTask(task, "failed", { type: "error", errorCode: "NO_VIDEO", message: "Veo 未返回影片 URL,可能生成失敗或超時" });
      return;
    }

    const { videoUrl, thumbnailUrl } = pollResult;
    const absVideoUrl = resolveVideoUrl(videoUrl);
    const absThumbUrl = thumbnailUrl ? resolveVideoUrl(thumbnailUrl) : null;

    // Store raw CDN URL immediately; background GCS download follows.
    const [saved] = await db
      .insert(videosTable)
      .values({
        videoUrl: absVideoUrl,
        thumbnailUrl: absThumbUrl ?? null,
        prompt: opts.prompt,
        negativePrompt: opts.negativePrompt ?? null,
        model: opts.model,
        aspectRatio: opts.aspectRatio,
        resolution: opts.resolution,
        duration: opts.duration,
        hasRefImage: !!(opts.refImageBase64 && opts.refImageMime),
        isPrivate,
        userId,
      })
      .returning();

    finishTask(task, "complete", { type: "complete", video: saved, uuid });

    // ── Background: download video to GCS ────────────────────────────────────
    // Fire-and-forget: the task is already finished, so a failure here is only
    // logged and the row keeps the CDN URL.
    if (isStorageReady()) {
      (async () => {
        try {
          const storedPath = await downloadAndStoreVideo(absVideoUrl, token);
          if (storedPath) {
            await db.update(videosTable).set({ videoUrl: storedPath }).where(eq(videosTable.id, saved.id));
            console.log(`[veo-task] video cached in storage: ${storedPath}`);
          }
        } catch (e) {
          console.warn("[veo-task] background storage failed:", e instanceof Error ? e.message : e);
        }
      })();
    }
  } catch (err: unknown) {
    const msg = err instanceof Error ? err.message : String(err);
    console.error("[veo-task] unexpected error:", msg);
    finishTask(task, "failed", { type: "error", errorCode: "INTERNAL_ERROR", message: msg });
  }
}

// ── Credit helpers ─────────────────────────────────────────────────────────────

/**
 * Reads a single value from `configTable`.
 *
 * @param key Config key to look up.
 * @returns The stored value, or `null` when no row matches.
 */
async function getVideoConfigVal(key: string): Promise<string | null> {
  const rows = await db.select({ value: configTable.value }).from(configTable).where(eq(configTable.key, key)).limit(1);
  return rows[0]?.value ?? null;
}

/**
 * Deducts `cost` credits from a user when the `enable_credits` config value is
 * `"true"`, and records the spend in `creditTransactionsTable`.
 *
 * The balance check and the deduction are performed by one conditional UPDATE,
 * so concurrent requests cannot both pass the check and overdraw the account.
 *
 * @param userId      User to charge.
 * @param cost        Number of credits to deduct.
 * @param description Text stored on the transaction row.
 * @returns `{ ok: true }` when credits are disabled; `{ ok: true, balance }`
 *          after a successful deduction; `{ ok: false, balance }` when the
 *          balance is too low; `{ ok: false }` when the user does not exist.
 */
async function checkAndDeductVideoCredits(
  userId: number,
  cost: number,
  description: string,
): Promise<{ ok: boolean; balance?: number }> {
  const enabled = await getVideoConfigVal("enable_credits");
  if (enabled !== "true") return { ok: true };

  // Only rows that can afford the cost are updated; no row back means either
  // "no such user" or "insufficient balance".
  const [updated] = await db
    .update(usersTable)
    .set({ credits: sql`${usersTable.credits} - ${cost}` })
    .where(and(eq(usersTable.id, userId), sql`${usersTable.credits} >= ${cost}`))
    .returning({ credits: usersTable.credits });

  if (!updated) {
    // Nothing was deducted; read the current balance only to report it.
    const [user] = await db
      .select({ credits: usersTable.credits })
      .from(usersTable)
      .where(eq(usersTable.id, userId))
      .limit(1);
    return user ? { ok: false, balance: user.credits } : { ok: false };
  }

  await db.insert(creditTransactionsTable).values({ userId, amount: -cost, type: "spend", description });
  return { ok: true, balance: updated.credits };
}

// ── POST /api/videos/generate ─────────────────────────────────────────────────
// Returns taskId immediately; generation runs in background.
router.post("/generate", optionalJwtAuth, async (req, res) => {
  // Tolerate a missing body so the request gets a 400 instead of a TypeError.
  const body = (req.body ?? {}) as Record<string, unknown>;
  const prompt = typeof body.prompt === "string" ? body.prompt.trim() : "";
  if (!prompt || prompt.length > 2000) {
    return res.status(400).json({ error: "INVALID_BODY", message: "prompt is required (1-2000 chars)" });
  }

  const isPrivate = body.isPrivate === true;

  // Parse video options (aspectRatio, resolution, duration, negativePrompt, enhancePrompt)
  const opts = parseVideoOptions(body, prompt);

  // Attach reference image if provided (both the data and its MIME type are required)
  const refImageBase64 = typeof body.referenceImageBase64 === "string" ? body.referenceImageBase64 : undefined;
  const refImageMime = typeof body.referenceImageMime === "string" ? body.referenceImageMime : undefined;
  if (refImageBase64 && refImageMime) {
    opts.refImageBase64 = refImageBase64;
    opts.refImageMime = refImageMime;
  }

  const userId: number | null = (req as any).jwtUserId ?? null;

  // ── Credits check ────────────────────────────────────────────────────────────
  // Only authenticated users are charged; a missing or non-numeric
  // `video_gen_cost` counts as 0 and skips the deduction.
  if (userId !== null) {
    const costStr = await getVideoConfigVal("video_gen_cost");
    const cost = Number(costStr) || 0;
    if (cost > 0) {
      const creditResult = await checkAndDeductVideoCredits(userId, cost, `影片生成(${opts.model})`);
      if (!creditResult.ok) {
        return res.status(402).json({
          error: "INSUFFICIENT_CREDITS",
          message: `點數不足,此操作需要 ${cost} 點`,
          balance: creditResult.balance ?? 0,
        });
      }
    }
  }

  const taskId = createTask();
  res.json({ taskId });

  // Dispatch to the correct runner based on model
  const runner = opts.model === "veo-3-fast" ? runVeoTask : runGrokTask;
  runner(taskId, opts, isPrivate, userId).catch((err) => {
    console.error("[video-task] uncaught:", err);
    const t = tasks.get(taskId);
    if (t && t.status === "pending") {
      finishTask(t, "failed", { type: "error", errorCode: "INTERNAL_ERROR", message: String(err) });
    }
  });
});

// ── GET /api/videos/progress/:taskId (SSE) ──────────────────────────────────
// Streams the task's events as Server-Sent Events: buffered events are replayed
// first, then the connection is subscribed to future events.
router.get("/progress/:taskId", (req, res: ExpressResponse) => {
  const task = tasks.get(req.params.taskId);
  if (!task) return res.status(404).json({ error: "TASK_NOT_FOUND" });

  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
    "X-Accel-Buffering": "no",
  });
  res.flushHeaders();

  const send = (event: ProgressEvent) => {
    res.write(`data: ${JSON.stringify(event)}\n\n`);
  };

  // Replay buffered events
  for (const ev of task.buffered) {
    send(ev);
  }

  // If already done, close immediately
  if (task.status !== "pending") {
    res.write("data: {\"type\":\"done\"}\n\n");
    res.end();
    return;
  }

  // Subscribe to future events
  // NOTE(review): subscribers that are connected when the task finishes receive
  // the final event but no "done" event, and the response is not ended here —
  // confirm that clients close the stream themselves.
  task.clients.add(send);

  // Heartbeat every 20s to prevent proxy / CDN from closing idle SSE connections
  const heartbeat = setInterval(() => {
    try {
      res.write(": heartbeat\n\n");
    } catch {
      clearInterval(heartbeat);
    }
  }, 20_000);

  const onClose = () => {
    clearInterval(heartbeat);
    task.clients.delete(send);
  };
  req.on("close", onClose);
  req.on("end", onClose);
});

/**
 * Rewrites raw upstream URLs to this service's proxy URL for backward
 * compatibility.
 *
 * - `/api/videos/proxy…` and `/api/videos/stored…` paths are returned as-is.
 * - `*.r2.cloudflarestorage.com` URLs are returned as-is.
 * - `api.geminigen.ai` and `assets.grok.com` URLs are routed through
 *   `/api/videos/proxy?url=…`.
 * - Anything else, including strings that do not parse as a URL, is returned
 *   unchanged.
 *
 * @param url URL stored on a video row, or `null`.
 * @returns The URL to hand to the client, or `null` when `url` is empty.
 */
function toProxyUrl(url: string | null): string | null {
  if (!url) return null;
  if (url.startsWith("/api/videos/proxy")) return url;
  if (url.startsWith("/api/videos/stored")) return url;
  try {
    const u = new URL(url);
    // Cloudflare R2 pre-signed URLs (*.r2.cloudflarestorage.com) are publicly
    // accessible — no proxy needed. Browser plays them directly.
    if (u.hostname.endsWith(".r2.cloudflarestorage.com")) return url;
    // assets.grok.com requires auth/cookies — route through our proxy.
    if (u.hostname === "api.geminigen.ai" || u.hostname === "assets.grok.com") {
      return `/api/videos/proxy?url=${encodeURIComponent(url)}`;
    }
  } catch {
    /* relative path or non-URL */
  }
  return url;
}

// ── GET /api/videos/stored/<objectPath> ───────────────────────────────────────
// Serve videos that were downloaded from CDN and stored in GCS.
router.get(/^\/stored\/(.+)$/, async (req, res) => {
  // With a RegExp route the first capture group is exposed as params[0].
  const objectPath = (req.params as Record<string, string>)[0];
  if (!objectPath) return res.status(400).json({ error: "Missing objectPath" });
  await streamStoredVideo(objectPath, res, req.headers.range);
});

// ── GET /api/videos/history ───────────────────────────────────────────────────
// Lists public videos, plus the caller's own private videos when authenticated.
router.get("/history", optionalJwtAuth, async (req, res) => {
  const userId: number | null = (req as any).jwtUserId ?? null;

  // Clamp paging input: limit is an integer in 1..50 (default 20), offset is a
  // non-negative integer (default 0). Negative or fractional values would
  // otherwise reach SQL LIMIT/OFFSET unchanged.
  const rawLimit = Number(req.query.limit);
  const rawOffset = Number(req.query.offset);
  const limit = rawLimit >= 1 ? Math.min(Math.floor(rawLimit), 50) : 20;
  const offset = Number.isFinite(rawOffset) && rawOffset > 0 ? Math.floor(rawOffset) : 0;

  const visibilityFilter = userId
    ? or(eq(videosTable.isPrivate, false), and(eq(videosTable.isPrivate, true), eq(videosTable.userId, userId)))
    : eq(videosTable.isPrivate, false);

  const rows = await db
    .select()
    .from(videosTable)
    .where(visibilityFilter)
    .orderBy(desc(videosTable.createdAt))
    .limit(limit)
    .offset(offset);

  // Rewrite any legacy direct geminigen URLs to proxy URLs
  const videos = rows.map((v) => ({
    ...v,
    videoUrl: toProxyUrl(v.videoUrl) ?? v.videoUrl,
    thumbnailUrl: toProxyUrl(v.thumbnailUrl),
  }));

  return res.json({ videos, limit, offset });
});

// ── GET /api/videos/proxy ─────────────────────────────────────────────────────
// Proxy geminigen.ai video/thumbnail files with Bearer token auth.
// Supports HTTP Range requests so the browser's