| import { Keyv } from 'keyv'; |
| import { FlowStateManager } from './manager'; |
| import { FlowState } from './types'; |
|
|
| |
| class MockKeyv { |
| private store: Map<string, FlowState<string>>; |
|
|
| constructor() { |
| this.store = new Map(); |
| } |
|
|
| async get(key: string): Promise<FlowState<string> | undefined> { |
| return this.store.get(key); |
| } |
|
|
| |
| async set(key: string, value: FlowState<string>, _ttl?: number): Promise<true> { |
| this.store.set(key, value); |
| return true; |
| } |
|
|
| async delete(key: string): Promise<boolean> { |
| return this.store.delete(key); |
| } |
| } |
|
|
| describe('FlowStateManager', () => { |
| let flowManager: FlowStateManager<string>; |
| let store: MockKeyv; |
|
|
| beforeEach(() => { |
| store = new MockKeyv(); |
| |
| flowManager = new FlowStateManager(store as unknown as Keyv, { ttl: 30000, ci: true }); |
| }); |
|
|
| afterEach(() => { |
| jest.clearAllMocks(); |
| }); |
|
|
| describe('Concurrency Tests', () => { |
| it('should handle concurrent flow creation and return same result', async () => { |
| const flowId = 'test-flow'; |
| const type = 'test-type'; |
|
|
| |
| const flow1Promise = flowManager.createFlowWithHandler(flowId, type, async () => { |
| await new Promise((resolve) => setTimeout(resolve, 100)); |
| return 'result'; |
| }); |
|
|
| const flow2Promise = flowManager.createFlowWithHandler(flowId, type, async () => { |
| await new Promise((resolve) => setTimeout(resolve, 50)); |
| return 'different-result'; |
| }); |
|
|
| |
| const [result1, result2] = await Promise.all([flow1Promise, flow2Promise]); |
|
|
| expect(result1).toBe('result'); |
| expect(result2).toBe('result'); |
| }); |
|
|
| it('should handle flow timeout correctly', async () => { |
| const flowId = 'timeout-flow'; |
| const type = 'test-type'; |
|
|
| |
| const shortTtlManager = new FlowStateManager(store as unknown as Keyv, { |
| ttl: 100, |
| ci: true, |
| }); |
|
|
| const flowPromise = shortTtlManager.createFlow(flowId, type); |
|
|
| await expect(flowPromise).rejects.toThrow('test-type flow timed out'); |
| }); |
|
|
| it('should maintain flow state consistency under high concurrency', async () => { |
| const flowId = 'concurrent-flow'; |
| const type = 'test-type'; |
|
|
| |
| const operations = []; |
| for (let i = 0; i < 10; i++) { |
| operations.push( |
| flowManager.createFlowWithHandler(flowId, type, async () => { |
| await new Promise((resolve) => setTimeout(resolve, Math.random() * 50)); |
| return `result-${i}`; |
| }), |
| ); |
| } |
|
|
| |
| const results = await Promise.all(operations); |
| const firstResult = results[0]; |
| results.forEach((result: string) => { |
| expect(result).toBe(firstResult); |
| }); |
| }); |
|
|
| it('should handle race conditions in flow completion', async () => { |
| const flowId = 'test-flow'; |
| const type = 'test-type'; |
|
|
| |
| const flowPromise = flowManager.createFlow(flowId, type); |
|
|
| |
| await new Promise((resolve) => setTimeout(resolve, 500)); |
|
|
| |
| await flowManager.completeFlow(flowId, type, 'result1'); |
|
|
| const result = await flowPromise; |
| expect(result).toBe('result1'); |
| }, 15000); |
|
|
| it('should handle concurrent flow monitoring', async () => { |
| const flowId = 'test-flow'; |
| const type = 'test-type'; |
|
|
| |
| const flowPromise = flowManager.createFlow(flowId, type); |
|
|
| |
| await new Promise((resolve) => setTimeout(resolve, 500)); |
|
|
| |
| await flowManager.completeFlow(flowId, type, 'success'); |
|
|
| const result = await flowPromise; |
| expect(result).toBe('success'); |
| }, 15000); |
|
|
| it('should handle concurrent success and failure attempts', async () => { |
| const flowId = 'race-flow'; |
| const type = 'test-type'; |
|
|
| const flowPromise = flowManager.createFlow(flowId, type); |
|
|
| |
| await new Promise((resolve) => setTimeout(resolve, 500)); |
|
|
| |
| await flowManager.failFlow(flowId, type, new Error('failure')); |
|
|
| await expect(flowPromise).rejects.toThrow('failure'); |
| }, 15000); |
| }); |
|
|
| describe('deleteFlow', () => { |
| const flowId = 'test-flow-123'; |
| const type = 'test-type'; |
| const flowKey = `${type}:${flowId}`; |
|
|
| it('deletes an existing flow', async () => { |
| await store.set(flowKey, { type, status: 'PENDING', metadata: {}, createdAt: Date.now() }); |
| expect(await store.get(flowKey)).toBeDefined(); |
|
|
| const result = await flowManager.deleteFlow(flowId, type); |
|
|
| expect(result).toBe(true); |
| expect(await store.get(flowKey)).toBeUndefined(); |
| }); |
|
|
| it('returns false if the deletion errors', async () => { |
| jest.spyOn(store, 'delete').mockRejectedValue(new Error('Deletion failed')); |
|
|
| const result = await flowManager.deleteFlow(flowId, type); |
|
|
| expect(result).toBe(false); |
| }); |
|
|
| it('does nothing if the flow does not exist', async () => { |
| expect(await store.get(flowKey)).toBeUndefined(); |
|
|
| const result = await flowManager.deleteFlow(flowId, type); |
|
|
| expect(result).toBe(true); |
| }); |
| }); |
|
|
| describe('isFlowStale', () => { |
| const flowId = 'test-flow-stale'; |
| const type = 'test-type'; |
| const flowKey = `${type}:${flowId}`; |
|
|
| it('returns not stale for non-existent flow', async () => { |
| const result = await flowManager.isFlowStale(flowId, type); |
|
|
| expect(result).toEqual({ |
| isStale: false, |
| age: 0, |
| }); |
| }); |
|
|
| it('returns not stale for PENDING flow regardless of age', async () => { |
| const oldTimestamp = Date.now() - 10 * 60 * 1000; |
| await store.set(flowKey, { |
| type, |
| status: 'PENDING', |
| metadata: {}, |
| createdAt: oldTimestamp, |
| }); |
|
|
| const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000); |
|
|
| expect(result).toEqual({ |
| isStale: false, |
| age: 0, |
| status: 'PENDING', |
| }); |
| }); |
|
|
| it('returns not stale for recently COMPLETED flow', async () => { |
| const recentTimestamp = Date.now() - 30 * 1000; |
| await store.set(flowKey, { |
| type, |
| status: 'COMPLETED', |
| metadata: {}, |
| createdAt: Date.now() - 60 * 1000, |
| completedAt: recentTimestamp, |
| }); |
|
|
| const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000); |
|
|
| expect(result.isStale).toBe(false); |
| expect(result.status).toBe('COMPLETED'); |
| expect(result.age).toBeGreaterThan(0); |
| expect(result.age).toBeLessThan(60 * 1000); |
| }); |
|
|
| it('returns stale for old COMPLETED flow', async () => { |
| const oldTimestamp = Date.now() - 5 * 60 * 1000; |
| await store.set(flowKey, { |
| type, |
| status: 'COMPLETED', |
| metadata: {}, |
| createdAt: Date.now() - 10 * 60 * 1000, |
| completedAt: oldTimestamp, |
| }); |
|
|
| const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000); |
|
|
| expect(result.isStale).toBe(true); |
| expect(result.status).toBe('COMPLETED'); |
| expect(result.age).toBeGreaterThan(2 * 60 * 1000); |
| }); |
|
|
| it('returns not stale for recently FAILED flow', async () => { |
| const recentTimestamp = Date.now() - 30 * 1000; |
| await store.set(flowKey, { |
| type, |
| status: 'FAILED', |
| metadata: {}, |
| createdAt: Date.now() - 60 * 1000, |
| failedAt: recentTimestamp, |
| error: 'Test error', |
| }); |
|
|
| const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000); |
|
|
| expect(result.isStale).toBe(false); |
| expect(result.status).toBe('FAILED'); |
| expect(result.age).toBeGreaterThan(0); |
| expect(result.age).toBeLessThan(60 * 1000); |
| }); |
|
|
| it('returns stale for old FAILED flow', async () => { |
| const oldTimestamp = Date.now() - 5 * 60 * 1000; |
| await store.set(flowKey, { |
| type, |
| status: 'FAILED', |
| metadata: {}, |
| createdAt: Date.now() - 10 * 60 * 1000, |
| failedAt: oldTimestamp, |
| error: 'Test error', |
| }); |
|
|
| const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000); |
|
|
| expect(result.isStale).toBe(true); |
| expect(result.status).toBe('FAILED'); |
| expect(result.age).toBeGreaterThan(2 * 60 * 1000); |
| }); |
|
|
| it('uses custom stale threshold', async () => { |
| const timestamp = Date.now() - 90 * 1000; |
| await store.set(flowKey, { |
| type, |
| status: 'COMPLETED', |
| metadata: {}, |
| createdAt: Date.now() - 2 * 60 * 1000, |
| completedAt: timestamp, |
| }); |
|
|
| |
| const result1 = await flowManager.isFlowStale(flowId, type, 60 * 1000); |
| expect(result1.isStale).toBe(true); |
|
|
| |
| const result2 = await flowManager.isFlowStale(flowId, type, 120 * 1000); |
| expect(result2.isStale).toBe(false); |
| }); |
|
|
| it('uses default threshold of 2 minutes when not specified', async () => { |
| const timestamp = Date.now() - 3 * 60 * 1000; |
| await store.set(flowKey, { |
| type, |
| status: 'COMPLETED', |
| metadata: {}, |
| createdAt: Date.now() - 5 * 60 * 1000, |
| completedAt: timestamp, |
| }); |
|
|
| |
| const result = await flowManager.isFlowStale(flowId, type); |
|
|
| expect(result.isStale).toBe(true); |
| expect(result.age).toBeGreaterThan(2 * 60 * 1000); |
| }); |
|
|
| it('falls back to createdAt when completedAt/failedAt are not present', async () => { |
| const createdTimestamp = Date.now() - 5 * 60 * 1000; |
| await store.set(flowKey, { |
| type, |
| status: 'COMPLETED', |
| metadata: {}, |
| createdAt: createdTimestamp, |
| |
| }); |
|
|
| const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000); |
|
|
| expect(result.isStale).toBe(true); |
| expect(result.status).toBe('COMPLETED'); |
| expect(result.age).toBeGreaterThan(2 * 60 * 1000); |
| }); |
|
|
| it('handles flow with no timestamps', async () => { |
| await store.set(flowKey, { |
| type, |
| status: 'COMPLETED', |
| metadata: {}, |
| |
| } as FlowState<string>); |
|
|
| const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000); |
|
|
| expect(result.isStale).toBe(false); |
| expect(result.age).toBe(0); |
| expect(result.status).toBe('COMPLETED'); |
| }); |
|
|
| it('prefers completedAt over createdAt for age calculation', async () => { |
| const createdTimestamp = Date.now() - 10 * 60 * 1000; |
| const completedTimestamp = Date.now() - 30 * 1000; |
| await store.set(flowKey, { |
| type, |
| status: 'COMPLETED', |
| metadata: {}, |
| createdAt: createdTimestamp, |
| completedAt: completedTimestamp, |
| }); |
|
|
| const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000); |
|
|
| |
| expect(result.isStale).toBe(false); |
| expect(result.age).toBeLessThan(60 * 1000); |
| }); |
|
|
| it('prefers failedAt over createdAt for age calculation', async () => { |
| const createdTimestamp = Date.now() - 10 * 60 * 1000; |
| const failedTimestamp = Date.now() - 30 * 1000; |
| await store.set(flowKey, { |
| type, |
| status: 'FAILED', |
| metadata: {}, |
| createdAt: createdTimestamp, |
| failedAt: failedTimestamp, |
| error: 'Test error', |
| }); |
|
|
| const result = await flowManager.isFlowStale(flowId, type, 2 * 60 * 1000); |
|
|
| |
| expect(result.isStale).toBe(false); |
| expect(result.age).toBeLessThan(60 * 1000); |
| }); |
| }); |
| }); |
|
|