Pranoy Mukherjee committed on
Commit
a3ecd30
·
1 Parent(s): 3c067f0

Add crawler, security agent, API, and Gradio MVP

Browse files
.env.example ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
 
1
+ APP_NAME=SwarmAudit
2
+ LLM_PROVIDER=mock
3
+ LLM_BASE_URL=http://localhost:8000/v1
4
+ LLM_API_KEY=not-needed-for-mock
5
+ LLM_MODEL=Qwen/Qwen2.5-Coder-32B-Instruct
6
+ MAX_FILES=200
7
+ MAX_FILE_SIZE_KB=250
8
+ MAX_CHARS_PER_CHUNK=12000
9
+ CLONE_TIMEOUT_SECONDS=60
.gitignore CHANGED
@@ -206,6 +206,10 @@ tempCodeRunnerFile.py
206
  # Ruff stuff:
207
  .ruff_cache/
208
 
 
 
 
 
209
  # PyPI configuration file
210
  .pypirc
211
 
 
206
  # Ruff stuff:
207
  .ruff_cache/
208
 
209
+ # SwarmAudit local test artifacts
210
+ .tmp_pytest/
211
+ pytest-cache-files-*
212
+
213
  # PyPI configuration file
214
  .pypirc
215
 
README.md CHANGED
@@ -1,2 +1,65 @@
1
  # SwarmAudit
2
- AI-powered multi agent code auditing for GitHub repositories. Detect vulnerabilities, performance bottlenecks, and architecture issues in minutes.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  # SwarmAudit
2
+
3
+ AI-powered multi-agent code auditing for GitHub repositories. Paste a public GitHub URL and get a structured audit report with severity, file references, and suggested fixes.
4
+
5
+ ## MVP
6
+
7
+ SwarmAudit currently runs with a mock-first LLM interface so the demo is not blocked by ROCm, vLLM, or AMD MI300X setup. The first graph is:
8
+
9
+ ```text
10
+ GitHub URL -> Crawler -> Chunker -> Security Agent -> Synthesizer -> Report
11
+ ```
12
+
13
+ ## Quick Start
14
+
15
+ ```bash
16
+ python -m venv .venv
17
+ .venv\Scripts\activate
18
+ pip install -r requirements.txt
19
+ ```
20
+
21
+ Run the FastAPI backend:
22
+
23
+ ```bash
24
+ uvicorn app.main:app --reload
25
+ ```
26
+
27
+ Health check:
28
+
29
+ ```bash
30
+ curl http://127.0.0.1:8000/health
31
+ ```
32
+
33
+ Run the Gradio demo:
34
+
35
+ ```bash
36
+ python -m app.ui.gradio_app
37
+ ```
38
+
39
+ ## Configuration
40
+
41
+ Copy `.env.example` to `.env` for local overrides. Default inference mode is:
42
+
43
+ ```text
44
+ LLM_PROVIDER=mock
45
+ ```
46
+
47
+ Later, set `LLM_PROVIDER=vllm` and point `LLM_BASE_URL` at an OpenAI-compatible vLLM endpoint running Qwen2.5-Coder.
48
+
49
+ ## Report Schema
50
+
51
+ Each finding includes:
52
+
53
+ - title
54
+ - severity: CRITICAL, HIGH, MEDIUM, LOW
55
+ - file path and line range
56
+ - description
57
+ - why it matters
58
+ - suggested fix
59
+ - agent source
60
+
61
+ ## Tests
62
+
63
+ ```bash
64
+ pytest
65
+ ```
app/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ """SwarmAudit application package."""
app/agents/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ """Agent implementations for SwarmAudit."""
app/agents/docs_agent.py ADDED
@@ -0,0 +1,8 @@
 
 
 
 
 
 
 
 
 
1
+ from app.schemas import AgentOutput, CodeChunk
2
+
3
+
4
class DocsAgent:
    """Stub documentation-review agent; produces no findings yet."""

    name = "Docs Agent"

    async def analyze(self, chunks: list[CodeChunk]) -> AgentOutput:
        """Return an empty output while recording how many chunks were offered."""
        chunk_count = len(chunks)
        return AgentOutput(
            agent_name=self.name,
            findings=[],
            metadata={"chunks_scanned": chunk_count},
        )
