Pranoy Mukherjee commited on
Commit
de6f25f
·
1 Parent(s): 3e631d0

Add performance and quality audit agents

Browse files
README.md CHANGED
@@ -7,7 +7,7 @@ AI-powered multi-agent code auditing for GitHub repositories. Paste a public Git
7
  SwarmAudit currently runs with a mock-first LLM interface so the demo is not blocked by ROCm, vLLM, or AMD MI300X setup. The current graph is:
8
 
9
  ```text
10
- GitHub URL -> Crawler -> Chunker -> [Security Agent + Performance Agent] -> Synthesizer -> Report
11
  ```
12
 
13
  ## Quick Start
@@ -62,6 +62,7 @@ Each finding includes:
62
 
63
  - Security Agent: flags hardcoded secrets, disabled TLS verification, and dynamic code execution.
64
  - Performance Agent: flags HTTP calls without timeouts, blocking sleep inside async functions, nested loops, file reads in loops, and synchronous Node.js filesystem calls.
 
65
  - Synthesizer Agent: deduplicates findings, sorts by severity, and builds the final report.
66
 
67
  ## Tests
 
7
  SwarmAudit currently runs with a mock-first LLM interface so the demo is not blocked by ROCm, vLLM, or AMD MI300X setup. The current graph is:
8
 
9
  ```text
10
+ GitHub URL -> Crawler -> Chunker -> [Security Agent + Performance Agent + Quality Agent] -> Synthesizer -> Report
11
  ```
12
 
13
  ## Quick Start
 
62
 
63
  - Security Agent: flags hardcoded secrets, disabled TLS verification, and dynamic code execution.
64
  - Performance Agent: flags HTTP calls without timeouts, blocking sleep inside async functions, nested loops, file reads in loops, and synchronous Node.js filesystem calls.
65
+ - Quality Agent: flags long functions, high branch density, large source sections, unresolved TODO/FIXME/HACK comments, and very short symbol names.
66
  - Synthesizer Agent: deduplicates findings, sorts by severity, and builds the final report.
67
 
68
  ## Tests
app/agents/graph.py CHANGED
@@ -5,6 +5,7 @@ from typing import Annotated, TypedDict
5
  from langgraph.graph import END, StateGraph
6
 
7
  from app.agents.performance_agent import PerformanceAgent
 
8
  from app.agents.security_agent import SecurityAgent
9
  from app.agents.synthesizer_agent import SynthesizerAgent
10
  from app.config import Settings, get_settings
@@ -20,6 +21,7 @@ class AuditState(TypedDict, total=False):
20
  chunks: list[CodeChunk]
21
  security_output: AgentOutput
22
  performance_output: AgentOutput
 
23
  report: AuditReport
24
  progress: Annotated[list[str], add]
25
 
@@ -32,6 +34,7 @@ class AuditGraph:
32
  self.llm_client = LLMClient(self.settings)
33
  self.security_agent = SecurityAgent(self.llm_client)
34
  self.performance_agent = PerformanceAgent()
 
35
  self.synthesizer = SynthesizerAgent()
36
  self.graph = self._build_graph()
37
 
@@ -41,12 +44,14 @@ class AuditGraph:
41
  graph.add_node("chunk", self._chunk)
42
  graph.add_node("security", self._security)
43
  graph.add_node("performance", self._performance)
 
44
  graph.add_node("synthesize", self._synthesize)
45
  graph.set_entry_point("crawl")
46
  graph.add_edge("crawl", "chunk")
47
  graph.add_edge("chunk", "security")
48
  graph.add_edge("chunk", "performance")
49
- graph.add_edge(["security", "performance"], "synthesize")
 
50
  graph.add_edge("synthesize", END)
51
  return graph.compile()
52
 
@@ -73,8 +78,12 @@ class AuditGraph:
73
  performance_output = await self.performance_agent.analyze(chunks)
74
  yield f"Performance Agent: found {len(performance_output.findings)} findings."
