cesjavi committed on
Commit
c38df78
·
1 Parent(s): 327739b

Phase 7: Integrated Vectorized Long-Term Memory (Index on Approval, Retrieve on Run)

Browse files
backend/routers/agent_runner.py CHANGED
@@ -1,7 +1,14 @@
1
- from fastapi import APIRouter, HTTPException, BackgroundTasks
 
2
  from services.supabase_service import supabase
3
  from services.agent_runner_service import AgentRunnerService
 
 
4
  from services.output_quality import report_text_from_output
 
 
 
 
5
  import logging
6
 
7
  router = APIRouter()
@@ -25,7 +32,24 @@ def _assert_task_quality(task: dict):
25
  reasons = quality_review.get("fail_reasons") or ["Task output failed quality validation."]
26
  raise HTTPException(status_code=400, detail=f"Task output failed quality review: {'; '.join(reasons)}")
27
 
 
 
 
 
 
 
 
 
 
 
 
 
28
  def update_task_status(task_id: str, status: str):
 
 
 
 
 
29
  result = (
30
  supabase.table("tasks")
31
  .update({"status": status})
@@ -53,8 +77,218 @@ def update_task_status(task_id: str, status: str):
53
 
54
  return task_data
55
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
  @router.post("/{task_id}/run")
57
- async def run_task(task_id: str, background_tasks: BackgroundTasks):
58
  """
59
  Triggers the execution of a specific task.
60
  """
@@ -64,6 +298,7 @@ async def run_task(task_id: str, background_tasks: BackgroundTasks):
64
  raise HTTPException(status_code=404, detail="Task not found")
65
 
66
  task = task_res.data
 
67
 
68
  # 2. Check if agent is assigned
69
  agent_id = task.get("assigned_agent_id")
@@ -76,9 +311,30 @@ async def run_task(task_id: str, background_tasks: BackgroundTasks):
76
  raise HTTPException(status_code=404, detail="Assigned agent not found")
77
 
78
  agent_data = agent_res.data
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
 
80
  # 4. Update task status to in_progress
81
  supabase.table("tasks").update({"status": "in_progress"}).eq("id", task_id).execute()
 
 
 
 
 
 
 
82
 
83
  # 5. Run in background
84
  background_tasks.add_task(AgentRunnerService.execute_agent_logic, task, agent_data)
@@ -86,23 +342,43 @@ async def run_task(task_id: str, background_tasks: BackgroundTasks):
86
  return {"message": "Task execution started", "task_id": task_id}
87
 
88
  @router.post("/{task_id}/approve")
89
- async def approve_task(task_id: str):
90
  task_res = supabase.table("tasks").select("*").eq("id", task_id).single().execute()
91
  if not task_res.data:
92
  raise HTTPException(status_code=404, detail="Task not found")
 
93
  _assert_task_quality(task_res.data)
94
  task = update_task_status(task_id, "done")
 
 
 
 
 
 
 
 
 
 
 
95
  return {"message": "Task approved", "task": task}
96
 
97
  @router.post("/{task_id}/reject")
98
  async def reject_task(task_id: str):
99
  task = update_task_status(task_id, "todo")
 
 
 
 
 
 
 
100
  return {"message": "Task rejected", "task": task}
101
  @router.post("/project/{project_id}/approve-all")
102
- async def approve_all_tasks(project_id: str):
103
  """
104
  Approves all tasks in a project that are awaiting approval.
105
  """
 
106
  waiting_tasks = (
107
  supabase.table("tasks")
108
  .select("*")
@@ -132,6 +408,10 @@ async def approve_all_tasks(project_id: str):
132
  .execute()
133
  )
134
  result_data = result.data or []
 
 
 
 
135
 
136
  # 2. Check if all tasks in project are now done
137
  task_result = (
@@ -143,6 +423,16 @@ async def approve_all_tasks(project_id: str):
143
  tasks = task_result.data or []
144
  if tasks and all(task.get("status") == "done" for task in tasks):
145
  supabase.table("projects").update({"status": "completed"}).eq("id", project_id).execute()
 
 
 
 
 
 
 
 
 
 
146
 
147
  return {
148
  "message": f"Approved {len(result_data)} tasks",
 
1
+ from fastapi import APIRouter, HTTPException, BackgroundTasks, Request
2
+ from fastapi.responses import StreamingResponse
3
  from services.supabase_service import supabase
4
  from services.agent_runner_service import AgentRunnerService
5
+ from services.config import settings
6
+ from services.audit_service import audit_service
7
  from services.output_quality import report_text_from_output
8
+ from services.task_queue import TaskQueueService
9
+ from services.memory_service import memory_service
10
+ import asyncio
11
+ import json
12
  import logging
13
 
14
  router = APIRouter()
 
32
  reasons = quality_review.get("fail_reasons") or ["Task output failed quality validation."]
33
  raise HTTPException(status_code=400, detail=f"Task output failed quality review: {'; '.join(reasons)}")
34
 
35
+ def _assert_project_is_mutable(project_id: str):
36
+ project = supabase.table("projects").select("id,status").eq("id", project_id).single().execute().data
37
+ if not project:
38
+ raise HTTPException(status_code=404, detail="Project not found")
39
+ if project.get("status") == "completed":
40
+ raise HTTPException(status_code=409, detail="Completed projects are locked and cannot be modified.")
41
+
42
+ def _assert_task_project_is_mutable(task: dict):
43
+ project_id = task.get("project_id")
44
+ if project_id:
45
+ _assert_project_is_mutable(project_id)
46
+
47
  def update_task_status(task_id: str, status: str):
48
+ task_res = supabase.table("tasks").select("project_id").eq("id", task_id).single().execute()
49
+ if not task_res.data:
50
+ raise HTTPException(status_code=404, detail="Task not found")
51
+ _assert_task_project_is_mutable(task_res.data)
52
+
53
  result = (
54
  supabase.table("tasks")
55
  .update({"status": status})
 
77
 
78
  return task_data
79
 
80
+
81
+ def _sse_event(event: str, data: dict, event_id: str | None = None) -> str:
82
+ lines = []
83
+ if event_id:
84
+ lines.append(f"id: {event_id}")
85
+ lines.append(f"event: {event}")
86
+ payload = json.dumps(data, default=str)
87
+ for line in payload.splitlines() or ["{}"]:
88
+ lines.append(f"data: {line}")
89
+ return "\n".join(lines) + "\n\n"
90
+
91
+
92
+ def _project_task_ids(project_id: str) -> list[str]:
93
+ rows = (
94
+ supabase.table("tasks")
95
+ .select("id")
96
+ .eq("project_id", project_id)
97
+ .execute()
98
+ .data
99
+ or []
100
+ )
101
+ return [row["id"] for row in rows if row.get("id")]
102
+
103
+
104
+ def _user_id_from_access_token(access_token: str | None) -> str:
105
+ if not access_token:
106
+ raise HTTPException(status_code=401, detail="Missing access token")
107
+ try:
108
+ auth_user = supabase.auth.get_user(access_token)
109
+ user = getattr(auth_user, "user", None)
110
+ user_id = getattr(user, "id", None)
111
+ if not user_id and isinstance(auth_user, dict):
112
+ user_id = auth_user.get("user", {}).get("id")
113
+ except Exception as exc:
114
+ logger.warning("Could not validate log stream access token: %s", exc)
115
+ raise HTTPException(status_code=401, detail="Invalid access token") from exc
116
+ if not user_id:
117
+ raise HTTPException(status_code=401, detail="Invalid access token")
118
+ return user_id
119
+
120
+
121
+ def _team_ids_for_user(user_id: str) -> list[str]:
122
+ try:
123
+ rows = (
124
+ supabase.table("team_members")
125
+ .select("team_id")
126
+ .eq("user_id", user_id)
127
+ .execute()
128
+ .data
129
+ or []
130
+ )
131
+ except Exception as exc:
132
+ logger.warning("Team membership lookup unavailable for log stream: %s", exc)
133
+ return []
134
+ return [row["team_id"] for row in rows if row.get("team_id")]
135
+
136
+
137
+ def _project_ids_for_user(user_id: str) -> list[str]:
138
+ project_ids: set[str] = set()
139
+
140
+ owned = (
141
+ supabase.table("projects")
142
+ .select("id")
143
+ .eq("owner_id", user_id)
144
+ .execute()
145
+ .data
146
+ or []
147
+ )
148
+ project_ids.update(row["id"] for row in owned if row.get("id"))
149
+
150
+ public = (
151
+ supabase.table("projects")
152
+ .select("id")
153
+ .eq("is_public", True)
154
+ .execute()
155
+ .data
156
+ or []
157
+ )
158
+ project_ids.update(row["id"] for row in public if row.get("id"))
159
+
160
+ team_ids = _team_ids_for_user(user_id)
161
+ if team_ids:
162
+ team_projects = (
163
+ supabase.table("projects")
164
+ .select("id")
165
+ .in_("team_id", team_ids)
166
+ .execute()
167
+ .data
168
+ or []
169
+ )
170
+ project_ids.update(row["id"] for row in team_projects if row.get("id"))
171
+
172
+ return list(project_ids)
173
+
174
+
175
+ def _can_view_project_for_user(project_id: str, user_id: str) -> bool:
176
+ if not project_id:
177
+ return False
178
+ if project_id in _project_ids_for_user(user_id):
179
+ return True
180
+ return False
181
+
182
+
183
+ def _authorized_task_ids(user_id: str, project_id: str | None = None, task_id: str | None = None) -> list[str]:
184
+ if task_id:
185
+ task = supabase.table("tasks").select("id,project_id").eq("id", task_id).single().execute().data
186
+ if not task or not _can_view_project_for_user(task.get("project_id"), user_id):
187
+ raise HTTPException(status_code=403, detail="Task logs are not visible to this user")
188
+ return [task_id]
189
+
190
+ if project_id:
191
+ if not _can_view_project_for_user(project_id, user_id):
192
+ raise HTTPException(status_code=403, detail="Project logs are not visible to this user")
193
+ return _project_task_ids(project_id)
194
+
195
+ project_ids = _project_ids_for_user(user_id)
196
+ if not project_ids:
197
+ return []
198
+ rows = (
199
+ supabase.table("tasks")
200
+ .select("id")
201
+ .in_("project_id", project_ids)
202
+ .execute()
203
+ .data
204
+ or []
205
+ )
206
+ return [row["id"] for row in rows if row.get("id")]
207
+
208
+
209
+ def _fetch_recent_logs(
210
+ limit: int = 50,
211
+ after_created_at: str | None = None,
212
+ *,
213
+ task_ids: list[str],
214
+ ) -> list[dict]:
215
+ if not task_ids:
216
+ return []
217
+ query = (
218
+ supabase.table("agent_logs")
219
+ .select("id,task_id,run_id,action,content,metadata,created_at")
220
+ .order("created_at", desc=after_created_at is None)
221
+ .limit(limit)
222
+ .in_("task_id", task_ids)
223
+ )
224
+ if after_created_at:
225
+ query = query.gt("created_at", after_created_at)
226
+ rows = query.execute().data or []
227
+ return rows if after_created_at else list(reversed(rows))
228
+
229
+
230
+ @router.get("/logs/stream")
231
+ async def stream_agent_logs(
232
+ request: Request,
233
+ limit: int = 50,
234
+ project_id: str | None = None,
235
+ task_id: str | None = None,
236
+ access_token: str | None = None,
237
+ ):
238
+ """
239
+ Streams agent log inserts as Server-Sent Events.
240
+ """
241
+ if project_id and task_id:
242
+ raise HTTPException(status_code=400, detail="Use either project_id or task_id, not both.")
243
+ user_id = _user_id_from_access_token(access_token)
244
+ task_ids = _authorized_task_ids(user_id, project_id=project_id, task_id=task_id)
245
+
246
+ async def event_generator():
247
+ last_created_at = None
248
+ sent_ids: set[str] = set()
249
+ yield _sse_event("ready", {
250
+ "message": "Agent log stream connected",
251
+ "project_id": project_id,
252
+ "task_id": task_id,
253
+ "user_id": user_id,
254
+ })
255
+
256
+ while not await request.is_disconnected():
257
+ try:
258
+ rows = _fetch_recent_logs(
259
+ limit=max(1, min(limit, 100)),
260
+ after_created_at=last_created_at,
261
+ task_ids=task_ids,
262
+ )
263
+ for row in rows:
264
+ row_id = row.get("id")
265
+ if row_id in sent_ids:
266
+ continue
267
+ sent_ids.add(row_id)
268
+ if len(sent_ids) > 500:
269
+ sent_ids = set(list(sent_ids)[-250:])
270
+ last_created_at = row.get("created_at") or last_created_at
271
+ yield _sse_event("log", row, row_id)
272
+ except Exception as exc:
273
+ logger.warning("Agent log SSE stream failed to fetch logs: %s", exc)
274
+ yield _sse_event("error", {"message": str(exc)})
275
+
276
+ yield ": keep-alive\n\n"
277
+ await asyncio.sleep(1)
278
+
279
+ return StreamingResponse(
280
+ event_generator(),
281
+ media_type="text/event-stream",
282
+ headers={
283
+ "Cache-Control": "no-cache",
284
+ "Connection": "keep-alive",
285
+ "X-Accel-Buffering": "no",
286
+ },
287
+ )
288
+
289
+
290
  @router.post("/{task_id}/run")
291
+ async def run_task(task_id: str, background_tasks: BackgroundTasks, use_queue: bool | None = None):
292
  """
293
  Triggers the execution of a specific task.
294
  """
 
298
  raise HTTPException(status_code=404, detail="Task not found")
299
 
300
  task = task_res.data
301
+ _assert_task_project_is_mutable(task)
302
 
303
  # 2. Check if agent is assigned
304
  agent_id = task.get("assigned_agent_id")
 
311
  raise HTTPException(status_code=404, detail="Assigned agent not found")
312
 
313
  agent_data = agent_res.data
314
+
315
+ should_queue = use_queue if use_queue is not None else settings.TASK_EXECUTION_MODE == "queue"
316
+ if should_queue:
317
+ queued = await TaskQueueService.queue_task(task_id)
318
+ if not queued or not queued.data:
319
+ raise HTTPException(status_code=500, detail="Task could not be queued")
320
+ await audit_service.log_action(
321
+ user_id=task.get("project", {}).get("owner_id"),
322
+ action="task_queued",
323
+ agent_id=agent_id,
324
+ task_id=task_id,
325
+ metadata={"project_id": task.get("project_id"), "source": "task_run_endpoint"},
326
+ )
327
+ return {"message": "Task queued for worker execution", "task_id": task_id, "mode": "queue"}
328
 
329
  # 4. Update task status to in_progress
330
  supabase.table("tasks").update({"status": "in_progress"}).eq("id", task_id).execute()
331
+ await audit_service.log_action(
332
+ user_id=task.get("project", {}).get("owner_id"),
333
+ action="task_run_started",
334
+ agent_id=agent_id,
335
+ task_id=task_id,
336
+ metadata={"project_id": task.get("project_id"), "mode": "direct"},
337
+ )
338
 
339
  # 5. Run in background
340
  background_tasks.add_task(AgentRunnerService.execute_agent_logic, task, agent_data)
 
342
  return {"message": "Task execution started", "task_id": task_id}
343
 
344
@router.post("/{task_id}/approve")
async def approve_task(task_id: str, background_tasks: BackgroundTasks):
    """Approve a task: quality-gate it, mark it done, index it into long-term memory, and audit it."""
    task_res = supabase.table("tasks").select("*").eq("id", task_id).single().execute()
    current = task_res.data
    if not current:
        raise HTTPException(status_code=404, detail="Task not found")
    _assert_task_project_is_mutable(current)
    _assert_task_quality(current)

    task = update_task_status(task_id, "done")

    # Index for Long-Term Memory
    background_tasks.add_task(memory_service.index_task_output, task)

    await audit_service.log_action(
        user_id=None,
        action="task_approved",
        agent_id=task.get("assigned_agent_id"),
        task_id=task_id,
        metadata={"project_id": task.get("project_id")},
    )
    return {"message": "Task approved", "task": task}
364
 
365
@router.post("/{task_id}/reject")
async def reject_task(task_id: str):
    """
    Rejects a task: moves it back to "todo" and records an audit entry.

    update_task_status raises 404 when the task does not exist and 409
    when its project is already completed (locked).
    """
    task = update_task_status(task_id, "todo")
    await audit_service.log_action(
        user_id=None,  # rejection is not attributed to a specific user here
        action="task_rejected",
        agent_id=task.get("assigned_agent_id"),
        task_id=task_id,
        metadata={"project_id": task.get("project_id")},
    )
    return {"message": "Task rejected", "task": task}
376
  @router.post("/project/{project_id}/approve-all")
377
+ async def approve_all_tasks(project_id: str, background_tasks: BackgroundTasks):
378
  """
379
  Approves all tasks in a project that are awaiting approval.
380
  """
381
+ _assert_project_is_mutable(project_id)
382
  waiting_tasks = (
383
  supabase.table("tasks")
384
  .select("*")
 
408
  .execute()
409
  )
410
  result_data = result.data or []
411
+
412
+ # Index all approved tasks for Long-Term Memory
413
+ for approved_task in result_data:
414
+ background_tasks.add_task(memory_service.index_task_output, approved_task)
415
 
416
  # 2. Check if all tasks in project are now done
417
  task_result = (
 
423
  tasks = task_result.data or []
424
  if tasks and all(task.get("status") == "done" for task in tasks):
425
  supabase.table("projects").update({"status": "completed"}).eq("id", project_id).execute()
426
+
427
+ await audit_service.log_action(
428
+ user_id=None,
429
+ action="tasks_approved_bulk",
430
+ metadata={
431
+ "project_id": project_id,
432
+ "approved_count": len(result_data),
433
+ "blocked_count": len(blocked),
434
+ },
435
+ )
436
 
437
  return {
438
  "message": f"Approved {len(result_data)} tasks",
backend/services/agent_runner_service.py CHANGED
@@ -2,12 +2,25 @@ import logging
2
  from datetime import datetime, timezone
3
  from services.supabase_service import supabase
4
  from services.audit_service import audit_service
 
 
5
  from agents.agent_factory import AgentFactory
6
  from services.semantic_backprop import semantic_backprop
7
  from services.output_quality import build_quality_instructions, validate_output
 
8
 
9
  logger = logging.getLogger("agent_runner_service")
10
 
 
 
 
 
 
 
 
 
 
 
11
  class AgentRunnerService:
12
  @staticmethod
13
  async def run_agent_task(
@@ -27,6 +40,13 @@ class AgentRunnerService:
27
 
28
  if update_task:
29
  supabase.table("tasks").update({"status": "in_progress"}).eq("id", task_id).execute()
 
 
 
 
 
 
 
30
 
31
  try:
32
  run_res = supabase.table("task_runs").insert({
@@ -35,6 +55,13 @@ class AgentRunnerService:
35
  "status": "running"
36
  }).execute()
37
  run_id = run_res.data[0]["id"]
 
 
 
 
 
 
 
38
 
39
  agent = AgentFactory.get_agent(
40
  provider=agent_data["api_provider"],
@@ -65,6 +92,18 @@ class AgentRunnerService:
65
  extra_context = ""
66
  if include_semantic_context:
67
  extra_context = await semantic_backprop.get_project_context(project_id, task_id)
 
 
 
 
 
 
 
 
 
 
 
 
68
 
69
  import time
70
  import hashlib
@@ -81,6 +120,7 @@ class AgentRunnerService:
81
  if cache_key in AgentRunnerService._task_cache:
82
  logger.info(f"Cache hit for task {task_id}. Skipping LLM call.")
83
  cached_result = AgentRunnerService._task_cache[cache_key]
 
84
 
85
  # Still log the "start" for UI consistency
86
  agent_name = agent_data.get('name', 'Agent')
@@ -97,6 +137,24 @@ class AgentRunnerService:
97
  "status": "awaiting_approval",
98
  "output_data": cached_result
99
  }).eq("id", task_id).execute()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
100
 
101
  return cached_result, run_id
102
 
@@ -112,6 +170,24 @@ class AgentRunnerService:
112
  start_time = time.time()
113
  task_instructions = task.get("description") or task["title"]
114
  task_instructions = f"{task_instructions}\n\n{build_quality_instructions(quality_task)}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
115
  result = await agent.run(task_instructions, context, extra_context=extra_context)
116
  duration = time.time() - start_time
117
 
@@ -129,6 +205,26 @@ class AgentRunnerService:
129
 
130
  quality_review = validate_output(quality_task, result)
131
  result["quality_review"] = quality_review
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
132
 
133
  # 6. Save to Cache
134
  AgentRunnerService._task_cache[cache_key] = result
@@ -138,13 +234,28 @@ class AgentRunnerService:
138
  "status": "awaiting_approval",
139
  "output_data": result
140
  }).eq("id", task_id).execute()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
141
 
142
  # 7. Update Run Status
143
- supabase.table("task_runs").update({
144
  "status": "completed",
145
  "finished_at": datetime.now(timezone.utc).isoformat(),
146
  "duration_seconds": round(duration, 2)
147
- }).eq("id", run_id).execute()
148
 
149
  # 8. Log Completion with Metrics
150
  supabase.table("agent_logs").insert({
@@ -161,22 +272,76 @@ class AgentRunnerService:
161
  "action": "quality_review_failed",
162
  "content": f"Quality review failed: {', '.join(quality_review['fail_reasons'])}"
163
  }).execute()
 
 
 
 
 
 
 
 
 
 
 
164
 
165
  return result, run_id
166
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
167
  except Exception as e:
168
  logger.error(f"Error executing task {task_id}: {str(e)}")
169
  if run_id:
170
- supabase.table("task_runs").update({
171
  "status": "failed",
172
  "finished_at": datetime.now(timezone.utc).isoformat()
173
- }).eq("id", run_id).execute()
174
 
175
  if update_task:
176
  supabase.table("tasks").update({
177
  "status": "failed",
178
  "output_data": {"error": str(e)}
179
  }).eq("id", task_id).execute()
 
 
 
 
 
 
 
 
 
 
 
 
180
 
181
  # LOG ERROR TO AGENT CONSOLE
182
  supabase.table("agent_logs").insert({
 
2
  from datetime import datetime, timezone
3
  from services.supabase_service import supabase
4
  from services.audit_service import audit_service
5
+ from services.budget_service import BudgetExceededError, budget_service
6
+ from services.evidence_service import evidence_service
7
  from agents.agent_factory import AgentFactory
8
  from services.semantic_backprop import semantic_backprop
9
  from services.output_quality import build_quality_instructions, validate_output
10
+ from services.memory_service import memory_service
11
 
12
  logger = logging.getLogger("agent_runner_service")
13
 
14
def _update_task_run(run_id: str, payload: dict):
    """Update a task_runs row, tolerating schemas that lack duration_seconds.

    When Supabase rejects the update because the duration_seconds column is
    missing from its schema cache, retry once with that key stripped; any
    other failure propagates unchanged.
    """
    try:
        return supabase.table("task_runs").update(payload).eq("id", run_id).execute()
    except Exception as exc:
        message = str(exc)
        missing_duration_column = (
            "duration_seconds" in payload
            and "duration_seconds" in message
            and "schema cache" in message
        )
        if not missing_duration_column:
            raise
        logger.warning("task_runs.duration_seconds is missing in Supabase schema; retrying run update without duration.")
        trimmed = {k: v for k, v in payload.items() if k != "duration_seconds"}
        return supabase.table("task_runs").update(trimmed).eq("id", run_id).execute()
23
+
24
  class AgentRunnerService:
25
  @staticmethod
26
  async def run_agent_task(
 
40
 
41
  if update_task:
42
  supabase.table("tasks").update({"status": "in_progress"}).eq("id", task_id).execute()
43
+ await audit_service.log_action(
44
+ user_id=None,
45
+ action="task_status_changed",
46
+ agent_id=agent_data.get("id"),
47
+ task_id=task_id,
48
+ metadata={"project_id": project_id, "status": "in_progress"},
49
+ )
50
 
51
  try:
52
  run_res = supabase.table("task_runs").insert({
 
55
  "status": "running"
56
  }).execute()
57
  run_id = run_res.data[0]["id"]
58
+ await audit_service.log_action(
59
+ user_id=None,
60
+ action="task_run_created",
61
+ agent_id=agent_data.get("id"),
62
+ task_id=task_id,
63
+ metadata={"project_id": project_id, "run_id": run_id, "status": "running"},
64
+ )
65
 
66
  agent = AgentFactory.get_agent(
67
  provider=agent_data["api_provider"],
 
92
  extra_context = ""
93
  if include_semantic_context:
94
  extra_context = await semantic_backprop.get_project_context(project_id, task_id)
95
+ # Fetch Long-Term Memory (Cross-project)
96
+ memories = await memory_service.search_memory(
97
+ query=task.get("description") or task["title"],
98
+ limit=3,
99
+ threshold=0.72
100
+ )
101
+ if memories:
102
+ memory_header = "\n\n### RELEVANT HISTORICAL CONTEXT (CROSS-PROJECT)\n"
103
+ memory_blocks = []
104
+ for m in memories:
105
+ memory_blocks.append(f"- Memory: {m['content']}")
106
+ extra_context += memory_header + "\n".join(memory_blocks)
107
 
108
  import time
109
  import hashlib
 
120
  if cache_key in AgentRunnerService._task_cache:
121
  logger.info(f"Cache hit for task {task_id}. Skipping LLM call.")
122
  cached_result = AgentRunnerService._task_cache[cache_key]
123
+ claims_count = await evidence_service.replace_task_claims(task, cached_result)
124
 
125
  # Still log the "start" for UI consistency
126
  agent_name = agent_data.get('name', 'Agent')
 
137
  "status": "awaiting_approval",
138
  "output_data": cached_result
139
  }).eq("id", task_id).execute()
140
+ await audit_service.log_action(
141
+ user_id=None,
142
+ action="task_status_changed",
143
+ agent_id=agent_data.get("id"),
144
+ task_id=task_id,
145
+ metadata={
146
+ "project_id": project_id,
147
+ "run_id": run_id,
148
+ "status": "awaiting_approval",
149
+ "cache_hit": True,
150
+ "claims_count": claims_count,
151
+ },
152
+ )
153
+
154
+ _update_task_run(run_id, {
155
+ "status": "completed",
156
+ "finished_at": datetime.now(timezone.utc).isoformat()
157
+ })
158
 
159
  return cached_result, run_id
160
 
 
170
  start_time = time.time()
171
  task_instructions = task.get("description") or task["title"]
172
  task_instructions = f"{task_instructions}\n\n{build_quality_instructions(quality_task)}"
173
+ prompt_tokens = budget_service.estimate_prompt_tokens(
174
+ task_instructions=task_instructions,
175
+ context=context,
176
+ extra_context=extra_context,
177
+ system_prompt=agent_data.get("system_prompt"),
178
+ )
179
+ max_completion_tokens = int(getattr(agent, "max_tokens", 0) or 0)
180
+ estimated_preflight_cost = budget_service.estimate_cost(
181
+ agent_data.get("api_provider"),
182
+ agent_data.get("model"),
183
+ prompt_tokens,
184
+ max_completion_tokens,
185
+ )
186
+ budget_service.check_before_run(
187
+ project_id=project_id,
188
+ estimated_tokens=prompt_tokens + max_completion_tokens,
189
+ estimated_cost=estimated_preflight_cost,
190
+ )
191
  result = await agent.run(task_instructions, context, extra_context=extra_context)
192
  duration = time.time() - start_time
193
 
 
205
 
206
  quality_review = validate_output(quality_task, result)
207
  result["quality_review"] = quality_review
208
+ claims_count = await evidence_service.replace_task_claims(task, result)
209
+ completion_tokens = budget_service.estimate_completion_tokens(result)
210
+ estimated_cost = budget_service.estimate_cost(
211
+ agent_data.get("api_provider"),
212
+ agent_data.get("model"),
213
+ prompt_tokens,
214
+ completion_tokens,
215
+ )
216
+ budget_service.record_usage(
217
+ project_id=project_id,
218
+ task_id=task_id,
219
+ run_id=run_id,
220
+ agent_id=agent_data.get("id"),
221
+ provider=agent_data.get("api_provider"),
222
+ model=agent_data.get("model"),
223
+ prompt_tokens=prompt_tokens,
224
+ completion_tokens=completion_tokens,
225
+ estimated_cost=estimated_cost,
226
+ metadata={"duration_seconds": round(duration, 2), "claims_count": claims_count},
227
+ )
228
 
229
  # 6. Save to Cache
230
  AgentRunnerService._task_cache[cache_key] = result
 
234
  "status": "awaiting_approval",
235
  "output_data": result
236
  }).eq("id", task_id).execute()
237
+ await audit_service.log_action(
238
+ user_id=None,
239
+ action="task_status_changed",
240
+ agent_id=agent_data.get("id"),
241
+ task_id=task_id,
242
+ metadata={
243
+ "project_id": project_id,
244
+ "run_id": run_id,
245
+ "status": "awaiting_approval",
246
+ "quality_approved": quality_review["approved"],
247
+ "claims_count": claims_count,
248
+ "estimated_tokens": prompt_tokens + completion_tokens,
249
+ "estimated_cost": float(estimated_cost),
250
+ },
251
+ )
252
 
253
  # 7. Update Run Status
254
+ _update_task_run(run_id, {
255
  "status": "completed",
256
  "finished_at": datetime.now(timezone.utc).isoformat(),
257
  "duration_seconds": round(duration, 2)
258
+ })
259
 
260
  # 8. Log Completion with Metrics
261
  supabase.table("agent_logs").insert({
 
272
  "action": "quality_review_failed",
273
  "content": f"Quality review failed: {', '.join(quality_review['fail_reasons'])}"
274
  }).execute()
275
+ await audit_service.log_action(
276
+ user_id=None,
277
+ action="task_quality_review_failed",
278
+ agent_id=agent_data.get("id"),
279
+ task_id=task_id,
280
+ metadata={
281
+ "project_id": project_id,
282
+ "run_id": run_id,
283
+ "fail_reasons": quality_review.get("fail_reasons", []),
284
+ },
285
+ )
286
 
287
  return result, run_id
288
 
289
+ except BudgetExceededError as e:
290
+ logger.warning(f"Budget blocked task {task_id}: {str(e)}")
291
+ if run_id:
292
+ _update_task_run(run_id, {
293
+ "status": "cancelled",
294
+ "error_message": str(e),
295
+ "finished_at": datetime.now(timezone.utc).isoformat()
296
+ })
297
+
298
+ if update_task:
299
+ supabase.table("tasks").update({
300
+ "status": "failed",
301
+ "output_data": {"error": str(e), "budget_blocked": True}
302
+ }).eq("id", task_id).execute()
303
+ await audit_service.log_action(
304
+ user_id=None,
305
+ action="task_budget_blocked",
306
+ agent_id=agent_data.get("id"),
307
+ task_id=task_id,
308
+ metadata={"project_id": project_id, "run_id": run_id, "error": str(e)},
309
+ )
310
+
311
+ supabase.table("agent_logs").insert({
312
+ "task_id": task_id,
313
+ "run_id": run_id,
314
+ "action": "budget_blocked",
315
+ "content": f"Budget blocked execution: {str(e)}"
316
+ }).execute()
317
+
318
+ raise e
319
+
320
  except Exception as e:
321
  logger.error(f"Error executing task {task_id}: {str(e)}")
322
  if run_id:
323
+ _update_task_run(run_id, {
324
  "status": "failed",
325
  "finished_at": datetime.now(timezone.utc).isoformat()
326
+ })
327
 
328
  if update_task:
329
  supabase.table("tasks").update({
330
  "status": "failed",
331
  "output_data": {"error": str(e)}
332
  }).eq("id", task_id).execute()
333
+ await audit_service.log_action(
334
+ user_id=None,
335
+ action="task_status_changed",
336
+ agent_id=agent_data.get("id"),
337
+ task_id=task_id,
338
+ metadata={
339
+ "project_id": project_id,
340
+ "run_id": run_id,
341
+ "status": "failed",
342
+ "error": str(e),
343
+ },
344
+ )
345
 
346
  # LOG ERROR TO AGENT CONSOLE
347
  supabase.table("agent_logs").insert({
backend/services/memory_service.py ADDED
@@ -0,0 +1,112 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from typing import List, Dict, Any, Optional
3
+ from services.supabase_service import supabase
4
+ from services.embedding_service import embedding_service
5
+
6
+ logger = logging.getLogger("uvicorn")
7
+
8
class MemoryService:
    """
    Handles vectorized long-term memory for Aubm projects.
    Allows agents to retrieve context from past projects and approved work.
    """

    async def save_memory(
        self,
        project_id: str,
        content: str,
        task_id: Optional[str] = None,
        memory_type: str = "approved_output",
        metadata: Optional[Dict[str, Any]] = None
    ) -> bool:
        """
        Vectorizes content and saves it to project_memory.

        Returns True when a row was inserted, False on short/empty content
        or any failure (errors are logged, never raised).
        """
        try:
            # Skip trivially short snippets — not worth an embedding call.
            if not content or len(content.strip()) < 10:
                return False

            vector = await embedding_service.get_embedding(content)
            row = {
                "project_id": project_id,
                "task_id": task_id,
                "content": content,
                "embedding": vector,
                "memory_type": memory_type,
                "metadata": metadata or {}
            }
            inserted = supabase.table("project_memory").insert(row).execute()
            return len(inserted.data) > 0
        except Exception as e:
            logger.error(f"Failed to save memory: {e}")
            return False

    async def search_memory(
        self,
        query: str,
        limit: int = 5,
        threshold: float = 0.7,
        project_id: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Performs semantic search across project memory.
        If project_id is provided, filters memory to that project only (short-term).
        If project_id is None, searches cross-project (long-term).

        Returns an empty list on any failure (errors are logged, never raised).
        """
        try:
            query_vector = await embedding_service.get_embedding(query)

            # Use the match_project_memory RPC function defined in add_vector_memory.sql
            params: Dict[str, Any] = {
                "query_embedding": query_vector,
                "match_threshold": threshold,
                "match_count": limit,
            }
            if project_id:
                params["filter_project_id"] = project_id

            response = supabase.rpc("match_project_memory", params).execute()
            return response.data or []
        except Exception as e:
            logger.error(f"Failed to search memory: {e}")
            return []

    async def index_task_output(self, task: Dict[str, Any]) -> bool:
        """
        Specialized indexer for approved task outputs.

        Pulls displayable text out of the task's output_data and hands it
        to save_memory; returns False when there is nothing to index.
        """
        output_data = task.get("output_data")
        if not output_data:
            return False

        # Extract meaningful text from output
        if isinstance(output_data, str):
            content = output_data
        elif isinstance(output_data, dict):
            # Try to get the primary content
            content = (
                output_data.get("data")
                or output_data.get("strategicConclusion")
                or output_data.get("raw_output")
                or str(output_data)
            )
        else:
            content = ""

        if not content:
            return False

        return await self.save_memory(
            project_id=task.get("project_id"),
            task_id=task.get("id"),
            content=str(content),
            memory_type="approved_output",
            metadata={
                "task_title": task.get("title"),
                "agent_id": task.get("assigned_agent_id")
            }
        )

memory_service = MemoryService()