app/agents/graph.py ADDED
@@ -0,0 +1,92 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from collections.abc import AsyncIterator
2
+ from typing import TypedDict
3
+
4
+ from langgraph.graph import END, StateGraph
5
+
6
+ from app.agents.security_agent import SecurityAgent
7
+ from app.agents.synthesizer_agent import SynthesizerAgent
8
+ from app.config import Settings, get_settings
9
+ from app.schemas import AgentOutput, AuditReport, CodeChunk, RepoScanResult
10
+ from app.services.chunker import Chunker
11
+ from app.services.llm_client import LLMClient
12
+ from app.services.repo_crawler import RepoCrawler
13
+
14
+
15
+ class AuditState(TypedDict, total=False):
16
+ repo_url: str
17
+ repo: RepoScanResult
18
+ chunks: list[CodeChunk]
19
+ security_output: AgentOutput
20
+ report: AuditReport
21
+ progress: list[str]
22
+
23
+
24
class AuditGraph:
    """End-to-end audit pipeline: crawl -> chunk -> security scan -> synthesize."""

    def __init__(self, settings: Settings | None = None):
        # Falls back to the cached global settings when none are injected.
        self.settings = settings or get_settings()
        self.crawler = RepoCrawler(self.settings)
        self.chunker = Chunker(self.settings)
        self.llm_client = LLMClient(self.settings)
        self.security_agent = SecurityAgent(self.llm_client)
        self.synthesizer = SynthesizerAgent()
        self.graph = self._build_graph()

    def _build_graph(self):
        """Build and compile the linear LangGraph over AuditState."""
        graph = StateGraph(AuditState)
        graph.add_node("crawl", self._crawl)
        graph.add_node("chunk", self._chunk)
        graph.add_node("security", self._security)
        graph.add_node("synthesize", self._synthesize)
        graph.set_entry_point("crawl")
        graph.add_edge("crawl", "chunk")
        graph.add_edge("chunk", "security")
        graph.add_edge("security", "synthesize")
        graph.add_edge("synthesize", END)
        return graph.compile()

    async def run(self, repo_url: str) -> AuditReport:
        """Run the compiled graph once and return the final report.

        NOTE(review): cleanup of the cloned temp dir happens inside
        _synthesize, so if an earlier node raises, the clone is not removed
        on this path — consider a try/finally here.
        """
        result = await self.graph.ainvoke({"repo_url": repo_url, "progress": []})
        return result["report"]

    async def run_with_progress(self, repo_url: str) -> AsyncIterator[str | AuditReport]:
        """Streaming variant for the UI: yields progress strings, then the report.

        Duplicates the stage logic of the compiled graph so each step can be
        reported as it happens; the try/finally guarantees temp-dir cleanup
        once the clone has succeeded.
        """
        repo: RepoScanResult | None = None
        yield "Crawler Agent: cloning and mapping repository..."
        repo = self.crawler.clone_and_scan(repo_url)
        yield f"Crawler Agent: mapped {len(repo.files)} files and skipped {repo.skipped_files}."

        try:
            yield "Chunker: filtering source files and creating chunks..."
            chunks = self.chunker.chunk_files(repo.files)
            yield f"Chunker: created {len(chunks)} code chunks."

            yield "Security Agent: scanning for risky patterns..."
            security_output = await self.security_agent.analyze(chunks)
            yield f"Security Agent: found {len(security_output.findings)} findings."

            yield "Synthesizer Agent: ranking findings and formatting report..."
            report = await self.synthesizer.synthesize(repo, [security_output])
            yield "Synthesizer Agent: final report generated."
            yield report
        finally:
            self.crawler.cleanup(repo)

    async def _crawl(self, state: AuditState) -> AuditState:
        # Graph node: clone the repo and record the file map.
        repo = self.crawler.clone_and_scan(state["repo_url"])
        progress = state.get("progress", []) + [f"Crawler Agent: mapped {len(repo.files)} files."]
        return {"repo": repo, "progress": progress}

    async def _chunk(self, state: AuditState) -> AuditState:
        # Graph node: split mapped files into size-bounded chunks.
        chunks = self.chunker.chunk_files(state["repo"].files)
        progress = state.get("progress", []) + [f"Chunker: created {len(chunks)} code chunks."]
        return {"chunks": chunks, "progress": progress}

    async def _security(self, state: AuditState) -> AuditState:
        # Graph node: run the security agent over all chunks.
        output = await self.security_agent.analyze(state["chunks"])
        progress = state.get("progress", []) + [f"Security Agent: found {len(output.findings)} findings."]
        return {"security_output": output, "progress": progress}

    async def _synthesize(self, state: AuditState) -> AuditState:
        # Graph node: build the final report, then remove the cloned temp dir.
        report = await self.synthesizer.synthesize(state["repo"], [state["security_output"]])
        progress = state.get("progress", []) + ["Synthesizer Agent: final report generated."]
        self.crawler.cleanup(state["repo"])
        return {"report": report, "progress": progress}
app/agents/performance_agent.py ADDED
@@ -0,0 +1,8 @@
 
 
 
 
 
 
 
 
 
1
+ from app.schemas import AgentOutput, CodeChunk
2
+
3
+
4
class PerformanceAgent:
    """Stub performance-review agent; produces no findings yet."""

    name = "Performance Agent"

    async def analyze(self, chunks: list[CodeChunk]) -> AgentOutput:
        """Return an empty output while recording how many chunks were offered."""
        chunk_count = len(chunks)
        return AgentOutput(
            agent_name=self.name,
            findings=[],
            metadata={"chunks_scanned": chunk_count},
        )
