diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index f9fbf83..c7fca14 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -2,14 +2,14 @@ name: tests on: [push, pull_request] jobs: test: - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 strategy: matrix: - python-version: [3.5, 3.6, 3.7, 3.8] + python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - name: Install dependencies diff --git a/.gitignore b/.gitignore index 8757cd6..860e52b 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ dist/ build/ *.egg-info/ +*.catalog diff --git a/README.md b/README.md index 430849c..1c188ee 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ pip3 install catcli --user # index a directory in the catalog catcli index --meta='some description' log /var/log # display the content -catcli tree +catcli ls -r # navigate catcli ls log # find files/directories named '*log*' @@ -72,7 +72,7 @@ See the [examples](#examples) for an overview of the available features. * [Index archive files](#index-archive-files) * [Walk indexed files with ls](#walk-indexed-files-with-ls) * [Find files](#find-files) - * [Display entire tree](#display-entire-tree) + * [Display entire hierarchy](#display-entire-hierarchy) * [Catalog graph](#catalog-graph) * [Edit storage](#edit-storage) * [Update catalog](#update-catalog) @@ -121,7 +121,7 @@ and they are all available through the command line interface of catcli. Five different types of entry are present in a catalog: - * **top node**: this is the root of the tree + * **top node**: this is the root of the hierarchy * **storage node**: this represents an indexed storage (a DVD, an external hard drive, an USB drive, some arbitrary directory, etc). * **dir node**: this is a directory @@ -177,9 +177,9 @@ Files and directories can be found based on their names using the `find` command. See the [examples](#examples) for more. -## Display entire tree +## Display entire hierarchy -The entire catalog can be shown using the `tree` command. +The entire catalog can be shown using the `ls -r` command. Resulting files can be sorted by size using the `-S --sortsize` switch. See the [examples](#examples) for more. @@ -267,10 +267,10 @@ $ catcli index --meta='my test directory' tmptest /tmp/test Catcli creates its catalog file in the current directory as `catcli.catalog`. -Printing the entire catalog as a tree is done with the command `tree` +Printing the entire catalog as a tree is done with the command `ls -r` ``` -$ catcli tree +$ catcli ls -r top └── storage: tmptest (my test directory) (nbfiles:3, free:3.7G/3.7G, date:2019-01-26 19:59:47) ├── a [nbfiles:3, totsize:72] @@ -391,8 +391,6 @@ $ catcli ls -ar some-name/v0.3.1.zip └── catcli-0.3.1/ [archive:v0.3.1.zip] ``` -All commands handle archive files (like `tree` or `find`). - # Contribution If you are having trouble installing or using catcli, open an issue. diff --git a/catcli/__init__.py b/catcli/__init__.py index 7bb492c..3205988 100644 --- a/catcli/__init__.py +++ b/catcli/__init__.py @@ -3,12 +3,12 @@ author: deadc0de6 (https://github.com/deadc0de6) Copyright (c) 2017, deadc0de6 """ +# pylint: disable=C0415 import sys -__version__ = '0.8.7' - def main(): + """entry point""" import catcli.catcli if catcli.catcli.main(): sys.exit(0) diff --git a/catcli/catalog.py b/catcli/catalog.py index 5c7542b..b5df73d 100644 --- a/catcli/catalog.py +++ b/catcli/catalog.py @@ -11,54 +11,65 @@ from anytree.exporter import JsonExporter from anytree.importer import JsonImporter # local imports -import catcli.utils as utils +from catcli.utils import ask from catcli.logger import Logger class Catalog: + """the catalog""" - def __init__(self, path, pickle=False, debug=False, force=False): - ''' + def __init__(self, path, usepickle=False, debug=False, force=False): + """ @path: catalog path - @pickle: use pickle + @usepickle: use pickle @debug: debug mode @force: force overwrite if exists - ''' + """ self.path = path self.debug = debug self.force = force self.metanode = None - self.pickle = pickle + self.pickle = usepickle def set_metanode(self, metanode): - '''remove the metanode until tree is re-written''' + """remove the metanode until tree is re-written""" self.metanode = metanode self.metanode.parent = None + def exists(self): + """does catalog exist""" + if not self.path: + return False + if os.path.exists(self.path): + return True + return False + def restore(self): - '''restore the catalog''' + """restore the catalog""" if not self.path: return None if not os.path.exists(self.path): return None if self.pickle: return self._restore_pickle() - return self._restore_json(open(self.path, 'r').read()) + with open(self.path, 'r', encoding='UTF-8') as file: + content = file.read() + return self._restore_json(content) def save(self, node): - '''save the catalog''' + """save the catalog""" if not self.path: Logger.err('Path not defined') return False - d = os.path.dirname(self.path) - if d and not os.path.exists(d): - os.makedirs(d) + directory = os.path.dirname(self.path) + if directory and not os.path.exists(directory): + os.makedirs(directory) elif os.path.exists(self.path) and not self.force: - if not utils.ask('Update catalog \"{}\"'.format(self.path)): + if not ask(f'Update catalog \"{self.path}\"'): Logger.info('Catalog not saved') return False - if d and not os.path.exists(d): - Logger.err('Cannot write to \"{}\"'.format(d)) + if directory and not os.path.exists(directory): + Logger.err(f'Cannot write to \"{directory}\"') return False if self.metanode: self.metanode.parent = node @@ -72,29 +83,31 @@ class Catalog: Logger.debug(text) def _save_pickle(self, node): - '''pickle the catalog''' - pickle.dump(node, open(self.path, 'wb')) - self._debug('Catalog saved to pickle \"{}\"'.format(self.path)) + """pickle the catalog""" + with open(self.path, 'wb') as file: + pickle.dump(node, file) + self._debug(f'Catalog saved to pickle \"{self.path}\"') return True def _restore_pickle(self): - '''restore the pickled tree''' - root = pickle.load(open(self.path, 'rb')) - m = 'Catalog imported from pickle \"{}\"'.format(self.path) - self._debug(m) + """restore the pickled tree""" + with open(self.path, 'rb') as file: + root = pickle.load(file) + msg = f'Catalog imported from pickle \"{self.path}\"' + self._debug(msg) return root def _save_json(self, node): - '''export the catalog in json''' + """export the catalog in json""" exp = JsonExporter(indent=2, sort_keys=True) - with open(self.path, 'w') as f: - exp.write(node, f) - self._debug('Catalog saved to json \"{}\"'.format(self.path)) + with open(self.path, 'w', encoding='UTF-8') as file: + exp.write(node, file) + self._debug(f'Catalog saved to json \"{self.path}\"') return True def _restore_json(self, string): - '''restore the tree from json''' + """restore the tree from json""" imp = JsonImporter() root = imp.import_(string) - self._debug('Catalog imported from json \"{}\"'.format(self.path)) + self._debug(f'Catalog imported from json \"{self.path}\"') return root diff --git a/catcli/catcli.py b/catcli/catcli.py index 2c9aa62..0655c62 100755 --- a/catcli/catcli.py +++ b/catcli/catcli.py @@ -14,44 +14,46 @@ import datetime from docopt import docopt # local imports -from . import __version__ as VERSION +from .version import __version__ as VERSION from .logger import Logger +from .colors import Colors from .catalog import Catalog from .walker import Walker from .noder import Noder from .utils import ask, edit +from .exceptions import BadFormatException, CatcliException NAME = 'catcli' CUR = os.path.dirname(os.path.abspath(__file__)) -CATALOGPATH = '{}.catalog'.format(NAME) -GRAPHPATH = '/tmp/{}.dot'.format(NAME) +CATALOGPATH = f'{NAME}.catalog' +GRAPHPATH = f'/tmp/{NAME}.dot' SEPARATOR = '/' WILD = '*' +FORMATS = ['native', 'csv', 'csv-with-header', 'fzf-native', 'fzf-csv'] -BANNER = """ +-+-+-+-+-+-+ +BANNER = f""" +-+-+-+-+-+-+ |c|a|t|c|l|i| - +-+-+-+-+-+-+ v{}""".format(VERSION) + +-+-+-+-+-+-+ v{VERSION}""" -USAGE = """ -{0} +USAGE = f""" +{BANNER} Usage: - {1} ls [--catalog=] [--format=] [-aBCrVSs] [] - {1} find [--catalog=] [--format=] [-aBCbdVsP] [--path=] - {1} tree [--catalog=] [--format=] [-aBCVSsH] [] - {1} index [--catalog=] [--meta=...] [-aBCcfnV] - {1} update [--catalog=] [-aBCcfnV] [--lpath=] - {1} rm [--catalog=] [-BCfV] - {1} rename [--catalog=] [-BCfV] - {1} edit [--catalog=] [-BCfV] - {1} graph [--catalog=] [-BCV] [] - {1} print_supported_formats - {1} help - {1} --help - {1} --version + {NAME} ls [--catalog=] [--format=] [-aBCrVSs] [] + {NAME} find [--catalog=] [--format=] [-aBCbdVsP] [--path=] [] + {NAME} index [--catalog=] [--meta=...] [-aBCcfnV] + {NAME} update [--catalog=] [-aBCcfnV] [--lpath=] + {NAME} rm [--catalog=] [-BCfV] + {NAME} rename [--catalog=] [-BCfV] + {NAME} edit [--catalog=] [-BCfV] + {NAME} graph [--catalog=] [-BCV] [] + {NAME} print_supported_formats + {NAME} help + {NAME} --help + {NAME} --version Options: - --catalog= Path to the catalog [default: {2}]. + --catalog= Path to the catalog [default: {CATALOGPATH}]. --meta= Additional attribute to store [default: ]. -a --archive Handle archive file [default: False]. -B --no-banner Do not display the banner [default: False]. @@ -61,7 +63,6 @@ Options: -d --directory Only directory [default: False]. -F --format= Print format, see command \"print_supported_formats\" [default: native]. -f --force Do not ask when updating the catalog [default: False]. - -H --header Print header on CSV format [default: False]. -l --lpath= Path where changes are logged [default: ] -n --no-subsize Do not store size of directories [default: False]. -P --parent Ignore stored relpath [default: True]. @@ -72,22 +73,23 @@ Options: -V --verbose Be verbose [default: False]. -v --version Show version. -h --help Show this screen. -""".format(BANNER, NAME, CATALOGPATH) # nopep8 +""" # nopep8 def cmd_index(args, noder, catalog, top): + """index action""" path = args[''] name = args[''] - hash = args['--hash'] + usehash = args['--hash'] debug = args['--verbose'] subsize = not args['--no-subsize'] if not os.path.exists(path): - Logger.err('\"{}\" does not exist'.format(path)) + Logger.err(f'\"{path}\" does not exist') return if name in noder.get_storage_names(top): try: - if not ask('Overwrite storage \"{}\"'.format(name)): - Logger.err('storage named \"{}\" already exist'.format(name)) + if not ask(f'Overwrite storage \"{name}\"'): + Logger.err(f'storage named \"{name}\" already exist') return except KeyboardInterrupt: Logger.err('aborted') @@ -96,114 +98,116 @@ def cmd_index(args, noder, catalog, top): node.parent = None start = datetime.datetime.now() - walker = Walker(noder, hash=hash, debug=debug) + walker = Walker(noder, usehash=usehash, debug=debug) attr = noder.format_storage_attr(args['--meta']) - root = noder.storage_node(name, path, parent=top, attr=attr) + root = noder.new_storage_node(name, path, parent=top, attr=attr) _, cnt = walker.index(path, root, name) if subsize: - noder.rec_size(root) + noder.rec_size(root, store=True) stop = datetime.datetime.now() - Logger.info('Indexed {} file(s) in {}'.format(cnt, stop - start)) + diff = stop - start + Logger.info(f'Indexed {cnt} file(s) in {diff}') if cnt > 0: catalog.save(top) def cmd_update(args, noder, catalog, top): + """update action""" path = args[''] name = args[''] - hash = args['--hash'] + usehash = args['--hash'] logpath = args['--lpath'] debug = args['--verbose'] subsize = not args['--no-subsize'] if not os.path.exists(path): - Logger.err('\"{}\" does not exist'.format(path)) + Logger.err(f'\"{path}\" does not exist') return root = noder.get_storage_node(top, name, path=path) if not root: - Logger.err('storage named \"{}\" does not exist'.format(name)) + Logger.err(f'storage named \"{name}\" does not exist') return start = datetime.datetime.now() - walker = Walker(noder, hash=hash, debug=debug, + walker = Walker(noder, usehash=usehash, debug=debug, logpath=logpath) cnt = walker.reindex(path, root, top) if subsize: - noder.rec_size(root) + noder.rec_size(root, store=True) stop = datetime.datetime.now() - Logger.info('updated {} file(s) in {}'.format(cnt, stop - start)) + diff = stop - start + Logger.info(f'updated {cnt} file(s) in {diff}') if cnt > 0: catalog.save(top) def cmd_ls(args, noder, top): + """ls action""" path = args[''] if not path: path = SEPARATOR if not path.startswith(SEPARATOR): path = SEPARATOR + path - pre = '{}{}'.format(SEPARATOR, noder.TOPNAME) + # prepend with top node path + pre = f'{SEPARATOR}{noder.NAME_TOP}' if not path.startswith(pre): path = pre + path + # ensure ends with a separator if not path.endswith(SEPARATOR): path += SEPARATOR + # add wild card if not path.endswith(WILD): path += WILD - found = noder.walk(top, path, + + fmt = args['--format'] + if fmt.startswith('fzf'): + raise BadFormatException('fzf is not supported in ls, use find') + found = noder.list(top, path, rec=args['--recursive'], - fmt=args['--format'], + fmt=fmt, raw=args['--raw-size']) if not found: - Logger.err('\"{}\": nothing found'.format(args[''])) + path = args[''] + Logger.err(f'\"{path}\": nothing found') return found def cmd_rm(args, noder, catalog, top): + """rm action""" name = args[''] node = noder.get_storage_node(top, name) if node: node.parent = None if catalog.save(top): - Logger.info('Storage \"{}\" removed'.format(name)) + Logger.info(f'Storage \"{name}\" removed') else: - Logger.err('Storage named \"{}\" does not exist'.format(name)) + Logger.err(f'Storage named \"{name}\" does not exist') return top def cmd_find(args, noder, top): + """find action""" fromtree = args['--parent'] directory = args['--directory'] startpath = args['--path'] fmt = args['--format'] raw = args['--raw-size'] - return noder.find_name(top, args[''], script=args['--script'], - startpath=startpath, directory=directory, + script = args['--script'] + search_for = args[''] + return noder.find_name(top, search_for, script=script, + startpath=startpath, only_dir=directory, parentfromtree=fromtree, fmt=fmt, raw=raw) -def cmd_tree(args, noder, top): - path = args[''] - fmt = args['--format'] - hdr = args['--header'] - raw = args['--raw-size'] - - # find node to start with - node = top - if path: - node = noder.get_node(top, path) - - if node: - # print the tree - noder.print_tree(node, fmt=fmt, header=hdr, raw=raw) - - def cmd_graph(args, noder, top): + """graph action""" path = args[''] if not path: path = GRAPHPATH cmd = noder.to_dot(top, path) - Logger.info('create graph with \"{}\" (you need graphviz)'.format(cmd)) + Logger.info(f'create graph with \"{cmd}\" (you need graphviz)') -def cmd_rename(args, noder, catalog, top): +def cmd_rename(args, catalog, top): + """rename action""" storage = args[''] new = args[''] storages = list(x.name for x in top.children) @@ -211,14 +215,15 @@ def cmd_rename(args, noder, catalog, top): node = next(filter(lambda x: x.name == storage, top.children)) node.name = new if catalog.save(top): - m = 'Storage \"{}\" renamed to \"{}\"'.format(storage, new) - Logger.info(m) + msg = f'Storage \"{storage}\" renamed to \"{new}\"' + Logger.info(msg) else: - Logger.err('Storage named \"{}\" does not exist'.format(storage)) + Logger.err(f'Storage named \"{storage}\" does not exist') return top def cmd_edit(args, noder, catalog, top): + """edit action""" storage = args[''] storages = list(x.name for x in top.children) if storage in storages: @@ -229,18 +234,29 @@ def cmd_edit(args, noder, catalog, top): new = edit(attr) node.attr = noder.format_storage_attr(new) if catalog.save(top): - Logger.info('Storage \"{}\" edited'.format(storage)) + Logger.info(f'Storage \"{storage}\" edited') else: - Logger.err('Storage named \"{}\" does not exist'.format(storage)) + Logger.err(f'Storage named \"{storage}\" does not exist') return top def banner(): - Logger.out_err(BANNER) - Logger.out_err("") + """print banner""" + Logger.stderr_nocolor(BANNER) + Logger.stderr_nocolor("") + + +def print_supported_formats(): + """print all supported formats to stdout""" + print('"native" : native format') + print('"csv" : CSV format') + print(f' {Noder.CSV_HEADER}') + print('"fzf-native" : fzf to native (only for find)') + print('"fzf-csv" : fzf to csv (only for find)') def main(): + """entry point""" args = docopt(USAGE, version=VERSION) if args['help'] or args['--help']: @@ -248,15 +264,14 @@ def main(): return True if args['print_supported_formats']: - print('"native": native format') - print('"csv" : CSV format') - print(' {}'.format(Noder.CSV_HEADER)) + print_supported_formats() return True # check format fmt = args['--format'] - if fmt != 'native' and fmt != 'csv': - Logger.err('bad format: {}'.format(fmt)) + if fmt not in FORMATS: + Logger.err(f'bad format: {fmt}') + print_supported_formats() return False if args['--verbose']: @@ -268,13 +283,14 @@ def main(): # set colors if args['--no-color']: - Logger.no_color() + Colors.no_color() # init noder noder = Noder(debug=args['--verbose'], sortsize=args['--sortsize'], arc=args['--archive']) # init catalog - catalog = Catalog(args['--catalog'], debug=args['--verbose'], + catalog_path = args['--catalog'] + catalog = Catalog(catalog_path, debug=args['--verbose'], force=args['--force']) # init top node top = catalog.restore() @@ -286,30 +302,52 @@ def main(): catalog.set_metanode(meta) # parse command - if args['index']: - cmd_index(args, noder, catalog, top) - if args['update']: - cmd_update(args, noder, catalog, top) - elif args['find']: - cmd_find(args, noder, top) - elif args['tree']: - cmd_tree(args, noder, top) - elif args['ls']: - cmd_ls(args, noder, top) - elif args['rm']: - cmd_rm(args, noder, catalog, top) - elif args['graph']: - cmd_graph(args, noder, top) - elif args['rename']: - cmd_rename(args, noder, catalog, top) - elif args['edit']: - cmd_edit(args, noder, catalog, top) + try: + if args['index']: + cmd_index(args, noder, catalog, top) + if args['update']: + if not catalog.exists(): + Logger.err(f'no such catalog: {catalog_path}') + return False + cmd_update(args, noder, catalog, top) + elif args['find']: + if not catalog.exists(): + Logger.err(f'no such catalog: {catalog_path}') + return False + cmd_find(args, noder, top) + elif args['ls']: + if not catalog.exists(): + Logger.err(f'no such catalog: {catalog_path}') + return False + cmd_ls(args, noder, top) + elif args['rm']: + if not catalog.exists(): + Logger.err(f'no such catalog: {catalog_path}') + return False + cmd_rm(args, noder, catalog, top) + elif args['graph']: + if not catalog.exists(): + Logger.err(f'no such catalog: {catalog_path}') + return False + cmd_graph(args, noder, top) + elif args['rename']: + if not catalog.exists(): + Logger.err(f'no such catalog: {catalog_path}') + return False + cmd_rename(args, catalog, top) + elif args['edit']: + if not catalog.exists(): + Logger.err(f'no such catalog: {catalog_path}') + return False + cmd_edit(args, noder, catalog, top) + except CatcliException as exc: + Logger.stderr_nocolor('ERROR ' + str(exc)) + return False return True if __name__ == '__main__': - '''entry point''' if main(): sys.exit(0) sys.exit(1) diff --git a/catcli/colors.py b/catcli/colors.py new file mode 100644 index 0000000..e53cd6c --- /dev/null +++ b/catcli/colors.py @@ -0,0 +1,37 @@ +""" +author: deadc0de6 (https://github.com/deadc0de6) +Copyright (c) 2022, deadc0de6 + +shell colors +""" + + +class Colors: + """shell colors""" + + RED = '\033[91m' + GREEN = '\033[92m' + YELLOW = '\033[93m' + PURPLE = '\033[1;35m' + BLUE = '\033[94m' + GRAY = '\033[0;37m' + MAGENTA = '\033[95m' + RESET = '\033[0m' + EMPH = '\033[33m' + BOLD = '\033[1m' + UND = '\033[4m' + + @classmethod + def no_color(cls): + """disable colors""" + Colors.RED = '' + Colors.GREEN = '' + Colors.YELLOW = '' + Colors.PURPLE = '' + Colors.BLUE = '' + Colors.GRAY = '' + Colors.MAGENTA = '' + Colors.RESET = '' + Colors.EMPH = '' + Colors.BOLD = '' + Colors.UND = '' diff --git a/catcli/decomp.py b/catcli/decomp.py index e7285fd..13f4ff4 100644 --- a/catcli/decomp.py +++ b/catcli/decomp.py @@ -11,6 +11,7 @@ import zipfile class Decomp: + """decompressor""" def __init__(self): self.ext = { @@ -28,26 +29,28 @@ class Decomp: 'zip': self._zip} def get_formats(self): - '''return list of supported extensions''' + """return list of supported extensions""" return list(self.ext.keys()) def get_names(self, path): - '''get tree of compressed archive''' + """get tree of compressed archive""" ext = os.path.splitext(path)[1][1:].lower() - if ext in list(self.ext.keys()): + if ext in list(self.ext): return self.ext[ext](path) return None - def _tar(self, path): - '''return list of file names in tar''' + @staticmethod + def _tar(path): + """return list of file names in tar""" if not tarfile.is_tarfile(path): return None - tar = tarfile.open(path, "r") - return tar.getnames() + with tarfile.open(path, "r") as tar: + return tar.getnames() - def _zip(self, path): - '''return list of file names in zip''' + @staticmethod + def _zip(path): + """return list of file names in zip""" if not zipfile.is_zipfile(path): return None - z = zipfile.ZipFile(path) - return z.namelist() + with zipfile.ZipFile(path) as file: + return file.namelist() diff --git a/catcli/exceptions.py b/catcli/exceptions.py new file mode 100644 index 0000000..fce8f80 --- /dev/null +++ b/catcli/exceptions.py @@ -0,0 +1,14 @@ +""" +author: deadc0de6 (https://github.com/deadc0de6) +Copyright (c) 2022, deadc0de6 + +Catcli exceptions +""" + + +class CatcliException(Exception): + """generic catcli exception""" + + +class BadFormatException(CatcliException): + """use of bad format""" diff --git a/catcli/logger.py b/catcli/logger.py index 9d7ab92..586b033 100644 --- a/catcli/logger.py +++ b/catcli/logger.py @@ -7,128 +7,64 @@ Logging helper import sys +# local imports +from catcli.colors import Colors +from catcli.utils import fix_badchars -class Logger: - - RED = '\033[91m' - GREEN = '\033[92m' - YELLOW = '\033[93m' - PURPLE = '\033[1;35m' - BLUE = '\033[94m' - GRAY = '\033[0;37m' - MAGENTA = '\033[95m' - RESET = '\033[0m' - EMPH = '\033[33m' - BOLD = '\033[1m' - UND = '\033[4m' - - STORAGE = 'storage' - ARCHIVE = 'archive' - NBFILES = 'nbfiles' - - def no_color(): - Logger.RED = '' - Logger.GREEN = '' - Logger.YELLOW = '' - Logger.PURPLE = '' - Logger.BLUE = '' - Logger.GRAY = '' - Logger.MAGENTA = '' - Logger.RESET = '' - Logger.EMPH = '' - Logger.BOLD = '' - Logger.UND = '' - - def fix_badchars(line): - return line.encode('utf-8', 'ignore').decode('utf-8') - - ###################################################################### - # node specific output - ###################################################################### - def storage(pre, name, args, attr): - '''print a storage node''' - end = '' - if attr: - end = ' {}({}){}'.format(Logger.GRAY, attr, Logger.RESET) - s = '{}{}{}{}:'.format(pre, Logger.UND, Logger.STORAGE, Logger.RESET) - s += ' {}{}{}{}\n'.format(Logger.PURPLE, - Logger.fix_badchars(name), - Logger.RESET, end) - s += ' {}{}{}'.format(Logger.GRAY, args, Logger.RESET) - sys.stdout.write('{}\n'.format(s)) - - def file(pre, name, attr): - '''print a file node''' - s = '{}{}'.format(pre, Logger.fix_badchars(name)) - s += ' {}[{}]{}'.format(Logger.GRAY, attr, Logger.RESET) - sys.stdout.write('{}\n'.format(s)) - - def dir(pre, name, depth='', attr=None): - '''print a directory node''' - end = [] - if depth != '': - end.append('{}:{}'.format(Logger.NBFILES, depth)) - if attr: - end.append(' '.join(['{}:{}'.format(x, y) for x, y in attr])) - if end: - end = ' [{}]'.format(', '.join(end)) - s = '{}{}{}{}'.format(pre, Logger.BLUE, - Logger.fix_badchars(name), Logger.RESET) - s += '{}{}{}'.format(Logger.GRAY, end, Logger.RESET) - sys.stdout.write('{}\n'.format(s)) - def arc(pre, name, archive): - s = '{}{}{}{}'.format(pre, Logger.YELLOW, - Logger.fix_badchars(name), Logger.RESET) - s += ' {}[{}:{}]{}'.format(Logger.GRAY, Logger.ARCHIVE, - archive, Logger.RESET) - sys.stdout.write('{}\n'.format(s)) - - ###################################################################### - # generic output - ###################################################################### - def out(string): - '''to stdout no color''' - string = Logger.fix_badchars(string) - sys.stdout.write('{}\n'.format(string)) - - def out_err(string): - '''to stderr no color''' - string = Logger.fix_badchars(string) - sys.stderr.write('{}\n'.format(string)) - - def debug(string): - '''to stderr no color''' - string = Logger.fix_badchars(string) - sys.stderr.write('[DBG] {}\n'.format(string)) - - def info(string): - '''to stdout in color''' - string = Logger.fix_badchars(string) - s = '{}{}{}'.format(Logger.MAGENTA, string, Logger.RESET) - sys.stdout.write('{}\n'.format(s)) - - def err(string): - '''to stderr in RED''' - string = Logger.fix_badchars(string) - s = '{}{}{}'.format(Logger.RED, string, Logger.RESET) - sys.stderr.write('{}\n'.format(s)) - - def progr(string): - '''print progress''' - string = Logger.fix_badchars(string) - sys.stderr.write('{}\r'.format(string)) +class Logger: + """log to stdout/stderr""" + + @classmethod + def stdout_nocolor(cls, string): + """to stdout no color""" + string = fix_badchars(string) + sys.stdout.write(f'{string}\n') + + @classmethod + def stderr_nocolor(cls, string): + """to stderr no color""" + string = fix_badchars(string) + sys.stderr.write(f'{string}\n') + + @classmethod + def debug(cls, string): + """to stderr no color""" + cls.stderr_nocolor(f'[DBG] {string}\n') + + @classmethod + def info(cls, string): + """to stdout in color""" + string = fix_badchars(string) + out = f'{Colors.MAGENTA}{string}{Colors.RESET}' + sys.stdout.write(f'{out}\n') + + @classmethod + def err(cls, string): + """to stderr in RED""" + string = fix_badchars(string) + out = f'{Colors.RED}{string}{Colors.RESET}' + sys.stderr.write(f'{out}\n') + + @classmethod + def progr(cls, string): + """print progress""" + string = fix_badchars(string) + sys.stderr.write(f'{string}\r') sys.stderr.flush() - def bold(string): - '''make it bold''' - string = Logger.fix_badchars(string) - return '{}{}{}'.format(Logger.BOLD, string, Logger.RESET) + @classmethod + def get_bold_text(cls, string): + """make it bold""" + string = fix_badchars(string) + return f'{Colors.BOLD}{string}{Colors.RESET}' - def flog(path, string, append=True): - string = Logger.fix_badchars(string) + @classmethod + def log_to_file(cls, path, string, append=True): + """log to file""" + string = fix_badchars(string) mode = 'w' if append: mode = 'a' - with open(path, mode) as f: - f.write(string) + with open(path, mode, encoding='UTF-8') as file: + file.write(string) diff --git a/catcli/nodeprinter.py b/catcli/nodeprinter.py new file mode 100644 index 0000000..b43613b --- /dev/null +++ b/catcli/nodeprinter.py @@ -0,0 +1,61 @@ +""" +author: deadc0de6 (https://github.com/deadc0de6) +Copyright (c) 2022, deadc0de6 + +Class for printing nodes +""" + +import sys + +from catcli.colors import Colors +from catcli.utils import fix_badchars + + +class NodePrinter: + """a node printer class""" + + STORAGE = 'storage' + ARCHIVE = 'archive' + NBFILES = 'nbfiles' + + @classmethod + def print_storage_native(cls, pre, name, args, attr): + """print a storage node""" + end = '' + if attr: + end = f' {Colors.GRAY}({attr}){Colors.RESET}' + out = f'{pre}{Colors.UND}{cls.STORAGE}{Colors.RESET}:' + out += ' ' + Colors.PURPLE + fix_badchars(name) + \ + Colors.RESET + end + '\n' + out += f' {Colors.GRAY}{args}{Colors.RESET}' + sys.stdout.write(f'{out}\n') + + @classmethod + def print_file_native(cls, pre, name, attr): + """print a file node""" + nobad = fix_badchars(name) + out = f'{pre}{nobad}' + out += f' {Colors.GRAY}[{attr}]{Colors.RESET}' + sys.stdout.write(f'{out}\n') + + @classmethod + def print_dir_native(cls, pre, name, depth='', attr=None): + """print a directory node""" + end = [] + if depth != '': + end.append(f'{cls.NBFILES}:{depth}') + if attr: + end.append(' '.join([f'{x}:{y}' for x, y in attr])) + if end: + endstring = ', '.join(end) + end = f' [{endstring}]' + out = pre + Colors.BLUE + fix_badchars(name) + Colors.RESET + out += f'{Colors.GRAY}{end}{Colors.RESET}' + sys.stdout.write(f'{out}\n') + + @classmethod + def print_archive_native(cls, pre, name, archive): + """archive to stdout""" + out = pre + Colors.YELLOW + fix_badchars(name) + Colors.RESET + out += f' {Colors.GRAY}[{cls.ARCHIVE}:{archive}]{Colors.RESET}' + sys.stdout.write(f'{out}\n') diff --git a/catcli/noder.py b/catcli/noder.py index 4997b9c..7ad39bd 100644 --- a/catcli/noder.py +++ b/catcli/noder.py @@ -6,45 +6,50 @@ Class that represents a node in the catalog tree """ import os -import anytree import shutil import time +import anytree +from pyfzf.pyfzf import FzfPrompt # local imports -from . import __version__ as VERSION -import catcli.utils as utils +from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars from catcli.logger import Logger +from catcli.nodeprinter import NodePrinter from catcli.decomp import Decomp +from catcli.version import __version__ as VERSION +from catcli.exceptions import CatcliException -''' -There are 4 types of node: + +class Noder: + """ + handles node in the catalog tree + There are 4 types of node: * "top" node representing the top node (generic node) * "storage" node representing a storage * "dir" node representing a directory * "file" node representing a file -''' + """ + NAME_TOP = 'top' + NAME_META = 'meta' -class Noder: - - TOPNAME = 'top' - METANAME = 'meta' TYPE_TOP = 'top' TYPE_FILE = 'file' TYPE_DIR = 'dir' TYPE_ARC = 'arc' TYPE_STORAGE = 'storage' TYPE_META = 'meta' + CSV_HEADER = ('name,type,path,size,indexed_at,' 'maccess,md5,nbfiles,free_space,' 'total_space,meta') def __init__(self, debug=False, sortsize=False, arc=False): - ''' + """ @debug: debug mode @sortsize: sort nodes by size @arch: handle archive - ''' + """ self.hash = True self.debug = debug self.sortsize = sortsize @@ -52,21 +57,22 @@ class Noder: if self.arc: self.decomp = Decomp() - def get_storage_names(self, top): - '''return a list of all storage names''' + @staticmethod + def get_storage_names(top): + """return a list of all storage names""" return [x.name for x in list(top.children)] def get_storage_node(self, top, name, path=None): - ''' + """ return the storage node if any if path is submitted, it will update the media info - ''' + """ found = None - for n in top.children: - if n.type != self.TYPE_STORAGE: + for node in top.children: + if node.type != self.TYPE_STORAGE: continue - if n.name == name: - found = n + if node.name == name: + found = node break if found and path and os.path.exists(path): found.free = shutil.disk_usage(path).free @@ -74,24 +80,25 @@ class Noder: found.ts = int(time.time()) return found - def get_node(self, top, path, quiet=False): - '''get the node by internal tree path''' - r = anytree.resolver.Resolver('name') + @staticmethod + def get_node(top, path, quiet=False): + """get the node by internal tree path""" + resolv = anytree.resolver.Resolver('name') try: - p = os.path.basename(path) - return r.get(top, p) + bpath = os.path.basename(path) + return resolv.get(top, bpath) except anytree.resolver.ChildResolverError: if not quiet: - Logger.err('No node at path \"{}\"'.format(p)) + Logger.err(f'No node at path \"{bpath}\"') return None def get_node_if_changed(self, top, path, treepath): - ''' + """ return the node (if any) and if it has changed @top: top node (storage) @path: abs path to file @treepath: rel path from indexed directory - ''' + """ treepath = treepath.lstrip(os.sep) node = self.get_node(top, treepath, quiet=True) # node does not exist @@ -109,105 +116,80 @@ class Noder: # maccess changed old_maccess = node.maccess if float(maccess) != float(old_maccess): - self._debug('\tchange: maccess changed for \"{}\"'.format(path)) + self._debug(f'\tchange: maccess changed for \"{path}\"') return node, True # test hash if self.hash and node.md5: md5 = self._get_hash(path) - if md5 != node.md5: - m = '\tchange: checksum changed for \"{}\"'.format(path) - self._debug(m) + if md5 and md5 != node.md5: + msg = f'\tchange: checksum changed for \"{path}\"' + self._debug(msg) return node, True - self._debug('\tchange: no change for \"{}\"'.format(path)) + self._debug(f'\tchange: no change for \"{path}\"') return node, False - def _rec_size(self, node, store=True): - ''' + def rec_size(self, node, store=True): + """ recursively traverse tree and return size @store: store the size in the node - ''' + """ if node.type == self.TYPE_FILE: - self._debug('getting node size for \"{}\"'.format(node.name)) + self._debug(f'getting node size for \"{node.name}\"') return node.size - m = 'getting node size recursively for \"{}\"'.format(node.name) - self._debug(m) + msg = f'getting node size recursively for \"{node.name}\"' + self._debug(msg) size = 0 for i in node.children: if node.type == self.TYPE_DIR: - sz = self._rec_size(i, store=store) + size = self.rec_size(i, store=store) if store: - i.size = sz - size += sz + i.size = size + size += size if node.type == self.TYPE_STORAGE: - sz = self._rec_size(i, store=store) + size = self.rec_size(i, store=store) if store: - i.size = sz - size += sz + i.size = size + size += size else: continue if store: node.size = size return size - def rec_size(self, node): - '''recursively traverse tree and store dir size''' - return self._rec_size(node, store=True) - ############################################################### # public helpers ############################################################### - def format_storage_attr(self, attr): - '''format the storage attr for saving''' + @staticmethod + def format_storage_attr(attr): + """format the storage attr for saving""" if not attr: return '' - if type(attr) is list: + if isinstance(attr, list): return ', '.join(attr) attr = attr.rstrip() return attr def set_hashing(self, val): - '''hash files when indexing''' + """hash files when indexing""" self.hash = val ############################################################### - # node creationg + # node creation ############################################################### def new_top_node(self): - '''create a new top node''' - return anytree.AnyNode(name=self.TOPNAME, type=self.TYPE_TOP) - - def update_metanode(self, top): - '''create or update meta node information''' - meta = self._get_meta_node(top) - epoch = int(time.time()) - if not meta: - attr = {} - attr['created'] = epoch - attr['created_version'] = VERSION - meta = anytree.AnyNode(name=self.METANAME, type=self.TYPE_META, - attr=attr) - meta.attr['access'] = epoch - meta.attr['access_version'] = VERSION - return meta - - def _get_meta_node(self, top): - '''return the meta node if any''' - try: - return next(filter(lambda x: x.type == self.TYPE_META, - top.children)) - except StopIteration: - return None + """create a new top node""" + return anytree.AnyNode(name=self.NAME_TOP, type=self.TYPE_TOP) - def file_node(self, name, path, parent, storagepath): - '''create a new node representing a file''' + def new_file_node(self, name, path, parent, storagepath): + """create a new node representing a file""" if not os.path.exists(path): - Logger.err('File \"{}\" does not exist'.format(path)) + Logger.err(f'File \"{path}\" does not exist') return None path = os.path.abspath(path) try: - st = os.lstat(path) - except OSError as e: - Logger.err('OSError: {}'.format(e)) + stat = os.lstat(path) + except OSError as exc: + Logger.err(f'OSError: {exc}') return None md5 = None if self.hash: @@ -215,42 +197,92 @@ class Noder: relpath = os.sep.join([storagepath, name]) maccess = os.path.getmtime(path) - n = self._node(name, self.TYPE_FILE, relpath, parent, - size=st.st_size, md5=md5, maccess=maccess) + node = self._new_generic_node(name, self.TYPE_FILE, relpath, parent, + size=stat.st_size, md5=md5, + maccess=maccess) if self.arc: ext = os.path.splitext(path)[1][1:] if ext.lower() in self.decomp.get_formats(): - self._debug('{} is an archive'.format(path)) + self._debug(f'{path} is an archive') names = self.decomp.get_names(path) - self.list_to_tree(n, names) + self.list_to_tree(node, names) else: - self._debug('{} is NOT an archive'.format(path)) - return n + self._debug(f'{path} is NOT an archive') + return node - def dir_node(self, name, path, parent, storagepath): - '''create a new node representing a directory''' + def new_dir_node(self, name, path, parent, storagepath): + """create a new node representing a directory""" path = os.path.abspath(path) relpath = os.sep.join([storagepath, name]) maccess = os.path.getmtime(path) - return self._node(name, self.TYPE_DIR, relpath, - parent, maccess=maccess) + return self._new_generic_node(name, self.TYPE_DIR, relpath, + parent, maccess=maccess) + + def new_storage_node(self, name, path, parent, attr=None): + """create a new node representing a storage""" + path = os.path.abspath(path) + free = shutil.disk_usage(path).free + total = shutil.disk_usage(path).total + epoch = int(time.time()) + return anytree.AnyNode(name=name, type=self.TYPE_STORAGE, free=free, + total=total, parent=parent, attr=attr, ts=epoch) + + def new_archive_node(self, name, path, parent, archive): + """create a new node for archive data""" + return anytree.AnyNode(name=name, type=self.TYPE_ARC, relpath=path, + parent=parent, size=0, md5=None, + archive=archive) + + @staticmethod + def _new_generic_node(name, nodetype, relpath, parent, + size=None, md5=None, maccess=None): + """generic node creation""" + return anytree.AnyNode(name=name, type=nodetype, relpath=relpath, + parent=parent, size=size, + md5=md5, maccess=maccess) + + ############################################################### + # node management + ############################################################### + def update_metanode(self, top): + """create or update meta node information""" + meta = self._get_meta_node(top) + epoch = int(time.time()) + if not meta: + attr = {} + attr['created'] = epoch + attr['created_version'] = VERSION + meta = anytree.AnyNode(name=self.NAME_META, type=self.TYPE_META, + attr=attr) + meta.attr['access'] = epoch + meta.attr['access_version'] = VERSION + return meta + + def _get_meta_node(self, top): + """return the meta node if any""" + try: + return next(filter(lambda x: x.type == self.TYPE_META, + top.children)) + except StopIteration: + return None def clean_not_flagged(self, top): - '''remove any node not flagged and clean flags''' + """remove any node not flagged and clean flags""" cnt = 0 for node in anytree.PreOrderIter(top): - if node.type != self.TYPE_FILE and node.type != self.TYPE_DIR: + if node.type not in [self.TYPE_FILE, self.TYPE_DIR]: continue if self._clean(node): cnt += 1 return cnt - def flag(self, node): - '''flag a node''' + @staticmethod + def flag(node): + """flag a node""" node.flag = True def _clean(self, node): - '''remove node if not flagged''' + """remove node if not flagged""" if not self._has_attr(node, 'flag') or \ not node.flag: node.parent = None @@ -258,42 +290,20 @@ class Noder: del node.flag return False - def storage_node(self, name, path, parent, attr=None): - '''create a new node representing a storage''' - path = os.path.abspath(path) - free = shutil.disk_usage(path).free - total = shutil.disk_usage(path).total - epoch = int(time.time()) - return anytree.AnyNode(name=name, type=self.TYPE_STORAGE, free=free, - total=total, parent=parent, attr=attr, ts=epoch) - - def archive_node(self, name, path, parent, archive): - '''crete a new node for archive data''' - return anytree.AnyNode(name=name, type=self.TYPE_ARC, relpath=path, - parent=parent, size=0, md5=None, - archive=archive) - - def _node(self, name, type, relpath, parent, - size=None, md5=None, maccess=None): - '''generic node creation''' - return anytree.AnyNode(name=name, type=type, relpath=relpath, - parent=parent, size=size, - md5=md5, maccess=maccess) - ############################################################### # printing ############################################################### def _node_to_csv(self, node, sep=',', raw=False): - ''' + """ print a node to csv @node: the node to consider @sep: CSV separator character @raw: print raw size rather than human readable - ''' + """ if not node: - return '' + return if node.type == self.TYPE_TOP: - return '' + return out = [] if node.type == self.TYPE_STORAGE: @@ -301,16 +311,16 @@ class Noder: out.append(node.name) # name out.append(node.type) # type out.append('') # fake full path - sz = self._rec_size(node, store=False) - out.append(utils.size_to_str(sz, raw=raw)) # size - out.append(utils.epoch_to_str(node.ts)) # indexed_at + size = self.rec_size(node, store=False) + out.append(size_to_str(size, raw=raw)) # size + out.append(epoch_to_str(node.ts)) # indexed_at out.append('') # fake maccess out.append('') # fake md5 out.append(str(len(node.children))) # nbfiles # fake free_space - out.append(utils.size_to_str(node.free, raw=raw)) + out.append(size_to_str(node.free, raw=raw)) # fake total_space - out.append(utils.size_to_str(node.total, raw=raw)) + out.append(size_to_str(node.total, raw=raw)) out.append(node.attr) # meta else: # handle other nodes @@ -321,10 +331,10 @@ class Noder: fullpath = os.path.join(storage.name, parents) out.append(fullpath.replace('"', '""')) # full path - out.append(utils.size_to_str(node.size, raw=raw)) # size - out.append(utils.epoch_to_str(storage.ts)) # indexed_at + out.append(size_to_str(node.size, raw=raw)) # size + out.append(epoch_to_str(storage.ts)) # indexed_at if self._has_attr(node, 'maccess'): - out.append(utils.epoch_to_str(node.maccess)) # maccess + out.append(epoch_to_str(node.maccess)) # maccess else: out.append('') # fake maccess if node.md5: @@ -341,12 +351,12 @@ class Noder: line = sep.join(['"' + o + '"' for o in out]) if len(line) > 0: - Logger.out(line) + Logger.stdout_nocolor(line) - def _print_node(self, node, pre='', withpath=False, - withdepth=False, withstorage=False, - recalcparent=False, raw=False): - ''' + def _print_node_native(self, node, pre='', withpath=False, + withdepth=False, withstorage=False, + recalcparent=False, raw=False): + """ print a node @node: the node to print @pre: string to print before node @@ -355,10 +365,10 @@ class Noder: @withstorage: print the node storage it belongs to @recalcparent: get relpath from tree instead of relpath field @raw: print raw size rather than human readable - ''' + """ if node.type == self.TYPE_TOP: # top node - Logger.out('{}{}'.format(pre, node.name)) + Logger.stdout_nocolor(f'{pre}{node.name}') elif node.type == self.TYPE_FILE: # node of type file name = node.name @@ -372,12 +382,13 @@ class Noder: storage = self._get_storage(node) attr = '' if node.md5: - attr = ', md5:{}'.format(node.md5) - sz = utils.size_to_str(node.size, raw=raw) - compl = 'size:{}{}'.format(sz, attr) + attr = f', md5:{node.md5}' + size = size_to_str(node.size, raw=raw) + compl = f'size:{size}{attr}' if withstorage: - compl += ', storage:{}'.format(Logger.bold(storage.name)) - Logger.file(pre, name, compl) + content = Logger.get_bold_text(storage.name) + compl += f', storage:{content}' + NodePrinter.print_file_native(pre, name, compl) elif node.type == self.TYPE_DIR: # node of type directory name = node.name @@ -394,154 +405,243 @@ class Noder: storage = self._get_storage(node) attr = [] if node.size: - attr.append(['totsize', utils.size_to_str(node.size, raw=raw)]) + attr.append(['totsize', size_to_str(node.size, raw=raw)]) if withstorage: - attr.append(['storage', Logger.bold(storage.name)]) - Logger.dir(pre, name, depth=depth, attr=attr) + attr.append(['storage', Logger.get_bold_text(storage.name)]) + NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr) elif node.type == self.TYPE_STORAGE: # node of type storage - hf = utils.size_to_str(node.free, raw=raw) - ht = utils.size_to_str(node.total, raw=raw) + sztotal = size_to_str(node.total, raw=raw) + szused = size_to_str(node.total - node.free, raw=raw) nbchildren = len(node.children) - freepercent = '{:.1f}%'.format( - node.free * 100 / node.total - ) + pcent = node.free * 100 / node.total + freepercent = f'{pcent:.1f}%' # get the date - dt = '' + timestamp = '' if self._has_attr(node, 'ts'): - dt = 'date:' - dt += '{}'.format(utils.epoch_to_str(node.ts)) - ds = '' + timestamp = 'date:' + timestamp += epoch_to_str(node.ts) + disksize = '' # the children size - sz = self._rec_size(node, store=False) - sz = utils.size_to_str(sz, raw=raw) - ds = 'totsize:' + '{}'.format(sz) + size = self.rec_size(node, store=False) + size = size_to_str(size, raw=raw) + disksize = 'totsize:' + f'{size}' # format the output - name = '{}'.format(node.name) + name = node.name args = [ - 'nbfiles:' + '{}'.format(nbchildren), - ds, - 'free:{}'.format(freepercent), - 'du:' + '{}/{}'.format(hf, ht), - dt] - Logger.storage(pre, - name, - '{}'.format(' | '.join(args)), - node.attr) + 'nbfiles:' + f'{nbchildren}', + disksize, + f'free:{freepercent}', + 'du:' + f'{szused}/{sztotal}', + timestamp] + argsstring = ' | '.join(args) + NodePrinter.print_storage_native(pre, + name, + argsstring, + node.attr) elif node.type == self.TYPE_ARC: # archive node if self.arc: - Logger.arc(pre, node.name, node.archive) + NodePrinter.print_archive_native(pre, node.name, node.archive) else: - Logger.err('bad node encountered: {}'.format(node)) + Logger.err(f'bad node encountered: {node}') - def print_tree(self, node, style=anytree.ContRoundStyle(), - fmt='native', header=False, raw=False): - ''' - print the tree similar to unix tool "tree" + def print_tree(self, node, + fmt='native', + raw=False): + """ + print the tree in different format @node: start node @style: when fmt=native, defines the tree style @fmt: output format - @header: when fmt=csv, print the header @raw: print the raw size rather than human readable - ''' + """ if fmt == 'native': + # "tree" style rend = anytree.RenderTree(node, childiter=self._sort_tree) - for pre, fill, node in rend: - self._print_node(node, pre=pre, withdepth=True, raw=raw) + for pre, _, thenode in rend: + self._print_node_native(thenode, pre=pre, + withdepth=True, raw=raw) elif fmt == 'csv': - self._to_csv(node, with_header=header, raw=raw) - - def _to_csv(self, node, with_header=False, raw=False): - '''print the tree to csv''' + # csv output + self._to_csv(node, raw=raw) + elif fmt == 'csv-with-header': + # csv output + Logger.stdout_nocolor(self.CSV_HEADER) + self._to_csv(node, raw=raw) + + def _to_csv(self, node, raw=False): + """print the tree to csv""" rend = anytree.RenderTree(node, childiter=self._sort_tree) - if with_header: - Logger.out(self.CSV_HEADER) - for _, _, node in rend: - self._node_to_csv(node, raw=raw) + for _, _, item in rend: + self._node_to_csv(item, raw=raw) + + @staticmethod + def _fzf_prompt(strings): + # prompt with fzf + fzf = FzfPrompt() + selected = fzf.prompt(strings) + return selected + + def _to_fzf(self, node, fmt): + """ + fzf prompt with list and print selected node(s) + @node: node to start with + @fmt: output format for selected nodes + """ + rendered = anytree.RenderTree(node, childiter=self._sort_tree) + nodes = {} + # construct node names list + for _, _, rend in rendered: + if not rend: + continue + parents = self._get_parents(rend) + storage = self._get_storage(rend) + fullpath = os.path.join(storage.name, parents) + nodes[fullpath] = rend + # prompt with fzf + paths = self._fzf_prompt(nodes.keys()) + # print the resulting tree + subfmt = fmt.replace('fzf-', '') + for path in paths: + if not path: + continue + if path not in nodes: + continue + rend = nodes[path] + self.print_tree(rend, fmt=subfmt) - def to_dot(self, node, path='tree.dot'): - '''export to dot for graphing''' + @staticmethod + def to_dot(node, path='tree.dot'): + """export to dot for graphing""" anytree.exporter.DotExporter(node).to_dotfile(path) - Logger.info('dot file created under \"{}\"'.format(path)) - return 'dot {} -T png -o /tmp/tree.png'.format(path) + Logger.info(f'dot file created under \"{path}\"') + return f'dot {path} -T png -o /tmp/tree.png' ############################################################### # searching ############################################################### - def find_name(self, root, key, - script=False, directory=False, + def find_name(self, top, key, + script=False, only_dir=False, startpath=None, parentfromtree=False, fmt='native', raw=False): - ''' + """ find files based on their names + @top: top node + @key: term to search for @script: output script @directory: only search for directories @startpath: node to start with @parentfromtree: get path from parent instead of stored relpath @fmt: output format - ''' - self._debug('searching for \"{}\"'.format(key)) - start = root - if startpath: - start = self.get_node(root, startpath) - self.term = key - found = anytree.findall(start, filter_=self._find_name) - paths = [] - for f in found: - if f.type == self.TYPE_STORAGE: - # ignore storage nodes - continue - if directory and f.type != self.TYPE_DIR: - # ignore non directory - continue - - # print the node - if fmt == 'native': - self._print_node(f, withpath=True, - withdepth=True, - withstorage=True, - recalcparent=parentfromtree, - raw=raw) - elif fmt == 'csv': - self._node_to_csv(f, raw=raw) + @raw: raw size output + returns the found nodes + """ + self._debug(f'searching for \"{key}\"') + # search for nodes based on path + start = top + if startpath: + start = self.get_node(top, startpath) + filterfunc = self._callback_find_name(key, only_dir) + found = anytree.findall(start, filter_=filterfunc) + nbfound = len(found) + self._debug(f'found {nbfound} node(s)') + + # compile found nodes + paths = {} + for item in found: + item = self._sanitize(item) if parentfromtree: - paths.append(self._get_parents(f)) + paths[self._get_parents(item)] = item else: - paths.append(f.relpath) - + paths[item.relpath] = item + + # handle fzf mode + if fmt.startswith('fzf'): + selected = self._fzf_prompt(paths.keys()) + newpaths = {} + subfmt = fmt.replace('fzf-', '') + for item in selected: + if item not in paths: + continue + newpaths[item] = paths[item] + self.print_tree(newpaths[item], fmt=subfmt) + paths = newpaths + else: + if fmt == 'native': + for _, item in paths.items(): + self._print_node_native(item, withpath=True, + withdepth=True, + withstorage=True, + recalcparent=parentfromtree, + raw=raw) + elif fmt.startswith('csv'): + if fmt == 'csv-with-header': + Logger.stdout_nocolor(self.CSV_HEADER) + for _, item in paths.items(): + self._node_to_csv(item, raw=raw) + + # execute script if any if script: tmp = ['${source}/' + x for x in paths] - cmd = 'op=file; source=/media/mnt; $op {}'.format(' '.join(tmp)) + tmpstr = ' '.join(tmp) + cmd = f'op=file; source=/media/mnt; $op {tmpstr}' Logger.info(cmd) - return found + return list(paths.values()) - def _find_name(self, node): - '''callback for finding files''' - if self.term.lower() in node.name.lower(): - return True - return False + def _callback_find_name(self, term, only_dir): + """callback for finding files""" + def find_name(node): + if node.type == self.TYPE_STORAGE: + # ignore storage nodes + return False + if node.type == self.TYPE_TOP: + # ignore top nodes + return False + if node.type == self.TYPE_META: + # ignore meta nodes + return False + if only_dir and node.type != self.TYPE_DIR: + # ignore non directory + return False + + # filter + if not term: + return True + if term.lower() in node.name.lower(): + return True + + # ignore + return False + return find_name ############################################################### - # climbing + # ls ############################################################### - def walk(self, root, path, rec=False, fmt='native', raw=False): - ''' - walk the tree for ls based on names - @root: start node + def list(self, top, path, + rec=False, + fmt='native', + raw=False): + """ + list nodes for "ls" + @top: top node + @path: path to search for @rec: recursive walk @fmt: output format - ''' - self._debug('walking path: \"{}\"'.format(path)) + @raw: print raw size + """ + self._debug(f'walking path: \"{path}\" from {top}') - r = anytree.resolver.Resolver('name') + resolv = anytree.resolver.Resolver('name') found = [] try: - found = r.glob(root, path) + # resolve the path in the tree + found = resolv.glob(top, path) if len(found) < 1: # nothing found + self._debug('nothing found') return [] if rec: @@ -554,20 +654,28 @@ class Noder: # print the parent if fmt == 'native': - self._print_node(found[0].parent, - withpath=False, withdepth=True, raw=raw) - elif fmt == 'csv': + self._print_node_native(found[0].parent, + withpath=False, + withdepth=True, + raw=raw) + elif fmt.startswith('csv'): self._node_to_csv(found[0].parent, raw=raw) + elif fmt.startswith('fzf'): + pass # print all found nodes - for f in found: + if fmt == 'csv-with-header': + Logger.stdout_nocolor(self.CSV_HEADER) + for item in found: if fmt == 'native': - self._print_node(f, withpath=False, - pre='- ', - withdepth=True, - raw=raw) - elif fmt == 'csv': - self._node_to_csv(f, raw=raw) + self._print_node_native(item, withpath=False, + pre='- ', + withdepth=True, + raw=raw) + elif fmt.startswith('csv'): + self._node_to_csv(item, raw=raw) + elif fmt.startswith('fzf'): + self._to_fzf(item, fmt) except anytree.resolver.ChildResolverError: pass @@ -577,78 +685,96 @@ class Noder: # tree creation ############################################################### def _add_entry(self, name, top, resolv): - '''add an entry to the tree''' + """add an entry to the tree""" entries = name.rstrip(os.sep).split(os.sep) if len(entries) == 1: - self.archive_node(name, name, top, top.name) + self.new_archive_node(name, name, top, top.name) return sub = os.sep.join(entries[:-1]) - f = entries[-1] + nodename = entries[-1] try: parent = resolv.get(top, sub) - parent = self.archive_node(f, name, parent, top.name) + parent = self.new_archive_node(nodename, name, parent, top.name) except anytree.resolver.ChildResolverError: - self.archive_node(f, name, top, top.name) + self.new_archive_node(nodename, name, top, top.name) def list_to_tree(self, parent, names): - '''convert list of files to a tree''' + """convert list of files to a tree""" if not names: return - r = anytree.resolver.Resolver('name') + resolv = anytree.resolver.Resolver('name') for name in names: name = name.rstrip(os.sep) - self._add_entry(name, parent, r) + self._add_entry(name, parent, resolv) ############################################################### # diverse ############################################################### def _sort_tree(self, items): - '''sorting a list of items''' + """sorting a list of items""" return sorted(items, key=self._sort, reverse=self.sortsize) - def _sort(self, x): - '''sort a list''' + def _sort(self, lst): + """sort a list""" if self.sortsize: - return self._sort_size(x) - return self._sort_fs(x) + return self._sort_size(lst) + return self._sort_fs(lst) - def _sort_fs(self, n): - '''sorting nodes dir first and alpha''' - return (n.type, n.name.lstrip('\.').lower()) + @staticmethod + def _sort_fs(node): + """sorting nodes dir first and alpha""" + return (node.type, node.name.lstrip('.').lower()) - def _sort_size(self, n): - '''sorting nodes by size''' + @staticmethod + def _sort_size(node): + """sorting nodes by size""" try: - if not n.size: + if not node.size: return 0 - return n.size + return node.size except AttributeError: return 0 def _get_storage(self, node): - '''recursively traverse up to find storage''' + """recursively traverse up to find storage""" if node.type == self.TYPE_STORAGE: return node return node.ancestors[1] - def _has_attr(self, node, attr): + @staticmethod + def _has_attr(node, attr): + """return True if node has attr as attribute""" return attr in node.__dict__.keys() def _get_parents(self, node): - '''get all parents recursively''' + """get all parents recursively""" if node.type == self.TYPE_STORAGE: return '' + if node.type == self.TYPE_TOP: + return '' parent = self._get_parents(node.parent) if parent: return os.sep.join([parent, node.name]) return node.name - def _get_hash(self, path): + @staticmethod + def _get_hash(path): """return md5 hash of node""" - return utils.md5sum(path) + try: + return md5sum(path) + except CatcliException as exc: + Logger.err(str(exc)) + return None + + @staticmethod + def _sanitize(node): + """sanitize node strings""" + node.name = fix_badchars(node.name) + node.relpath = fix_badchars(node.relpath) + return node def _debug(self, string): - '''print debug''' + """print debug""" if not self.debug: return Logger.debug(string) diff --git a/catcli/utils.py b/catcli/utils.py index fce54e8..6bc197d 100644 --- a/catcli/utils.py +++ b/catcli/utils.py @@ -12,67 +12,75 @@ import subprocess import datetime # local imports -from catcli.logger import Logger +from catcli.exceptions import CatcliException def md5sum(path): - '''calculate md5 sum of a file''' - p = os.path.realpath(path) - if not os.path.exists(p): - Logger.err('\nmd5sum - file does not exist: {}'.format(p)) - return None + """ + calculate md5 sum of a file + may raise exception + """ + rpath = os.path.realpath(path) + if not os.path.exists(rpath): + raise CatcliException(f'md5sum - file does not exist: {rpath}') try: - with open(p, mode='rb') as f: - d = hashlib.md5() + with open(rpath, mode='rb') as file: + hashv = hashlib.md5() while True: - buf = f.read(4096) + buf = file.read(4096) if not buf: break - d.update(buf) - return d.hexdigest() + hashv.update(buf) + return hashv.hexdigest() except PermissionError: pass - except OSError as e: - Logger.err('md5sum error: {}'.format(e)) + except OSError as exc: + raise CatcliException(f'md5sum error: {exc}') from exc return None def size_to_str(size, raw=True): - '''convert size to string, optionally human readable''' + """convert size to string, optionally human readable""" div = 1024. suf = ['B', 'K', 'M', 'G', 'T', 'P'] if raw or size < div: - return '{}'.format(size) + return f'{size}' for i in suf: if size < div: - return '{:.1f}{}'.format(size, i) + return f'{size:.1f}{i}' size = size / div - return '{:.1f}{}'.format(size, suf[-1]) + sufix = suf[-1] + return f'{size:.1f}{sufix}' def epoch_to_str(epoch): - '''convert epoch to string''' + """convert epoch to string""" if not epoch: return '' fmt = '%Y-%m-%d %H:%M:%S' - t = datetime.datetime.fromtimestamp(float(epoch)) - return t.strftime(fmt) + timestamp = datetime.datetime.fromtimestamp(float(epoch)) + return timestamp.strftime(fmt) def ask(question): - '''ask the user what to do''' - resp = input('{} [y|N] ? '.format(question)) + """ask the user what to do""" + resp = input(f'{question} [y|N] ? ') return resp.lower() == 'y' def edit(string): - '''edit the information with the default EDITOR''' + """edit the information with the default EDITOR""" string = string.encode('utf-8') - EDITOR = os.environ.get('EDITOR', 'vim') - with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as f: - f.write(string) - f.flush() - subprocess.call([EDITOR, f.name]) - f.seek(0) - new = f.read() + editor = os.environ.get('EDITOR', 'vim') + with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as file: + file.write(string) + file.flush() + subprocess.call([editor, file.name]) + file.seek(0) + new = file.read() return new.decode('utf-8') + + +def fix_badchars(string): + """fix none utf-8 chars in string""" + return string.encode('utf-8', 'ignore').decode('utf-8') diff --git a/catcli/version.py b/catcli/version.py new file mode 100644 index 0000000..1418b8f --- /dev/null +++ b/catcli/version.py @@ -0,0 +1,6 @@ +""" +author: deadc0de6 (https://github.com/deadc0de6) +Copyright (c) 2022, deadc0de6 +""" + +__version__ = '0.8.7' diff --git a/catcli/walker.py b/catcli/walker.py index 1dcd65c..4740faf 100644 --- a/catcli/walker.py +++ b/catcli/walker.py @@ -12,61 +12,62 @@ from catcli.logger import Logger class Walker: + """a filesystem walker""" - MAXLINE = 80 - 15 + MAXLINELEN = 80 - 15 - def __init__(self, noder, hash=True, debug=False, + def __init__(self, noder, usehash=True, debug=False, logpath=None): - ''' + """ @noder: the noder to use @hash: calculate hash of nodes @debug: debug mode @logpath: path where to log catalog changes on reindex - ''' + """ self.noder = noder - self.hash = hash - self.noder.set_hashing(self.hash) + self.usehash = usehash + self.noder.set_hashing(self.usehash) self.debug = debug self.lpath = logpath def index(self, path, parent, name, storagepath=''): - ''' + """ index a directory and store in tree @path: path to index @parent: parent node @name: this stoarge name - ''' - self._debug('indexing starting at {}'.format(path)) + """ + self._debug(f'indexing starting at {path}') if not parent: - parent = self.noder.dir_node(name, path, parent) + parent = self.noder.new_dir_node(name, path, parent) if os.path.islink(path): rel = os.readlink(path) - ab = os.path.join(path, rel) - if os.path.isdir(ab): + abspath = os.path.join(path, rel) + if os.path.isdir(abspath): return parent, 0 cnt = 0 for (root, dirs, files) in os.walk(path): - for f in files: - self._debug('found file {} under {}'.format(f, path)) - sub = os.path.join(root, f) + for file in files: + self._debug(f'found file {file} under {path}') + sub = os.path.join(root, file) if not os.path.exists(sub): continue - self._progress(f) - self._debug('index file {}'.format(sub)) - n = self.noder.file_node(os.path.basename(f), sub, - parent, storagepath) - if n: + self._progress(file) + self._debug(f'index file {sub}') + node = self.noder.new_file_node(os.path.basename(file), sub, + parent, storagepath) + if node: cnt += 1 - for d in dirs: - self._debug('found dir {} under {}'.format(d, path)) - base = os.path.basename(d) - sub = os.path.join(root, d) - self._debug('index directory {}'.format(sub)) + for adir in dirs: + self._debug(f'found dir {adir} under {path}') + base = os.path.basename(adir) + sub = os.path.join(root, adir) + self._debug(f'index directory {sub}') if not os.path.exists(sub): continue - dummy = self.noder.dir_node(base, sub, parent, storagepath) + dummy = self.noder.new_dir_node(base, sub, parent, storagepath) if not dummy: continue cnt += 1 @@ -80,47 +81,48 @@ class Walker: return parent, cnt def reindex(self, path, parent, top): - '''reindex a directory and store in tree''' + """reindex a directory and store in tree""" cnt = self._reindex(path, parent, top) cnt += self.noder.clean_not_flagged(parent) return cnt def _reindex(self, path, parent, top, storagepath=''): - ''' + """ reindex a directory and store in tree @path: directory path to re-index @top: top node (storage) @storagepath: rel path relative to indexed directory - ''' - self._debug('reindexing starting at {}'.format(path)) + """ + self._debug(f'reindexing starting at {path}') cnt = 0 for (root, dirs, files) in os.walk(path): - for f in files: - self._debug('found file \"{}\" under {}'.format(f, path)) - sub = os.path.join(root, f) - treepath = os.path.join(storagepath, f) - reindex, n = self._need_reindex(parent, sub, treepath) + for file in files: + self._debug(f'found file \"{file}\" under {path}') + sub = os.path.join(root, file) + treepath = os.path.join(storagepath, file) + reindex, node = self._need_reindex(parent, sub, treepath) if not reindex: - self._debug('\tskip file {}'.format(sub)) - self.noder.flag(n) + self._debug(f'\tskip file {sub}') + self.noder.flag(node) continue - self._log2file('update catalog for \"{}\"'.format(sub)) - n = self.noder.file_node(os.path.basename(f), sub, - parent, storagepath) - self.noder.flag(n) + self._log2file(f'update catalog for \"{sub}\"') + node = self.noder.new_file_node(os.path.basename(file), sub, + parent, storagepath) + self.noder.flag(node) cnt += 1 - for d in dirs: - self._debug('found dir \"{}\" under {}'.format(d, path)) - base = os.path.basename(d) - sub = os.path.join(root, d) - treepath = os.path.join(storagepath, d) + for adir in dirs: + self._debug(f'found dir \"{adir}\" under {path}') + base = os.path.basename(adir) + sub = os.path.join(root, adir) + treepath = os.path.join(storagepath, adir) reindex, dummy = self._need_reindex(parent, sub, treepath) if reindex: - self._log2file('update catalog for \"{}\"'.format(sub)) - dummy = self.noder.dir_node(base, sub, parent, storagepath) + self._log2file(f'update catalog for \"{sub}\"') + dummy = self.noder.new_dir_node(base, sub, + parent, storagepath) cnt += 1 self.noder.flag(dummy) - self._debug('reindexing deeper under {}'.format(sub)) + self._debug(f'reindexing deeper under {sub}') nstoragepath = os.sep.join([storagepath, base]) if not storagepath: nstoragepath = base @@ -130,48 +132,48 @@ class Walker: return cnt def _need_reindex(self, top, path, treepath): - ''' + """ test if node needs re-indexing @top: top node (storage) @path: abs path to file @treepath: rel path from indexed directory - ''' + """ cnode, changed = self.noder.get_node_if_changed(top, path, treepath) if not cnode: - self._debug('\t{} does not exist'.format(path)) + self._debug(f'\t{path} does not exist') return True, cnode if cnode and not changed: # ignore this node - self._debug('\t{} has not changed'.format(path)) + self._debug(f'\t{path} has not changed') return False, cnode if cnode and changed: # remove this node and re-add - self._debug('\t{} has changed'.format(path)) - self._debug('\tremoving node {} for {}'.format(cnode.name, path)) + self._debug(f'\t{path} has changed') + self._debug(f'\tremoving node {cnode.name} for {path}') cnode.parent = None return True, cnode def _debug(self, string): - '''print to debug''' + """print to debug""" if not self.debug: return Logger.debug(string) def _progress(self, string): - '''print progress''' + """print progress""" if self.debug: return if not string: # clean - Logger.progr('{:80}'.format(' ')) + Logger.progr(' ' * 80) return - if len(string) > self.MAXLINE: - string = string[:self.MAXLINE] + '...' - Logger.progr('indexing: {:80}'.format(string)) + if len(string) > self.MAXLINELEN: + string = string[:self.MAXLINELEN] + '...' + Logger.progr(f'indexing: {string:80}') def _log2file(self, string): - '''log to file''' + """log to file""" if not self.lpath: return - line = '{}\n'.format(string) - Logger.flog(self.lpath, line, append=True) + line = f'{string}\n' + Logger.log_to_file(self.lpath, line, append=True) diff --git a/requirements.txt b/requirements.txt index 6b7fb29..936d33b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ docopt; python_version >= '3.0' anytree; python_version >= '3.0' +pyfzf; python_version >= '3.0' diff --git a/setup.py b/setup.py index 4903469..e3e5cd9 100644 --- a/setup.py +++ b/setup.py @@ -31,10 +31,11 @@ setup( python_requires=REQUIRES_PYTHON, classifiers=[ 'Development Status :: 5 - Production/Stable', - 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', 'License :: OSI Approved :: GNU General Public License v3 (GPLv3)', ], diff --git a/tests-requirements.txt b/tests-requirements.txt index 82bfba8..8e826f4 100644 --- a/tests-requirements.txt +++ b/tests-requirements.txt @@ -1,6 +1,6 @@ pycodestyle; python_version >= '3.0' pyflakes; python_version >= '3.0' -#nose-py3; python_version >= '3.0' -nose; python_version >= '3.0' +nose2; python_version >= '3.0' coverage; python_version >= '3.0' coveralls; python_version >= '3.0' +pylint; python_version > '3.0' diff --git a/tests.sh b/tests.sh index 4513102..040de1a 100755 --- a/tests.sh +++ b/tests.sh @@ -7,16 +7,38 @@ cur=$(dirname "$(readlink -f "${0}")") # stop on first error set -ev +pycodestyle --version pycodestyle --ignore=W605 catcli/ pycodestyle tests/ +pyflakes --version pyflakes catcli/ pyflakes tests/ -nosebin="nosetests" - -PYTHONPATH=catcli ${nosebin} -s --with-coverage --cover-package=catcli -#PYTHONPATH=catcli ${nosebin} -s +# R0914: Too many local variables +# R0913: Too many arguments +# R0912: Too many branches +# R0915: Too many statements +# R0911: Too many return statements +# R0903: Too few public methods +pylint --version +pylint \ + --disable=R0914 \ + --disable=R0913 \ + --disable=R0912 \ + --disable=R0915 \ + --disable=R0911 \ + --disable=R0903 \ + catcli/ +pylint \ + --disable=W0212 \ + --disable=R0914 \ + --disable=R0915 \ + --disable=R0801 \ + tests/ + +nosebin="nose2" +PYTHONPATH=catcli ${nosebin} --with-coverage --coverage=catcli for t in ${cur}/tests-ng/*; do echo "running test \"`basename ${t}`\"" diff --git a/tests/helpers.py b/tests/helpers.py index 2d4c6d6..a6da349 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -21,32 +21,32 @@ TMPSUFFIX = '.catcli' def get_rnd_string(length): - '''Get a random string of specific length ''' + """Get a random string of specific length """ alpha = string.ascii_uppercase + string.digits return ''.join(random.choice(alpha) for _ in range(length)) def md5sum(path): - '''calculate md5 sum of a file''' - p = os.path.realpath(path) - if not os.path.exists(p): + """calculate md5 sum of a file""" + rpath = os.path.realpath(path) + if not os.path.exists(rpath): return None try: - with open(p, mode='rb') as f: - d = hashlib.md5() + with open(rpath, mode='rb') as file: + val = hashlib.md5() while True: - buf = f.read(4096) + buf = file.read(4096) if not buf: break - d.update(buf) - return d.hexdigest() + val.update(buf) + return val.hexdigest() except PermissionError: pass return None def clean(path): - '''Delete file or folder.''' + """Delete file or folder.""" if not os.path.exists(path): return if os.path.islink(path): @@ -58,10 +58,12 @@ def clean(path): def edit_file(path, newcontent): + """edit file content""" return write_to_file(path, newcontent) def unix_tree(path): + """print using unix tree tool""" if not os.path.exists(path): return # cmd = ['tree', path] @@ -75,7 +77,7 @@ def unix_tree(path): def create_tree(): - ''' create a random tree of files and directories ''' + """ create a random tree of files and directories """ dirpath = get_tempdir() # create 3 files create_rnd_file(dirpath, get_rnd_string(5)) @@ -83,13 +85,13 @@ def create_tree(): create_rnd_file(dirpath, get_rnd_string(5)) # create 2 directories - d1 = create_dir(dirpath, get_rnd_string(3)) - d2 = create_dir(dirpath, get_rnd_string(3)) + dir1 = create_dir(dirpath, get_rnd_string(3)) + dir2 = create_dir(dirpath, get_rnd_string(3)) # fill directories - create_rnd_file(d1, get_rnd_string(4)) - create_rnd_file(d1, get_rnd_string(4)) - create_rnd_file(d2, get_rnd_string(6)) + create_rnd_file(dir1, get_rnd_string(4)) + create_rnd_file(dir1, get_rnd_string(4)) + create_rnd_file(dir2, get_rnd_string(6)) return dirpath @@ -99,12 +101,12 @@ def create_tree(): def get_tempdir(): - '''Get a temporary directory ''' + """Get a temporary directory """ return tempfile.mkdtemp(suffix=TMPSUFFIX) def create_dir(path, dirname): - '''Create a directory ''' + """Create a directory """ fpath = os.path.join(path, dirname) if not os.path.exists(fpath): os.mkdir(fpath) @@ -112,7 +114,7 @@ def create_dir(path, dirname): def create_rnd_file(path, filename, content=None): - '''Create the file filename in path with random content if None ''' + """Create the file filename in path with random content if None """ if not content: content = get_rnd_string(100) fpath = os.path.join(path, filename) @@ -120,23 +122,25 @@ def create_rnd_file(path, filename, content=None): def write_to_file(path, content): - with open(path, 'w') as f: - f.write(content) + """write content to file""" + with open(path, 'w', encoding='utf-8') as file: + file.write(content) return path def read_from_file(path): + """read file content""" if not os.path.exists(path): return '' - with open(path, 'r') as f: - content = f.read() + with open(path, 'r', encoding='utf-8') as file: + content = file.read() return content ############################################################ # fake tree in json ############################################################ -FAKECATALOG = ''' +FAKECATALOG = """ { "children": [ { @@ -214,9 +218,9 @@ FAKECATALOG = ''' "name": "top", "type": "top" } -''' +""" def get_fakecatalog(): - # catalog constructed through test_index + """catalog constructed through test_index""" return FAKECATALOG diff --git a/tests/test_find.py b/tests/test_find.py index 15d4ff6..f55ac8a 100644 --- a/tests/test_find.py +++ b/tests/test_find.py @@ -14,8 +14,10 @@ from tests.helpers import get_fakecatalog class TestFind(unittest.TestCase): + """test find""" def test_find(self): + """test find""" # init catalog = Catalog('fake', force=True, debug=False) top = catalog._restore_json(get_fakecatalog()) @@ -38,6 +40,7 @@ class TestFind(unittest.TestCase): def main(): + """entry point""" unittest.main() diff --git a/tests/test_graph.py b/tests/test_graph.py index 4d68dbc..1ccbb6c 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -16,8 +16,10 @@ from tests.helpers import clean, get_fakecatalog class TestGraph(unittest.TestCase): + """test graph""" def test_graph(self): + """test graph""" # init path = 'fake' gpath = tempfile.gettempdir() + os.sep + 'graph.dot' @@ -38,6 +40,7 @@ class TestGraph(unittest.TestCase): def main(): + """entry point""" unittest.main() diff --git a/tests/test_index.py b/tests/test_index.py index 966acc0..088f65a 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -16,8 +16,10 @@ from tests.helpers import get_tempdir, create_rnd_file, clean, \ class TestIndexing(unittest.TestCase): + """test index""" def test_index(self): + """test index""" # init workingdir = get_tempdir() catalogpath = create_rnd_file(workingdir, 'catalog.json', content='') @@ -27,18 +29,18 @@ class TestIndexing(unittest.TestCase): self.addCleanup(clean, dirpath) # create 3 files - f1 = create_rnd_file(dirpath, get_rnd_string(5)) - f2 = create_rnd_file(dirpath, get_rnd_string(5)) - f3 = create_rnd_file(dirpath, get_rnd_string(5)) + file1 = create_rnd_file(dirpath, get_rnd_string(5)) + file2 = create_rnd_file(dirpath, get_rnd_string(5)) + file3 = create_rnd_file(dirpath, get_rnd_string(5)) # create 2 directories - d1 = create_dir(dirpath, get_rnd_string(3)) - d2 = create_dir(dirpath, get_rnd_string(3)) + dir1 = create_dir(dirpath, get_rnd_string(3)) + dir2 = create_dir(dirpath, get_rnd_string(3)) # fill directories with files - _ = create_rnd_file(d1, get_rnd_string(4)) - _ = create_rnd_file(d1, get_rnd_string(4)) - _ = create_rnd_file(d2, get_rnd_string(6)) + _ = create_rnd_file(dir1, get_rnd_string(4)) + _ = create_rnd_file(dir1, get_rnd_string(4)) + _ = create_rnd_file(dir2, get_rnd_string(6)) noder = Noder() top = noder.new_top_node() @@ -61,20 +63,21 @@ class TestIndexing(unittest.TestCase): # ensures files and directories are in names = [x.name for x in storage.children] - self.assertTrue(os.path.basename(f1) in names) - self.assertTrue(os.path.basename(f2) in names) - self.assertTrue(os.path.basename(f3) in names) - self.assertTrue(os.path.basename(d1) in names) - self.assertTrue(os.path.basename(d2) in names) + self.assertTrue(os.path.basename(file1) in names) + self.assertTrue(os.path.basename(file2) in names) + self.assertTrue(os.path.basename(file3) in names) + self.assertTrue(os.path.basename(dir1) in names) + self.assertTrue(os.path.basename(dir2) in names) for node in storage.children: - if node.name == os.path.basename(d1): + if node.name == os.path.basename(dir1): self.assertTrue(len(node.children) == 2) - elif node.name == os.path.basename(d2): + elif node.name == os.path.basename(dir2): self.assertTrue(len(node.children) == 1) def main(): + """entry point""" unittest.main() diff --git a/tests/test_ls.py b/tests/test_ls.py index a6c0736..df5f3f9 100644 --- a/tests/test_ls.py +++ b/tests/test_ls.py @@ -14,8 +14,10 @@ from tests.helpers import get_fakecatalog, clean class TestWalking(unittest.TestCase): + """test ls""" def test_ls(self): + """test ls""" # init path = 'fake' self.addCleanup(clean, path) @@ -56,6 +58,7 @@ class TestWalking(unittest.TestCase): def main(): + """entry point""" unittest.main() diff --git a/tests/test_rm.py b/tests/test_rm.py index 0a20dc1..e9e1b6f 100644 --- a/tests/test_rm.py +++ b/tests/test_rm.py @@ -14,8 +14,10 @@ from tests.helpers import clean, get_fakecatalog class TestRm(unittest.TestCase): + """test rm""" def test_rm(self): + """test rm""" # init path = 'fake' self.addCleanup(clean, path) @@ -48,6 +50,7 @@ class TestRm(unittest.TestCase): def main(): + """entry point""" unittest.main() diff --git a/tests/test_tree.py b/tests/test_tree.py index 61fbb2f..7522c1c 100644 --- a/tests/test_tree.py +++ b/tests/test_tree.py @@ -7,15 +7,17 @@ Basic unittest for tree import unittest -from catcli.catcli import cmd_tree +from catcli.catcli import cmd_ls from catcli.noder import Noder from catcli.catalog import Catalog from tests.helpers import clean, get_fakecatalog class TestTree(unittest.TestCase): + """Test the tree""" def test_tree(self): + """test the tree""" # init path = 'fake' self.addCleanup(clean, path) @@ -30,13 +32,15 @@ class TestTree(unittest.TestCase): '--format': 'native', '--header': False, '--raw-size': False, + '--recursive': True, } # print tree and wait for any errors - cmd_tree(args, noder, top) + cmd_ls(args, noder, top) def main(): + """entry point""" unittest.main() diff --git a/tests/test_update.py b/tests/test_update.py index 5d4d4de..fc478e7 100644 --- a/tests/test_update.py +++ b/tests/test_update.py @@ -7,18 +7,20 @@ Basic unittest for updating an index import unittest import os +import anytree from catcli.catcli import cmd_index, cmd_update from catcli.noder import Noder from catcli.catalog import Catalog from tests.helpers import create_dir, create_rnd_file, get_tempdir, \ clean, unix_tree, edit_file, read_from_file, md5sum -import anytree -class TestIndexing(unittest.TestCase): +class TestUpdate(unittest.TestCase): + """test update""" - def test_index(self): + def test_update(self): + """test update""" # init workingdir = get_tempdir() catalogpath = create_rnd_file(workingdir, 'catalog.json', content='') @@ -28,20 +30,20 @@ class TestIndexing(unittest.TestCase): self.addCleanup(clean, dirpath) # create 3 files - f1 = create_rnd_file(dirpath, 'file1') - f2 = create_rnd_file(dirpath, 'file2') - f3 = create_rnd_file(dirpath, 'file3') - f4 = create_rnd_file(dirpath, 'file4') + file1 = create_rnd_file(dirpath, 'file1') + file2 = create_rnd_file(dirpath, 'file2') + file3 = create_rnd_file(dirpath, 'file3') + file4 = create_rnd_file(dirpath, 'file4') # create 2 directories - d1 = create_dir(dirpath, 'dir1') - d2 = create_dir(dirpath, 'dir2') + dir1 = create_dir(dirpath, 'dir1') + dir2 = create_dir(dirpath, 'dir2') # fill directories with files - d1f1 = create_rnd_file(d1, 'dir1file1') - d1f2 = create_rnd_file(d1, 'dir1file2') - d2f1 = create_rnd_file(d2, 'dir2file1') - d2f2 = create_rnd_file(d2, 'dir2file2') + d1f1 = create_rnd_file(dir1, 'dir1file1') + d1f2 = create_rnd_file(dir1, 'dir1file2') + d2f1 = create_rnd_file(dir2, 'dir2file1') + d2f2 = create_rnd_file(dir2, 'dir2file2') noder = Noder(debug=True) noder.set_hashing(True) @@ -49,7 +51,7 @@ class TestIndexing(unittest.TestCase): catalog = Catalog(catalogpath, force=True, debug=False) # get checksums - f4_md5 = md5sum(f4) + f4_md5 = md5sum(file4) self.assertTrue(f4_md5) d1f1_md5 = md5sum(d1f1) self.assertTrue(d1f1_md5) @@ -69,7 +71,7 @@ class TestIndexing(unittest.TestCase): self.assertTrue(os.stat(catalogpath).st_size != 0) # ensure md5 sum are in - nods = noder.find_name(top, os.path.basename(f4)) + nods = noder.find_name(top, os.path.basename(file4)) self.assertTrue(len(nods) == 1) nod = nods[0] self.assertTrue(nod) @@ -79,34 +81,34 @@ class TestIndexing(unittest.TestCase): noder.print_tree(top) # add some files and directories - new1 = create_rnd_file(d1, 'newf1') + new1 = create_rnd_file(dir1, 'newf1') new2 = create_rnd_file(dirpath, 'newf2') new3 = create_dir(dirpath, 'newd3') - new4 = create_dir(d2, 'newd4') + new4 = create_dir(dir2, 'newd4') new5 = create_rnd_file(new4, 'newf5') unix_tree(dirpath) # modify files - EDIT = 'edited' - edit_file(d1f1, EDIT) + editval = 'edited' + edit_file(d1f1, editval) d1f1_md5_new = md5sum(d1f1) self.assertTrue(d1f1_md5_new) self.assertTrue(d1f1_md5_new != d1f1_md5) # change file without mtime - maccess = os.path.getmtime(f4) - EDIT = 'edited' - edit_file(f4, EDIT) + maccess = os.path.getmtime(file4) + editval = 'edited' + edit_file(file4, editval) # reset edit time - os.utime(f4, (maccess, maccess)) + os.utime(file4, (maccess, maccess)) f4_md5_new = md5sum(d1f1) self.assertTrue(f4_md5_new) self.assertTrue(f4_md5_new != f4_md5) # change file without mtime maccess = os.path.getmtime(d2f2) - EDIT = 'edited' - edit_file(d2f2, EDIT) + editval = 'edited' + edit_file(d2f2, editval) # reset edit time os.utime(d2f2, (maccess, maccess)) d2f2_md5_new = md5sum(d2f2) @@ -134,7 +136,7 @@ class TestIndexing(unittest.TestCase): self.assertTrue(nod.md5 == d1f1_md5_new) # ensure f4 md5 sum has changed in catalog - nods = noder.find_name(top, os.path.basename(f4)) + nods = noder.find_name(top, os.path.basename(file4)) self.assertTrue(len(nods) == 1) nod = nods[0] self.assertTrue(nod) @@ -152,14 +154,14 @@ class TestIndexing(unittest.TestCase): # ensures files and directories are in names = [node.name for node in anytree.PreOrderIter(storage)] print(names) - self.assertTrue(os.path.basename(f1) in names) - self.assertTrue(os.path.basename(f2) in names) - self.assertTrue(os.path.basename(f3) in names) - self.assertTrue(os.path.basename(f4) in names) - self.assertTrue(os.path.basename(d1) in names) + self.assertTrue(os.path.basename(file1) in names) + self.assertTrue(os.path.basename(file2) in names) + self.assertTrue(os.path.basename(file3) in names) + self.assertTrue(os.path.basename(file4) in names) + self.assertTrue(os.path.basename(dir1) in names) self.assertTrue(os.path.basename(d1f1) in names) self.assertTrue(os.path.basename(d1f2) in names) - self.assertTrue(os.path.basename(d2) in names) + self.assertTrue(os.path.basename(dir2) in names) self.assertTrue(os.path.basename(d2f1) in names) self.assertTrue(os.path.basename(new1) in names) self.assertTrue(os.path.basename(new2) in names) @@ -168,19 +170,19 @@ class TestIndexing(unittest.TestCase): self.assertTrue(os.path.basename(new5) in names) for node in storage.children: - if node.name == os.path.basename(d1): + if node.name == os.path.basename(dir1): self.assertTrue(len(node.children) == 3) - elif node.name == os.path.basename(d2): + elif node.name == os.path.basename(dir2): self.assertTrue(len(node.children) == 3) elif node.name == os.path.basename(new3): self.assertTrue(len(node.children) == 0) elif node.name == os.path.basename(new4): self.assertTrue(len(node.children) == 1) - self.assertTrue(read_from_file(d1f1) == EDIT) + self.assertTrue(read_from_file(d1f1) == editval) # remove some files clean(d1f1) - clean(d2) + clean(dir2) clean(new2) clean(new4) @@ -190,14 +192,14 @@ class TestIndexing(unittest.TestCase): # ensures files and directories are (not) in names = [node.name for node in anytree.PreOrderIter(storage)] print(names) - self.assertTrue(os.path.basename(f1) in names) - self.assertTrue(os.path.basename(f2) in names) - self.assertTrue(os.path.basename(f3) in names) - self.assertTrue(os.path.basename(f4) in names) - self.assertTrue(os.path.basename(d1) in names) + self.assertTrue(os.path.basename(file1) in names) + self.assertTrue(os.path.basename(file2) in names) + self.assertTrue(os.path.basename(file3) in names) + self.assertTrue(os.path.basename(file4) in names) + self.assertTrue(os.path.basename(dir1) in names) self.assertTrue(os.path.basename(d1f1) not in names) self.assertTrue(os.path.basename(d1f2) in names) - self.assertTrue(os.path.basename(d2) not in names) + self.assertTrue(os.path.basename(dir2) not in names) self.assertTrue(os.path.basename(d2f1) not in names) self.assertTrue(os.path.basename(d2f1) not in names) self.assertTrue(os.path.basename(new1) in names) @@ -206,13 +208,14 @@ class TestIndexing(unittest.TestCase): self.assertTrue(os.path.basename(new4) not in names) self.assertTrue(os.path.basename(new5) not in names) for node in storage.children: - if node.name == os.path.basename(d1): + if node.name == os.path.basename(dir1): self.assertTrue(len(node.children) == 2) elif node.name == os.path.basename(new3): self.assertTrue(len(node.children) == 0) def main(): + """entry point""" unittest.main()