"""Synthesizer agent: merges per-agent outputs into a single ``AuditReport``."""

from app.schemas import AgentOutput, AuditReport, Finding, RepoScanResult, Severity

# Sort rank per severity; a lower rank sorts earlier.
SEVERITY_ORDER = {
    Severity.critical: 0,
    Severity.high: 1,
    Severity.medium: 2,
    Severity.low: 3,
}

# Hard cap on the number of findings placed in the displayed list.
MAX_DISPLAY_FINDINGS = 40

# Per-agent display caps; agents not listed fall back to MAX_DISPLAY_FINDINGS.
MAX_DISPLAY_FINDINGS_BY_AGENT = {
    "Security Agent": 20,
    "Performance Agent": 12,
    "Quality Agent": 10,
    "Docs Agent": 12,
}

# Per-severity display caps; ``None`` means no per-severity cap.
MAX_DISPLAY_BY_SEVERITY = {
    Severity.critical: None,
    Severity.high: 30,
    Severity.medium: 18,
    Severity.low: 12,
}

# Categories whose findings count against the security score.
SECURITY_CATEGORIES = {
    "security",
    "config",
    "dependency",
    "cuda_migration",
}

# Categories whose findings count against the production score.
PRODUCTION_CATEGORIES = {
    "performance",
    "quality",
    "docs",
    "error_handling",
    "observability",
}

# Category assigned to a finding that carries no category of its own.
AGENT_CATEGORY_DEFAULTS = {
    "Security Agent": "security",
    "Config Agent": "config",
    "Dependency Agent": "dependency",
    "CUDA-to-ROCm Agent": "cuda_migration",
    "Performance Agent": "performance",
    "Quality Agent": "quality",
    "Docs Agent": "docs",
    "Error Handling Agent": "error_handling",
    "Observability Agent": "observability",
}

# Penalty points added per finding, by severity, for each score.
SECURITY_WEIGHTS = {
    Severity.critical: 24,
    Severity.high: 12,
    Severity.medium: 5,
    Severity.low: 1,
}

PRODUCTION_WEIGHTS = {
    Severity.critical: 16,
    Severity.high: 9,
    Severity.medium: 4,
    Severity.low: 1,
}

# Upper bound on the penalty any single category can contribute to a score.
MAX_SECURITY_CATEGORY_PENALTY = 35
MAX_PRODUCTION_CATEGORY_PENALTY = 28


