| from app.schemas import AgentOutput, AuditReport, Finding, RepoScanResult, Severity |
|
|
|
|
| SEVERITY_ORDER = { |
| Severity.critical: 0, |
| Severity.high: 1, |
| Severity.medium: 2, |
| Severity.low: 3, |
| } |
|
|
| MAX_DISPLAY_FINDINGS = 40 |
| MAX_DISPLAY_FINDINGS_BY_AGENT = { |
| "Security Agent": 20, |
| "Performance Agent": 12, |
| "Quality Agent": 10, |
| "Docs Agent": 12, |
| } |
| MAX_DISPLAY_BY_SEVERITY = { |
| Severity.critical: None, |
| Severity.high: 30, |
| Severity.medium: 18, |
| Severity.low: 12, |
| } |
|
|
| SECURITY_CATEGORIES = { |
| "security", |
| "config", |
| "dependency", |
| "cuda_migration", |
| } |
|
|
| PRODUCTION_CATEGORIES = { |
| "performance", |
| "quality", |
| "docs", |
| "error_handling", |
| "observability", |
| } |
|
|
| AGENT_CATEGORY_DEFAULTS = { |
| "Security Agent": "security", |
| "Config Agent": "config", |
| "Dependency Agent": "dependency", |
| "CUDA-to-ROCm Agent": "cuda_migration", |
| "Performance Agent": "performance", |
| "Quality Agent": "quality", |
| "Docs Agent": "docs", |
| "Error Handling Agent": "error_handling", |
| "Observability Agent": "observability", |
| } |
|
|
| SECURITY_WEIGHTS = { |
| Severity.critical: 24, |
| Severity.high: 12, |
| Severity.medium: 5, |
| Severity.low: 1, |
| } |
|
|
| PRODUCTION_WEIGHTS = { |
| Severity.critical: 16, |
| Severity.high: 9, |
| Severity.medium: 4, |
| Severity.low: 1, |
| } |
|
|
| MAX_SECURITY_CATEGORY_PENALTY = 35 |
| MAX_PRODUCTION_CATEGORY_PENALTY = 28 |
|
|
|
|
| class SynthesizerAgent: |
| name = "Synthesizer Agent" |
|
|
| async def synthesize(self, repo: RepoScanResult, outputs: list[AgentOutput]) -> AuditReport: |
| all_findings = self._dedupe([finding for output in outputs for finding in output.findings]) |
| all_findings.sort(key=self._sort_key) |
|
|
| summary = {severity: 0 for severity in Severity} |
| for finding in all_findings: |
| summary[finding.severity] += 1 |
|
|
| agent_counts = {output.agent_name: len(output.findings) for output in outputs} |
| display_findings, hidden_count, warnings = self._select_display_findings(all_findings, agent_counts) |
| category_summary = self._category_summary(all_findings) |
| security_score, production_score = self._compute_scores(all_findings) |
| roadmap = self._build_roadmap(all_findings) |
| dependency_cves = [ |
| cve |
| for output in outputs |
| for cve in output.metadata.get("dependency_cves", []) |
| if isinstance(cve, dict) |
| ] |
| dependency_warnings = [ |
| warning |
| for output in outputs |
| for warning in output.metadata.get("warnings", []) |
| if isinstance(warning, str) |
| ] |
|
|
| return AuditReport( |
| repo_url=repo.repo_url, |
| scanned_file_count=len(repo.files), |
| skipped_file_count=repo.skipped_files, |
| findings=display_findings, |
| severity_summary=summary, |
| total_findings_count=len(all_findings), |
| displayed_findings_count=len(display_findings), |
| hidden_findings_count=hidden_count, |
| agent_finding_counts=agent_counts, |
| category_summary=category_summary, |
| security_score=security_score, |
| production_score=production_score, |
| remediation_roadmap=roadmap, |
| dependency_cves=dependency_cves, |
| agents_run=[output.agent_name for output in outputs] + [self.name], |
| warnings=repo.warnings + dependency_warnings + warnings, |
| ) |
|
|
| def _dedupe(self, findings: list[Finding]) -> list[Finding]: |
| seen: set[tuple[str, int, str, str]] = set() |
| unique: list[Finding] = [] |
| for finding in findings: |
| key = (finding.file_path, finding.line_start, finding.title, finding.agent_source) |
| if key in seen: |
| continue |
| seen.add(key) |
| unique.append(finding) |
| return unique |
|
|
| def _select_display_findings( |
| self, |
| findings: list[Finding], |
| agent_counts: dict[str, int], |
| ) -> tuple[list[Finding], int, list[str]]: |
| selected: list[Finding] = [] |
| selected_by_agent = {agent_name: 0 for agent_name in agent_counts} |
| selected_by_severity = {severity: 0 for severity in Severity} |
|
|
| for finding in findings: |
| agent_limit = MAX_DISPLAY_FINDINGS_BY_AGENT.get(finding.agent_source, MAX_DISPLAY_FINDINGS) |
| severity_limit = MAX_DISPLAY_BY_SEVERITY[finding.severity] |
| if severity_limit is not None and selected_by_severity[finding.severity] >= severity_limit: |
| continue |
| if selected_by_agent.get(finding.agent_source, 0) >= agent_limit: |
| continue |
| if len(selected) >= MAX_DISPLAY_FINDINGS: |
| break |
| selected.append(finding) |
| selected_by_agent[finding.agent_source] = selected_by_agent.get(finding.agent_source, 0) + 1 |
| selected_by_severity[finding.severity] += 1 |
|
|
| if not any(finding.severity == Severity.low for finding in selected): |
| low_findings = [finding for finding in findings if finding.severity == Severity.low] |
| low_slots = MAX_DISPLAY_BY_SEVERITY[Severity.low] or 0 |
| for finding in low_findings[:low_slots]: |
| if finding in selected: |
| continue |
| if len(selected) >= MAX_DISPLAY_FINDINGS: |
| replace_index = self._replaceable_display_index(selected) |
| if replace_index is None: |
| break |
| replaced = selected[replace_index] |
| selected_by_agent[replaced.agent_source] = max( |
| 0, |
| selected_by_agent.get(replaced.agent_source, 0) - 1, |
| ) |
| selected_by_severity[replaced.severity] = max( |
| 0, |
| selected_by_severity[replaced.severity] - 1, |
| ) |
| selected[replace_index] = finding |
| else: |
| selected.append(finding) |
| selected_by_agent[finding.agent_source] = selected_by_agent.get(finding.agent_source, 0) + 1 |
| selected_by_severity[finding.severity] += 1 |
|
|
| selected.sort(key=self._sort_key) |
| hidden_count = max(0, len(findings) - len(selected)) |
| warnings: list[str] = [] |
| if hidden_count: |
| warnings.append( |
| f"Report display prioritized {len(selected)} of {len(findings)} findings; " |
| f"{hidden_count} lower-priority findings are hidden from the demo report." |
| ) |
|
|
| for agent_name, total_count in agent_counts.items(): |
| displayed_count = selected_by_agent.get(agent_name, 0) |
| hidden_for_agent = total_count - displayed_count |
| if hidden_for_agent > 0: |
| warnings.append(f"{agent_name}: displaying {displayed_count} of {total_count} findings.") |
|
|
| return selected, hidden_count, warnings |
|
|
| def _replaceable_display_index(self, selected: list[Finding]) -> int | None: |
| for severity in (Severity.low, Severity.medium): |
| for index in range(len(selected) - 1, -1, -1): |
| if selected[index].severity == severity: |
| return index |
| return None |
|
|
| def _sort_key(self, finding: Finding) -> tuple[int, int, str, int]: |
| test_file_penalty = 1 if self._is_test_file(finding.file_path) and finding.severity != Severity.critical else 0 |
| return (SEVERITY_ORDER[finding.severity], test_file_penalty, finding.file_path, finding.line_start) |
|
|
| def _is_test_file(self, file_path: str) -> bool: |
| normalized = file_path.lower().replace("\\", "/") |
| return "/test" in normalized or normalized.startswith("test") or "_test." in normalized |
|
|
| def _category_for(self, finding: Finding) -> str: |
| if finding.category: |
| return finding.category |
| return AGENT_CATEGORY_DEFAULTS.get(finding.agent_source, finding.agent_source.replace(" Agent", "").lower()) |
|
|
| def _category_summary(self, findings: list[Finding]) -> dict[str, int]: |
| summary: dict[str, int] = {} |
| for finding in findings: |
| category = self._category_for(finding) |
| summary[category] = summary.get(category, 0) + 1 |
| return dict(sorted(summary.items(), key=lambda item: (-item[1], item[0]))) |
|
|
| def _compute_scores(self, findings: list[Finding]) -> tuple[int, int]: |
| security_penalties: dict[str, int] = {} |
| production_penalties: dict[str, int] = {} |
|
|
| for finding in findings: |
| category = self._category_for(finding) |
| if category in SECURITY_CATEGORIES or finding.agent_source in { |
| "Security Agent", |
| "Config Agent", |
| "Dependency Agent", |
| "CUDA-to-ROCm Agent", |
| }: |
| security_penalties[category] = security_penalties.get(category, 0) + SECURITY_WEIGHTS[finding.severity] |
| if category in PRODUCTION_CATEGORIES or finding.agent_source in { |
| "Performance Agent", |
| "Quality Agent", |
| "Docs Agent", |
| "Error Handling Agent", |
| "Observability Agent", |
| }: |
| production_penalties[category] = ( |
| production_penalties.get(category, 0) + PRODUCTION_WEIGHTS[finding.severity] |
| ) |
|
|
| security_penalty = sum(min(value, MAX_SECURITY_CATEGORY_PENALTY) for value in security_penalties.values()) |
| production_penalty = sum(min(value, MAX_PRODUCTION_CATEGORY_PENALTY) for value in production_penalties.values()) |
| return self._score_from_penalty(security_penalty), self._score_from_penalty(production_penalty) |
|
|
| def _score_from_penalty(self, penalty: int) -> int: |
| if penalty <= 0: |
| return 100 |
| return max(1, round(10000 / (100 + penalty))) |
|
|
| def _build_roadmap(self, findings: list[Finding]) -> dict[str, list[dict[str, str]]]: |
| critical = [finding for finding in findings if finding.severity == Severity.critical] |
| high = [finding for finding in findings if finding.severity == Severity.high] |
| medium = [finding for finding in findings if finding.severity == Severity.medium] |
| low = [finding for finding in findings if finding.severity == Severity.low] |
|
|
| this_week = critical + high[:5] |
| next_sprint = high[5:] + medium[:10] |
| backlog = medium[10:] + low |
|
|
| return { |
| "this_week": [self._roadmap_item(finding) for finding in this_week], |
| "next_sprint": [self._roadmap_item(finding) for finding in next_sprint], |
| "backlog": [self._roadmap_item(finding) for finding in backlog], |
| } |
|
|
| def _roadmap_item(self, finding: Finding) -> dict[str, str]: |
| return { |
| "title": finding.title, |
| "severity": finding.severity.value, |
| "category": self._category_for(finding), |
| "file_path": finding.file_path, |
| "line_start": str(finding.line_start), |
| "agent_source": finding.agent_source, |
| } |
|
|