| |
| |
| |
| |
|
|
/** Kinds of work the queue accepts; each kind gets its own queue and limits. */
export type TaskType = 'workflow' | 'scraper' | 'email' | 'social';

/** Lifecycle states of a task. `completed`, `failed` and `cancelled` are terminal. */
export type TaskStatus = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';

/** Scheduling priority; higher-priority tasks are placed ahead of lower ones. */
export type TaskPriority = 'low' | 'medium' | 'high';
|
|
/** A unit of work tracked by the task queue. */
export interface Task {
  /** Unique identifier assigned when the task is added. */
  id: string;
  /** Kind of work; selects which per-type queue holds the task. */
  type: TaskType;
  /** Current lifecycle state. */
  status: TaskStatus;
  /** Scheduling priority within the task's queue. */
  priority: TaskPriority;
  /** Caller-supplied payload; the queue itself never inspects it. */
  data: Record<string, unknown>;
  /** When the task was added. */
  createdAt: Date;
  /** When the task was most recently handed out for processing. */
  startedAt?: Date;
  /** Timestamp recorded when the task finishes or is cancelled. */
  completedAt?: Date;
  /** Message from the most recent failure, if any. */
  error?: string;
  /** Number of failed attempts recorded so far. */
  retryCount: number;
  /** Retry limit for this task, consulted when an attempt fails. */
  maxRetries: number;
}
|
|
/** Snapshot of task counts and timing for one task type. */
export interface QueueStats {
  /** Task type the figures describe. */
  type: TaskType;
  /** Number of tasks currently in the `pending` state. */
  pending: number;
  /** Number of tasks currently in the `running` state. */
  running: number;
  /** Number of tasks in the `completed` state. */
  completed: number;
  /** Number of tasks in the `failed` state. */
  failed: number;
  /**
   * Mean of `completedAt - startedAt` over completed tasks, in milliseconds;
   * `0` when no completed task has both timestamps.
   */
  avgProcessingTime: number;
}
|
|
/** Tunable limits for one task type's queue. */
export interface QueueConfig {
  /** Maximum number of tasks of the type that may be running at once. */
  maxConcurrent: number;
  /**
   * Polling interval for whatever drives the queue.
   * NOTE(review): presumably milliseconds; the value is stored but never read
   * in this file — confirm against the consumer.
   */
  processingInterval: number;
  /** Default retry limit for tasks of the type. */
  maxRetries: number;
}
|
|
| |
| |
| |
/**
 * In-memory task queue with one priority-ordered queue per task type, a
 * per-type concurrency cap, and bounded retries.
 *
 * Every task lives in a lookup map keyed by id. It additionally sits in its
 * type's queue array from `addTask` until it reaches a terminal state
 * (`completed`, `failed` or `cancelled`), at which point it is removed from
 * the array but stays in the lookup map until `clearCompleted` is called.
 */
export class TaskQueue {
  /** Numeric rank per priority; a larger rank is dispatched first. */
  private static readonly PRIORITY_RANK: Readonly<Record<TaskPriority, number>> = {
    low: 1,
    medium: 2,
    high: 3,
  };

  private tasks: Map<string, Task> = new Map();
  private queues: Map<TaskType, Task[]> = new Map();
  private running: Map<TaskType, Set<string>> = new Map();
  private config: Map<TaskType, QueueConfig> = new Map();

  /** Creates an empty queue, running set and default limits for every task type. */
  constructor() {
    const types: TaskType[] = ['workflow', 'scraper', 'email', 'social'];
    types.forEach(type => {
      this.queues.set(type, []);
      this.running.set(type, new Set());
      this.config.set(type, {
        // Scrapers get a tighter concurrency cap than the other task types.
        maxConcurrent: type === 'scraper' ? 2 : 5,
        processingInterval: 1000,
        maxRetries: 3,
      });
    });
  }

