| """ |
| Paper Mill Detection via Clustering Analysis |
| Clusters fraudulent papers to identify organized paper mills. |
| """ |
|
|
| import numpy as np |
| from sklearn.preprocessing import StandardScaler |
| from sklearn.cluster import DBSCAN |
| import hdbscan |
| from typing import List, Dict, Optional |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
|
|
class StylometricFeatures:
    """Extract stylometric features for paper mill detection."""

    @staticmethod
    def extract(text: str) -> Dict[str, float]:
        """Return a dict of stylometric features for a single text."""
        import re
        from collections import Counter

        words = re.findall(r'\b\w+\b', text.lower())
        sentences = [s.strip() for s in re.split(r'[.!?]+', text) if s.strip()]

        if not words:
            return {}

        # Lexical diversity and length statistics.
        n_words = len(words)
        n_sentences = len(sentences)
        ttr = len(set(words)) / n_words  # type-token ratio
        avg_sent_len = n_words / n_sentences if n_sentences > 0 else 0
        avg_word_len = float(np.mean([len(w) for w in words]))

        # Bigram entropy: low values indicate repetitive, template-like prose.
        bigrams = [tuple(words[i:i + 2]) for i in range(len(words) - 1)]
        bigram_counts = Counter(bigrams)
        total_bigrams = len(bigrams)

        if total_bigrams > 0:
            # Counts are >= 1, so probabilities are strictly positive and
            # log2 is well defined without an epsilon.
            probs = np.array(list(bigram_counts.values())) / total_bigrams
            bigram_entropy = float(-np.sum(probs * np.log2(probs)))
        else:
            bigram_entropy = 0.0

        # Character-level densities; text is non-empty because words is non-empty.
        punct_ratio = len(re.findall(r'[^\w\s]', text)) / len(text)
        digit_ratio = len(re.findall(r'\d', text)) / len(text)

        return {
            'ttr': ttr, 'avg_sent_len': avg_sent_len, 'avg_word_len': avg_word_len,
            'bigram_entropy': bigram_entropy, 'punct_ratio': punct_ratio,
            'digit_ratio': digit_ratio, 'n_words': n_words, 'n_sentences': n_sentences
        }
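
# Illustrative usage (hypothetical abstract text, not from any real paper):
#
#   feats = StylometricFeatures.extract(
#       "We propose a novel method. The method is evaluated on two datasets."
#   )
#   feats['n_sentences']   # -> 2
#   feats['ttr']           # type-token ratio, in (0, 1]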


class VisualHashFeatures:
    """Extract visual features for detecting template-based paper mills."""

    @staticmethod
    def compute_perceptual_hash(image) -> Optional[str]:
        """Compute a 64-bit DCT-based perceptual hash as a bit string.

        Accepts a file path or a PIL Image; returns None on failure.
        """
        try:
            import cv2
            if isinstance(image, str):
                img = cv2.imread(image, cv2.IMREAD_GRAYSCALE)
            else:
                img = np.array(image.convert('L'))  # PIL Image -> grayscale array
            # Downscale, keep the low-frequency 8x8 DCT block, and threshold
            # each coefficient against the block mean to get 64 bits.
            img = cv2.resize(img, (32, 32))
            dct = cv2.dct(np.float32(img))
            dct_low = dct[:8, :8]
            avg = dct_low.mean()
            bits = (dct_low > avg).flatten().astype(int)
            return ''.join(map(str, bits))
        except Exception:
            return None

    @staticmethod
    def hamming_distance(hash1: Optional[str], hash2: Optional[str]) -> int:
        """Bitwise Hamming distance; returns the maximum (64) for invalid input."""
        if hash1 is None or hash2 is None or len(hash1) != len(hash2):
            return 64
        return sum(c1 != c2 for c1, c2 in zip(hash1, hash2))
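
# Illustrative usage (hypothetical file paths; the 10-bit cutoff out of 64 is
# an assumed near-duplicate threshold, not a calibrated value):
#
#   h1 = VisualHashFeatures.compute_perceptual_hash('figure_a.png')
#   h2 = VisualHashFeatures.compute_perceptual_hash('figure_b.png')
#   if VisualHashFeatures.hamming_distance(h1, h2) <= 10:
#       print('possible reused or template figure')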


class MetadataPatternFeatures:
    """Extract metadata patterns indicative of paper mills."""

    @staticmethod
    def extract(papers: List[Dict]) -> np.ndarray:
        """Build a fixed-order numeric feature matrix from paper metadata dicts."""
        features = []
        for paper in papers:
            feat = {}
            # Author and affiliation counts; fields may be ';'-separated
            # strings or lists. An empty string means zero, not one.
            authors = paper.get('authors', '')
            if isinstance(authors, str):
                n_authors = len(authors.split(';')) if authors else 0
            else:
                n_authors = len(authors)
            feat['n_authors'] = n_authors
            affiliations = paper.get('affiliations', '')
            if isinstance(affiliations, str):
                n_affiliations = len(affiliations.split(';')) if affiliations else 0
            else:
                n_affiliations = len(affiliations) if affiliations else 0
            feat['n_affiliations'] = n_affiliations
            feat['author_affil_ratio'] = n_authors / max(n_affiliations, 1)
            # Citation patterns; `or 0` guards against None values.
            cited_by = paper.get('cited_by_count', 0) or 0
            references = paper.get('reference_count', 0) or 0
            feat['cited_by'] = cited_by
            feat['references'] = references
            feat['cite_ratio'] = cited_by / max(references, 1)
            n_grants = paper.get('n_grants', 0) or 0
            feat['has_grants'] = 1 if n_grants > 0 else 0
            journal = paper.get('journal', '')
            feat['journal_len'] = len(str(journal))
            # dict insertion order is preserved, so column order is stable.
            features.append(list(feat.values()))
        return np.array(features, dtype=np.float32)
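
# Illustrative input (keys match those read by extract(); values are made up):
#
#   papers = [{
#       'authors': 'A. Author; B. Author; C. Author',
#       'affiliations': 'Univ X; Univ Y',
#       'cited_by_count': 2,
#       'reference_count': 18,
#       'n_grants': 0,
#       'journal': 'Journal of Examples',
#   }]
#   X_meta = MetadataPatternFeatures.extract(papers)   # shape (1, 8)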


class PaperMillClustering:
    """
    Detects paper mills by clustering fraudulent papers based on
    combined multimodal embeddings + stylometric + metadata features.
    """

    def __init__(self, min_cluster_size: int = 5, min_samples: int = 3, eps: float = 0.5):
        self.min_cluster_size = min_cluster_size
        self.min_samples = min_samples
        self.eps = eps  # only used by the DBSCAN fallback
        self.scaler = StandardScaler()
        self.clusterer = None
        self.labels_ = None

    def fit(self, embeddings: np.ndarray,
            stylometric: Optional[np.ndarray] = None,
            metadata: Optional[np.ndarray] = None):
        """Cluster papers on the concatenated, standardized feature blocks."""
        features = [embeddings]
        if stylometric is not None:
            features.append(stylometric)
        if metadata is not None:
            features.append(metadata)
        combined = np.concatenate(features, axis=1)
        combined = self.scaler.fit_transform(combined)
        if hdbscan is not None:
            try:
                self.clusterer = hdbscan.HDBSCAN(
                    min_cluster_size=self.min_cluster_size,
                    min_samples=self.min_samples, metric='euclidean'
                )
                self.labels_ = self.clusterer.fit_predict(combined)
                return self
            except Exception:
                pass  # fall through to DBSCAN
        # Fallback when hdbscan is unavailable or fails at runtime.
        self.clusterer = DBSCAN(eps=self.eps, min_samples=self.min_samples)
        self.labels_ = self.clusterer.fit_predict(combined)
        return self

    def get_cluster_stats(self) -> Dict:
        """Summarize cluster counts and sizes; label -1 marks noise points."""
        if self.labels_ is None:
            return {}
        n_clusters = len(set(self.labels_)) - (1 if -1 in self.labels_ else 0)
        n_noise = list(self.labels_).count(-1)
        cluster_sizes = {
            int(label): int(np.sum(self.labels_ == label))
            for label in set(self.labels_) if label != -1
        }
        return {
            'n_clusters': n_clusters, 'n_noise': n_noise,
            'cluster_sizes': cluster_sizes, 'noise_ratio': n_noise / len(self.labels_)
        }

    def get_paper_mill_scores(self) -> np.ndarray:
        """Score each paper by the relative size of its cluster (0 for noise)."""
        if self.labels_ is None:
            return np.array([])  # fit() has not been called
        scores = np.zeros(len(self.labels_))
        for label in set(self.labels_):
            if label == -1:
                continue  # noise points keep a score of 0
            mask = self.labels_ == label
            # Larger clusters of near-identical papers are stronger mill evidence.
            scores[mask] = np.sum(mask) / len(self.labels_)
        return scores
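
# ---------------------------------------------------------------------------
# Minimal end-to-end sketch on synthetic data. The random vectors below stand
# in for real multimodal paper embeddings; shapes, cluster layout, and printed
# values are illustrative assumptions, not outputs of any real pipeline.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    rng = np.random.default_rng(0)

    # Two tight synthetic "mill" clusters plus scattered legitimate papers.
    mill_a = rng.normal(loc=0.0, scale=0.05, size=(10, 16))
    mill_b = rng.normal(loc=1.0, scale=0.05, size=(8, 16))
    background = rng.normal(loc=0.5, scale=1.0, size=(30, 16))
    embeddings = np.vstack([mill_a, mill_b, background])

    model = PaperMillClustering(min_cluster_size=5, min_samples=3)
    model.fit(embeddings)

    print(model.get_cluster_stats())           # e.g. {'n_clusters': 2, ...}
    print(model.get_paper_mill_scores()[:10])  # per-paper mill scores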