import { and, eq, lte } from "drizzle-orm"; import { db } from "@/db"; import { connectedAccounts, socialPosts } from "@/db/schema"; import { claimSocialEvent, buildSocialEventKey, releaseSocialEvent } from "@/lib/social/event-dedupe"; import { socialPublisher } from "@/lib/social/publisher"; import { Logger } from "@/lib/logger"; const CHECK_INTERVAL_MS = 30 * 1000; let intervalId: NodeJS.Timeout | null = null; let isRunning = false; async function publishScheduledPost(post: typeof socialPosts.$inferSelect) { if (!post.connectedAccountId) { await db .update(socialPosts) .set({ status: "failed", error: "Scheduled post is missing a connected account", updatedAt: new Date(), }) .where(eq(socialPosts.id, post.id)); return; } const lockKey = buildSocialEventKey(post.platform, post.connectedAccountId, "scheduled-post", post.id); const claimed = await claimSocialEvent(lockKey, 60 * 10); if (!claimed) { return; } try { const account = await db.query.connectedAccounts.findFirst({ where: eq(connectedAccounts.id, post.connectedAccountId), }); if (!account) { throw new Error("Connected account not found"); } await db .update(socialPosts) .set({ status: "publishing", updatedAt: new Date(), }) .where(and(eq(socialPosts.id, post.id), eq(socialPosts.status, "scheduled"))); const mediaUrl = post.mediaUrls?.[0]; const payload = { content: post.content || post.title || "", mediaUrl, accessToken: account.accessToken, providerAccountId: account.providerAccountId, refreshToken: account.refreshToken || undefined, }; let platformPostId: string | null = null; if (account.provider === "facebook") { platformPostId = await socialPublisher.publishToFacebook(payload); } else if (account.provider === "instagram") { platformPostId = await socialPublisher.publishToInstagram(payload); } else if (account.provider === "linkedin") { platformPostId = await socialPublisher.publishToLinkedin(payload); } else if (account.provider === "youtube") { platformPostId = (await socialPublisher.publishToYoutube(payload)) || null; } else { throw new Error(`Unsupported scheduled publishing provider: ${account.provider}`); } await db .update(socialPosts) .set({ status: "published", platformPostId, publishedAt: new Date(), error: null, updatedAt: new Date(), }) .where(eq(socialPosts.id, post.id)); Logger.info("Scheduled post published", { postId: post.id, platform: account.provider, platformPostId, }); } catch (error) { const message = error instanceof Error ? error.message : String(error); await db .update(socialPosts) .set({ status: "failed", error: message, updatedAt: new Date(), }) .where(eq(socialPosts.id, post.id)); Logger.error("Scheduled post publish failed", error, { postId: post.id, }); } finally { await releaseSocialEvent(lockKey); } } export async function processScheduledPostsOnce() { const now = new Date(); const duePosts = await db.query.socialPosts.findMany({ where: and( eq(socialPosts.status, "scheduled"), lte(socialPosts.scheduledAt, now) ), limit: 25, }); for (const post of duePosts) { await publishScheduledPost(post); } } export function startScheduledPostWorker(intervalMs = CHECK_INTERVAL_MS) { if (isRunning) { return; } isRunning = true; processScheduledPostsOnce().catch((error) => { Logger.error("Initial scheduled post run failed", error); }); intervalId = setInterval(() => { processScheduledPostsOnce().catch((error) => { Logger.error("Scheduled post worker run failed", error); }); }, intervalMs); Logger.info("Scheduled post worker started", { intervalMs }); } export function stopScheduledPostWorker() { if (intervalId) { clearInterval(intervalId); intervalId = null; } isRunning = false; }