1proxy / app /hunter /service.py
Paijo
update app/hunter/service.py
fafa4d5 verified
raw
history blame
4.79 kB
import asyncio
import logging
from datetime import datetime, timezone
from typing import Any, List, Optional
from urllib.parse import urlparse

import aiohttp
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.db_models import CandidateSource, ProxySource
from app.hunter.extractor import UniversalExtractor
from app.hunter.strategies.ai import AIStrategy
from app.hunter.strategies.github import GitHubStrategy
from app.hunter.strategies.search import SearchStrategy
# Module-level logger named after this module; all HunterService output goes here.
logger: logging.Logger = logging.getLogger(__name__)
class HunterService:
    """Discover candidate proxy-list URLs and stage new ones for review.

    Runs every configured discovery strategy and de-duplicates the URLs they
    report. URLs already known as a candidate or an active source are skipped.
    The rest are fetched, scored and stored as ``CandidateSource`` rows.
    """

    def __init__(self):
        # Order matters: when several strategies report the same URL, the
        # first one listed here is recorded as its discovery method.
        self.strategies = [GitHubStrategy(), AIStrategy(), SearchStrategy()]

    async def run_hunt(self) -> None:
        """Execute all discovery strategies and process their results.

        A failing strategy is logged and ignored; it does not abort the
        others. Each distinct URL is processed exactly once, and all new
        candidates are committed in a single transaction.
        """
        logger.info("Starting Hunter Protocol...")

        # 1. Run all strategies concurrently. Exceptions are returned rather
        #    than raised so one broken strategy cannot cancel the rest.
        results = await asyncio.gather(
            *(strategy.discover() for strategy in self.strategies),
            return_exceptions=True,
        )

        # url -> name of the first strategy (in self.strategies order) that
        # reported it. Keying on the URL alone means a URL found by several
        # strategies is processed only once.
        discovered_urls = {}
        for strategy, result in zip(self.strategies, results):
            strategy_name = strategy.name
            # BaseException (not just Exception): gather() can also return
            # e.g. asyncio.CancelledError, which is not a URL collection.
            if isinstance(result, BaseException):
                logger.error(f"Strategy {strategy_name} failed: {result}")
                continue
            if result:
                logger.info(f"Strategy {strategy_name} found {len(result)} URLs")
                for url in result:
                    discovered_urls.setdefault(url, strategy_name)

        # 2. Process unique URLs
        logger.info(f"Total unique candidates found: {len(discovered_urls)}")
        # NOTE(review): assumes get_db() is an async generator yielding one
        # session (dependency-style) — confirm against app.database.
        async for session in get_db():
            for url, method in discovered_urls.items():
                await self.process_candidate(session, url, method)
            await session.commit()
        logger.info("Hunter Protocol complete.")

    async def process_candidate(self, session: AsyncSession, url: str, method: str):
        """Check whether *url* is new; if so fetch, score and stage it.

        Args:
            session: Database session. The new row is only ``add``-ed here;
                the caller is responsible for committing.
            url: Candidate URL.
            method: Name of the discovery strategy that reported the URL.

        URLs already stored as a ``CandidateSource`` or an active
        ``ProxySource`` are skipped. Fetch or extraction failures are logged
        and the URL is skipped without saving anything, keeping the DB clean.
        """
        if await self._url_exists(session, CandidateSource, url):
            logger.debug(f"Candidate already exists: {url}")
            return
        if await self._url_exists(session, ProxySource, url):
            logger.debug(f"Source already active: {url}")
            return

        # Fetch and analyze. Best-effort: any network, decoding or parsing
        # failure only skips this one URL.
        try:
            content = await self._fetch_content(url)
            proxies = UniversalExtractor.extract_proxies(content, source_url=url)
            confidence = self._calculate_confidence(url, proxies)
        except Exception as e:
            logger.warning(f"Failed to process candidate {url}: {e}")
            # We might still save it as 'failed' or 'pending' retry?
            # For now, skip invalid URLs to keep DB clean.
            return

        candidate = CandidateSource(
            url=url,
            domain=self._extract_domain(url),
            discovery_method=method,
            status="pending",
            confidence_score=confidence,
            proxies_found_count=len(proxies),
            # Naive UTC timestamp, same value datetime.utcnow() produced
            # (utcnow is deprecated since Python 3.12).
            last_checked_at=datetime.now(timezone.utc).replace(tzinfo=None),
        )
        session.add(candidate)
        logger.info(
            f"Added candidate: {url} (Score: {confidence}, Proxies: {len(proxies)})"
        )

    @staticmethod
    async def _url_exists(session: AsyncSession, model, url: str) -> bool:
        """Return True if a *model* row with exactly this ``url`` exists."""
        # LIMIT 1 keeps this a cheap existence probe and prevents
        # MultipleResultsFound if duplicate rows were ever stored.
        stmt = select(model).where(model.url == url).limit(1)
        result = await session.execute(stmt)
        return result.scalar_one_or_none() is not None

    async def _fetch_content(self, url: str) -> str:
        """Download *url* and return its body as text.

        Raises:
            Exception: ``"HTTP <status>"`` for any non-200 response. aiohttp
                client/timeout errors and decoding errors propagate unchanged.
        """
        # aiohttp expects a ClientTimeout object; 10 s total per fetch.
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(timeout=timeout) as http:
            async with http.get(url) as resp:
                if resp.status != 200:
                    raise Exception(f"HTTP {resp.status}")
                return await resp.text()

    def _calculate_confidence(self, url: str, proxies: List[Any]) -> int:
        """Return a heuristic 0-100 trust score for a candidate source.

        Points are awarded for a trusted host, for the number of proxies
        found, and for protocol diversity. Every item in *proxies* must
        expose a hashable ``protocol`` attribute.
        """
        score = 0

        # Domain trust: match the parsed host (case-insensitive, subdomains
        # allowed) instead of substring-matching the whole URL. Otherwise
        # e.g. "https://notgithub.com" or "...?u=github.com" would qualify.
        host = self._hostname(url)
        if self._host_in(host, "github.com", "raw.githubusercontent.com"):
            score += 20
        elif self._host_in(host, "pastebin.com"):
            score += 10

        # Content volume (cumulative tiers)
        count = len(proxies)
        if count > 0:
            score += 10
        if count > 50:
            score += 20
        if count > 500:
            score += 20

        # Protocol diversity
        protocols = {p.protocol for p in proxies}
        if len(protocols) > 1:
            score += 10
        if "vmess" in protocols or "vless" in protocols:
            score += 10

        return min(score, 100)

    @staticmethod
    def _hostname(url: str) -> str:
        """Return the lower-cased host of *url*, or "" if it cannot be parsed."""
        try:
            return urlparse(url).hostname or ""
        except ValueError:
            return ""

    @staticmethod
    def _host_in(host: str, *domains: str) -> bool:
        """Return True if *host* equals, or is a subdomain of, any of *domains*."""
        return any(host == d or host.endswith("." + d) for d in domains)

    def _extract_domain(self, url: str) -> str:
        """Return the ``netloc`` of *url* (host plus any port), or "" if unparsable."""
        try:
            return urlparse(url).netloc
        except ValueError:
            # urlparse rejects malformed input such as an unbalanced IPv6
            # bracket ("http://[::1"); store an empty domain instead.
            return ""