"""
Phase 3b - CodeT5p multilingual AI code detector.

Uses Salesforce/codet5p-770m encoder + a custom binary classifier head trained
by Gurioli et al. (2024, "Is This You, LLM?", SANER 2025).
Checkpoint from: https://huggingface.co/spaces/isThisYouLLM/Human-Ai

Supports 10 languages: C, C++, C#, Go, Java, JavaScript, Kotlin, Python,
Ruby, Rust. Paper claims 84.1% average accuracy.

Public API:
    phase3b_classify(code: str, language: str) -> dict
        returns {
            "p_ai": float,       # probability that code is AI-generated
            "confidence": str,   # "high" for native langs, "medium" otherwise,
                                 # "low" when the input is empty/whitespace
            "head_used": str,    # "multilingual", or "none" for empty input
        }

    Empty or whitespace-only input is not scored by the model; it returns
    p_ai = 0.5 with confidence "low" and head_used "none".

Label convention note:
    Upstream sigmoid output: high (>=0.07) = Human, low (<0.07) = AI.
    We invert to p_ai = 1 - sigmoid_output for pipeline consistency.
    NOTE: outputs cluster near 0, so p_ai will cluster near 1.
    The orchestrator should NOT use 0.5 as the threshold. Use empirical
    calibration.
"""

from __future__ import annotations

from pathlib import Path

import torch
import torch.nn as nn
import transformers
from transformers import AutoTokenizer, T5EncoderModel

# --------------------------------------------------------------------------- #
# Config
# --------------------------------------------------------------------------- #
# Hugging Face model id used for both the tokenizer and the encoder weights.
ENCODER_NAME = "Salesforce/codet5p-770m"
# The classifier checkpoint is looked up in a "models" directory that sits
# next to this source file.
_THIS_DIR = Path(__file__).resolve().parent
MODELS_DIR = _THIS_DIR / "models"
CHECKPOINT = MODELS_DIR / "multilingual_checkpoint.bin"

# Languages the model was trained on (per the paper)
# Lower-case names, including common aliases ("cpp"/"c++", "csharp"/"c#").
NATIVE_LANGUAGES = {
    "c", "cpp", "c++", "csharp", "c#", "go", "java", "javascript",
    "kotlin", "python", "ruby", "rust",
}

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --------------------------------------------------------------------------- #
# Model architecture - mirrors upstream app.py / PkAlvaro test_model.py
# Following PkAlvaro's float32 approach (more compatible on Windows CPU than
# the BFloat16 used in the Streamlit Space).
# --------------------------------------------------------------------------- #
class StylometerClassifier(nn.Module):
    """Binary classifier head on top of a pretrained encoder.

    The attribute names (``modelBase``, ``pre_classifier``, ``classifier``)
    are part of the checkpoint's state-dict keys and must not be renamed.

    Args:
        pretrained_encoder: Callable encoder whose first output element is the
            per-token hidden states.
        dimensionality: Width of the encoder's hidden states.
    """

    def __init__(self, pretrained_encoder: nn.Module, dimensionality: int):
        super().__init__()
        self.modelBase = pretrained_encoder
        self.pre_classifier = nn.Linear(dimensionality, 768)
        self.activation = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
        self.classifier = nn.Linear(768, 1)

    def forward(self, input_ids: torch.Tensor, attention_mask: torch.Tensor):
        """Return one sigmoid probability per input sequence.

        Args:
            input_ids: Token ids; assumed (batch, seq) - TODO confirm with caller.
            attention_mask: Mask passed through to the encoder unchanged.

        Returns:
            Tensor of sigmoid outputs with the trailing size-1 dimension
            squeezed away.
        """
        outputs = self.modelBase(input_ids=input_ids, attention_mask=attention_mask)
        hidden_state = outputs[0]
        # The vector at sequence position 0 is used as the pooled representation.
        cls_output = hidden_state[:, 0]
        pooler = self.pre_classifier(cls_output)
        after_activation = self.activation(pooler)
        pooler_after_act = self.dropout(after_activation)
        logits = self.classifier(pooler_after_act)
        prob = torch.sigmoid(logits)
        return prob.squeeze(-1)


# --------------------------------------------------------------------------- #
# Lazy loaders
# --------------------------------------------------------------------------- #
_tokenizer = None
_model = None


def _get_tokenizer():
    """Return the shared tokenizer, creating it on first use."""
    global _tokenizer
    if _tokenizer is None:
        _tokenizer = AutoTokenizer.from_pretrained(ENCODER_NAME, use_fast=False)
    return _tokenizer


