diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/pip/_internal/req')
12 files changed, 0 insertions, 2913 deletions
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/req/__init__.py b/venv/lib/python3.11/site-packages/pip/_internal/req/__init__.py deleted file mode 100644 index 16de903..0000000 --- a/venv/lib/python3.11/site-packages/pip/_internal/req/__init__.py +++ /dev/null @@ -1,92 +0,0 @@ -import collections -import logging -from typing import Generator, List, Optional, Sequence, Tuple - -from pip._internal.utils.logging import indent_log - -from .req_file import parse_requirements -from .req_install import InstallRequirement -from .req_set import RequirementSet - -__all__ = [ - "RequirementSet", - "InstallRequirement", - "parse_requirements", - "install_given_reqs", -] - -logger = logging.getLogger(__name__) - - -class InstallationResult: - def __init__(self, name: str) -> None: - self.name = name - - def __repr__(self) -> str: - return f"InstallationResult(name={self.name!r})" - - -def _validate_requirements( - requirements: List[InstallRequirement], -) -> Generator[Tuple[str, InstallRequirement], None, None]: - for req in requirements: - assert req.name, f"invalid to-be-installed requirement: {req}" - yield req.name, req - - -def install_given_reqs( - requirements: List[InstallRequirement], - global_options: Sequence[str], - root: Optional[str], - home: Optional[str], - prefix: Optional[str], - warn_script_location: bool, - use_user_site: bool, - pycompile: bool, -) -> List[InstallationResult]: - """ - Install everything in the given list. - - (to be called after having downloaded and unpacked the packages) - """ - to_install = collections.OrderedDict(_validate_requirements(requirements)) - - if to_install: - logger.info( - "Installing collected packages: %s", - ", ".join(to_install.keys()), - ) - - installed = [] - - with indent_log(): - for req_name, requirement in to_install.items(): - if requirement.should_reinstall: - logger.info("Attempting uninstall: %s", req_name) - with indent_log(): - uninstalled_pathset = requirement.uninstall(auto_confirm=True) - else: - uninstalled_pathset = None - - try: - requirement.install( - global_options, - root=root, - home=home, - prefix=prefix, - warn_script_location=warn_script_location, - use_user_site=use_user_site, - pycompile=pycompile, - ) - except Exception: - # if install did not succeed, rollback previous uninstall - if uninstalled_pathset and not requirement.install_succeeded: - uninstalled_pathset.rollback() - raise - else: - if uninstalled_pathset and requirement.install_succeeded: - uninstalled_pathset.commit() - - installed.append(InstallationResult(req_name)) - - return installed diff --git a/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/__init__.cpython-311.pyc Binary files differdeleted file mode 100644 index 53d6f7d..0000000 --- a/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/__init__.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/constructors.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/constructors.cpython-311.pyc Binary files differdeleted file mode 100644 index 7404820..0000000 --- a/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/constructors.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/req_file.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/req_file.cpython-311.pyc Binary files differdeleted file mode 100644 index bee7822..0000000 --- a/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/req_file.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/req_install.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/req_install.cpython-311.pyc Binary files differdeleted file mode 100644 index 424829d..0000000 --- a/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/req_install.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/req_set.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/req_set.cpython-311.pyc Binary files differdeleted file mode 100644 index 5c280f3..0000000 --- a/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/req_set.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/req_uninstall.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/req_uninstall.cpython-311.pyc Binary files differdeleted file mode 100644 index a77e730..0000000 --- a/venv/lib/python3.11/site-packages/pip/_internal/req/__pycache__/req_uninstall.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/pip/_internal/req/constructors.py b/venv/lib/python3.11/site-packages/pip/_internal/req/constructors.py deleted file mode 100644 index 7e2d0e5..0000000 --- a/venv/lib/python3.11/site-packages/pip/_internal/req/constructors.py +++ /dev/null @@ -1,576 +0,0 @@ -"""Backing implementation for InstallRequirement's various constructors - -The idea here is that these formed a major chunk of InstallRequirement's size -so, moving them and support code dedicated to them outside of that class -helps creates for better understandability for the rest of the code. - -These are meant to be used elsewhere within pip to create instances of -InstallRequirement. -""" - -import copy -import logging -import os -import re -from typing import Collection, Dict, List, Optional, Set, Tuple, Union - -from pip._vendor.packaging.markers import Marker -from pip._vendor.packaging.requirements import InvalidRequirement, Requirement -from pip._vendor.packaging.specifiers import Specifier - -from pip._internal.exceptions import InstallationError -from pip._internal.models.index import PyPI, TestPyPI -from pip._internal.models.link import Link -from pip._internal.models.wheel import Wheel -from pip._internal.req.req_file import ParsedRequirement -from pip._internal.req.req_install import InstallRequirement -from pip._internal.utils.filetypes import is_archive_file -from pip._internal.utils.misc import is_installable_dir -from pip._internal.utils.packaging import get_requirement -from pip._internal.utils.urls import path_to_url -from pip._internal.vcs import is_url, vcs - -__all__ = [ - "install_req_from_editable", - "install_req_from_line", - "parse_editable", -] - -logger = logging.getLogger(__name__) -operators = Specifier._operators.keys() - - -def _strip_extras(path: str) -> Tuple[str, Optional[str]]: - m = re.match(r"^(.+)(\[[^\]]+\])$", path) - extras = None - if m: - path_no_extras = m.group(1) - extras = m.group(2) - else: - path_no_extras = path - - return path_no_extras, extras - - -def convert_extras(extras: Optional[str]) -> Set[str]: - if not extras: - return set() - return get_requirement("placeholder" + extras.lower()).extras - - -def _set_requirement_extras(req: Requirement, new_extras: Set[str]) -> Requirement: - """ - Returns a new requirement based on the given one, with the supplied extras. If the - given requirement already has extras those are replaced (or dropped if no new extras - are given). - """ - match: Optional[re.Match[str]] = re.fullmatch( - # see https://peps.python.org/pep-0508/#complete-grammar - r"([\w\t .-]+)(\[[^\]]*\])?(.*)", - str(req), - flags=re.ASCII, - ) - # ireq.req is a valid requirement so the regex should always match - assert ( - match is not None - ), f"regex match on requirement {req} failed, this should never happen" - pre: Optional[str] = match.group(1) - post: Optional[str] = match.group(3) - assert ( - pre is not None and post is not None - ), f"regex group selection for requirement {req} failed, this should never happen" - extras: str = "[%s]" % ",".join(sorted(new_extras)) if new_extras else "" - return Requirement(f"{pre}{extras}{post}") - - -def parse_editable(editable_req: str) -> Tuple[Optional[str], str, Set[str]]: - """Parses an editable requirement into: - - a requirement name - - an URL - - extras - - editable options - Accepted requirements: - svn+http://blahblah@rev#egg=Foobar[baz]&subdirectory=version_subdir - .[some_extra] - """ - - url = editable_req - - # If a file path is specified with extras, strip off the extras. - url_no_extras, extras = _strip_extras(url) - - if os.path.isdir(url_no_extras): - # Treating it as code that has already been checked out - url_no_extras = path_to_url(url_no_extras) - - if url_no_extras.lower().startswith("file:"): - package_name = Link(url_no_extras).egg_fragment - if extras: - return ( - package_name, - url_no_extras, - get_requirement("placeholder" + extras.lower()).extras, - ) - else: - return package_name, url_no_extras, set() - - for version_control in vcs: - if url.lower().startswith(f"{version_control}:"): - url = f"{version_control}+{url}" - break - - link = Link(url) - - if not link.is_vcs: - backends = ", ".join(vcs.all_schemes) - raise InstallationError( - f"{editable_req} is not a valid editable requirement. " - f"It should either be a path to a local project or a VCS URL " - f"(beginning with {backends})." - ) - - package_name = link.egg_fragment - if not package_name: - raise InstallationError( - "Could not detect requirement name for '{}', please specify one " - "with #egg=your_package_name".format(editable_req) - ) - return package_name, url, set() - - -def check_first_requirement_in_file(filename: str) -> None: - """Check if file is parsable as a requirements file. - - This is heavily based on ``pkg_resources.parse_requirements``, but - simplified to just check the first meaningful line. - - :raises InvalidRequirement: If the first meaningful line cannot be parsed - as an requirement. - """ - with open(filename, encoding="utf-8", errors="ignore") as f: - # Create a steppable iterator, so we can handle \-continuations. - lines = ( - line - for line in (line.strip() for line in f) - if line and not line.startswith("#") # Skip blank lines/comments. - ) - - for line in lines: - # Drop comments -- a hash without a space may be in a URL. - if " #" in line: - line = line[: line.find(" #")] - # If there is a line continuation, drop it, and append the next line. - if line.endswith("\\"): - line = line[:-2].strip() + next(lines, "") - Requirement(line) - return - - -def deduce_helpful_msg(req: str) -> str: - """Returns helpful msg in case requirements file does not exist, - or cannot be parsed. - - :params req: Requirements file path - """ - if not os.path.exists(req): - return f" File '{req}' does not exist." - msg = " The path does exist. " - # Try to parse and check if it is a requirements file. - try: - check_first_requirement_in_file(req) - except InvalidRequirement: - logger.debug("Cannot parse '%s' as requirements file", req) - else: - msg += ( - f"The argument you provided " - f"({req}) appears to be a" - f" requirements file. If that is the" - f" case, use the '-r' flag to install" - f" the packages specified within it." - ) - return msg - - -class RequirementParts: - def __init__( - self, - requirement: Optional[Requirement], - link: Optional[Link], - markers: Optional[Marker], - extras: Set[str], - ): - self.requirement = requirement - self.link = link - self.markers = markers - self.extras = extras - - -def parse_req_from_editable(editable_req: str) -> RequirementParts: - name, url, extras_override = parse_editable(editable_req) - - if name is not None: - try: - req: Optional[Requirement] = Requirement(name) - except InvalidRequirement: - raise InstallationError(f"Invalid requirement: '{name}'") - else: - req = None - - link = Link(url) - - return RequirementParts(req, link, None, extras_override) - - -# ---- The actual constructors follow ---- - - -def install_req_from_editable( - editable_req: str, - comes_from: Optional[Union[InstallRequirement, str]] = None, - *, - use_pep517: Optional[bool] = None, - isolated: bool = False, - global_options: Optional[List[str]] = None, - hash_options: Optional[Dict[str, List[str]]] = None, - constraint: bool = False, - user_supplied: bool = False, - permit_editable_wheels: bool = False, - config_settings: Optional[Dict[str, Union[str, List[str]]]] = None, -) -> InstallRequirement: - parts = parse_req_from_editable(editable_req) - - return InstallRequirement( - parts.requirement, - comes_from=comes_from, - user_supplied=user_supplied, - editable=True, - permit_editable_wheels=permit_editable_wheels, - link=parts.link, - constraint=constraint, - use_pep517=use_pep517, - isolated=isolated, - global_options=global_options, - hash_options=hash_options, - config_settings=config_settings, - extras=parts.extras, - ) - - -def _looks_like_path(name: str) -> bool: - """Checks whether the string "looks like" a path on the filesystem. - - This does not check whether the target actually exists, only judge from the - appearance. - - Returns true if any of the following conditions is true: - * a path separator is found (either os.path.sep or os.path.altsep); - * a dot is found (which represents the current directory). - """ - if os.path.sep in name: - return True - if os.path.altsep is not None and os.path.altsep in name: - return True - if name.startswith("."): - return True - return False - - -def _get_url_from_path(path: str, name: str) -> Optional[str]: - """ - First, it checks whether a provided path is an installable directory. If it - is, returns the path. - - If false, check if the path is an archive file (such as a .whl). - The function checks if the path is a file. If false, if the path has - an @, it will treat it as a PEP 440 URL requirement and return the path. - """ - if _looks_like_path(name) and os.path.isdir(path): - if is_installable_dir(path): - return path_to_url(path) - # TODO: The is_installable_dir test here might not be necessary - # now that it is done in load_pyproject_toml too. - raise InstallationError( - f"Directory {name!r} is not installable. Neither 'setup.py' " - "nor 'pyproject.toml' found." - ) - if not is_archive_file(path): - return None - if os.path.isfile(path): - return path_to_url(path) - urlreq_parts = name.split("@", 1) - if len(urlreq_parts) >= 2 and not _looks_like_path(urlreq_parts[0]): - # If the path contains '@' and the part before it does not look - # like a path, try to treat it as a PEP 440 URL req instead. - return None - logger.warning( - "Requirement %r looks like a filename, but the file does not exist", - name, - ) - return path_to_url(path) - - -def parse_req_from_line(name: str, line_source: Optional[str]) -> RequirementParts: - if is_url(name): - marker_sep = "; " - else: - marker_sep = ";" - if marker_sep in name: - name, markers_as_string = name.split(marker_sep, 1) - markers_as_string = markers_as_string.strip() - if not markers_as_string: - markers = None - else: - markers = Marker(markers_as_string) - else: - markers = None - name = name.strip() - req_as_string = None - path = os.path.normpath(os.path.abspath(name)) - link = None - extras_as_string = None - - if is_url(name): - link = Link(name) - else: - p, extras_as_string = _strip_extras(path) - url = _get_url_from_path(p, name) - if url is not None: - link = Link(url) - - # it's a local file, dir, or url - if link: - # Handle relative file URLs - if link.scheme == "file" and re.search(r"\.\./", link.url): - link = Link(path_to_url(os.path.normpath(os.path.abspath(link.path)))) - # wheel file - if link.is_wheel: - wheel = Wheel(link.filename) # can raise InvalidWheelFilename - req_as_string = f"{wheel.name}=={wheel.version}" - else: - # set the req to the egg fragment. when it's not there, this - # will become an 'unnamed' requirement - req_as_string = link.egg_fragment - - # a requirement specifier - else: - req_as_string = name - - extras = convert_extras(extras_as_string) - - def with_source(text: str) -> str: - if not line_source: - return text - return f"{text} (from {line_source})" - - def _parse_req_string(req_as_string: str) -> Requirement: - try: - req = get_requirement(req_as_string) - except InvalidRequirement: - if os.path.sep in req_as_string: - add_msg = "It looks like a path." - add_msg += deduce_helpful_msg(req_as_string) - elif "=" in req_as_string and not any( - op in req_as_string for op in operators - ): - add_msg = "= is not a valid operator. Did you mean == ?" - else: - add_msg = "" - msg = with_source(f"Invalid requirement: {req_as_string!r}") - if add_msg: - msg += f"\nHint: {add_msg}" - raise InstallationError(msg) - else: - # Deprecate extras after specifiers: "name>=1.0[extras]" - # This currently works by accident because _strip_extras() parses - # any extras in the end of the string and those are saved in - # RequirementParts - for spec in req.specifier: - spec_str = str(spec) - if spec_str.endswith("]"): - msg = f"Extras after version '{spec_str}'." - raise InstallationError(msg) - return req - - if req_as_string is not None: - req: Optional[Requirement] = _parse_req_string(req_as_string) - else: - req = None - - return RequirementParts(req, link, markers, extras) - - -def install_req_from_line( - name: str, - comes_from: Optional[Union[str, InstallRequirement]] = None, - *, - use_pep517: Optional[bool] = None, - isolated: bool = False, - global_options: Optional[List[str]] = None, - hash_options: Optional[Dict[str, List[str]]] = None, - constraint: bool = False, - line_source: Optional[str] = None, - user_supplied: bool = False, - config_settings: Optional[Dict[str, Union[str, List[str]]]] = None, -) -> InstallRequirement: - """Creates an InstallRequirement from a name, which might be a - requirement, directory containing 'setup.py', filename, or URL. - - :param line_source: An optional string describing where the line is from, - for logging purposes in case of an error. - """ - parts = parse_req_from_line(name, line_source) - - return InstallRequirement( - parts.requirement, - comes_from, - link=parts.link, - markers=parts.markers, - use_pep517=use_pep517, - isolated=isolated, - global_options=global_options, - hash_options=hash_options, - config_settings=config_settings, - constraint=constraint, - extras=parts.extras, - user_supplied=user_supplied, - ) - - -def install_req_from_req_string( - req_string: str, - comes_from: Optional[InstallRequirement] = None, - isolated: bool = False, - use_pep517: Optional[bool] = None, - user_supplied: bool = False, -) -> InstallRequirement: - try: - req = get_requirement(req_string) - except InvalidRequirement: - raise InstallationError(f"Invalid requirement: '{req_string}'") - - domains_not_allowed = [ - PyPI.file_storage_domain, - TestPyPI.file_storage_domain, - ] - if ( - req.url - and comes_from - and comes_from.link - and comes_from.link.netloc in domains_not_allowed - ): - # Explicitly disallow pypi packages that depend on external urls - raise InstallationError( - "Packages installed from PyPI cannot depend on packages " - "which are not also hosted on PyPI.\n" - f"{comes_from.name} depends on {req} " - ) - - return InstallRequirement( - req, - comes_from, - isolated=isolated, - use_pep517=use_pep517, - user_supplied=user_supplied, - ) - - -def install_req_from_parsed_requirement( - parsed_req: ParsedRequirement, - isolated: bool = False, - use_pep517: Optional[bool] = None, - user_supplied: bool = False, - config_settings: Optional[Dict[str, Union[str, List[str]]]] = None, -) -> InstallRequirement: - if parsed_req.is_editable: - req = install_req_from_editable( - parsed_req.requirement, - comes_from=parsed_req.comes_from, - use_pep517=use_pep517, - constraint=parsed_req.constraint, - isolated=isolated, - user_supplied=user_supplied, - config_settings=config_settings, - ) - - else: - req = install_req_from_line( - parsed_req.requirement, - comes_from=parsed_req.comes_from, - use_pep517=use_pep517, - isolated=isolated, - global_options=( - parsed_req.options.get("global_options", []) - if parsed_req.options - else [] - ), - hash_options=( - parsed_req.options.get("hashes", {}) if parsed_req.options else {} - ), - constraint=parsed_req.constraint, - line_source=parsed_req.line_source, - user_supplied=user_supplied, - config_settings=config_settings, - ) - return req - - -def install_req_from_link_and_ireq( - link: Link, ireq: InstallRequirement -) -> InstallRequirement: - return InstallRequirement( - req=ireq.req, - comes_from=ireq.comes_from, - editable=ireq.editable, - link=link, - markers=ireq.markers, - use_pep517=ireq.use_pep517, - isolated=ireq.isolated, - global_options=ireq.global_options, - hash_options=ireq.hash_options, - config_settings=ireq.config_settings, - user_supplied=ireq.user_supplied, - ) - - -def install_req_drop_extras(ireq: InstallRequirement) -> InstallRequirement: - """ - Creates a new InstallationRequirement using the given template but without - any extras. Sets the original requirement as the new one's parent - (comes_from). - """ - return InstallRequirement( - req=( - _set_requirement_extras(ireq.req, set()) if ireq.req is not None else None - ), - comes_from=ireq, - editable=ireq.editable, - link=ireq.link, - markers=ireq.markers, - use_pep517=ireq.use_pep517, - isolated=ireq.isolated, - global_options=ireq.global_options, - hash_options=ireq.hash_options, - constraint=ireq.constraint, - extras=[], - config_settings=ireq.config_settings, - user_supplied=ireq.user_supplied, - permit_editable_wheels=ireq.permit_editable_wheels, - ) - - -def install_req_extend_extras( - ireq: InstallRequirement, - extras: Collection[str], -) -> InstallRequirement: - """ - Returns a copy of an installation requirement with some additional extras. - Makes a shallow copy of the ireq object. - """ - result = copy.copy(ireq) - result.extras = {*ireq.extras, *extras} - result.req = ( - _set_requirement_extras(ireq.req, result.extras) - if ireq.req is not None - else None - ) - return result diff --git a/venv/lib/python3.11/site-packages/pip/_internal/req/req_file.py b/venv/lib/python3.11/site-packages/pip/_internal/req/req_file.py deleted file mode 100644 index 1ef3d5e..0000000 --- a/venv/lib/python3.11/site-packages/pip/_internal/req/req_file.py +++ /dev/null @@ -1,554 +0,0 @@ -""" -Requirements file parsing -""" - -import logging -import optparse -import os -import re -import shlex -import urllib.parse -from optparse import Values -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - Generator, - Iterable, - List, - Optional, - Tuple, -) - -from pip._internal.cli import cmdoptions -from pip._internal.exceptions import InstallationError, RequirementsFileParseError -from pip._internal.models.search_scope import SearchScope -from pip._internal.network.session import PipSession -from pip._internal.network.utils import raise_for_status -from pip._internal.utils.encoding import auto_decode -from pip._internal.utils.urls import get_url_scheme - -if TYPE_CHECKING: - # NoReturn introduced in 3.6.2; imported only for type checking to maintain - # pip compatibility with older patch versions of Python 3.6 - from typing import NoReturn - - from pip._internal.index.package_finder import PackageFinder - -__all__ = ["parse_requirements"] - -ReqFileLines = Iterable[Tuple[int, str]] - -LineParser = Callable[[str], Tuple[str, Values]] - -SCHEME_RE = re.compile(r"^(http|https|file):", re.I) -COMMENT_RE = re.compile(r"(^|\s+)#.*$") - -# Matches environment variable-style values in '${MY_VARIABLE_1}' with the -# variable name consisting of only uppercase letters, digits or the '_' -# (underscore). This follows the POSIX standard defined in IEEE Std 1003.1, -# 2013 Edition. -ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})") - -SUPPORTED_OPTIONS: List[Callable[..., optparse.Option]] = [ - cmdoptions.index_url, - cmdoptions.extra_index_url, - cmdoptions.no_index, - cmdoptions.constraints, - cmdoptions.requirements, - cmdoptions.editable, - cmdoptions.find_links, - cmdoptions.no_binary, - cmdoptions.only_binary, - cmdoptions.prefer_binary, - cmdoptions.require_hashes, - cmdoptions.pre, - cmdoptions.trusted_host, - cmdoptions.use_new_feature, -] - -# options to be passed to requirements -SUPPORTED_OPTIONS_REQ: List[Callable[..., optparse.Option]] = [ - cmdoptions.global_options, - cmdoptions.hash, - cmdoptions.config_settings, -] - -SUPPORTED_OPTIONS_EDITABLE_REQ: List[Callable[..., optparse.Option]] = [ - cmdoptions.config_settings, -] - - -# the 'dest' string values -SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ] -SUPPORTED_OPTIONS_EDITABLE_REQ_DEST = [ - str(o().dest) for o in SUPPORTED_OPTIONS_EDITABLE_REQ -] - -logger = logging.getLogger(__name__) - - -class ParsedRequirement: - def __init__( - self, - requirement: str, - is_editable: bool, - comes_from: str, - constraint: bool, - options: Optional[Dict[str, Any]] = None, - line_source: Optional[str] = None, - ) -> None: - self.requirement = requirement - self.is_editable = is_editable - self.comes_from = comes_from - self.options = options - self.constraint = constraint - self.line_source = line_source - - -class ParsedLine: - def __init__( - self, - filename: str, - lineno: int, - args: str, - opts: Values, - constraint: bool, - ) -> None: - self.filename = filename - self.lineno = lineno - self.opts = opts - self.constraint = constraint - - if args: - self.is_requirement = True - self.is_editable = False - self.requirement = args - elif opts.editables: - self.is_requirement = True - self.is_editable = True - # We don't support multiple -e on one line - self.requirement = opts.editables[0] - else: - self.is_requirement = False - - -def parse_requirements( - filename: str, - session: PipSession, - finder: Optional["PackageFinder"] = None, - options: Optional[optparse.Values] = None, - constraint: bool = False, -) -> Generator[ParsedRequirement, None, None]: - """Parse a requirements file and yield ParsedRequirement instances. - - :param filename: Path or url of requirements file. - :param session: PipSession instance. - :param finder: Instance of pip.index.PackageFinder. - :param options: cli options. - :param constraint: If true, parsing a constraint file rather than - requirements file. - """ - line_parser = get_line_parser(finder) - parser = RequirementsFileParser(session, line_parser) - - for parsed_line in parser.parse(filename, constraint): - parsed_req = handle_line( - parsed_line, options=options, finder=finder, session=session - ) - if parsed_req is not None: - yield parsed_req - - -def preprocess(content: str) -> ReqFileLines: - """Split, filter, and join lines, and return a line iterator - - :param content: the content of the requirements file - """ - lines_enum: ReqFileLines = enumerate(content.splitlines(), start=1) - lines_enum = join_lines(lines_enum) - lines_enum = ignore_comments(lines_enum) - lines_enum = expand_env_variables(lines_enum) - return lines_enum - - -def handle_requirement_line( - line: ParsedLine, - options: Optional[optparse.Values] = None, -) -> ParsedRequirement: - # preserve for the nested code path - line_comes_from = "{} {} (line {})".format( - "-c" if line.constraint else "-r", - line.filename, - line.lineno, - ) - - assert line.is_requirement - - # get the options that apply to requirements - if line.is_editable: - supported_dest = SUPPORTED_OPTIONS_EDITABLE_REQ_DEST - else: - supported_dest = SUPPORTED_OPTIONS_REQ_DEST - req_options = {} - for dest in supported_dest: - if dest in line.opts.__dict__ and line.opts.__dict__[dest]: - req_options[dest] = line.opts.__dict__[dest] - - line_source = f"line {line.lineno} of {line.filename}" - return ParsedRequirement( - requirement=line.requirement, - is_editable=line.is_editable, - comes_from=line_comes_from, - constraint=line.constraint, - options=req_options, - line_source=line_source, - ) - - -def handle_option_line( - opts: Values, - filename: str, - lineno: int, - finder: Optional["PackageFinder"] = None, - options: Optional[optparse.Values] = None, - session: Optional[PipSession] = None, -) -> None: - if opts.hashes: - logger.warning( - "%s line %s has --hash but no requirement, and will be ignored.", - filename, - lineno, - ) - - if options: - # percolate options upward - if opts.require_hashes: - options.require_hashes = opts.require_hashes - if opts.features_enabled: - options.features_enabled.extend( - f for f in opts.features_enabled if f not in options.features_enabled - ) - - # set finder options - if finder: - find_links = finder.find_links - index_urls = finder.index_urls - no_index = finder.search_scope.no_index - if opts.no_index is True: - no_index = True - index_urls = [] - if opts.index_url and not no_index: - index_urls = [opts.index_url] - if opts.extra_index_urls and not no_index: - index_urls.extend(opts.extra_index_urls) - if opts.find_links: - # FIXME: it would be nice to keep track of the source - # of the find_links: support a find-links local path - # relative to a requirements file. - value = opts.find_links[0] - req_dir = os.path.dirname(os.path.abspath(filename)) - relative_to_reqs_file = os.path.join(req_dir, value) - if os.path.exists(relative_to_reqs_file): - value = relative_to_reqs_file - find_links.append(value) - - if session: - # We need to update the auth urls in session - session.update_index_urls(index_urls) - - search_scope = SearchScope( - find_links=find_links, - index_urls=index_urls, - no_index=no_index, - ) - finder.search_scope = search_scope - - if opts.pre: - finder.set_allow_all_prereleases() - - if opts.prefer_binary: - finder.set_prefer_binary() - - if session: - for host in opts.trusted_hosts or []: - source = f"line {lineno} of {filename}" - session.add_trusted_host(host, source=source) - - -def handle_line( - line: ParsedLine, - options: Optional[optparse.Values] = None, - finder: Optional["PackageFinder"] = None, - session: Optional[PipSession] = None, -) -> Optional[ParsedRequirement]: - """Handle a single parsed requirements line; This can result in - creating/yielding requirements, or updating the finder. - - :param line: The parsed line to be processed. - :param options: CLI options. - :param finder: The finder - updated by non-requirement lines. - :param session: The session - updated by non-requirement lines. - - Returns a ParsedRequirement object if the line is a requirement line, - otherwise returns None. - - For lines that contain requirements, the only options that have an effect - are from SUPPORTED_OPTIONS_REQ, and they are scoped to the - requirement. Other options from SUPPORTED_OPTIONS may be present, but are - ignored. - - For lines that do not contain requirements, the only options that have an - effect are from SUPPORTED_OPTIONS. Options from SUPPORTED_OPTIONS_REQ may - be present, but are ignored. These lines may contain multiple options - (although our docs imply only one is supported), and all our parsed and - affect the finder. - """ - - if line.is_requirement: - parsed_req = handle_requirement_line(line, options) - return parsed_req - else: - handle_option_line( - line.opts, - line.filename, - line.lineno, - finder, - options, - session, - ) - return None - - -class RequirementsFileParser: - def __init__( - self, - session: PipSession, - line_parser: LineParser, - ) -> None: - self._session = session - self._line_parser = line_parser - - def parse( - self, filename: str, constraint: bool - ) -> Generator[ParsedLine, None, None]: - """Parse a given file, yielding parsed lines.""" - yield from self._parse_and_recurse(filename, constraint) - - def _parse_and_recurse( - self, filename: str, constraint: bool - ) -> Generator[ParsedLine, None, None]: - for line in self._parse_file(filename, constraint): - if not line.is_requirement and ( - line.opts.requirements or line.opts.constraints - ): - # parse a nested requirements file - if line.opts.requirements: - req_path = line.opts.requirements[0] - nested_constraint = False - else: - req_path = line.opts.constraints[0] - nested_constraint = True - - # original file is over http - if SCHEME_RE.search(filename): - # do a url join so relative paths work - req_path = urllib.parse.urljoin(filename, req_path) - # original file and nested file are paths - elif not SCHEME_RE.search(req_path): - # do a join so relative paths work - req_path = os.path.join( - os.path.dirname(filename), - req_path, - ) - - yield from self._parse_and_recurse(req_path, nested_constraint) - else: - yield line - - def _parse_file( - self, filename: str, constraint: bool - ) -> Generator[ParsedLine, None, None]: - _, content = get_file_content(filename, self._session) - - lines_enum = preprocess(content) - - for line_number, line in lines_enum: - try: - args_str, opts = self._line_parser(line) - except OptionParsingError as e: - # add offending line - msg = f"Invalid requirement: {line}\n{e.msg}" - raise RequirementsFileParseError(msg) - - yield ParsedLine( - filename, - line_number, - args_str, - opts, - constraint, - ) - - -def get_line_parser(finder: Optional["PackageFinder"]) -> LineParser: - def parse_line(line: str) -> Tuple[str, Values]: - # Build new parser for each line since it accumulates appendable - # options. - parser = build_parser() - defaults = parser.get_default_values() - defaults.index_url = None - if finder: - defaults.format_control = finder.format_control - - args_str, options_str = break_args_options(line) - - try: - options = shlex.split(options_str) - except ValueError as e: - raise OptionParsingError(f"Could not split options: {options_str}") from e - - opts, _ = parser.parse_args(options, defaults) - - return args_str, opts - - return parse_line - - -def break_args_options(line: str) -> Tuple[str, str]: - """Break up the line into an args and options string. We only want to shlex - (and then optparse) the options, not the args. args can contain markers - which are corrupted by shlex. - """ - tokens = line.split(" ") - args = [] - options = tokens[:] - for token in tokens: - if token.startswith("-") or token.startswith("--"): - break - else: - args.append(token) - options.pop(0) - return " ".join(args), " ".join(options) - - -class OptionParsingError(Exception): - def __init__(self, msg: str) -> None: - self.msg = msg - - -def build_parser() -> optparse.OptionParser: - """ - Return a parser for parsing requirement lines - """ - parser = optparse.OptionParser(add_help_option=False) - - option_factories = SUPPORTED_OPTIONS + SUPPORTED_OPTIONS_REQ - for option_factory in option_factories: - option = option_factory() - parser.add_option(option) - - # By default optparse sys.exits on parsing errors. We want to wrap - # that in our own exception. - def parser_exit(self: Any, msg: str) -> "NoReturn": - raise OptionParsingError(msg) - - # NOTE: mypy disallows assigning to a method - # https://github.com/python/mypy/issues/2427 - parser.exit = parser_exit # type: ignore - - return parser - - -def join_lines(lines_enum: ReqFileLines) -> ReqFileLines: - """Joins a line ending in '\' with the previous line (except when following - comments). The joined line takes on the index of the first line. - """ - primary_line_number = None - new_line: List[str] = [] - for line_number, line in lines_enum: - if not line.endswith("\\") or COMMENT_RE.match(line): - if COMMENT_RE.match(line): - # this ensures comments are always matched later - line = " " + line - if new_line: - new_line.append(line) - assert primary_line_number is not None - yield primary_line_number, "".join(new_line) - new_line = [] - else: - yield line_number, line - else: - if not new_line: - primary_line_number = line_number - new_line.append(line.strip("\\")) - - # last line contains \ - if new_line: - assert primary_line_number is not None - yield primary_line_number, "".join(new_line) - - # TODO: handle space after '\'. - - -def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines: - """ - Strips comments and filter empty lines. - """ - for line_number, line in lines_enum: - line = COMMENT_RE.sub("", line) - line = line.strip() - if line: - yield line_number, line - - -def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines: - """Replace all environment variables that can be retrieved via `os.getenv`. - - The only allowed format for environment variables defined in the - requirement file is `${MY_VARIABLE_1}` to ensure two things: - - 1. Strings that contain a `$` aren't accidentally (partially) expanded. - 2. Ensure consistency across platforms for requirement files. - - These points are the result of a discussion on the `github pull - request #3514 <https://github.com/pypa/pip/pull/3514>`_. - - Valid characters in variable names follow the `POSIX standard - <http://pubs.opengroup.org/onlinepubs/9699919799/>`_ and are limited - to uppercase letter, digits and the `_` (underscore). - """ - for line_number, line in lines_enum: - for env_var, var_name in ENV_VAR_RE.findall(line): - value = os.getenv(var_name) - if not value: - continue - - line = line.replace(env_var, value) - - yield line_number, line - - -def get_file_content(url: str, session: PipSession) -> Tuple[str, str]: - """Gets the content of a file; it may be a filename, file: URL, or - http: URL. Returns (location, content). Content is unicode. - Respects # -*- coding: declarations on the retrieved files. - - :param url: File path or url. - :param session: PipSession instance. - """ - scheme = get_url_scheme(url) - - # Pip has special support for file:// URLs (LocalFSAdapter). - if scheme in ["http", "https", "file"]: - resp = session.get(url) - raise_for_status(resp) - return resp.url, resp.text - - # Assume this is a bare path. - try: - with open(url, "rb") as f: - content = auto_decode(f.read()) - except OSError as exc: - raise InstallationError(f"Could not open requirements file: {exc}") - return url, content diff --git a/venv/lib/python3.11/site-packages/pip/_internal/req/req_install.py b/venv/lib/python3.11/site-packages/pip/_internal/req/req_install.py deleted file mode 100644 index a65611c..0000000 --- a/venv/lib/python3.11/site-packages/pip/_internal/req/req_install.py +++ /dev/null @@ -1,923 +0,0 @@ -import functools -import logging -import os -import shutil -import sys -import uuid -import zipfile -from optparse import Values -from pathlib import Path -from typing import Any, Collection, Dict, Iterable, List, Optional, Sequence, Union - -from pip._vendor.packaging.markers import Marker -from pip._vendor.packaging.requirements import Requirement -from pip._vendor.packaging.specifiers import SpecifierSet -from pip._vendor.packaging.utils import canonicalize_name -from pip._vendor.packaging.version import Version -from pip._vendor.packaging.version import parse as parse_version -from pip._vendor.pyproject_hooks import BuildBackendHookCaller - -from pip._internal.build_env import BuildEnvironment, NoOpBuildEnvironment -from pip._internal.exceptions import InstallationError, PreviousBuildDirError -from pip._internal.locations import get_scheme -from pip._internal.metadata import ( - BaseDistribution, - get_default_environment, - get_directory_distribution, - get_wheel_distribution, -) -from pip._internal.metadata.base import FilesystemWheel -from pip._internal.models.direct_url import DirectUrl -from pip._internal.models.link import Link -from pip._internal.operations.build.metadata import generate_metadata -from pip._internal.operations.build.metadata_editable import generate_editable_metadata -from pip._internal.operations.build.metadata_legacy import ( - generate_metadata as generate_metadata_legacy, -) -from pip._internal.operations.install.editable_legacy import ( - install_editable as install_editable_legacy, -) -from pip._internal.operations.install.wheel import install_wheel -from pip._internal.pyproject import load_pyproject_toml, make_pyproject_path -from pip._internal.req.req_uninstall import UninstallPathSet -from pip._internal.utils.deprecation import deprecated -from pip._internal.utils.hashes import Hashes -from pip._internal.utils.misc import ( - ConfiguredBuildBackendHookCaller, - ask_path_exists, - backup_dir, - display_path, - hide_url, - is_installable_dir, - redact_auth_from_requirement, - redact_auth_from_url, -) -from pip._internal.utils.packaging import safe_extra -from pip._internal.utils.subprocess import runner_with_spinner_message -from pip._internal.utils.temp_dir import TempDirectory, tempdir_kinds -from pip._internal.utils.unpacking import unpack_file -from pip._internal.utils.virtualenv import running_under_virtualenv -from pip._internal.vcs import vcs - -logger = logging.getLogger(__name__) - - -class InstallRequirement: - """ - Represents something that may be installed later on, may have information - about where to fetch the relevant requirement and also contains logic for - installing the said requirement. - """ - - def __init__( - self, - req: Optional[Requirement], - comes_from: Optional[Union[str, "InstallRequirement"]], - editable: bool = False, - link: Optional[Link] = None, - markers: Optional[Marker] = None, - use_pep517: Optional[bool] = None, - isolated: bool = False, - *, - global_options: Optional[List[str]] = None, - hash_options: Optional[Dict[str, List[str]]] = None, - config_settings: Optional[Dict[str, Union[str, List[str]]]] = None, - constraint: bool = False, - extras: Collection[str] = (), - user_supplied: bool = False, - permit_editable_wheels: bool = False, - ) -> None: - assert req is None or isinstance(req, Requirement), req - self.req = req - self.comes_from = comes_from - self.constraint = constraint - self.editable = editable - self.permit_editable_wheels = permit_editable_wheels - - # source_dir is the local directory where the linked requirement is - # located, or unpacked. In case unpacking is needed, creating and - # populating source_dir is done by the RequirementPreparer. Note this - # is not necessarily the directory where pyproject.toml or setup.py is - # located - that one is obtained via unpacked_source_directory. - self.source_dir: Optional[str] = None - if self.editable: - assert link - if link.is_file: - self.source_dir = os.path.normpath(os.path.abspath(link.file_path)) - - # original_link is the direct URL that was provided by the user for the - # requirement, either directly or via a constraints file. - if link is None and req and req.url: - # PEP 508 URL requirement - link = Link(req.url) - self.link = self.original_link = link - - # When this InstallRequirement is a wheel obtained from the cache of locally - # built wheels, this is the source link corresponding to the cache entry, which - # was used to download and build the cached wheel. - self.cached_wheel_source_link: Optional[Link] = None - - # Information about the location of the artifact that was downloaded . This - # property is guaranteed to be set in resolver results. - self.download_info: Optional[DirectUrl] = None - - # Path to any downloaded or already-existing package. - self.local_file_path: Optional[str] = None - if self.link and self.link.is_file: - self.local_file_path = self.link.file_path - - if extras: - self.extras = extras - elif req: - self.extras = req.extras - else: - self.extras = set() - if markers is None and req: - markers = req.marker - self.markers = markers - - # This holds the Distribution object if this requirement is already installed. - self.satisfied_by: Optional[BaseDistribution] = None - # Whether the installation process should try to uninstall an existing - # distribution before installing this requirement. - self.should_reinstall = False - # Temporary build location - self._temp_build_dir: Optional[TempDirectory] = None - # Set to True after successful installation - self.install_succeeded: Optional[bool] = None - # Supplied options - self.global_options = global_options if global_options else [] - self.hash_options = hash_options if hash_options else {} - self.config_settings = config_settings - # Set to True after successful preparation of this requirement - self.prepared = False - # User supplied requirement are explicitly requested for installation - # by the user via CLI arguments or requirements files, as opposed to, - # e.g. dependencies, extras or constraints. - self.user_supplied = user_supplied - - self.isolated = isolated - self.build_env: BuildEnvironment = NoOpBuildEnvironment() - - # For PEP 517, the directory where we request the project metadata - # gets stored. We need this to pass to build_wheel, so the backend - # can ensure that the wheel matches the metadata (see the PEP for - # details). - self.metadata_directory: Optional[str] = None - - # The static build requirements (from pyproject.toml) - self.pyproject_requires: Optional[List[str]] = None - - # Build requirements that we will check are available - self.requirements_to_check: List[str] = [] - - # The PEP 517 backend we should use to build the project - self.pep517_backend: Optional[BuildBackendHookCaller] = None - - # Are we using PEP 517 for this requirement? - # After pyproject.toml has been loaded, the only valid values are True - # and False. Before loading, None is valid (meaning "use the default"). - # Setting an explicit value before loading pyproject.toml is supported, - # but after loading this flag should be treated as read only. - self.use_pep517 = use_pep517 - - # If config settings are provided, enforce PEP 517. - if self.config_settings: - if self.use_pep517 is False: - logger.warning( - "--no-use-pep517 ignored for %s " - "because --config-settings are specified.", - self, - ) - self.use_pep517 = True - - # This requirement needs more preparation before it can be built - self.needs_more_preparation = False - - # This requirement needs to be unpacked before it can be installed. - self._archive_source: Optional[Path] = None - - def __str__(self) -> str: - if self.req: - s = redact_auth_from_requirement(self.req) - if self.link: - s += f" from {redact_auth_from_url(self.link.url)}" - elif self.link: - s = redact_auth_from_url(self.link.url) - else: - s = "<InstallRequirement>" - if self.satisfied_by is not None: - if self.satisfied_by.location is not None: - location = display_path(self.satisfied_by.location) - else: - location = "<memory>" - s += f" in {location}" - if self.comes_from: - if isinstance(self.comes_from, str): - comes_from: Optional[str] = self.comes_from - else: - comes_from = self.comes_from.from_path() - if comes_from: - s += f" (from {comes_from})" - return s - - def __repr__(self) -> str: - return "<{} object: {} editable={!r}>".format( - self.__class__.__name__, str(self), self.editable - ) - - def format_debug(self) -> str: - """An un-tested helper for getting state, for debugging.""" - attributes = vars(self) - names = sorted(attributes) - - state = (f"{attr}={attributes[attr]!r}" for attr in sorted(names)) - return "<{name} object: {{{state}}}>".format( - name=self.__class__.__name__, - state=", ".join(state), - ) - - # Things that are valid for all kinds of requirements? - @property - def name(self) -> Optional[str]: - if self.req is None: - return None - return self.req.name - - @functools.lru_cache() # use cached_property in python 3.8+ - def supports_pyproject_editable(self) -> bool: - if not self.use_pep517: - return False - assert self.pep517_backend - with self.build_env: - runner = runner_with_spinner_message( - "Checking if build backend supports build_editable" - ) - with self.pep517_backend.subprocess_runner(runner): - return "build_editable" in self.pep517_backend._supported_features() - - @property - def specifier(self) -> SpecifierSet: - assert self.req is not None - return self.req.specifier - - @property - def is_direct(self) -> bool: - """Whether this requirement was specified as a direct URL.""" - return self.original_link is not None - - @property - def is_pinned(self) -> bool: - """Return whether I am pinned to an exact version. - - For example, some-package==1.2 is pinned; some-package>1.2 is not. - """ - assert self.req is not None - specifiers = self.req.specifier - return len(specifiers) == 1 and next(iter(specifiers)).operator in {"==", "==="} - - def match_markers(self, extras_requested: Optional[Iterable[str]] = None) -> bool: - if not extras_requested: - # Provide an extra to safely evaluate the markers - # without matching any extra - extras_requested = ("",) - if self.markers is not None: - return any( - self.markers.evaluate({"extra": extra}) - # TODO: Remove these two variants when packaging is upgraded to - # support the marker comparison logic specified in PEP 685. - or self.markers.evaluate({"extra": safe_extra(extra)}) - or self.markers.evaluate({"extra": canonicalize_name(extra)}) - for extra in extras_requested - ) - else: - return True - - @property - def has_hash_options(self) -> bool: - """Return whether any known-good hashes are specified as options. - - These activate --require-hashes mode; hashes specified as part of a - URL do not. - - """ - return bool(self.hash_options) - - def hashes(self, trust_internet: bool = True) -> Hashes: - """Return a hash-comparer that considers my option- and URL-based - hashes to be known-good. - - Hashes in URLs--ones embedded in the requirements file, not ones - downloaded from an index server--are almost peers with ones from - flags. They satisfy --require-hashes (whether it was implicitly or - explicitly activated) but do not activate it. md5 and sha224 are not - allowed in flags, which should nudge people toward good algos. We - always OR all hashes together, even ones from URLs. - - :param trust_internet: Whether to trust URL-based (#md5=...) hashes - downloaded from the internet, as by populate_link() - - """ - good_hashes = self.hash_options.copy() - if trust_internet: - link = self.link - elif self.is_direct and self.user_supplied: - link = self.original_link - else: - link = None - if link and link.hash: - assert link.hash_name is not None - good_hashes.setdefault(link.hash_name, []).append(link.hash) - return Hashes(good_hashes) - - def from_path(self) -> Optional[str]: - """Format a nice indicator to show where this "comes from" """ - if self.req is None: - return None - s = str(self.req) - if self.comes_from: - comes_from: Optional[str] - if isinstance(self.comes_from, str): - comes_from = self.comes_from - else: - comes_from = self.comes_from.from_path() - if comes_from: - s += "->" + comes_from - return s - - def ensure_build_location( - self, build_dir: str, autodelete: bool, parallel_builds: bool - ) -> str: - assert build_dir is not None - if self._temp_build_dir is not None: - assert self._temp_build_dir.path - return self._temp_build_dir.path - if self.req is None: - # Some systems have /tmp as a symlink which confuses custom - # builds (such as numpy). Thus, we ensure that the real path - # is returned. - self._temp_build_dir = TempDirectory( - kind=tempdir_kinds.REQ_BUILD, globally_managed=True - ) - - return self._temp_build_dir.path - - # This is the only remaining place where we manually determine the path - # for the temporary directory. It is only needed for editables where - # it is the value of the --src option. - - # When parallel builds are enabled, add a UUID to the build directory - # name so multiple builds do not interfere with each other. - dir_name: str = canonicalize_name(self.req.name) - if parallel_builds: - dir_name = f"{dir_name}_{uuid.uuid4().hex}" - - # FIXME: Is there a better place to create the build_dir? (hg and bzr - # need this) - if not os.path.exists(build_dir): - logger.debug("Creating directory %s", build_dir) - os.makedirs(build_dir) - actual_build_dir = os.path.join(build_dir, dir_name) - # `None` indicates that we respect the globally-configured deletion - # settings, which is what we actually want when auto-deleting. - delete_arg = None if autodelete else False - return TempDirectory( - path=actual_build_dir, - delete=delete_arg, - kind=tempdir_kinds.REQ_BUILD, - globally_managed=True, - ).path - - def _set_requirement(self) -> None: - """Set requirement after generating metadata.""" - assert self.req is None - assert self.metadata is not None - assert self.source_dir is not None - - # Construct a Requirement object from the generated metadata - if isinstance(parse_version(self.metadata["Version"]), Version): - op = "==" - else: - op = "===" - - self.req = Requirement( - "".join( - [ - self.metadata["Name"], - op, - self.metadata["Version"], - ] - ) - ) - - def warn_on_mismatching_name(self) -> None: - assert self.req is not None - metadata_name = canonicalize_name(self.metadata["Name"]) - if canonicalize_name(self.req.name) == metadata_name: - # Everything is fine. - return - - # If we're here, there's a mismatch. Log a warning about it. - logger.warning( - "Generating metadata for package %s " - "produced metadata for project name %s. Fix your " - "#egg=%s fragments.", - self.name, - metadata_name, - self.name, - ) - self.req = Requirement(metadata_name) - - def check_if_exists(self, use_user_site: bool) -> None: - """Find an installed distribution that satisfies or conflicts - with this requirement, and set self.satisfied_by or - self.should_reinstall appropriately. - """ - if self.req is None: - return - existing_dist = get_default_environment().get_distribution(self.req.name) - if not existing_dist: - return - - version_compatible = self.req.specifier.contains( - existing_dist.version, - prereleases=True, - ) - if not version_compatible: - self.satisfied_by = None - if use_user_site: - if existing_dist.in_usersite: - self.should_reinstall = True - elif running_under_virtualenv() and existing_dist.in_site_packages: - raise InstallationError( - f"Will not install to the user site because it will " - f"lack sys.path precedence to {existing_dist.raw_name} " - f"in {existing_dist.location}" - ) - else: - self.should_reinstall = True - else: - if self.editable: - self.should_reinstall = True - # when installing editables, nothing pre-existing should ever - # satisfy - self.satisfied_by = None - else: - self.satisfied_by = existing_dist - - # Things valid for wheels - @property - def is_wheel(self) -> bool: - if not self.link: - return False - return self.link.is_wheel - - @property - def is_wheel_from_cache(self) -> bool: - # When True, it means that this InstallRequirement is a local wheel file in the - # cache of locally built wheels. - return self.cached_wheel_source_link is not None - - # Things valid for sdists - @property - def unpacked_source_directory(self) -> str: - assert self.source_dir, f"No source dir for {self}" - return os.path.join( - self.source_dir, self.link and self.link.subdirectory_fragment or "" - ) - - @property - def setup_py_path(self) -> str: - assert self.source_dir, f"No source dir for {self}" - setup_py = os.path.join(self.unpacked_source_directory, "setup.py") - - return setup_py - - @property - def setup_cfg_path(self) -> str: - assert self.source_dir, f"No source dir for {self}" - setup_cfg = os.path.join(self.unpacked_source_directory, "setup.cfg") - - return setup_cfg - - @property - def pyproject_toml_path(self) -> str: - assert self.source_dir, f"No source dir for {self}" - return make_pyproject_path(self.unpacked_source_directory) - - def load_pyproject_toml(self) -> None: - """Load the pyproject.toml file. - - After calling this routine, all of the attributes related to PEP 517 - processing for this requirement have been set. In particular, the - use_pep517 attribute can be used to determine whether we should - follow the PEP 517 or legacy (setup.py) code path. - """ - pyproject_toml_data = load_pyproject_toml( - self.use_pep517, self.pyproject_toml_path, self.setup_py_path, str(self) - ) - - if pyproject_toml_data is None: - assert not self.config_settings - self.use_pep517 = False - return - - self.use_pep517 = True - requires, backend, check, backend_path = pyproject_toml_data - self.requirements_to_check = check - self.pyproject_requires = requires - self.pep517_backend = ConfiguredBuildBackendHookCaller( - self, - self.unpacked_source_directory, - backend, - backend_path=backend_path, - ) - - def isolated_editable_sanity_check(self) -> None: - """Check that an editable requirement if valid for use with PEP 517/518. - - This verifies that an editable that has a pyproject.toml either supports PEP 660 - or as a setup.py or a setup.cfg - """ - if ( - self.editable - and self.use_pep517 - and not self.supports_pyproject_editable() - and not os.path.isfile(self.setup_py_path) - and not os.path.isfile(self.setup_cfg_path) - ): - raise InstallationError( - f"Project {self} has a 'pyproject.toml' and its build " - f"backend is missing the 'build_editable' hook. Since it does not " - f"have a 'setup.py' nor a 'setup.cfg', " - f"it cannot be installed in editable mode. " - f"Consider using a build backend that supports PEP 660." - ) - - def prepare_metadata(self) -> None: - """Ensure that project metadata is available. - - Under PEP 517 and PEP 660, call the backend hook to prepare the metadata. - Under legacy processing, call setup.py egg-info. - """ - assert self.source_dir, f"No source dir for {self}" - details = self.name or f"from {self.link}" - - if self.use_pep517: - assert self.pep517_backend is not None - if ( - self.editable - and self.permit_editable_wheels - and self.supports_pyproject_editable() - ): - self.metadata_directory = generate_editable_metadata( - build_env=self.build_env, - backend=self.pep517_backend, - details=details, - ) - else: - self.metadata_directory = generate_metadata( - build_env=self.build_env, - backend=self.pep517_backend, - details=details, - ) - else: - self.metadata_directory = generate_metadata_legacy( - build_env=self.build_env, - setup_py_path=self.setup_py_path, - source_dir=self.unpacked_source_directory, - isolated=self.isolated, - details=details, - ) - - # Act on the newly generated metadata, based on the name and version. - if not self.name: - self._set_requirement() - else: - self.warn_on_mismatching_name() - - self.assert_source_matches_version() - - @property - def metadata(self) -> Any: - if not hasattr(self, "_metadata"): - self._metadata = self.get_dist().metadata - - return self._metadata - - def get_dist(self) -> BaseDistribution: - if self.metadata_directory: - return get_directory_distribution(self.metadata_directory) - elif self.local_file_path and self.is_wheel: - assert self.req is not None - return get_wheel_distribution( - FilesystemWheel(self.local_file_path), - canonicalize_name(self.req.name), - ) - raise AssertionError( - f"InstallRequirement {self} has no metadata directory and no wheel: " - f"can't make a distribution." - ) - - def assert_source_matches_version(self) -> None: - assert self.source_dir, f"No source dir for {self}" - version = self.metadata["version"] - if self.req and self.req.specifier and version not in self.req.specifier: - logger.warning( - "Requested %s, but installing version %s", - self, - version, - ) - else: - logger.debug( - "Source in %s has version %s, which satisfies requirement %s", - display_path(self.source_dir), - version, - self, - ) - - # For both source distributions and editables - def ensure_has_source_dir( - self, - parent_dir: str, - autodelete: bool = False, - parallel_builds: bool = False, - ) -> None: - """Ensure that a source_dir is set. - - This will create a temporary build dir if the name of the requirement - isn't known yet. - - :param parent_dir: The ideal pip parent_dir for the source_dir. - Generally src_dir for editables and build_dir for sdists. - :return: self.source_dir - """ - if self.source_dir is None: - self.source_dir = self.ensure_build_location( - parent_dir, - autodelete=autodelete, - parallel_builds=parallel_builds, - ) - - def needs_unpacked_archive(self, archive_source: Path) -> None: - assert self._archive_source is None - self._archive_source = archive_source - - def ensure_pristine_source_checkout(self) -> None: - """Ensure the source directory has not yet been built in.""" - assert self.source_dir is not None - if self._archive_source is not None: - unpack_file(str(self._archive_source), self.source_dir) - elif is_installable_dir(self.source_dir): - # If a checkout exists, it's unwise to keep going. - # version inconsistencies are logged later, but do not fail - # the installation. - raise PreviousBuildDirError( - f"pip can't proceed with requirements '{self}' due to a " - f"pre-existing build directory ({self.source_dir}). This is likely " - "due to a previous installation that failed . pip is " - "being responsible and not assuming it can delete this. " - "Please delete it and try again." - ) - - # For editable installations - def update_editable(self) -> None: - if not self.link: - logger.debug( - "Cannot update repository at %s; repository location is unknown", - self.source_dir, - ) - return - assert self.editable - assert self.source_dir - if self.link.scheme == "file": - # Static paths don't get updated - return - vcs_backend = vcs.get_backend_for_scheme(self.link.scheme) - # Editable requirements are validated in Requirement constructors. - # So here, if it's neither a path nor a valid VCS URL, it's a bug. - assert vcs_backend, f"Unsupported VCS URL {self.link.url}" - hidden_url = hide_url(self.link.url) - vcs_backend.obtain(self.source_dir, url=hidden_url, verbosity=0) - - # Top-level Actions - def uninstall( - self, auto_confirm: bool = False, verbose: bool = False - ) -> Optional[UninstallPathSet]: - """ - Uninstall the distribution currently satisfying this requirement. - - Prompts before removing or modifying files unless - ``auto_confirm`` is True. - - Refuses to delete or modify files outside of ``sys.prefix`` - - thus uninstallation within a virtual environment can only - modify that virtual environment, even if the virtualenv is - linked to global site-packages. - - """ - assert self.req - dist = get_default_environment().get_distribution(self.req.name) - if not dist: - logger.warning("Skipping %s as it is not installed.", self.name) - return None - logger.info("Found existing installation: %s", dist) - - uninstalled_pathset = UninstallPathSet.from_dist(dist) - uninstalled_pathset.remove(auto_confirm, verbose) - return uninstalled_pathset - - def _get_archive_name(self, path: str, parentdir: str, rootdir: str) -> str: - def _clean_zip_name(name: str, prefix: str) -> str: - assert name.startswith( - prefix + os.path.sep - ), f"name {name!r} doesn't start with prefix {prefix!r}" - name = name[len(prefix) + 1 :] - name = name.replace(os.path.sep, "/") - return name - - assert self.req is not None - path = os.path.join(parentdir, path) - name = _clean_zip_name(path, rootdir) - return self.req.name + "/" + name - - def archive(self, build_dir: Optional[str]) -> None: - """Saves archive to provided build_dir. - - Used for saving downloaded VCS requirements as part of `pip download`. - """ - assert self.source_dir - if build_dir is None: - return - - create_archive = True - archive_name = "{}-{}.zip".format(self.name, self.metadata["version"]) - archive_path = os.path.join(build_dir, archive_name) - - if os.path.exists(archive_path): - response = ask_path_exists( - f"The file {display_path(archive_path)} exists. (i)gnore, (w)ipe, " - "(b)ackup, (a)bort ", - ("i", "w", "b", "a"), - ) - if response == "i": - create_archive = False - elif response == "w": - logger.warning("Deleting %s", display_path(archive_path)) - os.remove(archive_path) - elif response == "b": - dest_file = backup_dir(archive_path) - logger.warning( - "Backing up %s to %s", - display_path(archive_path), - display_path(dest_file), - ) - shutil.move(archive_path, dest_file) - elif response == "a": - sys.exit(-1) - - if not create_archive: - return - - zip_output = zipfile.ZipFile( - archive_path, - "w", - zipfile.ZIP_DEFLATED, - allowZip64=True, - ) - with zip_output: - dir = os.path.normcase(os.path.abspath(self.unpacked_source_directory)) - for dirpath, dirnames, filenames in os.walk(dir): - for dirname in dirnames: - dir_arcname = self._get_archive_name( - dirname, - parentdir=dirpath, - rootdir=dir, - ) - zipdir = zipfile.ZipInfo(dir_arcname + "/") - zipdir.external_attr = 0x1ED << 16 # 0o755 - zip_output.writestr(zipdir, "") - for filename in filenames: - file_arcname = self._get_archive_name( - filename, - parentdir=dirpath, - rootdir=dir, - ) - filename = os.path.join(dirpath, filename) - zip_output.write(filename, file_arcname) - - logger.info("Saved %s", display_path(archive_path)) - - def install( - self, - global_options: Optional[Sequence[str]] = None, - root: Optional[str] = None, - home: Optional[str] = None, - prefix: Optional[str] = None, - warn_script_location: bool = True, - use_user_site: bool = False, - pycompile: bool = True, - ) -> None: - assert self.req is not None - scheme = get_scheme( - self.req.name, - user=use_user_site, - home=home, - root=root, - isolated=self.isolated, - prefix=prefix, - ) - - if self.editable and not self.is_wheel: - if self.config_settings: - logger.warning( - "--config-settings ignored for legacy editable install of %s. " - "Consider upgrading to a version of setuptools " - "that supports PEP 660 (>= 64).", - self, - ) - install_editable_legacy( - global_options=global_options if global_options is not None else [], - prefix=prefix, - home=home, - use_user_site=use_user_site, - name=self.req.name, - setup_py_path=self.setup_py_path, - isolated=self.isolated, - build_env=self.build_env, - unpacked_source_directory=self.unpacked_source_directory, - ) - self.install_succeeded = True - return - - assert self.is_wheel - assert self.local_file_path - - install_wheel( - self.req.name, - self.local_file_path, - scheme=scheme, - req_description=str(self.req), - pycompile=pycompile, - warn_script_location=warn_script_location, - direct_url=self.download_info if self.is_direct else None, - requested=self.user_supplied, - ) - self.install_succeeded = True - - -def check_invalid_constraint_type(req: InstallRequirement) -> str: - # Check for unsupported forms - problem = "" - if not req.name: - problem = "Unnamed requirements are not allowed as constraints" - elif req.editable: - problem = "Editable requirements are not allowed as constraints" - elif req.extras: - problem = "Constraints cannot have extras" - - if problem: - deprecated( - reason=( - "Constraints are only allowed to take the form of a package " - "name and a version specifier. Other forms were originally " - "permitted as an accident of the implementation, but were " - "undocumented. The new implementation of the resolver no " - "longer supports these forms." - ), - replacement="replacing the constraint with a requirement", - # No plan yet for when the new resolver becomes default - gone_in=None, - issue=8210, - ) - - return problem - - -def _has_option(options: Values, reqs: List[InstallRequirement], option: str) -> bool: - if getattr(options, option, None): - return True - for req in reqs: - if getattr(req, option, None): - return True - return False - - -def check_legacy_setup_py_options( - options: Values, - reqs: List[InstallRequirement], -) -> None: - has_build_options = _has_option(options, reqs, "build_options") - has_global_options = _has_option(options, reqs, "global_options") - if has_build_options or has_global_options: - deprecated( - reason="--build-option and --global-option are deprecated.", - issue=11859, - replacement="to use --config-settings", - gone_in="24.2", - ) - logger.warning( - "Implying --no-binary=:all: due to the presence of " - "--build-option / --global-option. " - ) - options.format_control.disallow_binaries() diff --git a/venv/lib/python3.11/site-packages/pip/_internal/req/req_set.py b/venv/lib/python3.11/site-packages/pip/_internal/req/req_set.py deleted file mode 100644 index bf36114..0000000 --- a/venv/lib/python3.11/site-packages/pip/_internal/req/req_set.py +++ /dev/null @@ -1,119 +0,0 @@ -import logging -from collections import OrderedDict -from typing import Dict, List - -from pip._vendor.packaging.specifiers import LegacySpecifier -from pip._vendor.packaging.utils import canonicalize_name -from pip._vendor.packaging.version import LegacyVersion - -from pip._internal.req.req_install import InstallRequirement -from pip._internal.utils.deprecation import deprecated - -logger = logging.getLogger(__name__) - - -class RequirementSet: - def __init__(self, check_supported_wheels: bool = True) -> None: - """Create a RequirementSet.""" - - self.requirements: Dict[str, InstallRequirement] = OrderedDict() - self.check_supported_wheels = check_supported_wheels - - self.unnamed_requirements: List[InstallRequirement] = [] - - def __str__(self) -> str: - requirements = sorted( - (req for req in self.requirements.values() if not req.comes_from), - key=lambda req: canonicalize_name(req.name or ""), - ) - return " ".join(str(req.req) for req in requirements) - - def __repr__(self) -> str: - requirements = sorted( - self.requirements.values(), - key=lambda req: canonicalize_name(req.name or ""), - ) - - format_string = "<{classname} object; {count} requirement(s): {reqs}>" - return format_string.format( - classname=self.__class__.__name__, - count=len(requirements), - reqs=", ".join(str(req.req) for req in requirements), - ) - - def add_unnamed_requirement(self, install_req: InstallRequirement) -> None: - assert not install_req.name - self.unnamed_requirements.append(install_req) - - def add_named_requirement(self, install_req: InstallRequirement) -> None: - assert install_req.name - - project_name = canonicalize_name(install_req.name) - self.requirements[project_name] = install_req - - def has_requirement(self, name: str) -> bool: - project_name = canonicalize_name(name) - - return ( - project_name in self.requirements - and not self.requirements[project_name].constraint - ) - - def get_requirement(self, name: str) -> InstallRequirement: - project_name = canonicalize_name(name) - - if project_name in self.requirements: - return self.requirements[project_name] - - raise KeyError(f"No project with the name {name!r}") - - @property - def all_requirements(self) -> List[InstallRequirement]: - return self.unnamed_requirements + list(self.requirements.values()) - - @property - def requirements_to_install(self) -> List[InstallRequirement]: - """Return the list of requirements that need to be installed. - - TODO remove this property together with the legacy resolver, since the new - resolver only returns requirements that need to be installed. - """ - return [ - install_req - for install_req in self.all_requirements - if not install_req.constraint and not install_req.satisfied_by - ] - - def warn_legacy_versions_and_specifiers(self) -> None: - for req in self.requirements_to_install: - version = req.get_dist().version - if isinstance(version, LegacyVersion): - deprecated( - reason=( - f"pip has selected the non standard version {version} " - f"of {req}. In the future this version will be " - f"ignored as it isn't standard compliant." - ), - replacement=( - "set or update constraints to select another version " - "or contact the package author to fix the version number" - ), - issue=12063, - gone_in="24.1", - ) - for dep in req.get_dist().iter_dependencies(): - if any(isinstance(spec, LegacySpecifier) for spec in dep.specifier): - deprecated( - reason=( - f"pip has selected {req} {version} which has non " - f"standard dependency specifier {dep}. " - f"In the future this version of {req} will be " - f"ignored as it isn't standard compliant." - ), - replacement=( - "set or update constraints to select another version " - "or contact the package author to fix the version number" - ), - issue=12063, - gone_in="24.1", - ) diff --git a/venv/lib/python3.11/site-packages/pip/_internal/req/req_uninstall.py b/venv/lib/python3.11/site-packages/pip/_internal/req/req_uninstall.py deleted file mode 100644 index 707fde1..0000000 --- a/venv/lib/python3.11/site-packages/pip/_internal/req/req_uninstall.py +++ /dev/null @@ -1,649 +0,0 @@ -import functools -import os -import sys -import sysconfig -from importlib.util import cache_from_source -from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Set, Tuple - -from pip._internal.exceptions import UninstallationError -from pip._internal.locations import get_bin_prefix, get_bin_user -from pip._internal.metadata import BaseDistribution -from pip._internal.utils.compat import WINDOWS -from pip._internal.utils.egg_link import egg_link_path_from_location -from pip._internal.utils.logging import getLogger, indent_log -from pip._internal.utils.misc import ask, normalize_path, renames, rmtree -from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory -from pip._internal.utils.virtualenv import running_under_virtualenv - -logger = getLogger(__name__) - - -def _script_names( - bin_dir: str, script_name: str, is_gui: bool -) -> Generator[str, None, None]: - """Create the fully qualified name of the files created by - {console,gui}_scripts for the given ``dist``. - Returns the list of file names - """ - exe_name = os.path.join(bin_dir, script_name) - yield exe_name - if not WINDOWS: - return - yield f"{exe_name}.exe" - yield f"{exe_name}.exe.manifest" - if is_gui: - yield f"{exe_name}-script.pyw" - else: - yield f"{exe_name}-script.py" - - -def _unique( - fn: Callable[..., Generator[Any, None, None]] -) -> Callable[..., Generator[Any, None, None]]: - @functools.wraps(fn) - def unique(*args: Any, **kw: Any) -> Generator[Any, None, None]: - seen: Set[Any] = set() - for item in fn(*args, **kw): - if item not in seen: - seen.add(item) - yield item - - return unique - - -@_unique -def uninstallation_paths(dist: BaseDistribution) -> Generator[str, None, None]: - """ - Yield all the uninstallation paths for dist based on RECORD-without-.py[co] - - Yield paths to all the files in RECORD. For each .py file in RECORD, add - the .pyc and .pyo in the same directory. - - UninstallPathSet.add() takes care of the __pycache__ .py[co]. - - If RECORD is not found, raises UninstallationError, - with possible information from the INSTALLER file. - - https://packaging.python.org/specifications/recording-installed-packages/ - """ - location = dist.location - assert location is not None, "not installed" - - entries = dist.iter_declared_entries() - if entries is None: - msg = f"Cannot uninstall {dist}, RECORD file not found." - installer = dist.installer - if not installer or installer == "pip": - dep = f"{dist.raw_name}=={dist.version}" - msg += ( - " You might be able to recover from this via: " - f"'pip install --force-reinstall --no-deps {dep}'." - ) - else: - msg += f" Hint: The package was installed by {installer}." - raise UninstallationError(msg) - - for entry in entries: - path = os.path.join(location, entry) - yield path - if path.endswith(".py"): - dn, fn = os.path.split(path) - base = fn[:-3] - path = os.path.join(dn, base + ".pyc") - yield path - path = os.path.join(dn, base + ".pyo") - yield path - - -def compact(paths: Iterable[str]) -> Set[str]: - """Compact a path set to contain the minimal number of paths - necessary to contain all paths in the set. If /a/path/ and - /a/path/to/a/file.txt are both in the set, leave only the - shorter path.""" - - sep = os.path.sep - short_paths: Set[str] = set() - for path in sorted(paths, key=len): - should_skip = any( - path.startswith(shortpath.rstrip("*")) - and path[len(shortpath.rstrip("*").rstrip(sep))] == sep - for shortpath in short_paths - ) - if not should_skip: - short_paths.add(path) - return short_paths - - -def compress_for_rename(paths: Iterable[str]) -> Set[str]: - """Returns a set containing the paths that need to be renamed. - - This set may include directories when the original sequence of paths - included every file on disk. - """ - case_map = {os.path.normcase(p): p for p in paths} - remaining = set(case_map) - unchecked = sorted({os.path.split(p)[0] for p in case_map.values()}, key=len) - wildcards: Set[str] = set() - - def norm_join(*a: str) -> str: - return os.path.normcase(os.path.join(*a)) - - for root in unchecked: - if any(os.path.normcase(root).startswith(w) for w in wildcards): - # This directory has already been handled. - continue - - all_files: Set[str] = set() - all_subdirs: Set[str] = set() - for dirname, subdirs, files in os.walk(root): - all_subdirs.update(norm_join(root, dirname, d) for d in subdirs) - all_files.update(norm_join(root, dirname, f) for f in files) - # If all the files we found are in our remaining set of files to - # remove, then remove them from the latter set and add a wildcard - # for the directory. - if not (all_files - remaining): - remaining.difference_update(all_files) - wildcards.add(root + os.sep) - - return set(map(case_map.__getitem__, remaining)) | wildcards - - -def compress_for_output_listing(paths: Iterable[str]) -> Tuple[Set[str], Set[str]]: - """Returns a tuple of 2 sets of which paths to display to user - - The first set contains paths that would be deleted. Files of a package - are not added and the top-level directory of the package has a '*' added - at the end - to signify that all it's contents are removed. - - The second set contains files that would have been skipped in the above - folders. - """ - - will_remove = set(paths) - will_skip = set() - - # Determine folders and files - folders = set() - files = set() - for path in will_remove: - if path.endswith(".pyc"): - continue - if path.endswith("__init__.py") or ".dist-info" in path: - folders.add(os.path.dirname(path)) - files.add(path) - - _normcased_files = set(map(os.path.normcase, files)) - - folders = compact(folders) - - # This walks the tree using os.walk to not miss extra folders - # that might get added. - for folder in folders: - for dirpath, _, dirfiles in os.walk(folder): - for fname in dirfiles: - if fname.endswith(".pyc"): - continue - - file_ = os.path.join(dirpath, fname) - if ( - os.path.isfile(file_) - and os.path.normcase(file_) not in _normcased_files - ): - # We are skipping this file. Add it to the set. - will_skip.add(file_) - - will_remove = files | {os.path.join(folder, "*") for folder in folders} - - return will_remove, will_skip - - -class StashedUninstallPathSet: - """A set of file rename operations to stash files while - tentatively uninstalling them.""" - - def __init__(self) -> None: - # Mapping from source file root to [Adjacent]TempDirectory - # for files under that directory. - self._save_dirs: Dict[str, TempDirectory] = {} - # (old path, new path) tuples for each move that may need - # to be undone. - self._moves: List[Tuple[str, str]] = [] - - def _get_directory_stash(self, path: str) -> str: - """Stashes a directory. - - Directories are stashed adjacent to their original location if - possible, or else moved/copied into the user's temp dir.""" - - try: - save_dir: TempDirectory = AdjacentTempDirectory(path) - except OSError: - save_dir = TempDirectory(kind="uninstall") - self._save_dirs[os.path.normcase(path)] = save_dir - - return save_dir.path - - def _get_file_stash(self, path: str) -> str: - """Stashes a file. - - If no root has been provided, one will be created for the directory - in the user's temp directory.""" - path = os.path.normcase(path) - head, old_head = os.path.dirname(path), None - save_dir = None - - while head != old_head: - try: - save_dir = self._save_dirs[head] - break - except KeyError: - pass - head, old_head = os.path.dirname(head), head - else: - # Did not find any suitable root - head = os.path.dirname(path) - save_dir = TempDirectory(kind="uninstall") - self._save_dirs[head] = save_dir - - relpath = os.path.relpath(path, head) - if relpath and relpath != os.path.curdir: - return os.path.join(save_dir.path, relpath) - return save_dir.path - - def stash(self, path: str) -> str: - """Stashes the directory or file and returns its new location. - Handle symlinks as files to avoid modifying the symlink targets. - """ - path_is_dir = os.path.isdir(path) and not os.path.islink(path) - if path_is_dir: - new_path = self._get_directory_stash(path) - else: - new_path = self._get_file_stash(path) - - self._moves.append((path, new_path)) - if path_is_dir and os.path.isdir(new_path): - # If we're moving a directory, we need to - # remove the destination first or else it will be - # moved to inside the existing directory. - # We just created new_path ourselves, so it will - # be removable. - os.rmdir(new_path) - renames(path, new_path) - return new_path - - def commit(self) -> None: - """Commits the uninstall by removing stashed files.""" - for save_dir in self._save_dirs.values(): - save_dir.cleanup() - self._moves = [] - self._save_dirs = {} - - def rollback(self) -> None: - """Undoes the uninstall by moving stashed files back.""" - for p in self._moves: - logger.info("Moving to %s\n from %s", *p) - - for new_path, path in self._moves: - try: - logger.debug("Replacing %s from %s", new_path, path) - if os.path.isfile(new_path) or os.path.islink(new_path): - os.unlink(new_path) - elif os.path.isdir(new_path): - rmtree(new_path) - renames(path, new_path) - except OSError as ex: - logger.error("Failed to restore %s", new_path) - logger.debug("Exception: %s", ex) - - self.commit() - - @property - def can_rollback(self) -> bool: - return bool(self._moves) - - -class UninstallPathSet: - """A set of file paths to be removed in the uninstallation of a - requirement.""" - - def __init__(self, dist: BaseDistribution) -> None: - self._paths: Set[str] = set() - self._refuse: Set[str] = set() - self._pth: Dict[str, UninstallPthEntries] = {} - self._dist = dist - self._moved_paths = StashedUninstallPathSet() - # Create local cache of normalize_path results. Creating an UninstallPathSet - # can result in hundreds/thousands of redundant calls to normalize_path with - # the same args, which hurts performance. - self._normalize_path_cached = functools.lru_cache()(normalize_path) - - def _permitted(self, path: str) -> bool: - """ - Return True if the given path is one we are permitted to - remove/modify, False otherwise. - - """ - # aka is_local, but caching normalized sys.prefix - if not running_under_virtualenv(): - return True - return path.startswith(self._normalize_path_cached(sys.prefix)) - - def add(self, path: str) -> None: - head, tail = os.path.split(path) - - # we normalize the head to resolve parent directory symlinks, but not - # the tail, since we only want to uninstall symlinks, not their targets - path = os.path.join(self._normalize_path_cached(head), os.path.normcase(tail)) - - if not os.path.exists(path): - return - if self._permitted(path): - self._paths.add(path) - else: - self._refuse.add(path) - - # __pycache__ files can show up after 'installed-files.txt' is created, - # due to imports - if os.path.splitext(path)[1] == ".py": - self.add(cache_from_source(path)) - - def add_pth(self, pth_file: str, entry: str) -> None: - pth_file = self._normalize_path_cached(pth_file) - if self._permitted(pth_file): - if pth_file not in self._pth: - self._pth[pth_file] = UninstallPthEntries(pth_file) - self._pth[pth_file].add(entry) - else: - self._refuse.add(pth_file) - - def remove(self, auto_confirm: bool = False, verbose: bool = False) -> None: - """Remove paths in ``self._paths`` with confirmation (unless - ``auto_confirm`` is True).""" - - if not self._paths: - logger.info( - "Can't uninstall '%s'. No files were found to uninstall.", - self._dist.raw_name, - ) - return - - dist_name_version = f"{self._dist.raw_name}-{self._dist.version}" - logger.info("Uninstalling %s:", dist_name_version) - - with indent_log(): - if auto_confirm or self._allowed_to_proceed(verbose): - moved = self._moved_paths - - for_rename = compress_for_rename(self._paths) - - for path in sorted(compact(for_rename)): - moved.stash(path) - logger.verbose("Removing file or directory %s", path) - - for pth in self._pth.values(): - pth.remove() - - logger.info("Successfully uninstalled %s", dist_name_version) - - def _allowed_to_proceed(self, verbose: bool) -> bool: - """Display which files would be deleted and prompt for confirmation""" - - def _display(msg: str, paths: Iterable[str]) -> None: - if not paths: - return - - logger.info(msg) - with indent_log(): - for path in sorted(compact(paths)): - logger.info(path) - - if not verbose: - will_remove, will_skip = compress_for_output_listing(self._paths) - else: - # In verbose mode, display all the files that are going to be - # deleted. - will_remove = set(self._paths) - will_skip = set() - - _display("Would remove:", will_remove) - _display("Would not remove (might be manually added):", will_skip) - _display("Would not remove (outside of prefix):", self._refuse) - if verbose: - _display("Will actually move:", compress_for_rename(self._paths)) - - return ask("Proceed (Y/n)? ", ("y", "n", "")) != "n" - - def rollback(self) -> None: - """Rollback the changes previously made by remove().""" - if not self._moved_paths.can_rollback: - logger.error( - "Can't roll back %s; was not uninstalled", - self._dist.raw_name, - ) - return - logger.info("Rolling back uninstall of %s", self._dist.raw_name) - self._moved_paths.rollback() - for pth in self._pth.values(): - pth.rollback() - - def commit(self) -> None: - """Remove temporary save dir: rollback will no longer be possible.""" - self._moved_paths.commit() - - @classmethod - def from_dist(cls, dist: BaseDistribution) -> "UninstallPathSet": - dist_location = dist.location - info_location = dist.info_location - if dist_location is None: - logger.info( - "Not uninstalling %s since it is not installed", - dist.canonical_name, - ) - return cls(dist) - - normalized_dist_location = normalize_path(dist_location) - if not dist.local: - logger.info( - "Not uninstalling %s at %s, outside environment %s", - dist.canonical_name, - normalized_dist_location, - sys.prefix, - ) - return cls(dist) - - if normalized_dist_location in { - p - for p in {sysconfig.get_path("stdlib"), sysconfig.get_path("platstdlib")} - if p - }: - logger.info( - "Not uninstalling %s at %s, as it is in the standard library.", - dist.canonical_name, - normalized_dist_location, - ) - return cls(dist) - - paths_to_remove = cls(dist) - develop_egg_link = egg_link_path_from_location(dist.raw_name) - - # Distribution is installed with metadata in a "flat" .egg-info - # directory. This means it is not a modern .dist-info installation, an - # egg, or legacy editable. - setuptools_flat_installation = ( - dist.installed_with_setuptools_egg_info - and info_location is not None - and os.path.exists(info_location) - # If dist is editable and the location points to a ``.egg-info``, - # we are in fact in the legacy editable case. - and not info_location.endswith(f"{dist.setuptools_filename}.egg-info") - ) - - # Uninstall cases order do matter as in the case of 2 installs of the - # same package, pip needs to uninstall the currently detected version - if setuptools_flat_installation: - if info_location is not None: - paths_to_remove.add(info_location) - installed_files = dist.iter_declared_entries() - if installed_files is not None: - for installed_file in installed_files: - paths_to_remove.add(os.path.join(dist_location, installed_file)) - # FIXME: need a test for this elif block - # occurs with --single-version-externally-managed/--record outside - # of pip - elif dist.is_file("top_level.txt"): - try: - namespace_packages = dist.read_text("namespace_packages.txt") - except FileNotFoundError: - namespaces = [] - else: - namespaces = namespace_packages.splitlines(keepends=False) - for top_level_pkg in [ - p - for p in dist.read_text("top_level.txt").splitlines() - if p and p not in namespaces - ]: - path = os.path.join(dist_location, top_level_pkg) - paths_to_remove.add(path) - paths_to_remove.add(f"{path}.py") - paths_to_remove.add(f"{path}.pyc") - paths_to_remove.add(f"{path}.pyo") - - elif dist.installed_by_distutils: - raise UninstallationError( - "Cannot uninstall {!r}. It is a distutils installed project " - "and thus we cannot accurately determine which files belong " - "to it which would lead to only a partial uninstall.".format( - dist.raw_name, - ) - ) - - elif dist.installed_as_egg: - # package installed by easy_install - # We cannot match on dist.egg_name because it can slightly vary - # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg - paths_to_remove.add(dist_location) - easy_install_egg = os.path.split(dist_location)[1] - easy_install_pth = os.path.join( - os.path.dirname(dist_location), - "easy-install.pth", - ) - paths_to_remove.add_pth(easy_install_pth, "./" + easy_install_egg) - - elif dist.installed_with_dist_info: - for path in uninstallation_paths(dist): - paths_to_remove.add(path) - - elif develop_egg_link: - # PEP 660 modern editable is handled in the ``.dist-info`` case - # above, so this only covers the setuptools-style editable. - with open(develop_egg_link) as fh: - link_pointer = os.path.normcase(fh.readline().strip()) - normalized_link_pointer = paths_to_remove._normalize_path_cached( - link_pointer - ) - assert os.path.samefile( - normalized_link_pointer, normalized_dist_location - ), ( - f"Egg-link {develop_egg_link} (to {link_pointer}) does not match " - f"installed location of {dist.raw_name} (at {dist_location})" - ) - paths_to_remove.add(develop_egg_link) - easy_install_pth = os.path.join( - os.path.dirname(develop_egg_link), "easy-install.pth" - ) - paths_to_remove.add_pth(easy_install_pth, dist_location) - - else: - logger.debug( - "Not sure how to uninstall: %s - Check: %s", - dist, - dist_location, - ) - - if dist.in_usersite: - bin_dir = get_bin_user() - else: - bin_dir = get_bin_prefix() - - # find distutils scripts= scripts - try: - for script in dist.iter_distutils_script_names(): - paths_to_remove.add(os.path.join(bin_dir, script)) - if WINDOWS: - paths_to_remove.add(os.path.join(bin_dir, f"{script}.bat")) - except (FileNotFoundError, NotADirectoryError): - pass - - # find console_scripts and gui_scripts - def iter_scripts_to_remove( - dist: BaseDistribution, - bin_dir: str, - ) -> Generator[str, None, None]: - for entry_point in dist.iter_entry_points(): - if entry_point.group == "console_scripts": - yield from _script_names(bin_dir, entry_point.name, False) - elif entry_point.group == "gui_scripts": - yield from _script_names(bin_dir, entry_point.name, True) - - for s in iter_scripts_to_remove(dist, bin_dir): - paths_to_remove.add(s) - - return paths_to_remove - - -class UninstallPthEntries: - def __init__(self, pth_file: str) -> None: - self.file = pth_file - self.entries: Set[str] = set() - self._saved_lines: Optional[List[bytes]] = None - - def add(self, entry: str) -> None: - entry = os.path.normcase(entry) - # On Windows, os.path.normcase converts the entry to use - # backslashes. This is correct for entries that describe absolute - # paths outside of site-packages, but all the others use forward - # slashes. - # os.path.splitdrive is used instead of os.path.isabs because isabs - # treats non-absolute paths with drive letter markings like c:foo\bar - # as absolute paths. It also does not recognize UNC paths if they don't - # have more than "\\sever\share". Valid examples: "\\server\share\" or - # "\\server\share\folder". - if WINDOWS and not os.path.splitdrive(entry)[0]: - entry = entry.replace("\\", "/") - self.entries.add(entry) - - def remove(self) -> None: - logger.verbose("Removing pth entries from %s:", self.file) - - # If the file doesn't exist, log a warning and return - if not os.path.isfile(self.file): - logger.warning("Cannot remove entries from nonexistent file %s", self.file) - return - with open(self.file, "rb") as fh: - # windows uses '\r\n' with py3k, but uses '\n' with py2.x - lines = fh.readlines() - self._saved_lines = lines - if any(b"\r\n" in line for line in lines): - endline = "\r\n" - else: - endline = "\n" - # handle missing trailing newline - if lines and not lines[-1].endswith(endline.encode("utf-8")): - lines[-1] = lines[-1] + endline.encode("utf-8") - for entry in self.entries: - try: - logger.verbose("Removing entry: %s", entry) - lines.remove((entry + endline).encode("utf-8")) - except ValueError: - pass - with open(self.file, "wb") as fh: - fh.writelines(lines) - - def rollback(self) -> bool: - if self._saved_lines is None: - logger.error("Cannot roll back changes to %s, none were made", self.file) - return False - logger.debug("Rolling %s back to previous state", self.file) - with open(self.file, "wb") as fh: - fh.writelines(self._saved_lines) - return True |