| import subprocess | |
| import tempfile | |
| import os | |
| import resource | |
| import sys | |
| import uuid | |
def run_cpp_code_linux(code_string, input_string, time_limit=2, memory_limit=256):
    """Compile and run a C++ program in a resource-limited subprocess.

    Parameters:
        code_string:  C++ source code to compile with g++.
        input_string: Text fed to the compiled program's stdin.
        time_limit:   CPU-seconds allowed in the child (wall clock gets +1s slack).
        memory_limit: Address-space cap for the child, in MiB.

    Returns a dict:
        {"stdout", "stderr", "returncode"[, "signal"]} for a completed run, or
        {"error", ...} when compilation fails, the time limit is exceeded, or
        the process cannot be launched.
    """
    with tempfile.TemporaryDirectory() as tmpdirname:
        # Unique names allow concurrent callers to share a filesystem safely.
        unique_id = uuid.uuid4()
        cpp_file = os.path.join(tmpdirname, f"{unique_id}.cpp")
        exe_file = os.path.join(tmpdirname, f"{unique_id}.out")

        # Write the C++ source to a file for the compiler.
        with open(cpp_file, "w") as f:
            f.write(code_string)

        # Compile; list-form argv avoids any shell interpretation.
        compile_result = subprocess.run(
            ["g++", cpp_file, "-o", exe_file],
            capture_output=True,
            text=True
        )
        if compile_result.returncode != 0:
            return {
                "error": "Compilation failed",
                "details": compile_result.stderr
            }

        def set_limits():
            # Runs in the child between fork and exec (preexec_fn).
            # CPU time limit (seconds); hard==soft means SIGKILL at the limit.
            resource.setrlimit(resource.RLIMIT_CPU, (time_limit, time_limit))
            # Address-space limit (bytes).
            memory_bytes = memory_limit * 1024 * 1024
            resource.setrlimit(resource.RLIMIT_AS, (memory_bytes, memory_bytes))

        try:
            result = subprocess.run(
                [exe_file],
                input=input_string,
                text=True,
                capture_output=True,
                preexec_fn=set_limits,
                timeout=time_limit + 1  # wall-clock backstop on top of RLIMIT_CPU
            )
        except subprocess.TimeoutExpired:
            return {
                "error": "Time limit exceeded"
            }
        except Exception as e:
            # e.g. exec failure of the freshly built binary.
            return {
                "error": f"Runtime error: {e}"
            }

        response = {
            "stdout": result.stdout,
            "stderr": result.stderr,
            "returncode": result.returncode
        }
        # NOTE: a MemoryError in the child never propagates to this parent
        # process (the previous `except MemoryError` was dead code).  A child
        # killed for exceeding a limit shows up as a negative returncode
        # (-signum); surface the signal so callers can classify the failure.
        if result.returncode < 0:
            response["signal"] = -result.returncode
        return response
| import multiprocessing | |
| import traceback | |
def _run_func_code_target(shared_dict, code_str, funcname, param):
    """Worker run in a child process: exec *code_str* and call *funcname*.

    Module-level (not a closure) so it can be pickled under the `spawn` and
    `forkserver` start methods as well as `fork`.
    Reports via shared_dict: 'status' in {'success', 'error'} and 'result'.
    """
    try:
        env = {}
        # SECURITY: exec of arbitrary code — only feed trusted code_str.
        exec(code_str, env)
        func = env.get(funcname)
        if callable(func):
            # param=None means "call with no arguments" (original contract).
            result = func() if param is None else func(param)
            shared_dict['status'] = 'success'
            shared_dict['result'] = result
        else:
            shared_dict['status'] = 'error'
            shared_dict['result'] = f'No function named {funcname} found'
    except Exception:
        shared_dict['status'] = 'error'
        shared_dict['result'] = traceback.format_exc()


def run_func_code(code_str, funcname="construct_inputs", param=None, time_limit=2):
    """Exec *code_str* in a child process and call *funcname*, with a timeout.

    Parameters:
        code_str:   Python source defining the function to call.
        funcname:   Name of the function to look up after exec.
        param:      Optional single argument; None means call with no args.
        time_limit: Seconds to wait before killing the child.

    Returns the function's (picklable) result on success, or an error string
    prefixed with 'TimeoutError:' / 'ExecutionError:'.
    """
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    process = multiprocessing.Process(
        target=_run_func_code_target,
        args=(shared_dict, code_str, funcname, param),
    )
    process.start()
    process.join(time_limit)
    if process.is_alive():
        process.terminate()
        process.join()  # reap the killed child; avoids leaving a zombie
        return 'TimeoutError: Execution exceeded time limit'
    if shared_dict.get('status') == 'success':
        return shared_dict.get('result')
    return f"ExecutionError: {shared_dict.get('result')}"
| import subprocess | |
| import tempfile | |
| import os | |
def run_python_code_linux(code_str, test_input, time_limit=2):
    """Execute *code_str* in a fresh Python process, feeding stdin.

    Parameters:
        code_str:   Python source to run.
        test_input: Text piped to the child's stdin.
        time_limit: Wall-clock seconds before the child is killed.

    Returns the child's stdout on completion (stderr and the exit status are
    discarded — preserved from the original contract), or an error string
    prefixed with 'TimeoutError:' / 'ExecutionError:'.
    """
    # delete=False so the file survives the `with` for the subprocess to read;
    # cleaned up explicitly in `finally`.
    with tempfile.NamedTemporaryFile(mode='w+', suffix='.py', delete=False) as tmp_file:
        tmp_file.write(code_str)
        tmp_file.flush()
        tmp_filename = tmp_file.name
    try:
        # sys.executable is the interpreter running this script; the previous
        # hard-coded "python3" could resolve to a different (or missing)
        # interpreter on PATH.
        result = subprocess.run(
            [sys.executable, tmp_filename],
            input=test_input,
            text=True,
            capture_output=True,
            timeout=time_limit
        )
        return result.stdout
    except subprocess.TimeoutExpired:
        return "TimeoutError: Execution exceeded time limit"
    except Exception as e:
        return f"ExecutionError: {str(e)}"
    finally:
        os.remove(tmp_filename)  # clean up the temporary script
| # # 测试代码示例 | |
| # if __name__ == "__main__": | |
| # # 要执行的 Python 代码(从标准输入读取一行并打印) | |
| # code = """ | |
| # input_str = input() | |
| # print("Received:", input_str) | |
| # """ | |
| # # 输入内容 | |
| # test_input = "Hello, Test!" | |
| # # 调用函数 | |
| # output = run_python_code_linux(code, test_input) | |
| # # 输出运行结果 | |
| # print("运行结果:") | |
| # print(output) | |
| # def run_multiple_cpp_tasks(code_string, input_string_list, time_limit=2, memory_limit=256): | |
| # # Create a ThreadPoolExecutor for concurrent execution | |
| # with ThreadPoolExecutor() as executor: | |
| # # Submit all tasks | |
| # futures = [executor.submit(run_cpp_code_linux, code_string, input_string, time_limit, memory_limit) | |
| # for input_string in input_string_list] | |
| # # Wait for all tasks to complete and collect results | |
| # results = [] | |
| # for future in as_completed(futures): | |
| # result = future.result() | |
| # results.append(result) | |
| # return results | |
| # # Example usage: | |
| # code_string = ''' | |
| # #include <iostream> | |
| # using namespace std; | |
| # int main() { | |
| # string input; | |
| # getline(cin, input); | |
| # cout << "Input received: " << input << endl; | |
| # return 0; | |
| # } | |
| # ''' | |
| # input_string_list = [ | |
| # "input1", | |
| # "input2", | |
| # "input3", | |
| # "input4" | |
| # ] | |
| # results = run_multiple_cpp_tasks(code_string, input_string_list) | |
| # # Print the results | |
| # for i, result in enumerate(results): | |
| # print(f"Result {i + 1}: {result}") | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
def run_multiple_tests(code_str, input_strings, time_limit=2):
    """Run *code_str* once per stdin input, in parallel worker threads.

    Results are returned in the same order as *input_strings*; each element is
    whatever run_python_code_linux returned for that input.
    """
    with ThreadPoolExecutor() as pool:
        pending = [
            pool.submit(run_python_code_linux, code_str, stdin_text, time_limit)
            for stdin_text in input_strings
        ]
        # Collect in submission order to mirror the input order.
        return [task.result() for task in pending]
| # # 示例用法 | |
| # code_str = """ | |
| # import sys | |
| # # 从标准输入读取 | |
| # input_data = sys.stdin.read().strip() | |
| # # 返回处理后的输出 | |
| # print(f"Processed input: {input_data}") | |
| # """ | |
| # input_strings = ["Alice", "Bob", "Charlie"] | |
| # results = run_multiple_tests(code_str, input_strings) | |
| # for result in results: | |
| # print(result) | |
Xet Storage Details
- Size:
- 6.36 kB
- Xet hash:
- 0e6fa8ab0396b83ad3b1bc926728b2f89bb4095064c840ee255b780e26180f69
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.