| import logging |
| from typing import Any, List, Optional |
|
|
| import ray.dashboard.consts as dashboard_consts |
| from ray._private.utils import ( |
| get_or_create_event_loop, |
| parse_pg_formatted_resources_to_original, |
| ) |
| from ray.dashboard.utils import ( |
| Dict, |
| MutableNotificationDict, |
| async_loop_forever, |
| compose_state_message, |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
class DataSource:
    """Module-level caches of cluster state consumed by the dashboard.

    NOTE(review): container types come from ray.dashboard.utils;
    MutableNotificationDict presumably notifies subscribers on mutation —
    confirm there.
    """

    # Raylet-reported stats per node; keyed by node id (evicted by purge()).
    node_stats = Dict()
    # OS-level stats per node; keyed by node id (evicted by purge()).
    node_physical_stats = Dict()
    # Actor table entries; keyed by actor id (see get_actor_infos()).
    actors = MutableNotificationDict()
    # Agent endpoints; node id -> (node_ip, http_port, grpc_port)
    # tuple (unpacked in get_agent_infos()).
    agents = Dict()
    # GCS node table entries containing a "state" field; keyed by node id.
    nodes = Dict()
    # Derived: node id -> list of merged worker dicts (rebuilt by organize()).
    node_workers = Dict()
    # Actor table entries grouped by node; node id -> {actor id: entry}.
    node_actors = MutableNotificationDict()
    # Derived: worker id -> core worker stats (rebuilt by organize()).
    core_worker_stats = Dict()
|
|
|
|
class DataOrganizer:
    head_node_ip = None

    @staticmethod
    @async_loop_forever(dashboard_consts.RAY_DASHBOARD_STATS_PURGING_INTERVAL)
    async def purge():
        """Periodically evict cached stats for nodes that are no longer ALIVE."""
        live_node_ids = {
            nid
            for nid, info in DataSource.nodes.items()
            if info["state"] == "ALIVE"
        }
        # keys() - set materializes a fresh set, so popping while
        # iterating it is safe.
        for stale_id in DataSource.node_stats.keys() - live_node_ids:
            DataSource.node_stats.pop(stale_id)
        for stale_id in DataSource.node_physical_stats.keys() - live_node_ids:
            DataSource.node_physical_stats.pop(stale_id)
|
|
| @classmethod |
| @async_loop_forever(dashboard_consts.RAY_DASHBOARD_STATS_UPDATING_INTERVAL) |
| async def organize(cls, thread_pool_executor): |
| """ |
| Organizes data: read from (node_physical_stats, node_stats) and updates |
| (node_workers, node_worker_stats). |
| |
| This methods is not really async, but DataSource is not thread safe so we need |
| to make sure it's on the main event loop thread. To avoid blocking the main |
| event loop, we yield after each node processed. |
| """ |
| loop = get_or_create_event_loop() |
|
|
| node_workers = {} |
| core_worker_stats = {} |
|
|
| |
| |
| |
| for node_id in list(DataSource.nodes.keys()): |
| node_physical_stats = DataSource.node_physical_stats.get(node_id, {}) |
| node_stats = DataSource.node_stats.get(node_id, {}) |
| |
| |
| workers = await loop.run_in_executor( |
| thread_pool_executor, |
| cls._extract_workers_for_node, |
| node_physical_stats, |
| node_stats, |
| ) |
|
|
| for worker in workers: |
| for stats in worker.get("coreWorkerStats", []): |
| worker_id = stats["workerId"] |
| core_worker_stats[worker_id] = stats |
|
|
| node_workers[node_id] = workers |
|
|
| DataSource.node_workers.reset(node_workers) |
| DataSource.core_worker_stats.reset(core_worker_stats) |
|
|
| @classmethod |
| def _extract_workers_for_node(cls, node_physical_stats, node_stats): |
| workers = [] |
| |
| pid_to_worker_stats = {} |
| pid_to_language = {} |
| pid_to_job_id = {} |
|
|
| for core_worker_stats in node_stats.get("coreWorkersStats", []): |
| pid = core_worker_stats["pid"] |
|
|
| pid_to_worker_stats[pid] = core_worker_stats |
| pid_to_language[pid] = core_worker_stats["language"] |
| pid_to_job_id[pid] = core_worker_stats["jobId"] |
|
|
| for worker in node_physical_stats.get("workers", []): |
| worker = dict(worker) |
| pid = worker["pid"] |
|
|
| core_worker_stats = pid_to_worker_stats.get(pid) |
| |
| worker["coreWorkerStats"] = [core_worker_stats] if core_worker_stats else [] |
| worker["language"] = pid_to_language.get( |
| pid, dashboard_consts.DEFAULT_LANGUAGE |
| ) |
| worker["jobId"] = pid_to_job_id.get(pid, dashboard_consts.DEFAULT_JOB_ID) |
|
|
| workers.append(worker) |
|
|
| return workers |
|
|
| @classmethod |
| async def get_node_info(cls, node_id, get_summary=False): |
| node_physical_stats = dict(DataSource.node_physical_stats.get(node_id, {})) |
| node_stats = dict(DataSource.node_stats.get(node_id, {})) |
| node = DataSource.nodes.get(node_id, {}) |
|
|
| if get_summary: |
| node_physical_stats.pop("workers", None) |
| node_stats.pop("workersStats", None) |
| else: |
| node_stats.pop("coreWorkersStats", None) |
| store_stats = node_stats.get("storeStats", {}) |
| used = int(store_stats.get("objectStoreBytesUsed", 0)) |
| |
| total = int(store_stats.get("objectStoreBytesAvail", 0)) |
| ray_stats = { |
| "object_store_used_memory": used, |
| "object_store_available_memory": total - used, |
| } |
|
|
| node_info = node_physical_stats |
| |
| node_info["raylet"] = node_stats |
| node_info["raylet"].update(ray_stats) |
|
|
| |
| node_info["raylet"].update(node) |
| death_info = node.get("deathInfo", {}) |
| node_info["raylet"]["stateMessage"] = compose_state_message( |
| death_info.get("reason", None), death_info.get("reasonMessage", None) |
| ) |
|
|
| if not get_summary: |
| actor_table_entries = DataSource.node_actors.get(node_id, {}) |
|
|
| |
| node_info["actors"] = { |
| actor_id: await DataOrganizer._get_actor_info(actor_table_entry) |
| for actor_id, actor_table_entry in actor_table_entries.items() |
| } |
|
|
| |
| node_info["workers"] = DataSource.node_workers.get(node_id, []) |
|
|
| return node_info |
|
|
| @classmethod |
| async def get_all_node_summary(cls): |
| return [ |
| |
| |
| await DataOrganizer.get_node_info(node_id, get_summary=True) |
| for node_id in DataSource.nodes.keys() |
| ] |
|
|
| @classmethod |
| async def get_agent_infos( |
| cls, target_node_ids: Optional[List[str]] = None |
| ) -> Dict[str, Dict[str, Any]]: |
| """Fetches running Agent (like HTTP/gRPC ports, IP, etc) running on every node |
| |
| :param target_node_ids: Target node ids to fetch agent info for. If omitted will |
| fetch the info for all agents |
| """ |
|
|
| |
| target_node_ids = target_node_ids or DataSource.agents.keys() |
|
|
| missing_node_ids = [ |
| node_id for node_id in target_node_ids if node_id not in DataSource.agents |
| ] |
| if missing_node_ids: |
| logger.warning( |
| f"Agent info was not found for {missing_node_ids}" |
| f" (having agent infos for {list(DataSource.agents.keys())})" |
| ) |
| return {} |
|
|
| def _create_agent_info(node_id: str): |
| (node_ip, http_port, grpc_port) = DataSource.agents[node_id] |
|
|
| return dict( |
| ipAddress=node_ip, |
| httpPort=int(http_port or -1), |
| grpcPort=int(grpc_port or -1), |
| httpAddress=f"{node_ip}:{http_port}", |
| ) |
|
|
| return {node_id: _create_agent_info(node_id) for node_id in target_node_ids} |
|
|
| @classmethod |
| async def get_actor_infos(cls, actor_ids: Optional[List[str]] = None): |
| target_actor_table_entries: dict[str, Optional[dict]] |
| if actor_ids is not None: |
| target_actor_table_entries = { |
| actor_id: DataSource.actors.get(actor_id) for actor_id in actor_ids |
| } |
| else: |
| target_actor_table_entries = DataSource.actors |
|
|
| return { |
| actor_id: await DataOrganizer._get_actor_info(actor_table_entry) |
| for actor_id, actor_table_entry in target_actor_table_entries.items() |
| } |
|
|
| @staticmethod |
| async def _get_actor_info(actor): |
| if actor is None: |
| return None |
|
|
| actor = dict(actor) |
| worker_id = actor["address"]["workerId"] |
| core_worker_stats = DataSource.core_worker_stats.get(worker_id, {}) |
| actor_constructor = core_worker_stats.get( |
| "actorTitle", "Unknown actor constructor" |
| ) |
| actor["actorConstructor"] = actor_constructor |
| actor.update(core_worker_stats) |
|
|
| |
| |
| node_id = actor["address"]["rayletId"] |
| pid = core_worker_stats.get("pid") |
| node_physical_stats = DataSource.node_physical_stats.get(node_id, {}) |
| actor_process_stats = None |
| actor_process_gpu_stats = [] |
| if pid: |
| for process_stats in node_physical_stats.get("workers", []): |
| if process_stats["pid"] == pid: |
| actor_process_stats = process_stats |
| break |
|
|
| for gpu_stats in node_physical_stats.get("gpus", []): |
| |
| |
| for process in gpu_stats.get("processesPids") or []: |
| if process["pid"] == pid: |
| actor_process_gpu_stats.append(gpu_stats) |
| break |
|
|
| actor["gpus"] = actor_process_gpu_stats |
| actor["processStats"] = actor_process_stats |
| actor["mem"] = node_physical_stats.get("mem", []) |
|
|
| required_resources = parse_pg_formatted_resources_to_original( |
| actor["requiredResources"] |
| ) |
| actor["requiredResources"] = required_resources |
|
|
| return actor |
|
|