# Spaces: Sleeping  (page-status residue from the hosting site; not part of the module)
| """ | |
| Phase 3b - CodeT5p multilingual AI code detector. | |
| Uses Salesforce/codet5p-770m encoder + a custom binary classifier head | |
| trained by Gurioli et al. (2024, "Is This You, LLM?", SANER 2025). | |
| Checkpoint from: https://huggingface.co/spaces/isThisYouLLM/Human-Ai | |
| Supports 10 languages: C, C++, C#, Go, Java, JavaScript, Kotlin, Python, | |
| Ruby, Rust. Paper claims 84.1% average accuracy. | |
| Public API: | |
| phase3b_classify(code: str, language: str) -> dict | |
| returns { | |
| "p_ai": float, # probability that code is AI-generated | |
| "confidence": str, # "high" for native langs, "medium" otherwise | |
| "head_used": str, # always "multilingual" | |
| } | |
| Label convention note: | |
| Upstream sigmoid output: high (>=0.07) = Human, low (<0.07) = AI. | |
| We invert to p_ai = 1 - sigmoid_output for pipeline consistency. | |
| NOTE: outputs cluster near 0, so p_ai will cluster near 1. The orchestrator | |
| should NOT use 0.5 as the threshold. Use empirical calibration. | |
| """ | |
| from __future__ import annotations | |
| from pathlib import Path | |
| import torch | |
| import torch.nn as nn | |
| import transformers | |
| from transformers import AutoTokenizer, T5EncoderModel | |
# --------------------------------------------------------------------------- #
# Config
# --------------------------------------------------------------------------- #
# Hugging Face model id of the encoder backbone.
ENCODER_NAME = "Salesforce/codet5p-770m"

# The classifier checkpoint is looked up relative to this file, under models/.
_THIS_DIR = Path(__file__).resolve().parent
MODELS_DIR = _THIS_DIR / "models"
CHECKPOINT = MODELS_DIR / "multilingual_checkpoint.bin"

# Languages the model was trained on (per the paper). Alternate spellings
# ("cpp"/"c++", "csharp"/"c#") are listed so either form matches.
NATIVE_LANGUAGES = {
    "c", "cpp", "c++", "csharp", "c#", "go", "java", "javascript",
    "kotlin", "python", "ruby", "rust",
}

# Use CUDA when torch reports it as available, otherwise fall back to CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# --------------------------------------------------------------------------- #
# Model architecture - mirrors upstream app.py / PkAlvaro test_model.py
# Following PkAlvaro's float32 approach (more compatible on Windows CPU than
# the BFloat16 used in the Streamlit Space).
# --------------------------------------------------------------------------- #
class StylometerClassifier(nn.Module):
    """Encoder backbone followed by a small binary classification head.

    The head is Linear(dimensionality -> 768) -> ReLU -> Dropout(0.2) ->
    Linear(768 -> 1) -> sigmoid, applied to position 0 of the encoder output.

    Args:
        pretrained_encoder: Module called as
            ``encoder(input_ids=..., attention_mask=...)`` whose first output
            element is the hidden-state tensor.
        dimensionality: Size of the encoder's hidden vectors (input width of
            the first linear layer).
    """

    def __init__(self, pretrained_encoder: nn.Module, dimensionality: int):
        super().__init__()
        # Attribute names are also the state-dict key prefixes, so they must
        # stay exactly as they are for the checkpoint to load.
        self.modelBase = pretrained_encoder
        self.pre_classifier = nn.Linear(dimensionality, 768)
        self.activation = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
        self.classifier = nn.Linear(768, 1)

    def forward(
        self, input_ids: torch.Tensor, attention_mask: torch.Tensor
    ) -> torch.Tensor:
        """Return the sigmoid score for each sequence in the batch.

        Args:
            input_ids: Token ids passed through to the encoder.
            attention_mask: Attention mask passed through to the encoder.

        Returns:
            Sigmoid outputs with the trailing size-1 dimension squeezed away
            (one value per batch row).
        """
        outputs = self.modelBase(input_ids=input_ids, attention_mask=attention_mask)
        hidden_state = outputs[0]
        # Position 0 along the sequence axis serves as the pooled summary.
        cls_output = hidden_state[:, 0]
        pooled = self.activation(self.pre_classifier(cls_output))
        # Dropout is only active in training mode; it is a no-op after eval().
        logits = self.classifier(self.dropout(pooled))
        return torch.sigmoid(logits).squeeze(-1)
# --------------------------------------------------------------------------- #
# Lazy loaders
# --------------------------------------------------------------------------- #
# Module-level caches; each is filled the first time its loader is called.
_tokenizer = None
_model = None


def _get_tokenizer():
    """Return the cached tokenizer for ENCODER_NAME, loading it on first use.

    The tokenizer is requested with ``use_fast=False`` (the slow, pure-Python
    implementation) and stored in the module-level ``_tokenizer`` so later
    calls return the same object.
    """
    global _tokenizer
    if _tokenizer is None:
        _tokenizer = AutoTokenizer.from_pretrained(ENCODER_NAME, use_fast=False)
    return _tokenizer
