""" Simplified simulation that runs without Ray (compatible with Celery) Uses Gemini directly to simulate agent reactions """ import os import logging import random from typing import Dict, Any, List, Optional from pathlib import Path # Load .env file from project root from dotenv import load_dotenv project_root = Path(__file__).parent.parent load_dotenv(project_root / ".env") from google import genai logger = logging.getLogger(__name__) def get_gemini_client(): """Get Gemini client""" api_key = os.getenv("GEMINI_API_KEY") if not api_key: # Try loading from parent .env again load_dotenv(Path(__file__).parent.parent / ".env") api_key = os.getenv("GEMINI_API_KEY") if not api_key: raise ValueError("GEMINI_API_KEY not set in .env file") return genai.Client(api_key=api_key) def generate_agent_profiles(num_agents: int, demographic_filter: Optional[Dict] = None) -> List[Dict]: """Generate diverse agent profiles""" ages = list(range(18, 70)) genders = ["Male", "Female", "Non-binary"] locations = ["Urban", "Suburban", "Rural"] education_levels = ["High School", "Bachelor's", "Master's", "PhD"] income_levels = ["Low", "Middle", "Upper-Middle", "High"] values = [ "Family-oriented", "Career-focused", "Environmentalist", "Traditional", "Progressive", "Religious", "Secular", "Health-conscious", "Tech-savvy", "Minimalist" ] profiles = [] for i in range(num_agents): profile = { "agent_id": f"agent_{i+1:04d}", "age": random.choice(ages), "gender": random.choice(genders), "location": random.choice(locations), "education": random.choice(education_levels), "income": random.choice(income_levels), "values": random.sample(values, k=random.randint(2, 4)) } # Apply demographic filter if provided if demographic_filter: if "age_min" in demographic_filter: profile["age"] = max(profile["age"], demographic_filter["age_min"]) if "age_max" in demographic_filter: profile["age"] = min(profile["age"], demographic_filter["age_max"]) if "gender" in demographic_filter: profile["gender"] = demographic_filter["gender"] profiles.append(profile) return profiles def simulate_agent_reaction(client, ad_content: str, profile: Dict) -> Dict: """Simulate a single agent's reaction using Gemini""" prompt = f"""You are simulating how a person would react to an advertisement. PERSON PROFILE: - Age: {profile['age']} - Gender: {profile['gender']} - Location: {profile['location']} area - Education: {profile['education']} - Income Level: {profile['income']} - Core Values: {', '.join(profile['values'])} ADVERTISEMENT CONTENT: {ad_content[:2000]} # Limit context size Based on this person's profile, determine their reaction to this advertisement. Respond with EXACTLY this JSON format (no markdown, no extra text): {{"opinion": "POSITIVE|NEUTRAL|NEGATIVE", "reasoning": "brief 1-2 sentence explanation", "would_share": true|false, "controversy_flag": true|false}}""" try: response = client.models.generate_content( model="gemini-3-flash-preview", contents=prompt, config={ "max_output_tokens": 150, "temperature": 0.7 } ) text = response.text.strip() # Parse JSON response import json # Clean up the response if text.startswith("```"): text = text.split("```")[1] if text.startswith("json"): text = text[4:] text = text.strip() result = json.loads(text) return { "agent_id": profile["agent_id"], "profile": profile, "opinion": result.get("opinion", "NEUTRAL"), "reasoning": result.get("reasoning", ""), "would_share": result.get("would_share", False), "controversy_flag": result.get("controversy_flag", False) } except Exception as e: logger.warning(f"Failed to simulate agent {profile['agent_id']}: {e}") # Return a neutral default return { "agent_id": profile["agent_id"], "profile": profile, "opinion": random.choice(["POSITIVE", "NEUTRAL", "NEGATIVE"]), "reasoning": "Simulation fallback", "would_share": random.random() > 0.7, "controversy_flag": False } def detect_controversies(reactions: List[Dict]) -> List[Dict]: """Detect controversial patterns in reactions""" flags = [] # Group by demographics groups = { 'age': {}, 'gender': {}, 'location': {}, 'values': {} } for reaction in reactions: profile = reaction.get('profile', {}) opinion = reaction.get('opinion') if not opinion: continue # Age groups age = profile.get('age', 30) age_bracket = f"{(age // 10) * 10}s" if age_bracket not in groups['age']: groups['age'][age_bracket] = [] groups['age'][age_bracket].append(reaction) # Gender gender = profile.get('gender', 'Unknown') if gender not in groups['gender']: groups['gender'][gender] = [] groups['gender'][gender].append(reaction) # Location location = profile.get('location', 'Unknown') if location not in groups['location']: groups['location'][location] = [] groups['location'][location].append(reaction) # Values for value in profile.get('values', []): if value not in groups['values']: groups['values'][value] = [] groups['values'][value].append(reaction) # Check each group for high negativity for group_type, group_data in groups.items(): for group_name, group_reactions in group_data.items(): if len(group_reactions) < 3: continue negative_count = sum(1 for r in group_reactions if r.get('opinion') == 'NEGATIVE') total = len(group_reactions) negative_rate = negative_count / total if negative_rate > 0.5: if negative_rate > 0.8: severity = "CRITICAL" elif negative_rate > 0.7: severity = "HIGH" elif negative_rate > 0.6: severity = "MEDIUM" else: severity = "LOW" sample_reactions = [ { "agent_id": r.get('agent_id'), "reasoning": r.get('reasoning', '')[:100] } for r in group_reactions if r.get('opinion') == 'NEGATIVE' ][:3] flags.append({ "flag_type": f"{group_type.upper()}_BACKLASH", "severity": severity, "description": f"{int(negative_rate * 100)}% of {group_type}={group_name} reacted negatively", "affected_demographics": {group_type: group_name}, "sample_agent_reactions": sample_reactions }) # Sort by severity severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3} flags.sort(key=lambda x: severity_order.get(x['severity'], 4)) return flags[:10] def run_simulation_simple( experiment_id: str, ad_content: str, demographic_filter: Optional[Dict[str, Any]] = None, num_agents: int = 10, simulation_days: int = 5, redis_client = None ) -> Dict[str, Any]: """ Run a simplified simulation without Ray This version is compatible with Celery and uses Gemini directly """ logger.info(f"Starting simplified simulation {experiment_id} with {num_agents} agents") try: client = get_gemini_client() # Generate profiles profiles = generate_agent_profiles(num_agents, demographic_filter) logger.info(f"Generated {len(profiles)} agent profiles") # Update progress if redis_client: import json redis_client.setex(f"sim:{experiment_id}:status", 60, json.dumps({ "progress": 10, "current_day": 0, "active_agents": 0 })) # Simulate each agent's reaction reactions = [] for i, profile in enumerate(profiles): reaction = simulate_agent_reaction(client, ad_content, profile) reactions.append(reaction) # Update progress if redis_client and i % 5 == 0: progress = 10 + int((i / len(profiles)) * 80) import json redis_client.setex(f"sim:{experiment_id}:status", 60, json.dumps({ "progress": progress, "current_day": min(i // (len(profiles) // simulation_days + 1) + 1, simulation_days), "active_agents": i + 1 })) logger.info(f"Simulated {len(reactions)} agent reactions") # Calculate results opinions = [r.get('opinion') for r in reactions if r.get('opinion')] sentiment_counts = { "positive": sum(1 for o in opinions if o == 'POSITIVE'), "neutral": sum(1 for o in opinions if o == 'NEUTRAL'), "negative": sum(1 for o in opinions if o == 'NEGATIVE') } total = len(opinions) or 1 strong_reactions = sentiment_counts['positive'] + sentiment_counts['negative'] engagement_score = (strong_reactions / total) * 100 # Detect controversies risk_flags = detect_controversies(reactions) # Prepare agent logs agent_logs = [ { "agent_id": r['agent_id'], "event_type": "AD_REACTION", "event_data": { "opinion": r.get('opinion'), "reasoning": r.get('reasoning'), "would_share": r.get('would_share') } } for r in reactions[:100] # Limit logs ] # Final progress update if redis_client: import json redis_client.setex(f"sim:{experiment_id}:status", 60, json.dumps({ "progress": 100, "current_day": simulation_days, "active_agents": len(reactions) })) logger.info(f"Simulation complete. Engagement score: {engagement_score:.1f}") return { "engagement_score": round(engagement_score, 2), "sentiment_breakdown": sentiment_counts, "total_agents": len(reactions), "responding_agents": len(opinions), "risk_flags": risk_flags, "agent_logs": agent_logs } except Exception as e: logger.error(f"Simulation failed: {e}") raise