# esicodehub-ai / phase3 /classifier_b.py
# WissalllK's picture
# Add ESIcodeHub AI detection service
# a937307
"""
Phase 3b - CodeT5p multilingual AI code detector.
Uses Salesforce/codet5p-770m encoder + a custom binary classifier head
trained by Gurioli et al. (2024, "Is This You, LLM?", SANER 2025).
Checkpoint from: https://huggingface.co/spaces/isThisYouLLM/Human-Ai
Supports 10 languages: C, C++, C#, Go, Java, JavaScript, Kotlin, Python,
Ruby, Rust. Paper claims 84.1% average accuracy.
Public API:
phase3b_classify(code: str, language: str) -> dict
returns {
"p_ai": float, # probability that code is AI-generated (0.5 for empty input)
"confidence": str, # "high" for native langs, "medium" otherwise, "low" for empty input
"head_used": str, # "multilingual", or "none" when the input is empty
}
Label convention note:
Upstream sigmoid output: high (>=0.07) = Human, low (<0.07) = AI.
We invert to p_ai = 1 - sigmoid_output for pipeline consistency.
NOTE: outputs cluster near 0, so p_ai will cluster near 1. The orchestrator
should NOT use 0.5 as the threshold. Use empirical calibration.
"""
from __future__ import annotations
from pathlib import Path
import torch
import torch.nn as nn
import transformers
from transformers import AutoTokenizer, T5EncoderModel
# --------------------------------------------------------------------------- #
# Config
# --------------------------------------------------------------------------- #
# Hugging Face model id of the encoder backbone.
ENCODER_NAME = "Salesforce/codet5p-770m"

# The classifier checkpoint is looked up in a "models" folder that sits next
# to this module.
_THIS_DIR = Path(__file__).resolve().parent
MODELS_DIR = _THIS_DIR.joinpath("models")
CHECKPOINT = MODELS_DIR.joinpath("multilingual_checkpoint.bin")

# Languages the model was trained on (per the paper). C++ and C# each appear
# under two spellings so either form is recognised.
NATIVE_LANGUAGES = {
    "c",
    "cpp",
    "c++",
    "csharp",
    "c#",
    "go",
    "java",
    "javascript",
    "kotlin",
    "python",
    "ruby",
    "rust",
}

# Run on the GPU when torch can see one, otherwise fall back to the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# --------------------------------------------------------------------------- #
# Model architecture - mirrors upstream app.py / PkAlvaro test_model.py
# Following PkAlvaro's float32 approach (more compatible on Windows CPU than
# the BFloat16 used in the Streamlit Space).
# --------------------------------------------------------------------------- #
class StylometerClassifier(nn.Module):
    """Binary classification head stacked on top of a pretrained encoder.

    The encoder output at sequence position 0 is projected to 768 features,
    passed through ReLU and dropout (p=0.2), reduced to a single logit and
    squashed with a sigmoid.
    """

    def __init__(self, pretrained_encoder: nn.Module, dimensionality: int):
        """
        Args:
            pretrained_encoder: Module called as
                ``encoder(input_ids=..., attention_mask=...)``; element 0 of
                its output is used as the hidden states.
            dimensionality: Size of the encoder's hidden vectors, i.e. the
                input width of the first linear layer.
        """
        super().__init__()
        # Attribute names (and their creation order) are left untouched: the
        # names become the parameter keys of this module's state dict.
        self.modelBase = pretrained_encoder
        self.pre_classifier = nn.Linear(dimensionality, 768)
        self.activation = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
        self.classifier = nn.Linear(768, 1)

    def forward(self, input_ids: torch.Tensor, attention_mask: torch.Tensor):
        """Return one sigmoid probability per input sequence.

        Assumes element 0 of the encoder output is shaped
        (batch, sequence, dimensionality) -- TODO confirm against the encoder.
        """
        encoded = self.modelBase(input_ids=input_ids, attention_mask=attention_mask)
        # Use the hidden vector at the first sequence position as the summary.
        first_token = encoded[0][:, 0]
        features = self.dropout(self.activation(self.pre_classifier(first_token)))
        # Drop the trailing size-1 dimension left by the final linear layer.
        return torch.sigmoid(self.classifier(features)).squeeze(-1)