def _get_model():
    """Return the shared classifier in eval mode on DEVICE, loading it once.

    Raises:
        FileNotFoundError: If the classifier checkpoint is not at CHECKPOINT.
    """
    global _model
    if _model is not None:
        return _model

    if not CHECKPOINT.exists():
        raise FileNotFoundError(
            f"Checkpoint not found: {CHECKPOINT}\n"
            f"Download from:\n"
            f" https://huggingface.co/spaces/isThisYouLLM/Human-Ai/resolve/main/checkpoint.bin"
        )

    # Encoder only (T5 is encoder-decoder; we discard the decoder).
    # NOTE: this sets a class-level attribute on transformers.T5EncoderModel.
    transformers.T5EncoderModel._keys_to_ignore_on_load_unexpected = ["decoder.*"]
    encoder = T5EncoderModel.from_pretrained(ENCODER_NAME)

    model = StylometerClassifier(encoder, dimensionality=encoder.shared.embedding_dim)
    # The checkpoint is a downloaded pickle file. It is only ever used as a
    # state dict, so restrict unpickling to tensors/containers instead of
    # allowing arbitrary objects to be deserialized.
    state_dict = torch.load(CHECKPOINT, map_location=DEVICE, weights_only=True)
    model.load_state_dict(state_dict)
    model.to(DEVICE)
    model.eval()

    # Cache only after a fully successful load so a failed attempt can be retried.
    _model = model
    return _model


# --------------------------------------------------------------------------- #
# Chunking - handles code
# longer than the model's max_length
# --------------------------------------------------------------------------- #
def _score_token_ids(model, token_ids) -> float:
    """Run one forward pass over a single, already-built token id sequence."""
    input_ids = torch.tensor([token_ids], dtype=torch.long, device=DEVICE)
    # A single unpadded sequence: every position is a real token.
    attention_mask = torch.ones_like(input_ids)
    with torch.no_grad():
        prob = model(input_ids, attention_mask)
    return prob.cpu().item()


def _chunk_and_score(code: str) -> float:
    """
    Tokenize once, split into max_length-sized chunks, score each, average.

    Chunks are cut on token ids, leaving room for the tokenizer's special
    tokens, so no chunk has to be truncated and no text is decoded and
    re-tokenized before scoring.

    Returns the AVERAGED raw sigmoid output (probability of Human per
    upstream), or 0.5 when every chunk turned out to be blank.
    """
    tokenizer = _get_tokenizer()
    model = _get_model()

    max_len = tokenizer.model_max_length
    # Safety cap in case the tokenizer reports a huge value
    if max_len is None or max_len > 4096:
        max_len = 512

    # Reserve space for the special tokens added around every chunk.
    n_special = tokenizer.num_special_tokens_to_add(pair=False)
    body_len = max(1, max_len - n_special)

    token_ids = tokenizer(code, add_special_tokens=False, truncation=False)["input_ids"]

    chunks = [
        token_ids[start : start + body_len]
        for start in range(0, len(token_ids), body_len)
    ]
    if len(chunks) > 1:
        # Multi-chunk input: ignore chunks that contain nothing but whitespace.
        chunks = [
            chunk
            for chunk in chunks
            if tokenizer.decode(chunk, skip_special_tokens=True).strip()
        ]
    elif not chunks:
        # Nothing tokenized: still score the bare special-token sequence.
        chunks = [[]]

    if not chunks:
        return 0.5  # default neutral if everything was empty

    probs = [
        _score_token_ids(model, tokenizer.build_inputs_with_special_tokens(chunk))
        for chunk in chunks
    ]
    return sum(probs) / len(probs)


# --------------------------------------------------------------------------- #
# Public API
# --------------------------------------------------------------------------- #
def phase3b_classify(code: str, language: str) -> dict:
    """
    Score a code snippet with the multilingual CodeT5p stylometer.

    Args:
        code: Source text to score.
        language: Language name; matched case-insensitively against
            NATIVE_LANGUAGES. None is treated as an unknown language.

    Returns:
        dict with keys p_ai (float in [0,1]), confidence
        ("high"|"medium"|"low"), head_used ("multilingual" | "none").
        Empty or whitespace-only code is not scored and yields
        p_ai 0.5, confidence "low", head_used "none".
    """
    if not code or not code.strip():
        return {"p_ai": 0.5, "confidence": "low", "head_used": "none"}

    # A missing language must not crash scoring; it just is not "native".
    lang = (language or "").lower().strip()
    confidence = "high" if lang in NATIVE_LANGUAGES else "medium"

    # Upstream convention is probability-of-human; the pipeline wants p(AI).
    prob_human = _chunk_and_score(code)
    p_ai = 1.0 - prob_human

    return {"p_ai": p_ai, "confidence": confidence, "head_used": "multilingual"}


