| |
| |
|
|
| import ctypes |
| |
|
|
| from collections import defaultdict |
| from concurrent.futures import as_completed, ProcessPoolExecutor |
| import logging |
|
|
| from factool.code.helper._execution import test_case_against_solution |
|
|
| logging.basicConfig( |
| format="SystemLog: [%(asctime)s][%(name)s][%(levelname)s] - %(message)s", |
| datefmt="%Y-%m-%d %H:%M:%S", |
| level=logging.INFO, |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
| def evaluate_test_cases_multi_solution(prompt, testcases_input, |
| multi_solutions, timeout=0.1): |
| logger.info(f'Start evaluation with test code, timeout={timeout}') |
|
|
| with ProcessPoolExecutor() as executor: |
| futures = [] |
| results = [[None for _ in multi_solutions] for _ in testcases_input] |
|
|
| for i, testcase in enumerate(testcases_input): |
| for j, solution in enumerate(multi_solutions): |
| args = (prompt, solution, testcase, timeout) |
| future = executor.submit(test_case_against_solution, *args) |
| futures.append((i, j, future)) |
| logger.info(f'{len(futures)} execution requests are submitted') |
| |
| for completed_future in as_completed([f[2] for f in futures]): |
| for i, j, future in futures: |
| if future == completed_future: |
| logger.info('[{}/{}] execution completed'.format( |
| i * len(multi_solutions) + j + 1, len(futures))) |
| result = completed_future.result() |
| results[i][j] = result |
| break |
|
|
| return results |
|
|
|
|
|
|
|
|
|
|
|
|
|
|