| import subprocess |
| import os |
| import sys |
| import random |
| import threading |
| import collections |
| import logging |
| import shutil |
| import time |
|
|
|
|
# Logger shared by every helper in this module.
_logger = logging.getLogger(name="ray.util.spark.utils")
|
|
|
|
def is_in_databricks_runtime():
    """Return True when the process runs inside a Databricks runtime.

    Detection is based solely on whether the ``DATABRICKS_RUNTIME_VERSION``
    environment variable is present (its value is not inspected).
    """
    return os.environ.get("DATABRICKS_RUNTIME_VERSION") is not None
|
|
|
|
def gen_cmd_exec_failure_msg(cmd, return_code, tail_output_deque):
    """Build the error message for a failed command execution.

    Args:
        cmd: the executed command, either an argv sequence (items may be
            ``str`` or path-like) or a single command string.
        return_code: the process exit code.
        tail_output_deque: iterable of already newline-terminated output
            lines, concatenated verbatim into the message.

    Returns:
        A human-readable message containing the command, its return code and
        the tail of its output.
    """
    # `subprocess.Popen` accepts a plain command string as well as an argv
    # sequence; " ".join() on a string would space out every character, and
    # would raise TypeError on path-like argv items.
    cmd_str = cmd if isinstance(cmd, str) else " ".join(map(str, cmd))
    tail_output = "".join(tail_output_deque)
    return (
        f"Command {cmd_str} failed with return code {return_code}, tail output are "
        f"included below.\n{tail_output}\n"
    )
|
|
|
|
def get_configured_spark_executor_memory_bytes(spark):
    """Return the configured ``spark.executor.memory`` value in bytes.

    Accepts JVM/Spark-style size strings such as ``"512m"``, ``"4g"``,
    ``"4gb"`` (case-insensitive, binary units). A bare number is interpreted
    as MiB, which is how Spark reads this setting. Defaults to ``"1g"`` when
    the config is unset.

    Args:
        spark: a SparkSession-like object exposing ``conf.get(key, default)``.

    Raises:
        ValueError: if the configured value cannot be parsed.
    """
    import re

    unit_map = {
        "b": 1,
        "k": 1024,
        "kb": 1024,
        "m": 1024 ** 2,
        "mb": 1024 ** 2,
        "g": 1024 ** 3,
        "gb": 1024 ** 3,
        "t": 1024 ** 4,
        "tb": 1024 ** 4,
        "p": 1024 ** 5,
        "pb": 1024 ** 5,
    }
    raw_value = spark.conf.get("spark.executor.memory", "1g")
    match = re.fullmatch(r"(\d+)\s*([a-z]*)", str(raw_value).strip().lower())
    if match is None:
        raise ValueError(
            f"Unable to parse spark.executor.memory value {raw_value!r}."
        )
    value_num, value_unit = match.groups()
    # No suffix means MiB for `spark.executor.memory`.
    value_unit = value_unit or "m"
    if value_unit not in unit_map:
        raise ValueError(
            f"Unable to parse spark.executor.memory value {raw_value!r}."
        )
    return int(value_num) * unit_map[value_unit]
