From 0dcbfa94bd3bb8320cad287febbdc97b30d59298 Mon Sep 17 00:00:00 2001 From: deadc0de6 Date: Sun, 7 Jan 2024 23:26:44 +0100 Subject: [PATCH] refactoring --- catcli/catalog.py | 15 +-- catcli/catcli.py | 20 +-- catcli/nodeprinter.py | 73 ---------- catcli/noder.py | 285 ++++++++++----------------------------- catcli/nodes.py | 107 ++++++++++++++- catcli/printer_csv.py | 84 ++++++++++++ catcli/printer_native.py | 151 +++++++++++++++++++++ catcli/utils.py | 25 +++- tests/test_rm.py | 2 +- tests/test_update.py | 2 +- 10 files changed, 451 insertions(+), 313 deletions(-) delete mode 100644 catcli/nodeprinter.py create mode 100644 catcli/printer_csv.py create mode 100644 catcli/printer_native.py diff --git a/catcli/catalog.py b/catcli/catalog.py index 67afe28..d075399 100644 --- a/catcli/catalog.py +++ b/catcli/catalog.py @@ -96,14 +96,13 @@ class Catalog: def _restore_json(self, string: str) -> Optional[NodeTop]: """restore the tree from json""" imp = JsonImporter(dictimporter=_DictImporter(debug=self.debug)) - self._debug('import from string...') root = imp.import_(string) self._debug(f'Catalog imported from json \"{self.path}\"') self._debug(f'root imported: {root}') if root.type != nodes.TYPE_TOP: return None top = NodeTop(root.name, children=root.children) - self._debug(f'top imported: {top}') + self._debug(f'top imported: {top.name}') return top @@ -126,7 +125,7 @@ class _DictImporter(): assert "parent" not in data attrs = dict(data) # replace attr - attrs = back_attriter(attrs, debug=self.debug) + attrs = back_attriter(attrs) children: Union[str, Any] = attrs.pop("children", []) node = self.nodecls(parent=parent, **attrs) for child in children: @@ -134,16 +133,14 @@ class _DictImporter(): return node -def back_attriter(adict: Dict[str, str], - debug: bool = False) -> Dict[str, str]: +def back_attriter(adict: Dict[str, str]) -> Dict[str, str]: """replace attribute on json restore""" attrs = {} for k, val in adict.items(): + newk = k if k == 'size': - if debug: - Logger.debug(f'changing {k}={val}') - k = 'nodesize' - attrs[k] = val + newk = 'nodesize' + attrs[newk] = val return attrs diff --git a/catcli/catcli.py b/catcli/catcli.py index 0fb2726..593bc71 100755 --- a/catcli/catcli.py +++ b/catcli/catcli.py @@ -18,6 +18,7 @@ from docopt import docopt from catcli.version import __version__ as VERSION from catcli.nodes import NodeTop, NodeAny from catcli.logger import Logger +from catcli.printer_csv import CsvPrinter from catcli.colors import Colors from catcli.catalog import Catalog from catcli.walker import Walker @@ -116,7 +117,7 @@ def cmd_index(args: Dict[str, Any], except KeyboardInterrupt: Logger.err('aborted') return - node = noder.get_storage_node(top, name) + node = top.get_storage_node() node.parent = None start = datetime.datetime.now() @@ -125,7 +126,7 @@ def cmd_index(args: Dict[str, Any], root = noder.new_storage_node(name, path, top, attr) _, cnt = walker.index(path, root, name) if subsize: - noder.rec_size(root, store=True) + root.nodesize = root.get_rec_size() stop = datetime.datetime.now() diff = stop - start Logger.info(f'Indexed {cnt} file(s) in {diff}') @@ -147,16 +148,17 @@ def cmd_update(args: Dict[str, Any], if not os.path.exists(path): Logger.err(f'\"{path}\" does not exist') return - root = noder.get_storage_node(top, name, newpath=path) - if not root: + storage = noder.find_storage_node_by_name(top, name) + if not storage: Logger.err(f'storage named \"{name}\" does not exist') return + noder.update_storage_path(top, name, path) start = datetime.datetime.now() walker = Walker(noder, usehash=usehash, debug=debug, logpath=logpath) - cnt = walker.reindex(path, root, top) + cnt = walker.reindex(path, storage, top) if subsize: - noder.rec_size(root, store=True) + storage.nodesize = storage.get_rec_size() stop = datetime.datetime.now() diff = stop - start Logger.info(f'updated {cnt} file(s) in {diff}') @@ -189,7 +191,7 @@ def cmd_rm(args: Dict[str, Any], top: NodeTop) -> NodeTop: """rm action""" name = args[''] - node = noder.get_storage_node(top, name) + node = noder.find_storage_node_by_name(top, name) if node: node.parent = None if catalog.save(top): @@ -278,7 +280,7 @@ def print_supported_formats() -> None: """print all supported formats to stdout""" print('"native" : native format') print('"csv" : CSV format') - print(f' {Noder.CSV_HEADER}') + print(f' {CsvPrinter.CSV_HEADER}') print('"fzf-native" : fzf to native (only valid for find)') print('"fzf-csv" : fzf to csv (only valid for find)') @@ -303,7 +305,7 @@ def main() -> bool: return False if args['--verbose']: - print(args) + print(f'args: {args}') # print banner if not args['--no-banner']: diff --git a/catcli/nodeprinter.py b/catcli/nodeprinter.py deleted file mode 100644 index e20b5d5..0000000 --- a/catcli/nodeprinter.py +++ /dev/null @@ -1,73 +0,0 @@ -""" -author: deadc0de6 (https://github.com/deadc0de6) -Copyright (c) 2022, deadc0de6 - -Class for printing nodes -""" - -import sys -from typing import TypeVar, Type, Optional, Tuple, List, \ - Dict, Any - -from catcli.colors import Colors -from catcli.utils import fix_badchars - - -CLASSTYPE = TypeVar('CLASSTYPE', bound='NodePrinter') - - -class NodePrinter: - """a node printer class""" - - STORAGE = 'storage' - ARCHIVE = 'archive' - NBFILES = 'nbfiles' - - @classmethod - def print_storage_native(cls: Type[CLASSTYPE], pre: str, - name: str, args: str, - attr: Dict[str, Any]) -> None: - """print a storage node""" - end = '' - if attr: - end = f' {Colors.GRAY}({attr}){Colors.RESET}' - out = f'{pre}{Colors.UND}{cls.STORAGE}{Colors.RESET}:' - out += ' ' + Colors.PURPLE + fix_badchars(name) + \ - Colors.RESET + end + '\n' - out += f' {Colors.GRAY}{args}{Colors.RESET}' - sys.stdout.write(f'{out}\n') - - @classmethod - def print_file_native(cls: Type[CLASSTYPE], pre: str, - name: str, attr: str) -> None: - """print a file node""" - nobad = fix_badchars(name) - out = f'{pre}{nobad}' - out += f' {Colors.GRAY}[{attr}]{Colors.RESET}' - sys.stdout.write(f'{out}\n') - - @classmethod - def print_dir_native(cls: Type[CLASSTYPE], pre: str, - name: str, - nbchildren: int = 0, - attr: Optional[List[Tuple[str, str]]] = None) -> None: - """print a directory node""" - end = [] - if nbchildren > 0: - end.append(f'{cls.NBFILES}:{nbchildren}') - if attr: - end.append(' '.join([f'{x}:{y}' for x, y in attr])) - end_string = '' - if end: - end_string = f' [{", ".join(end)}]' - out = pre + Colors.BLUE + fix_badchars(name) + Colors.RESET - out += f'{Colors.GRAY}{end_string}{Colors.RESET}' - sys.stdout.write(f'{out}\n') - - @classmethod - def print_archive_native(cls: Type[CLASSTYPE], pre: str, - name: str, archive: str) -> None: - """archive to stdout""" - out = pre + Colors.YELLOW + fix_badchars(name) + Colors.RESET - out += f' {Colors.GRAY}[{cls.ARCHIVE}:{archive}]{Colors.RESET}' - sys.stdout.write(f'{out}\n') diff --git a/catcli/noder.py b/catcli/noder.py index 61457f0..3e4f8f7 100644 --- a/catcli/noder.py +++ b/catcli/noder.py @@ -10,16 +10,17 @@ import shutil import time from typing import List, Union, Tuple, Any, Optional, Dict, cast import anytree # type: ignore -from natsort import os_sort_keygen +from natsort import os_sort_keygen # type: ignore # local imports from catcli import nodes from catcli.nodes import NodeAny, NodeStorage, \ NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta, \ typcast_node -from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars +from catcli.utils import md5sum, fix_badchars, has_attr from catcli.logger import Logger -from catcli.nodeprinter import NodePrinter +from catcli.printer_native import NativePrinter +from catcli.printer_csv import CsvPrinter from catcli.decomp import Decomp from catcli.version import __version__ as VERSION from catcli.exceptions import CatcliException @@ -35,9 +36,7 @@ class Noder: * "file" node representing a file """ - CSV_HEADER = ('name,type,path,size,indexed_at,' - 'maccess,md5,nbfiles,free_space,' - 'total_space,meta') + PRE = ' ' def __init__(self, debug: bool = False, sortsize: bool = False, @@ -53,31 +52,33 @@ class Noder: self.arc = arc if self.arc: self.decomp = Decomp() + self.csv_printer = CsvPrinter() + self.native_printer = NativePrinter() @staticmethod def get_storage_names(top: NodeTop) -> List[str]: """return a list of all storage names""" return [x.name for x in list(top.children)] - def get_storage_node(self, top: NodeTop, - name: str, - newpath: str = '') -> NodeStorage: - """ - return the storage node if any - if newpath is submitted, it will update the media info - """ - found = None + def find_storage_node_by_name(self, top: NodeTop, + name: str) -> Optional[NodeStorage]: + """find a storage node by name""" for node in top.children: if node.type != nodes.TYPE_STORAGE: continue if node.name == name: - found = node - break - if found and newpath and os.path.exists(newpath): - found.free = shutil.disk_usage(newpath).free - found.total = shutil.disk_usage(newpath).total - found.ts = int(time.time()) - return cast(NodeStorage, found) + return cast(NodeStorage, node) + return None + + def update_storage_path(self, top: NodeTop, + name: str, + newpath: str) -> None: + """find and update storage path on update""" + storage = self.find_storage_node_by_name(top, name) + if storage and newpath and os.path.exists(newpath): + storage.free = shutil.disk_usage(newpath).free + storage.total = shutil.disk_usage(newpath).total + storage.ts = int(time.time()) @staticmethod def get_node(top: NodeTop, @@ -115,7 +116,7 @@ class Noder: return node, False # force re-indexing if no maccess maccess = os.path.getmtime(path) - if not self._has_attr(node, 'maccess') or \ + if not has_attr(node, 'maccess') or \ not node.maccess: self._debug('\tchange: no maccess found') return node, True @@ -134,39 +135,6 @@ class Noder: self._debug(f'\tchange: no change for \"{path}\"') return node, False - def rec_size(self, node: Union[NodeDir, NodeStorage], - store: bool = True) -> int: - """ - recursively traverse tree and return size - @store: store the size in the node - """ - if node.type == nodes.TYPE_FILE: - node.__class__ = NodeFile - msg = f'size of {node.type} \"{node.name}\": {node.nodesize}' - self._debug(msg) - return node.nodesize - msg = f'getting node size recursively for \"{node.name}\"' - self._debug(msg) - fullsize: int = 0 - for i in node.children: - if node.type == nodes.TYPE_DIR: - sub_size = self.rec_size(i, store=store) - if store: - i.nodesize = sub_size - fullsize += sub_size - continue - if node.type == nodes.TYPE_STORAGE: - sub_size = self.rec_size(i, store=store) - if store: - i.nodesize = sub_size - fullsize += sub_size - continue - self._debug(f'skipping {node.name}') - if store: - node.nodesize = fullsize - self._debug(f'size of {node.type} \"{node.name}\": {fullsize}') - return fullsize - ############################################################### # public helpers ############################################################### @@ -315,9 +283,9 @@ class Noder: ############################################################### # printing ############################################################### - def _node_to_csv(self, node: NodeAny, - sep: str = ',', - raw: bool = False) -> None: + def _print_node_csv(self, node: NodeAny, + sep: str = ',', + raw: bool = False) -> None: """ print a node to csv @node: the node to consider @@ -329,53 +297,14 @@ class Noder: if node.type == nodes.TYPE_TOP: return - out = [] if node.type == nodes.TYPE_STORAGE: - # handle storage - out.append(node.name) # name - out.append(node.type) # type - out.append('') # fake full path - size = self.rec_size(node, store=False) - out.append(size_to_str(size, raw=raw)) # size - out.append(epoch_to_str(node.ts)) # indexed_at - out.append('') # fake maccess - out.append('') # fake md5 - out.append(str(len(node.children))) # nbfiles - # fake free_space - out.append(size_to_str(node.free, raw=raw)) - # fake total_space - out.append(size_to_str(node.total, raw=raw)) - out.append(node.attr) # meta + self.csv_printer.print_storage(node, + sep=sep, + raw=raw) else: - # handle other nodes - out.append(node.name.replace('"', '""')) # name - out.append(node.type) # type - parents = self._get_parents(node) - storage = self._get_storage(node) - fullpath = os.path.join(storage.name, parents) - out.append(fullpath.replace('"', '""')) # full path - - out.append(size_to_str(node.nodesize, raw=raw)) # size - out.append(epoch_to_str(storage.ts)) # indexed_at - if self._has_attr(node, 'maccess'): - out.append(epoch_to_str(node.maccess)) # maccess - else: - out.append('') # fake maccess - if self._has_attr(node, 'md5'): - out.append(node.md5) # md5 - else: - out.append('') # fake md5 - if node.type == nodes.TYPE_DIR: - out.append(str(len(node.children))) # nbfiles - else: - out.append('') # fake nbfiles - out.append('') # fake free_space - out.append('') # fake total_space - out.append('') # fake meta - - line = sep.join(['"' + o + '"' for o in out]) - if len(line) > 0: - Logger.stdout_nocolor(line) + self.csv_printer.print_node(node, + sep=sep, + raw=raw) def node_has_subs(self, node: Any) -> bool: """ @@ -415,88 +344,35 @@ class Noder: if node.type == nodes.TYPE_TOP: # top node node.__class__ = NodeTop - Logger.stdout_nocolor(f'{pre}{node.name}') + self.native_printer.print_top(pre, node.name) elif node.type == nodes.TYPE_FILE: # node of type file node.__class__ = NodeFile - name = node.name - storage = self._get_storage(node) - if withpath: - name = os.sep.join([ - storage.name, - self._get_parents(node.parent), - name]) - name = name.lstrip(os.sep) - attr_str = '' - if node.md5: - attr_str = f', md5:{node.md5}' - size = size_to_str(node.nodesize, raw=raw) - compl = f'size:{size}{attr_str}' - if withstorage: - content = Logger.get_bold_text(storage.name) - compl += f', storage:{content}' - NodePrinter.print_file_native(pre, name, compl) + self.native_printer.print_file(pre, node, + withpath=withpath, + withstorage=withstorage, + raw=raw) elif node.type == nodes.TYPE_DIR: # node of type directory node.__class__ = NodeDir - name = node.name - storage = self._get_storage(node) - if withpath: - name = os.sep.join([ - storage.name, - self._get_parents(node.parent), - name]) - name = name.lstrip(os.sep) - nbchildren = 0 - if withnbchildren: - nbchildren = len(node.children) - attr: List[Tuple[str, str]] = [] - if node.nodesize: - attr.append(('totsize', size_to_str(node.nodesize, raw=raw))) - if withstorage: - attr.append(('storage', Logger.get_bold_text(storage.name))) - NodePrinter.print_dir_native(pre, - name, - nbchildren=nbchildren, - attr=attr) + self.native_printer.print_dir(pre, + node, + withpath=withpath, + withstorage=withstorage, + withnbchildren=withnbchildren, + raw=raw) elif node.type == nodes.TYPE_STORAGE: # node of type storage node.__class__ = NodeStorage - sztotal = size_to_str(node.total, raw=raw) - szused = size_to_str(node.total - node.free, raw=raw) - nbchildren = len(node.children) - pcent = 0 - if node.total > 0: - pcent = node.free * 100 / node.total - freepercent = f'{pcent:.1f}%' - # get the date - timestamp = '' - if self._has_attr(node, 'ts'): - timestamp = 'date:' - timestamp += epoch_to_str(node.ts) - disksize = '' - # the children size - recsize = self.rec_size(node, store=False) - sizestr = size_to_str(recsize, raw=raw) - disksize = 'totsize:' + f'{sizestr}' - # format the output - name = node.name - args = [ - 'nbfiles:' + f'{nbchildren}', - disksize, - f'free:{freepercent}', - 'du:' + f'{szused}/{sztotal}', - timestamp] - argsstring = ' | '.join(args) - NodePrinter.print_storage_native(pre, - name, - argsstring, - node.attr) + + self.native_printer.print_storage(pre, + node, + raw=raw) elif node.type == nodes.TYPE_ARCHIVED: # archive node node.__class__ = NodeArchived if self.arc: - NodePrinter.print_archive_native(pre, node.name, node.archive) + self.native_printer.print_archive(pre, node.name, node.archive) else: Logger.err(f'bad node encountered: {node}') @@ -518,18 +394,18 @@ class Noder: withnbchildren=True, raw=raw) elif fmt == 'csv': # csv output - self._to_csv(node, raw=raw) + self._print_nodes_csv(node, raw=raw) elif fmt == 'csv-with-header': # csv output - Logger.stdout_nocolor(self.CSV_HEADER) - self._to_csv(node, raw=raw) + self.csv_printer.print_header() + self._print_nodes_csv(node, raw=raw) - def _to_csv(self, node: NodeAny, - raw: bool = False) -> None: + def _print_nodes_csv(self, node: NodeAny, + raw: bool = False) -> None: """print the tree to csv""" rend = anytree.RenderTree(node, childiter=self._sort_tree) for _, _, item in rend: - self._node_to_csv(item, raw=raw) + self._print_node_csv(item, raw=raw) @staticmethod def _fzf_prompt(strings: Any) -> Any: @@ -555,8 +431,8 @@ class Noder: for _, _, rend in rendered: if not rend: continue - parents = self._get_parents(rend) - storage = self._get_storage(rend) + parents = rend.get_parent_hierarchy() + storage = rend.get_storage_node() fullpath = os.path.join(storage.name, parents) the_nodes[fullpath] = rend # prompt with fzf @@ -613,9 +489,10 @@ class Noder: # compile found nodes paths = {} for item in found: + typcast_node(item) item.name = fix_badchars(item.name) - storage = self._get_storage(item) - parents = self._get_parents(item) + storage = item.get_storage_node() + parents = item.get_parent_hierarchy() parent_key = f'{storage.name}/{parents}' key = f'{parent_key}/{item.name}' paths[parent_key] = item @@ -640,9 +517,9 @@ class Noder: raw=raw) elif fmt.startswith('csv'): if fmt == 'csv-with-header': - Logger.stdout_nocolor(self.CSV_HEADER) + self.csv_printer.print_header() for _, item in paths.items(): - self._node_to_csv(item, raw=raw) + self._print_node_csv(item, raw=raw) # execute script if any if script: @@ -695,13 +572,14 @@ class Noder: @fmt: output format @raw: print raw size """ - self._debug(f'walking path: \"{path}\" from {top}') + self._debug(f'walking path: \"{path}\" from {top.name}') resolv = anytree.resolver.Resolver('name') found = [] try: if '*' in path or '?' in path: # we need to handle glob + self._debug(f'glob with top {top.name} and path {path}') found = resolv.glob(top, path) else: # we have a canonical path @@ -724,9 +602,7 @@ class Noder: return found # sort found nodes - #found = os_sorted(found) found = sorted(found, key=os_sort_keygen(self._sort)) - #found = sorted(found, key=cmp_to_key(self._sort), reverse=self.sortsize) # print the parent if fmt == 'native': @@ -735,21 +611,21 @@ class Noder: withnbchildren=True, raw=raw) elif fmt.startswith('csv'): - self._node_to_csv(found[0].parent, raw=raw) + self._print_node_csv(found[0].parent, raw=raw) elif fmt.startswith('fzf'): pass # print all found nodes if fmt == 'csv-with-header': - Logger.stdout_nocolor(self.CSV_HEADER) + self.csv_printer.print_header() for item in found: if fmt == 'native': self._print_node_native(item, withpath=False, - pre='- ', + pre=Noder.PRE, withnbchildren=True, raw=raw) elif fmt.startswith('csv'): - self._node_to_csv(item, raw=raw) + self._print_node_csv(item, raw=raw) elif fmt.startswith('fzf'): self._to_fzf(item, fmt) @@ -800,11 +676,10 @@ class Noder: return self._sort_fs(lst) @staticmethod - def _sort_fs(node: NodeAny) -> Tuple[str, str]: + def _sort_fs(node: NodeAny) -> str: """sort by name""" # to sort by types then name - # return (node.type, node.name) - return node.name + return str(node.name) @staticmethod def _sort_size(node: NodeAny) -> float: @@ -816,28 +691,6 @@ class Noder: except AttributeError: return 0 - def _get_storage(self, node: NodeAny) -> NodeStorage: - """recursively traverse up to find storage""" - if node.type == nodes.TYPE_STORAGE: - return node - return cast(NodeStorage, node.ancestors[1]) - - @staticmethod - def _has_attr(node: NodeAny, attr: str) -> bool: - """return True if node has attr as attribute""" - return attr in node.__dict__.keys() - - def _get_parents(self, node: NodeAny) -> str: - """get all parents recursively""" - if node.type == nodes.TYPE_STORAGE: - return '' - if node.type == nodes.TYPE_TOP: - return '' - parent = self._get_parents(node.parent) - if parent: - return os.sep.join([parent, node.name]) - return str(node.name) - @staticmethod def _get_hash(path: str) -> str: """return md5 hash of node""" diff --git a/catcli/nodes.py b/catcli/nodes.py index 6743958..545ba3f 100644 --- a/catcli/nodes.py +++ b/catcli/nodes.py @@ -6,9 +6,12 @@ Class that represents a node in the catalog tree """ # pylint: disable=W0622 -from typing import Dict, Any +import os +from typing import Dict, Any, cast from anytree import NodeMixin # type: ignore +from catcli.exceptions import CatcliException + TYPE_TOP = 'top' TYPE_FILE = 'file' @@ -35,6 +38,8 @@ def typcast_node(node: Any) -> None: node.__class__ = NodeStorage elif node.type == TYPE_META: node.__class__ = NodeMeta + else: + raise CatcliException(f"bad node: {node}") class NodeAny(NodeMixin): # type: ignore @@ -60,6 +65,18 @@ class NodeAny(NodeMixin): # type: ignore def __str__(self) -> str: return self._to_str() + def get_parent_hierarchy(self) -> str: + """get all parents recursively""" + raise NotImplementedError + + def get_storage_node(self) -> NodeMixin: + """recursively traverse up to find storage""" + return None + + def get_rec_size(self) -> int: + """recursively traverse tree and return size""" + raise NotImplementedError + def flagged(self) -> bool: """is flagged""" if not hasattr(self, '_flagged'): @@ -90,6 +107,14 @@ class NodeTop(NodeAny): if children: self.children = children + def get_parent_hierarchy(self) -> str: + """get all parents recursively""" + return '' + + def get_rec_size(self) -> int: + """recursively traverse tree and return size""" + return 0 + def __str__(self) -> str: return self._to_str() @@ -115,6 +140,22 @@ class NodeFile(NodeAny): if children: self.children = children + def get_parent_hierarchy(self) -> str: + """get all parents recursively""" + typcast_node(self.parent) + path = self.parent.get_parent_hierarchy() + if path: + return os.sep.join([path, self.name]) + return str(self.name) + + def get_storage_node(self) -> NodeAny: + """recursively traverse up to find storage""" + return cast(NodeStorage, self.ancestors[1]) + + def get_rec_size(self) -> int: + """recursively traverse tree and return size""" + return self.nodesize + def __str__(self) -> str: return self._to_str() @@ -138,6 +179,26 @@ class NodeDir(NodeAny): if children: self.children = children + def get_parent_hierarchy(self) -> str: + """get all parents recursively""" + typcast_node(self.parent) + path = self.parent.get_parent_hierarchy() + if path: + return os.sep.join([path, self.name]) + return str(self.name) + + def get_storage_node(self) -> NodeAny: + """recursively traverse up to find storage""" + return cast(NodeStorage, self.ancestors[1]) + + def get_rec_size(self) -> int: + """recursively traverse tree and return size""" + totsize: int = 0 + for node in self.children: + typcast_node(node) + totsize += node.get_rec_size() + return totsize + def __str__(self) -> str: return self._to_str() @@ -163,6 +224,22 @@ class NodeArchived(NodeAny): if children: self.children = children + def get_parent_hierarchy(self) -> str: + """get all parents recursively""" + typcast_node(self.parent) + path = self.parent.get_parent_hierarchy() + if path: + return os.sep.join([path, self.name]) + return str(self.name) + + def get_storage_node(self) -> NodeAny: + """recursively traverse up to find storage""" + return cast(NodeStorage, self.ancestors[1]) + + def get_rec_size(self) -> int: + """recursively traverse tree and return size""" + return self.nodesize + def __str__(self) -> str: return self._to_str() @@ -192,6 +269,22 @@ class NodeStorage(NodeAny): if children: self.children = children + def get_parent_hierarchy(self) -> str: + """get all parents recursively""" + return '' + + def get_storage_node(self) -> NodeAny: + """recursively traverse up to find storage""" + return self + + def get_rec_size(self) -> int: + """recursively traverse tree and return size""" + totsize: int = 0 + for node in self.children: + typcast_node(node) + totsize += node.get_rec_size() + return totsize + def __str__(self) -> str: return self._to_str() @@ -213,5 +306,17 @@ class NodeMeta(NodeAny): if children: self.children = children + def get_parent_hierarchy(self) -> str: + """get all parents recursively""" + typcast_node(self.parent) + path = self.parent.get_parent_hierarchy() + if path: + return os.sep.join([path, self.name]) + return str(self.name) + + def get_rec_size(self) -> int: + """recursively traverse tree and return size""" + return 0 + def __str__(self) -> str: return self._to_str() diff --git a/catcli/printer_csv.py b/catcli/printer_csv.py new file mode 100644 index 0000000..3b7a453 --- /dev/null +++ b/catcli/printer_csv.py @@ -0,0 +1,84 @@ +""" +author: deadc0de6 (https://github.com/deadc0de6) +Copyright (c) 2024, deadc0de6 + +Class for printing nodes in csv format +""" + +import sys +import os +from typing import List + +from catcli.nodes import NodeAny, NodeStorage, TYPE_DIR +from catcli.utils import size_to_str, epoch_to_str, \ + has_attr + + +class CsvPrinter: + """a node printer class""" + + DEFSEP = ',' + CSV_HEADER = ('name,type,path,size,indexed_at,' + 'maccess,md5,nbfiles,free_space,' + 'total_space,meta') + + def _print_entries(self, entries: List[str], sep: str = DEFSEP) -> None: + line = sep.join(['"' + o + '"' for o in entries]) + if len(line) > 0: + sys.stdout.write(f'{line}\n') + + def print_header(self) -> None: + """print csv header""" + sys.stdout.write(f'{self.CSV_HEADER}\n') + + def print_storage(self, node: NodeStorage, + sep: str = DEFSEP, + raw: bool = False) -> None: + """print a storage node""" + out = [] + out.append(node.name) # name + out.append(node.type) # type + out.append('') # fake full path + size = node.get_rec_size() + out.append(size_to_str(size, raw=raw)) # size + out.append(epoch_to_str(node.ts)) # indexed_at + out.append('') # fake maccess + out.append('') # fake md5 + out.append(str(len(node.children))) # nbfiles + # fake free_space + out.append(size_to_str(node.free, raw=raw)) + # fake total_space + out.append(size_to_str(node.total, raw=raw)) + out.append(node.attr) # meta + self._print_entries(out, sep=sep) + + def print_node(self, node: NodeAny, + sep: str = DEFSEP, + raw: bool = False) -> None: + """print other nodes""" + out = [] + out.append(node.name.replace('"', '""')) # name + out.append(node.type) # type + parents = node.get_parent_hierarchy() + storage = node.get_storage_node() + fullpath = os.path.join(storage.name, parents) + out.append(fullpath.replace('"', '""')) # full path + + out.append(size_to_str(node.nodesize, raw=raw)) # size + out.append(epoch_to_str(storage.ts)) # indexed_at + if has_attr(node, 'maccess'): + out.append(epoch_to_str(node.maccess)) # maccess + else: + out.append('') # fake maccess + if has_attr(node, 'md5'): + out.append(node.md5) # md5 + else: + out.append('') # fake md5 + if node.type == TYPE_DIR: + out.append(str(len(node.children))) # nbfiles + else: + out.append('') # fake nbfiles + out.append('') # fake free_space + out.append('') # fake total_space + out.append('') # fake meta + self._print_entries(out, sep=sep) diff --git a/catcli/printer_native.py b/catcli/printer_native.py new file mode 100644 index 0000000..5091eca --- /dev/null +++ b/catcli/printer_native.py @@ -0,0 +1,151 @@ +""" +author: deadc0de6 (https://github.com/deadc0de6) +Copyright (c) 2022, deadc0de6 + +Class for printing nodes in native format +""" + +import sys +import os + +from catcli.nodes import NodeFile, NodeDir, NodeStorage +from catcli.colors import Colors +from catcli.logger import Logger +from catcli.utils import fix_badchars, size_to_str, \ + has_attr, epoch_to_ls_str + + +TS_LJUST = 13 +SIZE_LJUST = 6 +NAME_LJUST = 20 + +class NativePrinter: + """a node printer class""" + + STORAGE = 'storage' + ARCHIVE = 'archive' + NBFILES = 'nbfiles' + + def print_top(self, pre: str, name: str) -> None: + """print top node""" + sys.stdout.write(f'{pre}{name}\n') + + def print_storage(self, pre: str, + node: NodeStorage, + raw: bool = False) -> None: + """print a storage node""" + # construct name + name = node.name + name = fix_badchars(name) + # construct attrs + attrs = [] + # nb files + attrs.append(f'nbfiles:{len(node.children)}') + # the children size + recsize = node.get_rec_size() + sizestr = size_to_str(recsize, raw=raw) + attrs.append(f'totsize:{sizestr}') + # free + pcent = 0.0 + if node.total > 0: + pcent = node.free * 100 / node.total + attrs.append(f'free:{pcent:.1f}%') + # du + sztotal = size_to_str(node.total, raw=raw) + szused = size_to_str(node.total - node.free, raw=raw) + attrs.append(f'du:{szused}/{sztotal}') + # timestamp + if has_attr(node, 'ts'): + attrs.append(f'date:{epoch_to_ls_str(node.ts)}') + + # print + out = f'{pre}{Colors.UND}{self.STORAGE}{Colors.RESET}: ' + out += f'{Colors.PURPLE}{name}{Colors.RESET}' + if attrs: + out += f'\n{" "*len(name)}{Colors.GRAY}{"|".join(attrs)}{Colors.RESET}' + sys.stdout.write(f'{out}\n') + + def print_file(self, pre: str, + node: NodeFile, + withpath: bool = False, + withstorage: bool = False, + raw: bool = False) -> None: + """print a file node""" + # construct name + name = node.name + storage = node.get_storage_node() + if withpath: + name = os.sep.join([ + storage.name, + node.parent.get_parent_hierarchy(), + name]) + name = fix_badchars(name) + # construct attributes + attrs = [] + if node.md5: + attrs.append(f'md5:{node.md5}') + if withstorage: + content = Logger.get_bold_text(storage.name) + attrs.append(f', storage:{content}') + # print + out = [] + out .append(f'{pre}') + line = name.ljust(NAME_LJUST, ' ') + out.append(f'{line}') + size = 0 + if node.nodesize: + size = node.nodesize + line = size_to_str(size, raw=raw).ljust(SIZE_LJUST, ' ') + out.append(f'{Colors.BLUE}{line}{Colors.RESET}') + line = epoch_to_ls_str(node.maccess).ljust(TS_LJUST, ' ') + out.append(f'{Colors.PURPLE}{line}{Colors.RESET}') + if attrs: + out.append(f'{Colors.GRAY}[{",".join(attrs)}]{Colors.RESET}') + sys.stdout.write(f'{" ".join(out)}\n') + + def print_dir(self, pre: str, + node: NodeDir, + withpath: bool = False, + withstorage: bool = False, + withnbchildren: bool = False, + raw: bool = False) -> None: + """print a directory node""" + # construct name + name = node.name + storage = node.get_storage_node() + if withpath: + name = os.sep.join([ + storage.name, + node.parent.get_parent_hierarchy(), + name]) + name = fix_badchars(name) + # construct attrs + attrs = [] + if withnbchildren: + nbchildren = len(node.children) + attrs.append(f'{self.NBFILES}:{nbchildren}') + if withstorage: + attrs.append(f'storage:{Logger.get_bold_text(storage.name)}') + # print + out = [] + out.append(f'{pre}') + line = name.ljust(NAME_LJUST, ' ') + out.append(f'{Colors.BLUE}{line}{Colors.RESET}') + size = 0 + if node.nodesize: + size = node.nodesize + line = size_to_str(size, raw=raw).ljust(SIZE_LJUST, ' ') + out.append(f'{Colors.GRAY}{line}{Colors.RESET}') + line = epoch_to_ls_str(node.maccess).ljust(TS_LJUST, ' ') + out.append(f'{Colors.GRAY}{line}{Colors.RESET}') + if attrs: + out.append(f'{Colors.GRAY}[{",".join(attrs)}]{Colors.RESET}') + sys.stdout.write(f'{" ".join(out)}\n') + + def print_archive(self, pre: str, + name: str, archive: str) -> None: + """print an archive""" + name = fix_badchars(name) + out = f'{pre}{Colors.YELLOW}{name}{Colors.RESET} ' + out += f'{Colors.GRAY}[{self.ARCHIVE}:{archive}]{Colors.RESET}' + sys.stdout.write(f'{out}\n') diff --git a/catcli/utils.py b/catcli/utils.py index 60d07d1..078fec0 100644 --- a/catcli/utils.py +++ b/catcli/utils.py @@ -17,6 +17,8 @@ from catcli.exceptions import CatcliException WILD = '*' +TS_FORMAT_6 = '%b %d %H:%M' +TS_FORMAT_MORE = '%b %d %Y' def path_to_top(path: str) -> str: @@ -38,9 +40,9 @@ def path_to_search_all(path: str) -> str: if not path.startswith(pre): # prepend with top node path path = pre + path - if not path.endswith(os.path.sep): - # ensure ends with a separator - path += os.path.sep + # if not path.endswith(os.path.sep): + # # ensure ends with a separator + # path += os.path.sep # if not path.endswith(WILD): # # add wild card # path += WILD @@ -95,6 +97,18 @@ def epoch_to_str(epoch: float) -> str: return timestamp.strftime(fmt) +def epoch_to_ls_str(epoch: float) -> str: + """convert epoch to string""" + if not epoch: + return '' + timestamp = datetime.datetime.fromtimestamp(epoch) + delta = datetime.date.today() - datetime.timedelta(days=6*365/12) + fmt = TS_FORMAT_MORE + if timestamp.date() < delta: + fmt = TS_FORMAT_6 + return timestamp.strftime(fmt) + + def ask(question: str) -> bool: """ask the user what to do""" resp = input(f'{question} [y|N] ? ') @@ -117,3 +131,8 @@ def edit(string: str) -> str: def fix_badchars(string: str) -> str: """fix none utf-8 chars in string""" return string.encode('utf-8', 'ignore').decode('utf-8') + + +def has_attr(node: nodes.NodeAny, attr: str) -> bool: + """return True if node has attr as attribute""" + return attr in node.__dict__.keys() diff --git a/tests/test_rm.py b/tests/test_rm.py index e9e1b6f..7087ebf 100644 --- a/tests/test_rm.py +++ b/tests/test_rm.py @@ -46,7 +46,7 @@ class TestRm(unittest.TestCase): top = cmd_rm(args, noder, catalog, top) # ensure there no children anymore - self.assertTrue(len(top.children) == 0) + self.assertEqual(len(top.children), 0) def main(): diff --git a/tests/test_update.py b/tests/test_update.py index 6f75610..7a5b61f 100644 --- a/tests/test_update.py +++ b/tests/test_update.py @@ -125,7 +125,7 @@ class TestUpdate(unittest.TestCase): # explore the top node to find all nodes self.assertEqual(len(top.children), 1) storage = top.children[0] - self.assertTrue(len(storage.children) == 8) + self.assertEqual(len(storage.children), 8) # ensure d1f1 md5 sum has changed in catalog nods = noder.find_name(top, os.path.basename(d1f1))