| import subprocess | |
| import tempfile | |
| import os | |
| import psutil | |
| import threading | |
| import uuid | |
| import signal | |
| def run_cpp_code(code_str: str, stdin_str: str, time_limit: int = 2, memory_limit_mb: int = 128): | |
| file_id = str(uuid.uuid4()) | |
| cpp_file = f"./cache/{file_id}.cpp" | |
| exe_file = f"./cache/{file_id}.exe" | |
| with open(cpp_file, "w", encoding="utf-8") as f: | |
| f.write(code_str) | |
| # 编译代码 | |
| compile_cmd = ["g++", cpp_file, "-o", exe_file] | |
| try: | |
| subprocess.run(compile_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=time_limit) | |
| except subprocess.CalledProcessError as e: | |
| return {"error": "Compilation failed", "details": e.stderr.decode()} | |
| except FileNotFoundError: | |
| return {"error": "g++ not found. Please install MinGW or similar compiler on Windows."} | |
| proc = None | |
| try: | |
| # 启动子进程 | |
| proc = subprocess.Popen( | |
| [exe_file], | |
| stdin=subprocess.PIPE, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE | |
| ) | |
| stdout, stderr = proc.communicate(input=stdin_str.encode(), timeout=time_limit) | |
| return { | |
| "stdout": stdout.decode(), | |
| "stderr": stderr.decode(), | |
| "exit_code": proc.returncode | |
| } | |
| except subprocess.TimeoutExpired: | |
| if proc: | |
| proc.kill() | |
| proc.wait() | |
| return {"error": "Execution timed out"} | |
| finally: | |
| # 清理文件前确保进程完全退出 | |
| if proc and proc.poll() is not None: | |
| proc.kill() | |
| proc.wait() | |
| if os.path.exists(cpp_file): | |
| os.remove(cpp_file) | |
| if os.path.exists(exe_file): | |
| try: | |
| os.remove(exe_file) | |
| except PermissionError: | |
| # Windows 有时仍持有锁,稍后再试 | |
| attempt = 5 | |
| while attempt > 0 : | |
| attempt -= 1 | |
| import time | |
| time.sleep(0.5) | |
| try: | |
| os.remove(exe_file) | |
| except Exception as e: | |
| print(f"Failed to delete exe file: {e}") | |
| import subprocess | |
| import tempfile | |
| import os | |
| import threading | |
| import psutil | |
| import time | |
| def run_python_code_windows(code_string, input_string, time_limit_sec=10, memory_limit_mb=256): | |
| with tempfile.TemporaryDirectory() as tmpdirname: | |
| py_file = os.path.join(tmpdirname, "temp.py") | |
| with open(py_file, "w") as f: | |
| f.write(code_string) | |
| # Start subprocess | |
| process = subprocess.Popen( | |
| ["python", py_file], | |
| stdin=subprocess.PIPE, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True | |
| ) | |
| # Memory & time monitor | |
| def monitor(): | |
| try: | |
| p = psutil.Process(process.pid) | |
| start_time = time.time() | |
| while True: | |
| mem_usage = p.memory_info().rss / (1024 * 1024) # in MB | |
| if mem_usage > memory_limit_mb: | |
| process.kill() | |
| break | |
| if time.time() - start_time > time_limit_sec: | |
| process.kill() | |
| break | |
| if process.poll() is not None: | |
| break | |
| time.sleep(0.05) | |
| except psutil.NoSuchProcess: | |
| pass | |
| monitor_thread = threading.Thread(target=monitor) | |
| monitor_thread.start() | |
| try: | |
| stdout, stderr = process.communicate(input=input_string, timeout=time_limit_sec + 1) | |
| return { | |
| "stdout": stdout, | |
| "stderr": stderr, | |
| "returncode": process.returncode | |
| } | |
| except subprocess.TimeoutExpired: | |
| process.kill() | |
| return {"error": "Time limit exceeded"} | |
| except Exception as e: | |
| process.kill() | |
| return {"error": f"Runtime error: {e}"} | |
| import threading | |
| import random | |
| import queue | |
| def run_func_code(code_str, funcname="gen_input", param=None, timeout=2): | |
| """ | |
| Executes the given code string and calls the specified function with timeout using threading. | |
| Args: | |
| code_str (str): The Python code containing the function definition. | |
| funcname (str): The name of the function to execute. | |
| param (Any): Optional parameter to pass to the function. | |
| timeout (int): Maximum execution time in seconds. | |
| Returns: | |
| Any or dict: Result or {'error': ...} | |
| """ | |
| result_queue = queue.Queue() | |
| stop_event = threading.Event() # Create an Event to signal the thread to stop | |
| def target(): | |
| try: | |
| global_scope = { | |
| "random": random, | |
| "string": __import__("string"), | |
| "math": __import__("math"), | |
| "os": __import__("os"), | |
| "re": __import__("re"), | |
| "__builtins__": __builtins__, | |
| } | |
| exec(code_str, global_scope) | |
| func = global_scope.get(funcname) | |
| if not callable(func): | |
| result_queue.put({"error": f"Function '{funcname}' not found in the provided code."}) | |
| return | |
| output = func(param) if param is not None else func() | |
| if not stop_event.is_set(): # Check if we are not stopped by timeout | |
| result_queue.put(output) | |
| except Exception as e: | |
| result_queue.put({"error": str(e)}) | |
| thread = threading.Thread(target=target) | |
| thread.start() | |
| thread.join(timeout) | |
| # If thread is still alive after timeout, signal it to stop (soft termination) | |
| if thread.is_alive(): | |
| stop_event.set() # Signal the thread to stop | |
| return {"error": f"Execution exceeded {timeout} seconds."} | |
| # Return the result from the queue | |
| return result_queue.get() | |
| # code = ''' | |
| # def gen_input(): | |
| # while True: | |
| # pass | |
| # ''' | |
| # print(run_func_code(code, timeout=1)) # {'error': 'Execution exceeded 1 seconds.'} | |
Xet Storage Details
- Size:
- 6.36 kB
- Xet hash:
- 72e14632304640cf75bbfcab64df0b23d685d910554b060faae2e96ec01b50ca
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.