diff --git a/catcli/catalog.py b/catcli/catalog.py index 2744430..dfdc696 100644 --- a/catcli/catalog.py +++ b/catcli/catalog.py @@ -102,7 +102,7 @@ class Catalog: if root.type != nodes.TYPE_TOP: return None top = NodeTop(root.name, children=root.children) - self._debug(f'top imported: {top.name}') + self._debug(f'top imported: {top.get_name()}') return top diff --git a/catcli/catcli.py b/catcli/catcli.py index 55b20a4..3f926f2 100755 --- a/catcli/catcli.py +++ b/catcli/catcli.py @@ -25,7 +25,8 @@ from catcli.colors import Colors from catcli.catalog import Catalog from catcli.walker import Walker from catcli.noder import Noder -from catcli.utils import ask, edit, path_to_search_all +from catcli.utils import ask, edit +from catcli.nodes_utils import path_to_search_all from catcli.exceptions import BadFormatException, CatcliException NAME = 'catcli' @@ -241,7 +242,7 @@ def cmd_find(args: Dict[str, Any], script = args['--script'] search_for = args[''] if args['--verbose']: - Logger.debug(f'search for \"{search_for}\" under \"{top.name}\"') + Logger.debug(f'search for "{search_for}" under "{top.get_name()}"') found = noder.find(top, search_for, script=script, startnode=startpath, @@ -279,10 +280,10 @@ def cmd_rename(args: Dict[str, Any], """rename action""" storage = args[''] new = args[''] - storages = list(x.name for x in top.children) + storages = list(x.get_name() for x in top.children) if storage in storages: - node = next(filter(lambda x: x.name == storage, top.children)) - node.name = new + node = next(filter(lambda x: x.get_name() == storage, top.children)) + node.set_name(new) if catalog.save(top): msg = f'Storage \"{storage}\" renamed to \"{new}\"' Logger.info(msg) @@ -296,9 +297,9 @@ def cmd_edit(args: Dict[str, Any], top: NodeTop) -> None: """edit action""" storage = args[''] - storages = list(x.name for x in top.children) + storages = list(x.get_name() for x in top.children) if storage in storages: - node = next(filter(lambda x: x.name == storage, top.children)) + node = next(filter(lambda x: x.get_name() == storage, top.children)) attr = node.attr if not attr: attr = '' diff --git a/catcli/fuser.py b/catcli/fuser.py index afbcfd2..9c525f8 100644 --- a/catcli/fuser.py +++ b/catcli/fuser.py @@ -17,7 +17,7 @@ except ModuleNotFoundError: # local imports from catcli.noder import Noder from catcli.nodes import NodeTop, NodeAny -from catcli.utils import path_to_search_all, path_to_top +from catcli.nodes_utils import path_to_search_all, path_to_top from catcli import nodes @@ -129,5 +129,5 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore content = ['.', '..'] entries = self._get_entries(path) for entry in entries: - content.append(entry.name) + content.append(entry.get_name()) return content diff --git a/catcli/noder.py b/catcli/noder.py index 57ef430..25b5764 100644 --- a/catcli/noder.py +++ b/catcli/noder.py @@ -18,7 +18,7 @@ from catcli import nodes from catcli.nodes import NodeAny, NodeStorage, \ NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta, \ typcast_node -from catcli.utils import md5sum, fix_badchars, has_attr +from catcli.utils import md5sum from catcli.logger import Logger from catcli.printer_native import NativePrinter from catcli.printer_csv import CsvPrinter @@ -117,7 +117,7 @@ class Noder: return node, False # force re-indexing if no maccess maccess = os.path.getmtime(path) - if not has_attr(node, 'maccess') or \ + if not node.has_attr('maccess') or \ not node.maccess: self._debug('\tchange: no maccess found') return node, True @@ -336,7 +336,7 @@ class Noder: typcast_node(node) if node.type == nodes.TYPE_TOP: # top node - self.native_printer.print_top(pre, node.name) + self.native_printer.print_top(pre, node.get_name()) elif node.type == nodes.TYPE_FILE: # node of type file self.native_printer.print_file(pre, node, @@ -420,7 +420,7 @@ class Noder: continue parents = rend.get_fullpath() storage = rend.get_storage_node() - fullpath = os.path.join(storage.name, parents) + fullpath = os.path.join(storage.get_name(), parents) the_nodes[fullpath] = rend # prompt with fzf paths = self._fzf_prompt(the_nodes.keys()) @@ -477,7 +477,7 @@ class Noder: paths = {} for item in found: typcast_node(item) - item.name = fix_badchars(item.name) + item.set_name(item.get_name()) key = item.get_fullpath() paths[key] = item @@ -574,7 +574,7 @@ class Noder: @fmt: output format @raw: print raw size """ - self._debug(f'ls walking path: \"{path}\" from \"{top.name}\"') + self._debug(f'ls walking path: \"{path}\" from \"{top.get_name()}\"') resolv = anytree.resolver.Resolver('name') found = [] try: @@ -633,7 +633,7 @@ class Noder: path: str, raw: bool = False) -> List[NodeAny]: """disk usage""" - self._debug(f'du walking path: \"{path}\" from \"{top.name}\"') + self._debug(f'du walking path: \"{path}\" from \"{top.get_name()}\"') resolv = anytree.resolver.Resolver('name') found: NodeAny try: @@ -660,15 +660,15 @@ class Noder: """add an entry to the tree""" entries = name.rstrip(os.sep).split(os.sep) if len(entries) == 1: - self.new_archive_node(name, top, top.name) + self.new_archive_node(name, top, top.get_name()) return sub = os.sep.join(entries[:-1]) nodename = entries[-1] try: parent = resolv.get(top, sub) - parent = self.new_archive_node(nodename, parent, top.name) + parent = self.new_archive_node(nodename, parent, top.get_name()) except anytree.resolver.ChildResolverError: - self.new_archive_node(nodename, top, top.name) + self.new_archive_node(nodename, top, top.get_name()) def list_to_tree(self, parent: NodeAny, names: List[str]) -> None: """convert list of files to a tree""" diff --git a/catcli/nodes.py b/catcli/nodes.py index 3f19553..97acab3 100644 --- a/catcli/nodes.py +++ b/catcli/nodes.py @@ -11,6 +11,7 @@ from typing import Dict, Any, cast from anytree import NodeMixin from catcli.exceptions import CatcliException +from catcli.utils import fix_badchars TYPE_TOP = 'top' @@ -58,6 +59,18 @@ class NodeAny(NodeMixin): # type: ignore if children: self.children = children + def get_name(self) -> str: + """get node name""" + return fix_badchars(self.name) + + def set_name(self, name: str) -> None: + """set node name""" + self.name = fix_badchars(name) + + def has_attr(self, attr: str) -> bool: + """return True if node has attr as attribute""" + return attr in self.__dict__ + def may_have_children(self) -> bool: """can node contains sub""" raise NotImplementedError @@ -75,12 +88,12 @@ class NodeAny(NodeMixin): # type: ignore def get_fullpath(self) -> str: """return full path to this node""" - path = self.name + path = self.get_name() if self.parent: typcast_node(self.parent) ppath = self.parent.get_fullpath() path = os.path.join(ppath, path) - return str(path) + return fix_badchars(path) def get_rec_size(self) -> int: """recursively traverse tree and return size""" diff --git a/catcli/nodes_utils.py b/catcli/nodes_utils.py new file mode 100644 index 0000000..e43fdc9 --- /dev/null +++ b/catcli/nodes_utils.py @@ -0,0 +1,39 @@ +""" +author: deadc0de6 (https://github.com/deadc0de6) +Copyright (c) 2024, deadc0de6 + +nodes helpers +""" + +import os + +# local imports +from catcli import nodes + + +def path_to_top(path: str) -> str: + """path pivot under top""" + pre = f"{os.path.sep}{nodes.NAME_TOP}" + if not path.startswith(pre): + # prepend with top node path + path = pre + path + return path + + +def path_to_search_all(path: str) -> str: + """path to search for all subs""" + if not path: + path = os.path.sep + if not path.startswith(os.path.sep): + path = os.path.sep + path + pre = f"{os.path.sep}{nodes.NAME_TOP}" + if not path.startswith(pre): + # prepend with top node path + path = pre + path + # if not path.endswith(os.path.sep): + # # ensure ends with a separator + # path += os.path.sep + # if not path.endswith(WILD): + # # add wild card + # path += WILD + return path diff --git a/catcli/printer_csv.py b/catcli/printer_csv.py index eed7dc5..ae6068b 100644 --- a/catcli/printer_csv.py +++ b/catcli/printer_csv.py @@ -9,8 +9,7 @@ import sys from typing import List from catcli.nodes import NodeAny, NodeStorage, TYPE_DIR -from catcli.utils import size_to_str, epoch_to_str, \ - has_attr +from catcli.utils import size_to_str, epoch_to_str class CsvPrinter: @@ -35,7 +34,7 @@ class CsvPrinter: raw: bool = False) -> None: """print a storage node""" out = [] - out.append(node.name) # name + out.append(node.get_name()) # name out.append(node.type) # type out.append('') # fake full path size = node.get_rec_size() @@ -56,7 +55,7 @@ class CsvPrinter: raw: bool = False) -> None: """print other nodes""" out = [] - out.append(node.name.replace('"', '""')) # name + out.append(node.get_name().replace('"', '""')) # name out.append(node.type) # type fullpath = node.get_fullpath() out.append(fullpath.replace('"', '""')) # full path @@ -64,11 +63,11 @@ class CsvPrinter: out.append(size_to_str(node.nodesize, raw=raw)) # size storage = node.get_storage_node() out.append(epoch_to_str(storage.ts)) # indexed_at - if has_attr(node, 'maccess'): + if node.has_attr('maccess'): out.append(epoch_to_str(node.maccess)) # maccess else: out.append('') # fake maccess - if has_attr(node, 'md5'): + if node.has_attr('md5'): out.append(node.md5) # md5 else: out.append('') # fake md5 diff --git a/catcli/printer_native.py b/catcli/printer_native.py index dc3d6bb..8954b5a 100644 --- a/catcli/printer_native.py +++ b/catcli/printer_native.py @@ -12,7 +12,7 @@ from catcli.nodes import NodeFile, NodeDir, \ from catcli.colors import Colors from catcli.logger import Logger from catcli.utils import fix_badchars, size_to_str, \ - has_attr, epoch_to_str + epoch_to_str COLOR_STORAGE = Colors.YELLOW @@ -54,8 +54,7 @@ class NativePrinter: raw: bool = False) -> None: """print a storage node""" # construct name - name = node.name - name = fix_badchars(name) + name = node.get_name() # construct attrs attrs = [] # nb files @@ -74,7 +73,7 @@ class NativePrinter: szused = size_to_str(node.total - node.free, raw=raw) attrs.append(f'du:{szused}/{sztotal}') # timestamp - if has_attr(node, 'ts'): + if node.has_attr('ts'): attrs.append(f'date:{epoch_to_str(node.ts)}') # print @@ -91,7 +90,7 @@ class NativePrinter: raw: bool = False) -> None: """print a file node""" # construct name - name = node.name + name = node.get_name() storage = node.get_storage_node() if withpath: name = node.get_fullpath() @@ -100,7 +99,7 @@ class NativePrinter: if node.md5: attrs.append(f'md5:{node.md5}') if withstorage: - content = Logger.get_bold_text(storage.name) + content = Logger.get_bold_text(storage.get_name()) attrs.append(f'storage:{content}') # print out = [] @@ -111,7 +110,7 @@ class NativePrinter: size = node.nodesize line = size_to_str(size, raw=raw) out.append(f'{COLOR_SIZE}{line}{Colors.RESET}') - if has_attr(node, 'maccess'): + if node.has_attr('maccess'): line = epoch_to_str(node.maccess) out.append(f'{COLOR_TS}{line}{Colors.RESET}') if attrs: @@ -128,7 +127,7 @@ class NativePrinter: raw: bool = False) -> None: """print a directory node""" # construct name - name = node.name + name = node.get_name() storage = node.get_storage_node() if withpath: name = node.get_fullpath() @@ -138,7 +137,7 @@ class NativePrinter: nbchildren = len(node.children) attrs.append(f'{self.NBFILES}:{nbchildren}') if withstorage: - attrs.append(f'storage:{Logger.get_bold_text(storage.name)}') + attrs.append(f"storage:{Logger.get_bold_text(storage.get_name())}") # print out = [] out.append(f'{pre}') @@ -148,7 +147,7 @@ class NativePrinter: size = node.nodesize line = size_to_str(size, raw=raw) out.append(f'{COLOR_SIZE}{line}{Colors.RESET}') - if has_attr(node, 'maccess'): + if node.has_attr('maccess'): line = epoch_to_str(node.maccess) out.append(f'{COLOR_TS}{line}{Colors.RESET}') if attrs: diff --git a/catcli/utils.py b/catcli/utils.py index 7fef081..228fb49 100644 --- a/catcli/utils.py +++ b/catcli/utils.py @@ -10,43 +10,15 @@ import hashlib import tempfile import subprocess import datetime +import string # local imports -from catcli import nodes from catcli.exceptions import CatcliException WILD = '*' -def path_to_top(path: str) -> str: - """path pivot under top""" - pre = f'{os.path.sep}{nodes.NAME_TOP}' - if not path.startswith(pre): - # prepend with top node path - path = pre + path - return path - - -def path_to_search_all(path: str) -> str: - """path to search for all subs""" - if not path: - path = os.path.sep - if not path.startswith(os.path.sep): - path = os.path.sep + path - pre = f'{os.path.sep}{nodes.NAME_TOP}' - if not path.startswith(pre): - # prepend with top node path - path = pre + path - # if not path.endswith(os.path.sep): - # # ensure ends with a separator - # path += os.path.sep - # if not path.endswith(WILD): - # # add wild card - # path += WILD - return path - - def md5sum(path: str) -> str: """ calculate md5 sum of a file @@ -101,24 +73,20 @@ def ask(question: str) -> bool: return resp.lower() == 'y' -def edit(string: str) -> str: +def edit(data: str) -> str: """edit the information with the default EDITOR""" - data = string.encode('utf-8') + content = fix_badchars(data) editor = os.environ.get('EDITOR', 'vim') with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as file: - file.write(data) + file.write(content.encode('utf-8')) file.flush() - subprocess.call([editor, file.name]) + subprocess.call([editor, file.get_name()]) file.seek(0) new = file.read() return new.decode('utf-8') -def fix_badchars(string: str) -> str: +def fix_badchars(data: str) -> str: """fix none utf-8 chars in string""" - return string.encode('utf-8', 'ignore').decode('utf-8') - - -def has_attr(node: nodes.NodeAny, attr: str) -> bool: - """return True if node has attr as attribute""" - return attr in node.__dict__.keys() + data = "".join(x for x in data if x in string.printable) + return data.encode("utf-8", "ignore").decode("utf-8") diff --git a/catcli/walker.py b/catcli/walker.py index d57f5cd..3c0d6b6 100644 --- a/catcli/walker.py +++ b/catcli/walker.py @@ -171,7 +171,7 @@ class Walker: if node and changed: # remove this node and re-add self._debug(f'\t{path} has changed') - self._debug(f'\tremoving node {node.name} for {path}') + self._debug(f"\tremoving node {node.get_name()} for {path}") node.parent = None return True, node diff --git a/tests/test_index.py b/tests/test_index.py index 46734de..d8680ea 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -62,7 +62,7 @@ class TestIndexing(unittest.TestCase): self.assertTrue(len(storage.children) == 5) # ensures files and directories are in - names = [x.name for x in storage.children] + names = [x.get_name() for x in storage.children] self.assertTrue(os.path.basename(file1) in names) self.assertTrue(os.path.basename(file2) in names) self.assertTrue(os.path.basename(file3) in names) @@ -70,9 +70,9 @@ class TestIndexing(unittest.TestCase): self.assertTrue(os.path.basename(dir2) in names) for node in storage.children: - if node.name == os.path.basename(dir1): + if node.get_name() == os.path.basename(dir1): self.assertTrue(len(node.children) == 2) - elif node.name == os.path.basename(dir2): + elif node.get_name() == os.path.basename(dir2): self.assertTrue(len(node.children) == 1) diff --git a/tests/test_update.py b/tests/test_update.py index d70c564..053ce00 100644 --- a/tests/test_update.py +++ b/tests/test_update.py @@ -151,7 +151,7 @@ class TestUpdate(unittest.TestCase): self.assertTrue(nod.md5 == d2f2_md5_new) # ensures files and directories are in - names = [node.name for node in anytree.PreOrderIter(storage)] + names = [node.get_name() for node in anytree.PreOrderIter(storage)] print(names) self.assertTrue(os.path.basename(file1) in names) self.assertTrue(os.path.basename(file2) in names) @@ -169,13 +169,13 @@ class TestUpdate(unittest.TestCase): self.assertTrue(os.path.basename(new5) in names) for node in storage.children: - if node.name == os.path.basename(dir1): + if node.get_name() == os.path.basename(dir1): self.assertTrue(len(node.children) == 3) - elif node.name == os.path.basename(dir2): + elif node.get_name() == os.path.basename(dir2): self.assertTrue(len(node.children) == 3) - elif node.name == os.path.basename(new3): + elif node.get_name() == os.path.basename(new3): self.assertTrue(len(node.children) == 0) - elif node.name == os.path.basename(new4): + elif node.get_name() == os.path.basename(new4): self.assertTrue(len(node.children) == 1) self.assertTrue(read_from_file(d1f1) == editval) @@ -189,7 +189,7 @@ class TestUpdate(unittest.TestCase): cmd_update(args, noder, catalog, top) # ensures files and directories are (not) in - names = [node.name for node in anytree.PreOrderIter(storage)] + names = [node.get_name() for node in anytree.PreOrderIter(storage)] print(names) self.assertTrue(os.path.basename(file1) in names) self.assertTrue(os.path.basename(file2) in names) @@ -207,9 +207,9 @@ class TestUpdate(unittest.TestCase): self.assertTrue(os.path.basename(new4) not in names) self.assertTrue(os.path.basename(new5) not in names) for node in storage.children: - if node.name == os.path.basename(dir1): + if node.get_name() == os.path.basename(dir1): self.assertTrue(len(node.children) == 2) - elif node.name == os.path.basename(new3): + elif node.get_name() == os.path.basename(new3): self.assertTrue(len(node.children) == 0)