diff options
author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 |
---|---|---|
committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 |
commit | 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch) | |
tree | b1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/pip/_internal/index | |
parent | 4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff) |
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/pip/_internal/index')
-rw-r--r-- | venv/lib/python3.11/site-packages/pip/_internal/index/__init__.py | 2 | ||||
-rw-r--r-- | venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/__init__.cpython-311.pyc | bin | 0 -> 248 bytes | |||
-rw-r--r-- | venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/collector.cpython-311.pyc | bin | 0 -> 24593 bytes | |||
-rw-r--r-- | venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/package_finder.cpython-311.pyc | bin | 0 -> 44157 bytes | |||
-rw-r--r-- | venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/sources.cpython-311.pyc | bin | 0 -> 13964 bytes | |||
-rw-r--r-- | venv/lib/python3.11/site-packages/pip/_internal/index/collector.py | 507 | ||||
-rw-r--r-- | venv/lib/python3.11/site-packages/pip/_internal/index/package_finder.py | 1027 | ||||
-rw-r--r-- | venv/lib/python3.11/site-packages/pip/_internal/index/sources.py | 285 |
8 files changed, 1821 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/__init__.py b/venv/lib/python3.11/site-packages/pip/_internal/index/__init__.py new file mode 100644 index 0000000..7a17b7b --- /dev/null +++ b/venv/lib/python3.11/site-packages/pip/_internal/index/__init__.py @@ -0,0 +1,2 @@ +"""Index interaction code +""" diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..446ac36 --- /dev/null +++ b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/collector.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/collector.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..a1dc4c1 --- /dev/null +++ b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/collector.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/package_finder.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/package_finder.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..8379671 --- /dev/null +++ b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/package_finder.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/sources.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/sources.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..e7e9b99 --- /dev/null +++ b/venv/lib/python3.11/site-packages/pip/_internal/index/__pycache__/sources.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/collector.py b/venv/lib/python3.11/site-packages/pip/_internal/index/collector.py new file mode 100644 index 0000000..08c8bdd --- /dev/null +++ b/venv/lib/python3.11/site-packages/pip/_internal/index/collector.py @@ -0,0 +1,507 @@ +""" +The main purpose of this module is to expose LinkCollector.collect_sources(). +""" + +import collections +import email.message +import functools +import itertools +import json +import logging +import os +import urllib.parse +import urllib.request +from html.parser import HTMLParser +from optparse import Values +from typing import ( + TYPE_CHECKING, + Callable, + Dict, + Iterable, + List, + MutableMapping, + NamedTuple, + Optional, + Sequence, + Tuple, + Union, +) + +from pip._vendor import requests +from pip._vendor.requests import Response +from pip._vendor.requests.exceptions import RetryError, SSLError + +from pip._internal.exceptions import NetworkConnectionError +from pip._internal.models.link import Link +from pip._internal.models.search_scope import SearchScope +from pip._internal.network.session import PipSession +from pip._internal.network.utils import raise_for_status +from pip._internal.utils.filetypes import is_archive_file +from pip._internal.utils.misc import redact_auth_from_url +from pip._internal.vcs import vcs + +from .sources import CandidatesFromPage, LinkSource, build_source + +if TYPE_CHECKING: + from typing import Protocol +else: + Protocol = object + +logger = logging.getLogger(__name__) + +ResponseHeaders = MutableMapping[str, str] + + +def _match_vcs_scheme(url: str) -> Optional[str]: + """Look for VCS schemes in the URL. + + Returns the matched VCS scheme, or None if there's no match. + """ + for scheme in vcs.schemes: + if url.lower().startswith(scheme) and url[len(scheme)] in "+:": + return scheme + return None + + +class _NotAPIContent(Exception): + def __init__(self, content_type: str, request_desc: str) -> None: + super().__init__(content_type, request_desc) + self.content_type = content_type + self.request_desc = request_desc + + +def _ensure_api_header(response: Response) -> None: + """ + Check the Content-Type header to ensure the response contains a Simple + API Response. + + Raises `_NotAPIContent` if the content type is not a valid content-type. + """ + content_type = response.headers.get("Content-Type", "Unknown") + + content_type_l = content_type.lower() + if content_type_l.startswith( + ( + "text/html", + "application/vnd.pypi.simple.v1+html", + "application/vnd.pypi.simple.v1+json", + ) + ): + return + + raise _NotAPIContent(content_type, response.request.method) + + +class _NotHTTP(Exception): + pass + + +def _ensure_api_response(url: str, session: PipSession) -> None: + """ + Send a HEAD request to the URL, and ensure the response contains a simple + API Response. + + Raises `_NotHTTP` if the URL is not available for a HEAD request, or + `_NotAPIContent` if the content type is not a valid content type. + """ + scheme, netloc, path, query, fragment = urllib.parse.urlsplit(url) + if scheme not in {"http", "https"}: + raise _NotHTTP() + + resp = session.head(url, allow_redirects=True) + raise_for_status(resp) + + _ensure_api_header(resp) + + +def _get_simple_response(url: str, session: PipSession) -> Response: + """Access an Simple API response with GET, and return the response. + + This consists of three parts: + + 1. If the URL looks suspiciously like an archive, send a HEAD first to + check the Content-Type is HTML or Simple API, to avoid downloading a + large file. Raise `_NotHTTP` if the content type cannot be determined, or + `_NotAPIContent` if it is not HTML or a Simple API. + 2. Actually perform the request. Raise HTTP exceptions on network failures. + 3. Check the Content-Type header to make sure we got a Simple API response, + and raise `_NotAPIContent` otherwise. + """ + if is_archive_file(Link(url).filename): + _ensure_api_response(url, session=session) + + logger.debug("Getting page %s", redact_auth_from_url(url)) + + resp = session.get( + url, + headers={ + "Accept": ", ".join( + [ + "application/vnd.pypi.simple.v1+json", + "application/vnd.pypi.simple.v1+html; q=0.1", + "text/html; q=0.01", + ] + ), + # We don't want to blindly returned cached data for + # /simple/, because authors generally expecting that + # twine upload && pip install will function, but if + # they've done a pip install in the last ~10 minutes + # it won't. Thus by setting this to zero we will not + # blindly use any cached data, however the benefit of + # using max-age=0 instead of no-cache, is that we will + # still support conditional requests, so we will still + # minimize traffic sent in cases where the page hasn't + # changed at all, we will just always incur the round + # trip for the conditional GET now instead of only + # once per 10 minutes. + # For more information, please see pypa/pip#5670. + "Cache-Control": "max-age=0", + }, + ) + raise_for_status(resp) + + # The check for archives above only works if the url ends with + # something that looks like an archive. However that is not a + # requirement of an url. Unless we issue a HEAD request on every + # url we cannot know ahead of time for sure if something is a + # Simple API response or not. However we can check after we've + # downloaded it. + _ensure_api_header(resp) + + logger.debug( + "Fetched page %s as %s", + redact_auth_from_url(url), + resp.headers.get("Content-Type", "Unknown"), + ) + + return resp + + +def _get_encoding_from_headers(headers: ResponseHeaders) -> Optional[str]: + """Determine if we have any encoding information in our headers.""" + if headers and "Content-Type" in headers: + m = email.message.Message() + m["content-type"] = headers["Content-Type"] + charset = m.get_param("charset") + if charset: + return str(charset) + return None + + +class CacheablePageContent: + def __init__(self, page: "IndexContent") -> None: + assert page.cache_link_parsing + self.page = page + + def __eq__(self, other: object) -> bool: + return isinstance(other, type(self)) and self.page.url == other.page.url + + def __hash__(self) -> int: + return hash(self.page.url) + + +class ParseLinks(Protocol): + def __call__(self, page: "IndexContent") -> Iterable[Link]: + ... + + +def with_cached_index_content(fn: ParseLinks) -> ParseLinks: + """ + Given a function that parses an Iterable[Link] from an IndexContent, cache the + function's result (keyed by CacheablePageContent), unless the IndexContent + `page` has `page.cache_link_parsing == False`. + """ + + @functools.lru_cache(maxsize=None) + def wrapper(cacheable_page: CacheablePageContent) -> List[Link]: + return list(fn(cacheable_page.page)) + + @functools.wraps(fn) + def wrapper_wrapper(page: "IndexContent") -> List[Link]: + if page.cache_link_parsing: + return wrapper(CacheablePageContent(page)) + return list(fn(page)) + + return wrapper_wrapper + + +@with_cached_index_content +def parse_links(page: "IndexContent") -> Iterable[Link]: + """ + Parse a Simple API's Index Content, and yield its anchor elements as Link objects. + """ + + content_type_l = page.content_type.lower() + if content_type_l.startswith("application/vnd.pypi.simple.v1+json"): + data = json.loads(page.content) + for file in data.get("files", []): + link = Link.from_json(file, page.url) + if link is None: + continue + yield link + return + + parser = HTMLLinkParser(page.url) + encoding = page.encoding or "utf-8" + parser.feed(page.content.decode(encoding)) + + url = page.url + base_url = parser.base_url or url + for anchor in parser.anchors: + link = Link.from_element(anchor, page_url=url, base_url=base_url) + if link is None: + continue + yield link + + +class IndexContent: + """Represents one response (or page), along with its URL""" + + def __init__( + self, + content: bytes, + content_type: str, + encoding: Optional[str], + url: str, + cache_link_parsing: bool = True, + ) -> None: + """ + :param encoding: the encoding to decode the given content. + :param url: the URL from which the HTML was downloaded. + :param cache_link_parsing: whether links parsed from this page's url + should be cached. PyPI index urls should + have this set to False, for example. + """ + self.content = content + self.content_type = content_type + self.encoding = encoding + self.url = url + self.cache_link_parsing = cache_link_parsing + + def __str__(self) -> str: + return redact_auth_from_url(self.url) + + +class HTMLLinkParser(HTMLParser): + """ + HTMLParser that keeps the first base HREF and a list of all anchor + elements' attributes. + """ + + def __init__(self, url: str) -> None: + super().__init__(convert_charrefs=True) + + self.url: str = url + self.base_url: Optional[str] = None + self.anchors: List[Dict[str, Optional[str]]] = [] + + def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None: + if tag == "base" and self.base_url is None: + href = self.get_href(attrs) + if href is not None: + self.base_url = href + elif tag == "a": + self.anchors.append(dict(attrs)) + + def get_href(self, attrs: List[Tuple[str, Optional[str]]]) -> Optional[str]: + for name, value in attrs: + if name == "href": + return value + return None + + +def _handle_get_simple_fail( + link: Link, + reason: Union[str, Exception], + meth: Optional[Callable[..., None]] = None, +) -> None: + if meth is None: + meth = logger.debug + meth("Could not fetch URL %s: %s - skipping", link, reason) + + +def _make_index_content( + response: Response, cache_link_parsing: bool = True +) -> IndexContent: + encoding = _get_encoding_from_headers(response.headers) + return IndexContent( + response.content, + response.headers["Content-Type"], + encoding=encoding, + url=response.url, + cache_link_parsing=cache_link_parsing, + ) + + +def _get_index_content(link: Link, *, session: PipSession) -> Optional["IndexContent"]: + url = link.url.split("#", 1)[0] + + # Check for VCS schemes that do not support lookup as web pages. + vcs_scheme = _match_vcs_scheme(url) + if vcs_scheme: + logger.warning( + "Cannot look at %s URL %s because it does not support lookup as web pages.", + vcs_scheme, + link, + ) + return None + + # Tack index.html onto file:// URLs that point to directories + scheme, _, path, _, _, _ = urllib.parse.urlparse(url) + if scheme == "file" and os.path.isdir(urllib.request.url2pathname(path)): + # add trailing slash if not present so urljoin doesn't trim + # final segment + if not url.endswith("/"): + url += "/" + # TODO: In the future, it would be nice if pip supported PEP 691 + # style responses in the file:// URLs, however there's no + # standard file extension for application/vnd.pypi.simple.v1+json + # so we'll need to come up with something on our own. + url = urllib.parse.urljoin(url, "index.html") + logger.debug(" file: URL is directory, getting %s", url) + + try: + resp = _get_simple_response(url, session=session) + except _NotHTTP: + logger.warning( + "Skipping page %s because it looks like an archive, and cannot " + "be checked by a HTTP HEAD request.", + link, + ) + except _NotAPIContent as exc: + logger.warning( + "Skipping page %s because the %s request got Content-Type: %s. " + "The only supported Content-Types are application/vnd.pypi.simple.v1+json, " + "application/vnd.pypi.simple.v1+html, and text/html", + link, + exc.request_desc, + exc.content_type, + ) + except NetworkConnectionError as exc: + _handle_get_simple_fail(link, exc) + except RetryError as exc: + _handle_get_simple_fail(link, exc) + except SSLError as exc: + reason = "There was a problem confirming the ssl certificate: " + reason += str(exc) + _handle_get_simple_fail(link, reason, meth=logger.info) + except requests.ConnectionError as exc: + _handle_get_simple_fail(link, f"connection error: {exc}") + except requests.Timeout: + _handle_get_simple_fail(link, "timed out") + else: + return _make_index_content(resp, cache_link_parsing=link.cache_link_parsing) + return None + + +class CollectedSources(NamedTuple): + find_links: Sequence[Optional[LinkSource]] + index_urls: Sequence[Optional[LinkSource]] + + +class LinkCollector: + + """ + Responsible for collecting Link objects from all configured locations, + making network requests as needed. + + The class's main method is its collect_sources() method. + """ + + def __init__( + self, + session: PipSession, + search_scope: SearchScope, + ) -> None: + self.search_scope = search_scope + self.session = session + + @classmethod + def create( + cls, + session: PipSession, + options: Values, + suppress_no_index: bool = False, + ) -> "LinkCollector": + """ + :param session: The Session to use to make requests. + :param suppress_no_index: Whether to ignore the --no-index option + when constructing the SearchScope object. + """ + index_urls = [options.index_url] + options.extra_index_urls + if options.no_index and not suppress_no_index: + logger.debug( + "Ignoring indexes: %s", + ",".join(redact_auth_from_url(url) for url in index_urls), + ) + index_urls = [] + + # Make sure find_links is a list before passing to create(). + find_links = options.find_links or [] + + search_scope = SearchScope.create( + find_links=find_links, + index_urls=index_urls, + no_index=options.no_index, + ) + link_collector = LinkCollector( + session=session, + search_scope=search_scope, + ) + return link_collector + + @property + def find_links(self) -> List[str]: + return self.search_scope.find_links + + def fetch_response(self, location: Link) -> Optional[IndexContent]: + """ + Fetch an HTML page containing package links. + """ + return _get_index_content(location, session=self.session) + + def collect_sources( + self, + project_name: str, + candidates_from_page: CandidatesFromPage, + ) -> CollectedSources: + # The OrderedDict calls deduplicate sources by URL. + index_url_sources = collections.OrderedDict( + build_source( + loc, + candidates_from_page=candidates_from_page, + page_validator=self.session.is_secure_origin, + expand_dir=False, + cache_link_parsing=False, + project_name=project_name, + ) + for loc in self.search_scope.get_index_urls_locations(project_name) + ).values() + find_links_sources = collections.OrderedDict( + build_source( + loc, + candidates_from_page=candidates_from_page, + page_validator=self.session.is_secure_origin, + expand_dir=True, + cache_link_parsing=True, + project_name=project_name, + ) + for loc in self.find_links + ).values() + + if logger.isEnabledFor(logging.DEBUG): + lines = [ + f"* {s.link}" + for s in itertools.chain(find_links_sources, index_url_sources) + if s is not None and s.link is not None + ] + lines = [ + f"{len(lines)} location(s) to search " + f"for versions of {project_name}:" + ] + lines + logger.debug("\n".join(lines)) + + return CollectedSources( + find_links=list(find_links_sources), + index_urls=list(index_url_sources), + ) diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/package_finder.py b/venv/lib/python3.11/site-packages/pip/_internal/index/package_finder.py new file mode 100644 index 0000000..ec9ebc3 --- /dev/null +++ b/venv/lib/python3.11/site-packages/pip/_internal/index/package_finder.py @@ -0,0 +1,1027 @@ +"""Routines related to PyPI, indexes""" + +import enum +import functools +import itertools +import logging +import re +from typing import TYPE_CHECKING, FrozenSet, Iterable, List, Optional, Set, Tuple, Union + +from pip._vendor.packaging import specifiers +from pip._vendor.packaging.tags import Tag +from pip._vendor.packaging.utils import canonicalize_name +from pip._vendor.packaging.version import _BaseVersion +from pip._vendor.packaging.version import parse as parse_version + +from pip._internal.exceptions import ( + BestVersionAlreadyInstalled, + DistributionNotFound, + InvalidWheelFilename, + UnsupportedWheel, +) +from pip._internal.index.collector import LinkCollector, parse_links +from pip._internal.models.candidate import InstallationCandidate +from pip._internal.models.format_control import FormatControl +from pip._internal.models.link import Link +from pip._internal.models.search_scope import SearchScope +from pip._internal.models.selection_prefs import SelectionPreferences +from pip._internal.models.target_python import TargetPython +from pip._internal.models.wheel import Wheel +from pip._internal.req import InstallRequirement +from pip._internal.utils._log import getLogger +from pip._internal.utils.filetypes import WHEEL_EXTENSION +from pip._internal.utils.hashes import Hashes +from pip._internal.utils.logging import indent_log +from pip._internal.utils.misc import build_netloc +from pip._internal.utils.packaging import check_requires_python +from pip._internal.utils.unpacking import SUPPORTED_EXTENSIONS + +if TYPE_CHECKING: + from pip._vendor.typing_extensions import TypeGuard + +__all__ = ["FormatControl", "BestCandidateResult", "PackageFinder"] + + +logger = getLogger(__name__) + +BuildTag = Union[Tuple[()], Tuple[int, str]] +CandidateSortingKey = Tuple[int, int, int, _BaseVersion, Optional[int], BuildTag] + + +def _check_link_requires_python( + link: Link, + version_info: Tuple[int, int, int], + ignore_requires_python: bool = False, +) -> bool: + """ + Return whether the given Python version is compatible with a link's + "Requires-Python" value. + + :param version_info: A 3-tuple of ints representing the Python + major-minor-micro version to check. + :param ignore_requires_python: Whether to ignore the "Requires-Python" + value if the given Python version isn't compatible. + """ + try: + is_compatible = check_requires_python( + link.requires_python, + version_info=version_info, + ) + except specifiers.InvalidSpecifier: + logger.debug( + "Ignoring invalid Requires-Python (%r) for link: %s", + link.requires_python, + link, + ) + else: + if not is_compatible: + version = ".".join(map(str, version_info)) + if not ignore_requires_python: + logger.verbose( + "Link requires a different Python (%s not in: %r): %s", + version, + link.requires_python, + link, + ) + return False + + logger.debug( + "Ignoring failed Requires-Python check (%s not in: %r) for link: %s", + version, + link.requires_python, + link, + ) + + return True + + +class LinkType(enum.Enum): + candidate = enum.auto() + different_project = enum.auto() + yanked = enum.auto() + format_unsupported = enum.auto() + format_invalid = enum.auto() + platform_mismatch = enum.auto() + requires_python_mismatch = enum.auto() + + +class LinkEvaluator: + + """ + Responsible for evaluating links for a particular project. + """ + + _py_version_re = re.compile(r"-py([123]\.?[0-9]?)$") + + # Don't include an allow_yanked default value to make sure each call + # site considers whether yanked releases are allowed. This also causes + # that decision to be made explicit in the calling code, which helps + # people when reading the code. + def __init__( + self, + project_name: str, + canonical_name: str, + formats: FrozenSet[str], + target_python: TargetPython, + allow_yanked: bool, + ignore_requires_python: Optional[bool] = None, + ) -> None: + """ + :param project_name: The user supplied package name. + :param canonical_name: The canonical package name. + :param formats: The formats allowed for this package. Should be a set + with 'binary' or 'source' or both in it. + :param target_python: The target Python interpreter to use when + evaluating link compatibility. This is used, for example, to + check wheel compatibility, as well as when checking the Python + version, e.g. the Python version embedded in a link filename + (or egg fragment) and against an HTML link's optional PEP 503 + "data-requires-python" attribute. + :param allow_yanked: Whether files marked as yanked (in the sense + of PEP 592) are permitted to be candidates for install. + :param ignore_requires_python: Whether to ignore incompatible + PEP 503 "data-requires-python" values in HTML links. Defaults + to False. + """ + if ignore_requires_python is None: + ignore_requires_python = False + + self._allow_yanked = allow_yanked + self._canonical_name = canonical_name + self._ignore_requires_python = ignore_requires_python + self._formats = formats + self._target_python = target_python + + self.project_name = project_name + + def evaluate_link(self, link: Link) -> Tuple[LinkType, str]: + """ + Determine whether a link is a candidate for installation. + + :return: A tuple (result, detail), where *result* is an enum + representing whether the evaluation found a candidate, or the reason + why one is not found. If a candidate is found, *detail* will be the + candidate's version string; if one is not found, it contains the + reason the link fails to qualify. + """ + version = None + if link.is_yanked and not self._allow_yanked: + reason = link.yanked_reason or "<none given>" + return (LinkType.yanked, f"yanked for reason: {reason}") + + if link.egg_fragment: + egg_info = link.egg_fragment + ext = link.ext + else: + egg_info, ext = link.splitext() + if not ext: + return (LinkType.format_unsupported, "not a file") + if ext not in SUPPORTED_EXTENSIONS: + return ( + LinkType.format_unsupported, + f"unsupported archive format: {ext}", + ) + if "binary" not in self._formats and ext == WHEEL_EXTENSION: + reason = f"No binaries permitted for {self.project_name}" + return (LinkType.format_unsupported, reason) + if "macosx10" in link.path and ext == ".zip": + return (LinkType.format_unsupported, "macosx10 one") + if ext == WHEEL_EXTENSION: + try: + wheel = Wheel(link.filename) + except InvalidWheelFilename: + return ( + LinkType.format_invalid, + "invalid wheel filename", + ) + if canonicalize_name(wheel.name) != self._canonical_name: + reason = f"wrong project name (not {self.project_name})" + return (LinkType.different_project, reason) + + supported_tags = self._target_python.get_unsorted_tags() + if not wheel.supported(supported_tags): + # Include the wheel's tags in the reason string to + # simplify troubleshooting compatibility issues. + file_tags = ", ".join(wheel.get_formatted_file_tags()) + reason = ( + f"none of the wheel's tags ({file_tags}) are compatible " + f"(run pip debug --verbose to show compatible tags)" + ) + return (LinkType.platform_mismatch, reason) + + version = wheel.version + + # This should be up by the self.ok_binary check, but see issue 2700. + if "source" not in self._formats and ext != WHEEL_EXTENSION: + reason = f"No sources permitted for {self.project_name}" + return (LinkType.format_unsupported, reason) + + if not version: + version = _extract_version_from_fragment( + egg_info, + self._canonical_name, + ) + if not version: + reason = f"Missing project version for {self.project_name}" + return (LinkType.format_invalid, reason) + + match = self._py_version_re.search(version) + if match: + version = version[: match.start()] + py_version = match.group(1) + if py_version != self._target_python.py_version: + return ( + LinkType.platform_mismatch, + "Python version is incorrect", + ) + + supports_python = _check_link_requires_python( + link, + version_info=self._target_python.py_version_info, + ignore_requires_python=self._ignore_requires_python, + ) + if not supports_python: + reason = f"{version} Requires-Python {link.requires_python}" + return (LinkType.requires_python_mismatch, reason) + + logger.debug("Found link %s, version: %s", link, version) + + return (LinkType.candidate, version) + + +def filter_unallowed_hashes( + candidates: List[InstallationCandidate], + hashes: Optional[Hashes], + project_name: str, +) -> List[InstallationCandidate]: + """ + Filter out candidates whose hashes aren't allowed, and return a new + list of candidates. + + If at least one candidate has an allowed hash, then all candidates with + either an allowed hash or no hash specified are returned. Otherwise, + the given candidates are returned. + + Including the candidates with no hash specified when there is a match + allows a warning to be logged if there is a more preferred candidate + with no hash specified. Returning all candidates in the case of no + matches lets pip report the hash of the candidate that would otherwise + have been installed (e.g. permitting the user to more easily update + their requirements file with the desired hash). + """ + if not hashes: + logger.debug( + "Given no hashes to check %s links for project %r: " + "discarding no candidates", + len(candidates), + project_name, + ) + # Make sure we're not returning back the given value. + return list(candidates) + + matches_or_no_digest = [] + # Collect the non-matches for logging purposes. + non_matches = [] + match_count = 0 + for candidate in candidates: + link = candidate.link + if not link.has_hash: + pass + elif link.is_hash_allowed(hashes=hashes): + match_count += 1 + else: + non_matches.append(candidate) + continue + + matches_or_no_digest.append(candidate) + + if match_count: + filtered = matches_or_no_digest + else: + # Make sure we're not returning back the given value. + filtered = list(candidates) + + if len(filtered) == len(candidates): + discard_message = "discarding no candidates" + else: + discard_message = "discarding {} non-matches:\n {}".format( + len(non_matches), + "\n ".join(str(candidate.link) for candidate in non_matches), + ) + + logger.debug( + "Checked %s links for project %r against %s hashes " + "(%s matches, %s no digest): %s", + len(candidates), + project_name, + hashes.digest_count, + match_count, + len(matches_or_no_digest) - match_count, + discard_message, + ) + + return filtered + + +class CandidatePreferences: + + """ + Encapsulates some of the preferences for filtering and sorting + InstallationCandidate objects. + """ + + def __init__( + self, + prefer_binary: bool = False, + allow_all_prereleases: bool = False, + ) -> None: + """ + :param allow_all_prereleases: Whether to allow all pre-releases. + """ + self.allow_all_prereleases = allow_all_prereleases + self.prefer_binary = prefer_binary + + +class BestCandidateResult: + """A collection of candidates, returned by `PackageFinder.find_best_candidate`. + + This class is only intended to be instantiated by CandidateEvaluator's + `compute_best_candidate()` method. + """ + + def __init__( + self, + candidates: List[InstallationCandidate], + applicable_candidates: List[InstallationCandidate], + best_candidate: Optional[InstallationCandidate], + ) -> None: + """ + :param candidates: A sequence of all available candidates found. + :param applicable_candidates: The applicable candidates. + :param best_candidate: The most preferred candidate found, or None + if no applicable candidates were found. + """ + assert set(applicable_candidates) <= set(candidates) + + if best_candidate is None: + assert not applicable_candidates + else: + assert best_candidate in applicable_candidates + + self._applicable_candidates = applicable_candidates + self._candidates = candidates + + self.best_candidate = best_candidate + + def iter_all(self) -> Iterable[InstallationCandidate]: + """Iterate through all candidates.""" + return iter(self._candidates) + + def iter_applicable(self) -> Iterable[InstallationCandidate]: + """Iterate through the applicable candidates.""" + return iter(self._applicable_candidates) + + +class CandidateEvaluator: + + """ + Responsible for filtering and sorting candidates for installation based + on what tags are valid. + """ + + @classmethod + def create( + cls, + project_name: str, + target_python: Optional[TargetPython] = None, + prefer_binary: bool = False, + allow_all_prereleases: bool = False, + specifier: Optional[specifiers.BaseSpecifier] = None, + hashes: Optional[Hashes] = None, + ) -> "CandidateEvaluator": + """Create a CandidateEvaluator object. + + :param target_python: The target Python interpreter to use when + checking compatibility. If None (the default), a TargetPython + object will be constructed from the running Python. + :param specifier: An optional object implementing `filter` + (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable + versions. + :param hashes: An optional collection of allowed hashes. + """ + if target_python is None: + target_python = TargetPython() + if specifier is None: + specifier = specifiers.SpecifierSet() + + supported_tags = target_python.get_sorted_tags() + + return cls( + project_name=project_name, + supported_tags=supported_tags, + specifier=specifier, + prefer_binary=prefer_binary, + allow_all_prereleases=allow_all_prereleases, + hashes=hashes, + ) + + def __init__( + self, + project_name: str, + supported_tags: List[Tag], + specifier: specifiers.BaseSpecifier, + prefer_binary: bool = False, + allow_all_prereleases: bool = False, + hashes: Optional[Hashes] = None, + ) -> None: + """ + :param supported_tags: The PEP 425 tags supported by the target + Python in order of preference (most preferred first). + """ + self._allow_all_prereleases = allow_all_prereleases + self._hashes = hashes + self._prefer_binary = prefer_binary + self._project_name = project_name + self._specifier = specifier + self._supported_tags = supported_tags + # Since the index of the tag in the _supported_tags list is used + # as a priority, precompute a map from tag to index/priority to be + # used in wheel.find_most_preferred_tag. + self._wheel_tag_preferences = { + tag: idx for idx, tag in enumerate(supported_tags) + } + + def get_applicable_candidates( + self, + candidates: List[InstallationCandidate], + ) -> List[InstallationCandidate]: + """ + Return the applicable candidates from a list of candidates. + """ + # Using None infers from the specifier instead. + allow_prereleases = self._allow_all_prereleases or None + specifier = self._specifier + versions = { + str(v) + for v in specifier.filter( + # We turn the version object into a str here because otherwise + # when we're debundled but setuptools isn't, Python will see + # packaging.version.Version and + # pkg_resources._vendor.packaging.version.Version as different + # types. This way we'll use a str as a common data interchange + # format. If we stop using the pkg_resources provided specifier + # and start using our own, we can drop the cast to str(). + (str(c.version) for c in candidates), + prereleases=allow_prereleases, + ) + } + + # Again, converting version to str to deal with debundling. + applicable_candidates = [c for c in candidates if str(c.version) in versions] + + filtered_applicable_candidates = filter_unallowed_hashes( + candidates=applicable_candidates, + hashes=self._hashes, + project_name=self._project_name, + ) + + return sorted(filtered_applicable_candidates, key=self._sort_key) + + def _sort_key(self, candidate: InstallationCandidate) -> CandidateSortingKey: + """ + Function to pass as the `key` argument to a call to sorted() to sort + InstallationCandidates by preference. + + Returns a tuple such that tuples sorting as greater using Python's + default comparison operator are more preferred. + + The preference is as follows: + + First and foremost, candidates with allowed (matching) hashes are + always preferred over candidates without matching hashes. This is + because e.g. if the only candidate with an allowed hash is yanked, + we still want to use that candidate. + + Second, excepting hash considerations, candidates that have been + yanked (in the sense of PEP 592) are always less preferred than + candidates that haven't been yanked. Then: + + If not finding wheels, they are sorted by version only. + If finding wheels, then the sort order is by version, then: + 1. existing installs + 2. wheels ordered via Wheel.support_index_min(self._supported_tags) + 3. source archives + If prefer_binary was set, then all wheels are sorted above sources. + + Note: it was considered to embed this logic into the Link + comparison operators, but then different sdist links + with the same version, would have to be considered equal + """ + valid_tags = self._supported_tags + support_num = len(valid_tags) + build_tag: BuildTag = () + binary_preference = 0 + link = candidate.link + if link.is_wheel: + # can raise InvalidWheelFilename + wheel = Wheel(link.filename) + try: + pri = -( + wheel.find_most_preferred_tag( + valid_tags, self._wheel_tag_preferences + ) + ) + except ValueError: + raise UnsupportedWheel( + f"{wheel.filename} is not a supported wheel for this platform. It " + "can't be sorted." + ) + if self._prefer_binary: + binary_preference = 1 + if wheel.build_tag is not None: + match = re.match(r"^(\d+)(.*)$", wheel.build_tag) + assert match is not None, "guaranteed by filename validation" + build_tag_groups = match.groups() + build_tag = (int(build_tag_groups[0]), build_tag_groups[1]) + else: # sdist + pri = -(support_num) + has_allowed_hash = int(link.is_hash_allowed(self._hashes)) + yank_value = -1 * int(link.is_yanked) # -1 for yanked. + return ( + has_allowed_hash, + yank_value, + binary_preference, + candidate.version, + pri, + build_tag, + ) + + def sort_best_candidate( + self, + candidates: List[InstallationCandidate], + ) -> Optional[InstallationCandidate]: + """ + Return the best candidate per the instance's sort order, or None if + no candidate is acceptable. + """ + if not candidates: + return None + best_candidate = max(candidates, key=self._sort_key) + return best_candidate + + def compute_best_candidate( + self, + candidates: List[InstallationCandidate], + ) -> BestCandidateResult: + """ + Compute and return a `BestCandidateResult` instance. + """ + applicable_candidates = self.get_applicable_candidates(candidates) + + best_candidate = self.sort_best_candidate(applicable_candidates) + + return BestCandidateResult( + candidates, + applicable_candidates=applicable_candidates, + best_candidate=best_candidate, + ) + + +class PackageFinder: + """This finds packages. + + This is meant to match easy_install's technique for looking for + packages, by reading pages and looking for appropriate links. + """ + + def __init__( + self, + link_collector: LinkCollector, + target_python: TargetPython, + allow_yanked: bool, + format_control: Optional[FormatControl] = None, + candidate_prefs: Optional[CandidatePreferences] = None, + ignore_requires_python: Optional[bool] = None, + ) -> None: + """ + This constructor is primarily meant to be used by the create() class + method and from tests. + + :param format_control: A FormatControl object, used to control + the selection of source packages / binary packages when consulting + the index and links. + :param candidate_prefs: Options to use when creating a + CandidateEvaluator object. + """ + if candidate_prefs is None: + candidate_prefs = CandidatePreferences() + + format_control = format_control or FormatControl(set(), set()) + + self._allow_yanked = allow_yanked + self._candidate_prefs = candidate_prefs + self._ignore_requires_python = ignore_requires_python + self._link_collector = link_collector + self._target_python = target_python + + self.format_control = format_control + + # These are boring links that have already been logged somehow. + self._logged_links: Set[Tuple[Link, LinkType, str]] = set() + + # Don't include an allow_yanked default value to make sure each call + # site considers whether yanked releases are allowed. This also causes + # that decision to be made explicit in the calling code, which helps + # people when reading the code. + @classmethod + def create( + cls, + link_collector: LinkCollector, + selection_prefs: SelectionPreferences, + target_python: Optional[TargetPython] = None, + ) -> "PackageFinder": + """Create a PackageFinder. + + :param selection_prefs: The candidate selection preferences, as a + SelectionPreferences object. + :param target_python: The target Python interpreter to use when + checking compatibility. If None (the default), a TargetPython + object will be constructed from the running Python. + """ + if target_python is None: + target_python = TargetPython() + + candidate_prefs = CandidatePreferences( + prefer_binary=selection_prefs.prefer_binary, + allow_all_prereleases=selection_prefs.allow_all_prereleases, + ) + + return cls( + candidate_prefs=candidate_prefs, + link_collector=link_collector, + target_python=target_python, + allow_yanked=selection_prefs.allow_yanked, + format_control=selection_prefs.format_control, + ignore_requires_python=selection_prefs.ignore_requires_python, + ) + + @property + def target_python(self) -> TargetPython: + return self._target_python + + @property + def search_scope(self) -> SearchScope: + return self._link_collector.search_scope + + @search_scope.setter + def search_scope(self, search_scope: SearchScope) -> None: + self._link_collector.search_scope = search_scope + + @property + def find_links(self) -> List[str]: + return self._link_collector.find_links + + @property + def index_urls(self) -> List[str]: + return self.search_scope.index_urls + + @property + def trusted_hosts(self) -> Iterable[str]: + for host_port in self._link_collector.session.pip_trusted_origins: + yield build_netloc(*host_port) + + @property + def allow_all_prereleases(self) -> bool: + return self._candidate_prefs.allow_all_prereleases + + def set_allow_all_prereleases(self) -> None: + self._candidate_prefs.allow_all_prereleases = True + + @property + def prefer_binary(self) -> bool: + return self._candidate_prefs.prefer_binary + + def set_prefer_binary(self) -> None: + self._candidate_prefs.prefer_binary = True + + def requires_python_skipped_reasons(self) -> List[str]: + reasons = { + detail + for _, result, detail in self._logged_links + if result == LinkType.requires_python_mismatch + } + return sorted(reasons) + + def make_link_evaluator(self, project_name: str) -> LinkEvaluator: + canonical_name = canonicalize_name(project_name) + formats = self.format_control.get_allowed_formats(canonical_name) + + return LinkEvaluator( + project_name=project_name, + canonical_name=canonical_name, + formats=formats, + target_python=self._target_python, + allow_yanked=self._allow_yanked, + ignore_requires_python=self._ignore_requires_python, + ) + + def _sort_links(self, links: Iterable[Link]) -> List[Link]: + """ + Returns elements of links in order, non-egg links first, egg links + second, while eliminating duplicates + """ + eggs, no_eggs = [], [] + seen: Set[Link] = set() + for link in links: + if link not in seen: + seen.add(link) + if link.egg_fragment: + eggs.append(link) + else: + no_eggs.append(link) + return no_eggs + eggs + + def _log_skipped_link(self, link: Link, result: LinkType, detail: str) -> None: + entry = (link, result, detail) + if entry not in self._logged_links: + # Put the link at the end so the reason is more visible and because + # the link string is usually very long. + logger.debug("Skipping link: %s: %s", detail, link) + self._logged_links.add(entry) + + def get_install_candidate( + self, link_evaluator: LinkEvaluator, link: Link + ) -> Optional[InstallationCandidate]: + """ + If the link is a candidate for install, convert it to an + InstallationCandidate and return it. Otherwise, return None. + """ + result, detail = link_evaluator.evaluate_link(link) + if result != LinkType.candidate: + self._log_skipped_link(link, result, detail) + return None + + return InstallationCandidate( + name=link_evaluator.project_name, + link=link, + version=detail, + ) + + def evaluate_links( + self, link_evaluator: LinkEvaluator, links: Iterable[Link] + ) -> List[InstallationCandidate]: + """ + Convert links that are candidates to InstallationCandidate objects. + """ + candidates = [] + for link in self._sort_links(links): + candidate = self.get_install_candidate(link_evaluator, link) + if candidate is not None: + candidates.append(candidate) + + return candidates + + def process_project_url( + self, project_url: Link, link_evaluator: LinkEvaluator + ) -> List[InstallationCandidate]: + logger.debug( + "Fetching project page and analyzing links: %s", + project_url, + ) + index_response = self._link_collector.fetch_response(project_url) + if index_response is None: + return [] + + page_links = list(parse_links(index_response)) + + with indent_log(): + package_links = self.evaluate_links( + link_evaluator, + links=page_links, + ) + + return package_links + + @functools.lru_cache(maxsize=None) + def find_all_candidates(self, project_name: str) -> List[InstallationCandidate]: + """Find all available InstallationCandidate for project_name + + This checks index_urls and find_links. + All versions found are returned as an InstallationCandidate list. + + See LinkEvaluator.evaluate_link() for details on which files + are accepted. + """ + link_evaluator = self.make_link_evaluator(project_name) + + collected_sources = self._link_collector.collect_sources( + project_name=project_name, + candidates_from_page=functools.partial( + self.process_project_url, + link_evaluator=link_evaluator, + ), + ) + + page_candidates_it = itertools.chain.from_iterable( + source.page_candidates() + for sources in collected_sources + for source in sources + if source is not None + ) + page_candidates = list(page_candidates_it) + + file_links_it = itertools.chain.from_iterable( + source.file_links() + for sources in collected_sources + for source in sources + if source is not None + ) + file_candidates = self.evaluate_links( + link_evaluator, + sorted(file_links_it, reverse=True), + ) + + if logger.isEnabledFor(logging.DEBUG) and file_candidates: + paths = [] + for candidate in file_candidates: + assert candidate.link.url # we need to have a URL + try: + paths.append(candidate.link.file_path) + except Exception: + paths.append(candidate.link.url) # it's not a local file + + logger.debug("Local files found: %s", ", ".join(paths)) + + # This is an intentional priority ordering + return file_candidates + page_candidates + + def make_candidate_evaluator( + self, + project_name: str, + specifier: Optional[specifiers.BaseSpecifier] = None, + hashes: Optional[Hashes] = None, + ) -> CandidateEvaluator: + """Create a CandidateEvaluator object to use.""" + candidate_prefs = self._candidate_prefs + return CandidateEvaluator.create( + project_name=project_name, + target_python=self._target_python, + prefer_binary=candidate_prefs.prefer_binary, + allow_all_prereleases=candidate_prefs.allow_all_prereleases, + specifier=specifier, + hashes=hashes, + ) + + @functools.lru_cache(maxsize=None) + def find_best_candidate( + self, + project_name: str, + specifier: Optional[specifiers.BaseSpecifier] = None, + hashes: Optional[Hashes] = None, + ) -> BestCandidateResult: + """Find matches for the given project and specifier. + + :param specifier: An optional object implementing `filter` + (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable + versions. + + :return: A `BestCandidateResult` instance. + """ + candidates = self.find_all_candidates(project_name) + candidate_evaluator = self.make_candidate_evaluator( + project_name=project_name, + specifier=specifier, + hashes=hashes, + ) + return candidate_evaluator.compute_best_candidate(candidates) + + def find_requirement( + self, req: InstallRequirement, upgrade: bool + ) -> Optional[InstallationCandidate]: + """Try to find a Link matching req + + Expects req, an InstallRequirement and upgrade, a boolean + Returns a InstallationCandidate if found, + Raises DistributionNotFound or BestVersionAlreadyInstalled otherwise + """ + hashes = req.hashes(trust_internet=False) + best_candidate_result = self.find_best_candidate( + req.name, + specifier=req.specifier, + hashes=hashes, + ) + best_candidate = best_candidate_result.best_candidate + + installed_version: Optional[_BaseVersion] = None + if req.satisfied_by is not None: + installed_version = req.satisfied_by.version + + def _format_versions(cand_iter: Iterable[InstallationCandidate]) -> str: + # This repeated parse_version and str() conversion is needed to + # handle different vendoring sources from pip and pkg_resources. + # If we stop using the pkg_resources provided specifier and start + # using our own, we can drop the cast to str(). + return ( + ", ".join( + sorted( + {str(c.version) for c in cand_iter}, + key=parse_version, + ) + ) + or "none" + ) + + if installed_version is None and best_candidate is None: + logger.critical( + "Could not find a version that satisfies the requirement %s " + "(from versions: %s)", + req, + _format_versions(best_candidate_result.iter_all()), + ) + + raise DistributionNotFound(f"No matching distribution found for {req}") + + def _should_install_candidate( + candidate: Optional[InstallationCandidate], + ) -> "TypeGuard[InstallationCandidate]": + if installed_version is None: + return True + if best_candidate is None: + return False + return best_candidate.version > installed_version + + if not upgrade and installed_version is not None: + if _should_install_candidate(best_candidate): + logger.debug( + "Existing installed version (%s) satisfies requirement " + "(most up-to-date version is %s)", + installed_version, + best_candidate.version, + ) + else: + logger.debug( + "Existing installed version (%s) is most up-to-date and " + "satisfies requirement", + installed_version, + ) + return None + + if _should_install_candidate(best_candidate): + logger.debug( + "Using version %s (newest of versions: %s)", + best_candidate.version, + _format_versions(best_candidate_result.iter_applicable()), + ) + return best_candidate + + # We have an existing version, and its the best version + logger.debug( + "Installed version (%s) is most up-to-date (past versions: %s)", + installed_version, + _format_versions(best_candidate_result.iter_applicable()), + ) + raise BestVersionAlreadyInstalled + + +def _find_name_version_sep(fragment: str, canonical_name: str) -> int: + """Find the separator's index based on the package's canonical name. + + :param fragment: A <package>+<version> filename "fragment" (stem) or + egg fragment. + :param canonical_name: The package's canonical name. + + This function is needed since the canonicalized name does not necessarily + have the same length as the egg info's name part. An example:: + + >>> fragment = 'foo__bar-1.0' + >>> canonical_name = 'foo-bar' + >>> _find_name_version_sep(fragment, canonical_name) + 8 + """ + # Project name and version must be separated by one single dash. Find all + # occurrences of dashes; if the string in front of it matches the canonical + # name, this is the one separating the name and version parts. + for i, c in enumerate(fragment): + if c != "-": + continue + if canonicalize_name(fragment[:i]) == canonical_name: + return i + raise ValueError(f"{fragment} does not match {canonical_name}") + + +def _extract_version_from_fragment(fragment: str, canonical_name: str) -> Optional[str]: + """Parse the version string from a <package>+<version> filename + "fragment" (stem) or egg fragment. + + :param fragment: The string to parse. E.g. foo-2.1 + :param canonical_name: The canonicalized name of the package this + belongs to. + """ + try: + version_start = _find_name_version_sep(fragment, canonical_name) + 1 + except ValueError: + return None + version = fragment[version_start:] + if not version: + return None + return version diff --git a/venv/lib/python3.11/site-packages/pip/_internal/index/sources.py b/venv/lib/python3.11/site-packages/pip/_internal/index/sources.py new file mode 100644 index 0000000..f4626d7 --- /dev/null +++ b/venv/lib/python3.11/site-packages/pip/_internal/index/sources.py @@ -0,0 +1,285 @@ +import logging +import mimetypes +import os +from collections import defaultdict +from typing import Callable, Dict, Iterable, List, Optional, Tuple + +from pip._vendor.packaging.utils import ( + InvalidSdistFilename, + InvalidVersion, + InvalidWheelFilename, + canonicalize_name, + parse_sdist_filename, + parse_wheel_filename, +) + +from pip._internal.models.candidate import InstallationCandidate +from pip._internal.models.link import Link +from pip._internal.utils.urls import path_to_url, url_to_path +from pip._internal.vcs import is_url + +logger = logging.getLogger(__name__) + +FoundCandidates = Iterable[InstallationCandidate] +FoundLinks = Iterable[Link] +CandidatesFromPage = Callable[[Link], Iterable[InstallationCandidate]] +PageValidator = Callable[[Link], bool] + + +class LinkSource: + @property + def link(self) -> Optional[Link]: + """Returns the underlying link, if there's one.""" + raise NotImplementedError() + + def page_candidates(self) -> FoundCandidates: + """Candidates found by parsing an archive listing HTML file.""" + raise NotImplementedError() + + def file_links(self) -> FoundLinks: + """Links found by specifying archives directly.""" + raise NotImplementedError() + + +def _is_html_file(file_url: str) -> bool: + return mimetypes.guess_type(file_url, strict=False)[0] == "text/html" + + +class _FlatDirectoryToUrls: + """Scans directory and caches results""" + + def __init__(self, path: str) -> None: + self._path = path + self._page_candidates: List[str] = [] + self._project_name_to_urls: Dict[str, List[str]] = defaultdict(list) + self._scanned_directory = False + + def _scan_directory(self) -> None: + """Scans directory once and populates both page_candidates + and project_name_to_urls at the same time + """ + for entry in os.scandir(self._path): + url = path_to_url(entry.path) + if _is_html_file(url): + self._page_candidates.append(url) + continue + + # File must have a valid wheel or sdist name, + # otherwise not worth considering as a package + try: + project_filename = parse_wheel_filename(entry.name)[0] + except (InvalidWheelFilename, InvalidVersion): + try: + project_filename = parse_sdist_filename(entry.name)[0] + except (InvalidSdistFilename, InvalidVersion): + continue + + self._project_name_to_urls[project_filename].append(url) + self._scanned_directory = True + + @property + def page_candidates(self) -> List[str]: + if not self._scanned_directory: + self._scan_directory() + + return self._page_candidates + + @property + def project_name_to_urls(self) -> Dict[str, List[str]]: + if not self._scanned_directory: + self._scan_directory() + + return self._project_name_to_urls + + +class _FlatDirectorySource(LinkSource): + """Link source specified by ``--find-links=<path-to-dir>``. + + This looks the content of the directory, and returns: + + * ``page_candidates``: Links listed on each HTML file in the directory. + * ``file_candidates``: Archives in the directory. + """ + + _paths_to_urls: Dict[str, _FlatDirectoryToUrls] = {} + + def __init__( + self, + candidates_from_page: CandidatesFromPage, + path: str, + project_name: str, + ) -> None: + self._candidates_from_page = candidates_from_page + self._project_name = canonicalize_name(project_name) + + # Get existing instance of _FlatDirectoryToUrls if it exists + if path in self._paths_to_urls: + self._path_to_urls = self._paths_to_urls[path] + else: + self._path_to_urls = _FlatDirectoryToUrls(path=path) + self._paths_to_urls[path] = self._path_to_urls + + @property + def link(self) -> Optional[Link]: + return None + + def page_candidates(self) -> FoundCandidates: + for url in self._path_to_urls.page_candidates: + yield from self._candidates_from_page(Link(url)) + + def file_links(self) -> FoundLinks: + for url in self._path_to_urls.project_name_to_urls[self._project_name]: + yield Link(url) + + +class _LocalFileSource(LinkSource): + """``--find-links=<path-or-url>`` or ``--[extra-]index-url=<path-or-url>``. + + If a URL is supplied, it must be a ``file:`` URL. If a path is supplied to + the option, it is converted to a URL first. This returns: + + * ``page_candidates``: Links listed on an HTML file. + * ``file_candidates``: The non-HTML file. + """ + + def __init__( + self, + candidates_from_page: CandidatesFromPage, + link: Link, + ) -> None: + self._candidates_from_page = candidates_from_page + self._link = link + + @property + def link(self) -> Optional[Link]: + return self._link + + def page_candidates(self) -> FoundCandidates: + if not _is_html_file(self._link.url): + return + yield from self._candidates_from_page(self._link) + + def file_links(self) -> FoundLinks: + if _is_html_file(self._link.url): + return + yield self._link + + +class _RemoteFileSource(LinkSource): + """``--find-links=<url>`` or ``--[extra-]index-url=<url>``. + + This returns: + + * ``page_candidates``: Links listed on an HTML file. + * ``file_candidates``: The non-HTML file. + """ + + def __init__( + self, + candidates_from_page: CandidatesFromPage, + page_validator: PageValidator, + link: Link, + ) -> None: + self._candidates_from_page = candidates_from_page + self._page_validator = page_validator + self._link = link + + @property + def link(self) -> Optional[Link]: + return self._link + + def page_candidates(self) -> FoundCandidates: + if not self._page_validator(self._link): + return + yield from self._candidates_from_page(self._link) + + def file_links(self) -> FoundLinks: + yield self._link + + +class _IndexDirectorySource(LinkSource): + """``--[extra-]index-url=<path-to-directory>``. + + This is treated like a remote URL; ``candidates_from_page`` contains logic + for this by appending ``index.html`` to the link. + """ + + def __init__( + self, + candidates_from_page: CandidatesFromPage, + link: Link, + ) -> None: + self._candidates_from_page = candidates_from_page + self._link = link + + @property + def link(self) -> Optional[Link]: + return self._link + + def page_candidates(self) -> FoundCandidates: + yield from self._candidates_from_page(self._link) + + def file_links(self) -> FoundLinks: + return () + + +def build_source( + location: str, + *, + candidates_from_page: CandidatesFromPage, + page_validator: PageValidator, + expand_dir: bool, + cache_link_parsing: bool, + project_name: str, +) -> Tuple[Optional[str], Optional[LinkSource]]: + path: Optional[str] = None + url: Optional[str] = None + if os.path.exists(location): # Is a local path. + url = path_to_url(location) + path = location + elif location.startswith("file:"): # A file: URL. + url = location + path = url_to_path(location) + elif is_url(location): + url = location + + if url is None: + msg = ( + "Location '%s' is ignored: " + "it is either a non-existing path or lacks a specific scheme." + ) + logger.warning(msg, location) + return (None, None) + + if path is None: + source: LinkSource = _RemoteFileSource( + candidates_from_page=candidates_from_page, + page_validator=page_validator, + link=Link(url, cache_link_parsing=cache_link_parsing), + ) + return (url, source) + + if os.path.isdir(path): + if expand_dir: + source = _FlatDirectorySource( + candidates_from_page=candidates_from_page, + path=path, + project_name=project_name, + ) + else: + source = _IndexDirectorySource( + candidates_from_page=candidates_from_page, + link=Link(url, cache_link_parsing=cache_link_parsing), + ) + return (url, source) + elif os.path.isfile(path): + source = _LocalFileSource( + candidates_from_page=candidates_from_page, + link=Link(url, cache_link_parsing=cache_link_parsing), + ) + return (url, source) + logger.warning( + "Location '%s' is ignored: it is neither a file nor a directory.", + location, + ) + return (url, None) |