summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/setuptools/archive_util.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/setuptools/archive_util.py')
-rw-r--r--venv/lib/python3.11/site-packages/setuptools/archive_util.py213
1 files changed, 213 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/setuptools/archive_util.py b/venv/lib/python3.11/site-packages/setuptools/archive_util.py
new file mode 100644
index 0000000..d8e10c1
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/setuptools/archive_util.py
@@ -0,0 +1,213 @@
+"""Utilities for extracting common archive formats"""
+
+import zipfile
+import tarfile
+import os
+import shutil
+import posixpath
+import contextlib
+from distutils.errors import DistutilsError
+
+from ._path import ensure_directory
+
+__all__ = [
+ "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter",
+ "UnrecognizedFormat", "extraction_drivers", "unpack_directory",
+]
+
+
+class UnrecognizedFormat(DistutilsError):
+ """Couldn't recognize the archive type"""
+
+
+def default_filter(src, dst):
+ """The default progress/filter callback; returns True for all files"""
+ return dst
+
+
+def unpack_archive(
+ filename, extract_dir, progress_filter=default_filter,
+ drivers=None):
+ """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``
+
+ `progress_filter` is a function taking two arguments: a source path
+ internal to the archive ('/'-separated), and a filesystem path where it
+ will be extracted. The callback must return the desired extract path
+ (which may be the same as the one passed in), or else ``None`` to skip
+ that file or directory. The callback can thus be used to report on the
+ progress of the extraction, as well as to filter the items extracted or
+ alter their extraction paths.
+
+ `drivers`, if supplied, must be a non-empty sequence of functions with the
+ same signature as this function (minus the `drivers` argument), that raise
+ ``UnrecognizedFormat`` if they do not support extracting the designated
+ archive type. The `drivers` are tried in sequence until one is found that
+ does not raise an error, or until all are exhausted (in which case
+ ``UnrecognizedFormat`` is raised). If you do not supply a sequence of
+ drivers, the module's ``extraction_drivers`` constant will be used, which
+ means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that
+ order.
+ """
+ for driver in drivers or extraction_drivers:
+ try:
+ driver(filename, extract_dir, progress_filter)
+ except UnrecognizedFormat:
+ continue
+ else:
+ return
+ else:
+ raise UnrecognizedFormat(
+ "Not a recognized archive type: %s" % filename
+ )
+
+
+def unpack_directory(filename, extract_dir, progress_filter=default_filter):
+ """"Unpack" a directory, using the same interface as for archives
+
+ Raises ``UnrecognizedFormat`` if `filename` is not a directory
+ """
+ if not os.path.isdir(filename):
+ raise UnrecognizedFormat("%s is not a directory" % filename)
+
+ paths = {
+ filename: ('', extract_dir),
+ }
+ for base, dirs, files in os.walk(filename):
+ src, dst = paths[base]
+ for d in dirs:
+ paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
+ for f in files:
+ target = os.path.join(dst, f)
+ target = progress_filter(src + f, target)
+ if not target:
+ # skip non-files
+ continue
+ ensure_directory(target)
+ f = os.path.join(base, f)
+ shutil.copyfile(f, target)
+ shutil.copystat(f, target)
+
+
+def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
+ """Unpack zip `filename` to `extract_dir`
+
+ Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
+ by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation
+ of the `progress_filter` argument.
+ """
+
+ if not zipfile.is_zipfile(filename):
+ raise UnrecognizedFormat("%s is not a zip file" % (filename,))
+
+ with zipfile.ZipFile(filename) as z:
+ _unpack_zipfile_obj(z, extract_dir, progress_filter)
+
+
+def _unpack_zipfile_obj(zipfile_obj, extract_dir, progress_filter=default_filter):
+ """Internal/private API used by other parts of setuptools.
+ Similar to ``unpack_zipfile``, but receives an already opened :obj:`zipfile.ZipFile`
+ object instead of a filename.
+ """
+ for info in zipfile_obj.infolist():
+ name = info.filename
+
+ # don't extract absolute paths or ones with .. in them
+ if name.startswith('/') or '..' in name.split('/'):
+ continue
+
+ target = os.path.join(extract_dir, *name.split('/'))
+ target = progress_filter(name, target)
+ if not target:
+ continue
+ if name.endswith('/'):
+ # directory
+ ensure_directory(target)
+ else:
+ # file
+ ensure_directory(target)
+ data = zipfile_obj.read(info.filename)
+ with open(target, 'wb') as f:
+ f.write(data)
+ unix_attributes = info.external_attr >> 16
+ if unix_attributes:
+ os.chmod(target, unix_attributes)
+
+
+def _resolve_tar_file_or_dir(tar_obj, tar_member_obj):
+ """Resolve any links and extract link targets as normal files."""
+ while tar_member_obj is not None and (
+ tar_member_obj.islnk() or tar_member_obj.issym()):
+ linkpath = tar_member_obj.linkname
+ if tar_member_obj.issym():
+ base = posixpath.dirname(tar_member_obj.name)
+ linkpath = posixpath.join(base, linkpath)
+ linkpath = posixpath.normpath(linkpath)
+ tar_member_obj = tar_obj._getmember(linkpath)
+
+ is_file_or_dir = (
+ tar_member_obj is not None and
+ (tar_member_obj.isfile() or tar_member_obj.isdir())
+ )
+ if is_file_or_dir:
+ return tar_member_obj
+
+ raise LookupError('Got unknown file type')
+
+
+def _iter_open_tar(tar_obj, extract_dir, progress_filter):
+ """Emit member-destination pairs from a tar archive."""
+ # don't do any chowning!
+ tar_obj.chown = lambda *args: None
+
+ with contextlib.closing(tar_obj):
+ for member in tar_obj:
+ name = member.name
+ # don't extract absolute paths or ones with .. in them
+ if name.startswith('/') or '..' in name.split('/'):
+ continue
+
+ prelim_dst = os.path.join(extract_dir, *name.split('/'))
+
+ try:
+ member = _resolve_tar_file_or_dir(tar_obj, member)
+ except LookupError:
+ continue
+
+ final_dst = progress_filter(name, prelim_dst)
+ if not final_dst:
+ continue
+
+ if final_dst.endswith(os.sep):
+ final_dst = final_dst[:-1]
+
+ yield member, final_dst
+
+
+def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
+ """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
+
+ Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
+ by ``tarfile.open()``). See ``unpack_archive()`` for an explanation
+ of the `progress_filter` argument.
+ """
+ try:
+ tarobj = tarfile.open(filename)
+ except tarfile.TarError as e:
+ raise UnrecognizedFormat(
+ "%s is not a compressed or uncompressed tar file" % (filename,)
+ ) from e
+
+ for member, final_dst in _iter_open_tar(
+ tarobj, extract_dir, progress_filter,
+ ):
+ try:
+ # XXX Ugh
+ tarobj._extract_member(member, final_dst)
+ except tarfile.ExtractError:
+ # chown/chmod/mkfifo/mknode/makedev failed
+ pass
+
+ return True
+
+
+extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile