import json
import os
from collections import defaultdict
from dataclasses import asdict, dataclass
from typing import Dict, List, Optional, Union

import ray
|
|
|
|
class _NullLogSpan:
    """No-op replacement for a profiling span.

    Instances work in a ``with`` statement but record nothing; ``profile``
    returns the shared instance below whenever there is nothing to record.
    """

    def __enter__(self):
        # Nothing to set up; ``with ... as span`` binds ``span`` to None.
        return None

    def __exit__(self, type, value, tb):
        # A falsy return value lets exceptions raised in the body propagate.
        return None


# True when the RAY_PROFILING environment variable is set (to any value,
# including the empty string) at the time this module is imported.
PROFILING_ENABLED = "RAY_PROFILING" in os.environ
# Shared, stateless no-op span handed out when profiling is skipped.
NULL_LOG_SPAN = _NullLogSpan()
|
|
| |
| |
# Maps a profile event name to the Chrome tracing color name ("cname") used
# to draw it. Event names that are not listed resolve to "generic_work".
# NOTE: this is a defaultdict, so looking up an unlisted name with ``[]``
# also stores that name (with the fallback color) in the mapping.
_default_color_mapping = defaultdict(
    lambda: "generic_work",
    (
        ("worker_idle", "cq_build_abandoned"),
        ("task", "rail_response"),
        ("task:deserialize_arguments", "rail_load"),
        ("task:execute", "rail_animation"),
        ("task:store_outputs", "rail_idle"),
        ("wait_for_function", "detailed_memory_dump"),
        ("ray.get", "good"),
        ("ray.put", "terrible"),
        ("ray.wait", "vsync_highlight_color"),
        ("submit_task", "background_memory_dump"),
        ("fetch_and_run_function", "detailed_memory_dump"),
        ("register_remote_function", "detailed_memory_dump"),
    ),
)
|
|
|
|
@dataclass
class ChromeTracingCompleteEvent:
    """One "complete" event (phase ``"X"``) of a Chrome/Perfetto trace.

    ``chrome_tracing_dump`` creates one instance per profile event and
    serializes it with ``dataclasses.asdict``, so the field names below are
    the keys that end up in the JSON dump.
    """

    # Event category; chrome_tracing_dump passes the raw event name.
    cat: str
    # Text displayed for the event.
    name: str
    # Process-level row group; chrome_tracing_dump passes a per-node index.
    pid: int
    # Row inside the group; chrome_tracing_dump passes a per-worker index.
    tid: int
    # Start timestamp. Annotated as float (which also admits int) because
    # chrome_tracing_dump computes it as ``start_time * 1e3``.
    # NOTE(review): the trace format expects microseconds - confirm that the
    # incoming start_time/end_time are milliseconds.
    ts: float
    # Duration, in the same unit as ``ts``.
    dur: float
    # Name of the color used to draw the event.
    cname: str
    # Extra data attached to the event.
    args: Dict[str, Union[str, int]]
    # Event phase; "X" denotes a complete event in the trace event format.
    ph: str = "X"
|
|
|
|
@dataclass
class ChromeTracingMetadataEvent:
    """One metadata event (phase ``"M"``) of a Chrome/Perfetto trace.

    ``chrome_tracing_dump`` emits these to label the rows of the timeline:
    a "process_name" event per node and a "thread_name" event per worker.
    """

    # Kind of metadata; chrome_tracing_dump uses "process_name" and
    # "thread_name".
    name: str
    # Metadata payload; chrome_tracing_dump passes {"name": <label>}.
    args: Dict[str, str]
    # Index of the row group (node) being described.
    pid: int
    # Index of the row (worker) being described. Left as None for
    # "process_name" events, which describe the whole group.
    tid: Optional[int] = None
    # Event phase; "M" denotes a metadata event in the trace event format.
    ph: str = "M"
|
|
|
|
def profile(event_type, extra_data=None):
    """Return a context manager that records a span for the timeline view.

    Note that this only works in the raylet code path.

    It can be used on the driver or inside a task:

    .. testcode::

        import ray._private.profiling as profiling

        with profiling.profile("custom event", extra_data={'key': 'val'}):
            # Do some computation here.
            x = 1 * 2

    The optional ``extra_data`` dictionary may carry the keys "name" and
    "cname" to override the text and the box color shown in the timeline.
    Any other entries are displayed at the bottom of the chrome tracing GUI
    when the box for this span is selected.

    Args:
        event_type: A string describing the type of the event. It is
            encoded as ASCII before being handed to the core worker, so a
            non-ASCII value raises ``UnicodeEncodeError`` when a span is
            actually recorded.
        extra_data: A dictionary mapping strings to strings that is added
            to the json objects populating the timeline. Set "cname" to a
            color to choose the box color, and "name" to choose the text
            displayed on the box.

    Returns:
        An object that can profile a span of time via a "with" statement.
        When profiling is disabled, or the global worker runs in local
        mode, this is the shared no-op span ``NULL_LOG_SPAN``.
    """
    if PROFILING_ENABLED:
        worker_module = ray._private.worker
        current_worker = worker_module.global_worker
        # Local mode has no core worker span to record into.
        if current_worker.mode != worker_module.LOCAL_MODE:
            encoded_type = event_type.encode("ascii")
            return current_worker.core_worker.profile_event(
                encoded_type, extra_data
            )
    return NULL_LOG_SPAN