app/agents/quality_agent.py ADDED
@@ -0,0 +1,8 @@
 
 
 
 
 
 
 
 
 
1
+ from app.schemas import AgentOutput, CodeChunk
2
+
3
+
4
class QualityAgent:
    """Stub code-quality agent; produces no findings yet."""

    name = "Quality Agent"

    async def analyze(self, chunks: list[CodeChunk]) -> AgentOutput:
        """Return an empty output while recording how many chunks were offered."""
        chunk_count = len(chunks)
        return AgentOutput(
            agent_name=self.name,
            findings=[],
            metadata={"chunks_scanned": chunk_count},
        )
app/agents/security_agent.py ADDED
@@ -0,0 +1,77 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import re
2
+
3
+ from app.schemas import AgentOutput, CodeChunk, Finding, Severity
4
+ from app.services.llm_client import LLMClient
5
+
6
+
7
# Static security rules; each entry is
# (compiled pattern, title, severity, description, suggested fix).
SECURITY_PATTERNS = [
    (
        re.compile(r"(?i)(api[_-]?key|secret|token|password)\s*=\s*['\"][^'\"]{8,}['\"]"),
        "Potential hardcoded secret",
        Severity.high,
        "A credential-like value appears to be hardcoded.",
        "Move secrets into environment variables or a managed secret store.",
    ),
    (
        re.compile(r"(?i)verify\s*=\s*False"),
        "TLS certificate verification disabled",
        Severity.high,
        "Disabling TLS verification can allow man-in-the-middle attacks.",
        "Remove verify=False and use a trusted CA bundle if needed.",
    ),
    (
        # (?<![\w.]) restricts matches to bare eval/exec calls: without it the
        # rule false-positives on identifiers that merely end in eval/exec
        # (e.g. "retrieval(") and on method calls such as "model.eval()".
        re.compile(r"(?i)(?<![\w.])(eval|exec)\s*\("),
        "Dynamic code execution",
        Severity.medium,
        "Dynamic execution can turn untrusted input into arbitrary code execution.",
        "Replace eval/exec with explicit parsing or a constrained command map.",
    ),
]
30
+
31
+
32
class SecurityAgent:
    """Scans code chunks for risky patterns using the static regex rules."""

    name = "Security Agent"

    def __init__(self, llm_client: LLMClient):
        # The LLM client is held for the model-backed review path.
        self.llm_client = llm_client

    async def analyze(self, chunks: list[CodeChunk]) -> AgentOutput:
        """Run the static rules over every chunk and return collected findings.

        The complete_json call below exercises the LLM interface, but its
        result is currently discarded — only the static rules produce
        findings (reflected in the "mode" metadata value).
        """
        findings: list[Finding] = []

        for chunk in chunks:
            findings.extend(self._scan_chunk(chunk))

        await self.llm_client.complete_json(
            "You are a security code review agent. Return JSON findings only.",
            f"Review {len(chunks)} chunks for security issues.",
        )

        return AgentOutput(
            agent_name=self.name,
            findings=findings,
            metadata={"chunks_scanned": len(chunks), "mode": "static-rules-plus-llm-interface"},
        )

    def _scan_chunk(self, chunk: CodeChunk) -> list[Finding]:
        """Match every SECURITY_PATTERNS rule against each line of one chunk."""
        findings: list[Finding] = []
        lines = chunk.content.splitlines()

        for offset, line in enumerate(lines):
            # Map the 0-based offset back to the file's absolute line number.
            actual_line = chunk.line_start + offset
            for pattern, title, severity, description, fix in SECURITY_PATTERNS:
                if pattern.search(line):
                    findings.append(
                        Finding(
                            title=title,
                            severity=severity,
                            file_path=chunk.file_path,
                            line_start=actual_line,
                            line_end=actual_line,
                            description=description,
                            why_it_matters="Attackers often search repos for exposed credentials and unsafe execution paths.",
                            suggested_fix=fix,
                            agent_source=self.name,
                        )
                    )

        return findings
