add custom node type

pull/30/head
deadc0de6 1 year ago
parent 4a9e565e74
commit 50eb5cf9fd

@ -7,11 +7,12 @@ Class that represents the catcli catalog
import os
import pickle
import anytree # type: ignore
from typing import Optional, Union, Any, cast
from anytree.exporter import JsonExporter # type: ignore
from anytree.importer import JsonImporter # type: ignore
# local imports
from catcli.cnode import Node
from catcli.utils import ask
from catcli.logger import Logger
@ -32,10 +33,10 @@ class Catalog:
self.path = path
self.debug = debug
self.force = force
self.metanode = None
self.metanode: Optional[Node] = None
self.pickle = usepickle
def set_metanode(self, metanode: anytree.AnyNode) -> None:
def set_metanode(self, metanode: Node) -> None:
"""remove the metanode until tree is re-written"""
self.metanode = metanode
if self.metanode:
@ -49,7 +50,7 @@ class Catalog:
return True
return False
def restore(self) -> anytree.AnyNode:
def restore(self) -> Optional[Node]:
"""restore the catalog"""
if not self.path:
return None
@ -61,7 +62,7 @@ class Catalog:
content = file.read()
return self._restore_json(content)
def save(self, node: anytree.AnyNode) -> bool:
def save(self, node: Node) -> bool:
"""save the catalog"""
if not self.path:
Logger.err('Path not defined')
@ -87,14 +88,14 @@ class Catalog:
return
Logger.debug(text)
def _save_pickle(self, node: anytree.AnyNode) -> bool:
def _save_pickle(self, node: Node) -> bool:
"""pickle the catalog"""
with open(self.path, 'wb') as file:
pickle.dump(node, file)
self._debug(f'Catalog saved to pickle \"{self.path}\"')
return True
def _restore_pickle(self) -> anytree.AnyNode:
def _restore_pickle(self) -> Union[Node, Any]:
"""restore the pickled tree"""
with open(self.path, 'rb') as file:
root = pickle.load(file)
@ -102,7 +103,7 @@ class Catalog:
self._debug(msg)
return root
def _save_json(self, node: anytree.AnyNode) -> bool:
def _save_json(self, node: Node) -> bool:
"""export the catalog in json"""
exp = JsonExporter(indent=2, sort_keys=True)
with open(self.path, 'w', encoding='UTF-8') as file:
@ -110,9 +111,9 @@ class Catalog:
self._debug(f'Catalog saved to json \"{self.path}\"')
return True
def _restore_json(self, string: str) -> anytree.AnyNode:
def _restore_json(self, string: str) -> Node:
"""restore the tree from json"""
imp = JsonImporter()
root = imp.import_(string)
self._debug(f'Catalog imported from json \"{self.path}\"')
return root
return cast(Node, root)

