summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/pip/_internal/vcs
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/pip/_internal/vcs
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/pip/_internal/vcs')
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/__init__.py15
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/__init__.cpython-311.pycbin0 -> 638 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/bazaar.cpython-311.pycbin0 -> 5863 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/git.cpython-311.pycbin0 -> 21378 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/mercurial.cpython-311.pycbin0 -> 8731 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/subversion.cpython-311.pycbin0 -> 14606 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/versioncontrol.cpython-311.pycbin0 -> 31770 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/bazaar.py112
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/git.py526
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/mercurial.py163
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/subversion.py324
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/versioncontrol.py705
12 files changed, 1845 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__init__.py b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__init__.py
new file mode 100644
index 0000000..b6beddb
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__init__.py
@@ -0,0 +1,15 @@
+# Expose a limited set of classes and functions so callers outside of
+# the vcs package don't need to import deeper than `pip._internal.vcs`.
+# (The test directory may still need to import from a vcs sub-package.)
+# Import all vcs modules to register each VCS in the VcsSupport object.
+import pip._internal.vcs.bazaar
+import pip._internal.vcs.git
+import pip._internal.vcs.mercurial
+import pip._internal.vcs.subversion # noqa: F401
+from pip._internal.vcs.versioncontrol import ( # noqa: F401
+ RemoteNotFoundError,
+ RemoteNotValidError,
+ is_url,
+ make_vcs_requirement_url,
+ vcs,
+)
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000..058060c
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/__init__.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/bazaar.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/bazaar.cpython-311.pyc
new file mode 100644
index 0000000..874b93a
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/bazaar.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/git.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/git.cpython-311.pyc
new file mode 100644
index 0000000..6c092b0
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/git.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/mercurial.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/mercurial.cpython-311.pyc
new file mode 100644
index 0000000..4aa50d2
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/mercurial.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/subversion.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/subversion.cpython-311.pyc
new file mode 100644
index 0000000..8f4e3f2
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/subversion.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/versioncontrol.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/versioncontrol.cpython-311.pyc
new file mode 100644
index 0000000..0f06f47
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/versioncontrol.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/bazaar.py b/venv/lib/python3.11/site-packages/pip/_internal/vcs/bazaar.py
new file mode 100644
index 0000000..20a17ed
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/vcs/bazaar.py
@@ -0,0 +1,112 @@
+import logging
+from typing import List, Optional, Tuple
+
+from pip._internal.utils.misc import HiddenText, display_path
+from pip._internal.utils.subprocess import make_command
+from pip._internal.utils.urls import path_to_url
+from pip._internal.vcs.versioncontrol import (
+ AuthInfo,
+ RemoteNotFoundError,
+ RevOptions,
+ VersionControl,
+ vcs,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class Bazaar(VersionControl):
+ name = "bzr"
+ dirname = ".bzr"
+ repo_name = "branch"
+ schemes = (
+ "bzr+http",
+ "bzr+https",
+ "bzr+ssh",
+ "bzr+sftp",
+ "bzr+ftp",
+ "bzr+lp",
+ "bzr+file",
+ )
+
+ @staticmethod
+ def get_base_rev_args(rev: str) -> List[str]:
+ return ["-r", rev]
+
+ def fetch_new(
+ self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
+ ) -> None:
+ rev_display = rev_options.to_display()
+ logger.info(
+ "Checking out %s%s to %s",
+ url,
+ rev_display,
+ display_path(dest),
+ )
+ if verbosity <= 0:
+ flag = "--quiet"
+ elif verbosity == 1:
+ flag = ""
+ else:
+ flag = f"-{'v'*verbosity}"
+ cmd_args = make_command(
+ "checkout", "--lightweight", flag, rev_options.to_args(), url, dest
+ )
+ self.run_command(cmd_args)
+
+ def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
+ self.run_command(make_command("switch", url), cwd=dest)
+
+ def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
+ output = self.run_command(
+ make_command("info"), show_stdout=False, stdout_only=True, cwd=dest
+ )
+ if output.startswith("Standalone "):
+ # Older versions of pip used to create standalone branches.
+ # Convert the standalone branch to a checkout by calling "bzr bind".
+ cmd_args = make_command("bind", "-q", url)
+ self.run_command(cmd_args, cwd=dest)
+
+ cmd_args = make_command("update", "-q", rev_options.to_args())
+ self.run_command(cmd_args, cwd=dest)
+
+ @classmethod
+ def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
+ # hotfix the URL scheme after removing bzr+ from bzr+ssh:// re-add it
+ url, rev, user_pass = super().get_url_rev_and_auth(url)
+ if url.startswith("ssh://"):
+ url = "bzr+" + url
+ return url, rev, user_pass
+
+ @classmethod
+ def get_remote_url(cls, location: str) -> str:
+ urls = cls.run_command(
+ ["info"], show_stdout=False, stdout_only=True, cwd=location
+ )
+ for line in urls.splitlines():
+ line = line.strip()
+ for x in ("checkout of branch: ", "parent branch: "):
+ if line.startswith(x):
+ repo = line.split(x)[1]
+ if cls._is_local_repository(repo):
+ return path_to_url(repo)
+ return repo
+ raise RemoteNotFoundError
+
+ @classmethod
+ def get_revision(cls, location: str) -> str:
+ revision = cls.run_command(
+ ["revno"],
+ show_stdout=False,
+ stdout_only=True,
+ cwd=location,
+ )
+ return revision.splitlines()[-1]
+
+ @classmethod
+ def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
+ """Always assume the versions don't match"""
+ return False
+
+
+vcs.register(Bazaar)
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/git.py b/venv/lib/python3.11/site-packages/pip/_internal/vcs/git.py
new file mode 100644
index 0000000..8c242cf
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/vcs/git.py
@@ -0,0 +1,526 @@
+import logging
+import os.path
+import pathlib
+import re
+import urllib.parse
+import urllib.request
+from typing import List, Optional, Tuple
+
+from pip._internal.exceptions import BadCommand, InstallationError
+from pip._internal.utils.misc import HiddenText, display_path, hide_url
+from pip._internal.utils.subprocess import make_command
+from pip._internal.vcs.versioncontrol import (
+ AuthInfo,
+ RemoteNotFoundError,
+ RemoteNotValidError,
+ RevOptions,
+ VersionControl,
+ find_path_to_project_root_from_repo_root,
+ vcs,
+)
+
+urlsplit = urllib.parse.urlsplit
+urlunsplit = urllib.parse.urlunsplit
+
+
+logger = logging.getLogger(__name__)
+
+
+GIT_VERSION_REGEX = re.compile(
+ r"^git version " # Prefix.
+ r"(\d+)" # Major.
+ r"\.(\d+)" # Dot, minor.
+ r"(?:\.(\d+))?" # Optional dot, patch.
+ r".*$" # Suffix, including any pre- and post-release segments we don't care about.
+)
+
+HASH_REGEX = re.compile("^[a-fA-F0-9]{40}$")
+
+# SCP (Secure copy protocol) shorthand. e.g. 'git@example.com:foo/bar.git'
+SCP_REGEX = re.compile(
+ r"""^
+ # Optional user, e.g. 'git@'
+ (\w+@)?
+ # Server, e.g. 'github.com'.
+ ([^/:]+):
+ # The server-side path. e.g. 'user/project.git'. Must start with an
+ # alphanumeric character so as not to be confusable with a Windows paths
+ # like 'C:/foo/bar' or 'C:\foo\bar'.
+ (\w[^:]*)
+ $""",
+ re.VERBOSE,
+)
+
+
+def looks_like_hash(sha: str) -> bool:
+ return bool(HASH_REGEX.match(sha))
+
+
+class Git(VersionControl):
+ name = "git"
+ dirname = ".git"
+ repo_name = "clone"
+ schemes = (
+ "git+http",
+ "git+https",
+ "git+ssh",
+ "git+git",
+ "git+file",
+ )
+ # Prevent the user's environment variables from interfering with pip:
+ # https://github.com/pypa/pip/issues/1130
+ unset_environ = ("GIT_DIR", "GIT_WORK_TREE")
+ default_arg_rev = "HEAD"
+
+ @staticmethod
+ def get_base_rev_args(rev: str) -> List[str]:
+ return [rev]
+
+ def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
+ _, rev_options = self.get_url_rev_options(hide_url(url))
+ if not rev_options.rev:
+ return False
+ if not self.is_commit_id_equal(dest, rev_options.rev):
+ # the current commit is different from rev,
+ # which means rev was something else than a commit hash
+ return False
+ # return False in the rare case rev is both a commit hash
+ # and a tag or a branch; we don't want to cache in that case
+ # because that branch/tag could point to something else in the future
+ is_tag_or_branch = bool(self.get_revision_sha(dest, rev_options.rev)[0])
+ return not is_tag_or_branch
+
+ def get_git_version(self) -> Tuple[int, ...]:
+ version = self.run_command(
+ ["version"],
+ command_desc="git version",
+ show_stdout=False,
+ stdout_only=True,
+ )
+ match = GIT_VERSION_REGEX.match(version)
+ if not match:
+ logger.warning("Can't parse git version: %s", version)
+ return ()
+ return (int(match.group(1)), int(match.group(2)))
+
+ @classmethod
+ def get_current_branch(cls, location: str) -> Optional[str]:
+ """
+ Return the current branch, or None if HEAD isn't at a branch
+ (e.g. detached HEAD).
+ """
+ # git-symbolic-ref exits with empty stdout if "HEAD" is a detached
+ # HEAD rather than a symbolic ref. In addition, the -q causes the
+ # command to exit with status code 1 instead of 128 in this case
+ # and to suppress the message to stderr.
+ args = ["symbolic-ref", "-q", "HEAD"]
+ output = cls.run_command(
+ args,
+ extra_ok_returncodes=(1,),
+ show_stdout=False,
+ stdout_only=True,
+ cwd=location,
+ )
+ ref = output.strip()
+
+ if ref.startswith("refs/heads/"):
+ return ref[len("refs/heads/") :]
+
+ return None
+
+ @classmethod
+ def get_revision_sha(cls, dest: str, rev: str) -> Tuple[Optional[str], bool]:
+ """
+ Return (sha_or_none, is_branch), where sha_or_none is a commit hash
+ if the revision names a remote branch or tag, otherwise None.
+
+ Args:
+ dest: the repository directory.
+ rev: the revision name.
+ """
+ # Pass rev to pre-filter the list.
+ output = cls.run_command(
+ ["show-ref", rev],
+ cwd=dest,
+ show_stdout=False,
+ stdout_only=True,
+ on_returncode="ignore",
+ )
+ refs = {}
+ # NOTE: We do not use splitlines here since that would split on other
+ # unicode separators, which can be maliciously used to install a
+ # different revision.
+ for line in output.strip().split("\n"):
+ line = line.rstrip("\r")
+ if not line:
+ continue
+ try:
+ ref_sha, ref_name = line.split(" ", maxsplit=2)
+ except ValueError:
+ # Include the offending line to simplify troubleshooting if
+ # this error ever occurs.
+ raise ValueError(f"unexpected show-ref line: {line!r}")
+
+ refs[ref_name] = ref_sha
+
+ branch_ref = f"refs/remotes/origin/{rev}"
+ tag_ref = f"refs/tags/{rev}"
+
+ sha = refs.get(branch_ref)
+ if sha is not None:
+ return (sha, True)
+
+ sha = refs.get(tag_ref)
+
+ return (sha, False)
+
+ @classmethod
+ def _should_fetch(cls, dest: str, rev: str) -> bool:
+ """
+ Return true if rev is a ref or is a commit that we don't have locally.
+
+ Branches and tags are not considered in this method because they are
+ assumed to be always available locally (which is a normal outcome of
+ ``git clone`` and ``git fetch --tags``).
+ """
+ if rev.startswith("refs/"):
+ # Always fetch remote refs.
+ return True
+
+ if not looks_like_hash(rev):
+ # Git fetch would fail with abbreviated commits.
+ return False
+
+ if cls.has_commit(dest, rev):
+ # Don't fetch if we have the commit locally.
+ return False
+
+ return True
+
+ @classmethod
+ def resolve_revision(
+ cls, dest: str, url: HiddenText, rev_options: RevOptions
+ ) -> RevOptions:
+ """
+ Resolve a revision to a new RevOptions object with the SHA1 of the
+ branch, tag, or ref if found.
+
+ Args:
+ rev_options: a RevOptions object.
+ """
+ rev = rev_options.arg_rev
+ # The arg_rev property's implementation for Git ensures that the
+ # rev return value is always non-None.
+ assert rev is not None
+
+ sha, is_branch = cls.get_revision_sha(dest, rev)
+
+ if sha is not None:
+ rev_options = rev_options.make_new(sha)
+ rev_options.branch_name = rev if is_branch else None
+
+ return rev_options
+
+ # Do not show a warning for the common case of something that has
+ # the form of a Git commit hash.
+ if not looks_like_hash(rev):
+ logger.warning(
+ "Did not find branch or tag '%s', assuming revision or ref.",
+ rev,
+ )
+
+ if not cls._should_fetch(dest, rev):
+ return rev_options
+
+ # fetch the requested revision
+ cls.run_command(
+ make_command("fetch", "-q", url, rev_options.to_args()),
+ cwd=dest,
+ )
+ # Change the revision to the SHA of the ref we fetched
+ sha = cls.get_revision(dest, rev="FETCH_HEAD")
+ rev_options = rev_options.make_new(sha)
+
+ return rev_options
+
+ @classmethod
+ def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
+ """
+ Return whether the current commit hash equals the given name.
+
+ Args:
+ dest: the repository directory.
+ name: a string name.
+ """
+ if not name:
+ # Then avoid an unnecessary subprocess call.
+ return False
+
+ return cls.get_revision(dest) == name
+
+ def fetch_new(
+ self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
+ ) -> None:
+ rev_display = rev_options.to_display()
+ logger.info("Cloning %s%s to %s", url, rev_display, display_path(dest))
+ if verbosity <= 0:
+ flags: Tuple[str, ...] = ("--quiet",)
+ elif verbosity == 1:
+ flags = ()
+ else:
+ flags = ("--verbose", "--progress")
+ if self.get_git_version() >= (2, 17):
+ # Git added support for partial clone in 2.17
+ # https://git-scm.com/docs/partial-clone
+ # Speeds up cloning by functioning without a complete copy of repository
+ self.run_command(
+ make_command(
+ "clone",
+ "--filter=blob:none",
+ *flags,
+ url,
+ dest,
+ )
+ )
+ else:
+ self.run_command(make_command("clone", *flags, url, dest))
+
+ if rev_options.rev:
+ # Then a specific revision was requested.
+ rev_options = self.resolve_revision(dest, url, rev_options)
+ branch_name = getattr(rev_options, "branch_name", None)
+ logger.debug("Rev options %s, branch_name %s", rev_options, branch_name)
+ if branch_name is None:
+ # Only do a checkout if the current commit id doesn't match
+ # the requested revision.
+ if not self.is_commit_id_equal(dest, rev_options.rev):
+ cmd_args = make_command(
+ "checkout",
+ "-q",
+ rev_options.to_args(),
+ )
+ self.run_command(cmd_args, cwd=dest)
+ elif self.get_current_branch(dest) != branch_name:
+ # Then a specific branch was requested, and that branch
+ # is not yet checked out.
+ track_branch = f"origin/{branch_name}"
+ cmd_args = [
+ "checkout",
+ "-b",
+ branch_name,
+ "--track",
+ track_branch,
+ ]
+ self.run_command(cmd_args, cwd=dest)
+ else:
+ sha = self.get_revision(dest)
+ rev_options = rev_options.make_new(sha)
+
+ logger.info("Resolved %s to commit %s", url, rev_options.rev)
+
+ #: repo may contain submodules
+ self.update_submodules(dest)
+
+ def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
+ self.run_command(
+ make_command("config", "remote.origin.url", url),
+ cwd=dest,
+ )
+ cmd_args = make_command("checkout", "-q", rev_options.to_args())
+ self.run_command(cmd_args, cwd=dest)
+
+ self.update_submodules(dest)
+
+ def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
+ # First fetch changes from the default remote
+ if self.get_git_version() >= (1, 9):
+ # fetch tags in addition to everything else
+ self.run_command(["fetch", "-q", "--tags"], cwd=dest)
+ else:
+ self.run_command(["fetch", "-q"], cwd=dest)
+ # Then reset to wanted revision (maybe even origin/master)
+ rev_options = self.resolve_revision(dest, url, rev_options)
+ cmd_args = make_command("reset", "--hard", "-q", rev_options.to_args())
+ self.run_command(cmd_args, cwd=dest)
+ #: update submodules
+ self.update_submodules(dest)
+
+ @classmethod
+ def get_remote_url(cls, location: str) -> str:
+ """
+ Return URL of the first remote encountered.
+
+ Raises RemoteNotFoundError if the repository does not have a remote
+ url configured.
+ """
+ # We need to pass 1 for extra_ok_returncodes since the command
+ # exits with return code 1 if there are no matching lines.
+ stdout = cls.run_command(
+ ["config", "--get-regexp", r"remote\..*\.url"],
+ extra_ok_returncodes=(1,),
+ show_stdout=False,
+ stdout_only=True,
+ cwd=location,
+ )
+ remotes = stdout.splitlines()
+ try:
+ found_remote = remotes[0]
+ except IndexError:
+ raise RemoteNotFoundError
+
+ for remote in remotes:
+ if remote.startswith("remote.origin.url "):
+ found_remote = remote
+ break
+ url = found_remote.split(" ")[1]
+ return cls._git_remote_to_pip_url(url.strip())
+
+ @staticmethod
+ def _git_remote_to_pip_url(url: str) -> str:
+ """
+ Convert a remote url from what git uses to what pip accepts.
+
+ There are 3 legal forms **url** may take:
+
+ 1. A fully qualified url: ssh://git@example.com/foo/bar.git
+ 2. A local project.git folder: /path/to/bare/repository.git
+ 3. SCP shorthand for form 1: git@example.com:foo/bar.git
+
+ Form 1 is output as-is. Form 2 must be converted to URI and form 3 must
+ be converted to form 1.
+
+ See the corresponding test test_git_remote_url_to_pip() for examples of
+ sample inputs/outputs.
+ """
+ if re.match(r"\w+://", url):
+ # This is already valid. Pass it though as-is.
+ return url
+ if os.path.exists(url):
+ # A local bare remote (git clone --mirror).
+ # Needs a file:// prefix.
+ return pathlib.PurePath(url).as_uri()
+ scp_match = SCP_REGEX.match(url)
+ if scp_match:
+ # Add an ssh:// prefix and replace the ':' with a '/'.
+ return scp_match.expand(r"ssh://\1\2/\3")
+ # Otherwise, bail out.
+ raise RemoteNotValidError(url)
+
+ @classmethod
+ def has_commit(cls, location: str, rev: str) -> bool:
+ """
+ Check if rev is a commit that is available in the local repository.
+ """
+ try:
+ cls.run_command(
+ ["rev-parse", "-q", "--verify", "sha^" + rev],
+ cwd=location,
+ log_failed_cmd=False,
+ )
+ except InstallationError:
+ return False
+ else:
+ return True
+
+ @classmethod
+ def get_revision(cls, location: str, rev: Optional[str] = None) -> str:
+ if rev is None:
+ rev = "HEAD"
+ current_rev = cls.run_command(
+ ["rev-parse", rev],
+ show_stdout=False,
+ stdout_only=True,
+ cwd=location,
+ )
+ return current_rev.strip()
+
+ @classmethod
+ def get_subdirectory(cls, location: str) -> Optional[str]:
+ """
+ Return the path to Python project root, relative to the repo root.
+ Return None if the project root is in the repo root.
+ """
+ # find the repo root
+ git_dir = cls.run_command(
+ ["rev-parse", "--git-dir"],
+ show_stdout=False,
+ stdout_only=True,
+ cwd=location,
+ ).strip()
+ if not os.path.isabs(git_dir):
+ git_dir = os.path.join(location, git_dir)
+ repo_root = os.path.abspath(os.path.join(git_dir, ".."))
+ return find_path_to_project_root_from_repo_root(location, repo_root)
+
+ @classmethod
+ def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
+ """
+ Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
+ That's required because although they use SSH they sometimes don't
+ work with a ssh:// scheme (e.g. GitHub). But we need a scheme for
+ parsing. Hence we remove it again afterwards and return it as a stub.
+ """
+ # Works around an apparent Git bug
+ # (see https://article.gmane.org/gmane.comp.version-control.git/146500)
+ scheme, netloc, path, query, fragment = urlsplit(url)
+ if scheme.endswith("file"):
+ initial_slashes = path[: -len(path.lstrip("/"))]
+ newpath = initial_slashes + urllib.request.url2pathname(path).replace(
+ "\\", "/"
+ ).lstrip("/")
+ after_plus = scheme.find("+") + 1
+ url = scheme[:after_plus] + urlunsplit(
+ (scheme[after_plus:], netloc, newpath, query, fragment),
+ )
+
+ if "://" not in url:
+ assert "file:" not in url
+ url = url.replace("git+", "git+ssh://")
+ url, rev, user_pass = super().get_url_rev_and_auth(url)
+ url = url.replace("ssh://", "")
+ else:
+ url, rev, user_pass = super().get_url_rev_and_auth(url)
+
+ return url, rev, user_pass
+
+ @classmethod
+ def update_submodules(cls, location: str) -> None:
+ if not os.path.exists(os.path.join(location, ".gitmodules")):
+ return
+ cls.run_command(
+ ["submodule", "update", "--init", "--recursive", "-q"],
+ cwd=location,
+ )
+
+ @classmethod
+ def get_repository_root(cls, location: str) -> Optional[str]:
+ loc = super().get_repository_root(location)
+ if loc:
+ return loc
+ try:
+ r = cls.run_command(
+ ["rev-parse", "--show-toplevel"],
+ cwd=location,
+ show_stdout=False,
+ stdout_only=True,
+ on_returncode="raise",
+ log_failed_cmd=False,
+ )
+ except BadCommand:
+ logger.debug(
+ "could not determine if %s is under git control "
+ "because git is not available",
+ location,
+ )
+ return None
+ except InstallationError:
+ return None
+ return os.path.normpath(r.rstrip("\r\n"))
+
+ @staticmethod
+ def should_add_vcs_url_prefix(repo_url: str) -> bool:
+ """In either https or ssh form, requirements must be prefixed with git+."""
+ return True
+
+
+vcs.register(Git)
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/mercurial.py b/venv/lib/python3.11/site-packages/pip/_internal/vcs/mercurial.py
new file mode 100644
index 0000000..c183d41
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/vcs/mercurial.py
@@ -0,0 +1,163 @@
+import configparser
+import logging
+import os
+from typing import List, Optional, Tuple
+
+from pip._internal.exceptions import BadCommand, InstallationError
+from pip._internal.utils.misc import HiddenText, display_path
+from pip._internal.utils.subprocess import make_command
+from pip._internal.utils.urls import path_to_url
+from pip._internal.vcs.versioncontrol import (
+ RevOptions,
+ VersionControl,
+ find_path_to_project_root_from_repo_root,
+ vcs,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class Mercurial(VersionControl):
+ name = "hg"
+ dirname = ".hg"
+ repo_name = "clone"
+ schemes = (
+ "hg+file",
+ "hg+http",
+ "hg+https",
+ "hg+ssh",
+ "hg+static-http",
+ )
+
+ @staticmethod
+ def get_base_rev_args(rev: str) -> List[str]:
+ return [f"--rev={rev}"]
+
+ def fetch_new(
+ self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
+ ) -> None:
+ rev_display = rev_options.to_display()
+ logger.info(
+ "Cloning hg %s%s to %s",
+ url,
+ rev_display,
+ display_path(dest),
+ )
+ if verbosity <= 0:
+ flags: Tuple[str, ...] = ("--quiet",)
+ elif verbosity == 1:
+ flags = ()
+ elif verbosity == 2:
+ flags = ("--verbose",)
+ else:
+ flags = ("--verbose", "--debug")
+ self.run_command(make_command("clone", "--noupdate", *flags, url, dest))
+ self.run_command(
+ make_command("update", *flags, rev_options.to_args()),
+ cwd=dest,
+ )
+
+ def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
+ repo_config = os.path.join(dest, self.dirname, "hgrc")
+ config = configparser.RawConfigParser()
+ try:
+ config.read(repo_config)
+ config.set("paths", "default", url.secret)
+ with open(repo_config, "w") as config_file:
+ config.write(config_file)
+ except (OSError, configparser.NoSectionError) as exc:
+ logger.warning("Could not switch Mercurial repository to %s: %s", url, exc)
+ else:
+ cmd_args = make_command("update", "-q", rev_options.to_args())
+ self.run_command(cmd_args, cwd=dest)
+
+ def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
+ self.run_command(["pull", "-q"], cwd=dest)
+ cmd_args = make_command("update", "-q", rev_options.to_args())
+ self.run_command(cmd_args, cwd=dest)
+
+ @classmethod
+ def get_remote_url(cls, location: str) -> str:
+ url = cls.run_command(
+ ["showconfig", "paths.default"],
+ show_stdout=False,
+ stdout_only=True,
+ cwd=location,
+ ).strip()
+ if cls._is_local_repository(url):
+ url = path_to_url(url)
+ return url.strip()
+
+ @classmethod
+ def get_revision(cls, location: str) -> str:
+ """
+ Return the repository-local changeset revision number, as an integer.
+ """
+ current_revision = cls.run_command(
+ ["parents", "--template={rev}"],
+ show_stdout=False,
+ stdout_only=True,
+ cwd=location,
+ ).strip()
+ return current_revision
+
+ @classmethod
+ def get_requirement_revision(cls, location: str) -> str:
+ """
+ Return the changeset identification hash, as a 40-character
+ hexadecimal string
+ """
+ current_rev_hash = cls.run_command(
+ ["parents", "--template={node}"],
+ show_stdout=False,
+ stdout_only=True,
+ cwd=location,
+ ).strip()
+ return current_rev_hash
+
+ @classmethod
+ def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
+ """Always assume the versions don't match"""
+ return False
+
+ @classmethod
+ def get_subdirectory(cls, location: str) -> Optional[str]:
+ """
+ Return the path to Python project root, relative to the repo root.
+ Return None if the project root is in the repo root.
+ """
+ # find the repo root
+ repo_root = cls.run_command(
+ ["root"], show_stdout=False, stdout_only=True, cwd=location
+ ).strip()
+ if not os.path.isabs(repo_root):
+ repo_root = os.path.abspath(os.path.join(location, repo_root))
+ return find_path_to_project_root_from_repo_root(location, repo_root)
+
+ @classmethod
+ def get_repository_root(cls, location: str) -> Optional[str]:
+ loc = super().get_repository_root(location)
+ if loc:
+ return loc
+ try:
+ r = cls.run_command(
+ ["root"],
+ cwd=location,
+ show_stdout=False,
+ stdout_only=True,
+ on_returncode="raise",
+ log_failed_cmd=False,
+ )
+ except BadCommand:
+ logger.debug(
+ "could not determine if %s is under hg control "
+ "because hg is not available",
+ location,
+ )
+ return None
+ except InstallationError:
+ return None
+ return os.path.normpath(r.rstrip("\r\n"))
+
+
+vcs.register(Mercurial)
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/subversion.py b/venv/lib/python3.11/site-packages/pip/_internal/vcs/subversion.py
new file mode 100644
index 0000000..16d93a6
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/vcs/subversion.py
@@ -0,0 +1,324 @@
+import logging
+import os
+import re
+from typing import List, Optional, Tuple
+
+from pip._internal.utils.misc import (
+ HiddenText,
+ display_path,
+ is_console_interactive,
+ is_installable_dir,
+ split_auth_from_netloc,
+)
+from pip._internal.utils.subprocess import CommandArgs, make_command
+from pip._internal.vcs.versioncontrol import (
+ AuthInfo,
+ RemoteNotFoundError,
+ RevOptions,
+ VersionControl,
+ vcs,
+)
+
+logger = logging.getLogger(__name__)
+
+_svn_xml_url_re = re.compile('url="([^"]+)"')
+_svn_rev_re = re.compile(r'committed-rev="(\d+)"')
+_svn_info_xml_rev_re = re.compile(r'\s*revision="(\d+)"')
+_svn_info_xml_url_re = re.compile(r"<url>(.*)</url>")
+
+
+class Subversion(VersionControl):
+ name = "svn"
+ dirname = ".svn"
+ repo_name = "checkout"
+ schemes = ("svn+ssh", "svn+http", "svn+https", "svn+svn", "svn+file")
+
+ @classmethod
+ def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
+ return True
+
+ @staticmethod
+ def get_base_rev_args(rev: str) -> List[str]:
+ return ["-r", rev]
+
+ @classmethod
+ def get_revision(cls, location: str) -> str:
+ """
+ Return the maximum revision for all files under a given location
+ """
+ # Note: taken from setuptools.command.egg_info
+ revision = 0
+
+ for base, dirs, _ in os.walk(location):
+ if cls.dirname not in dirs:
+ dirs[:] = []
+ continue # no sense walking uncontrolled subdirs
+ dirs.remove(cls.dirname)
+ entries_fn = os.path.join(base, cls.dirname, "entries")
+ if not os.path.exists(entries_fn):
+ # FIXME: should we warn?
+ continue
+
+ dirurl, localrev = cls._get_svn_url_rev(base)
+
+ if base == location:
+ assert dirurl is not None
+ base = dirurl + "/" # save the root url
+ elif not dirurl or not dirurl.startswith(base):
+ dirs[:] = []
+ continue # not part of the same svn tree, skip it
+ revision = max(revision, localrev)
+ return str(revision)
+
+ @classmethod
+ def get_netloc_and_auth(
+ cls, netloc: str, scheme: str
+ ) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:
+ """
+ This override allows the auth information to be passed to svn via the
+ --username and --password options instead of via the URL.
+ """
+ if scheme == "ssh":
+ # The --username and --password options can't be used for
+ # svn+ssh URLs, so keep the auth information in the URL.
+ return super().get_netloc_and_auth(netloc, scheme)
+
+ return split_auth_from_netloc(netloc)
+
+ @classmethod
+ def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
+ # hotfix the URL scheme after removing svn+ from svn+ssh:// re-add it
+ url, rev, user_pass = super().get_url_rev_and_auth(url)
+ if url.startswith("ssh://"):
+ url = "svn+" + url
+ return url, rev, user_pass
+
+ @staticmethod
+ def make_rev_args(
+ username: Optional[str], password: Optional[HiddenText]
+ ) -> CommandArgs:
+ extra_args: CommandArgs = []
+ if username:
+ extra_args += ["--username", username]
+ if password:
+ extra_args += ["--password", password]
+
+ return extra_args
+
+ @classmethod
+ def get_remote_url(cls, location: str) -> str:
+ # In cases where the source is in a subdirectory, we have to look up in
+ # the location until we find a valid project root.
+ orig_location = location
+ while not is_installable_dir(location):
+ last_location = location
+ location = os.path.dirname(location)
+ if location == last_location:
+ # We've traversed up to the root of the filesystem without
+ # finding a Python project.
+ logger.warning(
+ "Could not find Python project for directory %s (tried all "
+ "parent directories)",
+ orig_location,
+ )
+ raise RemoteNotFoundError
+
+ url, _rev = cls._get_svn_url_rev(location)
+ if url is None:
+ raise RemoteNotFoundError
+
+ return url
+
+ @classmethod
+ def _get_svn_url_rev(cls, location: str) -> Tuple[Optional[str], int]:
+ from pip._internal.exceptions import InstallationError
+
+ entries_path = os.path.join(location, cls.dirname, "entries")
+ if os.path.exists(entries_path):
+ with open(entries_path) as f:
+ data = f.read()
+ else: # subversion >= 1.7 does not have the 'entries' file
+ data = ""
+
+ url = None
+ if data.startswith("8") or data.startswith("9") or data.startswith("10"):
+ entries = list(map(str.splitlines, data.split("\n\x0c\n")))
+ del entries[0][0] # get rid of the '8'
+ url = entries[0][3]
+ revs = [int(d[9]) for d in entries if len(d) > 9 and d[9]] + [0]
+ elif data.startswith("<?xml"):
+ match = _svn_xml_url_re.search(data)
+ if not match:
+ raise ValueError(f"Badly formatted data: {data!r}")
+ url = match.group(1) # get repository URL
+ revs = [int(m.group(1)) for m in _svn_rev_re.finditer(data)] + [0]
+ else:
+ try:
+ # subversion >= 1.7
+ # Note that using get_remote_call_options is not necessary here
+ # because `svn info` is being run against a local directory.
+ # We don't need to worry about making sure interactive mode
+ # is being used to prompt for passwords, because passwords
+ # are only potentially needed for remote server requests.
+ xml = cls.run_command(
+ ["info", "--xml", location],
+ show_stdout=False,
+ stdout_only=True,
+ )
+ match = _svn_info_xml_url_re.search(xml)
+ assert match is not None
+ url = match.group(1)
+ revs = [int(m.group(1)) for m in _svn_info_xml_rev_re.finditer(xml)]
+ except InstallationError:
+ url, revs = None, []
+
+ if revs:
+ rev = max(revs)
+ else:
+ rev = 0
+
+ return url, rev
+
+ @classmethod
+ def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
+ """Always assume the versions don't match"""
+ return False
+
+ def __init__(self, use_interactive: Optional[bool] = None) -> None:
+ if use_interactive is None:
+ use_interactive = is_console_interactive()
+ self.use_interactive = use_interactive
+
+ # This member is used to cache the fetched version of the current
+ # ``svn`` client.
+ # Special value definitions:
+ # None: Not evaluated yet.
+ # Empty tuple: Could not parse version.
+ self._vcs_version: Optional[Tuple[int, ...]] = None
+
+ super().__init__()
+
+ def call_vcs_version(self) -> Tuple[int, ...]:
+ """Query the version of the currently installed Subversion client.
+
+ :return: A tuple containing the parts of the version information or
+ ``()`` if the version returned from ``svn`` could not be parsed.
+ :raises: BadCommand: If ``svn`` is not installed.
+ """
+ # Example versions:
+ # svn, version 1.10.3 (r1842928)
+ # compiled Feb 25 2019, 14:20:39 on x86_64-apple-darwin17.0.0
+ # svn, version 1.7.14 (r1542130)
+ # compiled Mar 28 2018, 08:49:13 on x86_64-pc-linux-gnu
+ # svn, version 1.12.0-SlikSvn (SlikSvn/1.12.0)
+ # compiled May 28 2019, 13:44:56 on x86_64-microsoft-windows6.2
+ version_prefix = "svn, version "
+ version = self.run_command(["--version"], show_stdout=False, stdout_only=True)
+ if not version.startswith(version_prefix):
+ return ()
+
+ version = version[len(version_prefix) :].split()[0]
+ version_list = version.partition("-")[0].split(".")
+ try:
+ parsed_version = tuple(map(int, version_list))
+ except ValueError:
+ return ()
+
+ return parsed_version
+
+ def get_vcs_version(self) -> Tuple[int, ...]:
+ """Return the version of the currently installed Subversion client.
+
+ If the version of the Subversion client has already been queried,
+ a cached value will be used.
+
+ :return: A tuple containing the parts of the version information or
+ ``()`` if the version returned from ``svn`` could not be parsed.
+ :raises: BadCommand: If ``svn`` is not installed.
+ """
+ if self._vcs_version is not None:
+ # Use cached version, if available.
+ # If parsing the version failed previously (empty tuple),
+ # do not attempt to parse it again.
+ return self._vcs_version
+
+ vcs_version = self.call_vcs_version()
+ self._vcs_version = vcs_version
+ return vcs_version
+
+ def get_remote_call_options(self) -> CommandArgs:
+ """Return options to be used on calls to Subversion that contact the server.
+
+ These options are applicable for the following ``svn`` subcommands used
+ in this class.
+
+ - checkout
+ - switch
+ - update
+
+ :return: A list of command line arguments to pass to ``svn``.
+ """
+ if not self.use_interactive:
+ # --non-interactive switch is available since Subversion 0.14.4.
+ # Subversion < 1.8 runs in interactive mode by default.
+ return ["--non-interactive"]
+
+ svn_version = self.get_vcs_version()
+ # By default, Subversion >= 1.8 runs in non-interactive mode if
+ # stdin is not a TTY. Since that is how pip invokes SVN, in
+ # call_subprocess(), pip must pass --force-interactive to ensure
+ # the user can be prompted for a password, if required.
+ # SVN added the --force-interactive option in SVN 1.8. Since
+ # e.g. RHEL/CentOS 7, which is supported until 2024, ships with
+ # SVN 1.7, pip should continue to support SVN 1.7. Therefore, pip
+ # can't safely add the option if the SVN version is < 1.8 (or unknown).
+ if svn_version >= (1, 8):
+ return ["--force-interactive"]
+
+ return []
+
+ def fetch_new(
+ self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
+ ) -> None:
+ rev_display = rev_options.to_display()
+ logger.info(
+ "Checking out %s%s to %s",
+ url,
+ rev_display,
+ display_path(dest),
+ )
+ if verbosity <= 0:
+ flag = "--quiet"
+ else:
+ flag = ""
+ cmd_args = make_command(
+ "checkout",
+ flag,
+ self.get_remote_call_options(),
+ rev_options.to_args(),
+ url,
+ dest,
+ )
+ self.run_command(cmd_args)
+
+ def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
+ cmd_args = make_command(
+ "switch",
+ self.get_remote_call_options(),
+ rev_options.to_args(),
+ url,
+ dest,
+ )
+ self.run_command(cmd_args)
+
+ def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
+ cmd_args = make_command(
+ "update",
+ self.get_remote_call_options(),
+ rev_options.to_args(),
+ dest,
+ )
+ self.run_command(cmd_args)
+
+
+vcs.register(Subversion)
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/versioncontrol.py b/venv/lib/python3.11/site-packages/pip/_internal/vcs/versioncontrol.py
new file mode 100644
index 0000000..46ca279
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/pip/_internal/vcs/versioncontrol.py
@@ -0,0 +1,705 @@
+"""Handles all VCS (version control) support"""
+
+import logging
+import os
+import shutil
+import sys
+import urllib.parse
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Mapping,
+ Optional,
+ Tuple,
+ Type,
+ Union,
+)
+
+from pip._internal.cli.spinners import SpinnerInterface
+from pip._internal.exceptions import BadCommand, InstallationError
+from pip._internal.utils.misc import (
+ HiddenText,
+ ask_path_exists,
+ backup_dir,
+ display_path,
+ hide_url,
+ hide_value,
+ is_installable_dir,
+ rmtree,
+)
+from pip._internal.utils.subprocess import (
+ CommandArgs,
+ call_subprocess,
+ format_command_args,
+ make_command,
+)
+from pip._internal.utils.urls import get_url_scheme
+
+if TYPE_CHECKING:
+ # Literal was introduced in Python 3.8.
+ #
+ # TODO: Remove `if TYPE_CHECKING` when dropping support for Python 3.7.
+ from typing import Literal
+
+
+__all__ = ["vcs"]
+
+
+logger = logging.getLogger(__name__)
+
+AuthInfo = Tuple[Optional[str], Optional[str]]
+
+
+def is_url(name: str) -> bool:
+ """
+ Return true if the name looks like a URL.
+ """
+ scheme = get_url_scheme(name)
+ if scheme is None:
+ return False
+ return scheme in ["http", "https", "file", "ftp"] + vcs.all_schemes
+
+
+def make_vcs_requirement_url(
+ repo_url: str, rev: str, project_name: str, subdir: Optional[str] = None
+) -> str:
+ """
+ Return the URL for a VCS requirement.
+
+ Args:
+ repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
+ project_name: the (unescaped) project name.
+ """
+ egg_project_name = project_name.replace("-", "_")
+ req = f"{repo_url}@{rev}#egg={egg_project_name}"
+ if subdir:
+ req += f"&subdirectory={subdir}"
+
+ return req
+
+
+def find_path_to_project_root_from_repo_root(
+ location: str, repo_root: str
+) -> Optional[str]:
+ """
+ Find the the Python project's root by searching up the filesystem from
+ `location`. Return the path to project root relative to `repo_root`.
+ Return None if the project root is `repo_root`, or cannot be found.
+ """
+ # find project root.
+ orig_location = location
+ while not is_installable_dir(location):
+ last_location = location
+ location = os.path.dirname(location)
+ if location == last_location:
+ # We've traversed up to the root of the filesystem without
+ # finding a Python project.
+ logger.warning(
+ "Could not find a Python project for directory %s (tried all "
+ "parent directories)",
+ orig_location,
+ )
+ return None
+
+ if os.path.samefile(repo_root, location):
+ return None
+
+ return os.path.relpath(location, repo_root)
+
+
+class RemoteNotFoundError(Exception):
+ pass
+
+
+class RemoteNotValidError(Exception):
+ def __init__(self, url: str):
+ super().__init__(url)
+ self.url = url
+
+
+class RevOptions:
+
+ """
+ Encapsulates a VCS-specific revision to install, along with any VCS
+ install options.
+
+ Instances of this class should be treated as if immutable.
+ """
+
+ def __init__(
+ self,
+ vc_class: Type["VersionControl"],
+ rev: Optional[str] = None,
+ extra_args: Optional[CommandArgs] = None,
+ ) -> None:
+ """
+ Args:
+ vc_class: a VersionControl subclass.
+ rev: the name of the revision to install.
+ extra_args: a list of extra options.
+ """
+ if extra_args is None:
+ extra_args = []
+
+ self.extra_args = extra_args
+ self.rev = rev
+ self.vc_class = vc_class
+ self.branch_name: Optional[str] = None
+
+ def __repr__(self) -> str:
+ return f"<RevOptions {self.vc_class.name}: rev={self.rev!r}>"
+
+ @property
+ def arg_rev(self) -> Optional[str]:
+ if self.rev is None:
+ return self.vc_class.default_arg_rev
+
+ return self.rev
+
+ def to_args(self) -> CommandArgs:
+ """
+ Return the VCS-specific command arguments.
+ """
+ args: CommandArgs = []
+ rev = self.arg_rev
+ if rev is not None:
+ args += self.vc_class.get_base_rev_args(rev)
+ args += self.extra_args
+
+ return args
+
+ def to_display(self) -> str:
+ if not self.rev:
+ return ""
+
+ return f" (to revision {self.rev})"
+
+ def make_new(self, rev: str) -> "RevOptions":
+ """
+ Make a copy of the current instance, but with a new rev.
+
+ Args:
+ rev: the name of the revision for the new object.
+ """
+ return self.vc_class.make_rev_options(rev, extra_args=self.extra_args)
+
+
+class VcsSupport:
+ _registry: Dict[str, "VersionControl"] = {}
+ schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]
+
+ def __init__(self) -> None:
+ # Register more schemes with urlparse for various version control
+ # systems
+ urllib.parse.uses_netloc.extend(self.schemes)
+ super().__init__()
+
+ def __iter__(self) -> Iterator[str]:
+ return self._registry.__iter__()
+
+ @property
+ def backends(self) -> List["VersionControl"]:
+ return list(self._registry.values())
+
+ @property
+ def dirnames(self) -> List[str]:
+ return [backend.dirname for backend in self.backends]
+
+ @property
+ def all_schemes(self) -> List[str]:
+ schemes: List[str] = []
+ for backend in self.backends:
+ schemes.extend(backend.schemes)
+ return schemes
+
+ def register(self, cls: Type["VersionControl"]) -> None:
+ if not hasattr(cls, "name"):
+ logger.warning("Cannot register VCS %s", cls.__name__)
+ return
+ if cls.name not in self._registry:
+ self._registry[cls.name] = cls()
+ logger.debug("Registered VCS backend: %s", cls.name)
+
+ def unregister(self, name: str) -> None:
+ if name in self._registry:
+ del self._registry[name]
+
+ def get_backend_for_dir(self, location: str) -> Optional["VersionControl"]:
+ """
+ Return a VersionControl object if a repository of that type is found
+ at the given directory.
+ """
+ vcs_backends = {}
+ for vcs_backend in self._registry.values():
+ repo_path = vcs_backend.get_repository_root(location)
+ if not repo_path:
+ continue
+ logger.debug("Determine that %s uses VCS: %s", location, vcs_backend.name)
+ vcs_backends[repo_path] = vcs_backend
+
+ if not vcs_backends:
+ return None
+
+ # Choose the VCS in the inner-most directory. Since all repository
+ # roots found here would be either `location` or one of its
+ # parents, the longest path should have the most path components,
+ # i.e. the backend representing the inner-most repository.
+ inner_most_repo_path = max(vcs_backends, key=len)
+ return vcs_backends[inner_most_repo_path]
+
+ def get_backend_for_scheme(self, scheme: str) -> Optional["VersionControl"]:
+ """
+ Return a VersionControl object or None.
+ """
+ for vcs_backend in self._registry.values():
+ if scheme in vcs_backend.schemes:
+ return vcs_backend
+ return None
+
+ def get_backend(self, name: str) -> Optional["VersionControl"]:
+ """
+ Return a VersionControl object or None.
+ """
+ name = name.lower()
+ return self._registry.get(name)
+
+
+vcs = VcsSupport()
+
+
+class VersionControl:
+ name = ""
+ dirname = ""
+ repo_name = ""
+ # List of supported schemes for this Version Control
+ schemes: Tuple[str, ...] = ()
+ # Iterable of environment variable names to pass to call_subprocess().
+ unset_environ: Tuple[str, ...] = ()
+ default_arg_rev: Optional[str] = None
+
+ @classmethod
+ def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
+ """
+ Return whether the vcs prefix (e.g. "git+") should be added to a
+ repository's remote url when used in a requirement.
+ """
+ return not remote_url.lower().startswith(f"{cls.name}:")
+
+ @classmethod
+ def get_subdirectory(cls, location: str) -> Optional[str]:
+ """
+ Return the path to Python project root, relative to the repo root.
+ Return None if the project root is in the repo root.
+ """
+ return None
+
+ @classmethod
+ def get_requirement_revision(cls, repo_dir: str) -> str:
+ """
+ Return the revision string that should be used in a requirement.
+ """
+ return cls.get_revision(repo_dir)
+
+ @classmethod
+ def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:
+ """
+ Return the requirement string to use to redownload the files
+ currently at the given repository directory.
+
+ Args:
+ project_name: the (unescaped) project name.
+
+ The return value has a form similar to the following:
+
+ {repository_url}@{revision}#egg={project_name}
+ """
+ repo_url = cls.get_remote_url(repo_dir)
+
+ if cls.should_add_vcs_url_prefix(repo_url):
+ repo_url = f"{cls.name}+{repo_url}"
+
+ revision = cls.get_requirement_revision(repo_dir)
+ subdir = cls.get_subdirectory(repo_dir)
+ req = make_vcs_requirement_url(repo_url, revision, project_name, subdir=subdir)
+
+ return req
+
+ @staticmethod
+ def get_base_rev_args(rev: str) -> List[str]:
+ """
+ Return the base revision arguments for a vcs command.
+
+ Args:
+ rev: the name of a revision to install. Cannot be None.
+ """
+ raise NotImplementedError
+
+ def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
+ """
+ Return true if the commit hash checked out at dest matches
+ the revision in url.
+
+ Always return False, if the VCS does not support immutable commit
+ hashes.
+
+ This method does not check if there are local uncommitted changes
+ in dest after checkout, as pip currently has no use case for that.
+ """
+ return False
+
+ @classmethod
+ def make_rev_options(
+ cls, rev: Optional[str] = None, extra_args: Optional[CommandArgs] = None
+ ) -> RevOptions:
+ """
+ Return a RevOptions object.
+
+ Args:
+ rev: the name of a revision to install.
+ extra_args: a list of extra options.
+ """
+ return RevOptions(cls, rev, extra_args=extra_args)
+
+ @classmethod
+ def _is_local_repository(cls, repo: str) -> bool:
+ """
+ posix absolute paths start with os.path.sep,
+ win32 ones start with drive (like c:\\folder)
+ """
+ drive, tail = os.path.splitdrive(repo)
+ return repo.startswith(os.path.sep) or bool(drive)
+
+ @classmethod
+ def get_netloc_and_auth(
+ cls, netloc: str, scheme: str
+ ) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:
+ """
+ Parse the repository URL's netloc, and return the new netloc to use
+ along with auth information.
+
+ Args:
+ netloc: the original repository URL netloc.
+ scheme: the repository URL's scheme without the vcs prefix.
+
+ This is mainly for the Subversion class to override, so that auth
+ information can be provided via the --username and --password options
+ instead of through the URL. For other subclasses like Git without
+ such an option, auth information must stay in the URL.
+
+ Returns: (netloc, (username, password)).
+ """
+ return netloc, (None, None)
+
+ @classmethod
+ def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
+ """
+ Parse the repository URL to use, and return the URL, revision,
+ and auth info to use.
+
+ Returns: (url, rev, (username, password)).
+ """
+ scheme, netloc, path, query, frag = urllib.parse.urlsplit(url)
+ if "+" not in scheme:
+ raise ValueError(
+ f"Sorry, {url!r} is a malformed VCS url. "
+ "The format is <vcs>+<protocol>://<url>, "
+ "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp"
+ )
+ # Remove the vcs prefix.
+ scheme = scheme.split("+", 1)[1]
+ netloc, user_pass = cls.get_netloc_and_auth(netloc, scheme)
+ rev = None
+ if "@" in path:
+ path, rev = path.rsplit("@", 1)
+ if not rev:
+ raise InstallationError(
+ f"The URL {url!r} has an empty revision (after @) "
+ "which is not supported. Include a revision after @ "
+ "or remove @ from the URL."
+ )
+ url = urllib.parse.urlunsplit((scheme, netloc, path, query, ""))
+ return url, rev, user_pass
+
+ @staticmethod
+ def make_rev_args(
+ username: Optional[str], password: Optional[HiddenText]
+ ) -> CommandArgs:
+ """
+ Return the RevOptions "extra arguments" to use in obtain().
+ """
+ return []
+
+ def get_url_rev_options(self, url: HiddenText) -> Tuple[HiddenText, RevOptions]:
+ """
+ Return the URL and RevOptions object to use in obtain(),
+ as a tuple (url, rev_options).
+ """
+ secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret)
+ username, secret_password = user_pass
+ password: Optional[HiddenText] = None
+ if secret_password is not None:
+ password = hide_value(secret_password)
+ extra_args = self.make_rev_args(username, password)
+ rev_options = self.make_rev_options(rev, extra_args=extra_args)
+
+ return hide_url(secret_url), rev_options
+
+ @staticmethod
+ def normalize_url(url: str) -> str:
+ """
+ Normalize a URL for comparison by unquoting it and removing any
+ trailing slash.
+ """
+ return urllib.parse.unquote(url).rstrip("/")
+
+ @classmethod
+ def compare_urls(cls, url1: str, url2: str) -> bool:
+ """
+ Compare two repo URLs for identity, ignoring incidental differences.
+ """
+ return cls.normalize_url(url1) == cls.normalize_url(url2)
+
+ def fetch_new(
+ self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
+ ) -> None:
+ """
+ Fetch a revision from a repository, in the case that this is the
+ first fetch from the repository.
+
+ Args:
+ dest: the directory to fetch the repository to.
+ rev_options: a RevOptions object.
+ verbosity: verbosity level.
+ """
+ raise NotImplementedError
+
+ def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
+ """
+ Switch the repo at ``dest`` to point to ``URL``.
+
+ Args:
+ rev_options: a RevOptions object.
+ """
+ raise NotImplementedError
+
+ def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
+ """
+ Update an already-existing repo to the given ``rev_options``.
+
+ Args:
+ rev_options: a RevOptions object.
+ """
+ raise NotImplementedError
+
+ @classmethod
+ def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
+ """
+ Return whether the id of the current commit equals the given name.
+
+ Args:
+ dest: the repository directory.
+ name: a string name.
+ """
+ raise NotImplementedError
+
+ def obtain(self, dest: str, url: HiddenText, verbosity: int) -> None:
+ """
+ Install or update in editable mode the package represented by this
+ VersionControl object.
+
+ :param dest: the repository directory in which to install or update.
+ :param url: the repository URL starting with a vcs prefix.
+ :param verbosity: verbosity level.
+ """
+ url, rev_options = self.get_url_rev_options(url)
+
+ if not os.path.exists(dest):
+ self.fetch_new(dest, url, rev_options, verbosity=verbosity)
+ return
+
+ rev_display = rev_options.to_display()
+ if self.is_repository_directory(dest):
+ existing_url = self.get_remote_url(dest)
+ if self.compare_urls(existing_url, url.secret):
+ logger.debug(
+ "%s in %s exists, and has correct URL (%s)",
+ self.repo_name.title(),
+ display_path(dest),
+ url,
+ )
+ if not self.is_commit_id_equal(dest, rev_options.rev):
+ logger.info(
+ "Updating %s %s%s",
+ display_path(dest),
+ self.repo_name,
+ rev_display,
+ )
+ self.update(dest, url, rev_options)
+ else:
+ logger.info("Skipping because already up-to-date.")
+ return
+
+ logger.warning(
+ "%s %s in %s exists with URL %s",
+ self.name,
+ self.repo_name,
+ display_path(dest),
+ existing_url,
+ )
+ prompt = ("(s)witch, (i)gnore, (w)ipe, (b)ackup ", ("s", "i", "w", "b"))
+ else:
+ logger.warning(
+ "Directory %s already exists, and is not a %s %s.",
+ dest,
+ self.name,
+ self.repo_name,
+ )
+ # https://github.com/python/mypy/issues/1174
+ prompt = ("(i)gnore, (w)ipe, (b)ackup ", ("i", "w", "b")) # type: ignore
+
+ logger.warning(
+ "The plan is to install the %s repository %s",
+ self.name,
+ url,
+ )
+ response = ask_path_exists(f"What to do? {prompt[0]}", prompt[1])
+
+ if response == "a":
+ sys.exit(-1)
+
+ if response == "w":
+ logger.warning("Deleting %s", display_path(dest))
+ rmtree(dest)
+ self.fetch_new(dest, url, rev_options, verbosity=verbosity)
+ return
+
+ if response == "b":
+ dest_dir = backup_dir(dest)
+ logger.warning("Backing up %s to %s", display_path(dest), dest_dir)
+ shutil.move(dest, dest_dir)
+ self.fetch_new(dest, url, rev_options, verbosity=verbosity)
+ return
+
+ # Do nothing if the response is "i".
+ if response == "s":
+ logger.info(
+ "Switching %s %s to %s%s",
+ self.repo_name,
+ display_path(dest),
+ url,
+ rev_display,
+ )
+ self.switch(dest, url, rev_options)
+
+ def unpack(self, location: str, url: HiddenText, verbosity: int) -> None:
+ """
+ Clean up current location and download the url repository
+ (and vcs infos) into location
+
+ :param url: the repository URL starting with a vcs prefix.
+ :param verbosity: verbosity level.
+ """
+ if os.path.exists(location):
+ rmtree(location)
+ self.obtain(location, url=url, verbosity=verbosity)
+
+ @classmethod
+ def get_remote_url(cls, location: str) -> str:
+ """
+ Return the url used at location
+
+ Raises RemoteNotFoundError if the repository does not have a remote
+ url configured.
+ """
+ raise NotImplementedError
+
+ @classmethod
+ def get_revision(cls, location: str) -> str:
+ """
+ Return the current commit id of the files at the given location.
+ """
+ raise NotImplementedError
+
+ @classmethod
+ def run_command(
+ cls,
+ cmd: Union[List[str], CommandArgs],
+ show_stdout: bool = True,
+ cwd: Optional[str] = None,
+ on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
+ extra_ok_returncodes: Optional[Iterable[int]] = None,
+ command_desc: Optional[str] = None,
+ extra_environ: Optional[Mapping[str, Any]] = None,
+ spinner: Optional[SpinnerInterface] = None,
+ log_failed_cmd: bool = True,
+ stdout_only: bool = False,
+ ) -> str:
+ """
+ Run a VCS subcommand
+ This is simply a wrapper around call_subprocess that adds the VCS
+ command name, and checks that the VCS is available
+ """
+ cmd = make_command(cls.name, *cmd)
+ if command_desc is None:
+ command_desc = format_command_args(cmd)
+ try:
+ return call_subprocess(
+ cmd,
+ show_stdout,
+ cwd,
+ on_returncode=on_returncode,
+ extra_ok_returncodes=extra_ok_returncodes,
+ command_desc=command_desc,
+ extra_environ=extra_environ,
+ unset_environ=cls.unset_environ,
+ spinner=spinner,
+ log_failed_cmd=log_failed_cmd,
+ stdout_only=stdout_only,
+ )
+ except FileNotFoundError:
+ # errno.ENOENT = no such file or directory
+ # In other words, the VCS executable isn't available
+ raise BadCommand(
+ f"Cannot find command {cls.name!r} - do you have "
+ f"{cls.name!r} installed and in your PATH?"
+ )
+ except PermissionError:
+ # errno.EACCES = Permission denied
+ # This error occurs, for instance, when the command is installed
+ # only for another user. So, the current user don't have
+ # permission to call the other user command.
+ raise BadCommand(
+ f"No permission to execute {cls.name!r} - install it "
+ f"locally, globally (ask admin), or check your PATH. "
+ f"See possible solutions at "
+ f"https://pip.pypa.io/en/latest/reference/pip_freeze/"
+ f"#fixing-permission-denied."
+ )
+
+ @classmethod
+ def is_repository_directory(cls, path: str) -> bool:
+ """
+ Return whether a directory path is a repository directory.
+ """
+ logger.debug("Checking in %s for %s (%s)...", path, cls.dirname, cls.name)
+ return os.path.exists(os.path.join(path, cls.dirname))
+
+ @classmethod
+ def get_repository_root(cls, location: str) -> Optional[str]:
+ """
+ Return the "root" (top-level) directory controlled by the vcs,
+ or `None` if the directory is not in any.
+
+ It is meant to be overridden to implement smarter detection
+ mechanisms for specific vcs.
+
+ This can do more than is_repository_directory() alone. For
+ example, the Git override checks that Git is actually available.
+ """
+ if cls.is_repository_directory(location):
+ return location
+ return None