|
|
|
|
|
|
| import asyncio
|
| from playwright.async_api import async_playwright, Page
|
| from bs4 import BeautifulSoup
|
| import requests
|
| import pandas as pd
|
| import time
|
| import re
|
| from datetime import datetime, timedelta
|
| import os
|
| import sys
|
| import random
|
| import sqlite3
|
| import logging
|
| import schedule
|
| import hashlib
|
| import json
|
| import threading
|
| from contextlib import contextmanager
|
| from dataclasses import dataclass
|
| from typing import List, Dict, Optional, Tuple
|
| from urllib.parse import urljoin, urlparse
|
|
|
|
|
|
|
|
|
|
|
| @dataclass
|
| class PersonalInfo:
|
| prenom: str = "Valentin"
|
| nom: str = "Cora"
|
| email: str = "valouassol@outlook.com"
|
| email_derivee: str = "valouassol+concours@outlook.com"
|
| telephone: str = "+41791234567"
|
| adresse: str = "Av Chantemerle 9"
|
| code_postal: str = "1009"
|
| ville: str = "Pully"
|
| pays: str = "Suisse"
|
|
|
| @dataclass
|
| class Contest:
|
| title: str
|
| url: str
|
| description: str
|
| source: str
|
| deadline: Optional[str] = None
|
| prize: Optional[str] = None
|
| difficulty_score: int = 0
|
|
|
| @dataclass
|
| class FormField:
|
| selector: str
|
| field_type: str
|
| label: str
|
| required: bool
|
| current_value: str = ""
|
| ai_context: str = ""
|
|
|
| @dataclass
|
| class FormAnalysis:
|
| fields: List[FormField]
|
| complexity_score: int
|
| estimated_success_rate: float
|
| requires_captcha: bool
|
| requires_social_media: bool
|
| form_url: str
|
|
|
|
|
|
|
|
|
|
|
|
|
| PERSONAL_INFO = PersonalInfo()
|
|
|
|
|
| SITES_CH = [
|
| 'https://www.concours.ch/concours/tous',
|
| 'https://www.jeu-concours.biz/concours-pays_suisse.html',
|
| 'https://www.loisirs.ch/concours/',
|
| 'https://www.radin.ch/',
|
| 'https://win4win.ch/fr/',
|
| 'https://www.concours-suisse.ch/',
|
| 'https://corporate.migros.ch/fr/concours',
|
| 'https://www.20min.ch/fr/concours-et-jeux',
|
| 'https://dein-gewinnspiel.ch/en',
|
| 'https://www.myswitzerland.com/fr/planification/vie-pratique/concours/'
|
| ]
|
|
|
|
|
| USER_AGENTS = [
|
| 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
| 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
| 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
|
| ]
|
|
|
|
|
| logging.basicConfig(
|
| level=logging.INFO,
|
| format='%(asctime)s - %(levelname)s - %(message)s',
|
| handlers=[
|
| logging.FileHandler('concours_bot_sans_api.log', encoding='utf-8'),
|
| logging.StreamHandler()
|
| ]
|
| )
|
|
|
|
|
|
|
|
|
|
|
| class DatabaseManager:
|
| def __init__(self, db_path: str = 'concours_sans_api.sqlite'):
|
| self.db_path = db_path
|
| self.local = threading.local()
|
| self._init_db()
|
|
|
| def _get_connection(self):
|
| if not hasattr(self.local, 'conn'):
|
| self.local.conn = sqlite3.connect(self.db_path)
|
| self.local.conn.row_factory = sqlite3.Row
|
| return self.local.conn
|
|
|
| @contextmanager
|
| def transaction(self):
|
| conn = self._get_connection()
|
| try:
|
| yield conn
|
| conn.commit()
|
| except Exception:
|
| conn.rollback()
|
| raise
|
|
|
| def _init_db(self):
|
| with self.transaction() as conn:
|
| conn.execute('''
|
| CREATE TABLE IF NOT EXISTS participations (
|
| url TEXT PRIMARY KEY,
|
| title TEXT,
|
| source TEXT,
|
| status TEXT,
|
| difficulty_score INTEGER,
|
| success_rate REAL,
|
| date TEXT,
|
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
| )
|
| ''')
|
| conn.execute('''
|
| CREATE TABLE IF NOT EXISTS victories (
|
| email_id TEXT PRIMARY KEY,
|
| date TEXT,
|
| lot TEXT,
|
| source TEXT,
|
| confirmed BOOLEAN DEFAULT FALSE,
|
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
| )
|
| ''')
|
|
|
| conn.execute('CREATE INDEX IF NOT EXISTS idx_participations_date ON participations(date)')
|
|
|
| def add_participation(self, contest: Contest, status: str = 'pending', success_rate: float = 0.0):
|
| with self.transaction() as conn:
|
| conn.execute('''
|
| INSERT OR REPLACE INTO participations
|
| (url, title, source, status, difficulty_score, success_rate, date)
|
| VALUES (?, ?, ?, ?, ?, ?, date('now'))
|
| ''', (contest.url, contest.title, contest.source, status, contest.difficulty_score, success_rate))
|
|
|
| def participation_exists(self, url: str) -> bool:
|
| conn = self._get_connection()
|
| result = conn.execute("SELECT 1 FROM participations WHERE url = ?", (url,)).fetchone()
|
| return result is not None
|
|
|
| def get_stats(self) -> Dict:
|
| conn = self._get_connection()
|
| stats = {}
|
|
|
| stats['total_participations'] = conn.execute("SELECT COUNT(*) FROM participations").fetchone()[0]
|
| stats['successful_participations'] = conn.execute("SELECT COUNT(*) FROM participations WHERE status='success'").fetchone()[0]
|
| stats['total_victories'] = conn.execute("SELECT COUNT(*) FROM victories").fetchone()[0]
|
|
|
| source_stats = conn.execute('''
|
| SELECT source, COUNT(*) as count
|
| FROM participations
|
| GROUP BY source
|
| ORDER BY count DESC
|
| ''').fetchall()
|
| stats['by_source'] = {row[0]: row[1] for row in source_stats}
|
|
|
| return stats
|
|
|
|
|
|
|
|
|
|
|
| class LocalResponseEngine:
|
| def __init__(self):
|
| self.cache = {}
|
|
|
|
|
| self.knowledge_base = {
|
| "suisse": {
|
| "capitale": "Berne",
|
| "langues": "Français, Allemand, Italien, Romanche",
|
| "monnaie": "Franc suisse",
|
| "population": "8.7 millions",
|
| "villes": ["Zurich", "Genève", "Bâle", "Lausanne", "Berne", "Winterthour"],
|
| "cantons": ["Vaud", "Genève", "Valais", "Fribourg", "Neuchâtel", "Jura"]
|
| },
|
| "general": {
|
| "couleurs": ["Rouge", "Bleu", "Vert", "Jaune", "Orange", "Violet", "Rose", "Noir", "Blanc"],
|
| "nombres": ["1", "2", "3", "4", "5", "10", "12", "15", "20", "25", "50", "100"],
|
| "annees": ["2023", "2024", "2025"]
|
| }
|
| }
|
|
|
| def generate_response(self, question: str, context: str = "", response_type: str = "qa") -> str:
|
| """Génère une réponse intelligente sans API"""
|
| cache_key = hashlib.md5(f"{question}{context}{response_type}".encode()).hexdigest()
|
|
|
| if cache_key in self.cache:
|
| return self.cache[cache_key]
|
|
|
| response = self._generate_intelligent_response(question, context, response_type)
|
|
|
|
|
| self.cache[cache_key] = response
|
| return response
|
|
|
| def _generate_intelligent_response(self, question: str, context: str, response_type: str) -> str:
|
| """Génère une réponse basée sur l'analyse du texte"""
|
| question_lower = question.lower()
|
| context_lower = context.lower()
|
|
|
| if response_type == "motivation":
|
| return self._generate_motivation(question_lower, context_lower)
|
| elif response_type == "quiz":
|
| return self._generate_quiz_answer(question_lower, context_lower)
|
| else:
|
| return self._generate_general_response(question_lower, context_lower)
|
|
|
| def _generate_motivation(self, question: str, context: str) -> str:
|
| """Génère une motivation personnalisée"""
|
|
|
| if any(word in context for word in ["voyage", "vacances", "séjour", "destination"]):
|
| return random.choice([
|
| "J'adore voyager et découvrir de nouveaux horizons. Ce prix serait une opportunité fantastique pour moi de vivre une expérience inoubliable en Suisse ou ailleurs.",
|
| "Voyager est ma passion et ce concours représente le voyage de mes rêves. J'espère avoir la chance de le remporter pour découvrir de nouveaux paysages.",
|
| "En tant que passionné de voyages, ce prix m'offrirait l'occasion parfaite de découvrir de nouveaux lieux et cultures, ce qui m'enrichirait énormément."
|
| ])
|
| elif any(word in context for word in ["produit", "cosmétique", "beauté", "soin"]):
|
| return random.choice([
|
| "Je suis toujours à la recherche de nouveaux produits de qualité et j'aimerais beaucoup tester cette gamme qui semble très prometteuse.",
|
| "Ces produits m'intéressent énormément et je serais ravi de pouvoir les découvrir et partager mon expérience avec mes proches.",
|
| "J'ai entendu beaucoup de bien de cette marque et j'aimerais avoir l'opportunité de l'essayer pour me faire ma propre opinion."
|
| ])
|
| elif any(word in context for word in ["technologie", "smartphone", "ordinateur", "électronique"]):
|
| return random.choice([
|
| "En tant que passionné de technologie, ce prix m'intéresse beaucoup et m'aiderait dans mes projets personnels et professionnels.",
|
| "J'ai besoin de ce type d'équipement pour mes études et mes loisirs, ce serait formidable de le gagner dans ce concours.",
|
| "La technologie fait partie de ma vie quotidienne et ce prix serait très utile pour mes activités créatives."
|
| ])
|
| elif any(word in context for word in ["nourriture", "restaurant", "gastronomie", "cuisine"]):
|
| return random.choice([
|
| "J'adore découvrir de nouvelles saveurs et expériences culinaires. Ce prix me permettrait de vivre un moment gastronomique exceptionnel.",
|
| "La cuisine est une de mes passions et ce concours m'offrirait l'opportunité de découvrir de nouveaux goûts et techniques.",
|
| "En tant qu'amateur de bonne cuisine, je serais ravi de remporter ce prix pour explorer de nouvelles expériences gastronomiques."
|
| ])
|
| else:
|
| return random.choice([
|
| "Je participe avec enthousiasme à ce concours car le prix m'intéresse vraiment et correspond parfaitement à mes centres d'intérêt actuels.",
|
| "Ce concours m'attire particulièrement et je serais très heureux de remporter ce magnifique prix qui me ferait énormément plaisir.",
|
| "J'espère avoir la chance de gagner car ce prix représente une belle opportunité pour moi et ma famille.",
|
| "Je suis motivé à participer car cette opportunité pourrait vraiment améliorer mon quotidien de manière positive."
|
| ])
|
|
|
| def _generate_quiz_answer(self, question: str, context: str) -> str:
|
| """Génère une réponse de quiz intelligente"""
|
|
|
| if "suisse" in question:
|
| if any(word in question for word in ["capitale", "capital"]):
|
| return self.knowledge_base["suisse"]["capitale"]
|
| elif any(word in question for word in ["langue", "langues", "language"]):
|
| return random.choice(["Français", "Allemand", "Italien", "Romanche"])
|
| elif any(word in question for word in ["monnaie", "currency", "franc"]):
|
| return self.knowledge_base["suisse"]["monnaie"]
|
| elif any(word in question for word in ["population", "habitants"]):
|
| return self.knowledge_base["suisse"]["population"]
|
| elif any(word in question for word in ["ville", "city", "cities"]):
|
| return random.choice(self.knowledge_base["suisse"]["villes"])
|
| elif any(word in question for word in ["canton", "cantons"]):
|
| return random.choice(self.knowledge_base["suisse"]["cantons"])
|
|
|
|
|
| if any(word in question for word in ["couleur", "color", "couleurs"]):
|
| return random.choice(self.knowledge_base["general"]["couleurs"])
|
|
|
| if any(word in question for word in ["combien", "nombre", "quantité", "how many"]):
|
| return random.choice(self.knowledge_base["general"]["nombres"])
|
|
|
| if any(word in question for word in ["année", "date", "quand", "when", "year"]):
|
| return random.choice(self.knowledge_base["general"]["annees"])
|
|
|
|
|
| if any(word in question for word in ["est-ce", "is", "are", "do", "does"]):
|
| return random.choice(["Oui", "Non"])
|
|
|
|
|
| if any(word in question for word in ["vrai", "faux", "true", "false"]):
|
| return random.choice(["Vrai", "Faux"])
|
|
|
|
|
| return random.choice(["A", "B", "C", "D", "1", "2", "3"])
|
|
|
| def _generate_general_response(self, question: str, context: str) -> str:
|
| """Génère une réponse générale"""
|
| if "age" in question or "âge" in question:
|
| return random.choice(["25", "28", "30", "32"])
|
| elif "profession" in question or "métier" in question:
|
| return random.choice(["Étudiant", "Employé", "Consultant", "Développeur"])
|
| elif "ville" in question:
|
| return PERSONAL_INFO.ville
|
| elif "pays" in question:
|
| return PERSONAL_INFO.pays
|
| else:
|
| return "Merci"
|
|
|
|
|
|
|
|
|
|
|
| class LocalScraper:
|
| def __init__(self, db_manager: DatabaseManager):
|
| self.db = db_manager
|
| self.session = None
|
|
|
| async def __aenter__(self):
|
|
|
| self.session = requests.Session()
|
| self.session.headers.update({'User-Agent': random.choice(USER_AGENTS)})
|
| return self
|
|
|
| async def __aexit__(self, exc_type, exc_val, exc_tb):
|
| if self.session:
|
| self.session.close()
|
|
|
| async def scrape_all_sources(self) -> List[Contest]:
|
| """Scrape tous les sites web localement"""
|
| all_contests = []
|
|
|
|
|
| web_contests = await self._scrape_websites()
|
| all_contests.extend(web_contests)
|
|
|
|
|
| unique_contests = self._filter_unique_contests(all_contests)
|
|
|
| logging.info(f"Total contests found: {len(all_contests)}, unique new: {len(unique_contests)}")
|
| return unique_contests
|
|
|
| async def _scrape_websites(self) -> List[Contest]:
|
| """Scrape les sites web en parallèle"""
|
| batch_size = 3
|
| all_contests = []
|
|
|
| for i in range(0, len(SITES_CH), batch_size):
|
| batch = SITES_CH[i:i + batch_size]
|
| tasks = [self._scrape_single_site(site) for site in batch]
|
|
|
| batch_results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
| for result in batch_results:
|
| if isinstance(result, list):
|
| all_contests.extend(result)
|
| elif isinstance(result, Exception):
|
| logging.error(f"Batch scraping error: {result}")
|
|
|
| await asyncio.sleep(2)
|
|
|
| return all_contests
|
|
|
| async def _scrape_single_site(self, url: str) -> List[Contest]:
|
| """Scrape un site web spécifique"""
|
| try:
|
| content = await self._fetch_with_retry(url)
|
| if not content:
|
| return []
|
|
|
| soup = BeautifulSoup(content, 'html.parser')
|
| contests = self._extract_contests_from_soup(soup, url)
|
|
|
| logging.info(f"Found {len(contests)} contests on {url}")
|
| return contests
|
|
|
| except Exception as e:
|
| logging.error(f"Error scraping {url}: {e}")
|
| return []
|
|
|
| async def _fetch_with_retry(self, url: str, max_retries: int = 3) -> Optional[str]:
|
| """Fetch avec retry et gestion d'erreurs"""
|
| for attempt in range(max_retries):
|
| try:
|
| response = self.session.get(url, timeout=30)
|
| if response.status_code == 200:
|
| return response.text
|
| elif response.status_code == 429:
|
| wait_time = 2 ** attempt * 5
|
| logging.warning(f"Rate limited on {url}, waiting {wait_time}s")
|
| await asyncio.sleep(wait_time)
|
| else:
|
| logging.warning(f"HTTP {response.status_code} for {url}")
|
|
|
| except Exception as e:
|
| logging.error(f"Attempt {attempt+1} failed for {url}: {e}")
|
| if attempt < max_retries - 1:
|
| await asyncio.sleep(2 ** attempt)
|
|
|
| return None
|
|
|
| def _extract_contests_from_soup(self, soup: BeautifulSoup, base_url: str) -> List[Contest]:
|
| """Extrait les concours d'une page HTML"""
|
| contests = []
|
|
|
|
|
| selectors = [
|
| '.contest', '.concours', '.jeu', '.competition', '.giveaway',
|
| '[data-contest]', '[data-concours]', '.prize', '.lot',
|
| 'article[class*="concours"]', '.entry', '.participate'
|
| ]
|
|
|
| containers = []
|
| for selector in selectors:
|
| containers.extend(soup.select(selector))
|
|
|
|
|
| if not containers:
|
| containers = soup.find_all('a', href=re.compile(r'concours|jeu|contest|participate', re.I))
|
|
|
| for container in containers[:20]:
|
| try:
|
| contest = self._parse_contest_container(container, base_url)
|
| if contest and self._is_valid_contest(contest):
|
| contests.append(contest)
|
| except Exception as e:
|
| logging.debug(f"Error parsing container: {e}")
|
|
|
| return contests
|
|
|
| def _parse_contest_container(self, container, base_url: str) -> Optional[Contest]:
|
| """Parse un conteneur de concours"""
|
|
|
| title_selectors = ['h1', 'h2', 'h3', '.title', '.titre', '.contest-title']
|
| title = ""
|
| for selector in title_selectors:
|
| title_elem = container.select_one(selector)
|
| if title_elem:
|
| title = title_elem.get_text(strip=True)
|
| break
|
|
|
| if not title:
|
| title = container.get_text(strip=True)[:100]
|
|
|
|
|
| url = ""
|
| link_elem = container if container.name == 'a' else container.find('a')
|
| if link_elem and link_elem.get('href'):
|
| url = urljoin(base_url, link_elem['href'])
|
|
|
|
|
| description = container.get_text(strip=True)[:500]
|
|
|
|
|
| deadline = self._extract_deadline(description)
|
| prize = self._extract_prize(description)
|
|
|
| if not title or not url:
|
| return None
|
|
|
| return Contest(
|
| title=title[:200],
|
| url=url,
|
| description=description,
|
| source=base_url,
|
| deadline=deadline,
|
| prize=prize,
|
| difficulty_score=self._estimate_difficulty(description)
|
| )
|
|
|
| def _extract_deadline(self, text: str) -> Optional[str]:
|
| """Extrait la date limite du texte"""
|
| patterns = [
|
| r"jusqu[\'']?au (\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4})",
|
| r"avant le (\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4})",
|
| r"fin le (\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4})"
|
| ]
|
|
|
| for pattern in patterns:
|
| match = re.search(pattern, text, re.I)
|
| if match:
|
| return match.group(1)
|
| return None
|
|
|
| def _extract_prize(self, text: str) -> Optional[str]:
|
| """Extrait le prix du texte"""
|
| patterns = [
|
| r"gagne[rz]?\s+([^.!?]{1,50})",
|
| r"prix[:\s]+([^.!?]{1,50})",
|
| r"lot[:\s]+([^.!?]{1,50})",
|
| r"(\d+\s*CHF|\d+\s*euros?|\d+\s*francs?)"
|
| ]
|
|
|
| for pattern in patterns:
|
| match = re.search(pattern, text, re.I)
|
| if match:
|
| return match.group(1).strip()
|
| return None
|
|
|
| def _estimate_difficulty(self, description: str) -> int:
|
| """Estime la difficulté de participation (0-10)"""
|
| difficulty = 0
|
|
|
| if re.search(r'justifi|motivation|pourquoi|essay', description, re.I):
|
| difficulty += 3
|
| if re.search(r'photo|image|créat', description, re.I):
|
| difficulty += 2
|
| if re.search(r'quiz|question|répond', description, re.I):
|
| difficulty += 1
|
| if re.search(r'partag|social|facebook|twitter', description, re.I):
|
| difficulty += 1
|
| if re.search(r'inscription|compte|profil', description, re.I):
|
| difficulty += 1
|
|
|
| return min(difficulty, 10)
|
|
|
| def _is_valid_contest(self, contest: Contest) -> bool:
|
| """Valide qu'un concours est légitime"""
|
| swiss_indicators = [
|
| 'suisse', 'switzerland', 'ch', 'romandie', 'genève', 'lausanne',
|
| 'zurich', 'bern', 'ouvert en suisse', 'résidents suisses'
|
| ]
|
|
|
| full_text = (contest.title + " " + contest.description).lower()
|
| has_swiss_access = any(indicator in full_text for indicator in swiss_indicators)
|
|
|
| excluded_terms = [
|
| 'payant', 'payment', 'carte bancaire', 'spam', 'phishing',
|
| 'adult', 'casino', 'bitcoin', 'crypto', 'investment'
|
| ]
|
|
|
| has_excluded = any(term in full_text for term in excluded_terms)
|
| valid_url = contest.url.startswith(('http://', 'https://'))
|
|
|
| return has_swiss_access and not has_excluded and valid_url
|
|
|
| def _filter_unique_contests(self, contests: List[Contest]) -> List[Contest]:
|
| """Filtre les concours uniques et non déjà traités"""
|
| unique_contests = []
|
| seen_urls = set()
|
|
|
| for contest in contests:
|
| if contest.url not in seen_urls and not self.db.participation_exists(contest.url):
|
| unique_contests.append(contest)
|
| seen_urls.add(contest.url)
|
|
|
| return unique_contests
|
|
|
|
|
|
|
|
|
|
|
| class SmartParticipator:
|
| def __init__(self, db_manager: DatabaseManager, response_engine: LocalResponseEngine):
|
| self.db = db_manager
|
| self.response_engine = response_engine
|
| self.personal_info = PERSONAL_INFO
|
|
|
| self.field_patterns = {
|
| 'prenom': [r'prenom|prénom|first.*name'],
|
| 'nom': [r'nom(?!.*prenom)|last.*name|family.*name'],
|
| 'email': [r'email|e-mail|courriel'],
|
| 'telephone': [r'tel|phone|telephone|téléphone'],
|
| 'adresse': [r'adresse|address|rue|street'],
|
| 'code_postal': [r'code.postal|zip|postal'],
|
| 'ville': [r'ville|city|localité'],
|
| 'pays': [r'pays|country|nation'],
|
| 'motivation': [r'motivation|pourquoi|why|reason'],
|
| 'quiz': [r'question|quiz|réponse|answer']
|
| }
|
|
|
| async def participate_in_contest(self, contest: Contest) -> bool:
|
| """Participe à un concours de manière intelligente"""
|
| async with async_playwright() as p:
|
| browser = await p.chromium.launch(
|
| headless=True,
|
| args=['--no-sandbox', '--disable-blink-features=AutomationControlled']
|
| )
|
|
|
| context = await browser.new_context(
|
| user_agent=random.choice(USER_AGENTS),
|
| viewport={'width': 1920, 'height': 1080}
|
| )
|
|
|
| page = await context.new_page()
|
|
|
| try:
|
|
|
| analysis = await self._analyze_form(page, contest.url)
|
|
|
| if analysis.estimated_success_rate < 0.3:
|
| logging.warning(f"Low success rate for {contest.url}: {analysis.estimated_success_rate}")
|
| self.db.add_participation(contest, 'skipped_low_success', analysis.estimated_success_rate)
|
| return False
|
|
|
| if analysis.requires_captcha:
|
| logging.warning(f"CAPTCHA detected for {contest.url}, skipping")
|
| self.db.add_participation(contest, 'skipped_captcha', analysis.estimated_success_rate)
|
| return False
|
|
|
|
|
| success = await self._fill_and_submit_form(page, analysis, contest)
|
|
|
| status = 'success' if success else 'failed'
|
| self.db.add_participation(contest, status, analysis.estimated_success_rate)
|
|
|
| logging.info(f"Participation {'successful' if success else 'failed'} for {contest.title}")
|
| return success
|
|
|
| except Exception as e:
|
| logging.error(f"Participation error for {contest.url}: {e}")
|
| self.db.add_participation(contest, 'error', 0.0)
|
| return False
|
|
|
| finally:
|
| await browser.close()
|
|
|
| async def _analyze_form(self, page: Page, url: str) -> FormAnalysis:
|
| """Analyse un formulaire de concours"""
|
| try:
|
| await page.goto(url, wait_until='networkidle', timeout=15000)
|
|
|
|
|
| fields = await self._detect_form_fields(page)
|
|
|
|
|
| complexity = sum(self._calculate_field_complexity(field) for field in fields)
|
|
|
|
|
| has_captcha = await self._detect_captcha(page)
|
| has_social_requirements = await self._detect_social_requirements(page)
|
|
|
|
|
| success_rate = self._estimate_success_rate(fields, complexity, has_captcha)
|
|
|
| return FormAnalysis(
|
| fields=fields,
|
| complexity_score=complexity,
|
| estimated_success_rate=success_rate,
|
| requires_captcha=has_captcha,
|
| requires_social_media=has_social_requirements,
|
| form_url=url
|
| )
|
|
|
| except Exception as e:
|
| logging.error(f"Form analysis error: {e}")
|
| return FormAnalysis([], 10, 0.0, True, True, url)
|
|
|
| async def _detect_form_fields(self, page: Page) -> List[FormField]:
|
| """Détecte tous les champs de formulaire"""
|
| fields = []
|
|
|
| selectors = [
|
| 'input[type="text"]', 'input[type="email"]', 'input[type="tel"]',
|
| 'input[type="number"]', 'input:not([type])', 'textarea', 'select'
|
| ]
|
|
|
| for selector in selectors:
|
| elements = await page.query_selector_all(selector)
|
|
|
| for element in elements:
|
| try:
|
| field = await self._analyze_single_field(element, page)
|
| if field:
|
| fields.append(field)
|
| except Exception:
|
| continue
|
|
|
| return fields
|
|
|
| async def _analyze_single_field(self, element, page: Page) -> Optional[FormField]:
|
| """Analyse un champ individuel"""
|
| try:
|
| field_type = await element.evaluate('el => el.type || el.tagName.toLowerCase()')
|
| name = await element.evaluate('el => el.name || el.id || ""')
|
| placeholder = await element.evaluate('el => el.placeholder || ""')
|
| required = await element.evaluate('el => el.required')
|
|
|
|
|
| label_text = await self._find_field_label(element, page)
|
|
|
|
|
| selector = await self._create_unique_selector(element)
|
|
|
| return FormField(
|
| selector=selector,
|
| field_type=field_type,
|
| label=label_text,
|
| required=required,
|
| ai_context=f"Name: {name}, Placeholder: {placeholder}, Label: {label_text}"
|
| )
|
|
|
| except Exception:
|
| return None
|
|
|
| async def _find_field_label(self, element, page: Page) -> str:
|
| """Trouve le label associé à un champ"""
|
| try:
|
|
|
| element_id = await element.evaluate('el => el.id')
|
| if element_id:
|
| label = await page.query_selector(f'label[for="{element_id}"]')
|
| if label:
|
| return await label.inner_text()
|
|
|
|
|
| parent_label = await element.evaluate('''
|
| el => {
|
| let parent = el.parentElement;
|
| while (parent && parent.tagName !== 'BODY') {
|
| if (parent.tagName === 'LABEL') {
|
| return parent.innerText;
|
| }
|
| parent = parent.parentElement;
|
| }
|
| return '';
|
| }
|
| ''')
|
|
|
| if parent_label:
|
| return parent_label.strip()
|
|
|
|
|
| prev_text = await element.evaluate('''
|
| el => {
|
| const prev = el.previousElementSibling;
|
| return prev ? prev.innerText : '';
|
| }
|
| ''')
|
|
|
| return prev_text.strip()
|
|
|
| except Exception:
|
| return ""
|
|
|
| async def _create_unique_selector(self, element) -> str:
|
| """Crée un sélecteur CSS unique"""
|
|
|
| element_id = await element.evaluate('el => el.id')
|
| if element_id:
|
| return f'#{element_id}'
|
|
|
| name = await element.evaluate('el => el.name')
|
| if name:
|
| return f'[name="{name}"]'
|
|
|
| class_name = await element.evaluate('el => el.className')
|
| tag_name = await element.evaluate('el => el.tagName.toLowerCase()')
|
|
|
| if class_name:
|
| return f'{tag_name}.{class_name.split()[0]}'
|
|
|
|
|
| return f'{tag_name}:nth-of-type(1)'
|
|
|
| def _calculate_field_complexity(self, field: FormField) -> int:
|
| """Calcule la complexité d'un champ"""
|
| complexity = 1
|
|
|
| if field.field_type == 'textarea':
|
| complexity += 3
|
| elif field.field_type == 'select':
|
| complexity += 2
|
| elif field.required:
|
| complexity += 1
|
|
|
| if re.search(r'motivation|justifi|pourquoi', field.label, re.I):
|
| complexity += 3
|
| elif re.search(r'quiz|question', field.label, re.I):
|
| complexity += 2
|
|
|
| return complexity
|
|
|
| async def _detect_captcha(self, page: Page) -> bool:
|
| """Détecte la présence de CAPTCHA"""
|
| captcha_selectors = [
|
| '.g-recaptcha', '.h-captcha', '#captcha', '.captcha',
|
| 'iframe[src*="recaptcha"]', 'iframe[src*="hcaptcha"]'
|
| ]
|
|
|
| for selector in captcha_selectors:
|
| element = await page.query_selector(selector)
|
| if element:
|
| return True
|
|
|
| return False
|
|
|
| async def _detect_social_requirements(self, page: Page) -> bool:
|
| """Détecte les exigences de réseaux sociaux"""
|
| content = await page.content()
|
| social_patterns = [
|
| r'follow.*us', r'partag.*facebook', r'retweet',
|
| r'like.*page', r'subscribe.*channel'
|
| ]
|
|
|
| for pattern in social_patterns:
|
| if re.search(pattern, content, re.I):
|
| return True
|
|
|
| return False
|
|
|
| def _estimate_success_rate(self, fields: List[FormField], complexity: int, has_captcha: bool) -> float:
|
| """Estime le taux de succès"""
|
| base_rate = 0.8
|
|
|
| if has_captcha:
|
| base_rate *= 0.1
|
|
|
| if complexity > 15:
|
| base_rate *= 0.4
|
| elif complexity > 10:
|
| base_rate *= 0.6
|
| elif complexity > 5:
|
| base_rate *= 0.8
|
|
|
| required_fields = [f for f in fields if f.required]
|
| if len(required_fields) <= 3:
|
| base_rate *= 1.1
|
|
|
| return min(base_rate, 1.0)
|
|
|
| async def _fill_and_submit_form(self, page: Page, analysis: FormAnalysis, contest: Contest) -> bool:
|
| """Remplit et soumet le formulaire"""
|
| try:
|
| filled_fields = 0
|
|
|
| for field in analysis.fields:
|
| try:
|
|
|
| await page.wait_for_selector(field.selector, timeout=3000)
|
| element = await page.query_selector(field.selector)
|
|
|
| if not element:
|
| continue
|
|
|
|
|
| value = await self._generate_field_value(field, contest, page)
|
| if not value:
|
| continue
|
|
|
|
|
| if field.field_type == 'select':
|
| await self._fill_select_field(element, value, page)
|
| else:
|
| await element.fill(value)
|
|
|
| filled_fields += 1
|
| await asyncio.sleep(random.uniform(0.3, 0.8))
|
|
|
| except Exception as e:
|
| logging.debug(f"Error filling field {field.selector}: {e}")
|
| continue
|
|
|
|
|
| submit_success = await self._submit_form(page)
|
|
|
| logging.info(f"Filled {filled_fields}/{len(analysis.fields)} fields, submitted: {submit_success}")
|
| return filled_fields > 0 and submit_success
|
|
|
| except Exception as e:
|
| logging.error(f"Form filling error: {e}")
|
| return False
|
|
|
| async def _generate_field_value(self, field: FormField, contest: Contest, page: Page) -> Optional[str]:
|
| """Génère une valeur pour un champ"""
|
|
|
| field_type = self._identify_field_type(field)
|
|
|
|
|
| personal_mapping = {
|
| 'prenom': self.personal_info.prenom,
|
| 'nom': self.personal_info.nom,
|
| 'email': self.personal_info.email_derivee,
|
| 'telephone': self.personal_info.telephone,
|
| 'adresse': self.personal_info.adresse,
|
| 'code_postal': self.personal_info.code_postal,
|
| 'ville': self.personal_info.ville,
|
| 'pays': self.personal_info.pays
|
| }
|
|
|
| if field_type in personal_mapping:
|
| return personal_mapping[field_type]
|
|
|
|
|
| if field_type == 'motivation':
|
| return self.response_engine.generate_response(
|
| field.label,
|
| contest.description,
|
| "motivation"
|
| )
|
| elif field_type == 'quiz':
|
| return self.response_engine.generate_response(
|
| field.label,
|
| contest.description,
|
| "quiz"
|
| )
|
|
|
|
|
| default_values = {
|
| 'age': '25',
|
| 'genre': 'Monsieur',
|
| 'profession': 'Étudiant'
|
| }
|
|
|
| for key, value in default_values.items():
|
| if key in field.label.lower():
|
| return value
|
|
|
| return None
|
|
|
| def _identify_field_type(self, field: FormField) -> str:
|
| """Identifie le type de champ"""
|
| combined_text = f"{field.ai_context} {field.label}".lower()
|
|
|
| for field_type, patterns in self.field_patterns.items():
|
| for pattern in patterns:
|
| if re.search(pattern, combined_text, re.I):
|
| return field_type
|
|
|
| return 'unknown'
|
|
|
| async def _fill_select_field(self, element, value: str, page: Page):
|
| """Remplit un champ select"""
|
| try:
|
| options = await element.query_selector_all('option')
|
|
|
| for option in options:
|
| option_text = await option.inner_text()
|
| option_value = await option.get_attribute('value')
|
|
|
| if (value.lower() in option_text.lower() or
|
| value.lower() in (option_value or "").lower()):
|
| await element.select_option(value=option_value)
|
| return
|
|
|
|
|
| if options and len(options) > 1:
|
| first_option = await options[1].get_attribute('value')
|
| await element.select_option(value=first_option)
|
|
|
| except Exception as e:
|
| logging.debug(f"Select field error: {e}")
|
|
|
| async def _submit_form(self, page: Page) -> bool:
|
| """Soumet le formulaire"""
|
| submit_selectors = [
|
| 'input[type="submit"]',
|
| 'button[type="submit"]',
|
| 'button:has-text("Participer")',
|
| 'button:has-text("Envoyer")',
|
| 'button:has-text("Valider")',
|
| '.submit-btn',
|
| '.participate-btn'
|
| ]
|
|
|
| for selector in submit_selectors:
|
| try:
|
| element = await page.query_selector(selector)
|
| if element:
|
| is_visible = await element.is_visible()
|
| is_enabled = await element.is_enabled()
|
|
|
| if is_visible and is_enabled:
|
| await element.click()
|
| await page.wait_for_timeout(3000)
|
| return True
|
|
|
| except Exception:
|
| continue
|
|
|
| return False
|
|
|
|
|
|
|
|
|
|
|
| class ContestBotOrchestrator:
|
| def __init__(self):
|
| self.db = DatabaseManager()
|
| self.response_engine = LocalResponseEngine()
|
| self.scraper = None
|
| self.participator = SmartParticipator(self.db, self.response_engine)
|
|
|
| async def run_full_cycle(self):
|
| """Execute un cycle complet de scraping et participation"""
|
| logging.info("Starting full contest bot cycle (sans API)")
|
|
|
| try:
|
|
|
| async with LocalScraper(self.db) as scraper:
|
| self.scraper = scraper
|
| contests = await scraper.scrape_all_sources()
|
|
|
| if not contests:
|
| logging.info("No new contests found")
|
| return
|
|
|
|
|
| contests.sort(key=lambda x: x.difficulty_score)
|
|
|
|
|
| participation_count = 0
|
| max_daily_participations = 15
|
|
|
| for contest in contests[:max_daily_participations]:
|
| try:
|
|
|
| if participation_count > 0:
|
| wait_time = random.uniform(30, 120)
|
| logging.info(f"Waiting {wait_time:.0f}s before next participation")
|
| await asyncio.sleep(wait_time)
|
|
|
| success = await self.participator.participate_in_contest(contest)
|
| participation_count += 1
|
|
|
| if success:
|
| logging.info(f"SUCCESS: Successfully participated in: {contest.title}")
|
| else:
|
| logging.warning(f"FAILED: Failed to participate in: {contest.title}")
|
|
|
|
|
| if success:
|
| await asyncio.sleep(random.uniform(60, 180))
|
|
|
| except Exception as e:
|
| logging.error(f"Error participating in {contest.title}: {e}")
|
| continue
|
|
|
| logging.info(f"Participation cycle completed: {participation_count} attempts")
|
|
|
| except Exception as e:
|
| logging.error(f"Full cycle error: {e}")
|
|
|
| def generate_report(self) -> str:
|
| """Génère un rapport simple"""
|
| stats = self.db.get_stats()
|
|
|
| today = datetime.now().strftime('%Y-%m-%d')
|
| conn = self.db._get_connection()
|
|
|
| today_participations = conn.execute(
|
| "SELECT COUNT(*) FROM participations WHERE date = ?", (today,)
|
| ).fetchone()[0]
|
|
|
| today_successes = conn.execute(
|
| "SELECT COUNT(*) FROM participations WHERE date = ? AND status = 'success'", (today,)
|
| ).fetchone()[0]
|
|
|
| success_rate = (today_successes / max(today_participations, 1)) * 100
|
|
|
| report = f"""
|
| 📊 RAPPORT QUOTIDIEN - {today}
|
|
|
| 🎯 Aujourd'hui:
|
| • Participations: {today_participations}
|
| • Succès: {today_successes}
|
| • Taux de succès: {success_rate:.1f}%
|
|
|
| 📈 Total:
|
| • Participations totales: {stats['total_participations']}
|
| • Participations réussies: {stats['successful_participations']}
|
|
|
| 🌐 Par source:
|
| """
|
|
|
| for source, count in stats['by_source'].items():
|
| report += f" • {source}: {count}\n"
|
|
|
| return report
|
|
|
|
|
|
|
|
|
|
|
| def run_bot_cycle():
|
| """Point d'entrée pour le scheduler"""
|
|
|
| if sys.platform.startswith('win'):
|
| asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
|
|
|
| bot = ContestBotOrchestrator()
|
|
|
|
|
| asyncio.run(bot.run_full_cycle())
|
|
|
|
|
| report = bot.generate_report()
|
| print(report)
|
|
|
| def main():
|
| """Fonction principale avec scheduler"""
|
| print("🎰 BOT DE CONCOURS SUISSE - VERSION SANS API")
|
| print("=" * 50)
|
| print("✅ Système de réponses locales activé")
|
| print("✅ Scraping direct des sites web")
|
| print("✅ Aucune API externe requise")
|
| print()
|
|
|
|
|
| if sys.platform.startswith('win'):
|
| asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
|
|
|
|
|
| if len(sys.argv) > 1 and sys.argv[1] == "--run-now":
|
| logging.info("Running immediate cycle")
|
| run_bot_cycle()
|
| return
|
|
|
|
|
| logging.info("Starting Contest Bot with scheduler")
|
| schedule.every().day.at("08:00").do(run_bot_cycle)
|
| schedule.every().day.at("14:00").do(run_bot_cycle)
|
|
|
|
|
| print("🚀 Lancement immédiat pour test...")
|
| run_bot_cycle()
|
|
|
|
|
| logging.info("Scheduler started. Waiting for scheduled tasks...")
|
|
|
| while True:
|
| try:
|
| schedule.run_pending()
|
| time.sleep(60)
|
| except KeyboardInterrupt:
|
| logging.info("Bot stopped by user")
|
| break
|
| except Exception as e:
|
| logging.error(f"Scheduler error: {e}")
|
| time.sleep(300)
|
|
|
| if __name__ == "__main__":
|
| main()
|
|
|