| """ |
| PhD Research OS — ECC Harness Integration Tests |
| ================================================= |
| Tests the companion agent lifecycle: spawn → preflight → plan → execute → postflight |
| """ |
|
|
| import os |
| import sys |
| import json |
| import pytest |
|
|
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
| from phd_research_os.agent_os import ( |
| AgentOS, AgentState, COMPANION_TYPES, |
| init_agent_os_db |
| ) |
| from phd_research_os.db import ( |
| init_db, get_db, create_claim, create_goal, create_conflict |
| ) |
|
|
| TEST_DB = "test_agent_os.db" |
|
|
|
|
@pytest.fixture(autouse=True)
def setup_teardown():
    """Create fresh DB with both core and agent_os tables.

    Seeds two claims, one goal, and one conflict so every test starts from
    the same known state.  DB files are removed both before setup (so a
    previously crashed run cannot leak stale state into this one) and after
    the test completes.
    """
    _remove_test_db()  # clear leftovers from a prior crashed run
    init_agent_os_db(TEST_DB)

    conn = get_db(TEST_DB)
    try:
        cid1 = create_claim(conn, "Graphene FET shows 45mV shift", "Fact", 0.85,
                            evidence_strength=0.9, study_quality_weight=1.0)
        cid2 = create_claim(conn, "Sensitivity plateaus below 1mM", "Interpretation", 0.6,
                            evidence_strength=0.5, study_quality_weight=0.8)
        create_goal(conn, "Achieve sub-fM detection limit", "high", [cid1])
        create_conflict(conn, cid1, cid2, "value_mismatch", "Different conditions")
    finally:
        # Close even if seeding raises, so the file can still be deleted.
        conn.close()

    yield

    _remove_test_db()


def _remove_test_db():
    """Delete the test DB plus SQLite's WAL/SHM sidecar files, if present."""
    for suffix in ("", "-wal", "-shm"):
        path = TEST_DB + suffix
        if os.path.exists(path):
            os.remove(path)
|
|
|
|
| |
| |
| |
|
|
def test_spawn_data_quality_auditor():
    """Spawning a built-in auditor yields a COMP_-prefixed id in state 'spawned'."""
    harness = AgentOS(db_path=TEST_DB)
    new_id = harness.spawn_companion("DataQualityAuditor")
    assert new_id.startswith("COMP_")

    roster = harness.list_companions()
    assert len(roster) == 1
    first = roster[0]
    assert first["agent_type"] == "DataQualityAuditor"
    assert first["state"] == "spawned"
|
|
|
|
def test_spawn_all_builtin_types():
    """Every built-in companion type spawns and receives a distinct id."""
    harness = AgentOS(db_path=TEST_DB)
    spawned = [harness.spawn_companion(kind) for kind in COMPANION_TYPES]
    assert len(spawned) == len(COMPANION_TYPES)
    # No two companions may share an id.
    assert len(set(spawned)) == len(COMPANION_TYPES)
|
|
|
|
def test_spawn_custom_agent():
    """A custom agent spawns when both purpose and system_prompt are supplied.

    Checks that the exact agent id returned by spawn_companion appears in the
    roster (the original test ignored the returned id and only checked that
    *some* custom agent existed).
    """
    aos = AgentOS(db_path=TEST_DB)
    agent_id = aos.spawn_companion(
        "custom",
        purpose="Test custom agent",
        system_prompt="You are a test agent. Output JSON: {'test': true}"
    )
    agents = aos.list_companions()
    # Match on the specific id, not just the type, to catch id bookkeeping bugs.
    matches = [a for a in agents if a["agent_id"] == agent_id]
    assert len(matches) == 1
    assert matches[0]["agent_type"] == "custom"
|
|
|
|
def test_spawn_custom_without_prompt_fails():
    """Custom agents without a system prompt must be rejected."""
    harness = AgentOS(db_path=TEST_DB)
    with pytest.raises(ValueError, match="Custom agents require"):
        harness.spawn_companion("custom")
|
|
|
|
def test_spawn_unknown_type_fails():
    """Spawning an unregistered agent type must raise."""
    harness = AgentOS(db_path=TEST_DB)
    with pytest.raises(ValueError, match="Unknown agent type"):
        harness.spawn_companion("NonexistentAgent")
