refactor node types

pull/30/head
deadc0de6 1 year ago
parent 45c2599a95
commit 45d3b096f3

@ -12,7 +12,7 @@ from anytree.exporter import JsonExporter # type: ignore
from anytree.importer import JsonImporter # type: ignore
# local imports
from catcli.cnode import Node
from catcli.cnode import NodeMeta, NodeTop
from catcli.utils import ask
from catcli.logger import Logger
@ -33,10 +33,10 @@ class Catalog:
self.path = path
self.debug = debug
self.force = force
self.metanode: Optional[Node] = None
self.metanode: Optional[NodeMeta] = None
self.pickle = usepickle
def set_metanode(self, metanode: Node) -> None:
def set_metanode(self, metanode: NodeMeta) -> None:
"""remove the metanode until tree is re-written"""
self.metanode = metanode
if self.metanode:
@ -50,7 +50,7 @@ class Catalog:
return True
return False
def restore(self) -> Optional[Node]:
def restore(self) -> Optional[NodeTop]:
"""restore the catalog"""
if not self.path:
return None
@ -62,7 +62,7 @@ class Catalog:
content = file.read()
return self._restore_json(content)
def save(self, node: Node) -> bool:
def save(self, node: NodeTop) -> bool:
"""save the catalog"""
if not self.path:
Logger.err('Path not defined')
@ -88,14 +88,14 @@ class Catalog:
return
Logger.debug(text)
def _save_pickle(self, node: Node) -> bool:
def _save_pickle(self, node: NodeTop) -> bool:
"""pickle the catalog"""
with open(self.path, 'wb') as file:
pickle.dump(node, file)
self._debug(f'Catalog saved to pickle \"{self.path}\"')
return True
def _restore_pickle(self) -> Union[Node, Any]:
def _restore_pickle(self) -> Union[NodeTop, Any]:
"""restore the pickled tree"""
with open(self.path, 'rb') as file:
root = pickle.load(file)
@ -103,7 +103,7 @@ class Catalog:
self._debug(msg)
return root
def _save_json(self, node: Node) -> bool:
def _save_json(self, node: NodeTop) -> bool:
"""export the catalog in json"""
exp = JsonExporter(indent=2, sort_keys=True)
with open(self.path, 'w', encoding='UTF-8') as file:
@ -111,9 +111,9 @@ class Catalog:
self._debug(f'Catalog saved to json \"{self.path}\"')
return True
def _restore_json(self, string: str) -> Node:
def _restore_json(self, string: str) -> NodeTop:
"""restore the tree from json"""
imp = JsonImporter()
root = imp.import_(string)
self._debug(f'Catalog imported from json \"{self.path}\"')
return cast(Node, root)
return cast(NodeTop, root)

