Claude_Code / tests /cli /test_cli.py
Jainish1808
Move project files to repository root for Hugging Face Space
bf177ff
"""Tests for cli/ module."""
import asyncio
import json
import os
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from messaging.event_parser import parse_cli_event
# --- Existing Parser Tests ---
class TestCLIParser:
"""Test CLI event parsing."""
def test_parse_text_content(self):
"""Test parsing text content from assistant message."""
event = {
"type": "assistant",
"message": {"content": [{"type": "text", "text": "Hello, world!"}]},
}
result = parse_cli_event(event)
assert len(result) == 1
assert result[0]["type"] == "text_chunk"
assert result[0]["text"] == "Hello, world!"
def test_parse_thinking_content(self):
"""Test parsing thinking content."""
event = {
"type": "assistant",
"message": {
"content": [{"type": "thinking", "thinking": "Let me think..."}]
},
}
result = parse_cli_event(event)
assert len(result) == 1
assert result[0]["type"] == "thinking_chunk"
assert (
result[0]["text"] == "Let me think...\n"
or result[0]["text"] == "Let me think..."
)
def test_parse_multiple_content(self):
"""Test parsing mixed content (thinking + tools)."""
event = {
"type": "assistant",
"message": {
"content": [
{"type": "thinking", "thinking": "Thinking..."},
{"type": "tool_use", "name": "ls", "input": {}},
]
},
}
result = parse_cli_event(event)
assert len(result) == 2
assert result[0]["type"] == "thinking_chunk"
assert result[0]["text"] == "Thinking..."
assert result[1]["type"] == "tool_use"
def test_parse_tool_use(self):
"""Test parsing tool use content."""
event = {
"type": "assistant",
"message": {
"content": [
{
"type": "tool_use",
"name": "read_file",
"input": {"path": "/test"},
}
]
},
}
result = parse_cli_event(event)
assert len(result) == 1
assert result[0]["type"] == "tool_use"
assert result[0]["name"] == "read_file"
assert result[0]["input"] == {"path": "/test"}
def test_parse_text_delta(self):
"""Test parsing streaming text delta."""
event = {
"type": "content_block_delta",
"index": 0,
"delta": {"type": "text_delta", "text": "streaming text"},
}
result = parse_cli_event(event)
assert len(result) == 1
assert result[0]["type"] == "text_delta"
assert result[0]["text"] == "streaming text"
def test_parse_thinking_delta(self):
"""Test parsing streaming thinking delta."""
event = {
"type": "content_block_delta",
"index": 1,
"delta": {"type": "thinking_delta", "thinking": "thinking..."},
}
result = parse_cli_event(event)
assert len(result) == 1
assert result[0]["type"] == "thinking_delta"
assert result[0]["text"] == "thinking..."
def test_parse_error(self):
"""Test parsing error event."""
event = {"type": "error", "error": {"message": "Something went wrong"}}
result = parse_cli_event(event)
assert result[0]["type"] == "error"
assert result[0]["message"] == "Something went wrong"
def test_parse_exit_success(self):
"""Test parsing exit event with success."""
event = {"type": "exit", "code": 0}
result = parse_cli_event(event)
assert result[0]["type"] == "complete"
assert result[0]["status"] == "success"
def test_parse_exit_failure(self):
"""Test parsing exit event with failure returns error then complete."""
event = {"type": "exit", "code": 1}
result = parse_cli_event(event)
# Non-zero exit now returns error first, then complete
assert len(result) == 2
assert result[0]["type"] == "error"
assert (
"exit" in result[0]["message"].lower()
or "code" in result[0]["message"].lower()
)
assert result[1]["type"] == "complete"
assert result[1]["status"] == "failed"
def test_parse_invalid_event(self):
"""Test parsing returns empty list for unrecognized event."""
result = parse_cli_event({"type": "unknown"})
assert result == []
def test_parse_non_dict(self):
"""Test parsing returns empty list for non-dict input."""
result = parse_cli_event("not a dict")
assert result == []
# --- CLI Session Tests ---
class TestCLISession:
"""Test CLISession."""
def test_session_init(self):
"""Test CLISession initialization."""
from cli.session import CLISession
session = CLISession(
workspace_path="/tmp/test",
api_url="http://localhost:8082/v1",
allowed_dirs=["/home/user/projects"],
)
assert session.workspace == os.path.normpath(os.path.abspath("/tmp/test"))
assert session.api_url == "http://localhost:8082/v1"
assert not session.is_busy
def test_session_extract_session_id(self):
"""Test session ID extraction from various event formats."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
# Direct session_id field
assert session._extract_session_id({"session_id": "abc123"}) == "abc123"
assert session._extract_session_id({"sessionId": "abc123"}) == "abc123"
# Nested in init
assert (
session._extract_session_id({"init": {"session_id": "nested123"}})
== "nested123"
)
# Nested in result
assert (
session._extract_session_id({"result": {"session_id": "res123"}})
== "res123"
)
# Conversation id
assert (
session._extract_session_id({"conversation": {"id": "conv123"}})
== "conv123"
)
# No session ID
assert session._extract_session_id({"type": "message"}) is None
assert session._extract_session_id("not a dict") is None
@pytest.mark.asyncio
async def test_start_task_basic_flow(self):
"""Test start_task running a basic command flow."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
# Mock subprocess
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [
b'{"type": "message", "content": "Hello"}\n',
b'{"session_id": "sess_1"}\n',
b"", # EOF
]
mock_process.stderr.read.return_value = b"" # No error
mock_process.wait.return_value = 0
mock_process.returncode = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
events = [e async for e in session.start_task("Hello")]
# Verify command construction
# Arg 1 is subprocess command
args = mock_exec.call_args[0]
assert args[0] == "claude"
assert "-p" in args
assert "Hello" in args
# Verify events
assert (
len(events) == 4
) # message, session_id, session_info (synthesized), exit
assert events[0] == {"type": "message", "content": "Hello"}
assert events[1] == {"type": "session_info", "session_id": "sess_1"}
# The session_info event is yielded by _handle_line_gen right after extracting ID
assert events[2] == {"session_id": "sess_1"} # The original event
assert events[3] == {"type": "exit", "code": 0, "stderr": None}
assert session.current_session_id == "sess_1"
@pytest.mark.asyncio
async def test_start_task_with_session_resume(self):
"""Test resuming an existing session."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [
b"",
] # Immediate EOF
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
async for _ in session.start_task("Hello", session_id="sess_abc"):
pass
args = mock_exec.call_args[0]
assert "--resume" in args
assert "sess_abc" in args
assert "--fork-session" not in args
@pytest.mark.asyncio
async def test_start_task_with_session_resume_and_fork(self):
"""Test resuming an existing session and forking."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [b""] # Immediate EOF
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
async for _ in session.start_task(
"Hello", session_id="sess_abc", fork_session=True
):
pass
args = mock_exec.call_args[0]
assert "--resume" in args
assert "sess_abc" in args
assert "--fork-session" in args
@pytest.mark.asyncio
async def test_start_task_process_failure_with_stderr(self):
"""Test process exit with error code and stderr output."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [b""] # No stdout
mock_process.stderr.read.return_value = b"Fatal error"
mock_process.wait.return_value = 1
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
events = [e async for e in session.start_task("Hello")]
# Should have error event from stderr, then exit event
assert len(events) == 2
assert events[0]["type"] == "error"
assert events[0]["error"]["message"] == "Fatal error"
assert events[1]["type"] == "exit"
assert events[1]["code"] == 1
assert events[1]["stderr"] == "Fatal error"
@pytest.mark.asyncio
async def test_stop_session(self):
"""Test stopping the session process."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = MagicMock()
mock_process.returncode = None # Running
# Mock wait to simulate async finish
mock_process.wait = AsyncMock(return_value=0)
session.process = mock_process
stopped = await session.stop()
assert stopped is True
mock_process.terminate.assert_called_once()
mock_process.wait.assert_called()
@pytest.mark.asyncio
async def test_stop_session_timeout_force_kill(self):
"""Test force kill if terminate times out."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = MagicMock()
mock_process.returncode = None
# First wait times out
async def wait_side_effect():
if not mock_process.kill.called:
await asyncio.sleep(6) # Should be > 5.0 timeout
return 0
# We can simulate timeout by raising TimeoutError directly on first call
mock_process.wait = AsyncMock(side_effect=[asyncio.TimeoutError, 0])
session.process = mock_process
stopped = await session.stop()
assert stopped is True
mock_process.terminate.assert_called()
mock_process.kill.assert_called()
@pytest.mark.asyncio
async def test_start_task_split_buffer(self):
"""Test handling of JSON split across chunks."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = AsyncMock()
# Split json: {"type": "mess... age"}
mock_process.stdout.read.side_effect = [
b'{"type": "mess',
b'age", "content": "Split"}\n',
b"",
]
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
events = [
e async for e in session.start_task("test") if e["type"] == "message"
]
assert len(events) == 1
assert events[0]["content"] == "Split"
@pytest.mark.asyncio
async def test_start_task_remnant_buffer(self):
"""Test handling of buffer remnant at EOF (no newline at end)."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [
b'{"type": "message", "content": "Remnant"}', # No newline
b"",
]
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
events = [
e async for e in session.start_task("test") if e["type"] == "message"
]
assert len(events) == 1
assert events[0]["content"] == "Remnant"
@pytest.mark.asyncio
async def test_start_task_non_v1_url(self):
"""Test start_task with a non-v1 URL."""
from cli.session import CLISession
# URL not ending in /v1
session = CLISession("/tmp", "http://localhost:8082")
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [b""]
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
async for _ in session.start_task("test"):
pass
# Check env var
kwargs = mock_exec.call_args[1]
env = kwargs["env"]
assert env["ANTHROPIC_BASE_URL"] == "http://localhost:8082"
@pytest.mark.asyncio
async def test_start_task_allowed_dirs(self):
"""Test start_task includes allowed dirs in command."""
from cli.session import CLISession
session = CLISession(
"/tmp", "http://localhost:8082/v1", allowed_dirs=["/dir1", "/dir2"]
)
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [b""]
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
async for _ in session.start_task("test"):
pass
cmd = mock_exec.call_args[0]
assert "--add-dir" in cmd
assert os.path.normpath("/dir1") in cmd
assert os.path.normpath("/dir2") in cmd
@pytest.mark.asyncio
async def test_start_task_plans_directory(self):
"""Test start_task includes --settings plansDirectory when plans_directory set."""
from cli.session import CLISession
session = CLISession(
"/tmp",
"http://localhost:8082/v1",
plans_directory="./agent_workspace/plans",
)
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [b""]
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
async for _ in session.start_task("test"):
pass
cmd = mock_exec.call_args[0]
assert "--settings" in cmd
settings_idx = cmd.index("--settings")
assert settings_idx + 1 < len(cmd)
settings = json.loads(cmd[settings_idx + 1])
assert settings["plansDirectory"] == "./agent_workspace/plans"
@pytest.mark.asyncio
async def test_start_task_json_error(self):
"""Test handling of non-JSON output from CLI."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [b"Not valid json\n", b""]
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
events = [e async for e in session.start_task("test") if e["type"] == "raw"]
assert len(events) == 1
assert events[0]["content"] == "Not valid json"
@pytest.mark.asyncio
async def test_stop_exception(self):
"""Test exception handling during stop."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = MagicMock()
mock_process.returncode = None
# Raise exception on terminate
mock_process.terminate.side_effect = RuntimeError("Permission denied")
session.process = mock_process
stopped = await session.stop()
assert stopped is False
class TestCLISessionManager:
"""Test CLISessionManager."""
@pytest.mark.asyncio
async def test_manager_create_session(self):
"""Test creating a new session."""
from cli.manager import CLISessionManager
manager = CLISessionManager(
workspace_path="/tmp/test",
api_url="http://localhost:8082/v1",
)
session, sid, is_new = await manager.get_or_create_session()
assert session is not None
assert sid.startswith("pending_")
assert is_new is True
@pytest.mark.asyncio
async def test_manager_reuse_session(self):
"""Test reusing an existing session."""
from cli.manager import CLISessionManager
manager = CLISessionManager(
workspace_path="/tmp/test",
api_url="http://localhost:8082/v1",
)
# Create first session
s1, sid1, _is_new1 = await manager.get_or_create_session()
# Request same session
s2, _sid2, is_new2 = await manager.get_or_create_session(session_id=sid1)
assert s1 is s2
assert is_new2 is False
@pytest.mark.asyncio
async def test_manager_stats(self):
"""Test manager stats."""
from cli.manager import CLISessionManager
manager = CLISessionManager(
workspace_path="/tmp/test",
api_url="http://localhost:8082/v1",
)
stats = manager.get_stats()
assert stats["active_sessions"] == 0
assert stats["pending_sessions"] == 0