Merge pull request #29 from deadc0de6/fzf

Fzf
fix-36
deadc0de 1 year ago committed by GitHub
commit 761ea54e1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -2,14 +2,14 @@ name: tests
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
strategy:
matrix:
python-version: [3.5, 3.6, 3.7, 3.8]
python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies

1
.gitignore vendored

@ -3,3 +3,4 @@
dist/
build/
*.egg-info/
*.catalog

@ -43,7 +43,7 @@ pip3 install catcli --user
# index a directory in the catalog
catcli index --meta='some description' log /var/log
# display the content
catcli tree
catcli ls -r
# navigate
catcli ls log
# find files/directories named '*log*'
@ -72,7 +72,7 @@ See the [examples](#examples) for an overview of the available features.
* [Index archive files](#index-archive-files)
* [Walk indexed files with ls](#walk-indexed-files-with-ls)
* [Find files](#find-files)
* [Display entire tree](#display-entire-tree)
* [Display entire hierarchy](#display-entire-hierarchy)
* [Catalog graph](#catalog-graph)
* [Edit storage](#edit-storage)
* [Update catalog](#update-catalog)
@ -121,7 +121,7 @@ and they are all available through the command line interface of catcli.
Five different types of entry are present in a catalog:
* **top node**: this is the root of the tree
* **top node**: this is the root of the hierarchy
* **storage node**: this represents an indexed storage (a DVD, an external
hard drive, an USB drive, some arbitrary directory, etc).
* **dir node**: this is a directory
@ -177,9 +177,9 @@ Files and directories can be found based on their names
using the `find` command.
See the [examples](#examples) for more.
## Display entire tree
## Display entire hierarchy
The entire catalog can be shown using the `tree` command.
The entire catalog can be shown using the `ls -r` command.
Resulting files can be sorted by size using the `-S --sortsize` switch.
See the [examples](#examples) for more.
@ -267,10 +267,10 @@ $ catcli index --meta='my test directory' tmptest /tmp/test
Catcli creates its catalog file in the current directory as `catcli.catalog`.
Printing the entire catalog as a tree is done with the command `tree`
Printing the entire catalog as a tree is done with the command `ls -r`
```
$ catcli tree
$ catcli ls -r
top
└── storage: tmptest (my test directory) (nbfiles:3, free:3.7G/3.7G, date:2019-01-26 19:59:47)
├── a [nbfiles:3, totsize:72]
@ -391,8 +391,6 @@ $ catcli ls -ar some-name/v0.3.1.zip
└── catcli-0.3.1/ [archive:v0.3.1.zip]
```
All commands handle archive files (like `tree` or `find`).
# Contribution
If you are having trouble installing or using catcli, open an issue.

@ -3,12 +3,12 @@ author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2017, deadc0de6
"""
# pylint: disable=C0415
import sys
__version__ = '0.8.7'
def main():
"""entry point"""
import catcli.catcli
if catcli.catcli.main():
sys.exit(0)

@ -11,54 +11,65 @@ from anytree.exporter import JsonExporter
from anytree.importer import JsonImporter
# local imports
import catcli.utils as utils
from catcli.utils import ask
from catcli.logger import Logger
class Catalog:
"""the catalog"""
def __init__(self, path, pickle=False, debug=False, force=False):
'''
def __init__(self, path, usepickle=False, debug=False, force=False):
"""
@path: catalog path
@pickle: use pickle
@usepickle: use pickle
@debug: debug mode
@force: force overwrite if exists
'''
"""
self.path = path
self.debug = debug
self.force = force
self.metanode = None
self.pickle = pickle
self.pickle = usepickle
def set_metanode(self, metanode):
'''remove the metanode until tree is re-written'''
"""remove the metanode until tree is re-written"""
self.metanode = metanode
self.metanode.parent = None
def exists(self):
"""does catalog exist"""
if not self.path:
return False
if os.path.exists(self.path):
return True
return False
def restore(self):
'''restore the catalog'''
"""restore the catalog"""
if not self.path:
return None
if not os.path.exists(self.path):
return None
if self.pickle:
return self._restore_pickle()
return self._restore_json(open(self.path, 'r').read())
with open(self.path, 'r', encoding='UTF-8') as file:
content = file.read()
return self._restore_json(content)
def save(self, node):
'''save the catalog'''
"""save the catalog"""
if not self.path:
Logger.err('Path not defined')
return False
d = os.path.dirname(self.path)
if d and not os.path.exists(d):
os.makedirs(d)
directory = os.path.dirname(self.path)
if directory and not os.path.exists(directory):
os.makedirs(directory)
elif os.path.exists(self.path) and not self.force:
if not utils.ask('Update catalog \"{}\"'.format(self.path)):
if not ask(f'Update catalog \"{self.path}\"'):
Logger.info('Catalog not saved')
return False
if d and not os.path.exists(d):
Logger.err('Cannot write to \"{}\"'.format(d))
if directory and not os.path.exists(directory):
Logger.err(f'Cannot write to \"{directory}\"')
return False
if self.metanode:
self.metanode.parent = node
@ -72,29 +83,31 @@ class Catalog:
Logger.debug(text)
def _save_pickle(self, node):
'''pickle the catalog'''
pickle.dump(node, open(self.path, 'wb'))
self._debug('Catalog saved to pickle \"{}\"'.format(self.path))
"""pickle the catalog"""
with open(self.path, 'wb') as file:
pickle.dump(node, file)
self._debug(f'Catalog saved to pickle \"{self.path}\"')
return True
def _restore_pickle(self):
'''restore the pickled tree'''
root = pickle.load(open(self.path, 'rb'))
m = 'Catalog imported from pickle \"{}\"'.format(self.path)
self._debug(m)
"""restore the pickled tree"""
with open(self.path, 'rb') as file:
root = pickle.load(file)
msg = f'Catalog imported from pickle \"{self.path}\"'
self._debug(msg)
return root
def _save_json(self, node):
'''export the catalog in json'''
"""export the catalog in json"""
exp = JsonExporter(indent=2, sort_keys=True)
with open(self.path, 'w') as f:
exp.write(node, f)
self._debug('Catalog saved to json \"{}\"'.format(self.path))
with open(self.path, 'w', encoding='UTF-8') as file:
exp.write(node, file)
self._debug(f'Catalog saved to json \"{self.path}\"')
return True
def _restore_json(self, string):
'''restore the tree from json'''
"""restore the tree from json"""
imp = JsonImporter()
root = imp.import_(string)
self._debug('Catalog imported from json \"{}\"'.format(self.path))
self._debug(f'Catalog imported from json \"{self.path}\"')
return root

@ -14,44 +14,46 @@ import datetime
from docopt import docopt
# local imports
from . import __version__ as VERSION
from .version import __version__ as VERSION
from .logger import Logger
from .colors import Colors
from .catalog import Catalog
from .walker import Walker
from .noder import Noder
from .utils import ask, edit
from .exceptions import BadFormatException, CatcliException
NAME = 'catcli'
CUR = os.path.dirname(os.path.abspath(__file__))
CATALOGPATH = '{}.catalog'.format(NAME)
GRAPHPATH = '/tmp/{}.dot'.format(NAME)
CATALOGPATH = f'{NAME}.catalog'
GRAPHPATH = f'/tmp/{NAME}.dot'
SEPARATOR = '/'
WILD = '*'
FORMATS = ['native', 'csv', 'csv-with-header', 'fzf-native', 'fzf-csv']
BANNER = """ +-+-+-+-+-+-+
BANNER = f""" +-+-+-+-+-+-+
|c|a|t|c|l|i|
+-+-+-+-+-+-+ v{}""".format(VERSION)
+-+-+-+-+-+-+ v{VERSION}"""
USAGE = """
{0}
USAGE = f"""
{BANNER}
Usage:
{1} ls [--catalog=<path>] [--format=<fmt>] [-aBCrVSs] [<path>]
{1} find [--catalog=<path>] [--format=<fmt>] [-aBCbdVsP] [--path=<path>] <term>
{1} tree [--catalog=<path>] [--format=<fmt>] [-aBCVSsH] [<path>]
{1} index [--catalog=<path>] [--meta=<meta>...] [-aBCcfnV] <name> <path>
{1} update [--catalog=<path>] [-aBCcfnV] [--lpath=<path>] <name> <path>
{1} rm [--catalog=<path>] [-BCfV] <storage>
{1} rename [--catalog=<path>] [-BCfV] <storage> <name>
{1} edit [--catalog=<path>] [-BCfV] <storage>
{1} graph [--catalog=<path>] [-BCV] [<path>]
{1} print_supported_formats
{1} help
{1} --help
{1} --version
{NAME} ls [--catalog=<path>] [--format=<fmt>] [-aBCrVSs] [<path>]
{NAME} find [--catalog=<path>] [--format=<fmt>] [-aBCbdVsP] [--path=<path>] [<term>]
{NAME} index [--catalog=<path>] [--meta=<meta>...] [-aBCcfnV] <name> <path>
{NAME} update [--catalog=<path>] [-aBCcfnV] [--lpath=<path>] <name> <path>
{NAME} rm [--catalog=<path>] [-BCfV] <storage>
{NAME} rename [--catalog=<path>] [-BCfV] <storage> <name>
{NAME} edit [--catalog=<path>] [-BCfV] <storage>
{NAME} graph [--catalog=<path>] [-BCV] [<path>]
{NAME} print_supported_formats
{NAME} help
{NAME} --help
{NAME} --version
Options:
--catalog=<path> Path to the catalog [default: {2}].
--catalog=<path> Path to the catalog [default: {CATALOGPATH}].
--meta=<meta> Additional attribute to store [default: ].
-a --archive Handle archive file [default: False].
-B --no-banner Do not display the banner [default: False].
@ -61,7 +63,6 @@ Options:
-d --directory Only directory [default: False].
-F --format=<fmt> Print format, see command \"print_supported_formats\" [default: native].
-f --force Do not ask when updating the catalog [default: False].
-H --header Print header on CSV format [default: False].
-l --lpath=<path> Path where changes are logged [default: ]
-n --no-subsize Do not store size of directories [default: False].
-P --parent Ignore stored relpath [default: True].
@ -72,22 +73,23 @@ Options:
-V --verbose Be verbose [default: False].
-v --version Show version.
-h --help Show this screen.
""".format(BANNER, NAME, CATALOGPATH) # nopep8
""" # nopep8
def cmd_index(args, noder, catalog, top):
"""index action"""
path = args['<path>']
name = args['<name>']
hash = args['--hash']
usehash = args['--hash']
debug = args['--verbose']
subsize = not args['--no-subsize']
if not os.path.exists(path):
Logger.err('\"{}\" does not exist'.format(path))
Logger.err(f'\"{path}\" does not exist')
return
if name in noder.get_storage_names(top):
try:
if not ask('Overwrite storage \"{}\"'.format(name)):
Logger.err('storage named \"{}\" already exist'.format(name))
if not ask(f'Overwrite storage \"{name}\"'):
Logger.err(f'storage named \"{name}\" already exist')
return
except KeyboardInterrupt:
Logger.err('aborted')
@ -96,114 +98,116 @@ def cmd_index(args, noder, catalog, top):
node.parent = None
start = datetime.datetime.now()
walker = Walker(noder, hash=hash, debug=debug)
walker = Walker(noder, usehash=usehash, debug=debug)
attr = noder.format_storage_attr(args['--meta'])
root = noder.storage_node(name, path, parent=top, attr=attr)
root = noder.new_storage_node(name, path, parent=top, attr=attr)
_, cnt = walker.index(path, root, name)
if subsize:
noder.rec_size(root)
noder.rec_size(root, store=True)
stop = datetime.datetime.now()
Logger.info('Indexed {} file(s) in {}'.format(cnt, stop - start))
diff = stop - start
Logger.info(f'Indexed {cnt} file(s) in {diff}')
if cnt > 0:
catalog.save(top)
def cmd_update(args, noder, catalog, top):
"""update action"""
path = args['<path>']
name = args['<name>']
hash = args['--hash']
usehash = args['--hash']
logpath = args['--lpath']
debug = args['--verbose']
subsize = not args['--no-subsize']
if not os.path.exists(path):
Logger.err('\"{}\" does not exist'.format(path))
Logger.err(f'\"{path}\" does not exist')
return
root = noder.get_storage_node(top, name, path=path)
if not root:
Logger.err('storage named \"{}\" does not exist'.format(name))
Logger.err(f'storage named \"{name}\" does not exist')
return
start = datetime.datetime.now()
walker = Walker(noder, hash=hash, debug=debug,
walker = Walker(noder, usehash=usehash, debug=debug,
logpath=logpath)
cnt = walker.reindex(path, root, top)
if subsize:
noder.rec_size(root)
noder.rec_size(root, store=True)
stop = datetime.datetime.now()
Logger.info('updated {} file(s) in {}'.format(cnt, stop - start))
diff = stop - start
Logger.info(f'updated {cnt} file(s) in {diff}')
if cnt > 0:
catalog.save(top)
def cmd_ls(args, noder, top):
"""ls action"""
path = args['<path>']
if not path:
path = SEPARATOR
if not path.startswith(SEPARATOR):
path = SEPARATOR + path
pre = '{}{}'.format(SEPARATOR, noder.TOPNAME)
# prepend with top node path
pre = f'{SEPARATOR}{noder.NAME_TOP}'
if not path.startswith(pre):
path = pre + path
# ensure ends with a separator
if not path.endswith(SEPARATOR):
path += SEPARATOR
# add wild card
if not path.endswith(WILD):
path += WILD
found = noder.walk(top, path,
fmt = args['--format']
if fmt.startswith('fzf'):
raise BadFormatException('fzf is not supported in ls, use find')
found = noder.list(top, path,
rec=args['--recursive'],
fmt=args['--format'],
fmt=fmt,
raw=args['--raw-size'])
if not found:
Logger.err('\"{}\": nothing found'.format(args['<path>']))
path = args['<path>']
Logger.err(f'\"{path}\": nothing found')
return found
def cmd_rm(args, noder, catalog, top):
"""rm action"""
name = args['<storage>']
node = noder.get_storage_node(top, name)
if node:
node.parent = None
if catalog.save(top):
Logger.info('Storage \"{}\" removed'.format(name))
Logger.info(f'Storage \"{name}\" removed')
else:
Logger.err('Storage named \"{}\" does not exist'.format(name))
Logger.err(f'Storage named \"{name}\" does not exist')
return top
def cmd_find(args, noder, top):
"""find action"""
fromtree = args['--parent']
directory = args['--directory']
startpath = args['--path']
fmt = args['--format']
raw = args['--raw-size']
return noder.find_name(top, args['<term>'], script=args['--script'],
startpath=startpath, directory=directory,
script = args['--script']
search_for = args['<term>']
return noder.find_name(top, search_for, script=script,
startpath=startpath, only_dir=directory,
parentfromtree=fromtree, fmt=fmt, raw=raw)
def cmd_tree(args, noder, top):
path = args['<path>']
fmt = args['--format']
hdr = args['--header']
raw = args['--raw-size']
# find node to start with
node = top
if path:
node = noder.get_node(top, path)
if node:
# print the tree
noder.print_tree(node, fmt=fmt, header=hdr, raw=raw)
def cmd_graph(args, noder, top):
"""graph action"""
path = args['<path>']
if not path:
path = GRAPHPATH
cmd = noder.to_dot(top, path)
Logger.info('create graph with \"{}\" (you need graphviz)'.format(cmd))
Logger.info(f'create graph with \"{cmd}\" (you need graphviz)')
def cmd_rename(args, noder, catalog, top):
def cmd_rename(args, catalog, top):
"""rename action"""
storage = args['<storage>']
new = args['<name>']
storages = list(x.name for x in top.children)
@ -211,14 +215,15 @@ def cmd_rename(args, noder, catalog, top):
node = next(filter(lambda x: x.name == storage, top.children))
node.name = new
if catalog.save(top):
m = 'Storage \"{}\" renamed to \"{}\"'.format(storage, new)
Logger.info(m)
msg = f'Storage \"{storage}\" renamed to \"{new}\"'
Logger.info(msg)
else:
Logger.err('Storage named \"{}\" does not exist'.format(storage))
Logger.err(f'Storage named \"{storage}\" does not exist')
return top
def cmd_edit(args, noder, catalog, top):
"""edit action"""
storage = args['<storage>']
storages = list(x.name for x in top.children)
if storage in storages:
@ -229,18 +234,29 @@ def cmd_edit(args, noder, catalog, top):
new = edit(attr)
node.attr = noder.format_storage_attr(new)
if catalog.save(top):
Logger.info('Storage \"{}\" edited'.format(storage))
Logger.info(f'Storage \"{storage}\" edited')
else:
Logger.err('Storage named \"{}\" does not exist'.format(storage))
Logger.err(f'Storage named \"{storage}\" does not exist')
return top
def banner():
Logger.out_err(BANNER)
Logger.out_err("")
"""print banner"""
Logger.stderr_nocolor(BANNER)
Logger.stderr_nocolor("")
def print_supported_formats():
"""print all supported formats to stdout"""
print('"native" : native format')
print('"csv" : CSV format')
print(f' {Noder.CSV_HEADER}')
print('"fzf-native" : fzf to native (only for find)')
print('"fzf-csv" : fzf to csv (only for find)')
def main():
"""entry point"""
args = docopt(USAGE, version=VERSION)
if args['help'] or args['--help']:
@ -248,15 +264,14 @@ def main():
return True
if args['print_supported_formats']:
print('"native": native format')
print('"csv" : CSV format')
print(' {}'.format(Noder.CSV_HEADER))
print_supported_formats()
return True
# check format
fmt = args['--format']
if fmt != 'native' and fmt != 'csv':
Logger.err('bad format: {}'.format(fmt))
if fmt not in FORMATS:
Logger.err(f'bad format: {fmt}')
print_supported_formats()
return False
if args['--verbose']:
@ -268,13 +283,14 @@ def main():
# set colors
if args['--no-color']:
Logger.no_color()
Colors.no_color()
# init noder
noder = Noder(debug=args['--verbose'], sortsize=args['--sortsize'],
arc=args['--archive'])
# init catalog
catalog = Catalog(args['--catalog'], debug=args['--verbose'],
catalog_path = args['--catalog']
catalog = Catalog(catalog_path, debug=args['--verbose'],
force=args['--force'])
# init top node
top = catalog.restore()
@ -286,30 +302,52 @@ def main():
catalog.set_metanode(meta)
# parse command
if args['index']:
cmd_index(args, noder, catalog, top)
if args['update']:
cmd_update(args, noder, catalog, top)
elif args['find']:
cmd_find(args, noder, top)
elif args['tree']:
cmd_tree(args, noder, top)
elif args['ls']:
cmd_ls(args, noder, top)
elif args['rm']:
cmd_rm(args, noder, catalog, top)
elif args['graph']:
cmd_graph(args, noder, top)
elif args['rename']:
cmd_rename(args, noder, catalog, top)
elif args['edit']:
cmd_edit(args, noder, catalog, top)
try:
if args['index']:
cmd_index(args, noder, catalog, top)
if args['update']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_update(args, noder, catalog, top)
elif args['find']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_find(args, noder, top)
elif args['ls']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_ls(args, noder, top)
elif args['rm']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_rm(args, noder, catalog, top)
elif args['graph']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_graph(args, noder, top)
elif args['rename']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_rename(args, catalog, top)
elif args['edit']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_edit(args, noder, catalog, top)
except CatcliException as exc:
Logger.stderr_nocolor('ERROR ' + str(exc))
return False
return True
if __name__ == '__main__':
'''entry point'''
if main():
sys.exit(0)
sys.exit(1)

@ -0,0 +1,37 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
shell colors
"""
class Colors:
"""shell colors"""
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
PURPLE = '\033[1;35m'
BLUE = '\033[94m'
GRAY = '\033[0;37m'
MAGENTA = '\033[95m'
RESET = '\033[0m'
EMPH = '\033[33m'
BOLD = '\033[1m'
UND = '\033[4m'
@classmethod
def no_color(cls):
"""disable colors"""
Colors.RED = ''
Colors.GREEN = ''
Colors.YELLOW = ''
Colors.PURPLE = ''
Colors.BLUE = ''
Colors.GRAY = ''
Colors.MAGENTA = ''
Colors.RESET = ''
Colors.EMPH = ''
Colors.BOLD = ''
Colors.UND = ''

@ -11,6 +11,7 @@ import zipfile
class Decomp:
"""decompressor"""
def __init__(self):
self.ext = {
@ -28,26 +29,28 @@ class Decomp:
'zip': self._zip}
def get_formats(self):
'''return list of supported extensions'''
"""return list of supported extensions"""
return list(self.ext.keys())
def get_names(self, path):
'''get tree of compressed archive'''
"""get tree of compressed archive"""
ext = os.path.splitext(path)[1][1:].lower()
if ext in list(self.ext.keys()):
if ext in list(self.ext):
return self.ext[ext](path)
return None
def _tar(self, path):
'''return list of file names in tar'''
@staticmethod
def _tar(path):
"""return list of file names in tar"""
if not tarfile.is_tarfile(path):
return None
tar = tarfile.open(path, "r")
return tar.getnames()
with tarfile.open(path, "r") as tar:
return tar.getnames()
def _zip(self, path):
'''return list of file names in zip'''
@staticmethod
def _zip(path):
"""return list of file names in zip"""
if not zipfile.is_zipfile(path):
return None
z = zipfile.ZipFile(path)
return z.namelist()
with zipfile.ZipFile(path) as file:
return file.namelist()

@ -0,0 +1,14 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
Catcli exceptions
"""
class CatcliException(Exception):
"""generic catcli exception"""
class BadFormatException(CatcliException):
"""use of bad format"""

@ -7,128 +7,64 @@ Logging helper
import sys
# local imports
from catcli.colors import Colors
from catcli.utils import fix_badchars
class Logger:
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
PURPLE = '\033[1;35m'
BLUE = '\033[94m'
GRAY = '\033[0;37m'
MAGENTA = '\033[95m'
RESET = '\033[0m'
EMPH = '\033[33m'
BOLD = '\033[1m'
UND = '\033[4m'
STORAGE = 'storage'
ARCHIVE = 'archive'
NBFILES = 'nbfiles'
def no_color():
Logger.RED = ''
Logger.GREEN = ''
Logger.YELLOW = ''
Logger.PURPLE = ''
Logger.BLUE = ''
Logger.GRAY = ''
Logger.MAGENTA = ''
Logger.RESET = ''
Logger.EMPH = ''
Logger.BOLD = ''
Logger.UND = ''
def fix_badchars(line):
return line.encode('utf-8', 'ignore').decode('utf-8')
######################################################################
# node specific output
######################################################################
def storage(pre, name, args, attr):
'''print a storage node'''
end = ''
if attr:
end = ' {}({}){}'.format(Logger.GRAY, attr, Logger.RESET)
s = '{}{}{}{}:'.format(pre, Logger.UND, Logger.STORAGE, Logger.RESET)
s += ' {}{}{}{}\n'.format(Logger.PURPLE,
Logger.fix_badchars(name),
Logger.RESET, end)
s += ' {}{}{}'.format(Logger.GRAY, args, Logger.RESET)
sys.stdout.write('{}\n'.format(s))
def file(pre, name, attr):
'''print a file node'''
s = '{}{}'.format(pre, Logger.fix_badchars(name))
s += ' {}[{}]{}'.format(Logger.GRAY, attr, Logger.RESET)
sys.stdout.write('{}\n'.format(s))
def dir(pre, name, depth='', attr=None):
'''print a directory node'''
end = []
if depth != '':
end.append('{}:{}'.format(Logger.NBFILES, depth))
if attr:
end.append(' '.join(['{}:{}'.format(x, y) for x, y in attr]))
if end:
end = ' [{}]'.format(', '.join(end))
s = '{}{}{}{}'.format(pre, Logger.BLUE,
Logger.fix_badchars(name), Logger.RESET)
s += '{}{}{}'.format(Logger.GRAY, end, Logger.RESET)
sys.stdout.write('{}\n'.format(s))
def arc(pre, name, archive):
s = '{}{}{}{}'.format(pre, Logger.YELLOW,
Logger.fix_badchars(name), Logger.RESET)
s += ' {}[{}:{}]{}'.format(Logger.GRAY, Logger.ARCHIVE,
archive, Logger.RESET)
sys.stdout.write('{}\n'.format(s))
######################################################################
# generic output
######################################################################
def out(string):
'''to stdout no color'''
string = Logger.fix_badchars(string)
sys.stdout.write('{}\n'.format(string))
def out_err(string):
'''to stderr no color'''
string = Logger.fix_badchars(string)
sys.stderr.write('{}\n'.format(string))
def debug(string):
'''to stderr no color'''
string = Logger.fix_badchars(string)
sys.stderr.write('[DBG] {}\n'.format(string))
def info(string):
'''to stdout in color'''
string = Logger.fix_badchars(string)
s = '{}{}{}'.format(Logger.MAGENTA, string, Logger.RESET)
sys.stdout.write('{}\n'.format(s))
def err(string):
'''to stderr in RED'''
string = Logger.fix_badchars(string)
s = '{}{}{}'.format(Logger.RED, string, Logger.RESET)
sys.stderr.write('{}\n'.format(s))
def progr(string):
'''print progress'''
string = Logger.fix_badchars(string)
sys.stderr.write('{}\r'.format(string))
class Logger:
"""log to stdout/stderr"""
@classmethod
def stdout_nocolor(cls, string):
"""to stdout no color"""
string = fix_badchars(string)
sys.stdout.write(f'{string}\n')
@classmethod
def stderr_nocolor(cls, string):
"""to stderr no color"""
string = fix_badchars(string)
sys.stderr.write(f'{string}\n')
@classmethod
def debug(cls, string):
"""to stderr no color"""
cls.stderr_nocolor(f'[DBG] {string}\n')
@classmethod
def info(cls, string):
"""to stdout in color"""
string = fix_badchars(string)
out = f'{Colors.MAGENTA}{string}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def err(cls, string):
"""to stderr in RED"""
string = fix_badchars(string)
out = f'{Colors.RED}{string}{Colors.RESET}'
sys.stderr.write(f'{out}\n')
@classmethod
def progr(cls, string):
"""print progress"""
string = fix_badchars(string)
sys.stderr.write(f'{string}\r')
sys.stderr.flush()
def bold(string):
'''make it bold'''
string = Logger.fix_badchars(string)
return '{}{}{}'.format(Logger.BOLD, string, Logger.RESET)
@classmethod
def get_bold_text(cls, string):
"""make it bold"""
string = fix_badchars(string)
return f'{Colors.BOLD}{string}{Colors.RESET}'
def flog(path, string, append=True):
string = Logger.fix_badchars(string)
@classmethod
def log_to_file(cls, path, string, append=True):
"""log to file"""
string = fix_badchars(string)
mode = 'w'
if append:
mode = 'a'
with open(path, mode) as f:
f.write(string)
with open(path, mode, encoding='UTF-8') as file:
file.write(string)

@ -0,0 +1,61 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
Class for printing nodes
"""
import sys
from catcli.colors import Colors
from catcli.utils import fix_badchars
class NodePrinter:
"""a node printer class"""
STORAGE = 'storage'
ARCHIVE = 'archive'
NBFILES = 'nbfiles'
@classmethod
def print_storage_native(cls, pre, name, args, attr):
"""print a storage node"""
end = ''
if attr:
end = f' {Colors.GRAY}({attr}){Colors.RESET}'
out = f'{pre}{Colors.UND}{cls.STORAGE}{Colors.RESET}:'
out += ' ' + Colors.PURPLE + fix_badchars(name) + \
Colors.RESET + end + '\n'
out += f' {Colors.GRAY}{args}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_file_native(cls, pre, name, attr):
"""print a file node"""
nobad = fix_badchars(name)
out = f'{pre}{nobad}'
out += f' {Colors.GRAY}[{attr}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_dir_native(cls, pre, name, depth='', attr=None):
"""print a directory node"""
end = []
if depth != '':
end.append(f'{cls.NBFILES}:{depth}')
if attr:
end.append(' '.join([f'{x}:{y}' for x, y in attr]))
if end:
endstring = ', '.join(end)
end = f' [{endstring}]'
out = pre + Colors.BLUE + fix_badchars(name) + Colors.RESET
out += f'{Colors.GRAY}{end}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_archive_native(cls, pre, name, archive):
"""archive to stdout"""
out = pre + Colors.YELLOW + fix_badchars(name) + Colors.RESET
out += f' {Colors.GRAY}[{cls.ARCHIVE}:{archive}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')

@ -6,45 +6,50 @@ Class that represents a node in the catalog tree
"""
import os
import anytree
import shutil
import time
import anytree
from pyfzf.pyfzf import FzfPrompt
# local imports
from . import __version__ as VERSION
import catcli.utils as utils
from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars
from catcli.logger import Logger
from catcli.nodeprinter import NodePrinter
from catcli.decomp import Decomp
from catcli.version import __version__ as VERSION
from catcli.exceptions import CatcliException
'''
There are 4 types of node:
class Noder:
"""
handles node in the catalog tree
There are 4 types of node:
* "top" node representing the top node (generic node)
* "storage" node representing a storage
* "dir" node representing a directory
* "file" node representing a file
'''
"""
NAME_TOP = 'top'
NAME_META = 'meta'
class Noder:
TOPNAME = 'top'
METANAME = 'meta'
TYPE_TOP = 'top'
TYPE_FILE = 'file'
TYPE_DIR = 'dir'
TYPE_ARC = 'arc'
TYPE_STORAGE = 'storage'
TYPE_META = 'meta'
CSV_HEADER = ('name,type,path,size,indexed_at,'
'maccess,md5,nbfiles,free_space,'
'total_space,meta')
def __init__(self, debug=False, sortsize=False, arc=False):
'''
"""
@debug: debug mode
@sortsize: sort nodes by size
@arch: handle archive
'''
"""
self.hash = True
self.debug = debug
self.sortsize = sortsize
@ -52,21 +57,22 @@ class Noder:
if self.arc:
self.decomp = Decomp()
def get_storage_names(self, top):
'''return a list of all storage names'''
@staticmethod
def get_storage_names(top):
"""return a list of all storage names"""
return [x.name for x in list(top.children)]
def get_storage_node(self, top, name, path=None):
'''
"""
return the storage node if any
if path is submitted, it will update the media info
'''
"""
found = None
for n in top.children:
if n.type != self.TYPE_STORAGE:
for node in top.children:
if node.type != self.TYPE_STORAGE:
continue
if n.name == name:
found = n
if node.name == name:
found = node
break
if found and path and os.path.exists(path):
found.free = shutil.disk_usage(path).free
@ -74,24 +80,25 @@ class Noder:
found.ts = int(time.time())
return found
def get_node(self, top, path, quiet=False):
'''get the node by internal tree path'''
r = anytree.resolver.Resolver('name')
@staticmethod
def get_node(top, path, quiet=False):
"""get the node by internal tree path"""
resolv = anytree.resolver.Resolver('name')
try:
p = os.path.basename(path)
return r.get(top, p)
bpath = os.path.basename(path)
return resolv.get(top, bpath)
except anytree.resolver.ChildResolverError:
if not quiet:
Logger.err('No node at path \"{}\"'.format(p))
Logger.err(f'No node at path \"{bpath}\"')
return None
def get_node_if_changed(self, top, path, treepath):
'''
"""
return the node (if any) and if it has changed
@top: top node (storage)
@path: abs path to file
@treepath: rel path from indexed directory
'''
"""
treepath = treepath.lstrip(os.sep)
node = self.get_node(top, treepath, quiet=True)
# node does not exist
@ -109,105 +116,80 @@ class Noder:
# maccess changed
old_maccess = node.maccess
if float(maccess) != float(old_maccess):
self._debug('\tchange: maccess changed for \"{}\"'.format(path))
self._debug(f'\tchange: maccess changed for \"{path}\"')
return node, True
# test hash
if self.hash and node.md5:
md5 = self._get_hash(path)
if md5 != node.md5:
m = '\tchange: checksum changed for \"{}\"'.format(path)
self._debug(m)
if md5 and md5 != node.md5:
msg = f'\tchange: checksum changed for \"{path}\"'
self._debug(msg)
return node, True
self._debug('\tchange: no change for \"{}\"'.format(path))
self._debug(f'\tchange: no change for \"{path}\"')
return node, False
def _rec_size(self, node, store=True):
'''
def rec_size(self, node, store=True):
"""
recursively traverse tree and return size
@store: store the size in the node
'''
"""
if node.type == self.TYPE_FILE:
self._debug('getting node size for \"{}\"'.format(node.name))
self._debug(f'getting node size for \"{node.name}\"')
return node.size
m = 'getting node size recursively for \"{}\"'.format(node.name)
self._debug(m)
msg = f'getting node size recursively for \"{node.name}\"'
self._debug(msg)
size = 0
for i in node.children:
if node.type == self.TYPE_DIR:
sz = self._rec_size(i, store=store)
size = self.rec_size(i, store=store)
if store:
i.size = sz
size += sz
i.size = size
size += size
if node.type == self.TYPE_STORAGE:
sz = self._rec_size(i, store=store)
size = self.rec_size(i, store=store)
if store:
i.size = sz
size += sz
i.size = size
size += size
else:
continue
if store:
node.size = size
return size
def rec_size(self, node):
'''recursively traverse tree and store dir size'''
return self._rec_size(node, store=True)
###############################################################
# public helpers
###############################################################
def format_storage_attr(self, attr):
'''format the storage attr for saving'''
@staticmethod
def format_storage_attr(attr):
"""format the storage attr for saving"""
if not attr:
return ''
if type(attr) is list:
if isinstance(attr, list):
return ', '.join(attr)
attr = attr.rstrip()
return attr
def set_hashing(self, val):
'''hash files when indexing'''
"""hash files when indexing"""
self.hash = val
###############################################################
# node creationg
# node creation
###############################################################
def new_top_node(self):
'''create a new top node'''
return anytree.AnyNode(name=self.TOPNAME, type=self.TYPE_TOP)
def update_metanode(self, top):
'''create or update meta node information'''
meta = self._get_meta_node(top)
epoch = int(time.time())
if not meta:
attr = {}
attr['created'] = epoch
attr['created_version'] = VERSION
meta = anytree.AnyNode(name=self.METANAME, type=self.TYPE_META,
attr=attr)
meta.attr['access'] = epoch
meta.attr['access_version'] = VERSION
return meta
def _get_meta_node(self, top):
'''return the meta node if any'''
try:
return next(filter(lambda x: x.type == self.TYPE_META,
top.children))
except StopIteration:
return None
"""create a new top node"""
return anytree.AnyNode(name=self.NAME_TOP, type=self.TYPE_TOP)
def file_node(self, name, path, parent, storagepath):
'''create a new node representing a file'''
def new_file_node(self, name, path, parent, storagepath):
"""create a new node representing a file"""
if not os.path.exists(path):
Logger.err('File \"{}\" does not exist'.format(path))
Logger.err(f'File \"{path}\" does not exist')
return None
path = os.path.abspath(path)
try:
st = os.lstat(path)
except OSError as e:
Logger.err('OSError: {}'.format(e))
stat = os.lstat(path)
except OSError as exc:
Logger.err(f'OSError: {exc}')
return None
md5 = None
if self.hash:
@ -215,42 +197,92 @@ class Noder:
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
n = self._node(name, self.TYPE_FILE, relpath, parent,
size=st.st_size, md5=md5, maccess=maccess)
node = self._new_generic_node(name, self.TYPE_FILE, relpath, parent,
size=stat.st_size, md5=md5,
maccess=maccess)
if self.arc:
ext = os.path.splitext(path)[1][1:]
if ext.lower() in self.decomp.get_formats():
self._debug('{} is an archive'.format(path))
self._debug(f'{path} is an archive')
names = self.decomp.get_names(path)
self.list_to_tree(n, names)
self.list_to_tree(node, names)
else:
self._debug('{} is NOT an archive'.format(path))
return n
self._debug(f'{path} is NOT an archive')
return node
def dir_node(self, name, path, parent, storagepath):
'''create a new node representing a directory'''
def new_dir_node(self, name, path, parent, storagepath):
"""create a new node representing a directory"""
path = os.path.abspath(path)
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
return self._node(name, self.TYPE_DIR, relpath,
parent, maccess=maccess)
return self._new_generic_node(name, self.TYPE_DIR, relpath,
parent, maccess=maccess)
def new_storage_node(self, name, path, parent, attr=None):
"""create a new node representing a storage"""
path = os.path.abspath(path)
free = shutil.disk_usage(path).free
total = shutil.disk_usage(path).total
epoch = int(time.time())
return anytree.AnyNode(name=name, type=self.TYPE_STORAGE, free=free,
total=total, parent=parent, attr=attr, ts=epoch)
def new_archive_node(self, name, path, parent, archive):
"""create a new node for archive data"""
return anytree.AnyNode(name=name, type=self.TYPE_ARC, relpath=path,
parent=parent, size=0, md5=None,
archive=archive)
@staticmethod
def _new_generic_node(name, nodetype, relpath, parent,
size=None, md5=None, maccess=None):
"""generic node creation"""
return anytree.AnyNode(name=name, type=nodetype, relpath=relpath,
parent=parent, size=size,
md5=md5, maccess=maccess)
###############################################################
# node management
###############################################################
def update_metanode(self, top):
"""create or update meta node information"""
meta = self._get_meta_node(top)
epoch = int(time.time())
if not meta:
attr = {}
attr['created'] = epoch
attr['created_version'] = VERSION
meta = anytree.AnyNode(name=self.NAME_META, type=self.TYPE_META,
attr=attr)
meta.attr['access'] = epoch
meta.attr['access_version'] = VERSION
return meta
def _get_meta_node(self, top):
"""return the meta node if any"""
try:
return next(filter(lambda x: x.type == self.TYPE_META,
top.children))
except StopIteration:
return None
def clean_not_flagged(self, top):
'''remove any node not flagged and clean flags'''
"""remove any node not flagged and clean flags"""
cnt = 0
for node in anytree.PreOrderIter(top):
if node.type != self.TYPE_FILE and node.type != self.TYPE_DIR:
if node.type not in [self.TYPE_FILE, self.TYPE_DIR]:
continue
if self._clean(node):
cnt += 1
return cnt
def flag(self, node):
'''flag a node'''
@staticmethod
def flag(node):
"""flag a node"""
node.flag = True
def _clean(self, node):
'''remove node if not flagged'''
"""remove node if not flagged"""
if not self._has_attr(node, 'flag') or \
not node.flag:
node.parent = None
@ -258,42 +290,20 @@ class Noder:
del node.flag
return False
def storage_node(self, name, path, parent, attr=None):
'''create a new node representing a storage'''
path = os.path.abspath(path)
free = shutil.disk_usage(path).free
total = shutil.disk_usage(path).total
epoch = int(time.time())
return anytree.AnyNode(name=name, type=self.TYPE_STORAGE, free=free,
total=total, parent=parent, attr=attr, ts=epoch)
def archive_node(self, name, path, parent, archive):
'''crete a new node for archive data'''
return anytree.AnyNode(name=name, type=self.TYPE_ARC, relpath=path,
parent=parent, size=0, md5=None,
archive=archive)
def _node(self, name, type, relpath, parent,
size=None, md5=None, maccess=None):
'''generic node creation'''
return anytree.AnyNode(name=name, type=type, relpath=relpath,
parent=parent, size=size,
md5=md5, maccess=maccess)
###############################################################
# printing
###############################################################
def _node_to_csv(self, node, sep=',', raw=False):
'''
"""
print a node to csv
@node: the node to consider
@sep: CSV separator character
@raw: print raw size rather than human readable
'''
"""
if not node:
return ''
return
if node.type == self.TYPE_TOP:
return ''
return
out = []
if node.type == self.TYPE_STORAGE:
@ -301,16 +311,16 @@ class Noder:
out.append(node.name) # name
out.append(node.type) # type
out.append('') # fake full path
sz = self._rec_size(node, store=False)
out.append(utils.size_to_str(sz, raw=raw)) # size
out.append(utils.epoch_to_str(node.ts)) # indexed_at
size = self.rec_size(node, store=False)
out.append(size_to_str(size, raw=raw)) # size
out.append(epoch_to_str(node.ts)) # indexed_at
out.append('') # fake maccess
out.append('') # fake md5
out.append(str(len(node.children))) # nbfiles
# fake free_space
out.append(utils.size_to_str(node.free, raw=raw))
out.append(size_to_str(node.free, raw=raw))
# fake total_space
out.append(utils.size_to_str(node.total, raw=raw))
out.append(size_to_str(node.total, raw=raw))
out.append(node.attr) # meta
else:
# handle other nodes
@ -321,10 +331,10 @@ class Noder:
fullpath = os.path.join(storage.name, parents)
out.append(fullpath.replace('"', '""')) # full path
out.append(utils.size_to_str(node.size, raw=raw)) # size
out.append(utils.epoch_to_str(storage.ts)) # indexed_at
out.append(size_to_str(node.size, raw=raw)) # size
out.append(epoch_to_str(storage.ts)) # indexed_at
if self._has_attr(node, 'maccess'):
out.append(utils.epoch_to_str(node.maccess)) # maccess
out.append(epoch_to_str(node.maccess)) # maccess
else:
out.append('') # fake maccess
if node.md5:
@ -341,12 +351,12 @@ class Noder:
line = sep.join(['"' + o + '"' for o in out])
if len(line) > 0:
Logger.out(line)
Logger.stdout_nocolor(line)
def _print_node(self, node, pre='', withpath=False,
withdepth=False, withstorage=False,
recalcparent=False, raw=False):
'''
def _print_node_native(self, node, pre='', withpath=False,
withdepth=False, withstorage=False,
recalcparent=False, raw=False):
"""
print a node
@node: the node to print
@pre: string to print before node
@ -355,10 +365,10 @@ class Noder:
@withstorage: print the node storage it belongs to
@recalcparent: get relpath from tree instead of relpath field
@raw: print raw size rather than human readable
'''
"""
if node.type == self.TYPE_TOP:
# top node
Logger.out('{}{}'.format(pre, node.name))
Logger.stdout_nocolor(f'{pre}{node.name}')
elif node.type == self.TYPE_FILE:
# node of type file
name = node.name
@ -372,12 +382,13 @@ class Noder:
storage = self._get_storage(node)
attr = ''
if node.md5:
attr = ', md5:{}'.format(node.md5)
sz = utils.size_to_str(node.size, raw=raw)
compl = 'size:{}{}'.format(sz, attr)
attr = f', md5:{node.md5}'
size = size_to_str(node.size, raw=raw)
compl = f'size:{size}{attr}'
if withstorage:
compl += ', storage:{}'.format(Logger.bold(storage.name))
Logger.file(pre, name, compl)
content = Logger.get_bold_text(storage.name)
compl += f', storage:{content}'
NodePrinter.print_file_native(pre, name, compl)
elif node.type == self.TYPE_DIR:
# node of type directory
name = node.name
@ -394,154 +405,243 @@ class Noder:
storage = self._get_storage(node)
attr = []
if node.size:
attr.append(['totsize', utils.size_to_str(node.size, raw=raw)])
attr.append(['totsize', size_to_str(node.size, raw=raw)])
if withstorage:
attr.append(['storage', Logger.bold(storage.name)])
Logger.dir(pre, name, depth=depth, attr=attr)
attr.append(['storage', Logger.get_bold_text(storage.name)])
NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr)
elif node.type == self.TYPE_STORAGE:
# node of type storage
hf = utils.size_to_str(node.free, raw=raw)
ht = utils.size_to_str(node.total, raw=raw)
sztotal = size_to_str(node.total, raw=raw)
szused = size_to_str(node.total - node.free, raw=raw)
nbchildren = len(node.children)
freepercent = '{:.1f}%'.format(
node.free * 100 / node.total
)
pcent = node.free * 100 / node.total
freepercent = f'{pcent:.1f}%'
# get the date
dt = ''
timestamp = ''
if self._has_attr(node, 'ts'):
dt = 'date:'
dt += '{}'.format(utils.epoch_to_str(node.ts))
ds = ''
timestamp = 'date:'
timestamp += epoch_to_str(node.ts)
disksize = ''
# the children size
sz = self._rec_size(node, store=False)
sz = utils.size_to_str(sz, raw=raw)
ds = 'totsize:' + '{}'.format(sz)
size = self.rec_size(node, store=False)
size = size_to_str(size, raw=raw)
disksize = 'totsize:' + f'{size}'
# format the output
name = '{}'.format(node.name)
name = node.name
args = [
'nbfiles:' + '{}'.format(nbchildren),
ds,
'free:{}'.format(freepercent),
'du:' + '{}/{}'.format(hf, ht),
dt]
Logger.storage(pre,
name,
'{}'.format(' | '.join(args)),
node.attr)
'nbfiles:' + f'{nbchildren}',
disksize,
f'free:{freepercent}',
'du:' + f'{szused}/{sztotal}',
timestamp]
argsstring = ' | '.join(args)
NodePrinter.print_storage_native(pre,
name,
argsstring,
node.attr)
elif node.type == self.TYPE_ARC:
# archive node
if self.arc:
Logger.arc(pre, node.name, node.archive)
NodePrinter.print_archive_native(pre, node.name, node.archive)
else:
Logger.err('bad node encountered: {}'.format(node))
Logger.err(f'bad node encountered: {node}')
def print_tree(self, node, style=anytree.ContRoundStyle(),
fmt='native', header=False, raw=False):
'''
print the tree similar to unix tool "tree"
def print_tree(self, node,
fmt='native',
raw=False):
"""
print the tree in different format
@node: start node
@style: when fmt=native, defines the tree style
@fmt: output format
@header: when fmt=csv, print the header
@raw: print the raw size rather than human readable
'''
"""
if fmt == 'native':
# "tree" style
rend = anytree.RenderTree(node, childiter=self._sort_tree)
for pre, fill, node in rend:
self._print_node(node, pre=pre, withdepth=True, raw=raw)
for pre, _, thenode in rend:
self._print_node_native(thenode, pre=pre,
withdepth=True, raw=raw)
elif fmt == 'csv':
self._to_csv(node, with_header=header, raw=raw)
def _to_csv(self, node, with_header=False, raw=False):
'''print the tree to csv'''
# csv output
self._to_csv(node, raw=raw)
elif fmt == 'csv-with-header':
# csv output
Logger.stdout_nocolor(self.CSV_HEADER)
self._to_csv(node, raw=raw)
def _to_csv(self, node, raw=False):
"""print the tree to csv"""
rend = anytree.RenderTree(node, childiter=self._sort_tree)
if with_header:
Logger.out(self.CSV_HEADER)
for _, _, node in rend:
self._node_to_csv(node, raw=raw)
for _, _, item in rend:
self._node_to_csv(item, raw=raw)
@staticmethod
def _fzf_prompt(strings):
# prompt with fzf
fzf = FzfPrompt()
selected = fzf.prompt(strings)
return selected
def _to_fzf(self, node, fmt):
"""
fzf prompt with list and print selected node(s)
@node: node to start with
@fmt: output format for selected nodes
"""
rendered = anytree.RenderTree(node, childiter=self._sort_tree)
nodes = {}
# construct node names list
for _, _, rend in rendered:
if not rend:
continue
parents = self._get_parents(rend)
storage = self._get_storage(rend)
fullpath = os.path.join(storage.name, parents)
nodes[fullpath] = rend
# prompt with fzf
paths = self._fzf_prompt(nodes.keys())
# print the resulting tree
subfmt = fmt.replace('fzf-', '')
for path in paths:
if not path:
continue
if path not in nodes:
continue
rend = nodes[path]
self.print_tree(rend, fmt=subfmt)
def to_dot(self, node, path='tree.dot'):
'''export to dot for graphing'''
@staticmethod
def to_dot(node, path='tree.dot'):
"""export to dot for graphing"""
anytree.exporter.DotExporter(node).to_dotfile(path)
Logger.info('dot file created under \"{}\"'.format(path))
return 'dot {} -T png -o /tmp/tree.png'.format(path)
Logger.info(f'dot file created under \"{path}\"')
return f'dot {path} -T png -o /tmp/tree.png'
###############################################################
# searching
###############################################################
def find_name(self, root, key,
script=False, directory=False,
def find_name(self, top, key,
script=False, only_dir=False,
startpath=None, parentfromtree=False,
fmt='native', raw=False):
'''
"""
find files based on their names
@top: top node
@key: term to search for
@script: output script
@directory: only search for directories
@startpath: node to start with
@parentfromtree: get path from parent instead of stored relpath
@fmt: output format
'''
self._debug('searching for \"{}\"'.format(key))
start = root
if startpath:
start = self.get_node(root, startpath)
self.term = key
found = anytree.findall(start, filter_=self._find_name)
paths = []
for f in found:
if f.type == self.TYPE_STORAGE:
# ignore storage nodes
continue
if directory and f.type != self.TYPE_DIR:
# ignore non directory
continue
# print the node
if fmt == 'native':
self._print_node(f, withpath=True,
withdepth=True,
withstorage=True,
recalcparent=parentfromtree,
raw=raw)
elif fmt == 'csv':
self._node_to_csv(f, raw=raw)
@raw: raw size output
returns the found nodes
"""
self._debug(f'searching for \"{key}\"')
# search for nodes based on path
start = top
if startpath:
start = self.get_node(top, startpath)
filterfunc = self._callback_find_name(key, only_dir)
found = anytree.findall(start, filter_=filterfunc)
nbfound = len(found)
self._debug(f'found {nbfound} node(s)')
# compile found nodes
paths = {}
for item in found:
item = self._sanitize(item)
if parentfromtree:
paths.append(self._get_parents(f))
paths[self._get_parents(item)] = item
else:
paths.append(f.relpath)
paths[item.relpath] = item
# handle fzf mode
if fmt.startswith('fzf'):
selected = self._fzf_prompt(paths.keys())
newpaths = {}
subfmt = fmt.replace('fzf-', '')
for item in selected:
if item not in paths:
continue
newpaths[item] = paths[item]
self.print_tree(newpaths[item], fmt=subfmt)
paths = newpaths
else:
if fmt == 'native':
for _, item in paths.items():
self._print_node_native(item, withpath=True,
withdepth=True,
withstorage=True,
recalcparent=parentfromtree,
raw=raw)
elif fmt.startswith('csv'):
if fmt == 'csv-with-header':
Logger.stdout_nocolor(self.CSV_HEADER)
for _, item in paths.items():
self._node_to_csv(item, raw=raw)
# execute script if any
if script:
tmp = ['${source}/' + x for x in paths]
cmd = 'op=file; source=/media/mnt; $op {}'.format(' '.join(tmp))
tmpstr = ' '.join(tmp)
cmd = f'op=file; source=/media/mnt; $op {tmpstr}'
Logger.info(cmd)
return found
return list(paths.values())
def _find_name(self, node):
'''callback for finding files'''
if self.term.lower() in node.name.lower():
return True
return False
def _callback_find_name(self, term, only_dir):
"""callback for finding files"""
def find_name(node):
if node.type == self.TYPE_STORAGE:
# ignore storage nodes
return False
if node.type == self.TYPE_TOP:
# ignore top nodes
return False
if node.type == self.TYPE_META:
# ignore meta nodes
return False
if only_dir and node.type != self.TYPE_DIR:
# ignore non directory
return False
# filter
if not term:
return True
if term.lower() in node.name.lower():
return True
# ignore
return False
return find_name
###############################################################
# climbing
# ls
###############################################################
def walk(self, root, path, rec=False, fmt='native', raw=False):
'''
walk the tree for ls based on names
@root: start node
def list(self, top, path,
rec=False,
fmt='native',
raw=False):
"""
list nodes for "ls"
@top: top node
@path: path to search for
@rec: recursive walk
@fmt: output format
'''
self._debug('walking path: \"{}\"'.format(path))
@raw: print raw size
"""
self._debug(f'walking path: \"{path}\" from {top}')
r = anytree.resolver.Resolver('name')
resolv = anytree.resolver.Resolver('name')
found = []
try:
found = r.glob(root, path)
# resolve the path in the tree
found = resolv.glob(top, path)
if len(found) < 1:
# nothing found
self._debug('nothing found')
return []
if rec:
@ -554,20 +654,28 @@ class Noder:
# print the parent
if fmt == 'native':
self._print_node(found[0].parent,
withpath=False, withdepth=True, raw=raw)
elif fmt == 'csv':
self._print_node_native(found[0].parent,
withpath=False,
withdepth=True,
raw=raw)
elif fmt.startswith('csv'):
self._node_to_csv(found[0].parent, raw=raw)
elif fmt.startswith('fzf'):
pass
# print all found nodes
for f in found:
if fmt == 'csv-with-header':
Logger.stdout_nocolor(self.CSV_HEADER)
for item in found:
if fmt == 'native':
self._print_node(f, withpath=False,
pre='- ',
withdepth=True,
raw=raw)
elif fmt == 'csv':
self._node_to_csv(f, raw=raw)
self._print_node_native(item, withpath=False,
pre='- ',
withdepth=True,
raw=raw)
elif fmt.startswith('csv'):
self._node_to_csv(item, raw=raw)
elif fmt.startswith('fzf'):
self._to_fzf(item, fmt)
except anytree.resolver.ChildResolverError:
pass
@ -577,78 +685,96 @@ class Noder:
# tree creation
###############################################################
def _add_entry(self, name, top, resolv):
'''add an entry to the tree'''
"""add an entry to the tree"""
entries = name.rstrip(os.sep).split(os.sep)
if len(entries) == 1:
self.archive_node(name, name, top, top.name)
self.new_archive_node(name, name, top, top.name)
return
sub = os.sep.join(entries[:-1])
f = entries[-1]
nodename = entries[-1]
try:
parent = resolv.get(top, sub)
parent = self.archive_node(f, name, parent, top.name)
parent = self.new_archive_node(nodename, name, parent, top.name)
except anytree.resolver.ChildResolverError:
self.archive_node(f, name, top, top.name)
self.new_archive_node(nodename, name, top, top.name)
def list_to_tree(self, parent, names):
'''convert list of files to a tree'''
"""convert list of files to a tree"""
if not names:
return
r = anytree.resolver.Resolver('name')
resolv = anytree.resolver.Resolver('name')
for name in names:
name = name.rstrip(os.sep)
self._add_entry(name, parent, r)
self._add_entry(name, parent, resolv)
###############################################################
# diverse
###############################################################
def _sort_tree(self, items):
'''sorting a list of items'''
"""sorting a list of items"""
return sorted(items, key=self._sort, reverse=self.sortsize)
def _sort(self, x):
'''sort a list'''
def _sort(self, lst):
"""sort a list"""
if self.sortsize:
return self._sort_size(x)
return self._sort_fs(x)
return self._sort_size(lst)
return self._sort_fs(lst)
def _sort_fs(self, n):
'''sorting nodes dir first and alpha'''
return (n.type, n.name.lstrip('\.').lower())
@staticmethod
def _sort_fs(node):
"""sorting nodes dir first and alpha"""
return (node.type, node.name.lstrip('.').lower())
def _sort_size(self, n):
'''sorting nodes by size'''
@staticmethod
def _sort_size(node):
"""sorting nodes by size"""
try:
if not n.size:
if not node.size:
return 0
return n.size
return node.size
except AttributeError:
return 0
def _get_storage(self, node):
'''recursively traverse up to find storage'''
"""recursively traverse up to find storage"""
if node.type == self.TYPE_STORAGE:
return node
return node.ancestors[1]
def _has_attr(self, node, attr):
@staticmethod
def _has_attr(node, attr):
"""return True if node has attr as attribute"""
return attr in node.__dict__.keys()
def _get_parents(self, node):
'''get all parents recursively'''
"""get all parents recursively"""
if node.type == self.TYPE_STORAGE:
return ''
if node.type == self.TYPE_TOP:
return ''
parent = self._get_parents(node.parent)
if parent:
return os.sep.join([parent, node.name])
return node.name
def _get_hash(self, path):
@staticmethod
def _get_hash(path):
"""return md5 hash of node"""
return utils.md5sum(path)
try:
return md5sum(path)
except CatcliException as exc:
Logger.err(str(exc))
return None
@staticmethod
def _sanitize(node):
"""sanitize node strings"""
node.name = fix_badchars(node.name)
node.relpath = fix_badchars(node.relpath)
return node
def _debug(self, string):
'''print debug'''
"""print debug"""
if not self.debug:
return
Logger.debug(string)

@ -12,67 +12,75 @@ import subprocess
import datetime
# local imports
from catcli.logger import Logger
from catcli.exceptions import CatcliException
def md5sum(path):
'''calculate md5 sum of a file'''
p = os.path.realpath(path)
if not os.path.exists(p):
Logger.err('\nmd5sum - file does not exist: {}'.format(p))
return None
"""
calculate md5 sum of a file
may raise exception
"""
rpath = os.path.realpath(path)
if not os.path.exists(rpath):
raise CatcliException(f'md5sum - file does not exist: {rpath}')
try:
with open(p, mode='rb') as f:
d = hashlib.md5()
with open(rpath, mode='rb') as file:
hashv = hashlib.md5()
while True:
buf = f.read(4096)
buf = file.read(4096)
if not buf:
break
d.update(buf)
return d.hexdigest()
hashv.update(buf)
return hashv.hexdigest()
except PermissionError:
pass
except OSError as e:
Logger.err('md5sum error: {}'.format(e))
except OSError as exc:
raise CatcliException(f'md5sum error: {exc}') from exc
return None
def size_to_str(size, raw=True):
'''convert size to string, optionally human readable'''
"""convert size to string, optionally human readable"""
div = 1024.
suf = ['B', 'K', 'M', 'G', 'T', 'P']
if raw or size < div:
return '{}'.format(size)
return f'{size}'
for i in suf:
if size < div:
return '{:.1f}{}'.format(size, i)
return f'{size:.1f}{i}'
size = size / div
return '{:.1f}{}'.format(size, suf[-1])
sufix = suf[-1]
return f'{size:.1f}{sufix}'
def epoch_to_str(epoch):
'''convert epoch to string'''
"""convert epoch to string"""
if not epoch:
return ''
fmt = '%Y-%m-%d %H:%M:%S'
t = datetime.datetime.fromtimestamp(float(epoch))
return t.strftime(fmt)
timestamp = datetime.datetime.fromtimestamp(float(epoch))
return timestamp.strftime(fmt)
def ask(question):
'''ask the user what to do'''
resp = input('{} [y|N] ? '.format(question))
"""ask the user what to do"""
resp = input(f'{question} [y|N] ? ')
return resp.lower() == 'y'
def edit(string):
'''edit the information with the default EDITOR'''
"""edit the information with the default EDITOR"""
string = string.encode('utf-8')
EDITOR = os.environ.get('EDITOR', 'vim')
with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as f:
f.write(string)
f.flush()
subprocess.call([EDITOR, f.name])
f.seek(0)
new = f.read()
editor = os.environ.get('EDITOR', 'vim')
with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as file:
file.write(string)
file.flush()
subprocess.call([editor, file.name])
file.seek(0)
new = file.read()
return new.decode('utf-8')
def fix_badchars(string):
"""fix none utf-8 chars in string"""
return string.encode('utf-8', 'ignore').decode('utf-8')

@ -0,0 +1,6 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
"""
__version__ = '0.8.7'

@ -12,61 +12,62 @@ from catcli.logger import Logger
class Walker:
"""a filesystem walker"""
MAXLINE = 80 - 15
MAXLINELEN = 80 - 15
def __init__(self, noder, hash=True, debug=False,
def __init__(self, noder, usehash=True, debug=False,
logpath=None):
'''
"""
@noder: the noder to use
@hash: calculate hash of nodes
@debug: debug mode
@logpath: path where to log catalog changes on reindex
'''
"""
self.noder = noder
self.hash = hash
self.noder.set_hashing(self.hash)
self.usehash = usehash
self.noder.set_hashing(self.usehash)
self.debug = debug
self.lpath = logpath
def index(self, path, parent, name, storagepath=''):
'''
"""
index a directory and store in tree
@path: path to index
@parent: parent node
@name: this stoarge name
'''
self._debug('indexing starting at {}'.format(path))
"""
self._debug(f'indexing starting at {path}')
if not parent:
parent = self.noder.dir_node(name, path, parent)
parent = self.noder.new_dir_node(name, path, parent)
if os.path.islink(path):
rel = os.readlink(path)
ab = os.path.join(path, rel)
if os.path.isdir(ab):
abspath = os.path.join(path, rel)
if os.path.isdir(abspath):
return parent, 0
cnt = 0
for (root, dirs, files) in os.walk(path):
for f in files:
self._debug('found file {} under {}'.format(f, path))
sub = os.path.join(root, f)
for file in files:
self._debug(f'found file {file} under {path}')
sub = os.path.join(root, file)
if not os.path.exists(sub):
continue
self._progress(f)
self._debug('index file {}'.format(sub))
n = self.noder.file_node(os.path.basename(f), sub,
parent, storagepath)
if n:
self._progress(file)
self._debug(f'index file {sub}')
node = self.noder.new_file_node(os.path.basename(file), sub,
parent, storagepath)
if node:
cnt += 1
for d in dirs:
self._debug('found dir {} under {}'.format(d, path))
base = os.path.basename(d)
sub = os.path.join(root, d)
self._debug('index directory {}'.format(sub))
for adir in dirs:
self._debug(f'found dir {adir} under {path}')
base = os.path.basename(adir)
sub = os.path.join(root, adir)
self._debug(f'index directory {sub}')
if not os.path.exists(sub):
continue
dummy = self.noder.dir_node(base, sub, parent, storagepath)
dummy = self.noder.new_dir_node(base, sub, parent, storagepath)
if not dummy:
continue
cnt += 1
@ -80,47 +81,48 @@ class Walker:
return parent, cnt
def reindex(self, path, parent, top):
'''reindex a directory and store in tree'''
"""reindex a directory and store in tree"""
cnt = self._reindex(path, parent, top)
cnt += self.noder.clean_not_flagged(parent)
return cnt
def _reindex(self, path, parent, top, storagepath=''):
'''
"""
reindex a directory and store in tree
@path: directory path to re-index
@top: top node (storage)
@storagepath: rel path relative to indexed directory
'''
self._debug('reindexing starting at {}'.format(path))
"""
self._debug(f'reindexing starting at {path}')
cnt = 0
for (root, dirs, files) in os.walk(path):
for f in files:
self._debug('found file \"{}\" under {}'.format(f, path))
sub = os.path.join(root, f)
treepath = os.path.join(storagepath, f)
reindex, n = self._need_reindex(parent, sub, treepath)
for file in files:
self._debug(f'found file \"{file}\" under {path}')
sub = os.path.join(root, file)
treepath = os.path.join(storagepath, file)
reindex, node = self._need_reindex(parent, sub, treepath)
if not reindex:
self._debug('\tskip file {}'.format(sub))
self.noder.flag(n)
self._debug(f'\tskip file {sub}')
self.noder.flag(node)
continue
self._log2file('update catalog for \"{}\"'.format(sub))
n = self.noder.file_node(os.path.basename(f), sub,
parent, storagepath)
self.noder.flag(n)
self._log2file(f'update catalog for \"{sub}\"')
node = self.noder.new_file_node(os.path.basename(file), sub,
parent, storagepath)
self.noder.flag(node)
cnt += 1
for d in dirs:
self._debug('found dir \"{}\" under {}'.format(d, path))
base = os.path.basename(d)
sub = os.path.join(root, d)
treepath = os.path.join(storagepath, d)
for adir in dirs:
self._debug(f'found dir \"{adir}\" under {path}')
base = os.path.basename(adir)
sub = os.path.join(root, adir)
treepath = os.path.join(storagepath, adir)
reindex, dummy = self._need_reindex(parent, sub, treepath)
if reindex:
self._log2file('update catalog for \"{}\"'.format(sub))
dummy = self.noder.dir_node(base, sub, parent, storagepath)
self._log2file(f'update catalog for \"{sub}\"')
dummy = self.noder.new_dir_node(base, sub,
parent, storagepath)
cnt += 1
self.noder.flag(dummy)
self._debug('reindexing deeper under {}'.format(sub))
self._debug(f'reindexing deeper under {sub}')
nstoragepath = os.sep.join([storagepath, base])
if not storagepath:
nstoragepath = base
@ -130,48 +132,48 @@ class Walker:
return cnt
def _need_reindex(self, top, path, treepath):
'''
"""
test if node needs re-indexing
@top: top node (storage)
@path: abs path to file
@treepath: rel path from indexed directory
'''
"""
cnode, changed = self.noder.get_node_if_changed(top, path, treepath)
if not cnode:
self._debug('\t{} does not exist'.format(path))
self._debug(f'\t{path} does not exist')
return True, cnode
if cnode and not changed:
# ignore this node
self._debug('\t{} has not changed'.format(path))
self._debug(f'\t{path} has not changed')
return False, cnode
if cnode and changed:
# remove this node and re-add
self._debug('\t{} has changed'.format(path))
self._debug('\tremoving node {} for {}'.format(cnode.name, path))
self._debug(f'\t{path} has changed')
self._debug(f'\tremoving node {cnode.name} for {path}')
cnode.parent = None
return True, cnode
def _debug(self, string):
'''print to debug'''
"""print to debug"""
if not self.debug:
return
Logger.debug(string)
def _progress(self, string):
'''print progress'''
"""print progress"""
if self.debug:
return
if not string:
# clean
Logger.progr('{:80}'.format(' '))
Logger.progr(' ' * 80)
return
if len(string) > self.MAXLINE:
string = string[:self.MAXLINE] + '...'
Logger.progr('indexing: {:80}'.format(string))
if len(string) > self.MAXLINELEN:
string = string[:self.MAXLINELEN] + '...'
Logger.progr(f'indexing: {string:80}')
def _log2file(self, string):
'''log to file'''
"""log to file"""
if not self.lpath:
return
line = '{}\n'.format(string)
Logger.flog(self.lpath, line, append=True)
line = f'{string}\n'
Logger.log_to_file(self.lpath, line, append=True)

@ -1,2 +1,3 @@
docopt; python_version >= '3.0'
anytree; python_version >= '3.0'
pyfzf; python_version >= '3.0'

@ -31,10 +31,11 @@ setup(
python_requires=REQUIRES_PYTHON,
classifiers=[
'Development Status :: 5 - Production/Stable',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
],

@ -1,6 +1,6 @@
pycodestyle; python_version >= '3.0'
pyflakes; python_version >= '3.0'
#nose-py3; python_version >= '3.0'
nose; python_version >= '3.0'
nose2; python_version >= '3.0'
coverage; python_version >= '3.0'
coveralls; python_version >= '3.0'
pylint; python_version > '3.0'

@ -7,16 +7,38 @@ cur=$(dirname "$(readlink -f "${0}")")
# stop on first error
set -ev
pycodestyle --version
pycodestyle --ignore=W605 catcli/
pycodestyle tests/
pyflakes --version
pyflakes catcli/
pyflakes tests/
nosebin="nosetests"
PYTHONPATH=catcli ${nosebin} -s --with-coverage --cover-package=catcli
#PYTHONPATH=catcli ${nosebin} -s
# R0914: Too many local variables
# R0913: Too many arguments
# R0912: Too many branches
# R0915: Too many statements
# R0911: Too many return statements
# R0903: Too few public methods
pylint --version
pylint \
--disable=R0914 \
--disable=R0913 \
--disable=R0912 \
--disable=R0915 \
--disable=R0911 \
--disable=R0903 \
catcli/
pylint \
--disable=W0212 \
--disable=R0914 \
--disable=R0915 \
--disable=R0801 \
tests/
nosebin="nose2"
PYTHONPATH=catcli ${nosebin} --with-coverage --coverage=catcli
for t in ${cur}/tests-ng/*; do
echo "running test \"`basename ${t}`\""

@ -21,32 +21,32 @@ TMPSUFFIX = '.catcli'
def get_rnd_string(length):
'''Get a random string of specific length '''
"""Get a random string of specific length """
alpha = string.ascii_uppercase + string.digits
return ''.join(random.choice(alpha) for _ in range(length))
def md5sum(path):
'''calculate md5 sum of a file'''
p = os.path.realpath(path)
if not os.path.exists(p):
"""calculate md5 sum of a file"""
rpath = os.path.realpath(path)
if not os.path.exists(rpath):
return None
try:
with open(p, mode='rb') as f:
d = hashlib.md5()
with open(rpath, mode='rb') as file:
val = hashlib.md5()
while True:
buf = f.read(4096)
buf = file.read(4096)
if not buf:
break
d.update(buf)
return d.hexdigest()
val.update(buf)
return val.hexdigest()
except PermissionError:
pass
return None
def clean(path):
'''Delete file or folder.'''
"""Delete file or folder."""
if not os.path.exists(path):
return
if os.path.islink(path):
@ -58,10 +58,12 @@ def clean(path):
def edit_file(path, newcontent):
"""edit file content"""
return write_to_file(path, newcontent)
def unix_tree(path):
"""print using unix tree tool"""
if not os.path.exists(path):
return
# cmd = ['tree', path]
@ -75,7 +77,7 @@ def unix_tree(path):
def create_tree():
''' create a random tree of files and directories '''
""" create a random tree of files and directories """
dirpath = get_tempdir()
# create 3 files
create_rnd_file(dirpath, get_rnd_string(5))
@ -83,13 +85,13 @@ def create_tree():
create_rnd_file(dirpath, get_rnd_string(5))
# create 2 directories
d1 = create_dir(dirpath, get_rnd_string(3))
d2 = create_dir(dirpath, get_rnd_string(3))
dir1 = create_dir(dirpath, get_rnd_string(3))
dir2 = create_dir(dirpath, get_rnd_string(3))
# fill directories
create_rnd_file(d1, get_rnd_string(4))
create_rnd_file(d1, get_rnd_string(4))
create_rnd_file(d2, get_rnd_string(6))
create_rnd_file(dir1, get_rnd_string(4))
create_rnd_file(dir1, get_rnd_string(4))
create_rnd_file(dir2, get_rnd_string(6))
return dirpath
@ -99,12 +101,12 @@ def create_tree():
def get_tempdir():
'''Get a temporary directory '''
"""Get a temporary directory """
return tempfile.mkdtemp(suffix=TMPSUFFIX)
def create_dir(path, dirname):
'''Create a directory '''
"""Create a directory """
fpath = os.path.join(path, dirname)
if not os.path.exists(fpath):
os.mkdir(fpath)
@ -112,7 +114,7 @@ def create_dir(path, dirname):
def create_rnd_file(path, filename, content=None):
'''Create the file filename in path with random content if None '''
"""Create the file filename in path with random content if None """
if not content:
content = get_rnd_string(100)
fpath = os.path.join(path, filename)
@ -120,23 +122,25 @@ def create_rnd_file(path, filename, content=None):
def write_to_file(path, content):
with open(path, 'w') as f:
f.write(content)
"""write content to file"""
with open(path, 'w', encoding='utf-8') as file:
file.write(content)
return path
def read_from_file(path):
"""read file content"""
if not os.path.exists(path):
return ''
with open(path, 'r') as f:
content = f.read()
with open(path, 'r', encoding='utf-8') as file:
content = file.read()
return content
############################################################
# fake tree in json
############################################################
FAKECATALOG = '''
FAKECATALOG = """
{
"children": [
{
@ -214,9 +218,9 @@ FAKECATALOG = '''
"name": "top",
"type": "top"
}
'''
"""
def get_fakecatalog():
# catalog constructed through test_index
"""catalog constructed through test_index"""
return FAKECATALOG

@ -14,8 +14,10 @@ from tests.helpers import get_fakecatalog
class TestFind(unittest.TestCase):
"""test find"""
def test_find(self):
"""test find"""
# init
catalog = Catalog('fake', force=True, debug=False)
top = catalog._restore_json(get_fakecatalog())
@ -38,6 +40,7 @@ class TestFind(unittest.TestCase):
def main():
"""entry point"""
unittest.main()

@ -16,8 +16,10 @@ from tests.helpers import clean, get_fakecatalog
class TestGraph(unittest.TestCase):
"""test graph"""
def test_graph(self):
"""test graph"""
# init
path = 'fake'
gpath = tempfile.gettempdir() + os.sep + 'graph.dot'
@ -38,6 +40,7 @@ class TestGraph(unittest.TestCase):
def main():
"""entry point"""
unittest.main()

@ -16,8 +16,10 @@ from tests.helpers import get_tempdir, create_rnd_file, clean, \
class TestIndexing(unittest.TestCase):
"""test index"""
def test_index(self):
"""test index"""
# init
workingdir = get_tempdir()
catalogpath = create_rnd_file(workingdir, 'catalog.json', content='')
@ -27,18 +29,18 @@ class TestIndexing(unittest.TestCase):
self.addCleanup(clean, dirpath)
# create 3 files
f1 = create_rnd_file(dirpath, get_rnd_string(5))
f2 = create_rnd_file(dirpath, get_rnd_string(5))
f3 = create_rnd_file(dirpath, get_rnd_string(5))
file1 = create_rnd_file(dirpath, get_rnd_string(5))
file2 = create_rnd_file(dirpath, get_rnd_string(5))
file3 = create_rnd_file(dirpath, get_rnd_string(5))
# create 2 directories
d1 = create_dir(dirpath, get_rnd_string(3))
d2 = create_dir(dirpath, get_rnd_string(3))
dir1 = create_dir(dirpath, get_rnd_string(3))
dir2 = create_dir(dirpath, get_rnd_string(3))
# fill directories with files
_ = create_rnd_file(d1, get_rnd_string(4))
_ = create_rnd_file(d1, get_rnd_string(4))
_ = create_rnd_file(d2, get_rnd_string(6))
_ = create_rnd_file(dir1, get_rnd_string(4))
_ = create_rnd_file(dir1, get_rnd_string(4))
_ = create_rnd_file(dir2, get_rnd_string(6))
noder = Noder()
top = noder.new_top_node()
@ -61,20 +63,21 @@ class TestIndexing(unittest.TestCase):
# ensures files and directories are in
names = [x.name for x in storage.children]
self.assertTrue(os.path.basename(f1) in names)
self.assertTrue(os.path.basename(f2) in names)
self.assertTrue(os.path.basename(f3) in names)
self.assertTrue(os.path.basename(d1) in names)
self.assertTrue(os.path.basename(d2) in names)
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
self.assertTrue(os.path.basename(file3) in names)
self.assertTrue(os.path.basename(dir1) in names)
self.assertTrue(os.path.basename(dir2) in names)
for node in storage.children:
if node.name == os.path.basename(d1):
if node.name == os.path.basename(dir1):
self.assertTrue(len(node.children) == 2)
elif node.name == os.path.basename(d2):
elif node.name == os.path.basename(dir2):
self.assertTrue(len(node.children) == 1)
def main():
"""entry point"""
unittest.main()

@ -14,8 +14,10 @@ from tests.helpers import get_fakecatalog, clean
class TestWalking(unittest.TestCase):
"""test ls"""
def test_ls(self):
"""test ls"""
# init
path = 'fake'
self.addCleanup(clean, path)
@ -56,6 +58,7 @@ class TestWalking(unittest.TestCase):
def main():
"""entry point"""
unittest.main()

@ -14,8 +14,10 @@ from tests.helpers import clean, get_fakecatalog
class TestRm(unittest.TestCase):
"""test rm"""
def test_rm(self):
"""test rm"""
# init
path = 'fake'
self.addCleanup(clean, path)
@ -48,6 +50,7 @@ class TestRm(unittest.TestCase):
def main():
"""entry point"""
unittest.main()

@ -7,15 +7,17 @@ Basic unittest for tree
import unittest
from catcli.catcli import cmd_tree
from catcli.catcli import cmd_ls
from catcli.noder import Noder
from catcli.catalog import Catalog
from tests.helpers import clean, get_fakecatalog
class TestTree(unittest.TestCase):
"""Test the tree"""
def test_tree(self):
"""test the tree"""
# init
path = 'fake'
self.addCleanup(clean, path)
@ -30,13 +32,15 @@ class TestTree(unittest.TestCase):
'--format': 'native',
'--header': False,
'--raw-size': False,
'--recursive': True,
}
# print tree and wait for any errors
cmd_tree(args, noder, top)
cmd_ls(args, noder, top)
def main():
"""entry point"""
unittest.main()

@ -7,18 +7,20 @@ Basic unittest for updating an index
import unittest
import os
import anytree
from catcli.catcli import cmd_index, cmd_update
from catcli.noder import Noder
from catcli.catalog import Catalog
from tests.helpers import create_dir, create_rnd_file, get_tempdir, \
clean, unix_tree, edit_file, read_from_file, md5sum
import anytree
class TestIndexing(unittest.TestCase):
class TestUpdate(unittest.TestCase):
"""test update"""
def test_index(self):
def test_update(self):
"""test update"""
# init
workingdir = get_tempdir()
catalogpath = create_rnd_file(workingdir, 'catalog.json', content='')
@ -28,20 +30,20 @@ class TestIndexing(unittest.TestCase):
self.addCleanup(clean, dirpath)
# create 3 files
f1 = create_rnd_file(dirpath, 'file1')
f2 = create_rnd_file(dirpath, 'file2')
f3 = create_rnd_file(dirpath, 'file3')
f4 = create_rnd_file(dirpath, 'file4')
file1 = create_rnd_file(dirpath, 'file1')
file2 = create_rnd_file(dirpath, 'file2')
file3 = create_rnd_file(dirpath, 'file3')
file4 = create_rnd_file(dirpath, 'file4')
# create 2 directories
d1 = create_dir(dirpath, 'dir1')
d2 = create_dir(dirpath, 'dir2')
dir1 = create_dir(dirpath, 'dir1')
dir2 = create_dir(dirpath, 'dir2')
# fill directories with files
d1f1 = create_rnd_file(d1, 'dir1file1')
d1f2 = create_rnd_file(d1, 'dir1file2')
d2f1 = create_rnd_file(d2, 'dir2file1')
d2f2 = create_rnd_file(d2, 'dir2file2')
d1f1 = create_rnd_file(dir1, 'dir1file1')
d1f2 = create_rnd_file(dir1, 'dir1file2')
d2f1 = create_rnd_file(dir2, 'dir2file1')
d2f2 = create_rnd_file(dir2, 'dir2file2')
noder = Noder(debug=True)
noder.set_hashing(True)
@ -49,7 +51,7 @@ class TestIndexing(unittest.TestCase):
catalog = Catalog(catalogpath, force=True, debug=False)
# get checksums
f4_md5 = md5sum(f4)
f4_md5 = md5sum(file4)
self.assertTrue(f4_md5)
d1f1_md5 = md5sum(d1f1)
self.assertTrue(d1f1_md5)
@ -69,7 +71,7 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(os.stat(catalogpath).st_size != 0)
# ensure md5 sum are in
nods = noder.find_name(top, os.path.basename(f4))
nods = noder.find_name(top, os.path.basename(file4))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
@ -79,34 +81,34 @@ class TestIndexing(unittest.TestCase):
noder.print_tree(top)
# add some files and directories
new1 = create_rnd_file(d1, 'newf1')
new1 = create_rnd_file(dir1, 'newf1')
new2 = create_rnd_file(dirpath, 'newf2')
new3 = create_dir(dirpath, 'newd3')
new4 = create_dir(d2, 'newd4')
new4 = create_dir(dir2, 'newd4')
new5 = create_rnd_file(new4, 'newf5')
unix_tree(dirpath)
# modify files
EDIT = 'edited'
edit_file(d1f1, EDIT)
editval = 'edited'
edit_file(d1f1, editval)
d1f1_md5_new = md5sum(d1f1)
self.assertTrue(d1f1_md5_new)
self.assertTrue(d1f1_md5_new != d1f1_md5)
# change file without mtime
maccess = os.path.getmtime(f4)
EDIT = 'edited'
edit_file(f4, EDIT)
maccess = os.path.getmtime(file4)
editval = 'edited'
edit_file(file4, editval)
# reset edit time
os.utime(f4, (maccess, maccess))
os.utime(file4, (maccess, maccess))
f4_md5_new = md5sum(d1f1)
self.assertTrue(f4_md5_new)
self.assertTrue(f4_md5_new != f4_md5)
# change file without mtime
maccess = os.path.getmtime(d2f2)
EDIT = 'edited'
edit_file(d2f2, EDIT)
editval = 'edited'
edit_file(d2f2, editval)
# reset edit time
os.utime(d2f2, (maccess, maccess))
d2f2_md5_new = md5sum(d2f2)
@ -134,7 +136,7 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(nod.md5 == d1f1_md5_new)
# ensure f4 md5 sum has changed in catalog
nods = noder.find_name(top, os.path.basename(f4))
nods = noder.find_name(top, os.path.basename(file4))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
@ -152,14 +154,14 @@ class TestIndexing(unittest.TestCase):
# ensures files and directories are in
names = [node.name for node in anytree.PreOrderIter(storage)]
print(names)
self.assertTrue(os.path.basename(f1) in names)
self.assertTrue(os.path.basename(f2) in names)
self.assertTrue(os.path.basename(f3) in names)
self.assertTrue(os.path.basename(f4) in names)
self.assertTrue(os.path.basename(d1) in names)
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
self.assertTrue(os.path.basename(file3) in names)
self.assertTrue(os.path.basename(file4) in names)
self.assertTrue(os.path.basename(dir1) in names)
self.assertTrue(os.path.basename(d1f1) in names)
self.assertTrue(os.path.basename(d1f2) in names)
self.assertTrue(os.path.basename(d2) in names)
self.assertTrue(os.path.basename(dir2) in names)
self.assertTrue(os.path.basename(d2f1) in names)
self.assertTrue(os.path.basename(new1) in names)
self.assertTrue(os.path.basename(new2) in names)
@ -168,19 +170,19 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(os.path.basename(new5) in names)
for node in storage.children:
if node.name == os.path.basename(d1):
if node.name == os.path.basename(dir1):
self.assertTrue(len(node.children) == 3)
elif node.name == os.path.basename(d2):
elif node.name == os.path.basename(dir2):
self.assertTrue(len(node.children) == 3)
elif node.name == os.path.basename(new3):
self.assertTrue(len(node.children) == 0)
elif node.name == os.path.basename(new4):
self.assertTrue(len(node.children) == 1)
self.assertTrue(read_from_file(d1f1) == EDIT)
self.assertTrue(read_from_file(d1f1) == editval)
# remove some files
clean(d1f1)
clean(d2)
clean(dir2)
clean(new2)
clean(new4)
@ -190,14 +192,14 @@ class TestIndexing(unittest.TestCase):
# ensures files and directories are (not) in
names = [node.name for node in anytree.PreOrderIter(storage)]
print(names)
self.assertTrue(os.path.basename(f1) in names)
self.assertTrue(os.path.basename(f2) in names)
self.assertTrue(os.path.basename(f3) in names)
self.assertTrue(os.path.basename(f4) in names)
self.assertTrue(os.path.basename(d1) in names)
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
self.assertTrue(os.path.basename(file3) in names)
self.assertTrue(os.path.basename(file4) in names)
self.assertTrue(os.path.basename(dir1) in names)
self.assertTrue(os.path.basename(d1f1) not in names)
self.assertTrue(os.path.basename(d1f2) in names)
self.assertTrue(os.path.basename(d2) not in names)
self.assertTrue(os.path.basename(dir2) not in names)
self.assertTrue(os.path.basename(d2f1) not in names)
self.assertTrue(os.path.basename(d2f1) not in names)
self.assertTrue(os.path.basename(new1) in names)
@ -206,13 +208,14 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(os.path.basename(new4) not in names)
self.assertTrue(os.path.basename(new5) not in names)
for node in storage.children:
if node.name == os.path.basename(d1):
if node.name == os.path.basename(dir1):
self.assertTrue(len(node.children) == 2)
elif node.name == os.path.basename(new3):
self.assertTrue(len(node.children) == 0)
def main():
"""entry point"""
unittest.main()

Loading…
Cancel
Save