|
|
|
|
def chrome_tracing_dump(
    tasks: List[dict],
) -> str:
    """Generate a chrome/perfetto tracing dump using task events.

    Each task may carry a "profiling_data" dict with the keys
    "node_ip_address", "component_type", "component_id" and "events". Every
    entry of "events" needs "event_name", "extra_data", "start_time" and
    "end_time". Tasks without profiling data, tasks whose component type is
    neither "worker" nor "driver", and tasks without events contribute
    nothing to the dump.

    For every event the task's "task_id", "job_id", "attempt_number",
    "func_or_class_name" and "actor_id" are added to a copy of the event's
    extra data; the input ``tasks`` are never modified. The extra-data keys
    "name" and "cname" override the displayed text and color of the event.

    Args:
        tasks: List of tasks generated by a state API list_tasks(detail=True).

    Returns:
        Json serialized dump to create a chrome/perfetto tracing: one
        complete event per profile event, followed by one "process_name"
        metadata event per node and one "thread_name" metadata event per
        worker.

    Raises:
        KeyError: If a task that has events lacks one of the keys above.
    """
    all_events = []
    # Chrome tracing identifies rows by integer pid/tid, so each node IP gets
    # a dense pid and each (pid, component id) pair a dense tid, both
    # numbered in first-seen order.
    node_to_index = {}
    worker_to_index = {}

    for task in tasks:
        profiling_data = task.get("profiling_data")
        if not profiling_data:
            continue

        node_ip_address = profiling_data["node_ip_address"]
        component_events = profiling_data["events"]
        component_type = profiling_data["component_type"]
        component_id = component_type + ":" + profiling_data["component_id"]

        # Only worker and driver components are placed on the timeline.
        if component_type not in ("worker", "driver"):
            continue
        # A node/worker is only listed once it has an event to show.
        if not component_events:
            continue

        # The row indices are the same for all events of a task, so resolve
        # them once instead of once per event.
        pid = node_to_index.setdefault(node_ip_address, len(node_to_index))
        tid = worker_to_index.setdefault(
            (pid, component_id), len(worker_to_index)
        )

        # Task-level fields attached to every event of this task.
        task_fields = {
            "task_id": task["task_id"],
            "job_id": task["job_id"],
            "attempt_number": task["attempt_number"],
            "func_or_class_name": task["func_or_class_name"],
            "actor_id": task["actor_id"],
        }

        for event in component_events:
            # Merge into a new dict so the caller's event data stays intact.
            extra_data = {**event["extra_data"], **task_fields}
            event_name = event["event_name"]

            # Defaults derived from the event name ...
            cname = _default_color_mapping[event_name]
            name = event_name
            # ... unless overridden through extra_data. "cname" is documented
            # by ``profile`` as a color, so it is used verbatim instead of
            # being looked up in the event-name table (which would turn every
            # real color name into the fallback color).
            if "cname" in extra_data:
                cname = extra_data["cname"]
            if "name" in extra_data:
                name = extra_data["name"]

            start_ts = event["start_time"] * 1e3
            end_ts = event["end_time"] * 1e3
            new_event = ChromeTracingCompleteEvent(
                cat=event_name,
                name=name,
                pid=pid,
                tid=tid,
                ts=start_ts,
                dur=end_ts - start_ts,
                cname=cname,
                args=extra_data,
            )
            all_events.append(asdict(new_event))

    # Label every node row group.
    all_events.extend(
        asdict(
            ChromeTracingMetadataEvent(
                name="process_name",
                pid=pid,
                args={"name": f"Node {node}"},
            )
        )
        for node, pid in node_to_index.items()
    )

    # Label every worker row.
    all_events.extend(
        asdict(
            ChromeTracingMetadataEvent(
                name="thread_name",
                ph="M",
                tid=tid,
                pid=node_pid,
                args={"name": component_id},
            )
        )
        for (node_pid, component_id), tid in worker_to_index.items()
    )

    return json.dumps(all_events)
|
|