  /**
   * Registers a new pending task and places it in its type's queue.
   *
   * The task is inserted ahead of the first queued task with a strictly lower
   * priority, so tasks of equal priority keep their insertion order.
   *
   * @param task Task fields supplied by the caller; id, status, creation time
   *   and retry count are filled in here.
   * @returns The generated task id.
   */
  async addTask(task: Omit<Task, 'id' | 'createdAt' | 'status' | 'retryCount'>): Promise<string> {
    // Regenerate in the unlikely case the timestamp + random suffix collides
    // with an id that is already registered.
    let id: string;
    do {
      id = `${task.type}_${Date.now()}_${Math.random().toString(36).slice(2, 9)}`;
    } while (this.tasks.has(id));

    const fullTask: Task = {
      ...task,
      id,
      status: 'pending',
      createdAt: new Date(),
      retryCount: 0,
    };

    this.tasks.set(id, fullTask);

    const queue = this.queues.get(task.type);
    if (queue) {
      const rank = this.getPriorityValue(task.priority);
      const insertIndex = queue.findIndex(t => this.getPriorityValue(t.priority) < rank);

      if (insertIndex === -1) {
        queue.push(fullTask);
      } else {
        queue.splice(insertIndex, 0, fullTask);
      }
    }

    console.log(`✅ Task ${id} added to ${task.type} queue`);
    return id;
  }

  /**
   * Hands out the highest-priority pending task of the given type and marks
   * it as running.
   *
   * @returns The task, or `null` when the type is unknown, the type's
   *   concurrency cap is reached, or nothing is pending.
   */
  getNextTask(type: TaskType): Task | null {
    const queue = this.queues.get(type);
    const runningSet = this.running.get(type);
    const config = this.config.get(type);

    if (!queue || !runningSet || !config) return null;

    if (runningSet.size >= config.maxConcurrent) {
      return null;
    }

    // Running tasks stay in the array, so skip over them.
    const task = queue.find(t => t.status === 'pending');
    if (!task) return null;

    task.status = 'running';
    task.startedAt = new Date();
    runningSet.add(task.id);

    return task;
  }

  /**
   * Records the outcome of an attempt.
   *
   * Without `error` the task becomes `completed`. With a (non-empty) `error`
   * the failure is counted and the task is either re-queued as `pending` or,
   * once its failure count reaches the retry limit, marked `failed`. The limit
   * is the task's own `maxRetries` (an explicit `0` is honoured), falling back
   * to the type's configured default only when the task has none.
   *
   * Unknown ids and tasks already in a terminal state are ignored, so a late
   * callback cannot revive a cancelled or finished task.
   *
   * @param taskId Id returned by `addTask`.
   * @param error Failure message; omit on success.
   */
  completeTask(taskId: string, error?: string): void {
    const task = this.tasks.get(taskId);
    if (!task || TaskQueue.isTerminal(task.status)) return;

    this.running.get(task.type)?.delete(taskId);

    if (error) {
      task.error = error;
      task.retryCount++;

      // `??` rather than `||`: a task configured with maxRetries 0 must not
      // silently pick up the type-wide default.
      const retryLimit = task.maxRetries ?? this.config.get(task.type)?.maxRetries ?? 0;

      if (task.retryCount < retryLimit) {
        // Still queued at its original position; just make it eligible again.
        task.status = 'pending';
        console.log(`🔄 Retrying task ${taskId} (attempt ${task.retryCount + 1})`);
        return;
      }

      task.status = 'failed';
      console.log(`❌ Task ${taskId} failed: ${error}`);
    } else {
      task.status = 'completed';
      console.log(`✅ Task ${taskId} completed`);
    }

    // Terminal outcome: stamp the finish time and drop the task from its queue.
    task.completedAt = new Date();
    this.removeFromQueue(task);
  }

  /**
   * Cancels a pending or running task.
   *
   * @returns `true` when the task was cancelled; `false` when the id is
   *   unknown or the task had already completed, failed or been cancelled.
   */
  cancelTask(taskId: string): boolean {
    const task = this.tasks.get(taskId);
    if (!task || TaskQueue.isTerminal(task.status)) {
      return false;
    }

    task.status = 'cancelled';
    task.completedAt = new Date();

    this.running.get(task.type)?.delete(taskId);
    this.removeFromQueue(task);

    console.log(`🛑 Task ${taskId} cancelled`);
    return true;
  }