|
|
|
|
def exec_cmd(
    cmd,
    *,
    extra_env=None,
    synchronous=True,
    **kwargs,
):
    """
    A convenience wrapper of `subprocess.Popen` for running a command from a Python
    script.

    The subprocess stdout / stderr are merged and streamed, line by line, to the
    current process's stdout by a background thread, which also keeps the last
    100 lines in a deque.

    If `synchronous` is True, wait until the process terminates; if the return
    code is not 0, raise a RuntimeError containing the last 100 output lines.
    If `synchronous` is False, return a `Popen` instance and the deque holding
    the tail output.

    Args:
        cmd: command passed to `subprocess.Popen`.
        extra_env: optional mapping merged over `os.environ` for the child.
        synchronous: whether to wait for the process to finish.
        **kwargs: forwarded to `subprocess.Popen`; must not contain `text`,
            `stdout` or `stderr`. `env` may be given instead of `extra_env`.

    Raises:
        ValueError: on forbidden kwargs, or when both `env` and `extra_env`
            are given.
        RuntimeError: (synchronous mode only) when the command exits non-zero.
    """
    illegal_kwargs = set(kwargs.keys()).intersection({"text", "stdout", "stderr"})
    if illegal_kwargs:
        raise ValueError(f"`kwargs` cannot contain {list(illegal_kwargs)}")

    env = kwargs.pop("env", None)
    if extra_env is not None and env is not None:
        raise ValueError("`extra_env` and `env` cannot be used at the same time")

    env = env if extra_env is None else {**os.environ, **extra_env}

    process = subprocess.Popen(
        cmd,
        env=env,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        **kwargs,
    )

    tail_output_deque = collections.deque(maxlen=100)

    def redirect_log_thread_fn():
        for line in process.stdout:
            tail_output_deque.append(line)
            sys.stdout.write(line)

    redirect_thread = threading.Thread(target=redirect_log_thread_fn, args=())
    redirect_thread.start()

    if not synchronous:
        return process, tail_output_deque

    return_code = process.wait()
    if return_code != 0:
        # `process.wait()` can return before the redirect thread has drained the
        # pipe, which would truncate the tail output in the error message. Wait
        # for it, but only for a bounded time: a grandchild process that
        # inherited stdout can keep the pipe open indefinitely.
        redirect_thread.join(timeout=5)
        raise RuntimeError(
            gen_cmd_exec_failure_msg(cmd, return_code, tail_output_deque)
        )
|
|
|
|
def is_port_in_use(host, port):
    """Return True if a TCP (IPv4) connection to ``(host, port)`` succeeds.

    The probe socket is always closed before returning.
    """
    import socket

    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        connect_result = probe.connect_ex((host, port))
    finally:
        probe.close()
    return connect_result == 0
|
|
|
|
def _wait_service_up(host, port, timeout):
    """Poll ``(host, port)`` roughly once per second until it accepts connections.

    Args:
        host: host to probe.
        port: TCP port to probe.
        timeout: maximum number of seconds to keep polling.

    Returns:
        True as soon as the port is in use, False once ``timeout`` seconds
        have elapsed without success.
    """
    # A monotonic clock is immune to wall-clock adjustments (NTP sync, manual
    # changes) that could otherwise shorten or extend the wait.
    beg_time = time.monotonic()

    while time.monotonic() - beg_time < timeout:
        if is_port_in_use(host, port):
            return True
        time.sleep(1)

    return False
|
|
|
|
def get_random_unused_port(
    host, min_port=1024, max_port=65535, max_retries=100, exclude_list=None
):
    """
    Pick a random port in ``[min_port, max_port]`` that nothing listens on.

    Up to ``max_retries`` candidates are drawn. A candidate found in
    ``exclude_list`` is skipped but still counts as an attempt.

    Raises:
        RuntimeError: if no free port was found within ``max_retries`` draws.
    """
    # SystemRandom uses os.urandom, so the draw is independent of any seeding
    # of the global `random` module.
    chooser = random.SystemRandom()
    excluded = exclude_list or []

    attempts = 0
    while attempts < max_retries:
        attempts += 1
        candidate = chooser.randint(min_port, max_port)
        if candidate not in excluded and not is_port_in_use(host, candidate):
            return candidate

    raise RuntimeError(
        f"Get available port between range {min_port} and {max_port} failed."
    )
|
|
|
|
def get_spark_session():
    """Return the currently active ``SparkSession``.

    Raises:
        RuntimeError: if ``SparkSession.getActiveSession()`` returns None.
    """
    from pyspark.sql import SparkSession

    active_session = SparkSession.getActiveSession()
    if active_session is not None:
        return active_session

    raise RuntimeError(
        "Spark session haven't been initiated yet. Please use "
        "`SparkSession.builder` to create a spark session and connect to a spark "
        "cluster."
    )
|
|
|
|
def get_spark_application_driver_host(spark):
    """Return the ``spark.driver.host`` entry of the session's runtime config."""
    runtime_conf = spark.conf
    driver_host = runtime_conf.get("spark.driver.host")
    return driver_host
