import asyncio import collections.abc import io import os from abc import ABC, abstractmethod from pathlib import Path from types import MappingProxyType from typing import Any, Generator, Tuple, Union from .aio import AIOFile, FileIOType ENCODING_MAP = MappingProxyType({ "utf-8": 4, "utf-16": 8, "UTF-8": 4, "UTF-16": 8, }) async def unicode_reader( afp: AIOFile, chunk_size: int, offset: int, encoding: str = "utf-8", ) -> Tuple[int, str]: if chunk_size < 0: chunk_bytes = await afp.read_bytes(-1, offset) return len(chunk_bytes), chunk_bytes.decode(encoding=encoding) last_error = None for retry in range(ENCODING_MAP.get(encoding, 4)): chunk_bytes = await afp.read_bytes(chunk_size + retry, offset) try: chunk = chunk_bytes.decode(encoding=encoding) break except UnicodeDecodeError as e: last_error = e else: raise last_error # type: ignore chunk_size = len(chunk_bytes) return chunk_size, chunk class Reader(collections.abc.AsyncIterable): __slots__ = "_chunk_size", "__offset", "file", "__lock", "encoding" CHUNK_SIZE = 32 * 1024 def __init__( self, aio_file: AIOFile, offset: int = 0, chunk_size: int = CHUNK_SIZE, ): self.__lock = asyncio.Lock() self.__offset = int(offset) self._chunk_size = int(chunk_size) self.file = aio_file self.encoding = self.file.encoding async def read_chunk(self) -> Union[str, bytes]: async with self.__lock: if self.file.mode.binary: chunk = await self.file.read_bytes( self._chunk_size, self.__offset, ) # type: Union[str, bytes] chunk_size = len(chunk) else: chunk_size, chunk = await unicode_reader( self.file, self._chunk_size, self.__offset, encoding=self.encoding, ) self.__offset += chunk_size return chunk async def __anext__(self) -> Union[str, bytes]: chunk = await self.read_chunk() if not chunk: raise StopAsyncIteration(chunk) return chunk def __aiter__(self) -> "Reader": return self class Writer: __slots__ = "__chunk_size", "__offset", "__aio_file", "__lock" def __init__(self, aio_file: AIOFile, offset: int = 0): self.__offset = int(offset) self.__aio_file = aio_file self.__lock = asyncio.Lock() async def __call__(self, data: Union[str, bytes]) -> None: async with self.__lock: if isinstance(data, str): data = self.__aio_file.encode_bytes(data) await self.__aio_file.write_bytes(data, self.__offset) self.__offset += len(data) class LineReader(collections.abc.AsyncIterable): CHUNK_SIZE = 4192 def __init__( self, aio_file: AIOFile, offset: int = 0, chunk_size: int = CHUNK_SIZE, line_sep: str = "\n", ): self.__reader = Reader(aio_file, chunk_size=chunk_size, offset=offset) self._buffer = ( io.BytesIO() if aio_file.mode.binary else io.StringIO() ) # type: Any self.linesep = ( aio_file.encode_bytes(line_sep) if aio_file.mode.binary else line_sep ) async def readline(self) -> Union[str, bytes]: while True: line = self._buffer.readline() if line and line.endswith(self.linesep): return line buffer_remainder = line + self._buffer.read() self._buffer.truncate(0) self._buffer.seek(0) # No line in buffer, read more data chunk = await self.__reader.read_chunk() if not chunk: # No more data to read, return any remaining content in the buffer return buffer_remainder # We have more data to read, write it to the buffer and handle it in the next iteration self._buffer.write(buffer_remainder) self._buffer.write(chunk) self._buffer.seek(0) async def __anext__(self) -> Union[bytes, str]: line = await self.readline() if not line: # We are finished, close the buffer and raise StopAsyncIteration self._buffer.close() raise StopAsyncIteration(line) return line def __aiter__(self) -> "LineReader": return self class FileIOWrapperBase(ABC): _READLINE_CHUNK_SIZE = 4192 def __init__(self, afp: AIOFile, *, offset: int = 0): self._offset = offset self._lock = asyncio.Lock() self.file = afp if self.file.mode.appending: try: self._offset = os.stat(afp.name).st_size except FileNotFoundError: self._offset = 0 @abstractmethod async def read(self, length: int = -1) -> Any: raise NotImplementedError @abstractmethod async def write(self, data: Any) -> int: raise NotImplementedError @abstractmethod async def readline( self, size: int = -1, newline: Any = ..., ) -> Union[str, bytes]: raise NotImplementedError def seek(self, offset: int) -> None: self._offset = offset def tell(self) -> int: return self._offset async def flush(self, sync_metadata: bool = False) -> None: if sync_metadata: await self.file.fsync() else: await self.file.fdsync() async def close(self) -> None: await self.file.close() def __await__(self) -> Generator[None, None, "FileIOWrapperBase"]: yield from self.file.__await__() return self async def __aenter__(self) -> "FileIOWrapperBase": await self.file.open() return self async def __aexit__(self, *_: Any) -> None: await self.close() def __aiter__(self) -> LineReader: return LineReader(self.file) def iter_chunked(self, chunk_size: int = Reader.CHUNK_SIZE) -> Reader: return Reader(self.file, chunk_size=chunk_size, offset=self._offset) class BinaryFileWrapper(FileIOWrapperBase): def __init__(self, afp: AIOFile): if not afp.mode.binary: raise ValueError("Expected file in binary mode") super().__init__(afp) async def __read(self, length: int) -> bytes: data = await self.file.read_bytes(length, self._offset) self._offset += len(data) return data async def read(self, length: int = -1) -> bytes: async with self._lock: return await self.__read(length) async def write(self, data: bytes) -> int: async with self._lock: operation = self.file.write_bytes(data, self._offset) self._offset += len(data) await operation return len(data) async def readline(self, size: int = -1, newline: bytes = b"\n") -> bytes: async with self._lock: offset = self._offset with io.BytesIO() as fp: while True: chunk = await self.__read(self._READLINE_CHUNK_SIZE) if chunk: if newline not in chunk: fp.write(chunk) continue fp.write(chunk) if 0 < size <= fp.tell(): fp.seek(size) fp.truncate(size) return fp.getvalue() fp.seek(0) line = fp.readline() self._offset = offset + fp.tell() return line class TextFileWrapper(FileIOWrapperBase): def __init__(self, afp: AIOFile): if afp.mode.binary: raise ValueError("Expected file in text mode") super().__init__(afp) self.encoding = self.file.encoding async def __read(self, length: int) -> str: chunk_size = 0 offset = self._offset chunk = "" while length < 0 or length > len(chunk): part_offset, part = await unicode_reader( self.file, length, offset, self.encoding, ) if not part: break chunk += part offset += part_offset if chunk_size > length > 0: chunk = chunk[:length] offset = length self._offset = offset return chunk async def read(self, length: int = -1) -> str: async with self._lock: return await self.__read(length) async def write(self, data: str) -> int: async with self._lock: data_bytes = data.encode(self.encoding) operation = self.file.write_bytes(data_bytes, self._offset) self._offset += len(data_bytes) await operation return len(data_bytes) async def readline(self, size: int = -1, newline: str = "\n") -> str: async with self._lock: offset = self._offset with io.StringIO() as fp: while True: chunk = await self.__read(self._READLINE_CHUNK_SIZE) if chunk: if newline not in chunk: fp.write(chunk) continue fp.write(chunk) if 0 < size <= fp.tell(): fp.seek(size) fp.truncate(size) return fp.getvalue() fp.seek(0) line = fp.readline() self._offset = offset + len( line.encode(encoding=self.encoding), ) return line def async_open( file_specifier: Union[str, Path, FileIOType], mode: str = "r", *args: Any, **kwargs: Any, ) -> Union[BinaryFileWrapper, TextFileWrapper]: if isinstance(file_specifier, (str, Path)): afp = AIOFile(str(file_specifier), mode, *args, **kwargs) else: if args: raise ValueError("Arguments denied when IO[Any] opening.") afp = AIOFile.from_fp(file_specifier, **kwargs) if not afp.mode.binary: return TextFileWrapper(afp) return BinaryFileWrapper(afp) __all__ = ( "BinaryFileWrapper", "FileIOWrapperBase", "LineReader", "Reader", "TextFileWrapper", "Writer", "async_open", "unicode_reader", )