  /** Looks up a task by id; `undefined` when no such task is registered. */
  getTask(taskId: string): Task | undefined {
    return this.tasks.get(taskId);
  }

  /** Returns every registered task of the given type, in any state. */
  getTasksByType(type: TaskType): Task[] {
    return Array.from(this.tasks.values()).filter(t => t.type === type);
  }

  /** Returns every task that is currently pending or running. */
  getActiveTasks(): Task[] {
    return Array.from(this.tasks.values()).filter(
      t => t.status === 'pending' || t.status === 'running'
    );
  }

  /**
   * Computes counts per state and the mean processing time for one task type.
   * Cancelled tasks are not reported in any counter.
   */
  getStats(type: TaskType): QueueStats {
    const stats: QueueStats = {
      type,
      pending: 0,
      running: 0,
      completed: 0,
      failed: 0,
      avgProcessingTime: 0,
    };
    let totalDuration = 0;
    let timedCount = 0;

    for (const task of this.tasks.values()) {
      if (task.type !== type) continue;

      switch (task.status) {
        case 'pending':
          stats.pending++;
          break;
        case 'running':
          stats.running++;
          break;
        case 'completed':
          stats.completed++;
          // Only tasks carrying both timestamps contribute to the average.
          if (task.startedAt && task.completedAt) {
            totalDuration += task.completedAt.getTime() - task.startedAt.getTime();
            timedCount++;
          }
          break;
        case 'failed':
          stats.failed++;
          break;
        default:
          break;
      }
    }

    if (timedCount > 0) {
      stats.avgProcessingTime = totalDuration / timedCount;
    }
    return stats;
  }

  /** Returns the statistics of every task type. */
  getAllStats(): QueueStats[] {
    return Array.from(this.queues.keys()).map(type => this.getStats(type));
  }

  /**
   * Merges the given settings into a type's configuration.
   *
   * Keys whose value is `undefined` are skipped so an explicit `undefined`
   * cannot erase a limit. Unknown types are ignored.
   */
  updateConfig(type: TaskType, config: Partial<QueueConfig>): void {
    const currentConfig = this.config.get(type);
    if (!currentConfig) return;

    const merged: QueueConfig = { ...currentConfig };
    for (const key of Object.keys(config) as (keyof QueueConfig)[]) {
      const value = config[key];
      if (value !== undefined) {
        merged[key] = value;
      }
    }
    this.config.set(type, merged);
  }

  /**
   * Forgets every completed, failed or cancelled task, optionally restricted
   * to one task type.
   *
   * @returns The number of tasks removed.
   */
  clearCompleted(type?: TaskType): number {
    let cleared = 0;

    for (const [id, task] of this.tasks) {
      if (type && task.type !== type) continue;
      if (!TaskQueue.isTerminal(task.status)) continue;

      // Defensive: make sure no queue array keeps a reference to the task.
      this.removeFromQueue(task);
      this.tasks.delete(id);
      cleared++;
    }

    console.log(`🧹 Cleared ${cleared} completed tasks`);
    return cleared;
  }

  /** Maps a priority to its numeric rank (higher rank is dispatched first). */
  private getPriorityValue(priority: TaskPriority): number {
    return TaskQueue.PRIORITY_RANK[priority];
  }

  /** Removes the task from its type's queue array if it is still present. */
  private removeFromQueue(task: Task): void {
    const queue = this.queues.get(task.type);
    if (!queue) return;

    const index = queue.findIndex(t => t.id === task.id);
    if (index !== -1) {
      queue.splice(index, 1);
    }
  }

  /** Tells whether a status is final, i.e. the task will never run again. */
  private static isTerminal(status: TaskStatus): boolean {
    return status === 'completed' || status === 'failed' || status === 'cancelled';
  }
}
|
|
| |
/** Shared module-level `TaskQueue` instance, created when this module is first loaded. */
export const taskQueue = new TaskQueue();
|
|