|
|
|
|
def get_max_num_concurrent_tasks(spark_context, resource_profile):
    """Gets the current max number of concurrent tasks.

    Args:
        spark_context: a PySpark ``SparkContext``.
        resource_profile: a PySpark resource profile, or None to use the
            cluster's default resource profile.

    Returns:
        The value of the JVM ``SparkContext.maxNumConcurrentTasks`` for the
        selected resource profile.
    """
    jvm_spark_context = spark_context._jsc.sc()

    if resource_profile is None:
        default_profile = (
            jvm_spark_context.resourceProfileManager().defaultResourceProfile()
        )
        return jvm_spark_context.maxNumConcurrentTasks(default_profile)

    def _noop_mapper(_):
        pass

    # Run a trivial one-partition job with the profile attached before
    # querying. NOTE(review): presumably this makes the profile known on the
    # JVM side; confirm.
    spark_context.parallelize([1], 1).withResources(resource_profile).map(
        _noop_mapper
    ).collect()

    return jvm_spark_context.maxNumConcurrentTasks(
        resource_profile._java_resource_profile
    )
|
|
|
|
def _get_spark_worker_total_physical_memory():
    """Return the Spark worker's total physical memory in bytes.

    The RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES environment variable, when
    present, overrides the value reported by ``psutil.virtual_memory()``.
    """
    import psutil

    override = os.environ.get(RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES)
    if override is not None:
        return int(override)
    return psutil.virtual_memory().total
|
|
|
|
def _get_spark_worker_total_shared_memory():
    """Return the Spark worker's shared-memory size (``/dev/shm``) in bytes.

    The RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES environment variable, when
    present, overrides the size reported by ``shutil.disk_usage``.
    """
    override = os.environ.get(RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES)
    if override is None:
        return shutil.disk_usage("/dev/shm").total
    return int(override)
|
|
|
|
| |
# Upper bound on a Ray node's object store, expressed as a fraction of the
# physical memory budget of that node.
_RAY_ON_SPARK_MAX_OBJECT_STORE_MEMORY_PROPORTION = 0.8

# Fraction of detected physical / shared memory that is treated as usable when
# sizing Ray nodes; the remainder is held back as a buffer.
_RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET = 0.8
|
|
|
|
def calc_mem_ray_head_node(configured_heap_memory_bytes, configured_object_store_bytes):
    """Compute ``(heap_mem_bytes, object_store_bytes)`` for the Ray head node.

    Physical memory and ``/dev/shm`` size of the driver host are read from
    RAY_ON_SPARK_DRIVER_PHYSICAL_MEMORY_BYTES / RAY_ON_SPARK_DRIVER_SHARED_MEMORY_BYTES
    when those environment variables are set, and otherwise probed via psutil and
    shutil. Both totals are scaled by _RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET
    before being split by ``_calc_mem_per_ray_node``. Any warning produced by
    the split is logged rather than returned.
    """
    import psutil

    def _env_override_or(env_name, probe):
        # Prefer an explicit byte count from the environment over probing.
        raw_value = os.environ.get(env_name)
        return probe() if raw_value is None else int(raw_value)

    total_physical_mem = _env_override_or(
        RAY_ON_SPARK_DRIVER_PHYSICAL_MEMORY_BYTES,
        lambda: psutil.virtual_memory().total,
    )
    total_shared_mem = _env_override_or(
        RAY_ON_SPARK_DRIVER_SHARED_MEMORY_BYTES,
        lambda: shutil.disk_usage("/dev/shm").total,
    )

    heap_mem_bytes, object_store_bytes, warning_msg = _calc_mem_per_ray_node(
        total_physical_mem * _RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET,
        total_shared_mem * _RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET,
        configured_heap_memory_bytes,
        configured_object_store_bytes,
    )

    if warning_msg is not None:
        _logger.warning(warning_msg)

    return heap_mem_bytes, object_store_bytes
