master
deadc0de6 3 months ago
parent e6ca6e2fcc
commit c9b4043e5f

@ -102,7 +102,7 @@ class Catalog:
if root.type != nodes.TYPE_TOP: if root.type != nodes.TYPE_TOP:
return None return None
top = NodeTop(root.name, children=root.children) top = NodeTop(root.name, children=root.children)
self._debug(f'top imported: {top.name}') self._debug(f'top imported: {top.get_name()}')
return top return top

@ -25,7 +25,8 @@ from catcli.colors import Colors
from catcli.catalog import Catalog from catcli.catalog import Catalog
from catcli.walker import Walker from catcli.walker import Walker
from catcli.noder import Noder from catcli.noder import Noder
from catcli.utils import ask, edit, path_to_search_all from catcli.utils import ask, edit
from catcli.nodes_utils import path_to_search_all
from catcli.exceptions import BadFormatException, CatcliException from catcli.exceptions import BadFormatException, CatcliException
NAME = 'catcli' NAME = 'catcli'
@ -241,7 +242,7 @@ def cmd_find(args: Dict[str, Any],
script = args['--script'] script = args['--script']
search_for = args['<term>'] search_for = args['<term>']
if args['--verbose']: if args['--verbose']:
Logger.debug(f'search for \"{search_for}\" under \"{top.name}\"') Logger.debug(f'search for "{search_for}" under "{top.get_name()}"')
found = noder.find(top, search_for, found = noder.find(top, search_for,
script=script, script=script,
startnode=startpath, startnode=startpath,
@ -279,10 +280,10 @@ def cmd_rename(args: Dict[str, Any],
"""rename action""" """rename action"""
storage = args['<storage>'] storage = args['<storage>']
new = args['<name>'] new = args['<name>']
storages = list(x.name for x in top.children) storages = list(x.get_name() for x in top.children)
if storage in storages: if storage in storages:
node = next(filter(lambda x: x.name == storage, top.children)) node = next(filter(lambda x: x.get_name() == storage, top.children))
node.name = new node.set_name(new)
if catalog.save(top): if catalog.save(top):
msg = f'Storage \"{storage}\" renamed to \"{new}\"' msg = f'Storage \"{storage}\" renamed to \"{new}\"'
Logger.info(msg) Logger.info(msg)
@ -296,9 +297,9 @@ def cmd_edit(args: Dict[str, Any],
top: NodeTop) -> None: top: NodeTop) -> None:
"""edit action""" """edit action"""
storage = args['<storage>'] storage = args['<storage>']
storages = list(x.name for x in top.children) storages = list(x.get_name() for x in top.children)
if storage in storages: if storage in storages:
node = next(filter(lambda x: x.name == storage, top.children)) node = next(filter(lambda x: x.get_name() == storage, top.children))
attr = node.attr attr = node.attr
if not attr: if not attr:
attr = '' attr = ''

@ -17,7 +17,7 @@ except ModuleNotFoundError:
# local imports # local imports
from catcli.noder import Noder from catcli.noder import Noder
from catcli.nodes import NodeTop, NodeAny from catcli.nodes import NodeTop, NodeAny
from catcli.utils import path_to_search_all, path_to_top from catcli.nodes_utils import path_to_search_all, path_to_top
from catcli import nodes from catcli import nodes
@ -129,5 +129,5 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
content = ['.', '..'] content = ['.', '..']
entries = self._get_entries(path) entries = self._get_entries(path)
for entry in entries: for entry in entries:
content.append(entry.name) content.append(entry.get_name())
return content return content

@ -18,7 +18,7 @@ from catcli import nodes
from catcli.nodes import NodeAny, NodeStorage, \ from catcli.nodes import NodeAny, NodeStorage, \
NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta, \ NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta, \
typcast_node typcast_node
from catcli.utils import md5sum, fix_badchars, has_attr from catcli.utils import md5sum
from catcli.logger import Logger from catcli.logger import Logger
from catcli.printer_native import NativePrinter from catcli.printer_native import NativePrinter
from catcli.printer_csv import CsvPrinter from catcli.printer_csv import CsvPrinter
@ -117,7 +117,7 @@ class Noder:
return node, False return node, False
# force re-indexing if no maccess # force re-indexing if no maccess
maccess = os.path.getmtime(path) maccess = os.path.getmtime(path)
if not has_attr(node, 'maccess') or \ if not node.has_attr('maccess') or \
not node.maccess: not node.maccess:
self._debug('\tchange: no maccess found') self._debug('\tchange: no maccess found')
return node, True return node, True
@ -336,7 +336,7 @@ class Noder:
typcast_node(node) typcast_node(node)
if node.type == nodes.TYPE_TOP: if node.type == nodes.TYPE_TOP:
# top node # top node
self.native_printer.print_top(pre, node.name) self.native_printer.print_top(pre, node.get_name())
elif node.type == nodes.TYPE_FILE: elif node.type == nodes.TYPE_FILE:
# node of type file # node of type file
self.native_printer.print_file(pre, node, self.native_printer.print_file(pre, node,
@ -420,7 +420,7 @@ class Noder:
continue continue
parents = rend.get_fullpath() parents = rend.get_fullpath()
storage = rend.get_storage_node() storage = rend.get_storage_node()
fullpath = os.path.join(storage.name, parents) fullpath = os.path.join(storage.get_name(), parents)
the_nodes[fullpath] = rend the_nodes[fullpath] = rend
# prompt with fzf # prompt with fzf
paths = self._fzf_prompt(the_nodes.keys()) paths = self._fzf_prompt(the_nodes.keys())
@ -477,7 +477,7 @@ class Noder:
paths = {} paths = {}
for item in found: for item in found:
typcast_node(item) typcast_node(item)
item.name = fix_badchars(item.name) item.set_name(item.get_name())
key = item.get_fullpath() key = item.get_fullpath()
paths[key] = item paths[key] = item
@ -574,7 +574,7 @@ class Noder:
@fmt: output format @fmt: output format
@raw: print raw size @raw: print raw size
""" """
self._debug(f'ls walking path: \"{path}\" from \"{top.name}\"') self._debug(f'ls walking path: \"{path}\" from \"{top.get_name()}\"')
resolv = anytree.resolver.Resolver('name') resolv = anytree.resolver.Resolver('name')
found = [] found = []
try: try:
@ -633,7 +633,7 @@ class Noder:
path: str, path: str,
raw: bool = False) -> List[NodeAny]: raw: bool = False) -> List[NodeAny]:
"""disk usage""" """disk usage"""
self._debug(f'du walking path: \"{path}\" from \"{top.name}\"') self._debug(f'du walking path: \"{path}\" from \"{top.get_name()}\"')
resolv = anytree.resolver.Resolver('name') resolv = anytree.resolver.Resolver('name')
found: NodeAny found: NodeAny
try: try:
@ -660,15 +660,15 @@ class Noder:
"""add an entry to the tree""" """add an entry to the tree"""
entries = name.rstrip(os.sep).split(os.sep) entries = name.rstrip(os.sep).split(os.sep)
if len(entries) == 1: if len(entries) == 1:
self.new_archive_node(name, top, top.name) self.new_archive_node(name, top, top.get_name())
return return
sub = os.sep.join(entries[:-1]) sub = os.sep.join(entries[:-1])
nodename = entries[-1] nodename = entries[-1]
try: try:
parent = resolv.get(top, sub) parent = resolv.get(top, sub)
parent = self.new_archive_node(nodename, parent, top.name) parent = self.new_archive_node(nodename, parent, top.get_name())
except anytree.resolver.ChildResolverError: except anytree.resolver.ChildResolverError:
self.new_archive_node(nodename, top, top.name) self.new_archive_node(nodename, top, top.get_name())
def list_to_tree(self, parent: NodeAny, names: List[str]) -> None: def list_to_tree(self, parent: NodeAny, names: List[str]) -> None:
"""convert list of files to a tree""" """convert list of files to a tree"""

@ -11,6 +11,7 @@ from typing import Dict, Any, cast
from anytree import NodeMixin from anytree import NodeMixin
from catcli.exceptions import CatcliException from catcli.exceptions import CatcliException
from catcli.utils import fix_badchars
TYPE_TOP = 'top' TYPE_TOP = 'top'
@ -58,6 +59,18 @@ class NodeAny(NodeMixin): # type: ignore
if children: if children:
self.children = children self.children = children
def get_name(self) -> str:
"""get node name"""
return fix_badchars(self.name)
def set_name(self, name: str) -> None:
"""set node name"""
self.name = fix_badchars(name)
def has_attr(self, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in self.__dict__
def may_have_children(self) -> bool: def may_have_children(self) -> bool:
"""can node contains sub""" """can node contains sub"""
raise NotImplementedError raise NotImplementedError
@ -75,12 +88,12 @@ class NodeAny(NodeMixin): # type: ignore
def get_fullpath(self) -> str: def get_fullpath(self) -> str:
"""return full path to this node""" """return full path to this node"""
path = self.name path = self.get_name()
if self.parent: if self.parent:
typcast_node(self.parent) typcast_node(self.parent)
ppath = self.parent.get_fullpath() ppath = self.parent.get_fullpath()
path = os.path.join(ppath, path) path = os.path.join(ppath, path)
return str(path) return fix_badchars(path)
def get_rec_size(self) -> int: def get_rec_size(self) -> int:
"""recursively traverse tree and return size""" """recursively traverse tree and return size"""

@ -0,0 +1,39 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2024, deadc0de6
nodes helpers
"""
import os
# local imports
from catcli import nodes
def path_to_top(path: str) -> str:
"""path pivot under top"""
pre = f"{os.path.sep}{nodes.NAME_TOP}"
if not path.startswith(pre):
# prepend with top node path
path = pre + path
return path
def path_to_search_all(path: str) -> str:
"""path to search for all subs"""
if not path:
path = os.path.sep
if not path.startswith(os.path.sep):
path = os.path.sep + path
pre = f"{os.path.sep}{nodes.NAME_TOP}"
if not path.startswith(pre):
# prepend with top node path
path = pre + path
# if not path.endswith(os.path.sep):
# # ensure ends with a separator
# path += os.path.sep
# if not path.endswith(WILD):
# # add wild card
# path += WILD
return path

@ -9,8 +9,7 @@ import sys
from typing import List from typing import List
from catcli.nodes import NodeAny, NodeStorage, TYPE_DIR from catcli.nodes import NodeAny, NodeStorage, TYPE_DIR
from catcli.utils import size_to_str, epoch_to_str, \ from catcli.utils import size_to_str, epoch_to_str
has_attr
class CsvPrinter: class CsvPrinter:
@ -35,7 +34,7 @@ class CsvPrinter:
raw: bool = False) -> None: raw: bool = False) -> None:
"""print a storage node""" """print a storage node"""
out = [] out = []
out.append(node.name) # name out.append(node.get_name()) # name
out.append(node.type) # type out.append(node.type) # type
out.append('') # fake full path out.append('') # fake full path
size = node.get_rec_size() size = node.get_rec_size()
@ -56,7 +55,7 @@ class CsvPrinter:
raw: bool = False) -> None: raw: bool = False) -> None:
"""print other nodes""" """print other nodes"""
out = [] out = []
out.append(node.name.replace('"', '""')) # name out.append(node.get_name().replace('"', '""')) # name
out.append(node.type) # type out.append(node.type) # type
fullpath = node.get_fullpath() fullpath = node.get_fullpath()
out.append(fullpath.replace('"', '""')) # full path out.append(fullpath.replace('"', '""')) # full path
@ -64,11 +63,11 @@ class CsvPrinter:
out.append(size_to_str(node.nodesize, raw=raw)) # size out.append(size_to_str(node.nodesize, raw=raw)) # size
storage = node.get_storage_node() storage = node.get_storage_node()
out.append(epoch_to_str(storage.ts)) # indexed_at out.append(epoch_to_str(storage.ts)) # indexed_at
if has_attr(node, 'maccess'): if node.has_attr('maccess'):
out.append(epoch_to_str(node.maccess)) # maccess out.append(epoch_to_str(node.maccess)) # maccess
else: else:
out.append('') # fake maccess out.append('') # fake maccess
if has_attr(node, 'md5'): if node.has_attr('md5'):
out.append(node.md5) # md5 out.append(node.md5) # md5
else: else:
out.append('') # fake md5 out.append('') # fake md5

@ -12,7 +12,7 @@ from catcli.nodes import NodeFile, NodeDir, \
from catcli.colors import Colors from catcli.colors import Colors
from catcli.logger import Logger from catcli.logger import Logger
from catcli.utils import fix_badchars, size_to_str, \ from catcli.utils import fix_badchars, size_to_str, \
has_attr, epoch_to_str epoch_to_str
COLOR_STORAGE = Colors.YELLOW COLOR_STORAGE = Colors.YELLOW
@ -54,8 +54,7 @@ class NativePrinter:
raw: bool = False) -> None: raw: bool = False) -> None:
"""print a storage node""" """print a storage node"""
# construct name # construct name
name = node.name name = node.get_name()
name = fix_badchars(name)
# construct attrs # construct attrs
attrs = [] attrs = []
# nb files # nb files
@ -74,7 +73,7 @@ class NativePrinter:
szused = size_to_str(node.total - node.free, raw=raw) szused = size_to_str(node.total - node.free, raw=raw)
attrs.append(f'du:{szused}/{sztotal}') attrs.append(f'du:{szused}/{sztotal}')
# timestamp # timestamp
if has_attr(node, 'ts'): if node.has_attr('ts'):
attrs.append(f'date:{epoch_to_str(node.ts)}') attrs.append(f'date:{epoch_to_str(node.ts)}')
# print # print
@ -91,7 +90,7 @@ class NativePrinter:
raw: bool = False) -> None: raw: bool = False) -> None:
"""print a file node""" """print a file node"""
# construct name # construct name
name = node.name name = node.get_name()
storage = node.get_storage_node() storage = node.get_storage_node()
if withpath: if withpath:
name = node.get_fullpath() name = node.get_fullpath()
@ -100,7 +99,7 @@ class NativePrinter:
if node.md5: if node.md5:
attrs.append(f'md5:{node.md5}') attrs.append(f'md5:{node.md5}')
if withstorage: if withstorage:
content = Logger.get_bold_text(storage.name) content = Logger.get_bold_text(storage.get_name())
attrs.append(f'storage:{content}') attrs.append(f'storage:{content}')
# print # print
out = [] out = []
@ -111,7 +110,7 @@ class NativePrinter:
size = node.nodesize size = node.nodesize
line = size_to_str(size, raw=raw) line = size_to_str(size, raw=raw)
out.append(f'{COLOR_SIZE}{line}{Colors.RESET}') out.append(f'{COLOR_SIZE}{line}{Colors.RESET}')
if has_attr(node, 'maccess'): if node.has_attr('maccess'):
line = epoch_to_str(node.maccess) line = epoch_to_str(node.maccess)
out.append(f'{COLOR_TS}{line}{Colors.RESET}') out.append(f'{COLOR_TS}{line}{Colors.RESET}')
if attrs: if attrs:
@ -128,7 +127,7 @@ class NativePrinter:
raw: bool = False) -> None: raw: bool = False) -> None:
"""print a directory node""" """print a directory node"""
# construct name # construct name
name = node.name name = node.get_name()
storage = node.get_storage_node() storage = node.get_storage_node()
if withpath: if withpath:
name = node.get_fullpath() name = node.get_fullpath()
@ -138,7 +137,7 @@ class NativePrinter:
nbchildren = len(node.children) nbchildren = len(node.children)
attrs.append(f'{self.NBFILES}:{nbchildren}') attrs.append(f'{self.NBFILES}:{nbchildren}')
if withstorage: if withstorage:
attrs.append(f'storage:{Logger.get_bold_text(storage.name)}') attrs.append(f"storage:{Logger.get_bold_text(storage.get_name())}")
# print # print
out = [] out = []
out.append(f'{pre}') out.append(f'{pre}')
@ -148,7 +147,7 @@ class NativePrinter:
size = node.nodesize size = node.nodesize
line = size_to_str(size, raw=raw) line = size_to_str(size, raw=raw)
out.append(f'{COLOR_SIZE}{line}{Colors.RESET}') out.append(f'{COLOR_SIZE}{line}{Colors.RESET}')
if has_attr(node, 'maccess'): if node.has_attr('maccess'):
line = epoch_to_str(node.maccess) line = epoch_to_str(node.maccess)
out.append(f'{COLOR_TS}{line}{Colors.RESET}') out.append(f'{COLOR_TS}{line}{Colors.RESET}')
if attrs: if attrs:

@ -10,43 +10,15 @@ import hashlib
import tempfile import tempfile
import subprocess import subprocess
import datetime import datetime
import string
# local imports # local imports
from catcli import nodes
from catcli.exceptions import CatcliException from catcli.exceptions import CatcliException
WILD = '*' WILD = '*'
def path_to_top(path: str) -> str:
"""path pivot under top"""
pre = f'{os.path.sep}{nodes.NAME_TOP}'
if not path.startswith(pre):
# prepend with top node path
path = pre + path
return path
def path_to_search_all(path: str) -> str:
"""path to search for all subs"""
if not path:
path = os.path.sep
if not path.startswith(os.path.sep):
path = os.path.sep + path
pre = f'{os.path.sep}{nodes.NAME_TOP}'
if not path.startswith(pre):
# prepend with top node path
path = pre + path
# if not path.endswith(os.path.sep):
# # ensure ends with a separator
# path += os.path.sep
# if not path.endswith(WILD):
# # add wild card
# path += WILD
return path
def md5sum(path: str) -> str: def md5sum(path: str) -> str:
""" """
calculate md5 sum of a file calculate md5 sum of a file
@ -101,24 +73,20 @@ def ask(question: str) -> bool:
return resp.lower() == 'y' return resp.lower() == 'y'
def edit(string: str) -> str: def edit(data: str) -> str:
"""edit the information with the default EDITOR""" """edit the information with the default EDITOR"""
data = string.encode('utf-8') content = fix_badchars(data)
editor = os.environ.get('EDITOR', 'vim') editor = os.environ.get('EDITOR', 'vim')
with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as file: with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as file:
file.write(data) file.write(content.encode('utf-8'))
file.flush() file.flush()
subprocess.call([editor, file.name]) subprocess.call([editor, file.get_name()])
file.seek(0) file.seek(0)
new = file.read() new = file.read()
return new.decode('utf-8') return new.decode('utf-8')
def fix_badchars(string: str) -> str: def fix_badchars(data: str) -> str:
"""fix none utf-8 chars in string""" """fix none utf-8 chars in string"""
return string.encode('utf-8', 'ignore').decode('utf-8') data = "".join(x for x in data if x in string.printable)
return data.encode("utf-8", "ignore").decode("utf-8")
def has_attr(node: nodes.NodeAny, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in node.__dict__.keys()

@ -171,7 +171,7 @@ class Walker:
if node and changed: if node and changed:
# remove this node and re-add # remove this node and re-add
self._debug(f'\t{path} has changed') self._debug(f'\t{path} has changed')
self._debug(f'\tremoving node {node.name} for {path}') self._debug(f"\tremoving node {node.get_name()} for {path}")
node.parent = None node.parent = None
return True, node return True, node

@ -62,7 +62,7 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(len(storage.children) == 5) self.assertTrue(len(storage.children) == 5)
# ensures files and directories are in # ensures files and directories are in
names = [x.name for x in storage.children] names = [x.get_name() for x in storage.children]
self.assertTrue(os.path.basename(file1) in names) self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names) self.assertTrue(os.path.basename(file2) in names)
self.assertTrue(os.path.basename(file3) in names) self.assertTrue(os.path.basename(file3) in names)
@ -70,9 +70,9 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(os.path.basename(dir2) in names) self.assertTrue(os.path.basename(dir2) in names)
for node in storage.children: for node in storage.children:
if node.name == os.path.basename(dir1): if node.get_name() == os.path.basename(dir1):
self.assertTrue(len(node.children) == 2) self.assertTrue(len(node.children) == 2)
elif node.name == os.path.basename(dir2): elif node.get_name() == os.path.basename(dir2):
self.assertTrue(len(node.children) == 1) self.assertTrue(len(node.children) == 1)

@ -151,7 +151,7 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(nod.md5 == d2f2_md5_new) self.assertTrue(nod.md5 == d2f2_md5_new)
# ensures files and directories are in # ensures files and directories are in
names = [node.name for node in anytree.PreOrderIter(storage)] names = [node.get_name() for node in anytree.PreOrderIter(storage)]
print(names) print(names)
self.assertTrue(os.path.basename(file1) in names) self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names) self.assertTrue(os.path.basename(file2) in names)
@ -169,13 +169,13 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(os.path.basename(new5) in names) self.assertTrue(os.path.basename(new5) in names)
for node in storage.children: for node in storage.children:
if node.name == os.path.basename(dir1): if node.get_name() == os.path.basename(dir1):
self.assertTrue(len(node.children) == 3) self.assertTrue(len(node.children) == 3)
elif node.name == os.path.basename(dir2): elif node.get_name() == os.path.basename(dir2):
self.assertTrue(len(node.children) == 3) self.assertTrue(len(node.children) == 3)
elif node.name == os.path.basename(new3): elif node.get_name() == os.path.basename(new3):
self.assertTrue(len(node.children) == 0) self.assertTrue(len(node.children) == 0)
elif node.name == os.path.basename(new4): elif node.get_name() == os.path.basename(new4):
self.assertTrue(len(node.children) == 1) self.assertTrue(len(node.children) == 1)
self.assertTrue(read_from_file(d1f1) == editval) self.assertTrue(read_from_file(d1f1) == editval)
@ -189,7 +189,7 @@ class TestUpdate(unittest.TestCase):
cmd_update(args, noder, catalog, top) cmd_update(args, noder, catalog, top)
# ensures files and directories are (not) in # ensures files and directories are (not) in
names = [node.name for node in anytree.PreOrderIter(storage)] names = [node.get_name() for node in anytree.PreOrderIter(storage)]
print(names) print(names)
self.assertTrue(os.path.basename(file1) in names) self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names) self.assertTrue(os.path.basename(file2) in names)
@ -207,9 +207,9 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(os.path.basename(new4) not in names) self.assertTrue(os.path.basename(new4) not in names)
self.assertTrue(os.path.basename(new5) not in names) self.assertTrue(os.path.basename(new5) not in names)
for node in storage.children: for node in storage.children:
if node.name == os.path.basename(dir1): if node.get_name() == os.path.basename(dir1):
self.assertTrue(len(node.children) == 2) self.assertTrue(len(node.children) == 2)
elif node.name == os.path.basename(new3): elif node.get_name() == os.path.basename(new3):
self.assertTrue(len(node.children) == 0) self.assertTrue(len(node.children) == 0)

Loading…
Cancel
Save