# --------------------------------------------------------------------------- #
# Lazy loaders
# --------------------------------------------------------------------------- #
# Module-level caches, filled in on first use by the loaders below.
_tokenizer = None
_model = None


def _get_tokenizer():
    """Return the shared tokenizer, loading it on the first call.

    The tokenizer is fetched for ENCODER_NAME with ``use_fast=False`` and
    then kept in the module-level ``_tokenizer`` cache.
    """
    global _tokenizer
    if _tokenizer is not None:
        return _tokenizer
    _tokenizer = AutoTokenizer.from_pretrained(ENCODER_NAME, use_fast=False)
    return _tokenizer
def _get_model():
    """Return the shared classifier, building it on the first call.

    Builds the encoder, wraps it in StylometerClassifier, loads the weights
    from CHECKPOINT, moves the model to DEVICE and switches it to eval mode.
    The finished model is cached in the module-level ``_model``.

    Raises:
        FileNotFoundError: If the CHECKPOINT file does not exist.
    """
    global _model
    if _model is None:
        if not CHECKPOINT.exists():
            raise FileNotFoundError(
                f"Checkpoint not found: {CHECKPOINT}\n"
                f"Download from:\n"
                f" https://huggingface.co/spaces/isThisYouLLM/Human-Ai/resolve/main/checkpoint.bin"
            )

        # Encoder only (T5 is encoder-decoder; we discard the decoder).
        # Note this sets a class-level attribute on T5EncoderModel.
        transformers.T5EncoderModel._keys_to_ignore_on_load_unexpected = ["decoder.*"]
        backbone = T5EncoderModel.from_pretrained(ENCODER_NAME)

        classifier = StylometerClassifier(
            backbone, dimensionality=backbone.shared.embedding_dim
        )
        # NOTE(review): torch.load unpickles the file, and this checkpoint is
        # downloaded from a third-party Space. Consider weights_only=True once
        # it is confirmed that the file is a plain tensor state dict.
        weights = torch.load(CHECKPOINT, map_location=DEVICE)
        classifier.load_state_dict(weights)

        # Publish to the cache only after every step above has succeeded.
        _model = classifier.to(DEVICE).eval()
    return _model
# --------------------------------------------------------------------------- #
# Chunking - handles code longer than the model's max_length
# --------------------------------------------------------------------------- #
def _chunk_and_score(code: str) -> float:
    """
    Tokenize once, split into model-sized chunks, score each, average.

    The code is tokenized a single time WITHOUT special tokens. The resulting
    ids are cut into windows that leave room for the tokenizer's special
    tokens, and each window is wrapped with those special tokens before it is
    fed to the model. Working on ids directly means no chunk is truncated
    after the fact and no decode/re-encode round trip can shift the token
    boundaries.

    Args:
        code: Source code to score.

    Returns:
        The unweighted mean of the per-chunk raw sigmoid outputs (probability
        of Human per upstream), or 0.5 (neutral) when no chunk contains any
        non-whitespace text.
    """
    tokenizer = _get_tokenizer()
    model = _get_model()

    max_len = tokenizer.model_max_length
    # Safety cap in case the tokenizer reports a huge value
    if max_len is None or max_len > 4096:
        max_len = 512

    # Content tokens that fit in one model input once the special tokens the
    # tokenizer adds around a single sequence are accounted for.
    window = max(1, max_len - tokenizer.num_special_tokens_to_add(pair=False))

    content_ids = tokenizer(code, add_special_tokens=False, truncation=False)["input_ids"]

    # NOTE(review): every chunk carries the same weight in the mean, so a very
    # short trailing chunk counts as much as a full one.
    probs = []
    for start in range(0, len(content_ids), window):
        window_ids = content_ids[start : start + window]
        # A window holding only whitespace carries no signal; skip it.
        if not tokenizer.decode(window_ids, skip_special_tokens=True).strip():
            continue
        probs.append(_score_token_ids(model, tokenizer, window_ids))

    if not probs:
        return 0.5  # default neutral if everything was empty
    return sum(probs) / len(probs)