|
|
|
|
def _calc_mem_per_ray_worker_node(
    num_task_slots,
    physical_mem_bytes,
    shared_mem_bytes,
    configured_heap_memory_bytes,
    configured_object_store_bytes,
):
    """Split a Spark worker's memory evenly across ``num_task_slots`` Ray nodes.

    Each node's share of physical and shared memory is the total divided by
    ``num_task_slots``, scaled by _RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET and
    truncated to an int. The shares are then handed to
    ``_calc_mem_per_ray_node``, whose ``(heap, object_store, warning)`` tuple
    is returned unchanged.
    """

    def _per_node_share(total_bytes):
        return int(
            total_bytes / num_task_slots * _RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET
        )

    return _calc_mem_per_ray_node(
        _per_node_share(physical_mem_bytes),
        _per_node_share(shared_mem_bytes),
        configured_heap_memory_bytes,
        configured_object_store_bytes,
    )
|
|
|
|
def _calc_mem_per_ray_node(
    available_physical_mem_per_node,
    available_shared_mem_per_node,
    configured_heap_memory_bytes,
    configured_object_store_bytes,
):
    """Split one Ray node's memory budget into heap and object store sizes.

    Args:
        available_physical_mem_per_node: physical memory budget of the node (bytes).
        available_shared_mem_per_node: shared-memory budget of the node (bytes).
        configured_heap_memory_bytes: requested heap size, or None to give the
            heap whatever physical memory the object store does not take.
        configured_object_store_bytes: requested object store size, or a falsy
            value to use ``DEFAULT_OBJECT_STORE_MEMORY_PROPORTION`` of the
            physical budget.

    Returns:
        ``(heap_mem_bytes, object_store_bytes, warning_msg)``. Both sizes are
        ints, and ``warning_msg`` is None or a human-readable warning.
    """
    from ray._private.ray_constants import (
        DEFAULT_OBJECT_STORE_MEMORY_PROPORTION,
        OBJECT_STORE_MINIMUM_MEMORY_BYTES,
    )

    warning_msg = None

    object_store_bytes = configured_object_store_bytes or (
        available_physical_mem_per_node * DEFAULT_OBJECT_STORE_MEMORY_PROPORTION
    )

    # Unless RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE is set, the object store is
    # capped at the available shared memory.
    if not os.environ.get("RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE"):
        if object_store_bytes > available_shared_mem_per_node:
            object_store_bytes = available_shared_mem_per_node

    object_store_bytes_upper_bound = (
        available_physical_mem_per_node
        * _RAY_ON_SPARK_MAX_OBJECT_STORE_MEMORY_PROPORTION
    )

    if object_store_bytes > object_store_bytes_upper_bound:
        object_store_bytes = object_store_bytes_upper_bound
        warning_msg = (
            "Your configured `object_store_memory_per_node` value "
            "is too high and it is capped by 80% of per-Ray node "
            "allocated memory."
        )

    # NOTE(review): the warning above names `object_store_memory_per_node`
    # while the ones below name `object_store_memory_worker_node`; confirm
    # which matches the public API.
    if object_store_bytes < OBJECT_STORE_MINIMUM_MEMORY_BYTES:
        # Equality with the shared-memory budget means the /dev/shm cap above
        # is what made the object store too small.
        if object_store_bytes == available_shared_mem_per_node:
            warning_msg = (
                "Your operating system is configured with too small /dev/shm "
                "size, so `object_store_memory_worker_node` value is configured "
                f"to minimal size ({OBJECT_STORE_MINIMUM_MEMORY_BYTES} bytes). "
                "Please increase system /dev/shm size."
            )
        else:
            warning_msg = (
                "You configured too small Ray node object store memory size, "
                "so `object_store_memory_worker_node` value is configured "
                f"to minimal size ({OBJECT_STORE_MINIMUM_MEMORY_BYTES} bytes). "
                "Please increase 'object_store_memory_worker_node' argument value."
            )

        object_store_bytes = OBJECT_STORE_MINIMUM_MEMORY_BYTES

    object_store_bytes = int(object_store_bytes)

    if configured_heap_memory_bytes is None:
        heap_mem_bytes = int(available_physical_mem_per_node - object_store_bytes)
    else:
        heap_mem_bytes = int(configured_heap_memory_bytes)

    return heap_mem_bytes, object_store_bytes, warning_msg
