File size: 10,607 Bytes
53dbcc1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
import asyncio
import collections.abc
import io
import os
from abc import ABC, abstractmethod
from pathlib import Path
from types import MappingProxyType
from typing import Any, Generator, Tuple, Union

from .aio import AIOFile, FileIOType


ENCODING_MAP = MappingProxyType({
    "utf-8": 4,
    "utf-16": 8,
    "UTF-8": 4,
    "UTF-16": 8,
})


async def unicode_reader(
    afp: AIOFile, chunk_size: int, offset: int, encoding: str = "utf-8",
) -> Tuple[int, str]:

    if chunk_size < 0:
        chunk_bytes = await afp.read_bytes(-1, offset)
        return len(chunk_bytes), chunk_bytes.decode(encoding=encoding)

    last_error = None
    for retry in range(ENCODING_MAP.get(encoding, 4)):
        chunk_bytes = await afp.read_bytes(chunk_size + retry, offset)
        try:
            chunk = chunk_bytes.decode(encoding=encoding)
            break
        except UnicodeDecodeError as e:
            last_error = e
    else:
        raise last_error    # type: ignore

    chunk_size = len(chunk_bytes)

    return chunk_size, chunk


class Reader(collections.abc.AsyncIterable):
    __slots__ = "_chunk_size", "__offset", "file", "__lock", "encoding"

    CHUNK_SIZE = 32 * 1024

    def __init__(
        self, aio_file: AIOFile, offset: int = 0,
        chunk_size: int = CHUNK_SIZE,
    ):

        self.__lock = asyncio.Lock()
        self.__offset = int(offset)

        self._chunk_size = int(chunk_size)
        self.file = aio_file
        self.encoding = self.file.encoding

    async def read_chunk(self) -> Union[str, bytes]:
        async with self.__lock:
            if self.file.mode.binary:
                chunk = await self.file.read_bytes(
                    self._chunk_size, self.__offset,
                )   # type: Union[str, bytes]
                chunk_size = len(chunk)
            else:
                chunk_size, chunk = await unicode_reader(
                    self.file, self._chunk_size, self.__offset,
                    encoding=self.encoding,
                )
        self.__offset += chunk_size
        return chunk

    async def __anext__(self) -> Union[str, bytes]:
        chunk = await self.read_chunk()

        if not chunk:
            raise StopAsyncIteration(chunk)

        return chunk

    def __aiter__(self) -> "Reader":
        return self


class Writer:
    __slots__ = "__chunk_size", "__offset", "__aio_file", "__lock"

    def __init__(self, aio_file: AIOFile, offset: int = 0):
        self.__offset = int(offset)
        self.__aio_file = aio_file
        self.__lock = asyncio.Lock()

    async def __call__(self, data: Union[str, bytes]) -> None:
        async with self.__lock:
            if isinstance(data, str):
                data = self.__aio_file.encode_bytes(data)

            await self.__aio_file.write_bytes(data, self.__offset)
            self.__offset += len(data)


class LineReader(collections.abc.AsyncIterable):
    CHUNK_SIZE = 4192

    def __init__(
        self, aio_file: AIOFile, offset: int = 0,
        chunk_size: int = CHUNK_SIZE, line_sep: str = "\n",
    ):
        self.__reader = Reader(aio_file, chunk_size=chunk_size, offset=offset)

        self._buffer = (
            io.BytesIO() if aio_file.mode.binary else io.StringIO()
        )   # type: Any

        self.linesep = (
            aio_file.encode_bytes(line_sep)
            if aio_file.mode.binary
            else line_sep
        )

    async def readline(self) -> Union[str, bytes]:
        while True:
            line = self._buffer.readline()
            if line and line.endswith(self.linesep):
                return line

            buffer_remainder = line + self._buffer.read()
            self._buffer.truncate(0)
            self._buffer.seek(0)

            # No line in buffer, read more data
            chunk = await self.__reader.read_chunk()
            if not chunk:
                # No more data to read, return any remaining content in the buffer
                return buffer_remainder
            # We have more data to read, write it to the buffer and handle it in the next iteration
            self._buffer.write(buffer_remainder)
            self._buffer.write(chunk)
            self._buffer.seek(0)

    async def __anext__(self) -> Union[bytes, str]:
        line = await self.readline()

        if not line:
            # We are finished, close the buffer and raise StopAsyncIteration
            self._buffer.close()
            raise StopAsyncIteration(line)

        return line

    def __aiter__(self) -> "LineReader":
        return self


