Tsukihjy/testcase / methods /codeT /execute_tool_win.py
download
raw
6.36 kB
import subprocess
import tempfile
import os
import psutil
import threading
import uuid
import signal
def _remove_exe_with_retry(path, attempts=5, delay=0.5):
    """Delete the compiled executable, retrying while it is still locked.

    On Windows the image file of a process that has just exited can stay
    locked for a short moment, so a ``PermissionError`` is retried up to
    *attempts* more times, sleeping *delay* seconds before each retry.

    Returns:
        bool: True if the file is gone afterwards, False if it could not be
        removed (the last error is printed in that case).
    """
    import time  # function-scope import keeps this block self-contained

    last_error = None
    for attempt in range(attempts + 1):
        if attempt:
            time.sleep(delay)
        try:
            os.remove(path)
            return True
        except FileNotFoundError:
            # Already gone (or never produced) - nothing left to do.
            return True
        except PermissionError as e:
            # Windows sometimes still holds the lock; try again shortly.
            last_error = e
        except OSError as e:
            # Any other failure will not be cured by waiting.
            last_error = e
            break
    print(f"Failed to delete exe file: {last_error}")
    return False


def run_cpp_code(code_str: str, stdin_str: str, time_limit: int = 2, memory_limit_mb: int = 128):
    """Compile a C++ source string with g++ and run it with the given stdin.

    The source is written to ``./cache/<uuid>.cpp`` and compiled to
    ``./cache/<uuid>.exe``; both files are removed before returning,
    whichever path the function takes.

    Args:
        code_str: C++ source code to compile.
        stdin_str: Text fed to the program's standard input.
        time_limit: Timeout in seconds, applied separately to the compile
            step and to the run step.
        memory_limit_mb: Accepted for interface compatibility; it is not
            enforced by this function.

    Returns:
        dict: On a completed run, ``{"stdout", "stderr", "exit_code"}``.
        Otherwise a dict with an ``"error"`` key: ``"Compilation failed"``
        (plus ``"details"`` with the compiler's stderr), ``"Compilation
        timed out"``, the g++-not-found message, or ``"Execution timed out"``.
    """
    # The cache directory may not exist on a fresh checkout.
    os.makedirs("./cache", exist_ok=True)

    file_id = str(uuid.uuid4())
    cpp_file = f"./cache/{file_id}.cpp"
    exe_file = f"./cache/{file_id}.exe"

    proc = None
    try:
        with open(cpp_file, "w", encoding="utf-8") as f:
            f.write(code_str)

        # Compile the source.
        compile_cmd = ["g++", cpp_file, "-o", exe_file]
        try:
            subprocess.run(
                compile_cmd,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=time_limit,
            )
        except subprocess.CalledProcessError as e:
            # Compiler diagnostics are not guaranteed to be UTF-8 (e.g. a
            # localized Windows toolchain), so never let decoding raise.
            return {"error": "Compilation failed", "details": e.stderr.decode(errors="replace")}
        except subprocess.TimeoutExpired:
            return {"error": "Compilation timed out"}
        except FileNotFoundError:
            return {"error": "g++ not found. Please install MinGW or similar compiler on Windows."}

        # Launch the compiled program.
        proc = subprocess.Popen(
            [exe_file],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        try:
            stdout, stderr = proc.communicate(input=stdin_str.encode(), timeout=time_limit)
        except subprocess.TimeoutExpired:
            proc.kill()
            # Reap the child and drain its pipes so nothing is left open.
            proc.communicate()
            return {"error": "Execution timed out"}

        return {
            "stdout": stdout.decode(errors="replace"),
            "stderr": stderr.decode(errors="replace"),
            "exit_code": proc.returncode,
        }
    finally:
        # Make sure the child is really gone before deleting its image:
        # poll() is None means it is still running.
        if proc is not None and proc.poll() is None:
            proc.kill()
            proc.wait()
        try:
            os.remove(cpp_file)
        except FileNotFoundError:
            pass
        _remove_exe_with_retry(exe_file)
import subprocess
import tempfile
import os
import threading
import psutil
import time
def run_python_code_windows(code_string, input_string, time_limit_sec=10, memory_limit_mb=256):
    """Run a Python source string in a subprocess under time and memory limits.

    The code is written to ``temp.py`` inside a temporary directory and run
    with the current interpreter. A background thread polls the child's
    resident set size and elapsed wall-clock time every 50 ms and kills the
    child when either limit is exceeded.

    Args:
        code_string: Python source code to execute.
        input_string: Text fed to the child's standard input.
        time_limit_sec: Wall-clock limit in seconds.
        memory_limit_mb: Resident-memory limit in MiB.

    Returns:
        dict: ``{"stdout", "stderr", "returncode"}`` when the child ran to
        completion on its own; otherwise ``{"error": ...}`` with
        ``"Time limit exceeded"``, ``"Memory limit exceeded"`` or
        ``"Runtime error: <exception>"``.
    """
    import sys  # function-scope import keeps this block self-contained

    with tempfile.TemporaryDirectory() as tmpdirname:
        py_file = os.path.join(tmpdirname, "temp.py")
        # Python reads source files as UTF-8 by default, so write it that way
        # instead of relying on the locale encoding.
        with open(py_file, "w", encoding="utf-8") as f:
            f.write(code_string)

        # Start the subprocess with the interpreter that is running this
        # code rather than whatever "python" happens to resolve to on PATH.
        process = subprocess.Popen(
            [sys.executable, py_file],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )

        # Filled in by the monitor when it kills the child, so the caller can
        # tell a limit violation apart from an ordinary exit.
        kill_reason = []

        # Memory & time monitor
        def monitor():
            try:
                p = psutil.Process(process.pid)
                start_time = time.time()
                while True:
                    mem_usage = p.memory_info().rss / (1024 * 1024)  # in MB
                    if mem_usage > memory_limit_mb:
                        kill_reason.append("Memory limit exceeded")
                        process.kill()
                        break
                    if time.time() - start_time > time_limit_sec:
                        kill_reason.append("Time limit exceeded")
                        process.kill()
                        break
                    if process.poll() is not None:
                        break
                    time.sleep(0.05)
            except psutil.NoSuchProcess:
                # The child exited before or while it was being inspected.
                pass

        monitor_thread = threading.Thread(target=monitor, daemon=True)
        monitor_thread.start()

        try:
            # The extra second is a safety net in case the monitor itself
            # fails to enforce the time limit.
            stdout, stderr = process.communicate(input=input_string, timeout=time_limit_sec + 1)
        except subprocess.TimeoutExpired:
            process.kill()
            # Reap the child so the temporary directory can be removed.
            process.communicate()
            return {"error": "Time limit exceeded"}
        except Exception as e:
            process.kill()
            process.wait()
            return {"error": f"Runtime error: {e}"}
        finally:
            # By now the child has exited, so the monitor loop ends within
            # one polling interval; the timeout is only a safeguard.
            monitor_thread.join(timeout=1)

        if kill_reason:
            return {"error": kill_reason[0]}
        return {
            "stdout": stdout,
            "stderr": stderr,
            "returncode": process.returncode,
        }
import threading
import random
import queue
def run_func_code(code_str, funcname="gen_input", param=None, timeout=2):
    """
    Executes the given code string and calls the specified function with timeout using threading.

    The code runs in a daemon worker thread. Python threads cannot be killed,
    so on timeout the worker is abandoned (it keeps running in the
    background) and only its eventual result is discarded; being a daemon, it
    does not keep the interpreter alive.

    Args:
        code_str (str): The Python code containing the function definition.
        funcname (str): The name of the function to execute.
        param (Any): Optional parameter to pass to the function. ``None``
            means "call with no arguments".
        timeout (int): Maximum execution time in seconds.
    Returns:
        Any or dict: The function's return value, or ``{'error': ...}`` when
        the function is missing, raises, or exceeds the timeout.
    """
    result_queue = queue.Queue()
    stop_event = threading.Event()  # Set on timeout so a late result is dropped

    def target():
        try:
            global_scope = {
                "random": random,
                "string": __import__("string"),
                "math": __import__("math"),
                "os": __import__("os"),
                "re": __import__("re"),
                "__builtins__": __builtins__,
            }
            # NOTE(security): exec() runs code_str with full builtins and
            # `os` access in this process. Only pass trusted code.
            exec(code_str, global_scope)
            func = global_scope.get(funcname)
            if not callable(func):
                result_queue.put({"error": f"Function '{funcname}' not found in the provided code."})
                return
            output = func(param) if param is not None else func()
            if not stop_event.is_set():  # Drop the result if the caller already timed out
                result_queue.put(output)
        except Exception as e:
            result_queue.put({"error": str(e)})
        except BaseException as e:
            # SystemExit and friends would otherwise end the thread silently
            # and leave the caller waiting on an empty queue forever.
            result_queue.put({"error": f"{type(e).__name__}: {e}"})

    # Daemon thread: an abandoned infinite loop must not block interpreter exit.
    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    thread.join(timeout)

    # If the thread is still alive after the timeout, tell it to discard its result.
    if thread.is_alive():
        stop_event.set()
        return {"error": f"Execution exceeded {timeout} seconds."}

    # The worker has finished; never block here in case it produced nothing.
    try:
        return result_queue.get_nowait()
    except queue.Empty:
        return {"error": "Execution finished without producing a result."}
# code = '''
# def gen_input():
# while True:
# pass
# '''
# print(run_func_code(code, timeout=1)) # {'error': 'Execution exceeded 1 seconds.'}

Xet Storage Details

Size:
6.36 kB
·
Xet hash:
72e14632304640cf75bbfcab64df0b23d685d910554b060faae2e96ec01b50ca

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.