kushalExplores's picture
Upload folder using huggingface_hub
53dbcc1 verified
import abc
import asyncio
import typing
from functools import partial
from . import abstract
ContextType = typing.Type[abstract.AbstractContext]
OperationType = typing.Type[abstract.AbstractOperation]
class AsyncioContextBase(abc.ABC):
MAX_REQUESTS_DEFAULT = 512
CONTEXT_CLASS = None # type: ContextType
OPERATION_CLASS = None # type: OperationType
def __init__(self, max_requests=None, loop=None, **kwargs):
max_requests = max_requests or self.MAX_REQUESTS_DEFAULT
self.loop = loop or asyncio.get_event_loop()
self.semaphore = asyncio.BoundedSemaphore(max_requests)
self.context = self._create_context(max_requests, **kwargs)
def _create_context(self, max_requests, **kwargs):
return self.CONTEXT_CLASS(max_requests=max_requests, **kwargs)
def _destroy_context(self):
return
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.close()
def close(self):
self._destroy_context()
async def submit(self, op):
if not isinstance(op, self.OPERATION_CLASS):
raise ValueError("Operation object expected")
future = self.loop.create_future()
op.set_callback(partial(self._on_done, future))
async with self.semaphore:
if self.context.submit(op) != 1:
raise IOError("Operation was not submitted")
try:
await future
except asyncio.CancelledError:
try:
self.context.cancel(op)
except ValueError:
pass
raise
return op.get_value()
def _on_done(self, future, result):
"""
In general case impossible predict current thread and the thread
of event loop. So have to use `call_soon_threadsave` the result setter.
"""
if future.done():
return
self.loop.call_soon_threadsafe(
lambda: future.done() or future.set_result(True),
)
def read(
self, nbytes: int, fd: int,
offset: int, priority: int = 0,
) -> typing.Awaitable[bytes]:
return self.submit(
self.OPERATION_CLASS.read(nbytes, fd, offset, priority),
)
def write(
self, payload: bytes, fd: int,
offset: int, priority: int = 0,
) -> typing.Awaitable[int]:
return self.submit(
self.OPERATION_CLASS.write(payload, fd, offset, priority),
)
def fsync(self, fd: int) -> typing.Awaitable:
return self.submit(self.OPERATION_CLASS.fsync(fd))
def fdsync(self, fd: int) -> typing.Awaitable:
return self.submit(self.OPERATION_CLASS.fdsync(fd))