# File: aubm/backend/services/orchestrator_service.py
# Page metadata: cesjavi's picture
# Commit message: Initial clean commit v11 for HF
# Commit: 95810d1
from services.supabase_service import supabase
from agents.agent_factory import AgentFactory
import json
import logging
import re
from services.config import settings
from services.agent_runner_service import AgentRunnerService
from services.audit_service import audit_service
from services.evidence_service import evidence_service
from services.output_quality import clean_report_text, dedupe_lines, filter_report_sections, validate_output
# Module logger, registered under the name "uvicorn".
logger = logging.getLogger("uvicorn")

# Dict keys that _format_value_for_report skips when rendering a dict payload.
NOISY_REPORT_KEYS = {
    "raw_text",
    "dataModel",
    "repositoryStructure",
    "userStories",
    "wireframes",
    "sampleBackendCode",
    "sampleRedisCartHelper",
    "sampleSearchEndpoint",
    "sampleStateMachine",
    "sampleUploadSnippet",
    "sampleWebhookHandler",
}
def _humanize_key(key: str) -> str:
return key.replace("_", " ").replace("-", " ").strip().title()
def _format_value_for_report(value, level: int = 0) -> list[str]:
if value is None:
return ["Not specified."]
if isinstance(value, (str, int, float, bool)):
return [str(value)]
if isinstance(value, list):
lines: list[str] = []
for item in value:
if isinstance(item, dict):
item_lines = _format_value_for_report(item, level + 1)
if item_lines:
lines.append(f"- {item_lines[0]}")
lines.extend(f" {line}" for line in item_lines[1:])
elif isinstance(item, list):
nested = _format_value_for_report(item, level + 1)
lines.extend(f"- {line}" for line in nested)
else:
lines.append(f"- {item}")
return lines or ["No items."]
if isinstance(value, dict):
lines: list[str] = []
for key, item in value.items():
if str(key) in NOISY_REPORT_KEYS:
continue
title = _humanize_key(str(key))
if isinstance(item, dict):
lines.append(f"{title}:")
lines.extend(f" {line}" for line in _format_value_for_report(item, level + 1))
elif isinstance(item, list):
lines.append(f"{title}:")
lines.extend(f" {line}" for line in _format_value_for_report(item, level + 1))
else:
lines.append(f"{title}: {item}")
return lines or ["No details."]
return [str(value)]
def _extract_json_payload(text: str):
if not text:
return None
stripped = text.strip()
# 1. Try standard block extraction
if stripped.startswith("```"):
cleaned = stripped.strip("`")
if cleaned.lower().startswith("json"):
cleaned = cleaned[4:].strip()
try:
return json.loads(cleaned)
except Exception:
pass # Fallback to regex
# 2. Try direct parsing
try:
return json.loads(stripped)
except Exception:
pass
# 3. Robust Regex Search (find content between first { and last })
# This is the "Repair Layer" for noisy LLM outputs
try:
# Search for anything starting with { and ending with }
# across multiple lines
match = re.search(r'(\{.*\})', stripped, re.DOTALL)
if match:
return json.loads(match.group(1))
except Exception:
pass
# 4. Specific Markdown Block Search
match = re.search(r"```json\s*(.*?)\s*```", text, re.IGNORECASE | re.DOTALL)
if match:
try:
return json.loads(match.group(1))
except Exception:
pass
return None
def _format_output_for_report(output_data) -> str:
if not output_data:
return "No approved output was saved for this task."
if isinstance(output_data, dict):
primary = (
output_data.get("data")
or output_data.get("final")
or output_data.get("raw_output")
or output_data
)
else:
primary = output_data
if isinstance(primary, str):
parsed = _extract_json_payload(primary)
if parsed is not None:
return clean_report_text(dedupe_lines("\n".join(_format_value_for_report(parsed))))
return clean_report_text(dedupe_lines(primary))
return clean_report_text(dedupe_lines("\n".join(_format_value_for_report(primary))))
def _is_empty_curated_text(text: str) -> bool:
normalized = (text or "").strip().lower()
return normalized in {
"",
"no approved output was saved for this task.",
"{}",
"[]",
}
def _is_empty_report_variant(text: str | None) -> bool:
    """
    Decide whether generated variant text is too thin to use.

    After cleaning and de-duplication the text is rejected when it has fewer
    than 20 alphanumeric words, is a bare placeholder, or starts with a code fence.
    """
    cleaned = clean_report_text(dedupe_lines(text or "")).strip()

    word_count = len(re.findall(r"[A-Za-z0-9_]+", cleaned))
    if word_count < 20:
        return True

    lowered = cleaned.lower()
    if lowered in {"{}", "[]", "null", "none", "no details.", "not specified."}:
        return True
    return lowered.startswith("```")
