"""
Paper Mill Detection via Clustering Analysis

Clusters fraudulent papers to identify organized paper mills.
"""

import logging
import re
import warnings
from collections import Counter
from typing import Dict, List, Optional

import numpy as np

# The clustering back-ends are optional at import time so that the numpy-only
# feature extractors below stay usable without them. PaperMillClustering
# raises a clear ImportError at construction if scikit-learn is missing.
try:
    from sklearn.preprocessing import StandardScaler
    from sklearn.cluster import DBSCAN
except ImportError:
    StandardScaler = None
    DBSCAN = None

# hdbscan is the preferred clusterer; when it is not installed,
# PaperMillClustering.fit goes straight to DBSCAN.
try:
    import hdbscan
except ImportError:
    hdbscan = None

# NOTE(review): this silences every warning process-wide for any importer of
# this module. Kept as-is to preserve existing behaviour; consider narrowing.
warnings.filterwarnings('ignore')

logger = logging.getLogger(__name__)

_WORD_RE = re.compile(r'\b\w+\b')
_SENTENCE_SPLIT_RE = re.compile(r'[.!?]+')
_PUNCT_RE = re.compile(r'[^\w\s]')
_DIGIT_RE = re.compile(r'\d')


class StylometricFeatures:
    """Extract stylometric features for paper mill detection."""

    @staticmethod
    def extract(text: str) -> Dict[str, float]:
        """Compute simple writing-style statistics for ``text``.

        Args:
            text: Raw document text.

        Returns:
            A dict with keys ``ttr`` (unique words / words), ``avg_sent_len``
            (words per sentence), ``avg_word_len``, ``bigram_entropy`` (base-2
            entropy of the word-bigram distribution), ``punct_ratio`` and
            ``digit_ratio`` (per character of ``text``), ``n_words`` and
            ``n_sentences``. Returns an empty dict when ``text`` is empty,
            ``None``, or contains no word tokens.
        """
        if not text:
            return {}

        words = _WORD_RE.findall(text.lower())
        if not words:
            return {}

        sentences = [
            part.strip()
            for part in _SENTENCE_SPLIT_RE.split(text)
            if part.strip()
        ]
        n_words = len(words)
        n_sentences = len(sentences)
        n_chars = len(text)  # > 0 here because at least one word was found

        bigram_counts = Counter(zip(words, words[1:]))
        total_bigrams = n_words - 1
        if total_bigrams > 0:
            probs = np.array(list(bigram_counts.values())) / total_bigrams
            # The small epsilon keeps log2 finite; it matches the historical
            # formula so feature values stay comparable with earlier runs.
            bigram_entropy = float(-np.sum(probs * np.log2(probs + 1e-10)))
        else:
            bigram_entropy = 0.0

        return {
            'ttr': len(set(words)) / n_words,
            'avg_sent_len': n_words / n_sentences if n_sentences > 0 else 0,
            'avg_word_len': float(np.mean([len(w) for w in words])),
            'bigram_entropy': bigram_entropy,
            'punct_ratio': len(_PUNCT_RE.findall(text)) / n_chars,
            'digit_ratio': len(_DIGIT_RE.findall(text)) / n_chars,
            'n_words': n_words,
            'n_sentences': n_sentences,
        }


class VisualHashFeatures:
    """Extract visual features for detecting template-based paper mills."""

    _RESIZE_SIDE = 32  # image is resized to this square before the DCT
    _HASH_SIDE = 8     # side of the low-frequency DCT block that is kept
    _HASH_BITS = _HASH_SIDE * _HASH_SIDE

    @staticmethod
    def compute_perceptual_hash(image) -> Optional[str]:
        """Compute a 64-character '0'/'1' DCT-based hash of an image.

        Args:
            image: Either a file path (``str``) read with OpenCV, or an object
                exposing ``convert('L')`` whose result ``np.array`` accepts
                (presumably a PIL image - verify against callers).

        Returns:
            The bit string, or ``None`` if OpenCV is unavailable, the file
            cannot be read, or any processing step fails (best effort).
        """
        side = VisualHashFeatures._HASH_SIDE
        resize = VisualHashFeatures._RESIZE_SIDE
        try:
            import cv2  # optional dependency, imported lazily

            if isinstance(image, str):
                img = cv2.imread(image, cv2.IMREAD_GRAYSCALE)
                if img is None:
                    # imread signals an unreadable/missing file with None.
                    return None
            else:
                img = np.array(image.convert('L'))

            img = cv2.resize(img, (resize, resize))
            dct = cv2.dct(np.float32(img))
            dct_low = dct[:side, :side]
            bits = (dct_low > dct_low.mean()).flatten().astype(int)
            return ''.join(map(str, bits))
        except Exception:
            # Deliberately best-effort: a bad image must not abort a batch.
            logger.debug("Perceptual hash computation failed", exc_info=True)
            return None

    @staticmethod
    def hamming_distance(hash1: Optional[str], hash2: Optional[str]) -> int:
        """Count differing positions between two hash strings.

        Returns the maximum distance (64) when either hash is ``None`` or the
        lengths differ, so unusable hashes never look similar.
        """
        if hash1 is None or hash2 is None or len(hash1) != len(hash2):
            return VisualHashFeatures._HASH_BITS
        return sum(c1 != c2 for c1, c2 in zip(hash1, hash2))