app/agents/synthesizer_agent.py ADDED
@@ -0,0 +1,42 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from app.schemas import AgentOutput, AuditReport, Finding, RepoScanResult, Severity
2
+
3
+
4
# Rank used to sort findings from most to least severe.
SEVERITY_ORDER = {
    level: rank
    for rank, level in enumerate(
        (Severity.critical, Severity.high, Severity.medium, Severity.low)
    )
}
10
+
11
+
12
class SynthesizerAgent:
    """Merges per-agent outputs into one deduplicated, severity-ranked report."""

    name = "Synthesizer Agent"

    async def synthesize(self, repo: RepoScanResult, outputs: list[AgentOutput]) -> AuditReport:
        """Combine all agent findings, drop duplicates, sort, and build the report."""
        combined = [finding for output in outputs for finding in output.findings]
        findings = self._dedupe(combined)
        findings.sort(
            key=lambda item: (SEVERITY_ORDER[item.severity], item.file_path, item.line_start)
        )

        # Every severity level appears in the summary, even with a zero count.
        summary = dict.fromkeys(Severity, 0)
        for item in findings:
            summary[item.severity] += 1

        agents = [output.agent_name for output in outputs]
        agents.append(self.name)

        return AuditReport(
            repo_url=repo.repo_url,
            scanned_file_count=len(repo.files),
            skipped_file_count=repo.skipped_files,
            findings=findings,
            severity_summary=summary,
            agents_run=agents,
            warnings=repo.warnings,
        )

    def _dedupe(self, findings: list[Finding]) -> list[Finding]:
        """Keep the first occurrence of each (file, line, title, agent) key."""
        by_key: dict[tuple[str, int, str, str], Finding] = {}
        for finding in findings:
            key = (finding.file_path, finding.line_start, finding.title, finding.agent_source)
            if key not in by_key:
                by_key[key] = finding
        return list(by_key.values())
app/config.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from functools import lru_cache
2
+
3
+ from pydantic_settings import BaseSettings, SettingsConfigDict
4
+
5
+
6
class Settings(BaseSettings):
    """Application settings, overridable via environment variables or a .env file."""

    app_name: str = "SwarmAudit"
    # "mock" keeps the demo runnable without a model server; "vllm" targets an
    # OpenAI-compatible endpoint (see LLMClient.complete_json).
    llm_provider: str = "mock"
    llm_base_url: str = "http://localhost:8000/v1"
    llm_api_key: str = "not-needed-for-mock"
    llm_model: str = "Qwen/Qwen2.5-Coder-32B-Instruct"
    # Crawl/chunk limits; keys mirror .env.example.
    max_files: int = 200
    max_file_size_kb: int = 250
    max_chars_per_chunk: int = 12000
    clone_timeout_seconds: int = 60

    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
18
+
19
+
20
@lru_cache
def get_settings() -> Settings:
    """Return a process-wide cached Settings instance (environment read only once)."""
    return Settings()
app/main.py ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI, HTTPException
2
+
3
+ from app.agents.graph import AuditGraph
4
+ from app.config import get_settings
5
+ from app.schemas import AuditReport, AuditRequest
6
+
7
# FastAPI application exposing the audit pipeline over HTTP.
app = FastAPI(title="SwarmAudit", version="0.1.0")


@app.get("/health")
async def health() -> dict[str, str]:
    """Liveness probe; also surfaces the configured application name."""
    return {"status": "ok", "app": get_settings().app_name}


@app.post("/audit", response_model=AuditReport)
async def audit(request: AuditRequest) -> AuditReport:
    """Run a full audit for the given repository URL.

    ValueError (e.g. an unsupported URL) maps to 400; any other failure
    maps to 500. NOTE(review): the 500 detail echoes the raw exception
    text, which may expose internals — consider logging it server-side
    and returning a generic message.
    """
    try:
        graph = AuditGraph()
        return await graph.run(str(request.repo_url))
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Audit failed: {exc}") from exc
app/schemas.py ADDED
@@ -0,0 +1,73 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from datetime import datetime, timezone
2
+ from enum import Enum
3
+ from typing import Any
4
+
5
+ from pydantic import BaseModel, Field, HttpUrl
6
+
7
+
8
class Severity(str, Enum):
    """Finding severity levels; str-valued so they serialize as plain strings."""

    critical = "CRITICAL"
    high = "HIGH"
    medium = "MEDIUM"
    low = "LOW"


class AuditRequest(BaseModel):
    """POST /audit request body; HttpUrl rejects non-URL input up front."""

    repo_url: HttpUrl


class SourceFile(BaseModel):
    """A source file discovered by the crawler."""

    path: str  # path relative to the repo root, forward-slashed
    absolute_path: str  # filesystem path used for reading content
    size_bytes: int
    language: str | None = None


class CodeChunk(BaseModel):
    """A contiguous slice of one file, bounded by the chunker's size limit."""

    file_path: str
    language: str | None = None
    line_start: int  # 1-based, inclusive
    line_end: int  # 1-based, inclusive
    content: str


class Finding(BaseModel):
    """A single issue reported by an agent."""

    title: str
    severity: Severity
    file_path: str
    line_start: int = Field(ge=1)
    line_end: int = Field(ge=1)
    description: str
    why_it_matters: str
    suggested_fix: str
    agent_source: str  # name of the agent that produced this finding


class AgentOutput(BaseModel):
    """Findings plus free-form metadata from one agent run."""

    agent_name: str
    findings: list[Finding] = Field(default_factory=list)
    metadata: dict[str, Any] = Field(default_factory=dict)


class RepoScanResult(BaseModel):
    """Output of the crawler: the local clone location plus its mapped files."""

    repo_url: str
    local_path: str
    files: list[SourceFile]
    skipped_files: int = 0
    warnings: list[str] = Field(default_factory=list)


