summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/click/testing.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/click/testing.py')
-rw-r--r--venv/lib/python3.11/site-packages/click/testing.py479
1 files changed, 0 insertions, 479 deletions
diff --git a/venv/lib/python3.11/site-packages/click/testing.py b/venv/lib/python3.11/site-packages/click/testing.py
deleted file mode 100644
index e0df0d2..0000000
--- a/venv/lib/python3.11/site-packages/click/testing.py
+++ /dev/null
@@ -1,479 +0,0 @@
-import contextlib
-import io
-import os
-import shlex
-import shutil
-import sys
-import tempfile
-import typing as t
-from types import TracebackType
-
-from . import formatting
-from . import termui
-from . import utils
-from ._compat import _find_binary_reader
-
-if t.TYPE_CHECKING:
- from .core import BaseCommand
-
-
-class EchoingStdin:
- def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None:
- self._input = input
- self._output = output
- self._paused = False
-
- def __getattr__(self, x: str) -> t.Any:
- return getattr(self._input, x)
-
- def _echo(self, rv: bytes) -> bytes:
- if not self._paused:
- self._output.write(rv)
-
- return rv
-
- def read(self, n: int = -1) -> bytes:
- return self._echo(self._input.read(n))
-
- def read1(self, n: int = -1) -> bytes:
- return self._echo(self._input.read1(n)) # type: ignore
-
- def readline(self, n: int = -1) -> bytes:
- return self._echo(self._input.readline(n))
-
- def readlines(self) -> t.List[bytes]:
- return [self._echo(x) for x in self._input.readlines()]
-
- def __iter__(self) -> t.Iterator[bytes]:
- return iter(self._echo(x) for x in self._input)
-
- def __repr__(self) -> str:
- return repr(self._input)
-
-
-@contextlib.contextmanager
-def _pause_echo(stream: t.Optional[EchoingStdin]) -> t.Iterator[None]:
- if stream is None:
- yield
- else:
- stream._paused = True
- yield
- stream._paused = False
-
-
-class _NamedTextIOWrapper(io.TextIOWrapper):
- def __init__(
- self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any
- ) -> None:
- super().__init__(buffer, **kwargs)
- self._name = name
- self._mode = mode
-
- @property
- def name(self) -> str:
- return self._name
-
- @property
- def mode(self) -> str:
- return self._mode
-
-
-def make_input_stream(
- input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]], charset: str
-) -> t.BinaryIO:
- # Is already an input stream.
- if hasattr(input, "read"):
- rv = _find_binary_reader(t.cast(t.IO[t.Any], input))
-
- if rv is not None:
- return rv
-
- raise TypeError("Could not find binary reader for input stream.")
-
- if input is None:
- input = b""
- elif isinstance(input, str):
- input = input.encode(charset)
-
- return io.BytesIO(input)
-
-
-class Result:
- """Holds the captured result of an invoked CLI script."""
-
- def __init__(
- self,
- runner: "CliRunner",
- stdout_bytes: bytes,
- stderr_bytes: t.Optional[bytes],
- return_value: t.Any,
- exit_code: int,
- exception: t.Optional[BaseException],
- exc_info: t.Optional[
- t.Tuple[t.Type[BaseException], BaseException, TracebackType]
- ] = None,
- ):
- #: The runner that created the result
- self.runner = runner
- #: The standard output as bytes.
- self.stdout_bytes = stdout_bytes
- #: The standard error as bytes, or None if not available
- self.stderr_bytes = stderr_bytes
- #: The value returned from the invoked command.
- #:
- #: .. versionadded:: 8.0
- self.return_value = return_value
- #: The exit code as integer.
- self.exit_code = exit_code
- #: The exception that happened if one did.
- self.exception = exception
- #: The traceback
- self.exc_info = exc_info
-
- @property
- def output(self) -> str:
- """The (standard) output as unicode string."""
- return self.stdout
-
- @property
- def stdout(self) -> str:
- """The standard output as unicode string."""
- return self.stdout_bytes.decode(self.runner.charset, "replace").replace(
- "\r\n", "\n"
- )
-
- @property
- def stderr(self) -> str:
- """The standard error as unicode string."""
- if self.stderr_bytes is None:
- raise ValueError("stderr not separately captured")
- return self.stderr_bytes.decode(self.runner.charset, "replace").replace(
- "\r\n", "\n"
- )
-
- def __repr__(self) -> str:
- exc_str = repr(self.exception) if self.exception else "okay"
- return f"<{type(self).__name__} {exc_str}>"
-
-
-class CliRunner:
- """The CLI runner provides functionality to invoke a Click command line
- script for unittesting purposes in a isolated environment. This only
- works in single-threaded systems without any concurrency as it changes the
- global interpreter state.
-
- :param charset: the character set for the input and output data.
- :param env: a dictionary with environment variables for overriding.
- :param echo_stdin: if this is set to `True`, then reading from stdin writes
- to stdout. This is useful for showing examples in
- some circumstances. Note that regular prompts
- will automatically echo the input.
- :param mix_stderr: if this is set to `False`, then stdout and stderr are
- preserved as independent streams. This is useful for
- Unix-philosophy apps that have predictable stdout and
- noisy stderr, such that each may be measured
- independently
- """
-
- def __init__(
- self,
- charset: str = "utf-8",
- env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
- echo_stdin: bool = False,
- mix_stderr: bool = True,
- ) -> None:
- self.charset = charset
- self.env: t.Mapping[str, t.Optional[str]] = env or {}
- self.echo_stdin = echo_stdin
- self.mix_stderr = mix_stderr
-
- def get_default_prog_name(self, cli: "BaseCommand") -> str:
- """Given a command object it will return the default program name
- for it. The default is the `name` attribute or ``"root"`` if not
- set.
- """
- return cli.name or "root"
-
- def make_env(
- self, overrides: t.Optional[t.Mapping[str, t.Optional[str]]] = None
- ) -> t.Mapping[str, t.Optional[str]]:
- """Returns the environment overrides for invoking a script."""
- rv = dict(self.env)
- if overrides:
- rv.update(overrides)
- return rv
-
- @contextlib.contextmanager
- def isolation(
- self,
- input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None,
- env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
- color: bool = False,
- ) -> t.Iterator[t.Tuple[io.BytesIO, t.Optional[io.BytesIO]]]:
- """A context manager that sets up the isolation for invoking of a
- command line tool. This sets up stdin with the given input data
- and `os.environ` with the overrides from the given dictionary.
- This also rebinds some internals in Click to be mocked (like the
- prompt functionality).
-
- This is automatically done in the :meth:`invoke` method.
-
- :param input: the input stream to put into sys.stdin.
- :param env: the environment overrides as dictionary.
- :param color: whether the output should contain color codes. The
- application can still override this explicitly.
-
- .. versionchanged:: 8.0
- ``stderr`` is opened with ``errors="backslashreplace"``
- instead of the default ``"strict"``.
-
- .. versionchanged:: 4.0
- Added the ``color`` parameter.
- """
- bytes_input = make_input_stream(input, self.charset)
- echo_input = None
-
- old_stdin = sys.stdin
- old_stdout = sys.stdout
- old_stderr = sys.stderr
- old_forced_width = formatting.FORCED_WIDTH
- formatting.FORCED_WIDTH = 80
-
- env = self.make_env(env)
-
- bytes_output = io.BytesIO()
-
- if self.echo_stdin:
- bytes_input = echo_input = t.cast(
- t.BinaryIO, EchoingStdin(bytes_input, bytes_output)
- )
-
- sys.stdin = text_input = _NamedTextIOWrapper(
- bytes_input, encoding=self.charset, name="<stdin>", mode="r"
- )
-
- if self.echo_stdin:
- # Force unbuffered reads, otherwise TextIOWrapper reads a
- # large chunk which is echoed early.
- text_input._CHUNK_SIZE = 1 # type: ignore
-
- sys.stdout = _NamedTextIOWrapper(
- bytes_output, encoding=self.charset, name="<stdout>", mode="w"
- )
-
- bytes_error = None
- if self.mix_stderr:
- sys.stderr = sys.stdout
- else:
- bytes_error = io.BytesIO()
- sys.stderr = _NamedTextIOWrapper(
- bytes_error,
- encoding=self.charset,
- name="<stderr>",
- mode="w",
- errors="backslashreplace",
- )
-
- @_pause_echo(echo_input) # type: ignore
- def visible_input(prompt: t.Optional[str] = None) -> str:
- sys.stdout.write(prompt or "")
- val = text_input.readline().rstrip("\r\n")
- sys.stdout.write(f"{val}\n")
- sys.stdout.flush()
- return val
-
- @_pause_echo(echo_input) # type: ignore
- def hidden_input(prompt: t.Optional[str] = None) -> str:
- sys.stdout.write(f"{prompt or ''}\n")
- sys.stdout.flush()
- return text_input.readline().rstrip("\r\n")
-
- @_pause_echo(echo_input) # type: ignore
- def _getchar(echo: bool) -> str:
- char = sys.stdin.read(1)
-
- if echo:
- sys.stdout.write(char)
-
- sys.stdout.flush()
- return char
-
- default_color = color
-
- def should_strip_ansi(
- stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None
- ) -> bool:
- if color is None:
- return not default_color
- return not color
-
- old_visible_prompt_func = termui.visible_prompt_func
- old_hidden_prompt_func = termui.hidden_prompt_func
- old__getchar_func = termui._getchar
- old_should_strip_ansi = utils.should_strip_ansi # type: ignore
- termui.visible_prompt_func = visible_input
- termui.hidden_prompt_func = hidden_input
- termui._getchar = _getchar
- utils.should_strip_ansi = should_strip_ansi # type: ignore
-
- old_env = {}
- try:
- for key, value in env.items():
- old_env[key] = os.environ.get(key)
- if value is None:
- try:
- del os.environ[key]
- except Exception:
- pass
- else:
- os.environ[key] = value
- yield (bytes_output, bytes_error)
- finally:
- for key, value in old_env.items():
- if value is None:
- try:
- del os.environ[key]
- except Exception:
- pass
- else:
- os.environ[key] = value
- sys.stdout = old_stdout
- sys.stderr = old_stderr
- sys.stdin = old_stdin
- termui.visible_prompt_func = old_visible_prompt_func
- termui.hidden_prompt_func = old_hidden_prompt_func
- termui._getchar = old__getchar_func
- utils.should_strip_ansi = old_should_strip_ansi # type: ignore
- formatting.FORCED_WIDTH = old_forced_width
-
- def invoke(
- self,
- cli: "BaseCommand",
- args: t.Optional[t.Union[str, t.Sequence[str]]] = None,
- input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None,
- env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
- catch_exceptions: bool = True,
- color: bool = False,
- **extra: t.Any,
- ) -> Result:
- """Invokes a command in an isolated environment. The arguments are
- forwarded directly to the command line script, the `extra` keyword
- arguments are passed to the :meth:`~clickpkg.Command.main` function of
- the command.
-
- This returns a :class:`Result` object.
-
- :param cli: the command to invoke
- :param args: the arguments to invoke. It may be given as an iterable
- or a string. When given as string it will be interpreted
- as a Unix shell command. More details at
- :func:`shlex.split`.
- :param input: the input data for `sys.stdin`.
- :param env: the environment overrides.
- :param catch_exceptions: Whether to catch any other exceptions than
- ``SystemExit``.
- :param extra: the keyword arguments to pass to :meth:`main`.
- :param color: whether the output should contain color codes. The
- application can still override this explicitly.
-
- .. versionchanged:: 8.0
- The result object has the ``return_value`` attribute with
- the value returned from the invoked command.
-
- .. versionchanged:: 4.0
- Added the ``color`` parameter.
-
- .. versionchanged:: 3.0
- Added the ``catch_exceptions`` parameter.
-
- .. versionchanged:: 3.0
- The result object has the ``exc_info`` attribute with the
- traceback if available.
- """
- exc_info = None
- with self.isolation(input=input, env=env, color=color) as outstreams:
- return_value = None
- exception: t.Optional[BaseException] = None
- exit_code = 0
-
- if isinstance(args, str):
- args = shlex.split(args)
-
- try:
- prog_name = extra.pop("prog_name")
- except KeyError:
- prog_name = self.get_default_prog_name(cli)
-
- try:
- return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
- except SystemExit as e:
- exc_info = sys.exc_info()
- e_code = t.cast(t.Optional[t.Union[int, t.Any]], e.code)
-
- if e_code is None:
- e_code = 0
-
- if e_code != 0:
- exception = e
-
- if not isinstance(e_code, int):
- sys.stdout.write(str(e_code))
- sys.stdout.write("\n")
- e_code = 1
-
- exit_code = e_code
-
- except Exception as e:
- if not catch_exceptions:
- raise
- exception = e
- exit_code = 1
- exc_info = sys.exc_info()
- finally:
- sys.stdout.flush()
- stdout = outstreams[0].getvalue()
- if self.mix_stderr:
- stderr = None
- else:
- stderr = outstreams[1].getvalue() # type: ignore
-
- return Result(
- runner=self,
- stdout_bytes=stdout,
- stderr_bytes=stderr,
- return_value=return_value,
- exit_code=exit_code,
- exception=exception,
- exc_info=exc_info, # type: ignore
- )
-
- @contextlib.contextmanager
- def isolated_filesystem(
- self, temp_dir: t.Optional[t.Union[str, "os.PathLike[str]"]] = None
- ) -> t.Iterator[str]:
- """A context manager that creates a temporary directory and
- changes the current working directory to it. This isolates tests
- that affect the contents of the CWD to prevent them from
- interfering with each other.
-
- :param temp_dir: Create the temporary directory under this
- directory. If given, the created directory is not removed
- when exiting.
-
- .. versionchanged:: 8.0
- Added the ``temp_dir`` parameter.
- """
- cwd = os.getcwd()
- dt = tempfile.mkdtemp(dir=temp_dir)
- os.chdir(dt)
-
- try:
- yield dt
- finally:
- os.chdir(cwd)
-
- if temp_dir is None:
- try:
- shutil.rmtree(dt)
- except OSError: # noqa: B014
- pass