def _format_conclusion_payload(data: dict) -> str:
conclusion = data.get("strategicConclusion") or data.get("conclusion") or data.get("content") or ""
next_steps = data.get("nextSteps") or data.get("next_steps") or []
lines: list[str] = []
if isinstance(conclusion, str) and conclusion.strip():
lines.append(conclusion.strip())
usable_steps = [
step.strip()
for step in next_steps
if isinstance(step, str) and step.strip()
] if isinstance(next_steps, list) else []
if usable_steps:
lines.append("")
lines.append("Next steps:")
for step in usable_steps[:5]:
lines.append(f"- {step}")
return "\n".join(lines).strip() or "\n".join(_format_value_for_report(data))
def _has_usable_output(output_data) -> bool:
if not output_data:
return False
if isinstance(output_data, dict):
if output_data.get("error"):
return False
primary = output_data.get("data")
if primary in (None, "", [], {}):
return False
return True
def _output_text(output_data) -> str:
    """Return the report rendering of a task output, lower-cased for keyword matching."""
    rendered = _format_output_for_report(output_data)
    return rendered.lower()
def _build_report_charts(tasks: list[dict]) -> dict:
total = len(tasks)
done = sum(1 for task in tasks if task.get("status") == "done")
failed = sum(1 for task in tasks if task.get("status") == "failed")
pending = max(total - done - failed, 0)
priority_counts: dict[str, int] = {}
for task in tasks:
priority = str(task.get("priority") if task.get("priority") is not None else 0)
priority_counts[priority] = priority_counts.get(priority, 0) + 1
categories = {
"Market": ("market", "competitor", "customer", "segment", "demand"),
"Product": ("product", "mvp", "feature", "design", "scope"),
"Revenue": ("revenue", "price", "pricing", "margin", "commission"),
"Operations": ("operation", "process", "logistic", "support", "fulfillment"),
"Risk": ("risk", "threat", "failure", "weak", "mitigation")
}
category_counts = {name: 0 for name in categories}
risk_mentions = 0
for task in tasks:
text = f"{task.get('title', '')} {task.get('description', '')} {_output_text(task.get('output_data'))}".lower()
risk_mentions += sum(text.count(term) for term in categories["Risk"])
for category, terms in categories.items():
if any(term in text for term in terms):
category_counts[category] += 1
opportunity_score = 85 if total and done == total else round((done / total) * 85) if total else 0
risk_score = min(95, 35 + risk_mentions * 3)
readiness_score = round((done / total) * 100) if total else 0
return {
"status": [
{"label": "Approved", "value": done},
{"label": "Pending", "value": pending},
{"label": "Failed", "value": failed}
],
"priorities": [
{"label": f"Priority {key}", "value": value}
for key, value in sorted(priority_counts.items(), key=lambda item: int(item[0]) if item[0].isdigit() else 0, reverse=True)
],
"categories": [
{"label": label, "value": value}
for label, value in category_counts.items()
],
"scores": [
{"label": "Readiness", "value": readiness_score},
{"label": "Opportunity", "value": opportunity_score},
{"label": "Risk", "value": risk_score}
]
}
def _format_chart_rows(title: str, rows: list[dict]) -> list[str]:
if not rows:
return [f"### {title}", "No data available.", ""]
lines = [f"### {title}"]
lines.extend(f"- {row['label']}: {row['value']}" for row in rows)
lines.append("")
return lines
def _format_execution_summary(charts: dict, total_tasks: int, kept_task_count: int, excluded_count: int) -> list[str]:
    """Render the task counters followed by the Scores, Task Categories and Priorities charts."""
    summary = [
        f"- Total tasks: {total_tasks}",
        f"- Included outputs: {kept_task_count}",
        f"- Excluded outputs: {excluded_count}",
        "",
    ]
    sections = (
        ("Scores", "scores"),
        ("Task Categories", "categories"),
        ("Priorities", "priorities"),
    )
    for heading, chart_key in sections:
        summary += _format_chart_rows(heading, charts.get(chart_key, []))
    return summary