@ -17,11 +17,11 @@ from docopt import docopt
# local imports
from catcli import cnode
from catcli.version import __version__ as VERSION
from catcli.cnode import NodeTop, NodeAny
from catcli.logger import Logger
from catcli.colors import Colors
from catcli.catalog import Catalog
from catcli.walker import Walker
from catcli.cnode import Node
from catcli.noder import Noder
from catcli.utils import ask, edit
from catcli.fuser import Fuser
@ -82,7 +82,7 @@ Options:
def cmd_mount(args: Dict[str, Any],
top: Node,
top: NodeTop,
noder: Noder) -> None:
"""mount action"""
mountpoint = args['<mountpoint>']
@ -94,7 +94,7 @@ def cmd_mount(args: Dict[str, Any],
def cmd_index(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: Node) -> None:
top: NodeTop) -> None:
"""index action"""
path = args['<path>']
name = args['<name>']
@ -117,8 +117,8 @@ def cmd_index(args: Dict[str, Any],
start = datetime.datetime.now()
walker = Walker(noder, usehash=usehash, debug=debug)
attr = noder.attrs_to_string(args['--meta'])
root = noder.new_storage_node(name, path, parent=top, attrs=attr)
attr = args['--meta']
root = noder.new_storage_node(name, path, top, attr)
_, cnt = walker.index(path, root, name)
if subsize:
noder.rec_size(root, store=True)
@ -132,7 +132,7 @@ def cmd_index(args: Dict[str, Any],
def cmd_update(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: Node) -> None:
top: NodeTop) -> None:
"""update action"""
path = args['<path>']
name = args['<name>']
@ -162,7 +162,7 @@ def cmd_update(args: Dict[str, Any],
def cmd_ls(args: Dict[str, Any],
noder: Noder,
top: Node) -> List[Node]:
top: NodeTop) -> List[NodeAny]:
"""ls action"""
path = args['<path>']
if not path:
@ -197,7 +197,7 @@ def cmd_ls(args: Dict[str, Any],
def cmd_rm(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: Node) -> Node:
top: NodeTop) -> NodeTop:
"""rm action"""
name = args['<storage>']
node = noder.get_storage_node(top, name)
@ -212,7 +212,7 @@ def cmd_rm(args: Dict[str, Any],
def cmd_find(args: Dict[str, Any],
noder: Noder,
top: Node) -> List[Node]:
top: NodeTop) -> List[NodeAny]:
"""find action"""
fromtree = args['--parent']
directory = args['--directory']
@ -232,7 +232,7 @@ def cmd_find(args: Dict[str, Any],
def cmd_graph(args: Dict[str, Any],
noder: Noder,
top: Node) -> None:
top: NodeTop) -> None:
"""graph action"""
path = args['<path>']
if not path:
@ -243,7 +243,7 @@ def cmd_graph(args: Dict[str, Any],
def cmd_rename(args: Dict[str, Any],
catalog: Catalog,
top: Node) -> None:
top: NodeTop) -> None:
"""rename action"""
storage = args['<storage>']
new = args['<name>']
@ -261,7 +261,7 @@ def cmd_rename(args: Dict[str, Any],
def cmd_edit(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: Node) -> None:
top: NodeTop) -> None:
"""edit action"""
storage = args['<storage>']
storages = list(x.name for x in top.children)

@ -6,57 +6,39 @@ Class that represents a node in the catalog tree
"""
# pylint: disable=W0622
from typing import Dict, Any
from anytree import NodeMixin # type: ignore
TYPE_TOP = 'top'
TYPE_FILE = 'file'
TYPE_DIR = 'dir'
TYPE_ARC = 'arc'
TYPE_STORAGE = 'storage'
TYPE_META = 'meta'
_TYPE_BAD = 'badtype'
_TYPE_TOP = 'top'
_TYPE_FILE = 'file'
_TYPE_DIR = 'dir'
_TYPE_ARC = 'arc'
_TYPE_STORAGE = 'storage'
_TYPE_META = 'meta'
NAME_TOP = 'top'
NAME_META = 'meta'
class Node(NodeMixin): # type: ignore
"""a node in the catalog"""
class NodeAny(NodeMixin): # type: ignore
"""generic node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
type: str,
size: float = 0,
relpath: str = '',
md5: str = '',
maccess: float = 0,
free: int = 0,
total: int = 0,
indexed_dt: int = 0,
attr: str = '',
archive: str = '',
parent=None,
children=None):
"""build a node"""
"""build generic node"""
super().__init__()
self.name = name
self.type = type
self.size = size
self.relpath = relpath
self.md5 = md5
self.maccess = maccess
self.free = free
self.total = total
self.indexed_dt = indexed_dt
self.attr = attr
self.archive = archive
self._flagged = False
self.parent = parent
if children:
self.children = children
self._flagged = False
def flagged(self) -> bool:
"""is flagged"""
if not hasattr(self, '_flagged'):
return False
return self._flagged
def flag(self) -> None:
@ -66,3 +48,133 @@ class Node(NodeMixin): # type: ignore
def unflag(self) -> None:
"""unflag node"""
self._flagged = False
del self._flagged
class NodeTop(NodeAny):
"""a top node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
children=None):
"""build a top node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = _TYPE_TOP
self.parent = None
if children:
self.children = children
class NodeFile(NodeAny):
"""a file node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
relpath: str,
size: int,
md5: str,
maccess: float,
parent=None,
children=None):
"""build a file node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = _TYPE_FILE
self.relpath = relpath
self.size = size
self.md5 = md5
self.maccess = maccess
self.parent = parent
if children:
self.children = children
class NodeDir(NodeAny):
"""a directory node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
relpath: str,
size: int,
maccess: float,
parent=None,
children=None):
"""build a directory node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = _TYPE_DIR
self.relpath = relpath
self.size = size
self.maccess = maccess
self.parent = parent
if children:
self.children = children
class NodeArchived(NodeAny):
"""an archived node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
relpath: str,
size: int,
md5: str,
archive: str,
parent=None,
children=None):
"""build an archived node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = _TYPE_ARC
self.relpath = relpath
self.size = size
self.md5 = md5
self.archive = archive
self.parent = parent
if children:
self.children = children
class NodeStorage(NodeAny):
"""a storage node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
free: int,
total: int,
size: int,
indexed_dt: float,
attr: Dict[str, Any],
parent=None,
children=None):
"""build a storage node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = _TYPE_STORAGE
self.free = free
self.total = total
self.attr = attr
self.size = size
self.indexed_dt = indexed_dt
self.parent = parent
if children:
self.children = children
class NodeMeta(NodeAny):
"""a meta node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
attr: str,
parent=None,
children=None):
"""build a meta node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = _TYPE_META
self.attr = attr
self.parent = parent
if children:
self.children = children

