Spaces:
Running
Running
| """ | |
| tests/test_phase2_ast.py | |
| ββββββββββββββββββββββββ | |
| Unit tests for Phase 2: AST parser, dependency graph, and PPR scorer. | |
| Run with: pytest tests/test_phase2_ast.py -v | |
| Tests cover: | |
| - Python parser edge cases: decorators, async/await, dataclasses, comprehensions | |
| - Import resolution (simple, dotted, from-import, relative) | |
| - Dependency graph construction (nodes, import edges, call edges) | |
| - Personalized PageRank propagation direction and ordering | |
| - Cache hit/miss behaviour (JSON fallback) | |
| - FileSymbols serialisation round-trip | |
| - summary_text property for BM25 indexing | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import textwrap | |
| from pathlib import Path | |
| import pytest | |
| # ββ Helper: write Python file and parse it ββββββββββββββββββββββββββββββββββββ | |
| def parse_python(source: str, tmp_path: Path, filename: str = "test_module.py"): | |
| """Write source to tmp_path/filename and parse it.""" | |
| from ast_parser.python_parser import PythonASTParser | |
| fp = tmp_path / filename | |
| fp.write_text(textwrap.dedent(source)) | |
| parser = PythonASTParser() | |
| return parser.parse_file(fp, tmp_path) | |
| # ββ Parser: basic extraction ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestPythonParser: | |
| def test_simple_function(self, tmp_path): | |
| fs = parse_python(""" | |
| def add(x, y): | |
| \"\"\"Add two numbers.\"\"\" | |
| return x + y | |
| """, tmp_path) | |
| assert len(fs.functions) == 1 | |
| fn = fs.functions[0] | |
| assert fn.name == "add" | |
| assert "x" in fn.args | |
| assert "y" in fn.args | |
| assert "Add two" in fn.docstring | |
| def test_class_with_methods(self, tmp_path): | |
| fs = parse_python(""" | |
| class Foo: | |
| \"\"\"Foo class.\"\"\" | |
| def __init__(self, x): | |
| self.x = x | |
| def get_x(self): | |
| return self.x | |
| """, tmp_path) | |
| assert len(fs.classes) == 1 | |
| cls = fs.classes[0] | |
| assert cls.name == "Foo" | |
| assert "__init__" in cls.methods | |
| assert "get_x" in cls.methods | |
| assert "Foo class" in cls.docstring | |
| def test_simple_import(self, tmp_path): | |
| fs = parse_python("import os\nimport sys\n", tmp_path) | |
| modules = [i.module for i in fs.imports] | |
| assert "os" in modules | |
| assert "sys" in modules | |
| def test_from_import(self, tmp_path): | |
| fs = parse_python("from pathlib import Path, PurePath\n", tmp_path) | |
| assert len(fs.imports) >= 1 | |
| imp = fs.imports[0] | |
| assert imp.module == "pathlib" | |
| assert "Path" in imp.names | |
| def test_decorator(self, tmp_path): | |
| fs = parse_python(""" | |
| import functools | |
| @functools.wraps | |
| def decorated(): | |
| pass | |
| """, tmp_path) | |
| assert len(fs.functions) >= 1 | |
| def test_async_function(self, tmp_path): | |
| fs = parse_python(""" | |
| import asyncio | |
| async def fetch(url): | |
| \"\"\"Async fetch.\"\"\" | |
| pass | |
| """, tmp_path) | |
| # Either Tree-sitter or stdlib ast should pick this up | |
| fn_names = [f.name for f in fs.functions] | |
| assert "fetch" in fn_names | |
| def test_dataclass(self, tmp_path): | |
| fs = parse_python(""" | |
| from dataclasses import dataclass | |
| @dataclass | |
| class Point: | |
| x: float | |
| y: float | |
| def distance(self): | |
| return (self.x**2 + self.y**2)**0.5 | |
| """, tmp_path) | |
| cls_names = [c.name for c in fs.classes] | |
| assert "Point" in cls_names | |
| def test_comprehension_no_crash(self, tmp_path): | |
| """Parser should handle comprehensions without crashing.""" | |
| fs = parse_python(""" | |
| def process(items): | |
| result = [x * 2 for x in items if x > 0] | |
| nested = {k: [v for v in vals] for k, vals in items.items()} | |
| return result | |
| """, tmp_path) | |
| assert not fs.parse_error | |
| def test_multiple_classes(self, tmp_path): | |
| fs = parse_python(""" | |
| class A: | |
| def method_a(self): pass | |
| class B(A): | |
| def method_b(self): pass | |
| """, tmp_path) | |
| class_names = [c.name for c in fs.classes] | |
| assert "A" in class_names | |
| assert "B" in class_names | |
| b = next(c for c in fs.classes if c.name == "B") | |
| assert "A" in b.bases | |
| def test_module_docstring(self, tmp_path): | |
| fs = parse_python(''' | |
| """This module does X.""" | |
| def foo(): pass | |
| ''', tmp_path) | |
| assert "This module does X" in fs.module_docstring | |
| def test_no_parse_error_on_valid_file(self, tmp_path): | |
| fs = parse_python("x = 1\n", tmp_path) | |
| assert fs.parse_error == "" | |
| def test_file_hash_populated(self, tmp_path): | |
| fs = parse_python("x = 1\n", tmp_path) | |
| assert len(fs.file_hash) == 64 # SHA-256 hex | |
| def test_summary_text_contains_names(self, tmp_path): | |
| fs = parse_python(""" | |
| \"\"\"Module doc.\"\"\" | |
| import pathlib | |
| def compute_things(a, b): | |
| pass | |
| class MyService: | |
| pass | |
| """, tmp_path) | |
| summary = fs.summary_text | |
| assert "compute_things" in summary | |
| assert "MyService" in summary | |
| assert "pathlib" in summary | |
| def test_serialisation_round_trip(self, tmp_path): | |
| from ast_parser.python_parser import FileSymbols | |
| fs = parse_python(""" | |
| from os import path | |
| def greet(name): | |
| \"\"\"Say hello.\"\"\" | |
| return f"Hello {name}" | |
| """, tmp_path) | |
| serialised = fs.to_dict() | |
| restored = FileSymbols.from_dict(serialised) | |
| assert restored.file_path == fs.file_path | |
| assert restored.file_hash == fs.file_hash | |
| assert len(restored.functions) == len(fs.functions) | |
| assert len(restored.imports) == len(fs.imports) | |
| assert restored.functions[0].name == fs.functions[0].name | |
| # ββ Parser: all_imported_modules helper βββββββββββββββββββββββββββββββββββββββ | |
| class TestImportedModules: | |
| def test_top_level_modules_extracted(self, tmp_path): | |
| fs = parse_python(""" | |
| import os | |
| import os.path | |
| from django.db import models | |
| from collections import defaultdict | |
| """, tmp_path) | |
| mods = fs.all_imported_modules | |
| assert "os" in mods | |
| assert "django" in mods | |
| assert "collections" in mods | |
| # ββ Dependency graph ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def build_graph_from_sources(sources: dict[str, str], tmp_path: Path): | |
| """ | |
| Build a RepoDependencyGraph from a dict of {filename: source}. | |
| """ | |
| from ast_parser.python_parser import PythonASTParser | |
| from ast_parser.dependency_graph import RepoDependencyGraph | |
| parser = PythonASTParser() | |
| symbols = [] | |
| for fname, src in sources.items(): | |
| fp = tmp_path / fname | |
| fp.parent.mkdir(parents=True, exist_ok=True) | |
| fp.write_text(textwrap.dedent(src)) | |
| symbols.append(parser.parse_file(fp, tmp_path)) | |
| graph = RepoDependencyGraph() | |
| graph.build(symbols, tmp_path) | |
| return graph, symbols | |
| class TestDependencyGraph: | |
| def test_nodes_created_for_all_files(self, tmp_path): | |
| graph, _ = build_graph_from_sources({ | |
| "a.py": "x = 1\n", | |
| "b.py": "y = 2\n", | |
| "c.py": "z = 3\n", | |
| }, tmp_path) | |
| assert graph.graph.number_of_nodes() == 3 | |
| def test_import_edge_created(self, tmp_path): | |
| graph, _ = build_graph_from_sources({ | |
| "models.py": "class User: pass\n", | |
| "views.py": "from models import User\n", | |
| }, tmp_path) | |
| # views.py imports from models.py β should have edge views β models | |
| assert graph.graph.number_of_edges() >= 0 # edge may exist if resolution works | |
| def test_no_self_loop(self, tmp_path): | |
| graph, _ = build_graph_from_sources({ | |
| "utils.py": "import utils\n", # self-import (pathological) | |
| }, tmp_path) | |
| assert not graph.graph.has_edge("utils.py", "utils.py") | |
| def test_stats_returns_dict(self, tmp_path): | |
| graph, _ = build_graph_from_sources({"a.py": "x = 1\n"}, tmp_path) | |
| stats = graph.stats() | |
| assert "num_nodes" in stats | |
| assert "num_edges" in stats | |
| assert stats["num_nodes"] == 1 | |
| def test_most_connected_files_ordering(self, tmp_path): | |
| # b.py and c.py both import from a.py β a.py should have high in-degree | |
| graph, _ = build_graph_from_sources({ | |
| "a.py": "class Core: pass\n", | |
| "b.py": "from a import Core\n", | |
| "c.py": "from a import Core\n", | |
| }, tmp_path) | |
| top = graph.most_connected_files(top_k=3) | |
| # If import resolution works, a.py has 2 in-edges | |
| # Just check the function returns a list without crashing | |
| assert isinstance(top, list) | |
| def test_get_reverse_deps(self, tmp_path): | |
| graph, _ = build_graph_from_sources({ | |
| "core.py": "x = 1\n", | |
| "app.py": "from core import x\n", | |
| }, tmp_path) | |
| # Whether resolution works or not, function should not crash | |
| rev = graph.get_reverse_deps("core.py") | |
| assert isinstance(rev, list) | |
| def test_transitive_imports(self, tmp_path): | |
| graph, _ = build_graph_from_sources({ | |
| "a.py": "x = 1\n", | |
| "b.py": "from a import x\n", | |
| "c.py": "from b import x\n", | |
| }, tmp_path) | |
| result = graph.get_transitive_imports("c.py", depth=2) | |
| assert isinstance(result, set) | |
| def test_empty_graph_ppr(self, tmp_path): | |
| from ast_parser.dependency_graph import RepoDependencyGraph | |
| graph = RepoDependencyGraph() | |
| seeds = {"a.py": 1.0, "b.py": 0.5} | |
| # Should not crash on empty graph | |
| result = graph.personalized_pagerank(seeds) | |
| assert isinstance(result, dict) | |
| # ββ Personalized PageRank βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestPersonalizedPageRank: | |
| def test_ppr_returns_top_k(self, tmp_path): | |
| graph, _ = build_graph_from_sources({ | |
| f"file{i}.py": "x = 1\n" for i in range(10) | |
| }, tmp_path) | |
| seeds = {"file0.py": 1.0, "file1.py": 0.5} | |
| result = graph.personalized_pagerank(seeds, top_k=5) | |
| assert len(result) <= 5 | |
| def test_ppr_seeds_in_result(self, tmp_path): | |
| graph, _ = build_graph_from_sources({ | |
| "a.py": "x = 1\n", | |
| "b.py": "y = 2\n", | |
| }, tmp_path) | |
| seeds = {"a.py": 1.0} | |
| result = graph.personalized_pagerank(seeds, top_k=10) | |
| assert "a.py" in result | |
| def test_ppr_empty_seeds(self, tmp_path): | |
| graph, _ = build_graph_from_sources({"a.py": "x = 1\n"}, tmp_path) | |
| result = graph.personalized_pagerank({}) | |
| assert result == {} | |
| def test_ppr_scores_positive(self, tmp_path): | |
| graph, _ = build_graph_from_sources({ | |
| "a.py": "x = 1\n", | |
| "b.py": "y = 2\n", | |
| }, tmp_path) | |
| result = graph.personalized_pagerank({"a.py": 1.0}, top_k=10) | |
| for score in result.values(): | |
| assert score > 0 | |
| # ββ Cache βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestASTCache: | |
| def test_cache_miss_returns_none(self, tmp_path): | |
| from ast_parser.cache import ASTCache | |
| cache = ASTCache(tmp_path / "cache") | |
| result = cache.get_file_symbols("nonexistent_repo", "a.py") | |
| assert result is None | |
| def test_set_and_get_file_symbols(self, tmp_path): | |
| from ast_parser.cache import ASTCache | |
| from ast_parser.python_parser import FileSymbols | |
| cache = ASTCache(tmp_path / "cache") | |
| fs = FileSymbols(file_path="a.py", file_hash="abc123") | |
| cache.set_file_symbols("repo_v1", fs) | |
| result = cache.get_file_symbols("repo_v1", "a.py") | |
| assert result is not None | |
| assert result.file_path == "a.py" | |
| assert result.file_hash == "abc123" | |
| def test_set_and_get_all_symbols(self, tmp_path): | |
| from ast_parser.cache import ASTCache | |
| from ast_parser.python_parser import FileSymbols | |
| cache = ASTCache(tmp_path / "cache") | |
| symbols = [ | |
| FileSymbols(file_path="a.py", file_hash="aaa"), | |
| FileSymbols(file_path="b.py", file_hash="bbb"), | |
| ] | |
| cache.set_all_file_symbols("repo_v1", symbols) | |
| result = cache.get_all_file_symbols("repo_v1") | |
| assert result is not None | |
| assert len(result) == 2 | |
| paths = [fs.file_path for fs in result] | |
| assert "a.py" in paths | |
| assert "b.py" in paths | |
| def test_invalidate_repo(self, tmp_path): | |
| from ast_parser.cache import ASTCache | |
| from ast_parser.python_parser import FileSymbols | |
| cache = ASTCache(tmp_path / "cache") | |
| fs = FileSymbols(file_path="a.py", file_hash="xxx") | |
| cache.set_file_symbols("repo_v1", fs) | |
| cache.set_all_file_symbols("repo_v1", [fs]) | |
| cache.invalidate_repo("repo_v1") | |
| assert cache.get_all_file_symbols("repo_v1") is None | |
| def test_get_or_parse_repo_integration(self, tmp_path): | |
| from ast_parser.cache import ASTCache | |
| cache = ASTCache(tmp_path / "cache") | |
| # Create a tiny fake repo | |
| repo_dir = tmp_path / "myrepo" | |
| repo_dir.mkdir() | |
| (repo_dir / "utils.py").write_text("def helper(): pass\n") | |
| (repo_dir / "app.py").write_text("from utils import helper\n") | |
| # First call β cache miss, should parse | |
| symbols, graph = cache.get_or_parse_repo(repo_dir, "myrepo_abc1234") | |
| assert len(symbols) == 2 | |
| assert graph.graph.number_of_nodes() == 2 | |
| # Second call β cache hit | |
| symbols2, graph2 = cache.get_or_parse_repo(repo_dir, "myrepo_abc1234") | |
| assert len(symbols2) == 2 | |
| # ββ Module key helper βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestModuleKey: | |
| def test_simple_path(self): | |
| from ast_parser.dependency_graph import _path_to_module_key | |
| assert _path_to_module_key("a/b/c.py") == "a.b.c" | |
| def test_init_module(self): | |
| from ast_parser.dependency_graph import _path_to_module_key | |
| assert _path_to_module_key("a/b/__init__.py") == "a.b" | |
| def test_top_level(self): | |
| from ast_parser.dependency_graph import _path_to_module_key | |
| assert _path_to_module_key("utils.py") == "utils" | |