summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/click/_compat.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/click/_compat.py')
-rw-r--r--venv/lib/python3.11/site-packages/click/_compat.py623
1 files changed, 0 insertions, 623 deletions
diff --git a/venv/lib/python3.11/site-packages/click/_compat.py b/venv/lib/python3.11/site-packages/click/_compat.py
deleted file mode 100644
index 23f8866..0000000
--- a/venv/lib/python3.11/site-packages/click/_compat.py
+++ /dev/null
@@ -1,623 +0,0 @@
-import codecs
-import io
-import os
-import re
-import sys
-import typing as t
-from weakref import WeakKeyDictionary
-
-CYGWIN = sys.platform.startswith("cygwin")
-WIN = sys.platform.startswith("win")
-auto_wrap_for_ansi: t.Optional[t.Callable[[t.TextIO], t.TextIO]] = None
-_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
-
-
-def _make_text_stream(
- stream: t.BinaryIO,
- encoding: t.Optional[str],
- errors: t.Optional[str],
- force_readable: bool = False,
- force_writable: bool = False,
-) -> t.TextIO:
- if encoding is None:
- encoding = get_best_encoding(stream)
- if errors is None:
- errors = "replace"
- return _NonClosingTextIOWrapper(
- stream,
- encoding,
- errors,
- line_buffering=True,
- force_readable=force_readable,
- force_writable=force_writable,
- )
-
-
-def is_ascii_encoding(encoding: str) -> bool:
- """Checks if a given encoding is ascii."""
- try:
- return codecs.lookup(encoding).name == "ascii"
- except LookupError:
- return False
-
-
-def get_best_encoding(stream: t.IO[t.Any]) -> str:
- """Returns the default stream encoding if not found."""
- rv = getattr(stream, "encoding", None) or sys.getdefaultencoding()
- if is_ascii_encoding(rv):
- return "utf-8"
- return rv
-
-
-class _NonClosingTextIOWrapper(io.TextIOWrapper):
- def __init__(
- self,
- stream: t.BinaryIO,
- encoding: t.Optional[str],
- errors: t.Optional[str],
- force_readable: bool = False,
- force_writable: bool = False,
- **extra: t.Any,
- ) -> None:
- self._stream = stream = t.cast(
- t.BinaryIO, _FixupStream(stream, force_readable, force_writable)
- )
- super().__init__(stream, encoding, errors, **extra)
-
- def __del__(self) -> None:
- try:
- self.detach()
- except Exception:
- pass
-
- def isatty(self) -> bool:
- # https://bitbucket.org/pypy/pypy/issue/1803
- return self._stream.isatty()
-
-
-class _FixupStream:
- """The new io interface needs more from streams than streams
- traditionally implement. As such, this fix-up code is necessary in
- some circumstances.
-
- The forcing of readable and writable flags are there because some tools
- put badly patched objects on sys (one such offender are certain version
- of jupyter notebook).
- """
-
- def __init__(
- self,
- stream: t.BinaryIO,
- force_readable: bool = False,
- force_writable: bool = False,
- ):
- self._stream = stream
- self._force_readable = force_readable
- self._force_writable = force_writable
-
- def __getattr__(self, name: str) -> t.Any:
- return getattr(self._stream, name)
-
- def read1(self, size: int) -> bytes:
- f = getattr(self._stream, "read1", None)
-
- if f is not None:
- return t.cast(bytes, f(size))
-
- return self._stream.read(size)
-
- def readable(self) -> bool:
- if self._force_readable:
- return True
- x = getattr(self._stream, "readable", None)
- if x is not None:
- return t.cast(bool, x())
- try:
- self._stream.read(0)
- except Exception:
- return False
- return True
-
- def writable(self) -> bool:
- if self._force_writable:
- return True
- x = getattr(self._stream, "writable", None)
- if x is not None:
- return t.cast(bool, x())
- try:
- self._stream.write("") # type: ignore
- except Exception:
- try:
- self._stream.write(b"")
- except Exception:
- return False
- return True
-
- def seekable(self) -> bool:
- x = getattr(self._stream, "seekable", None)
- if x is not None:
- return t.cast(bool, x())
- try:
- self._stream.seek(self._stream.tell())
- except Exception:
- return False
- return True
-
-
-def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool:
- try:
- return isinstance(stream.read(0), bytes)
- except Exception:
- return default
- # This happens in some cases where the stream was already
- # closed. In this case, we assume the default.
-
-
-def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool:
- try:
- stream.write(b"")
- except Exception:
- try:
- stream.write("")
- return False
- except Exception:
- pass
- return default
- return True
-
-
-def _find_binary_reader(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
- # We need to figure out if the given stream is already binary.
- # This can happen because the official docs recommend detaching
- # the streams to get binary streams. Some code might do this, so
- # we need to deal with this case explicitly.
- if _is_binary_reader(stream, False):
- return t.cast(t.BinaryIO, stream)
-
- buf = getattr(stream, "buffer", None)
-
- # Same situation here; this time we assume that the buffer is
- # actually binary in case it's closed.
- if buf is not None and _is_binary_reader(buf, True):
- return t.cast(t.BinaryIO, buf)
-
- return None
-
-
-def _find_binary_writer(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
- # We need to figure out if the given stream is already binary.
- # This can happen because the official docs recommend detaching
- # the streams to get binary streams. Some code might do this, so
- # we need to deal with this case explicitly.
- if _is_binary_writer(stream, False):
- return t.cast(t.BinaryIO, stream)
-
- buf = getattr(stream, "buffer", None)
-
- # Same situation here; this time we assume that the buffer is
- # actually binary in case it's closed.
- if buf is not None and _is_binary_writer(buf, True):
- return t.cast(t.BinaryIO, buf)
-
- return None
-
-
-def _stream_is_misconfigured(stream: t.TextIO) -> bool:
- """A stream is misconfigured if its encoding is ASCII."""
- # If the stream does not have an encoding set, we assume it's set
- # to ASCII. This appears to happen in certain unittest
- # environments. It's not quite clear what the correct behavior is
- # but this at least will force Click to recover somehow.
- return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii")
-
-
-def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: t.Optional[str]) -> bool:
- """A stream attribute is compatible if it is equal to the
- desired value or the desired value is unset and the attribute
- has a value.
- """
- stream_value = getattr(stream, attr, None)
- return stream_value == value or (value is None and stream_value is not None)
-
-
-def _is_compatible_text_stream(
- stream: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
-) -> bool:
- """Check if a stream's encoding and errors attributes are
- compatible with the desired values.
- """
- return _is_compat_stream_attr(
- stream, "encoding", encoding
- ) and _is_compat_stream_attr(stream, "errors", errors)
-
-
-def _force_correct_text_stream(
- text_stream: t.IO[t.Any],
- encoding: t.Optional[str],
- errors: t.Optional[str],
- is_binary: t.Callable[[t.IO[t.Any], bool], bool],
- find_binary: t.Callable[[t.IO[t.Any]], t.Optional[t.BinaryIO]],
- force_readable: bool = False,
- force_writable: bool = False,
-) -> t.TextIO:
- if is_binary(text_stream, False):
- binary_reader = t.cast(t.BinaryIO, text_stream)
- else:
- text_stream = t.cast(t.TextIO, text_stream)
- # If the stream looks compatible, and won't default to a
- # misconfigured ascii encoding, return it as-is.
- if _is_compatible_text_stream(text_stream, encoding, errors) and not (
- encoding is None and _stream_is_misconfigured(text_stream)
- ):
- return text_stream
-
- # Otherwise, get the underlying binary reader.
- possible_binary_reader = find_binary(text_stream)
-
- # If that's not possible, silently use the original reader
- # and get mojibake instead of exceptions.
- if possible_binary_reader is None:
- return text_stream
-
- binary_reader = possible_binary_reader
-
- # Default errors to replace instead of strict in order to get
- # something that works.
- if errors is None:
- errors = "replace"
-
- # Wrap the binary stream in a text stream with the correct
- # encoding parameters.
- return _make_text_stream(
- binary_reader,
- encoding,
- errors,
- force_readable=force_readable,
- force_writable=force_writable,
- )
-
-
-def _force_correct_text_reader(
- text_reader: t.IO[t.Any],
- encoding: t.Optional[str],
- errors: t.Optional[str],
- force_readable: bool = False,
-) -> t.TextIO:
- return _force_correct_text_stream(
- text_reader,
- encoding,
- errors,
- _is_binary_reader,
- _find_binary_reader,
- force_readable=force_readable,
- )
-
-
-def _force_correct_text_writer(
- text_writer: t.IO[t.Any],
- encoding: t.Optional[str],
- errors: t.Optional[str],
- force_writable: bool = False,
-) -> t.TextIO:
- return _force_correct_text_stream(
- text_writer,
- encoding,
- errors,
- _is_binary_writer,
- _find_binary_writer,
- force_writable=force_writable,
- )
-
-
-def get_binary_stdin() -> t.BinaryIO:
- reader = _find_binary_reader(sys.stdin)
- if reader is None:
- raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
- return reader
-
-
-def get_binary_stdout() -> t.BinaryIO:
- writer = _find_binary_writer(sys.stdout)
- if writer is None:
- raise RuntimeError("Was not able to determine binary stream for sys.stdout.")
- return writer
-
-
-def get_binary_stderr() -> t.BinaryIO:
- writer = _find_binary_writer(sys.stderr)
- if writer is None:
- raise RuntimeError("Was not able to determine binary stream for sys.stderr.")
- return writer
-
-
-def get_text_stdin(
- encoding: t.Optional[str] = None, errors: t.Optional[str] = None
-) -> t.TextIO:
- rv = _get_windows_console_stream(sys.stdin, encoding, errors)
- if rv is not None:
- return rv
- return _force_correct_text_reader(sys.stdin, encoding, errors, force_readable=True)
-
-
-def get_text_stdout(
- encoding: t.Optional[str] = None, errors: t.Optional[str] = None
-) -> t.TextIO:
- rv = _get_windows_console_stream(sys.stdout, encoding, errors)
- if rv is not None:
- return rv
- return _force_correct_text_writer(sys.stdout, encoding, errors, force_writable=True)
-
-
-def get_text_stderr(
- encoding: t.Optional[str] = None, errors: t.Optional[str] = None
-) -> t.TextIO:
- rv = _get_windows_console_stream(sys.stderr, encoding, errors)
- if rv is not None:
- return rv
- return _force_correct_text_writer(sys.stderr, encoding, errors, force_writable=True)
-
-
-def _wrap_io_open(
- file: t.Union[str, "os.PathLike[str]", int],
- mode: str,
- encoding: t.Optional[str],
- errors: t.Optional[str],
-) -> t.IO[t.Any]:
- """Handles not passing ``encoding`` and ``errors`` in binary mode."""
- if "b" in mode:
- return open(file, mode)
-
- return open(file, mode, encoding=encoding, errors=errors)
-
-
-def open_stream(
- filename: "t.Union[str, os.PathLike[str]]",
- mode: str = "r",
- encoding: t.Optional[str] = None,
- errors: t.Optional[str] = "strict",
- atomic: bool = False,
-) -> t.Tuple[t.IO[t.Any], bool]:
- binary = "b" in mode
- filename = os.fspath(filename)
-
- # Standard streams first. These are simple because they ignore the
- # atomic flag. Use fsdecode to handle Path("-").
- if os.fsdecode(filename) == "-":
- if any(m in mode for m in ["w", "a", "x"]):
- if binary:
- return get_binary_stdout(), False
- return get_text_stdout(encoding=encoding, errors=errors), False
- if binary:
- return get_binary_stdin(), False
- return get_text_stdin(encoding=encoding, errors=errors), False
-
- # Non-atomic writes directly go out through the regular open functions.
- if not atomic:
- return _wrap_io_open(filename, mode, encoding, errors), True
-
- # Some usability stuff for atomic writes
- if "a" in mode:
- raise ValueError(
- "Appending to an existing file is not supported, because that"
- " would involve an expensive `copy`-operation to a temporary"
- " file. Open the file in normal `w`-mode and copy explicitly"
- " if that's what you're after."
- )
- if "x" in mode:
- raise ValueError("Use the `overwrite`-parameter instead.")
- if "w" not in mode:
- raise ValueError("Atomic writes only make sense with `w`-mode.")
-
- # Atomic writes are more complicated. They work by opening a file
- # as a proxy in the same folder and then using the fdopen
- # functionality to wrap it in a Python file. Then we wrap it in an
- # atomic file that moves the file over on close.
- import errno
- import random
-
- try:
- perm: t.Optional[int] = os.stat(filename).st_mode
- except OSError:
- perm = None
-
- flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
-
- if binary:
- flags |= getattr(os, "O_BINARY", 0)
-
- while True:
- tmp_filename = os.path.join(
- os.path.dirname(filename),
- f".__atomic-write{random.randrange(1 << 32):08x}",
- )
- try:
- fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm)
- break
- except OSError as e:
- if e.errno == errno.EEXIST or (
- os.name == "nt"
- and e.errno == errno.EACCES
- and os.path.isdir(e.filename)
- and os.access(e.filename, os.W_OK)
- ):
- continue
- raise
-
- if perm is not None:
- os.chmod(tmp_filename, perm) # in case perm includes bits in umask
-
- f = _wrap_io_open(fd, mode, encoding, errors)
- af = _AtomicFile(f, tmp_filename, os.path.realpath(filename))
- return t.cast(t.IO[t.Any], af), True
-
-
-class _AtomicFile:
- def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None:
- self._f = f
- self._tmp_filename = tmp_filename
- self._real_filename = real_filename
- self.closed = False
-
- @property
- def name(self) -> str:
- return self._real_filename
-
- def close(self, delete: bool = False) -> None:
- if self.closed:
- return
- self._f.close()
- os.replace(self._tmp_filename, self._real_filename)
- self.closed = True
-
- def __getattr__(self, name: str) -> t.Any:
- return getattr(self._f, name)
-
- def __enter__(self) -> "_AtomicFile":
- return self
-
- def __exit__(self, exc_type: t.Optional[t.Type[BaseException]], *_: t.Any) -> None:
- self.close(delete=exc_type is not None)
-
- def __repr__(self) -> str:
- return repr(self._f)
-
-
-def strip_ansi(value: str) -> str:
- return _ansi_re.sub("", value)
-
-
-def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool:
- while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)):
- stream = stream._stream
-
- return stream.__class__.__module__.startswith("ipykernel.")
-
-
-def should_strip_ansi(
- stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None
-) -> bool:
- if color is None:
- if stream is None:
- stream = sys.stdin
- return not isatty(stream) and not _is_jupyter_kernel_output(stream)
- return not color
-
-
-# On Windows, wrap the output streams with colorama to support ANSI
-# color codes.
-# NOTE: double check is needed so mypy does not analyze this on Linux
-if sys.platform.startswith("win") and WIN:
- from ._winconsole import _get_windows_console_stream
-
- def _get_argv_encoding() -> str:
- import locale
-
- return locale.getpreferredencoding()
-
- _ansi_stream_wrappers: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
-
- def auto_wrap_for_ansi( # noqa: F811
- stream: t.TextIO, color: t.Optional[bool] = None
- ) -> t.TextIO:
- """Support ANSI color and style codes on Windows by wrapping a
- stream with colorama.
- """
- try:
- cached = _ansi_stream_wrappers.get(stream)
- except Exception:
- cached = None
-
- if cached is not None:
- return cached
-
- import colorama
-
- strip = should_strip_ansi(stream, color)
- ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
- rv = t.cast(t.TextIO, ansi_wrapper.stream)
- _write = rv.write
-
- def _safe_write(s):
- try:
- return _write(s)
- except BaseException:
- ansi_wrapper.reset_all()
- raise
-
- rv.write = _safe_write
-
- try:
- _ansi_stream_wrappers[stream] = rv
- except Exception:
- pass
-
- return rv
-
-else:
-
- def _get_argv_encoding() -> str:
- return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding()
-
- def _get_windows_console_stream(
- f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
- ) -> t.Optional[t.TextIO]:
- return None
-
-
-def term_len(x: str) -> int:
- return len(strip_ansi(x))
-
-
-def isatty(stream: t.IO[t.Any]) -> bool:
- try:
- return stream.isatty()
- except Exception:
- return False
-
-
-def _make_cached_stream_func(
- src_func: t.Callable[[], t.Optional[t.TextIO]],
- wrapper_func: t.Callable[[], t.TextIO],
-) -> t.Callable[[], t.Optional[t.TextIO]]:
- cache: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
-
- def func() -> t.Optional[t.TextIO]:
- stream = src_func()
-
- if stream is None:
- return None
-
- try:
- rv = cache.get(stream)
- except Exception:
- rv = None
- if rv is not None:
- return rv
- rv = wrapper_func()
- try:
- cache[stream] = rv
- except Exception:
- pass
- return rv
-
- return func
-
-
-_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
-_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
-_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)
-
-
-binary_streams: t.Mapping[str, t.Callable[[], t.BinaryIO]] = {
- "stdin": get_binary_stdin,
- "stdout": get_binary_stdout,
- "stderr": get_binary_stderr,
-}
-
-text_streams: t.Mapping[
- str, t.Callable[[t.Optional[str], t.Optional[str]], t.TextIO]
-] = {
- "stdin": get_text_stdin,
- "stdout": get_text_stdout,
- "stderr": get_text_stderr,
-}