class AuditReport(BaseModel):
    """Final synthesized report returned by the API and rendered by the UI."""

    repo_url: str
    scanned_file_count: int
    skipped_file_count: int
    findings: list[Finding]
    severity_summary: dict[Severity, int]
    # Timezone-aware UTC timestamp captured when the model is created.
    generated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    agents_run: list[str]
    warnings: list[str] = Field(default_factory=list)


class AuditProgress(BaseModel):
    """A single progress event.

    NOTE(review): not referenced by the other modules in this commit — the
    streaming UI path yields plain strings instead; confirm before relying
    on it.
    """

    message: str
    stage: str
app/services/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ """Service layer for crawling, chunking, LLM access, and formatting."""
app/services/chunker.py ADDED
@@ -0,0 +1,58 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pathlib import Path
2
+
3
+ from app.config import Settings
4
+ from app.schemas import CodeChunk, SourceFile
5
+
6
+
7
class Chunker:
    """Splits source files into character-bounded chunks while tracking line ranges."""

    def __init__(self, settings: Settings):
        self.settings = settings

    def chunk_files(self, files: list[SourceFile]) -> list[CodeChunk]:
        """Chunk every file and concatenate the results in file order."""
        chunks: list[CodeChunk] = []
        for source_file in files:
            chunks.extend(self.chunk_file(source_file))
        return chunks

    def chunk_file(self, source_file: SourceFile) -> list[CodeChunk]:
        """Split one file into chunks of at most max_chars_per_chunk characters.

        Lines are never split across chunks; a single line longer than the
        limit still becomes its own (oversized) chunk. Undecodable bytes are
        dropped via errors="ignore". Returns [] for an empty file.
        """
        text = Path(source_file.absolute_path).read_text(encoding="utf-8", errors="ignore")
        lines = text.splitlines()
        if not lines:
            return []

        chunks: list[CodeChunk] = []
        current_lines: list[str] = []
        current_start = 1  # 1-based line number where the current chunk begins
        current_chars = 0

        for index, line in enumerate(lines, start=1):
            line_chars = len(line) + 1  # +1 accounts for the joining newline
            # Flush the current chunk before this line would push it over budget.
            if current_lines and current_chars + line_chars > self.settings.max_chars_per_chunk:
                chunks.append(
                    CodeChunk(
                        file_path=source_file.path,
                        language=source_file.language,
                        line_start=current_start,
                        line_end=index - 1,
                        content="\n".join(current_lines),
                    )
                )
                current_lines = []
                current_start = index
                current_chars = 0

            current_lines.append(line)
            current_chars += line_chars

        # Flush the trailing partial chunk.
        if current_lines:
            chunks.append(
                CodeChunk(
                    file_path=source_file.path,
                    language=source_file.language,
                    line_start=current_start,
                    line_end=len(lines),
                    content="\n".join(current_lines),
                )
            )

        return chunks
app/services/llm_client.py ADDED
@@ -0,0 +1,41 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ from typing import Any
3
+
4
+ import httpx
5
+
6
+ from app.config import Settings
7
+
8
+
9
class LLMClient:
    """Async completion client: mock responses by default, vLLM OpenAI-compatible otherwise."""

    def __init__(self, settings: Settings):
        self.settings = settings

    async def complete_json(self, system_prompt: str, user_prompt: str) -> dict[str, Any]:
        """Request a JSON-object completion and return it parsed as a dict.

        Raises:
            ValueError: if llm_provider is neither "mock" nor "vllm".
            httpx.HTTPStatusError: on a non-2xx response from the endpoint.
            json.JSONDecodeError: if the model output is not valid JSON.
        """
        # Mock mode keeps the demo independent of any running model server.
        if self.settings.llm_provider == "mock":
            return {
                "findings": [],
                "note": "Mock LLM is active; static rules produced the demo findings.",
            }

        if self.settings.llm_provider != "vllm":
            raise ValueError(f"Unsupported LLM_PROVIDER={self.settings.llm_provider}")

        payload = {
            "model": self.settings.llm_model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            "temperature": 0.1,  # near-deterministic output for structured review
            "response_format": {"type": "json_object"},
        }
        headers = {"Authorization": f"Bearer {self.settings.llm_api_key}"}
        # A fresh client per call is fine at current volume; pool if calls get hot.
        async with httpx.AsyncClient(timeout=120) as client:
            response = await client.post(
                f"{self.settings.llm_base_url.rstrip('/')}/chat/completions",
                json=payload,
                headers=headers,
            )
            response.raise_for_status()
            content = response.json()["choices"][0]["message"]["content"]
            return json.loads(content)
