Tsukihjy/testcase / methods /codeT /execute_tool_linux.py
download
raw
6.36 kB
import subprocess
import tempfile
import os
import resource
import sys
import uuid
def run_cpp_code_linux(code_string, input_string, time_limit=2, memory_limit=256):
    """Compile a C++ program with ``g++`` and run it once under resource limits.

    Args:
        code_string: Complete C++ source text.
        input_string: Text passed to the program on stdin.
        time_limit: CPU-time limit in seconds applied via ``RLIMIT_CPU``
            (fractional values are rounded up). The wall-clock timeout is
            ``time_limit + 1`` seconds.
        memory_limit: Address-space limit in MiB applied via ``RLIMIT_AS``.

    Returns:
        dict: If the program ran to completion (including being ended by a
        signal), ``{"stdout", "stderr", "returncode"}``. Otherwise a dict with
        an ``"error"`` key: ``"Compilation failed"`` (plus ``"details"``),
        ``"Time limit exceeded"``, ``"Memory limit exceeded"`` or
        ``"Runtime error: ..."``.
    """
    import math  # local import: only needed to round the rlimit values

    with tempfile.TemporaryDirectory() as tmpdirname:
        unique_id = uuid.uuid4()
        cpp_file = os.path.join(tmpdirname, f"{unique_id}.cpp")
        exe_file = os.path.join(tmpdirname, f"{unique_id}.out")

        # Write the source as UTF-8 so non-ASCII comments/strings do not
        # depend on the locale of the calling process.
        with open(cpp_file, "w", encoding="utf-8") as f:
            f.write(code_string)

        # Compile the C++ code. A missing or unlaunchable compiler is reported
        # through the same error dict as a failed compilation instead of
        # escaping as an exception.
        try:
            compile_result = subprocess.run(
                ["g++", cpp_file, "-o", exe_file],
                capture_output=True,
                text=True,
            )
        except OSError as exc:
            return {
                "error": "Compilation failed",
                "details": f"Could not run g++: {exc}",
            }

        if compile_result.returncode != 0:
            return {
                "error": "Compilation failed",
                "details": compile_result.stderr,
            }

        try:
            # setrlimit only accepts integers; round up so a fractional limit
            # such as 1.5 s does not make preexec_fn fail.
            cpu_seconds = math.ceil(time_limit)
            memory_bytes = int(memory_limit * 1024 * 1024)

            def set_limits():
                # Runs in the child between fork and exec.
                resource.setrlimit(resource.RLIMIT_CPU, (cpu_seconds, cpu_seconds))
                resource.setrlimit(resource.RLIMIT_AS, (memory_bytes, memory_bytes))

            result = subprocess.run(
                [exe_file],
                input=input_string,
                text=True,
                capture_output=True,
                preexec_fn=set_limits,
                timeout=time_limit + 1,
            )
            # NOTE(review): a child stopped by the kernel for exceeding an
            # rlimit is reported here too (presumably with a negative
            # returncode) -- callers should check "returncode".
            return {
                "stdout": result.stdout,
                "stderr": result.stderr,
                "returncode": result.returncode,
            }
        except subprocess.TimeoutExpired:
            return {
                "error": "Time limit exceeded"
            }
        except MemoryError:
            # Raised only if *this* process runs out of memory (e.g. while
            # buffering the child's output).
            return {
                "error": "Memory limit exceeded"
            }
        except Exception as e:
            return {
                "error": f"Runtime error: {e}"
            }
import multiprocessing
import traceback
def _run_func_in_child(code_str, funcname, param, shared_dict):
    """Child-process body: exec *code_str*, call *funcname*, record the outcome.

    Writes ``status`` (``'success'`` or ``'error'``) and ``result`` (the return
    value, an explanatory message, or a formatted traceback) into
    *shared_dict*.
    """
    try:
        env = {}
        # SECURITY: exec() runs arbitrary code with this process's privileges.
        # Only the time limit enforced by the parent constrains it.
        exec(code_str, env)
        func = env.get(funcname)
        if callable(func):
            # Identity test: `== None` misbehaves for arguments that overload
            # equality element-wise (e.g. arrays).
            result = func() if param is None else func(param)
            shared_dict['status'] = 'success'
            shared_dict['result'] = result
        else:
            shared_dict['status'] = 'error'
            shared_dict['result'] = f'No function named {funcname} found'
    except Exception:
        shared_dict['status'] = 'error'
        shared_dict['result'] = traceback.format_exc()