def _get_model():
    """Return the cached classifier, building and loading it on first use.

    Builds a T5 encoder from ENCODER_NAME, wraps it in StylometerClassifier,
    loads the weights stored at CHECKPOINT, moves the model to DEVICE and
    switches it to eval mode. The result is stored in the module-level
    ``_model`` so later calls return the same object.

    Raises:
        FileNotFoundError: If CHECKPOINT does not exist on disk.
    """
    global _model
    if _model is not None:
        return _model

    if not CHECKPOINT.exists():
        # The upstream file is called checkpoint.bin, but this module looks for
        # CHECKPOINT's own file name - say so, or the hint leads to the same error.
        raise FileNotFoundError(
            f"Checkpoint not found: {CHECKPOINT}\n"
            "Download from:\n"
            " https://huggingface.co/spaces/isThisYouLLM/Human-Ai/resolve/main/checkpoint.bin\n"
            "and save it at the path above (the expected file name differs "
            "from the downloaded one)."
        )

    # Encoder only (T5 is encoder-decoder; we discard the decoder)
    transformers.T5EncoderModel._keys_to_ignore_on_load_unexpected = ["decoder.*"]
    encoder = T5EncoderModel.from_pretrained(ENCODER_NAME)
    model = StylometerClassifier(encoder, dimensionality=encoder.shared.embedding_dim)

    # NOTE(security): torch.load unpickles the file, which can execute arbitrary
    # code. Only load checkpoints from a source you trust; consider passing
    # weights_only=True if the installed torch version supports it.
    state_dict = torch.load(CHECKPOINT, map_location=DEVICE)
    model.load_state_dict(state_dict)
    model.to(DEVICE)
    model.eval()

    _model = model
    return _model
# --------------------------------------------------------------------------- #
# Chunking - handles code longer than the model's max_length
# --------------------------------------------------------------------------- #
def _score_token_ids(model, token_ids) -> float:
    """Run one already-tokenized sequence through the model; return its raw output."""
    input_ids = torch.tensor([token_ids], dtype=torch.long, device=DEVICE)
    # A single unpadded sequence: every position is a real token.
    attention_mask = torch.ones_like(input_ids)
    with torch.no_grad():
        prob = model(input_ids, attention_mask)
    return prob.cpu().item()


def _chunk_and_score(code: str) -> float:
    """
    Tokenize once, split into model-sized chunks, score each, average.

    The text is tokenized without special tokens and windowed so that each
    chunk *plus* its special tokens fits in max_length. Model inputs are built
    directly from the token ids, so no token is dropped at a chunk boundary
    and the text is never decoded and re-encoded.

    Returns the AVERAGED raw sigmoid output (probability of Human per upstream),
    or 0.5 if every chunk of a multi-chunk input decodes to whitespace only.
    """
    tokenizer = _get_tokenizer()
    model = _get_model()

    max_len = tokenizer.model_max_length
    # Safety cap in case the tokenizer reports a huge value
    if max_len is None or max_len > 4096:
        max_len = 512

    body_ids = tokenizer(code, add_special_tokens=False, truncation=False)["input_ids"]
    # Leave room for the special tokens added around every chunk.
    window = max(1, max_len - tokenizer.num_special_tokens_to_add(pair=False))

    if len(body_ids) <= window:
        # Single-chunk fast path
        return _score_token_ids(
            model, tokenizer.build_inputs_with_special_tokens(body_ids)
        )

    # Multi-chunk: score each window separately, then take the plain mean.
    probs = []
    for start in range(0, len(body_ids), window):
        piece = body_ids[start : start + window]
        # Skip windows that contain nothing but whitespace.
        if not tokenizer.decode(piece, skip_special_tokens=True).strip():
            continue
        probs.append(
            _score_token_ids(model, tokenizer.build_inputs_with_special_tokens(piece))
        )

    if not probs:
        return 0.5  # default neutral if everything was empty
    return sum(probs) / len(probs)
# --------------------------------------------------------------------------- #
# Public API
# --------------------------------------------------------------------------- #
def phase3b_classify(code: str, language: str) -> dict:
    """
    Score a code snippet with the multilingual CodeT5p stylometer.

    Args:
        code: Source text to score. Empty, whitespace-only or None input is
            not sent to the model.
        language: Language name; matched case-insensitively (after stripping
            whitespace) against NATIVE_LANGUAGES. None is treated as an
            unknown language.

    Returns:
        dict with keys
            p_ai (float): 1 - averaged upstream sigmoid output; 0.5 for
                empty input.
            confidence (str): "high" if the language is in NATIVE_LANGUAGES,
                "medium" otherwise, "low" for empty input.
            head_used (str): "multilingual", or "none" for empty input.
    """
    if not code or not code.strip():
        return {"p_ai": 0.5, "confidence": "low", "head_used": "none"}

    # A missing language must not crash scoring; it just isn't a native one.
    lang = (language or "").lower().strip()
    confidence = "high" if lang in NATIVE_LANGUAGES else "medium"

    # Upstream scores "human-ness"; flip it so higher means more likely AI.
    prob_human = _chunk_and_score(code)
    p_ai = 1.0 - prob_human
    return {"p_ai": p_ai, "confidence": confidence, "head_used": "multilingual"}