async def _format_evidence_summary(project_id: str, claims: list[dict]) -> list[str]:
if not claims:
return []
# Get semantically merged claims for the "Strategic Findings" section
merged_claims = await evidence_service.merge_project_claims(project_id, threshold=0.88)
summary = evidence_service.summarize_claims(claims)
lines = [
"## Strategic Findings & Evidence",
f"The analysis has consolidated **{summary['claim_count']}** unique data points into **{len(merged_claims)}** strategic findings.",
f"Source coverage: **{summary['source_coverage']:.0%}** (Claims backed by external evidence).",
"",
"### Key Consolidated Findings",
]
# Show merged claims with their confidence and sources
for claim in merged_claims[:15]:
text = claim.get("claim_text")
entity = claim.get("entity_name")
source = claim.get("source_url")
confidence = claim.get("confidence", "unknown")
merged_count = claim.get("merged_count", 1)
prefix = f"**[{entity}]** " if entity else ""
source_suffix = f" [Source: {source}]" if source else " [Internal Analysis]"
repetition_suffix = f" (Verified by {merged_count} sources)" if merged_count > 1 else ""
lines.append(f"- {prefix}{text}{repetition_suffix}{source_suffix}")
if summary["by_entity"]:
lines.append("")
lines.append("### Entity Analysis Coverage")
for entity, count in list(summary["by_entity"].items())[:8]:
lines.append(f"- **{entity}**: {count} supporting claims identified.")
lines.append("")
return lines
# Per-variant report configuration, keyed by variant name.
#   title            - used in the report's top-level heading
#   agent_terms      - substrings matched against an agent's name/role to pick
#                      the writer; an empty list means no agent is selected
#   fallback_heading - section heading used by the non-agent fallback report
#   prompt           - instruction sent to the selected agent
REPORT_VARIANTS = {
    "full": dict(
        title="Final Report",
        agent_terms=[],
        fallback_heading="Approved Work Summary",
        prompt="",
    ),
    "brief": dict(
        title="Short Brief",
        agent_terms=["brief", "summary", "writer"],
        fallback_heading="Short Brief",
        prompt=(
            "Create a concise executive brief from the approved project work. "
            "Use plain English, no JSON, no code blocks. Include: objective, main findings, recommended next steps, and key risks. "
            "Keep it short and decision-oriented. Do not invent entities, metrics, or placeholders."
        ),
    ),
    "presentation": dict(
        title="Presentation Slides",
        agent_terms=["brief", "writer", "summary"],
        fallback_heading="Presentation Outline",
        prompt=(
            "Transform the approved project work into a high-impact presentation deck structure. "
            "For each slide, provide a Title and 3-4 concise bullet points. "
            "Use plain English, no JSON, no code blocks. "
            "Include: 1. Title Slide, 2. Objective, 3. Market/Problem Context, 4. Strategic Findings, 5. Proposed Solution/Roadmap, 6. Key Risks, 7. Final Recommendation. "
            "Focus on visual clarity and executive communication."
        ),
    ),
}
class OrchestratorService:
    """
    Handles complex multi-agent workflows like Debates and Peer Reviews.

    The class also runs or queues a project's runnable tasks, auto-decomposes
    a project into tasks, and assembles the final report and its variants.
    """

    # ------------------------------------------------------------------
    # Shared helpers for run_project / queue_project
    # ------------------------------------------------------------------
    def _fetch_runnable_tasks(self, project_id: str) -> list[dict]:
        """Return the project's todo/failed/queued tasks, ordered by priority (desc) then created_at (asc)."""
        return (
            supabase.table("tasks")
            .select("*")
            .eq("project_id", project_id)
            .in_("status", ["todo", "failed", "queued"])
            .order("priority", desc=True)
            .order("created_at", desc=False)
            .execute()
            .data
            or []
        )

    def _project_has_tasks(self, project_id: str) -> bool:
        """Return True when the project has at least one task in any status."""
        response = (
            supabase.table("tasks")
            .select("id", count="exact")
            .eq("project_id", project_id)
            .limit(1)
            .execute()
        )
        # Use the reported count when present; otherwise inspect the returned rows.
        if response.count is not None:
            return response.count > 0
        return len(response.data) > 0

    @staticmethod
    def _eligible_agents(agents: list[dict], owner_id, tasks: list[dict]) -> list[dict]:
        """
        Keep agents that are global (user_id None), owned by `owner_id`, or
        already assigned to one of `tasks`.
        """
        # Built once, not once per agent.
        assigned_ids = {t.get("assigned_agent_id") for t in tasks if t.get("assigned_agent_id")}
        return [
            agent for agent in agents
            if agent.get("user_id") in (None, owner_id) or agent.get("id") in assigned_ids
        ]

    # ------------------------------------------------------------------
    # Debate workflow
    # ------------------------------------------------------------------
    async def run_debate(self, task_id: str, agent_a_id: str, agent_b_id: str):
        """
        Executes a debate between two agents for a specific task.

        Agent A drafts, agent B critiques, agent A refines. The refined output
        plus the three intermediate results are saved on the task, which is
        moved to "awaiting_approval". Any exception marks the task "failed",
        is written to the audit log and the agent_logs table, and is not
        re-raised.
        """
        try:
            # 1. Fetch task and agents
            task = supabase.table("tasks").select("*").eq("id", task_id).single().execute().data
            agent_a_data = supabase.table("agents").select("*").eq("id", agent_a_id).single().execute().data
            agent_b_data = supabase.table("agents").select("*").eq("id", agent_b_id).single().execute().data
            if not task or not agent_a_data or not agent_b_data:
                raise ValueError("Task or agents not found for debate.")

            # Update status to in_progress
            supabase.table("tasks").update({"status": "in_progress"}).eq("id", task_id).execute()
            await audit_service.log_action(
                user_id=None,
                action="debate_started",
                agent_id=agent_a_id,
                task_id=task_id,
                metadata={"agent_b_id": agent_b_id, "project_id": task.get("project_id")},
            )

            # 2. Agent A generates initial response
            initial_res, _ = await AgentRunnerService.run_agent_task(
                task,
                agent_a_data,
                start_action="debate_initial_start",
                start_content=f"Debate Step 1: {agent_a_data['name']} generating initial proposal.",
                complete_action="debate_initial_complete",
                update_task=False
            )

            # 3. Agent B reviews and critiques
            # We temporarily modify the task description for this run (on a copy).
            task_critique = task.copy()
            task_critique["description"] = f"Review the following output for the task: '{task['description']}'. Provide constructive critique and identify errors.\n\nOutput: {json.dumps(initial_res['data'])}"
            critique_res, _ = await AgentRunnerService.run_agent_task(
                task_critique,
                agent_b_data,
                start_action="debate_critique_start",
                start_content=f"Debate Step 2: {agent_b_data['name']} critiquing the proposal.",
                complete_action="debate_critique_complete",
                update_task=False
            )

            # 4. Agent A refines based on critique
            task_refinement = task.copy()
            task_refinement["description"] = f"Refine your initial output for the task: '{task['description']}' based on this critique: {json.dumps(critique_res['data'])}"
            final_res, _ = await AgentRunnerService.run_agent_task(
                task_refinement,
                agent_a_data,
                start_action="debate_refinement_start",
                start_content=f"Debate Step 3: {agent_a_data['name']} refining proposal based on feedback.",
                complete_action="debate_refinement_complete",
                update_task=False
            )

            # 5. Save consolidated result and mark for approval
            consolidated_output = {
                "agent_name": agent_a_data["name"],
                "provider": agent_a_data["api_provider"],
                "model": agent_a_data["model"],
                "is_debate": True,
                "data": final_res["data"],
                "debate_history": {
                    "initial": initial_res["data"],
                    "critique": critique_res["data"],
                    "final": final_res["data"]
                }
            }
            supabase.table("tasks").update({
                "status": "awaiting_approval",
                "output_data": consolidated_output
            }).eq("id", task_id).execute()
            claims_count = await evidence_service.replace_task_claims(task, consolidated_output)
            await audit_service.log_action(
                user_id=None,
                action="debate_completed",
                agent_id=agent_a_id,
                task_id=task_id,
                metadata={"agent_b_id": agent_b_id, "project_id": task.get("project_id"), "claims_count": claims_count},
            )
            logger.info(f"Debate completed for task {task_id}")
        except Exception as e:
            logger.error(f"Debate failed: {str(e)}")
            supabase.table("tasks").update({
                "status": "failed",
                "output_data": {"error": str(e)}
            }).eq("id", task_id).execute()
            await audit_service.log_action(
                user_id=None,
                action="debate_failed",
                agent_id=agent_a_id,
                task_id=task_id,
                metadata={"agent_b_id": agent_b_id, "error": str(e)},
            )
            # LOG ERROR TO AGENT CONSOLE
            supabase.table("agent_logs").insert({
                "task_id": task_id,
                "action": "debate_failed",
                "content": f"DEBATE ERROR: {str(e)}"
            }).execute()

    # ------------------------------------------------------------------
    # Project execution
    # ------------------------------------------------------------------
    async def run_project(self, project_id: str):
        """
        Runs queued tasks in a project sequentially. Unassigned tasks are assigned
        to the first available project-owner or global agent.

        If the project has no tasks at all, it is auto-decomposed first.

        Returns:
            dict with "project_id", "queued_tasks" (number of runnable tasks
            considered), "completed" and "failed".

        Raises:
            ValueError: when the project does not exist.
        """
        project = supabase.table("projects").select("*").eq("id", project_id).single().execute().data
        if not project:
            raise ValueError(f"Project not found: {project_id}")
        owner_id = project.get("owner_id")

        tasks = self._fetch_runnable_tasks(project_id)

        # Automatic Decomposition: only if no tasks exist AT ALL (regardless of
        # status), to avoid re-decomposing a project whose tasks are all finished.
        if not self._project_has_tasks(project_id):
            logger.info(f"No tasks found for project {project_id}. Triggering auto-decomposition.")
            await self.decompose_project(project_id)
            # Re-fetch tasks after decomposition
            tasks = self._fetch_runnable_tasks(project_id)

        agents = supabase.table("agents").select("*").execute().data or []
        available_agents = self._eligible_agents(agents, owner_id, tasks)

        completed = 0
        failed = 0
        for task in tasks:
            try:
                agent_data = self._resolve_agent(task, available_agents)
                if not agent_data:
                    raise ValueError("No available agent for task")
                if not task.get("assigned_agent_id"):
                    supabase.table("tasks").update({
                        "assigned_agent_id": agent_data["id"]
                    }).eq("id", task["id"]).execute()
                    task["assigned_agent_id"] = agent_data["id"]
                await self._run_task(task, agent_data)
                completed += 1
            except Exception as exc:
                # One failing task must not stop the remaining tasks.
                failed += 1
                logger.error(f"Project orchestration task failed: {str(exc)}")
                supabase.table("tasks").update({
                    "status": "failed",
                    "output_data": {"error": str(exc)}
                }).eq("id", task["id"]).execute()

        return {
            "project_id": project_id,
            "queued_tasks": len(tasks),
            "completed": completed,
            "failed": failed,
        }

    async def queue_project(self, project_id: str):
        """
        Assigns available agents and queues runnable project tasks for worker execution.

        Returns:
            dict with "project_id", "queued_tasks", "failed", "skipped" and
            "mode" ("queue").

        Raises:
            ValueError: when the project does not exist or is already completed.
        """
        from services.task_queue import TaskQueueService

        project = supabase.table("projects").select("*").eq("id", project_id).single().execute().data
        if not project:
            raise ValueError(f"Project not found: {project_id}")
        if project.get("status") == "completed":
            raise ValueError("Completed projects are locked and cannot be modified.")
        owner_id = project.get("owner_id")

        tasks = self._fetch_runnable_tasks(project_id)
        if not self._project_has_tasks(project_id):
            logger.info(f"No tasks found for project {project_id}. Triggering auto-decomposition before queueing.")
            await self.decompose_project(project_id)
            tasks = self._fetch_runnable_tasks(project_id)

        agents = supabase.table("agents").select("*").execute().data or []
        available_agents = self._eligible_agents(agents, owner_id, tasks)

        queued = 0
        failed = 0
        skipped = 0
        for task in tasks:
            try:
                agent_data = self._resolve_agent(task, available_agents)
                if not agent_data:
                    raise ValueError("No available agent for task")
                if not task.get("assigned_agent_id"):
                    supabase.table("tasks").update({
                        "assigned_agent_id": agent_data["id"]
                    }).eq("id", task["id"]).execute()
                result = await TaskQueueService.queue_task(task["id"])
                # A result without data is counted as skipped.
                if result and result.data:
                    queued += 1
                else:
                    skipped += 1
            except Exception as exc:
                failed += 1
                logger.error(f"Project queueing task failed: {str(exc)}")
                supabase.table("tasks").update({
                    "status": "failed",
                    "last_error": str(exc),
                    "output_data": {"error": str(exc)}
                }).eq("id", task["id"]).execute()
                await audit_service.log_action(
                    user_id=owner_id,
                    action="task_queue_failed",
                    task_id=task.get("id"),
                    metadata={"project_id": project_id, "error": str(exc)},
                )

        await audit_service.log_action(
            user_id=owner_id,
            action="project_queued",
            metadata={
                "project_id": project_id,
                "queued_tasks": queued,
                "failed": failed,
                "skipped": skipped,
            },
        )
        return {
            "project_id": project_id,
            "queued_tasks": queued,
            "failed": failed,
            "skipped": skipped,
            "mode": "queue",
        }

    # ------------------------------------------------------------------
    # Report generation
    # ------------------------------------------------------------------
    def _select_report_agent(self, project: dict, variant: str):
        """
        Pick the agent that writes the given report variant.

        Returns None when the variant has no agent_terms. Otherwise returns the
        first global/owner agent whose name or role contains one of the terms,
        falling back to the first global/owner agent (or None when there is none).
        """
        config = REPORT_VARIANTS.get(variant, REPORT_VARIANTS["full"])
        terms = config["agent_terms"]
        if not terms:
            return None
        owner_id = project.get("owner_id")
        agents = supabase.table("agents").select("*").execute().data or []
        available_agents = [
            agent for agent in agents
            if agent.get("user_id") in (None, owner_id)
        ]
        return next(
            (
                agent for agent in available_agents
                if any(term in f"{agent.get('name', '')} {agent.get('role', '')}".lower() for term in terms)
            ),
            available_agents[0] if available_agents else None
        )

    async def _generate_report_variant_with_agent(self, project: dict, report: str, variant: str):
        """
        Ask the selected agent to rewrite `report` as the requested variant.

        Returns the generated text, or None when no agent is available or the
        agent's output is judged empty by _is_empty_report_variant.

        Raises:
            RuntimeError: when the agent result has status "error".
        """
        agent_data = self._select_report_agent(project, variant)
        if not agent_data:
            return None
        config = REPORT_VARIANTS[variant]
        agent = AgentFactory.get_agent(
            provider=agent_data["api_provider"],
            name=agent_data["name"],
            role=agent_data["role"],
            model=agent_data["model"],
            system_prompt=agent_data.get("system_prompt")
        )
        result = await agent.run(f"{config['prompt']}\n\nApproved project material:\n{report}", [])
        if result.get("status") == "error":
            raise RuntimeError(result.get("error") or "Report agent returned an error.")

        data = result.get("data")
        if isinstance(data, dict):
            # Prefer a well-known text field; otherwise render the whole dict.
            for key in ("brief", "analysis", "report", "summary", "content"):
                value = data.get(key)
                if isinstance(value, str) and not _is_empty_report_variant(value):
                    return value
            formatted = "\n".join(_format_value_for_report(data))
            return None if _is_empty_report_variant(formatted) else formatted
        if isinstance(data, str):
            return None if _is_empty_report_variant(data) else data
        raw_output = result.get("raw_output")
        return None if _is_empty_report_variant(raw_output) else raw_output

    def _build_fallback_variant(self, project: dict, tasks: list[dict], variant: str):
        """
        Build a static (non-agent) report for a variant.

        Returns the report text for "brief" (and "pessimistic"), or None for
        variants without a static fallback.
        """
        # NOTE(review): REPORT_VARIANTS in this file defines only "full", "brief"
        # and "presentation". The "pessimistic" branch below would raise KeyError
        # on this lookup, and "presentation" has no static fallback (returns None).
        # Confirm which set of variants is intended.
        config = REPORT_VARIANTS[variant]
        lines = [
            f"# {config['title']}: {project['name']}",
            "",
            "## Project Brief",
            project.get("description") or "No project description provided.",
            "",
            f"## {config['fallback_heading']}"
        ]
        if variant == "brief":
            lines.extend([
                f"All {len(tasks)} approved tasks have been consolidated.",
                "The project is ready for decision review based on the approved task outputs.",
                "",
                "Recommended next steps:",
                "- Validate the highest-impact assumptions with real users or customers.",
                "- Prioritize the smallest launch scope that proves demand.",
                "- Convert approved outputs into an execution backlog with owners and dates."
            ])
            return "\n".join(lines)
        if variant == "pessimistic":
            lines.extend([
                "This project can still fail even with all tasks approved.",
                "",
                "Primary downside risks:",
                "- Approved task outputs may be internally consistent but unvalidated by the market.",
                "- Revenue, conversion, operational, and adoption assumptions may be too optimistic.",
                "- Execution scope can expand faster than the team can deliver.",
                "- Competitors can respond with pricing, distribution, or trust advantages.",
                "",
                "Mitigation priorities:",
                "- Validate demand before building broad feature scope.",
                "- Stress-test unit economics and support costs.",
                "- Define kill criteria before committing more resources."
            ])
            return "\n".join(lines)
        return None

    def _quality_approved_tasks(self, tasks: list[dict], project: dict) -> tuple[list[dict], list[dict]]:
        """
        Split tasks into (approved, excluded) by output quality.

        A task is excluded when it has no usable output or when its quality
        review (stored under "quality_review", else computed with
        validate_output) is not approved. Each excluded entry is a dict with
        "title" and "reasons".
        """
        approved: list[dict] = []
        excluded: list[dict] = []
        for task in tasks:
            output_data = task.get("output_data") or {}
            if not _has_usable_output(output_data):
                excluded.append({
                    "title": task.get("title", "Untitled task"),
                    "reasons": ["Task has no usable approved output."]
                })
                continue
            task_with_project = {**task, "project": project}
            quality_review = output_data.get("quality_review") if isinstance(output_data, dict) else None
            if not quality_review and isinstance(output_data, dict):
                quality_review = validate_output(task_with_project, output_data)
            if quality_review and not quality_review.get("approved", False):
                excluded.append({
                    "title": task.get("title", "Untitled task"),
                    "reasons": quality_review.get("fail_reasons") or ["Failed quality review."]
                })
                continue
            approved.append(task)
        return approved, excluded

    def _curate_task_output(self, output_data) -> tuple[str, list[str]]:
        """Return (curated report text, excluded lines) for one task output."""
        text = _format_output_for_report(output_data)
        text = clean_report_text(dedupe_lines(text))
        text, excluded_lines = filter_report_sections(text)
        return text or "No approved output was saved for this task.", excluded_lines

    async def build_final_report(self, project_id: str, variant: str = "full"):
        """
        Assemble the final report for a project whose tasks are all "done".

        Unknown variants are treated as "full". The project is marked
        "completed" as a side effect, and a "final_report_generated" audit
        entry is written.

        Returns:
            dict with "project_id", "project_name", "task_count", "variant",
            "report", "charts" and "evidence".

        Raises:
            ValueError: when the project does not exist, has no tasks, or has
                tasks that are not yet "done".
        """
        variant = variant if variant in REPORT_VARIANTS else "full"
        project = supabase.table("projects").select("*").eq("id", project_id).single().execute().data
        if not project:
            raise ValueError(f"Project not found: {project_id}")

        tasks = (
            supabase.table("tasks")
            .select("title,description,status,priority,output_data,created_at")
            .eq("project_id", project_id)
            .order("priority", desc=True)
            .order("created_at", desc=False)
            .execute()
            .data
            or []
        )
        if not tasks:
            raise ValueError("Project has no tasks to summarize.")
        incomplete = [task for task in tasks if task.get("status") != "done"]
        if incomplete:
            raise ValueError(f"Final report is available after all tasks are approved. Pending tasks: {len(incomplete)}")

        curated_tasks, excluded_tasks = self._quality_approved_tasks(tasks, project)
        if not curated_tasks:
            # Fallback: if no tasks pass the strict quality review, include all 'done' tasks
            # so the user can at least see a draft report.
            logger.warning(f"Project {project_id}: No tasks passed quality review. Falling back to all tasks.")
            curated_tasks = tasks

        # Load raw claims for statistics, and we will use semantic merging inside _format_evidence_summary
        all_raw_claims = evidence_service.load_project_claims(project_id)
        merged_claims = await evidence_service.merge_project_claims(project_id)

        # 0. Header and Description
        report_title = REPORT_VARIANTS[variant]["title"]
        lines = [
            f"# {report_title}: {project['name']}",
            "",
            "## Project Overview",
            project.get("description") or "No description provided.",
            ""
        ]
        # Add Context if exists
        if project.get("context"):
            lines.extend(["## Context", project["context"], ""])

        approved_work_lines = ["## Approved Work Summary", ""]
        report_exclusions: list[str] = []
        included_tasks: list[dict] = []
        kept_task_count = 0
        for task in curated_tasks:
            curated_text, excluded_lines = self._curate_task_output(task.get("output_data"))
            report_exclusions.extend(excluded_lines)
            if _is_empty_curated_text(curated_text):
                excluded_tasks.append({
                    "title": task.get("title", "Untitled task"),
                    "reasons": ["Task output became empty after quality filtering."]
                })
                continue
            kept_task_count += 1
            included_tasks.append(task)
            approved_work_lines.extend([
                # Same missing-title default as the exclusion entries above.
                f"### {kept_task_count}. {task.get('title', 'Untitled task')}",
                task.get("description") or "No task description provided.",
                "",
                curated_text,
                ""
            ])

        charts = _build_report_charts(included_tasks)
        lines.extend(["## Execution Summary", ""])
        lines.extend(_format_execution_summary(charts, len(tasks), kept_task_count, len(excluded_tasks)))

        # New Evidence-Aware Strategic Findings Section
        evidence_section = await _format_evidence_summary(project_id, all_raw_claims)
        lines.extend(evidence_section)
        lines.extend(approved_work_lines)

        if excluded_tasks or report_exclusions:
            lines.extend(["## Excluded Content", ""])
            for excluded in excluded_tasks:
                lines.append(f"- Excluded task output: {excluded['title']} ({'; '.join(excluded['reasons'])})")
            # De-duplicate while keeping order; list at most 10 excluded lines.
            for excluded_line in list(dict.fromkeys(report_exclusions))[:10]:
                if excluded_line:
                    lines.append(f"- {excluded_line}")
            lines.append("")

        # Final Conclusion Generation (static default, replaced by agent output when available)
        conclusion = (
            "Based on the approved task outputs, the project has successfully established a foundational framework. "
            "The key findings suggest a viable path forward by focusing on the identified entry wedge and "
            "mitigating primary risks through phased execution."
        )
        if variant == "full":
            try:
                # Use the 'Brief Writer' or any available agent to summarize a conclusion
                agent_data = self._select_report_agent(project, "brief")
                if agent_data:
                    agent = AgentFactory.get_agent(
                        provider=agent_data["api_provider"],
                        name=agent_data["name"],
                        role=agent_data["role"],
                        model=agent_data["model"],
                        system_prompt=(
                            "You are a Senior Strategic Consultant. Your goal is to write a comprehensive, "
                            "professional strategic conclusion for a project report based on approved work. "
                            "Synthesize the findings, highlight critical success factors, identify remaining "
                            "operational or market risks, and provide 3-5 high-impact, actionable next steps. "
                            "The tone should be executive, insightful, and strictly based on provided facts. "
                            "Avoid generic filler or unsupported placeholders."
                        )
                    )
                    report_so_far = "\n".join(lines)
                    # Feed the strategic conclusion agent with the consolidated findings for maximum accuracy
                    evidence_context = "\n".join(evidence_section)
                    res = await agent.run(
                        f"Project: {project['name']}\n"
                        f"Consolidated Strategic Findings:\n{evidence_context}\n\n"
                        f"Full Report Context:\n{report_so_far}\n\n"
                        "Task: Write a final strategic conclusion and 3-5 next steps based on the findings above.",
                        []
                    )
                    if res.get("status") != "error":
                        data = res.get("data")
                        # Only non-empty agent output may replace the default, so the
                        # section is never blank or reduced to "No details.".
                        if isinstance(data, str) and data.strip():
                            conclusion = data
                        elif isinstance(data, dict) and data:
                            conclusion = _format_conclusion_payload(data)
            except Exception as exc:
                # Best effort: keep the static conclusion when the agent fails.
                logger.warning(f"Failed to generate dynamic conclusion: {exc}")

        lines.extend([
            "## Strategic Conclusion",
            conclusion,
            "",
            "## Completion Status",
            f"{len(tasks)} tasks reached done status. {kept_task_count} task outputs were included in the final report. {len(excluded_tasks)} task outputs were excluded from the final report."
        ])

        supabase.table("projects").update({"status": "completed"}).eq("id", project_id).execute()

        report = "\n".join(lines)
        if variant != "full":
            # Preference order: agent-generated variant, static fallback, full report.
            try:
                generated = await self._generate_report_variant_with_agent(project, report, variant)
                fallback_report = self._build_fallback_variant(project, included_tasks or tasks, variant)
                report = generated if not _is_empty_report_variant(generated) else fallback_report or report
            except Exception as exc:
                logger.warning(f"Report variant generation failed: {exc}")
                report = self._build_fallback_variant(project, included_tasks or tasks, variant) or report

        await audit_service.log_action(
            user_id=project.get("owner_id"),
            action="final_report_generated",
            metadata={
                "project_id": project_id,
                "variant": variant,
                "task_count": kept_task_count,
                "excluded_task_count": len(excluded_tasks),
                "normalized_claim_count": len(merged_claims),
            },
        )
        return {
            "project_id": project_id,
            "project_name": project["name"],
            "task_count": kept_task_count,
            "variant": variant,
            "report": clean_report_text(dedupe_lines(report)),
            "charts": charts,
            "evidence": evidence_service.summarize_claims(merged_claims),
        }

    # ------------------------------------------------------------------
    # Decomposition
    # ------------------------------------------------------------------
    async def decompose_project(self, project_id: str):
        """
        Uses a Planner agent to decompose a project into discrete tasks.

        Planner choice, in order: a Groq agent named "...Planner...", any
        Planner, any agent, and finally a built-in Groq planner. Only global
        agents and agents owned by the project owner are considered.

        Failures while running the planner or inserting tasks are logged and
        swallowed (best effort).

        Raises:
            ValueError: when the project does not exist.
        """
        project = supabase.table("projects").select("*").eq("id", project_id).single().execute().data
        if not project:
            # Same not-found contract as run_project / queue_project.
            raise ValueError(f"Project not found: {project_id}")
        owner_id = project.get("owner_id")

        # Find a Planner agent, prioritizing Groq as requested
        agents = supabase.table("agents").select("*").execute().data or []

        def _usable(agent: dict) -> bool:
            # Global agents or agents owned by the project owner, matching the
            # other agent selections in this class.
            return agent.get("user_id") in (None, owner_id)

        def _is_planner(agent: dict) -> bool:
            # None-safe: an agent without a name is simply not a planner.
            return "Planner" in (agent.get("name") or "")

        # 1. Try to find an existing Groq Planner
        planner_agent_data = next(
            (a for a in agents if _is_planner(a) and _usable(a) and a.get("api_provider") == "groq"),
            None
        )
        # 2. If not found, try any Planner, then any usable agent
        if not planner_agent_data:
            planner_agent_data = next(
                (a for a in agents if _is_planner(a) and _usable(a)),
                next((a for a in agents if _usable(a)), None)
            )

        # 3. If still no agent, or it's OpenAI without a configured key, use the default Groq planner
        if not planner_agent_data or (planner_agent_data.get("api_provider") == "openai" and not settings.OPENAI_API_KEY):
            logger.info("Using default Groq Planner for decomposition.")
            planner = AgentFactory.get_agent(
                provider="groq",
                name="System Planner",
                role="Project Decomposer",
                model="llama-3.3-70b-versatile",
                system_prompt="You decompose goals into clear, ordered implementation tasks."
            )
        else:
            planner = AgentFactory.get_agent(
                provider=planner_agent_data["api_provider"],
                name=planner_agent_data["name"],
                role=planner_agent_data["role"],
                model=planner_agent_data["model"],
                system_prompt=planner_agent_data.get("system_prompt")
            )

        prompt = f"""Decompose the following project into 3-5 clear, actionable implementation tasks.
Project Name: {project['name']}
Description: {project.get('description')}
Context: {project.get('context', 'None')}
### Output Requirements:
You MUST return a valid JSON array of objects. Each object represents a task.
Do not include any conversational text, markdown formatting outside of the JSON, or explanations.
### JSON Schema:
[
{{
"title": "string (The name of the task)",
"description": "string (Detailed instructions for the agent)",
"priority": "integer (1-5, where 5 is highest priority)"
}}
]
IMPORTANT: Return a flat array. Do not wrap it in a parent 'tasks' object.
Do not use placeholder names or generic filler tasks. Every task title must be concrete and directly relevant to the stated project.
"""
        try:
            result = await planner.run(prompt, [])
            tasks_data = result.get("data")

            # Handle common LLM wrapping patterns
            if isinstance(tasks_data, dict):
                if "tasks" in tasks_data and isinstance(tasks_data["tasks"], list):
                    tasks_data = tasks_data["tasks"]
                else:
                    # A single task object returned instead of an array.
                    tasks_data = [tasks_data]
            if not isinstance(tasks_data, list):
                raise ValueError(f"Agent returned invalid format: {type(tasks_data)}. Expected list or dict.")

            # Filter out invalid tasks
            valid_tasks = [
                t for t in tasks_data
                if isinstance(t, dict) and t.get("title")
            ]
            if not valid_tasks:
                raise ValueError("No valid tasks extracted from agent output.")

            # Insert tasks (imported inside the function, as in the original)
            from .project_service import project_service
            await project_service.add_tasks_to_project(project_id, valid_tasks)
            await audit_service.log_action(
                user_id=owner_id,
                action="project_decomposed",
                metadata={"project_id": project_id, "task_count": len(valid_tasks)},
            )
            logger.info(f"Auto-decomposed project {project_id} into {len(valid_tasks)} tasks.")
        except Exception as e:
            # Deliberate best effort: callers continue with whatever tasks exist.
            logger.error(f"Project decomposition failed: {e}")

    # ------------------------------------------------------------------
    # Task-level helpers
    # ------------------------------------------------------------------
    def _resolve_agent(self, task: dict, available_agents: list[dict]):
        """
        Return the agent to run `task` with.

        A task with an assigned agent only resolves to that agent (None when it
        is not among `available_agents`); an unassigned task gets the first
        available agent, or None when there are none.
        """
        assigned_agent_id = task.get("assigned_agent_id")
        if assigned_agent_id:
            return next((agent for agent in available_agents if agent["id"] == assigned_agent_id), None)
        return available_agents[0] if available_agents else None

    async def _run_task(self, task: dict, agent_data: dict):
        """Run one task through AgentRunnerService with orchestrator-specific log actions."""
        await AgentRunnerService.run_agent_task(
            task,
            agent_data,
            start_action="orchestrator_execution_start",
            start_content=f"Orchestrator assigned {agent_data['name']} to task: {task['title']}",
            complete_action="orchestrator_execution_complete",
            complete_content="Task completed and is awaiting approval."
        )


# Module-level singleton instance.
orchestrator_service = OrchestratorService()