|
|
|
|
| |
| |
| |
|
|
def test_assign_task():
    """Assigning a task returns a TASK_ id stored in 'preflight' state."""
    harness = AgentOS(db_path=TEST_DB)
    worker = harness.spawn_companion("DataQualityAuditor")
    tid = harness.assign_task(worker, "Audit last 10 claims for hallucination")
    assert tid.startswith("TASK_")

    # Inspect the persisted row directly.
    conn = get_db(TEST_DB)
    row = conn.execute("SELECT * FROM agent_tasks WHERE task_id = ?",
                       (tid,)).fetchone()
    conn.close()
    record = dict(row)
    assert record["state"] == "preflight"
    assert record["max_iterations"] == 3
|
|
|
|
def test_full_lifecycle_without_brain():
    """Test the full ECC lifecycle runs even without an AI brain."""
    harness = AgentOS(db_path=TEST_DB)
    worker = harness.spawn_companion("DataQualityAuditor")
    tid = harness.assign_task(worker, "Audit claims for quality")

    outcome = harness.run_task(tid)

    assert outcome["status"] == "completed"
    assert len(outcome["proposals"]) >= 1

    # The persisted task record must agree with the in-memory result.
    conn = get_db(TEST_DB)
    row = conn.execute("SELECT * FROM agent_tasks WHERE task_id = ?",
                       (tid,)).fetchone()
    conn.close()
    assert dict(row)["state"] == "completed"
|
|
|
|
def test_task_has_plan():
    """A completed task persists a JSON plan with at least one step."""
    harness = AgentOS(db_path=TEST_DB)
    worker = harness.spawn_companion("PromptOptimizer")
    tid = harness.assign_task(worker, "Optimize extraction prompt")
    harness.run_task(tid)

    conn = get_db(TEST_DB)
    row = conn.execute("SELECT * FROM agent_tasks WHERE task_id = ?",
                       (tid,)).fetchone()
    conn.close()

    plan = json.loads(dict(row)["plan"])
    assert "steps" in plan
    assert len(plan["steps"]) > 0
|
|
|
|
def test_kill_heuristic_time_budget():
    """Task with 0-second budget should be halted immediately on execution."""
    harness = AgentOS(db_path=TEST_DB)
    worker = harness.spawn_companion("DataQualityAuditor")
    tid = harness.assign_task(worker, "Quick check", time_budget_s=0)

    outcome = harness.run_task(tid)
    # Depending on timing the task may finish before the kill check fires,
    # so either terminal state is acceptable here.
    assert outcome["status"] in ("completed", "halted")
|
|
|
|
| |
| |
| |
|
|
def test_proposals_created():
    """Running a task yields at least one proposal in 'proposed' status."""
    harness = AgentOS(db_path=TEST_DB)
    worker = harness.spawn_companion("DomainExpander")
    harness.run_task(
        harness.assign_task(worker, "Generate materials science examples"))

    pending = harness.get_proposals(worker)
    assert len(pending) >= 1
    assert pending[0]["status"] == "proposed"
|
|
|
|
def test_approve_proposal():
    """Approving a proposal records both the new status and the reviewer."""
    harness = AgentOS(db_path=TEST_DB)
    worker = harness.spawn_companion("CalibrationAnalyst")
    harness.run_task(harness.assign_task(worker, "Check calibration"))

    pending = harness.get_proposals(worker)
    assert len(pending) >= 1

    harness.approve_proposal(pending[0]["proposal_id"], "Dr. Smith")

    approved = [p for p in harness.get_proposals(worker)
                if p["status"] == "approved"]
    assert len(approved) == 1
    assert approved[0]["reviewed_by"] == "Dr. Smith"
|
|
|
|
def test_reject_proposal_with_reason():
    """Rejecting a proposal stores the rejection reason."""
    harness = AgentOS(db_path=TEST_DB)
    worker = harness.spawn_companion("CitationChaser")
    harness.run_task(harness.assign_task(worker, "Find citing papers"))

    pending = harness.get_proposals(worker)
    assert len(pending) >= 1

    harness.reject_proposal(pending[0]["proposal_id"],
                            "Not relevant to current research focus",
                            "researcher")

    rejected = [p for p in harness.get_proposals(worker)
                if p["status"] == "rejected"]
    assert len(rejected) == 1
    assert "Not relevant" in rejected[0]["rejection_reason"]
