Spaces:
Running
Running
"""
CLI Session Manager for Multi-Instance Claude CLI Support

Manages a pool of CLISession instances, each handling one conversation.
This enables true parallel processing where multiple conversations run
simultaneously in separate CLI processes.
"""
| import asyncio | |
| import uuid | |
| from loguru import logger | |
| from .session import CLISession | |
| class CLISessionManager: | |
| """ | |
| Manages multiple CLISession instances for parallel conversation processing. | |
| Each new conversation gets its own CLISession with its own subprocess. | |
| Replies to existing conversations reuse the same CLISession instance. | |
| """ | |
| def __init__( | |
| self, | |
| workspace_path: str, | |
| api_url: str, | |
| allowed_dirs: list[str] | None = None, | |
| plans_directory: str | None = None, | |
| ): | |
| """ | |
| Initialize the session manager. | |
| Args: | |
| workspace_path: Working directory for CLI processes | |
| api_url: API URL for the proxy | |
| allowed_dirs: Directories the CLI is allowed to access | |
| plans_directory: Directory for Claude Code CLI plan files (passed via --settings) | |
| """ | |
| self.workspace = workspace_path | |
| self.api_url = api_url | |
| self.allowed_dirs = allowed_dirs or [] | |
| self.plans_directory = plans_directory | |
| self._sessions: dict[str, CLISession] = {} | |
| self._pending_sessions: dict[str, CLISession] = {} | |
| self._temp_to_real: dict[str, str] = {} | |
| self._real_to_temp: dict[str, str] = {} | |
| self._lock = asyncio.Lock() | |
| logger.info("CLISessionManager initialized") | |
| async def get_or_create_session( | |
| self, session_id: str | None = None | |
| ) -> tuple[CLISession, str, bool]: | |
| """ | |
| Get an existing session or create a new one. | |
| Returns: | |
| Tuple of (CLISession instance, session_id, is_new_session) | |
| """ | |
| async with self._lock: | |
| if session_id: | |
| lookup_id = self._temp_to_real.get(session_id, session_id) | |
| if lookup_id in self._sessions: | |
| return self._sessions[lookup_id], lookup_id, False | |
| if lookup_id in self._pending_sessions: | |
| return self._pending_sessions[lookup_id], lookup_id, False | |
| temp_id = session_id if session_id else f"pending_{uuid.uuid4().hex[:8]}" | |
| new_session = CLISession( | |
| workspace_path=self.workspace, | |
| api_url=self.api_url, | |
| allowed_dirs=self.allowed_dirs, | |
| plans_directory=self.plans_directory, | |
| ) | |
| self._pending_sessions[temp_id] = new_session | |
| logger.info(f"Created new session: {temp_id}") | |
| return new_session, temp_id, True | |
| async def register_real_session_id( | |
| self, temp_id: str, real_session_id: str | |
| ) -> bool: | |
| """Register the real session ID from CLI output.""" | |
| async with self._lock: | |
| if temp_id not in self._pending_sessions: | |
| logger.warning(f"Temp session {temp_id} not found") | |
| return False | |
| session = self._pending_sessions.pop(temp_id) | |
| self._sessions[real_session_id] = session | |
| self._temp_to_real[temp_id] = real_session_id | |
| self._real_to_temp[real_session_id] = temp_id | |
| logger.info(f"Registered session: {temp_id} -> {real_session_id}") | |
| return True | |
| async def remove_session(self, session_id: str) -> bool: | |
| """Remove a session from the manager.""" | |
| async with self._lock: | |
| if session_id in self._pending_sessions: | |
| session = self._pending_sessions.pop(session_id) | |
| await session.stop() | |
| return True | |
| if session_id in self._sessions: | |
| session = self._sessions.pop(session_id) | |
| await session.stop() | |
| temp_id = self._real_to_temp.pop(session_id, None) | |
| if temp_id is not None: | |
| self._temp_to_real.pop(temp_id, None) | |
| return True | |
| return False | |
| async def stop_all(self): | |
| """Stop all sessions.""" | |
| async with self._lock: | |
| all_sessions = list(self._sessions.values()) + list( | |
| self._pending_sessions.values() | |
| ) | |
| for session in all_sessions: | |
| try: | |
| await session.stop() | |
| except Exception as e: | |
| logger.error(f"Error stopping session: {e}") | |
| self._sessions.clear() | |
| self._pending_sessions.clear() | |
| self._temp_to_real.clear() | |
| self._real_to_temp.clear() | |
| logger.info("All sessions stopped") | |
| def get_stats(self) -> dict: | |
| """Get session statistics.""" | |
| return { | |
| "active_sessions": len(self._sessions), | |
| "pending_sessions": len(self._pending_sessions), | |
| "busy_count": sum(1 for s in self._sessions.values() if s.is_busy), | |
| } | |