# --------------------------------------------------------------------------- #
# Smoke test
# --------------------------------------------------------------------------- #
# Paired samples per language: a terse "human-style" snippet and a verbose,
# documented "AI-style" snippet computing the same Fibonacci function.
# NOTE(review): indentation inside these literals is conventional 4-space
# layout; confirm against upstream if the exact bytes matter.
_HUMAN_PYTHON = """
def fib(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a
"""

_AI_PYTHON = '''
def calculate_fibonacci_number(n: int) -> int:
    """
    Calculate the nth Fibonacci number using an iterative approach.
    """
    if n < 0:
        raise ValueError("Input must be a non-negative integer.")
    previous_value, current_value = 0, 1
    for _ in range(n):
        previous_value, current_value = current_value, previous_value + current_value
    return previous_value
'''

_HUMAN_JAVA = """
public class Fib {
    public static int fib(int n) {
        int a = 0, b = 1;
        for (int i = 0; i < n; i++) {
            int t = b;
            b = a + b;
            a = t;
        }
        return a;
    }
}
"""

_AI_JAVA = """
public class FibonacciCalculator {
    /**
     * Calculates the nth Fibonacci number using an iterative approach.
     */
    public static int calculateFibonacci(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("Input must be non-negative.");
        }
        int previousValue = 0;
        int currentValue = 1;
        for (int i = 0; i < n; i++) {
            int temporary = currentValue;
            currentValue = previousValue + currentValue;
            previousValue = temporary;
        }
        return previousValue;
    }
}
"""

_HUMAN_JS = """
function fib(n) {
    let a = 0, b = 1;
    for (let i = 0; i < n; i++) {
        [a, b] = [b, a + b];
    }
    return a;
}
"""

_AI_JS = """
function calculateFibonacciNumber(n) {
    if (n < 0) {
        throw new Error("Input must be a non-negative integer.");
    }
    let previousValue = 0;
    let currentValue = 1;
    for (let i = 0; i < n; i++) {
        const temporary = currentValue;
        currentValue = previousValue + currentValue;
        previousValue = temporary;
    }
    return previousValue;
}
"""

_HUMAN_GO = """
package main
func fib(n int) int {
    a, b := 0, 1
    for i := 0; i < n; i++ {
        a, b = b, a+b
    }
    return a
}
"""

_AI_GO = """
package main
import "errors"
// CalculateFibonacciNumber computes the nth Fibonacci number iteratively.
// It returns an error if n is negative.
func CalculateFibonacciNumber(n int) (int, error) {
    if n < 0 {
        return 0, errors.New("input must be a non-negative integer")
    }
    previousValue, currentValue := 0, 1
    for i := 0; i < n; i++ {
        previousValue, currentValue = currentValue, previousValue+currentValue
    }
    return previousValue, nil
}
"""
if __name__ == "__main__":
    # Manual smoke test: score the paired samples and report whether the
    # AI-style snippets receive a higher average p_ai than the human-style ones.
    print(f"Device: {DEVICE}")
    print(f"Models dir: {MODELS_DIR}")
    print()

    samples = [
        ("HUMAN python", _HUMAN_PYTHON, "python"),
        ("AI python", _AI_PYTHON, "python"),
        ("HUMAN java", _HUMAN_JAVA, "java"),
        ("AI java", _AI_JAVA, "java"),
        ("HUMAN js", _HUMAN_JS, "javascript"),
        ("AI js", _AI_JS, "javascript"),
        ("HUMAN go", _HUMAN_GO, "go"),
        ("AI go", _AI_GO, "go"),
    ]

    print(f"{'Sample':20s} {'p_ai':>8s} {'conf':6s} verdict")
    print("-" * 60)

    human_p, ai_p = [], []
    for label, code, lang in samples:
        # Only the classify call can hit the missing-checkpoint error.
        try:
            result = phase3b_classify(code, lang)
        except FileNotFoundError as e:
            print(f"ERROR: {e}")
            break
        p = result["p_ai"]
        # Labels are prefixed with their expected class ("AI ..." / "HUMAN ...").
        (ai_p if label.startswith("AI") else human_p).append(p)
        print(f"{label:20s} {p:8.4f} {result['confidence']:6s}")

    # Summarize only if at least one sample of each class was scored.
    if human_p and ai_p:
        avg_h = sum(human_p) / len(human_p)
        avg_a = sum(ai_p) / len(ai_p)
        print("-" * 60)
        print(f"Avg human p_ai: {avg_h:.4f}")
        print(f"Avg AI p_ai: {avg_a:.4f}")
        print(f"Gap (AI - human): {avg_a - avg_h:+.4f}")
        print()
        if avg_a - avg_h > 0.05:
            print("VERDICT: Multilingual head shows real signal. Ship it.")
        elif avg_a - avg_h > -0.05:
            print("VERDICT: Signal weak/zero. Same as Phase 2 alone. Reconsider use.")
        else:
            print("VERDICT: Signal INVERTED. Likely broken or label flip needed.")