app/services/repo_crawler.py ADDED
@@ -0,0 +1,139 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import shutil
2
+ import tempfile
3
+ from pathlib import Path
4
+ from urllib.parse import urlparse
5
+
6
+ from git import Repo
7
+
8
+ from app.config import Settings
9
+ from app.schemas import RepoScanResult, SourceFile
10
+
11
+
12
# Directory names (matched at any depth) whose contents are never scanned.
IGNORED_DIRS = {
    ".git",
    "node_modules",
    "dist",
    "build",
    ".venv",
    "__pycache__",
    "vendor",
    "target",
    ".next",
}

# File extensions the crawler accepts, mapped to a display language name.
SUPPORTED_EXTENSIONS = {
    ".py": "Python",
    ".js": "JavaScript",
    ".ts": "TypeScript",
    ".tsx": "TypeScript React",
    ".jsx": "JavaScript React",
    ".java": "Java",
    ".go": "Go",
    ".rs": "Rust",
    ".cpp": "C++",
    ".c": "C",
    ".cs": "C#",
    ".php": "PHP",
    ".rb": "Ruby",
}
39
+
40
+
41
def validate_github_url(repo_url: str) -> str:
    """Validate a public github.com URL and normalize it to a .git clone URL.

    Raises:
        ValueError: for non-HTTP(S) schemes, hosts other than github.com, or
            paths missing either the owner or the repository name.
    """
    parsed = urlparse(repo_url)

    if parsed.scheme not in ("http", "https"):
        raise ValueError("Only HTTP(S) GitHub URLs are supported.")
    if parsed.netloc.lower() != "github.com":
        raise ValueError("Only public github.com repository URLs are supported.")

    segments = [segment for segment in parsed.path.split("/") if segment]
    if len(segments) < 2:
        raise ValueError("GitHub URL must include owner and repo name.")

    owner = segments[0]
    repo_name = segments[1].removesuffix(".git")
    return f"https://github.com/{owner}/{repo_name}.git"
54
+
55
+
56
class RepoCrawler:
    """Clones a GitHub repo into a temp dir and maps its auditable files."""

    def __init__(self, settings: Settings):
        self.settings = settings

    def clone_and_scan(self, repo_url: str) -> RepoScanResult:
        """Shallow-clone the repo, then scan it; the temp dir is removed on failure.

        On success the clone is kept on disk; callers must invoke cleanup()
        with the returned RepoScanResult when finished.
        """
        clone_url = validate_github_url(repo_url)
        temp_root = Path(tempfile.mkdtemp(prefix="swarm_audit_"))
        repo_path = temp_root / "repo"

        try:
            Repo.clone_from(
                clone_url,
                repo_path,
                depth=1,  # shallow: latest commit only
                single_branch=True,
                kill_after_timeout=self.settings.clone_timeout_seconds,
                env={"GIT_TERMINAL_PROMPT": "0"},  # never block on credential prompts
                multi_options=["--filter=blob:none"],  # blobless partial clone
            )
            return self.scan_local_repo(repo_url=repo_url, repo_path=repo_path)
        except Exception:
            shutil.rmtree(temp_root, ignore_errors=True)
            raise

    def scan_local_repo(self, repo_url: str, repo_path: Path) -> RepoScanResult:
        """Walk the clone and collect supported source files within the configured limits."""
        files: list[SourceFile] = []
        skipped = 0
        warnings: list[str] = []
        max_bytes = self.settings.max_file_size_kb * 1024

        for path in repo_path.rglob("*"):
            if not path.is_file():
                continue
            rel_path = path.relative_to(repo_path)
            if any(part in IGNORED_DIRS for part in rel_path.parts):
                skipped += 1
                continue
            if path.suffix.lower() not in SUPPORTED_EXTENSIONS:
                skipped += 1
                continue
            size = path.stat().st_size
            if size > max_bytes:
                skipped += 1
                warnings.append(f"Skipped large file: {rel_path}")
                continue
            # Past the cap we keep walking so remaining files count as skipped.
            if len(files) >= self.settings.max_files:
                skipped += 1
                continue

            files.append(
                SourceFile(
                    path=str(rel_path).replace("\\", "/"),  # normalize Windows separators
                    absolute_path=str(path),
                    size_bytes=size,
                    language=SUPPORTED_EXTENSIONS[path.suffix.lower()],
                )
            )

        # NOTE(review): this warning also fires when the repo has exactly
        # max_files accepted files and nothing was actually dropped.
        if len(files) >= self.settings.max_files:
            warnings.append(f"Repo hit MAX_FILES={self.settings.max_files}; remaining files were skipped.")

        return RepoScanResult(
            repo_url=repo_url,
            local_path=str(repo_path),
            files=files,
            skipped_files=skipped,
            warnings=warnings,
        )

    def cleanup(self, scan_result: RepoScanResult | None) -> None:
        """Delete the temp clone, but only if it looks like one of ours.

        Safety guards: the directory must resolve under the system temp dir
        and carry the swarm_audit_ prefix, so a crafted local_path cannot
        make this rmtree an arbitrary directory.
        """
        if scan_result is None:
            return

        repo_path = Path(scan_result.local_path)
        temp_root = repo_path.parent
        temp_dir = Path(tempfile.gettempdir()).resolve()

        try:
            resolved_temp_root = temp_root.resolve()
        except FileNotFoundError:
            # Already gone (e.g. cleaned up twice) — nothing to do.
            return

        if temp_dir in resolved_temp_root.parents and temp_root.name.startswith("swarm_audit_"):
            shutil.rmtree(temp_root, ignore_errors=True)
