summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/pip/_internal/vcs
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/pip/_internal/vcs')
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/__init__.py15
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/__init__.cpython-311.pycbin638 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/bazaar.cpython-311.pycbin5863 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/git.cpython-311.pycbin21378 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/mercurial.cpython-311.pycbin8731 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/subversion.cpython-311.pycbin14606 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/versioncontrol.cpython-311.pycbin31770 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/bazaar.py112
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/git.py526
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/mercurial.py163
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/subversion.py324
-rw-r--r--venv/lib/python3.11/site-packages/pip/_internal/vcs/versioncontrol.py705
12 files changed, 0 insertions, 1845 deletions
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__init__.py b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__init__.py
deleted file mode 100644
index b6beddb..0000000
--- a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__init__.py
+++ /dev/null
@@ -1,15 +0,0 @@
-# Expose a limited set of classes and functions so callers outside of
-# the vcs package don't need to import deeper than `pip._internal.vcs`.
-# (The test directory may still need to import from a vcs sub-package.)
-# Import all vcs modules to register each VCS in the VcsSupport object.
-import pip._internal.vcs.bazaar
-import pip._internal.vcs.git
-import pip._internal.vcs.mercurial
-import pip._internal.vcs.subversion # noqa: F401
-from pip._internal.vcs.versioncontrol import ( # noqa: F401
- RemoteNotFoundError,
- RemoteNotValidError,
- is_url,
- make_vcs_requirement_url,
- vcs,
-)
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/__init__.cpython-311.pyc
deleted file mode 100644
index 058060c..0000000
--- a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/__init__.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/bazaar.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/bazaar.cpython-311.pyc
deleted file mode 100644
index 874b93a..0000000
--- a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/bazaar.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/git.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/git.cpython-311.pyc
deleted file mode 100644
index 6c092b0..0000000
--- a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/git.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/mercurial.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/mercurial.cpython-311.pyc
deleted file mode 100644
index 4aa50d2..0000000
--- a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/mercurial.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/subversion.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/subversion.cpython-311.pyc
deleted file mode 100644
index 8f4e3f2..0000000
--- a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/subversion.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/versioncontrol.cpython-311.pyc b/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/versioncontrol.cpython-311.pyc
deleted file mode 100644
index 0f06f47..0000000
--- a/venv/lib/python3.11/site-packages/pip/_internal/vcs/__pycache__/versioncontrol.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/bazaar.py b/venv/lib/python3.11/site-packages/pip/_internal/vcs/bazaar.py
deleted file mode 100644
index 20a17ed..0000000
--- a/venv/lib/python3.11/site-packages/pip/_internal/vcs/bazaar.py
+++ /dev/null
@@ -1,112 +0,0 @@
-import logging
-from typing import List, Optional, Tuple
-
-from pip._internal.utils.misc import HiddenText, display_path
-from pip._internal.utils.subprocess import make_command
-from pip._internal.utils.urls import path_to_url
-from pip._internal.vcs.versioncontrol import (
- AuthInfo,
- RemoteNotFoundError,
- RevOptions,
- VersionControl,
- vcs,
-)
-
-logger = logging.getLogger(__name__)
-
-
-class Bazaar(VersionControl):
- name = "bzr"
- dirname = ".bzr"
- repo_name = "branch"
- schemes = (
- "bzr+http",
- "bzr+https",
- "bzr+ssh",
- "bzr+sftp",
- "bzr+ftp",
- "bzr+lp",
- "bzr+file",
- )
-
- @staticmethod
- def get_base_rev_args(rev: str) -> List[str]:
- return ["-r", rev]
-
- def fetch_new(
- self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
- ) -> None:
- rev_display = rev_options.to_display()
- logger.info(
- "Checking out %s%s to %s",
- url,
- rev_display,
- display_path(dest),
- )
- if verbosity <= 0:
- flag = "--quiet"
- elif verbosity == 1:
- flag = ""
- else:
- flag = f"-{'v'*verbosity}"
- cmd_args = make_command(
- "checkout", "--lightweight", flag, rev_options.to_args(), url, dest
- )
- self.run_command(cmd_args)
-
- def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
- self.run_command(make_command("switch", url), cwd=dest)
-
- def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
- output = self.run_command(
- make_command("info"), show_stdout=False, stdout_only=True, cwd=dest
- )
- if output.startswith("Standalone "):
- # Older versions of pip used to create standalone branches.
- # Convert the standalone branch to a checkout by calling "bzr bind".
- cmd_args = make_command("bind", "-q", url)
- self.run_command(cmd_args, cwd=dest)
-
- cmd_args = make_command("update", "-q", rev_options.to_args())
- self.run_command(cmd_args, cwd=dest)
-
- @classmethod
- def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
- # hotfix the URL scheme after removing bzr+ from bzr+ssh:// re-add it
- url, rev, user_pass = super().get_url_rev_and_auth(url)
- if url.startswith("ssh://"):
- url = "bzr+" + url
- return url, rev, user_pass
-
- @classmethod
- def get_remote_url(cls, location: str) -> str:
- urls = cls.run_command(
- ["info"], show_stdout=False, stdout_only=True, cwd=location
- )
- for line in urls.splitlines():
- line = line.strip()
- for x in ("checkout of branch: ", "parent branch: "):
- if line.startswith(x):
- repo = line.split(x)[1]
- if cls._is_local_repository(repo):
- return path_to_url(repo)
- return repo
- raise RemoteNotFoundError
-
- @classmethod
- def get_revision(cls, location: str) -> str:
- revision = cls.run_command(
- ["revno"],
- show_stdout=False,
- stdout_only=True,
- cwd=location,
- )
- return revision.splitlines()[-1]
-
- @classmethod
- def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
- """Always assume the versions don't match"""
- return False
-
-
-vcs.register(Bazaar)
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/git.py b/venv/lib/python3.11/site-packages/pip/_internal/vcs/git.py
deleted file mode 100644
index 8c242cf..0000000
--- a/venv/lib/python3.11/site-packages/pip/_internal/vcs/git.py
+++ /dev/null
@@ -1,526 +0,0 @@
-import logging
-import os.path
-import pathlib
-import re
-import urllib.parse
-import urllib.request
-from typing import List, Optional, Tuple
-
-from pip._internal.exceptions import BadCommand, InstallationError
-from pip._internal.utils.misc import HiddenText, display_path, hide_url
-from pip._internal.utils.subprocess import make_command
-from pip._internal.vcs.versioncontrol import (
- AuthInfo,
- RemoteNotFoundError,
- RemoteNotValidError,
- RevOptions,
- VersionControl,
- find_path_to_project_root_from_repo_root,
- vcs,
-)
-
-urlsplit = urllib.parse.urlsplit
-urlunsplit = urllib.parse.urlunsplit
-
-
-logger = logging.getLogger(__name__)
-
-
-GIT_VERSION_REGEX = re.compile(
- r"^git version " # Prefix.
- r"(\d+)" # Major.
- r"\.(\d+)" # Dot, minor.
- r"(?:\.(\d+))?" # Optional dot, patch.
- r".*$" # Suffix, including any pre- and post-release segments we don't care about.
-)
-
-HASH_REGEX = re.compile("^[a-fA-F0-9]{40}$")
-
-# SCP (Secure copy protocol) shorthand. e.g. 'git@example.com:foo/bar.git'
-SCP_REGEX = re.compile(
- r"""^
- # Optional user, e.g. 'git@'
- (\w+@)?
- # Server, e.g. 'github.com'.
- ([^/:]+):
- # The server-side path. e.g. 'user/project.git'. Must start with an
- # alphanumeric character so as not to be confusable with a Windows paths
- # like 'C:/foo/bar' or 'C:\foo\bar'.
- (\w[^:]*)
- $""",
- re.VERBOSE,
-)
-
-
-def looks_like_hash(sha: str) -> bool:
- return bool(HASH_REGEX.match(sha))
-
-
-class Git(VersionControl):
- name = "git"
- dirname = ".git"
- repo_name = "clone"
- schemes = (
- "git+http",
- "git+https",
- "git+ssh",
- "git+git",
- "git+file",
- )
- # Prevent the user's environment variables from interfering with pip:
- # https://github.com/pypa/pip/issues/1130
- unset_environ = ("GIT_DIR", "GIT_WORK_TREE")
- default_arg_rev = "HEAD"
-
- @staticmethod
- def get_base_rev_args(rev: str) -> List[str]:
- return [rev]
-
- def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
- _, rev_options = self.get_url_rev_options(hide_url(url))
- if not rev_options.rev:
- return False
- if not self.is_commit_id_equal(dest, rev_options.rev):
- # the current commit is different from rev,
- # which means rev was something else than a commit hash
- return False
- # return False in the rare case rev is both a commit hash
- # and a tag or a branch; we don't want to cache in that case
- # because that branch/tag could point to something else in the future
- is_tag_or_branch = bool(self.get_revision_sha(dest, rev_options.rev)[0])
- return not is_tag_or_branch
-
- def get_git_version(self) -> Tuple[int, ...]:
- version = self.run_command(
- ["version"],
- command_desc="git version",
- show_stdout=False,
- stdout_only=True,
- )
- match = GIT_VERSION_REGEX.match(version)
- if not match:
- logger.warning("Can't parse git version: %s", version)
- return ()
- return (int(match.group(1)), int(match.group(2)))
-
- @classmethod
- def get_current_branch(cls, location: str) -> Optional[str]:
- """
- Return the current branch, or None if HEAD isn't at a branch
- (e.g. detached HEAD).
- """
- # git-symbolic-ref exits with empty stdout if "HEAD" is a detached
- # HEAD rather than a symbolic ref. In addition, the -q causes the
- # command to exit with status code 1 instead of 128 in this case
- # and to suppress the message to stderr.
- args = ["symbolic-ref", "-q", "HEAD"]
- output = cls.run_command(
- args,
- extra_ok_returncodes=(1,),
- show_stdout=False,
- stdout_only=True,
- cwd=location,
- )
- ref = output.strip()
-
- if ref.startswith("refs/heads/"):
- return ref[len("refs/heads/") :]
-
- return None
-
- @classmethod
- def get_revision_sha(cls, dest: str, rev: str) -> Tuple[Optional[str], bool]:
- """
- Return (sha_or_none, is_branch), where sha_or_none is a commit hash
- if the revision names a remote branch or tag, otherwise None.
-
- Args:
- dest: the repository directory.
- rev: the revision name.
- """
- # Pass rev to pre-filter the list.
- output = cls.run_command(
- ["show-ref", rev],
- cwd=dest,
- show_stdout=False,
- stdout_only=True,
- on_returncode="ignore",
- )
- refs = {}
- # NOTE: We do not use splitlines here since that would split on other
- # unicode separators, which can be maliciously used to install a
- # different revision.
- for line in output.strip().split("\n"):
- line = line.rstrip("\r")
- if not line:
- continue
- try:
- ref_sha, ref_name = line.split(" ", maxsplit=2)
- except ValueError:
- # Include the offending line to simplify troubleshooting if
- # this error ever occurs.
- raise ValueError(f"unexpected show-ref line: {line!r}")
-
- refs[ref_name] = ref_sha
-
- branch_ref = f"refs/remotes/origin/{rev}"
- tag_ref = f"refs/tags/{rev}"
-
- sha = refs.get(branch_ref)
- if sha is not None:
- return (sha, True)
-
- sha = refs.get(tag_ref)
-
- return (sha, False)
-
- @classmethod
- def _should_fetch(cls, dest: str, rev: str) -> bool:
- """
- Return true if rev is a ref or is a commit that we don't have locally.
-
- Branches and tags are not considered in this method because they are
- assumed to be always available locally (which is a normal outcome of
- ``git clone`` and ``git fetch --tags``).
- """
- if rev.startswith("refs/"):
- # Always fetch remote refs.
- return True
-
- if not looks_like_hash(rev):
- # Git fetch would fail with abbreviated commits.
- return False
-
- if cls.has_commit(dest, rev):
- # Don't fetch if we have the commit locally.
- return False
-
- return True
-
- @classmethod
- def resolve_revision(
- cls, dest: str, url: HiddenText, rev_options: RevOptions
- ) -> RevOptions:
- """
- Resolve a revision to a new RevOptions object with the SHA1 of the
- branch, tag, or ref if found.
-
- Args:
- rev_options: a RevOptions object.
- """
- rev = rev_options.arg_rev
- # The arg_rev property's implementation for Git ensures that the
- # rev return value is always non-None.
- assert rev is not None
-
- sha, is_branch = cls.get_revision_sha(dest, rev)
-
- if sha is not None:
- rev_options = rev_options.make_new(sha)
- rev_options.branch_name = rev if is_branch else None
-
- return rev_options
-
- # Do not show a warning for the common case of something that has
- # the form of a Git commit hash.
- if not looks_like_hash(rev):
- logger.warning(
- "Did not find branch or tag '%s', assuming revision or ref.",
- rev,
- )
-
- if not cls._should_fetch(dest, rev):
- return rev_options
-
- # fetch the requested revision
- cls.run_command(
- make_command("fetch", "-q", url, rev_options.to_args()),
- cwd=dest,
- )
- # Change the revision to the SHA of the ref we fetched
- sha = cls.get_revision(dest, rev="FETCH_HEAD")
- rev_options = rev_options.make_new(sha)
-
- return rev_options
-
- @classmethod
- def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
- """
- Return whether the current commit hash equals the given name.
-
- Args:
- dest: the repository directory.
- name: a string name.
- """
- if not name:
- # Then avoid an unnecessary subprocess call.
- return False
-
- return cls.get_revision(dest) == name
-
- def fetch_new(
- self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
- ) -> None:
- rev_display = rev_options.to_display()
- logger.info("Cloning %s%s to %s", url, rev_display, display_path(dest))
- if verbosity <= 0:
- flags: Tuple[str, ...] = ("--quiet",)
- elif verbosity == 1:
- flags = ()
- else:
- flags = ("--verbose", "--progress")
- if self.get_git_version() >= (2, 17):
- # Git added support for partial clone in 2.17
- # https://git-scm.com/docs/partial-clone
- # Speeds up cloning by functioning without a complete copy of repository
- self.run_command(
- make_command(
- "clone",
- "--filter=blob:none",
- *flags,
- url,
- dest,
- )
- )
- else:
- self.run_command(make_command("clone", *flags, url, dest))
-
- if rev_options.rev:
- # Then a specific revision was requested.
- rev_options = self.resolve_revision(dest, url, rev_options)
- branch_name = getattr(rev_options, "branch_name", None)
- logger.debug("Rev options %s, branch_name %s", rev_options, branch_name)
- if branch_name is None:
- # Only do a checkout if the current commit id doesn't match
- # the requested revision.
- if not self.is_commit_id_equal(dest, rev_options.rev):
- cmd_args = make_command(
- "checkout",
- "-q",
- rev_options.to_args(),
- )
- self.run_command(cmd_args, cwd=dest)
- elif self.get_current_branch(dest) != branch_name:
- # Then a specific branch was requested, and that branch
- # is not yet checked out.
- track_branch = f"origin/{branch_name}"
- cmd_args = [
- "checkout",
- "-b",
- branch_name,
- "--track",
- track_branch,
- ]
- self.run_command(cmd_args, cwd=dest)
- else:
- sha = self.get_revision(dest)
- rev_options = rev_options.make_new(sha)
-
- logger.info("Resolved %s to commit %s", url, rev_options.rev)
-
- #: repo may contain submodules
- self.update_submodules(dest)
-
- def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
- self.run_command(
- make_command("config", "remote.origin.url", url),
- cwd=dest,
- )
- cmd_args = make_command("checkout", "-q", rev_options.to_args())
- self.run_command(cmd_args, cwd=dest)
-
- self.update_submodules(dest)
-
- def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
- # First fetch changes from the default remote
- if self.get_git_version() >= (1, 9):
- # fetch tags in addition to everything else
- self.run_command(["fetch", "-q", "--tags"], cwd=dest)
- else:
- self.run_command(["fetch", "-q"], cwd=dest)
- # Then reset to wanted revision (maybe even origin/master)
- rev_options = self.resolve_revision(dest, url, rev_options)
- cmd_args = make_command("reset", "--hard", "-q", rev_options.to_args())
- self.run_command(cmd_args, cwd=dest)
- #: update submodules
- self.update_submodules(dest)
-
- @classmethod
- def get_remote_url(cls, location: str) -> str:
- """
- Return URL of the first remote encountered.
-
- Raises RemoteNotFoundError if the repository does not have a remote
- url configured.
- """
- # We need to pass 1 for extra_ok_returncodes since the command
- # exits with return code 1 if there are no matching lines.
- stdout = cls.run_command(
- ["config", "--get-regexp", r"remote\..*\.url"],
- extra_ok_returncodes=(1,),
- show_stdout=False,
- stdout_only=True,
- cwd=location,
- )
- remotes = stdout.splitlines()
- try:
- found_remote = remotes[0]
- except IndexError:
- raise RemoteNotFoundError
-
- for remote in remotes:
- if remote.startswith("remote.origin.url "):
- found_remote = remote
- break
- url = found_remote.split(" ")[1]
- return cls._git_remote_to_pip_url(url.strip())
-
- @staticmethod
- def _git_remote_to_pip_url(url: str) -> str:
- """
- Convert a remote url from what git uses to what pip accepts.
-
- There are 3 legal forms **url** may take:
-
- 1. A fully qualified url: ssh://git@example.com/foo/bar.git
- 2. A local project.git folder: /path/to/bare/repository.git
- 3. SCP shorthand for form 1: git@example.com:foo/bar.git
-
- Form 1 is output as-is. Form 2 must be converted to URI and form 3 must
- be converted to form 1.
-
- See the corresponding test test_git_remote_url_to_pip() for examples of
- sample inputs/outputs.
- """
- if re.match(r"\w+://", url):
- # This is already valid. Pass it though as-is.
- return url
- if os.path.exists(url):
- # A local bare remote (git clone --mirror).
- # Needs a file:// prefix.
- return pathlib.PurePath(url).as_uri()
- scp_match = SCP_REGEX.match(url)
- if scp_match:
- # Add an ssh:// prefix and replace the ':' with a '/'.
- return scp_match.expand(r"ssh://\1\2/\3")
- # Otherwise, bail out.
- raise RemoteNotValidError(url)
-
- @classmethod
- def has_commit(cls, location: str, rev: str) -> bool:
- """
- Check if rev is a commit that is available in the local repository.
- """
- try:
- cls.run_command(
- ["rev-parse", "-q", "--verify", "sha^" + rev],
- cwd=location,
- log_failed_cmd=False,
- )
- except InstallationError:
- return False
- else:
- return True
-
- @classmethod
- def get_revision(cls, location: str, rev: Optional[str] = None) -> str:
- if rev is None:
- rev = "HEAD"
- current_rev = cls.run_command(
- ["rev-parse", rev],
- show_stdout=False,
- stdout_only=True,
- cwd=location,
- )
- return current_rev.strip()
-
- @classmethod
- def get_subdirectory(cls, location: str) -> Optional[str]:
- """
- Return the path to Python project root, relative to the repo root.
- Return None if the project root is in the repo root.
- """
- # find the repo root
- git_dir = cls.run_command(
- ["rev-parse", "--git-dir"],
- show_stdout=False,
- stdout_only=True,
- cwd=location,
- ).strip()
- if not os.path.isabs(git_dir):
- git_dir = os.path.join(location, git_dir)
- repo_root = os.path.abspath(os.path.join(git_dir, ".."))
- return find_path_to_project_root_from_repo_root(location, repo_root)
-
- @classmethod
- def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
- """
- Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
- That's required because although they use SSH they sometimes don't
- work with a ssh:// scheme (e.g. GitHub). But we need a scheme for
- parsing. Hence we remove it again afterwards and return it as a stub.
- """
- # Works around an apparent Git bug
- # (see https://article.gmane.org/gmane.comp.version-control.git/146500)
- scheme, netloc, path, query, fragment = urlsplit(url)
- if scheme.endswith("file"):
- initial_slashes = path[: -len(path.lstrip("/"))]
- newpath = initial_slashes + urllib.request.url2pathname(path).replace(
- "\\", "/"
- ).lstrip("/")
- after_plus = scheme.find("+") + 1
- url = scheme[:after_plus] + urlunsplit(
- (scheme[after_plus:], netloc, newpath, query, fragment),
- )
-
- if "://" not in url:
- assert "file:" not in url
- url = url.replace("git+", "git+ssh://")
- url, rev, user_pass = super().get_url_rev_and_auth(url)
- url = url.replace("ssh://", "")
- else:
- url, rev, user_pass = super().get_url_rev_and_auth(url)
-
- return url, rev, user_pass
-
- @classmethod
- def update_submodules(cls, location: str) -> None:
- if not os.path.exists(os.path.join(location, ".gitmodules")):
- return
- cls.run_command(
- ["submodule", "update", "--init", "--recursive", "-q"],
- cwd=location,
- )
-
- @classmethod
- def get_repository_root(cls, location: str) -> Optional[str]:
- loc = super().get_repository_root(location)
- if loc:
- return loc
- try:
- r = cls.run_command(
- ["rev-parse", "--show-toplevel"],
- cwd=location,
- show_stdout=False,
- stdout_only=True,
- on_returncode="raise",
- log_failed_cmd=False,
- )
- except BadCommand:
- logger.debug(
- "could not determine if %s is under git control "
- "because git is not available",
- location,
- )
- return None
- except InstallationError:
- return None
- return os.path.normpath(r.rstrip("\r\n"))
-
- @staticmethod
- def should_add_vcs_url_prefix(repo_url: str) -> bool:
- """In either https or ssh form, requirements must be prefixed with git+."""
- return True
-
-
-vcs.register(Git)
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/mercurial.py b/venv/lib/python3.11/site-packages/pip/_internal/vcs/mercurial.py
deleted file mode 100644
index c183d41..0000000
--- a/venv/lib/python3.11/site-packages/pip/_internal/vcs/mercurial.py
+++ /dev/null
@@ -1,163 +0,0 @@
-import configparser
-import logging
-import os
-from typing import List, Optional, Tuple
-
-from pip._internal.exceptions import BadCommand, InstallationError
-from pip._internal.utils.misc import HiddenText, display_path
-from pip._internal.utils.subprocess import make_command
-from pip._internal.utils.urls import path_to_url
-from pip._internal.vcs.versioncontrol import (
- RevOptions,
- VersionControl,
- find_path_to_project_root_from_repo_root,
- vcs,
-)
-
-logger = logging.getLogger(__name__)
-
-
-class Mercurial(VersionControl):
- name = "hg"
- dirname = ".hg"
- repo_name = "clone"
- schemes = (
- "hg+file",
- "hg+http",
- "hg+https",
- "hg+ssh",
- "hg+static-http",
- )
-
- @staticmethod
- def get_base_rev_args(rev: str) -> List[str]:
- return [f"--rev={rev}"]
-
- def fetch_new(
- self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
- ) -> None:
- rev_display = rev_options.to_display()
- logger.info(
- "Cloning hg %s%s to %s",
- url,
- rev_display,
- display_path(dest),
- )
- if verbosity <= 0:
- flags: Tuple[str, ...] = ("--quiet",)
- elif verbosity == 1:
- flags = ()
- elif verbosity == 2:
- flags = ("--verbose",)
- else:
- flags = ("--verbose", "--debug")
- self.run_command(make_command("clone", "--noupdate", *flags, url, dest))
- self.run_command(
- make_command("update", *flags, rev_options.to_args()),
- cwd=dest,
- )
-
- def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
- repo_config = os.path.join(dest, self.dirname, "hgrc")
- config = configparser.RawConfigParser()
- try:
- config.read(repo_config)
- config.set("paths", "default", url.secret)
- with open(repo_config, "w") as config_file:
- config.write(config_file)
- except (OSError, configparser.NoSectionError) as exc:
- logger.warning("Could not switch Mercurial repository to %s: %s", url, exc)
- else:
- cmd_args = make_command("update", "-q", rev_options.to_args())
- self.run_command(cmd_args, cwd=dest)
-
- def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
- self.run_command(["pull", "-q"], cwd=dest)
- cmd_args = make_command("update", "-q", rev_options.to_args())
- self.run_command(cmd_args, cwd=dest)
-
- @classmethod
- def get_remote_url(cls, location: str) -> str:
- url = cls.run_command(
- ["showconfig", "paths.default"],
- show_stdout=False,
- stdout_only=True,
- cwd=location,
- ).strip()
- if cls._is_local_repository(url):
- url = path_to_url(url)
- return url.strip()
-
- @classmethod
- def get_revision(cls, location: str) -> str:
- """
- Return the repository-local changeset revision number, as an integer.
- """
- current_revision = cls.run_command(
- ["parents", "--template={rev}"],
- show_stdout=False,
- stdout_only=True,
- cwd=location,
- ).strip()
- return current_revision
-
- @classmethod
- def get_requirement_revision(cls, location: str) -> str:
- """
- Return the changeset identification hash, as a 40-character
- hexadecimal string
- """
- current_rev_hash = cls.run_command(
- ["parents", "--template={node}"],
- show_stdout=False,
- stdout_only=True,
- cwd=location,
- ).strip()
- return current_rev_hash
-
- @classmethod
- def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
- """Always assume the versions don't match"""
- return False
-
- @classmethod
- def get_subdirectory(cls, location: str) -> Optional[str]:
- """
- Return the path to Python project root, relative to the repo root.
- Return None if the project root is in the repo root.
- """
- # find the repo root
- repo_root = cls.run_command(
- ["root"], show_stdout=False, stdout_only=True, cwd=location
- ).strip()
- if not os.path.isabs(repo_root):
- repo_root = os.path.abspath(os.path.join(location, repo_root))
- return find_path_to_project_root_from_repo_root(location, repo_root)
-
- @classmethod
- def get_repository_root(cls, location: str) -> Optional[str]:
- loc = super().get_repository_root(location)
- if loc:
- return loc
- try:
- r = cls.run_command(
- ["root"],
- cwd=location,
- show_stdout=False,
- stdout_only=True,
- on_returncode="raise",
- log_failed_cmd=False,
- )
- except BadCommand:
- logger.debug(
- "could not determine if %s is under hg control "
- "because hg is not available",
- location,
- )
- return None
- except InstallationError:
- return None
- return os.path.normpath(r.rstrip("\r\n"))
-
-
-vcs.register(Mercurial)
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/subversion.py b/venv/lib/python3.11/site-packages/pip/_internal/vcs/subversion.py
deleted file mode 100644
index 16d93a6..0000000
--- a/venv/lib/python3.11/site-packages/pip/_internal/vcs/subversion.py
+++ /dev/null
@@ -1,324 +0,0 @@
-import logging
-import os
-import re
-from typing import List, Optional, Tuple
-
-from pip._internal.utils.misc import (
- HiddenText,
- display_path,
- is_console_interactive,
- is_installable_dir,
- split_auth_from_netloc,
-)
-from pip._internal.utils.subprocess import CommandArgs, make_command
-from pip._internal.vcs.versioncontrol import (
- AuthInfo,
- RemoteNotFoundError,
- RevOptions,
- VersionControl,
- vcs,
-)
-
-logger = logging.getLogger(__name__)
-
-_svn_xml_url_re = re.compile('url="([^"]+)"')
-_svn_rev_re = re.compile(r'committed-rev="(\d+)"')
-_svn_info_xml_rev_re = re.compile(r'\s*revision="(\d+)"')
-_svn_info_xml_url_re = re.compile(r"<url>(.*)</url>")
-
-
-class Subversion(VersionControl):
- name = "svn"
- dirname = ".svn"
- repo_name = "checkout"
- schemes = ("svn+ssh", "svn+http", "svn+https", "svn+svn", "svn+file")
-
- @classmethod
- def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
- return True
-
- @staticmethod
- def get_base_rev_args(rev: str) -> List[str]:
- return ["-r", rev]
-
- @classmethod
- def get_revision(cls, location: str) -> str:
- """
- Return the maximum revision for all files under a given location
- """
- # Note: taken from setuptools.command.egg_info
- revision = 0
-
- for base, dirs, _ in os.walk(location):
- if cls.dirname not in dirs:
- dirs[:] = []
- continue # no sense walking uncontrolled subdirs
- dirs.remove(cls.dirname)
- entries_fn = os.path.join(base, cls.dirname, "entries")
- if not os.path.exists(entries_fn):
- # FIXME: should we warn?
- continue
-
- dirurl, localrev = cls._get_svn_url_rev(base)
-
- if base == location:
- assert dirurl is not None
- base = dirurl + "/" # save the root url
- elif not dirurl or not dirurl.startswith(base):
- dirs[:] = []
- continue # not part of the same svn tree, skip it
- revision = max(revision, localrev)
- return str(revision)
-
- @classmethod
- def get_netloc_and_auth(
- cls, netloc: str, scheme: str
- ) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:
- """
- This override allows the auth information to be passed to svn via the
- --username and --password options instead of via the URL.
- """
- if scheme == "ssh":
- # The --username and --password options can't be used for
- # svn+ssh URLs, so keep the auth information in the URL.
- return super().get_netloc_and_auth(netloc, scheme)
-
- return split_auth_from_netloc(netloc)
-
- @classmethod
- def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
- # hotfix the URL scheme after removing svn+ from svn+ssh:// re-add it
- url, rev, user_pass = super().get_url_rev_and_auth(url)
- if url.startswith("ssh://"):
- url = "svn+" + url
- return url, rev, user_pass
-
- @staticmethod
- def make_rev_args(
- username: Optional[str], password: Optional[HiddenText]
- ) -> CommandArgs:
- extra_args: CommandArgs = []
- if username:
- extra_args += ["--username", username]
- if password:
- extra_args += ["--password", password]
-
- return extra_args
-
- @classmethod
- def get_remote_url(cls, location: str) -> str:
- # In cases where the source is in a subdirectory, we have to look up in
- # the location until we find a valid project root.
- orig_location = location
- while not is_installable_dir(location):
- last_location = location
- location = os.path.dirname(location)
- if location == last_location:
- # We've traversed up to the root of the filesystem without
- # finding a Python project.
- logger.warning(
- "Could not find Python project for directory %s (tried all "
- "parent directories)",
- orig_location,
- )
- raise RemoteNotFoundError
-
- url, _rev = cls._get_svn_url_rev(location)
- if url is None:
- raise RemoteNotFoundError
-
- return url
-
- @classmethod
- def _get_svn_url_rev(cls, location: str) -> Tuple[Optional[str], int]:
- from pip._internal.exceptions import InstallationError
-
- entries_path = os.path.join(location, cls.dirname, "entries")
- if os.path.exists(entries_path):
- with open(entries_path) as f:
- data = f.read()
- else: # subversion >= 1.7 does not have the 'entries' file
- data = ""
-
- url = None
- if data.startswith("8") or data.startswith("9") or data.startswith("10"):
- entries = list(map(str.splitlines, data.split("\n\x0c\n")))
- del entries[0][0] # get rid of the '8'
- url = entries[0][3]
- revs = [int(d[9]) for d in entries if len(d) > 9 and d[9]] + [0]
- elif data.startswith("<?xml"):
- match = _svn_xml_url_re.search(data)
- if not match:
- raise ValueError(f"Badly formatted data: {data!r}")
- url = match.group(1) # get repository URL
- revs = [int(m.group(1)) for m in _svn_rev_re.finditer(data)] + [0]
- else:
- try:
- # subversion >= 1.7
- # Note that using get_remote_call_options is not necessary here
- # because `svn info` is being run against a local directory.
- # We don't need to worry about making sure interactive mode
- # is being used to prompt for passwords, because passwords
- # are only potentially needed for remote server requests.
- xml = cls.run_command(
- ["info", "--xml", location],
- show_stdout=False,
- stdout_only=True,
- )
- match = _svn_info_xml_url_re.search(xml)
- assert match is not None
- url = match.group(1)
- revs = [int(m.group(1)) for m in _svn_info_xml_rev_re.finditer(xml)]
- except InstallationError:
- url, revs = None, []
-
- if revs:
- rev = max(revs)
- else:
- rev = 0
-
- return url, rev
-
- @classmethod
- def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
- """Always assume the versions don't match"""
- return False
-
- def __init__(self, use_interactive: Optional[bool] = None) -> None:
- if use_interactive is None:
- use_interactive = is_console_interactive()
- self.use_interactive = use_interactive
-
- # This member is used to cache the fetched version of the current
- # ``svn`` client.
- # Special value definitions:
- # None: Not evaluated yet.
- # Empty tuple: Could not parse version.
- self._vcs_version: Optional[Tuple[int, ...]] = None
-
- super().__init__()
-
- def call_vcs_version(self) -> Tuple[int, ...]:
- """Query the version of the currently installed Subversion client.
-
- :return: A tuple containing the parts of the version information or
- ``()`` if the version returned from ``svn`` could not be parsed.
- :raises: BadCommand: If ``svn`` is not installed.
- """
- # Example versions:
- # svn, version 1.10.3 (r1842928)
- # compiled Feb 25 2019, 14:20:39 on x86_64-apple-darwin17.0.0
- # svn, version 1.7.14 (r1542130)
- # compiled Mar 28 2018, 08:49:13 on x86_64-pc-linux-gnu
- # svn, version 1.12.0-SlikSvn (SlikSvn/1.12.0)
- # compiled May 28 2019, 13:44:56 on x86_64-microsoft-windows6.2
- version_prefix = "svn, version "
- version = self.run_command(["--version"], show_stdout=False, stdout_only=True)
- if not version.startswith(version_prefix):
- return ()
-
- version = version[len(version_prefix) :].split()[0]
- version_list = version.partition("-")[0].split(".")
- try:
- parsed_version = tuple(map(int, version_list))
- except ValueError:
- return ()
-
- return parsed_version
-
- def get_vcs_version(self) -> Tuple[int, ...]:
- """Return the version of the currently installed Subversion client.
-
- If the version of the Subversion client has already been queried,
- a cached value will be used.
-
- :return: A tuple containing the parts of the version information or
- ``()`` if the version returned from ``svn`` could not be parsed.
- :raises: BadCommand: If ``svn`` is not installed.
- """
- if self._vcs_version is not None:
- # Use cached version, if available.
- # If parsing the version failed previously (empty tuple),
- # do not attempt to parse it again.
- return self._vcs_version
-
- vcs_version = self.call_vcs_version()
- self._vcs_version = vcs_version
- return vcs_version
-
- def get_remote_call_options(self) -> CommandArgs:
- """Return options to be used on calls to Subversion that contact the server.
-
- These options are applicable for the following ``svn`` subcommands used
- in this class.
-
- - checkout
- - switch
- - update
-
- :return: A list of command line arguments to pass to ``svn``.
- """
- if not self.use_interactive:
- # --non-interactive switch is available since Subversion 0.14.4.
- # Subversion < 1.8 runs in interactive mode by default.
- return ["--non-interactive"]
-
- svn_version = self.get_vcs_version()
- # By default, Subversion >= 1.8 runs in non-interactive mode if
- # stdin is not a TTY. Since that is how pip invokes SVN, in
- # call_subprocess(), pip must pass --force-interactive to ensure
- # the user can be prompted for a password, if required.
- # SVN added the --force-interactive option in SVN 1.8. Since
- # e.g. RHEL/CentOS 7, which is supported until 2024, ships with
- # SVN 1.7, pip should continue to support SVN 1.7. Therefore, pip
- # can't safely add the option if the SVN version is < 1.8 (or unknown).
- if svn_version >= (1, 8):
- return ["--force-interactive"]
-
- return []
-
- def fetch_new(
- self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
- ) -> None:
- rev_display = rev_options.to_display()
- logger.info(
- "Checking out %s%s to %s",
- url,
- rev_display,
- display_path(dest),
- )
- if verbosity <= 0:
- flag = "--quiet"
- else:
- flag = ""
- cmd_args = make_command(
- "checkout",
- flag,
- self.get_remote_call_options(),
- rev_options.to_args(),
- url,
- dest,
- )
- self.run_command(cmd_args)
-
- def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
- cmd_args = make_command(
- "switch",
- self.get_remote_call_options(),
- rev_options.to_args(),
- url,
- dest,
- )
- self.run_command(cmd_args)
-
- def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
- cmd_args = make_command(
- "update",
- self.get_remote_call_options(),
- rev_options.to_args(),
- dest,
- )
- self.run_command(cmd_args)
-
-
-vcs.register(Subversion)
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/vcs/versioncontrol.py b/venv/lib/python3.11/site-packages/pip/_internal/vcs/versioncontrol.py
deleted file mode 100644
index 46ca279..0000000
--- a/venv/lib/python3.11/site-packages/pip/_internal/vcs/versioncontrol.py
+++ /dev/null
@@ -1,705 +0,0 @@
-"""Handles all VCS (version control) support"""
-
-import logging
-import os
-import shutil
-import sys
-import urllib.parse
-from typing import (
- TYPE_CHECKING,
- Any,
- Dict,
- Iterable,
- Iterator,
- List,
- Mapping,
- Optional,
- Tuple,
- Type,
- Union,
-)
-
-from pip._internal.cli.spinners import SpinnerInterface
-from pip._internal.exceptions import BadCommand, InstallationError
-from pip._internal.utils.misc import (
- HiddenText,
- ask_path_exists,
- backup_dir,
- display_path,
- hide_url,
- hide_value,
- is_installable_dir,
- rmtree,
-)
-from pip._internal.utils.subprocess import (
- CommandArgs,
- call_subprocess,
- format_command_args,
- make_command,
-)
-from pip._internal.utils.urls import get_url_scheme
-
-if TYPE_CHECKING:
- # Literal was introduced in Python 3.8.
- #
- # TODO: Remove `if TYPE_CHECKING` when dropping support for Python 3.7.
- from typing import Literal
-
-
-__all__ = ["vcs"]
-
-
-logger = logging.getLogger(__name__)
-
-AuthInfo = Tuple[Optional[str], Optional[str]]
-
-
-def is_url(name: str) -> bool:
- """
- Return true if the name looks like a URL.
- """
- scheme = get_url_scheme(name)
- if scheme is None:
- return False
- return scheme in ["http", "https", "file", "ftp"] + vcs.all_schemes
-
-
-def make_vcs_requirement_url(
- repo_url: str, rev: str, project_name: str, subdir: Optional[str] = None
-) -> str:
- """
- Return the URL for a VCS requirement.
-
- Args:
- repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
- project_name: the (unescaped) project name.
- """
- egg_project_name = project_name.replace("-", "_")
- req = f"{repo_url}@{rev}#egg={egg_project_name}"
- if subdir:
- req += f"&subdirectory={subdir}"
-
- return req
-
-
-def find_path_to_project_root_from_repo_root(
- location: str, repo_root: str
-) -> Optional[str]:
- """
- Find the the Python project's root by searching up the filesystem from
- `location`. Return the path to project root relative to `repo_root`.
- Return None if the project root is `repo_root`, or cannot be found.
- """
- # find project root.
- orig_location = location
- while not is_installable_dir(location):
- last_location = location
- location = os.path.dirname(location)
- if location == last_location:
- # We've traversed up to the root of the filesystem without
- # finding a Python project.
- logger.warning(
- "Could not find a Python project for directory %s (tried all "
- "parent directories)",
- orig_location,
- )
- return None
-
- if os.path.samefile(repo_root, location):
- return None
-
- return os.path.relpath(location, repo_root)
-
-
-class RemoteNotFoundError(Exception):
- pass
-
-
-class RemoteNotValidError(Exception):
- def __init__(self, url: str):
- super().__init__(url)
- self.url = url
-
-
-class RevOptions:
-
- """
- Encapsulates a VCS-specific revision to install, along with any VCS
- install options.
-
- Instances of this class should be treated as if immutable.
- """
-
- def __init__(
- self,
- vc_class: Type["VersionControl"],
- rev: Optional[str] = None,
- extra_args: Optional[CommandArgs] = None,
- ) -> None:
- """
- Args:
- vc_class: a VersionControl subclass.
- rev: the name of the revision to install.
- extra_args: a list of extra options.
- """
- if extra_args is None:
- extra_args = []
-
- self.extra_args = extra_args
- self.rev = rev
- self.vc_class = vc_class
- self.branch_name: Optional[str] = None
-
- def __repr__(self) -> str:
- return f"<RevOptions {self.vc_class.name}: rev={self.rev!r}>"
-
- @property
- def arg_rev(self) -> Optional[str]:
- if self.rev is None:
- return self.vc_class.default_arg_rev
-
- return self.rev
-
- def to_args(self) -> CommandArgs:
- """
- Return the VCS-specific command arguments.
- """
- args: CommandArgs = []
- rev = self.arg_rev
- if rev is not None:
- args += self.vc_class.get_base_rev_args(rev)
- args += self.extra_args
-
- return args
-
- def to_display(self) -> str:
- if not self.rev:
- return ""
-
- return f" (to revision {self.rev})"
-
- def make_new(self, rev: str) -> "RevOptions":
- """
- Make a copy of the current instance, but with a new rev.
-
- Args:
- rev: the name of the revision for the new object.
- """
- return self.vc_class.make_rev_options(rev, extra_args=self.extra_args)
-
-
-class VcsSupport:
- _registry: Dict[str, "VersionControl"] = {}
- schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]
-
- def __init__(self) -> None:
- # Register more schemes with urlparse for various version control
- # systems
- urllib.parse.uses_netloc.extend(self.schemes)
- super().__init__()
-
- def __iter__(self) -> Iterator[str]:
- return self._registry.__iter__()
-
- @property
- def backends(self) -> List["VersionControl"]:
- return list(self._registry.values())
-
- @property
- def dirnames(self) -> List[str]:
- return [backend.dirname for backend in self.backends]
-
- @property
- def all_schemes(self) -> List[str]:
- schemes: List[str] = []
- for backend in self.backends:
- schemes.extend(backend.schemes)
- return schemes
-
- def register(self, cls: Type["VersionControl"]) -> None:
- if not hasattr(cls, "name"):
- logger.warning("Cannot register VCS %s", cls.__name__)
- return
- if cls.name not in self._registry:
- self._registry[cls.name] = cls()
- logger.debug("Registered VCS backend: %s", cls.name)
-
- def unregister(self, name: str) -> None:
- if name in self._registry:
- del self._registry[name]
-
- def get_backend_for_dir(self, location: str) -> Optional["VersionControl"]:
- """
- Return a VersionControl object if a repository of that type is found
- at the given directory.
- """
- vcs_backends = {}
- for vcs_backend in self._registry.values():
- repo_path = vcs_backend.get_repository_root(location)
- if not repo_path:
- continue
- logger.debug("Determine that %s uses VCS: %s", location, vcs_backend.name)
- vcs_backends[repo_path] = vcs_backend
-
- if not vcs_backends:
- return None
-
- # Choose the VCS in the inner-most directory. Since all repository
- # roots found here would be either `location` or one of its
- # parents, the longest path should have the most path components,
- # i.e. the backend representing the inner-most repository.
- inner_most_repo_path = max(vcs_backends, key=len)
- return vcs_backends[inner_most_repo_path]
-
- def get_backend_for_scheme(self, scheme: str) -> Optional["VersionControl"]:
- """
- Return a VersionControl object or None.
- """
- for vcs_backend in self._registry.values():
- if scheme in vcs_backend.schemes:
- return vcs_backend
- return None
-
- def get_backend(self, name: str) -> Optional["VersionControl"]:
- """
- Return a VersionControl object or None.
- """
- name = name.lower()
- return self._registry.get(name)
-
-
-vcs = VcsSupport()
-
-
-class VersionControl:
- name = ""
- dirname = ""
- repo_name = ""
- # List of supported schemes for this Version Control
- schemes: Tuple[str, ...] = ()
- # Iterable of environment variable names to pass to call_subprocess().
- unset_environ: Tuple[str, ...] = ()
- default_arg_rev: Optional[str] = None
-
- @classmethod
- def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
- """
- Return whether the vcs prefix (e.g. "git+") should be added to a
- repository's remote url when used in a requirement.
- """
- return not remote_url.lower().startswith(f"{cls.name}:")
-
- @classmethod
- def get_subdirectory(cls, location: str) -> Optional[str]:
- """
- Return the path to Python project root, relative to the repo root.
- Return None if the project root is in the repo root.
- """
- return None
-
- @classmethod
- def get_requirement_revision(cls, repo_dir: str) -> str:
- """
- Return the revision string that should be used in a requirement.
- """
- return cls.get_revision(repo_dir)
-
- @classmethod
- def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:
- """
- Return the requirement string to use to redownload the files
- currently at the given repository directory.
-
- Args:
- project_name: the (unescaped) project name.
-
- The return value has a form similar to the following:
-
- {repository_url}@{revision}#egg={project_name}
- """
- repo_url = cls.get_remote_url(repo_dir)
-
- if cls.should_add_vcs_url_prefix(repo_url):
- repo_url = f"{cls.name}+{repo_url}"
-
- revision = cls.get_requirement_revision(repo_dir)
- subdir = cls.get_subdirectory(repo_dir)
- req = make_vcs_requirement_url(repo_url, revision, project_name, subdir=subdir)
-
- return req
-
- @staticmethod
- def get_base_rev_args(rev: str) -> List[str]:
- """
- Return the base revision arguments for a vcs command.
-
- Args:
- rev: the name of a revision to install. Cannot be None.
- """
- raise NotImplementedError
-
- def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
- """
- Return true if the commit hash checked out at dest matches
- the revision in url.
-
- Always return False, if the VCS does not support immutable commit
- hashes.
-
- This method does not check if there are local uncommitted changes
- in dest after checkout, as pip currently has no use case for that.
- """
- return False
-
- @classmethod
- def make_rev_options(
- cls, rev: Optional[str] = None, extra_args: Optional[CommandArgs] = None
- ) -> RevOptions:
- """
- Return a RevOptions object.
-
- Args:
- rev: the name of a revision to install.
- extra_args: a list of extra options.
- """
- return RevOptions(cls, rev, extra_args=extra_args)
-
- @classmethod
- def _is_local_repository(cls, repo: str) -> bool:
- """
- posix absolute paths start with os.path.sep,
- win32 ones start with drive (like c:\\folder)
- """
- drive, tail = os.path.splitdrive(repo)
- return repo.startswith(os.path.sep) or bool(drive)
-
- @classmethod
- def get_netloc_and_auth(
- cls, netloc: str, scheme: str
- ) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:
- """
- Parse the repository URL's netloc, and return the new netloc to use
- along with auth information.
-
- Args:
- netloc: the original repository URL netloc.
- scheme: the repository URL's scheme without the vcs prefix.
-
- This is mainly for the Subversion class to override, so that auth
- information can be provided via the --username and --password options
- instead of through the URL. For other subclasses like Git without
- such an option, auth information must stay in the URL.
-
- Returns: (netloc, (username, password)).
- """
- return netloc, (None, None)
-
- @classmethod
- def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
- """
- Parse the repository URL to use, and return the URL, revision,
- and auth info to use.
-
- Returns: (url, rev, (username, password)).
- """
- scheme, netloc, path, query, frag = urllib.parse.urlsplit(url)
- if "+" not in scheme:
- raise ValueError(
- f"Sorry, {url!r} is a malformed VCS url. "
- "The format is <vcs>+<protocol>://<url>, "
- "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp"
- )
- # Remove the vcs prefix.
- scheme = scheme.split("+", 1)[1]
- netloc, user_pass = cls.get_netloc_and_auth(netloc, scheme)
- rev = None
- if "@" in path:
- path, rev = path.rsplit("@", 1)
- if not rev:
- raise InstallationError(
- f"The URL {url!r} has an empty revision (after @) "
- "which is not supported. Include a revision after @ "
- "or remove @ from the URL."
- )
- url = urllib.parse.urlunsplit((scheme, netloc, path, query, ""))
- return url, rev, user_pass
-
- @staticmethod
- def make_rev_args(
- username: Optional[str], password: Optional[HiddenText]
- ) -> CommandArgs:
- """
- Return the RevOptions "extra arguments" to use in obtain().
- """
- return []
-
- def get_url_rev_options(self, url: HiddenText) -> Tuple[HiddenText, RevOptions]:
- """
- Return the URL and RevOptions object to use in obtain(),
- as a tuple (url, rev_options).
- """
- secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret)
- username, secret_password = user_pass
- password: Optional[HiddenText] = None
- if secret_password is not None:
- password = hide_value(secret_password)
- extra_args = self.make_rev_args(username, password)
- rev_options = self.make_rev_options(rev, extra_args=extra_args)
-
- return hide_url(secret_url), rev_options
-
- @staticmethod
- def normalize_url(url: str) -> str:
- """
- Normalize a URL for comparison by unquoting it and removing any
- trailing slash.
- """
- return urllib.parse.unquote(url).rstrip("/")
-
- @classmethod
- def compare_urls(cls, url1: str, url2: str) -> bool:
- """
- Compare two repo URLs for identity, ignoring incidental differences.
- """
- return cls.normalize_url(url1) == cls.normalize_url(url2)
-
- def fetch_new(
- self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
- ) -> None:
- """
- Fetch a revision from a repository, in the case that this is the
- first fetch from the repository.
-
- Args:
- dest: the directory to fetch the repository to.
- rev_options: a RevOptions object.
- verbosity: verbosity level.
- """
- raise NotImplementedError
-
- def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
- """
- Switch the repo at ``dest`` to point to ``URL``.
-
- Args:
- rev_options: a RevOptions object.
- """
- raise NotImplementedError
-
- def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
- """
- Update an already-existing repo to the given ``rev_options``.
-
- Args:
- rev_options: a RevOptions object.
- """
- raise NotImplementedError
-
- @classmethod
- def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
- """
- Return whether the id of the current commit equals the given name.
-
- Args:
- dest: the repository directory.
- name: a string name.
- """
- raise NotImplementedError
-
- def obtain(self, dest: str, url: HiddenText, verbosity: int) -> None:
- """
- Install or update in editable mode the package represented by this
- VersionControl object.
-
- :param dest: the repository directory in which to install or update.
- :param url: the repository URL starting with a vcs prefix.
- :param verbosity: verbosity level.
- """
- url, rev_options = self.get_url_rev_options(url)
-
- if not os.path.exists(dest):
- self.fetch_new(dest, url, rev_options, verbosity=verbosity)
- return
-
- rev_display = rev_options.to_display()
- if self.is_repository_directory(dest):
- existing_url = self.get_remote_url(dest)
- if self.compare_urls(existing_url, url.secret):
- logger.debug(
- "%s in %s exists, and has correct URL (%s)",
- self.repo_name.title(),
- display_path(dest),
- url,
- )
- if not self.is_commit_id_equal(dest, rev_options.rev):
- logger.info(
- "Updating %s %s%s",
- display_path(dest),
- self.repo_name,
- rev_display,
- )
- self.update(dest, url, rev_options)
- else:
- logger.info("Skipping because already up-to-date.")
- return
-
- logger.warning(
- "%s %s in %s exists with URL %s",
- self.name,
- self.repo_name,
- display_path(dest),
- existing_url,
- )
- prompt = ("(s)witch, (i)gnore, (w)ipe, (b)ackup ", ("s", "i", "w", "b"))
- else:
- logger.warning(
- "Directory %s already exists, and is not a %s %s.",
- dest,
- self.name,
- self.repo_name,
- )
- # https://github.com/python/mypy/issues/1174
- prompt = ("(i)gnore, (w)ipe, (b)ackup ", ("i", "w", "b")) # type: ignore
-
- logger.warning(
- "The plan is to install the %s repository %s",
- self.name,
- url,
- )
- response = ask_path_exists(f"What to do? {prompt[0]}", prompt[1])
-
- if response == "a":
- sys.exit(-1)
-
- if response == "w":
- logger.warning("Deleting %s", display_path(dest))
- rmtree(dest)
- self.fetch_new(dest, url, rev_options, verbosity=verbosity)
- return
-
- if response == "b":
- dest_dir = backup_dir(dest)
- logger.warning("Backing up %s to %s", display_path(dest), dest_dir)
- shutil.move(dest, dest_dir)
- self.fetch_new(dest, url, rev_options, verbosity=verbosity)
- return
-
- # Do nothing if the response is "i".
- if response == "s":
- logger.info(
- "Switching %s %s to %s%s",
- self.repo_name,
- display_path(dest),
- url,
- rev_display,
- )
- self.switch(dest, url, rev_options)
-
- def unpack(self, location: str, url: HiddenText, verbosity: int) -> None:
- """
- Clean up current location and download the url repository
- (and vcs infos) into location
-
- :param url: the repository URL starting with a vcs prefix.
- :param verbosity: verbosity level.
- """
- if os.path.exists(location):
- rmtree(location)
- self.obtain(location, url=url, verbosity=verbosity)
-
- @classmethod
- def get_remote_url(cls, location: str) -> str:
- """
- Return the url used at location
-
- Raises RemoteNotFoundError if the repository does not have a remote
- url configured.
- """
- raise NotImplementedError
-
- @classmethod
- def get_revision(cls, location: str) -> str:
- """
- Return the current commit id of the files at the given location.
- """
- raise NotImplementedError
-
- @classmethod
- def run_command(
- cls,
- cmd: Union[List[str], CommandArgs],
- show_stdout: bool = True,
- cwd: Optional[str] = None,
- on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
- extra_ok_returncodes: Optional[Iterable[int]] = None,
- command_desc: Optional[str] = None,
- extra_environ: Optional[Mapping[str, Any]] = None,
- spinner: Optional[SpinnerInterface] = None,
- log_failed_cmd: bool = True,
- stdout_only: bool = False,
- ) -> str:
- """
- Run a VCS subcommand
- This is simply a wrapper around call_subprocess that adds the VCS
- command name, and checks that the VCS is available
- """
- cmd = make_command(cls.name, *cmd)
- if command_desc is None:
- command_desc = format_command_args(cmd)
- try:
- return call_subprocess(
- cmd,
- show_stdout,
- cwd,
- on_returncode=on_returncode,
- extra_ok_returncodes=extra_ok_returncodes,
- command_desc=command_desc,
- extra_environ=extra_environ,
- unset_environ=cls.unset_environ,
- spinner=spinner,
- log_failed_cmd=log_failed_cmd,
- stdout_only=stdout_only,
- )
- except FileNotFoundError:
- # errno.ENOENT = no such file or directory
- # In other words, the VCS executable isn't available
- raise BadCommand(
- f"Cannot find command {cls.name!r} - do you have "
- f"{cls.name!r} installed and in your PATH?"
- )
- except PermissionError:
- # errno.EACCES = Permission denied
- # This error occurs, for instance, when the command is installed
- # only for another user. So, the current user don't have
- # permission to call the other user command.
- raise BadCommand(
- f"No permission to execute {cls.name!r} - install it "
- f"locally, globally (ask admin), or check your PATH. "
- f"See possible solutions at "
- f"https://pip.pypa.io/en/latest/reference/pip_freeze/"
- f"#fixing-permission-denied."
- )
-
- @classmethod
- def is_repository_directory(cls, path: str) -> bool:
- """
- Return whether a directory path is a repository directory.
- """
- logger.debug("Checking in %s for %s (%s)...", path, cls.dirname, cls.name)
- return os.path.exists(os.path.join(path, cls.dirname))
-
- @classmethod
- def get_repository_root(cls, location: str) -> Optional[str]:
- """
- Return the "root" (top-level) directory controlled by the vcs,
- or `None` if the directory is not in any.
-
- It is meant to be overridden to implement smarter detection
- mechanisms for specific vcs.
-
- This can do more than is_repository_directory() alone. For
- example, the Git override checks that Git is actually available.
- """
- if cls.is_repository_directory(location):
- return location
- return None