def _score_token_ids(model, tokenizer, content_ids) -> float:
    """Wrap content ids with special tokens and return the model's sigmoid output."""
    wrapped = tokenizer.build_inputs_with_special_tokens(list(content_ids))
    input_ids = torch.tensor([wrapped], dtype=torch.long, device=DEVICE)
    # One unpadded sequence per batch, so every position is attended to.
    attention_mask = torch.ones_like(input_ids)
    with torch.no_grad():
        prob = model(input_ids, attention_mask)
    return prob.cpu().item()
# --------------------------------------------------------------------------- #
# Public API
# --------------------------------------------------------------------------- #
def phase3b_classify(code: str, language: str) -> dict:
    """
    Score a code snippet with the multilingual CodeT5p stylometer.

    Args:
        code: Source code to classify. Empty, whitespace-only or None input
            is not scored.
        language: Language name, matched case-insensitively (surrounding
            whitespace ignored) against NATIVE_LANGUAGES. None or an empty
            string is treated as an unknown language.

    Returns:
        dict with keys
            p_ai: float in [0,1], computed as 1 - the upstream "human"
                probability; 0.5 when the input is empty.
            confidence: "high" if the language is in NATIVE_LANGUAGES,
                "medium" otherwise, "low" when the input is empty.
            head_used: "multilingual", or "none" when the input is empty.
    """
    if not code or not code.strip():
        return {"p_ai": 0.5, "confidence": "low", "head_used": "none"}

    # A missing language must not crash scoring; it simply counts as a
    # language outside the set the model was trained on.
    lang = (language or "").lower().strip()
    confidence = "high" if lang in NATIVE_LANGUAGES else "medium"

    prob_human = _chunk_and_score(code)
    # Upstream reports the probability of Human; the pipeline wants p(AI).
    p_ai = 1.0 - prob_human
    return {"p_ai": p_ai, "confidence": confidence, "head_used": "multilingual"}
# --------------------------------------------------------------------------- #
# Smoke test
# --------------------------------------------------------------------------- #
# Sample snippets for the smoke test at the bottom of this module. Each
# language has one snippet the smoke test labels "HUMAN" and one it labels
# "AI"; all of them implement an iterative Fibonacci function.

