File size: 10,131 Bytes
cf6a8b4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 | import asyncio
from typing import Optional, Any, List, Dict
from collections.abc import Iterable
import ray
from ray.util.annotations import PublicAPI
@PublicAPI(stability="beta")
class Empty(Exception):
    """Signals that an item could not be taken from the queue.

    Raised by the non-blocking get variants when the queue does not hold
    enough items, and by blocking gets whose timeout expires first.
    """
@PublicAPI(stability="beta")
class Full(Exception):
    """Signals that an item could not be added to the queue.

    Raised by the non-blocking put variants when the queue has no room for
    the item(s), and by blocking puts whose timeout expires first.
    """
@PublicAPI(stability="beta")
class Queue:
    """A first-in, first-out queue implementation on Ray.

    The behavior and use cases are similar to those of the asyncio.Queue class.

    Features both sync and async put and get methods. Provides the option to
    block until space is available when calling put on a full queue,
    or to block until items are available when calling get on an empty queue.

    Optionally supports batched put and get operations to minimize
    serialization overhead.

    Args:
        maxsize (optional, int): maximum size of the queue. If zero, size is
            unbounded.
        actor_options (optional, Dict): Dictionary of options to pass into
            the QueueActor during creation. These are directly passed into
            QueueActor.options(...). This could be useful if you
            need to pass in custom resource requirements, for example.

    Examples:
        .. testcode::

            from ray.util.queue import Queue
            q = Queue()
            items = list(range(10))
            for item in items:
                q.put(item)
            for item in items:
                assert item == q.get()
            # Create Queue with the underlying actor reserving 1 CPU.
            q = Queue(actor_options={"num_cpus": 1})
    """

    def __init__(self, maxsize: int = 0, actor_options: Optional[Dict] = None) -> None:
        from ray._private.usage.usage_lib import record_library_usage

        record_library_usage("util.Queue")

        actor_options = actor_options or {}
        self.maxsize = maxsize
        # All queue state lives in a single actor; this object is only a
        # handle that forwards calls to it.
        self.actor = (
            ray.remote(_QueueActor).options(**actor_options).remote(self.maxsize)
        )

    def __len__(self) -> int:
        return self.size()

    @staticmethod
    def _check_timeout(timeout: Optional[float]) -> None:
        """Reject negative timeouts; None means "no timeout"."""
        if timeout is not None and timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")

    def size(self) -> int:
        """The size of the queue."""
        return ray.get(self.actor.qsize.remote())

    def qsize(self) -> int:
        """The size of the queue."""
        return self.size()

    def empty(self) -> bool:
        """Whether the queue is empty."""
        return ray.get(self.actor.empty.remote())

    def full(self) -> bool:
        """Whether the queue is full."""
        return ray.get(self.actor.full.remote())

    def put(
        self, item: Any, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Adds an item to the queue.

        If block is True and the queue is full, blocks until the queue is no
        longer full or until timeout.

        There is no guarantee of order if multiple producers put to the same
        full queue.

        Raises:
            Full: if the queue is full and blocking is False.
            Full: if the queue is full, blocking is True, and it timed out.
            ValueError: if timeout is negative.
        """
        if not block:
            # Non-blocking path: timeout is ignored.
            try:
                ray.get(self.actor.put_nowait.remote(item))
            except asyncio.QueueFull:
                raise Full
            return

        self._check_timeout(timeout)
        ray.get(self.actor.put.remote(item, timeout))

    async def put_async(
        self, item: Any, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Adds an item to the queue.

        If block is True and the queue is full,
        blocks until the queue is no longer full or until timeout.

        There is no guarantee of order if multiple producers put to the same
        full queue.

        Raises:
            Full: if the queue is full and blocking is False.
            Full: if the queue is full, blocking is True, and it timed out.
            ValueError: if timeout is negative.
        """
        if not block:
            # Non-blocking path: timeout is ignored.
            try:
                await self.actor.put_nowait.remote(item)
            except asyncio.QueueFull:
                raise Full
            return

        self._check_timeout(timeout)
        await self.actor.put.remote(item, timeout)

    def get(self, block: bool = True, timeout: Optional[float] = None) -> Any:
        """Gets an item from the queue.

        If block is True and the queue is empty, blocks until the queue is no
        longer empty or until timeout.

        There is no guarantee of order if multiple consumers get from the
        same empty queue.

        Returns:
            The next item in the queue.

        Raises:
            Empty: if the queue is empty and blocking is False.
            Empty: if the queue is empty, blocking is True, and it timed out.
            ValueError: if timeout is negative.
        """
        if not block:
            # Non-blocking path: timeout is ignored.
            try:
                return ray.get(self.actor.get_nowait.remote())
            except asyncio.QueueEmpty:
                raise Empty

        self._check_timeout(timeout)
        return ray.get(self.actor.get.remote(timeout))

    async def get_async(
        self, block: bool = True, timeout: Optional[float] = None
    ) -> Any:
        """Gets an item from the queue.

        If block is True and the queue is empty, blocks until the queue is no
        longer empty or until timeout.

        There is no guarantee of order if multiple consumers get from the
        same empty queue.

        Returns:
            The next item in the queue.

        Raises:
            Empty: if the queue is empty and blocking is False.
            Empty: if the queue is empty, blocking is True, and it timed out.
            ValueError: if timeout is negative.
        """
        if not block:
            # Non-blocking path: timeout is ignored.
            try:
                return await self.actor.get_nowait.remote()
            except asyncio.QueueEmpty:
                raise Empty

        self._check_timeout(timeout)
        return await self.actor.get.remote(timeout)

    def put_nowait(self, item: Any) -> None:
        """Equivalent to put(item, block=False).

        Raises:
            Full: if the queue is full.
        """
        return self.put(item, block=False)

    def put_nowait_batch(self, items: Iterable) -> None:
        """Puts every item of an iterable into the queue, in iteration order.

        Raises:
            Full: if the items will not fit in the queue
            TypeError: if items is not an Iterable.
        """
        if not isinstance(items, Iterable):
            raise TypeError("Argument 'items' must be an Iterable")

        # Materialize before shipping to the actor: the actor calls len() on
        # the batch, and lazy iterables (generators, iterators) either cannot
        # be serialized for the remote call or have no length.
        ray.get(self.actor.put_nowait_batch.remote(list(items)))

    def get_nowait(self) -> Any:
        """Equivalent to get(block=False).

        Raises:
            Empty: if the queue is empty.
        """
        return self.get(block=False)

    def get_nowait_batch(self, num_items: int) -> List[Any]:
        """Gets items from the queue and returns them in a
        list in order.

        Raises:
            Empty: if the queue does not contain the desired number of items
            TypeError: if num_items is not an int.
            ValueError: if num_items is negative.
        """
        if not isinstance(num_items, int):
            raise TypeError("Argument 'num_items' must be an int")
        if num_items < 0:
            raise ValueError("'num_items' must be nonnegative")

        return ray.get(self.actor.get_nowait_batch.remote(num_items))

    def shutdown(self, force: bool = False, grace_period_s: int = 5) -> None:
        """Terminates the underlying QueueActor.

        All of the resources reserved by the queue will be released.

        Args:
            force: If True, forcefully kill the actor, causing an
                immediate failure. If False, graceful
                actor termination will be attempted first, before falling back
                to a forceful kill.
            grace_period_s: If force is False, how long in seconds to
                wait for graceful termination before falling back to
                forceful kill.
        """
        if self.actor:
            if force:
                ray.kill(self.actor, no_restart=True)
            else:
                done_ref = self.actor.__ray_terminate__.remote()
                _, not_done = ray.wait([done_ref], timeout=grace_period_s)
                # Graceful termination did not finish in time: force it.
                if not_done:
                    ray.kill(self.actor, no_restart=True)
        # Drop the handle so a repeated shutdown() is a no-op.
        self.actor = None
class _QueueActor:
    """Actor-side queue state: a thin wrapper around an asyncio.Queue.

    Queue wraps this class with ray.remote(...) and forwards its calls to
    the methods below.
    """

    def __init__(self, maxsize):
        # maxsize follows asyncio.Queue semantics; put_nowait_batch treats
        # a value of 0 as "unbounded".
        self.maxsize = maxsize
        self.queue = asyncio.Queue(self.maxsize)

    def qsize(self):
        """Return the number of items currently in the queue."""
        return self.queue.qsize()

    def empty(self):
        """Return whether the queue holds no items."""
        return self.queue.empty()

    def full(self):
        """Return whether the queue has reached its maximum size."""
        return self.queue.full()

    async def put(self, item, timeout=None):
        """Add an item, waiting up to timeout seconds for free space.

        Raises:
            Full: if no space became available before the timeout.
        """
        try:
            await asyncio.wait_for(self.queue.put(item), timeout)
        except asyncio.TimeoutError:
            raise Full

    async def get(self, timeout=None):
        """Remove and return an item, waiting up to timeout seconds for one.

        Raises:
            Empty: if no item became available before the timeout.
        """
        try:
            return await asyncio.wait_for(self.queue.get(), timeout)
        except asyncio.TimeoutError:
            raise Empty

    def put_nowait(self, item):
        """Add an item without waiting (asyncio.Queue.put_nowait)."""
        self.queue.put_nowait(item)

    def put_nowait_batch(self, items):
        """Add every item of an iterable, in order, or none of them.

        Raises:
            Full: if the queue is bounded and the whole batch does not fit.
        """
        # Materialize first: the capacity check needs len(), which lazy
        # iterables (iterators, generators) do not support.
        items = list(items)

        # If maxsize is 0, queue is unbounded, so no need to check size.
        if self.maxsize > 0 and len(items) + self.qsize() > self.maxsize:
            raise Full(
                f"Cannot add {len(items)} items to queue of size "
                f"{self.qsize()} and maxsize {self.maxsize}."
            )
        for item in items:
            self.queue.put_nowait(item)

    def get_nowait(self):
        """Remove and return an item without waiting (asyncio.Queue.get_nowait)."""
        return self.queue.get_nowait()

    def get_nowait_batch(self, num_items):
        """Remove and return num_items items, in order, as a list.

        Raises:
            Empty: if the queue holds fewer than num_items items.
        """
        if num_items > self.qsize():
            raise Empty(
                f"Cannot get {num_items} items from queue of size "
                f"{self.qsize()}."
            )
        return [self.queue.get_nowait() for _ in range(num_items)]
|