File size: 5,089 Bytes
3e631d0
 
 
 
 
 
 
 
 
a3ecd30
 
 
 
 
 
3e631d0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import re

from app.schemas import AgentOutput, CodeChunk, Finding, Severity


# A call to one of the module-level ``requests`` helpers with no ``timeout=``
# keyword later on the SAME line (the lookahead cannot see continuation lines).
# ``head``, ``options`` and the generic ``request`` entry point are included
# because they accept the same ``timeout`` argument as the other verbs.
REQUEST_WITHOUT_TIMEOUT = re.compile(
    r"\brequests\.(get|post|put|patch|delete|head|options|request)\s*\((?!.*\btimeout\s*=)"
)
# Synchronous Node.js ``fs`` calls (the ``*Sync`` family) followed by ``(``.
SYNC_FS_JS = re.compile(r"\b(readFileSync|writeFileSync|readdirSync|statSync)\s*\(")
# A line that begins (after optional indentation) with ``for`` or ``while``;
# group 1 captures the leading whitespace.
PYTHON_LOOP = re.compile(r"^(\s*)(for|while)\b")
# ``open(`` or ``Path(...).read_text(`` / ``Path(...).read_bytes(``.
PYTHON_FILE_READ = re.compile(r"\b(open\s*\(|Path\s*\([^)]*\)\.read_(text|bytes)\s*\()")


class PerformanceAgent:
    """Rule-based scanner that reports common performance smells line by line.

    Each chunk is split into lines and matched against the module-level
    regular expressions. Enclosing loops and ``async def`` bodies are tracked
    by indentation only, so the analysis is a heuristic, not a parse.
    """

    name = "Performance Agent"

    async def analyze(self, chunks: list[CodeChunk]) -> AgentOutput:
        """Scan every chunk and return all findings in a single AgentOutput.

        Args:
            chunks: Code chunks to inspect; each is scanned independently.

        Returns:
            An AgentOutput carrying this agent's name, the collected findings
            and metadata with the number of chunks scanned.
        """
        findings: list[Finding] = []
        for chunk in chunks:
            findings.extend(self._scan_chunk(chunk))

        return AgentOutput(
            agent_name=self.name,
            findings=findings,
            metadata={"chunks_scanned": len(chunks), "mode": "static-rules"},
        )

    def _scan_chunk(self, chunk: CodeChunk) -> list[Finding]:
        """Apply every rule to each line of ``chunk`` and return the findings.

        Two stacks of indentation widths record the loops and ``async def``
        headers the current line is nested in. An entry is dropped as soon as
        a line appears whose indentation is not deeper than that header.

        Args:
            chunk: The chunk to scan. ``content`` is split into lines and
                ``line_start`` is added to each offset for reported lines.

        Returns:
            Findings in the order their lines appear in the chunk.
        """
        findings: list[Finding] = []
        lines = chunk.content.splitlines()
        loop_stack: list[int] = []
        async_indent_stack: list[int] = []

        for offset, line in enumerate(lines):
            stripped = line.strip()
            if not stripped:
                # A blank line carries no indentation information. Treating it
                # as indent 0 would close every open loop/async scope and hide
                # issues that follow a blank line inside a body.
                continue

            actual_line = chunk.line_start + offset
            # Count all leading whitespace (tabs as well as spaces) so this
            # width is comparable with the widths stored on the stacks below.
            indent = len(line) - len(line.lstrip())

            # Leave every scope whose header is at the same or deeper indent.
            loop_stack = [loop_indent for loop_indent in loop_stack if indent > loop_indent]
            async_indent_stack = [async_indent for async_indent in async_indent_stack if indent > async_indent]

            if stripped.startswith("async def "):
                async_indent_stack.append(indent)

            if PYTHON_LOOP.match(line):
                # Any surviving entry means this loop sits inside another one.
                if loop_stack:
                    findings.append(
                        self._finding(
                            "Nested loop may become expensive",
                            Severity.low,
                            chunk,
                            actual_line,
                            "A loop nested inside another loop can turn small inputs into slow O(n^2) work.",
                            "Consider indexing data with a dictionary/set, batching work, or documenting why nested iteration is bounded.",
                        )
                    )
                loop_stack.append(indent)

            if REQUEST_WITHOUT_TIMEOUT.search(line):
                findings.append(
                    self._finding(
                        "HTTP request without timeout",
                        Severity.medium,
                        chunk,
                        actual_line,
                        "Network calls without timeouts can hang workers and make the app appear frozen under bad network conditions.",
                        "Pass an explicit timeout, for example requests.get(url, timeout=10).",
                    )
                )

            if async_indent_stack and "time.sleep(" in line:
                findings.append(
                    self._finding(
                        "Blocking sleep inside async function",
                        Severity.medium,
                        chunk,
                        actual_line,
                        "time.sleep blocks the event loop, delaying unrelated async work.",
                        "Use await asyncio.sleep(...) inside async functions.",
                    )
                )

            # The loop header itself is already on the stack here, so a read
            # on the header line (e.g. ``for row in open(p):``) is reported too.
            if loop_stack and PYTHON_FILE_READ.search(line):
                findings.append(
                    self._finding(
                        "File read inside loop",
                        Severity.low,
                        chunk,
                        actual_line,
                        "Repeated disk reads inside loops can dominate runtime and slow audits on larger inputs.",
                        "Read once before the loop, cache results, or stream data deliberately.",
                    )
                )

            if SYNC_FS_JS.search(line):
                findings.append(
                    self._finding(
                        "Synchronous filesystem call",
                        Severity.low,
                        chunk,
                        actual_line,
                        "Synchronous filesystem APIs block the Node.js event loop and can hurt request latency.",
                        "Use async fs.promises APIs or move blocking work outside latency-sensitive paths.",
                    )
                )

        return findings

    def _finding(
        self,
        title: str,
        severity: Severity,
        chunk: CodeChunk,
        line_number: int,
        description: str,
        suggested_fix: str,
    ) -> Finding:
        """Build a single-line Finding attributed to this agent.

        Args:
            title: Short headline for the issue.
            severity: Severity level to attach.
            chunk: Chunk the issue was found in; supplies ``file_path``.
            line_number: Line used for both ``line_start`` and ``line_end``.
            description: Explanation of the issue.
            suggested_fix: Remediation advice.

        Returns:
            A Finding with a fixed ``why_it_matters`` text and
            ``agent_source`` set to this agent's name.
        """
        return Finding(
            title=title,
            severity=severity,
            file_path=chunk.file_path,
            line_start=line_number,
            line_end=line_number,
            description=description,
            why_it_matters="Performance issues in hot paths can increase latency, resource usage, and demo analysis time.",
            suggested_fix=suggested_fix,
            agent_source=self.name,
        )