class SynthesizerAgent:
    """Combine the findings of all agents into one prioritized audit report."""

    name = "Synthesizer Agent"

    async def synthesize(self, repo: RepoScanResult, outputs: list[AgentOutput]) -> AuditReport:
        """Build the final ``AuditReport`` from the repo scan and agent outputs.

        Findings are de-duplicated and sorted, then summarized per severity and
        per category, scored, split into a remediation roadmap, and trimmed to
        a bounded display list.

        Args:
            repo: Scan result supplying the repo URL, files, skipped-file count
                and scan warnings.
            outputs: One output per agent that ran.

        Returns:
            The assembled report. ``findings`` holds only the displayed subset;
            the summaries, scores and roadmap are computed from all
            de-duplicated findings.
        """
        all_findings = self._dedupe([finding for output in outputs for finding in output.findings])
        all_findings.sort(key=self._sort_key)

        summary = {severity: 0 for severity in Severity}
        for finding in all_findings:
            summary[finding.severity] += 1

        # NOTE: per-agent totals are taken from the raw outputs, i.e. before
        # de-duplication.
        agent_counts = {output.agent_name: len(output.findings) for output in outputs}
        display_findings, hidden_count, warnings = self._select_display_findings(all_findings, agent_counts)
        category_summary = self._category_summary(all_findings)
        security_score, production_score = self._compute_scores(all_findings)
        roadmap = self._build_roadmap(all_findings)

        # Only well-typed metadata entries are forwarded to the report.
        dependency_cves = [
            cve
            for output in outputs
            for cve in output.metadata.get("dependency_cves", [])
            if isinstance(cve, dict)
        ]
        dependency_warnings = [
            warning
            for output in outputs
            for warning in output.metadata.get("warnings", [])
            if isinstance(warning, str)
        ]

        return AuditReport(
            repo_url=repo.repo_url,
            scanned_file_count=len(repo.files),
            skipped_file_count=repo.skipped_files,
            findings=display_findings,
            severity_summary=summary,
            total_findings_count=len(all_findings),
            displayed_findings_count=len(display_findings),
            hidden_findings_count=hidden_count,
            agent_finding_counts=agent_counts,
            category_summary=category_summary,
            security_score=security_score,
            production_score=production_score,
            remediation_roadmap=roadmap,
            dependency_cves=dependency_cves,
            agents_run=[output.agent_name for output in outputs] + [self.name],
            warnings=repo.warnings + dependency_warnings + warnings,
        )

    def _dedupe(self, findings: list[Finding]) -> list[Finding]:
        """Return ``findings`` without repeats, keeping first occurrences in order.

        Two findings are duplicates when they share file path, start line,
        title and agent source.
        """
        seen: set[tuple[str, int, str, str]] = set()
        unique: list[Finding] = []
        for finding in findings:
            key = (finding.file_path, finding.line_start, finding.title, finding.agent_source)
            if key in seen:
                continue
            seen.add(key)
            unique.append(finding)
        return unique

    def _select_display_findings(
        self,
        findings: list[Finding],
        agent_counts: dict[str, int],
    ) -> tuple[list[Finding], int, list[str]]:
        """Pick the bounded subset of findings to display.

        Findings are taken in the given order, subject to the per-severity,
        per-agent and overall caps. If that pass selects no low-severity
        finding, a fallback pass adds low findings (ignoring the per-agent
        cap): it appends while there is room, and otherwise swaps a single low
        finding in for the last medium finding.

        Args:
            findings: All findings, expected in priority order.
            agent_counts: Total findings per agent, used for the warnings.

        Returns:
            ``(selected, hidden_count, warnings)`` where ``selected`` is sorted
            with ``_sort_key``, ``hidden_count`` is the number of findings not
            selected, and ``warnings`` describes what was hidden.
        """
        selected: list[Finding] = []
        selected_by_agent = {agent_name: 0 for agent_name in agent_counts}
        selected_by_severity = {severity: 0 for severity in Severity}

        for finding in findings:
            agent_limit = MAX_DISPLAY_FINDINGS_BY_AGENT.get(finding.agent_source, MAX_DISPLAY_FINDINGS)
            severity_limit = MAX_DISPLAY_BY_SEVERITY[finding.severity]
            if severity_limit is not None and selected_by_severity[finding.severity] >= severity_limit:
                continue
            if selected_by_agent.get(finding.agent_source, 0) >= agent_limit:
                continue
            if len(selected) >= MAX_DISPLAY_FINDINGS:
                break
            selected.append(finding)
            selected_by_agent[finding.agent_source] = selected_by_agent.get(finding.agent_source, 0) + 1
            selected_by_severity[finding.severity] += 1

        if not any(finding.severity == Severity.low for finding in selected):
            low_findings = [finding for finding in findings if finding.severity == Severity.low]
            low_slots = MAX_DISPLAY_BY_SEVERITY[Severity.low] or 0
            placed_low = False
            for finding in low_findings[:low_slots]:
                if finding in selected:
                    continue
                if len(selected) >= MAX_DISPLAY_FINDINGS:
                    # Once a low finding is displayed, stop: another swap would
                    # evict the low finding just placed (lows are the first
                    # replacement candidates), leaving only the lowest-priority
                    # candidate on display.
                    if placed_low:
                        break
                    replace_index = self._replaceable_display_index(selected)
                    if replace_index is None:
                        break
                    replaced = selected[replace_index]
                    selected_by_agent[replaced.agent_source] = max(
                        0,
                        selected_by_agent.get(replaced.agent_source, 0) - 1,
                    )
                    selected_by_severity[replaced.severity] = max(
                        0,
                        selected_by_severity[replaced.severity] - 1,
                    )
                    selected[replace_index] = finding
                else:
                    selected.append(finding)
                selected_by_agent[finding.agent_source] = selected_by_agent.get(finding.agent_source, 0) + 1
                selected_by_severity[finding.severity] += 1
                placed_low = True

        # Appends and swaps above can break the ordering; restore it.
        selected.sort(key=self._sort_key)
        hidden_count = max(0, len(findings) - len(selected))

        warnings: list[str] = []
        if hidden_count:
            warnings.append(
                f"Report display prioritized {len(selected)} of {len(findings)} findings; "
                f"{hidden_count} lower-priority findings are hidden from the demo report."
            )
            for agent_name, total_count in agent_counts.items():
                displayed_count = selected_by_agent.get(agent_name, 0)
                hidden_for_agent = total_count - displayed_count
                if hidden_for_agent > 0:
                    warnings.append(f"{agent_name}: displaying {displayed_count} of {total_count} findings.")
        return selected, hidden_count, warnings

    def _replaceable_display_index(self, selected: list[Finding]) -> int | None:
        """Return the index of the finding to evict from ``selected``, if any.

        Low-severity findings are preferred over medium ones, and within a
        severity the last matching entry is chosen. Returns ``None`` when
        ``selected`` holds neither.
        """
        for severity in (Severity.low, Severity.medium):
            for index in range(len(selected) - 1, -1, -1):
                if selected[index].severity == severity:
                    return index
        return None

    def _sort_key(self, finding: Finding) -> tuple[int, int, str, int]:
        """Return the priority sort key for ``finding``.

        Orders by severity rank, then pushes non-critical findings in test
        files behind others of the same severity, then by file path and start
        line.
        """
        test_file_penalty = 1 if self._is_test_file(finding.file_path) and finding.severity != Severity.critical else 0
        return (SEVERITY_ORDER[finding.severity], test_file_penalty, finding.file_path, finding.line_start)

    def _is_test_file(self, file_path: str) -> bool:
        """Return whether ``file_path`` looks like a test file.

        The path is lower-cased and backslashes become ``/``; it matches when
        it contains ``/test`` or ``_test.``, or starts with ``test``.
        """
        normalized = file_path.lower().replace("\\", "/")
        return "/test" in normalized or normalized.startswith("test") or "_test." in normalized

    def _category_for(self, finding: Finding) -> str:
        """Return the finding's category, deriving one from its agent if unset.

        Unknown agents map to their name with ``" Agent"`` removed, lower-cased.
        """
        if finding.category:
            return finding.category
        return AGENT_CATEGORY_DEFAULTS.get(finding.agent_source, finding.agent_source.replace(" Agent", "").lower())

    def _category_summary(self, findings: list[Finding]) -> dict[str, int]:
        """Count findings per category, ordered by count descending then name."""
        summary: dict[str, int] = {}
        for finding in findings:
            category = self._category_for(finding)
            summary[category] = summary.get(category, 0) + 1
        return dict(sorted(summary.items(), key=lambda item: (-item[1], item[0])))

    def _compute_scores(self, findings: list[Finding]) -> tuple[int, int]:
        """Return ``(security_score, production_score)`` for ``findings``.

        Each finding adds its severity weight to its category's penalty for
        every score it belongs to, by category or by originating agent. Each
        category's penalty is capped before the penalties are summed and
        converted to a score.
        """
        security_agents = {
            "Security Agent",
            "Config Agent",
            "Dependency Agent",
            "CUDA-to-ROCm Agent",
        }
        production_agents = {
            "Performance Agent",
            "Quality Agent",
            "Docs Agent",
            "Error Handling Agent",
            "Observability Agent",
        }
        security_penalties: dict[str, int] = {}
        production_penalties: dict[str, int] = {}
        for finding in findings:
            category = self._category_for(finding)
            if category in SECURITY_CATEGORIES or finding.agent_source in security_agents:
                security_penalties[category] = security_penalties.get(category, 0) + SECURITY_WEIGHTS[finding.severity]
            # Deliberately not ``elif``: a finding may count toward both scores.
            if category in PRODUCTION_CATEGORIES or finding.agent_source in production_agents:
                production_penalties[category] = (
                    production_penalties.get(category, 0) + PRODUCTION_WEIGHTS[finding.severity]
                )
        security_penalty = sum(min(value, MAX_SECURITY_CATEGORY_PENALTY) for value in security_penalties.values())
        production_penalty = sum(min(value, MAX_PRODUCTION_CATEGORY_PENALTY) for value in production_penalties.values())
        return self._score_from_penalty(security_penalty), self._score_from_penalty(production_penalty)

    def _score_from_penalty(self, penalty: int) -> int:
        """Convert a penalty total into a score between 1 and 100.

        A penalty of zero or less scores 100; otherwise the score is
        ``round(10000 / (100 + penalty))``, floored at 1.
        """
        if penalty <= 0:
            return 100
        return max(1, round(10000 / (100 + penalty)))

    def _build_roadmap(self, findings: list[Finding]) -> dict[str, list[dict[str, str]]]:
        """Split ``findings`` into ``this_week``, ``next_sprint`` and ``backlog``.

        ``this_week`` gets every critical finding plus the first five high
        ones; ``next_sprint`` gets the remaining high findings plus the first
        ten medium ones; ``backlog`` gets the rest. Input order is preserved
        within each severity.
        """
        critical = [finding for finding in findings if finding.severity == Severity.critical]
        high = [finding for finding in findings if finding.severity == Severity.high]
        medium = [finding for finding in findings if finding.severity == Severity.medium]
        low = [finding for finding in findings if finding.severity == Severity.low]
        this_week = critical + high[:5]
        next_sprint = high[5:] + medium[:10]
        backlog = medium[10:] + low
        return {
            "this_week": [self._roadmap_item(finding) for finding in this_week],
            "next_sprint": [self._roadmap_item(finding) for finding in next_sprint],
            "backlog": [self._roadmap_item(finding) for finding in backlog],
        }

    def _roadmap_item(self, finding: Finding) -> dict[str, str]:
        """Return the string-valued roadmap entry for ``finding``."""
        return {
            "title": finding.title,
            "severity": finding.severity.value,
            "category": self._category_for(finding),
            "file_path": finding.file_path,
            "line_start": str(finding.line_start),
            "agent_source": finding.agent_source,
        }