Spaces:
Running
Running
| """Tests for cli/ module.""" | |
| import asyncio | |
| import json | |
| import os | |
| from unittest.mock import AsyncMock, MagicMock, patch | |
| import pytest | |
| from messaging.event_parser import parse_cli_event | |
| # --- Existing Parser Tests --- | |
| class TestCLIParser: | |
| """Test CLI event parsing.""" | |
| def test_parse_text_content(self): | |
| """Test parsing text content from assistant message.""" | |
| event = { | |
| "type": "assistant", | |
| "message": {"content": [{"type": "text", "text": "Hello, world!"}]}, | |
| } | |
| result = parse_cli_event(event) | |
| assert len(result) == 1 | |
| assert result[0]["type"] == "text_chunk" | |
| assert result[0]["text"] == "Hello, world!" | |
| def test_parse_thinking_content(self): | |
| """Test parsing thinking content.""" | |
| event = { | |
| "type": "assistant", | |
| "message": { | |
| "content": [{"type": "thinking", "thinking": "Let me think..."}] | |
| }, | |
| } | |
| result = parse_cli_event(event) | |
| assert len(result) == 1 | |
| assert result[0]["type"] == "thinking_chunk" | |
| assert ( | |
| result[0]["text"] == "Let me think...\n" | |
| or result[0]["text"] == "Let me think..." | |
| ) | |
| def test_parse_multiple_content(self): | |
| """Test parsing mixed content (thinking + tools).""" | |
| event = { | |
| "type": "assistant", | |
| "message": { | |
| "content": [ | |
| {"type": "thinking", "thinking": "Thinking..."}, | |
| {"type": "tool_use", "name": "ls", "input": {}}, | |
| ] | |
| }, | |
| } | |
| result = parse_cli_event(event) | |
| assert len(result) == 2 | |
| assert result[0]["type"] == "thinking_chunk" | |
| assert result[0]["text"] == "Thinking..." | |
| assert result[1]["type"] == "tool_use" | |
| def test_parse_tool_use(self): | |
| """Test parsing tool use content.""" | |
| event = { | |
| "type": "assistant", | |
| "message": { | |
| "content": [ | |
| { | |
| "type": "tool_use", | |
| "name": "read_file", | |
| "input": {"path": "/test"}, | |
| } | |
| ] | |
| }, | |
| } | |
| result = parse_cli_event(event) | |
| assert len(result) == 1 | |
| assert result[0]["type"] == "tool_use" | |
| assert result[0]["name"] == "read_file" | |
| assert result[0]["input"] == {"path": "/test"} | |
| def test_parse_text_delta(self): | |
| """Test parsing streaming text delta.""" | |
| event = { | |
| "type": "content_block_delta", | |
| "index": 0, | |
| "delta": {"type": "text_delta", "text": "streaming text"}, | |
| } | |
| result = parse_cli_event(event) | |
| assert len(result) == 1 | |
| assert result[0]["type"] == "text_delta" | |
| assert result[0]["text"] == "streaming text" | |
| def test_parse_thinking_delta(self): | |
| """Test parsing streaming thinking delta.""" | |
| event = { | |
| "type": "content_block_delta", | |
| "index": 1, | |
| "delta": {"type": "thinking_delta", "thinking": "thinking..."}, | |
| } | |
| result = parse_cli_event(event) | |
| assert len(result) == 1 | |
| assert result[0]["type"] == "thinking_delta" | |
| assert result[0]["text"] == "thinking..." | |
| def test_parse_error(self): | |
| """Test parsing error event.""" | |
| event = {"type": "error", "error": {"message": "Something went wrong"}} | |
| result = parse_cli_event(event) | |
| assert result[0]["type"] == "error" | |
| assert result[0]["message"] == "Something went wrong" | |
| def test_parse_exit_success(self): | |
| """Test parsing exit event with success.""" | |
| event = {"type": "exit", "code": 0} | |
| result = parse_cli_event(event) | |
| assert result[0]["type"] == "complete" | |
| assert result[0]["status"] == "success" | |
| def test_parse_exit_failure(self): | |
| """Test parsing exit event with failure returns error then complete.""" | |
| event = {"type": "exit", "code": 1} | |
| result = parse_cli_event(event) | |
| # Non-zero exit now returns error first, then complete | |
| assert len(result) == 2 | |
| assert result[0]["type"] == "error" | |
| assert ( | |
| "exit" in result[0]["message"].lower() | |
| or "code" in result[0]["message"].lower() | |
| ) | |
| assert result[1]["type"] == "complete" | |
| assert result[1]["status"] == "failed" | |
| def test_parse_invalid_event(self): | |
| """Test parsing returns empty list for unrecognized event.""" | |
| result = parse_cli_event({"type": "unknown"}) | |
| assert result == [] | |
| def test_parse_non_dict(self): | |
| """Test parsing returns empty list for non-dict input.""" | |
| result = parse_cli_event("not a dict") | |
| assert result == [] | |
| # --- CLI Session Tests --- | |
| class TestCLISession: | |
| """Test CLISession.""" | |
| def test_session_init(self): | |
| """Test CLISession initialization.""" | |
| from cli.session import CLISession | |
| session = CLISession( | |
| workspace_path="/tmp/test", | |
| api_url="http://localhost:8082/v1", | |
| allowed_dirs=["/home/user/projects"], | |
| ) | |
| assert session.workspace == os.path.normpath(os.path.abspath("/tmp/test")) | |
| assert session.api_url == "http://localhost:8082/v1" | |
| assert not session.is_busy | |
| def test_session_extract_session_id(self): | |
| """Test session ID extraction from various event formats.""" | |
| from cli.session import CLISession | |
| session = CLISession("/tmp", "http://localhost:8082/v1") | |
| # Direct session_id field | |
| assert session._extract_session_id({"session_id": "abc123"}) == "abc123" | |
| assert session._extract_session_id({"sessionId": "abc123"}) == "abc123" | |
| # Nested in init | |
| assert ( | |
| session._extract_session_id({"init": {"session_id": "nested123"}}) | |
| == "nested123" | |
| ) | |
| # Nested in result | |
| assert ( | |
| session._extract_session_id({"result": {"session_id": "res123"}}) | |
| == "res123" | |
| ) | |
| # Conversation id | |
| assert ( | |
| session._extract_session_id({"conversation": {"id": "conv123"}}) | |
| == "conv123" | |
| ) | |
| # No session ID | |
| assert session._extract_session_id({"type": "message"}) is None | |
| assert session._extract_session_id("not a dict") is None | |
| async def test_start_task_basic_flow(self): | |
| """Test start_task running a basic command flow.""" | |
| from cli.session import CLISession | |
| session = CLISession("/tmp", "http://localhost:8082/v1") | |
| # Mock subprocess | |
| mock_process = AsyncMock() | |
| mock_process.stdout.read.side_effect = [ | |
| b'{"type": "message", "content": "Hello"}\n', | |
| b'{"session_id": "sess_1"}\n', | |
| b"", # EOF | |
| ] | |
| mock_process.stderr.read.return_value = b"" # No error | |
| mock_process.wait.return_value = 0 | |
| mock_process.returncode = 0 | |
| with patch( | |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock | |
| ) as mock_exec: | |
| mock_exec.return_value = mock_process | |
| events = [e async for e in session.start_task("Hello")] | |
| # Verify command construction | |
| # Arg 1 is subprocess command | |
| args = mock_exec.call_args[0] | |
| assert args[0] == "claude" | |
| assert "-p" in args | |
| assert "Hello" in args | |
| # Verify events | |
| assert ( | |
| len(events) == 4 | |
| ) # message, session_id, session_info (synthesized), exit | |
| assert events[0] == {"type": "message", "content": "Hello"} | |
| assert events[1] == {"type": "session_info", "session_id": "sess_1"} | |
| # The session_info event is yielded by _handle_line_gen right after extracting ID | |
| assert events[2] == {"session_id": "sess_1"} # The original event | |
| assert events[3] == {"type": "exit", "code": 0, "stderr": None} | |
| assert session.current_session_id == "sess_1" | |
| async def test_start_task_with_session_resume(self): | |
| """Test resuming an existing session.""" | |
| from cli.session import CLISession | |
| session = CLISession("/tmp", "http://localhost:8082/v1") | |
| mock_process = AsyncMock() | |
| mock_process.stdout.read.side_effect = [ | |
| b"", | |
| ] # Immediate EOF | |
| mock_process.stderr.read.return_value = b"" | |
| mock_process.wait.return_value = 0 | |
| with patch( | |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock | |
| ) as mock_exec: | |
| mock_exec.return_value = mock_process | |
| async for _ in session.start_task("Hello", session_id="sess_abc"): | |
| pass | |
| args = mock_exec.call_args[0] | |
| assert "--resume" in args | |
| assert "sess_abc" in args | |
| assert "--fork-session" not in args | |
| async def test_start_task_with_session_resume_and_fork(self): | |
| """Test resuming an existing session and forking.""" | |
| from cli.session import CLISession | |
| session = CLISession("/tmp", "http://localhost:8082/v1") | |
| mock_process = AsyncMock() | |
| mock_process.stdout.read.side_effect = [b""] # Immediate EOF | |
| mock_process.stderr.read.return_value = b"" | |
| mock_process.wait.return_value = 0 | |
| with patch( | |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock | |
| ) as mock_exec: | |
| mock_exec.return_value = mock_process | |
| async for _ in session.start_task( | |
| "Hello", session_id="sess_abc", fork_session=True | |
| ): | |
| pass | |
| args = mock_exec.call_args[0] | |
| assert "--resume" in args | |
| assert "sess_abc" in args | |
| assert "--fork-session" in args | |
| async def test_start_task_process_failure_with_stderr(self): | |
| """Test process exit with error code and stderr output.""" | |
| from cli.session import CLISession | |
| session = CLISession("/tmp", "http://localhost:8082/v1") | |
| mock_process = AsyncMock() | |
| mock_process.stdout.read.side_effect = [b""] # No stdout | |
| mock_process.stderr.read.return_value = b"Fatal error" | |
| mock_process.wait.return_value = 1 | |
| with patch( | |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock | |
| ) as mock_exec: | |
| mock_exec.return_value = mock_process | |
| events = [e async for e in session.start_task("Hello")] | |
| # Should have error event from stderr, then exit event | |
| assert len(events) == 2 | |
| assert events[0]["type"] == "error" | |
| assert events[0]["error"]["message"] == "Fatal error" | |
| assert events[1]["type"] == "exit" | |
| assert events[1]["code"] == 1 | |
| assert events[1]["stderr"] == "Fatal error" | |
| async def test_stop_session(self): | |
| """Test stopping the session process.""" | |
| from cli.session import CLISession | |
| session = CLISession("/tmp", "http://localhost:8082/v1") | |
| mock_process = MagicMock() | |
| mock_process.returncode = None # Running | |
| # Mock wait to simulate async finish | |
| mock_process.wait = AsyncMock(return_value=0) | |
| session.process = mock_process | |
| stopped = await session.stop() | |
| assert stopped is True | |
| mock_process.terminate.assert_called_once() | |
| mock_process.wait.assert_called() | |
| async def test_stop_session_timeout_force_kill(self): | |
| """Test force kill if terminate times out.""" | |
| from cli.session import CLISession | |
| session = CLISession("/tmp", "http://localhost:8082/v1") | |
| mock_process = MagicMock() | |
| mock_process.returncode = None | |
| # First wait times out | |
| async def wait_side_effect(): | |
| if not mock_process.kill.called: | |
| await asyncio.sleep(6) # Should be > 5.0 timeout | |
| return 0 | |
| # We can simulate timeout by raising TimeoutError directly on first call | |
| mock_process.wait = AsyncMock(side_effect=[asyncio.TimeoutError, 0]) | |
| session.process = mock_process | |
| stopped = await session.stop() | |
| assert stopped is True | |
| mock_process.terminate.assert_called() | |
| mock_process.kill.assert_called() | |
| async def test_start_task_split_buffer(self): | |
| """Test handling of JSON split across chunks.""" | |
| from cli.session import CLISession | |
| session = CLISession("/tmp", "http://localhost:8082/v1") | |
| mock_process = AsyncMock() | |
| # Split json: {"type": "mess... age"} | |
| mock_process.stdout.read.side_effect = [ | |
| b'{"type": "mess', | |
| b'age", "content": "Split"}\n', | |
| b"", | |
| ] | |
| mock_process.stderr.read.return_value = b"" | |
| mock_process.wait.return_value = 0 | |
| with patch( | |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock | |
| ) as mock_exec: | |
| mock_exec.return_value = mock_process | |
| events = [ | |
| e async for e in session.start_task("test") if e["type"] == "message" | |
| ] | |
| assert len(events) == 1 | |
| assert events[0]["content"] == "Split" | |
| async def test_start_task_remnant_buffer(self): | |
| """Test handling of buffer remnant at EOF (no newline at end).""" | |
| from cli.session import CLISession | |
| session = CLISession("/tmp", "http://localhost:8082/v1") | |
| mock_process = AsyncMock() | |
| mock_process.stdout.read.side_effect = [ | |
| b'{"type": "message", "content": "Remnant"}', # No newline | |
| b"", | |
| ] | |
| mock_process.stderr.read.return_value = b"" | |
| mock_process.wait.return_value = 0 | |
| with patch( | |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock | |
| ) as mock_exec: | |
| mock_exec.return_value = mock_process | |
| events = [ | |
| e async for e in session.start_task("test") if e["type"] == "message" | |
| ] | |
| assert len(events) == 1 | |
| assert events[0]["content"] == "Remnant" | |
| async def test_start_task_non_v1_url(self): | |
| """Test start_task with a non-v1 URL.""" | |
| from cli.session import CLISession | |
| # URL not ending in /v1 | |
| session = CLISession("/tmp", "http://localhost:8082") | |
| mock_process = AsyncMock() | |
| mock_process.stdout.read.side_effect = [b""] | |
| mock_process.stderr.read.return_value = b"" | |
| mock_process.wait.return_value = 0 | |
| with patch( | |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock | |
| ) as mock_exec: | |
| mock_exec.return_value = mock_process | |
| async for _ in session.start_task("test"): | |
| pass | |
| # Check env var | |
| kwargs = mock_exec.call_args[1] | |
| env = kwargs["env"] | |
| assert env["ANTHROPIC_BASE_URL"] == "http://localhost:8082" | |
| async def test_start_task_allowed_dirs(self): | |
| """Test start_task includes allowed dirs in command.""" | |
| from cli.session import CLISession | |
| session = CLISession( | |
| "/tmp", "http://localhost:8082/v1", allowed_dirs=["/dir1", "/dir2"] | |
| ) | |
| mock_process = AsyncMock() | |
| mock_process.stdout.read.side_effect = [b""] | |
| mock_process.stderr.read.return_value = b"" | |
| mock_process.wait.return_value = 0 | |
| with patch( | |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock | |
| ) as mock_exec: | |
| mock_exec.return_value = mock_process | |
| async for _ in session.start_task("test"): | |
| pass | |
| cmd = mock_exec.call_args[0] | |
| assert "--add-dir" in cmd | |
| assert os.path.normpath("/dir1") in cmd | |
| assert os.path.normpath("/dir2") in cmd | |
| async def test_start_task_plans_directory(self): | |
| """Test start_task includes --settings plansDirectory when plans_directory set.""" | |
| from cli.session import CLISession | |
| session = CLISession( | |
| "/tmp", | |
| "http://localhost:8082/v1", | |
| plans_directory="./agent_workspace/plans", | |
| ) | |
| mock_process = AsyncMock() | |
| mock_process.stdout.read.side_effect = [b""] | |
| mock_process.stderr.read.return_value = b"" | |
| mock_process.wait.return_value = 0 | |
| with patch( | |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock | |
| ) as mock_exec: | |
| mock_exec.return_value = mock_process | |
| async for _ in session.start_task("test"): | |
| pass | |
| cmd = mock_exec.call_args[0] | |
| assert "--settings" in cmd | |
| settings_idx = cmd.index("--settings") | |
| assert settings_idx + 1 < len(cmd) | |
| settings = json.loads(cmd[settings_idx + 1]) | |
| assert settings["plansDirectory"] == "./agent_workspace/plans" | |
| async def test_start_task_json_error(self): | |
| """Test handling of non-JSON output from CLI.""" | |
| from cli.session import CLISession | |
| session = CLISession("/tmp", "http://localhost:8082/v1") | |
| mock_process = AsyncMock() | |
| mock_process.stdout.read.side_effect = [b"Not valid json\n", b""] | |
| mock_process.stderr.read.return_value = b"" | |
| mock_process.wait.return_value = 0 | |
| with patch( | |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock | |
| ) as mock_exec: | |
| mock_exec.return_value = mock_process | |
| events = [e async for e in session.start_task("test") if e["type"] == "raw"] | |
| assert len(events) == 1 | |
| assert events[0]["content"] == "Not valid json" | |
| async def test_stop_exception(self): | |
| """Test exception handling during stop.""" | |
| from cli.session import CLISession | |
| session = CLISession("/tmp", "http://localhost:8082/v1") | |
| mock_process = MagicMock() | |
| mock_process.returncode = None | |
| # Raise exception on terminate | |
| mock_process.terminate.side_effect = RuntimeError("Permission denied") | |
| session.process = mock_process | |
| stopped = await session.stop() | |
| assert stopped is False | |
| class TestCLISessionManager: | |
| """Test CLISessionManager.""" | |
| async def test_manager_create_session(self): | |
| """Test creating a new session.""" | |
| from cli.manager import CLISessionManager | |
| manager = CLISessionManager( | |
| workspace_path="/tmp/test", | |
| api_url="http://localhost:8082/v1", | |
| ) | |
| session, sid, is_new = await manager.get_or_create_session() | |
| assert session is not None | |
| assert sid.startswith("pending_") | |
| assert is_new is True | |
| async def test_manager_reuse_session(self): | |
| """Test reusing an existing session.""" | |
| from cli.manager import CLISessionManager | |
| manager = CLISessionManager( | |
| workspace_path="/tmp/test", | |
| api_url="http://localhost:8082/v1", | |
| ) | |
| # Create first session | |
| s1, sid1, _is_new1 = await manager.get_or_create_session() | |
| # Request same session | |
| s2, _sid2, is_new2 = await manager.get_or_create_session(session_id=sid1) | |
| assert s1 is s2 | |
| assert is_new2 is False | |
| async def test_manager_stats(self): | |
| """Test manager stats.""" | |
| from cli.manager import CLISessionManager | |
| manager = CLISessionManager( | |
| workspace_path="/tmp/test", | |
| api_url="http://localhost:8082/v1", | |
| ) | |
| stats = manager.get_stats() | |
| assert stats["active_sessions"] == 0 | |
| assert stats["pending_sessions"] == 0 | |