75
 
 
 
 
 
76
  yield "Synthesizer Agent: ranking findings and formatting report..."
77
- report = await self.synthesizer.synthesize(repo, [security_output, performance_output])
78
  yield "Synthesizer Agent: final report generated."
79
  yield report
80
  finally:
@@ -96,10 +105,14 @@ class AuditGraph:
96
  output = await self.performance_agent.analyze(state["chunks"])
97
  return {"performance_output": output, "progress": [f"Performance Agent: found {len(output.findings)} findings."]}
98
 
 
 
 
 
99
  async def _synthesize(self, state: AuditState) -> AuditState:
100
  report = await self.synthesizer.synthesize(
101
  state["repo"],
102
- [state["security_output"], state["performance_output"]],
103
  )
104
  self.crawler.cleanup(state["repo"])
105
  return {"report": report, "progress": ["Synthesizer Agent: final report generated."]}
 
5
  from langgraph.graph import END, StateGraph
6
 
7
  from app.agents.performance_agent import PerformanceAgent
8
+ from app.agents.quality_agent import QualityAgent
9
  from app.agents.security_agent import SecurityAgent
10
  from app.agents.synthesizer_agent import SynthesizerAgent
11
  from app.config import Settings, get_settings
 
21
  chunks: list[CodeChunk]
22
  security_output: AgentOutput
23
  performance_output: AgentOutput
24
+ quality_output: AgentOutput
25
  report: AuditReport
26
  progress: Annotated[list[str], add]
27
 
 
34
  self.llm_client = LLMClient(self.settings)
35
  self.security_agent = SecurityAgent(self.llm_client)
36
  self.performance_agent = PerformanceAgent()
37
+ self.quality_agent = QualityAgent()
38
  self.synthesizer = SynthesizerAgent()
39
  self.graph = self._build_graph()
40
 
 
44
  graph.add_node("chunk", self._chunk)
45
  graph.add_node("security", self._security)
46
  graph.add_node("performance", self._performance)
47
+ graph.add_node("quality", self._quality)
48
  graph.add_node("synthesize", self._synthesize)
49
  graph.set_entry_point("crawl")
50
  graph.add_edge("crawl", "chunk")
51
  graph.add_edge("chunk", "security")
52
  graph.add_edge("chunk", "performance")
53
+ graph.add_edge("chunk", "quality")
54
+ graph.add_edge(["security", "performance", "quality"], "synthesize")
55
  graph.add_edge("synthesize", END)
56
  return graph.compile()
57
 
 
78
  performance_output = await self.performance_agent.analyze(chunks)
79
  yield f"Performance Agent: found {len(performance_output.findings)} findings."
80
 
81
+ yield "Quality Agent: scanning maintainability signals..."
82
+ quality_output = await self.quality_agent.analyze(chunks)
83
+ yield f"Quality Agent: found {len(quality_output.findings)} findings."
84
+
85
  yield "Synthesizer Agent: ranking findings and formatting report..."
86
+ report = await self.synthesizer.synthesize(repo, [security_output, performance_output, quality_output])
87
  yield "Synthesizer Agent: final report generated."
88
  yield report
89
  finally:
 
105
  output = await self.performance_agent.analyze(state["chunks"])
106
  return {"performance_output": output, "progress": [f"Performance Agent: found {len(output.findings)} findings."]}
107
 
108
+ async def _quality(self, state: AuditState) -> AuditState:
109
+ output = await self.quality_agent.analyze(state["chunks"])
110
+ return {"quality_output": output, "progress": [f"Quality Agent: found {len(output.findings)} findings."]}
111
+
112
  async def _synthesize(self, state: AuditState) -> AuditState:
113
  report = await self.synthesizer.synthesize(
114
  state["repo"],
115
+ [state["security_output"], state["performance_output"], state["quality_output"]],
116
  )
117
  self.crawler.cleanup(state["repo"])
118
  return {"report": report, "progress": ["Synthesizer Agent: final report generated."]}