class FileIOWrapperBase(ABC):
    _READLINE_CHUNK_SIZE = 4192

    def __init__(self, afp: AIOFile, *, offset: int = 0):
        self._offset = offset
        self._lock = asyncio.Lock()
        self.file = afp

        if self.file.mode.appending:
            try:
                self._offset = os.stat(afp.name).st_size
            except FileNotFoundError:
                self._offset = 0

    @abstractmethod
    async def read(self, length: int = -1) -> Any:
        raise NotImplementedError

    @abstractmethod
    async def write(self, data: Any) -> int:
        raise NotImplementedError

    @abstractmethod
    async def readline(
        self, size: int = -1, newline: Any = ...,
    ) -> Union[str, bytes]:
        raise NotImplementedError

    def seek(self, offset: int) -> None:
        self._offset = offset

    def tell(self) -> int:
        return self._offset

    async def flush(self, sync_metadata: bool = False) -> None:
        if sync_metadata:
            await self.file.fsync()
        else:
            await self.file.fdsync()

    async def close(self) -> None:
        await self.file.close()

    def __await__(self) -> Generator[None, None, "FileIOWrapperBase"]:
        yield from self.file.__await__()
        return self

    async def __aenter__(self) -> "FileIOWrapperBase":
        await self.file.open()
        return self

    async def __aexit__(self, *_: Any) -> None:
        await self.close()

    def __aiter__(self) -> LineReader:
        return LineReader(self.file)

    def iter_chunked(self, chunk_size: int = Reader.CHUNK_SIZE) -> Reader:
        return Reader(self.file, chunk_size=chunk_size, offset=self._offset)


class BinaryFileWrapper(FileIOWrapperBase):
    def __init__(self, afp: AIOFile):
        if not afp.mode.binary:
            raise ValueError("Expected file in binary mode")
        super().__init__(afp)

    async def __read(self, length: int) -> bytes:
        data = await self.file.read_bytes(length, self._offset)
        self._offset += len(data)
        return data

    async def read(self, length: int = -1) -> bytes:
        async with self._lock:
            return await self.__read(length)

    async def write(self, data: bytes) -> int:
        async with self._lock:
            operation = self.file.write_bytes(data, self._offset)
            self._offset += len(data)
        await operation
        return len(data)

    async def readline(self, size: int = -1, newline: bytes = b"\n") -> bytes:
        async with self._lock:
            offset = self._offset
            with io.BytesIO() as fp:
                while True:
                    chunk = await self.__read(self._READLINE_CHUNK_SIZE)

                    if chunk:
                        if newline not in chunk:
                            fp.write(chunk)
                            continue

                        fp.write(chunk)

                    if 0 < size <= fp.tell():
                        fp.seek(size)
                        fp.truncate(size)
                        return fp.getvalue()

                    fp.seek(0)
                    line = fp.readline()
                    self._offset = offset + fp.tell()
                    return line


class TextFileWrapper(FileIOWrapperBase):
    def __init__(self, afp: AIOFile):
        if afp.mode.binary:
            raise ValueError("Expected file in text mode")
        super().__init__(afp)
        self.encoding = self.file.encoding

    async def __read(self, length: int) -> str:
        chunk_size = 0
        offset = self._offset
        chunk = ""
        while length < 0 or length > len(chunk):
            part_offset, part = await unicode_reader(
                self.file, length, offset, self.encoding,
            )

            if not part:
                break

            chunk += part
            offset += part_offset

        if chunk_size > length > 0:
            chunk = chunk[:length]
            offset = length

        self._offset = offset
        return chunk

    async def read(self, length: int = -1) -> str:
        async with self._lock:
            return await self.__read(length)

    async def write(self, data: str) -> int:
        async with self._lock:
            data_bytes = data.encode(self.encoding)
            operation = self.file.write_bytes(data_bytes, self._offset)
            self._offset += len(data_bytes)

        await operation
        return len(data_bytes)

    async def readline(self, size: int = -1, newline: str = "\n") -> str:
        async with self._lock:
            offset = self._offset
            with io.StringIO() as fp:
                while True:
                    chunk = await self.__read(self._READLINE_CHUNK_SIZE)

                    if chunk:
                        if newline not in chunk:
                            fp.write(chunk)
                            continue

                        fp.write(chunk)

                    if 0 < size <= fp.tell():
                        fp.seek(size)
                        fp.truncate(size)
                        return fp.getvalue()

                    fp.seek(0)
                    line = fp.readline()
                    self._offset = offset + len(
                        line.encode(encoding=self.encoding),
                    )
                    return line


def async_open(
    file_specifier: Union[str, Path, FileIOType],
    mode: str = "r", *args: Any, **kwargs: Any,
) -> Union[BinaryFileWrapper, TextFileWrapper]:
    if isinstance(file_specifier, (str, Path)):
        afp = AIOFile(str(file_specifier), mode, *args, **kwargs)
    else:
        if args:
            raise ValueError("Arguments denied when IO[Any] opening.")
        afp = AIOFile.from_fp(file_specifier, **kwargs)

    if not afp.mode.binary:
        return TextFileWrapper(afp)

    return BinaryFileWrapper(afp)


__all__ = (
    "BinaryFileWrapper",
    "FileIOWrapperBase",
    "LineReader",
    "Reader",
    "TextFileWrapper",
    "Writer",
    "async_open",
    "unicode_reader",
)