refactoring

features-42
deadc0de6 4 months ago
parent 691396c96a
commit 0dcbfa94bd

@ -96,14 +96,13 @@ class Catalog:
def _restore_json(self, string: str) -> Optional[NodeTop]: def _restore_json(self, string: str) -> Optional[NodeTop]:
"""restore the tree from json""" """restore the tree from json"""
imp = JsonImporter(dictimporter=_DictImporter(debug=self.debug)) imp = JsonImporter(dictimporter=_DictImporter(debug=self.debug))
self._debug('import from string...')
root = imp.import_(string) root = imp.import_(string)
self._debug(f'Catalog imported from json \"{self.path}\"') self._debug(f'Catalog imported from json \"{self.path}\"')
self._debug(f'root imported: {root}') self._debug(f'root imported: {root}')
if root.type != nodes.TYPE_TOP: if root.type != nodes.TYPE_TOP:
return None return None
top = NodeTop(root.name, children=root.children) top = NodeTop(root.name, children=root.children)
self._debug(f'top imported: {top}') self._debug(f'top imported: {top.name}')
return top return top
@ -126,7 +125,7 @@ class _DictImporter():
assert "parent" not in data assert "parent" not in data
attrs = dict(data) attrs = dict(data)
# replace attr # replace attr
attrs = back_attriter(attrs, debug=self.debug) attrs = back_attriter(attrs)
children: Union[str, Any] = attrs.pop("children", []) children: Union[str, Any] = attrs.pop("children", [])
node = self.nodecls(parent=parent, **attrs) node = self.nodecls(parent=parent, **attrs)
for child in children: for child in children:
@ -134,16 +133,14 @@ class _DictImporter():
return node return node
def back_attriter(adict: Dict[str, str], def back_attriter(adict: Dict[str, str]) -> Dict[str, str]:
debug: bool = False) -> Dict[str, str]:
"""replace attribute on json restore""" """replace attribute on json restore"""
attrs = {} attrs = {}
for k, val in adict.items(): for k, val in adict.items():
newk = k
if k == 'size': if k == 'size':
if debug: newk = 'nodesize'
Logger.debug(f'changing {k}={val}') attrs[newk] = val
k = 'nodesize'
attrs[k] = val
return attrs return attrs