@ -12,19 +12,20 @@ import sys
import os
import datetime
from typing import Dict, Any, List
import anytree # type: ignore
from docopt import docopt
# local imports
from .version import __version__ as VERSION
from .logger import Logger
from .colors import Colors
from .catalog import Catalog
from .walker import Walker
from .noder import Noder
from .utils import ask, edit
from .fuser import Fuser
from .exceptions import BadFormatException, CatcliException
from catcli import cnode
from catcli.version import __version__ as VERSION
from catcli.logger import Logger
from catcli.colors import Colors
from catcli.catalog import Catalog
from catcli.walker import Walker
from catcli.cnode import Node
from catcli.noder import Noder
from catcli.utils import ask, edit
from catcli.fuser import Fuser
from catcli.exceptions import BadFormatException, CatcliException
NAME = 'catcli'
CUR = os.path.dirname(os.path.abspath(__file__))
@ -81,7 +82,7 @@ Options:
def cmd_mount(args: Dict[str, Any],
top: anytree.AnyNode,
top: Node,
noder: Noder) -> None:
"""mount action"""
mountpoint = args['<mountpoint>']
@ -93,7 +94,7 @@ def cmd_mount(args: Dict[str, Any],
def cmd_index(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: anytree.AnyNode) -> None:
top: Node) -> None:
"""index action"""
path = args['<path>']
name = args['<name>']
@ -116,8 +117,8 @@ def cmd_index(args: Dict[str, Any],
start = datetime.datetime.now()
walker = Walker(noder, usehash=usehash, debug=debug)
attr = noder.format_storage_attr(args['--meta'])
root = noder.new_storage_node(name, path, parent=top, attr=attr)
attr = noder.attrs_to_string(args['--meta'])
root = noder.new_storage_node(name, path, parent=top, attrs=attr)
_, cnt = walker.index(path, root, name)
if subsize:
noder.rec_size(root, store=True)
@ -131,7 +132,7 @@ def cmd_index(args: Dict[str, Any],
def cmd_update(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: anytree.AnyNode) -> None:
top: Node) -> None:
"""update action"""
path = args['<path>']
name = args['<name>']
@ -142,7 +143,7 @@ def cmd_update(args: Dict[str, Any],
if not os.path.exists(path):
Logger.err(f'\"{path}\" does not exist')
return
root = noder.get_storage_node(top, name, path=path)
root = noder.get_storage_node(top, name, newpath=path)
if not root:
Logger.err(f'storage named \"{name}\" does not exist')
return
@ -161,7 +162,7 @@ def cmd_update(args: Dict[str, Any],
def cmd_ls(args: Dict[str, Any],
noder: Noder,
top: anytree.AnyNode) -> List[anytree.AnyNode]:
top: Node) -> List[Node]:
"""ls action"""
path = args['<path>']
if not path:
@ -169,7 +170,7 @@ def cmd_ls(args: Dict[str, Any],
if not path.startswith(SEPARATOR):
path = SEPARATOR + path
# prepend with top node path
pre = f'{SEPARATOR}{noder.NAME_TOP}'
pre = f'{SEPARATOR}{cnode.NAME_TOP}'
if not path.startswith(pre):
path = pre + path
# ensure ends with a separator
@ -196,7 +197,7 @@ def cmd_ls(args: Dict[str, Any],
def cmd_rm(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: anytree.AnyNode) -> anytree.AnyNode:
top: Node) -> Node:
"""rm action"""
name = args['<storage>']
node = noder.get_storage_node(top, name)
@ -211,7 +212,7 @@ def cmd_rm(args: Dict[str, Any],
def cmd_find(args: Dict[str, Any],
noder: Noder,
top: anytree.AnyNode) -> List[anytree.AnyNode]:
top: Node) -> List[Node]:
"""find action"""
fromtree = args['--parent']
directory = args['--directory']
@ -231,7 +232,7 @@ def cmd_find(args: Dict[str, Any],
def cmd_graph(args: Dict[str, Any],
noder: Noder,
top: anytree.AnyNode) -> None:
top: Node) -> None:
"""graph action"""
path = args['<path>']
if not path:
@ -242,7 +243,7 @@ def cmd_graph(args: Dict[str, Any],
def cmd_rename(args: Dict[str, Any],
catalog: Catalog,
top: anytree.AnyNode) -> None:
top: Node) -> None:
"""rename action"""
storage = args['<storage>']
new = args['<name>']
@ -260,7 +261,7 @@ def cmd_rename(args: Dict[str, Any],
def cmd_edit(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: anytree.AnyNode) -> None:
top: Node) -> None:
"""edit action"""
storage = args['<storage>']
storages = list(x.name for x in top.children)
@ -270,7 +271,7 @@ def cmd_edit(args: Dict[str, Any],
if not attr:
attr = ''
new = edit(attr)
node.attr = noder.format_storage_attr(new)
node.attr = noder.attrs_to_string(new)
if catalog.save(top):
Logger.info(f'Storage \"{storage}\" edited')
else:

@ -0,0 +1,68 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2023, deadc0de6
Class that represents a node in the catalog tree
"""
# pylint: disable=W0622
from anytree import NodeMixin # type: ignore
TYPE_TOP = 'top'
TYPE_FILE = 'file'
TYPE_DIR = 'dir'
TYPE_ARC = 'arc'
TYPE_STORAGE = 'storage'
TYPE_META = 'meta'
NAME_TOP = 'top'
NAME_META = 'meta'
class Node(NodeMixin): # type: ignore
"""a node in the catalog"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
type: str,
size: float = 0,
relpath: str = '',
md5: str = '',
maccess: float = 0,
free: int = 0,
total: int = 0,
indexed_dt: int = 0,
attr: str = '',
archive: str = '',
parent=None,
children=None):
"""build a node"""
super().__init__()
self.name = name
self.type = type
self.size = size
self.relpath = relpath
self.md5 = md5
self.maccess = maccess
self.free = free
self.total = total
self.indexed_dt = indexed_dt
self.attr = attr
self.archive = archive
self.parent = parent
if children:
self.children = children
self._flagged = False
def flagged(self) -> bool:
"""is flagged"""
return self._flagged
def flag(self) -> None:
"""flag a node"""
self._flagged = True
def unflag(self) -> None:
"""unflag node"""
self._flagged = False

@ -9,10 +9,13 @@ import os
import logging
from time import time
from stat import S_IFDIR, S_IFREG
from typing import List, Dict, Any
import anytree # type: ignore
from typing import List, Dict, Any, Optional
import fuse # type: ignore
from .noder import Noder
# local imports
from catcli.noder import Noder
from catcli import cnode
from catcli.cnode import Node
# build custom logger to log in /tmp
@ -31,7 +34,7 @@ class Fuser:
"""fuse filesystem mounter"""
def __init__(self, mountpoint: str,
top: anytree.AnyNode,
top: Node,
noder: Noder,
debug: bool = False):
"""fuse filesystem"""
@ -47,15 +50,15 @@ class Fuser:
class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
"""in-memory filesystem for catcli catalog"""
def __init__(self, top: anytree.AnyNode,
def __init__(self, top: Node,
noder: Noder):
"""init fuse filesystem"""
self.top = top
self.noder = noder
def _get_entry(self, path: str) -> anytree.AnyNode:
def _get_entry(self, path: str) -> Optional[Node]:
"""return the node pointed by path"""
pre = f'{SEPARATOR}{self.noder.NAME_TOP}'
pre = f'{SEPARATOR}{cnode.NAME_TOP}'
if not path.startswith(pre):
path = pre + path
found = self.noder.list(self.top, path,
@ -66,9 +69,9 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
return found[0]
return None
def _get_entries(self, path: str) -> List[anytree.AnyNode]:
def _get_entries(self, path: str) -> List[Node]:
"""return nodes pointed by path"""
pre = f'{SEPARATOR}{self.noder.NAME_TOP}'
pre = f'{SEPARATOR}{cnode.NAME_TOP}'
if not path.startswith(pre):
path = pre + path
if not path.endswith(SEPARATOR):
@ -88,17 +91,17 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
curt = time()
mode: Any = S_IFREG
if entry.type == Noder.TYPE_ARC:
if entry.type == cnode.TYPE_ARC:
mode = S_IFREG
elif entry.type == Noder.TYPE_DIR:
elif entry.type == cnode.TYPE_DIR:
mode = S_IFDIR
elif entry.type == Noder.TYPE_FILE:
elif entry.type == cnode.TYPE_FILE:
mode = S_IFREG
elif entry.type == Noder.TYPE_STORAGE:
elif entry.type == cnode.TYPE_STORAGE:
mode = S_IFDIR
elif entry.type == Noder.TYPE_META:
elif entry.type == cnode.TYPE_META:
mode = S_IFREG
elif entry.type == Noder.TYPE_TOP:
elif entry.type == cnode.TYPE_TOP:
mode = S_IFREG
return {
'st_mode': (mode),

@ -8,11 +8,13 @@ Class that process nodes in the catalog tree
import os
import shutil
import time
from typing import List, Union, Tuple, Any, Optional, Dict
from typing import List, Union, Tuple, Any, Optional, Dict, cast
import anytree # type: ignore
from pyfzf.pyfzf import FzfPrompt # type: ignore
# local imports
from catcli import cnode
from catcli.cnode import Node
from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars
from catcli.logger import Logger
from catcli.nodeprinter import NodePrinter
@ -31,16 +33,6 @@ class Noder:
* "file" node representing a file
"""
NAME_TOP = 'top'
NAME_META = 'meta'
TYPE_TOP = 'top'
TYPE_FILE = 'file'
TYPE_DIR = 'dir'
TYPE_ARC = 'arc'
TYPE_STORAGE = 'storage'
TYPE_META = 'meta'
CSV_HEADER = ('name,type,path,size,indexed_at,'
'maccess,md5,nbfiles,free_space,'
'total_space,meta')
@ -61,46 +53,49 @@ class Noder:
self.decomp = Decomp()
@staticmethod
def get_storage_names(top: anytree.AnyNode) -> List[str]:
def get_storage_names(top: Node) -> List[str]:
"""return a list of all storage names"""
return [x.name for x in list(top.children)]
def get_storage_node(self, top: anytree.AnyNode,
name: str, path: str = '') -> anytree.AnyNode:
def get_storage_node(self, top: Node,
name: str,
newpath: str = '') -> Node:
"""
return the storage node if any
if path is submitted, it will update the media info
"""
found = None
for node in top.children:
if node.type != self.TYPE_STORAGE:
if node.type != cnode.TYPE_STORAGE:
continue
if node.name == name:
found = node
break
if found and path and os.path.exists(path):
found.free = shutil.disk_usage(path).free
found.total = shutil.disk_usage(path).total
if found and newpath and os.path.exists(newpath):
found.free = shutil.disk_usage(newpath).free
found.total = shutil.disk_usage(newpath).total
found.ts = int(time.time())
return found
return cast(Node, found)
@staticmethod
def get_node(top: str, path: str,
quiet: bool = False) -> anytree.AnyNode:
def get_node(top: Node,
path: str,
quiet: bool = False) -> Optional[Node]:
"""get the node by internal tree path"""
resolv = anytree.resolver.Resolver('name')
try:
bpath = os.path.basename(path)
return resolv.get(top, bpath)
the_node = resolv.get(top, bpath)
return cast(Node, the_node)
except anytree.resolver.ChildResolverError:
if not quiet:
Logger.err(f'No node at path \"{bpath}\"')
return None
def get_node_if_changed(self,
top: anytree.AnyNode,
top: Node,
path: str,
treepath: str) -> Tuple[anytree.AnyNode, bool]:
treepath: str) -> Tuple[Optional[Node], bool]:
"""
return the node (if any) and if it has changed
@top: top node (storage)
@ -136,25 +131,25 @@ class Noder:
self._debug(f'\tchange: no change for \"{path}\"')
return node, False
def rec_size(self, node: anytree.AnyNode,
def rec_size(self, node: Node,
store: bool = True) -> float:
"""
recursively traverse tree and return size
@store: store the size in the node
"""
if node.type == self.TYPE_FILE:
if node.type == cnode.TYPE_FILE:
self._debug(f'getting node size for \"{node.name}\"')
return float(node.size)
msg = f'getting node size recursively for \"{node.name}\"'
self._debug(msg)
size: float = 0
for i in node.children:
if node.type == self.TYPE_DIR:
if node.type == cnode.TYPE_DIR:
size = self.rec_size(i, store=store)
if store:
i.size = size
size += size
if node.type == self.TYPE_STORAGE:
if node.type == cnode.TYPE_STORAGE:
size = self.rec_size(i, store=store)
if store:
i.size = size
@ -169,28 +164,34 @@ class Noder:
# public helpers
###############################################################
@staticmethod
def format_storage_attr(attr: Union[str, List[str]]) -> str:
def attrs_to_string(attr: Union[List[str], Dict[str, str], str]) -> str:
"""format the storage attr for saving"""
if not attr:
return ''
if isinstance(attr, list):
return ', '.join(attr)
if isinstance(attr, dict):
ret = []
for key, val in attr.items():
ret.append(f'{key}={val}')
return ', '.join(ret)
attr = attr.rstrip()
return attr
def set_hashing(self, val: bool) -> None:
def do_hashing(self, val: bool) -> None:
"""hash files when indexing"""
self.hash = val
###############################################################
# node creation
###############################################################
def new_top_node(self) -> anytree.AnyNode:
def new_top_node(self) -> Node:
"""create a new top node"""
return anytree.AnyNode(name=self.NAME_TOP, type=self.TYPE_TOP)
return Node(cnode.NAME_TOP,
cnode.TYPE_TOP)
def new_file_node(self, name: str, path: str,
parent: str, storagepath: str) -> anytree.AnyNode:
parent: Node, storagepath: str) -> Optional[Node]:
"""create a new node representing a file"""
if not os.path.exists(path):
Logger.err(f'File \"{path}\" does not exist')
@ -207,7 +208,7 @@ class Noder:
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
node = self._new_generic_node(name, self.TYPE_FILE,
node = self._new_generic_node(name, cnode.TYPE_FILE,
relpath, parent,
size=stat.st_size,
md5=md5,
@ -223,98 +224,107 @@ class Noder:
return node
def new_dir_node(self, name: str, path: str,
parent: str, storagepath: str) -> anytree.AnyNode:
parent: Node, storagepath: str) -> Node:
"""create a new node representing a directory"""
path = os.path.abspath(path)
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
return self._new_generic_node(name, self.TYPE_DIR, relpath,
return self._new_generic_node(name, cnode.TYPE_DIR, relpath,
parent, maccess=maccess)
def new_storage_node(self, name: str,
path: str,
parent: str,
attr: Optional[str] = None) -> anytree.AnyNode:
attrs: str = '') -> Node:
"""create a new node representing a storage"""
path = os.path.abspath(path)
free = shutil.disk_usage(path).free
total = shutil.disk_usage(path).total
epoch = int(time.time())
return anytree.AnyNode(name=name, type=self.TYPE_STORAGE, free=free,
total=total, parent=parent, attr=attr, ts=epoch)
return Node(name=name,
type=cnode.TYPE_STORAGE,
free=free,
total=total,
parent=parent,
attr=attrs,
indexed_dt=epoch)
def new_archive_node(self, name: str, path: str,
parent: str, archive: str) -> anytree.AnyNode:
parent: str, archive: str) -> Node:
"""create a new node for archive data"""
return anytree.AnyNode(name=name, type=self.TYPE_ARC, relpath=path,
parent=parent, size=0, md5=None,
archive=archive)
return Node(name=name, type=cnode.TYPE_ARC, relpath=path,
parent=parent, size=0, md5='',
archive=archive)
@staticmethod
def _new_generic_node(name: str, nodetype: str,
relpath: str, parent: str,
def _new_generic_node(name: str,
nodetype: str,
relpath: str,
parent: Node,
size: float = 0,
md5: str = '',
maccess: float = 0) -> anytree.AnyNode:
maccess: float = 0) -> Node:
"""generic node creation"""
return anytree.AnyNode(name=name, type=nodetype, relpath=relpath,
parent=parent, size=size,
md5=md5, maccess=maccess)
return Node(name,
nodetype,
size=size,
relpath=relpath,
md5=md5,
maccess=maccess,
parent=parent)
###############################################################
# node management
###############################################################
def update_metanode(self, top: anytree.AnyNode) -> anytree.AnyNode:
def update_metanode(self, top: Node) -> Node:
"""create or update meta node information"""
meta = self._get_meta_node(top)
epoch = int(time.time())
if not meta:
attr: Dict[str, Any] = {}
attr['created'] = epoch
attr['created_version'] = VERSION
meta = anytree.AnyNode(name=self.NAME_META,
type=self.TYPE_META,
attr=attr)
meta.attr['access'] = epoch
meta.attr['access_version'] = VERSION
attrs: Dict[str, Any] = {}
attrs['created'] = epoch
attrs['created_version'] = VERSION
meta = Node(name=cnode.NAME_META,
type=cnode.TYPE_META,
attr=self.attrs_to_string(attrs))
if meta.attr:
meta.attr += ', '
meta.attr += f'access={epoch}'
meta.attr += ', '
meta.attr += f'access_version={VERSION}'
return meta
def _get_meta_node(self, top: anytree.AnyNode) -> anytree.AnyNode:
def _get_meta_node(self, top: Node) -> Optional[Node]:
"""return the meta node if any"""
try:
return next(filter(lambda x: x.type == self.TYPE_META,
top.children))
found = next(filter(lambda x: x.type == cnode.TYPE_META,
top.children))
return cast(Node, found)
except StopIteration:
return None
def clean_not_flagged(self, top: anytree.AnyNode) -> int:
def clean_not_flagged(self, top: Node) -> int:
"""remove any node not flagged and clean flags"""
cnt = 0
for node in anytree.PreOrderIter(top):
if node.type not in [self.TYPE_FILE, self.TYPE_DIR]:
if node.type not in [cnode.TYPE_FILE, cnode.TYPE_DIR]:
continue
if self._clean(node):
cnt += 1
return cnt
@staticmethod
def flag(node: anytree.AnyNode) -> None:
"""flag a node"""
node.flag = True
def _clean(self, node: anytree.AnyNode) -> bool:
def _clean(self, node: Node) -> bool:
"""remove node if not flagged"""
if not self._has_attr(node, 'flag') or \
not node.flag:
if not node.flagged():
node.parent = None
return True
del node.flag
node.unflag()
return False
###############################################################
# printing
###############################################################
def _node_to_csv(self, node: anytree.AnyNode,
def _node_to_csv(self, node: Node,
sep: str = ',',
raw: bool = False) -> None:
"""
@ -323,13 +333,13 @@ class Noder:
@sep: CSV separator character
@raw: print raw size rather than human readable
"""
if not node:
if not cnode:
return
if node.type == self.TYPE_TOP:
if node.type == node.TYPE_TOP:
return
out = []
if node.type == self.TYPE_STORAGE:
if node.type == node.TYPE_STORAGE:
# handle storage
out.append(node.name) # name
out.append(node.type) # type
@ -364,7 +374,7 @@ class Noder:
out.append(node.md5) # md5
else:
out.append('') # fake md5
if node.type == self.TYPE_DIR:
if node.type == cnode.TYPE_DIR:
out.append(str(len(node.children))) # nbfiles
else:
out.append('') # fake nbfiles
@ -376,7 +386,7 @@ class Noder:
if len(line) > 0:
Logger.stdout_nocolor(line)
def _print_node_native(self, node: anytree.AnyNode,
def _print_node_native(self, node: Node,
pre: str = '',
withpath: bool = False,
withdepth: bool = False,
@ -393,10 +403,10 @@ class Noder:
@recalcparent: get relpath from tree instead of relpath field
@raw: print raw size rather than human readable
"""
if node.type == self.TYPE_TOP:
if node.type == cnode.TYPE_TOP:
# top node
Logger.stdout_nocolor(f'{pre}{node.name}')
elif node.type == self.TYPE_FILE:
elif node.type == cnode.TYPE_FILE:
# node of type file
name = node.name
if withpath:
@ -416,7 +426,7 @@ class Noder:
content = Logger.get_bold_text(storage.name)
compl += f', storage:{content}'
NodePrinter.print_file_native(pre, name, compl)
elif node.type == self.TYPE_DIR:
elif node.type == cnode.TYPE_DIR:
# node of type directory
name = node.name
if withpath:
@ -436,7 +446,7 @@ class Noder:
if withstorage:
attr.append(('storage', Logger.get_bold_text(storage.name)))
NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr)
elif node.type == self.TYPE_STORAGE:
elif node.type == cnode.TYPE_STORAGE:
# node of type storage
sztotal = size_to_str(node.total, raw=raw)
szused = size_to_str(node.total - node.free, raw=raw)
@ -466,14 +476,14 @@ class Noder:
name,
argsstring,
node.attr)
elif node.type == self.TYPE_ARC:
elif node.type == cnode.TYPE_ARC:
# archive node
if self.arc:
NodePrinter.print_archive_native(pre, node.name, node.archive)
else:
Logger.err(f'bad node encountered: {node}')
def print_tree(self, node: anytree.AnyNode,
def print_tree(self, node: Node,
fmt: str = 'native',
raw: bool = False) -> None:
"""
@ -497,7 +507,7 @@ class Noder:
Logger.stdout_nocolor(self.CSV_HEADER)
self._to_csv(node, raw=raw)
def _to_csv(self, node: anytree.AnyNode,
def _to_csv(self, node: Node,
raw: bool = False) -> None:
"""print the tree to csv"""
rend = anytree.RenderTree(node, childiter=self._sort_tree)
@ -511,7 +521,7 @@ class Noder:
selected = fzf.prompt(strings)
return selected
def _to_fzf(self, node: anytree.AnyNode, fmt: str) -> None:
def _to_fzf(self, node: Node, fmt: str) -> None:
"""
fzf prompt with list and print selected node(s)
@node: node to start with
@ -540,7 +550,7 @@ class Noder:
self.print_tree(rend, fmt=subfmt)
@staticmethod
def to_dot(node: anytree.AnyNode,
def to_dot(node: Node,
path: str = 'tree.dot') -> str:
"""export to dot for graphing"""
anytree.exporter.DotExporter(node).to_dotfile(path)
@ -550,14 +560,14 @@ class Noder:
###############################################################
# searching
###############################################################
def find_name(self, top: anytree.AnyNode,
def find_name(self, top: Node,
key: str,
script: bool = False,
only_dir: bool = False,
startnode: anytree.AnyNode = None,
startnode: Optional[Node] = None,
parentfromtree: bool = False,
fmt: str = 'native',
raw: bool = False) -> List[anytree.AnyNode]:
raw: bool = False) -> List[Node]:
"""
find files based on their names
@top: top node
@ -573,7 +583,7 @@ class Noder:
self._debug(f'searching for \"{key}\"')
# search for nodes based on path
start = top
start: Optional[Node] = top
if startnode:
start = self.get_node(top, startnode)
filterfunc = self._callback_find_name(key, only_dir)
@ -626,17 +636,17 @@ class Noder:
def _callback_find_name(self, term: str, only_dir: bool) -> Any:
"""callback for finding files"""
def find_name(node: anytree.AnyNode) -> bool:
if node.type == self.TYPE_STORAGE:
def find_name(node: Node) -> bool:
if node.type == cnode.TYPE_STORAGE:
# ignore storage nodes
return False
if node.type == self.TYPE_TOP:
if node.type == cnode.TYPE_TOP:
# ignore top nodes
return False
if node.type == self.TYPE_META:
if node.type == cnode.TYPE_META:
# ignore meta nodes
return False
if only_dir and node.type != self.TYPE_DIR:
if only_dir and node.type != cnode.TYPE_DIR:
# ignore non directory
return False
@ -653,11 +663,11 @@ class Noder:
###############################################################
# ls
###############################################################
def list(self, top: anytree.AnyNode,
def list(self, top: Node,
path: str,
rec: bool = False,
fmt: str = 'native',
raw: bool = False) -> List[anytree.AnyNode]:
raw: bool = False) -> List[Node]:
"""
list nodes for "ls"
@top: top node
@ -719,7 +729,7 @@ class Noder:
# tree creation
###############################################################
def _add_entry(self, name: str,
top: anytree.AnyNode,
top: Node,
resolv: Any) -> None:
"""add an entry to the tree"""
entries = name.rstrip(os.sep).split(os.sep)
@ -734,7 +744,7 @@ class Noder:
except anytree.resolver.ChildResolverError:
self.new_archive_node(nodename, name, top, top.name)
def list_to_tree(self, parent: anytree.AnyNode, names: List[str]) -> None:
def list_to_tree(self, parent: Node, names: List[str]) -> None:
"""convert list of files to a tree"""
if not names:
return
@ -747,23 +757,23 @@ class Noder:
# diverse
###############################################################
def _sort_tree(self,
items: List[anytree.AnyNode]) -> List[anytree.AnyNode]:
items: List[Node]) -> List[Node]:
"""sorting a list of items"""
return sorted(items, key=self._sort, reverse=self.sortsize)
def _sort(self, lst: List[anytree.AnyNode]) -> Any:
def _sort(self, lst: Node) -> Any:
"""sort a list"""
if self.sortsize:
return self._sort_size(lst)
return self._sort_fs(lst)
@staticmethod
def _sort_fs(node: anytree.AnyNode) -> Tuple[str, str]:
def _sort_fs(node: Node) -> Tuple[str, str]:
"""sorting nodes dir first and alpha"""
return (node.type, node.name.lstrip('.').lower())
@staticmethod
def _sort_size(node: anytree.AnyNode) -> float:
def _sort_size(node: Node) -> float:
"""sorting nodes by size"""
try:
if not node.size:
@ -772,22 +782,22 @@ class Noder:
except AttributeError:
return 0
def _get_storage(self, node: anytree.AnyNode) -> anytree.AnyNode:
def _get_storage(self, node: Node) -> Node:
"""recursively traverse up to find storage"""
if node.type == self.TYPE_STORAGE:
if node.type == cnode.TYPE_STORAGE:
return node
return node.ancestors[1]
return cast(Node, node.ancestors[1])
@staticmethod
def _has_attr(node: anytree.AnyNode, attr: str) -> bool:
def _has_attr(node: Node, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in node.__dict__.keys()
def _get_parents(self, node: anytree.AnyNode) -> str:
def _get_parents(self, node: Node) -> str:
"""get all parents recursively"""
if node.type == self.TYPE_STORAGE:
if node.type == cnode.TYPE_STORAGE:
return ''
if node.type == self.TYPE_TOP:
if node.type == cnode.TYPE_TOP:
return ''
parent = self._get_parents(node.parent)
if parent:
@ -804,7 +814,7 @@ class Noder:
return ''
@staticmethod
def _sanitize(node: anytree.AnyNode) -> anytree.AnyNode:
def _sanitize(node: Node) -> Node:
"""sanitize node strings"""
node.name = fix_badchars(node.name)
node.relpath = fix_badchars(node.relpath)

@ -57,12 +57,12 @@ def size_to_str(size: float,
return f'{size:.1f}{sufix}'
def epoch_to_str(epoch: int) -> str:
def epoch_to_str(epoch: float) -> str:
"""convert epoch to string"""
if not epoch:
return ''
fmt = '%Y-%m-%d %H:%M:%S'
timestamp = datetime.datetime.fromtimestamp(float(epoch))
timestamp = datetime.datetime.fromtimestamp(epoch)
return timestamp.strftime(fmt)

@ -6,10 +6,10 @@ Catcli filesystem indexer
"""
import os
from typing import Tuple
import anytree # type: ignore
from typing import Tuple, Optional
# local imports
from catcli.cnode import Node
from catcli.noder import Noder
from catcli.logger import Logger
@ -31,12 +31,12 @@ class Walker:
"""
self.noder = noder
self.usehash = usehash
self.noder.set_hashing(self.usehash)
self.noder.do_hashing(self.usehash)
self.debug = debug
self.lpath = logpath
def index(self, path: str,
parent: str,
parent: Node,
name: str,
storagepath: str = '') -> Tuple[str, int]:
"""
@ -89,15 +89,15 @@ class Walker:
self._progress('')
return parent, cnt
def reindex(self, path: str, parent: str, top: str) -> int:
def reindex(self, path: str, parent: Node, top: Node) -> int:
"""reindex a directory and store in tree"""
cnt = self._reindex(path, parent, top)
cnt += self.noder.clean_not_flagged(parent)
return cnt
def _reindex(self, path: str,
parent: str,
top: anytree.AnyNode,
parent: Node,
top: Node,
storagepath: str = '') -> int:
"""
reindex a directory and store in tree
@ -115,13 +115,15 @@ class Walker:
reindex, node = self._need_reindex(parent, sub, treepath)
if not reindex:
self._debug(f'\tskip file {sub}')
self.noder.flag(node)
if node:
node.flag()
continue
self._log2file(f'update catalog for \"{sub}\"')
node = self.noder.new_file_node(os.path.basename(file), sub,
parent, storagepath)
self.noder.flag(node)
cnt += 1
if node:
node.flag()
cnt += 1
for adir in dirs:
self._debug(f'found dir \"{adir}\" under {path}')
base = os.path.basename(adir)
@ -133,40 +135,42 @@ class Walker:
dummy = self.noder.new_dir_node(base, sub,
parent, storagepath)
cnt += 1
self.noder.flag(dummy)
if dummy:
dummy.flag()
self._debug(f'reindexing deeper under {sub}')
nstoragepath = os.sep.join([storagepath, base])
if not storagepath:
nstoragepath = base
cnt2 = self._reindex(sub, dummy, top, nstoragepath)
cnt += cnt2
if dummy:
cnt2 = self._reindex(sub, dummy, top, nstoragepath)
cnt += cnt2
break
return cnt
def _need_reindex(self,
top: anytree.AnyNode,
top: Node,
path: str,
treepath: str) -> Tuple[bool, anytree.AnyNode]:
treepath: str) -> Tuple[bool, Optional[Node]]:
"""
test if node needs re-indexing
@top: top node (storage)
@path: abs path to file
@treepath: rel path from indexed directory
"""
cnode, changed = self.noder.get_node_if_changed(top, path, treepath)
if not cnode:
node, changed = self.noder.get_node_if_changed(top, path, treepath)
if not node:
self._debug(f'\t{path} does not exist')
return True, cnode
if cnode and not changed:
return True, node
if node and not changed:
# ignore this node
self._debug(f'\t{path} has not changed')
return False, cnode
if cnode and changed:
return False, node
if node and changed:
# remove this node and re-add
self._debug(f'\t{path} has changed')
self._debug(f'\tremoving node {cnode.name} for {path}')
cnode.parent = None
return True, cnode
self._debug(f'\tremoving node {node.name} for {path}')
node.parent = None
return True, node
def _debug(self, string: str) -> None:
"""print to debug"""

@ -46,7 +46,7 @@ class TestUpdate(unittest.TestCase):
d2f2 = create_rnd_file(dir2, 'dir2file2')
noder = Noder(debug=True)
noder.set_hashing(True)
noder.do_hashing(True)
top = noder.new_top_node()
catalog = Catalog(catalogpath, force=True, debug=False)

Loading…
Cancel
Save