# --------------------------------------------------------------------------- #
# Smoke test
# --------------------------------------------------------------------------- #
_HUMAN_PYTHON = """
def fib(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a
"""

_AI_PYTHON = '''
def calculate_fibonacci_number(n: int) -> int:
    """
    Calculate the nth Fibonacci number using an iterative approach.
    """
    if n < 0:
        raise ValueError("Input must be a non-negative integer.")
    previous_value, current_value = 0, 1
    for _ in range(n):
        previous_value, current_value = current_value, previous_value + current_value
    return previous_value
'''

_HUMAN_JAVA = """
public class Fib {
    public static int fib(int n) {
        int a = 0, b = 1;
        for (int i = 0; i < n; i++) {
            int t = b;
            b = a + b;
            a = t;
        }
        return a;
    }
}
"""

_AI_JAVA = """
public class FibonacciCalculator {
    /**
     * Calculates the nth Fibonacci number using an iterative approach.
     */
    public static int calculateFibonacci(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("Input must be non-negative.");
        }
        int previousValue = 0;
        int currentValue = 1;
        for (int i = 0; i < n; i++) {
            int temporary = currentValue;
            currentValue = previousValue + currentValue;
            previousValue = temporary;
        }
        return previousValue;
    }
}
"""

_HUMAN_JS = """
function fib(n) {
    let a = 0, b = 1;
    for (let i = 0; i < n; i++) {
        [a, b] = [b, a + b];
    }
    return a;
}
"""

_AI_JS = """
function calculateFibonacciNumber(n) {
    if (n < 0) {
        throw new Error("Input must be a non-negative integer.");
    }
    let previousValue = 0;
    let currentValue = 1;
    for (let i = 0; i < n; i++) {
        const temporary = currentValue;
        currentValue = previousValue + currentValue;
        previousValue = temporary;
    }
    return previousValue;
}
"""

_HUMAN_GO = """
package main

func fib(n int) int {
    a, b := 0, 1
    for i := 0; i < n; i++ {
        a, b = b, a+b
    }
    return a
}
"""

_AI_GO = """
package main

import "errors"

// CalculateFibonacciNumber computes the nth Fibonacci number iteratively.
// It returns an error if n is negative.
func CalculateFibonacciNumber(n int) (int, error) {
    if n < 0 {
        return 0, errors.New("input must be a non-negative integer")
    }
    previousValue, currentValue := 0, 1
    for i := 0; i < n; i++ {
        previousValue, currentValue = currentValue, previousValue+currentValue
    }
    return previousValue, nil
}
"""


if __name__ == "__main__":
    # Manual smoke test: score paired human/AI-style samples and report
    # whether the AI-style ones get a higher average p_ai.
    print(f"Device: {DEVICE}")
    print(f"Models dir: {MODELS_DIR}")
    print()

    samples = [
        ("HUMAN python", _HUMAN_PYTHON, "python"),
        ("AI python", _AI_PYTHON, "python"),
        ("HUMAN java", _HUMAN_JAVA, "java"),
        ("AI java", _AI_JAVA, "java"),
        ("HUMAN js", _HUMAN_JS, "javascript"),
        ("AI js", _AI_JS, "javascript"),
        ("HUMAN go", _HUMAN_GO, "go"),
        ("AI go", _AI_GO, "go"),
    ]

    print(f"{'Sample':20s} {'p_ai':>8s} {'conf':6s} verdict")
    print("-" * 60)

    human_p, ai_p = [], []
    for label, code, lang in samples:
        try:
            result = phase3b_classify(code, lang)
        except FileNotFoundError as e:
            # Missing checkpoint: every further sample would fail the same way.
            print(f"ERROR: {e}")
            break
        p = result["p_ai"]
        kind = "AI " if "AI" in label else "HUM"
        (ai_p if kind == "AI " else human_p).append(p)
        print(f"{label:20s} {p:8.4f} {result['confidence']:6s}")

    if human_p and ai_p:
        avg_h = sum(human_p) / len(human_p)
        avg_a = sum(ai_p) / len(ai_p)
        print("-" * 60)
        print(f"Avg human p_ai: {avg_h:.4f}")
        print(f"Avg AI p_ai: {avg_a:.4f}")
        print(f"Gap (AI - human): {avg_a - avg_h:+.4f}")
        print()
        if avg_a - avg_h > 0.05:
            print("VERDICT: Multilingual head shows real signal. Ship it.")
        elif avg_a - avg_h > -0.05:
            print("VERDICT: Signal weak/zero. Same as Phase 2 alone. Reconsider use.")
        else:
            print("VERDICT: Signal INVERTED. Likely broken or label flip needed.")