"""
Massive test execution system with parallelism.
"""
import asyncio
import logging
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Any
from datetime import datetime
import uuid
import sys
import os

# Make the project root importable when this module is run directly.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from graphs.main_graph import AgentGraphManager
from testes.test_validator import TestValidator
from utils.config import AVAILABLE_MODELS  # noqa: F401  (kept for callers importing it from here)


class MassiveTestRunner:
    """
    Massive test executor with optimized parallelism.
    """

    def __init__(self, max_workers: int = 5):
        """
        Initializes the test runner.

        Args:
            max_workers: Maximum number of parallel workers
        """
        self.max_workers = max_workers
        logging.info(f"🔧 MassiveTestRunner initialized with {max_workers} parallel workers")
        self.validator = TestValidator()
        self.results = {
            'session_info': {},
            'group_results': [],
            'individual_results': [],
            'summary': {}
        }
        self.status = {
            'current_status': 'idle',
            'progress': 0,
            'current_group': None,
            'completed_tests': 0,
            'total_tests': 0,
            'start_time': None,
            'estimated_remaining': None,
            'errors': [],
            'current_test': None,
            'running_tests': {},
            'cancelled_tests': set(),
            'timeout_tests': set()
        }
        self._lock = threading.Lock()
        self._cancel_event = threading.Event()
        self._test_timeout = 360  # seconds; enforced via skip_stuck_tests, not per test
        self._active_futures = {}

    async def run_test_session(self, session: Dict[str, Any], validation_method: str = 'llm', expected_content: str = None) -> Dict[str, Any]:
        """
        Runs a complete test session.

        Args:
            session: Test session data
            validation_method: Validation method ('llm' or 'keyword')
            expected_content: Expected content (for keyword validation)

        Returns:
            Complete test results
        """
        try:
            print("\n🔥 MASSIVE TEST RUNNER STARTED")
            print(f"📋 Session: {session['id']}")
            print(f"❓ Question: {session['question']}")
            print(f"👥 Groups: {len(session['groups'])}")

            total_tests = sum(group['iterations'] for group in session['groups'])
            print(f"🔢 Total tests: {total_tests}")
            print(f"⚡ Parallel workers: {self.max_workers}")
            print("-" * 60)

            logging.info(f"🚀 Starting test session: {session['id']}")

            with self._lock:
                self.status.update({
                    'current_status': 'initializing',
                    'start_time': time.time(),
                    'total_tests': total_tests
                })

            self.results['session_info'] = {
                'id': session['id'],
                'question': session['question'],
                'validation_method': validation_method,
                'expected_content': expected_content,
                'total_groups': len(session['groups']),
                'total_tests': self.status['total_tests'],
                'started_at': datetime.now().isoformat()
            }

            group_results = []

            for group_idx, group in enumerate(session['groups']):
                print(f"\n📊 RUNNING GROUP {group_idx + 1}/{len(session['groups'])}")
                print(f"🤖 SQL model: {group['sql_model_name']}")
                print(f"🔄 Processing agent: {'✅ ' + group['processing_model_name'] if group['processing_enabled'] else '❌ Disabled'}")
                print(f"🔢 Iterations: {group['iterations']}")
                print(f"⏰ {datetime.now().strftime('%H:%M:%S')}")

                logging.info(f"📊 Running group {group_idx + 1}/{len(session['groups'])}: {group['sql_model_name']}")

                with self._lock:
                    self.status['current_group'] = group_idx + 1
                    self.status['current_status'] = 'running_group'

                group_result = await self._run_group_tests(
                    session['question'],
                    group,
                    validation_method,
                    expected_content
                )

                group_results.append(group_result)
                self.results['group_results'] = group_results

                # Recompute progress from the authoritative per-group results.
                completed_so_far = sum(len(gr['individual_results']) for gr in group_results)
                with self._lock:
                    self.status['completed_tests'] = completed_so_far
                    self.status['progress'] = (completed_so_far / self.status['total_tests']) * 100

                    if self.status['start_time'] and completed_so_far > 0:
                        elapsed = time.time() - self.status['start_time']
                        avg_time_per_test = elapsed / completed_so_far
                        remaining_tests = self.status['total_tests'] - completed_so_far
                        self.status['estimated_remaining'] = avg_time_per_test * remaining_tests

            self._generate_summary()

            with self._lock:
                self.status['current_status'] = 'completed'
                self.status['progress'] = 100
                self.status['end_time'] = time.time()
                total_time = self.status['end_time'] - self.status['start_time']
                self.status['total_execution_time'] = total_time

            logging.info(f"✅ Test session completed: {session['id']}")
            logging.info(f"📊 Final summary: {self.status['total_tests']} tests in {total_time:.2f}s")
            logging.info(f"🎯 Overall success rate: {self.results['summary'].get('overall_success_rate', 0)}%")

            return self.results

        except Exception as e:
            logging.error(f"❌ Error in test session: {e}")
            with self._lock:
                self.status['current_status'] = 'error'
                self.status['errors'].append(str(e))
            raise

    async def _run_group_tests(self, question: str, group: Dict[str, Any], validation_method: str, expected_content: str) -> Dict[str, Any]:
        """
        Runs the tests of a specific group with real parallelism.

        Args:
            question: Test question
            group: Group configuration
            validation_method: Validation method
            expected_content: Expected content

        Returns:
            Group results
        """
        print(f"🔄 Running {group['iterations']} tests in parallel (max {self.max_workers} concurrent)")
        logging.info(f"🔄 Running {group['iterations']} tests for group {group['id']}")

        # The semaphore caps how many tests run concurrently.
        semaphore = asyncio.Semaphore(self.max_workers)
        tasks = []

        print(f"⚡ Creating {group['iterations']} parallel tasks...")
        for iteration in range(group['iterations']):
            task = self._run_single_test(
                semaphore,
                question,
                group,
                iteration + 1,
                validation_method,
                expected_content
            )
            tasks.append(task)

        print(f"🚀 Running {len(tasks)} tests in parallel...")
        start_time = time.time()

        # return_exceptions=True keeps one failing test from aborting the group.
        individual_results = await asyncio.gather(*tasks, return_exceptions=True)

        execution_time = time.time() - start_time
        print(f"✅ Group {group['id']} finished in {execution_time:.2f}s ({group['iterations']} tests)")

        valid_results = []
        errors = []

        for result in individual_results:
            if isinstance(result, Exception):
                errors.append(str(result))
                logging.error(f"Error in individual test: {result}")
            else:
                valid_results.append(result)
                self.results['individual_results'].append(result)

        group_stats = self._calculate_group_stats(valid_results, group)
        group_stats['errors'] = errors
        group_stats['error_count'] = len(errors)

        logging.info(f"✅ Group {group['id']} finished: {len(valid_results)} successes, {len(errors)} errors")

        return group_stats

    async def _run_single_test(self, semaphore: asyncio.Semaphore, question: str, group: Dict[str, Any],
                               iteration: int, validation_method: str, expected_content: str) -> Dict[str, Any]:
        """
        Runs an individual test with real parallelism.

        Args:
            semaphore: Semaphore for concurrency control
            question: Test question
            group: Group configuration
            iteration: Iteration number
            validation_method: Validation method
            expected_content: Expected content

        Returns:
            Individual test result
        """
        async with semaphore:
            try:
                start_time = time.time()

                thread_id = f"test_{group['id']}_{iteration}_{uuid.uuid4().hex[:8]}"

                with self._lock:
                    self.status['running_tests'][thread_id] = {
                        'start_time': start_time,
                        'group_id': group['id'],
                        'iteration': iteration,
                        'question': question[:50] + '...' if len(question) > 50 else question
                    }
                    self.status['current_test'] = thread_id

                task_name = asyncio.current_task().get_name() if asyncio.current_task() else 'unknown'
                print(f"🔄 [{datetime.now().strftime('%H:%M:%S')}] 🚀 STARTING {thread_id} (worker {task_name})")
                logging.info(f"🔄 Starting test {thread_id} - group {group['id']}, iteration {iteration}")

                if thread_id in self.status['cancelled_tests']:
                    print(f"🚫 Test {thread_id} cancelled before starting")
                    return self._create_cancelled_result(thread_id, group, iteration, start_time)

                current_task = asyncio.current_task()
                with self._lock:
                    self._active_futures[thread_id] = current_task

                loop = asyncio.get_running_loop()

                def run_sync_test():
                    """Runs the test synchronously in a separate thread."""
                    try:
                        if thread_id in self.status['cancelled_tests']:
                            return {'cancelled': True, 'reason': 'cancelled_before_start'}

                        # Each worker thread needs its own event loop.
                        new_loop = asyncio.new_event_loop()
                        asyncio.set_event_loop(new_loop)

                        try:
                            graph_manager = AgentGraphManager()

                            result = new_loop.run_until_complete(
                                graph_manager.process_query(
                                    user_input=question,
                                    selected_model=group['sql_model_name'],
                                    processing_enabled=group['processing_enabled'],
                                    processing_model=group['processing_model_name'] if group['processing_enabled'] else None,
                                    thread_id=thread_id
                                )
                            )
                        finally:
                            # Close the loop even if process_query raises.
                            new_loop.close()

                        if thread_id in self.status['cancelled_tests']:
                            return {'cancelled': True, 'reason': 'cancelled_after_execution'}

                        return result

                    except Exception as e:
                        logging.error(f"Error in worker thread for {thread_id}: {e}")
                        return {'error': str(e)}

                with ThreadPoolExecutor(max_workers=1) as executor:
                    future = loop.run_in_executor(executor, run_sync_test)

                    # Poll so a cancellation request can interrupt the wait.
                    while not future.done():
                        await asyncio.sleep(0.1)
                        if thread_id in self.status['cancelled_tests']:
                            future.cancel()
                            print(f"🚫 Cancelling future of test {thread_id}")
                            try:
                                await future
                            except (asyncio.CancelledError, Exception):
                                pass
                            return self._create_cancelled_result(thread_id, group, iteration, start_time, 'user_cancelled')

                    result = await future

                execution_time = time.time() - start_time

                with self._lock:
                    if thread_id in self.status['running_tests']:
                        del self.status['running_tests'][thread_id]
                    if self.status['current_test'] == thread_id:
                        self.status['current_test'] = None
                    if thread_id in self._active_futures:
                        del self._active_futures[thread_id]

                if isinstance(result, dict):
                    if result.get('cancelled'):
                        print(f"🚫 [{datetime.now().strftime('%H:%M:%S')}] CANCELLED {thread_id} - {result.get('reason', 'unknown')}")
                        logging.info(f"🚫 Test {thread_id} cancelled")
                        return self._create_cancelled_result(thread_id, group, iteration, start_time, result.get('reason'))
                    elif result.get('timeout'):
                        print(f"⏰ [{datetime.now().strftime('%H:%M:%S')}] TIMEOUT {thread_id} after {result.get('duration')}s")
                        logging.warning(f"⏰ Test {thread_id} timed out")
                        return self._create_timeout_result(thread_id, group, iteration, start_time, result.get('duration'))
                    elif result.get('error'):
                        print(f"❌ [{datetime.now().strftime('%H:%M:%S')}] ERROR {thread_id}: {result['error']}")
                        logging.error(f"❌ Test {thread_id} error: {result['error']}")

                print(f"✅ [{datetime.now().strftime('%H:%M:%S')}] 🎉 FINISHED {thread_id} in {execution_time:.2f}s")
                logging.info(f"✅ Test {thread_id} finished in {execution_time:.2f}s")

                validation_result = await self.validator.validate_result(
                    question=question,
                    sql_query=result.get('sql_query_extracted', ''),
                    response=result.get('response', ''),
                    method=validation_method,
                    expected_content=expected_content
                )

                individual_result = {
                    'group_id': group['id'],
                    'iteration': iteration,
                    'thread_id': thread_id,
                    'timestamp': datetime.now().isoformat(),
                    'execution_time': round(execution_time, 2),
                    'question': question,
                    'sql_model': group['sql_model_name'],
                    'processing_enabled': group['processing_enabled'],
                    'processing_model': group['processing_model_name'],
                    'sql_query': result.get('sql_query_extracted', ''),
                    'response': result.get('response', ''),
                    'error': result.get('error'),
                    'success': not bool(result.get('error')),
                    'validation': validation_result
                }

                with self._lock:
                    self.status['completed_tests'] += 1
                    progress = (self.status['completed_tests'] / self.status['total_tests']) * 100
                    self.status['progress'] = progress

                    if self.status['start_time'] and self.status['completed_tests'] > 0:
                        elapsed = time.time() - self.status['start_time']
                        avg_time_per_test = elapsed / self.status['completed_tests']
                        remaining_tests = self.status['total_tests'] - self.status['completed_tests']
                        self.status['estimated_remaining'] = avg_time_per_test * remaining_tests

                        remaining_min = int(self.status['estimated_remaining'] // 60)
                        remaining_sec = int(self.status['estimated_remaining'] % 60)

                        print(f"📊 [{datetime.now().strftime('%H:%M:%S')}] Progress: {self.status['completed_tests']}/{self.status['total_tests']} ({progress:.1f}%) - ~{remaining_min}m{remaining_sec}s remaining")

                logging.info(f"📊 Progress: {self.status['completed_tests']}/{self.status['total_tests']} ({progress:.1f}%)")

                return individual_result

            except Exception as e:
                logging.error(f"❌ Error in individual test (group {group['id']}, iteration {iteration}): {e}")

                with self._lock:
                    self.status['completed_tests'] += 1
                    self.status['errors'].append(f"Group {group['id']}, iteration {iteration}: {e}")

                return {
                    'group_id': group['id'],
                    'iteration': iteration,
                    'thread_id': f"error_{group['id']}_{iteration}",
                    'timestamp': datetime.now().isoformat(),
                    'execution_time': time.time() - start_time,
                    'question': question,
                    'sql_model': group['sql_model_name'],
                    'processing_enabled': group['processing_enabled'],
                    'processing_model': group['processing_model_name'],
                    'sql_query': '',
                    'response': '',
                    'error': str(e),
                    'success': False,
                    'validation': {'valid': False, 'score': 0, 'reason': f'Execution error: {e}'}
                }

    def _calculate_group_stats(self, results: List[Dict[str, Any]], group: Dict[str, Any]) -> Dict[str, Any]:
        """
        Calculates a group's statistics.

        Args:
            results: Group's individual results
            group: Group configuration

        Returns:
            Group statistics
        """
        if not results:
            # Keep the same key set as the populated branch so that
            # _generate_summary can aggregate empty groups safely.
            return {
                'group_id': group['id'],
                'group_config': group,
                'total_tests': 0,
                'successful_tests': 0,
                'valid_responses': 0,
                'success_rate': 0,
                'validation_rate': 0,
                'response_consistency': 0,
                'sql_consistency': 0,
                'avg_execution_time': 0,
                'individual_results': []
            }

        total_tests = len(results)
        successful_tests = sum(1 for r in results if r.get('success', False))
        valid_responses = sum(1 for r in results if r.get('validation', {}).get('valid', False))

        # Consistency is measured only across successful runs.
        responses = [r.get('response', '') for r in results if r.get('success', False)]
        sql_queries = [r.get('sql_query', '') for r in results if r.get('success', False)]

        response_consistency = self._calculate_consistency(responses)
        sql_consistency = self._calculate_consistency(sql_queries)

        avg_execution_time = sum(r.get('execution_time', 0) for r in results) / total_tests

        return {
            'group_id': group['id'],
            'group_config': group,
            'total_tests': total_tests,
            'successful_tests': successful_tests,
            'valid_responses': valid_responses,
            'success_rate': round((successful_tests / total_tests) * 100, 2),
            'validation_rate': round((valid_responses / total_tests) * 100, 2),
            'response_consistency': round(response_consistency * 100, 2),
            'sql_consistency': round(sql_consistency * 100, 2),
            'avg_execution_time': round(avg_execution_time, 2),
            'individual_results': results
        }

    def _calculate_consistency(self, items: List[str]) -> float:
        """
        Calculates the consistency rate across items.

        Args:
            items: List of strings to compare

        Returns:
            Consistency rate (0-1)
        """
        if len(items) <= 1:
            return 1.0

        # Share of items matching the most frequent value.
        unique_items = set(items)
        most_common_count = max(items.count(item) for item in unique_items)

        return most_common_count / len(items)

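    # Worked example (illustrative): for items = ["42", "42", "43"], the most
    # frequent value appears 2 times out of 3, so consistency = 2/3 ≈ 0.67.
    # Identical responses score 1.0; all-distinct responses score 1/len(items).
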
    def _generate_summary(self):
        """Generates the overall test summary."""
        group_results = self.results.get('group_results', [])

        if not group_results:
            self.results['summary'] = {}
            return

        total_tests = sum(gr['total_tests'] for gr in group_results)
        if total_tests == 0:
            # All groups came back empty; avoid a division by zero below.
            self.results['summary'] = {}
            return

        total_successful = sum(gr['successful_tests'] for gr in group_results)
        total_valid = sum(gr['valid_responses'] for gr in group_results)

        avg_response_consistency = sum(gr['response_consistency'] for gr in group_results) / len(group_results)
        avg_sql_consistency = sum(gr['sql_consistency'] for gr in group_results) / len(group_results)

        self.results['summary'] = {
            'total_groups': len(group_results),
            'total_tests': total_tests,
            'total_successful': total_successful,
            'total_valid': total_valid,
            'overall_success_rate': round((total_successful / total_tests) * 100, 2),
            'overall_validation_rate': round((total_valid / total_tests) * 100, 2),
            'avg_response_consistency': round(avg_response_consistency, 2),
            'avg_sql_consistency': round(avg_sql_consistency, 2),
            'best_performing_group': max(group_results, key=lambda x: x['validation_rate']),
            'most_consistent_group': max(group_results, key=lambda x: x['response_consistency'])
        }

    def get_status(self) -> Dict[str, Any]:
        """Returns the current test status."""
        with self._lock:
            status = self.status.copy()
            status['running_tests_count'] = len(self.status['running_tests'])
            status['running_tests_details'] = list(self.status['running_tests'].values())
        return status

    def cancel_current_test(self, thread_id: str = None) -> bool:
        """
        Cancels a specific test, or the oldest one currently running.

        Args:
            thread_id: ID of the specific test to cancel (optional)

        Returns:
            True if a test was cancelled
        """
        with self._lock:
            if thread_id:
                if thread_id in self.status['running_tests']:
                    self.status['cancelled_tests'].add(thread_id)
                    print(f"🚫 Test {thread_id} marked for cancellation")
                    logging.info(f"Test {thread_id} cancelled by the user")
                    return True
            else:
                # No ID given: cancel the longest-running test.
                if self.status['running_tests']:
                    oldest_test = min(
                        self.status['running_tests'].items(),
                        key=lambda x: x[1]['start_time']
                    )
                    thread_id = oldest_test[0]
                    self.status['cancelled_tests'].add(thread_id)
                    print(f"🚫 Oldest test {thread_id} marked for cancellation")
                    logging.info(f"Oldest test {thread_id} cancelled by the user")
                    return True
            return False

    def cancel_all_tests(self) -> int:
        """
        Cancels all running tests.

        Returns:
            Number of tests cancelled
        """
        with self._lock:
            running_count = len(self.status['running_tests'])
            for thread_id in self.status['running_tests'].keys():
                self.status['cancelled_tests'].add(thread_id)

        print(f"🚫 {running_count} tests marked for cancellation")
        logging.info(f"{running_count} tests cancelled by the user")
        return running_count

    def skip_stuck_tests(self, max_duration: int = 120) -> int:
        """
        Marks stuck tests (those exceeding the time limit) for cancellation.

        Args:
            max_duration: Maximum duration in seconds

        Returns:
            Number of tests marked as stuck
        """
        current_time = time.time()
        stuck_count = 0

        with self._lock:
            for thread_id, test_info in self.status['running_tests'].items():
                if current_time - test_info['start_time'] > max_duration:
                    if thread_id not in self.status['cancelled_tests']:
                        self.status['timeout_tests'].add(thread_id)
                        self.status['cancelled_tests'].add(thread_id)
                        stuck_count += 1
                        print(f"⏰ Test {thread_id} marked as stuck (>{max_duration}s)")
                        logging.warning(f"Test {thread_id} stuck - timed out after {max_duration}s")

        return stuck_count

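    # A minimal watchdog sketch (an assumption about usage, not part of the
    # runner itself): the caller runs this in its own thread while
    # run_test_session is in flight, with `runner` a MassiveTestRunner instance.
    #
    #     def watchdog(runner, interval: float = 30.0):
    #         while runner.get_status()['current_status'] not in ('completed', 'error'):
    #             runner.skip_stuck_tests(max_duration=runner._test_timeout)
    #             time.sleep(interval)
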
    def _create_cancelled_result(self, thread_id: str, group: Dict[str, Any], iteration: int, start_time: float, reason: str = 'user_cancelled') -> Dict[str, Any]:
        """Creates the result for a cancelled test."""
        execution_time = time.time() - start_time
        return {
            'thread_id': thread_id,
            'group_id': group['id'],
            'iteration': iteration,
            'success': False,
            'cancelled': True,
            'cancel_reason': reason,
            'execution_time': execution_time,
            'sql_query': None,
            'final_response': f"Test cancelled: {reason}",
            'validation_valid': False,
            'validation_score': 0,
            'error': None,
            'timestamp': datetime.now().isoformat()
        }

    def _create_timeout_result(self, thread_id: str, group: Dict[str, Any], iteration: int, start_time: float, duration: int) -> Dict[str, Any]:
        """Creates the result for a timed-out test."""
        execution_time = time.time() - start_time
        return {
            'thread_id': thread_id,
            'group_id': group['id'],
            'iteration': iteration,
            'success': False,
            'timeout': True,
            'timeout_duration': duration,
            'execution_time': execution_time,
            'sql_query': None,
            'final_response': f"Test stuck - timed out after {duration}s",
            'validation_valid': False,
            'validation_score': 0,
            'error': f"Timeout after {duration}s",
            'timestamp': datetime.now().isoformat()
        }

    def get_results(self) -> Dict[str, Any]:
        """Returns the test results."""
        return self.results
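

# A minimal usage sketch (illustrative, not part of the runner). The session
# dict mirrors the keys this module reads; the model name and the question are
# placeholders, and a real run would pick an entry from AVAILABLE_MODELS.
# Running it requires the project's own dependencies (graphs, testes, utils).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    example_session = {
        'id': uuid.uuid4().hex,
        'question': 'How many orders were placed last month?',  # hypothetical question
        'groups': [
            {
                'id': 'group_1',
                'iterations': 3,
                'sql_model_name': 'example-sql-model',  # placeholder, see AVAILABLE_MODELS
                'processing_enabled': False,
                'processing_model_name': None,
            }
        ],
    }

    runner = MassiveTestRunner(max_workers=2)
    results = asyncio.run(
        runner.run_test_session(
            example_session,
            validation_method='keyword',
            expected_content='orders',  # hypothetical expected keyword
        )
    )
    print(results['summary'])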