@ -14,8 +14,8 @@ import fuse # type: ignore
# local imports
from catcli.noder import Noder
from catcli.cnode import NodeTop, NodeAny
from catcli import cnode
from catcli.cnode import Node
# build custom logger to log in /tmp
@ -34,7 +34,7 @@ class Fuser:
"""fuse filesystem mounter"""
def __init__(self, mountpoint: str,
top: Node,
top: NodeTop,
noder: Noder,
debug: bool = False):
"""fuse filesystem"""
@ -50,13 +50,13 @@ class Fuser:
class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
"""in-memory filesystem for catcli catalog"""
def __init__(self, top: Node,
def __init__(self, top: NodeTop,
noder: Noder):
"""init fuse filesystem"""
self.top = top
self.noder = noder
def _get_entry(self, path: str) -> Optional[Node]:
def _get_entry(self, path: str) -> Optional[NodeAny]:
"""return the node pointed by path"""
pre = f'{SEPARATOR}{cnode.NAME_TOP}'
if not path.startswith(pre):
@ -69,7 +69,7 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
return found[0]
return None
def _get_entries(self, path: str) -> List[Node]:
def _get_entries(self, path: str) -> List[NodeAny]:
"""return nodes pointed by path"""
pre = f'{SEPARATOR}{cnode.NAME_TOP}'
if not path.startswith(pre):
@ -91,17 +91,17 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
curt = time()
mode: Any = S_IFREG
if entry.type == cnode.TYPE_ARC:
if isinstance(entry, cnode.NodeArchived):
mode = S_IFREG
elif entry.type == cnode.TYPE_DIR:
elif isinstance(entry, cnode.NodeDir):
mode = S_IFDIR
elif entry.type == cnode.TYPE_FILE:
elif isinstance(entry, cnode.NodeFile):
mode = S_IFREG
elif entry.type == cnode.TYPE_STORAGE:
elif isinstance(entry, cnode.NodeStorage):
mode = S_IFDIR
elif entry.type == cnode.TYPE_META:
elif isinstance(entry, cnode.NodeMeta):
mode = S_IFREG
elif entry.type == cnode.TYPE_TOP:
elif isinstance(entry, cnode.NodeTop):
mode = S_IFREG
return {
'st_mode': (mode),

@ -6,7 +6,8 @@ Class for printing nodes
"""
import sys
from typing import TypeVar, Type, Optional, Tuple, List
from typing import TypeVar, Type, Optional, Tuple, List, \
Dict, Any
from catcli.colors import Colors
from catcli.utils import fix_badchars
@ -25,7 +26,7 @@ class NodePrinter:
@classmethod
def print_storage_native(cls: Type[CLASSTYPE], pre: str,
name: str, args: str,
attr: str) -> None:
attr: Dict[str, Any]) -> None:
"""print a storage node"""
end = ''
if attr:

@ -14,7 +14,8 @@ from pyfzf.pyfzf import FzfPrompt # type: ignore
# local imports
from catcli import cnode
from catcli.cnode import Node
from catcli.cnode import NodeAny, NodeStorage, \
NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta
from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars
from catcli.logger import Logger
from catcli.nodeprinter import NodePrinter
@ -53,20 +54,20 @@ class Noder:
self.decomp = Decomp()
@staticmethod
def get_storage_names(top: Node) -> List[str]:
def get_storage_names(top: NodeTop) -> List[str]:
"""return a list of all storage names"""
return [x.name for x in list(top.children)]
def get_storage_node(self, top: Node,
def get_storage_node(self, top: NodeTop,
name: str,
newpath: str = '') -> Node:
newpath: str = '') -> NodeStorage:
"""
return the storage node if any
if newpath is submitted, it will update the media info
"""
found = None
for node in top.children:
if node.type != cnode.TYPE_STORAGE:
if not isinstance(node, cnode.NodeStorage):
continue
if node.name == name:
found = node
@ -75,27 +76,27 @@ class Noder:
found.free = shutil.disk_usage(newpath).free
found.total = shutil.disk_usage(newpath).total
found.ts = int(time.time())
return cast(Node, found)
return cast(NodeStorage, found)
@staticmethod
def get_node(top: Node,
def get_node(top: NodeTop,
path: str,
quiet: bool = False) -> Optional[Node]:
quiet: bool = False) -> Optional[NodeAny]:
"""get the node by internal tree path"""
resolv = anytree.resolver.Resolver('name')
try:
bpath = os.path.basename(path)
the_node = resolv.get(top, bpath)
return cast(Node, the_node)
return cast(NodeAny, the_node)
except anytree.resolver.ChildResolverError:
if not quiet:
Logger.err(f'No node at path \"{bpath}\"')
return None
def get_node_if_changed(self,
top: Node,
top: NodeTop,
path: str,
treepath: str) -> Tuple[Optional[Node], bool]:
treepath: str) -> Tuple[Optional[NodeAny], bool]:
"""
return the node (if any) and if it has changed
@top: top node (storage)
@ -131,25 +132,25 @@ class Noder:
self._debug(f'\tchange: no change for \"{path}\"')
return node, False
def rec_size(self, node: Node,
store: bool = True) -> float:
def rec_size(self, node: Union[NodeDir, NodeStorage],
store: bool = True) -> int:
"""
recursively traverse tree and return size
@store: store the size in the node
"""
if node.type == cnode.TYPE_FILE:
if isinstance(node, cnode.NodeFile):
self._debug(f'getting node size for \"{node.name}\"')
return float(node.size)
return node.size
msg = f'getting node size recursively for \"{node.name}\"'
self._debug(msg)
size: float = 0
size: int = 0
for i in node.children:
if node.type == cnode.TYPE_DIR:
if isinstance(node, cnode.NodeDir):
size = self.rec_size(i, store=store)
if store:
i.size = size
size += size
if node.type == cnode.TYPE_STORAGE:
if isinstance(node, cnode.NodeStorage):
size = self.rec_size(i, store=store)
if store:
i.size = size
@ -185,13 +186,12 @@ class Noder:
###############################################################
# node creation
###############################################################
def new_top_node(self) -> Node:
def new_top_node(self) -> NodeTop:
"""create a new top node"""
return Node(cnode.NAME_TOP,
cnode.TYPE_TOP)
return NodeTop(cnode.NAME_TOP)
def new_file_node(self, name: str, path: str,
parent: Node, storagepath: str) -> Optional[Node]:
parent: NodeAny, storagepath: str) -> Optional[NodeFile]:
"""create a new node representing a file"""
if not os.path.exists(path):
Logger.err(f'File \"{path}\" does not exist')
@ -208,11 +208,12 @@ class Noder:
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
node = self._new_generic_node(name, cnode.TYPE_FILE,
relpath, parent,
size=stat.st_size,
md5=md5,
maccess=maccess)
node = NodeFile(name,
relpath,
stat.st_size,
md5,
maccess,
parent=parent)
if self.arc:
ext = os.path.splitext(path)[1][1:]
if ext.lower() in self.decomp.get_formats():
@ -224,59 +225,46 @@ class Noder:
return node
def new_dir_node(self, name: str, path: str,
parent: Node, storagepath: str) -> Node:
parent: NodeAny, storagepath: str) -> NodeDir:
"""create a new node representing a directory"""
path = os.path.abspath(path)
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
return self._new_generic_node(name, cnode.TYPE_DIR, relpath,
parent, maccess=maccess)
return NodeDir(name,
relpath,
0,
maccess,
parent=parent)
def new_storage_node(self, name: str,
path: str,
parent: str,
attrs: str = '') -> Node:
attrs: Dict[str, Any]) \
-> NodeStorage:
"""create a new node representing a storage"""
path = os.path.abspath(path)
free = shutil.disk_usage(path).free
total = shutil.disk_usage(path).total
epoch = int(time.time())
return Node(name=name,
type=cnode.TYPE_STORAGE,
free=free,
total=total,
parent=parent,
attr=attrs,
indexed_dt=epoch)
return NodeStorage(name,
free,
total,
0,
epoch,
attrs,
parent=parent)
def new_archive_node(self, name: str, path: str,
parent: str, archive: str) -> Node:
parent: str, archive: str) -> NodeArchived:
"""create a new node for archive data"""
return Node(name=name, type=cnode.TYPE_ARC, relpath=path,
parent=parent, size=0, md5='',
archive=archive)
@staticmethod
def _new_generic_node(name: str,
nodetype: str,
relpath: str,
parent: Node,
size: float = 0,
md5: str = '',
maccess: float = 0) -> Node:
"""generic node creation"""
return Node(name,
nodetype,
size=size,
relpath=relpath,
md5=md5,
maccess=maccess,
parent=parent)
return NodeArchived(name=name, relpath=path,
parent=parent, size=0, md5='',
archive=archive)
###############################################################
# node management
###############################################################
def update_metanode(self, top: Node) -> Node:
def update_metanode(self, top: NodeTop) -> NodeMeta:
"""create or update meta node information"""
meta = self._get_meta_node(top)
epoch = int(time.time())
@ -284,9 +272,8 @@ class Noder:
attrs: Dict[str, Any] = {}
attrs['created'] = epoch
attrs['created_version'] = VERSION
meta = Node(name=cnode.NAME_META,
type=cnode.TYPE_META,
attr=self.attrs_to_string(attrs))
meta = NodeMeta(name=cnode.NAME_META,
attr=self.attrs_to_string(attrs))
if meta.attr:
meta.attr += ', '
meta.attr += f'access={epoch}'
@ -294,26 +281,26 @@ class Noder:
meta.attr += f'access_version={VERSION}'
return meta
def _get_meta_node(self, top: Node) -> Optional[Node]:
def _get_meta_node(self, top: NodeTop) -> Optional[NodeMeta]:
"""return the meta node if any"""
try:
found = next(filter(lambda x: x.type == cnode.TYPE_META,
found = next(filter(lambda x: isinstance(x, cnode.NodeMeta),
top.children))
return cast(Node, found)
return cast(NodeMeta, found)
except StopIteration:
return None
def clean_not_flagged(self, top: Node) -> int:
def clean_not_flagged(self, top: NodeTop) -> int:
"""remove any node not flagged and clean flags"""
cnt = 0
for node in anytree.PreOrderIter(top):
if node.type not in [cnode.TYPE_FILE, cnode.TYPE_DIR]:
if not isinstance(node, (cnode.NodeDir, cnode.NodeFile)):
continue
if self._clean(node):
cnt += 1
return cnt
def _clean(self, node: Node) -> bool:
def _clean(self, node: NodeAny) -> bool:
"""remove node if not flagged"""
if not node.flagged():
node.parent = None
@ -324,7 +311,7 @@ class Noder:
###############################################################
# printing
###############################################################
def _node_to_csv(self, node: Node,
def _node_to_csv(self, node: NodeAny,
sep: str = ',',
raw: bool = False) -> None:
"""
@ -333,7 +320,7 @@ class Noder:
@sep: CSV separator character
@raw: print raw size rather than human readable
"""
if not cnode:
if not node:
return
if node.type == node.TYPE_TOP:
return
@ -374,7 +361,7 @@ class Noder:
out.append(node.md5) # md5
else:
out.append('') # fake md5
if node.type == cnode.TYPE_DIR:
if isinstance(node, cnode.NodeDir):
out.append(str(len(node.children))) # nbfiles
else:
out.append('') # fake nbfiles
@ -386,7 +373,7 @@ class Noder:
if len(line) > 0:
Logger.stdout_nocolor(line)
def _print_node_native(self, node: Node,
def _print_node_native(self, node: NodeAny,
pre: str = '',
withpath: bool = False,
withdepth: bool = False,
@ -403,10 +390,10 @@ class Noder:
@recalcparent: get relpath from tree instead of relpath field
@raw: print raw size rather than human readable
"""
if node.type == cnode.TYPE_TOP:
if isinstance(node, cnode.NodeTop):
# top node
Logger.stdout_nocolor(f'{pre}{node.name}')
elif node.type == cnode.TYPE_FILE:
elif isinstance(node, cnode.NodeFile):
# node of type file
name = node.name
if withpath:
@ -426,7 +413,7 @@ class Noder:
content = Logger.get_bold_text(storage.name)
compl += f', storage:{content}'
NodePrinter.print_file_native(pre, name, compl)
elif node.type == cnode.TYPE_DIR:
elif isinstance(node, cnode.NodeDir):
# node of type directory
name = node.name
if withpath:
@ -446,7 +433,7 @@ class Noder:
if withstorage:
attr.append(('storage', Logger.get_bold_text(storage.name)))
NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr)
elif node.type == cnode.TYPE_STORAGE:
elif isinstance(node, cnode.NodeStorage):
# node of type storage
sztotal = size_to_str(node.total, raw=raw)
szused = size_to_str(node.total - node.free, raw=raw)
@ -476,14 +463,14 @@ class Noder:
name,
argsstring,
node.attr)
elif node.type == cnode.TYPE_ARC:
elif isinstance(node, cnode.NodeArchived):
# archive node
if self.arc:
NodePrinter.print_archive_native(pre, node.name, node.archive)
else:
Logger.err(f'bad node encountered: {node}')
def print_tree(self, node: Node,
def print_tree(self, node: NodeAny,
fmt: str = 'native',
raw: bool = False) -> None:
"""
@ -507,7 +494,7 @@ class Noder:
Logger.stdout_nocolor(self.CSV_HEADER)
self._to_csv(node, raw=raw)
def _to_csv(self, node: Node,
def _to_csv(self, node: NodeAny,
raw: bool = False) -> None:
"""print the tree to csv"""
rend = anytree.RenderTree(node, childiter=self._sort_tree)
@ -521,7 +508,7 @@ class Noder:
selected = fzf.prompt(strings)
return selected
def _to_fzf(self, node: Node, fmt: str) -> None:
def _to_fzf(self, node: NodeAny, fmt: str) -> None:
"""
fzf prompt with list and print selected node(s)
@node: node to start with
@ -550,24 +537,24 @@ class Noder:
self.print_tree(rend, fmt=subfmt)
@staticmethod
def to_dot(node: Node,
def to_dot(top: NodeTop,
path: str = 'tree.dot') -> str:
"""export to dot for graphing"""
anytree.exporter.DotExporter(node).to_dotfile(path)
anytree.exporter.DotExporter(top).to_dotfile(path)
Logger.info(f'dot file created under \"{path}\"')
return f'dot {path} -T png -o /tmp/tree.png'
###############################################################
# searching
###############################################################
def find_name(self, top: Node,
def find_name(self, top: NodeTop,
key: str,
script: bool = False,
only_dir: bool = False,
startnode: Optional[Node] = None,
startnode: Optional[NodeAny] = None,
parentfromtree: bool = False,
fmt: str = 'native',
raw: bool = False) -> List[Node]:
raw: bool = False) -> List[NodeAny]:
"""
find files based on their names
@top: top node
@ -583,7 +570,7 @@ class Noder:
self._debug(f'searching for \"{key}\"')
# search for nodes based on path
start: Optional[Node] = top
start: Optional[NodeAny] = top
if startnode:
start = self.get_node(top, startnode)
filterfunc = self._callback_find_name(key, only_dir)
@ -594,7 +581,9 @@ class Noder:
# compile found nodes
paths = {}
for item in found:
item = self._sanitize(item)
item.name = fix_badchars(item.name)
if hasattr(item, 'relpath'):
item.relpath = fix_badchars(item.relpath)
if parentfromtree:
paths[self._get_parents(item)] = item
else:
@ -636,17 +625,17 @@ class Noder:
def _callback_find_name(self, term: str, only_dir: bool) -> Any:
"""callback for finding files"""
def find_name(node: Node) -> bool:
if node.type == cnode.TYPE_STORAGE:
def find_name(node: NodeAny) -> bool:
if isinstance(node, cnode.NodeStorage):
# ignore storage nodes
return False
if node.type == cnode.TYPE_TOP:
if isinstance(node, cnode.NodeTop):
# ignore top nodes
return False
if node.type == cnode.TYPE_META:
if isinstance(node, cnode.NodeMeta):
# ignore meta nodes
return False
if only_dir and node.type != cnode.TYPE_DIR:
if only_dir and isinstance(node, cnode.NodeDir):
# ignore non directory
return False
@ -663,11 +652,11 @@ class Noder:
###############################################################
# ls
###############################################################
def list(self, top: Node,
def list(self, top: NodeTop,
path: str,
rec: bool = False,
fmt: str = 'native',
raw: bool = False) -> List[Node]:
raw: bool = False) -> List[NodeAny]:
"""
list nodes for "ls"
@top: top node
@ -729,7 +718,7 @@ class Noder:
# tree creation
###############################################################
def _add_entry(self, name: str,
top: Node,
top: NodeTop,
resolv: Any) -> None:
"""add an entry to the tree"""
entries = name.rstrip(os.sep).split(os.sep)
@ -744,7 +733,7 @@ class Noder:
except anytree.resolver.ChildResolverError:
self.new_archive_node(nodename, name, top, top.name)
def list_to_tree(self, parent: Node, names: List[str]) -> None:
def list_to_tree(self, parent: NodeAny, names: List[str]) -> None:
"""convert list of files to a tree"""
if not names:
return
@ -757,23 +746,23 @@ class Noder:
# diverse
###############################################################
def _sort_tree(self,
items: List[Node]) -> List[Node]:
items: List[NodeAny]) -> List[NodeAny]:
"""sorting a list of items"""
return sorted(items, key=self._sort, reverse=self.sortsize)
def _sort(self, lst: Node) -> Any:
def _sort(self, lst: NodeAny) -> Any:
"""sort a list"""
if self.sortsize:
return self._sort_size(lst)
return self._sort_fs(lst)
@staticmethod
def _sort_fs(node: Node) -> Tuple[str, str]:
def _sort_fs(node: NodeAny) -> Tuple[str, str]:
"""sorting nodes dir first and alpha"""
return (node.type, node.name.lstrip('.').lower())
@staticmethod
def _sort_size(node: Node) -> float:
def _sort_size(node: NodeAny) -> float:
"""sorting nodes by size"""
try:
if not node.size:
@ -782,22 +771,22 @@ class Noder:
except AttributeError:
return 0
def _get_storage(self, node: Node) -> Node:
def _get_storage(self, node: NodeAny) -> NodeStorage:
"""recursively traverse up to find storage"""
if node.type == cnode.TYPE_STORAGE:
if isinstance(node, cnode.NodeStorage):
return node
return cast(Node, node.ancestors[1])
return cast(NodeStorage, node.ancestors[1])
@staticmethod
def _has_attr(node: Node, attr: str) -> bool:
def _has_attr(node: NodeAny, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in node.__dict__.keys()
def _get_parents(self, node: Node) -> str:
def _get_parents(self, node: NodeAny) -> str:
"""get all parents recursively"""
if node.type == cnode.TYPE_STORAGE:
if isinstance(node, cnode.NodeStorage):
return ''
if node.type == cnode.TYPE_TOP:
if isinstance(node, cnode.NodeTop):
return ''
parent = self._get_parents(node.parent)
if parent:
@ -813,13 +802,6 @@ class Noder:
Logger.err(str(exc))
return ''
@staticmethod
def _sanitize(node: Node) -> Node:
"""sanitize node strings"""
node.name = fix_badchars(node.name)
node.relpath = fix_badchars(node.relpath)
return node
def _debug(self, string: str) -> None:
"""print debug"""
if not self.debug:

@ -9,9 +9,9 @@ import os
from typing import Tuple, Optional
# local imports
from catcli.cnode import Node
from catcli.noder import Noder
from catcli.logger import Logger
from catcli.cnode import NodeAny, NodeTop
class Walker:
@ -36,7 +36,7 @@ class Walker:
self.lpath = logpath
def index(self, path: str,
parent: Node,
parent: NodeAny,
name: str,
storagepath: str = '') -> Tuple[str, int]:
"""
@ -89,15 +89,15 @@ class Walker:
self._progress('')
return parent, cnt
def reindex(self, path: str, parent: Node, top: Node) -> int:
def reindex(self, path: str, parent: NodeAny, top: NodeTop) -> int:
"""reindex a directory and store in tree"""
cnt = self._reindex(path, parent, top)
cnt += self.noder.clean_not_flagged(parent)
return cnt
def _reindex(self, path: str,
parent: Node,
top: Node,
parent: NodeAny,
top: NodeTop,
storagepath: str = '') -> int:
"""
reindex a directory and store in tree
@ -148,9 +148,9 @@ class Walker:
return cnt
def _need_reindex(self,
top: Node,
top: NodeTop,
path: str,
treepath: str) -> Tuple[bool, Optional[Node]]:
treepath: str) -> Tuple[bool, Optional[NodeTop]]:
"""
test if node needs re-indexing
@top: top node (storage)

Loading…
Cancel
Save