| """ |
| patch_stdout |
| ============ |
| |
| This implements a context manager that ensures that print statements within |
| it won't destroy the user interface. The context manager will replace |
| `sys.stdout` by something that draws the output above the current prompt, |
| rather than overwriting the UI. |
| |
| Usage:: |
| |
| with patch_stdout(application): |
| ... |
| application.run() |
| ... |
| |
| Multiple applications can run in the body of the context manager, one after the |
| other. |
| """ |
| from __future__ import annotations |
|
|
| import asyncio |
| import queue |
| import sys |
| import threading |
| import time |
| from contextlib import contextmanager |
| from typing import Generator, TextIO, cast |
|
|
| from .application import get_app_session, run_in_terminal |
| from .output import Output |
|
|
| __all__ = [ |
| "patch_stdout", |
| "StdoutProxy", |
| ] |
|
|
|
|
| @contextmanager |
| def patch_stdout(raw: bool = False) -> Generator[None, None, None]: |
| """ |
| Replace `sys.stdout` by an :class:`_StdoutProxy` instance. |
| |
| Writing to this proxy will make sure that the text appears above the |
| prompt, and that it doesn't destroy the output from the renderer. If no |
| application is curring, the behavior should be identical to writing to |
| `sys.stdout` directly. |
| |
| Warning: If a new event loop is installed using `asyncio.set_event_loop()`, |
| then make sure that the context manager is applied after the event loop |
| is changed. Printing to stdout will be scheduled in the event loop |
| that's active when the context manager is created. |
| |
| :param raw: (`bool`) When True, vt100 terminal escape sequences are not |
| removed/escaped. |
| """ |
| with StdoutProxy(raw=raw) as proxy: |
| original_stdout = sys.stdout |
| original_stderr = sys.stderr |
|
|
| |
| sys.stdout = cast(TextIO, proxy) |
| sys.stderr = cast(TextIO, proxy) |
|
|
| try: |
| yield |
| finally: |
| sys.stdout = original_stdout |
| sys.stderr = original_stderr |
|
|
|
|
| class _Done: |
| "Sentinel value for stopping the stdout proxy." |
|
|
|
|
| class StdoutProxy: |
| """ |
| File-like object, which prints everything written to it, output above the |
| current application/prompt. This class is compatible with other file |
| objects and can be used as a drop-in replacement for `sys.stdout` or can |
| for instance be passed to `logging.StreamHandler`. |
| |
| The current application, above which we print, is determined by looking |
| what application currently runs in the `AppSession` that is active during |
| the creation of this instance. |
| |
| This class can be used as a context manager. |
| |
| In order to avoid having to repaint the prompt continuously for every |
| little write, a short delay of `sleep_between_writes` seconds will be added |
| between writes in order to bundle many smaller writes in a short timespan. |
| """ |
|
|
| def __init__( |
| self, |
| sleep_between_writes: float = 0.2, |
| raw: bool = False, |
| ) -> None: |
| self.sleep_between_writes = sleep_between_writes |
| self.raw = raw |
|
|
| self._lock = threading.RLock() |
| self._buffer: list[str] = [] |
|
|
| |
| self.app_session = get_app_session() |
|
|
| |
| |
| |
| |
| |
| |
| self._output: Output = self.app_session.output |
|
|
| |
| self._flush_queue: queue.Queue[str | _Done] = queue.Queue() |
| self._flush_thread = self._start_write_thread() |
| self.closed = False |
|
|
| def __enter__(self) -> StdoutProxy: |
| return self |
|
|
| def __exit__(self, *args: object) -> None: |
| self.close() |
|
|
| def close(self) -> None: |
| """ |
| Stop `StdoutProxy` proxy. |
| |
| This will terminate the write thread, make sure everything is flushed |
| and wait for the write thread to finish. |
| """ |
| if not self.closed: |
| self._flush_queue.put(_Done()) |
| self._flush_thread.join() |
| self.closed = True |
|
|
| def _start_write_thread(self) -> threading.Thread: |
| thread = threading.Thread( |
| target=self._write_thread, |
| name="patch-stdout-flush-thread", |
| daemon=True, |
| ) |
| thread.start() |
| return thread |
|
|
| def _write_thread(self) -> None: |
| done = False |
|
|
| while not done: |
| item = self._flush_queue.get() |
|
|
| if isinstance(item, _Done): |
| break |
|
|
| |
| if not item: |
| continue |
|
|
| text = [] |
| text.append(item) |
|
|
| |
| while True: |
| try: |
| item = self._flush_queue.get_nowait() |
| except queue.Empty: |
| break |
| else: |
| if isinstance(item, _Done): |
| done = True |
| else: |
| text.append(item) |
|
|
| app_loop = self._get_app_loop() |
| self._write_and_flush(app_loop, "".join(text)) |
|
|
| |
| |
| |
| if app_loop is not None: |
| time.sleep(self.sleep_between_writes) |
|
|
| def _get_app_loop(self) -> asyncio.AbstractEventLoop | None: |
| """ |
| Return the event loop for the application currently running in our |
| `AppSession`. |
| """ |
| app = self.app_session.app |
|
|
| if app is None: |
| return None |
|
|
| return app.loop |
|
|
| def _write_and_flush( |
| self, loop: asyncio.AbstractEventLoop | None, text: str |
| ) -> None: |
| """ |
| Write the given text to stdout and flush. |
| If an application is running, use `run_in_terminal`. |
| """ |
|
|
| def write_and_flush() -> None: |
| |
| |
| |
| |
| |
| self._output.enable_autowrap() |
|
|
| if self.raw: |
| self._output.write_raw(text) |
| else: |
| self._output.write(text) |
|
|
| self._output.flush() |
|
|
| def write_and_flush_in_loop() -> None: |
| |
| |
| run_in_terminal(write_and_flush, in_executor=False) |
|
|
| if loop is None: |
| |
| write_and_flush() |
| else: |
| |
| |
| loop.call_soon_threadsafe(write_and_flush_in_loop) |
|
|
| def _write(self, data: str) -> None: |
| """ |
| Note: print()-statements cause to multiple write calls. |
| (write('line') and write('\n')). Of course we don't want to call |
| `run_in_terminal` for every individual call, because that's too |
| expensive, and as long as the newline hasn't been written, the |
| text itself is again overwritten by the rendering of the input |
| command line. Therefor, we have a little buffer which holds the |
| text until a newline is written to stdout. |
| """ |
| if "\n" in data: |
| |
| |
| before, after = data.rsplit("\n", 1) |
| to_write = self._buffer + [before, "\n"] |
| self._buffer = [after] |
|
|
| text = "".join(to_write) |
| self._flush_queue.put(text) |
| else: |
| |
| self._buffer.append(data) |
|
|
| def _flush(self) -> None: |
| text = "".join(self._buffer) |
| self._buffer = [] |
| self._flush_queue.put(text) |
|
|
| def write(self, data: str) -> int: |
| with self._lock: |
| self._write(data) |
|
|
| return len(data) |
|
|
| def flush(self) -> None: |
| """ |
| Flush buffered output. |
| """ |
| with self._lock: |
| self._flush() |
|
|
| @property |
| def original_stdout(self) -> TextIO: |
| return self._output.stdout or sys.__stdout__ |
|
|
| |
|
|
| def fileno(self) -> int: |
| return self._output.fileno() |
|
|
| def isatty(self) -> bool: |
| stdout = self._output.stdout |
| if stdout is None: |
| return False |
|
|
| return stdout.isatty() |
|
|
| @property |
| def encoding(self) -> str: |
| return self._output.encoding() |
|
|
| @property |
| def errors(self) -> str: |
| return "strict" |
|
|