| """ |
| Redis-based job store for distributed job tracking across multiple Gunicorn workers. |
| """ |
| import json |
| import uuid |
| import redis |
| from flask import current_app |
| from datetime import datetime, timedelta |
| import logging |
| import re |
|
|
|
|
class RedisJobStore:
    """
    Redis-based job store for tracking background tasks across multiple Gunicorn workers.

    Each job is a JSON object stored under the key ``job:<job_id>`` with an
    expiry of ``default_ttl_hours``. The stored object always contains the
    fields ``status``, ``result``, ``error``, ``created_at`` and ``updated_at``
    (naive UTC ISO-8601 timestamps), plus any extra fields passed as
    ``initial_data`` to ``create_job``.
    """

    VALID_STATUSES = {'pending', 'processing', 'completed', 'failed', 'cancelled'}

    # Allowed job-ID shape: 1-64 ASCII letters, digits, '_' or '-'.
    # Matched with fullmatch(): a '^...$' pattern used with re.match() would
    # also accept a trailing newline, because '$' matches just before a final "\n".
    _JOB_ID_PATTERN = re.compile(r'[a-zA-Z0-9_-]{1,64}')

    def __init__(self, redis_url=None, default_ttl_hours=24):
        """
        Initialize Redis job store.

        Args:
            redis_url (str): Redis connection URL. If falsy, uses
                current_app.config['REDIS_URL'] when a Flask app exposing that
                key is available, otherwise 'redis://localhost:6379/0'.
            default_ttl_hours (int): Default time-to-live for jobs in hours.
        """
        self.default_ttl_hours = default_ttl_hours
        if redis_url:
            self.redis_client = redis.from_url(redis_url)
        elif current_app and hasattr(current_app, 'config') and 'REDIS_URL' in current_app.config:
            self.redis_client = redis.from_url(current_app.config['REDIS_URL'])
        else:
            self.redis_client = redis.from_url('redis://localhost:6379/0')

    @staticmethod
    def _job_key(job_id):
        """Return the Redis key under which the job ``job_id`` is stored."""
        return f"job:{job_id}"

    def _validate_job_id(self, job_id):
        """
        Validate job ID format to prevent injection attacks.

        Args:
            job_id (str): Job ID to validate.

        Returns:
            bool: True if ``job_id`` is a string of 1-64 characters drawn from
            ASCII letters, digits, '_' and '-'; False otherwise.
        """
        if not job_id or not isinstance(job_id, str):
            return False
        return self._JOB_ID_PATTERN.fullmatch(job_id) is not None

    def _validate_status(self, status):
        """
        Validate job status value.

        Args:
            status (str): Status to validate.

        Returns:
            bool: True if ``status`` is one of VALID_STATUSES, False otherwise
            (including for non-string values, which would otherwise raise
            TypeError when unhashable).
        """
        return isinstance(status, str) and status in self.VALID_STATUSES

    def create_job(self, job_id=None, initial_status='pending', initial_data=None):
        """
        Create a new job in Redis.

        Args:
            job_id (str): Job ID. If None, generates a new UUID4 string.
            initial_status (str): Initial job status. A falsy value skips
                validation and is stored as-is.
            initial_data (dict): Extra fields merged into the job document.
                They are applied last, so they can override the default fields.

        Returns:
            str: Job ID.

        Raises:
            ValueError: If ``initial_status`` or an explicitly supplied
                ``job_id`` is invalid.
            redis.ConnectionError: If Redis cannot be reached (logged, re-raised).
        """
        if initial_status and not self._validate_status(initial_status):
            raise ValueError(f"Invalid status: {initial_status}. Valid statuses are: {self.VALID_STATUSES}")

        # Validate every explicitly supplied ID, including "" which a truthiness
        # check would let through and which would map to the shared key "job:".
        if job_id is not None and not self._validate_job_id(job_id):
            raise ValueError(f"Invalid job ID format: {job_id}")

        if job_id is None:
            job_id = str(uuid.uuid4())

        # One timestamp so created_at and updated_at agree on a fresh job.
        now = datetime.utcnow().isoformat()
        job_data = {
            'status': initial_status,
            'result': None,
            'error': None,
            'created_at': now,
            'updated_at': now,
        }
        if initial_data:
            job_data.update(initial_data)

        try:
            self.redis_client.setex(
                self._job_key(job_id),
                timedelta(hours=self.default_ttl_hours),
                json.dumps(job_data),
            )
        except redis.ConnectionError:
            logging.error(f"Failed to connect to Redis when creating job {job_id}")
            raise
        except Exception as e:
            logging.error(f"Unexpected error when creating job {job_id}: {str(e)}")
            raise

        return job_id

    def get_job(self, job_id):
        """
        Get job data from Redis.

        Args:
            job_id (str): Job ID.

        Returns:
            dict: Decoded job data, or None if the job does not exist, the
            stored value is not valid JSON, or Redis raised an error (errors
            are logged).

        Raises:
            ValueError: If ``job_id`` has an invalid format.
        """
        if not self._validate_job_id(job_id):
            raise ValueError(f"Invalid job ID format: {job_id}")

        try:
            job_data_json = self.redis_client.get(self._job_key(job_id))
            if job_data_json:
                return json.loads(job_data_json)
            return None
        except redis.ConnectionError:
            logging.error(f"Failed to connect to Redis when getting job {job_id}")
            return None
        except json.JSONDecodeError:
            logging.error(f"Failed to decode JSON for job {job_id}")
            return None
        except Exception as e:
            logging.error(f"Unexpected error when getting job {job_id}: {str(e)}")
            return None

    def update_job(self, job_id, status=None, result=None, error=None):
        """
        Atomically update a job's status, result and/or error in Redis.

        Uses an optimistic WATCH/MULTI/EXEC transaction. If another client
        modifies the job between the read and the write, the update is retried,
        so concurrent updates are not lost. The job's remaining expiry is
        preserved, and a key without an expiry stays without one. The document
        is re-serialized with Python's json module, so values such as empty
        lists and large integers round-trip unchanged.

        Args:
            job_id (str): Job ID.
            status (str): New status; None leaves it unchanged.
            result (any): JSON-serializable result; None leaves it unchanged.
            error (any): Error message, stored as ``str(error)``; None leaves
                it unchanged.

        Returns:
            bool: True if the job was updated. False if it was not found or
            has just expired, or if the update failed (the failure is logged).

        Raises:
            ValueError: If ``job_id`` or ``status`` is invalid.
        """
        if not self._validate_job_id(job_id):
            raise ValueError(f"Invalid job ID format: {job_id}")

        if status is not None and not self._validate_status(status):
            raise ValueError(f"Invalid status: {status}. Valid statuses are: {self.VALID_STATUSES}")

        job_key = self._job_key(job_id)
        updated_at = datetime.utcnow().isoformat()

        def _apply_update(pipe):
            # Called with job_key WATCHed. Reads run immediately; commands
            # issued after pipe.multi() are queued and sent in MULTI/EXEC.
            raw = pipe.get(job_key)
            if raw is None:
                return False

            job_data = json.loads(raw)
            if status is not None:
                job_data['status'] = status
            if result is not None:
                job_data['result'] = result
            if error is not None:
                job_data['error'] = str(error)
            job_data['updated_at'] = updated_at

            # PTTL: -1 means the key has no expiry. A value <= 0 (other than
            # -1) means the key is gone or is expiring right now, so do not
            # resurrect it.
            remaining_ms = pipe.pttl(job_key)
            if remaining_ms != -1 and remaining_ms <= 0:
                return False

            payload = json.dumps(job_data)
            pipe.multi()
            if remaining_ms > 0:
                pipe.set(job_key, payload, px=remaining_ms)
            else:
                pipe.set(job_key, payload)
            return True

        try:
            updated = self.redis_client.transaction(
                _apply_update, job_key, value_from_callable=True
            )
            return bool(updated)
        except redis.ConnectionError:
            logging.error(f"Failed to connect to Redis when updating job {job_id}")
            return False
        except Exception as e:
            logging.error(f"Unexpected error when updating job {job_id}: {str(e)}")
            return False

    def delete_job(self, job_id):
        """
        Delete a job from Redis.

        Args:
            job_id (str): Job ID.

        Returns:
            bool: True if the job was deleted. False if it was not found or
            Redis raised an error (errors are logged).

        Raises:
            ValueError: If ``job_id`` has an invalid format.
        """
        if not self._validate_job_id(job_id):
            raise ValueError(f"Invalid job ID format: {job_id}")

        try:
            deleted = self.redis_client.delete(self._job_key(job_id))
            return deleted > 0
        except redis.ConnectionError:
            logging.error(f"Failed to connect to Redis when deleting job {job_id}")
            return False
        except Exception as e:
            logging.error(f"Unexpected error when deleting job {job_id}: {str(e)}")
            return False

    def cleanup_expired_jobs(self):
        """
        No-op placeholder for expired-job cleanup.

        Jobs are written with an expiry in ``create_job``, and ``update_job``
        preserves it, so Redis removes them on its own. This method is kept
        for interface compatibility and intentionally does nothing.
        """
        pass
|
|
|
|
def get_redis_job_store():
    """
    Return the RedisJobStore cached on the current Flask application.

    On the first call for an application, a store is built from
    ``current_app.config['REDIS_URL']`` (defaulting to
    ``redis://localhost:6379/0`` when the key is absent). It is attached to the
    app as the ``redis_job_store`` attribute, and later calls return that same
    attribute. No lock guards the first call.

    Returns:
        RedisJobStore: Redis job store instance
    """
    if hasattr(current_app, 'redis_job_store'):
        return current_app.redis_job_store

    url = current_app.config.get('REDIS_URL', 'redis://localhost:6379/0')
    store = RedisJobStore(url)
    current_app.redis_job_store = store
    return current_app.redis_job_store