| |
| |
| |
| |
|
|
| import { taskQueue, type TaskType } from '@/lib/queue/task-queue'; |
| import { db } from '@/db'; |
| import { scrapingJobs } from '@/db/schema'; |
| import { eq } from 'drizzle-orm'; |
|
|
/**
 * A task as handed out by the shared queue. Derived from `getNextTask` so it
 * follows whatever type the queue module declares.
 */
type QueuedTask = NonNullable<ReturnType<typeof taskQueue.getNextTask>>;

/**
 * Polls the shared `taskQueue` on one timer per task type and runs the matching
 * handler for each task it pulls.
 *
 * Poll periods: workflow 2000 ms, scraper 3000 ms, email 1000 ms, social 2000 ms.
 *
 * NOTE(review): `setInterval` does not wait for an async callback to settle, so
 * a task that runs longer than its poll period can overlap with the next tick.
 * Confirm that this is acceptable for every handler.
 */
export class QueueProcessor {
  private isRunning = false;
  private readonly intervals: Map<TaskType, NodeJS.Timeout> = new Map();

  /**
   * Starts all four polling timers. Calling it while the processor is already
   * running only logs a warning and returns.
   */
  start(): void {
    if (this.isRunning) {
      console.log('⚠️ Queue processor already running');
      return;
    }

    this.isRunning = true;
    console.log('🚀 Queue processor started');

    this.startWorkflowProcessor();
    this.startScraperProcessor();
    this.startEmailProcessor();
    this.startSocialProcessor();
  }

  /**
   * Clears every registered timer and marks the processor as stopped.
   * Handlers that are already executing are not cancelled.
   */
  stop(): void {
    this.intervals.forEach((interval) => clearInterval(interval));
    this.intervals.clear();
    this.isRunning = false;
    console.log('🛑 Queue processor stopped');
  }

  /**
   * Registers a timer that pulls one task of `type` per tick and runs `handler`
   * on it.
   *
   * The task is completed without an error when `handler` resolves, and with
   * the error message when it throws. `onFailure`, when given, runs after the
   * task has been completed with an error; a failure inside it is logged and
   * never propagates. Nothing thrown during a tick can escape as an unhandled
   * promise rejection.
   *
   * @param type - Queue to poll; also the key under which the timer is stored.
   * @param intervalMs - Poll period in milliseconds.
   * @param handler - Work to perform for one task; throw to mark it failed.
   * @param onFailure - Optional follow-up to run after a failed task.
   */
  private poll(
    type: TaskType,
    intervalMs: number,
    handler: (task: QueuedTask) => Promise<void>,
    onFailure?: (task: QueuedTask) => Promise<void>,
  ): void {
    const tick = async (): Promise<void> => {
      const task = taskQueue.getNextTask(type);
      if (!task) return;

      try {
        await handler(task);
        taskQueue.completeTask(task.id);
      } catch (error) {
        const message = error instanceof Error ? error.message : 'Unknown error';
        console.error(`❌ ${type} task ${task.id} failed: ${message}`);
        taskQueue.completeTask(task.id, message);

        if (onFailure) {
          try {
            await onFailure(task);
          } catch (cleanupError) {
            // The follow-up is best effort: report it, but keep the timer alive.
            console.error(`❌ ${type} task ${task.id} failure follow-up failed:`, cleanupError);
          }
        }
      }
    };

    const interval = setInterval(() => {
      // Last line of defence: getNextTask/completeTask themselves may throw.
      tick().catch((error: unknown) => {
        console.error(`❌ ${type} queue tick failed:`, error);
      });
    }, intervalMs);

    this.intervals.set(type, interval);
  }

  /**
   * Polls for `workflow` tasks every 2 s and runs the workflow executor,
   * which is loaded lazily on each task.
   */
  private startWorkflowProcessor(): void {
    this.poll('workflow', 2000, async (task) => {
      console.log(`🔄 Processing workflow task ${task.id}`);

      const { workflowId, businessId, userId } = task.data as {
        workflowId: string;
        businessId?: string;
        userId: string;
      };

      const { executeWorkflowLoopWithLogging } = await import('@/lib/workflow-executor');
      const result = await executeWorkflowLoopWithLogging(workflowId, userId, businessId);

      if (!result.success) {
        // The joined logs become the error message recorded on the task.
        throw new Error(result.logs.join('; '));
      }

      console.log(`✅ Workflow ${workflowId} executed:`, result.logs.join(', '));
    });
  }

  /**
   * Polls for `scraper` tasks every 3 s, marks the scraping job as `running`
   * and completes the task. On failure the job row is set to `failed`.
   *
   * NOTE(review): nothing here performs the scrape or marks the job as
   * finished; presumably that happens elsewhere or is still to be implemented.
   */
  private startScraperProcessor(): void {
    this.poll(
      'scraper',
      3000,
      async (task) => {
        console.log(`🔍 Processing scraper task ${task.id}`);

        const { jobId } = task.data as { jobId: string };

        await db
          .update(scrapingJobs)
          .set({ status: 'running' })
          .where(eq(scrapingJobs.id, jobId));

        console.log(`Processing scraper job ${jobId}`);
      },
      async (task) => {
        const { jobId } = task.data as { jobId: string };

        await db
          .update(scrapingJobs)
          .set({ status: 'failed' })
          .where(eq(scrapingJobs.id, jobId));
      },
    );
  }

  /**
   * Polls for `email` tasks every 1 s and sends each one through the lazily
   * loaded email module.
   */
  private startEmailProcessor(): void {
    this.poll('email', 1000, async (task) => {
      console.log(`📧 Processing email task ${task.id}`);

      const { to, subject, body, accessToken } = task.data as {
        to: string;
        subject: string;
        body: string;
        accessToken: string;
      };

      const { sendEmail } = await import('@/lib/email');
      const result = await sendEmail({ to, subject, body, accessToken });

      if (!result?.success) {
        // `||` on purpose: an empty error string also falls back to the default.
        throw new Error(result?.error || 'Failed to send email');
      }

      console.log(`✅ Email sent to ${to}`);
    });
  }

  /**
   * Polls for `social` tasks every 2 s and triggers the social automation
   * worker.
   *
   * NOTE(review): `automationId` is only used in the log line;
   * `processAutomations()` is called without arguments. Confirm whether the
   * worker is meant to process just the referenced automation.
   */
  private startSocialProcessor(): void {
    this.poll('social', 2000, async (task) => {
      console.log(`📱 Processing social task ${task.id}`);

      const { automationId } = task.data as { automationId: string };

      const { socialAutomationWorker } = await import('@/lib/workers/social-automation');

      // The cast reaches a method the worker's declared type does not expose.
      const worker = socialAutomationWorker as unknown as {
        processAutomations: () => Promise<void>;
      };
      await worker.processAutomations();

      console.log(`✅ Social automation ${automationId} processed`);
    });
  }
}
|
|
| |
/** Module-level `QueueProcessor` instance used by the start/stop helper functions. */
export const queueProcessor: QueueProcessor = new QueueProcessor();
|
|
| |
| |
| |
/**
 * Starts the module-level queue processor by delegating to
 * `queueProcessor.start()`.
 */
export function startQueueProcessor(): void {
  queueProcessor.start();
}
|
|
| |
| |
| |
/**
 * Stops the module-level queue processor by delegating to
 * `queueProcessor.stop()`.
 */
export function stopQueueProcessor(): void {
  queueProcessor.stop();
}
|
|