app/agents/quality_agent.py CHANGED
@@ -1,8 +1,195 @@
1
- from app.schemas import AgentOutput, CodeChunk
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
 
3
 
4
  class QualityAgent:
5
  name = "Quality Agent"
6
 
7
  async def analyze(self, chunks: list[CodeChunk]) -> AgentOutput:
8
- return AgentOutput(agent_name=self.name, findings=[], metadata={"chunks_scanned": len(chunks)})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import re
2
+
3
+ from app.schemas import AgentOutput, CodeChunk, Finding, Severity
4
+
5
+
6
+ PYTHON_DEF = re.compile(r"^\s*(async\s+def|def|class)\s+([A-Za-z_][A-Za-z0-9_]*)")
7
+ PYTHON_BRANCH = re.compile(r"^\s*(if|elif|for|while|except|with)\b")
8
+ JS_FUNCTION = re.compile(r"^\s*(function\s+[A-Za-z_$][\w$]*|(?:const|let|var)\s+[A-Za-z_$][\w$]*\s*=\s*(?:async\s*)?\()")
9
+ JS_BRANCH = re.compile(r"^\s*(if|else\s+if|for|while|switch|catch)\b")
10
+ TODO_COMMENT = re.compile(r"(?i)\b(TODO|FIXME|HACK)\b")
11
+
12
+
13
+ MAX_CHUNK_LINES = 300
14
+ MAX_FUNCTION_LINES = 80
15
+ MAX_BRANCHES_PER_CHUNK = 25
16
+ MIN_MEANINGFUL_NAME_LENGTH = 3
17
 
18
 
19
  class QualityAgent:
20
  name = "Quality Agent"
21
 
22
  async def analyze(self, chunks: list[CodeChunk]) -> AgentOutput:
