"""
Paper Mill Detection via Clustering Analysis
Clusters fraudulent papers to identify organized paper mills.
"""
import re
import warnings
from collections import Counter
from typing import Dict, List, Optional

import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
try:
    import hdbscan  # optional dependency; fit() falls back to DBSCAN without it
    _HAS_HDBSCAN = True
except ImportError:
    _HAS_HDBSCAN = False
warnings.filterwarnings('ignore')

class StylometricFeatures:
    """Extract stylometric features for paper mill detection."""

    @staticmethod
    def extract(text: str) -> Dict[str, float]:
        words = re.findall(r'\b\w+\b', text.lower())
        sentences = [s.strip() for s in re.split(r'[.!?]+', text) if s.strip()]
        if not words:
            return {}
        n_words = len(words)
        n_sentences = len(sentences)
        unique_words = len(set(words))
        # Type-token ratio: low values suggest repetitive, template-like prose.
        ttr = unique_words / n_words if n_words > 0 else 0
        avg_sent_len = n_words / n_sentences if n_sentences > 0 else 0
        word_lengths = [len(w) for w in words]
        avg_word_len = np.mean(word_lengths)
        # Bigram entropy: formulaic, recycled text tends to score low.
        bigrams = [tuple(words[i:i + 2]) for i in range(len(words) - 1)]
        bigram_counts = Counter(bigrams)
        total_bigrams = len(bigrams)
        if total_bigrams > 0:
            probs = np.array(list(bigram_counts.values())) / total_bigrams
            bigram_entropy = -np.sum(probs * np.log2(probs + 1e-10))
        else:
            bigram_entropy = 0
        punct_count = len(re.findall(r'[^\w\s]', text))
        punct_ratio = punct_count / len(text) if len(text) > 0 else 0
        digit_count = len(re.findall(r'\d', text))
        digit_ratio = digit_count / len(text) if len(text) > 0 else 0
        return {
            'ttr': ttr, 'avg_sent_len': avg_sent_len, 'avg_word_len': avg_word_len,
            'bigram_entropy': bigram_entropy, 'punct_ratio': punct_ratio,
            'digit_ratio': digit_ratio, 'n_words': n_words, 'n_sentences': n_sentences
        }
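
# Illustrative usage (sample text and interpretation are assumptions, not from
# this repo):
#
#     feats = StylometricFeatures.extract("We report a novel finding. It is novel.")
#     print(feats['ttr'], feats['bigram_entropy'])
#
# Across a corpus, papers sharing an unusually low type-token ratio and low
# bigram entropy are candidates for template-generated text.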

class VisualHashFeatures:
    """Extract visual features for detecting template-based paper mills."""

    @staticmethod
    def compute_perceptual_hash(image) -> Optional[str]:
        """Compute a 64-bit DCT-based perceptual hash (pHash); None on failure."""
        try:
            import cv2  # lazy import: OpenCV is only needed for image features
            if isinstance(image, str):
                img = cv2.imread(image, cv2.IMREAD_GRAYSCALE)
            else:
                img = np.array(image.convert('L'))
            img = cv2.resize(img, (32, 32))
            dct = cv2.dct(np.float32(img))
            # Keep the low-frequency 8x8 block; threshold each coefficient
            # against the block mean to produce a 64-bit fingerprint.
            dct_low = dct[:8, :8]
            avg = dct_low.mean()
            bits = (dct_low > avg).flatten().astype(int)
            return ''.join(map(str, bits))
        except Exception:
            return None

    @staticmethod
    def hamming_distance(hash1: str, hash2: str) -> int:
        # Treat missing or mismatched hashes as maximally distant (64 bits).
        if hash1 is None or hash2 is None or len(hash1) != len(hash2):
            return 64
        return sum(c1 != c2 for c1, c2 in zip(hash1, hash2))
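
# Illustrative usage (file names and the 10-bit threshold are assumptions;
# roughly 10 of 64 bits is a common starting point for pHash near-duplicate
# checks, and should be tuned on labelled pairs):
#
#     h1 = VisualHashFeatures.compute_perceptual_hash("fig_paper_a.png")
#     h2 = VisualHashFeatures.compute_perceptual_hash("fig_paper_b.png")
#     if h1 and h2 and VisualHashFeatures.hamming_distance(h1, h2) <= 10:
#         print("possible reused or templated figure")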

class MetadataPatternFeatures:
    """Extract metadata patterns indicative of paper mills."""

    @staticmethod
    def extract(papers: List[Dict]) -> np.ndarray:
        features = []
        for paper in papers:
            feat = {}
            # Author/affiliation fields may arrive as ';'-separated strings or
            # as lists; empty values count as zero rather than one.
            authors = paper.get('authors', '')
            if isinstance(authors, str):
                n_authors = len(authors.split(';')) if authors else 0
            else:
                n_authors = len(authors)
            feat['n_authors'] = n_authors
            affiliations = paper.get('affiliations', '')
            if isinstance(affiliations, str):
                n_affiliations = len(affiliations.split(';')) if affiliations else 0
            else:
                n_affiliations = len(affiliations) if affiliations else 0
            feat['n_affiliations'] = n_affiliations
            # Mills often list many authors spread over few affiliations.
            feat['author_affil_ratio'] = n_authors / max(n_affiliations, 1)
            cited_by = paper.get('cited_by_count', 0) or 0
            references = paper.get('reference_count', 0) or 0
            feat['cited_by'] = cited_by
            feat['references'] = references
            feat['cite_ratio'] = cited_by / max(references, 1)
            n_grants = paper.get('n_grants', 0) or 0
            feat['has_grants'] = 1 if n_grants > 0 else 0
            journal = paper.get('journal', '')
            feat['journal_len'] = len(str(journal))
            features.append(list(feat.values()))
        return np.array(features, dtype=np.float32)
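
# Illustrative usage (field values are made up): each paper dict yields one
# row of 8 numeric features, in dict insertion order.
#
#     papers = [
#         {'authors': 'Li; Wang; Chen', 'affiliations': 'Univ A',
#          'cited_by_count': 0, 'reference_count': 25, 'n_grants': 0,
#          'journal': 'Journal X'},
#     ]
#     X = MetadataPatternFeatures.extract(papers)   # shape (1, 8)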

class PaperMillClustering:
    """
    Detects paper mills by clustering fraudulent papers based on
    combined multimodal embeddings + stylometric + metadata features.
    """

    def __init__(self, min_cluster_size: int = 5, min_samples: int = 3, eps: float = 0.5):
        self.min_cluster_size = min_cluster_size
        self.min_samples = min_samples
        self.eps = eps  # only used by the DBSCAN fallback
        self.scaler = StandardScaler()
        self.clusterer = None
        self.labels_ = None

    def fit(self, embeddings: np.ndarray,
            stylometric: Optional[np.ndarray] = None,
            metadata: Optional[np.ndarray] = None):
        # Concatenate whichever feature blocks are provided, then standardize
        # so no single block dominates the distance metric.
        features = [embeddings]
        if stylometric is not None:
            features.append(stylometric)
        if metadata is not None:
            features.append(metadata)
        combined = np.concatenate(features, axis=1)
        combined = self.scaler.fit_transform(combined)
        if _HAS_HDBSCAN:
            try:
                self.clusterer = hdbscan.HDBSCAN(
                    min_cluster_size=self.min_cluster_size,
                    min_samples=self.min_samples, metric='euclidean'
                )
                self.labels_ = self.clusterer.fit_predict(combined)
                return self
            except Exception:
                pass
        # Fall back to DBSCAN when hdbscan is unavailable or fails.
        self.clusterer = DBSCAN(eps=self.eps, min_samples=self.min_samples)
        self.labels_ = self.clusterer.fit_predict(combined)
        return self

    def get_cluster_stats(self) -> Dict:
        if self.labels_ is None:
            return {}
        # Label -1 marks noise points in both HDBSCAN and DBSCAN.
        n_clusters = len(set(self.labels_)) - (1 if -1 in self.labels_ else 0)
        n_noise = list(self.labels_).count(-1)
        cluster_sizes = {}
        for label in set(self.labels_):
            if label != -1:
                cluster_sizes[int(label)] = int(np.sum(self.labels_ == label))
        return {
            'n_clusters': n_clusters, 'n_noise': n_noise,
            'cluster_sizes': cluster_sizes, 'noise_ratio': n_noise / len(self.labels_)
        }

    def get_paper_mill_scores(self) -> np.ndarray:
        if self.labels_ is None:
            return np.array([])
        scores = np.zeros(len(self.labels_))
        for label in set(self.labels_):
            if label == -1:
                continue
            # Score each paper by the relative size of its cluster: papers in
            # larger clusters are more likely to come from an organized mill.
            mask = self.labels_ == label
            cluster_size = np.sum(mask)
            scores[mask] = cluster_size / len(self.labels_)
        return scores
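
# Minimal smoke test on synthetic data (illustrative only: real usage would
# pass document embeddings from an upstream model, not random vectors).
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    # Two tight, mill-like groups plus scattered background papers.
    embeddings = np.vstack([
        rng.normal(0.0, 0.05, size=(10, 16)),
        rng.normal(3.0, 0.05, size=(8, 16)),
        rng.normal(0.0, 2.0, size=(20, 16)),
    ])
    model = PaperMillClustering(min_cluster_size=5, min_samples=3).fit(embeddings)
    print(model.get_cluster_stats())
    print(model.get_paper_mill_scores()[:5])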