pull/29/head
deadc0de6 2 years ago
parent cf3e465525
commit c0afb0feeb

@ -3,12 +3,12 @@ author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2017, deadc0de6
"""
# pylint: disable=C0415
import sys
__version__ = '0.8.7'
def main():
"""entry point"""
import catcli.catcli
if catcli.catcli.main():
sys.exit(0)

@ -11,54 +11,57 @@ from anytree.exporter import JsonExporter
from anytree.importer import JsonImporter
# local imports
import catcli.utils as utils
from catcli.utils import ask
from catcli.logger import Logger
class Catalog:
"""the catalog"""
def __init__(self, path, pickle=False, debug=False, force=False):
'''
def __init__(self, path, usepickle=False, debug=False, force=False):
"""
@path: catalog path
@pickle: use pickle
@usepickle: use pickle
@debug: debug mode
@force: force overwrite if exists
'''
"""
self.path = path
self.debug = debug
self.force = force
self.metanode = None
self.pickle = pickle
self.pickle = usepickle
def set_metanode(self, metanode):
'''remove the metanode until tree is re-written'''
"""remove the metanode until tree is re-written"""
self.metanode = metanode
self.metanode.parent = None
def restore(self):
'''restore the catalog'''
"""restore the catalog"""
if not self.path:
return None
if not os.path.exists(self.path):
return None
if self.pickle:
return self._restore_pickle()
return self._restore_json(open(self.path, 'r').read())
with open(self.path, 'r', encoding='UTF-8') as file:
content = file.read()
return self._restore_json(content)
def save(self, node):
'''save the catalog'''
"""save the catalog"""
if not self.path:
Logger.err('Path not defined')
return False
d = os.path.dirname(self.path)
if d and not os.path.exists(d):
os.makedirs(d)
directory = os.path.dirname(self.path)
if directory and not os.path.exists(directory):
os.makedirs(directory)
elif os.path.exists(self.path) and not self.force:
if not utils.ask(f'Update catalog \"{self.path}\"'):
if not ask(f'Update catalog \"{self.path}\"'):
Logger.info('Catalog not saved')
return False
if d and not os.path.exists(d):
Logger.err(f'Cannot write to \"{d}\"')
if directory and not os.path.exists(directory):
Logger.err(f'Cannot write to \"{directory}\"')
return False
if self.metanode:
self.metanode.parent = node
@ -72,28 +75,30 @@ class Catalog:
Logger.debug(text)
def _save_pickle(self, node):
'''pickle the catalog'''
pickle.dump(node, open(self.path, 'wb'))
"""pickle the catalog"""
with open(self.path, 'wb') as file:
pickle.dump(node, file)
self._debug(f'Catalog saved to pickle \"{self.path}\"')
return True
def _restore_pickle(self):
'''restore the pickled tree'''
root = pickle.load(open(self.path, 'rb'))
m = f'Catalog imported from pickle \"{self.path}\"'
self._debug(m)
"""restore the pickled tree"""
with open(self.path, 'rb') as file:
root = pickle.load(file)
msg = f'Catalog imported from pickle \"{self.path}\"'
self._debug(msg)
return root
def _save_json(self, node):
'''export the catalog in json'''
"""export the catalog in json"""
exp = JsonExporter(indent=2, sort_keys=True)
with open(self.path, 'w') as f:
exp.write(node, f)
with open(self.path, 'w', encoding='UTF-8') as file:
exp.write(node, file)
self._debug(f'Catalog saved to json \"{self.path}\"')
return True
def _restore_json(self, string):
'''restore the tree from json'''
"""restore the tree from json"""
imp = JsonImporter()
root = imp.import_(string)
self._debug(f'Catalog imported from json \"{self.path}\"')

@ -14,7 +14,7 @@ import datetime
from docopt import docopt
# local imports
from . import __version__ as VERSION
from .version import __version__ as VERSION
from .logger import Logger
from .catalog import Catalog
from .walker import Walker
@ -77,9 +77,10 @@ Options:
def cmd_index(args, noder, catalog, top):
"""index action"""
path = args['<path>']
name = args['<name>']
hash = args['--hash']
usehash = args['--hash']
debug = args['--verbose']
subsize = not args['--no-subsize']
if not os.path.exists(path):
@ -97,7 +98,7 @@ def cmd_index(args, noder, catalog, top):
node.parent = None
start = datetime.datetime.now()
walker = Walker(noder, hash=hash, debug=debug)
walker = Walker(noder, usehash=usehash, debug=debug)
attr = noder.format_storage_attr(args['--meta'])
root = noder.storage_node(name, path, parent=top, attr=attr)
_, cnt = walker.index(path, root, name)
@ -111,9 +112,10 @@ def cmd_index(args, noder, catalog, top):
def cmd_update(args, noder, catalog, top):
"""update action"""
path = args['<path>']
name = args['<name>']
hash = args['--hash']
usehash = args['--hash']
logpath = args['--lpath']
debug = args['--verbose']
subsize = not args['--no-subsize']
@ -125,7 +127,7 @@ def cmd_update(args, noder, catalog, top):
Logger.err(f'storage named \"{name}\" does not exist')
return
start = datetime.datetime.now()
walker = Walker(noder, hash=hash, debug=debug,
walker = Walker(noder, usehash=usehash, debug=debug,
logpath=logpath)
cnt = walker.reindex(path, root, top)
if subsize:
@ -138,6 +140,7 @@ def cmd_update(args, noder, catalog, top):
def cmd_ls(args, noder, top):
"""ls action"""
path = args['<path>']
if not path:
path = SEPARATOR
@ -161,6 +164,7 @@ def cmd_ls(args, noder, top):
def cmd_rm(args, noder, catalog, top):
"""rm action"""
name = args['<storage>']
node = noder.get_storage_node(top, name)
if node:
@ -173,6 +177,7 @@ def cmd_rm(args, noder, catalog, top):
def cmd_find(args, noder, top):
"""find action"""
fromtree = args['--parent']
directory = args['--directory']
startpath = args['--path']
@ -186,6 +191,7 @@ def cmd_find(args, noder, top):
def cmd_tree(args, noder, top):
"""tree action"""
path = args['<path>']
hdr = args['--header']
raw = args['--raw-size']
@ -201,6 +207,7 @@ def cmd_tree(args, noder, top):
def cmd_graph(args, noder, top):
"""graph action"""
path = args['<path>']
if not path:
path = GRAPHPATH
@ -208,7 +215,8 @@ def cmd_graph(args, noder, top):
Logger.info(f'create graph with \"{cmd}\" (you need graphviz)')
def cmd_rename(args, noder, catalog, top):
def cmd_rename(args, catalog, top):
"""rename action"""
storage = args['<storage>']
new = args['<name>']
storages = list(x.name for x in top.children)
@ -216,14 +224,15 @@ def cmd_rename(args, noder, catalog, top):
node = next(filter(lambda x: x.name == storage, top.children))
node.name = new
if catalog.save(top):
m = f'Storage \"{storage}\" renamed to \"{new}\"'
Logger.info(m)
msg = f'Storage \"{storage}\" renamed to \"{new}\"'
Logger.info(msg)
else:
Logger.err(f'Storage named \"{storage}\" does not exist')
return top
def cmd_edit(args, noder, catalog, top):
"""edit action"""
storage = args['<storage>']
storages = list(x.name for x in top.children)
if storage in storages:
@ -241,11 +250,13 @@ def cmd_edit(args, noder, catalog, top):
def banner():
"""print banner"""
Logger.out_err(BANNER)
Logger.out_err("")
def print_supported_formats():
"""print all supported formats to stdout"""
print('"native" : native format')
print('"csv" : CSV format')
print(f' {Noder.CSV_HEADER}')
@ -254,6 +265,7 @@ def print_supported_formats():
def main():
"""entry point"""
args = docopt(USAGE, version=VERSION)
if args['help'] or args['--help']:
@ -313,7 +325,7 @@ def main():
elif args['graph']:
cmd_graph(args, noder, top)
elif args['rename']:
cmd_rename(args, noder, catalog, top)
cmd_rename(args, catalog, top)
elif args['edit']:
cmd_edit(args, noder, catalog, top)
@ -321,7 +333,6 @@ def main():
if __name__ == '__main__':
'''entry point'''
if main():
sys.exit(0)
sys.exit(1)

@ -11,6 +11,7 @@ import zipfile
class Decomp:
"""decompressor"""
def __init__(self):
self.ext = {
@ -28,26 +29,26 @@ class Decomp:
'zip': self._zip}
def get_formats(self):
'''return list of supported extensions'''
"""return list of supported extensions"""
return list(self.ext.keys())
def get_names(self, path):
'''get tree of compressed archive'''
"""get tree of compressed archive"""
ext = os.path.splitext(path)[1][1:].lower()
if ext in list(self.ext.keys()):
if ext in list(self.ext):
return self.ext[ext](path)
return None
def _tar(self, path):
'''return list of file names in tar'''
"""return list of file names in tar"""
if not tarfile.is_tarfile(path):
return None
tar = tarfile.open(path, "r")
return tar.getnames()
with tarfile.open(path, "r") as tar:
return tar.getnames()
def _zip(self, path):
'''return list of file names in zip'''
"""return list of file names in zip"""
if not zipfile.is_zipfile(path):
return None
z = zipfile.ZipFile(path)
return z.namelist()
with zipfile.ZipFile(path) as file:
return file.namelist()

@ -9,6 +9,7 @@ import sys
class Logger:
"""log to stdout/stderr"""
RED = '\033[91m'
GREEN = '\033[92m'
@ -26,7 +27,9 @@ class Logger:
ARCHIVE = 'archive'
NBFILES = 'nbfiles'
def no_color():
@classmethod
def no_color(cls):
"""disable colors"""
Logger.RED = ''
Logger.GREEN = ''
Logger.YELLOW = ''
@ -39,32 +42,37 @@ class Logger:
Logger.BOLD = ''
Logger.UND = ''
def fix_badchars(line):
@classmethod
def fix_badchars(cls, line):
"""fix none utf-8 chars in line"""
return line.encode('utf-8', 'ignore').decode('utf-8')
######################################################################
# node specific output
######################################################################
def storage(pre, name, args, attr):
'''print a storage node'''
@classmethod
def storage(cls, pre, name, args, attr):
"""print a storage node"""
end = ''
if attr:
end = f' {Logger.GRAY}({attr}){Logger.RESET}'
s = f'{pre}{Logger.UND}{Logger.STORAGE}{Logger.RESET}:'
s += ' ' + Logger.PURPLE + Logger.fix_badchars(name) + \
out = f'{pre}{Logger.UND}{Logger.STORAGE}{Logger.RESET}:'
out += ' ' + Logger.PURPLE + Logger.fix_badchars(name) + \
Logger.RESET + end + '\n'
s += f' {Logger.GRAY}{args}{Logger.RESET}'
sys.stdout.write(f'{s}\n')
out += f' {Logger.GRAY}{args}{Logger.RESET}'
sys.stdout.write(f'{out}\n')
def file(pre, name, attr):
'''print a file node'''
@classmethod
def file(cls, pre, name, attr):
"""print a file node"""
nobad = Logger.fix_badchars(name)
s = f'{pre}{nobad}'
s += f' {Logger.GRAY}[{attr}]{Logger.RESET}'
sys.stdout.write(f'{s}\n')
out = f'{pre}{nobad}'
out += f' {Logger.GRAY}[{attr}]{Logger.RESET}'
sys.stdout.write(f'{out}\n')
def dir(pre, name, depth='', attr=None):
'''print a directory node'''
@classmethod
def dir(cls, pre, name, depth='', attr=None):
"""print a directory node"""
end = []
if depth != '':
end.append(f'{Logger.NBFILES}:{depth}')
@ -73,60 +81,71 @@ class Logger:
if end:
endstring = ', '.join(end)
end = f' [{endstring}]'
s = pre + Logger.BLUE + Logger.fix_badchars(name) + Logger.RESET
s += f'{Logger.GRAY}{end}{Logger.RESET}'
sys.stdout.write(f'{s}\n')
out = pre + Logger.BLUE + Logger.fix_badchars(name) + Logger.RESET
out += f'{Logger.GRAY}{end}{Logger.RESET}'
sys.stdout.write(f'{out}\n')
def arc(pre, name, archive):
s = pre + Logger.YELLOW + Logger.fix_badchars(name) + Logger.RESET
s += f' {Logger.GRAY}[{Logger.ARCHIVE}:{archive}]{Logger.RESET}'
sys.stdout.write(f'{s}\n')
@classmethod
def arc(cls, pre, name, archive):
"""archive to stdout"""
out = pre + Logger.YELLOW + Logger.fix_badchars(name) + Logger.RESET
out += f' {Logger.GRAY}[{Logger.ARCHIVE}:{archive}]{Logger.RESET}'
sys.stdout.write(f'{out}\n')
######################################################################
# generic output
######################################################################
def out(string):
'''to stdout no color'''
@classmethod
def out(cls, string):
"""to stdout no color"""
string = Logger.fix_badchars(string)
sys.stdout.write(f'{string}\n')
def out_err(string):
'''to stderr no color'''
@classmethod
def out_err(cls, string):
"""to stderr no color"""
string = Logger.fix_badchars(string)
sys.stderr.write(f'{string}\n')
def debug(string):
'''to stderr no color'''
@classmethod
def debug(cls, string):
"""to stderr no color"""
string = Logger.fix_badchars(string)
sys.stderr.write(f'[DBG] {string}\n')
def info(string):
'''to stdout in color'''
@classmethod
def info(cls, string):
"""to stdout in color"""
string = Logger.fix_badchars(string)
s = f'{Logger.MAGENTA}{string}{Logger.RESET}'
sys.stdout.write(f'{s}\n')
out = f'{Logger.MAGENTA}{string}{Logger.RESET}'
sys.stdout.write(f'{out}\n')
def err(string):
'''to stderr in RED'''
@classmethod
def err(cls, string):
"""to stderr in RED"""
string = Logger.fix_badchars(string)
s = f'{Logger.RED}{string}{Logger.RESET}'
sys.stderr.write(f'{s}\n')
out = f'{Logger.RED}{string}{Logger.RESET}'
sys.stderr.write(f'{out}\n')
def progr(string):
'''print progress'''
@classmethod
def progr(cls, string):
"""print progress"""
string = Logger.fix_badchars(string)
sys.stderr.write(f'{string}\r')
sys.stderr.flush()
def bold(string):
'''make it bold'''
@classmethod
def bold(cls, string):
"""make it bold"""
string = Logger.fix_badchars(string)
return f'{Logger.BOLD}{string}{Logger.RESET}'
def flog(path, string, append=True):
@classmethod
def flog(cls, path, string, append=True):
"""log and fix bad chars"""
string = Logger.fix_badchars(string)
mode = 'w'
if append:
mode = 'a'
with open(path, mode) as f:
f.write(string)
with open(path, mode, encoding='UTF-8') as file:
file.write(string)

@ -12,21 +12,21 @@ import anytree
from pyfzf.pyfzf import FzfPrompt
# local imports
import catcli.utils as utils
from catcli.utils import size_to_str, epoch_to_str, md5sum
from catcli.logger import Logger
from catcli.decomp import Decomp
from . import __version__ as VERSION
from catcli.version import __version__ as VERSION
'''
There are 4 types of node:
class Noder:
"""
handles node in the catalog tree
There are 4 types of node:
* "top" node representing the top node (generic node)
* "storage" node representing a storage
* "dir" node representing a directory
* "file" node representing a file
'''
class Noder:
"""
TOPNAME = 'top'
METANAME = 'meta'
@ -41,11 +41,11 @@ class Noder:
'total_space,meta')
def __init__(self, debug=False, sortsize=False, arc=False):
'''
"""
@debug: debug mode
@sortsize: sort nodes by size
@arch: handle archive
'''
"""
self.hash = True
self.debug = debug
self.sortsize = sortsize
@ -54,20 +54,20 @@ class Noder:
self.decomp = Decomp()
def get_storage_names(self, top):
'''return a list of all storage names'''
"""return a list of all storage names"""
return [x.name for x in list(top.children)]
def get_storage_node(self, top, name, path=None):
'''
"""
return the storage node if any
if path is submitted, it will update the media info
'''
"""
found = None
for n in top.children:
if n.type != self.TYPE_STORAGE:
for node in top.children:
if node.type != self.TYPE_STORAGE:
continue
if n.name == name:
found = n
if node.name == name:
found = node
break
if found and path and os.path.exists(path):
found.free = shutil.disk_usage(path).free
@ -76,23 +76,23 @@ class Noder:
return found
def get_node(self, top, path, quiet=False):
'''get the node by internal tree path'''
r = anytree.resolver.Resolver('name')
"""get the node by internal tree path"""
resolv = anytree.resolver.Resolver('name')
try:
p = os.path.basename(path)
return r.get(top, p)
bpath = os.path.basename(path)
return resolv.get(top, bpath)
except anytree.resolver.ChildResolverError:
if not quiet:
Logger.err(f'No node at path \"{p}\"')
Logger.err(f'No node at path \"{bpath}\"')
return None
def get_node_if_changed(self, top, path, treepath):
'''
"""
return the node (if any) and if it has changed
@top: top node (storage)
@path: abs path to file
@treepath: rel path from indexed directory
'''
"""
treepath = treepath.lstrip(os.sep)
node = self.get_node(top, treepath, quiet=True)
# node does not exist
@ -116,34 +116,34 @@ class Noder:
if self.hash and node.md5:
md5 = self._get_hash(path)
if md5 != node.md5:
m = f'\tchange: checksum changed for \"{path}\"'
self._debug(m)
msg = f'\tchange: checksum changed for \"{path}\"'
self._debug(msg)
return node, True
self._debug(f'\tchange: no change for \"{path}\"')
return node, False
def _rec_size(self, node, store=True):
'''
"""
recursively traverse tree and return size
@store: store the size in the node
'''
"""
if node.type == self.TYPE_FILE:
self._debug(f'getting node size for \"{node.name}\"')
return node.size
m = f'getting node size recursively for \"{node.name}\"'
self._debug(m)
msg = f'getting node size recursively for \"{node.name}\"'
self._debug(msg)
size = 0
for i in node.children:
if node.type == self.TYPE_DIR:
sz = self._rec_size(i, store=store)
size = self._rec_size(i, store=store)
if store:
i.size = sz
size += sz
i.size = size
size += size
if node.type == self.TYPE_STORAGE:
sz = self._rec_size(i, store=store)
size = self._rec_size(i, store=store)
if store:
i.size = sz
size += sz
i.size = size
size += size
else:
continue
if store:
@ -151,34 +151,34 @@ class Noder:
return size
def rec_size(self, node):
'''recursively traverse tree and store dir size'''
"""recursively traverse tree and store dir size"""
return self._rec_size(node, store=True)
###############################################################
# public helpers
###############################################################
def format_storage_attr(self, attr):
'''format the storage attr for saving'''
"""format the storage attr for saving"""
if not attr:
return ''
if type(attr) is list:
if isinstance(attr, list):
return ', '.join(attr)
attr = attr.rstrip()
return attr
def set_hashing(self, val):
'''hash files when indexing'''
"""hash files when indexing"""
self.hash = val
###############################################################
# node creationg
###############################################################
def new_top_node(self):
'''create a new top node'''
"""create a new top node"""
return anytree.AnyNode(name=self.TOPNAME, type=self.TYPE_TOP)
def update_metanode(self, top):
'''create or update meta node information'''
"""create or update meta node information"""
meta = self._get_meta_node(top)
epoch = int(time.time())
if not meta:
@ -192,7 +192,7 @@ class Noder:
return meta
def _get_meta_node(self, top):
'''return the meta node if any'''
"""return the meta node if any"""
try:
return next(filter(lambda x: x.type == self.TYPE_META,
top.children))
@ -200,15 +200,15 @@ class Noder:
return None
def file_node(self, name, path, parent, storagepath):
'''create a new node representing a file'''
"""create a new node representing a file"""
if not os.path.exists(path):
Logger.err(f'File \"{path}\" does not exist')
return None
path = os.path.abspath(path)
try:
st = os.lstat(path)
except OSError as e:
Logger.err(f'OSError: {e}')
stat = os.lstat(path)
except OSError as exc:
Logger.err(f'OSError: {exc}')
return None
md5 = None
if self.hash:
@ -216,20 +216,20 @@ class Noder:
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
n = self._node(name, self.TYPE_FILE, relpath, parent,
size=st.st_size, md5=md5, maccess=maccess)
node = self._node(name, self.TYPE_FILE, relpath, parent,
size=stat.st_size, md5=md5, maccess=maccess)
if self.arc:
ext = os.path.splitext(path)[1][1:]
if ext.lower() in self.decomp.get_formats():
self._debug(f'{path} is an archive')
names = self.decomp.get_names(path)
self.list_to_tree(n, names)
self.list_to_tree(node, names)
else:
self._debug(f'{path} is NOT an archive')
return n
return node
def dir_node(self, name, path, parent, storagepath):
'''create a new node representing a directory'''
"""create a new node representing a directory"""
path = os.path.abspath(path)
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
@ -237,21 +237,21 @@ class Noder:
parent, maccess=maccess)
def clean_not_flagged(self, top):
'''remove any node not flagged and clean flags'''
"""remove any node not flagged and clean flags"""
cnt = 0
for node in anytree.PreOrderIter(top):
if node.type != self.TYPE_FILE and node.type != self.TYPE_DIR:
if node.type not in [self.TYPE_FILE, self.TYPE_DIR]:
continue
if self._clean(node):
cnt += 1
return cnt
def flag(self, node):
'''flag a node'''
"""flag a node"""
node.flag = True
def _clean(self, node):
'''remove node if not flagged'''
"""remove node if not flagged"""
if not self._has_attr(node, 'flag') or \
not node.flag:
node.parent = None
@ -260,7 +260,7 @@ class Noder:
return False
def storage_node(self, name, path, parent, attr=None):
'''create a new node representing a storage'''
"""create a new node representing a storage"""
path = os.path.abspath(path)
free = shutil.disk_usage(path).free
total = shutil.disk_usage(path).total
@ -269,15 +269,15 @@ class Noder:
total=total, parent=parent, attr=attr, ts=epoch)
def archive_node(self, name, path, parent, archive):
'''crete a new node for archive data'''
"""crete a new node for archive data"""
return anytree.AnyNode(name=name, type=self.TYPE_ARC, relpath=path,
parent=parent, size=0, md5=None,
archive=archive)
def _node(self, name, type, relpath, parent,
def _node(self, name, nodetype, relpath, parent,
size=None, md5=None, maccess=None):
'''generic node creation'''
return anytree.AnyNode(name=name, type=type, relpath=relpath,
"""generic node creation"""
return anytree.AnyNode(name=name, type=nodetype, relpath=relpath,
parent=parent, size=size,
md5=md5, maccess=maccess)
@ -285,16 +285,16 @@ class Noder:
# printing
###############################################################
def _node_to_csv(self, node, sep=',', raw=False):
'''
"""
print a node to csv
@node: the node to consider
@sep: CSV separator character
@raw: print raw size rather than human readable
'''
"""
if not node:
return ''
return
if node.type == self.TYPE_TOP:
return ''
return
out = []
if node.type == self.TYPE_STORAGE:
@ -302,16 +302,16 @@ class Noder:
out.append(node.name) # name
out.append(node.type) # type
out.append('') # fake full path
sz = self._rec_size(node, store=False)
out.append(utils.size_to_str(sz, raw=raw)) # size
out.append(utils.epoch_to_str(node.ts)) # indexed_at
size = self._rec_size(node, store=False)
out.append(size_to_str(size, raw=raw)) # size
out.append(epoch_to_str(node.ts)) # indexed_at
out.append('') # fake maccess
out.append('') # fake md5
out.append(str(len(node.children))) # nbfiles
# fake free_space
out.append(utils.size_to_str(node.free, raw=raw))
out.append(size_to_str(node.free, raw=raw))
# fake total_space
out.append(utils.size_to_str(node.total, raw=raw))
out.append(size_to_str(node.total, raw=raw))
out.append(node.attr) # meta
else:
# handle other nodes
@ -322,10 +322,10 @@ class Noder:
fullpath = os.path.join(storage.name, parents)
out.append(fullpath.replace('"', '""')) # full path
out.append(utils.size_to_str(node.size, raw=raw)) # size
out.append(utils.epoch_to_str(storage.ts)) # indexed_at
out.append(size_to_str(node.size, raw=raw)) # size
out.append(epoch_to_str(storage.ts)) # indexed_at
if self._has_attr(node, 'maccess'):
out.append(utils.epoch_to_str(node.maccess)) # maccess
out.append(epoch_to_str(node.maccess)) # maccess
else:
out.append('') # fake maccess
if node.md5:
@ -347,7 +347,7 @@ class Noder:
def _print_node(self, node, pre='', withpath=False,
withdepth=False, withstorage=False,
recalcparent=False, raw=False):
'''
"""
print a node
@node: the node to print
@pre: string to print before node
@ -356,7 +356,7 @@ class Noder:
@withstorage: print the node storage it belongs to
@recalcparent: get relpath from tree instead of relpath field
@raw: print raw size rather than human readable
'''
"""
if node.type == self.TYPE_TOP:
# top node
Logger.out(f'{pre}{node.name}')
@ -374,8 +374,8 @@ class Noder:
attr = ''
if node.md5:
attr = f', md5:{node.md5}'
sz = utils.size_to_str(node.size, raw=raw)
compl = f'size:{sz}{attr}'
size = size_to_str(node.size, raw=raw)
compl = f'size:{size}{attr}'
if withstorage:
content = Logger.bold(storage.name)
compl += f', storage:{content}'
@ -396,35 +396,35 @@ class Noder:
storage = self._get_storage(node)
attr = []
if node.size:
attr.append(['totsize', utils.size_to_str(node.size, raw=raw)])
attr.append(['totsize', size_to_str(node.size, raw=raw)])
if withstorage:
attr.append(['storage', Logger.bold(storage.name)])
Logger.dir(pre, name, depth=depth, attr=attr)
elif node.type == self.TYPE_STORAGE:
# node of type storage
hf = utils.size_to_str(node.free, raw=raw)
ht = utils.size_to_str(node.total, raw=raw)
szfree = size_to_str(node.free, raw=raw)
sztotal = size_to_str(node.total, raw=raw)
nbchildren = len(node.children)
pcent = node.free * 100 / node.total
freepercent = f'{pcent:.1f}%'
# get the date
dt = ''
timestamp = ''
if self._has_attr(node, 'ts'):
dt = 'date:'
dt += utils.epoch_to_str(node.ts)
ds = ''
timestamp = 'date:'
timestamp += epoch_to_str(node.ts)
disksize = ''
# the children size
sz = self._rec_size(node, store=False)
sz = utils.size_to_str(sz, raw=raw)
ds = 'totsize:' + f'{sz}'
size = self._rec_size(node, store=False)
size = size_to_str(size, raw=raw)
disksize = 'totsize:' + f'{size}'
# format the output
name = node.name
args = [
'nbfiles:' + f'{nbchildren}',
ds,
disksize,
f'free:{freepercent}',
'du:' + f'{hf}/{ht}',
dt]
'du:' + f'{szfree}/{sztotal}',
timestamp]
argsstring = ' | '.join(args)
Logger.storage(pre,
name,
@ -437,35 +437,37 @@ class Noder:
else:
Logger.err(f'bad node encountered: {node}')
def print_tree(self, top, node, style=anytree.ContRoundStyle(),
fmt='native', header=False, raw=False):
'''
def print_tree(self, top, node,
fmt='native',
header=False,
raw=False):
"""
print the tree in different format
@node: start node
@style: when fmt=native, defines the tree style
@fmt: output format
@header: when fmt=csv, print the header
@raw: print the raw size rather than human readable
'''
"""
if fmt == 'native':
# "tree" style
rend = anytree.RenderTree(node, childiter=self._sort_tree)
for pre, fill, node in rend:
self._print_node(node, pre=pre, withdepth=True, raw=raw)
for pre, _, thenode in rend:
self._print_node(thenode, pre=pre, withdepth=True, raw=raw)
elif fmt == 'csv':
# csv output
self._to_csv(node, with_header=header, raw=raw)
elif fmt.startswith('fzf'):
# flat
self._to_fzf(top, node)
self._to_fzf(top, node, fmt)
def _to_csv(self, node, with_header=False, raw=False):
'''print the tree to csv'''
"""print the tree to csv"""
rend = anytree.RenderTree(node, childiter=self._sort_tree)
if with_header:
Logger.out(self.CSV_HEADER)
for _, _, node in rend:
self._node_to_csv(node, raw=raw)
for _, _, item in rend:
self._node_to_csv(item, raw=raw)
def _fzf_prompt(self, strings):
# prompt with fzf
@ -480,16 +482,16 @@ class Noder:
@node: node to start with
@fmt: output format for selected nodes
"""
rend = anytree.RenderTree(node, childiter=self._sort_tree)
nodes = dict()
rendered = anytree.RenderTree(node, childiter=self._sort_tree)
nodes = {}
# construct node names list
for _, _, node in rend:
if not node:
for _, _, rend in rendered:
if not rend:
continue
parents = self._get_parents(node)
storage = self._get_storage(node)
parents = self._get_parents(rend)
storage = self._get_storage(rend)
fullpath = os.path.join(storage.name, parents)
nodes[fullpath] = node
nodes[fullpath] = rend
# prompt with fzf
paths = self._fzf_prompt(nodes.keys())
# print the resulting tree
@ -499,11 +501,11 @@ class Noder:
continue
if path not in nodes:
continue
node = nodes[path]
self.print_tree(top, node, fmt=subfmt)
rend = nodes[path]
self.print_tree(top, rend, fmt=subfmt)
def to_dot(self, node, path='tree.dot'):
'''export to dot for graphing'''
"""export to dot for graphing"""
anytree.exporter.DotExporter(node).to_dotfile(path)
Logger.info(f'dot file created under \"{path}\"')
return f'dot {path} -T png -o /tmp/tree.png'
@ -515,7 +517,7 @@ class Noder:
script=False, directory=False,
startpath=None, parentfromtree=False,
fmt='native', raw=False):
'''
"""
find files based on their names
@top: top node
@key: term to search for
@ -525,7 +527,7 @@ class Noder:
@parentfromtree: get path from parent instead of stored relpath
@fmt: output format
@raw: raw size output
'''
"""
self._debug(f'searching for \"{key}\"')
if not key:
# nothing to search for
@ -534,49 +536,49 @@ class Noder:
if startpath:
start = self.get_node(top, startpath)
found = anytree.findall(start, filter_=self._callback_find_name(key))
nb = len(found)
self._debug(f'found {nb} node(s)')
nbfound = len(found)
self._debug(f'found {nbfound} node(s)')
# compile found nodes
paths = dict()
paths = {}
nodes = []
for f in found:
if f.type == self.TYPE_STORAGE:
for item in found:
if item.type == self.TYPE_STORAGE:
# ignore storage nodes
continue
if directory and f.type != self.TYPE_DIR:
if directory and item.type != self.TYPE_DIR:
# ignore non directory
continue
nodes.append(f)
nodes.append(item)
if parentfromtree:
paths[self._get_parents(f)] = f
paths[self._get_parents(item)] = item
else:
paths[f.relpath] = f
paths[item.relpath] = item
if fmt == 'native':
for n in nodes:
self._print_node(n, withpath=True,
for item in nodes:
self._print_node(item, withpath=True,
withdepth=True,
withstorage=True,
recalcparent=parentfromtree,
raw=raw)
elif fmt == 'csv':
for n in nodes:
self._node_to_csv(n, raw=raw)
for item in nodes:
self._node_to_csv(item, raw=raw)
elif fmt.startswith('fzf'):
selected = self._fzf_prompt(paths)
newpaths = dict()
newpaths = {}
subfmt = fmt.replace('fzf-', '')
for s in selected:
if s not in paths:
for item in selected:
if item not in paths:
continue
newpaths[s] = paths[s]
self.print_tree(top, newpaths[s], fmt=subfmt)
newpaths[item] = paths[item]
self.print_tree(top, newpaths[item], fmt=subfmt)
paths = newpaths
if script:
tmp = ['${source}/' + x for x in paths.keys()]
tmp = ['${source}/' + x for x in paths]
tmpstr = ' '.join(tmp)
cmd = f'op=file; source=/media/mnt; $op {tmpstr}'
Logger.info(cmd)
@ -584,7 +586,7 @@ class Noder:
return found
def _callback_find_name(self, term):
'''callback for finding files'''
"""callback for finding files"""
def find_name(node):
if term.lower() in node.name.lower():
return True
@ -596,19 +598,19 @@ class Noder:
###############################################################
def walk(self, top, path, rec=False, fmt='native',
raw=False):
'''
"""
walk the tree for ls based on names
@top: start node
@rec: recursive walk
@fmt: output format
@raw: print raw size
'''
"""
self._debug(f'walking path: \"{path}\"')
r = anytree.resolver.Resolver('name')
resolv = anytree.resolver.Resolver('name')
found = []
try:
found = r.glob(top, path)
found = resolv.glob(top, path)
if len(found) < 1:
# nothing found
return []
@ -627,20 +629,20 @@ class Noder:
withpath=False, withdepth=True, raw=raw)
elif fmt == 'csv':
self._node_to_csv(found[0].parent, raw=raw)
elif fmt == 'fzf':
elif fmt.startswith('fzf'):
pass
# print all found nodes
for f in found:
for item in found:
if fmt == 'native':
self._print_node(f, withpath=False,
self._print_node(item, withpath=False,
pre='- ',
withdepth=True,
raw=raw)
elif fmt == 'csv':
self._node_to_csv(f, raw=raw)
elif fmt == 'fzf':
self._to_fzf(top, f)
self._node_to_csv(item, raw=raw)
elif fmt.startswith('fzf'):
self._to_fzf(top, item, fmt)
except anytree.resolver.ChildResolverError:
pass
@ -650,47 +652,47 @@ class Noder:
# tree creation
###############################################################
def _add_entry(self, name, top, resolv):
'''add an entry to the tree'''
"""add an entry to the tree"""
entries = name.rstrip(os.sep).split(os.sep)
if len(entries) == 1:
self.archive_node(name, name, top, top.name)
return
sub = os.sep.join(entries[:-1])
f = entries[-1]
nodename = entries[-1]
try:
parent = resolv.get(top, sub)
parent = self.archive_node(f, name, parent, top.name)
parent = self.archive_node(nodename, name, parent, top.name)
except anytree.resolver.ChildResolverError:
self.archive_node(f, name, top, top.name)
self.archive_node(nodename, name, top, top.name)
def list_to_tree(self, parent, names):
'''convert list of files to a tree'''
"""convert list of files to a tree"""
if not names:
return
r = anytree.resolver.Resolver('name')
resolv = anytree.resolver.Resolver('name')
for name in names:
name = name.rstrip(os.sep)
self._add_entry(name, parent, r)
self._add_entry(name, parent, resolv)
###############################################################
# diverse
###############################################################
def _sort_tree(self, items):
'''sorting a list of items'''
"""sorting a list of items"""
return sorted(items, key=self._sort, reverse=self.sortsize)
def _sort(self, lst):
'''sort a list'''
"""sort a list"""
if self.sortsize:
return self._sort_size(lst)
return self._sort_fs(lst)
def _sort_fs(self, node):
'''sorting nodes dir first and alpha'''
return (node.type, node.name.lstrip('\.').lower())
"""sorting nodes dir first and alpha"""
return (node.type, node.name.lstrip('.').lower())
def _sort_size(self, node):
'''sorting nodes by size'''
"""sorting nodes by size"""
try:
if not node.size:
return 0
@ -699,7 +701,7 @@ class Noder:
return 0
def _get_storage(self, node):
'''recursively traverse up to find storage'''
"""recursively traverse up to find storage"""
if node.type == self.TYPE_STORAGE:
return node
return node.ancestors[1]
@ -708,7 +710,7 @@ class Noder:
return attr in node.__dict__.keys()
def _get_parents(self, node):
'''get all parents recursively'''
"""get all parents recursively"""
if node.type == self.TYPE_STORAGE:
return ''
if node.type == self.TYPE_TOP:
@ -720,10 +722,10 @@ class Noder:
def _get_hash(self, path):
"""return md5 hash of node"""
return utils.md5sum(path)
return md5sum(path)
def _debug(self, string):
'''print debug'''
"""print debug"""
if not self.debug:
return
Logger.debug(string)

@ -16,29 +16,29 @@ from catcli.logger import Logger
def md5sum(path):
'''calculate md5 sum of a file'''
p = os.path.realpath(path)
if not os.path.exists(p):
Logger.err(f'\nmd5sum - file does not exist: {p}')
"""calculate md5 sum of a file"""
rpath = os.path.realpath(path)
if not os.path.exists(rpath):
Logger.err(f'\nmd5sum - file does not exist: {rpath}')
return None
try:
with open(p, mode='rb') as f:
d = hashlib.md5()
with open(rpath, mode='rb') as file:
hashv = hashlib.md5()
while True:
buf = f.read(4096)
buf = file.read(4096)
if not buf:
break
d.update(buf)
return d.hexdigest()
hashv.update(buf)
return hashv.hexdigest()
except PermissionError:
pass
except OSError as e:
Logger.err(f'md5sum error: {e}')
except OSError as exc:
Logger.err(f'md5sum error: {exc}')
return None
def size_to_str(size, raw=True):
'''convert size to string, optionally human readable'''
"""convert size to string, optionally human readable"""
div = 1024.
suf = ['B', 'K', 'M', 'G', 'T', 'P']
if raw or size < div:
@ -52,28 +52,28 @@ def size_to_str(size, raw=True):
def epoch_to_str(epoch):
'''convert epoch to string'''
"""convert epoch to string"""
if not epoch:
return ''
fmt = '%Y-%m-%d %H:%M:%S'
t = datetime.datetime.fromtimestamp(float(epoch))
return t.strftime(fmt)
timestamp = datetime.datetime.fromtimestamp(float(epoch))
return timestamp.strftime(fmt)
def ask(question):
'''ask the user what to do'''
"""ask the user what to do"""
resp = input(f'{question} [y|N] ? ')
return resp.lower() == 'y'
def edit(string):
'''edit the information with the default EDITOR'''
"""edit the information with the default EDITOR"""
string = string.encode('utf-8')
EDITOR = os.environ.get('EDITOR', 'vim')
with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as f:
f.write(string)
f.flush()
subprocess.call([EDITOR, f.name])
f.seek(0)
new = f.read()
editor = os.environ.get('EDITOR', 'vim')
with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as file:
file.write(string)
file.flush()
subprocess.call([editor, file.name])
file.seek(0)
new = file.read()
return new.decode('utf-8')

@ -0,0 +1,6 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
"""
__version__ = '0.8.7'

@ -12,57 +12,58 @@ from catcli.logger import Logger
class Walker:
"""a filesystem walker"""
MAXLINE = 80 - 15
def __init__(self, noder, hash=True, debug=False,
def __init__(self, noder, usehash=True, debug=False,
logpath=None):
'''
"""
@noder: the noder to use
@hash: calculate hash of nodes
@debug: debug mode
@logpath: path where to log catalog changes on reindex
'''
"""
self.noder = noder
self.hash = hash
self.noder.set_hashing(self.hash)
self.usehash = usehash
self.noder.set_hashing(self.usehash)
self.debug = debug
self.lpath = logpath
def index(self, path, parent, name, storagepath=''):
'''
"""
index a directory and store in tree
@path: path to index
@parent: parent node
@name: this stoarge name
'''
"""
self._debug(f'indexing starting at {path}')
if not parent:
parent = self.noder.dir_node(name, path, parent)
if os.path.islink(path):
rel = os.readlink(path)
ab = os.path.join(path, rel)
if os.path.isdir(ab):
abspath = os.path.join(path, rel)
if os.path.isdir(abspath):
return parent, 0
cnt = 0
for (root, dirs, files) in os.walk(path):
for f in files:
self._debug(f'found file {f} under {path}')
sub = os.path.join(root, f)
for file in files:
self._debug(f'found file {file} under {path}')
sub = os.path.join(root, file)
if not os.path.exists(sub):
continue
self._progress(f)
self._progress(file)
self._debug(f'index file {sub}')
n = self.noder.file_node(os.path.basename(f), sub,
parent, storagepath)
if n:
node = self.noder.file_node(os.path.basename(file), sub,
parent, storagepath)
if node:
cnt += 1
for d in dirs:
self._debug(f'found dir {d} under {path}')
base = os.path.basename(d)
sub = os.path.join(root, d)
for adir in dirs:
self._debug(f'found dir {adir} under {path}')
base = os.path.basename(adir)
sub = os.path.join(root, adir)
self._debug(f'index directory {sub}')
if not os.path.exists(sub):
continue
@ -80,40 +81,40 @@ class Walker:
return parent, cnt
def reindex(self, path, parent, top):
'''reindex a directory and store in tree'''
"""reindex a directory and store in tree"""
cnt = self._reindex(path, parent, top)
cnt += self.noder.clean_not_flagged(parent)
return cnt
def _reindex(self, path, parent, top, storagepath=''):
'''
"""
reindex a directory and store in tree
@path: directory path to re-index
@top: top node (storage)
@storagepath: rel path relative to indexed directory
'''
"""
self._debug(f'reindexing starting at {path}')
cnt = 0
for (root, dirs, files) in os.walk(path):
for f in files:
self._debug(f'found file \"{f}\" under {path}')
sub = os.path.join(root, f)
treepath = os.path.join(storagepath, f)
reindex, n = self._need_reindex(parent, sub, treepath)
for file in files:
self._debug(f'found file \"{file}\" under {path}')
sub = os.path.join(root, file)
treepath = os.path.join(storagepath, file)
reindex, node = self._need_reindex(parent, sub, treepath)
if not reindex:
self._debug(f'\tskip file {sub}')
self.noder.flag(n)
self.noder.flag(node)
continue
self._log2file(f'update catalog for \"{sub}\"')
n = self.noder.file_node(os.path.basename(f), sub,
parent, storagepath)
self.noder.flag(n)
node = self.noder.file_node(os.path.basename(file), sub,
parent, storagepath)
self.noder.flag(node)
cnt += 1
for d in dirs:
self._debug(f'found dir \"{d}\" under {path}')
base = os.path.basename(d)
sub = os.path.join(root, d)
treepath = os.path.join(storagepath, d)
for adir in dirs:
self._debug(f'found dir \"{adir}\" under {path}')
base = os.path.basename(adir)
sub = os.path.join(root, adir)
treepath = os.path.join(storagepath, adir)
reindex, dummy = self._need_reindex(parent, sub, treepath)
if reindex:
self._log2file(f'update catalog for \"{sub}\"')
@ -130,12 +131,12 @@ class Walker:
return cnt
def _need_reindex(self, top, path, treepath):
'''
"""
test if node needs re-indexing
@top: top node (storage)
@path: abs path to file
@treepath: rel path from indexed directory
'''
"""
cnode, changed = self.noder.get_node_if_changed(top, path, treepath)
if not cnode:
self._debug(f'\t{path} does not exist')
@ -152,13 +153,13 @@ class Walker:
return True, cnode
def _debug(self, string):
'''print to debug'''
"""print to debug"""
if not self.debug:
return
Logger.debug(string)
def _progress(self, string):
'''print progress'''
"""print progress"""
if self.debug:
return
if not string:
@ -170,7 +171,7 @@ class Walker:
Logger.progr(f'indexing: {string:80}')
def _log2file(self, string):
'''log to file'''
"""log to file"""
if not self.lpath:
return
line = f'{string}\n'

