"""
Advanced Content Moderation System for Helion-V2.

Provides production-grade content filtering and safety checks.
"""
|
|
import json
import logging
import re
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Set, Tuple
|
|
|
|
| |
# Configure root logging at import time so warnings emitted by this module
# (e.g. an unreadable config file) are visible when it is run as a script.
# NOTE(review): basicConfig is a no-op if the host application has already
# configured the root logger, so embedding applications keep their own setup.
logging.basicConfig(level=logging.INFO)

# Module-level logger used by ContentFilter for warnings and export notices.
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
class ModerationResult:
    """Detailed moderation result.

    Plain data record produced by ``ContentFilter.moderate_content``. All
    fields are JSON-serialisable so the record can be passed to
    ``dataclasses.asdict`` and dumped directly.
    """

    # ISO-8601 timestamp string recording when the moderation was performed.
    timestamp: str
    # True when the content may be published as-is.
    is_approved: bool
    # One of "low", "medium", "high", "critical".
    risk_level: str
    # Human-readable description of every rule that fired.
    violations: List[str]
    # Per-check confidence, keyed by check name (e.g. "pii", "toxicity").
    confidence_scores: Dict[str, float]
    # One of "approve", "flag", "review", "block".
    recommended_action: str
    # Redacted copy of the content; only set when PII was detected.
    sanitized_content: Optional[str] = None
    # Free-form extra information (text length, word count, context, ...).
    metadata: Optional[Dict[str, Any]] = None