app/services/report_formatter.py ADDED
@@ -0,0 +1,49 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from app.schemas import AuditReport, Severity
2
+
3
+
4
def format_report_markdown(report: AuditReport) -> str:
    """Render an AuditReport as a human-readable Markdown document."""
    out: list[str] = [
        "# SwarmAudit Report",
        "",
        f"Repository: `{report.repo_url}`",
        f"Files scanned: `{report.scanned_file_count}`",
        f"Files skipped: `{report.skipped_file_count}`",
        "",
        "## Severity Summary",
        "",
    ]

    # Fixed order from most to least severe; absent levels show as 0.
    ordered = (Severity.critical, Severity.high, Severity.medium, Severity.low)
    out.extend(
        f"- **{level.value}**: {report.severity_summary.get(level, 0)}" for level in ordered
    )

    if report.warnings:
        out += ["", "## Warnings", ""]
        out += [f"- {warning}" for warning in report.warnings]

    out += ["", "## Findings", ""]

    if not report.findings:
        out.append("No findings detected by the current MVP agents.")
        return "\n".join(out)

    for item in report.findings:
        out += [
            f"### [{item.severity.value}] {item.title}",
            "",
            f"- File: `{item.file_path}:{item.line_start}-{item.line_end}`",
            f"- Agent: `{item.agent_source}`",
            "",
            item.description,
            "",
            f"**Why it matters:** {item.why_it_matters}",
            "",
            "**Suggested fix:**",
            "",
            "```text",
            item.suggested_fix,
            "```",
            "",
        ]

    return "\n".join(out)
app/ui/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ """Gradio UI for SwarmAudit."""
app/ui/gradio_app.py ADDED
@@ -0,0 +1,42 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import gradio as gr
2
+
3
+ from app.agents.graph import AuditGraph
4
+ from app.schemas import AuditReport
5
+ from app.services.report_formatter import format_report_markdown
6
+
7
+
8
async def analyze_repo(repo_url: str):
    """Gradio streaming handler: yields the growing transcript as markdown.

    Each yielded string replaces the output component's content, so the full
    transcript is re-joined and re-yielded after every pipeline event.
    """
    if not repo_url.strip():
        yield "Paste a public GitHub repository URL to start."
        return

    transcript: list[str] = []
    try:
        async for event in AuditGraph().run_with_progress(repo_url.strip()):
            if isinstance(event, AuditReport):
                # Final event: append the rendered report below the progress log.
                transcript.append("")
                transcript.append(format_report_markdown(event))
            else:
                transcript.append(event)
            yield "\n".join(transcript)
    except Exception as exc:
        # Broad catch is intentional at this UI boundary: surface the error in
        # the transcript instead of crashing the Gradio worker.
        transcript.append(f"Audit failed: {exc}")
        yield "\n".join(transcript)
25
+
26
+
27
def build_app() -> gr.Blocks:
    """Assemble the single-page Gradio UI."""
    with gr.Blocks(title="SwarmAudit") as demo:
        gr.Markdown("# SwarmAudit")
        gr.Markdown("Paste any public GitHub URL. Get a structured AI code review in minutes.")
        repo_url = gr.Textbox(
            label="GitHub Repository URL",
            placeholder="https://github.com/owner/repo",
        )
        analyze = gr.Button("Analyze")
        output = gr.Markdown(label="Audit Report")
        # analyze_repo is an async generator, so clicks stream progress updates.
        analyze.click(analyze_repo, inputs=repo_url, outputs=output)
    return demo


if __name__ == "__main__":
    # queue() enables streaming of the generator handler's incremental outputs.
    build_app().queue().launch()
pytest.ini ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ [pytest]
2
+ testpaths = tests
3
+ norecursedirs = .git .venv __pycache__ .pytest_cache .tmp_pytest pytest-cache-files-*
requirements.txt ADDED
@@ -0,0 +1,11 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ fastapi
2
+ uvicorn[standard]
3
+ gradio
4
+ gitpython
5
+ pydantic
6
+ pydantic-settings
7
+ langgraph
8
+ langchain-core
9
+ httpx
10
+ python-dotenv
11
+ pytest
tests/test_api.py ADDED
@@ -0,0 +1,10 @@
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi.testclient import TestClient
2
+
3
+ from app.main import app
4
+
5
+
6
def test_health_endpoint():
    """The /health endpoint reports service status and the configured app name."""
    client = TestClient(app)
    resp = client.get("/health")

    assert resp.status_code == 200
    assert resp.json() == {"status": "ok", "app": "SwarmAudit"}