23
+ findings: list[Finding] = []
24
+ for chunk in chunks:
25
+ findings.extend(self._scan_chunk(chunk))
26
+
27
+ return AgentOutput(
28
+ agent_name=self.name,
29
+ findings=findings,
30
+ metadata={"chunks_scanned": len(chunks), "mode": "static-rules"},
31
+ )
32
+
33
+ def _scan_chunk(self, chunk: CodeChunk) -> list[Finding]:
34
+ findings: list[Finding] = []
35
+ lines = chunk.content.splitlines()
36
+
37
+ findings.extend(self._check_large_chunk(chunk, lines))
38
+ findings.extend(self._check_long_functions(chunk, lines))
39
+ findings.extend(self._check_branch_density(chunk, lines))
40
+ findings.extend(self._check_todo_comments(chunk, lines))
41
+ findings.extend(self._check_short_names(chunk, lines))
42
+
43
+ return findings
44
+
45
+ def _check_large_chunk(self, chunk: CodeChunk, lines: list[str]) -> list[Finding]:
46
+ if len(lines) <= MAX_CHUNK_LINES:
47
+ return []
48
+
49
+ return [
50
+ self._finding(
51
+ "Large source file section",
52
+ Severity.low,
53
+ chunk,
54
+ chunk.line_start,
55
+ chunk.line_end,
56
+ "This source section is large enough to make review, testing, and future changes harder.",
57
+ "Split unrelated responsibilities into smaller modules or focused helper functions.",
58
+ )
59
+ ]
60
+
61
+ def _check_long_functions(self, chunk: CodeChunk, lines: list[str]) -> list[Finding]:
62
+ findings: list[Finding] = []
63
+ active_start: int | None = None
64
+ active_name: str | None = None
65
+ active_indent = 0
66
+
67
+ for index, line in enumerate(lines):
68
+ if not line.strip():
69
+ continue
70
+
71
+ match = self._definition_match(line)
72
+ indent = self._indent_width(line)
73
+
74
+ if active_start is not None and indent <= active_indent and match:
75
+ findings.extend(self._long_function_finding(chunk, active_name, active_start, chunk.line_start + index - 1))
76
+ active_start = None
77
+ active_name = None
78
+
79
+ if match:
80
+ active_start = chunk.line_start + index
81
+ active_name = match.group(2) if match.lastindex and match.lastindex >= 2 else "function"
82
+ active_indent = indent
83
+
84
+ if active_start is not None:
85
+ findings.extend(self._long_function_finding(chunk, active_name, active_start, chunk.line_end))
86
+
87
+ return findings
88
+
89
+ def _check_branch_density(self, chunk: CodeChunk, lines: list[str]) -> list[Finding]:
90
+ branch_count = sum(1 for line in lines if PYTHON_BRANCH.match(line) or JS_BRANCH.match(line))
91
+ if branch_count <= MAX_BRANCHES_PER_CHUNK:
92
+ return []
93
+
94
+ return [
95
+ self._finding(
96
+ "High branching complexity",
97
+ Severity.medium,
98
+ chunk,
99
+ chunk.line_start,
100
+ chunk.line_end,
101
+ f"This code section contains {branch_count} control-flow branches.",
102
+ "Extract decision-heavy logic into named helpers and add focused tests around each path.",
103
+ )
104
+ ]
105
+
106
+ def _check_todo_comments(self, chunk: CodeChunk, lines: list[str]) -> list[Finding]:
107
+ findings: list[Finding] = []
108
+ for offset, line in enumerate(lines):
109
+ if TODO_COMMENT.search(line):
110
+ line_number = chunk.line_start + offset
111
+ findings.append(
112
+ self._finding(
113
+ "Unresolved maintenance comment",
114
+ Severity.low,
115
+ chunk,
116
+ line_number,
117
+ line_number,
118
+ "A TODO/FIXME/HACK marker indicates known unfinished or fragile code.",
119
+ "Convert the comment into a tracked issue or resolve it before shipping.",
120
+ )
121
+ )
122
+ return findings
123
+
124
+ def _check_short_names(self, chunk: CodeChunk, lines: list[str]) -> list[Finding]:
125
+ findings: list[Finding] = []
126
+ for offset, line in enumerate(lines):
127
+ match = self._definition_match(line)
128
+ if not match:
129
+ continue
130
+
131
+ name = match.group(2) if match.lastindex and match.lastindex >= 2 else ""
132
+ if len(name) < MIN_MEANINGFUL_NAME_LENGTH and name not in {"id"}:
133
+ line_number = chunk.line_start + offset
134
+ findings.append(
135
+ self._finding(
136
+ "Very short symbol name",
137
+ Severity.low,
138
+ chunk,
139
+ line_number,
140
+ line_number,
141
+ f"The symbol `{name}` is short enough to make intent unclear.",
142
+ "Use a descriptive function, class, or variable name that explains the role of this code.",
143
+ )
144
+ )
145
+ return findings
146
+
147
+ def _long_function_finding(
148
+ self,
149
+ chunk: CodeChunk,
150
+ name: str | None,
151
+ line_start: int,
152
+ line_end: int,
153
+ ) -> list[Finding]:
154
+ if line_end - line_start + 1 <= MAX_FUNCTION_LINES:
155
+ return []
156
+
157
+ return [
158
+ self._finding(
159
+ "Long function or class body",
160
+ Severity.medium,
161
+ chunk,
162
+ line_start,
163
+ line_end,
164
+ f"`{name or 'This symbol'}` spans more than {MAX_FUNCTION_LINES} lines.",
165
+ "Extract cohesive helper functions and keep each function centered on one responsibility.",
166
+ )
167
+ ]
168
+
169
+ def _definition_match(self, line: str) -> re.Match[str] | None:
170
+ return PYTHON_DEF.match(line) or JS_FUNCTION.match(line)
171
+
172
+ def _indent_width(self, line: str) -> int:
173
+ return len(line) - len(line.lstrip(" "))
174
+
175
+ def _finding(
176
+ self,
177
+ title: str,
178
+ severity: Severity,
179
+ chunk: CodeChunk,
180
+ line_start: int,
181
+ line_end: int,
182
+ description: str,
183
+ suggested_fix: str,
184
+ ) -> Finding:
185
+ return Finding(
186
+ title=title,
187
+ severity=severity,
188
+ file_path=chunk.file_path,
189
+ line_start=line_start,
190
+ line_end=line_end,
191
+ description=description,
192
+ why_it_matters="Maintainable code is easier to review, test, debug, and safely extend during a hackathon demo.",
193
+ suggested_fix=suggested_fix,
194
+ agent_source=self.name,
195
+ )
tests/test_graph_progress.py CHANGED
@@ -23,7 +23,9 @@ async def test_run_with_progress_yields_real_stages_and_report(tmp_path: Path):
23
  assert any("Crawler Agent" in event for event in events if isinstance(event, str))
