diff --git a/catcli/catalog.py b/catcli/catalog.py index 005d081..821327c 100644 --- a/catcli/catalog.py +++ b/catcli/catalog.py @@ -7,11 +7,12 @@ Class that represents the catcli catalog import os import pickle -import anytree # type: ignore +from typing import Optional, Union, Any, cast from anytree.exporter import JsonExporter # type: ignore from anytree.importer import JsonImporter # type: ignore # local imports +from catcli.cnode import Node from catcli.utils import ask from catcli.logger import Logger @@ -32,10 +33,10 @@ class Catalog: self.path = path self.debug = debug self.force = force - self.metanode = None + self.metanode: Optional[Node] = None self.pickle = usepickle - def set_metanode(self, metanode: anytree.AnyNode) -> None: + def set_metanode(self, metanode: Node) -> None: """remove the metanode until tree is re-written""" self.metanode = metanode if self.metanode: @@ -49,7 +50,7 @@ class Catalog: return True return False - def restore(self) -> anytree.AnyNode: + def restore(self) -> Optional[Node]: """restore the catalog""" if not self.path: return None @@ -61,7 +62,7 @@ class Catalog: content = file.read() return self._restore_json(content) - def save(self, node: anytree.AnyNode) -> bool: + def save(self, node: Node) -> bool: """save the catalog""" if not self.path: Logger.err('Path not defined') @@ -87,14 +88,14 @@ class Catalog: return Logger.debug(text) - def _save_pickle(self, node: anytree.AnyNode) -> bool: + def _save_pickle(self, node: Node) -> bool: """pickle the catalog""" with open(self.path, 'wb') as file: pickle.dump(node, file) self._debug(f'Catalog saved to pickle \"{self.path}\"') return True - def _restore_pickle(self) -> anytree.AnyNode: + def _restore_pickle(self) -> Union[Node, Any]: """restore the pickled tree""" with open(self.path, 'rb') as file: root = pickle.load(file) @@ -102,7 +103,7 @@ class Catalog: self._debug(msg) return root - def _save_json(self, node: anytree.AnyNode) -> bool: + def _save_json(self, node: Node) -> bool: """export the catalog in json""" exp = JsonExporter(indent=2, sort_keys=True) with open(self.path, 'w', encoding='UTF-8') as file: @@ -110,9 +111,9 @@ class Catalog: self._debug(f'Catalog saved to json \"{self.path}\"') return True - def _restore_json(self, string: str) -> anytree.AnyNode: + def _restore_json(self, string: str) -> Node: """restore the tree from json""" imp = JsonImporter() root = imp.import_(string) self._debug(f'Catalog imported from json \"{self.path}\"') - return root + return cast(Node, root) diff --git a/catcli/catcli.py b/catcli/catcli.py index 49f5e0a..efcbfb8 100755 --- a/catcli/catcli.py +++ b/catcli/catcli.py @@ -12,19 +12,20 @@ import sys import os import datetime from typing import Dict, Any, List -import anytree # type: ignore from docopt import docopt # local imports -from .version import __version__ as VERSION -from .logger import Logger -from .colors import Colors -from .catalog import Catalog -from .walker import Walker -from .noder import Noder -from .utils import ask, edit -from .fuser import Fuser -from .exceptions import BadFormatException, CatcliException +from catcli import cnode +from catcli.version import __version__ as VERSION +from catcli.logger import Logger +from catcli.colors import Colors +from catcli.catalog import Catalog +from catcli.walker import Walker +from catcli.cnode import Node +from catcli.noder import Noder +from catcli.utils import ask, edit +from catcli.fuser import Fuser +from catcli.exceptions import BadFormatException, CatcliException NAME = 'catcli' CUR = os.path.dirname(os.path.abspath(__file__)) @@ -81,7 +82,7 @@ Options: def cmd_mount(args: Dict[str, Any], - top: anytree.AnyNode, + top: Node, noder: Noder) -> None: """mount action""" mountpoint = args[''] @@ -93,7 +94,7 @@ def cmd_mount(args: Dict[str, Any], def cmd_index(args: Dict[str, Any], noder: Noder, catalog: Catalog, - top: anytree.AnyNode) -> None: + top: Node) -> None: """index action""" path = args[''] name = args[''] @@ -116,8 +117,8 @@ def cmd_index(args: Dict[str, Any], start = datetime.datetime.now() walker = Walker(noder, usehash=usehash, debug=debug) - attr = noder.format_storage_attr(args['--meta']) - root = noder.new_storage_node(name, path, parent=top, attr=attr) + attr = noder.attrs_to_string(args['--meta']) + root = noder.new_storage_node(name, path, parent=top, attrs=attr) _, cnt = walker.index(path, root, name) if subsize: noder.rec_size(root, store=True) @@ -131,7 +132,7 @@ def cmd_index(args: Dict[str, Any], def cmd_update(args: Dict[str, Any], noder: Noder, catalog: Catalog, - top: anytree.AnyNode) -> None: + top: Node) -> None: """update action""" path = args[''] name = args[''] @@ -142,7 +143,7 @@ def cmd_update(args: Dict[str, Any], if not os.path.exists(path): Logger.err(f'\"{path}\" does not exist') return - root = noder.get_storage_node(top, name, path=path) + root = noder.get_storage_node(top, name, newpath=path) if not root: Logger.err(f'storage named \"{name}\" does not exist') return @@ -161,7 +162,7 @@ def cmd_update(args: Dict[str, Any], def cmd_ls(args: Dict[str, Any], noder: Noder, - top: anytree.AnyNode) -> List[anytree.AnyNode]: + top: Node) -> List[Node]: """ls action""" path = args[''] if not path: @@ -169,7 +170,7 @@ def cmd_ls(args: Dict[str, Any], if not path.startswith(SEPARATOR): path = SEPARATOR + path # prepend with top node path - pre = f'{SEPARATOR}{noder.NAME_TOP}' + pre = f'{SEPARATOR}{cnode.NAME_TOP}' if not path.startswith(pre): path = pre + path # ensure ends with a separator @@ -196,7 +197,7 @@ def cmd_ls(args: Dict[str, Any], def cmd_rm(args: Dict[str, Any], noder: Noder, catalog: Catalog, - top: anytree.AnyNode) -> anytree.AnyNode: + top: Node) -> Node: """rm action""" name = args[''] node = noder.get_storage_node(top, name) @@ -211,7 +212,7 @@ def cmd_rm(args: Dict[str, Any], def cmd_find(args: Dict[str, Any], noder: Noder, - top: anytree.AnyNode) -> List[anytree.AnyNode]: + top: Node) -> List[Node]: """find action""" fromtree = args['--parent'] directory = args['--directory'] @@ -231,7 +232,7 @@ def cmd_find(args: Dict[str, Any], def cmd_graph(args: Dict[str, Any], noder: Noder, - top: anytree.AnyNode) -> None: + top: Node) -> None: """graph action""" path = args[''] if not path: @@ -242,7 +243,7 @@ def cmd_graph(args: Dict[str, Any], def cmd_rename(args: Dict[str, Any], catalog: Catalog, - top: anytree.AnyNode) -> None: + top: Node) -> None: """rename action""" storage = args[''] new = args[''] @@ -260,7 +261,7 @@ def cmd_rename(args: Dict[str, Any], def cmd_edit(args: Dict[str, Any], noder: Noder, catalog: Catalog, - top: anytree.AnyNode) -> None: + top: Node) -> None: """edit action""" storage = args[''] storages = list(x.name for x in top.children) @@ -270,7 +271,7 @@ def cmd_edit(args: Dict[str, Any], if not attr: attr = '' new = edit(attr) - node.attr = noder.format_storage_attr(new) + node.attr = noder.attrs_to_string(new) if catalog.save(top): Logger.info(f'Storage \"{storage}\" edited') else: diff --git a/catcli/cnode.py b/catcli/cnode.py new file mode 100644 index 0000000..599d4fa --- /dev/null +++ b/catcli/cnode.py @@ -0,0 +1,68 @@ +""" +author: deadc0de6 (https://github.com/deadc0de6) +Copyright (c) 2023, deadc0de6 + +Class that represents a node in the catalog tree +""" +# pylint: disable=W0622 + +from anytree import NodeMixin # type: ignore + + +TYPE_TOP = 'top' +TYPE_FILE = 'file' +TYPE_DIR = 'dir' +TYPE_ARC = 'arc' +TYPE_STORAGE = 'storage' +TYPE_META = 'meta' + +NAME_TOP = 'top' +NAME_META = 'meta' + + +class Node(NodeMixin): # type: ignore + """a node in the catalog""" + + def __init__(self, # type: ignore[no-untyped-def] + name: str, + type: str, + size: float = 0, + relpath: str = '', + md5: str = '', + maccess: float = 0, + free: int = 0, + total: int = 0, + indexed_dt: int = 0, + attr: str = '', + archive: str = '', + parent=None, + children=None): + """build a node""" + super().__init__() + self.name = name + self.type = type + self.size = size + self.relpath = relpath + self.md5 = md5 + self.maccess = maccess + self.free = free + self.total = total + self.indexed_dt = indexed_dt + self.attr = attr + self.archive = archive + self.parent = parent + if children: + self.children = children + self._flagged = False + + def flagged(self) -> bool: + """is flagged""" + return self._flagged + + def flag(self) -> None: + """flag a node""" + self._flagged = True + + def unflag(self) -> None: + """unflag node""" + self._flagged = False diff --git a/catcli/fuser.py b/catcli/fuser.py index 0b3d011..6a4b6cd 100644 --- a/catcli/fuser.py +++ b/catcli/fuser.py @@ -9,10 +9,13 @@ import os import logging from time import time from stat import S_IFDIR, S_IFREG -from typing import List, Dict, Any -import anytree # type: ignore +from typing import List, Dict, Any, Optional import fuse # type: ignore -from .noder import Noder + +# local imports +from catcli.noder import Noder +from catcli import cnode +from catcli.cnode import Node # build custom logger to log in /tmp @@ -31,7 +34,7 @@ class Fuser: """fuse filesystem mounter""" def __init__(self, mountpoint: str, - top: anytree.AnyNode, + top: Node, noder: Noder, debug: bool = False): """fuse filesystem""" @@ -47,15 +50,15 @@ class Fuser: class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore """in-memory filesystem for catcli catalog""" - def __init__(self, top: anytree.AnyNode, + def __init__(self, top: Node, noder: Noder): """init fuse filesystem""" self.top = top self.noder = noder - def _get_entry(self, path: str) -> anytree.AnyNode: + def _get_entry(self, path: str) -> Optional[Node]: """return the node pointed by path""" - pre = f'{SEPARATOR}{self.noder.NAME_TOP}' + pre = f'{SEPARATOR}{cnode.NAME_TOP}' if not path.startswith(pre): path = pre + path found = self.noder.list(self.top, path, @@ -66,9 +69,9 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore return found[0] return None - def _get_entries(self, path: str) -> List[anytree.AnyNode]: + def _get_entries(self, path: str) -> List[Node]: """return nodes pointed by path""" - pre = f'{SEPARATOR}{self.noder.NAME_TOP}' + pre = f'{SEPARATOR}{cnode.NAME_TOP}' if not path.startswith(pre): path = pre + path if not path.endswith(SEPARATOR): @@ -88,17 +91,17 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore curt = time() mode: Any = S_IFREG - if entry.type == Noder.TYPE_ARC: + if entry.type == cnode.TYPE_ARC: mode = S_IFREG - elif entry.type == Noder.TYPE_DIR: + elif entry.type == cnode.TYPE_DIR: mode = S_IFDIR - elif entry.type == Noder.TYPE_FILE: + elif entry.type == cnode.TYPE_FILE: mode = S_IFREG - elif entry.type == Noder.TYPE_STORAGE: + elif entry.type == cnode.TYPE_STORAGE: mode = S_IFDIR - elif entry.type == Noder.TYPE_META: + elif entry.type == cnode.TYPE_META: mode = S_IFREG - elif entry.type == Noder.TYPE_TOP: + elif entry.type == cnode.TYPE_TOP: mode = S_IFREG return { 'st_mode': (mode), diff --git a/catcli/noder.py b/catcli/noder.py index 8624e8b..5acf7f6 100644 --- a/catcli/noder.py +++ b/catcli/noder.py @@ -8,11 +8,13 @@ Class that process nodes in the catalog tree import os import shutil import time -from typing import List, Union, Tuple, Any, Optional, Dict +from typing import List, Union, Tuple, Any, Optional, Dict, cast import anytree # type: ignore from pyfzf.pyfzf import FzfPrompt # type: ignore # local imports +from catcli import cnode +from catcli.cnode import Node from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars from catcli.logger import Logger from catcli.nodeprinter import NodePrinter @@ -31,16 +33,6 @@ class Noder: * "file" node representing a file """ - NAME_TOP = 'top' - NAME_META = 'meta' - - TYPE_TOP = 'top' - TYPE_FILE = 'file' - TYPE_DIR = 'dir' - TYPE_ARC = 'arc' - TYPE_STORAGE = 'storage' - TYPE_META = 'meta' - CSV_HEADER = ('name,type,path,size,indexed_at,' 'maccess,md5,nbfiles,free_space,' 'total_space,meta') @@ -61,46 +53,49 @@ class Noder: self.decomp = Decomp() @staticmethod - def get_storage_names(top: anytree.AnyNode) -> List[str]: + def get_storage_names(top: Node) -> List[str]: """return a list of all storage names""" return [x.name for x in list(top.children)] - def get_storage_node(self, top: anytree.AnyNode, - name: str, path: str = '') -> anytree.AnyNode: + def get_storage_node(self, top: Node, + name: str, + newpath: str = '') -> Node: """ return the storage node if any if path is submitted, it will update the media info """ found = None for node in top.children: - if node.type != self.TYPE_STORAGE: + if node.type != cnode.TYPE_STORAGE: continue if node.name == name: found = node break - if found and path and os.path.exists(path): - found.free = shutil.disk_usage(path).free - found.total = shutil.disk_usage(path).total + if found and newpath and os.path.exists(newpath): + found.free = shutil.disk_usage(newpath).free + found.total = shutil.disk_usage(newpath).total found.ts = int(time.time()) - return found + return cast(Node, found) @staticmethod - def get_node(top: str, path: str, - quiet: bool = False) -> anytree.AnyNode: + def get_node(top: Node, + path: str, + quiet: bool = False) -> Optional[Node]: """get the node by internal tree path""" resolv = anytree.resolver.Resolver('name') try: bpath = os.path.basename(path) - return resolv.get(top, bpath) + the_node = resolv.get(top, bpath) + return cast(Node, the_node) except anytree.resolver.ChildResolverError: if not quiet: Logger.err(f'No node at path \"{bpath}\"') return None def get_node_if_changed(self, - top: anytree.AnyNode, + top: Node, path: str, - treepath: str) -> Tuple[anytree.AnyNode, bool]: + treepath: str) -> Tuple[Optional[Node], bool]: """ return the node (if any) and if it has changed @top: top node (storage) @@ -136,25 +131,25 @@ class Noder: self._debug(f'\tchange: no change for \"{path}\"') return node, False - def rec_size(self, node: anytree.AnyNode, + def rec_size(self, node: Node, store: bool = True) -> float: """ recursively traverse tree and return size @store: store the size in the node """ - if node.type == self.TYPE_FILE: + if node.type == cnode.TYPE_FILE: self._debug(f'getting node size for \"{node.name}\"') return float(node.size) msg = f'getting node size recursively for \"{node.name}\"' self._debug(msg) size: float = 0 for i in node.children: - if node.type == self.TYPE_DIR: + if node.type == cnode.TYPE_DIR: size = self.rec_size(i, store=store) if store: i.size = size size += size - if node.type == self.TYPE_STORAGE: + if node.type == cnode.TYPE_STORAGE: size = self.rec_size(i, store=store) if store: i.size = size @@ -169,28 +164,34 @@ class Noder: # public helpers ############################################################### @staticmethod - def format_storage_attr(attr: Union[str, List[str]]) -> str: + def attrs_to_string(attr: Union[List[str], Dict[str, str], str]) -> str: """format the storage attr for saving""" if not attr: return '' if isinstance(attr, list): return ', '.join(attr) + if isinstance(attr, dict): + ret = [] + for key, val in attr.items(): + ret.append(f'{key}={val}') + return ', '.join(ret) attr = attr.rstrip() return attr - def set_hashing(self, val: bool) -> None: + def do_hashing(self, val: bool) -> None: """hash files when indexing""" self.hash = val ############################################################### # node creation ############################################################### - def new_top_node(self) -> anytree.AnyNode: + def new_top_node(self) -> Node: """create a new top node""" - return anytree.AnyNode(name=self.NAME_TOP, type=self.TYPE_TOP) + return Node(cnode.NAME_TOP, + cnode.TYPE_TOP) def new_file_node(self, name: str, path: str, - parent: str, storagepath: str) -> anytree.AnyNode: + parent: Node, storagepath: str) -> Optional[Node]: """create a new node representing a file""" if not os.path.exists(path): Logger.err(f'File \"{path}\" does not exist') @@ -207,7 +208,7 @@ class Noder: relpath = os.sep.join([storagepath, name]) maccess = os.path.getmtime(path) - node = self._new_generic_node(name, self.TYPE_FILE, + node = self._new_generic_node(name, cnode.TYPE_FILE, relpath, parent, size=stat.st_size, md5=md5, @@ -223,98 +224,107 @@ class Noder: return node def new_dir_node(self, name: str, path: str, - parent: str, storagepath: str) -> anytree.AnyNode: + parent: Node, storagepath: str) -> Node: """create a new node representing a directory""" path = os.path.abspath(path) relpath = os.sep.join([storagepath, name]) maccess = os.path.getmtime(path) - return self._new_generic_node(name, self.TYPE_DIR, relpath, + return self._new_generic_node(name, cnode.TYPE_DIR, relpath, parent, maccess=maccess) def new_storage_node(self, name: str, path: str, parent: str, - attr: Optional[str] = None) -> anytree.AnyNode: + attrs: str = '') -> Node: """create a new node representing a storage""" path = os.path.abspath(path) free = shutil.disk_usage(path).free total = shutil.disk_usage(path).total epoch = int(time.time()) - return anytree.AnyNode(name=name, type=self.TYPE_STORAGE, free=free, - total=total, parent=parent, attr=attr, ts=epoch) + return Node(name=name, + type=cnode.TYPE_STORAGE, + free=free, + total=total, + parent=parent, + attr=attrs, + indexed_dt=epoch) def new_archive_node(self, name: str, path: str, - parent: str, archive: str) -> anytree.AnyNode: + parent: str, archive: str) -> Node: """create a new node for archive data""" - return anytree.AnyNode(name=name, type=self.TYPE_ARC, relpath=path, - parent=parent, size=0, md5=None, - archive=archive) + return Node(name=name, type=cnode.TYPE_ARC, relpath=path, + parent=parent, size=0, md5='', + archive=archive) @staticmethod - def _new_generic_node(name: str, nodetype: str, - relpath: str, parent: str, + def _new_generic_node(name: str, + nodetype: str, + relpath: str, + parent: Node, size: float = 0, md5: str = '', - maccess: float = 0) -> anytree.AnyNode: + maccess: float = 0) -> Node: """generic node creation""" - return anytree.AnyNode(name=name, type=nodetype, relpath=relpath, - parent=parent, size=size, - md5=md5, maccess=maccess) + return Node(name, + nodetype, + size=size, + relpath=relpath, + md5=md5, + maccess=maccess, + parent=parent) ############################################################### # node management ############################################################### - def update_metanode(self, top: anytree.AnyNode) -> anytree.AnyNode: + def update_metanode(self, top: Node) -> Node: """create or update meta node information""" meta = self._get_meta_node(top) epoch = int(time.time()) if not meta: - attr: Dict[str, Any] = {} - attr['created'] = epoch - attr['created_version'] = VERSION - meta = anytree.AnyNode(name=self.NAME_META, - type=self.TYPE_META, - attr=attr) - meta.attr['access'] = epoch - meta.attr['access_version'] = VERSION + attrs: Dict[str, Any] = {} + attrs['created'] = epoch + attrs['created_version'] = VERSION + meta = Node(name=cnode.NAME_META, + type=cnode.TYPE_META, + attr=self.attrs_to_string(attrs)) + if meta.attr: + meta.attr += ', ' + meta.attr += f'access={epoch}' + meta.attr += ', ' + meta.attr += f'access_version={VERSION}' return meta - def _get_meta_node(self, top: anytree.AnyNode) -> anytree.AnyNode: + def _get_meta_node(self, top: Node) -> Optional[Node]: """return the meta node if any""" try: - return next(filter(lambda x: x.type == self.TYPE_META, - top.children)) + found = next(filter(lambda x: x.type == cnode.TYPE_META, + top.children)) + return cast(Node, found) except StopIteration: return None - def clean_not_flagged(self, top: anytree.AnyNode) -> int: + def clean_not_flagged(self, top: Node) -> int: """remove any node not flagged and clean flags""" cnt = 0 for node in anytree.PreOrderIter(top): - if node.type not in [self.TYPE_FILE, self.TYPE_DIR]: + if node.type not in [cnode.TYPE_FILE, cnode.TYPE_DIR]: continue if self._clean(node): cnt += 1 return cnt - @staticmethod - def flag(node: anytree.AnyNode) -> None: - """flag a node""" - node.flag = True - - def _clean(self, node: anytree.AnyNode) -> bool: + def _clean(self, node: Node) -> bool: """remove node if not flagged""" - if not self._has_attr(node, 'flag') or \ - not node.flag: + if not node.flagged(): node.parent = None return True - del node.flag + node.unflag() return False ############################################################### # printing ############################################################### - def _node_to_csv(self, node: anytree.AnyNode, + def _node_to_csv(self, node: Node, sep: str = ',', raw: bool = False) -> None: """ @@ -323,13 +333,13 @@ class Noder: @sep: CSV separator character @raw: print raw size rather than human readable """ - if not node: + if not cnode: return - if node.type == self.TYPE_TOP: + if node.type == node.TYPE_TOP: return out = [] - if node.type == self.TYPE_STORAGE: + if node.type == node.TYPE_STORAGE: # handle storage out.append(node.name) # name out.append(node.type) # type @@ -364,7 +374,7 @@ class Noder: out.append(node.md5) # md5 else: out.append('') # fake md5 - if node.type == self.TYPE_DIR: + if node.type == cnode.TYPE_DIR: out.append(str(len(node.children))) # nbfiles else: out.append('') # fake nbfiles @@ -376,7 +386,7 @@ class Noder: if len(line) > 0: Logger.stdout_nocolor(line) - def _print_node_native(self, node: anytree.AnyNode, + def _print_node_native(self, node: Node, pre: str = '', withpath: bool = False, withdepth: bool = False, @@ -393,10 +403,10 @@ class Noder: @recalcparent: get relpath from tree instead of relpath field @raw: print raw size rather than human readable """ - if node.type == self.TYPE_TOP: + if node.type == cnode.TYPE_TOP: # top node Logger.stdout_nocolor(f'{pre}{node.name}') - elif node.type == self.TYPE_FILE: + elif node.type == cnode.TYPE_FILE: # node of type file name = node.name if withpath: @@ -416,7 +426,7 @@ class Noder: content = Logger.get_bold_text(storage.name) compl += f', storage:{content}' NodePrinter.print_file_native(pre, name, compl) - elif node.type == self.TYPE_DIR: + elif node.type == cnode.TYPE_DIR: # node of type directory name = node.name if withpath: @@ -436,7 +446,7 @@ class Noder: if withstorage: attr.append(('storage', Logger.get_bold_text(storage.name))) NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr) - elif node.type == self.TYPE_STORAGE: + elif node.type == cnode.TYPE_STORAGE: # node of type storage sztotal = size_to_str(node.total, raw=raw) szused = size_to_str(node.total - node.free, raw=raw) @@ -466,14 +476,14 @@ class Noder: name, argsstring, node.attr) - elif node.type == self.TYPE_ARC: + elif node.type == cnode.TYPE_ARC: # archive node if self.arc: NodePrinter.print_archive_native(pre, node.name, node.archive) else: Logger.err(f'bad node encountered: {node}') - def print_tree(self, node: anytree.AnyNode, + def print_tree(self, node: Node, fmt: str = 'native', raw: bool = False) -> None: """ @@ -497,7 +507,7 @@ class Noder: Logger.stdout_nocolor(self.CSV_HEADER) self._to_csv(node, raw=raw) - def _to_csv(self, node: anytree.AnyNode, + def _to_csv(self, node: Node, raw: bool = False) -> None: """print the tree to csv""" rend = anytree.RenderTree(node, childiter=self._sort_tree) @@ -511,7 +521,7 @@ class Noder: selected = fzf.prompt(strings) return selected - def _to_fzf(self, node: anytree.AnyNode, fmt: str) -> None: + def _to_fzf(self, node: Node, fmt: str) -> None: """ fzf prompt with list and print selected node(s) @node: node to start with @@ -540,7 +550,7 @@ class Noder: self.print_tree(rend, fmt=subfmt) @staticmethod - def to_dot(node: anytree.AnyNode, + def to_dot(node: Node, path: str = 'tree.dot') -> str: """export to dot for graphing""" anytree.exporter.DotExporter(node).to_dotfile(path) @@ -550,14 +560,14 @@ class Noder: ############################################################### # searching ############################################################### - def find_name(self, top: anytree.AnyNode, + def find_name(self, top: Node, key: str, script: bool = False, only_dir: bool = False, - startnode: anytree.AnyNode = None, + startnode: Optional[Node] = None, parentfromtree: bool = False, fmt: str = 'native', - raw: bool = False) -> List[anytree.AnyNode]: + raw: bool = False) -> List[Node]: """ find files based on their names @top: top node @@ -573,7 +583,7 @@ class Noder: self._debug(f'searching for \"{key}\"') # search for nodes based on path - start = top + start: Optional[Node] = top if startnode: start = self.get_node(top, startnode) filterfunc = self._callback_find_name(key, only_dir) @@ -626,17 +636,17 @@ class Noder: def _callback_find_name(self, term: str, only_dir: bool) -> Any: """callback for finding files""" - def find_name(node: anytree.AnyNode) -> bool: - if node.type == self.TYPE_STORAGE: + def find_name(node: Node) -> bool: + if node.type == cnode.TYPE_STORAGE: # ignore storage nodes return False - if node.type == self.TYPE_TOP: + if node.type == cnode.TYPE_TOP: # ignore top nodes return False - if node.type == self.TYPE_META: + if node.type == cnode.TYPE_META: # ignore meta nodes return False - if only_dir and node.type != self.TYPE_DIR: + if only_dir and node.type != cnode.TYPE_DIR: # ignore non directory return False @@ -653,11 +663,11 @@ class Noder: ############################################################### # ls ############################################################### - def list(self, top: anytree.AnyNode, + def list(self, top: Node, path: str, rec: bool = False, fmt: str = 'native', - raw: bool = False) -> List[anytree.AnyNode]: + raw: bool = False) -> List[Node]: """ list nodes for "ls" @top: top node @@ -719,7 +729,7 @@ class Noder: # tree creation ############################################################### def _add_entry(self, name: str, - top: anytree.AnyNode, + top: Node, resolv: Any) -> None: """add an entry to the tree""" entries = name.rstrip(os.sep).split(os.sep) @@ -734,7 +744,7 @@ class Noder: except anytree.resolver.ChildResolverError: self.new_archive_node(nodename, name, top, top.name) - def list_to_tree(self, parent: anytree.AnyNode, names: List[str]) -> None: + def list_to_tree(self, parent: Node, names: List[str]) -> None: """convert list of files to a tree""" if not names: return @@ -747,23 +757,23 @@ class Noder: # diverse ############################################################### def _sort_tree(self, - items: List[anytree.AnyNode]) -> List[anytree.AnyNode]: + items: List[Node]) -> List[Node]: """sorting a list of items""" return sorted(items, key=self._sort, reverse=self.sortsize) - def _sort(self, lst: List[anytree.AnyNode]) -> Any: + def _sort(self, lst: Node) -> Any: """sort a list""" if self.sortsize: return self._sort_size(lst) return self._sort_fs(lst) @staticmethod - def _sort_fs(node: anytree.AnyNode) -> Tuple[str, str]: + def _sort_fs(node: Node) -> Tuple[str, str]: """sorting nodes dir first and alpha""" return (node.type, node.name.lstrip('.').lower()) @staticmethod - def _sort_size(node: anytree.AnyNode) -> float: + def _sort_size(node: Node) -> float: """sorting nodes by size""" try: if not node.size: @@ -772,22 +782,22 @@ class Noder: except AttributeError: return 0 - def _get_storage(self, node: anytree.AnyNode) -> anytree.AnyNode: + def _get_storage(self, node: Node) -> Node: """recursively traverse up to find storage""" - if node.type == self.TYPE_STORAGE: + if node.type == cnode.TYPE_STORAGE: return node - return node.ancestors[1] + return cast(Node, node.ancestors[1]) @staticmethod - def _has_attr(node: anytree.AnyNode, attr: str) -> bool: + def _has_attr(node: Node, attr: str) -> bool: """return True if node has attr as attribute""" return attr in node.__dict__.keys() - def _get_parents(self, node: anytree.AnyNode) -> str: + def _get_parents(self, node: Node) -> str: """get all parents recursively""" - if node.type == self.TYPE_STORAGE: + if node.type == cnode.TYPE_STORAGE: return '' - if node.type == self.TYPE_TOP: + if node.type == cnode.TYPE_TOP: return '' parent = self._get_parents(node.parent) if parent: @@ -804,7 +814,7 @@ class Noder: return '' @staticmethod - def _sanitize(node: anytree.AnyNode) -> anytree.AnyNode: + def _sanitize(node: Node) -> Node: """sanitize node strings""" node.name = fix_badchars(node.name) node.relpath = fix_badchars(node.relpath) diff --git a/catcli/utils.py b/catcli/utils.py index 38a257e..dd058e8 100644 --- a/catcli/utils.py +++ b/catcli/utils.py @@ -57,12 +57,12 @@ def size_to_str(size: float, return f'{size:.1f}{sufix}' -def epoch_to_str(epoch: int) -> str: +def epoch_to_str(epoch: float) -> str: """convert epoch to string""" if not epoch: return '' fmt = '%Y-%m-%d %H:%M:%S' - timestamp = datetime.datetime.fromtimestamp(float(epoch)) + timestamp = datetime.datetime.fromtimestamp(epoch) return timestamp.strftime(fmt) diff --git a/catcli/walker.py b/catcli/walker.py index 4d9f79d..e263fce 100644 --- a/catcli/walker.py +++ b/catcli/walker.py @@ -6,10 +6,10 @@ Catcli filesystem indexer """ import os -from typing import Tuple -import anytree # type: ignore +from typing import Tuple, Optional # local imports +from catcli.cnode import Node from catcli.noder import Noder from catcli.logger import Logger @@ -31,12 +31,12 @@ class Walker: """ self.noder = noder self.usehash = usehash - self.noder.set_hashing(self.usehash) + self.noder.do_hashing(self.usehash) self.debug = debug self.lpath = logpath def index(self, path: str, - parent: str, + parent: Node, name: str, storagepath: str = '') -> Tuple[str, int]: """ @@ -89,15 +89,15 @@ class Walker: self._progress('') return parent, cnt - def reindex(self, path: str, parent: str, top: str) -> int: + def reindex(self, path: str, parent: Node, top: Node) -> int: """reindex a directory and store in tree""" cnt = self._reindex(path, parent, top) cnt += self.noder.clean_not_flagged(parent) return cnt def _reindex(self, path: str, - parent: str, - top: anytree.AnyNode, + parent: Node, + top: Node, storagepath: str = '') -> int: """ reindex a directory and store in tree @@ -115,13 +115,15 @@ class Walker: reindex, node = self._need_reindex(parent, sub, treepath) if not reindex: self._debug(f'\tskip file {sub}') - self.noder.flag(node) + if node: + node.flag() continue self._log2file(f'update catalog for \"{sub}\"') node = self.noder.new_file_node(os.path.basename(file), sub, parent, storagepath) - self.noder.flag(node) - cnt += 1 + if node: + node.flag() + cnt += 1 for adir in dirs: self._debug(f'found dir \"{adir}\" under {path}') base = os.path.basename(adir) @@ -133,40 +135,42 @@ class Walker: dummy = self.noder.new_dir_node(base, sub, parent, storagepath) cnt += 1 - self.noder.flag(dummy) + if dummy: + dummy.flag() self._debug(f'reindexing deeper under {sub}') nstoragepath = os.sep.join([storagepath, base]) if not storagepath: nstoragepath = base - cnt2 = self._reindex(sub, dummy, top, nstoragepath) - cnt += cnt2 + if dummy: + cnt2 = self._reindex(sub, dummy, top, nstoragepath) + cnt += cnt2 break return cnt def _need_reindex(self, - top: anytree.AnyNode, + top: Node, path: str, - treepath: str) -> Tuple[bool, anytree.AnyNode]: + treepath: str) -> Tuple[bool, Optional[Node]]: """ test if node needs re-indexing @top: top node (storage) @path: abs path to file @treepath: rel path from indexed directory """ - cnode, changed = self.noder.get_node_if_changed(top, path, treepath) - if not cnode: + node, changed = self.noder.get_node_if_changed(top, path, treepath) + if not node: self._debug(f'\t{path} does not exist') - return True, cnode - if cnode and not changed: + return True, node + if node and not changed: # ignore this node self._debug(f'\t{path} has not changed') - return False, cnode - if cnode and changed: + return False, node + if node and changed: # remove this node and re-add self._debug(f'\t{path} has changed') - self._debug(f'\tremoving node {cnode.name} for {path}') - cnode.parent = None - return True, cnode + self._debug(f'\tremoving node {node.name} for {path}') + node.parent = None + return True, node def _debug(self, string: str) -> None: """print to debug""" diff --git a/tests/test_update.py b/tests/test_update.py index fc478e7..1107b45 100644 --- a/tests/test_update.py +++ b/tests/test_update.py @@ -46,7 +46,7 @@ class TestUpdate(unittest.TestCase): d2f2 = create_rnd_file(dir2, 'dir2file2') noder = Noder(debug=True) - noder.set_hashing(True) + noder.do_hashing(True) top = noder.new_top_node() catalog = Catalog(catalogpath, force=True, debug=False)