Spaces:
Running
Running
| """Claude Code CLI session management.""" | |
| import asyncio | |
| import json | |
| import os | |
| from collections.abc import AsyncGenerator | |
| from typing import Any | |
| from loguru import logger | |
| from .process_registry import register_pid, unregister_pid | |
| class CLISession: | |
| """Manages a single persistent Claude Code CLI subprocess.""" | |
| def __init__( | |
| self, | |
| workspace_path: str, | |
| api_url: str, | |
| allowed_dirs: list[str] | None = None, | |
| plans_directory: str | None = None, | |
| ): | |
| self.workspace = os.path.normpath(os.path.abspath(workspace_path)) | |
| self.api_url = api_url | |
| self.allowed_dirs = [os.path.normpath(d) for d in (allowed_dirs or [])] | |
| self.plans_directory = plans_directory | |
| self.process: asyncio.subprocess.Process | None = None | |
| self.current_session_id: str | None = None | |
| self._is_busy = False | |
| self._cli_lock = asyncio.Lock() | |
| def is_busy(self) -> bool: | |
| """Check if a task is currently running.""" | |
| return self._is_busy | |
| async def start_task( | |
| self, prompt: str, session_id: str | None = None, fork_session: bool = False | |
| ) -> AsyncGenerator[dict]: | |
| """ | |
| Start a new task or continue an existing session. | |
| Args: | |
| prompt: The user's message/prompt | |
| session_id: Optional session ID to resume | |
| Yields: | |
| Event dictionaries from the CLI | |
| """ | |
| async with self._cli_lock: | |
| self._is_busy = True | |
| env = os.environ.copy() | |
| if "ANTHROPIC_API_KEY" not in env: | |
| env["ANTHROPIC_API_KEY"] = "sk-placeholder-key-for-proxy" | |
| env["ANTHROPIC_API_URL"] = self.api_url | |
| if self.api_url.endswith("/v1"): | |
| env["ANTHROPIC_BASE_URL"] = self.api_url[:-3] | |
| else: | |
| env["ANTHROPIC_BASE_URL"] = self.api_url | |
| env["TERM"] = "dumb" | |
| env["PYTHONIOENCODING"] = "utf-8" | |
| # Build command | |
| if session_id and not session_id.startswith("pending_"): | |
| cmd = [ | |
| "claude", | |
| "--resume", | |
| session_id, | |
| ] | |
| if fork_session: | |
| cmd.append("--fork-session") | |
| cmd += [ | |
| "-p", | |
| prompt, | |
| "--output-format", | |
| "stream-json", | |
| "--dangerously-skip-permissions", | |
| "--verbose", | |
| ] | |
| logger.info(f"Resuming Claude session {session_id}") | |
| else: | |
| cmd = [ | |
| "claude", | |
| "-p", | |
| prompt, | |
| "--output-format", | |
| "stream-json", | |
| "--dangerously-skip-permissions", | |
| "--verbose", | |
| ] | |
| logger.info("Starting new Claude session") | |
| if self.allowed_dirs: | |
| for d in self.allowed_dirs: | |
| cmd.extend(["--add-dir", d]) | |
| if self.plans_directory is not None: | |
| settings_json = json.dumps({"plansDirectory": self.plans_directory}) | |
| cmd.extend(["--settings", settings_json]) | |
| try: | |
| self.process = await asyncio.create_subprocess_exec( | |
| *cmd, | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE, | |
| cwd=self.workspace, | |
| env=env, | |
| ) | |
| if self.process and self.process.pid: | |
| register_pid(self.process.pid) | |
| if not self.process or not self.process.stdout: | |
| yield {"type": "exit", "code": 1} | |
| return | |
| session_id_extracted = False | |
| buffer = bytearray() | |
| try: | |
| while True: | |
| chunk = await self.process.stdout.read(65536) | |
| if not chunk: | |
| if buffer: | |
| line_str = buffer.decode( | |
| "utf-8", errors="replace" | |
| ).strip() | |
| if line_str: | |
| async for event in self._handle_line_gen( | |
| line_str, session_id_extracted | |
| ): | |
| if event.get("type") == "session_info": | |
| session_id_extracted = True | |
| yield event | |
| break | |
| buffer.extend(chunk) | |
| while True: | |
| newline_pos = buffer.find(b"\n") | |
| if newline_pos == -1: | |
| break | |
| line = buffer[:newline_pos] | |
| buffer = buffer[newline_pos + 1 :] | |
| line_str = line.decode("utf-8", errors="replace").strip() | |
| if line_str: | |
| async for event in self._handle_line_gen( | |
| line_str, session_id_extracted | |
| ): | |
| if event.get("type") == "session_info": | |
| session_id_extracted = True | |
| yield event | |
| except asyncio.CancelledError: | |
| # Cancelling the handler task should not leave a Claude CLI | |
| # subprocess running in the background. | |
| try: | |
| await asyncio.shield(self.stop()) | |
| finally: | |
| raise | |
| stderr_text = None | |
| if self.process.stderr: | |
| stderr_output = await self.process.stderr.read() | |
| if stderr_output: | |
| stderr_text = stderr_output.decode( | |
| "utf-8", errors="replace" | |
| ).strip() | |
| logger.error(f"Claude CLI Stderr: {stderr_text}") | |
| # Yield stderr as error event so it shows in UI | |
| if stderr_text: | |
| logger.info("CLI_SESSION: Yielding error event from stderr") | |
| yield {"type": "error", "error": {"message": stderr_text}} | |
| return_code = await self.process.wait() | |
| logger.info( | |
| f"Claude CLI exited with code {return_code}, stderr_present={bool(stderr_text)}" | |
| ) | |
| if return_code != 0 and not stderr_text: | |
| logger.warning( | |
| f"CLI_SESSION: Process exited with code {return_code} but no stderr captured" | |
| ) | |
| yield { | |
| "type": "exit", | |
| "code": return_code, | |
| "stderr": stderr_text, | |
| } | |
| finally: | |
| self._is_busy = False | |
| if self.process and self.process.pid: | |
| unregister_pid(self.process.pid) | |
| async def _handle_line_gen( | |
| self, line_str: str, session_id_extracted: bool | |
| ) -> AsyncGenerator[dict]: | |
| """Process a single line and yield events.""" | |
| try: | |
| event = json.loads(line_str) | |
| if not session_id_extracted: | |
| extracted_id = self._extract_session_id(event) | |
| if extracted_id: | |
| self.current_session_id = extracted_id | |
| logger.info(f"Extracted session ID: {extracted_id}") | |
| yield {"type": "session_info", "session_id": extracted_id} | |
| yield event | |
| except json.JSONDecodeError: | |
| logger.debug(f"Non-JSON output: {line_str}") | |
| yield {"type": "raw", "content": line_str} | |
| def _extract_session_id(self, event: Any) -> str | None: | |
| """Extract session ID from CLI event.""" | |
| if not isinstance(event, dict): | |
| return None | |
| if "session_id" in event: | |
| return event["session_id"] | |
| if "sessionId" in event: | |
| return event["sessionId"] | |
| for key in ["init", "system", "result", "metadata"]: | |
| if key in event and isinstance(event[key], dict): | |
| nested = event[key] | |
| if "session_id" in nested: | |
| return nested["session_id"] | |
| if "sessionId" in nested: | |
| return nested["sessionId"] | |
| if "conversation" in event and isinstance(event["conversation"], dict): | |
| conv = event["conversation"] | |
| if "id" in conv: | |
| return conv["id"] | |
| return None | |
| async def stop(self): | |
| """Stop the CLI process.""" | |
| if self.process and self.process.returncode is None: | |
| try: | |
| logger.info(f"Stopping Claude CLI process {self.process.pid}") | |
| self.process.terminate() | |
| try: | |
| await asyncio.wait_for(self.process.wait(), timeout=5.0) | |
| except TimeoutError: | |
| self.process.kill() | |
| await self.process.wait() | |
| if self.process and self.process.pid: | |
| unregister_pid(self.process.pid) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error stopping process: {e}") | |
| return False | |
| return False | |