You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
catcli/catcli/noder.py

850 lines
30 KiB
Python

"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2017, deadc0de6
Class that processes the nodes in the catalog tree
"""
import os
import shutil
import time
from typing import List, Union, Tuple, Any, Optional, Dict, cast
import anytree # type: ignore
# local imports
from catcli import nodes
from catcli.nodes import NodeAny, NodeStorage, \
NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta, \
typcast_node
from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars
from catcli.logger import Logger
from catcli.nodeprinter import NodePrinter
from catcli.decomp import Decomp
from catcli.version import __version__ as VERSION
from catcli.exceptions import CatcliException
class Noder:
"""
handles the nodes in the catalog tree
The main types of node are:
* "top" node representing the top node (generic node)
* "storage" node representing a storage
* "dir" node representing a directory
* "file" node representing a file
Archived nodes (content of an archive) and the meta node
are handled as well.
"""
CSV_HEADER = ('name,type,path,size,indexed_at,'
'maccess,md5,nbfiles,free_space,'
'total_space,meta')
def __init__(self, debug: bool = False,
             sortsize: bool = False,
             arc: bool = False) -> None:
    """
    set up the noder
    @debug: debug mode
    @sortsize: sort nodes by size
    @arc: handle archive
    """
    self.debug = debug
    self.sortsize = sortsize
    self.arc = arc
    # hashing is enabled until do_hashing() says otherwise
    self.hash = True
    # the decompressor only exists when archive handling is requested
    if arc:
        self.decomp = Decomp()
@staticmethod
def get_storage_names(top: NodeTop) -> List[str]:
    """
    return a list of all storage names
    @top: top node, the name of each of its direct children is returned
    """
    names = []
    for child in top.children:
        names.append(child.name)
    return names
def get_storage_node(self, top: NodeTop,
name: str,
newpath: str = '') -> NodeStorage:
"""
return the storage node if any
if newpath is submitted, it will update the media info
"""
found = None
for node in top.children:
if node.type != nodes.TYPE_STORAGE:
continue
if node.name == name:
found = node
break
if found and newpath and os.path.exists(newpath):
found.free = shutil.disk_usage(newpath).free
found.total = shutil.disk_usage(newpath).total
found.ts = int(time.time())
return cast(NodeStorage, found)
@staticmethod
def get_node(top: NodeTop,
             path: str,
             quiet: bool = False) -> Optional[NodeAny]:
    """
    get the node by internal tree path
    @top: node the lookup starts from
    @path: path of the node, only its basename is resolved
    @quiet: do not log an error when no node is found
    returns the node or None if there is no such node
    """
    bpath = os.path.basename(path)
    try:
        the_node = anytree.resolver.Resolver('name').get(top, bpath)
        typcast_node(the_node)
    except anytree.resolver.ChildResolverError:
        if not quiet:
            Logger.err(f'No node at path \"{bpath}\"')
        return None
    return cast(NodeAny, the_node)
def get_node_if_changed(self,
                        top: NodeTop,
                        path: str,
                        treepath: str) -> Tuple[Optional[NodeAny], bool]:
    """
    return the node (if any) and if it has changed
    @top: top node (storage)
    @path: abs path to file
    @treepath: rel path from indexed directory
    returns a tuple (node or None, True if it needs re-indexing)
    """
    node = self.get_node(top, treepath.lstrip(os.sep), quiet=True)
    if not node:
        # nothing known yet for this path
        self._debug('\tchange: node does not exist')
        return None, True

    if os.path.isdir(path):
        # a directory is never reported as changed
        return node, False

    maccess = os.path.getmtime(path)

    # without a stored maccess there is nothing to compare with
    known_maccess = self._has_attr(node, 'maccess') and node.maccess
    if not known_maccess:
        self._debug('\tchange: no maccess found')
        return node, True

    if float(maccess) != float(node.maccess):
        self._debug(f'\tchange: maccess changed for \"{path}\"')
        return node, True

    # same maccess, compare checksums when hashing is on
    # and a checksum was stored
    if self.hash and node.md5:
        md5 = self._get_hash(path)
        if md5 and md5 != node.md5:
            self._debug(f'\tchange: checksum changed for \"{path}\"')
            return node, True

    self._debug(f'\tchange: no change for \"{path}\"')
    return node, False
def rec_size(self, node: Union[NodeDir, NodeStorage],
             store: bool = True) -> int:
    """
    recursively traverse tree and return size
    @node: node to start from
    @store: store the size in the node
    returns the size of a file node, or the summed size of the
    children of a directory or storage node (0 for any other type)
    """
    if node.type == nodes.TYPE_FILE:
        node.__class__ = NodeFile
        self._debug(f'size of {node.type} \"{node.name}\": {node.nodesize}')
        return node.nodesize

    self._debug(f'getting node size recursively for \"{node.name}\"')
    total: int = 0
    for child in node.children:
        if node.type not in (nodes.TYPE_DIR, nodes.TYPE_STORAGE):
            # only directories and storages have their children summed
            self._debug(f'skipping {node.name}')
            continue
        child_size = self.rec_size(child, store=store)
        if store:
            child.nodesize = child_size
        total += child_size

    if store:
        node.nodesize = total
    self._debug(f'size of {node.type} \"{node.name}\": {total}')
    return total
###############################################################
# public helpers
###############################################################
@staticmethod
def attrs_to_string(attr: Union[List[str], Dict[str, str], str]) -> str:
    """
    format the storage attr for saving
    @attr: a list, a dict or a string
    returns an empty string for an empty value, the entries of a list
    joined with ', ', the entries of a dict as key=value joined
    with ', ' or the string without its trailing whitespaces
    """
    if not attr:
        return ''
    if isinstance(attr, dict):
        return ', '.join(f'{key}={val}' for key, val in attr.items())
    if isinstance(attr, list):
        return ', '.join(attr)
    return attr.rstrip()
def do_hashing(self, val: bool) -> None:
    """
    hash files when indexing
    @val: True to enable hashing, False to disable it
    """
    self.hash = val
###############################################################
# node creation
###############################################################
def new_top_node(self) -> NodeTop:
    """
    create a new top node
    returns the new node, named after nodes.NAME_TOP
    """
    node = NodeTop(nodes.NAME_TOP)
    self._debug(f'new top node: {node}')
    return node
def new_file_node(self, name: str, path: str,
                  parent: NodeAny) -> Optional[NodeFile]:
    """
    create a new node representing a file
    @name: name of the node
    @path: path to the file on the filesystem
    @parent: parent node of the new node
    returns the new node or None if the file does not exist
    or if its information cannot be read
    when archive handling is on and the file extension is a known
    archive format, the archive content is added below the new node
    """
    if not os.path.exists(path):
        Logger.err(f'File \"{path}\" does not exist')
        return None
    path = os.path.abspath(path)
    try:
        # both calls hit the filesystem and may fail, for example
        # when the file disappears after the existence check
        stat = os.lstat(path)
        maccess = os.path.getmtime(path)
    except OSError as exc:
        Logger.err(f'OSError: {exc}')
        return None
    md5 = ''
    if self.hash:
        md5 = self._get_hash(path)
    node = NodeFile(name,
                    stat.st_size,
                    md5,
                    maccess,
                    parent=parent)
    if self.arc:
        ext = os.path.splitext(path)[1][1:]
        if ext.lower() in self.decomp.get_formats():
            self._debug(f'{path} is an archive')
            names = self.decomp.get_names(path)
            self.list_to_tree(node, names)
        else:
            self._debug(f'{path} is NOT an archive')
    return node
def new_dir_node(self, name: str, path: str,
                 parent: NodeAny) -> NodeDir:
    """
    create a new node representing a directory
    @name: name of the node
    @path: path to the directory on the filesystem
    @parent: parent node of the new node
    the node is created with a size of 0 and the directory
    modification time as maccess
    """
    maccess = os.path.getmtime(os.path.abspath(path))
    return NodeDir(name, 0, maccess, parent=parent)
def new_storage_node(self, name: str,
                     path: str,
                     parent: str,
                     attrs: Dict[str, Any]) \
        -> NodeStorage:
    """
    create a new node representing a storage
    @name: name of the storage
    @path: path of the storage, used to read the disk usage
    @parent: handed as parent to the new node
             (NOTE(review): annotated str, presumably a node - confirm)
    @attrs: storage attributes, saved as a string
    the node is created with a size of 0 and the current time
    as timestamp
    """
    # query the filesystem once so that free and total
    # are taken from the same snapshot
    usage = shutil.disk_usage(os.path.abspath(path))
    epoch = int(time.time())
    return NodeStorage(name,
                       usage.free,
                       usage.total,
                       0,
                       epoch,
                       self.attrs_to_string(attrs),
                       parent=parent)
def new_archive_node(self,
                     name: str,
                     parent: str,
                     archive: str) -> NodeArchived:
    """
    create a new node for archive data
    @name: name of the node
    @parent: handed as parent to the new node
    @archive: name of the archive the entry belongs to
    the node is created with a size of 0 and an empty md5
    """
    return NodeArchived(
        name=name,
        parent=parent,
        nodesize=0,
        md5='',
        archive=archive,
    )
###############################################################
# node management
###############################################################
def update_metanode(self, top: NodeTop) -> NodeMeta:
    """
    create or update meta node information
    @top: top node searched for an existing meta node
    returns the meta node, its access time and version are refreshed
    a newly created meta node also gets its creation time and version
    and is not attached to @top by this method
    """
    now = int(time.time())
    meta = self._get_meta_node(top)
    if not meta:
        created: Dict[str, Any] = {
            'created': now,
            'created_version': VERSION,
        }
        meta = NodeMeta(name=nodes.NAME_META, attr=created)
    meta.attr['access'] = now
    meta.attr['access_version'] = VERSION
    return meta
def _get_meta_node(self, top: NodeTop) -> Optional[NodeMeta]:
    """
    return the meta node if any
    @top: top node, only its direct children are inspected
    returns the first child of type meta or None
    """
    for child in top.children:
        if child.type == nodes.TYPE_META:
            return cast(NodeMeta, child)
    return None
def clean_not_flagged(self, top: NodeTop) -> int:
    """
    remove any node not flagged and clean flags
    @top: node the traversal starts from
    only directory and file nodes are considered
    returns the number of removed nodes
    """
    removed = 0
    for node in anytree.PreOrderIter(top):
        typcast_node(node)
        considered = node.type in (nodes.TYPE_DIR, nodes.TYPE_FILE)
        if considered and self._clean(node):
            removed += 1
    return removed
def _clean(self, node: NodeAny) -> bool:
    """
    remove node if not flagged
    a flagged node is kept and only gets its flag removed
    returns True if the node was detached from its parent
    """
    if node.flagged():
        node.unflag()
        return False
    node.parent = None
    return True
###############################################################
# printing
###############################################################
def _node_to_csv(self, node: NodeAny,
                 sep: str = ',',
                 raw: bool = False) -> None:
    """
    print a node to csv
    @node: the node to consider
    @sep: CSV separator character
    @raw: print raw size rather than human readable
    nothing is printed for an empty node or for the top node
    every field is wrapped in double quotes, the fields are:
    name, type, path, size, indexed_at, maccess, md5, nbfiles,
    free_space, total_space, meta
    """
    if not node or node.type == nodes.TYPE_TOP:
        return

    if node.type == nodes.TYPE_STORAGE:
        # a storage has no path, maccess nor md5
        fields = [
            node.name,  # name
            node.type,  # type
            '',  # fake full path
            size_to_str(self.rec_size(node, store=False), raw=raw),  # size
            epoch_to_str(node.ts),  # indexed_at
            '',  # fake maccess
            '',  # fake md5
            str(len(node.children)),  # nbfiles
            size_to_str(node.free, raw=raw),  # free_space
            size_to_str(node.total, raw=raw),  # total_space
            node.attr,  # meta
        ]
    else:
        # any other node, double quotes are doubled in name and path
        fields = [node.name.replace('"', '""'), node.type]
        parents = self._get_parents(node)
        storage = self._get_storage(node)
        fullpath = os.path.join(storage.name, parents)
        fields.append(fullpath.replace('"', '""'))  # full path
        fields.append(size_to_str(node.nodesize, raw=raw))  # size
        fields.append(epoch_to_str(storage.ts))  # indexed_at
        has_maccess = self._has_attr(node, 'maccess')
        fields.append(epoch_to_str(node.maccess) if has_maccess else '')
        fields.append(node.md5 if self._has_attr(node, 'md5') else '')
        is_dir = node.type == nodes.TYPE_DIR
        fields.append(str(len(node.children)) if is_dir else '')  # nbfiles
        # fake free_space, total_space and meta
        fields.extend(['', '', ''])

    line = sep.join('"' + field + '"' for field in fields)
    if line:
        Logger.stdout_nocolor(line)
def node_has_subs(self, node: Any) -> bool:
    """
    node may have children
    top, directory, storage and archived nodes may have some,
    an empty node, a file node or any other type may not
    """
    if not node:
        return False
    with_subs = (
        nodes.TYPE_TOP,
        nodes.TYPE_DIR,
        nodes.TYPE_STORAGE,
        nodes.TYPE_ARCHIVED,
    )
    return node.type in with_subs
def _print_node_native(self, node: NodeAny,
                       pre: str = '',
                       withpath: bool = False,
                       withnbchildren: bool = False,
                       withstorage: bool = False,
                       raw: bool = False) -> None:
    """
    print a node
    @node: the node to print
    @pre: string to print before node
    @withpath: print the node path
    @withnbchildren: print the node nb children
    @withstorage: print the node storage it belongs to
    @raw: print raw size rather than human readable
    archived nodes are only printed when archive handling is on,
    an error is logged for a node of unknown type
    """
    def shown_name(storage: Any) -> Any:
        """the node name, with storage and parents when @withpath"""
        if not withpath:
            return node.name
        full = os.sep.join([storage.name,
                            self._get_parents(node.parent),
                            node.name])
        return full.lstrip(os.sep)

    kind = node.type

    if kind == nodes.TYPE_TOP:
        # top node
        node.__class__ = NodeTop
        Logger.stdout_nocolor(f'{pre}{node.name}')
        return

    if kind == nodes.TYPE_FILE:
        # node of type file
        node.__class__ = NodeFile
        storage = self._get_storage(node)
        name = shown_name(storage)
        md5part = f', md5:{node.md5}' if node.md5 else ''
        size = size_to_str(node.nodesize, raw=raw)
        compl = f'size:{size}{md5part}'
        if withstorage:
            content = Logger.get_bold_text(storage.name)
            compl += f', storage:{content}'
        NodePrinter.print_file_native(pre, name, compl)
        return

    if kind == nodes.TYPE_DIR:
        # node of type directory
        node.__class__ = NodeDir
        storage = self._get_storage(node)
        name = shown_name(storage)
        nbchildren = len(node.children) if withnbchildren else 0
        attr: List[Tuple[str, str]] = []
        if node.nodesize:
            attr.append(('totsize', size_to_str(node.nodesize, raw=raw)))
        if withstorage:
            attr.append(('storage', Logger.get_bold_text(storage.name)))
        NodePrinter.print_dir_native(pre,
                                     name,
                                     nbchildren=nbchildren,
                                     attr=attr)
        return

    if kind == nodes.TYPE_STORAGE:
        # node of type storage
        node.__class__ = NodeStorage
        sztotal = size_to_str(node.total, raw=raw)
        szused = size_to_str(node.total - node.free, raw=raw)
        nbchildren = len(node.children)
        # share of free space, 0 when the total is unknown
        pcent = node.free * 100 / node.total if node.total > 0 else 0
        freepercent = f'{pcent:.1f}%'
        # get the date
        timestamp = ''
        if self._has_attr(node, 'ts'):
            timestamp = 'date:' + epoch_to_str(node.ts)
        # the children size
        sizestr = size_to_str(self.rec_size(node, store=False), raw=raw)
        # format the output
        argsstring = ' | '.join([
            f'nbfiles:{nbchildren}',
            f'totsize:{sizestr}',
            f'free:{freepercent}',
            f'du:{szused}/{sztotal}',
            timestamp,
        ])
        NodePrinter.print_storage_native(pre,
                                         node.name,
                                         argsstring,
                                         node.attr)
        return

    if kind == nodes.TYPE_ARCHIVED:
        # archive node
        node.__class__ = NodeArchived
        if self.arc:
            NodePrinter.print_archive_native(pre, node.name, node.archive)
        return

    Logger.err(f'bad node encountered: {node}')
def print_tree(self, node: NodeAny,
               fmt: str = 'native',
               raw: bool = False) -> None:
    """
    print the tree in different format
    @node: start node
    @fmt: output format, one of "native", "csv" or "csv-with-header",
          nothing is printed for any other value
    @raw: print the raw size rather than human readable
    """
    if fmt == 'native':
        # "tree" style
        tree = anytree.RenderTree(node, childiter=self._sort_tree)
        for prefix, _, current in tree:
            self._print_node_native(current, pre=prefix,
                                    withnbchildren=True, raw=raw)
        return

    if fmt == 'csv-with-header':
        # the header comes before the csv lines
        Logger.stdout_nocolor(self.CSV_HEADER)
    if fmt in ('csv', 'csv-with-header'):
        # csv output
        self._to_csv(node, raw=raw)
def _to_csv(self, node: NodeAny,
            raw: bool = False) -> None:
    """
    print the tree to csv
    @node: start node
    @raw: print the raw size rather than human readable
    """
    tree = anytree.RenderTree(node, childiter=self._sort_tree)
    for _prefix, _fill, current in tree:
        self._node_to_csv(current, raw=raw)
@staticmethod
def _fzf_prompt(strings: Any) -> Any:
    """
    prompt with fzf
    @strings: the entries to choose from
    returns what the fzf prompt returns, or None (after logging
    an error) when the pyfzf module is not installed
    """
    try:
        # imported here so that pyfzf stays an optional dependency
        from pyfzf.pyfzf import FzfPrompt  # type: ignore # pylint: disable=C0415 # noqa
        return FzfPrompt().prompt(strings)
    except ModuleNotFoundError:
        Logger.err('install pyfzf to use fzf')
        return None
def _to_fzf(self, node: NodeAny, fmt: str) -> None:
    """
    fzf prompt with list and print selected node(s)
    @node: node to start with
    @fmt: output format for selected nodes, its "fzf-" prefix is
          removed before the selected nodes are printed
    """
    rendered = anytree.RenderTree(node, childiter=self._sort_tree)
    the_nodes = {}
    # construct node names list
    for _, _, rend in rendered:
        if not rend:
            continue
        parents = self._get_parents(rend)
        storage = self._get_storage(rend)
        fullpath = os.path.join(storage.name, parents)
        the_nodes[fullpath] = rend
    # prompt with fzf, None is returned when pyfzf is not installed
    paths = self._fzf_prompt(the_nodes.keys()) or []
    # print the resulting tree
    subfmt = fmt.replace('fzf-', '')
    for path in paths:
        if not path or path not in the_nodes:
            continue
        self.print_tree(the_nodes[path], fmt=subfmt)
@staticmethod
def to_dot(top: NodeTop,
           path: str = 'tree.dot') -> str:
    """
    export to dot for graphing
    @top: node the export starts from
    @path: path of the dot file to create
    returns a "dot" command line turning the dot file into a png
    """
    exporter = anytree.exporter.DotExporter(top)
    exporter.to_dotfile(path)
    Logger.info(f'dot file created under \"{path}\"')
    return f'dot {path} -T png -o /tmp/tree.png'
###############################################################
# searching
###############################################################
def find_name(self, top: NodeTop,
              key: str,
              script: bool = False,
              only_dir: bool = False,
              startnode: Optional[NodeAny] = None,
              fmt: str = 'native',
              raw: bool = False) -> List[NodeAny]:
    """
    find files based on their names
    @top: top node
    @key: term to search for
    @script: output script
    @only_dir: only search for directories
    @startnode: where to start the search, handed to get_node()
                to resolve the start node (defaults to @top)
    @fmt: output format
    @raw: raw size output
    returns the found nodes
    """
    self._debug(f'searching for \"{key}\"')

    # search for nodes based on path
    start: Optional[NodeAny] = top
    if startnode:
        start = self.get_node(top, startnode)
        if start is None:
            # nowhere to search from
            return []
    filterfunc = self._callback_find_name(key, only_dir)
    found = anytree.findall(start, filter_=filterfunc)
    self._debug(f'found {len(found)} node(s)')

    # compile found nodes, keyed by their path below the storage
    # NOTE(review): the same path on two storages shares one key,
    # only the last node is kept - confirm this is intended
    paths = {}
    for item in found:
        item.name = fix_badchars(item.name)
        paths[self._get_parents(item)] = item

    if fmt.startswith('fzf'):
        # handle fzf mode, the prompt returns None
        # when pyfzf is not installed
        selected = self._fzf_prompt(paths.keys()) or []
        newpaths = {}
        subfmt = fmt.replace('fzf-', '')
        for item in selected:
            if item not in paths:
                continue
            newpaths[item] = paths[item]
            self.print_tree(newpaths[item], fmt=subfmt)
        paths = newpaths
    elif fmt == 'native':
        for item in paths.values():
            self._print_node_native(item, withpath=True,
                                    withnbchildren=True,
                                    withstorage=True,
                                    raw=raw)
    elif fmt.startswith('csv'):
        if fmt == 'csv-with-header':
            Logger.stdout_nocolor(self.CSV_HEADER)
        for item in paths.values():
            self._node_to_csv(item, raw=raw)

    # execute script if any
    if script:
        tmp = ['${source}/' + x for x in paths]
        tmpstr = ' '.join(tmp)
        cmd = f'op=file; source=/media/mnt; $op {tmpstr}'
        Logger.info(cmd)

    return list(paths.values())
def _callback_find_name(self, term: str, only_dir: bool) -> Any:
    """
    callback for finding files
    @term: term to look for in the node name (case insensitive),
           every node matches when it is empty
    @only_dir: only match directory nodes
    returns a function telling if a node matches,
    storage, top and meta nodes never match
    """
    def find_name(node: NodeAny) -> bool:
        if node.type in (nodes.TYPE_STORAGE,
                         nodes.TYPE_TOP,
                         nodes.TYPE_META):
            # ignore storage, top and meta nodes
            return False
        if only_dir and node.type != nodes.TYPE_DIR:
            # ignore non directory
            return False

        # filter
        if not term:
            return True
        return term.lower() in node.name.lower()
    return find_name
###############################################################
# ls
###############################################################
def list(self, top: NodeTop,
         path: str,
         rec: bool = False,
         fmt: str = 'native',
         raw: bool = False) -> List[NodeAny]:
    """
    list nodes for "ls"
    @top: top node
    @path: path to search for, may contain "*" and "?" wildcards
    @rec: recursive walk
    @fmt: output format
    @raw: print raw size
    returns the found nodes, an empty list when nothing is found
    or when the path cannot be resolved
    """
    self._debug(f'walking path: \"{path}\" from {top}')

    resolv = anytree.resolver.Resolver('name')
    found = []
    try:
        if '*' in path or '?' in path:
            # we need to handle glob
            found = resolv.glob(top, path)
        else:
            # we have a canonical path
            found = resolv.get(top, path)
            if found and self.node_has_subs(found):
                # let's find its children as well
                self._debug(f'listing children of \"{path}\"')
                modpath = os.path.join(path, '*')
                found = resolv.glob(top, modpath)
            else:
                found = [found]

        if len(found) < 1:
            # nothing found
            self._debug('nothing found')
            return []

        if rec:
            # print the entire tree
            self.print_tree(found[0].parent, fmt=fmt, raw=raw)
            return found

        # sort found nodes
        found = sorted(found, key=self._sort, reverse=self.sortsize)

        # print the parent
        if fmt == 'native':
            self._print_node_native(found[0].parent,
                                    withpath=False,
                                    withnbchildren=True,
                                    raw=raw)
        elif fmt.startswith('csv'):
            self._node_to_csv(found[0].parent, raw=raw)
        elif fmt.startswith('fzf'):
            pass

        # print all found nodes
        if fmt == 'csv-with-header':
            Logger.stdout_nocolor(self.CSV_HEADER)
        for item in found:
            if fmt == 'native':
                self._print_node_native(item, withpath=False,
                                        pre='- ',
                                        withnbchildren=True,
                                        raw=raw)
            elif fmt.startswith('csv'):
                self._node_to_csv(item, raw=raw)
            elif fmt.startswith('fzf'):
                self._to_fzf(item, fmt)

    except anytree.resolver.ChildResolverError:
        # the path does not exist in the tree,
        # whatever was found so far is returned
        pass
    return found
###############################################################
# tree creation
###############################################################
def _add_entry(self, name: str,
               top: NodeTop,
               resolv: Any) -> None:
    """
    add an entry to the tree
    @name: path of the entry inside the archive
    @top: node the entry is added below
    @resolv: resolver used to look up the parent of the entry
    an entry whose parent cannot be resolved is added right below @top
    """
    parts = name.rstrip(os.sep).split(os.sep)
    if len(parts) == 1:
        # no parent, goes right below the top
        self.new_archive_node(name, top, top.name)
        return
    *parents, nodename = parts
    try:
        holder = resolv.get(top, os.sep.join(parents))
        self.new_archive_node(nodename, holder, top.name)
    except anytree.resolver.ChildResolverError:
        self.new_archive_node(nodename, top, top.name)
def list_to_tree(self, parent: NodeAny, names: List[str]) -> None:
    """
    convert list of files to a tree
    @parent: node the entries are added below
    @names: paths of the entries, nothing is done if empty
    """
    if not names:
        return
    resolv = anytree.resolver.Resolver('name')
    for entry in names:
        self._add_entry(entry.rstrip(os.sep), parent, resolv)
###############################################################
# diverse
###############################################################
def _sort_tree(self,
               items: List[NodeAny]) -> List[NodeAny]:
    """
    sorting a list of items
    @items: the nodes to sort, left untouched
    returns a new list, in reverse order when sorting by size
    """
    ordered = [*items]
    ordered.sort(key=self._sort, reverse=self.sortsize)
    return ordered
def _sort(self, lst: NodeAny) -> Any:
    """
    return the sort key of a node
    @lst: the node to get the key for
    the key is based on the size when sorting by size is on,
    on the type and name otherwise
    """
    keyfunc = self._sort_size if self.sortsize else self._sort_fs
    return keyfunc(lst)
@staticmethod
def _sort_fs(node: NodeAny) -> Tuple[str, str]:
    """
    sorting nodes by type first then by name
    the name is compared lowercased and without its leading dots
    """
    kind = node.type
    name = node.name.lstrip('.').lower()
    return (kind, name)
@staticmethod
def _sort_size(node: NodeAny) -> float:
    """
    sorting nodes by size
    returns the size of the node as a float, 0 when the node
    has no size attribute or an empty size
    """
    try:
        size = node.nodesize
    except AttributeError:
        return 0
    return float(size) if size else 0
def _get_storage(self, node: NodeAny) -> NodeStorage:
    """
    find the storage a node belongs to
    a storage node is returned as is, for any other node the second
    of its ancestors is returned (presumably the storage right below
    the top node - this relies on the layout of the tree)
    """
    if node.type != nodes.TYPE_STORAGE:
        return cast(NodeStorage, node.ancestors[1])
    return node
@staticmethod
def _has_attr(node: NodeAny, attr: str) -> bool:
    """
    return True if node has attr as attribute
    only the attributes stored on the node itself are considered
    """
    return attr in vars(node)
def _get_parents(self, node: NodeAny) -> str:
    """
    get all parents recursively
    returns the names from below the storage (or top) node down to
    and including @node, joined with the path separator
    an empty string is returned for a storage or a top node
    """
    if node.type in (nodes.TYPE_STORAGE, nodes.TYPE_TOP):
        return ''
    upper = self._get_parents(node.parent)
    if not upper:
        return str(node.name)
    return os.sep.join([upper, node.name])
@staticmethod
def _get_hash(path: str) -> str:
    """
    return md5 hash of node
    @path: path of the file to hash
    returns an empty string, after logging the error,
    when the checksum cannot be computed
    """
    try:
        digest = md5sum(path)
    except CatcliException as exc:
        Logger.err(str(exc))
        digest = ''
    return digest
def _debug(self, string: str) -> None:
    """
    print debug
    @string: the message, only logged when debug mode is on
    """
    if self.debug:
        Logger.debug(string)