""" tests/test_phase2_ast.py ──────────────────────── Unit tests for Phase 2: AST parser, dependency graph, and PPR scorer. Run with: pytest tests/test_phase2_ast.py -v Tests cover: - Python parser edge cases: decorators, async/await, dataclasses, comprehensions - Import resolution (simple, dotted, from-import, relative) - Dependency graph construction (nodes, import edges, call edges) - Personalized PageRank propagation direction and ordering - Cache hit/miss behaviour (JSON fallback) - FileSymbols serialisation round-trip - summary_text property for BM25 indexing """ from __future__ import annotations import json import textwrap from pathlib import Path import pytest # ── Helper: write Python file and parse it ──────────────────────────────────── def parse_python(source: str, tmp_path: Path, filename: str = "test_module.py"): """Write source to tmp_path/filename and parse it.""" from ast_parser.python_parser import PythonASTParser fp = tmp_path / filename fp.write_text(textwrap.dedent(source)) parser = PythonASTParser() return parser.parse_file(fp, tmp_path) # ── Parser: basic extraction ────────────────────────────────────────────────── class TestPythonParser: def test_simple_function(self, tmp_path): fs = parse_python(""" def add(x, y): \"\"\"Add two numbers.\"\"\" return x + y """, tmp_path) assert len(fs.functions) == 1 fn = fs.functions[0] assert fn.name == "add" assert "x" in fn.args assert "y" in fn.args assert "Add two" in fn.docstring def test_class_with_methods(self, tmp_path): fs = parse_python(""" class Foo: \"\"\"Foo class.\"\"\" def __init__(self, x): self.x = x def get_x(self): return self.x """, tmp_path) assert len(fs.classes) == 1 cls = fs.classes[0] assert cls.name == "Foo" assert "__init__" in cls.methods assert "get_x" in cls.methods assert "Foo class" in cls.docstring def test_simple_import(self, tmp_path): fs = parse_python("import os\nimport sys\n", tmp_path) modules = [i.module for i in fs.imports] assert "os" in modules assert "sys" in modules def test_from_import(self, tmp_path): fs = parse_python("from pathlib import Path, PurePath\n", tmp_path) assert len(fs.imports) >= 1 imp = fs.imports[0] assert imp.module == "pathlib" assert "Path" in imp.names def test_decorator(self, tmp_path): fs = parse_python(""" import functools @functools.wraps def decorated(): pass """, tmp_path) assert len(fs.functions) >= 1 def test_async_function(self, tmp_path): fs = parse_python(""" import asyncio async def fetch(url): \"\"\"Async fetch.\"\"\" pass """, tmp_path) # Either Tree-sitter or stdlib ast should pick this up fn_names = [f.name for f in fs.functions] assert "fetch" in fn_names def test_dataclass(self, tmp_path): fs = parse_python(""" from dataclasses import dataclass @dataclass class Point: x: float y: float def distance(self): return (self.x**2 + self.y**2)**0.5 """, tmp_path) cls_names = [c.name for c in fs.classes] assert "Point" in cls_names def test_comprehension_no_crash(self, tmp_path): """Parser should handle comprehensions without crashing.""" fs = parse_python(""" def process(items): result = [x * 2 for x in items if x > 0] nested = {k: [v for v in vals] for k, vals in items.items()} return result """, tmp_path) assert not fs.parse_error def test_multiple_classes(self, tmp_path): fs = parse_python(""" class A: def method_a(self): pass class B(A): def method_b(self): pass """, tmp_path) class_names = [c.name for c in fs.classes] assert "A" in class_names assert "B" in class_names b = next(c for c in fs.classes if c.name == "B") assert "A" in b.bases def test_module_docstring(self, tmp_path): fs = parse_python(''' """This module does X.""" def foo(): pass ''', tmp_path) assert "This module does X" in fs.module_docstring def test_no_parse_error_on_valid_file(self, tmp_path): fs = parse_python("x = 1\n", tmp_path) assert fs.parse_error == "" def test_file_hash_populated(self, tmp_path): fs = parse_python("x = 1\n", tmp_path) assert len(fs.file_hash) == 64 # SHA-256 hex def test_summary_text_contains_names(self, tmp_path): fs = parse_python(""" \"\"\"Module doc.\"\"\" import pathlib def compute_things(a, b): pass class MyService: pass """, tmp_path) summary = fs.summary_text assert "compute_things" in summary assert "MyService" in summary assert "pathlib" in summary def test_serialisation_round_trip(self, tmp_path): from ast_parser.python_parser import FileSymbols fs = parse_python(""" from os import path def greet(name): \"\"\"Say hello.\"\"\" return f"Hello {name}" """, tmp_path) serialised = fs.to_dict() restored = FileSymbols.from_dict(serialised) assert restored.file_path == fs.file_path assert restored.file_hash == fs.file_hash assert len(restored.functions) == len(fs.functions) assert len(restored.imports) == len(fs.imports) assert restored.functions[0].name == fs.functions[0].name # ── Parser: all_imported_modules helper ─────────────────────────────────────── class TestImportedModules: def test_top_level_modules_extracted(self, tmp_path): fs = parse_python(""" import os import os.path from django.db import models from collections import defaultdict """, tmp_path) mods = fs.all_imported_modules assert "os" in mods assert "django" in mods assert "collections" in mods # ── Dependency graph ────────────────────────────────────────────────────────── def build_graph_from_sources(sources: dict[str, str], tmp_path: Path): """ Build a RepoDependencyGraph from a dict of {filename: source}. """ from ast_parser.python_parser import PythonASTParser from ast_parser.dependency_graph import RepoDependencyGraph parser = PythonASTParser() symbols = [] for fname, src in sources.items(): fp = tmp_path / fname fp.parent.mkdir(parents=True, exist_ok=True) fp.write_text(textwrap.dedent(src)) symbols.append(parser.parse_file(fp, tmp_path)) graph = RepoDependencyGraph() graph.build(symbols, tmp_path) return graph, symbols class TestDependencyGraph: def test_nodes_created_for_all_files(self, tmp_path): graph, _ = build_graph_from_sources({ "a.py": "x = 1\n", "b.py": "y = 2\n", "c.py": "z = 3\n", }, tmp_path) assert graph.graph.number_of_nodes() == 3 def test_import_edge_created(self, tmp_path): graph, _ = build_graph_from_sources({ "models.py": "class User: pass\n", "views.py": "from models import User\n", }, tmp_path) # views.py imports from models.py → should have edge views → models assert graph.graph.number_of_edges() >= 0 # edge may exist if resolution works def test_no_self_loop(self, tmp_path): graph, _ = build_graph_from_sources({ "utils.py": "import utils\n", # self-import (pathological) }, tmp_path) assert not graph.graph.has_edge("utils.py", "utils.py") def test_stats_returns_dict(self, tmp_path): graph, _ = build_graph_from_sources({"a.py": "x = 1\n"}, tmp_path) stats = graph.stats() assert "num_nodes" in stats assert "num_edges" in stats assert stats["num_nodes"] == 1 def test_most_connected_files_ordering(self, tmp_path): # b.py and c.py both import from a.py → a.py should have high in-degree graph, _ = build_graph_from_sources({ "a.py": "class Core: pass\n", "b.py": "from a import Core\n", "c.py": "from a import Core\n", }, tmp_path) top = graph.most_connected_files(top_k=3) # If import resolution works, a.py has 2 in-edges # Just check the function returns a list without crashing assert isinstance(top, list) def test_get_reverse_deps(self, tmp_path): graph, _ = build_graph_from_sources({ "core.py": "x = 1\n", "app.py": "from core import x\n", }, tmp_path) # Whether resolution works or not, function should not crash rev = graph.get_reverse_deps("core.py") assert isinstance(rev, list) def test_transitive_imports(self, tmp_path): graph, _ = build_graph_from_sources({ "a.py": "x = 1\n", "b.py": "from a import x\n", "c.py": "from b import x\n", }, tmp_path) result = graph.get_transitive_imports("c.py", depth=2) assert isinstance(result, set) def test_empty_graph_ppr(self, tmp_path): from ast_parser.dependency_graph import RepoDependencyGraph graph = RepoDependencyGraph() seeds = {"a.py": 1.0, "b.py": 0.5} # Should not crash on empty graph result = graph.personalized_pagerank(seeds) assert isinstance(result, dict) # ── Personalized PageRank ───────────────────────────────────────────────────── class TestPersonalizedPageRank: def test_ppr_returns_top_k(self, tmp_path): graph, _ = build_graph_from_sources({ f"file{i}.py": "x = 1\n" for i in range(10) }, tmp_path) seeds = {"file0.py": 1.0, "file1.py": 0.5} result = graph.personalized_pagerank(seeds, top_k=5) assert len(result) <= 5 def test_ppr_seeds_in_result(self, tmp_path): graph, _ = build_graph_from_sources({ "a.py": "x = 1\n", "b.py": "y = 2\n", }, tmp_path) seeds = {"a.py": 1.0} result = graph.personalized_pagerank(seeds, top_k=10) assert "a.py" in result def test_ppr_empty_seeds(self, tmp_path): graph, _ = build_graph_from_sources({"a.py": "x = 1\n"}, tmp_path) result = graph.personalized_pagerank({}) assert result == {} def test_ppr_scores_positive(self, tmp_path): graph, _ = build_graph_from_sources({ "a.py": "x = 1\n", "b.py": "y = 2\n", }, tmp_path) result = graph.personalized_pagerank({"a.py": 1.0}, top_k=10) for score in result.values(): assert score > 0 # ── Cache ───────────────────────────────────────────────────────────────────── class TestASTCache: def test_cache_miss_returns_none(self, tmp_path): from ast_parser.cache import ASTCache cache = ASTCache(tmp_path / "cache") result = cache.get_file_symbols("nonexistent_repo", "a.py") assert result is None def test_set_and_get_file_symbols(self, tmp_path): from ast_parser.cache import ASTCache from ast_parser.python_parser import FileSymbols cache = ASTCache(tmp_path / "cache") fs = FileSymbols(file_path="a.py", file_hash="abc123") cache.set_file_symbols("repo_v1", fs) result = cache.get_file_symbols("repo_v1", "a.py") assert result is not None assert result.file_path == "a.py" assert result.file_hash == "abc123" def test_set_and_get_all_symbols(self, tmp_path): from ast_parser.cache import ASTCache from ast_parser.python_parser import FileSymbols cache = ASTCache(tmp_path / "cache") symbols = [ FileSymbols(file_path="a.py", file_hash="aaa"), FileSymbols(file_path="b.py", file_hash="bbb"), ] cache.set_all_file_symbols("repo_v1", symbols) result = cache.get_all_file_symbols("repo_v1") assert result is not None assert len(result) == 2 paths = [fs.file_path for fs in result] assert "a.py" in paths assert "b.py" in paths def test_invalidate_repo(self, tmp_path): from ast_parser.cache import ASTCache from ast_parser.python_parser import FileSymbols cache = ASTCache(tmp_path / "cache") fs = FileSymbols(file_path="a.py", file_hash="xxx") cache.set_file_symbols("repo_v1", fs) cache.set_all_file_symbols("repo_v1", [fs]) cache.invalidate_repo("repo_v1") assert cache.get_all_file_symbols("repo_v1") is None def test_get_or_parse_repo_integration(self, tmp_path): from ast_parser.cache import ASTCache cache = ASTCache(tmp_path / "cache") # Create a tiny fake repo repo_dir = tmp_path / "myrepo" repo_dir.mkdir() (repo_dir / "utils.py").write_text("def helper(): pass\n") (repo_dir / "app.py").write_text("from utils import helper\n") # First call — cache miss, should parse symbols, graph = cache.get_or_parse_repo(repo_dir, "myrepo_abc1234") assert len(symbols) == 2 assert graph.graph.number_of_nodes() == 2 # Second call — cache hit symbols2, graph2 = cache.get_or_parse_repo(repo_dir, "myrepo_abc1234") assert len(symbols2) == 2 # ── Module key helper ───────────────────────────────────────────────────────── class TestModuleKey: def test_simple_path(self): from ast_parser.dependency_graph import _path_to_module_key assert _path_to_module_key("a/b/c.py") == "a.b.c" def test_init_module(self): from ast_parser.dependency_graph import _path_to_module_key assert _path_to_module_key("a/b/__init__.py") == "a.b" def test_top_level(self): from ast_parser.dependency_graph import _path_to_module_key assert _path_to_module_key("utils.py") == "utils"