# Python sample labelled "HUMAN python" by the smoke test.
_HUMAN_PYTHON = """
def fib(n):
a, b = 0, 1
for _ in range(n):
a, b = b, a + b
return a
"""
# Python sample labelled "AI python" by the smoke test. Single-quoted triple
# string because the snippet itself contains a triple-double-quoted docstring.
_AI_PYTHON = '''
def calculate_fibonacci_number(n: int) -> int:
"""
Calculate the nth Fibonacci number using an iterative approach.
"""
if n < 0:
raise ValueError("Input must be a non-negative integer.")
previous_value, current_value = 0, 1
for _ in range(n):
previous_value, current_value = current_value, previous_value + current_value
return previous_value
'''
# Java sample labelled "HUMAN java" by the smoke test.
_HUMAN_JAVA = """
public class Fib {
public static int fib(int n) {
int a = 0, b = 1;
for (int i = 0; i < n; i++) {
int t = b;
b = a + b;
a = t;
}
return a;
}
}
"""
# Java sample labelled "AI java" by the smoke test.
_AI_JAVA = """
public class FibonacciCalculator {
/**
* Calculates the nth Fibonacci number using an iterative approach.
*/
public static int calculateFibonacci(int n) {
if (n < 0) {
throw new IllegalArgumentException("Input must be non-negative.");
}
int previousValue = 0;
int currentValue = 1;
for (int i = 0; i < n; i++) {
int temporary = currentValue;
currentValue = previousValue + currentValue;
previousValue = temporary;
}
return previousValue;
}
}
"""
# JavaScript sample labelled "HUMAN js" by the smoke test.
_HUMAN_JS = """
function fib(n) {
let a = 0, b = 1;
for (let i = 0; i < n; i++) {
[a, b] = [b, a + b];
}
return a;
}
"""
# JavaScript sample labelled "AI js" by the smoke test.
_AI_JS = """
function calculateFibonacciNumber(n) {
if (n < 0) {
throw new Error("Input must be a non-negative integer.");
}
let previousValue = 0;
let currentValue = 1;
for (let i = 0; i < n; i++) {
const temporary = currentValue;
currentValue = previousValue + currentValue;
previousValue = temporary;
}
return previousValue;
}
"""
# Go sample labelled "HUMAN go" by the smoke test.
_HUMAN_GO = """
package main
func fib(n int) int {
a, b := 0, 1
for i := 0; i < n; i++ {
a, b = b, a+b
}
return a
}
"""
# Go sample labelled "AI go" by the smoke test.
_AI_GO = """
package main
import "errors"
// CalculateFibonacciNumber computes the nth Fibonacci number iteratively.
// It returns an error if n is negative.
func CalculateFibonacciNumber(n int) (int, error) {
if n < 0 {
return 0, errors.New("input must be a non-negative integer")
}
previousValue, currentValue := 0, 1
for i := 0; i < n; i++ {
previousValue, currentValue = currentValue, previousValue+currentValue
}
return previousValue, nil
}
"""
if __name__ == "__main__":
    # Smoke test: score every bundled sample, then compare the mean p_ai of
    # the "AI" samples against the mean p_ai of the "HUMAN" samples.
    print(f"Device: {DEVICE}")
    print(f"Models dir: {MODELS_DIR}")
    print()

    # (display label, snippet, language passed to the classifier)
    samples = [
        ("HUMAN python", _HUMAN_PYTHON, "python"),
        ("AI python", _AI_PYTHON, "python"),
        ("HUMAN java", _HUMAN_JAVA, "java"),
        ("AI java", _AI_JAVA, "java"),
        ("HUMAN js", _HUMAN_JS, "javascript"),
        ("AI js", _AI_JS, "javascript"),
        ("HUMAN go", _HUMAN_GO, "go"),
        ("AI go", _AI_GO, "go"),
    ]

    rule = "-" * 60
    print(f"{'Sample':20s} {'p_ai':>8s} {'conf':6s} verdict")
    print(rule)

    human_p, ai_p = [], []
    for label, code, lang in samples:
        try:
            result = phase3b_classify(code, lang)
        except FileNotFoundError as err:
            # Without the checkpoint no sample can be scored; stop here.
            print(f"ERROR: {err}")
            break
        score = result["p_ai"]
        # The label decides which group the score is collected under.
        bucket = ai_p if "AI" in label else human_p
        bucket.append(score)
        print(f"{label:20s} {score:8.4f} {result['confidence']:6s}")

    # Only summarise when both groups produced at least one score.
    if human_p and ai_p:
        avg_h = sum(human_p) / len(human_p)
        avg_a = sum(ai_p) / len(ai_p)
        gap = avg_a - avg_h
        print(rule)
        print(f"Avg human p_ai: {avg_h:.4f}")
        print(f"Avg AI p_ai: {avg_a:.4f}")
        print(f"Gap (AI - human): {gap:+.4f}")
        print()
        if gap > 0.05:
            print("VERDICT: Multilingual head shows real signal. Ship it.")
        elif gap > -0.05:
            print("VERDICT: Signal weak/zero. Same as Phase 2 alone. Reconsider use.")
        else:
            print("VERDICT: Signal INVERTED. Likely broken or label flip needed.")