| import redis |
| import json |
| from typing import Optional |
| from src.dev_pilot.state.sdlc_state import CustomEncoder, SDLCState |
| from upstash_redis import Redis |
| import os |
| from dotenv import load_dotenv |
| from loguru import logger |
|
|
# Load environment variables (python-dotenv) before the connection settings
# below are read from the process environment.
load_dotenv()

# Connection settings for the Upstash Redis client. Either value is None when
# the corresponding environment variable is not set.
REDIS_URL = os.environ.get("REDIS_URL")
REDIS_TOKEN = os.environ.get("REDIS_TOKEN")

# Shared client used by every helper in this module.
redis_client = Redis(url=REDIS_URL, token=REDIS_TOKEN)
# NOTE(review): ``redis`` is bound to the same client object, which rebinds
# the name introduced by ``import redis`` at the top of the file. The alias is
# kept so that any existing reference to ``redis`` still resolves to the client.
redis = redis_client
|
|
| |
| |
| |
| |
| |
| |
|
|
def save_state_to_redis(
    task_id: str, state: SDLCState, ttl_seconds: int = 86400
):
    """Serialize ``state`` to JSON and store it in Redis under ``task_id``.

    The value and its expiry are written with a single ``SET ... EX`` command.
    Issuing ``SET`` and ``EXPIRE`` as two separate requests could leave the key
    without any TTL if the second request failed, and cost an extra round trip.

    Args:
        task_id: Redis key under which the state is stored.
        state: State object to persist; encoded with ``CustomEncoder``.
        ttl_seconds: Lifetime of the key in seconds. Must be positive.
            Defaults to 86400 (24 hours).

    Raises:
        ValueError: If ``ttl_seconds`` is not a positive number.
    """
    if ttl_seconds <= 0:
        raise ValueError(f"ttl_seconds must be positive, got {ttl_seconds!r}")

    # Bind the JSON text to its own name rather than rebinding ``state``, so
    # the parameter keeps matching its ``SDLCState`` annotation.
    serialized_state = json.dumps(state, cls=CustomEncoder)

    # ``ex`` attaches the expiry atomically with the write.
    redis_client.set(task_id, serialized_state, ex=ttl_seconds)
|
|
def get_state_from_redis(task_id: str) -> Optional[SDLCState]:
    """Load the state stored under ``task_id`` and rebuild an ``SDLCState``.

    Returns ``None`` when Redis has no value (or an empty value) for the key.
    """
    raw_payload = redis_client.get(task_id)
    if raw_payload:
        decoded = json.loads(raw_payload)
        # NOTE(review): only element 0 of the decoded JSON is used, so the
        # stored document is presumably a sequence whose first item is the
        # state mapping - confirm against what the callers actually save.
        state_fields = decoded[0]
        return SDLCState(**state_fields)
    return None
|
|
def delete_from_redis(task_id: str):
    """Remove the entry stored under ``task_id`` from Redis."""
    redis_client.delete(task_id)
|
|
def flush_redis_cache():
    """Wipe the Redis cache with ``FLUSHALL`` and log that it was cleared.

    This removes every key the client can reach, not only the ones written by
    this module.
    """
    redis_client.flushall()
    logger.info("--- Redis cache cleared ---")