esicodehub-ai / phase2 /preprocess.py
WissalllK's picture
Add ESIcodeHub AI detection service
a937307
"""
preprocess.py
=============
Strip comments, docstrings, and blank lines from Python source.
Used to normalize code BEFORE embedding so the cosine-similarity step
isn't dominated by surface artifacts ("code with comments" vs
"code without comments") instead of the actual logic.
Public function:
strip(code: str) -> str
Run this file directly to execute the unit tests:
python preprocess.py
"""
import ast
import io
import sys
import tokenize
# ---------------------------------------------------------------------------
# CORE
# ---------------------------------------------------------------------------
def _remove_comment_tokens(code: str) -> str:
"""Drop tokens of type COMMENT using Python's own tokenizer.
Safe against `#` inside strings because tokenize knows the difference."""
if not code.strip():
return code
# Collect (start_pos, end_pos) of every comment token.
comment_ranges = []
try:
tokens = tokenize.generate_tokens(io.StringIO(code).readline)
for tok in tokens:
if tok.type == tokenize.COMMENT:
comment_ranges.append((tok.start, tok.end))
except (tokenize.TokenError, Exception):
# Source has lexer-level issues. Return as-is rather than corrupt it.
return code
if not comment_ranges:
return code
# Rebuild line-by-line, deleting the comment slice from each affected line.
# tokenize positions are (row, col) with row 1-indexed.
lines = code.splitlines(keepends=True)
# Group by line so we delete from the rightmost comment first
# (deleting left-first would shift columns of subsequent ones).
by_line: dict[int, list] = {}
for (sr, sc), (er, ec) in comment_ranges:
by_line.setdefault(sr, []).append((sc, ec, sr == er))
for row, ranges in by_line.items():
if row - 1 >= len(lines):
continue
line = lines[row - 1]
# Process rightmost first.
for sc, ec, single_line in sorted(ranges, key=lambda x: -x[0]):
if single_line:
# Cut from sc to ec; preserve trailing newline if present.
line = line[:sc].rstrip() + ("\n" if line.endswith("\n") else "")
else:
line = line[:sc].rstrip() + ("\n" if line.endswith("\n") else "")
lines[row - 1] = line
return "".join(lines)
def _remove_docstrings(code: str) -> str:
"""Walk the AST. For Module/FunctionDef/AsyncFunctionDef/ClassDef nodes,
if the first statement is a bare string-literal expression, that's the
docstring -- replace it with a `pass` to keep the parent body legal.
We use AST mutation + ast.unparse rather than line-deletion because
ast.unparse rebuilds source faithfully and handles every edge case
(single-line docstrings, raw strings, f-strings used as docstrings, etc.).
"""
if not code.strip():
return code
try:
tree = ast.parse(code)
except SyntaxError:
# Can't parse -> can't safely modify. Return original.
return code
docstring_node_types = (
ast.Module, ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef,
)
for node in ast.walk(tree):
if not isinstance(node, docstring_node_types):
continue
if not node.body:
continue
first = node.body[0]
# A docstring is an Expr node whose value is a Constant str.
if (isinstance(first, ast.Expr)
and isinstance(first.value, ast.Constant)
and isinstance(first.value.value, str)):
if isinstance(node, ast.Module):
# Module docstrings: always safe to remove.
node.body.pop(0)
elif len(node.body) == 1:
# Docstring is the ONLY statement — replace with pass
# so the function/class body stays syntactically legal.
node.body[0] = ast.Pass()
else:
# Docstring followed by real code — just remove it.
# No pass needed; real code keeps the body legal.
node.body.pop(0)
try:
return ast.unparse(tree)
except Exception:
# ast.unparse failed (very rare). Return original.
return code
def _remove_blank_lines(code: str) -> str:
"""Drop lines that are empty or only whitespace."""
return "\n".join(
line for line in code.splitlines() if line.strip()
)
def strip(code: str) -> str:
"""Strip comments, docstrings, and blank lines.
Order matters: docstrings first (AST-based, needs valid syntax),
then comments (token-based), then blank lines (string-based)."""
code = _remove_docstrings(code)
code = _remove_comment_tokens(code)
code = _remove_blank_lines(code)
return code
# ---------------------------------------------------------------------------
# UNIT TESTS
# ---------------------------------------------------------------------------
def _check(name: str, src: str, must_contain=None, must_not_contain=None,
must_be_empty=False, must_parse=True):
"""Run strip() on src and verify expectations."""
try:
result = strip(src)
except Exception as e:
print(f" [FAIL] {name}: strip() raised {type(e).__name__}: {e}")
return False
failures = []
if must_be_empty and result.strip():
failures.append(f"expected empty, got: {result!r}")
if must_contain:
for needle in must_contain:
if needle not in result:
failures.append(f"missing: {needle!r}")
if must_not_contain:
for needle in must_not_contain:
if needle in result:
failures.append(f"should not contain: {needle!r}")
if must_parse and result.strip():
try:
ast.parse(result)
except SyntaxError as e:
failures.append(f"output does not parse: {e}")
if failures:
print(f" [FAIL] {name}")
for f in failures:
print(f" {f}")
print(f" output was:\n "
+ result.replace("\n", "\n "))
return False
print(f" [ OK ] {name}")
return True
def run_tests():
print("=" * 70)
print("UNIT TESTS")
print("=" * 70)
passed = 0
total = 0
cases = [
# 1. Plain comment
("plain_comment",
"# this is a comment\nx = 1\n",
["x = 1"], ["this is a comment"]),
# 2. Inline comment
("inline_comment",
"x = 1 # inline\ny = 2\n",
["x = 1", "y = 2"], ["inline"]),
# 3. # inside a string -- MUST NOT be stripped
# (ast.unparse may normalize quote style, so check content only)
("hash_in_string",
'print("# not a comment")\n',
["# not a comment"], None),
# 4. # in URL string
("hash_in_url",
'url = "https://example.com#anchor"\nprint(url)\n',
["#anchor"], None),
# 5. Module-level docstring
("module_docstring",
'"""this is a module docstring"""\nx = 1\n',
["x = 1"], ["module docstring"]),
# 6. Function docstring
("function_docstring",
'def f():\n """fn docstring"""\n return 1\n',
["def f", "return 1"], ["fn docstring"]),
# 7. Class docstring
("class_docstring",
'class C:\n """class docstring"""\n x = 1\n',
["class C", "x = 1"], ["class docstring"]),
# 8. Triple-quoted string assigned to variable -- MUST be kept
("triple_quoted_value",
'x = """real value"""\nprint(x)\n',
["real value"], None),
# 9. Blank lines between code
("blank_lines",
"x = 1\n\n\ny = 2\n",
["x = 1", "y = 2"], None),
# 10. Indented inline comment
("indented_inline",
"if True:\n x = 1 # inner comment\n",
["x = 1"], ["inner comment"]),
# 11. Mixed: comments + docstring + blank lines
("mixed",
'"""module doc"""\n\n# top comment\ndef f():\n """fn doc"""\n x = 1 # inline\n return x\n\n',
["def f", "x = 1", "return x"],
["module doc", "fn doc", "top comment", "inline"]),
# 12. f-string with # in format spec
("fstring_format",
'x = 255\nprint(f"{x:#x}")\n',
["#x"], None),
# 13. Comment-only file
("comment_only",
"# only a comment\n# another\n",
None, None, True), # must_be_empty
# 14. Empty file
("empty",
"",
None, None, True),
# 15. Whitespace-only file
("whitespace_only",
" \n \n\n",
None, None, True),
]
for case in cases:
if len(case) == 4:
name, src, must, must_not = case
ok = _check(name, src, must_contain=must, must_not_contain=must_not)
else:
name, src, must, must_not, must_empty = case
ok = _check(name, src, must_contain=must,
must_not_contain=must_not, must_be_empty=must_empty)
passed += int(ok)
total += 1
print()
print(f"{passed}/{total} passed")
return passed == total
def run_apps_demo():
"""Show before/after on the 5 cached APPS samples."""
import json
from pathlib import Path
cache = Path("stress_samples.json")
if not cache.exists():
print("\n(stress_samples.json not found, skipping APPS demo)")
return
print()
print("=" * 70)
print("DEMO ON CACHED APPS SAMPLES (before/after line counts)")
print("=" * 70)
samples = json.loads(cache.read_text(encoding="utf-8"))
for s in samples:
before_lines = len(s["code"].splitlines())
stripped = strip(s["code"])
after_lines = len(stripped.splitlines())
# Verify it still parses.
try:
ast.parse(stripped)
parse_ok = "yes"
except SyntaxError:
parse_ok = "NO -- BROKEN"
sid = f"{s['category']}_{s['problem_id']}"
print(f" {sid:<22s} before={before_lines:>3d} "
f"after={after_lines:>3d} parses={parse_ok}")
if __name__ == "__main__":
ok = run_tests()
run_apps_demo()
sys.exit(0 if ok else 1)