Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
File size: 4,325 Bytes
2a2e170 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | """Heartbeat + stable-local-path tests for Session.
We don't spin up the real agent loop — we build a minimal Session with a
stubbed config and an in-memory queue, then call send_event repeatedly while
monkeypatching time.monotonic to simulate seconds passing.
"""
import asyncio
import json
from pathlib import Path
from unittest.mock import patch
import pytest
from agent.core.session import Event, Session
class _FakeConfig:
model_name = "claude-opus-4-6"
save_sessions = True
session_dataset_repo = "fake/repo"
auto_save_interval = 1
heartbeat_interval_s = 60
max_iterations = 10
yolo_mode = False
confirm_cpu_jobs = False
auto_file_upload = False
reasoning_effort = None
mcpServers: dict = {}
def _mk_session(tmp_path: Path) -> Session:
    """Build a ``Session`` backed by stubs, after changing into *tmp_path*.

    The process working directory is switched to *tmp_path* so that
    ``session_logs/`` lands under it. The previous working directory is
    not restored by this helper.

    Args:
        tmp_path: Directory to make the current working directory.

    Returns:
        A ``Session`` with an in-memory event queue, a ``_FakeConfig``,
        no tool router, no HF token, and local mode enabled.
    """
    import os

    os.chdir(tmp_path)  # so session_logs/ lands under tmp_path

    # Stub out the context manager to avoid litellm lookups: allocate it
    # without running __init__ and set the needed attributes by hand.
    from agent.context_manager.manager import ContextManager

    stub_state = {
        "items": [],
        "tool_specs": [],
        "model_max_tokens": 200_000,
        "running_context_usage": 0,
        "compact_size": 0.1,
        "untouched_messages": 5,
        "hf_token": None,
        "local_mode": True,
    }
    context = ContextManager.__new__(ContextManager)
    for attr_name, attr_value in stub_state.items():
        setattr(context, attr_name, attr_value)

    return Session(
        event_queue=asyncio.Queue(),
        config=_FakeConfig(),
        tool_router=None,
        context_manager=context,
        hf_token=None,
        local_mode=True,
    )
def test_heartbeat_fires_after_interval(tmp_path, monkeypatch):
    """Heartbeat save fires once the interval has elapsed, then resets.

    ``time.monotonic`` is patched to 100, 130, 161 and 170 around successive
    ``send_event`` calls. With the 60 s interval from ``_FakeConfig`` only
    the call at 161 should invoke the (stubbed) upload hook, and the call
    at 170 must not trigger a second one.
    """
    # Use asyncio.run rather than pytest-asyncio so the test works without the
    # plugin installed (same pattern elsewhere in this repo).
    async def body():
        # _mk_session changes into tmp_path and never changes back; going
        # through monkeypatch first lets pytest restore the original working
        # directory at teardown so it does not leak into other tests.
        monkeypatch.chdir(tmp_path)
        s = _mk_session(tmp_path)
        calls = []

        def fake_upload(repo_id):
            calls.append(repo_id)
            return "fake/path.json"

        monkeypatch.setattr(s, "save_and_upload_detached", fake_upload)

        # t=0: first event, should NOT trigger (initial _last_heartbeat_ts = now)
        with patch("agent.core.telemetry.time.monotonic", return_value=100.0):
            s._last_heartbeat_ts = 100.0
            await s.send_event(Event(event_type="x"))
        assert calls == []

        # t=+30s: still under interval → no save
        with patch("agent.core.telemetry.time.monotonic", return_value=130.0):
            await s.send_event(Event(event_type="y"))
        assert calls == []

        # t=+61s: over 60s → save fires once
        with patch("agent.core.telemetry.time.monotonic", return_value=161.0):
            await s.send_event(Event(event_type="z"))
        # create_task runs on the event loop; wait for the to_thread to complete.
        # NOTE(review): the sleep stays outside the patch block so the loop's
        # timer runs against the real clock rather than a frozen one.
        await asyncio.sleep(0.05)
        assert calls == ["fake/repo"]

        # Next event shortly after → no second save (interval resets to 161)
        with patch("agent.core.telemetry.time.monotonic", return_value=170.0):
            await s.send_event(Event(event_type="w"))
        await asyncio.sleep(0.05)
        assert len(calls) == 1

    asyncio.run(body())
def test_stable_local_path_overwrites(tmp_path, monkeypatch):
    """Repeated local saves reuse one path and leave a single valid file.

    Saves the trajectory three times into ``session_logs`` and checks that
    all three calls return the same path, that exactly one
    ``session_*.json`` file exists, and that it parses as JSON carrying the
    session id and a ``"pending"`` upload status.
    """
    # Route the directory change through monkeypatch so pytest restores the
    # original working directory at teardown; _mk_session's own os.chdir to
    # the same directory is then harmless.
    monkeypatch.chdir(tmp_path)
    # Reuse the shared helper instead of repeating the stubbed setup inline.
    s = _mk_session(tmp_path)

    p1 = s.save_trajectory_local(directory="session_logs")
    assert p1 is not None
    p2 = s.save_trajectory_local(directory="session_logs")
    p3 = s.save_trajectory_local(directory="session_logs")

    # All three saves land on the same file — heartbeat should not spam files.
    assert p1 == p2 == p3

    files = list(Path("session_logs").glob("session_*.json"))
    # Exactly one final file; the .tmp should be renamed away.
    assert len(files) == 1

    # File is valid JSON (atomic write → no torn content).
    with open(p1) as f:
        data = json.load(f)
    assert data["session_id"] == s.session_id
    assert data["upload_status"] == "pending"
|