|
|
|
|
| |
| |
| |
| |
| |
# Environment variables that, when set on a Spark worker, override the detected
# CPU core count, GPU count, physical memory and /dev/shm size. They can be
# supplied through `spark.executorEnv.<NAME>` configs.
RAY_ON_SPARK_WORKER_CPU_CORES = "RAY_ON_SPARK_WORKER_CPU_CORES"
RAY_ON_SPARK_WORKER_GPU_NUM = "RAY_ON_SPARK_WORKER_GPU_NUM"
RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES = "RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES"
RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES = "RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES"

# Matching overrides read on the Spark driver host when sizing the Ray head node.
RAY_ON_SPARK_DRIVER_PHYSICAL_MEMORY_BYTES = "RAY_ON_SPARK_DRIVER_PHYSICAL_MEMORY_BYTES"
RAY_ON_SPARK_DRIVER_SHARED_MEMORY_BYTES = "RAY_ON_SPARK_DRIVER_SHARED_MEMORY_BYTES"
|
|
|
|
def _get_cpu_cores():
    """Return the number of CPU cores on this Spark worker.

    The RAY_ON_SPARK_WORKER_CPU_CORES environment variable, when set, takes
    precedence over ``multiprocessing.cpu_count()``.
    """
    import multiprocessing

    override = os.environ.get(RAY_ON_SPARK_WORKER_CPU_CORES)
    return multiprocessing.cpu_count() if override is None else int(override)
|
|
|
|
def _get_num_physical_gpus():
    """Return the number of GPUs on this Spark worker.

    Resolution order:
      1. the RAY_ON_SPARK_WORKER_GPU_NUM environment variable, if set;
      2. 0 when ``nvidia-smi`` is not on PATH;
      3. the number of non-empty lines printed by
         ``nvidia-smi --query-gpu=name --format=csv,noheader``.

    Any failure running ``nvidia-smi`` is logged at INFO level and yields 0.
    """
    if RAY_ON_SPARK_WORKER_GPU_NUM in os.environ:
        return int(os.environ[RAY_ON_SPARK_WORKER_GPU_NUM])

    if shutil.which("nvidia-smi") is None:
        # No nvidia-smi binary available: report no GPUs.
        return 0
    try:
        # argv list form: no shell is needed for a fixed command.
        completed_proc = subprocess.run(
            ["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"],
            check=True,
            text=True,
            capture_output=True,
        )
    except Exception as e:
        _logger.info(
            "'nvidia-smi --query-gpu=name --format=csv,noheader' command execution "
            f"failed, error: {repr(e)}"
        )
        return 0
    # One GPU per non-empty line. Counting `"".split("\n")` would report one
    # GPU for empty output.
    return sum(1 for line in completed_proc.stdout.splitlines() if line.strip())
