summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/pip/_internal/index
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/pip/_internal/index')
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/index/__init__.py2
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/__init__.cpython-311.pycbin0 -> 248 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/collector.cpython-311.pycbin0 -> 24593 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/package_finder.cpython-311.pycbin0 -> 44157 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/sources.cpython-311.pycbin0 -> 13964 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/index/collector.py507
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/index/package_finder.py1027
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/index/sources.py285
8 files changed, 1821 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/__init__.py b/venv/lib/python3.11/site-packages/pip/_internal/index/__init__.py
new file mode 100644
index 0000000..7a17b7b
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/index/__init__.py
@@ -0,0 +1,2 @@
+"""Index interaction code
+"""
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000..446ac36
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/__init__.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/collector.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/collector.cpython-311.pyc
new file mode 100644
index 0000000..a1dc4c1
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/collector.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/package_finder.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/package_finder.cpython-311.pyc
new file mode 100644
index 0000000..8379671
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/package_finder.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/sources.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/sources.cpython-311.pyc
new file mode 100644
index 0000000..e7e9b99
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/sources.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/collector.py b/venv/lib/python3.11/site-packages/pip/_internal/index/collector.py
new file mode 100644
index 0000000..08c8bdd
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/index/collector.py
@@ -0,0 +1,507 @@
+"""
+The main purpose of this module is to expose LinkCollector.collect_sources().
+"""
+
+import collections
+import email.message
+import functools
+import itertools
+import json
+import logging
+import os
+import urllib.parse
+import urllib.request
+from html.parser import HTMLParser
+from optparse import Values
+from typing import (
+ TYPE_CHECKING,
+ Callable,
+ Dict,
+ Iterable,
+ List,
+ MutableMapping,
+ NamedTuple,
+ Optional,
+ Sequence,
+ Tuple,
+ Union,
+)
+
+from pip._vendor import requests
+from pip._vendor.requests import Response
+from pip._vendor.requests.exceptions import RetryError, SSLError
+
+from pip._internal.exceptions import NetworkConnectionError
+from pip._internal.models.link import Link
+from pip._internal.models.search_scope import SearchScope
+from pip._internal.network.session import PipSession
+from pip._internal.network.utils import raise_for_status
+from pip._internal.utils.filetypes import is_archive_file
+from pip._internal.utils.misc import redact_auth_from_url
+from pip._internal.vcs import vcs
+
+from .sources import CandidatesFromPage, LinkSource, build_source
+
+if TYPE_CHECKING:
+ from typing import Protocol
+else:
+ Protocol = object
+
+logger = logging.getLogger(__name__)
+
+ResponseHeaders = MutableMapping[str, str]
+
+
+def _match_vcs_scheme(url: str) -> Optional[str]:
+ """Look for VCS schemes in the URL.
+
+ Returns the matched VCS scheme, or None if there's no match.
+ """
+ for scheme in vcs.schemes:
+ if url.lower().startswith(scheme) and url[len(scheme)] in "+:":
+ return scheme
+ return None
+
+
+class _NotAPIContent(Exception):
+ def __init__(self, content_type: str, request_desc: str) -> None:
+ super().__init__(content_type, request_desc)
+ self.content_type = content_type
+ self.request_desc = request_desc
+
+
+def _ensure_api_header(response: Response) -> None:
+ """
+ Check the Content-Type header to ensure the response contains a Simple
+ API Response.
+
+ Raises `_NotAPIContent` if the content type is not a valid content-type.
+ """
+ content_type = response.headers.get("Content-Type", "Unknown")
+
+ content_type_l = content_type.lower()
+ if content_type_l.startswith(
+ (
+ "text/html",
+ "application/vnd.pypi.simple.v1+html",
+ "application/vnd.pypi.simple.v1+json",
+ )
+ ):
+ return
+
+ raise _NotAPIContent(content_type, response.request.method)
+
+
+class _NotHTTP(Exception):
+ pass
+
+
+def _ensure_api_response(url: str, session: PipSession) -> None:
+ """
+ Send a HEAD request to the URL, and ensure the response contains a simple
+ API Response.
+
+ Raises `_NotHTTP` if the URL is not available for a HEAD request, or
+ `_NotAPIContent` if the content type is not a valid content type.
+ """
+ scheme, netloc, path, query, fragment = urllib.parse.urlsplit(url)
+ if scheme not in {"http", "https"}:
+ raise _NotHTTP()
+
+ resp = session.head(url, allow_redirects=True)
+ raise_for_status(resp)
+
+ _ensure_api_header(resp)
+
+
+def _get_simple_response(url: str, session: PipSession) -> Response:
+ """Access an Simple API response with GET, and return the response.
+
+ This consists of three parts:
+
+ 1. If the URL looks suspiciously like an archive, send a HEAD first to
+ check the Content-Type is HTML or Simple API, to avoid downloading a
+ large file. Raise `_NotHTTP` if the content type cannot be determined, or
+ `_NotAPIContent` if it is not HTML or a Simple API.
+ 2. Actually perform the request. Raise HTTP exceptions on network failures.
+ 3. Check the Content-Type header to make sure we got a Simple API response,
+ and raise `_NotAPIContent` otherwise.
+ """
+ if is_archive_file(Link(url).filename):
+ _ensure_api_response(url, session=session)
+
+ logger.debug("Getting page %s", redact_auth_from_url(url))
+
+ resp = session.get(
+ url,
+ headers={
+ "Accept": ", ".join(
+ [
+ "application/vnd.pypi.simple.v1+json",
+ "application/vnd.pypi.simple.v1+html; q=0.1",
+ "text/html; q=0.01",
+ ]
+ ),
+ # We don't want to blindly returned cached data for
+ # /simple/, because authors generally expecting that
+ # twine upload && pip install will function, but if
+ # they've done a pip install in the last ~10 minutes
+ # it won't. Thus by setting this to zero we will not
+ # blindly use any cached data, however the benefit of
+ # using max-age=0 instead of no-cache, is that we will
+ # still support conditional requests, so we will still
+ # minimize traffic sent in cases where the page hasn't
+ # changed at all, we will just always incur the round
+ # trip for the conditional GET now instead of only
+ # once per 10 minutes.
+ # For more information, please see pypa/pip#5670.
+ "Cache-Control": "max-age=0",
+ },
+ )
+ raise_for_status(resp)
+
+ # The check for archives above only works if the url ends with
+ # something that looks like an archive. However that is not a
+ # requirement of an url. Unless we issue a HEAD request on every
+ # url we cannot know ahead of time for sure if something is a
+ # Simple API response or not. However we can check after we've
+ # downloaded it.
+ _ensure_api_header(resp)
+
+ logger.debug(
+ "Fetched page %s as %s",
+ redact_auth_from_url(url),
+ resp.headers.get("Content-Type", "Unknown"),
+ )
+
+ return resp
+
+
+def _get_encoding_from_headers(headers: ResponseHeaders) -> Optional[str]:
+ """Determine if we have any encoding information in our headers."""
+ if headers and "Content-Type" in headers:
+ m = email.message.Message()
+ m["content-type"] = headers["Content-Type"]
+ charset = m.get_param("charset")
+ if charset:
+ return str(charset)
+ return None
+
+
+class CacheablePageContent:
+ def __init__(self, page: "IndexContent") -> None:
+ assert page.cache_link_parsing
+ self.page = page
+
+ def __eq__(self, other: object) -> bool:
+ return isinstance(other, type(self)) and self.page.url == other.page.url
+
+ def __hash__(self) -> int:
+ return hash(self.page.url)
+
+
+class ParseLinks(Protocol):
+ def __call__(self, page: "IndexContent") -> Iterable[Link]:
+ ...
+
+
+def with_cached_index_content(fn: ParseLinks) -> ParseLinks:
+ """
+ Given a function that parses an Iterable[Link] from an IndexContent, cache the
+ function's result (keyed by CacheablePageContent), unless the IndexContent
+ `page` has `page.cache_link_parsing == False`.
+ """
+
+ @functools.lru_cache(maxsize=None)
+ def wrapper(cacheable_page: CacheablePageContent) -> List[Link]:
+ return list(fn(cacheable_page.page))
+
+ @functools.wraps(fn)
+ def wrapper_wrapper(page: "IndexContent") -> List[Link]:
+ if page.cache_link_parsing:
+ return wrapper(CacheablePageContent(page))
+ return list(fn(page))
+
+ return wrapper_wrapper
+
+
+@with_cached_index_content
+def parse_links(page: "IndexContent") -> Iterable[Link]:
+ """
+ Parse a Simple API's Index Content, and yield its anchor elements as Link objects.
+ """
+
+ content_type_l = page.content_type.lower()
+ if content_type_l.startswith("application/vnd.pypi.simple.v1+json"):
+ data = json.loads(page.content)
+ for file in data.get("files", []):
+ link = Link.from_json(file, page.url)
+ if link is None:
+ continue
+ yield link
+ return
+
+ parser = HTMLLinkParser(page.url)
+ encoding = page.encoding or "utf-8"
+ parser.feed(page.content.decode(encoding))
+
+ url = page.url
+ base_url = parser.base_url or url
+ for anchor in parser.anchors:
+ link = Link.from_element(anchor, page_url=url, base_url=base_url)
+ if link is None:
+ continue
+ yield link
+
+
+class IndexContent:
+ """Represents one response (or page), along with its URL"""
+
+ def __init__(
+ self,
+ content: bytes,
+ content_type: str,
+ encoding: Optional[str],
+ url: str,
+ cache_link_parsing: bool = True,
+ ) -> None:
+ """
+ :param encoding: the encoding to decode the given content.
+ :param url: the URL from which the HTML was downloaded.
+ :param cache_link_parsing: whether links parsed from this page's url
+ should be cached. PyPI index urls should
+ have this set to False, for example.
+ """
+ self.content = content
+ self.content_type = content_type
+ self.encoding = encoding
+ self.url = url
+ self.cache_link_parsing = cache_link_parsing
+
+ def __str__(self) -> str:
+ return redact_auth_from_url(self.url)
+
+
+class HTMLLinkParser(HTMLParser):
+ """
+ HTMLParser that keeps the first base HREF and a list of all anchor
+ elements' attributes.
+ """
+
+ def __init__(self, url: str) -> None:
+ super().__init__(convert_charrefs=True)
+
+ self.url: str = url
+ self.base_url: Optional[str] = None
+ self.anchors: List[Dict[str, Optional[str]]] = []
+
+ def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
+ if tag == "base" and self.base_url is None:
+ href = self.get_href(attrs)
+ if href is not None:
+ self.base_url = href
+ elif tag == "a":
+ self.anchors.append(dict(attrs))
+
+ def get_href(self, attrs: List[Tuple[str, Optional[str]]]) -> Optional[str]:
+ for name, value in attrs:
+ if name == "href":
+ return value
+ return None
+
+
+def _handle_get_simple_fail(
+ link: Link,
+ reason: Union[str, Exception],
+ meth: Optional[Callable[..., None]] = None,
+) -> None:
+ if meth is None:
+ meth = logger.debug
+ meth("Could not fetch URL %s: %s - skipping", link, reason)
+
+
+def _make_index_content(
+ response: Response, cache_link_parsing: bool = True
+) -> IndexContent:
+ encoding = _get_encoding_from_headers(response.headers)
+ return IndexContent(
+ response.content,
+ response.headers["Content-Type"],
+ encoding=encoding,
+ url=response.url,
+ cache_link_parsing=cache_link_parsing,
+ )
+
+
+def _get_index_content(link: Link, *, session: PipSession) -> Optional["IndexContent"]:
+ url = link.url.split("#", 1)[0]
+
+ # Check for VCS schemes that do not support lookup as web pages.
+ vcs_scheme = _match_vcs_scheme(url)
+ if vcs_scheme:
+ logger.warning(
+ "Cannot look at %s URL %s because it does not support lookup as web pages.",
+ vcs_scheme,
+ link,
+ )
+ return None
+
+ # Tack index.html onto file:// URLs that point to directories
+ scheme, _, path, _, _, _ = urllib.parse.urlparse(url)
+ if scheme == "file" and os.path.isdir(urllib.request.url2pathname(path)):
+ # add trailing slash if not present so urljoin doesn't trim
+ # final segment
+ if not url.endswith("/"):
+ url += "/"
+ # TODO: In the future, it would be nice if pip supported PEP 691
+ # style responses in the file:// URLs, however there's no
+ # standard file extension for application/vnd.pypi.simple.v1+json
+ # so we'll need to come up with something on our own.
+ url = urllib.parse.urljoin(url, "index.html")
+ logger.debug(" file: URL is directory, getting %s", url)
+
+ try:
+ resp = _get_simple_response(url, session=session)
+ except _NotHTTP:
+ logger.warning(
+ "Skipping page %s because it looks like an archive, and cannot "
+ "be checked by a HTTP HEAD request.",
+ link,
+ )
+ except _NotAPIContent as exc:
+ logger.warning(
+ "Skipping page %s because the %s request got Content-Type: %s. "
+ "The only supported Content-Types are application/vnd.pypi.simple.v1+json, "
+ "application/vnd.pypi.simple.v1+html, and text/html",
+ link,
+ exc.request_desc,
+ exc.content_type,
+ )
+ except NetworkConnectionError as exc:
+ _handle_get_simple_fail(link, exc)
+ except RetryError as exc:
+ _handle_get_simple_fail(link, exc)
+ except SSLError as exc:
+ reason = "There was a problem confirming the ssl certificate: "
+ reason += str(exc)
+ _handle_get_simple_fail(link, reason, meth=logger.info)
+ except requests.ConnectionError as exc:
+ _handle_get_simple_fail(link, f"connection error: {exc}")
+ except requests.Timeout:
+ _handle_get_simple_fail(link, "timed out")
+ else:
+ return _make_index_content(resp, cache_link_parsing=link.cache_link_parsing)
+ return None
+
+
+class CollectedSources(NamedTuple):
+ find_links: Sequence[Optional[LinkSource]]
+ index_urls: Sequence[Optional[LinkSource]]
+
+
+class LinkCollector:
+
+ """
+ Responsible for collecting Link objects from all configured locations,
+ making network requests as needed.
+
+ The class's main method is its collect_sources() method.
+ """
+
+ def __init__(
+ self,
+ session: PipSession,
+ search_scope: SearchScope,
+ ) -> None:
+ self.search_scope = search_scope
+ self.session = session
+
+ @classmethod
+ def create(
+ cls,
+ session: PipSession,
+ options: Values,
+ suppress_no_index: bool = False,
+ ) -> "LinkCollector":
+ """
+ :param session: The Session to use to make requests.
+ :param suppress_no_index: Whether to ignore the --no-index option
+ when constructing the SearchScope object.
+ """
+ index_urls = [options.index_url] + options.extra_index_urls
+ if options.no_index and not suppress_no_index:
+ logger.debug(
+ "Ignoring indexes: %s",
+ ",".join(redact_auth_from_url(url) for url in index_urls),
+ )
+ index_urls = []
+
+ # Make sure find_links is a list before passing to create().
+ find_links = options.find_links or []
+
+ search_scope = SearchScope.create(
+ find_links=find_links,
+ index_urls=index_urls,
+ no_index=options.no_index,
+ )
+ link_collector = LinkCollector(
+ session=session,
+ search_scope=search_scope,
+ )
+ return link_collector
+
+ @property
+ def find_links(self) -> List[str]:
+ return self.search_scope.find_links
+
+ def fetch_response(self, location: Link) -> Optional[IndexContent]:
+ """
+ Fetch an HTML page containing package links.
+ """
+ return _get_index_content(location, session=self.session)
+
+ def collect_sources(
+ self,
+ project_name: str,
+ candidates_from_page: CandidatesFromPage,
+ ) -> CollectedSources:
+ # The OrderedDict calls deduplicate sources by URL.
+ index_url_sources = collections.OrderedDict(
+ build_source(
+ loc,
+ candidates_from_page=candidates_from_page,
+ page_validator=self.session.is_secure_origin,
+ expand_dir=False,
+ cache_link_parsing=False,
+ project_name=project_name,
+ )
+ for loc in self.search_scope.get_index_urls_locations(project_name)
+ ).values()
+ find_links_sources = collections.OrderedDict(
+ build_source(
+ loc,
+ candidates_from_page=candidates_from_page,
+ page_validator=self.session.is_secure_origin,
+ expand_dir=True,
+ cache_link_parsing=True,
+ project_name=project_name,
+ )
+ for loc in self.find_links
+ ).values()
+
+ if logger.isEnabledFor(logging.DEBUG):
+ lines = [
+ f"* {s.link}"
+ for s in itertools.chain(find_links_sources, index_url_sources)
+ if s is not None and s.link is not None
+ ]
+ lines = [
+ f"{len(lines)} location(s) to search "
+ f"for versions of {project_name}:"
+ ] + lines
+ logger.debug("\n".join(lines))
+
+ return CollectedSources(
+ find_links=list(find_links_sources),
+ index_urls=list(index_url_sources),
+ )
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/package_finder.py b/venv/lib/python3.11/site-packages/pip/_internal/index/package_finder.py
new file mode 100644
index 0000000..ec9ebc3
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/index/package_finder.py
@@ -0,0 +1,1027 @@
+"""Routines related to PyPI, indexes"""
+
+import enum
+import functools
+import itertools
+import logging
+import re
+from typing import TYPE_CHECKING, FrozenSet, Iterable, List, Optional, Set, Tuple, Union
+
+from pip._vendor.packaging import specifiers
+from pip._vendor.packaging.tags import Tag
+from pip._vendor.packaging.utils import canonicalize_name
+from pip._vendor.packaging.version import _BaseVersion
+from pip._vendor.packaging.version import parse as parse_version
+
+from pip._internal.exceptions import (
+ BestVersionAlreadyInstalled,
+ DistributionNotFound,
+ InvalidWheelFilename,
+ UnsupportedWheel,
+)
+from pip._internal.index.collector import LinkCollector, parse_links
+from pip._internal.models.candidate import InstallationCandidate
+from pip._internal.models.format_control import FormatControl
+from pip._internal.models.link import Link
+from pip._internal.models.search_scope import SearchScope
+from pip._internal.models.selection_prefs import SelectionPreferences
+from pip._internal.models.target_python import TargetPython
+from pip._internal.models.wheel import Wheel
+from pip._internal.req import InstallRequirement
+from pip._internal.utils._log import getLogger
+from pip._internal.utils.filetypes import WHEEL_EXTENSION
+from pip._internal.utils.hashes import Hashes
+from pip._internal.utils.logging import indent_log
+from pip._internal.utils.misc import build_netloc
+from pip._internal.utils.packaging import check_requires_python
+from pip._internal.utils.unpacking import SUPPORTED_EXTENSIONS
+
+if TYPE_CHECKING:
+ from pip._vendor.typing_extensions import TypeGuard
+
+__all__ = ["FormatControl", "BestCandidateResult", "PackageFinder"]
+
+
+logger = getLogger(__name__)
+
+BuildTag = Union[Tuple[()], Tuple[int, str]]
+CandidateSortingKey = Tuple[int, int, int, _BaseVersion, Optional[int], BuildTag]
+
+
+def _check_link_requires_python(
+ link: Link,
+ version_info: Tuple[int, int, int],
+ ignore_requires_python: bool = False,
+) -> bool:
+ """
+ Return whether the given Python version is compatible with a link's
+ "Requires-Python" value.
+
+ :param version_info: A 3-tuple of ints representing the Python
+ major-minor-micro version to check.
+ :param ignore_requires_python: Whether to ignore the "Requires-Python"
+ value if the given Python version isn't compatible.
+ """
+ try:
+ is_compatible = check_requires_python(
+ link.requires_python,
+ version_info=version_info,
+ )
+ except specifiers.InvalidSpecifier:
+ logger.debug(
+ "Ignoring invalid Requires-Python (%r) for link: %s",
+ link.requires_python,
+ link,
+ )
+ else:
+ if not is_compatible:
+ version = ".".join(map(str, version_info))
+ if not ignore_requires_python:
+ logger.verbose(
+ "Link requires a different Python (%s not in: %r): %s",
+ version,
+ link.requires_python,
+ link,
+ )
+ return False
+
+ logger.debug(
+ "Ignoring failed Requires-Python check (%s not in: %r) for link: %s",
+ version,
+ link.requires_python,
+ link,
+ )
+
+ return True
+
+
+class LinkType(enum.Enum):
+ candidate = enum.auto()
+ different_project = enum.auto()
+ yanked = enum.auto()
+ format_unsupported = enum.auto()
+ format_invalid = enum.auto()
+ platform_mismatch = enum.auto()
+ requires_python_mismatch = enum.auto()
+
+
+class LinkEvaluator:
+
+ """
+ Responsible for evaluating links for a particular project.
+ """
+
+ _py_version_re = re.compile(r"-py([123]\.?[0-9]?)$")
+
+ # Don't include an allow_yanked default value to make sure each call
+ # site considers whether yanked releases are allowed. This also causes
+ # that decision to be made explicit in the calling code, which helps
+ # people when reading the code.
+ def __init__(
+ self,
+ project_name: str,
+ canonical_name: str,
+ formats: FrozenSet[str],
+ target_python: TargetPython,
+ allow_yanked: bool,
+ ignore_requires_python: Optional[bool] = None,
+ ) -> None:
+ """
+ :param project_name: The user supplied package name.
+ :param canonical_name: The canonical package name.
+ :param formats: The formats allowed for this package. Should be a set
+ with 'binary' or 'source' or both in it.
+ :param target_python: The target Python interpreter to use when
+ evaluating link compatibility. This is used, for example, to
+ check wheel compatibility, as well as when checking the Python
+ version, e.g. the Python version embedded in a link filename
+ (or egg fragment) and against an HTML link's optional PEP 503
+ "data-requires-python" attribute.
+ :param allow_yanked: Whether files marked as yanked (in the sense
+ of PEP 592) are permitted to be candidates for install.
+ :param ignore_requires_python: Whether to ignore incompatible
+ PEP 503 "data-requires-python" values in HTML links. Defaults
+ to False.
+ """
+ if ignore_requires_python is None:
+ ignore_requires_python = False
+
+ self._allow_yanked = allow_yanked
+ self._canonical_name = canonical_name
+ self._ignore_requires_python = ignore_requires_python
+ self._formats = formats
+ self._target_python = target_python
+
+ self.project_name = project_name
+
+ def evaluate_link(self, link: Link) -> Tuple[LinkType, str]:
+ """
+ Determine whether a link is a candidate for installation.
+
+ :return: A tuple (result, detail), where *result* is an enum
+ representing whether the evaluation found a candidate, or the reason
+ why one is not found. If a candidate is found, *detail* will be the
+ candidate's version string; if one is not found, it contains the
+ reason the link fails to qualify.
+ """
+ version = None
+ if link.is_yanked and not self._allow_yanked:
+ reason = link.yanked_reason or "<none given>"
+ return (LinkType.yanked, f"yanked for reason: {reason}")
+
+ if link.egg_fragment:
+ egg_info = link.egg_fragment
+ ext = link.ext
+ else:
+ egg_info, ext = link.splitext()
+ if not ext:
+ return (LinkType.format_unsupported, "not a file")
+ if ext not in SUPPORTED_EXTENSIONS:
+ return (
+ LinkType.format_unsupported,
+ f"unsupported archive format: {ext}",
+ )
+ if "binary" not in self._formats and ext == WHEEL_EXTENSION:
+ reason = f"No binaries permitted for {self.project_name}"
+ return (LinkType.format_unsupported, reason)
+ if "macosx10" in link.path and ext == ".zip":
+ return (LinkType.format_unsupported, "macosx10 one")
+ if ext == WHEEL_EXTENSION:
+ try:
+ wheel = Wheel(link.filename)
+ except InvalidWheelFilename:
+ return (
+ LinkType.format_invalid,
+ "invalid wheel filename",
+ )
+ if canonicalize_name(wheel.name) != self._canonical_name:
+ reason = f"wrong project name (not {self.project_name})"
+ return (LinkType.different_project, reason)
+
+ supported_tags = self._target_python.get_unsorted_tags()
+ if not wheel.supported(supported_tags):
+ # Include the wheel's tags in the reason string to
+ # simplify troubleshooting compatibility issues.
+ file_tags = ", ".join(wheel.get_formatted_file_tags())
+ reason = (
+ f"none of the wheel's tags ({file_tags}) are compatible "
+ f"(run pip debug --verbose to show compatible tags)"
+ )
+ return (LinkType.platform_mismatch, reason)
+
+ version = wheel.version
+
+ # This should be up by the self.ok_binary check, but see issue 2700.
+ if "source" not in self._formats and ext != WHEEL_EXTENSION:
+ reason = f"No sources permitted for {self.project_name}"
+ return (LinkType.format_unsupported, reason)
+
+ if not version:
+ version = _extract_version_from_fragment(
+ egg_info,
+ self._canonical_name,
+ )
+ if not version:
+ reason = f"Missing project version for {self.project_name}"
+ return (LinkType.format_invalid, reason)
+
+ match = self._py_version_re.search(version)
+ if match:
+ version = version[: match.start()]
+ py_version = match.group(1)
+ if py_version != self._target_python.py_version:
+ return (
+ LinkType.platform_mismatch,
+ "Python version is incorrect",
+ )
+
+ supports_python = _check_link_requires_python(
+ link,
+ version_info=self._target_python.py_version_info,
+ ignore_requires_python=self._ignore_requires_python,
+ )
+ if not supports_python:
+ reason = f"{version} Requires-Python {link.requires_python}"
+ return (LinkType.requires_python_mismatch, reason)
+
+ logger.debug("Found link %s, version: %s", link, version)
+
+ return (LinkType.candidate, version)
+
+
+def filter_unallowed_hashes(
+ candidates: List[InstallationCandidate],
+ hashes: Optional[Hashes],
+ project_name: str,
+) -> List[InstallationCandidate]:
+ """
+ Filter out candidates whose hashes aren't allowed, and return a new
+ list of candidates.
+
+ If at least one candidate has an allowed hash, then all candidates with
+ either an allowed hash or no hash specified are returned. Otherwise,
+ the given candidates are returned.
+
+ Including the candidates with no hash specified when there is a match
+ allows a warning to be logged if there is a more preferred candidate
+ with no hash specified. Returning all candidates in the case of no
+ matches lets pip report the hash of the candidate that would otherwise
+ have been installed (e.g. permitting the user to more easily update
+ their requirements file with the desired hash).
+ """
+ if not hashes:
+ logger.debug(
+ "Given no hashes to check %s links for project %r: "
+ "discarding no candidates",
+ len(candidates),
+ project_name,
+ )
+ # Make sure we're not returning back the given value.
+ return list(candidates)
+
+ matches_or_no_digest = []
+ # Collect the non-matches for logging purposes.
+ non_matches = []
+ match_count = 0
+ for candidate in candidates:
+ link = candidate.link
+ if not link.has_hash:
+ pass
+ elif link.is_hash_allowed(hashes=hashes):
+ match_count += 1
+ else:
+ non_matches.append(candidate)
+ continue
+
+ matches_or_no_digest.append(candidate)
+
+ if match_count:
+ filtered = matches_or_no_digest
+ else:
+ # Make sure we're not returning back the given value.
+ filtered = list(candidates)
+
+ if len(filtered) == len(candidates):
+ discard_message = "discarding no candidates"
+ else:
+ discard_message = "discarding {} non-matches:\n {}".format(
+ len(non_matches),
+ "\n ".join(str(candidate.link) for candidate in non_matches),
+ )
+
+ logger.debug(
+ "Checked %s links for project %r against %s hashes "
+ "(%s matches, %s no digest): %s",
+ len(candidates),
+ project_name,
+ hashes.digest_count,
+ match_count,
+ len(matches_or_no_digest) - match_count,
+ discard_message,
+ )
+
+ return filtered
+
+
+class CandidatePreferences:
+
+ """
+ Encapsulates some of the preferences for filtering and sorting
+ InstallationCandidate objects.
+ """
+
+ def __init__(
+ self,
+ prefer_binary: bool = False,
+ allow_all_prereleases: bool = False,
+ ) -> None:
+ """
+ :param allow_all_prereleases: Whether to allow all pre-releases.
+ """
+ self.allow_all_prereleases = allow_all_prereleases
+ self.prefer_binary = prefer_binary
+
+
+class BestCandidateResult:
+ """A collection of candidates, returned by `PackageFinder.find_best_candidate`.
+
+ This class is only intended to be instantiated by CandidateEvaluator's
+ `compute_best_candidate()` method.
+ """
+
+ def __init__(
+ self,
+ candidates: List[InstallationCandidate],
+ applicable_candidates: List[InstallationCandidate],
+ best_candidate: Optional[InstallationCandidate],
+ ) -> None:
+ """
+ :param candidates: A sequence of all available candidates found.
+ :param applicable_candidates: The applicable candidates.
+ :param best_candidate: The most preferred candidate found, or None
+ if no applicable candidates were found.
+ """
+ assert set(applicable_candidates) <= set(candidates)
+
+ if best_candidate is None:
+ assert not applicable_candidates
+ else:
+ assert best_candidate in applicable_candidates
+
+ self._applicable_candidates = applicable_candidates
+ self._candidates = candidates
+
+ self.best_candidate = best_candidate
+
+ def iter_all(self) -> Iterable[InstallationCandidate]:
+ """Iterate through all candidates."""
+ return iter(self._candidates)
+
+ def iter_applicable(self) -> Iterable[InstallationCandidate]:
+ """Iterate through the applicable candidates."""
+ return iter(self._applicable_candidates)
+
+
+class CandidateEvaluator:
+
+ """
+ Responsible for filtering and sorting candidates for installation based
+ on what tags are valid.
+ """
+
+ @classmethod
+ def create(
+ cls,
+ project_name: str,
+ target_python: Optional[TargetPython] = None,
+ prefer_binary: bool = False,
+ allow_all_prereleases: bool = False,
+ specifier: Optional[specifiers.BaseSpecifier] = None,
+ hashes: Optional[Hashes] = None,
+ ) -> "CandidateEvaluator":
+ """Create a CandidateEvaluator object.
+
+ :param target_python: The target Python interpreter to use when
+ checking compatibility. If None (the default), a TargetPython
+ object will be constructed from the running Python.
+ :param specifier: An optional object implementing `filter`
+ (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
+ versions.
+ :param hashes: An optional collection of allowed hashes.
+ """
+ if target_python is None:
+ target_python = TargetPython()
+ if specifier is None:
+ specifier = specifiers.SpecifierSet()
+
+ supported_tags = target_python.get_sorted_tags()
+
+ return cls(
+ project_name=project_name,
+ supported_tags=supported_tags,
+ specifier=specifier,
+ prefer_binary=prefer_binary,
+ allow_all_prereleases=allow_all_prereleases,
+ hashes=hashes,
+ )
+
+ def __init__(
+ self,
+ project_name: str,
+ supported_tags: List[Tag],
+ specifier: specifiers.BaseSpecifier,
+ prefer_binary: bool = False,
+ allow_all_prereleases: bool = False,
+ hashes: Optional[Hashes] = None,
+ ) -> None:
+ """
+ :param supported_tags: The PEP 425 tags supported by the target
+ Python in order of preference (most preferred first).
+ """
+ self._allow_all_prereleases = allow_all_prereleases
+ self._hashes = hashes
+ self._prefer_binary = prefer_binary
+ self._project_name = project_name
+ self._specifier = specifier
+ self._supported_tags = supported_tags
+ # Since the index of the tag in the _supported_tags list is used
+ # as a priority, precompute a map from tag to index/priority to be
+ # used in wheel.find_most_preferred_tag.
+ self._wheel_tag_preferences = {
+ tag: idx for idx, tag in enumerate(supported_tags)
+ }
+
+ def get_applicable_candidates(
+ self,
+ candidates: List[InstallationCandidate],
+ ) -> List[InstallationCandidate]:
+ """
+ Return the applicable candidates from a list of candidates.
+ """
+ # Using None infers from the specifier instead.
+ allow_prereleases = self._allow_all_prereleases or None
+ specifier = self._specifier
+ versions = {
+ str(v)
+ for v in specifier.filter(
+ # We turn the version object into a str here because otherwise
+ # when we're debundled but setuptools isn't, Python will see
+ # packaging.version.Version and
+ # pkg_resources._vendor.packaging.version.Version as different
+ # types. This way we'll use a str as a common data interchange
+ # format. If we stop using the pkg_resources provided specifier
+ # and start using our own, we can drop the cast to str().
+ (str(c.version) for c in candidates),
+ prereleases=allow_prereleases,
+ )
+ }
+
+ # Again, converting version to str to deal with debundling.
+ applicable_candidates = [c for c in candidates if str(c.version) in versions]
+
+ filtered_applicable_candidates = filter_unallowed_hashes(
+ candidates=applicable_candidates,
+ hashes=self._hashes,
+ project_name=self._project_name,
+ )
+
+ return sorted(filtered_applicable_candidates, key=self._sort_key)
+
+ def _sort_key(self, candidate: InstallationCandidate) -> CandidateSortingKey:
+ """
+ Function to pass as the `key` argument to a call to sorted() to sort
+ InstallationCandidates by preference.
+
+ Returns a tuple such that tuples sorting as greater using Python's
+ default comparison operator are more preferred.
+
+ The preference is as follows:
+
+ First and foremost, candidates with allowed (matching) hashes are
+ always preferred over candidates without matching hashes. This is
+ because e.g. if the only candidate with an allowed hash is yanked,
+ we still want to use that candidate.
+
+ Second, excepting hash considerations, candidates that have been
+ yanked (in the sense of PEP 592) are always less preferred than
+ candidates that haven't been yanked. Then:
+
+ If not finding wheels, they are sorted by version only.
+ If finding wheels, then the sort order is by version, then:
+ 1. existing installs
+ 2. wheels ordered via Wheel.support_index_min(self._supported_tags)
+ 3. source archives
+ If prefer_binary was set, then all wheels are sorted above sources.
+
+ Note: it was considered to embed this logic into the Link
+ comparison operators, but then different sdist links
+ with the same version, would have to be considered equal
+ """
+ valid_tags = self._supported_tags
+ support_num = len(valid_tags)
+ build_tag: BuildTag = ()
+ binary_preference = 0
+ link = candidate.link
+ if link.is_wheel:
+ # can raise InvalidWheelFilename
+ wheel = Wheel(link.filename)
+ try:
+ pri = -(
+ wheel.find_most_preferred_tag(
+ valid_tags, self._wheel_tag_preferences
+ )
+ )
+ except ValueError:
+ raise UnsupportedWheel(
+ f"{wheel.filename} is not a supported wheel for this platform. It "
+ "can't be sorted."
+ )
+ if self._prefer_binary:
+ binary_preference = 1
+ if wheel.build_tag is not None:
+ match = re.match(r"^(\d+)(.*)$", wheel.build_tag)
+ assert match is not None, "guaranteed by filename validation"
+ build_tag_groups = match.groups()
+ build_tag = (int(build_tag_groups[0]), build_tag_groups[1])
+ else: # sdist
+ pri = -(support_num)
+ has_allowed_hash = int(link.is_hash_allowed(self._hashes))
+ yank_value = -1 * int(link.is_yanked) # -1 for yanked.
+ return (
+ has_allowed_hash,
+ yank_value,
+ binary_preference,
+ candidate.version,
+ pri,
+ build_tag,
+ )
+
+ def sort_best_candidate(
+ self,
+ candidates: List[InstallationCandidate],
+ ) -> Optional[InstallationCandidate]:
+ """
+ Return the best candidate per the instance's sort order, or None if
+ no candidate is acceptable.
+ """
+ if not candidates:
+ return None
+ best_candidate = max(candidates, key=self._sort_key)
+ return best_candidate
+
+ def compute_best_candidate(
+ self,
+ candidates: List[InstallationCandidate],
+ ) -> BestCandidateResult:
+ """
+ Compute and return a `BestCandidateResult` instance.
+ """
+ applicable_candidates = self.get_applicable_candidates(candidates)
+
+ best_candidate = self.sort_best_candidate(applicable_candidates)
+
+ return BestCandidateResult(
+ candidates,
+ applicable_candidates=applicable_candidates,
+ best_candidate=best_candidate,
+ )
+
+
+class PackageFinder:
+ """This finds packages.
+
+ This is meant to match easy_install's technique for looking for
+ packages, by reading pages and looking for appropriate links.
+ """
+
+ def __init__(
+ self,
+ link_collector: LinkCollector,
+ target_python: TargetPython,
+ allow_yanked: bool,
+ format_control: Optional[FormatControl] = None,
+ candidate_prefs: Optional[CandidatePreferences] = None,
+ ignore_requires_python: Optional[bool] = None,
+ ) -> None:
+ """
+ This constructor is primarily meant to be used by the create() class
+ method and from tests.
+
+ :param format_control: A FormatControl object, used to control
+ the selection of source packages / binary packages when consulting
+ the index and links.
+ :param candidate_prefs: Options to use when creating a
+ CandidateEvaluator object.
+ """
+ if candidate_prefs is None:
+ candidate_prefs = CandidatePreferences()
+
+ format_control = format_control or FormatControl(set(), set())
+
+ self._allow_yanked = allow_yanked
+ self._candidate_prefs = candidate_prefs
+ self._ignore_requires_python = ignore_requires_python
+ self._link_collector = link_collector
+ self._target_python = target_python
+
+ self.format_control = format_control
+
+ # These are boring links that have already been logged somehow.
+ self._logged_links: Set[Tuple[Link, LinkType, str]] = set()
+
+ # Don't include an allow_yanked default value to make sure each call
+ # site considers whether yanked releases are allowed. This also causes
+ # that decision to be made explicit in the calling code, which helps
+ # people when reading the code.
+ @classmethod
+ def create(
+ cls,
+ link_collector: LinkCollector,
+ selection_prefs: SelectionPreferences,
+ target_python: Optional[TargetPython] = None,
+ ) -> "PackageFinder":
+ """Create a PackageFinder.
+
+ :param selection_prefs: The candidate selection preferences, as a
+ SelectionPreferences object.
+ :param target_python: The target Python interpreter to use when
+ checking compatibility. If None (the default), a TargetPython
+ object will be constructed from the running Python.
+ """
+ if target_python is None:
+ target_python = TargetPython()
+
+ candidate_prefs = CandidatePreferences(
+ prefer_binary=selection_prefs.prefer_binary,
+ allow_all_prereleases=selection_prefs.allow_all_prereleases,
+ )
+
+ return cls(
+ candidate_prefs=candidate_prefs,
+ link_collector=link_collector,
+ target_python=target_python,
+ allow_yanked=selection_prefs.allow_yanked,
+ format_control=selection_prefs.format_control,
+ ignore_requires_python=selection_prefs.ignore_requires_python,
+ )
+
+ @property
+ def target_python(self) -> TargetPython:
+ return self._target_python
+
+ @property
+ def search_scope(self) -> SearchScope:
+ return self._link_collector.search_scope
+
+ @search_scope.setter
+ def search_scope(self, search_scope: SearchScope) -> None:
+ self._link_collector.search_scope = search_scope
+
+ @property
+ def find_links(self) -> List[str]:
+ return self._link_collector.find_links
+
+ @property
+ def index_urls(self) -> List[str]:
+ return self.search_scope.index_urls
+
+ @property
+ def trusted_hosts(self) -> Iterable[str]:
+ for host_port in self._link_collector.session.pip_trusted_origins:
+ yield build_netloc(*host_port)
+
+ @property
+ def allow_all_prereleases(self) -> bool:
+ return self._candidate_prefs.allow_all_prereleases
+
+ def set_allow_all_prereleases(self) -> None:
+ self._candidate_prefs.allow_all_prereleases = True
+
+ @property
+ def prefer_binary(self) -> bool:
+ return self._candidate_prefs.prefer_binary
+
+ def set_prefer_binary(self) -> None:
+ self._candidate_prefs.prefer_binary = True
+
+ def requires_python_skipped_reasons(self) -> List[str]:
+ reasons = {
+ detail
+ for _, result, detail in self._logged_links
+ if result == LinkType.requires_python_mismatch
+ }
+ return sorted(reasons)
+
+ def make_link_evaluator(self, project_name: str) -> LinkEvaluator:
+ canonical_name = canonicalize_name(project_name)
+ formats = self.format_control.get_allowed_formats(canonical_name)
+
+ return LinkEvaluator(
+ project_name=project_name,
+ canonical_name=canonical_name,
+ formats=formats,
+ target_python=self._target_python,
+ allow_yanked=self._allow_yanked,
+ ignore_requires_python=self._ignore_requires_python,
+ )
+
+ def _sort_links(self, links: Iterable[Link]) -> List[Link]:
+ """
+ Returns elements of links in order, non-egg links first, egg links
+ second, while eliminating duplicates
+ """
+ eggs, no_eggs = [], []
+ seen: Set[Link] = set()
+ for link in links:
+ if link not in seen:
+ seen.add(link)
+ if link.egg_fragment:
+ eggs.append(link)
+ else:
+ no_eggs.append(link)
+ return no_eggs + eggs
+
+ def _log_skipped_link(self, link: Link, result: LinkType, detail: str) -> None:
+ entry = (link, result, detail)
+ if entry not in self._logged_links:
+ # Put the link at the end so the reason is more visible and because
+ # the link string is usually very long.
+ logger.debug("Skipping link: %s: %s", detail, link)
+ self._logged_links.add(entry)
+
+ def get_install_candidate(
+ self, link_evaluator: LinkEvaluator, link: Link
+ ) -> Optional[InstallationCandidate]:
+ """
+ If the link is a candidate for install, convert it to an
+ InstallationCandidate and return it. Otherwise, return None.
+ """
+ result, detail = link_evaluator.evaluate_link(link)
+ if result != LinkType.candidate:
+ self._log_skipped_link(link, result, detail)
+ return None
+
+ return InstallationCandidate(
+ name=link_evaluator.project_name,
+ link=link,
+ version=detail,
+ )
+
+ def evaluate_links(
+ self, link_evaluator: LinkEvaluator, links: Iterable[Link]
+ ) -> List[InstallationCandidate]:
+ """
+ Convert links that are candidates to InstallationCandidate objects.
+ """
+ candidates = []
+ for link in self._sort_links(links):
+ candidate = self.get_install_candidate(link_evaluator, link)
+ if candidate is not None:
+ candidates.append(candidate)
+
+ return candidates
+
+ def process_project_url(
+ self, project_url: Link, link_evaluator: LinkEvaluator
+ ) -> List[InstallationCandidate]:
+ logger.debug(
+ "Fetching project page and analyzing links: %s",
+ project_url,
+ )
+ index_response = self._link_collector.fetch_response(project_url)
+ if index_response is None:
+ return []
+
+ page_links = list(parse_links(index_response))
+
+ with indent_log():
+ package_links = self.evaluate_links(
+ link_evaluator,
+ links=page_links,
+ )
+
+ return package_links
+
+ @functools.lru_cache(maxsize=None)
+ def find_all_candidates(self, project_name: str) -> List[InstallationCandidate]:
+ """Find all available InstallationCandidate for project_name
+
+ This checks index_urls and find_links.
+ All versions found are returned as an InstallationCandidate list.
+
+ See LinkEvaluator.evaluate_link() for details on which files
+ are accepted.
+ """
+ link_evaluator = self.make_link_evaluator(project_name)
+
+ collected_sources = self._link_collector.collect_sources(
+ project_name=project_name,
+ candidates_from_page=functools.partial(
+ self.process_project_url,
+ link_evaluator=link_evaluator,
+ ),
+ )
+
+ page_candidates_it = itertools.chain.from_iterable(
+ source.page_candidates()
+ for sources in collected_sources
+ for source in sources
+ if source is not None
+ )
+ page_candidates = list(page_candidates_it)
+
+ file_links_it = itertools.chain.from_iterable(
+ source.file_links()
+ for sources in collected_sources
+ for source in sources
+ if source is not None
+ )
+ file_candidates = self.evaluate_links(
+ link_evaluator,
+ sorted(file_links_it, reverse=True),
+ )
+
+ if logger.isEnabledFor(logging.DEBUG) and file_candidates:
+ paths = []
+ for candidate in file_candidates:
+ assert candidate.link.url # we need to have a URL
+ try:
+ paths.append(candidate.link.file_path)
+ except Exception:
+ paths.append(candidate.link.url) # it's not a local file
+
+ logger.debug("Local files found: %s", ", ".join(paths))
+
+ # This is an intentional priority ordering
+ return file_candidates + page_candidates
+
+ def make_candidate_evaluator(
+ self,
+ project_name: str,
+ specifier: Optional[specifiers.BaseSpecifier] = None,
+ hashes: Optional[Hashes] = None,
+ ) -> CandidateEvaluator:
+ """Create a CandidateEvaluator object to use."""
+ candidate_prefs = self._candidate_prefs
+ return CandidateEvaluator.create(
+ project_name=project_name,
+ target_python=self._target_python,
+ prefer_binary=candidate_prefs.prefer_binary,
+ allow_all_prereleases=candidate_prefs.allow_all_prereleases,
+ specifier=specifier,
+ hashes=hashes,
+ )
+
+ @functools.lru_cache(maxsize=None)
+ def find_best_candidate(
+ self,
+ project_name: str,
+ specifier: Optional[specifiers.BaseSpecifier] = None,
+ hashes: Optional[Hashes] = None,
+ ) -> BestCandidateResult:
+ """Find matches for the given project and specifier.
+
+ :param specifier: An optional object implementing `filter`
+ (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
+ versions.
+
+ :return: A `BestCandidateResult` instance.
+ """
+ candidates = self.find_all_candidates(project_name)
+ candidate_evaluator = self.make_candidate_evaluator(
+ project_name=project_name,
+ specifier=specifier,
+ hashes=hashes,
+ )
+ return candidate_evaluator.compute_best_candidate(candidates)
+
+ def find_requirement(
+ self, req: InstallRequirement, upgrade: bool
+ ) -> Optional[InstallationCandidate]:
+ """Try to find a Link matching req
+
+ Expects req, an InstallRequirement and upgrade, a boolean
+ Returns a InstallationCandidate if found,
+ Raises DistributionNotFound or BestVersionAlreadyInstalled otherwise
+ """
+ hashes = req.hashes(trust_internet=False)
+ best_candidate_result = self.find_best_candidate(
+ req.name,
+ specifier=req.specifier,
+ hashes=hashes,
+ )
+ best_candidate = best_candidate_result.best_candidate
+
+ installed_version: Optional[_BaseVersion] = None
+ if req.satisfied_by is not None:
+ installed_version = req.satisfied_by.version
+
+ def _format_versions(cand_iter: Iterable[InstallationCandidate]) -> str:
+ # This repeated parse_version and str() conversion is needed to
+ # handle different vendoring sources from pip and pkg_resources.
+ # If we stop using the pkg_resources provided specifier and start
+ # using our own, we can drop the cast to str().
+ return (
+ ", ".join(
+ sorted(
+ {str(c.version) for c in cand_iter},
+ key=parse_version,
+ )
+ )
+ or "none"
+ )
+
+ if installed_version is None and best_candidate is None:
+ logger.critical(
+ "Could not find a version that satisfies the requirement %s "
+ "(from versions: %s)",
+ req,
+ _format_versions(best_candidate_result.iter_all()),
+ )
+
+ raise DistributionNotFound(f"No matching distribution found for {req}")
+
+ def _should_install_candidate(
+ candidate: Optional[InstallationCandidate],
+ ) -> "TypeGuard[InstallationCandidate]":
+ if installed_version is None:
+ return True
+ if best_candidate is None:
+ return False
+ return best_candidate.version > installed_version
+
+ if not upgrade and installed_version is not None:
+ if _should_install_candidate(best_candidate):
+ logger.debug(
+ "Existing installed version (%s) satisfies requirement "
+ "(most up-to-date version is %s)",
+ installed_version,
+ best_candidate.version,
+ )
+ else:
+ logger.debug(
+ "Existing installed version (%s) is most up-to-date and "
+ "satisfies requirement",
+ installed_version,
+ )
+ return None
+
+ if _should_install_candidate(best_candidate):
+ logger.debug(
+ "Using version %s (newest of versions: %s)",
+ best_candidate.version,
+ _format_versions(best_candidate_result.iter_applicable()),
+ )
+ return best_candidate
+
+ # We have an existing version, and its the best version
+ logger.debug(
+ "Installed version (%s) is most up-to-date (past versions: %s)",
+ installed_version,
+ _format_versions(best_candidate_result.iter_applicable()),
+ )
+ raise BestVersionAlreadyInstalled
+
+
+def _find_name_version_sep(fragment: str, canonical_name: str) -> int:
+ """Find the separator's index based on the package's canonical name.
+
+ :param fragment: A <package>+<version> filename "fragment" (stem) or
+ egg fragment.
+ :param canonical_name: The package's canonical name.
+
+ This function is needed since the canonicalized name does not necessarily
+ have the same length as the egg info's name part. An example::
+
+ >>> fragment = 'foo__bar-1.0'
+ >>> canonical_name = 'foo-bar'
+ >>> _find_name_version_sep(fragment, canonical_name)
+ 8
+ """
+ # Project name and version must be separated by one single dash. Find all
+ # occurrences of dashes; if the string in front of it matches the canonical
+ # name, this is the one separating the name and version parts.
+ for i, c in enumerate(fragment):
+ if c != "-":
+ continue
+ if canonicalize_name(fragment[:i]) == canonical_name:
+ return i
+ raise ValueError(f"{fragment} does not match {canonical_name}")
+
+
+def _extract_version_from_fragment(fragment: str, canonical_name: str) -> Optional[str]:
+ """Parse the version string from a <package>+<version> filename
+ "fragment" (stem) or egg fragment.
+
+ :param fragment: The string to parse. E.g. foo-2.1
+ :param canonical_name: The canonicalized name of the package this
+ belongs to.
+ """
+ try:
+ version_start = _find_name_version_sep(fragment, canonical_name) + 1
+ except ValueError:
+ return None
+ version = fragment[version_start:]
+ if not version:
+ return None
+ return version
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/sources.py b/venv/lib/python3.11/site-packages/pip/_internal/index/sources.py
new file mode 100644
index 0000000..f4626d7
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/index/sources.py
@@ -0,0 +1,285 @@
+import logging
+import mimetypes
+import os
+from collections import defaultdict
+from typing import Callable, Dict, Iterable, List, Optional, Tuple
+
+from pip._vendor.packaging.utils import (
+ InvalidSdistFilename,
+ InvalidVersion,
+ InvalidWheelFilename,
+ canonicalize_name,
+ parse_sdist_filename,
+ parse_wheel_filename,
+)
+
+from pip._internal.models.candidate import InstallationCandidate
+from pip._internal.models.link import Link
+from pip._internal.utils.urls import path_to_url, url_to_path
+from pip._internal.vcs import is_url
+
+logger = logging.getLogger(__name__)
+
+FoundCandidates = Iterable[InstallationCandidate]
+FoundLinks = Iterable[Link]
+CandidatesFromPage = Callable[[Link], Iterable[InstallationCandidate]]
+PageValidator = Callable[[Link], bool]
+
+
+class LinkSource:
+ @property
+ def link(self) -> Optional[Link]:
+ """Returns the underlying link, if there's one."""
+ raise NotImplementedError()
+
+ def page_candidates(self) -> FoundCandidates:
+ """Candidates found by parsing an archive listing HTML file."""
+ raise NotImplementedError()
+
+ def file_links(self) -> FoundLinks:
+ """Links found by specifying archives directly."""
+ raise NotImplementedError()
+
+
+def _is_html_file(file_url: str) -> bool:
+ return mimetypes.guess_type(file_url, strict=False)[0] == "text/html"
+
+
+class _FlatDirectoryToUrls:
+ """Scans directory and caches results"""
+
+ def __init__(self, path: str) -> None:
+ self._path = path
+ self._page_candidates: List[str] = []
+ self._project_name_to_urls: Dict[str, List[str]] = defaultdict(list)
+ self._scanned_directory = False
+
+ def _scan_directory(self) -> None:
+ """Scans directory once and populates both page_candidates
+ and project_name_to_urls at the same time
+ """
+ for entry in os.scandir(self._path):
+ url = path_to_url(entry.path)
+ if _is_html_file(url):
+ self._page_candidates.append(url)
+ continue
+
+ # File must have a valid wheel or sdist name,
+ # otherwise not worth considering as a package
+ try:
+ project_filename = parse_wheel_filename(entry.name)[0]
+ except (InvalidWheelFilename, InvalidVersion):
+ try:
+ project_filename = parse_sdist_filename(entry.name)[0]
+ except (InvalidSdistFilename, InvalidVersion):
+ continue
+
+ self._project_name_to_urls[project_filename].append(url)
+ self._scanned_directory = True
+
+ @property
+ def page_candidates(self) -> List[str]:
+ if not self._scanned_directory:
+ self._scan_directory()
+
+ return self._page_candidates
+
+ @property
+ def project_name_to_urls(self) -> Dict[str, List[str]]:
+ if not self._scanned_directory:
+ self._scan_directory()
+
+ return self._project_name_to_urls
+
+
+class _FlatDirectorySource(LinkSource):
+ """Link source specified by ``--find-links=<path-to-dir>``.
+
+ This looks the content of the directory, and returns:
+
+ * ``page_candidates``: Links listed on each HTML file in the directory.
+ * ``file_candidates``: Archives in the directory.
+ """
+
+ _paths_to_urls: Dict[str, _FlatDirectoryToUrls] = {}
+
+ def __init__(
+ self,
+ candidates_from_page: CandidatesFromPage,
+ path: str,
+ project_name: str,
+ ) -> None:
+ self._candidates_from_page = candidates_from_page
+ self._project_name = canonicalize_name(project_name)
+
+ # Get existing instance of _FlatDirectoryToUrls if it exists
+ if path in self._paths_to_urls:
+ self._path_to_urls = self._paths_to_urls[path]
+ else:
+ self._path_to_urls = _FlatDirectoryToUrls(path=path)
+ self._paths_to_urls[path] = self._path_to_urls
+
+ @property
+ def link(self) -> Optional[Link]:
+ return None
+
+ def page_candidates(self) -> FoundCandidates:
+ for url in self._path_to_urls.page_candidates:
+ yield from self._candidates_from_page(Link(url))
+
+ def file_links(self) -> FoundLinks:
+ for url in self._path_to_urls.project_name_to_urls[self._project_name]:
+ yield Link(url)
+
+
+class _LocalFileSource(LinkSource):
+ """``--find-links=<path-or-url>`` or ``--[extra-]index-url=<path-or-url>``.
+
+ If a URL is supplied, it must be a ``file:`` URL. If a path is supplied to
+ the option, it is converted to a URL first. This returns:
+
+ * ``page_candidates``: Links listed on an HTML file.
+ * ``file_candidates``: The non-HTML file.
+ """
+
+ def __init__(
+ self,
+ candidates_from_page: CandidatesFromPage,
+ link: Link,
+ ) -> None:
+ self._candidates_from_page = candidates_from_page
+ self._link = link
+
+ @property
+ def link(self) -> Optional[Link]:
+ return self._link
+
+ def page_candidates(self) -> FoundCandidates:
+ if not _is_html_file(self._link.url):
+ return
+ yield from self._candidates_from_page(self._link)
+
+ def file_links(self) -> FoundLinks:
+ if _is_html_file(self._link.url):
+ return
+ yield self._link
+
+
+class _RemoteFileSource(LinkSource):
+ """``--find-links=<url>`` or ``--[extra-]index-url=<url>``.
+
+ This returns:
+
+ * ``page_candidates``: Links listed on an HTML file.
+ * ``file_candidates``: The non-HTML file.
+ """
+
+ def __init__(
+ self,
+ candidates_from_page: CandidatesFromPage,
+ page_validator: PageValidator,
+ link: Link,
+ ) -> None:
+ self._candidates_from_page = candidates_from_page
+ self._page_validator = page_validator
+ self._link = link
+
+ @property
+ def link(self) -> Optional[Link]:
+ return self._link
+
+ def page_candidates(self) -> FoundCandidates:
+ if not self._page_validator(self._link):
+ return
+ yield from self._candidates_from_page(self._link)
+
+ def file_links(self) -> FoundLinks:
+ yield self._link
+
+
+class _IndexDirectorySource(LinkSource):
+ """``--[extra-]index-url=<path-to-directory>``.
+
+ This is treated like a remote URL; ``candidates_from_page`` contains logic
+ for this by appending ``index.html`` to the link.
+ """
+
+ def __init__(
+ self,
+ candidates_from_page: CandidatesFromPage,
+ link: Link,
+ ) -> None:
+ self._candidates_from_page = candidates_from_page
+ self._link = link
+
+ @property
+ def link(self) -> Optional[Link]:
+ return self._link
+
+ def page_candidates(self) -> FoundCandidates:
+ yield from self._candidates_from_page(self._link)
+
+ def file_links(self) -> FoundLinks:
+ return ()
+
+
+def build_source(
+ location: str,
+ *,
+ candidates_from_page: CandidatesFromPage,
+ page_validator: PageValidator,
+ expand_dir: bool,
+ cache_link_parsing: bool,
+ project_name: str,
+) -> Tuple[Optional[str], Optional[LinkSource]]:
+ path: Optional[str] = None
+ url: Optional[str] = None
+ if os.path.exists(location): # Is a local path.
+ url = path_to_url(location)
+ path = location
+ elif location.startswith("file:"): # A file: URL.
+ url = location
+ path = url_to_path(location)
+ elif is_url(location):
+ url = location
+
+ if url is None:
+ msg = (
+ "Location '%s' is ignored: "
+ "it is either a non-existing path or lacks a specific scheme."
+ )
+ logger.warning(msg, location)
+ return (None, None)
+
+ if path is None:
+ source: LinkSource = _RemoteFileSource(
+ candidates_from_page=candidates_from_page,
+ page_validator=page_validator,
+ link=Link(url, cache_link_parsing=cache_link_parsing),
+ )
+ return (url, source)
+
+ if os.path.isdir(path):
+ if expand_dir:
+ source = _FlatDirectorySource(
+ candidates_from_page=candidates_from_page,
+ path=path,
+ project_name=project_name,
+ )
+ else:
+ source = _IndexDirectorySource(
+ candidates_from_page=candidates_from_page,
+ link=Link(url, cache_link_parsing=cache_link_parsing),
+ )
+ return (url, source)
+ elif os.path.isfile(path):
+ source = _LocalFileSource(
+ candidates_from_page=candidates_from_page,
+ link=Link(url, cache_link_parsing=cache_link_parsing),
+ )
+ return (url, source)
+ logger.warning(
+ "Location '%s' is ignored: it is neither a file nor a directory.",
+ location,
+ )
+ return (url, None)