Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
File size: 6,174 Bytes
5fe810b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 | """
Main agent implementation
"""
import asyncio
from litellm import (
ChatCompletionMessageToolCall,
Message,
ModelResponse,
acompletion,
)
from agent.core.session import Event, OpType, Session
ToolCall = ChatCompletionMessageToolCall
class Handlers:
"""Handler functions for each operation type"""
@staticmethod
async def run_agent(session: Session, text: str, max_iterations: int = 10) -> None:
"""Handle user input (like user_input_or_turn in codex.rs:1291)"""
# Add user message to history
user_msg = Message(role="user", content=text)
session.context_manager.add_message(user_msg)
# Send event that we're processing
await session.send_event(
Event(event_type="processing", data={"message": "Processing user input"})
)
# Agentic loop - continue until model doesn't call tools or max iterations is reached
iteration = 0
while iteration < max_iterations:
messages = session.context_manager.get_messages()
try:
response: ModelResponse = await acompletion(
model=session.config.model_name,
messages=messages,
tools=session.config.tools,
)
message = response.choices[0].message
# Extract content and tool calls
content = message.content
tool_calls: list[ToolCall] = message.get("tool_calls", [])
# Record assistant message if there's content
if content:
assistant_msg = Message(role="assistant", content=content)
session.context_manager.add_message(assistant_msg)
await session.send_event(
Event(
event_type="assistant_message",
data={"message": assistant_msg},
)
)
# If no tool calls, we're done
if not tool_calls:
break
for tool_call in tool_calls:
result = await session.tool_executor.execute_tool(tool_call)
tool_output = Message(role="tool", content=result.output)
session.context_manager.add_message(tool_output)
await session.send_event(
Event(
event_type="tool_output",
data={"message": tool_output},
)
)
iteration += 1
except Exception as e:
await session.send_event(
Event(event_type="error", data={"error": str(e)})
)
break
# Send completion event
await session.send_event(
Event(
event_type="turn_complete",
data={"history_size": len(session.context_manager.items)},
)
)
@staticmethod
async def interrupt(session: Session) -> None:
"""Handle interrupt (like interrupt in codex.rs:1266)"""
session.interrupt()
await session.send_event(Event(event_type="interrupted"))
@staticmethod
async def compact(session: Session) -> None:
"""Handle compact (like compact in codex.rs:1317)"""
old_size = len(session.context_manager.items)
session.context_manager.compact(target_size=10)
new_size = len(session.context_manager.items)
await session.send_event(
Event(
event_type="compacted",
data={"removed": old_size - new_size, "remaining": new_size},
)
)
@staticmethod
async def undo(session: Session) -> None:
"""Handle undo (like undo in codex.rs:1314)"""
# Remove last user turn and all following items
# Simplified: just remove last 2 items
for _ in range(min(2, len(session.context_manager.items))):
session.context_manager.items.pop()
await session.send_event(Event(event_type="undo_complete"))
@staticmethod
async def shutdown(session: Session) -> bool:
"""Handle shutdown (like shutdown in codex.rs:1329)"""
session.is_running = False
await session.send_event(Event(event_type="shutdown"))
return True
async def submission_loop(
submission_queue: asyncio.Queue, event_queue: asyncio.Queue
) -> None:
"""
Main agent loop - processes submissions and dispatches to handlers.
This is the core of the agent (like submission_loop in codex.rs:1259-1340)
"""
session = Session(event_queue)
print("🤖 Agent loop started")
# Main processing loop
while session.is_running:
try:
# Wait for next submission (like rx_sub.recv() in codex.rs:1262)
submission = await submission_queue.get()
print(f"📨 Received: {submission.operation.op_type.value}")
# Dispatch to handler based on operation type
op = submission.operation
if op.op_type == OpType.USER_INPUT:
text = op.data.get("text", "") if op.data else ""
await Handlers.run_agent(session, text, max_iterations=10)
elif op.op_type == OpType.INTERRUPT:
# im not currently sure what this does lol
await Handlers.interrupt(session)
elif op.op_type == OpType.COMPACT:
await Handlers.compact(session)
elif op.op_type == OpType.UNDO:
await Handlers.undo(session)
elif op.op_type == OpType.SHUTDOWN:
if await Handlers.shutdown(session):
break
else:
print(f"⚠️ Unknown operation: {op.op_type}")
except asyncio.CancelledError:
break
except Exception as e:
print(f"❌ Error in agent loop: {e}")
await session.send_event(Event(event_type="error", data={"error": str(e)}))
print("🛑 Agent loop exited")
|