@ -18,6 +18,7 @@ from docopt import docopt
from catcli.version import __version__ as VERSION from catcli.version import __version__ as VERSION
from catcli.nodes import NodeTop, NodeAny from catcli.nodes import NodeTop, NodeAny
from catcli.logger import Logger from catcli.logger import Logger
from catcli.printer_csv import CsvPrinter
from catcli.colors import Colors from catcli.colors import Colors
from catcli.catalog import Catalog from catcli.catalog import Catalog
from catcli.walker import Walker from catcli.walker import Walker
@ -116,7 +117,7 @@ def cmd_index(args: Dict[str, Any],
except KeyboardInterrupt: except KeyboardInterrupt:
Logger.err('aborted') Logger.err('aborted')
return return
node = noder.get_storage_node(top, name) node = top.get_storage_node()
node.parent = None node.parent = None
start = datetime.datetime.now() start = datetime.datetime.now()
@ -125,7 +126,7 @@ def cmd_index(args: Dict[str, Any],
root = noder.new_storage_node(name, path, top, attr) root = noder.new_storage_node(name, path, top, attr)
_, cnt = walker.index(path, root, name) _, cnt = walker.index(path, root, name)
if subsize: if subsize:
noder.rec_size(root, store=True) root.nodesize = root.get_rec_size()
stop = datetime.datetime.now() stop = datetime.datetime.now()
diff = stop - start diff = stop - start
Logger.info(f'Indexed {cnt} file(s) in {diff}') Logger.info(f'Indexed {cnt} file(s) in {diff}')
@ -147,16 +148,17 @@ def cmd_update(args: Dict[str, Any],
if not os.path.exists(path): if not os.path.exists(path):
Logger.err(f'\"{path}\" does not exist') Logger.err(f'\"{path}\" does not exist')
return return
root = noder.get_storage_node(top, name, newpath=path) storage = noder.find_storage_node_by_name(top, name)
if not root: if not storage:
Logger.err(f'storage named \"{name}\" does not exist') Logger.err(f'storage named \"{name}\" does not exist')
return return
noder.update_storage_path(top, name, path)
start = datetime.datetime.now() start = datetime.datetime.now()
walker = Walker(noder, usehash=usehash, debug=debug, walker = Walker(noder, usehash=usehash, debug=debug,
logpath=logpath) logpath=logpath)
cnt = walker.reindex(path, root, top) cnt = walker.reindex(path, storage, top)
if subsize: if subsize:
noder.rec_size(root, store=True) storage.nodesize = storage.get_rec_size()
stop = datetime.datetime.now() stop = datetime.datetime.now()
diff = stop - start diff = stop - start
Logger.info(f'updated {cnt} file(s) in {diff}') Logger.info(f'updated {cnt} file(s) in {diff}')
@ -189,7 +191,7 @@ def cmd_rm(args: Dict[str, Any],
top: NodeTop) -> NodeTop: top: NodeTop) -> NodeTop:
"""rm action""" """rm action"""
name = args['<storage>'] name = args['<storage>']
node = noder.get_storage_node(top, name) node = noder.find_storage_node_by_name(top, name)
if node: if node:
node.parent = None node.parent = None
if catalog.save(top): if catalog.save(top):
@ -278,7 +280,7 @@ def print_supported_formats() -> None:
"""print all supported formats to stdout""" """print all supported formats to stdout"""
print('"native" : native format') print('"native" : native format')
print('"csv" : CSV format') print('"csv" : CSV format')
print(f' {Noder.CSV_HEADER}') print(f' {CsvPrinter.CSV_HEADER}')
print('"fzf-native" : fzf to native (only valid for find)') print('"fzf-native" : fzf to native (only valid for find)')
print('"fzf-csv" : fzf to csv (only valid for find)') print('"fzf-csv" : fzf to csv (only valid for find)')
@ -303,7 +305,7 @@ def main() -> bool:
return False return False
if args['--verbose']: if args['--verbose']:
print(args) print(f'args: {args}')
# print banner # print banner
if not args['--no-banner']: if not args['--no-banner']:

@ -1,73 +0,0 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
Class for printing nodes
"""
import sys
from typing import TypeVar, Type, Optional, Tuple, List, \
Dict, Any
from catcli.colors import Colors
from catcli.utils import fix_badchars
CLASSTYPE = TypeVar('CLASSTYPE', bound='NodePrinter')
class NodePrinter:
"""a node printer class"""
STORAGE = 'storage'
ARCHIVE = 'archive'
NBFILES = 'nbfiles'
@classmethod
def print_storage_native(cls: Type[CLASSTYPE], pre: str,
name: str, args: str,
attr: Dict[str, Any]) -> None:
"""print a storage node"""
end = ''
if attr:
end = f' {Colors.GRAY}({attr}){Colors.RESET}'
out = f'{pre}{Colors.UND}{cls.STORAGE}{Colors.RESET}:'
out += ' ' + Colors.PURPLE + fix_badchars(name) + \
Colors.RESET + end + '\n'
out += f' {Colors.GRAY}{args}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_file_native(cls: Type[CLASSTYPE], pre: str,
name: str, attr: str) -> None:
"""print a file node"""
nobad = fix_badchars(name)
out = f'{pre}{nobad}'
out += f' {Colors.GRAY}[{attr}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_dir_native(cls: Type[CLASSTYPE], pre: str,
name: str,
nbchildren: int = 0,
attr: Optional[List[Tuple[str, str]]] = None) -> None:
"""print a directory node"""
end = []
if nbchildren > 0:
end.append(f'{cls.NBFILES}:{nbchildren}')
if attr:
end.append(' '.join([f'{x}:{y}' for x, y in attr]))
end_string = ''
if end:
end_string = f' [{", ".join(end)}]'
out = pre + Colors.BLUE + fix_badchars(name) + Colors.RESET
out += f'{Colors.GRAY}{end_string}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_archive_native(cls: Type[CLASSTYPE], pre: str,
name: str, archive: str) -> None:
"""archive to stdout"""
out = pre + Colors.YELLOW + fix_badchars(name) + Colors.RESET
out += f' {Colors.GRAY}[{cls.ARCHIVE}:{archive}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')

@ -10,16 +10,17 @@ import shutil
import time import time
from typing import List, Union, Tuple, Any, Optional, Dict, cast from typing import List, Union, Tuple, Any, Optional, Dict, cast
import anytree # type: ignore import anytree # type: ignore
from natsort import os_sort_keygen from natsort import os_sort_keygen # type: ignore
# local imports # local imports
from catcli import nodes from catcli import nodes
from catcli.nodes import NodeAny, NodeStorage, \ from catcli.nodes import NodeAny, NodeStorage, \
NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta, \ NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta, \
typcast_node typcast_node
from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars from catcli.utils import md5sum, fix_badchars, has_attr
from catcli.logger import Logger from catcli.logger import Logger
from catcli.nodeprinter import NodePrinter from catcli.printer_native import NativePrinter
from catcli.printer_csv import CsvPrinter
from catcli.decomp import Decomp from catcli.decomp import Decomp
from catcli.version import __version__ as VERSION from catcli.version import __version__ as VERSION
from catcli.exceptions import CatcliException from catcli.exceptions import CatcliException
@ -35,9 +36,7 @@ class Noder:
* "file" node representing a file * "file" node representing a file
""" """
CSV_HEADER = ('name,type,path,size,indexed_at,' PRE = ' '
'maccess,md5,nbfiles,free_space,'
'total_space,meta')
def __init__(self, debug: bool = False, def __init__(self, debug: bool = False,
sortsize: bool = False, sortsize: bool = False,
@ -53,31 +52,33 @@ class Noder:
self.arc = arc self.arc = arc
if self.arc: if self.arc:
self.decomp = Decomp() self.decomp = Decomp()
self.csv_printer = CsvPrinter()
self.native_printer = NativePrinter()
@staticmethod @staticmethod
def get_storage_names(top: NodeTop) -> List[str]: def get_storage_names(top: NodeTop) -> List[str]:
"""return a list of all storage names""" """return a list of all storage names"""
return [x.name for x in list(top.children)] return [x.name for x in list(top.children)]
def get_storage_node(self, top: NodeTop, def find_storage_node_by_name(self, top: NodeTop,
name: str, name: str) -> Optional[NodeStorage]:
newpath: str = '') -> NodeStorage: """find a storage node by name"""
"""
return the storage node if any
if newpath is submitted, it will update the media info
"""
found = None
for node in top.children: for node in top.children:
if node.type != nodes.TYPE_STORAGE: if node.type != nodes.TYPE_STORAGE:
continue continue
if node.name == name: if node.name == name:
found = node return cast(NodeStorage, node)
break return None
if found and newpath and os.path.exists(newpath):
found.free = shutil.disk_usage(newpath).free def update_storage_path(self, top: NodeTop,
found.total = shutil.disk_usage(newpath).total name: str,
found.ts = int(time.time()) newpath: str) -> None:
return cast(NodeStorage, found) """find and update storage path on update"""
storage = self.find_storage_node_by_name(top, name)
if storage and newpath and os.path.exists(newpath):
storage.free = shutil.disk_usage(newpath).free
storage.total = shutil.disk_usage(newpath).total
storage.ts = int(time.time())
@staticmethod @staticmethod
def get_node(top: NodeTop, def get_node(top: NodeTop,
@ -115,7 +116,7 @@ class Noder:
return node, False return node, False
# force re-indexing if no maccess # force re-indexing if no maccess
maccess = os.path.getmtime(path) maccess = os.path.getmtime(path)
if not self._has_attr(node, 'maccess') or \ if not has_attr(node, 'maccess') or \
not node.maccess: not node.maccess:
self._debug('\tchange: no maccess found') self._debug('\tchange: no maccess found')
return node, True return node, True
@ -134,39 +135,6 @@ class Noder:
self._debug(f'\tchange: no change for \"{path}\"') self._debug(f'\tchange: no change for \"{path}\"')
return node, False return node, False
def rec_size(self, node: Union[NodeDir, NodeStorage],
store: bool = True) -> int:
"""
recursively traverse tree and return size
@store: store the size in the node
"""
if node.type == nodes.TYPE_FILE:
node.__class__ = NodeFile
msg = f'size of {node.type} \"{node.name}\": {node.nodesize}'
self._debug(msg)
return node.nodesize
msg = f'getting node size recursively for \"{node.name}\"'
self._debug(msg)
fullsize: int = 0
for i in node.children:
if node.type == nodes.TYPE_DIR:
sub_size = self.rec_size(i, store=store)
if store:
i.nodesize = sub_size
fullsize += sub_size
continue
if node.type == nodes.TYPE_STORAGE:
sub_size = self.rec_size(i, store=store)
if store:
i.nodesize = sub_size
fullsize += sub_size
continue
self._debug(f'skipping {node.name}')
if store:
node.nodesize = fullsize
self._debug(f'size of {node.type} \"{node.name}\": {fullsize}')
return fullsize
############################################################### ###############################################################
# public helpers # public helpers
############################################################### ###############################################################
@ -315,9 +283,9 @@ class Noder:
############################################################### ###############################################################
# printing # printing
############################################################### ###############################################################
def _node_to_csv(self, node: NodeAny, def _print_node_csv(self, node: NodeAny,
sep: str = ',', sep: str = ',',
raw: bool = False) -> None: raw: bool = False) -> None:
""" """
print a node to csv print a node to csv
@node: the node to consider @node: the node to consider
@ -329,53 +297,14 @@ class Noder:
if node.type == nodes.TYPE_TOP: if node.type == nodes.TYPE_TOP:
return return
out = []
if node.type == nodes.TYPE_STORAGE: if node.type == nodes.TYPE_STORAGE:
# handle storage self.csv_printer.print_storage(node,
out.append(node.name) # name sep=sep,
out.append(node.type) # type raw=raw)
out.append('') # fake full path
size = self.rec_size(node, store=False)
out.append(size_to_str(size, raw=raw)) # size
out.append(epoch_to_str(node.ts)) # indexed_at
out.append('') # fake maccess
out.append('') # fake md5
out.append(str(len(node.children))) # nbfiles
# fake free_space
out.append(size_to_str(node.free, raw=raw))
# fake total_space
out.append(size_to_str(node.total, raw=raw))
out.append(node.attr) # meta
else: else:
# handle other nodes self.csv_printer.print_node(node,
out.append(node.name.replace('"', '""')) # name sep=sep,
out.append(node.type) # type raw=raw)
parents = self._get_parents(node)
storage = self._get_storage(node)
fullpath = os.path.join(storage.name, parents)
out.append(fullpath.replace('"', '""')) # full path
out.append(size_to_str(node.nodesize, raw=raw)) # size
out.append(epoch_to_str(storage.ts)) # indexed_at
if self._has_attr(node, 'maccess'):
out.append(epoch_to_str(node.maccess)) # maccess
else:
out.append('') # fake maccess
if self._has_attr(node, 'md5'):
out.append(node.md5) # md5
else:
out.append('') # fake md5
if node.type == nodes.TYPE_DIR:
out.append(str(len(node.children))) # nbfiles
else:
out.append('') # fake nbfiles
out.append('') # fake free_space
out.append('') # fake total_space
out.append('') # fake meta
line = sep.join(['"' + o + '"' for o in out])
if len(line) > 0:
Logger.stdout_nocolor(line)
def node_has_subs(self, node: Any) -> bool: def node_has_subs(self, node: Any) -> bool:
""" """
@ -415,88 +344,35 @@ class Noder:
if node.type == nodes.TYPE_TOP: if node.type == nodes.TYPE_TOP:
# top node # top node
node.__class__ = NodeTop node.__class__ = NodeTop
Logger.stdout_nocolor(f'{pre}{node.name}') self.native_printer.print_top(pre, node.name)
elif node.type == nodes.TYPE_FILE: elif node.type == nodes.TYPE_FILE:
# node of type file # node of type file
node.__class__ = NodeFile node.__class__ = NodeFile
name = node.name self.native_printer.print_file(pre, node,
storage = self._get_storage(node) withpath=withpath,
if withpath: withstorage=withstorage,
name = os.sep.join([ raw=raw)
storage.name,
self._get_parents(node.parent),
name])
name = name.lstrip(os.sep)
attr_str = ''
if node.md5:
attr_str = f', md5:{node.md5}'
size = size_to_str(node.nodesize, raw=raw)
compl = f'size:{size}{attr_str}'
if withstorage:
content = Logger.get_bold_text(storage.name)
compl += f', storage:{content}'
NodePrinter.print_file_native(pre, name, compl)
elif node.type == nodes.TYPE_DIR: elif node.type == nodes.TYPE_DIR:
# node of type directory # node of type directory
node.__class__ = NodeDir node.__class__ = NodeDir
name = node.name self.native_printer.print_dir(pre,
storage = self._get_storage(node) node,
if withpath: withpath=withpath,
name = os.sep.join([ withstorage=withstorage,
storage.name, withnbchildren=withnbchildren,
self._get_parents(node.parent), raw=raw)
name])
name = name.lstrip(os.sep)
nbchildren = 0
if withnbchildren:
nbchildren = len(node.children)
attr: List[Tuple[str, str]] = []
if node.nodesize:
attr.append(('totsize', size_to_str(node.nodesize, raw=raw)))
if withstorage:
attr.append(('storage', Logger.get_bold_text(storage.name)))
NodePrinter.print_dir_native(pre,
name,
nbchildren=nbchildren,
attr=attr)
elif node.type == nodes.TYPE_STORAGE: elif node.type == nodes.TYPE_STORAGE:
# node of type storage # node of type storage
node.__class__ = NodeStorage node.__class__ = NodeStorage
sztotal = size_to_str(node.total, raw=raw)
szused = size_to_str(node.total - node.free, raw=raw) self.native_printer.print_storage(pre,
nbchildren = len(node.children) node,
pcent = 0 raw=raw)
if node.total > 0:
pcent = node.free * 100 / node.total
freepercent = f'{pcent:.1f}%'
# get the date
timestamp = ''
if self._has_attr(node, 'ts'):
timestamp = 'date:'
timestamp += epoch_to_str(node.ts)
disksize = ''
# the children size
recsize = self.rec_size(node, store=False)
sizestr = size_to_str(recsize, raw=raw)
disksize = 'totsize:' + f'{sizestr}'
# format the output
name = node.name
args = [
'nbfiles:' + f'{nbchildren}',
disksize,
f'free:{freepercent}',
'du:' + f'{szused}/{sztotal}',
timestamp]
argsstring = ' | '.join(args)
NodePrinter.print_storage_native(pre,
name,
argsstring,
node.attr)
elif node.type == nodes.TYPE_ARCHIVED: elif node.type == nodes.TYPE_ARCHIVED:
# archive node # archive node
node.__class__ = NodeArchived node.__class__ = NodeArchived
if self.arc: if self.arc:
NodePrinter.print_archive_native(pre, node.name, node.archive) self.native_printer.print_archive(pre, node.name, node.archive)
else: else:
Logger.err(f'bad node encountered: {node}') Logger.err(f'bad node encountered: {node}')
@ -518,18 +394,18 @@ class Noder:
withnbchildren=True, raw=raw) withnbchildren=True, raw=raw)
elif fmt == 'csv': elif fmt == 'csv':
# csv output # csv output
self._to_csv(node, raw=raw) self._print_nodes_csv(node, raw=raw)
elif fmt == 'csv-with-header': elif fmt == 'csv-with-header':
# csv output # csv output
Logger.stdout_nocolor(self.CSV_HEADER) self.csv_printer.print_header()
self._to_csv(node, raw=raw) self._print_nodes_csv(node, raw=raw)
def _to_csv(self, node: NodeAny, def _print_nodes_csv(self, node: NodeAny,
raw: bool = False) -> None: raw: bool = False) -> None:
"""print the tree to csv""" """print the tree to csv"""
rend = anytree.RenderTree(node, childiter=self._sort_tree) rend = anytree.RenderTree(node, childiter=self._sort_tree)
for _, _, item in rend: for _, _, item in rend:
self._node_to_csv(item, raw=raw) self._print_node_csv(item, raw=raw)
@staticmethod @staticmethod
def _fzf_prompt(strings: Any) -> Any: def _fzf_prompt(strings: Any) -> Any:
@ -555,8 +431,8 @@ class Noder:
for _, _, rend in rendered: for _, _, rend in rendered:
if not rend: if not rend:
continue continue
parents = self._get_parents(rend) parents = rend.get_parent_hierarchy()
storage = self._get_storage(rend) storage = rend.get_storage_node()
fullpath = os.path.join(storage.name, parents) fullpath = os.path.join(storage.name, parents)
the_nodes[fullpath] = rend the_nodes[fullpath] = rend
# prompt with fzf # prompt with fzf
@ -613,9 +489,10 @@ class Noder:
# compile found nodes # compile found nodes
paths = {} paths = {}
for item in found: for item in found:
typcast_node(item)
item.name = fix_badchars(item.name) item.name = fix_badchars(item.name)
storage = self._get_storage(item) storage = item.get_storage_node()
parents = self._get_parents(item) parents = item.get_parent_hierarchy()
parent_key = f'{storage.name}/{parents}' parent_key = f'{storage.name}/{parents}'
key = f'{parent_key}/{item.name}' key = f'{parent_key}/{item.name}'
paths[parent_key] = item paths[parent_key] = item
@ -640,9 +517,9 @@ class Noder:
raw=raw) raw=raw)
elif fmt.startswith('csv'): elif fmt.startswith('csv'):
if fmt == 'csv-with-header': if fmt == 'csv-with-header':
Logger.stdout_nocolor(self.CSV_HEADER) self.csv_printer.print_header()
for _, item in paths.items(): for _, item in paths.items():
self._node_to_csv(item, raw=raw) self._print_node_csv(item, raw=raw)
# execute script if any # execute script if any
if script: if script:
@ -695,13 +572,14 @@ class Noder:
@fmt: output format @fmt: output format
@raw: print raw size @raw: print raw size
""" """
self._debug(f'walking path: \"{path}\" from {top}') self._debug(f'walking path: \"{path}\" from {top.name}')
resolv = anytree.resolver.Resolver('name') resolv = anytree.resolver.Resolver('name')
found = [] found = []
try: try:
if '*' in path or '?' in path: if '*' in path or '?' in path:
# we need to handle glob # we need to handle glob
self._debug(f'glob with top {top.name} and path {path}')
found = resolv.glob(top, path) found = resolv.glob(top, path)
else: else:
# we have a canonical path # we have a canonical path
@ -724,9 +602,7 @@ class Noder:
return found return found
# sort found nodes # sort found nodes
#found = os_sorted(found)
found = sorted(found, key=os_sort_keygen(self._sort)) found = sorted(found, key=os_sort_keygen(self._sort))
#found = sorted(found, key=cmp_to_key(self._sort), reverse=self.sortsize)
# print the parent # print the parent
if fmt == 'native': if fmt == 'native':
@ -735,21 +611,21 @@ class Noder:
withnbchildren=True, withnbchildren=True,
raw=raw) raw=raw)
elif fmt.startswith('csv'): elif fmt.startswith('csv'):
self._node_to_csv(found[0].parent, raw=raw) self._print_node_csv(found[0].parent, raw=raw)
elif fmt.startswith('fzf'): elif fmt.startswith('fzf'):
pass pass
# print all found nodes # print all found nodes
if fmt == 'csv-with-header': if fmt == 'csv-with-header':
Logger.stdout_nocolor(self.CSV_HEADER) self.csv_printer.print_header()
for item in found: for item in found:
if fmt == 'native': if fmt == 'native':
self._print_node_native(item, withpath=False, self._print_node_native(item, withpath=False,
pre='- ', pre=Noder.PRE,
withnbchildren=True, withnbchildren=True,
raw=raw) raw=raw)
elif fmt.startswith('csv'): elif fmt.startswith('csv'):
self._node_to_csv(item, raw=raw) self._print_node_csv(item, raw=raw)
elif fmt.startswith('fzf'): elif fmt.startswith('fzf'):
self._to_fzf(item, fmt) self._to_fzf(item, fmt)
@ -800,11 +676,10 @@ class Noder:
return self._sort_fs(lst) return self._sort_fs(lst)
@staticmethod @staticmethod
def _sort_fs(node: NodeAny) -> Tuple[str, str]: def _sort_fs(node: NodeAny) -> str:
"""sort by name""" """sort by name"""
# to sort by types then name # to sort by types then name
# return (node.type, node.name) return str(node.name)
return node.name
@staticmethod @staticmethod
def _sort_size(node: NodeAny) -> float: def _sort_size(node: NodeAny) -> float:
@ -816,28 +691,6 @@ class Noder:
except AttributeError: except AttributeError:
return 0 return 0
def _get_storage(self, node: NodeAny) -> NodeStorage:
"""recursively traverse up to find storage"""
if node.type == nodes.TYPE_STORAGE:
return node
return cast(NodeStorage, node.ancestors[1])
@staticmethod
def _has_attr(node: NodeAny, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in node.__dict__.keys()
def _get_parents(self, node: NodeAny) -> str:
"""get all parents recursively"""
if node.type == nodes.TYPE_STORAGE:
return ''
if node.type == nodes.TYPE_TOP:
return ''
parent = self._get_parents(node.parent)
if parent:
return os.sep.join([parent, node.name])
return str(node.name)
@staticmethod @staticmethod
def _get_hash(path: str) -> str: def _get_hash(path: str) -> str:
"""return md5 hash of node""" """return md5 hash of node"""

@ -6,9 +6,12 @@ Class that represents a node in the catalog tree
""" """
# pylint: disable=W0622 # pylint: disable=W0622
from typing import Dict, Any import os
from typing import Dict, Any, cast
from anytree import NodeMixin # type: ignore from anytree import NodeMixin # type: ignore
from catcli.exceptions import CatcliException
TYPE_TOP = 'top' TYPE_TOP = 'top'
TYPE_FILE = 'file' TYPE_FILE = 'file'
@ -35,6 +38,8 @@ def typcast_node(node: Any) -> None:
node.__class__ = NodeStorage node.__class__ = NodeStorage
elif node.type == TYPE_META: elif node.type == TYPE_META:
node.__class__ = NodeMeta node.__class__ = NodeMeta
else:
raise CatcliException(f"bad node: {node}")
class NodeAny(NodeMixin): # type: ignore class NodeAny(NodeMixin): # type: ignore
@ -60,6 +65,18 @@ class NodeAny(NodeMixin): # type: ignore
def __str__(self) -> str: def __str__(self) -> str:
return self._to_str() return self._to_str()
def get_parent_hierarchy(self) -> str:
"""get all parents recursively"""
raise NotImplementedError
def get_storage_node(self) -> NodeMixin:
"""recursively traverse up to find storage"""
return None
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
raise NotImplementedError
def flagged(self) -> bool: def flagged(self) -> bool:
"""is flagged""" """is flagged"""
if not hasattr(self, '_flagged'): if not hasattr(self, '_flagged'):
@ -90,6 +107,14 @@ class NodeTop(NodeAny):
if children: if children:
self.children = children self.children = children
def get_parent_hierarchy(self) -> str:
"""get all parents recursively"""
return ''
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
return 0
def __str__(self) -> str: def __str__(self) -> str:
return self._to_str() return self._to_str()
@ -115,6 +140,22 @@ class NodeFile(NodeAny):
if children: if children:
self.children = children self.children = children
def get_parent_hierarchy(self) -> str:
"""get all parents recursively"""
typcast_node(self.parent)
path = self.parent.get_parent_hierarchy()
if path:
return os.sep.join([path, self.name])
return str(self.name)
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
return self.nodesize
def __str__(self) -> str: def __str__(self) -> str:
return self._to_str() return self._to_str()
@ -138,6 +179,26 @@ class NodeDir(NodeAny):
if children: if children:
self.children = children self.children = children
def get_parent_hierarchy(self) -> str:
"""get all parents recursively"""
typcast_node(self.parent)
path = self.parent.get_parent_hierarchy()
if path:
return os.sep.join([path, self.name])
return str(self.name)
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
totsize: int = 0
for node in self.children:
typcast_node(node)
totsize += node.get_rec_size()
return totsize
def __str__(self) -> str: def __str__(self) -> str:
return self._to_str() return self._to_str()
@ -163,6 +224,22 @@ class NodeArchived(NodeAny):
if children: if children:
self.children = children self.children = children
def get_parent_hierarchy(self) -> str:
"""get all parents recursively"""
typcast_node(self.parent)
path = self.parent.get_parent_hierarchy()
if path:
return os.sep.join([path, self.name])
return str(self.name)
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
return self.nodesize
def __str__(self) -> str: def __str__(self) -> str:
return self._to_str() return self._to_str()
@ -192,6 +269,22 @@ class NodeStorage(NodeAny):
if children: if children:
self.children = children self.children = children
def get_parent_hierarchy(self) -> str:
"""get all parents recursively"""
return ''
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return self
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
totsize: int = 0
for node in self.children:
typcast_node(node)
totsize += node.get_rec_size()
return totsize
def __str__(self) -> str: def __str__(self) -> str:
return self._to_str() return self._to_str()
@ -213,5 +306,17 @@ class NodeMeta(NodeAny):
if children: if children:
self.children = children self.children = children
def get_parent_hierarchy(self) -> str:
"""get all parents recursively"""
typcast_node(self.parent)
path = self.parent.get_parent_hierarchy()
if path:
return os.sep.join([path, self.name])
return str(self.name)
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
return 0
def __str__(self) -> str: def __str__(self) -> str:
return self._to_str() return self._to_str()

@ -0,0 +1,84 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2024, deadc0de6
Class for printing nodes in csv format
"""
import sys
import os
from typing import List
from catcli.nodes import NodeAny, NodeStorage, TYPE_DIR
from catcli.utils import size_to_str, epoch_to_str, \
has_attr
class CsvPrinter:
"""a node printer class"""
DEFSEP = ','
CSV_HEADER = ('name,type,path,size,indexed_at,'
'maccess,md5,nbfiles,free_space,'
'total_space,meta')
def _print_entries(self, entries: List[str], sep: str = DEFSEP) -> None:
line = sep.join(['"' + o + '"' for o in entries])
if len(line) > 0:
sys.stdout.write(f'{line}\n')
def print_header(self) -> None:
"""print csv header"""
sys.stdout.write(f'{self.CSV_HEADER}\n')
def print_storage(self, node: NodeStorage,
sep: str = DEFSEP,
raw: bool = False) -> None:
"""print a storage node"""
out = []
out.append(node.name) # name
out.append(node.type) # type
out.append('') # fake full path
size = node.get_rec_size()
out.append(size_to_str(size, raw=raw)) # size
out.append(epoch_to_str(node.ts)) # indexed_at
out.append('') # fake maccess
out.append('') # fake md5
out.append(str(len(node.children))) # nbfiles
# fake free_space
out.append(size_to_str(node.free, raw=raw))
# fake total_space
out.append(size_to_str(node.total, raw=raw))
out.append(node.attr) # meta
self._print_entries(out, sep=sep)
def print_node(self, node: NodeAny,
sep: str = DEFSEP,
raw: bool = False) -> None:
"""print other nodes"""
out = []
out.append(node.name.replace('"', '""')) # name
out.append(node.type) # type
parents = node.get_parent_hierarchy()
storage = node.get_storage_node()
fullpath = os.path.join(storage.name, parents)
out.append(fullpath.replace('"', '""')) # full path
out.append(size_to_str(node.nodesize, raw=raw)) # size
out.append(epoch_to_str(storage.ts)) # indexed_at
if has_attr(node, 'maccess'):
out.append(epoch_to_str(node.maccess)) # maccess
else:
out.append('') # fake maccess
if has_attr(node, 'md5'):
out.append(node.md5) # md5
else:
out.append('') # fake md5
if node.type == TYPE_DIR:
out.append(str(len(node.children))) # nbfiles
else:
out.append('') # fake nbfiles
out.append('') # fake free_space
out.append('') # fake total_space
out.append('') # fake meta
self._print_entries(out, sep=sep)

@ -0,0 +1,151 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
Class for printing nodes in native format
"""
import sys
import os
from catcli.nodes import NodeFile, NodeDir, NodeStorage
from catcli.colors import Colors
from catcli.logger import Logger
from catcli.utils import fix_badchars, size_to_str, \
has_attr, epoch_to_ls_str
TS_LJUST = 13
SIZE_LJUST = 6
NAME_LJUST = 20
class NativePrinter:
"""a node printer class"""
STORAGE = 'storage'
ARCHIVE = 'archive'
NBFILES = 'nbfiles'
def print_top(self, pre: str, name: str) -> None:
"""print top node"""
sys.stdout.write(f'{pre}{name}\n')
def print_storage(self, pre: str,
node: NodeStorage,
raw: bool = False) -> None:
"""print a storage node"""
# construct name
name = node.name
name = fix_badchars(name)
# construct attrs
attrs = []
# nb files
attrs.append(f'nbfiles:{len(node.children)}')
# the children size
recsize = node.get_rec_size()
sizestr = size_to_str(recsize, raw=raw)
attrs.append(f'totsize:{sizestr}')
# free
pcent = 0.0
if node.total > 0:
pcent = node.free * 100 / node.total
attrs.append(f'free:{pcent:.1f}%')
# du
sztotal = size_to_str(node.total, raw=raw)
szused = size_to_str(node.total - node.free, raw=raw)
attrs.append(f'du:{szused}/{sztotal}')
# timestamp
if has_attr(node, 'ts'):
attrs.append(f'date:{epoch_to_ls_str(node.ts)}')
# print
out = f'{pre}{Colors.UND}{self.STORAGE}{Colors.RESET}: '
out += f'{Colors.PURPLE}{name}{Colors.RESET}'
if attrs:
out += f'\n{" "*len(name)}{Colors.GRAY}{"|".join(attrs)}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
def print_file(self, pre: str,
node: NodeFile,
withpath: bool = False,
withstorage: bool = False,
raw: bool = False) -> None:
"""print a file node"""
# construct name
name = node.name
storage = node.get_storage_node()
if withpath:
name = os.sep.join([
storage.name,
node.parent.get_parent_hierarchy(),
name])
name = fix_badchars(name)
# construct attributes
attrs = []
if node.md5:
attrs.append(f'md5:{node.md5}')
if withstorage:
content = Logger.get_bold_text(storage.name)
attrs.append(f', storage:{content}')
# print
out = []
out .append(f'{pre}')
line = name.ljust(NAME_LJUST, ' ')
out.append(f'{line}')
size = 0
if node.nodesize:
size = node.nodesize
line = size_to_str(size, raw=raw).ljust(SIZE_LJUST, ' ')
out.append(f'{Colors.BLUE}{line}{Colors.RESET}')
line = epoch_to_ls_str(node.maccess).ljust(TS_LJUST, ' ')
out.append(f'{Colors.PURPLE}{line}{Colors.RESET}')
if attrs:
out.append(f'{Colors.GRAY}[{",".join(attrs)}]{Colors.RESET}')
sys.stdout.write(f'{" ".join(out)}\n')
def print_dir(self, pre: str,
node: NodeDir,
withpath: bool = False,
withstorage: bool = False,
withnbchildren: bool = False,
raw: bool = False) -> None:
"""print a directory node"""
# construct name
name = node.name
storage = node.get_storage_node()
if withpath:
name = os.sep.join([
storage.name,
node.parent.get_parent_hierarchy(),
name])
name = fix_badchars(name)
# construct attrs
attrs = []
if withnbchildren:
nbchildren = len(node.children)
attrs.append(f'{self.NBFILES}:{nbchildren}')
if withstorage:
attrs.append(f'storage:{Logger.get_bold_text(storage.name)}')
# print
out = []
out.append(f'{pre}')
line = name.ljust(NAME_LJUST, ' ')
out.append(f'{Colors.BLUE}{line}{Colors.RESET}')
size = 0
if node.nodesize:
size = node.nodesize
line = size_to_str(size, raw=raw).ljust(SIZE_LJUST, ' ')
out.append(f'{Colors.GRAY}{line}{Colors.RESET}')
line = epoch_to_ls_str(node.maccess).ljust(TS_LJUST, ' ')
out.append(f'{Colors.GRAY}{line}{Colors.RESET}')
if attrs:
out.append(f'{Colors.GRAY}[{",".join(attrs)}]{Colors.RESET}')
sys.stdout.write(f'{" ".join(out)}\n')
def print_archive(self, pre: str,
name: str, archive: str) -> None:
"""print an archive"""
name = fix_badchars(name)
out = f'{pre}{Colors.YELLOW}{name}{Colors.RESET} '
out += f'{Colors.GRAY}[{self.ARCHIVE}:{archive}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')

@ -17,6 +17,8 @@ from catcli.exceptions import CatcliException
WILD = '*' WILD = '*'
TS_FORMAT_6 = '%b %d %H:%M'
TS_FORMAT_MORE = '%b %d %Y'
def path_to_top(path: str) -> str: def path_to_top(path: str) -> str:
@ -38,9 +40,9 @@ def path_to_search_all(path: str) -> str:
if not path.startswith(pre): if not path.startswith(pre):
# prepend with top node path # prepend with top node path
path = pre + path path = pre + path
if not path.endswith(os.path.sep): # if not path.endswith(os.path.sep):
# ensure ends with a separator # # ensure ends with a separator
path += os.path.sep # path += os.path.sep
# if not path.endswith(WILD): # if not path.endswith(WILD):
# # add wild card # # add wild card
# path += WILD # path += WILD
@ -95,6 +97,18 @@ def epoch_to_str(epoch: float) -> str:
return timestamp.strftime(fmt) return timestamp.strftime(fmt)
def epoch_to_ls_str(epoch: float) -> str:
"""convert epoch to string"""
if not epoch:
return ''
timestamp = datetime.datetime.fromtimestamp(epoch)
delta = datetime.date.today() - datetime.timedelta(days=6*365/12)
fmt = TS_FORMAT_MORE
if timestamp.date() < delta:
fmt = TS_FORMAT_6
return timestamp.strftime(fmt)
def ask(question: str) -> bool: def ask(question: str) -> bool:
"""ask the user what to do""" """ask the user what to do"""
resp = input(f'{question} [y|N] ? ') resp = input(f'{question} [y|N] ? ')
@ -117,3 +131,8 @@ def edit(string: str) -> str:
def fix_badchars(string: str) -> str: def fix_badchars(string: str) -> str:
"""fix none utf-8 chars in string""" """fix none utf-8 chars in string"""
return string.encode('utf-8', 'ignore').decode('utf-8') return string.encode('utf-8', 'ignore').decode('utf-8')
def has_attr(node: nodes.NodeAny, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in node.__dict__.keys()

@ -46,7 +46,7 @@ class TestRm(unittest.TestCase):
top = cmd_rm(args, noder, catalog, top) top = cmd_rm(args, noder, catalog, top)
# ensure there no children anymore # ensure there no children anymore
self.assertTrue(len(top.children) == 0) self.assertEqual(len(top.children), 0)
def main(): def main():

@ -125,7 +125,7 @@ class TestUpdate(unittest.TestCase):
# explore the top node to find all nodes # explore the top node to find all nodes
self.assertEqual(len(top.children), 1) self.assertEqual(len(top.children), 1)
storage = top.children[0] storage = top.children[0]
self.assertTrue(len(storage.children) == 8) self.assertEqual(len(storage.children), 8)
# ensure d1f1 md5 sum has changed in catalog # ensure d1f1 md5 sum has changed in catalog
nods = noder.find_name(top, os.path.basename(d1f1)) nods = noder.find_name(top, os.path.basename(d1f1))

Loading…
Cancel
Save