Spaces:
Running
Running
File size: 7,258 Bytes
dc71cad | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 | """
ast_parser/cache.py
────────────────────
Per-repo AST and graph caching layer.
Cache strategy:
- Key: (repo_name, repo_commit_sha)
- Value: {file_path: FileSymbols JSON} + graph adjacency JSON
- Backend: diskcache (local) when installed, otherwise plain JSON files — no external services required
On cache hit: skip all Tree-sitter parsing and graph construction.
On cache miss: parse all files, build graph, write to cache.
For a 500-file repo, this takes parsing from ~8s → ~0ms on repeat runs.
Cache invalidation:
- Individual file: SHA-256 of file content differs from cached hash
- Full repo: commit SHA changed (new cache entry created)
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Optional
from ast_parser.python_parser import FileSymbols
from ast_parser.dependency_graph import RepoDependencyGraph, graph_to_dict, graph_from_dict
logger = logging.getLogger(__name__)
class ASTCache:
    """
    Disk-backed cache for AST parse results and dependency graphs.

    Uses diskcache if available, falls back to raw JSON files stored under
    ``<cache_dir>/json_cache``. All values are stored as JSON strings under
    string keys of the form ``"<kind>:<repo_key>[:<file_path>]"``.

    A cache entry that cannot be decoded is always treated as a cache miss
    (logged at DEBUG level) rather than raised to the caller.
    """

    def __init__(self, cache_dir: Path):
        """Create the cache directory if needed and select a backend.

        Args:
            cache_dir: directory that holds the diskcache store and/or the
                JSON fallback files.
        """
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        # None means "use the JSON-file fallback".
        self._dc = None
        self._try_init_diskcache()

    def _try_init_diskcache(self) -> None:
        """Use diskcache as the backend when the package is importable."""
        try:
            import diskcache
        except ImportError:
            logger.debug("ASTCache: diskcache not available, using JSON files")
            return
        self._dc = diskcache.Cache(str(self.cache_dir / "diskcache"))
        logger.debug("ASTCache: using diskcache backend")

    # ── FileSymbols cache ────────────────────────────────────────────────────

    def get_file_symbols(self, repo_key: str, file_path: str) -> Optional[FileSymbols]:
        """Return cached FileSymbols for one file, or None if absent/undecodable."""
        key = f"symbols:{repo_key}:{file_path}"
        raw = self._get(key)
        if raw is None:
            return None
        try:
            return FileSymbols.from_dict(json.loads(raw))
        except Exception as e:
            # Same policy as get_all_file_symbols/get_graph: a corrupt or
            # schema-incompatible entry is a cache miss, never a crash.
            logger.debug("Cache decode error for %s: %s", key, e)
            return None

    def set_file_symbols(self, repo_key: str, fs: FileSymbols) -> None:
        """Store one file's symbols under ``symbols:<repo_key>:<fs.file_path>``."""
        key = f"symbols:{repo_key}:{fs.file_path}"
        self._set(key, json.dumps(fs.to_dict()))

    def get_all_file_symbols(self, repo_key: str) -> Optional[list[FileSymbols]]:
        """Return all cached FileSymbols for a repo or None."""
        key = f"all_symbols:{repo_key}"
        raw = self._get(key)
        if raw is None:
            return None
        try:
            data = json.loads(raw)
            return [FileSymbols.from_dict(d) for d in data]
        except Exception as e:
            logger.debug("Cache decode error for all_symbols: %s", e)
            return None

    def set_all_file_symbols(self, repo_key: str, symbols: list[FileSymbols]) -> None:
        """Store the full symbol list for a repo under ``all_symbols:<repo_key>``."""
        key = f"all_symbols:{repo_key}"
        self._set(key, json.dumps([fs.to_dict() for fs in symbols]))

    # ── Graph cache ──────────────────────────────────────────────────────────

    def get_graph(self, repo_key: str) -> Optional[RepoDependencyGraph]:
        """Return cached dependency graph or None."""
        key = f"graph:{repo_key}"
        raw = self._get(key)
        if raw is None:
            return None
        try:
            return graph_from_dict(json.loads(raw))
        except Exception as e:
            logger.debug("Graph cache decode error: %s", e)
            return None

    def set_graph(self, repo_key: str, graph: RepoDependencyGraph) -> None:
        """Store a repo's dependency graph under ``graph:<repo_key>``."""
        key = f"graph:{repo_key}"
        self._set(key, json.dumps(graph_to_dict(graph)))

    # ── Combined: parse + cache a whole repo ─────────────────────────────────

    def get_or_parse_repo(
        self,
        repo_root: Path,
        repo_key: str,
        force_reparse: bool = False,
    ) -> tuple[list[FileSymbols], RepoDependencyGraph]:
        """
        High-level entry point: returns (symbols, graph) from cache or parses fresh.

        Both the symbol list and the graph must be present and decodable for
        a cache hit; otherwise the repo is parsed again and both are rewritten.

        Args:
            repo_root: path to the cloned repository
            repo_key: unique key e.g. 'django__django_abc1234' (repo + commit)
            force_reparse: bypass cache entirely

        Returns:
            (file_symbols_list, dependency_graph)
        """
        if not force_reparse:
            cached_symbols = self.get_all_file_symbols(repo_key)
            # Only pay for decoding the graph when the symbols were usable.
            cached_graph = self.get_graph(repo_key) if cached_symbols is not None else None
            if cached_symbols is not None and cached_graph is not None:
                logger.info(
                    "Cache HIT for %s — %d files, %d graph nodes",
                    repo_key, len(cached_symbols), cached_graph.graph.number_of_nodes()
                )
                return cached_symbols, cached_graph

        logger.info("Cache MISS for %s — parsing repo from scratch", repo_key)

        # Parse all files
        from ast_parser.python_parser import PythonASTParser
        parser = PythonASTParser()
        symbols = list(parser.parse_repo(repo_root))

        # Build graph
        graph = RepoDependencyGraph()
        graph.build(symbols, repo_root)

        # Write to cache
        self.set_all_file_symbols(repo_key, symbols)
        self.set_graph(repo_key, graph)
        logger.info(
            "Cached %d file symbols + graph (%d nodes) for %s",
            len(symbols), graph.graph.number_of_nodes(), repo_key
        )
        return symbols, graph

    # ── Backend helpers ──────────────────────────────────────────────────────

    def _get(self, key: str) -> Optional[str]:
        """Return the raw stored string for *key*, or None when absent."""
        if self._dc is not None:
            return self._dc.get(key)
        # Fallback: JSON file. EAFP avoids the exists()/read race.
        try:
            return self._json_path(key).read_text(encoding="utf-8")
        except FileNotFoundError:
            return None

    def _set(self, key: str, value: str) -> None:
        """Store *value* under *key* in whichever backend is active."""
        if self._dc is not None:
            self._dc.set(key, value)
            return
        p = self._json_path(key)
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(value, encoding="utf-8")

    def _delete(self, key: str) -> None:
        """Remove *key* from the active backend; a missing key is not an error."""
        if self._dc is not None:
            self._dc.delete(key)
            return
        try:
            self._json_path(key).unlink()
        except FileNotFoundError:
            pass

    def _delete_prefix(self, prefix: str) -> None:
        """Remove every entry whose key starts with *prefix*."""
        if self._dc is not None:
            # Snapshot the matching keys first so nothing is deleted while
            # the cache is still being iterated.
            doomed = [
                k for k in self._dc
                if isinstance(k, str) and k.startswith(prefix)
            ]
            for k in doomed:
                self._dc.delete(k)
            return

        json_dir = self.cache_dir / "json_cache"
        if not json_dir.is_dir():
            return
        # The file-name encoding is prefix-preserving, so a key prefix maps
        # directly onto a file-name prefix.
        encoded = self._encode_key(prefix)
        for entry in list(json_dir.iterdir()):
            if entry.name.startswith(encoded) and entry.name.endswith(".json"):
                try:
                    entry.unlink()
                except FileNotFoundError:
                    pass

    @staticmethod
    def _encode_key(key: str) -> str:
        """Percent-encode *key* into a collision-free, filesystem-safe name."""
        from urllib.parse import quote

        # safe="" also escapes "/", so the result never contains a path
        # separator; only letters, digits and "_.-~" are left as-is.
        return quote(key, safe="")

    def _json_path(self, key: str) -> Path:
        """Convert cache key to a safe filesystem path.

        The encoding is reversible, so two different keys never share a file
        (the previous ':' and '/' -> '_' mapping let e.g. 'pkg/a_b.py' and
        'pkg_a/b.py' collide). Files written under the old naming scheme are
        simply not found and behave as cache misses.
        """
        return self.cache_dir / "json_cache" / f"{self._encode_key(key)}.json"

    def invalidate_repo(self, repo_key: str) -> None:
        """Remove all cached data for a repo.

        Deletes the repo-wide symbol list, the dependency graph, and every
        per-file ``symbols:<repo_key>:<file_path>`` entry.
        """
        for prefix in ("all_symbols", "graph"):
            self._delete(f"{prefix}:{repo_key}")
        self._delete_prefix(f"symbols:{repo_key}:")
        logger.info("Cache invalidated for %s", repo_key)
|