24
  assert any("Security Agent" in event for event in events if isinstance(event, str))
25
  assert any("Performance Agent" in event for event in events if isinstance(event, str))
 
26
  assert isinstance(events[-1], AuditReport)
27
  assert len(events[-1].findings) == 2
28
  assert "Security Agent" in events[-1].agents_run
29
  assert "Performance Agent" in events[-1].agents_run
 
 
23
  assert any("Crawler Agent" in event for event in events if isinstance(event, str))
24
  assert any("Security Agent" in event for event in events if isinstance(event, str))
25
  assert any("Performance Agent" in event for event in events if isinstance(event, str))
26
+ assert any("Quality Agent" in event for event in events if isinstance(event, str))
27
  assert isinstance(events[-1], AuditReport)
28
  assert len(events[-1].findings) == 2
29
  assert "Security Agent" in events[-1].agents_run
30
  assert "Performance Agent" in events[-1].agents_run
31
+ assert "Quality Agent" in events[-1].agents_run
tests/test_quality_agent.py ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pytest
2
+
3
+ from app.agents.quality_agent import QualityAgent
4
+ from app.schemas import CodeChunk, Severity
5
+
6
+
7
+ @pytest.mark.anyio
8
+ async def test_quality_agent_flags_todo_comments():
9
+ chunk = CodeChunk(
10
+ file_path="service.py",
11
+ language="Python",
12
+ line_start=12,
13
+ line_end=12,
14
+ content="# TODO: handle retry failures",
15
+ )
16
+
17
+ output = await QualityAgent().analyze([chunk])
18
+
19
+ assert output.findings[0].title == "Unresolved maintenance comment"
20
+ assert output.findings[0].severity == Severity.low
21
+ assert output.findings[0].line_start == 12
22
+
23
+
24
+ @pytest.mark.anyio
25
+ async def test_quality_agent_flags_high_branch_density():
26
+ lines = [f"if condition_{index}:" for index in range(30)]
27
+ chunk = CodeChunk(
28
+ file_path="rules.py",
29
+ language="Python",
30
+ line_start=1,
31
+ line_end=len(lines),
32
+ content="\n".join(lines),
33
+ )
34
+
35
+ output = await QualityAgent().analyze([chunk])
36
+
37
+ assert output.findings[0].title == "High branching complexity"
38
+ assert output.findings[0].severity == Severity.medium
39
+
40
+
41
+ @pytest.mark.anyio
42
+ async def test_quality_agent_flags_long_function():
43
+ lines = ["def process_everything():"]
44
+ lines.extend(f" value_{index} = {index}" for index in range(85))
45
+ chunk = CodeChunk(
46
+ file_path="processor.py",
47
+ language="Python",
48
+ line_start=30,
49
+ line_end=30 + len(lines) - 1,
50
+ content="\n".join(lines),
51
+ )
52
+
53
+ output = await QualityAgent().analyze([chunk])
54
+
55
+ assert output.findings[0].title == "Long function or class body"
56
+ assert output.findings[0].severity == Severity.medium
57
+ assert output.findings[0].line_start == 30