class MetadataPatternFeatures:
    """Extract metadata patterns indicative of paper mills."""

    # Column order of the matrix returned by ``extract``.
    FEATURE_NAMES = (
        'n_authors',
        'n_affiliations',
        'author_affil_ratio',
        'cited_by',
        'references',
        'cite_ratio',
        'has_grants',
        'journal_len',
    )

    @staticmethod
    def _count_entries(value) -> int:
        """Count entries in a ';'-separated string or a sized collection.

        Missing values (``None``, ``''``, empty collections) count as 0, and
        blank segments such as a trailing ';' are ignored.
        """
        if not value:
            return 0
        if isinstance(value, str):
            return sum(1 for part in value.split(';') if part.strip())
        return len(value)

    @staticmethod
    def extract(papers: List[Dict]) -> np.ndarray:
        """Build a float32 feature matrix from paper metadata dicts.

        Args:
            papers: Dicts read with ``.get`` for the keys ``authors``,
                ``affiliations``, ``cited_by_count``, ``reference_count``,
                ``n_grants`` and ``journal``; missing or ``None`` values are
                treated as empty / zero.

        Returns:
            Array of shape ``(len(papers), 8)`` with columns ordered as in
            ``FEATURE_NAMES`` (shape ``(0, 8)`` for an empty list).
        """
        count = MetadataPatternFeatures._count_entries
        n_features = len(MetadataPatternFeatures.FEATURE_NAMES)

        rows = []
        for paper in papers:
            n_authors = count(paper.get('authors', ''))
            n_affiliations = count(paper.get('affiliations', ''))
            cited_by = paper.get('cited_by_count', 0) or 0
            references = paper.get('reference_count', 0) or 0
            n_grants = paper.get('n_grants', 0) or 0
            journal = paper.get('journal', '') or ''

            rows.append([
                n_authors,
                n_affiliations,
                n_authors / max(n_affiliations, 1),
                cited_by,
                references,
                cited_by / max(references, 1),
                1 if n_grants > 0 else 0,
                len(str(journal)),
            ])

        if not rows:
            # Keep the result 2-D so it can be concatenated column-wise.
            return np.zeros((0, n_features), dtype=np.float32)
        return np.array(rows, dtype=np.float32)


class PaperMillClustering:
    """
    Detects paper mills by clustering fraudulent papers based on
    combined multimodal embeddings + stylometric + metadata features.
    """

    def __init__(self, min_cluster_size: int = 5, min_samples: int = 3,
                 eps: float = 0.5):
        """Store clustering hyper-parameters.

        Args:
            min_cluster_size: Passed to HDBSCAN as ``min_cluster_size``.
            min_samples: Passed to HDBSCAN and to the DBSCAN fallback.
            eps: Passed to the DBSCAN fallback only.

        Raises:
            ImportError: If scikit-learn is not installed.
        """
        if StandardScaler is None or DBSCAN is None:
            raise ImportError(
                "scikit-learn is required to use PaperMillClustering"
            )
        self.min_cluster_size = min_cluster_size
        self.min_samples = min_samples
        self.eps = eps
        self.scaler = StandardScaler()
        self.clusterer = None
        self.labels_ = None

    def fit(self, embeddings: np.ndarray,
            stylometric: Optional[np.ndarray] = None,
            metadata: Optional[np.ndarray] = None) -> "PaperMillClustering":
        """Standardize the concatenated features and cluster them.

        The given arrays are concatenated along axis 1, so each must be 2-D
        with the same number of rows. HDBSCAN is tried first; if it is not
        installed or raises, DBSCAN is used instead.

        Returns:
            ``self``, with ``clusterer`` and ``labels_`` populated.
        """
        blocks = [
            block for block in (embeddings, stylometric, metadata)
            if block is not None
        ]
        combined = self.scaler.fit_transform(np.concatenate(blocks, axis=1))

        clusterer = None
        labels = None
        if hdbscan is not None:
            try:
                clusterer = hdbscan.HDBSCAN(
                    min_cluster_size=self.min_cluster_size,
                    min_samples=self.min_samples,
                    metric='euclidean',
                )
                labels = clusterer.fit_predict(combined)
            except Exception:
                # Best-effort fallback, but record why HDBSCAN was abandoned.
                logger.warning(
                    "HDBSCAN failed; falling back to DBSCAN", exc_info=True
                )
                clusterer = None

        if clusterer is None:
            clusterer = DBSCAN(eps=self.eps, min_samples=self.min_samples)
            labels = clusterer.fit_predict(combined)

        self.clusterer = clusterer
        self.labels_ = labels
        return self

    def get_cluster_stats(self) -> Dict:
        """Summarize the fitted labels.

        Returns:
            ``{}`` before ``fit``; otherwise a dict with ``n_clusters``,
            ``n_noise`` (points labelled -1), ``cluster_sizes`` (label ->
            size, noise excluded) and ``noise_ratio``.
        """
        if self.labels_ is None:
            return {}

        labels = np.asarray(self.labels_).ravel()
        unique, counts = np.unique(labels, return_counts=True)
        cluster_sizes = {
            int(label): int(size)
            for label, size in zip(unique, counts)
            if label != -1
        }
        n_noise = int(np.sum(labels == -1))
        return {
            'n_clusters': len(cluster_sizes),
            'n_noise': n_noise,
            'cluster_sizes': cluster_sizes,
            # Guard the empty case instead of dividing by zero.
            'noise_ratio': n_noise / labels.size if labels.size else 0.0,
        }

    def get_paper_mill_scores(self) -> np.ndarray:
        """Score each sample by the relative size of its cluster.

        Returns:
            Array with one score per label: ``cluster_size / n_samples`` for
            clustered points and 0 for noise (label -1). An empty array is
            returned before ``fit``.
        """
        if self.labels_ is None:
            return np.zeros(0)

        labels = np.asarray(self.labels_).ravel()
        scores = np.zeros(labels.size)
        unique, counts = np.unique(labels, return_counts=True)
        for label, size in zip(unique, counts):
            if label == -1:
                continue
            scores[labels == label] = size / labels.size
        return scores