tests/test_chunker.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pathlib import Path
2
+
3
+ from app.config import Settings
4
+ from app.schemas import SourceFile
5
+ from app.services.chunker import Chunker
6
+
7
+
8
+ def test_chunker_preserves_line_ranges(tmp_path: Path):
9
+ source = tmp_path / "demo.py"
10
+ source.write_text("a = 1\nb = 2\nc = 3\n", encoding="utf-8")
11
+ source_file = SourceFile(
12
+ path="demo.py",
13
+ absolute_path=str(source),
14
+ size_bytes=source.stat().st_size,
15
+ language="Python",
16
+ )
17
+
18
+ chunks = Chunker(Settings(max_chars_per_chunk=8)).chunk_file(source_file)
19
+
20
+ assert len(chunks) > 1
21
+ assert chunks[0].line_start == 1
22
+ assert chunks[-1].line_end == 3
tests/test_graph_progress.py ADDED
@@ -0,0 +1,26 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pathlib import Path
2
+
3
+ import pytest
4
+
5
+ from app.agents.graph import AuditGraph
6
+ from app.config import Settings
7
+ from app.schemas import AuditReport
8
+
9
+
10
+ @pytest.mark.anyio
11
+ async def test_run_with_progress_yields_real_stages_and_report(tmp_path: Path):
12
+ source = tmp_path / "app.py"
13
+ source.write_text("API_KEY = '1234567890abcdef'\n", encoding="utf-8")
14
+ graph = AuditGraph(Settings(max_files=10, max_file_size_kb=10, max_chars_per_chunk=1000))
15
+
16
+ graph.crawler.clone_and_scan = lambda repo_url: graph.crawler.scan_local_repo(repo_url, tmp_path)
17
+ graph.crawler.cleanup = lambda scan_result: None
18
+
19
+ events = []
20
+ async for event in graph.run_with_progress("https://github.com/example/project"):
21
+ events.append(event)
22
+
23
+ assert any("Crawler Agent" in event for event in events if isinstance(event, str))
24
+ assert any("Security Agent" in event for event in events if isinstance(event, str))
25
+ assert isinstance(events[-1], AuditReport)
26
+ assert len(events[-1].findings) == 1
tests/test_repo_crawler.py ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pathlib import Path
2
+
3
+ import pytest
4
+
5
+ from app.config import Settings
6
+ from app.services.repo_crawler import RepoCrawler, validate_github_url
7
+
8
+
9
+ def test_validate_github_url_normalizes_clone_url():
10
+ assert validate_github_url("https://github.com/example/project") == "https://github.com/example/project.git"
11
+
12
+
13
+ def test_validate_github_url_rejects_non_github():
14
+ with pytest.raises(ValueError):
15
+ validate_github_url("https://gitlab.com/example/project")
16
+
17
+
18
+ def test_scan_local_repo_filters_supported_files(tmp_path: Path):
19
+ (tmp_path / "node_modules").mkdir()
20
+ (tmp_path / "node_modules" / "ignored.js").write_text("x", encoding="utf-8")
21
+ (tmp_path / "app.py").write_text("API_KEY = '1234567890'\n", encoding="utf-8")
22
+ (tmp_path / "notes.txt").write_text("hello", encoding="utf-8")
23
+
24
+ crawler = RepoCrawler(Settings(max_files=10, max_file_size_kb=1))
25
+ result = crawler.scan_local_repo("https://github.com/example/project", tmp_path)
26
+
27
+ assert [file.path for file in result.files] == ["app.py"]
28
+ assert result.skipped_files == 2
tests/test_security_report.py ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pytest
2
+
3
+ from app.agents.security_agent import SecurityAgent
4
+ from app.agents.synthesizer_agent import SynthesizerAgent
5
+ from app.config import Settings
6
+ from app.schemas import CodeChunk, RepoScanResult, Severity
7
+ from app.services.llm_client import LLMClient
8
+
9
+
10
+ @pytest.mark.anyio
11
+ async def test_security_agent_and_synthesizer_return_structured_report():
12
+ chunk = CodeChunk(
13
+ file_path="app.py",
14
+ language="Python",
15
+ line_start=10,
16
+ line_end=10,
17
+ content="API_KEY = '1234567890abcdef'",
18
+ )
19
+ output = await SecurityAgent(LLMClient(Settings())).analyze([chunk])
20
+ repo = RepoScanResult(repo_url="https://github.com/example/project", local_path=".", files=[], skipped_files=0)
21
+
22
+ report = await SynthesizerAgent().synthesize(repo, [output])
23
+
24
+ assert len(report.findings) == 1
25
+ assert report.findings[0].severity == Severity.high
26
+ assert report.findings[0].file_path == "app.py"
27
+ assert report.findings[0].line_start == 10
28
+ assert report.severity_summary[Severity.high] == 1