|
|
|
|
def _get_local_ray_node_slots(
    num_cpus,
    num_gpus,
    num_cpus_per_node,
    num_gpus_per_node,
):
    """Return how many Ray worker nodes fit on this Spark worker.

    The count is ``num_cpus // num_cpus_per_node``, further limited by
    ``num_gpus // num_gpus_per_node`` when each node requests GPUs.

    Raises:
        ValueError: if one Ray node requests more CPUs (or GPUs) than the
            Spark worker has.
    """
    if num_cpus_per_node > num_cpus:
        raise ValueError(
            "cpu number per Ray worker node should be <= spark worker node CPU cores, "
            f"you set cpu number per Ray worker node to {num_cpus_per_node} but "
            f"spark worker node CPU core number is {num_cpus}."
        )
    slots_by_cpu = num_cpus // num_cpus_per_node

    if num_gpus_per_node <= 0:
        return slots_by_cpu

    if num_gpus_per_node > num_gpus:
        raise ValueError(
            "gpu number per Ray worker node should be <= spark worker node "
            "GPU number, you set GPU devices number per Ray worker node to "
            f"{num_gpus_per_node} but spark worker node GPU devices number "
            f"is {num_gpus}."
        )
    return min(slots_by_cpu, num_gpus // num_gpus_per_node)
|
|
|
|
def _get_avail_mem_per_ray_worker_node(
    num_cpus_per_node,
    num_gpus_per_node,
    heap_memory_per_node,
    object_store_memory_per_node,
):
    """
    Compute the per-Ray-node memory split on the current Spark worker.

    Returns tuple of (
        ray_worker_node_heap_mem_bytes,
        ray_worker_node_object_store_bytes,
        error_message,  # always None
        warning_message,
    )
    """
    num_cpus = _get_cpu_cores()
    # GPUs are only probed when each Ray node actually requests some.
    num_gpus = _get_num_physical_gpus() if num_gpus_per_node > 0 else 0

    slot_count = _get_local_ray_node_slots(
        num_cpus, num_gpus, num_cpus_per_node, num_gpus_per_node
    )

    heap_bytes, object_store_bytes, warning_msg = _calc_mem_per_ray_worker_node(
        slot_count,
        _get_spark_worker_total_physical_memory(),
        _get_spark_worker_total_shared_memory(),
        heap_memory_per_node,
        object_store_memory_per_node,
    )
    return heap_bytes, object_store_bytes, None, warning_msg
|
|
|
|
def get_avail_mem_per_ray_worker_node(
    spark,
    heap_memory_per_node,
    object_store_memory_per_node,
    num_cpus_per_node,
    num_gpus_per_node,
):
    """
    Return the available heap memory and object store memory for each ray worker.

    The computation runs inside a single one-partition Spark task, so it sees
    the resources of a Spark worker rather than the driver. Any warning is
    logged on the driver.

    Returns:
        (ray_worker_node_heap_mem_bytes, ray_worker_node_object_store_bytes)

    Raises:
        RuntimeError: if the computation inside the Spark task raised.

    NB: We have one ray node per spark task.
    """

    def mapper(_):
        try:
            return _get_avail_mem_per_ray_worker_node(
                num_cpus_per_node,
                num_gpus_per_node,
                heap_memory_per_node,
                object_store_memory_per_node,
            )
        except Exception as e:
            import traceback

            # format_tb() entries already end with a newline, so concatenate
            # them directly, and separate the exception repr from the trace.
            trace_msg = "".join(traceback.format_tb(e.__traceback__))
            return -1, -1, repr(e) + "\n" + trace_msg, None

    # Ship the exception back as a value so the driver can raise a
    # RuntimeError that includes bypass hints.
    (
        inferred_ray_worker_node_heap_mem_bytes,
        inferred_ray_worker_node_object_store_bytes,
        err,
        warning_msg,
    ) = spark.sparkContext.parallelize([1], 1).map(mapper).collect()[0]

    if err is not None:
        raise RuntimeError(
            f"Inferring ray worker node available memory failed, error: {err}. "
            "You can bypass this error by setting following spark configs: "
            "spark.executorEnv.RAY_ON_SPARK_WORKER_CPU_CORES, "
            "spark.executorEnv.RAY_ON_SPARK_WORKER_GPU_NUM, "
            "spark.executorEnv.RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES, "
            "spark.executorEnv.RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES."
        )
    if warning_msg is not None:
        _logger.warning(warning_msg)
    return (
        inferred_ray_worker_node_heap_mem_bytes,
        inferred_ray_worker_node_object_store_bytes,
    )
|
|
|
|
def get_spark_task_assigned_physical_gpus(gpu_addr_list):
    """Map GPU addresses assigned to a Spark task onto physical device ids.

    When ``CUDA_VISIBLE_DEVICES`` is set, each address is used as an index into
    that comma-separated device list and the listed id is returned. Otherwise
    ``gpu_addr_list`` is returned unchanged.

    Args:
        gpu_addr_list: GPU addresses, as ints or numeric strings such as ``"0"``.

    Returns:
        A list of physical GPU ids (ints) when ``CUDA_VISIBLE_DEVICES`` is set,
        else ``gpu_addr_list`` itself.
    """
    if "CUDA_VISIBLE_DEVICES" in os.environ:
        # Skip empty entries so a trailing comma does not break int().
        visible_cuda_dev_list = [
            int(dev.strip())
            for dev in os.environ["CUDA_VISIBLE_DEVICES"].split(",")
            if dev.strip()
        ]
        # int() lets numeric-string addresses be used as list indices.
        return [visible_cuda_dev_list[int(addr)] for addr in gpu_addr_list]
    else:
        return gpu_addr_list
|
|