Source code for pathlib_mate.mate_tool_box

# -*- coding: utf-8 -*-

"""
File system utility tool box. mimic linux ``md5``, ``zip``, etc...
"""

from typing import TYPE_CHECKING, List
import os
import warnings
import hashlib
import contextlib

from .vendor import six
from .vendor.fileutils import atomic_save

from .mate_path_filters import all_true
from .helper import repr_data_size
from .mate_tool_box_zip import ToolBoxZip

if TYPE_CHECKING:  # pragma: no cover
    from .pathlib2 import Path


[docs]class ToolBox(ToolBoxZip):
[docs] def get_dir_fingerprint(self, hash_meth): """ Return md5 fingerprint of a directory. Calculation is based on iterate recursively through all files, ordered by absolute path, and stream in md5 for each file. :type self: Path :type hash_meth: Callable :rtype: str """ m = hash_meth() for p in self.sort_by_abspath(self.select_file(recursive=True)): m.update(str(p).encode("utf-8")) m.update(p.md5.encode("utf-8")) return m.hexdigest()
@property def dir_md5(self): """ Return md5 fingerprint of a directory. See :meth:`ToolBox.get_dir_fingerprint` for details :type self: Path :rtype: str """ return self.get_dir_fingerprint(hashlib.md5) @property def dir_sha256(self): """ Return sha256 fingerprint of a directory. See :meth:`ToolBox.get_dir_fingerprint` for details :type self: Path :rtype: str """ return self.get_dir_fingerprint(hashlib.sha256) @property def dir_sha512(self): """ Return sha512 fingerprint of a directory. See :meth:`ToolBox.get_dir_fingerprint` for details :type self: Path :rtype: str """ return self.get_dir_fingerprint(hashlib.sha512)
[docs] def is_empty(self, strict=True): """ If it's a file, check if it is a empty file. (0 bytes content) If it's a directory, check if there's no file and dir in it. But if ``strict = False``, then only check if there's no file in it. :type self: Path :type strict: bool :param strict: only useful when it is a directory. if True, only return True if this dir has no dir and file. if False, return True if it doesn't have any file. :rtype: bool """ if self.exists(): if self.is_file(): return self.size == 0 elif self.is_dir(): if strict: return len(list(self.select(recursive=True))) == 0 else: # pragma: no cover return len(list(self.select_file(recursive=True))) == 0 else: # pragma: no cover msg = "'%s' is not either file or directory! (maybe simlink)" % self raise EnvironmentError(msg) else: raise EnvironmentError("'%s' not exists!" % self)
[docs] def auto_complete_choices(self, case_sensitive=False): """ A command line auto complete similar behavior. Find all item with same prefix of this one. :type self: Path :type case_sensitive: bool :param case_sensitive: toggle if it is case sensitive. :rtype: List[Path] :return: list of :class:`pathlib_mate.pathlib2.Path`. """ self_basename = self.basename self_basename_lower = self.basename.lower() if case_sensitive: # pragma: no cover def match(basename): return basename.startswith(self_basename) else: def match(basename): return basename.lower().startswith(self_basename_lower) choices = list() if self.is_dir(): choices.append(self) for p in self.sort_by_abspath(self.select(recursive=False)): choices.append(p) else: p_parent = self.parent if p_parent.is_dir(): for p in self.sort_by_abspath(p_parent.select(recursive=False)): if match(p.basename): choices.append(p) else: # pragma: no cover raise ValueError("'%s' directory does not exist!" % p_parent) return choices
# --- Directory Exclusive Method ---
[docs] def print_big_dir(self, top_n=5): """ Print ``top_n`` big dir in this dir. :type self: Path :type top_n: int """ self.assert_is_dir_and_exists() size_table = sorted( [(p, p.dirsize) for p in self.select_dir(recursive=False)], key=lambda x: x[1], reverse=True, ) for p, size in size_table[:top_n]: print("{:<9} {:<9}".format(repr_data_size(size), p.abspath))
[docs] def print_big_file(self, top_n=5): """ Print ``top_n`` big file in this dir. :type self: Path :type top_n: int """ self.assert_is_dir_and_exists() size_table = sorted( [(p, p.size) for p in self.select_file(recursive=True)], key=lambda x: x[1], reverse=True, ) for p, size in size_table[:top_n]: print("{:<9} {:<9}".format(repr_data_size(size), p.abspath))
[docs] def print_big_dir_and_big_file(self, top_n=5): """ Print ``top_n`` big dir and ``top_n`` big file in each dir. :type self: Path :type top_n: int """ self.assert_is_dir_and_exists() size_table1 = sorted( [(p, p.dirsize) for p in self.select_dir(recursive=False)], key=lambda x: x[1], reverse=True, ) for p1, size1 in size_table1[:top_n]: print("{:<9} {:<9}".format(repr_data_size(size1), p1.abspath)) size_table2 = sorted( [(p, p.size) for p in p1.select_file(recursive=True)], key=lambda x: x[1], reverse=True, ) for p2, size2 in size_table2[:top_n]: print(" {:<9} {:<9}".format(repr_data_size(size2), p2.abspath))
[docs] def file_stat_for_all(self, filters=all_true): # pragma: no cover """ Find out how many files, directories and total size (Include file in it's sub-folder) it has for each folder and sub-folder. :type self: Path :type filters: Callable :rtype: dict :returns: stat, a dict like ``{"directory path": { "file": number of files, "dir": number of directories, "size": total size in bytes}}`` **中文文档** 返回一个目录中的每个子目录的, 文件, 文件夹, 大小的统计数据。 """ self.assert_is_dir_and_exists() from collections import OrderedDict stat = OrderedDict() stat[self.abspath] = {"file": 0, "dir": 0, "size": 0} for p in self.select(filters=filters, recursive=True): if p.is_file(): size = p.size while 1: parent = p.parent stat[parent.abspath]["file"] += 1 stat[parent.abspath]["size"] += size if parent.abspath == self.abspath: break p = parent elif p.is_dir(): stat[p.abspath] = {"file": 0, "dir": 0, "size": 0} while 1: parent = p.parent stat[parent.abspath]["dir"] += 1 if parent.abspath == self.abspath: break p = parent return stat
[docs] def file_stat(self, filters=all_true): """Find out how many files, directorys and total size (Include file in it's sub-folder). :type self: Path :type filters: Callable :rtype: dict :returns: stat, a dict like ``{"file": number of files, "dir": number of directorys, "size": total size in bytes}`` **中文文档** 返回一个目录中的文件, 文件夹, 大小的统计数据。 """ self.assert_is_dir_and_exists() stat = {"file": 0, "dir": 0, "size": 0} for p in self.select(filters=filters, recursive=True): if p.is_file(): stat["file"] += 1 stat["size"] += p.size elif p.is_dir(): stat["dir"] += 1 return stat
[docs] def mirror_to(self, dst): # pragma: no cover """ Create a new folder having exactly same structure with this directory. However, all files are just empty file with same file name. :type self: Path :type dst: str :param dst: destination directory. The directory can't exists before you execute this. **中文文档** 创建一个目录的镜像拷贝, 与拷贝操作不同的是, 文件的副本只是在文件名上 与原件一致, 但是是空文件, 完全没有内容, 文件大小为0。 """ self.assert_is_dir_and_exists() src = self.abspath dst = os.path.abspath(dst) if os.path.exists(dst): # pragma: no cover raise Exception("distination already exist!") for current_folder, _, file_list in os.walk(self.abspath): current_folder = current_folder.replace(src, dst) try: os.mkdir(current_folder) except: # pragma: no cover pass for basename in file_list: abspath = os.path.join(current_folder, basename) with open(abspath, "wb") as _: pass
[docs] def execute_pyfile(self, py_exe=None): # pragma: no cover """ Execute every ``.py`` file as main script. :type self: Path :type py_exe: str :param py_exe: python command or python executable path. **中文文档** 将目录下的所有 Python 文件作为主脚本用当前解释器运行。 """ warnings.warn( "this feature will be deprecated soon! this is a historical feature", DeprecationWarning, ) import subprocess self.assert_is_dir_and_exists() if py_exe is None: if six.PY2: py_exe = "python2" elif six.PY3: py_exe = "python3" for p in self.select_by_ext(".py"): subprocess.Popen('%s "%s"' % (py_exe, p.abspath))
[docs] def trail_space(self, filters=lambda p: p.ext == ".py"): # pragma: no cover """ Trail white space at end of each line for every ``.py`` file. :type self: Path :type filters: Callable **中文文档** 将目录下的所有被选择的文件中行末的空格删除. """ self.assert_is_dir_and_exists() for p in self.select_file(filters): try: with open(p.abspath, "rb") as f: lines = list() for line in f: lines.append(line.decode("utf-8").rstrip()) with open(p.abspath, "wb") as f: f.write("\n".join(lines).encode("utf-8")) except Exception as e: # pragma: no cover raise e
[docs] def autopep8(self, **kwargs): # pragma: no cover """ Auto convert your python code in a directory to pep8 styled code. :type self: Path :param kwargs: arguments for ``autopep8.fix_code`` method. **中文文档** 将目录下的所有 Python 文件用 pep8 风格格式化. 增加其可读性和规范性. """ warnings.warn( "this feature will be deprecated soon! use subprocess + cli instead", DeprecationWarning, ) try: import autopep8 except ImportError as e: warnings.warn("you have to 'pip install autopep8' to enable this feature!") raise e self.assert_is_dir_and_exists() for p in self.select_by_ext(".py"): with open(p.abspath, "rb") as f: code = f.read().decode("utf-8") formatted_code = autopep8.fix_code(code, **kwargs) with open(p.abspath, "wb") as f: f.write(formatted_code.encode("utf-8"))
[docs] @contextlib.contextmanager def temp_cwd(self): """ Temporarily set the current working directory and automatically switch back when it's done. :type self: Path :rtype: Path """ cwd = os.getcwd() os.chdir(self.abspath) try: yield self finally: os.chdir(cwd)
[docs] def atomic_write_bytes(self, data, overwrite=False): """ An atomic write action for binary data. Either fully done or nothing happen. Preventing overwriting existing file with incomplete data. Reference: - https://boltons.readthedocs.io/en/latest/fileutils.html#boltons.fileutils.atomic_save :type self: Path :type data: bytes :type overwrite: bool """ if overwrite is False: # pragma: no cover if self.exists(): raise FileExistsError("file already exists!") with atomic_save(self.abspath, text_mode=False) as f: f.write(data)
[docs] def atomic_write_text(self, data, encoding="utf-8", overwrite=False): """ An atomic write action for text. Either fully done or nothing happen. Preventing overwriting existing file with incomplete data. Reference: - https://boltons.readthedocs.io/en/latest/fileutils.html#boltons.fileutils.atomic_save :type self: Path :type data: str :type encoding: str, recommend to use "utf-8" :type overwrite: bool :return: """ if overwrite is False: # pragma: no cover if self.exists(): raise FileExistsError("file already exists!") with atomic_save(self.abspath, text_mode=False) as f: f.write(data.encode(encoding))
[docs] def atomic_open( self, mode="r", buffering=-1, encoding=None, errors=None, newline=None, overwrite=None, file_perms=None, part_file=None, overwrite_part=None, ): """ A context manager that support :type self: Path :type mode: str :param buffering: original argument for ``pathlib.Path.open()`` :param encoding: original argument for ``pathlib.Path.open()`` :param errors: original argument for ``pathlib.Path.open()`` :param newline: original argument for ``pathlib.Path.open()`` :param overwrite: original argument for ``boltons.fileutils.atomic_save()`` :param file_perms: original argument for ``boltons.fileutils.atomic_save()`` :param part_file: original argument for ``boltons.fileutils.atomic_save()`` :param overwrite_part: original argument for ``boltons.fileutils.atomic_save()`` Reference: - https://boltons.readthedocs.io/en/latest/fileutils.html#boltons.fileutils.atomic_save """ if mode in ["r", "rb", "a"]: return self.open( mode=mode, buffering=buffering, encoding=encoding, errors=errors, newline=newline, ) else: kwargs = dict( overwrite=overwrite, file_perms=file_perms, part_file=part_file, overwrite_part=overwrite_part, ) kwargs = {k: v for k, v in kwargs.items() if v is not None} if mode == "w": return atomic_save( self.abspath, text_mode=True, **kwargs, ) elif mode == "wb": return atomic_save( self.abspath, text_mode=False, **kwargs, ) else: # pragma: no cover raise ValueError("mode must be one of 'r', 'rb', 'w', 'wb', 'a'!")