@ -13,8 +13,22 @@ pycodestyle tests/
pyflakes catcli/
pyflakes tests/
pylint catcli/
pylint tests/
# R0914: Too many local variables
# R0913: Too many arguments
# R0912: Too many branches
# R0915: Too many statements
pylint \
--disable=R0914 \
--disable=R0913 \
--disable=R0912 \
--disable=R0915 \
catcli/
pylint \
--disable=W0212 \
--disable=R0914 \
--disable=R0915 \
--disable=R0801 \
tests/
nosebin="nose2"
PYTHONPATH=catcli ${nosebin} --with-coverage --coverage=catcli

@ -21,32 +21,32 @@ TMPSUFFIX = '.catcli'
def get_rnd_string(length):
'''Get a random string of specific length '''
"""Get a random string of specific length """
alpha = string.ascii_uppercase + string.digits
return ''.join(random.choice(alpha) for _ in range(length))
def md5sum(path):
'''calculate md5 sum of a file'''
p = os.path.realpath(path)
if not os.path.exists(p):
"""calculate md5 sum of a file"""
rpath = os.path.realpath(path)
if not os.path.exists(rpath):
return None
try:
with open(p, mode='rb') as f:
d = hashlib.md5()
with open(rpath, mode='rb') as file:
val = hashlib.md5()
while True:
buf = f.read(4096)
buf = file.read(4096)
if not buf:
break
d.update(buf)
return d.hexdigest()
val.update(buf)
return val.hexdigest()
except PermissionError:
pass
return None
def clean(path):
'''Delete file or folder.'''
"""Delete file or folder."""
if not os.path.exists(path):
return
if os.path.islink(path):
@ -58,10 +58,12 @@ def clean(path):
def edit_file(path, newcontent):
"""edit file content"""
return write_to_file(path, newcontent)
def unix_tree(path):
"""print using unix tree tool"""
if not os.path.exists(path):
return
# cmd = ['tree', path]
@ -75,7 +77,7 @@ def unix_tree(path):
def create_tree():
''' create a random tree of files and directories '''
""" create a random tree of files and directories """
dirpath = get_tempdir()
# create 3 files
create_rnd_file(dirpath, get_rnd_string(5))
@ -83,13 +85,13 @@ def create_tree():
create_rnd_file(dirpath, get_rnd_string(5))
# create 2 directories
d1 = create_dir(dirpath, get_rnd_string(3))
d2 = create_dir(dirpath, get_rnd_string(3))
dir1 = create_dir(dirpath, get_rnd_string(3))
dir2 = create_dir(dirpath, get_rnd_string(3))
# fill directories
create_rnd_file(d1, get_rnd_string(4))
create_rnd_file(d1, get_rnd_string(4))
create_rnd_file(d2, get_rnd_string(6))
create_rnd_file(dir1, get_rnd_string(4))
create_rnd_file(dir1, get_rnd_string(4))
create_rnd_file(dir2, get_rnd_string(6))
return dirpath
@ -99,12 +101,12 @@ def create_tree():
def get_tempdir():
'''Get a temporary directory '''
"""Get a temporary directory """
return tempfile.mkdtemp(suffix=TMPSUFFIX)
def create_dir(path, dirname):
'''Create a directory '''
"""Create a directory """
fpath = os.path.join(path, dirname)
if not os.path.exists(fpath):
os.mkdir(fpath)
@ -112,7 +114,7 @@ def create_dir(path, dirname):
def create_rnd_file(path, filename, content=None):
'''Create the file filename in path with random content if None '''
"""Create the file filename in path with random content if None """
if not content:
content = get_rnd_string(100)
fpath = os.path.join(path, filename)
@ -120,23 +122,25 @@ def create_rnd_file(path, filename, content=None):
def write_to_file(path, content):
with open(path, 'w') as f:
f.write(content)
"""write content to file"""
with open(path, 'w', encoding='UTF-8') as file:
file.write(content)
return path
def read_from_file(path):
"""read file content"""
if not os.path.exists(path):
return ''
with open(path, 'r') as f:
content = f.read()
with open(path, 'r', encoding='UTF-8') as file:
content = file.read()
return content
############################################################
# fake tree in json
############################################################
FAKECATALOG = '''
FAKECATALOG = """
{
"children": [
{
@ -214,9 +218,9 @@ FAKECATALOG = '''
"name": "top",
"type": "top"
}
'''
"""
def get_fakecatalog():
# catalog constructed through test_index
"""catalog constructed through test_index"""
return FAKECATALOG

@ -14,8 +14,10 @@ from tests.helpers import get_fakecatalog
class TestFind(unittest.TestCase):
"""test find"""
def test_find(self):
"""test find"""
# init
catalog = Catalog('fake', force=True, debug=False)
top = catalog._restore_json(get_fakecatalog())
@ -38,6 +40,7 @@ class TestFind(unittest.TestCase):
def main():
"""entry point"""
unittest.main()

@ -16,8 +16,10 @@ from tests.helpers import clean, get_fakecatalog
class TestGraph(unittest.TestCase):
"""test graph"""
def test_graph(self):
"""test graph"""
# init
path = 'fake'
gpath = tempfile.gettempdir() + os.sep + 'graph.dot'
@ -38,6 +40,7 @@ class TestGraph(unittest.TestCase):
def main():
"""entry point"""
unittest.main()

@ -16,8 +16,10 @@ from tests.helpers import get_tempdir, create_rnd_file, clean, \
class TestIndexing(unittest.TestCase):
"""test index"""
def test_index(self):
"""test index"""
# init
workingdir = get_tempdir()
catalogpath = create_rnd_file(workingdir, 'catalog.json', content='')
@ -27,18 +29,18 @@ class TestIndexing(unittest.TestCase):
self.addCleanup(clean, dirpath)
# create 3 files
f1 = create_rnd_file(dirpath, get_rnd_string(5))
f2 = create_rnd_file(dirpath, get_rnd_string(5))
f3 = create_rnd_file(dirpath, get_rnd_string(5))
file1 = create_rnd_file(dirpath, get_rnd_string(5))
file2 = create_rnd_file(dirpath, get_rnd_string(5))
file3 = create_rnd_file(dirpath, get_rnd_string(5))
# create 2 directories
d1 = create_dir(dirpath, get_rnd_string(3))
d2 = create_dir(dirpath, get_rnd_string(3))
dir1 = create_dir(dirpath, get_rnd_string(3))
dir2 = create_dir(dirpath, get_rnd_string(3))
# fill directories with files
_ = create_rnd_file(d1, get_rnd_string(4))
_ = create_rnd_file(d1, get_rnd_string(4))
_ = create_rnd_file(d2, get_rnd_string(6))
_ = create_rnd_file(dir1, get_rnd_string(4))
_ = create_rnd_file(dir1, get_rnd_string(4))
_ = create_rnd_file(dir2, get_rnd_string(6))
noder = Noder()
top = noder.new_top_node()
@ -61,20 +63,21 @@ class TestIndexing(unittest.TestCase):
# ensures files and directories are in
names = [x.name for x in storage.children]
self.assertTrue(os.path.basename(f1) in names)
self.assertTrue(os.path.basename(f2) in names)
self.assertTrue(os.path.basename(f3) in names)
self.assertTrue(os.path.basename(d1) in names)
self.assertTrue(os.path.basename(d2) in names)
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
self.assertTrue(os.path.basename(file3) in names)
self.assertTrue(os.path.basename(dir1) in names)
self.assertTrue(os.path.basename(dir2) in names)
for node in storage.children:
if node.name == os.path.basename(d1):
if node.name == os.path.basename(dir1):
self.assertTrue(len(node.children) == 2)
elif node.name == os.path.basename(d2):
elif node.name == os.path.basename(dir2):
self.assertTrue(len(node.children) == 1)
def main():
"""entry point"""
unittest.main()

@ -14,8 +14,10 @@ from tests.helpers import get_fakecatalog, clean
class TestWalking(unittest.TestCase):
"""test ls"""
def test_ls(self):
"""test ls"""
# init
path = 'fake'
self.addCleanup(clean, path)
@ -56,6 +58,7 @@ class TestWalking(unittest.TestCase):
def main():
"""entry point"""
unittest.main()

@ -14,8 +14,10 @@ from tests.helpers import clean, get_fakecatalog
class TestRm(unittest.TestCase):
"""test rm"""
def test_rm(self):
"""test rm"""
# init
path = 'fake'
self.addCleanup(clean, path)
@ -48,6 +50,7 @@ class TestRm(unittest.TestCase):
def main():
"""entry point"""
unittest.main()

@ -14,8 +14,10 @@ from tests.helpers import clean, get_fakecatalog
class TestTree(unittest.TestCase):
"""Test the tree"""
def test_tree(self):
"""test the tree"""
# init
path = 'fake'
self.addCleanup(clean, path)
@ -37,6 +39,7 @@ class TestTree(unittest.TestCase):
def main():
"""entry point"""
unittest.main()

@ -7,18 +7,20 @@ Basic unittest for updating an index
import unittest
import os
import anytree
from catcli.catcli import cmd_index, cmd_update
from catcli.noder import Noder
from catcli.catalog import Catalog
from tests.helpers import create_dir, create_rnd_file, get_tempdir, \
clean, unix_tree, edit_file, read_from_file, md5sum
import anytree
class TestIndexing(unittest.TestCase):
class TestUpdate(unittest.TestCase):
"""test update"""
def test_index(self):
def test_update(self):
"""test update"""
# init
workingdir = get_tempdir()
catalogpath = create_rnd_file(workingdir, 'catalog.json', content='')
@ -28,20 +30,20 @@ class TestIndexing(unittest.TestCase):
self.addCleanup(clean, dirpath)
# create 3 files
f1 = create_rnd_file(dirpath, 'file1')
f2 = create_rnd_file(dirpath, 'file2')
f3 = create_rnd_file(dirpath, 'file3')
f4 = create_rnd_file(dirpath, 'file4')
file1 = create_rnd_file(dirpath, 'file1')
file2 = create_rnd_file(dirpath, 'file2')
file3 = create_rnd_file(dirpath, 'file3')
file4 = create_rnd_file(dirpath, 'file4')
# create 2 directories
d1 = create_dir(dirpath, 'dir1')
d2 = create_dir(dirpath, 'dir2')
dir1 = create_dir(dirpath, 'dir1')
dir2 = create_dir(dirpath, 'dir2')
# fill directories with files
d1f1 = create_rnd_file(d1, 'dir1file1')
d1f2 = create_rnd_file(d1, 'dir1file2')
d2f1 = create_rnd_file(d2, 'dir2file1')
d2f2 = create_rnd_file(d2, 'dir2file2')
d1f1 = create_rnd_file(dir1, 'dir1file1')
d1f2 = create_rnd_file(dir1, 'dir1file2')
d2f1 = create_rnd_file(dir2, 'dir2file1')
d2f2 = create_rnd_file(dir2, 'dir2file2')
noder = Noder(debug=True)
noder.set_hashing(True)
@ -49,7 +51,7 @@ class TestIndexing(unittest.TestCase):
catalog = Catalog(catalogpath, force=True, debug=False)
# get checksums
f4_md5 = md5sum(f4)
f4_md5 = md5sum(file4)
self.assertTrue(f4_md5)
d1f1_md5 = md5sum(d1f1)
self.assertTrue(d1f1_md5)
@ -69,7 +71,7 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(os.stat(catalogpath).st_size != 0)
# ensure md5 sum are in
nods = noder.find_name(top, os.path.basename(f4))
nods = noder.find_name(top, os.path.basename(file4))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
@ -79,34 +81,34 @@ class TestIndexing(unittest.TestCase):
noder.print_tree(top, top)
# add some files and directories
new1 = create_rnd_file(d1, 'newf1')
new1 = create_rnd_file(dir1, 'newf1')
new2 = create_rnd_file(dirpath, 'newf2')
new3 = create_dir(dirpath, 'newd3')
new4 = create_dir(d2, 'newd4')
new4 = create_dir(dir2, 'newd4')
new5 = create_rnd_file(new4, 'newf5')
unix_tree(dirpath)
# modify files
EDIT = 'edited'
edit_file(d1f1, EDIT)
editval = 'edited'
edit_file(d1f1, editval)
d1f1_md5_new = md5sum(d1f1)
self.assertTrue(d1f1_md5_new)
self.assertTrue(d1f1_md5_new != d1f1_md5)
# change file without mtime
maccess = os.path.getmtime(f4)
EDIT = 'edited'
edit_file(f4, EDIT)
maccess = os.path.getmtime(file4)
editval = 'edited'
edit_file(file4, editval)
# reset edit time
os.utime(f4, (maccess, maccess))
os.utime(file4, (maccess, maccess))
f4_md5_new = md5sum(d1f1)
self.assertTrue(f4_md5_new)
self.assertTrue(f4_md5_new != f4_md5)
# change file without mtime
maccess = os.path.getmtime(d2f2)
EDIT = 'edited'
edit_file(d2f2, EDIT)
editval = 'edited'
edit_file(d2f2, editval)
# reset edit time
os.utime(d2f2, (maccess, maccess))
d2f2_md5_new = md5sum(d2f2)
@ -134,7 +136,7 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(nod.md5 == d1f1_md5_new)
# ensure f4 md5 sum has changed in catalog
nods = noder.find_name(top, os.path.basename(f4))
nods = noder.find_name(top, os.path.basename(file4))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
@ -152,14 +154,14 @@ class TestIndexing(unittest.TestCase):
# ensures files and directories are in
names = [node.name for node in anytree.PreOrderIter(storage)]
print(names)
self.assertTrue(os.path.basename(f1) in names)
self.assertTrue(os.path.basename(f2) in names)
self.assertTrue(os.path.basename(f3) in names)
self.assertTrue(os.path.basename(f4) in names)
self.assertTrue(os.path.basename(d1) in names)
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
self.assertTrue(os.path.basename(file3) in names)
self.assertTrue(os.path.basename(file4) in names)
self.assertTrue(os.path.basename(dir1) in names)
self.assertTrue(os.path.basename(d1f1) in names)
self.assertTrue(os.path.basename(d1f2) in names)
self.assertTrue(os.path.basename(d2) in names)
self.assertTrue(os.path.basename(dir2) in names)
self.assertTrue(os.path.basename(d2f1) in names)
self.assertTrue(os.path.basename(new1) in names)
self.assertTrue(os.path.basename(new2) in names)
@ -168,19 +170,19 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(os.path.basename(new5) in names)
for node in storage.children:
if node.name == os.path.basename(d1):
if node.name == os.path.basename(dir1):
self.assertTrue(len(node.children) == 3)
elif node.name == os.path.basename(d2):
elif node.name == os.path.basename(dir2):
self.assertTrue(len(node.children) == 3)
elif node.name == os.path.basename(new3):
self.assertTrue(len(node.children) == 0)
elif node.name == os.path.basename(new4):
self.assertTrue(len(node.children) == 1)
self.assertTrue(read_from_file(d1f1) == EDIT)
self.assertTrue(read_from_file(d1f1) == editval)
# remove some files
clean(d1f1)
clean(d2)
clean(dir2)
clean(new2)
clean(new4)
@ -190,14 +192,14 @@ class TestIndexing(unittest.TestCase):
# ensures files and directories are (not) in
names = [node.name for node in anytree.PreOrderIter(storage)]
print(names)
self.assertTrue(os.path.basename(f1) in names)
self.assertTrue(os.path.basename(f2) in names)
self.assertTrue(os.path.basename(f3) in names)
self.assertTrue(os.path.basename(f4) in names)
self.assertTrue(os.path.basename(d1) in names)
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
self.assertTrue(os.path.basename(file3) in names)
self.assertTrue(os.path.basename(file4) in names)
self.assertTrue(os.path.basename(dir1) in names)
self.assertTrue(os.path.basename(d1f1) not in names)
self.assertTrue(os.path.basename(d1f2) in names)
self.assertTrue(os.path.basename(d2) not in names)
self.assertTrue(os.path.basename(dir2) not in names)
self.assertTrue(os.path.basename(d2f1) not in names)
self.assertTrue(os.path.basename(d2f1) not in names)
self.assertTrue(os.path.basename(new1) in names)
@ -206,13 +208,14 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(os.path.basename(new4) not in names)
self.assertTrue(os.path.basename(new5) not in names)
for node in storage.children:
if node.name == os.path.basename(d1):
if node.name == os.path.basename(dir1):
self.assertTrue(len(node.children) == 2)
elif node.name == os.path.basename(new3):
self.assertTrue(len(node.children) == 0)
def main():
"""entry point"""
unittest.main()

Loading…
Cancel
Save