| """ |
| Test script to verify Redis job store functionality |
| """ |
| import sys |
| import os |
|
|
| |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) |
|
|
| from backend.utils.redis_job_store import RedisJobStore |
|
|
def test_redis_job_store():
    """Run an end-to-end check of RedisJobStore and report each step on stdout.

    Builds a ``RedisJobStore`` for ``redis://localhost:6379/0`` and then, in
    order: creates a job, reads it back, updates it, reads the update back,
    deletes it, and checks that an invalid status and a malformed job ID are
    both rejected with ``ValueError``. The run stops at the first step that
    fails.

    Any job created by this run that has not been deleted by the time the
    function exits (for example because an intermediate step failed) is
    removed on a best-effort basis, so repeated failing runs do not
    accumulate test data in the store.

    Returns:
        bool: True if every step succeeded, False otherwise. Exceptions
        raised by the store are caught and reported as ``[ERROR]`` lines
        rather than propagated.
    """
    print("Testing Redis Job Store...")

    # Step 1: build the store.
    try:
        redis_job_store = RedisJobStore('redis://localhost:6379/0')
        print("[OK] Redis job store initialized successfully")
    except Exception as e:
        print(f"[ERROR] Failed to initialize Redis job store: {e}")
        return False

    # IDs of jobs created by this run that still exist. The finally block
    # below deletes whatever is left, whichever return path is taken.
    leftover_job_ids = []
    try:
        # Step 2: create a job.
        try:
            job_id = redis_job_store.create_job(initial_status='pending', initial_data={'test': True})
            leftover_job_ids.append(job_id)
            print(f"[OK] Job created successfully with ID: {job_id}")
        except Exception as e:
            print(f"[ERROR] Failed to create job: {e}")
            return False

        # Step 3: read the job back and check its initial status.
        try:
            job = redis_job_store.get_job(job_id)
            if job and job['status'] == 'pending':
                print(f"[OK] Job retrieved successfully: {job}")
            else:
                print(f"[ERROR] Job not found or incorrect status: {job}")
                return False
        except Exception as e:
            print(f"[ERROR] Failed to get job: {e}")
            return False

        # Step 4: update status and result.
        try:
            success = redis_job_store.update_job(job_id, status='completed', result={'data': 'test result'})
            if success:
                print("[OK] Job updated successfully")
            else:
                print("[ERROR] Job update failed")
                return False
        except Exception as e:
            print(f"[ERROR] Failed to update job: {e}")
            return False

        # Step 5: confirm the update is visible on a fresh read.
        try:
            job = redis_job_store.get_job(job_id)
            if job and job['status'] == 'completed' and job['result']['data'] == 'test result':
                print(f"[OK] Updated job retrieved successfully: {job}")
            else:
                print(f"[ERROR] Job not updated correctly: {job}")
                return False
        except Exception as e:
            print(f"[ERROR] Failed to get updated job: {e}")
            return False

        # Step 6: delete the job.
        try:
            success = redis_job_store.delete_job(job_id)
            if success:
                # Already gone; nothing left for the cleanup pass to do.
                leftover_job_ids.remove(job_id)
                print("[OK] Job deleted successfully")
            else:
                print("[ERROR] Job deletion failed")
                return False
        except Exception as e:
            print(f"[ERROR] Failed to delete job: {e}")
            return False

        # Step 7: input validation. Only ValueError counts as a correct
        # rejection; any other exception type fails the run.
        try:
            try:
                unexpected_id = redis_job_store.create_job(initial_status='invalid_status')
                # The store accepted a bad status and created a job: make
                # sure that stray job is cleaned up as well.
                leftover_job_ids.append(unexpected_id)
                print("[ERROR] Should have failed with invalid status")
                return False
            except ValueError:
                print("[OK] Correctly rejected invalid status")

            try:
                redis_job_store.get_job("invalid job id with spaces")
                print("[ERROR] Should have failed with invalid job ID format")
                return False
            except ValueError:
                print("[OK] Correctly rejected invalid job ID format")
        except Exception as e:
            print(f"[ERROR] Validation tests failed: {e}")
            return False

        print("[OK] All Redis job store tests passed!")
        return True
    finally:
        # Best-effort cleanup: never let a cleanup failure mask the result
        # that is already being returned.
        for leftover_id in leftover_job_ids:
            try:
                redis_job_store.delete_job(leftover_id)
            except Exception as e:
                print(f"[WARN] Could not clean up job {leftover_id}: {e}")
|
|
if __name__ == "__main__":
    # Script entry point: run the check once and turn its boolean result
    # into a process exit status (non-zero on failure).
    all_passed = test_redis_job_store()
    if not all_passed:
        print("\nRedis job store tests failed!")
        sys.exit(1)
    print("\nRedis job store is working correctly!")