Source code for pathlib_mate.mate_tool_box_zip
# -*- coding: utf-8 -*-
"""
Provide zip related functions.
"""
from typing import TYPE_CHECKING, Optional, List, Union
import os
import random
import string
from datetime import datetime
from zipfile import ZipFile, ZIP_STORED, ZIP_DEFLATED
from .mate_path_filters import all_true
from .helper import repr_data_size
if TYPE_CHECKING: # pragma: no cover
from .pathlib2 import Path
alpha_digits = string.ascii_letters + string.digits
[docs]def rand_str(length):
"""
:type length: int
:rtype: str
"""
return "".join([random.choice(alpha_digits) for _ in range(length)])
[docs]class ToolBoxZip(object):
"""
Provide zip related functions.
"""
def _default_zip_dst(self):
"""
automatically create destination zip file ``Path`` object.
:type self: Path
:rtype: Path
"""
new_basename = "{}-{}-{}.zip".format(
self.basename,
datetime.now().strftime("%Y-%m-%d-%Hh-%Mm-%Ss"),
rand_str(4),
)
return self.change(new_basename=new_basename)
[docs] def make_zip_archive(
self,
dst=None,
filters=all_true,
compress=True,
overwrite=False,
makedirs=False,
include_dir=True,
verbose=False,
):
"""
Make a zip archive of a directory or a file.
:type self: Path
:type dst: Optional[Union[Path, str]]
:param dst: output file path. if not given, will be automatically assigned.
:type filters: typing.Callable
:param filters: custom path filter. By default it allows any file.
:type compress: bool
:param compress: compress or not.
:type verbose: bool
:param overwrite: overwrite exists or not.
:type makedirs: bool
:param makedirs: if True, automatically create the parent dir if not
exists
:type include_dir: bool
:param include_dir: if True, then you will see the source dir when you
unzip it. It only apply when zipping a directory
:type verbose: bool
:param verbose: display log or not.
"""
self.assert_exists()
if dst is None:
dst = self._default_zip_dst()
else:
dst = self.change(new_abspath=dst)
if not dst.basename.lower().endswith(".zip"):
raise ValueError("zip archive name has to be endswith '.zip'!")
if dst.exists():
if not overwrite:
raise IOError("'%s' already exists!" % dst)
if compress:
compression = ZIP_DEFLATED
else:
compression = ZIP_STORED
if not dst.parent.exists():
if makedirs: # pragma: no cover
os.makedirs(dst.parent.abspath)
if verbose:
msg = "Making zip archive for '%s' ..." % self
print(msg)
if self.is_dir():
total_size = 0
selected = list()
for p in self.glob("**/*"):
if filters(p):
selected.append(p)
total_size += p.size
if verbose:
msg = "Got {} files, total size is {}, compressing ...".format(
len(selected),
repr_data_size(total_size),
)
print(msg)
with ZipFile(dst.abspath, "w", compression) as f:
if include_dir:
relpath_root = self.parent
else:
relpath_root = self
for p in selected:
relpath = p.relative_to(relpath_root).__str__()
f.write(p.abspath, relpath)
elif self.is_file():
with ZipFile(dst.abspath, "w", compression) as f:
f.write(self.abspath, self.basename)
if verbose:
msg = "Complete! Archive size is {}.".format(dst.size_in_text)
print(msg)
[docs] def backup(
self,
dst=None,
ignore=None,
ignore_ext=None,
ignore_pattern=None,
ignore_size_smaller_than=None,
ignore_size_larger_than=None,
case_sensitive=False,
include_dir=True,
verbose=True,
): # pragma: no cover
"""
Create a compressed zip archive backup for a directory.
:type self: Path
:type dst: Optional[Union[Path, str]]
:param dst: the output file path.
:type ignore: Optional[List[str]]
:param ignore: file or directory defined in this list will be ignored.
:type ignore_ext: Optional[List[str]]
:param ignore_ext: file with extensions defined in this list will be ignored.
:type ignore_pattern: Optional[List[str]]
:param ignore_pattern: any file or directory that contains this pattern
will be ignored.
:type ignore_size_smaller_than: int
:param ignore_size_smaller_than: any file size smaller than this
will be ignored.
:type ignore_size_larger_than: int
:param ignore_size_larger_than: any file size larger than this
will be ignored.
:type case_sensitive: bool
:param case_sensitive: if True, the ignore rules are case sensitive
:type include_dir: bool
:param include_dir: if True, then you will see the source dir when you
unzip it. It only apply when zipping a directory
:type verbose: bool
:param verbose: display log or not.
**中文文档**
为一个目录创建一个备份压缩包。可以通过过滤器选择你要备份的文件。
"""
def preprocess_arg(arg): # pragma: no cover
if arg is None:
return []
if isinstance(arg, (tuple, list)):
return list(arg)
else:
return [
arg,
]
ignore = preprocess_arg(ignore)
for i in ignore:
if i.startswith("/") or i.startswith("\\"):
raise ValueError
ignore_ext = preprocess_arg(ignore_ext)
for ext in ignore_ext:
if not ext.startswith("."):
raise ValueError
ignore_pattern = preprocess_arg(ignore_pattern)
if case_sensitive:
pass
else:
ignore = [i.lower() for i in ignore]
ignore_ext = [i.lower() for i in ignore_ext]
ignore_pattern = [i.lower() for i in ignore_pattern]
def filters(p):
relpath = p.relative_to(self).abspath
if not case_sensitive:
relpath = relpath.lower()
# ignore
for i in ignore:
if relpath.startswith(i):
return False
# ignore_ext
if case_sensitive:
ext = p.ext
else:
ext = p.ext.lower()
if ext in ignore_ext:
return False
# ignore_pattern
for pattern in ignore_pattern:
if pattern in relpath:
return False
# ignore_size_smaller_than
if ignore_size_smaller_than:
if p.size < ignore_size_smaller_than:
return False
# ignore_size_larger_than
if ignore_size_larger_than:
if p.size > ignore_size_larger_than:
return False
return True
self.make_zip_archive(
dst=dst,
filters=filters,
compress=True,
overwrite=False,
include_dir=include_dir,
verbose=verbose,
)