def run_func_code(code_str, funcname="construct_inputs", param=None, time_limit=2):
    """Execute *code_str* in a separate process and call one function from it.

    Args:
        code_str: Python source that defines the function to call.
        funcname: Name of the function to look up after executing the source.
        param: Single positional argument for the function; when ``None`` the
            function is called with no arguments.
        time_limit: Seconds to wait for the child process before killing it.

    Returns:
        The function's return value on success. Otherwise a string:
        ``'TimeoutError: Execution exceeded time limit'`` when the child did
        not finish in time, or ``'ExecutionError: <message or traceback>'``
        when the function is missing or raised.
    """
    # The context manager shuts the manager's server process down on exit;
    # without it every call leaves one behind.
    with multiprocessing.Manager() as manager:
        shared_dict = manager.dict()
        # The target is a module-level function so it can be pickled when the
        # start method is not "fork".
        process = multiprocessing.Process(
            target=_run_func_in_child,
            args=(code_str, funcname, param, shared_dict),
        )
        process.start()
        process.join(time_limit)

        if process.is_alive():
            process.terminate()
            # Reap the child; escalate if it ignores SIGTERM.
            process.join(1)
            if process.is_alive():
                process.kill()
                process.join()
            return 'TimeoutError: Execution exceeded time limit'

        # Copy the values out before the manager (and its proxies) go away.
        status = shared_dict.get('status')
        result = shared_dict.get('result')

    if status == 'success':
        return result
    return f"ExecutionError: {result}"
import subprocess
import tempfile
import os
def run_python_code_linux(code_str, test_input, time_limit=2):
    """Run Python source in a fresh ``python3`` process and return its stdout.

    Args:
        code_str: Python source text to execute as a script.
        test_input: Text passed to the script on stdin.
        time_limit: Wall-clock timeout in seconds for the child process.

    Returns:
        str: The child's captured stdout. On timeout, the string
        ``"TimeoutError: Execution exceeded time limit"``; if launching or
        communicating with the child raises, ``"ExecutionError: <message>"``.
        The child's stderr and exit status are captured but not returned.
    """
    # Python reads source files as UTF-8 by default, so write the script as
    # UTF-8 regardless of the caller's locale.
    tmp_file = tempfile.NamedTemporaryFile(
        mode='w', suffix='.py', delete=False, encoding='utf-8'
    )
    tmp_filename = tmp_file.name
    try:
        # Closing the file before running guarantees the content is on disk.
        with tmp_file:
            tmp_file.write(code_str)

        try:
            # Run the script with subprocess, feeding test_input on stdin.
            result = subprocess.run(
                ['python3', tmp_filename],
                input=test_input,
                text=True,
                capture_output=True,
                timeout=time_limit
            )
        except subprocess.TimeoutExpired:
            return "TimeoutError: Execution exceeded time limit"
        except Exception as e:
            return f"ExecutionError: {str(e)}"
        return result.stdout
    finally:
        # Remove the temporary script, even when writing it failed.
        os.remove(tmp_filename)
# # Example usage
# if __name__ == "__main__":
# # Python code to execute (reads one line from stdin and prints it)
# code = """
# input_str = input()
# print("Received:", input_str)
# """
# # Input text
# test_input = "Hello, Test!"
# # Call the function
# output = run_python_code_linux(code, test_input)
# # Print the result
# print("Result:")
# print(output)
# def run_multiple_cpp_tasks(code_string, input_string_list, time_limit=2, memory_limit=256):
# # Create a ThreadPoolExecutor for concurrent execution
# with ThreadPoolExecutor() as executor:
# # Submit all tasks
# futures = [executor.submit(run_cpp_code_linux, code_string, input_string, time_limit, memory_limit)
# for input_string in input_string_list]
# # Wait for all tasks to complete and collect results
# results = []
# for future in as_completed(futures):
# result = future.result()
# results.append(result)
# return results
# # Example usage:
# code_string = '''
# #include <iostream>
# using namespace std;
# int main() {
# string input;
# getline(cin, input);
# cout << "Input received: " << input << endl;
# return 0;
# }
# '''
# input_string_list = [
# "input1",
# "input2",
# "input3",
# "input4"
# ]
# results = run_multiple_cpp_tasks(code_string, input_string_list)
# # Print the results
# for i, result in enumerate(results):
# print(f"Result {i + 1}: {result}")
from concurrent.futures import ThreadPoolExecutor, as_completed
def run_multiple_tests(code_str, input_strings, time_limit=2):
    """Run the same Python program against several inputs concurrently.

    Each element of *input_strings* is passed, together with *code_str* and
    *time_limit*, to ``run_python_code_linux`` on a thread-pool worker.

    Returns:
        list: One entry per input, in the same order as *input_strings*,
        holding whatever ``run_python_code_linux`` returned for that input.
    """
    def _run_single(test_input):
        return run_python_code_linux(code_str, test_input, time_limit)

    # Fan the inputs out over a thread pool; map() yields results in input order.
    with ThreadPoolExecutor() as pool:
        outputs = list(pool.map(_run_single, input_strings))
    return outputs
# # Example usage
# code_str = """
# import sys
# # Read from standard input
# input_data = sys.stdin.read().strip()
# # Print the processed output
# print(f"Processed input: {input_data}")
# """
# input_strings = ["Alice", "Bob", "Charlie"]
# results = run_multiple_tests(code_str, input_strings)
# for result in results:
# print(result)

Xet Storage Details

Size:
6.36 kB
·
Xet hash:
0e6fa8ab0396b83ad3b1bc926728b2f89bb4095064c840ee255b780e26180f69

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.