koichi12's picture
Add files using upload-large-folder tool
cf6a8b4 verified
raw
history blame
4.05 kB
import logging
import os
import sys
import threading
import importlib
import ray
from ray.util.annotations import DeveloperAPI
log = logging.getLogger(__name__)
POST_MORTEM_ERROR_UUID = "post_mortem_error_uuid"
def _try_import_debugpy():
try:
debugpy = importlib.import_module("debugpy")
if not hasattr(debugpy, "__version__") or debugpy.__version__ < "1.8.0":
raise ImportError()
return debugpy
except (ModuleNotFoundError, ImportError):
log.error(
"Module 'debugpy>=1.8.0' cannot be loaded. "
"Ray Debugpy Debugger will not work without 'debugpy>=1.8.0' installed. "
"Install this module using 'pip install debugpy==1.8.0' "
)
return None
# A lock to ensure that only one thread can open the debugger port.
debugger_port_lock = threading.Lock()
def _override_breakpoint_hooks():
"""
This method overrides the breakpoint() function to set_trace()
so that other threads can reuse the same setup logic.
This is based on: https://github.com/microsoft/debugpy/blob/ef9a67fe150179ee4df9997f9273723c26687fab/src/debugpy/_vendored/pydevd/pydev_sitecustomize/sitecustomize.py#L87 # noqa: E501
"""
sys.__breakpointhook__ = set_trace
sys.breakpointhook = set_trace
import builtins as __builtin__
__builtin__.breakpoint = set_trace
def _ensure_debugger_port_open_thread_safe():
"""
This is a thread safe method that ensure that the debugger port
is open, and if not, open it.
"""
# The lock is acquired before checking the debugger port so only
# one thread can open the debugger port.
with debugger_port_lock:
debugpy = _try_import_debugpy()
if not debugpy:
return
debugger_port = ray._private.worker.global_worker.debugger_port
if not debugger_port:
(host, port) = debugpy.listen(
(ray._private.worker.global_worker.node_ip_address, 0)
)
ray._private.worker.global_worker.set_debugger_port(port)
log.info(f"Ray debugger is listening on {host}:{port}")
else:
log.info(f"Ray debugger is already open on {debugger_port}")
@DeveloperAPI
def set_trace(breakpoint_uuid=None):
"""Interrupt the flow of the program and drop into the Ray debugger.
Can be used within a Ray task or actor.
"""
debugpy = _try_import_debugpy()
if not debugpy:
return
_ensure_debugger_port_open_thread_safe()
# debugpy overrides the breakpoint() function, so we need to set it back
# so other threads can reuse it.
_override_breakpoint_hooks()
with ray._private.worker.global_worker.worker_paused_by_debugger():
msg = (
"Waiting for debugger to attach (see "
"https://docs.ray.io/en/latest/ray-observability/"
"ray-distributed-debugger.html)..."
)
log.info(msg)
debugpy.wait_for_client()
log.info("Debugger client is connected")
if breakpoint_uuid == POST_MORTEM_ERROR_UUID:
_debugpy_excepthook()
else:
_debugpy_breakpoint()
def _debugpy_breakpoint():
"""
Drop the user into the debugger on a breakpoint.
"""
import pydevd
pydevd.settrace(stop_at_frame=sys._getframe().f_back)
def _debugpy_excepthook():
"""
Drop the user into the debugger on an unhandled exception.
"""
import threading
import pydevd
py_db = pydevd.get_global_debugger()
thread = threading.current_thread()
additional_info = py_db.set_additional_thread_info(thread)
additional_info.is_tracing += 1
try:
error = sys.exc_info()
py_db.stop_on_unhandled_exception(py_db, thread, additional_info, error)
sys.excepthook(error[0], error[1], error[2])
finally:
additional_info.is_tracing -= 1
def _is_ray_debugger_post_mortem_enabled():
return os.environ.get("RAY_DEBUG_POST_MORTEM", "0") == "1"
def _post_mortem():
return set_trace(POST_MORTEM_ERROR_UUID)