summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/click/_winconsole.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/click/_winconsole.py')
-rw-r--r--venv/lib/python3.11/site-packages/click/_winconsole.py279
1 files changed, 0 insertions, 279 deletions
diff --git a/venv/lib/python3.11/site-packages/click/_winconsole.py b/venv/lib/python3.11/site-packages/click/_winconsole.py
deleted file mode 100644
index 6b20df3..0000000
--- a/venv/lib/python3.11/site-packages/click/_winconsole.py
+++ /dev/null
@@ -1,279 +0,0 @@
-# This module is based on the excellent work by Adam Bartoš who
-# provided a lot of what went into the implementation here in
-# the discussion to issue1602 in the Python bug tracker.
-#
-# There are some general differences in regards to how this works
-# compared to the original patches as we do not need to patch
-# the entire interpreter but just work in our little world of
-# echo and prompt.
-import io
-import sys
-import time
-import typing as t
-from ctypes import byref
-from ctypes import c_char
-from ctypes import c_char_p
-from ctypes import c_int
-from ctypes import c_ssize_t
-from ctypes import c_ulong
-from ctypes import c_void_p
-from ctypes import POINTER
-from ctypes import py_object
-from ctypes import Structure
-from ctypes.wintypes import DWORD
-from ctypes.wintypes import HANDLE
-from ctypes.wintypes import LPCWSTR
-from ctypes.wintypes import LPWSTR
-
-from ._compat import _NonClosingTextIOWrapper
-
-assert sys.platform == "win32"
-import msvcrt # noqa: E402
-from ctypes import windll # noqa: E402
-from ctypes import WINFUNCTYPE # noqa: E402
-
-c_ssize_p = POINTER(c_ssize_t)
-
-kernel32 = windll.kernel32
-GetStdHandle = kernel32.GetStdHandle
-ReadConsoleW = kernel32.ReadConsoleW
-WriteConsoleW = kernel32.WriteConsoleW
-GetConsoleMode = kernel32.GetConsoleMode
-GetLastError = kernel32.GetLastError
-GetCommandLineW = WINFUNCTYPE(LPWSTR)(("GetCommandLineW", windll.kernel32))
-CommandLineToArgvW = WINFUNCTYPE(POINTER(LPWSTR), LPCWSTR, POINTER(c_int))(
- ("CommandLineToArgvW", windll.shell32)
-)
-LocalFree = WINFUNCTYPE(c_void_p, c_void_p)(("LocalFree", windll.kernel32))
-
-STDIN_HANDLE = GetStdHandle(-10)
-STDOUT_HANDLE = GetStdHandle(-11)
-STDERR_HANDLE = GetStdHandle(-12)
-
-PyBUF_SIMPLE = 0
-PyBUF_WRITABLE = 1
-
-ERROR_SUCCESS = 0
-ERROR_NOT_ENOUGH_MEMORY = 8
-ERROR_OPERATION_ABORTED = 995
-
-STDIN_FILENO = 0
-STDOUT_FILENO = 1
-STDERR_FILENO = 2
-
-EOF = b"\x1a"
-MAX_BYTES_WRITTEN = 32767
-
-try:
- from ctypes import pythonapi
-except ImportError:
- # On PyPy we cannot get buffers so our ability to operate here is
- # severely limited.
- get_buffer = None
-else:
-
- class Py_buffer(Structure):
- _fields_ = [
- ("buf", c_void_p),
- ("obj", py_object),
- ("len", c_ssize_t),
- ("itemsize", c_ssize_t),
- ("readonly", c_int),
- ("ndim", c_int),
- ("format", c_char_p),
- ("shape", c_ssize_p),
- ("strides", c_ssize_p),
- ("suboffsets", c_ssize_p),
- ("internal", c_void_p),
- ]
-
- PyObject_GetBuffer = pythonapi.PyObject_GetBuffer
- PyBuffer_Release = pythonapi.PyBuffer_Release
-
- def get_buffer(obj, writable=False):
- buf = Py_buffer()
- flags = PyBUF_WRITABLE if writable else PyBUF_SIMPLE
- PyObject_GetBuffer(py_object(obj), byref(buf), flags)
-
- try:
- buffer_type = c_char * buf.len
- return buffer_type.from_address(buf.buf)
- finally:
- PyBuffer_Release(byref(buf))
-
-
-class _WindowsConsoleRawIOBase(io.RawIOBase):
- def __init__(self, handle):
- self.handle = handle
-
- def isatty(self):
- super().isatty()
- return True
-
-
-class _WindowsConsoleReader(_WindowsConsoleRawIOBase):
- def readable(self):
- return True
-
- def readinto(self, b):
- bytes_to_be_read = len(b)
- if not bytes_to_be_read:
- return 0
- elif bytes_to_be_read % 2:
- raise ValueError(
- "cannot read odd number of bytes from UTF-16-LE encoded console"
- )
-
- buffer = get_buffer(b, writable=True)
- code_units_to_be_read = bytes_to_be_read // 2
- code_units_read = c_ulong()
-
- rv = ReadConsoleW(
- HANDLE(self.handle),
- buffer,
- code_units_to_be_read,
- byref(code_units_read),
- None,
- )
- if GetLastError() == ERROR_OPERATION_ABORTED:
- # wait for KeyboardInterrupt
- time.sleep(0.1)
- if not rv:
- raise OSError(f"Windows error: {GetLastError()}")
-
- if buffer[0] == EOF:
- return 0
- return 2 * code_units_read.value
-
-
-class _WindowsConsoleWriter(_WindowsConsoleRawIOBase):
- def writable(self):
- return True
-
- @staticmethod
- def _get_error_message(errno):
- if errno == ERROR_SUCCESS:
- return "ERROR_SUCCESS"
- elif errno == ERROR_NOT_ENOUGH_MEMORY:
- return "ERROR_NOT_ENOUGH_MEMORY"
- return f"Windows error {errno}"
-
- def write(self, b):
- bytes_to_be_written = len(b)
- buf = get_buffer(b)
- code_units_to_be_written = min(bytes_to_be_written, MAX_BYTES_WRITTEN) // 2
- code_units_written = c_ulong()
-
- WriteConsoleW(
- HANDLE(self.handle),
- buf,
- code_units_to_be_written,
- byref(code_units_written),
- None,
- )
- bytes_written = 2 * code_units_written.value
-
- if bytes_written == 0 and bytes_to_be_written > 0:
- raise OSError(self._get_error_message(GetLastError()))
- return bytes_written
-
-
-class ConsoleStream:
- def __init__(self, text_stream: t.TextIO, byte_stream: t.BinaryIO) -> None:
- self._text_stream = text_stream
- self.buffer = byte_stream
-
- @property
- def name(self) -> str:
- return self.buffer.name
-
- def write(self, x: t.AnyStr) -> int:
- if isinstance(x, str):
- return self._text_stream.write(x)
- try:
- self.flush()
- except Exception:
- pass
- return self.buffer.write(x)
-
- def writelines(self, lines: t.Iterable[t.AnyStr]) -> None:
- for line in lines:
- self.write(line)
-
- def __getattr__(self, name: str) -> t.Any:
- return getattr(self._text_stream, name)
-
- def isatty(self) -> bool:
- return self.buffer.isatty()
-
- def __repr__(self):
- return f"<ConsoleStream name={self.name!r} encoding={self.encoding!r}>"
-
-
-def _get_text_stdin(buffer_stream: t.BinaryIO) -> t.TextIO:
- text_stream = _NonClosingTextIOWrapper(
- io.BufferedReader(_WindowsConsoleReader(STDIN_HANDLE)),
- "utf-16-le",
- "strict",
- line_buffering=True,
- )
- return t.cast(t.TextIO, ConsoleStream(text_stream, buffer_stream))
-
-
-def _get_text_stdout(buffer_stream: t.BinaryIO) -> t.TextIO:
- text_stream = _NonClosingTextIOWrapper(
- io.BufferedWriter(_WindowsConsoleWriter(STDOUT_HANDLE)),
- "utf-16-le",
- "strict",
- line_buffering=True,
- )
- return t.cast(t.TextIO, ConsoleStream(text_stream, buffer_stream))
-
-
-def _get_text_stderr(buffer_stream: t.BinaryIO) -> t.TextIO:
- text_stream = _NonClosingTextIOWrapper(
- io.BufferedWriter(_WindowsConsoleWriter(STDERR_HANDLE)),
- "utf-16-le",
- "strict",
- line_buffering=True,
- )
- return t.cast(t.TextIO, ConsoleStream(text_stream, buffer_stream))
-
-
-_stream_factories: t.Mapping[int, t.Callable[[t.BinaryIO], t.TextIO]] = {
- 0: _get_text_stdin,
- 1: _get_text_stdout,
- 2: _get_text_stderr,
-}
-
-
-def _is_console(f: t.TextIO) -> bool:
- if not hasattr(f, "fileno"):
- return False
-
- try:
- fileno = f.fileno()
- except (OSError, io.UnsupportedOperation):
- return False
-
- handle = msvcrt.get_osfhandle(fileno)
- return bool(GetConsoleMode(handle, byref(DWORD())))
-
-
-def _get_windows_console_stream(
- f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
-) -> t.Optional[t.TextIO]:
- if (
- get_buffer is not None
- and encoding in {"utf-16-le", None}
- and errors in {"strict", None}
- and _is_console(f)
- ):
- func = _stream_factories.get(f.fileno())
- if func is not None:
- b = getattr(f, "buffer", None)
-
- if b is None:
- return None
-
- return func(b)