import os
import sys
import asyncio
import multiprocessing as mp
from multiprocessing.synchronize import Lock as ProcessLock
from multiprocessing import Manager
import time
import logging
from typing import Any, Dict, List, Optional, Union, TypeVar, Generic

from lightrag.exceptions import PipelineNotInitializedError

# Direct logging helper: prints to stderr so messages remain visible in all
# processes, including the Gunicorn master process.
def direct_log(message, enable_output: bool = False, level: str = "DEBUG"):
    """
    Log a message directly to stderr to ensure visibility in all processes,
    including the Gunicorn master process.

    Args:
        message: The message to log
        enable_output: Whether to actually output the log (default: False)
        level: Log level (default: "DEBUG")
    """
    if not enable_output:
        return

    # Follow the effective level of the lightrag logger when it is available
    try:
        from lightrag.utils import logger

        current_level = logger.getEffectiveLevel()
    except ImportError:
        # Fall back to INFO when the lightrag logger cannot be imported
        current_level = logging.INFO

    level_mapping = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }
    message_level = level_mapping.get(level.upper(), logging.DEBUG)

    # Only print when the message level passes the current logger level
    if message_level >= current_level:
        print(f"{level}: {message}", file=sys.stderr, flush=True)
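
# Usage sketch for direct_log (illustrative only). With the lightrag logger
# at INFO, a DEBUG message is suppressed while a WARNING is printed to stderr:
#
#     direct_log("worker started", enable_output=True, level="WARNING")
#     # -> "WARNING: worker started" on stderr
#     direct_log("noisy detail", enable_output=True, level="DEBUG")
#     # -> suppressed (DEBUG=10 < INFO=20)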


T = TypeVar("T")
LockType = Union[ProcessLock, asyncio.Lock]

_is_multiprocess = None
_workers = None
_manager = None

# Registry of keyed multiprocess locks shared via the Manager
_lock_registry: Optional[Dict[str, mp.synchronize.Lock]] = None
_lock_registry_count: Optional[Dict[str, int]] = None
_lock_cleanup_data: Optional[Dict[str, float]] = None
_registry_guard = None

# Seconds a keyed lock may sit unreferenced before it is eligible for cleanup
CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
# Opportunistic cleanup only runs once this many entries are pending
CLEANUP_THRESHOLD = 500
# Minimum interval between two cleanup passes
MIN_CLEANUP_INTERVAL_SECONDS = 30
# Earliest time at which a pending multiprocess lock may expire
_earliest_mp_cleanup_time: Optional[float] = None
# Time of the last multiprocess cleanup pass
_last_mp_cleanup_time: Optional[float] = None

_initialized = None

# Shared state containers (Manager proxies in multiprocess mode)
_shared_dicts: Optional[Dict[str, Any]] = None
_init_flags: Optional[Dict[str, bool]] = None
_update_flags: Optional[Dict[str, bool]] = None

# Module-level locks exposed through the get_*_lock accessors below
_storage_lock: Optional[LockType] = None
_internal_lock: Optional[LockType] = None
_pipeline_status_lock: Optional[LockType] = None
_graph_db_lock: Optional[LockType] = None
_data_init_lock: Optional[LockType] = None
# Manager for namespaced keyed locks
_storage_keyed_lock: Optional["KeyedUnifiedLock"] = None

# Per-process async locks paired with the multiprocess locks (multiprocess mode only)
_async_locks: Optional[Dict[str, asyncio.Lock]] = None

DEBUG_LOCKS = False
_debug_n_locks_acquired: int = 0


def inc_debug_n_locks_acquired():
    global _debug_n_locks_acquired
    if DEBUG_LOCKS:
        _debug_n_locks_acquired += 1
        print(f"DEBUG: Keyed Lock acquired, total: {_debug_n_locks_acquired:>5}")


def dec_debug_n_locks_acquired():
    global _debug_n_locks_acquired
    if DEBUG_LOCKS:
        if _debug_n_locks_acquired > 0:
            _debug_n_locks_acquired -= 1
            print(f"DEBUG: Keyed Lock released, total: {_debug_n_locks_acquired:>5}")
        else:
            raise RuntimeError("Attempting to release lock when no locks are acquired")


def get_debug_n_locks_acquired():
    return _debug_n_locks_acquired


class UnifiedLock(Generic[T]):
    """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""

    def __init__(
        self,
        lock: Union[ProcessLock, asyncio.Lock],
        is_async: bool,
        name: str = "unnamed",
        enable_logging: bool = True,
        async_lock: Optional[asyncio.Lock] = None,
    ):
        self._lock = lock
        self._is_async = is_async
        self._pid = os.getpid()
        self._name = name
        self._enable_logging = enable_logging
        self._async_lock = async_lock

    async def __aenter__(self) -> "UnifiedLock[T]":
        try:
            # If the main lock is a multiprocess lock, acquire the paired async
            # lock first so coroutines in this process are serialized before
            # contending for the cross-process lock
            if not self._is_async and self._async_lock is not None:
                await self._async_lock.acquire()
                direct_log(
                    f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired",
                    enable_output=self._enable_logging,
                )

            # Then acquire the main lock
            if self._is_async:
                await self._lock.acquire()
            else:
                self._lock.acquire()

            direct_log(
                f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})",
                enable_output=self._enable_logging,
            )
            return self
        except Exception as e:
            # Release the async lock if the main lock acquisition failed
            if (
                not self._is_async
                and self._async_lock is not None
                and self._async_lock.locked()
            ):
                self._async_lock.release()

            direct_log(
                f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}",
                level="ERROR",
                enable_output=self._enable_logging,
            )
            raise

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        main_lock_released = False
        try:
            # Release the main lock first (the call is the same for both types)
            self._lock.release()
            main_lock_released = True

            direct_log(
                f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})",
                enable_output=self._enable_logging,
            )

            # Then release the paired async lock for multiprocess locks
            if not self._is_async and self._async_lock is not None:
                self._async_lock.release()
                direct_log(
                    f"== Lock == Process {self._pid}: Async lock '{self._name}' released",
                    enable_output=self._enable_logging,
                )

        except Exception as e:
            direct_log(
                f"== Lock == Process {self._pid}: Failed to release lock '{self._name}': {e}",
                level="ERROR",
                enable_output=self._enable_logging,
            )

            # If the main lock failed to release, still try to free the async
            # lock so coroutines in this process are not blocked forever
            if (
                not main_lock_released
                and not self._is_async
                and self._async_lock is not None
            ):
                try:
                    direct_log(
                        f"== Lock == Process {self._pid}: Attempting to release async lock after main lock failure",
                        level="WARNING",
                        enable_output=self._enable_logging,
                    )
                    self._async_lock.release()
                    direct_log(
                        f"== Lock == Process {self._pid}: Successfully released async lock after main lock failure",
                        enable_output=self._enable_logging,
                    )
                except Exception as inner_e:
                    direct_log(
                        f"== Lock == Process {self._pid}: Failed to release async lock after main lock failure: {inner_e}",
                        level="ERROR",
                        enable_output=self._enable_logging,
                    )

            raise

    def __enter__(self) -> "UnifiedLock[T]":
        """For backward compatibility"""
        try:
            if self._is_async:
                raise RuntimeError("Use 'async with' for shared_storage lock")
            direct_log(
                f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (sync)",
                enable_output=self._enable_logging,
            )
            self._lock.acquire()
            direct_log(
                f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (sync)",
                enable_output=self._enable_logging,
            )
            return self
        except Exception as e:
            direct_log(
                f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}' (sync): {e}",
                level="ERROR",
                enable_output=self._enable_logging,
            )
            raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        """For backward compatibility"""
        try:
            if self._is_async:
                raise RuntimeError("Use 'async with' for shared_storage lock")
            direct_log(
                f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (sync)",
                enable_output=self._enable_logging,
            )
            self._lock.release()
            direct_log(
                f"== Lock == Process {self._pid}: Lock '{self._name}' released (sync)",
                enable_output=self._enable_logging,
            )
        except Exception as e:
            direct_log(
                f"== Lock == Process {self._pid}: Failed to release lock '{self._name}' (sync): {e}",
                level="ERROR",
                enable_output=self._enable_logging,
            )
            raise

    def locked(self) -> bool:
        """Best-effort check of whether the lock is currently held."""
        if self._is_async:
            return self._lock.locked()
        # Multiprocessing (manager) locks do not expose locked(); fall back to
        # the paired async lock, which is held whenever the main lock is held
        if self._async_lock is not None:
            return self._async_lock.locked()
        return False
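
# Usage sketch for UnifiedLock (illustrative only; `_demo_lock_usage` is a
# hypothetical helper, not part of this module). In single-process mode the
# class simply wraps an asyncio.Lock:
#
#     async def _demo_lock_usage():
#         ul = UnifiedLock(
#             lock=asyncio.Lock(),
#             is_async=True,
#             name="demo",
#             enable_logging=False,
#         )
#         async with ul:
#             ...  # critical section
#
# In multiprocess mode the same interface wraps a Manager lock plus a paired
# asyncio.Lock; the async lock is acquired first, so at most one coroutine per
# process contends for the cross-process lock.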


def _get_combined_key(factory_name: str, key: str) -> str:
    """Return the combined key for the factory and key."""
    return f"{factory_name}:{key}"


def _perform_lock_cleanup(
    lock_type: str,
    cleanup_data: Dict[str, float],
    lock_registry: Optional[Dict[str, Any]],
    lock_count: Optional[Dict[str, int]],
    earliest_cleanup_time: Optional[float],
    last_cleanup_time: Optional[float],
    current_time: float,
    threshold_check: bool = True,
) -> tuple[int, Optional[float], Optional[float]]:
    """
    Generic lock cleanup helper that unifies the cleanup logic for both
    multiprocess and async keyed locks.

    Args:
        lock_type: Lock type identifier ("mp" or "async")
        cleanup_data: Mapping of combined key -> time the lock became idle
        lock_registry: Lock registry dictionary (may be None for async locks)
        lock_count: Lock reference-count dictionary (may be None for async locks)
        earliest_cleanup_time: Earliest pending cleanup time
        last_cleanup_time: Time of the last cleanup pass
        current_time: Current time
        threshold_check: Whether to require CLEANUP_THRESHOLD pending entries
            (default True; set to False by cleanup_expired_locks)

    Returns:
        tuple: (cleaned_count, new_earliest_time, new_last_cleanup_time)
    """
    if len(cleanup_data) == 0:
        return 0, earliest_cleanup_time, last_cleanup_time

    # Skip opportunistic cleanup until enough entries are pending
    if threshold_check and len(cleanup_data) < CLEANUP_THRESHOLD:
        return 0, earliest_cleanup_time, last_cleanup_time

    # Guard against system clock rollback
    if last_cleanup_time is not None and current_time < last_cleanup_time:
        direct_log(
            f"== {lock_type} Lock == Time rollback detected, resetting cleanup time",
            level="WARNING",
            enable_output=False,
        )
        last_cleanup_time = None

    # Only clean when at least one lock has been idle past the expiry window
    has_expired_locks = (
        earliest_cleanup_time is not None
        and current_time - earliest_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
    )

    # ... and the minimum interval since the last pass has elapsed
    interval_satisfied = (
        last_cleanup_time is None
        or current_time - last_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
    )

    if not (has_expired_locks and interval_satisfied):
        return 0, earliest_cleanup_time, last_cleanup_time

    try:
        cleaned_count = 0
        new_earliest_time = None

        # Snapshot the size for logging before mutation
        total_cleanup_len = len(cleanup_data)

        # Remove every entry that has been idle past the expiry window
        for cleanup_key, cleanup_time in list(cleanup_data.items()):
            if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
                cleanup_data.pop(cleanup_key, None)

                # Drop the lock itself and its reference count when tracked
                if lock_registry is not None:
                    lock_registry.pop(cleanup_key, None)
                if lock_count is not None:
                    lock_count.pop(cleanup_key, None)

                cleaned_count += 1
            else:
                # Track the earliest expiry among the surviving entries
                if new_earliest_time is None or cleanup_time < new_earliest_time:
                    new_earliest_time = cleanup_time

        if cleaned_count > 0:
            new_last_cleanup_time = current_time

            # Estimate when the next cleanup pass could run
            next_cleanup_in = max(
                (new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time)
                if new_earliest_time
                else float("inf"),
                MIN_CLEANUP_INTERVAL_SECONDS,
            )

            direct_log(
                f"== {lock_type} Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired {lock_type} locks, "
                f"next cleanup in {next_cleanup_in:.1f}s",
                enable_output=False,
                level="INFO",
            )

            return cleaned_count, new_earliest_time, new_last_cleanup_time
        else:
            return 0, earliest_cleanup_time, last_cleanup_time

    except Exception as e:
        direct_log(
            f"== {lock_type} Lock == Cleanup failed: {e}",
            level="ERROR",
            enable_output=False,
        )
        return 0, earliest_cleanup_time, last_cleanup_time
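
# Worked example of the cleanup predicate above, with the module defaults
# (CLEANUP_KEYED_LOCKS_AFTER_SECONDS=300, CLEANUP_THRESHOLD=500,
# MIN_CLEANUP_INTERVAL_SECONDS=30). An opportunistic pass (threshold_check=True)
# runs only when all three conditions hold:
#
#     pending = len(cleanup_data)                    # e.g. 512  -> >= 500, passes
#     idle = current_time - earliest_cleanup_time    # e.g. 310s -> > 300s, passes
#     since_last = current_time - last_cleanup_time  # e.g. 45s  -> > 30s, passes
#
# cleanup_expired_locks() calls this with threshold_check=False, so a manual
# pass skips the 500-entry requirement but still honors the other two checks.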


def _get_or_create_shared_raw_mp_lock(
    factory_name: str, key: str
) -> Optional[mp.synchronize.Lock]:
    """Return the *singleton* manager.Lock() proxy for keyed lock, creating if needed."""
    if not _is_multiprocess:
        return None

    with _registry_guard:
        combined_key = _get_combined_key(factory_name, key)
        raw = _lock_registry.get(combined_key)
        count = _lock_registry_count.get(combined_key)
        if raw is None:
            raw = _manager.Lock()
            _lock_registry[combined_key] = raw
            count = 0
        else:
            if count is None:
                raise RuntimeError(
                    f"Shared-Data lock registry for {factory_name} is corrupted for key {key}"
                )
            # Re-acquired before cleanup ran; cancel the pending cleanup entry
            if count == 0 and combined_key in _lock_cleanup_data:
                _lock_cleanup_data.pop(combined_key)
        count += 1
        _lock_registry_count[combined_key] = count
        return raw


def _release_shared_raw_mp_lock(factory_name: str, key: str):
    """Release the *singleton* manager.Lock() proxy for *key*."""
    if not _is_multiprocess:
        return

    global _earliest_mp_cleanup_time, _last_mp_cleanup_time

    with _registry_guard:
        combined_key = _get_combined_key(factory_name, key)
        raw = _lock_registry.get(combined_key)
        count = _lock_registry_count.get(combined_key)
        if raw is None and count is None:
            return
        elif raw is None or count is None:
            raise RuntimeError(
                f"Shared-Data lock registry for {factory_name} is corrupted for key {key}"
            )

        count -= 1
        if count < 0:
            raise RuntimeError(
                f"Attempting to release lock for {key} more times than it was acquired"
            )

        _lock_registry_count[combined_key] = count

        current_time = time.time()
        if count == 0:
            # No more holders: mark the lock for deferred cleanup
            _lock_cleanup_data[combined_key] = current_time

            # Track the earliest pending cleanup time
            if (
                _earliest_mp_cleanup_time is None
                or current_time < _earliest_mp_cleanup_time
            ):
                _earliest_mp_cleanup_time = current_time

        # Opportunistically clean up expired locks
        cleaned_count, new_earliest_time, new_last_cleanup_time = _perform_lock_cleanup(
            lock_type="mp",
            cleanup_data=_lock_cleanup_data,
            lock_registry=_lock_registry,
            lock_count=_lock_registry_count,
            earliest_cleanup_time=_earliest_mp_cleanup_time,
            last_cleanup_time=_last_mp_cleanup_time,
            current_time=current_time,
            threshold_check=True,
        )

        # Update the module-level markers only when a pass actually ran
        if cleaned_count > 0:
            _earliest_mp_cleanup_time = new_earliest_time
            _last_mp_cleanup_time = new_last_cleanup_time


class KeyedUnifiedLock:
    """
    Manager for unified keyed locks, supporting both single- and multi-process modes.

    • Keeps only a table of async keyed locks locally
    • Fetches the multi-process keyed lock on every acquire
    • Builds a fresh `UnifiedLock` each time, so `enable_logging`
      (or future options) can vary per call
    • Supports dynamic namespaces specified at lock usage time
    """

    def __init__(self, *, default_enable_logging: bool = True) -> None:
        self._default_enable_logging = default_enable_logging
        # Per-process async locks, their reference counts, and pending-cleanup times
        self._async_lock: Dict[str, asyncio.Lock] = {}
        self._async_lock_count: Dict[str, int] = {}
        self._async_lock_cleanup_data: Dict[str, float] = {}
        self._mp_locks: Dict[str, mp.synchronize.Lock] = {}
        # Cleanup bookkeeping, mirroring the module-level multiprocess markers
        self._earliest_async_cleanup_time: Optional[float] = None
        self._last_async_cleanup_time: Optional[float] = None

    def __call__(
        self, namespace: str, keys: list[str], *, enable_logging: Optional[bool] = None
    ):
        """
        Ergonomic helper so you can write:

            async with storage_keyed_lock("namespace", ["key1", "key2"]):
                ...
        """
        if enable_logging is None:
            enable_logging = self._default_enable_logging
        return _KeyedLockContext(
            self,
            namespace=namespace,
            keys=keys,
            enable_logging=enable_logging,
        )

    def _get_or_create_async_lock(self, combined_key: str) -> asyncio.Lock:
        async_lock = self._async_lock.get(combined_key)
        count = self._async_lock_count.get(combined_key, 0)
        if async_lock is None:
            async_lock = asyncio.Lock()
            self._async_lock[combined_key] = async_lock
        elif count == 0 and combined_key in self._async_lock_cleanup_data:
            # Re-acquired before cleanup ran; cancel the pending cleanup entry
            self._async_lock_cleanup_data.pop(combined_key)
        count += 1
        self._async_lock_count[combined_key] = count
        return async_lock

    def _release_async_lock(self, combined_key: str):
        count = self._async_lock_count.get(combined_key, 0)
        count -= 1

        current_time = time.time()
        if count == 0:
            # No more holders: mark the lock for deferred cleanup
            self._async_lock_cleanup_data[combined_key] = current_time

            # Track the earliest pending cleanup time
            if (
                self._earliest_async_cleanup_time is None
                or current_time < self._earliest_async_cleanup_time
            ):
                self._earliest_async_cleanup_time = current_time
        self._async_lock_count[combined_key] = count

        # Opportunistically clean up expired async locks
        cleaned_count, new_earliest_time, new_last_cleanup_time = _perform_lock_cleanup(
            lock_type="async",
            cleanup_data=self._async_lock_cleanup_data,
            lock_registry=self._async_lock,
            lock_count=self._async_lock_count,
            earliest_cleanup_time=self._earliest_async_cleanup_time,
            last_cleanup_time=self._last_async_cleanup_time,
            current_time=current_time,
            threshold_check=True,
        )

        # Update the markers only when a pass actually ran
        if cleaned_count > 0:
            self._earliest_async_cleanup_time = new_earliest_time
            self._last_async_cleanup_time = new_last_cleanup_time

    def _get_lock_for_key(
        self, namespace: str, key: str, enable_logging: bool = False
    ) -> UnifiedLock:
        combined_key = _get_combined_key(namespace, key)

        # Per-process async lock (created on demand, reference counted)
        async_lock = self._get_or_create_async_lock(combined_key)

        # Cross-process lock; None when running in single-process mode
        raw_lock = _get_or_create_shared_raw_mp_lock(namespace, key)
        is_multiprocess = raw_lock is not None
        if not is_multiprocess:
            raw_lock = async_lock

        # Wrap both locks in a fresh UnifiedLock for this acquisition
        if is_multiprocess:
            return UnifiedLock(
                lock=raw_lock,
                is_async=False,
                name=combined_key,
                enable_logging=enable_logging,
                async_lock=async_lock,
            )
        else:
            return UnifiedLock(
                lock=raw_lock,
                is_async=True,
                name=combined_key,
                enable_logging=enable_logging,
                async_lock=None,
            )

    def _release_lock_for_key(self, namespace: str, key: str):
        combined_key = _get_combined_key(namespace, key)
        self._release_async_lock(combined_key)
        _release_shared_raw_mp_lock(namespace, key)

    def cleanup_expired_locks(self) -> Dict[str, Any]:
        """
        Clean up expired locks for both async and multiprocess locks, following
        the same conditions as _release_shared_raw_mp_lock and _release_async_lock.

        Cleanup only runs when both has_expired_locks and interval_satisfied
        hold, to avoid overly frequent cleanup passes.

        Since async and multiprocess locks work together, this method cleans up
        both types of expired locks and returns comprehensive statistics.

        Returns:
            Dict containing cleanup statistics and current status:
            {
                "process_id": 12345,
                "cleanup_performed": {
                    "mp_cleaned": 5,
                    "async_cleaned": 3
                },
                "current_status": {
                    "total_mp_locks": 10,
                    "pending_mp_cleanup": 2,
                    "total_async_locks": 8,
                    "pending_async_cleanup": 1
                }
            }
        """
        global _lock_registry, _lock_registry_count, _lock_cleanup_data
        global _registry_guard, _earliest_mp_cleanup_time, _last_mp_cleanup_time

        cleanup_stats = {"mp_cleaned": 0, "async_cleaned": 0}

        current_time = time.time()

        # Clean up expired multiprocess locks under the registry guard
        if (
            _is_multiprocess
            and _lock_registry is not None
            and _registry_guard is not None
        ):
            try:
                with _registry_guard:
                    if _lock_cleanup_data is not None:
                        cleaned_count, new_earliest_time, new_last_cleanup_time = (
                            _perform_lock_cleanup(
                                lock_type="mp",
                                cleanup_data=_lock_cleanup_data,
                                lock_registry=_lock_registry,
                                lock_count=_lock_registry_count,
                                earliest_cleanup_time=_earliest_mp_cleanup_time,
                                last_cleanup_time=_last_mp_cleanup_time,
                                current_time=current_time,
                                threshold_check=False,
                            )
                        )

                        # Update the markers only when a pass actually ran
                        if cleaned_count > 0:
                            _earliest_mp_cleanup_time = new_earliest_time
                            _last_mp_cleanup_time = new_last_cleanup_time
                            cleanup_stats["mp_cleaned"] = cleaned_count

            except Exception as e:
                direct_log(
                    f"Error during multiprocess lock cleanup: {e}",
                    level="ERROR",
                    enable_output=False,
                )

        # Clean up expired async locks (local to this process)
        try:
            cleaned_count, new_earliest_time, new_last_cleanup_time = (
                _perform_lock_cleanup(
                    lock_type="async",
                    cleanup_data=self._async_lock_cleanup_data,
                    lock_registry=self._async_lock,
                    lock_count=self._async_lock_count,
                    earliest_cleanup_time=self._earliest_async_cleanup_time,
                    last_cleanup_time=self._last_async_cleanup_time,
                    current_time=current_time,
                    threshold_check=False,
                )
            )

            # Update the markers only when a pass actually ran
            if cleaned_count > 0:
                self._earliest_async_cleanup_time = new_earliest_time
                self._last_async_cleanup_time = new_last_cleanup_time
                cleanup_stats["async_cleaned"] = cleaned_count

        except Exception as e:
            direct_log(
                f"Error during async lock cleanup: {e}",
                level="ERROR",
                enable_output=False,
            )

        # Capture the post-cleanup lock counts
        current_status = self.get_lock_status()

        return {
            "process_id": os.getpid(),
            "cleanup_performed": cleanup_stats,
            "current_status": current_status,
        }

    def get_lock_status(self) -> Dict[str, int]:
        """
        Get the current status of both async and multiprocess locks.

        Returns comprehensive lock counts for both types of locks, since
        they work together in the keyed lock system.

        Returns:
            Dict containing lock counts:
            {
                "total_mp_locks": 10,
                "pending_mp_cleanup": 2,
                "total_async_locks": 8,
                "pending_async_cleanup": 1
            }
        """
        global _lock_registry_count, _lock_cleanup_data, _registry_guard

        status = {
            "total_mp_locks": 0,
            "pending_mp_cleanup": 0,
            "total_async_locks": 0,
            "pending_async_cleanup": 0,
        }

        try:
            # Multiprocess counts are read under the registry guard
            if _is_multiprocess and _lock_registry_count is not None:
                if _registry_guard is not None:
                    with _registry_guard:
                        status["total_mp_locks"] = len(_lock_registry_count)
                        if _lock_cleanup_data is not None:
                            status["pending_mp_cleanup"] = len(_lock_cleanup_data)

            # Async counts are local to this process
            status["total_async_locks"] = len(self._async_lock_count)
            status["pending_async_cleanup"] = len(self._async_lock_cleanup_data)

        except Exception as e:
            direct_log(
                f"Error getting keyed lock status: {e}",
                level="ERROR",
                enable_output=False,
            )

        return status


class _KeyedLockContext:
    def __init__(
        self,
        parent: KeyedUnifiedLock,
        namespace: str,
        keys: list[str],
        enable_logging: bool,
    ) -> None:
        self._parent = parent
        self._namespace = namespace

        # Sort keys so every caller acquires locks in the same global order,
        # which prevents lock-ordering deadlocks between coroutines
        self._keys = sorted(keys)
        self._enable_logging = (
            enable_logging
            if enable_logging is not None
            else parent._default_enable_logging
        )
        self._ul: Optional[List["UnifiedLock"]] = None

    async def __aenter__(self):
        if self._ul is not None:
            raise RuntimeError("KeyedUnifiedLock already acquired in current context")

        # Acquire the per-key locks one by one, in sorted order
        self._ul = []
        try:
            for key in self._keys:
                lock = self._parent._get_lock_for_key(
                    self._namespace, key, enable_logging=self._enable_logging
                )
                try:
                    await lock.__aenter__()
                except BaseException:
                    # Drop the reference taken by _get_lock_for_key for the
                    # key that failed to lock, then unwind below
                    self._parent._release_lock_for_key(self._namespace, key)
                    raise
                inc_debug_n_locks_acquired()
                self._ul.append(lock)
        except BaseException:
            # Roll back any locks already acquired so a partial failure does
            # not leave keys locked forever
            acquired_keys = self._keys[: len(self._ul)]
            for ul, key in zip(reversed(self._ul), reversed(acquired_keys)):
                await ul.__aexit__(None, None, None)
                self._parent._release_lock_for_key(self._namespace, key)
                dec_debug_n_locks_acquired()
            self._ul = None
            raise
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Release in reverse acquisition order
        for ul, key in zip(reversed(self._ul), reversed(self._keys)):
            await ul.__aexit__(exc_type, exc, tb)
            self._parent._release_lock_for_key(self._namespace, key)
            dec_debug_n_locks_acquired()
        self._ul = None
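
# Concurrency sketch (illustrative only; `_demo_overlapping` is hypothetical).
# Because _KeyedLockContext sorts its keys, two coroutines locking overlapping
# key sets always acquire them in the same global order and cannot deadlock:
#
#     async def _demo_overlapping(keyed: KeyedUnifiedLock):
#         async def task_a():
#             async with keyed("graph", ["edge:1", "node:2"]):
#                 ...
#
#         async def task_b():
#             async with keyed("graph", ["node:2", "edge:1"]):
#                 ...  # same sorted order: ["edge:1", "node:2"]
#
#         await asyncio.gather(task_a(), task_b())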


def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
    """Return unified internal lock protecting the shared dictionaries"""
    async_lock = _async_locks.get("internal_lock") if _is_multiprocess else None
    return UnifiedLock(
        lock=_internal_lock,
        is_async=not _is_multiprocess,
        name="internal_lock",
        enable_logging=enable_logging,
        async_lock=async_lock,
    )


def get_storage_lock(enable_logging: bool = False) -> UnifiedLock:
    """Return unified storage lock for data consistency"""
    async_lock = _async_locks.get("storage_lock") if _is_multiprocess else None
    return UnifiedLock(
        lock=_storage_lock,
        is_async=not _is_multiprocess,
        name="storage_lock",
        enable_logging=enable_logging,
        async_lock=async_lock,
    )


def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock:
    """Return unified pipeline status lock for data consistency"""
    async_lock = _async_locks.get("pipeline_status_lock") if _is_multiprocess else None
    return UnifiedLock(
        lock=_pipeline_status_lock,
        is_async=not _is_multiprocess,
        name="pipeline_status_lock",
        enable_logging=enable_logging,
        async_lock=async_lock,
    )


def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
    """Return unified graph database lock for ensuring atomic operations"""
    async_lock = _async_locks.get("graph_db_lock") if _is_multiprocess else None
    return UnifiedLock(
        lock=_graph_db_lock,
        is_async=not _is_multiprocess,
        name="graph_db_lock",
        enable_logging=enable_logging,
        async_lock=async_lock,
    )


def get_storage_keyed_lock(
    keys: str | list[str], namespace: str = "default", enable_logging: bool = False
) -> _KeyedLockContext:
    """Return unified storage keyed lock for ensuring atomic operations across different namespaces"""
    global _storage_keyed_lock
    if _storage_keyed_lock is None:
        raise RuntimeError("Shared-Data is not initialized")
    if isinstance(keys, str):
        keys = [keys]
    return _storage_keyed_lock(namespace, keys, enable_logging=enable_logging)
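
# Usage sketch (illustrative only; the namespace and key names are made up):
#
#     async def _demo_keyed_lock():
#         async with get_storage_keyed_lock(
#             ["entity:alice", "entity:bob"], namespace="graph"
#         ):
#             ...  # atomic update touching both entities
#
# A single string key is also accepted and wrapped into a one-element list.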


def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
    """Return unified data initialization lock for ensuring atomic data initialization"""
    async_lock = _async_locks.get("data_init_lock") if _is_multiprocess else None
    return UnifiedLock(
        lock=_data_init_lock,
        is_async=not _is_multiprocess,
        name="data_init_lock",
        enable_logging=enable_logging,
        async_lock=async_lock,
    )


def cleanup_keyed_lock() -> Dict[str, Any]:
    """
    Force cleanup of expired keyed locks and return comprehensive status information.

    This function actively cleans up expired locks for both async and multiprocess locks,
    then returns detailed statistics about the cleanup operation and current lock status.

    Returns:
        Same as cleanup_expired_locks in KeyedUnifiedLock
    """
    global _storage_keyed_lock

    # Return empty statistics when shared data has not been initialized yet
    if not _initialized or _storage_keyed_lock is None:
        return {
            "process_id": os.getpid(),
            "cleanup_performed": {"mp_cleaned": 0, "async_cleaned": 0},
            "current_status": {
                "total_mp_locks": 0,
                "pending_mp_cleanup": 0,
                "total_async_locks": 0,
                "pending_async_cleanup": 0,
            },
        }

    return _storage_keyed_lock.cleanup_expired_locks()


def get_keyed_lock_status() -> Dict[str, Any]:
    """
    Get current status of keyed locks without performing cleanup.

    This function provides a read-only view of the current lock counts
    for both multiprocess and async locks, including pending cleanup counts.

    Returns:
        Same as get_lock_status in KeyedUnifiedLock, plus "process_id"
    """
    global _storage_keyed_lock

    # Return empty counts when shared data has not been initialized yet
    if not _initialized or _storage_keyed_lock is None:
        return {
            "process_id": os.getpid(),
            "total_mp_locks": 0,
            "pending_mp_cleanup": 0,
            "total_async_locks": 0,
            "pending_async_cleanup": 0,
        }

    status = _storage_keyed_lock.get_lock_status()
    status["process_id"] = os.getpid()
    return status


def initialize_share_data(workers: int = 1):
    """
    Initialize shared storage data for single- or multi-process mode.

    When used with Gunicorn's preload feature, this function is called once in the
    master process before forking worker processes, allowing all workers to share
    the same initialized data.

    In single-process mode, this function is called in the FastAPI lifespan function.

    The function determines whether to use cross-process shared variables for data
    storage based on the number of workers. If workers=1, it uses asyncio locks and
    local dictionaries. If workers>1, it uses process locks and shared dictionaries
    managed by multiprocessing.Manager.

    Args:
        workers (int): Number of worker processes. If 1, single-process mode is used.
            If > 1, multi-process mode with shared memory is used.
    """
    global \
        _manager, \
        _workers, \
        _is_multiprocess, \
        _storage_lock, \
        _lock_registry, \
        _lock_registry_count, \
        _lock_cleanup_data, \
        _registry_guard, \
        _internal_lock, \
        _pipeline_status_lock, \
        _graph_db_lock, \
        _data_init_lock, \
        _shared_dicts, \
        _init_flags, \
        _initialized, \
        _update_flags, \
        _async_locks, \
        _storage_keyed_lock, \
        _earliest_mp_cleanup_time, \
        _last_mp_cleanup_time

    # Check if already initialized
    if _initialized:
        direct_log(
            f"Process {os.getpid()} Shared-Data already initialized (multiprocess={_is_multiprocess})"
        )
        return

    _workers = workers

    if workers > 1:
        _is_multiprocess = True
        _manager = Manager()
        _lock_registry = _manager.dict()
        _lock_registry_count = _manager.dict()
        _lock_cleanup_data = _manager.dict()
        _registry_guard = _manager.RLock()
        _internal_lock = _manager.Lock()
        _storage_lock = _manager.Lock()
        _pipeline_status_lock = _manager.Lock()
        _graph_db_lock = _manager.Lock()
        _data_init_lock = _manager.Lock()
        _shared_dicts = _manager.dict()
        _init_flags = _manager.dict()
        _update_flags = _manager.dict()

        _storage_keyed_lock = KeyedUnifiedLock()

        # Initialize per-process async locks paired with the manager locks
        _async_locks = {
            "internal_lock": asyncio.Lock(),
            "storage_lock": asyncio.Lock(),
            "pipeline_status_lock": asyncio.Lock(),
            "graph_db_lock": asyncio.Lock(),
            "data_init_lock": asyncio.Lock(),
        }

        direct_log(
            f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
        )
    else:
        _is_multiprocess = False
        _internal_lock = asyncio.Lock()
        _storage_lock = asyncio.Lock()
        _pipeline_status_lock = asyncio.Lock()
        _graph_db_lock = asyncio.Lock()
        _data_init_lock = asyncio.Lock()
        _shared_dicts = {}
        _init_flags = {}
        _update_flags = {}
        _async_locks = None  # No paired async locks needed in single-process mode

        _storage_keyed_lock = KeyedUnifiedLock()
        direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")

    # Reset cleanup tracking variables
    _earliest_mp_cleanup_time = None
    _last_mp_cleanup_time = None

    # Mark as initialized
    _initialized = True
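
# Startup sketch (illustrative only; the WORKERS variable is an assumption
# about the deployment, not something this module reads):
#
#     # Gunicorn with preload: call once in the master before forking
#     initialize_share_data(workers=int(os.environ.get("WORKERS", "1")))
#
#     # Single-process FastAPI: call from the lifespan handler with workers=1
#     initialize_share_data(workers=1)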


async def initialize_pipeline_status():
    """
    Initialize pipeline namespace with default values.
    This function is called during the FastAPI lifespan for each worker.
    """
    pipeline_namespace = await get_namespace_data("pipeline_status", first_init=True)

    async with get_internal_lock():
        # Check if already initialized by looking for a required field
        if "busy" in pipeline_namespace:
            return

        # Create a shared list object for history_messages in multiprocess mode
        history_messages = _manager.list() if _is_multiprocess else []
        pipeline_namespace.update(
            {
                "autoscanned": False,
                "busy": False,
                "job_name": "-",
                "job_start": None,
                "docs": 0,
                "batchs": 0,
                "cur_batch": 0,
                "request_pending": False,
                "latest_message": "",
                "history_messages": history_messages,
            }
        )
        direct_log(f"Process {os.getpid()} Pipeline namespace initialized")
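
# Access sketch (illustrative only; `_demo_mark_busy` is hypothetical). After
# initialization, callers fetch the namespace and mutate it under the
# pipeline status lock:
#
#     async def _demo_mark_busy():
#         status = await get_namespace_data("pipeline_status")
#         async with get_pipeline_status_lock():
#             status["busy"] = True
#             status["job_name"] = "indexing files"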


async def get_update_flag(namespace: str):
    """
    Create a namespace's update flag for a worker.
    Return the update flag to the caller for referencing or reset.
    """
    global _update_flags
    if _update_flags is None:
        raise ValueError("Try to create namespace before Shared-Data is initialized")

    async with get_internal_lock():
        if namespace not in _update_flags:
            if _is_multiprocess and _manager is not None:
                _update_flags[namespace] = _manager.list()
            else:
                _update_flags[namespace] = []
            direct_log(
                f"Process {os.getpid()} initialized update flags for namespace: [{namespace}]"
            )

        if _is_multiprocess and _manager is not None:
            new_update_flag = _manager.Value("b", False)
        else:
            # Create a simple mutable object to mimic the Manager Value interface
            class MutableBoolean:
                def __init__(self, initial_value=False):
                    self.value = initial_value

            new_update_flag = MutableBoolean(False)

        _update_flags[namespace].append(new_update_flag)
        return new_update_flag


async def set_all_update_flags(namespace: str):
    """Set all update flags of a namespace, indicating all workers need to reload data from files"""
    global _update_flags
    if _update_flags is None:
        raise ValueError("Try to create namespace before Shared-Data is initialized")

    async with get_internal_lock():
        if namespace not in _update_flags:
            raise ValueError(f"Namespace {namespace} not found in update flags")
        # Update flags for both single- and multi-process modes
        for i in range(len(_update_flags[namespace])):
            _update_flags[namespace][i].value = True


async def clear_all_update_flags(namespace: str):
    """Clear all update flags of a namespace, indicating no worker needs to reload data from files"""
    global _update_flags
    if _update_flags is None:
        raise ValueError("Try to create namespace before Shared-Data is initialized")

    async with get_internal_lock():
        if namespace not in _update_flags:
            raise ValueError(f"Namespace {namespace} not found in update flags")
        # Clear flags for both single- and multi-process modes
        for i in range(len(_update_flags[namespace])):
            _update_flags[namespace][i].value = False
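
# Round-trip sketch (illustrative only; `_demo_update_flags` and the
# "kv_store" namespace are made up): each worker registers a flag, a writer
# sets all flags after changing data on disk, and each worker resets its own
# flag once it has reloaded:
#
#     async def _demo_update_flags():
#         flag = await get_update_flag("kv_store")   # register this worker
#         await set_all_update_flags("kv_store")     # data changed on disk
#         if flag.value:
#             ...  # reload data from files
#             flag.value = False                     # reset only our own flag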


async def get_all_update_flags_status() -> Dict[str, list]:
    """
    Get update flags status for all namespaces.

    Returns:
        Dict[str, list]: A dictionary mapping namespace names to lists of update flag statuses
    """
    if _update_flags is None:
        return {}

    result = {}
    async with get_internal_lock():
        for namespace, flags in _update_flags.items():
            worker_statuses = []
            for flag in flags:
                # Both Manager Value proxies and MutableBoolean expose .value
                worker_statuses.append(flag.value)
            result[namespace] = worker_statuses

    return result


async def try_initialize_namespace(namespace: str) -> bool:
    """
    Returns True if the current worker (process) gets initialization permission
    for loading data later. A worker that does not get permission must not load
    data from files.
    """
    global _init_flags, _manager

    if _init_flags is None:
        raise ValueError("Try to create namespace before Shared-Data is initialized")

    async with get_internal_lock():
        if namespace not in _init_flags:
            _init_flags[namespace] = True
            direct_log(
                f"Process {os.getpid()} ready to initialize storage namespace: [{namespace}]"
            )
            return True
        direct_log(
            f"Process {os.getpid()} storage namespace already initialized: [{namespace}]"
        )

    return False
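
# Gating sketch (illustrative only; `_demo_init_namespace` and "kv_store" are
# made up): exactly one worker wins the right to load a namespace from files;
# the others simply use the shared data:
#
#     async def _demo_init_namespace():
#         if await try_initialize_namespace("kv_store"):
#             ...  # this worker loads data from files
#         data = await get_namespace_data("kv_store")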


async def get_namespace_data(
    namespace: str, first_init: bool = False
) -> Dict[str, Any]:
    """Get the shared data reference for a specific namespace.

    Args:
        namespace: The namespace to retrieve
        first_init: If True, allows creation of the "pipeline_status" namespace.
            Used internally by initialize_pipeline_status().
    """
    if _shared_dicts is None:
        direct_log(
            f"Error: try to get namespace before it is initialized, pid={os.getpid()}",
            level="ERROR",
        )
        raise ValueError("Shared dictionaries not initialized")

    async with get_internal_lock():
        if namespace not in _shared_dicts:
            # pipeline_status must be created by initialize_pipeline_status();
            # accessing it before then is a programming error
            if namespace == "pipeline_status" and not first_init:
                raise PipelineNotInitializedError(namespace)

            # Create the namespace container on first access
            if _is_multiprocess and _manager is not None:
                _shared_dicts[namespace] = _manager.dict()
            else:
                _shared_dicts[namespace] = {}

    return _shared_dicts[namespace]


def finalize_share_data():
    """
    Release shared resources and clean up.

    This function should be called when the application is shutting down
    to properly release shared resources and avoid memory leaks.

    In multi-process mode, it shuts down the Manager and releases all shared objects.
    In single-process mode, it simply resets the global variables.
    """
    global \
        _manager, \
        _is_multiprocess, \
        _storage_lock, \
        _internal_lock, \
        _pipeline_status_lock, \
        _graph_db_lock, \
        _data_init_lock, \
        _shared_dicts, \
        _init_flags, \
        _initialized, \
        _update_flags, \
        _async_locks

    # Nothing to do when shared data was never initialized
    if not _initialized:
        direct_log(
            f"Process {os.getpid()} storage data not initialized, nothing to finalize"
        )
        return

    direct_log(
        f"Process {os.getpid()} finalizing storage data (multiprocess={_is_multiprocess})"
    )

    # In multi-process mode, clear shared resources and shut down the Manager
    if _is_multiprocess and _manager is not None:
        try:
            # Clear shared containers before shutting down the Manager
            if _shared_dicts is not None:
                # Clear pipeline status history messages first, if present
                try:
                    pipeline_status = _shared_dicts.get("pipeline_status", {})
                    if "history_messages" in pipeline_status:
                        pipeline_status["history_messages"].clear()
                except Exception:
                    pass  # Ignore errors during history cleanup
                _shared_dicts.clear()
            if _init_flags is not None:
                _init_flags.clear()
            if _update_flags is not None:
                # Reset and clear each namespace's update flags
                try:
                    for namespace in _update_flags:
                        flags_list = _update_flags[namespace]
                        if isinstance(flags_list, list):
                            # Reset each flag to False before clearing
                            for flag in flags_list:
                                if hasattr(flag, "value"):
                                    flag.value = False
                            flags_list.clear()
                except Exception:
                    pass  # Ignore errors during flags cleanup
                _update_flags.clear()

            # Shut down the Manager itself
            _manager.shutdown()
            direct_log(f"Process {os.getpid()} Manager shutdown complete")
        except Exception as e:
            direct_log(
                f"Process {os.getpid()} Error shutting down Manager: {e}", level="ERROR"
            )

    # Reset global variables
    _manager = None
    _initialized = None
    _is_multiprocess = None
    _shared_dicts = None
    _init_flags = None
    _storage_lock = None
    _internal_lock = None
    _pipeline_status_lock = None
    _graph_db_lock = None
    _data_init_lock = None
    _update_flags = None
    _async_locks = None

    direct_log(f"Process {os.getpid()} storage data finalization complete")
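
# Shutdown sketch (illustrative only): finalize_share_data pairs with
# initialize_share_data and is typically called from the application's
# shutdown path, e.g. at the end of a FastAPI lifespan context:
#
#     # in the lifespan handler, after `yield`:
#     finalize_share_data()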