|
|
|
|
class ContentFilter:
    """Multi-layer content filtering system.

    Runs a set of independent keyword/regex checks (profanity, toxicity,
    bias, PII, spam, dangerous content, repetition) and combines them into
    a single ``ModerationResult``.

    The ``strictness_level`` and ``allowed_code_patterns`` configuration keys
    are accepted and stored but are not consulted by any check in this class.
    """

    # Risk levels ordered from least to most severe. The overall risk level
    # of a text only ever moves rightwards in this tuple.
    _RISK_LEVELS = ("low", "medium", "high", "critical")

    # Action recommended for each final risk level.
    _RECOMMENDED_ACTIONS = {
        "critical": "block",
        "high": "review",
        "medium": "flag",
        "low": "approve",
    }

    # Single source of truth for PII detection *and* redaction, as
    # (result key, compiled pattern, redaction placeholder). Order matters:
    # it is both the key order of the check_pii() result and the order in
    # which redactions are applied.
    _PII_PATTERNS = (
        ("ssn", re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), '[SSN-REDACTED]'),
        (
            "credit_card",
            re.compile(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'),
            '[CC-REDACTED]',
        ),
        (
            "email",
            # TLD class is [A-Za-z]; a "|" inside a character class is a
            # literal pipe, not alternation.
            re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
            '[EMAIL-REDACTED]',
        ),
        (
            "phone",
            re.compile(r'\b(?:\+?1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'),
            '[PHONE-REDACTED]',
        ),
        (
            "address",
            re.compile(
                r'\b\d+\s+[A-Za-z]+\s+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd)\b',
                re.IGNORECASE,
            ),
            '[ADDRESS-REDACTED]',
        ),
    )

    # Aggressive-language patterns, matched against lower-cased text.
    # Each match adds 0.2 to the toxicity score.
    _AGGRESSIVE_PATTERNS = (
        re.compile(r'\b(hate|despise|loathe)\s+you\b'),
        # Accepts "you are", "you're", "you’re", "youre" and "you re",
        # optionally followed by an article ("you are a moron").
        re.compile(r"\byou(?:\s+are|\s*['’]?re)\s+(?:an?\s+)?(stupid|dumb|idiot|moron)"),
        re.compile(r'\bshut\s+up\b'),
        re.compile(r'\bgo\s+to\s+hell\b'),
    )

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize content filter with optional custom configuration.

        Args:
            config_path: Path to custom filter configuration JSON
        """
        self.config = self._load_config(config_path)
        self._initialize_filters()

    def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]:
        """Load filter configuration.

        Starts from the built-in defaults and overlays the keys found in the
        JSON object stored at ``config_path``. Loading is best-effort: any
        problem with the file is logged as a warning and the defaults are
        returned unchanged.

        Args:
            config_path: Path to a JSON file, or None/empty for defaults only

        Returns:
            The effective configuration dictionary
        """
        config: Dict[str, Any] = {
            "enable_profanity_filter": True,
            "enable_toxicity_detection": True,
            "enable_bias_detection": True,
            "enable_pii_detection": True,
            "enable_spam_detection": True,
            "strictness_level": "medium",
            "blocked_domains": ["example-spam.com"],
            "allowed_code_patterns": True,
            "max_repetition_ratio": 0.3,
        }

        if not config_path:
            return config

        try:
            # JSON is UTF-8 by specification; do not depend on the locale.
            with open(config_path, 'r', encoding='utf-8') as f:
                custom_config = json.load(f)
        except (OSError, ValueError, TypeError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning(f"Could not load config from {config_path}: {e}")
            return config

        if isinstance(custom_config, dict):
            config.update(custom_config)
        else:
            # A list/scalar at the top level cannot be merged key-by-key.
            logger.warning(
                f"Could not load config from {config_path}: "
                f"expected a JSON object, got {type(custom_config).__name__}"
            )

        return config

    def _initialize_filters(self):
        """Initialize all filter components (word lists and regex sources)."""
        self.profanity_list = self._load_profanity_list()

        # Exact substrings (lower-case) that mark a text as toxic.
        self.toxic_phrases = [
            "you should kill yourself",
            "i hope you die",
            "you deserve to suffer",
            "stupid idiot moron",
            "worthless piece of",
        ]

        # Substrings (lower-case) that suggest sweeping generalisations.
        # NOTE(review): the "[race]" / "[religion]" entries are literal
        # templates; check_bias() does plain substring matching, so they
        # only match text containing the bracketed placeholder itself.
        # TODO: expand them against real term lists.
        self.bias_indicators = {
            "gender": ["all women are", "all men are", "females are", "males are"],
            "race": ["all [race] are", "typical [race]", "[race] people always"],
            "religion": ["all [religion] are", "[religion] believers are"],
            "age": ["all old people", "millennials are all", "boomers are"],
        }

        # Regex sources; check_spam() reports the source string of each hit.
        self.spam_patterns = [
            r'(?i)(buy now|click here|limited time|act now).{0,50}(http|www)',
            r'(?i)(viagra|cialis|lottery|prince|inheritance)',
            r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+',
        ]

        self.dangerous_instructions = [
            # The optional article lets "how to make a bomb" match, not only
            # the article-less "how to make bomb".
            r'(?i)how\s+to\s+(make|build|create|construct)\s+(?:(?:a|an|the|some)\s+)?'
            r'(bomb|explosive|poison|weapon)',
            r'(?i)instructions?\s+(for|to)\s+(kill|murder|harm|torture)',
            r'(?i)(recipe|guide|tutorial)\s+for\s+(meth|cocaine|heroin)',
            r'(?i)how\s+to\s+(hack|crack|break\s+into|bypass)',
        ]

        self.medical_misinfo = [
            r'(?i)(cancer|covid|hiv).+(cure|treat|prevent).+(bleach|hydrogen\s+peroxide|vitamin\s+c)',
            r'(?i)vaccines?\s+(cause|lead\s+to|result\s+in)\s+(autism|death|infertility)',
            r'(?i)essential\s+oils?\s+(cure|treat)\s+(cancer|diabetes|heart\s+disease)',
        ]

    def _load_profanity_list(self) -> Set[str]:
        """Load profanity word list (lower-case whole words)."""
        return {
            'fuck', 'shit', 'bitch', 'asshole', 'bastard', 'damn',
            'cunt', 'piss', 'cock', 'dick', 'pussy', 'slut', 'whore',
        }

    @classmethod
    def _escalate(cls, current: str, candidate: str) -> str:
        """Return the more severe of two risk levels (never downgrades)."""
        levels = cls._RISK_LEVELS
        if levels.index(candidate) > levels.index(current):
            return candidate
        return current

    def check_profanity(self, text: str) -> Tuple[bool, List[str]]:
        """
        Check for profanity in text.

        Matching is case-insensitive and whole-word only, so a listed word
        embedded in a longer word is not reported.

        Args:
            text: Text to check

        Returns:
            Tuple of (has_profanity, list of found words), one list entry
            per occurrence, in order of appearance
        """
        if not self.config["enable_profanity_filter"]:
            return False, []

        words = re.findall(r'\b\w+\b', text.lower())
        found_profanity = [word for word in words if word in self.profanity_list]

        return bool(found_profanity), found_profanity

    def check_toxicity(self, text: str) -> Tuple[bool, float, List[str]]:
        """
        Check for toxic content.

        Each phrase from ``self.toxic_phrases`` found in the text adds 0.3 to
        the score and each aggressive-language pattern adds 0.2. The text is
        toxic when the score exceeds 0.5 *or* any listed toxic phrase is
        present: an explicit phrase such as "i hope you die" is sufficient
        on its own and must not be approved merely because it appears once.

        Args:
            text: Text to check

        Returns:
            Tuple of (is_toxic, toxicity_score capped at 1.0, matched_phrases)
        """
        if not self.config["enable_toxicity_detection"]:
            return False, 0.0, []

        text_lower = text.lower()
        matched_phrases = [
            phrase for phrase in self.toxic_phrases if phrase in text_lower
        ]

        toxicity_score = 0.0
        for _ in matched_phrases:
            toxicity_score += 0.3
        for pattern in self._AGGRESSIVE_PATTERNS:
            if pattern.search(text_lower):
                toxicity_score += 0.2

        is_toxic = bool(matched_phrases) or toxicity_score > 0.5
        return is_toxic, min(toxicity_score, 1.0), matched_phrases

    def check_bias(self, text: str) -> Tuple[bool, Dict[str, List[str]]]:
        """
        Check for biased language.

        Performs case-insensitive substring matching against
        ``self.bias_indicators``.

        Args:
            text: Text to check

        Returns:
            Tuple of (has_bias, dictionary of bias types and matched phrases)
        """
        if not self.config["enable_bias_detection"]:
            return False, {}

        text_lower = text.lower()
        bias_found: Dict[str, List[str]] = {}

        for bias_type, indicators in self.bias_indicators.items():
            matches = [ind for ind in indicators if ind in text_lower]
            if matches:
                bias_found[bias_type] = matches

        return bool(bias_found), bias_found

    def check_pii(self, text: str) -> Tuple[bool, Dict[str, List[str]]]:
        """
        Check for personally identifiable information.

        Looks for SSNs, 16-digit card numbers, e-mail addresses, US-style
        phone numbers and simple street addresses, using the same patterns
        that ``_sanitize_pii`` redacts.

        Args:
            text: Text to check

        Returns:
            Tuple of (has_pii, dictionary of PII types found), keyed by
            "ssn", "credit_card", "email", "phone" and "address"
        """
        if not self.config["enable_pii_detection"]:
            return False, {}

        pii_found: Dict[str, List[str]] = {}
        for pii_type, pattern, _placeholder in self._PII_PATTERNS:
            matches = pattern.findall(text)
            if matches:
                pii_found[pii_type] = matches

        return bool(pii_found), pii_found

    def check_spam(self, text: str) -> Tuple[bool, List[str]]:
        """
        Check for spam content.

        Args:
            text: Text to check

        Returns:
            Tuple of (is_spam, list of matched patterns). Regex hits are
            reported by their pattern source; blocked domains are reported
            as "Blocked domain: <domain>" using the configured spelling.
        """
        if not self.config["enable_spam_detection"]:
            return False, []

        matched_patterns = [
            pattern for pattern in self.spam_patterns if re.search(pattern, text)
        ]

        text_lower = text.lower()
        for domain in self.config["blocked_domains"]:
            # Skip empty entries: "" is a substring of every text and would
            # flag everything as spam.
            if not domain:
                continue
            # Compare case-insensitively on both sides so a configured
            # "Example.COM" still blocks "example.com".
            if str(domain).lower() in text_lower:
                matched_patterns.append(f"Blocked domain: {domain}")

        return bool(matched_patterns), matched_patterns

    def check_dangerous_content(self, text: str) -> Tuple[bool, List[str]]:
        """
        Check for dangerous instructions or content.

        This check has no configuration switch; it always runs.

        Args:
            text: Text to check

        Returns:
            Tuple of (is_dangerous, list of matched categories), where the
            categories are "dangerous_instructions" and/or
            "medical_misinformation" (each reported at most once)
        """
        text_lower = text.lower()
        dangerous_categories = []

        if any(re.search(p, text_lower) for p in self.dangerous_instructions):
            dangerous_categories.append("dangerous_instructions")

        if any(re.search(p, text_lower) for p in self.medical_misinfo):
            dangerous_categories.append("medical_misinformation")

        return bool(dangerous_categories), dangerous_categories

    def check_repetition(self, text: str) -> Tuple[bool, float]:
        """
        Check for excessive repetition (potential spam or model failure).

        The ratio is ``1 - unique_words / total_words`` over
        whitespace-separated tokens. Texts shorter than 10 words are never
        considered repetitive.

        Args:
            text: Text to check

        Returns:
            Tuple of (is_repetitive, repetition_ratio)
        """
        words = text.split()
        total_words = len(words)
        if total_words < 10:
            return False, 0.0

        repetition_ratio = 1.0 - (len(set(words)) / total_words)
        is_repetitive = repetition_ratio > self.config["max_repetition_ratio"]
        return is_repetitive, repetition_ratio

    def moderate_content(self, text: str, context: str = "general") -> "ModerationResult":
        """
        Perform comprehensive content moderation.

        Runs every check, collects a violation message and confidence score
        per finding, and raises the overall risk level to the most severe
        level any finding calls for:

        * profanity, bias, spam -> at least "medium"
        * toxicity, PII         -> at least "high"
        * dangerous content     -> "critical"
        * repetition            -> recorded as a violation, risk unchanged

        Args:
            text: Text to moderate
            context: Context of the content (general, chat, code, etc.);
                recorded in the result metadata only

        Returns:
            ModerationResult with detailed analysis
        """
        violations: List[str] = []
        confidence_scores: Dict[str, float] = {}
        risk_level = "low"

        has_profanity, profanity_words = self.check_profanity(text)
        if has_profanity:
            violations.append(f"Profanity detected: {len(profanity_words)} words")
            confidence_scores["profanity"] = 0.9
            risk_level = self._escalate(risk_level, "medium")

        is_toxic, toxicity_score, _toxic_phrases = self.check_toxicity(text)
        if is_toxic:
            violations.append(f"Toxic content detected (score: {toxicity_score:.2f})")
            confidence_scores["toxicity"] = toxicity_score
            risk_level = self._escalate(risk_level, "high")

        has_bias, bias_types = self.check_bias(text)
        if has_bias:
            violations.append(f"Potential bias detected: {', '.join(bias_types.keys())}")
            confidence_scores["bias"] = 0.7
            risk_level = self._escalate(risk_level, "medium")

        has_pii, pii_types = self.check_pii(text)
        if has_pii:
            violations.append(f"PII detected: {', '.join(pii_types.keys())}")
            confidence_scores["pii"] = 1.0
            risk_level = self._escalate(risk_level, "high")

        is_spam, spam_patterns = self.check_spam(text)
        if is_spam:
            violations.append(f"Spam indicators: {len(spam_patterns)}")
            confidence_scores["spam"] = 0.8
            risk_level = self._escalate(risk_level, "medium")

        is_dangerous, dangerous_categories = self.check_dangerous_content(text)
        if is_dangerous:
            violations.append(f"Dangerous content: {', '.join(dangerous_categories)}")
            confidence_scores["dangerous"] = 0.95
            risk_level = self._escalate(risk_level, "critical")

        is_repetitive, repetition_ratio = self.check_repetition(text)
        if is_repetitive:
            # Repetition alone is a soft signal: it is reported but does not
            # raise the risk level, so such content stays approved.
            violations.append(f"Excessive repetition ({repetition_ratio:.2%})")
            confidence_scores["repetition"] = repetition_ratio

        # Approved when nothing fired, or when only soft (risk-neutral)
        # findings fired. Dangerous content always forces "critical", so it
        # can never reach this point with a "low" risk level.
        is_approved = not violations or risk_level == "low"
        recommended_action = self._RECOMMENDED_ACTIONS[risk_level]

        sanitized_content = self._sanitize_pii(text) if has_pii else None

        return ModerationResult(
            timestamp=datetime.now().isoformat(),
            is_approved=is_approved,
            risk_level=risk_level,
            violations=violations,
            confidence_scores=confidence_scores,
            recommended_action=recommended_action,
            sanitized_content=sanitized_content,
            metadata={
                "text_length": len(text),
                "word_count": len(text.split()),
                "context": context,
            },
        )

    def _sanitize_pii(self, text: str) -> str:
        """Sanitize text by redacting every PII type that check_pii detects.

        Replaces SSNs, card numbers, e-mail addresses, phone numbers and
        street addresses (in that order) with bracketed placeholders.

        Args:
            text: Text to sanitize

        Returns:
            Copy of the text with each PII match replaced by its placeholder
        """
        sanitized = text
        for _pii_type, pattern, placeholder in self._PII_PATTERNS:
            sanitized = pattern.sub(placeholder, sanitized)
        return sanitized

    def batch_moderate(self, texts: List[str]) -> List["ModerationResult"]:
        """
        Moderate multiple texts in batch.

        Args:
            texts: List of texts to moderate

        Returns:
            List of ModerationResults, in the same order as ``texts``
        """
        return [self.moderate_content(text) for text in texts]

    def export_results(self, results: List["ModerationResult"], filepath: str):
        """
        Export moderation results to JSON file.

        The file is overwritten if it already exists.

        Args:
            results: List of ModerationResults
            filepath: Output file path
        """
        payload = [asdict(r) for r in results]
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2)

        logger.info("Exported %d moderation results to %s", len(results), filepath)
|
|
|
|
| |
def _run_demo() -> None:
    """Moderate a few sample texts, print the verdicts and export them.

    Writes ``moderation_results.json`` into the current working directory.
    """

    def preview(value: str, limit: int) -> str:
        # Only add an ellipsis when something was actually cut off.
        return value if len(value) <= limit else f"{value[:limit]}..."

    filter_system = ContentFilter()

    test_texts = [
        "What is the capital of France?",
        "You are a stupid idiot!",
        "My SSN is 123-45-6789",
        "Buy now! Limited time offer! www.spam.com",
        "How to make a bomb at home",
    ]

    # Moderate once and reuse the results for both the report and the export.
    results = filter_system.batch_moderate(test_texts)

    print("Content Moderation Results:\n")
    print("=" * 80)

    for i, (text, result) in enumerate(zip(test_texts, results), 1):
        print(f"\nTest {i}: {preview(text, 50)}")
        print(f"Approved: {result.is_approved}")
        print(f"Risk Level: {result.risk_level}")
        print(f"Violations: {result.violations}")
        print(f"Recommended Action: {result.recommended_action}")
        if result.sanitized_content:
            print(f"Sanitized: {preview(result.sanitized_content, 100)}")
        print("-" * 80)

    filter_system.export_results(results, "moderation_results.json")
    print(f"\n✓ Exported {len(results)} results to moderation_results.json")


if __name__ == "__main__":
    _run_demo()