|
|
|
|
def test_proposals_filter_by_status():
    """Before any review, the 'proposed' filter matches every proposal."""
    harness = AgentOS(db_path=TEST_DB)
    worker = harness.spawn_companion("DataQualityAuditor")
    harness.run_task(harness.assign_task(worker, "Audit"))

    everything = harness.get_proposals()
    only_proposed = harness.get_proposals(status="proposed")
    assert len(only_proposed) == len(everything)
|
|
|
|
| |
| |
| |
|
|
def test_audit_trail():
    """Each lifecycle phase leaves at least one entry in the audit log."""
    harness = AgentOS(db_path=TEST_DB)
    worker = harness.spawn_companion("DataQualityAuditor")
    harness.run_task(harness.assign_task(worker, "Audit claims"))

    entries = harness.get_audit_log(worker)
    assert len(entries) >= 4

    seen_phases = {entry["phase"] for entry in entries}
    for phase in ("spawn", "preflight", "planning", "executing"):
        assert phase in seen_phases
|
|
|
|
def test_audit_log_immutable():
    """Audit entries cannot be modified or deleted through the API."""
    harness = AgentOS(db_path=TEST_DB)
    worker = harness.spawn_companion("DataQualityAuditor")

    # A fresh agent starts with exactly one immutable creation entry.
    entries = harness.get_audit_log(worker)
    assert len(entries) == 1
    assert entries[0]["action"] == "Agent created"
|
|
|
|
| |
| |
| |
|
|
def test_memory_store():
    """set_memory persists a value and category retrievable by key."""
    harness = AgentOS(db_path=TEST_DB)
    harness.set_memory("test_key", "test_value", "assumption")

    stored = harness.get_memory("test_key")
    assert stored is not None
    assert stored["value"] == "test_value"
    assert stored["category"] == "assumption"
|
|
|
|
def test_memory_overwrite():
    """Writing the same key twice keeps only the most recent value."""
    harness = AgentOS(db_path=TEST_DB)
    for value in ("old_value", "new_value"):
        harness.set_memory("key1", value)

    assert harness.get_memory("key1")["value"] == "new_value"
|
|
|
|
| |
| |
| |
|
|
def test_propose_harness_evolution():
    """Harness-evolution proposals get a positive id and start unapproved."""
    harness = AgentOS(db_path=TEST_DB)
    evo_id = harness.propose_harness_evolution(
        "§3", "Add max 5 iterations for data quality tasks",
        "Data quality requires more iterations than architecture changes",
        "COMP_TEST001"
    )
    assert evo_id >= 1

    conn = get_db(TEST_DB)
    row = conn.execute("SELECT * FROM harness_evolution WHERE id = ?",
                       (evo_id,)).fetchone()
    conn.close()
    assert dict(row)["approved"] == 0
|
|
|
|
| |
| |
| |
|
|
def test_retire_companion():
    """Retired agents drop out of the default roster but remain queryable."""
    harness = AgentOS(db_path=TEST_DB)
    worker = harness.spawn_companion("DataQualityAuditor")
    harness.retire_companion(worker)

    assert len(harness.list_companions(include_retired=False)) == 0

    survivors = harness.list_companions(include_retired=True)
    assert len(survivors) == 1
    assert survivors[0]["state"] == "retired"
|
|
|
|
def test_retired_agent_fails_preflight():
    """Retired agents should fail preflight checks."""
    harness = AgentOS(db_path=TEST_DB)
    worker = harness.spawn_companion("DataQualityAuditor")
    harness.retire_companion(worker)

    tid = harness.assign_task(worker, "Should fail")
    assert harness.run_task(tid)["status"] == "halted"
|
|
|
|
| |
| |
| |
|
|
def test_agent_stats_updated_after_task():
    """Completing a task bumps the agent's completion and proposal counters."""
    harness = AgentOS(db_path=TEST_DB)
    worker = harness.spawn_companion("DataQualityAuditor")
    harness.run_task(harness.assign_task(worker, "Audit claims"))

    record = next(a for a in harness.list_companions()
                  if a["agent_id"] == worker)
    assert record["total_tasks_completed"] == 1
    assert record["total_proposals_made"] >= 1
|
|
|
|
if __name__ == "__main__":
    # pytest.main returns an exit code; propagate it so running this file
    # directly exits non-zero on test failures (the original discarded it).
    raise SystemExit(pytest.main([__file__, "-v"]))
|
|