Merge pull request #45 from deadc0de6/features-42

add features for #42
pull/47/head
deadc0de 3 months ago committed by GitHub
commit 06a36c42fd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,7 @@
coverage:
status:
project:
default:
target: 90%
threshold: 1%
patch: off

@ -1,11 +1,11 @@
name: tests
on: [push, pull_request]
on: [push, pull_request, workflow_dispatch]
jobs:
test:
runs-on: ubuntu-20.04
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"]
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
@ -21,9 +21,9 @@ jobs:
- name: Run tests
run: |
./tests.sh
- name: Coveralls
run: |
pip install coveralls
coveralls --service=github
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
with:
files: coverage.xml
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

4
.gitignore vendored

@ -1,6 +1,8 @@
*.pyc
.coverage
.coverage*
coverages/
coverage.xml
dist/
build/
*.egg-info/
@ -9,3 +11,5 @@ build/
.mypy_cache
.pytest_cache
__pycache__
.pyre
.pytype

@ -0,0 +1,5 @@
[mypy]
strict = true
disable_error_code = import-untyped,import-not-found
ignore_missing_imports = True
warn_unused_ignores = False

@ -2,7 +2,7 @@
[![Tests Status](https://github.com/deadc0de6/catcli/workflows/tests/badge.svg?branch=master)](https://github.com/deadc0de6/catcli/actions)
[![License: GPL v3](https://img.shields.io/badge/License-GPL%20v3-blue.svg)](http://www.gnu.org/licenses/gpl-3.0)
[![Coveralls](https://img.shields.io/coveralls/github/deadc0de6/catcli)](https://coveralls.io/github/deadc0de6/catcli?branch=master)
[![Coverage](https://codecov.io/gh/deadc0de6/catcli/graph/badge.svg?token=t5dF7UL7K1)](https://codecov.io/gh/deadc0de6/catcli)
[![PyPI version](https://badge.fury.io/py/catcli.svg)](https://badge.fury.io/py/catcli)
[![AUR](https://img.shields.io/aur/version/catcli-git.svg)](https://aur.archlinux.org/packages/catcli-git)
@ -50,6 +50,8 @@ catcli ls -r
catcli ls log
# find files/directories named '*log*'
catcli find log
# show directories sizes
catcli du log
```
see [usage](#usage) for specific info
@ -76,6 +78,7 @@ See the [examples](#examples) for an overview of the available features.
* [Find files](#find-files)
* [Mount catalog](#mount-catalog)
* [Display entire hierarchy](#display-entire-hierarchy)
* [Disk usage](#disk-usage)
* [Catalog graph](#catalog-graph)
* [Edit storage](#edit-storage)
* [Update catalog](#update-catalog)
@ -148,9 +151,6 @@ directory under `catcli.catalog`.
The `--meta` switch allows to add any additional information to store along in
the catalog like for example `the blue disk in my office`.
Catcli will calculate and store the total size of each node (directories, storages, etc)
unless the `-n --no-subsize` switch is used.
Using the `-a --archive` switch allows to also index archive files as explained
[below](#index-archive-files).
@ -215,6 +215,11 @@ Resulting files can be sorted by size using the `-S --sortsize` switch.
See the [examples](#examples) for more.
## Disk usage
You can get the disk usage with the `du` command.
Resulting files can be sorted by size using the `-S --sortsize` switch.
## Catalog graph
The catalog can be exported in a dot file that can be used to

@ -7,9 +7,9 @@ Class that represents the catcli catalog
import os
from typing import Optional, List, Dict, Tuple, Union, Any
from anytree.exporter import JsonExporter, DictExporter # type: ignore
from anytree.importer import JsonImporter # type: ignore
from anytree import AnyNode # type: ignore
from anytree.exporter import JsonExporter, DictExporter
from anytree.importer import JsonImporter
from anytree import AnyNode
# local imports
from catcli import nodes
@ -96,14 +96,13 @@ class Catalog:
def _restore_json(self, string: str) -> Optional[NodeTop]:
"""restore the tree from json"""
imp = JsonImporter(dictimporter=_DictImporter(debug=self.debug))
self._debug('import from string...')
root = imp.import_(string)
self._debug(f'Catalog imported from json \"{self.path}\"')
self._debug(f'root imported: {root}')
if root.type != nodes.TYPE_TOP:
return None
top = NodeTop(root.name, children=root.children)
self._debug(f'top imported: {top}')
self._debug(f'top imported: {top.name}')
return top
@ -126,7 +125,7 @@ class _DictImporter():
assert "parent" not in data
attrs = dict(data)
# replace attr
attrs = back_attriter(attrs, debug=self.debug)
attrs = back_attriter(attrs)
children: Union[str, Any] = attrs.pop("children", [])
node = self.nodecls(parent=parent, **attrs)
for child in children:
@ -134,16 +133,14 @@ class _DictImporter():
return node
def back_attriter(adict: Dict[str, str],
debug: bool = False) -> Dict[str, str]:
def back_attriter(adict: Dict[str, str]) -> Dict[str, str]:
"""replace attribute on json restore"""
attrs = {}
for k, val in adict.items():
newk = k
if k == 'size':
if debug:
Logger.debug(f'changing {k}={val}')
k = 'nodesize'
attrs[k] = val
newk = 'nodesize'
attrs[newk] = val
return attrs

@ -11,13 +11,16 @@ Catcli command line interface
import sys
import os
import datetime
from typing import Dict, Any, List
from typing import Dict, Any, List, \
Tuple
from docopt import docopt
import cmd2
# local imports
from catcli.version import __version__ as VERSION
from catcli.nodes import NodeTop, NodeAny
from catcli.logger import Logger
from catcli.printer_csv import CsvPrinter
from catcli.colors import Colors
from catcli.catalog import Catalog
from catcli.walker import Walker
@ -39,17 +42,22 @@ USAGE = f"""
{BANNER}
Usage:
{NAME} ls [--catalog=<path>] [--format=<fmt>] [-aBCrVSs] [<path>]
{NAME} find [--catalog=<path>] [--format=<fmt>]
[-aBCbdVsP] [--path=<path>] [<term>]
{NAME} index [--catalog=<path>] [--meta=<meta>...]
[-aBCcfnV] <name> <path>
{NAME} update [--catalog=<path>] [-aBCcfnV] [--lpath=<path>] <name> <path>
{NAME} mount [--catalog=<path>] [-V] <mountpoint>
{NAME} rm [--catalog=<path>] [-BCfV] <storage>
{NAME} rename [--catalog=<path>] [-BCfV] <storage> <name>
{NAME} edit [--catalog=<path>] [-BCfV] <storage>
{NAME} graph [--catalog=<path>] [-BCV] [<path>]
{NAME} ls [--catalog=<path>] [--format=<fmt>] [-aBCrVSs] [<path>]
{NAME} tree [--catalog=<path>] [-aBCVSs] [<path>]
{NAME} find [--catalog=<path>] [--format=<fmt>]
[-aBCbdVs] [--path=<path>] [<term>]
{NAME} index [--catalog=<path>] [--meta=<meta>...]
[-aBCcfV] <name> <path>
{NAME} update [--catalog=<path>] [-aBCcfV]
[--lpath=<path>] <name> <path>
{NAME} mount [--catalog=<path>] [-V] <mountpoint>
{NAME} du [--catalog=<path>] [-BCVSs] [<path>]
{NAME} rm [--catalog=<path>] [-BCfV] <storage>
{NAME} rename [--catalog=<path>] [-BCfV] <storage> <name>
{NAME} edit [--catalog=<path>] [-BCfV] <storage>
{NAME} graph [--catalog=<path>] [-BCV] [<path>]
{NAME} [--catalog=<path>]
{NAME} fixsizes [--catalog=<path>]
{NAME} print_supported_formats
{NAME} help
{NAME} --help
@ -67,8 +75,6 @@ Options:
-F --format=<fmt> see \"print_supported_formats\" [default: native].
-f --force Do not ask when updating the catalog [default: False].
-l --lpath=<path> Path where changes are logged [default: ]
-n --no-subsize Do not store size of directories [default: False].
-P --parent Ignore stored relpath [default: True].
-p --path=<path> Start path.
-r --recursive Recursive [default: False].
-s --raw-size Print raw size [default: False].
@ -104,7 +110,6 @@ def cmd_index(args: Dict[str, Any],
name = args['<name>']
usehash = args['--hash']
debug = args['--verbose']
subsize = not args['--no-subsize']
if not os.path.exists(path):
Logger.err(f'\"{path}\" does not exist')
return
@ -116,16 +121,16 @@ def cmd_index(args: Dict[str, Any],
except KeyboardInterrupt:
Logger.err('aborted')
return
node = noder.get_storage_node(top, name)
node.parent = None
node = top.get_storage_node()
if node:
node.parent = None
start = datetime.datetime.now()
walker = Walker(noder, usehash=usehash, debug=debug)
attr = args['--meta']
root = noder.new_storage_node(name, path, top, attr)
_, cnt = walker.index(path, root, name)
if subsize:
noder.rec_size(root, store=True)
root.nodesize = root.get_rec_size()
stop = datetime.datetime.now()
diff = stop - start
Logger.info(f'Indexed {cnt} file(s) in {diff}')
@ -143,20 +148,19 @@ def cmd_update(args: Dict[str, Any],
usehash = args['--hash']
logpath = args['--lpath']
debug = args['--verbose']
subsize = not args['--no-subsize']
if not os.path.exists(path):
Logger.err(f'\"{path}\" does not exist')
return
root = noder.get_storage_node(top, name, newpath=path)
if not root:
storage = noder.find_storage_node_by_name(top, name)
if not storage:
Logger.err(f'storage named \"{name}\" does not exist')
return
noder.update_storage_path(top, name, path)
start = datetime.datetime.now()
walker = Walker(noder, usehash=usehash, debug=debug,
logpath=logpath)
cnt = walker.reindex(path, root, top)
if subsize:
noder.rec_size(root, store=True)
cnt = walker.reindex(path, storage, top)
storage.nodesize = storage.get_rec_size()
stop = datetime.datetime.now()
diff = stop - start
Logger.info(f'updated {cnt} file(s) in {diff}')
@ -164,6 +168,20 @@ def cmd_update(args: Dict[str, Any],
catalog.save(top)
def cmd_du(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> List[NodeAny]:
"""du action"""
path = path_to_search_all(args['<path>'])
found = noder.diskusage(top,
path,
raw=args['--raw-size'])
if not found:
path = args['<path>']
Logger.err(f'\"{path}\": nothing found')
return found
def cmd_ls(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> List[NodeAny]:
@ -174,8 +192,8 @@ def cmd_ls(args: Dict[str, Any],
raise BadFormatException('fzf is not supported in ls, use find')
found = noder.list(top,
path,
rec=args['--recursive'],
fmt=fmt,
rec=args['--recursive'],
raw=args['--raw-size'])
if not found:
path = args['<path>']
@ -189,7 +207,7 @@ def cmd_rm(args: Dict[str, Any],
top: NodeTop) -> NodeTop:
"""rm action"""
name = args['<storage>']
node = noder.get_storage_node(top, name)
node = noder.find_storage_node_by_name(top, name)
if node:
node.parent = None
if catalog.save(top):
@ -203,7 +221,6 @@ def cmd_find(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> List[NodeAny]:
"""find action"""
fromtree = args['--parent']
directory = args['--directory']
startpath = args['--path']
fmt = args['--format']
@ -212,13 +229,12 @@ def cmd_find(args: Dict[str, Any],
search_for = args['<term>']
if args['--verbose']:
Logger.debug(f'search for \"{search_for}\" under \"{top.name}\"')
found = noder.find_name(top, search_for,
script=script,
startnode=startpath,
only_dir=directory,
parentfromtree=fromtree,
fmt=fmt,
raw=raw)
found = noder.find(top, search_for,
script=script,
startnode=startpath,
only_dir=directory,
fmt=fmt,
raw=raw)
return found
@ -233,6 +249,17 @@ def cmd_graph(args: Dict[str, Any],
Logger.info(f'create graph with \"{cmd}\" (you need graphviz)')
def cmd_fixsizes(top: NodeTop,
noder: Noder,
catalog: Catalog) -> None:
"""
fix each node size by re-calculating
recursively their size
"""
noder.fixsizes(top)
catalog.save(top)
def cmd_rename(args: Dict[str, Any],
catalog: Catalog,
top: NodeTop) -> None:
@ -270,6 +297,75 @@ def cmd_edit(args: Dict[str, Any],
Logger.err(f'Storage named \"{storage}\" does not exist')
class CatcliRepl(cmd2.Cmd): # type: ignore
"""catcli repl"""
prompt = 'catcli> '
intro = ''
def __init__(self) -> None:
super().__init__()
# remove built-ins
del cmd2.Cmd.do_alias
del cmd2.Cmd.do_edit
del cmd2.Cmd.do_macro
del cmd2.Cmd.do_run_pyscript
del cmd2.Cmd.do_run_script
del cmd2.Cmd.do_set
del cmd2.Cmd.do_shell
del cmd2.Cmd.do_shortcuts
self.hidden_commands.append('EOF')
def cmdloop(self, intro: Any = None) -> Any:
return cmd2.Cmd.cmdloop(self, intro)
@cmd2.with_argument_list # type: ignore
def do_ls(self, arglist: List[str]) -> bool:
"""ls <path>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'ls')
args, noder, _, _, top = init(arglist)
cmd_ls(args, noder, top)
return False
@cmd2.with_argument_list # type: ignore
def do_tree(self, arglist: List[str]) -> bool:
"""tree <path>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'tree')
args, noder, _, _, top = init(arglist)
cmd_ls(args, noder, top)
return False
@cmd2.with_argument_list # type: ignore
def do_find(self, arglist: List[str]) -> bool:
"""find <term>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'find')
args, noder, _, _, top = init(arglist)
cmd_find(args, noder, top)
return False
@cmd2.with_argument_list # type: ignore
def do_du(self, arglist: List[str]) -> bool:
"""du <path>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'du')
args, noder, _, _, top = init(arglist)
cmd_du(args, noder, top)
return False
def do_help(self, _: Any) -> bool:
"""help"""
print(USAGE)
return False
# pylint: disable=C0103
def do_EOF(self, _: Any) -> bool:
"""exit repl"""
return True
def banner() -> None:
"""print banner"""
Logger.stderr_nocolor(BANNER)
@ -280,32 +376,35 @@ def print_supported_formats() -> None:
"""print all supported formats to stdout"""
print('"native" : native format')
print('"csv" : CSV format')
print(f' {Noder.CSV_HEADER}')
print(f' {CsvPrinter.CSV_HEADER}')
print('"fzf-native" : fzf to native (only valid for find)')
print('"fzf-csv" : fzf to csv (only valid for find)')
def main() -> bool:
"""entry point"""
args = docopt(USAGE, version=VERSION)
def init(argv: List[str]) -> Tuple[Dict[str, Any],
Noder,
Catalog,
str,
NodeTop]:
"""parse catcli arguments"""
args = docopt(USAGE, argv=argv, version=VERSION)
if args['help'] or args['--help']:
print(USAGE)
return True
sys.exit(0)
if args['print_supported_formats']:
print_supported_formats()
return True
sys.exit(0)
# check format
fmt = args['--format']
if fmt not in FORMATS:
Logger.err(f'bad format: {fmt}')
print_supported_formats()
return False
sys.exit(0)
if args['--verbose']:
print(args)
print(f'args: {args}')
# print banner
if not args['--no-banner']:
@ -331,15 +430,23 @@ def main() -> bool:
meta = noder.update_metanode(top)
catalog.set_metanode(meta)
return args, noder, catalog, catalog_path, top
def main() -> bool:
"""entry point"""
args, noder, catalog, catalog_path, top = init(sys.argv[1:])
# parse command
try:
if args['index']:
cmd_index(args, noder, catalog, top)
if args['update']:
elif args['update']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_update(args, noder, catalog, top)
cmd_fixsizes(top, noder, catalog)
elif args['find']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
@ -350,6 +457,12 @@ def main() -> bool:
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_ls(args, noder, top)
elif args['tree']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
args['--recursive'] = True
cmd_ls(args, noder, top)
elif args['mount']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
@ -376,6 +489,18 @@ def main() -> bool:
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_edit(args, noder, catalog, top)
elif args['du']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_du(args, noder, top)
elif args['fixsizes']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_fixsizes(top, noder, catalog)
else:
CatcliRepl().cmdloop()
except CatcliException as exc:
Logger.stderr_nocolor('ERROR ' + str(exc))
return False

@ -20,7 +20,9 @@ class Colors:
PURPLE = '\033[1;35m'
BLUE = '\033[94m'
GRAY = '\033[0;37m'
CYAN = '\033[36m'
MAGENTA = '\033[95m'
WHITE = '\033[97m'
RESET = '\033[0m'
EMPH = '\033[33m'
BOLD = '\033[1m'

@ -10,7 +10,7 @@ from time import time
from stat import S_IFDIR, S_IFREG
from typing import List, Dict, Any, Optional
try:
import fuse # type: ignore
import fuse
except ModuleNotFoundError:
pass
@ -107,7 +107,7 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
def getattr(self, path: str, _fh: Any = None) -> Dict[str, Any]:
"""return attr of file pointed by path"""
if path == '/':
if path == os.path.sep:
# mountpoint
curt = time()
meta = {

@ -1,73 +0,0 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
Class for printing nodes
"""
import sys
from typing import TypeVar, Type, Optional, Tuple, List, \
Dict, Any
from catcli.colors import Colors
from catcli.utils import fix_badchars
CLASSTYPE = TypeVar('CLASSTYPE', bound='NodePrinter')
class NodePrinter:
"""a node printer class"""
STORAGE = 'storage'
ARCHIVE = 'archive'
NBFILES = 'nbfiles'
@classmethod
def print_storage_native(cls: Type[CLASSTYPE], pre: str,
name: str, args: str,
attr: Dict[str, Any]) -> None:
"""print a storage node"""
end = ''
if attr:
end = f' {Colors.GRAY}({attr}){Colors.RESET}'
out = f'{pre}{Colors.UND}{cls.STORAGE}{Colors.RESET}:'
out += ' ' + Colors.PURPLE + fix_badchars(name) + \
Colors.RESET + end + '\n'
out += f' {Colors.GRAY}{args}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_file_native(cls: Type[CLASSTYPE], pre: str,
name: str, attr: str) -> None:
"""print a file node"""
nobad = fix_badchars(name)
out = f'{pre}{nobad}'
out += f' {Colors.GRAY}[{attr}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_dir_native(cls: Type[CLASSTYPE], pre: str,
name: str,
depth: int = 0,
attr: Optional[List[Tuple[str, str]]] = None) -> None:
"""print a directory node"""
end = []
if depth > 0:
end.append(f'{cls.NBFILES}:{depth}')
if attr:
end.append(' '.join([f'{x}:{y}' for x, y in attr]))
end_string = ''
if end:
end_string = f' [{", ".join(end)}]'
out = pre + Colors.BLUE + fix_badchars(name) + Colors.RESET
out += f'{Colors.GRAY}{end_string}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_archive_native(cls: Type[CLASSTYPE], pre: str,
name: str, archive: str) -> None:
"""archive to stdout"""
out = pre + Colors.YELLOW + fix_badchars(name) + Colors.RESET
out += f' {Colors.GRAY}[{cls.ARCHIVE}:{archive}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')

@ -9,16 +9,19 @@ import os
import shutil
import time
from typing import List, Union, Tuple, Any, Optional, Dict, cast
import anytree # type: ignore
import fnmatch
import anytree
from natsort import os_sort_keygen
# local imports
from catcli import nodes
from catcli.nodes import NodeAny, NodeStorage, \
NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta, \
typcast_node
from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars
from catcli.utils import md5sum, fix_badchars, has_attr
from catcli.logger import Logger
from catcli.nodeprinter import NodePrinter
from catcli.printer_native import NativePrinter
from catcli.printer_csv import CsvPrinter
from catcli.decomp import Decomp
from catcli.version import __version__ as VERSION
from catcli.exceptions import CatcliException
@ -33,10 +36,7 @@ class Noder:
* "dir" node representing a directory
* "file" node representing a file
"""
CSV_HEADER = ('name,type,path,size,indexed_at,'
'maccess,md5,nbfiles,free_space,'
'total_space,meta')
# pylint: disable=R0904
def __init__(self, debug: bool = False,
sortsize: bool = False,
@ -52,31 +52,33 @@ class Noder:
self.arc = arc
if self.arc:
self.decomp = Decomp()
self.csv_printer = CsvPrinter()
self.native_printer = NativePrinter()
@staticmethod
def get_storage_names(top: NodeTop) -> List[str]:
"""return a list of all storage names"""
return [x.name for x in list(top.children)]
def get_storage_node(self, top: NodeTop,
name: str,
newpath: str = '') -> NodeStorage:
"""
return the storage node if any
if newpath is submitted, it will update the media info
"""
found = None
def find_storage_node_by_name(self, top: NodeTop,
name: str) -> Optional[NodeStorage]:
"""find a storage node by name"""
for node in top.children:
if node.type != nodes.TYPE_STORAGE:
continue
if node.name == name:
found = node
break
if found and newpath and os.path.exists(newpath):
found.free = shutil.disk_usage(newpath).free
found.total = shutil.disk_usage(newpath).total
found.ts = int(time.time())
return cast(NodeStorage, found)
return cast(NodeStorage, node)
return None
def update_storage_path(self, top: NodeTop,
name: str,
newpath: str) -> None:
"""find and update storage path on update"""
storage = self.find_storage_node_by_name(top, name)
if storage and newpath and os.path.exists(newpath):
storage.free = shutil.disk_usage(newpath).free
storage.total = shutil.disk_usage(newpath).total
storage.ts = int(time.time())
@staticmethod
def get_node(top: NodeTop,
@ -84,6 +86,7 @@ class Noder:
quiet: bool = False) -> Optional[NodeAny]:
"""get the node by internal tree path"""
resolv = anytree.resolver.Resolver('name')
bpath = ''
try:
bpath = os.path.basename(path)
the_node = resolv.get(top, bpath)
@ -114,7 +117,7 @@ class Noder:
return node, False
# force re-indexing if no maccess
maccess = os.path.getmtime(path)
if not self._has_attr(node, 'maccess') or \
if not has_attr(node, 'maccess') or \
not node.maccess:
self._debug('\tchange: no maccess found')
return node, True
@ -133,39 +136,6 @@ class Noder:
self._debug(f'\tchange: no change for \"{path}\"')
return node, False
def rec_size(self, node: Union[NodeDir, NodeStorage],
store: bool = True) -> int:
"""
recursively traverse tree and return size
@store: store the size in the node
"""
if node.type == nodes.TYPE_FILE:
node.__class__ = NodeFile
msg = f'size of {node.type} \"{node.name}\": {node.nodesize}'
self._debug(msg)
return node.nodesize
msg = f'getting node size recursively for \"{node.name}\"'
self._debug(msg)
fullsize: int = 0
for i in node.children:
if node.type == nodes.TYPE_DIR:
sub_size = self.rec_size(i, store=store)
if store:
i.nodesize = sub_size
fullsize += sub_size
continue
if node.type == nodes.TYPE_STORAGE:
sub_size = self.rec_size(i, store=store)
if store:
i.nodesize = sub_size
fullsize += sub_size
continue
self._debug(f'skipping {node.name}')
if store:
node.nodesize = fullsize
self._debug(f'size of {node.type} \"{node.name}\": {fullsize}')
return fullsize
###############################################################
# public helpers
###############################################################
@ -198,7 +168,7 @@ class Noder:
return top
def new_file_node(self, name: str, path: str,
parent: NodeAny, storagepath: str) -> Optional[NodeFile]:
parent: NodeAny) -> Optional[NodeFile]:
"""create a new node representing a file"""
if not os.path.exists(path):
Logger.err(f'File \"{path}\" does not exist')
@ -212,11 +182,9 @@ class Noder:
md5 = ''
if self.hash:
md5 = self._get_hash(path)
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
node = NodeFile(name,
relpath,
stat.st_size,
md5,
maccess,
@ -232,13 +200,11 @@ class Noder:
return node
def new_dir_node(self, name: str, path: str,
parent: NodeAny, storagepath: str) -> NodeDir:
parent: NodeAny) -> NodeDir:
"""create a new node representing a directory"""
path = os.path.abspath(path)
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
return NodeDir(name,
relpath,
0,
maccess,
parent=parent)
@ -261,10 +227,12 @@ class Noder:
self.attrs_to_string(attrs),
parent=parent)
def new_archive_node(self, name: str, path: str,
parent: str, archive: str) -> NodeArchived:
def new_archive_node(self,
name: str,
parent: str,
archive: str) -> NodeArchived:
"""create a new node for archive data"""
return NodeArchived(name=name, relpath=path,
return NodeArchived(name=name,
parent=parent, nodesize=0, md5='',
archive=archive)
@ -316,169 +284,82 @@ class Noder:
###############################################################
# printing
###############################################################
def _node_to_csv(self, node: NodeAny,
sep: str = ',',
raw: bool = False) -> None:
def _print_node_csv(self, node: NodeAny,
sep: str = ',',
raw: bool = False) -> None:
"""
print a node to csv
@node: the node to consider
@sep: CSV separator character
@raw: print raw size rather than human readable
"""
typcast_node(node)
if not node:
return
if node.type == nodes.TYPE_TOP:
return
out = []
if node.type == nodes.TYPE_STORAGE:
# handle storage
out.append(node.name) # name
out.append(node.type) # type
out.append('') # fake full path
size = self.rec_size(node, store=False)
out.append(size_to_str(size, raw=raw)) # size
out.append(epoch_to_str(node.ts)) # indexed_at
out.append('') # fake maccess
out.append('') # fake md5
out.append(str(len(node.children))) # nbfiles
# fake free_space
out.append(size_to_str(node.free, raw=raw))
# fake total_space
out.append(size_to_str(node.total, raw=raw))
out.append(node.attr) # meta
self.csv_printer.print_storage(node,
sep=sep,
raw=raw)
else:
# handle other nodes
out.append(node.name.replace('"', '""')) # name
out.append(node.type) # type
parents = self._get_parents(node)
storage = self._get_storage(node)
fullpath = os.path.join(storage.name, parents)
out.append(fullpath.replace('"', '""')) # full path
out.append(size_to_str(node.nodesize, raw=raw)) # size
out.append(epoch_to_str(storage.ts)) # indexed_at
if self._has_attr(node, 'maccess'):
out.append(epoch_to_str(node.maccess)) # maccess
else:
out.append('') # fake maccess
if self._has_attr(node, 'md5'):
out.append(node.md5) # md5
else:
out.append('') # fake md5
if node.type == nodes.TYPE_DIR:
out.append(str(len(node.children))) # nbfiles
else:
out.append('') # fake nbfiles
out.append('') # fake free_space
out.append('') # fake total_space
out.append('') # fake meta
self.csv_printer.print_node(node,
sep=sep,
raw=raw)
line = sep.join(['"' + o + '"' for o in out])
if len(line) > 0:
Logger.stdout_nocolor(line)
def _print_node_du(self, node: NodeAny,
raw: bool = False) -> None:
"""
print node du style
"""
typcast_node(node)
thenodes = self._get_entire_tree(node,
dironly=True)
for thenode in thenodes:
self.native_printer.print_du(thenode, raw=raw)
def _print_node_native(self, node: NodeAny,
pre: str = '',
withpath: bool = False,
withdepth: bool = False,
withnbchildren: bool = False,
withstorage: bool = False,
recalcparent: bool = False,
raw: bool = False) -> None:
"""
print a node
@node: the node to print
@pre: string to print before node
@withpath: print the node path
@withdepth: print the node depth info
@withnbchildren: print the node nb children
@withstorage: print the node storage it belongs to
@recalcparent: get relpath from tree instead of relpath field
@raw: print raw size rather than human readable
"""
typcast_node(node)
if node.type == nodes.TYPE_TOP:
# top node
node.__class__ = NodeTop
Logger.stdout_nocolor(f'{pre}{node.name}')
self.native_printer.print_top(pre, node.name)
elif node.type == nodes.TYPE_FILE:
# node of type file
node.__class__ = NodeFile
name = node.name
if withpath:
if recalcparent:
name = os.sep.join([self._get_parents(node.parent), name])
else:
name = node.relpath
name = name.lstrip(os.sep)
if withstorage:
storage = self._get_storage(node)
attr_str = ''
if node.md5:
attr_str = f', md5:{node.md5}'
size = size_to_str(node.nodesize, raw=raw)
compl = f'size:{size}{attr_str}'
if withstorage:
content = Logger.get_bold_text(storage.name)
compl += f', storage:{content}'
NodePrinter.print_file_native(pre, name, compl)
self.native_printer.print_file(pre, node,
withpath=withpath,
withstorage=withstorage,
raw=raw)
elif node.type == nodes.TYPE_DIR:
# node of type directory
node.__class__ = NodeDir
name = node.name
if withpath:
if recalcparent:
name = os.sep.join([self._get_parents(node.parent), name])
else:
name = node.relpath
name = name.lstrip(os.sep)
depth = 0
if withdepth:
depth = len(node.children)
if withstorage:
storage = self._get_storage(node)
attr: List[Tuple[str, str]] = []
if node.nodesize:
attr.append(('totsize', size_to_str(node.nodesize, raw=raw)))
if withstorage:
attr.append(('storage', Logger.get_bold_text(storage.name)))
NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr)
self.native_printer.print_dir(pre,
node,
withpath=withpath,
withstorage=withstorage,
withnbchildren=withnbchildren,
raw=raw)
elif node.type == nodes.TYPE_STORAGE:
# node of type storage
node.__class__ = NodeStorage
sztotal = size_to_str(node.total, raw=raw)
szused = size_to_str(node.total - node.free, raw=raw)
nbchildren = len(node.children)
pcent = 0
if node.total > 0:
pcent = node.free * 100 / node.total
freepercent = f'{pcent:.1f}%'
# get the date
timestamp = ''
if self._has_attr(node, 'ts'):
timestamp = 'date:'
timestamp += epoch_to_str(node.ts)
disksize = ''
# the children size
recsize = self.rec_size(node, store=False)
sizestr = size_to_str(recsize, raw=raw)
disksize = 'totsize:' + f'{sizestr}'
# format the output
name = node.name
args = [
'nbfiles:' + f'{nbchildren}',
disksize,
f'free:{freepercent}',
'du:' + f'{szused}/{sztotal}',
timestamp]
argsstring = ' | '.join(args)
NodePrinter.print_storage_native(pre,
name,
argsstring,
node.attr)
self.native_printer.print_storage(pre,
node,
raw=raw)
elif node.type == nodes.TYPE_ARCHIVED:
# archive node
node.__class__ = NodeArchived
if self.arc:
NodePrinter.print_archive_native(pre, node.name, node.archive)
self.native_printer.print_archive(pre, node.name, node.archive)
else:
Logger.err(f'bad node encountered: {node}')
@ -497,27 +378,27 @@ class Noder:
rend = anytree.RenderTree(node, childiter=self._sort_tree)
for pre, _, thenode in rend:
self._print_node_native(thenode, pre=pre,
withdepth=True, raw=raw)
withnbchildren=True, raw=raw)
elif fmt == 'csv':
# csv output
self._to_csv(node, raw=raw)
self._print_nodes_csv(node, raw=raw)
elif fmt == 'csv-with-header':
# csv output
Logger.stdout_nocolor(self.CSV_HEADER)
self._to_csv(node, raw=raw)
self.csv_printer.print_header()
self._print_nodes_csv(node, raw=raw)
def _to_csv(self, node: NodeAny,
raw: bool = False) -> None:
def _print_nodes_csv(self, node: NodeAny,
raw: bool = False) -> None:
"""print the tree to csv"""
rend = anytree.RenderTree(node, childiter=self._sort_tree)
for _, _, item in rend:
self._node_to_csv(item, raw=raw)
self._print_node_csv(item, raw=raw)
@staticmethod
def _fzf_prompt(strings: Any) -> Any:
"""prompt with fzf"""
try:
from pyfzf.pyfzf import FzfPrompt # type: ignore # pylint: disable=C0415 # noqa
from pyfzf.pyfzf import FzfPrompt # pylint: disable=C0415 # noqa
fzf = FzfPrompt()
selected = fzf.prompt(strings)
return selected
@ -537,8 +418,8 @@ class Noder:
for _, _, rend in rendered:
if not rend:
continue
parents = self._get_parents(rend)
storage = self._get_storage(rend)
parents = rend.get_fullpath()
storage = rend.get_storage_node()
fullpath = os.path.join(storage.name, parents)
the_nodes[fullpath] = rend
# prompt with fzf
@ -564,14 +445,13 @@ class Noder:
###############################################################
# searching
###############################################################
def find_name(self, top: NodeTop,
key: str,
script: bool = False,
only_dir: bool = False,
startnode: Optional[NodeAny] = None,
parentfromtree: bool = False,
fmt: str = 'native',
raw: bool = False) -> List[NodeAny]:
def find(self, top: NodeTop,
key: str,
script: bool = False,
only_dir: bool = False,
startnode: Optional[NodeAny] = None,
fmt: str = 'native',
raw: bool = False) -> List[NodeAny]:
"""
find files based on their names
@top: top node
@ -579,7 +459,6 @@ class Noder:
@script: output script
@directory: only search for directories
@startpath: node to start with
@parentfromtree: get path from parent instead of stored relpath
@fmt: output format
@raw: raw size output
returns the found nodes
@ -597,17 +476,10 @@ class Noder:
# compile found nodes
paths = {}
for item in found:
typcast_node(item)
item.name = fix_badchars(item.name)
if hasattr(item, 'relpath'):
item.relpath = fix_badchars(item.relpath)
storage = self._get_storage(item)
if parentfromtree:
parent = self._get_parents(item)
key = f'{storage}/{parent}/{item.relpath}'
paths[parent] = item
else:
key = f'{storage}/{item.path}'
paths[key] = item
key = item.get_fullpath()
paths[key] = item
# handle fzf mode
if fmt.startswith('fzf'):
@ -623,16 +495,16 @@ class Noder:
else:
if fmt == 'native':
for _, item in paths.items():
self._print_node_native(item, withpath=True,
withdepth=True,
self._print_node_native(item,
withpath=True,
withnbchildren=True,
withstorage=True,
recalcparent=parentfromtree,
raw=raw)
elif fmt.startswith('csv'):
if fmt == 'csv-with-header':
Logger.stdout_nocolor(self.CSV_HEADER)
self.csv_printer.print_header()
for _, item in paths.items():
self._node_to_csv(item, raw=raw)
self._print_node_csv(item, raw=raw)
# execute script if any
if script:
@ -646,6 +518,8 @@ class Noder:
def _callback_find_name(self, term: str, only_dir: bool) -> Any:
"""callback for finding files"""
def find_name(node: NodeAny) -> bool:
typcast_node(node)
path = node.get_fullpath()
if node.type == nodes.TYPE_STORAGE:
# ignore storage nodes
return False
@ -662,13 +536,28 @@ class Noder:
# filter
if not term:
return True
if term.lower() in node.name.lower():
if term in path:
return True
if self.debug:
Logger.debug(f'match \"{path}\" with \"{term}\"')
if fnmatch.fnmatch(path, term):
return True
# ignore
return False
return find_name
###############################################################
# fixsizes
###############################################################
def fixsizes(self, top: NodeTop) -> None:
"""fix node sizes"""
typcast_node(top)
rend = anytree.RenderTree(top)
for _, _, thenode in rend:
typcast_node(thenode)
thenode.nodesize = thenode.get_rec_size()
###############################################################
# ls
###############################################################
@ -685,13 +574,27 @@ class Noder:
@fmt: output format
@raw: print raw size
"""
self._debug(f'walking path: \"{path}\" from {top}')
self._debug(f'ls walking path: \"{path}\" from \"{top.name}\"')
resolv = anytree.resolver.Resolver('name')
found = []
try:
# resolve the path in the tree
found = resolv.glob(top, path)
if '*' in path or '?' in path:
# we need to handle glob
self._debug('glob ls...')
found = resolv.glob(top, path)
else:
# we have a canonical path
self._debug('get ls...')
foundone = resolv.get(top, path)
cast(NodeAny, foundone)
typcast_node(foundone)
if foundone and foundone.may_have_children():
# let's find its children as well
modpath = os.path.join(path, '*')
found = resolv.glob(top, modpath)
else:
found = [foundone]
if len(found) < 1:
# nothing found
self._debug('nothing found')
@ -703,30 +606,19 @@ class Noder:
return found
# sort found nodes
found = sorted(found, key=self._sort, reverse=self.sortsize)
# print the parent
if fmt == 'native':
self._print_node_native(found[0].parent,
withpath=False,
withdepth=True,
raw=raw)
elif fmt.startswith('csv'):
self._node_to_csv(found[0].parent, raw=raw)
elif fmt.startswith('fzf'):
pass
found = sorted(found, key=os_sort_keygen(self._sort))
# print all found nodes
if fmt == 'csv-with-header':
Logger.stdout_nocolor(self.CSV_HEADER)
self.csv_printer.print_header()
for item in found:
if fmt == 'native':
self._print_node_native(item, withpath=False,
pre='- ',
withdepth=True,
self._print_node_native(item,
withpath=True,
withnbchildren=True,
raw=raw)
elif fmt.startswith('csv'):
self._node_to_csv(item, raw=raw)
self._print_node_csv(item, raw=raw)
elif fmt.startswith('fzf'):
self._to_fzf(item, fmt)
@ -734,6 +626,31 @@ class Noder:
pass
return found
###############################################################
# du
###############################################################
def diskusage(self, top: NodeTop,
path: str,
raw: bool = False) -> List[NodeAny]:
"""disk usage"""
self._debug(f'du walking path: \"{path}\" from \"{top.name}\"')
resolv = anytree.resolver.Resolver('name')
found: NodeAny
try:
# we have a canonical path
self._debug('get du...')
found = resolv.get(top, path)
if not found:
# nothing found
self._debug('nothing found')
return []
self._debug(f'du found: {found}')
self._print_node_du(found, raw=raw)
except anytree.resolver.ChildResolverError:
pass
return found
###############################################################
# tree creation
###############################################################
@ -743,15 +660,15 @@ class Noder:
"""add an entry to the tree"""
entries = name.rstrip(os.sep).split(os.sep)
if len(entries) == 1:
self.new_archive_node(name, name, top, top.name)
self.new_archive_node(name, top, top.name)
return
sub = os.sep.join(entries[:-1])
nodename = entries[-1]
try:
parent = resolv.get(top, sub)
parent = self.new_archive_node(nodename, name, parent, top.name)
parent = self.new_archive_node(nodename, parent, top.name)
except anytree.resolver.ChildResolverError:
self.new_archive_node(nodename, name, top, top.name)
self.new_archive_node(nodename, top, top.name)
def list_to_tree(self, parent: NodeAny, names: List[str]) -> None:
"""convert list of files to a tree"""
@ -765,6 +682,23 @@ class Noder:
###############################################################
# diverse
###############################################################
def _get_entire_tree(self, start: NodeAny,
dironly: bool = False) -> List[NodeAny]:
"""
get entire tree and sort it
"""
typcast_node(start)
rend = anytree.RenderTree(start)
thenodes = []
if dironly:
for _, _, thenode in rend:
typcast_node(thenode)
if thenode.type == nodes.TYPE_DIR:
thenodes.append(thenode)
else:
thenodes = [x for _, _, x in rend]
return sorted(thenodes, key=os_sort_keygen(self._sort))
def _sort_tree(self,
items: List[NodeAny]) -> List[NodeAny]:
"""sorting a list of items"""
@ -777,9 +711,10 @@ class Noder:
return self._sort_fs(lst)
@staticmethod
def _sort_fs(node: NodeAny) -> Tuple[str, str]:
"""sorting nodes dir first and alpha"""
return (node.type, node.name.lstrip('.').lower())
def _sort_fs(node: NodeAny) -> str:
"""sort by name"""
# to sort by types then name
return str(node.name)
@staticmethod
def _sort_size(node: NodeAny) -> float:
@ -791,28 +726,6 @@ class Noder:
except AttributeError:
return 0
def _get_storage(self, node: NodeAny) -> NodeStorage:
"""recursively traverse up to find storage"""
if node.type == nodes.TYPE_STORAGE:
return node
return cast(NodeStorage, node.ancestors[1])
@staticmethod
def _has_attr(node: NodeAny, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in node.__dict__.keys()
def _get_parents(self, node: NodeAny) -> str:
"""get all parents recursively"""
if node.type == nodes.TYPE_STORAGE:
return ''
if node.type == nodes.TYPE_TOP:
return ''
parent = self._get_parents(node.parent)
if parent:
return os.sep.join([parent, node.name])
return str(node.name)
@staticmethod
def _get_hash(path: str) -> str:
"""return md5 hash of node"""

@ -6,8 +6,11 @@ Class that represents a node in the catalog tree
"""
# pylint: disable=W0622
from typing import Dict, Any
from anytree import NodeMixin # type: ignore
import os
from typing import Dict, Any, cast
from anytree import NodeMixin
from catcli.exceptions import CatcliException
TYPE_TOP = 'top'
@ -35,20 +38,30 @@ def typcast_node(node: Any) -> None:
node.__class__ = NodeStorage
elif node.type == TYPE_META:
node.__class__ = NodeMeta
else:
raise CatcliException(f"bad node: {node}")
class NodeAny(NodeMixin): # type: ignore
"""generic node"""
def __init__(self, # type: ignore[no-untyped-def]
name=None,
size=0,
parent=None,
children=None):
"""build generic node"""
super().__init__()
self.name = name
self.nodesize = size
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
raise NotImplementedError
def _to_str(self) -> str:
ret = str(self.__class__) + ": " + str(self.__dict__)
if self.children:
@ -60,6 +73,27 @@ class NodeAny(NodeMixin): # type: ignore
def __str__(self) -> str:
return self._to_str()
def get_fullpath(self) -> str:
"""return full path to this node"""
path = self.name
if self.parent:
typcast_node(self.parent)
ppath = self.parent.get_fullpath()
path = os.path.join(ppath, path)
return str(path)
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
totsize: int = self.nodesize
for node in self.children:
typcast_node(node)
totsize += node.get_rec_size()
return totsize
def get_storage_node(self) -> NodeMixin:
"""recursively traverse up to find storage"""
return None
def flagged(self) -> bool:
"""is flagged"""
if not hasattr(self, '_flagged'):
@ -90,6 +124,23 @@ class NodeTop(NodeAny):
if children:
self.children = children
def get_fullpath(self) -> str:
"""return full path to this node"""
return ''
def may_have_children(self) -> bool:
"""can node contains sub"""
return True
def get_rec_size(self) -> int:
"""
recursively traverse tree and return size
also ensure to update the size on the way
"""
size = super().get_rec_size()
self.nodesize = size
return size
def __str__(self) -> str:
return self._to_str()
@ -99,7 +150,6 @@ class NodeFile(NodeAny):
def __init__(self, # type: ignore[no-untyped-def]
name: str,
relpath: str,
nodesize: int,
md5: str,
maccess: float,
@ -109,7 +159,6 @@ class NodeFile(NodeAny):
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_FILE
self.relpath = relpath
self.nodesize = nodesize
self.md5 = md5
self.maccess = maccess
@ -117,6 +166,14 @@ class NodeFile(NodeAny):
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return False
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def __str__(self) -> str:
return self._to_str()
@ -126,7 +183,6 @@ class NodeDir(NodeAny):
def __init__(self, # type: ignore[no-untyped-def]
name: str,
relpath: str,
nodesize: int,
maccess: float,
parent=None,
@ -135,13 +191,29 @@ class NodeDir(NodeAny):
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_DIR
self.relpath = relpath
self.nodesize = nodesize
self.maccess = maccess
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return True
def get_rec_size(self) -> int:
"""
recursively traverse tree and return size
also ensure to update the size on the way
"""
size = super().get_rec_size()
self.nodesize = size
return size
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def __str__(self) -> str:
return self._to_str()
@ -151,7 +223,6 @@ class NodeArchived(NodeAny):
def __init__(self, # type: ignore[no-untyped-def]
name: str,
relpath: str,
nodesize: int,
md5: str,
archive: str,
@ -161,7 +232,6 @@ class NodeArchived(NodeAny):
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_ARCHIVED
self.relpath = relpath
self.nodesize = nodesize
self.md5 = md5
self.archive = archive
@ -169,6 +239,14 @@ class NodeArchived(NodeAny):
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return False
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def __str__(self) -> str:
return self._to_str()
@ -198,6 +276,23 @@ class NodeStorage(NodeAny):
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return True
def get_rec_size(self) -> int:
"""
recursively traverse tree and return size
also ensure to update the size on the way
"""
size = super().get_rec_size()
self.nodesize = size
return size
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return self
def __str__(self) -> str:
return self._to_str()
@ -219,5 +314,13 @@ class NodeMeta(NodeAny):
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return False
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
return 0
def __str__(self) -> str:
return self._to_str()

@ -0,0 +1,82 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2024, deadc0de6
Class for printing nodes in csv format
"""
import sys
from typing import List
from catcli.nodes import NodeAny, NodeStorage, TYPE_DIR
from catcli.utils import size_to_str, epoch_to_str, \
has_attr
class CsvPrinter:
"""a node printer class"""
DEFSEP = ','
CSV_HEADER = ('name,type,path,size,indexed_at,'
'maccess,md5,nbfiles,free_space,'
'total_space,meta')
def _print_entries(self, entries: List[str], sep: str = DEFSEP) -> None:
line = sep.join(['"' + o + '"' for o in entries])
if len(line) > 0:
sys.stdout.write(f'{line}\n')
def print_header(self) -> None:
"""print csv header"""
sys.stdout.write(f'{self.CSV_HEADER}\n')
def print_storage(self, node: NodeStorage,
sep: str = DEFSEP,
raw: bool = False) -> None:
"""print a storage node"""
out = []
out.append(node.name) # name
out.append(node.type) # type
out.append('') # fake full path
size = node.get_rec_size()
out.append(size_to_str(size, raw=raw)) # size
out.append(epoch_to_str(node.ts)) # indexed_at
out.append('') # fake maccess
out.append('') # fake md5
out.append(str(len(node.children))) # nbfiles
# fake free_space
out.append(size_to_str(node.free, raw=raw))
# fake total_space
out.append(size_to_str(node.total, raw=raw))
out.append(node.attr) # meta
self._print_entries(out, sep=sep)
def print_node(self, node: NodeAny,
sep: str = DEFSEP,
raw: bool = False) -> None:
"""print other nodes"""
out = []
out.append(node.name.replace('"', '""')) # name
out.append(node.type) # type
fullpath = node.get_fullpath()
out.append(fullpath.replace('"', '""')) # full path
out.append(size_to_str(node.nodesize, raw=raw)) # size
storage = node.get_storage_node()
out.append(epoch_to_str(storage.ts)) # indexed_at
if has_attr(node, 'maccess'):
out.append(epoch_to_str(node.maccess)) # maccess
else:
out.append('') # fake maccess
if has_attr(node, 'md5'):
out.append(node.md5) # md5
else:
out.append('') # fake md5
if node.type == TYPE_DIR:
out.append(str(len(node.children))) # nbfiles
else:
out.append('') # fake nbfiles
out.append('') # fake free_space
out.append('') # fake total_space
out.append('') # fake meta
self._print_entries(out, sep=sep)

@ -0,0 +1,166 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
Class for printing nodes in native format
"""
import sys
from catcli.nodes import NodeFile, NodeDir, \
NodeStorage, NodeAny, typcast_node
from catcli.colors import Colors
from catcli.logger import Logger
from catcli.utils import fix_badchars, size_to_str, \
has_attr, epoch_to_str
COLOR_STORAGE = Colors.YELLOW
COLOR_FILE = Colors.WHITE
COLOR_DIRECTORY = Colors.BLUE
COLOR_ARCHIVE = Colors.PURPLE
COLOR_TS = Colors.CYAN
COLOR_SIZE = Colors.GREEN
FULLPATH_IN_NAME = True
class NativePrinter:
"""a node printer class"""
STORAGE = 'storage'
ARCHIVE = 'archive'
NBFILES = 'nbfiles'
def print_du(self, node: NodeAny,
raw: bool = False) -> None:
"""print du style"""
typcast_node(node)
name = node.get_fullpath()
size = node.nodesize
line = size_to_str(size, raw=raw).ljust(10, ' ')
out = f'{COLOR_SIZE}{line}{Colors.RESET}'
out += ' '
out += f'{COLOR_FILE}{name}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
def print_top(self, pre: str, name: str) -> None:
"""print top node"""
sys.stdout.write(f'{pre}{name}\n')
def print_storage(self, pre: str,
node: NodeStorage,
raw: bool = False) -> None:
"""print a storage node"""
# construct name
name = node.name
name = fix_badchars(name)
# construct attrs
attrs = []
# nb files
attrs.append(f'nbfiles:{len(node.children)}')
# the children size
recsize = node.get_rec_size()
sizestr = size_to_str(recsize, raw=raw)
attrs.append(f'totsize:{sizestr}')
# free
pcent = 0.0
if node.total > 0:
pcent = node.free * 100 / node.total
attrs.append(f'free:{pcent:.1f}%')
# du
sztotal = size_to_str(node.total, raw=raw)
szused = size_to_str(node.total - node.free, raw=raw)
attrs.append(f'du:{szused}/{sztotal}')
# timestamp
if has_attr(node, 'ts'):
attrs.append(f'date:{epoch_to_str(node.ts)}')
# print
out = f'{pre}{Colors.UND}{self.STORAGE}{Colors.RESET}: '
out += f'{COLOR_STORAGE}{name}{Colors.RESET}'
if attrs:
out += f' [{Colors.WHITE}{"|".join(attrs)}{Colors.RESET}]'
sys.stdout.write(f'{out}\n')
def print_file(self, pre: str,
node: NodeFile,
withpath: bool = False,
withstorage: bool = False,
raw: bool = False) -> None:
"""print a file node"""
# construct name
name = node.name
storage = node.get_storage_node()
if withpath:
name = node.get_fullpath()
# construct attributes
attrs = []
if node.md5:
attrs.append(f'md5:{node.md5}')
if withstorage:
content = Logger.get_bold_text(storage.name)
attrs.append(f'storage:{content}')
# print
out = []
out.append(f'{pre}')
out.append(f'{COLOR_FILE}{name}{Colors.RESET}')
size = 0
if node.nodesize:
size = node.nodesize
line = size_to_str(size, raw=raw)
out.append(f'{COLOR_SIZE}{line}{Colors.RESET}')
if has_attr(node, 'maccess'):
line = epoch_to_str(node.maccess)
out.append(f'{COLOR_TS}{line}{Colors.RESET}')
if attrs:
out.append(f'{Colors.GRAY}[{",".join(attrs)}]{Colors.RESET}')
out = [x for x in out if x]
sys.stdout.write(f'{" ".join(out)}\n')
def print_dir(self, pre: str,
node: NodeDir,
withpath: bool = False,
withstorage: bool = False,
withnbchildren: bool = False,
raw: bool = False) -> None:
"""print a directory node"""
# construct name
name = node.name
storage = node.get_storage_node()
if withpath:
name = node.get_fullpath()
# construct attrs
attrs = []
if withnbchildren:
nbchildren = len(node.children)
attrs.append(f'{self.NBFILES}:{nbchildren}')
if withstorage:
attrs.append(f'storage:{Logger.get_bold_text(storage.name)}')
# print
out = []
out.append(f'{pre}')
out.append(f'{COLOR_DIRECTORY}{name}{Colors.RESET}')
size = 0
if node.nodesize:
size = node.nodesize
line = size_to_str(size, raw=raw)
out.append(f'{COLOR_SIZE}{line}{Colors.RESET}')
if has_attr(node, 'maccess'):
line = epoch_to_str(node.maccess)
out.append(f'{COLOR_TS}{line}{Colors.RESET}')
if attrs:
out.append(f'{Colors.GRAY}[{",".join(attrs)}]{Colors.RESET}')
out = [x for x in out if x]
sys.stdout.write(f'{" ".join(out)}\n')
def print_archive(self, pre: str,
name: str, archive: str) -> None:
"""print an archive"""
name = fix_badchars(name)
out = f'{pre}{COLOR_ARCHIVE}{name}{Colors.RESET} '
out += f'{Colors.GRAY}[{self.ARCHIVE}:{archive}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')

@ -16,13 +16,12 @@ from catcli import nodes
from catcli.exceptions import CatcliException
SEPARATOR = '/'
WILD = '*'
def path_to_top(path: str) -> str:
"""path pivot under top"""
pre = f'{SEPARATOR}{nodes.NAME_TOP}'
pre = f'{os.path.sep}{nodes.NAME_TOP}'
if not path.startswith(pre):
# prepend with top node path
path = pre + path
@ -32,19 +31,19 @@ def path_to_top(path: str) -> str:
def path_to_search_all(path: str) -> str:
"""path to search for all subs"""
if not path:
path = SEPARATOR
if not path.startswith(SEPARATOR):
path = SEPARATOR + path
pre = f'{SEPARATOR}{nodes.NAME_TOP}'
path = os.path.sep
if not path.startswith(os.path.sep):
path = os.path.sep + path
pre = f'{os.path.sep}{nodes.NAME_TOP}'
if not path.startswith(pre):
# prepend with top node path
path = pre + path
if not path.endswith(SEPARATOR):
# ensure ends with a separator
path += SEPARATOR
if not path.endswith(WILD):
# add wild card
path += WILD
# if not path.endswith(os.path.sep):
# # ensure ends with a separator
# path += os.path.sep
# if not path.endswith(WILD):
# # add wild card
# path += WILD
return path
@ -118,3 +117,8 @@ def edit(string: str) -> str:
def fix_badchars(string: str) -> str:
"""fix none utf-8 chars in string"""
return string.encode('utf-8', 'ignore').decode('utf-8')
def has_attr(node: nodes.NodeAny, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in node.__dict__.keys()

@ -35,7 +35,8 @@ class Walker:
self.debug = debug
self.lpath = logpath
def index(self, path: str,
def index(self,
path: str,
parent: NodeAny,
name: str,
storagepath: str = '') -> Tuple[str, int]:
@ -47,8 +48,10 @@ class Walker:
"""
self._debug(f'indexing starting at {path}')
if not parent:
parent = self.noder.new_dir_node(name, path,
parent, storagepath)
# create the parent
parent = self.noder.new_dir_node(name,
path,
parent)
if os.path.islink(path):
rel = os.readlink(path)
@ -65,8 +68,9 @@ class Walker:
continue
self._progress(file)
self._debug(f'index file {sub}')
node = self.noder.new_file_node(os.path.basename(file), sub,
parent, storagepath)
node = self.noder.new_file_node(os.path.basename(file),
sub,
parent)
if node:
cnt += 1
for adir in dirs:
@ -76,7 +80,7 @@ class Walker:
self._debug(f'index directory {sub}')
if not os.path.exists(sub):
continue
dummy = self.noder.new_dir_node(base, sub, parent, storagepath)
dummy = self.noder.new_dir_node(base, sub, parent)
if not dummy:
continue
cnt += 1
@ -118,8 +122,9 @@ class Walker:
if node:
node.flag()
continue
node = self.noder.new_file_node(os.path.basename(file), sub,
parent, storagepath)
node = self.noder.new_file_node(os.path.basename(file),
sub,
parent)
if node:
node.flag()
cnt += 1
@ -131,7 +136,7 @@ class Walker:
reindex, dummy = self._need_reindex(parent, sub, treepath)
if reindex:
dummy = self.noder.new_dir_node(base, sub,
parent, storagepath)
parent)
cnt += 1
if dummy:
dummy.flag()

@ -3,3 +3,6 @@ types-docopt; python_version >= '3.0'
anytree; python_version >= '3.0'
pyfzf; python_version >= '3.0'
fusepy; python_version >= '3.0'
natsort; python_version >= '3.0'
cmd2; python_version >= '3.0'
gnureadline; python_version >= '3.0'

@ -37,11 +37,11 @@ setup(
python_requires=REQUIRES_PYTHON,
classifiers=[
'Development Status :: 5 - Production/Stable',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
],

@ -0,0 +1,22 @@
#!/usr/bin/env bash
# run me from the root of the package
source tests-ng/helper
rm -f tests-ng/assets/github.catalog.json
python3 -m catcli.catcli index -c github .github --catalog=tests-ng/assets/github.catalog.json
# edit catalog
clean_catalog "tests-ng/assets/github.catalog.json"
# native
python3 -m catcli.catcli ls -r -s -B --catalog=tests-ng/assets/github.catalog.json | \
sed -e 's/free:.*%/free:0.0%/g' \
-e 's/....-..-.. ..:..:../2023-03-09 16:20:59/g' \
-e 's#du:[^|]* |#du:0/0 |#g' > tests-ng/assets/github.catalog.native.txt
# csv
python3 -m catcli.catcli ls -r -s -B --catalog=tests-ng/assets/github.catalog.json --format=csv | \
sed -e 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' | \
sed 's/20..-..-.. ..:..:..//g' > tests-ng/assets/github.catalog.csv.txt

@ -1,5 +1,6 @@
"github","storage","","1510","2023-03-09 16:20:59","","","2","0","0",""
"workflows","dir","github/workflows","1493","2023-03-09 16:20:59","2023-03-09 16:20:44","","2","","",""
"pypi-release.yml","file","github/workflows/pypi-release.yml","691","2023-03-09 16:20:59","2022-10-19 21:00:37","57699a7a6a03e20e864f220e19f8e197","","","",""
"testing.yml","file","github/workflows/testing.yml","802","2023-03-09 16:20:59","2023-03-09 16:20:44","7144a119ef43adb634654522c12ec250","","","",""
"FUNDING.yml","file","github/FUNDING.yml","17","2023-03-09 16:20:59","2022-10-19 21:00:37","0c6407a84d412c514007313fb3bca4de","","","",""
"github","storage","","4865","","","","3","0","0",""
"FUNDING.yml","file","github/FUNDING.yml","17","","","0c6407a84d412c514007313fb3bca4de","","","",""
"codecov.yml","file","github/codecov.yml","104","","","4203204f75b43cd4bf032402beb3359d","","","",""
"workflows","dir","github/workflows","3082","","","","2","","",""
"pypi-release.yml","file","github/workflows/pypi-release.yml","691","","","57699a7a6a03e20e864f220e19f8e197","","","",""
"testing.yml","file","github/workflows/testing.yml","850","","","691df1a4d2f254b5cd04c152e7c6ccaf","","","",""

@ -7,54 +7,59 @@
"maccess": 1666206037.0786593,
"md5": "0c6407a84d412c514007313fb3bca4de",
"name": "FUNDING.yml",
"relpath": "/FUNDING.yml",
"size": 17,
"type": "file"
},
{
"maccess": 1704320710.7056112,
"md5": "4203204f75b43cd4bf032402beb3359d",
"name": "codecov.yml",
"size": 104,
"type": "file"
},
{
"children": [
{
"maccess": 1666206037.078865,
"md5": "57699a7a6a03e20e864f220e19f8e197",
"name": "pypi-release.yml",
"relpath": "workflows/pypi-release.yml",
"size": 691,
"type": "file"
},
{
"maccess": 1678375244.4870229,
"md5": "7144a119ef43adb634654522c12ec250",
"maccess": 1704403569.24789,
"md5": "691df1a4d2f254b5cd04c152e7c6ccaf",
"name": "testing.yml",
"relpath": "workflows/testing.yml",
"size": 802,
"size": 850,
"type": "file"
}
],
"maccess": 1678375244.4865956,
"maccess": 1704320727.2641916,
"name": "workflows",
"relpath": "/workflows",
"size": 1493,
"size": 1541,
"type": "dir"
}
],
"free": 0,
"name": "github",
"size": 1510,
"size": 1662,
"total": 0,
"ts": 1678375259,
"ts": 1704923096,
"type": "storage"
},
{
"attr": {
"access": 1678375259,
"access_version": "0.8.7",
"created": 1678375259,
"created_version": "0.8.7"
"access": 1704923096,
"access_version": "0.9.6",
"created": 1704923096,
"created_version": "0.9.6"
},
"name": "meta",
"size": null,
"type": "meta"
}
],
"name": "top",
"size": null,
"type": "top"
}

@ -1,7 +1,7 @@
top
└── storage: github
nbfiles:2 | totsize:1510 | free:0.0% | du:0/0 | date:2023-03-09 16:20:59
├── workflows [nbfiles:2, totsize:1493]
│ ├── pypi-release.yml [size:691, md5:57699a7a6a03e20e864f220e19f8e197]
│ └── testing.yml [size:802, md5:7144a119ef43adb634654522c12ec250]
└── FUNDING.yml [size:17, md5:0c6407a84d412c514007313fb3bca4de]
└── storage: github [nbfiles:3|totsize:4865|free:0.0%|du:0/0|date:2023-03-09 16:20:59]
├── FUNDING.yml 17 2023-03-09 16:20:59 [md5:0c6407a84d412c514007313fb3bca4de]
├── codecov.yml 104 2023-03-09 16:20:59 [md5:4203204f75b43cd4bf032402beb3359d]
└── workflows 3082 2023-03-09 16:20:59 [nbfiles:2]
├── pypi-release.yml 691 2023-03-09 16:20:59 [md5:57699a7a6a03e20e864f220e19f8e197]
└── testing.yml 850 2023-03-09 16:20:59 [md5:691df1a4d2f254b5cd04c152e7c6ccaf]

@ -10,7 +10,8 @@ cd "${prev}"
# coverage
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
bin="coverage run -p --source=catcli -m catcli.catcli"
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
@ -35,6 +36,7 @@ catalog="${tmpd}/catalog"
# index
${bin} -B index -c --catalog="${catalog}" github .github
clean_catalog "${catalog}"
ls -laR .github
cat "${catalog}"
@ -100,7 +102,7 @@ native="${tmpd}/native.txt"
${bin} -B ls -s -r --format=native --catalog="${catalog}" > "${native}"
mod="${tmpd}/native.mod.txt"
cat "${native}" | sed -e 's/free:.*%/free:0.0%/g' \
-e 's/date:....-..-.. ..:..:../date:2023-03-09 16:20:59/g' \
-e 's/....-..-.. ..:..:../2023-03-09 16:20:59/g' \
-e 's#du:[^|]* |#du:0/0 |#g' > "${mod}"
if command -v delta >/dev/null; then
delta -s "tests-ng/assets/github.catalog.native.txt" "${mod}"
@ -114,13 +116,14 @@ csv="${tmpd}/csv.txt"
${bin} -B ls -s -r --format=csv --catalog="${catalog}" > "${csv}"
# modify created csv
mod="${tmpd}/csv.mod.txt"
cat "${csv}" | sed -e 's/"2","[^"]*","[^"]*",""/"2","0","0",""/g' | \
cat "${csv}" | \
sed -e 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' | \
sed 's/20..-..-.. ..:..:..//g' > "${mod}"
# modify original
ori="${tmpd}/ori.mod.txt"
cat "tests-ng/assets/github.catalog.csv.txt" | \
sed 's/....-..-.. ..:..:..//g' | \
sed 's/"2","[^"]*","[^"]*",""/"2","0","0",""/g' > "${ori}"
sed 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' > "${ori}"
if command -v delta >/dev/null; then
delta -s "${ori}" "${mod}"
fi

@ -10,7 +10,8 @@ cd "${prev}"
# coverage
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
bin="coverage run -p --source=catcli -m catcli.catcli"
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
@ -36,13 +37,22 @@ catalog="${tmpd}/catalog"
# index
${bin} -B index -c -f --catalog="${catalog}" github1 .github
${bin} -B index -c -f --catalog="${catalog}" github2 .github
clean_catalog "${catalog}"
#cat "${catalog}"
echo ""
${bin} -B ls -r --catalog="${catalog}"
echo "finding \"testing.yml\""
${bin} -B find --catalog="${catalog}" testing.yml
cnt=$(${bin} -B find --catalog="${catalog}" testing.yml | wc -l)
[ "${cnt}" != "2" ] && echo "should return 2!" && exit 1
[ "${cnt}" != "2" ] && echo "should return 2 (not ${cnt})" && exit 1
echo "finding \"*.yml\""
${bin} -B find --catalog="${catalog}" '*.yml'
cnt=$(${bin} -B find --catalog="${catalog}" '*.yml' | wc -l)
[ "${cnt}" != "8" ] && echo "should return 8 (not ${cnt})" && exit 1
# the end
echo ""

@ -21,6 +21,14 @@ clear_on_exit()
fi
}
# clear catalog stuff for testing
# $1: catalog path
clean_catalog()
{
sed -i 's/"free": .*,/"free": 0,/g' "${1}"
sed -i 's/"total": .*,/"total": 0,/g' "${1}"
}
# clear files
on_exit()
{

@ -0,0 +1,59 @@
#!/usr/bin/env bash
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2024, deadc0de6
set -e
cur=$(cd "$(dirname "${0}")" && pwd)
prev="${cur}/.."
cd "${prev}"
# coverage
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
echo "pythonpath: ${PYTHONPATH}"
echo "bin: ${bin}"
${bin} --version
# get the helpers
# shellcheck source=tests-ng/helper
source "${cur}"/helper
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
##########################################################
# the test
##########################################################
# create temp dirs
tmpd=$(mktemp -d)
clear_on_exit "${tmpd}"
catalog="${tmpd}/catalog"
# index
${bin} -B index -c -f --catalog="${catalog}" github1 .github
${bin} -B index -c -f --catalog="${catalog}" github2 .github
clean_catalog "${catalog}"
#cat "${catalog}"
echo ""
${bin} -B ls -r --catalog="${catalog}"
${bin} -B ls --catalog="${catalog}" 'github1/*.yml'
cnt=$(${bin} -B ls --catalog="${catalog}" 'github1/*.yml' | wc -l)
[ "${cnt}" != "2" ] && echo "should return 2 (not ${cnt})" && exit 1
${bin} -B ls --catalog="${catalog}" 'github*/*.yml'
cnt=$(${bin} -B ls --catalog="${catalog}" 'github*/*.yml' | wc -l)
[ "${cnt}" != "4" ] && echo "should return 4 (not ${cnt})" && exit 1
# the end
echo ""
echo "test \"$(basename "$0")\" success"
cd "${cur}"
exit 0

@ -10,7 +10,8 @@ cd "${prev}"
# coverage
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
bin="coverage run -p --source=catcli -m catcli.catcli"
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
@ -40,12 +41,12 @@ echo "abc" > "${tmpd}/dir/a"
# index
${bin} -B index --catalog="${catalog}" dir "${tmpd}/dir"
${bin} -B ls --catalog="${catalog}" dir
${bin} -B ls --catalog="${catalog}"
# get attributes
freeb=$(${bin} -B ls --catalog="${catalog}" dir | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dub=$(${bin} -B ls --catalog="${catalog}" dir | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
dateb=$(${bin} -B ls --catalog="${catalog}" dir | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
freeb=$(${bin} -B ls --catalog="${catalog}" | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dub=$(${bin} -B ls --catalog="${catalog}" | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
dateb=$(${bin} -B ls --catalog="${catalog}" | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
echo "before: free:${freeb} | du:${dub} | date:${dateb}"
# change content
@ -60,12 +61,12 @@ sleep 1
# update
${bin} -B update -f --catalog="${catalog}" dir "${tmpu}/dir"
${bin} -B ls --catalog="${catalog}" dir
${bin} -B ls --catalog="${catalog}"
# get new attributes
freea=$(${bin} -B ls --catalog="${catalog}" dir | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dua=$(${bin} -B ls --catalog="${catalog}" dir | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
datea=$(${bin} -B ls --catalog="${catalog}" dir | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
freea=$(${bin} -B ls --catalog="${catalog}" | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dua=$(${bin} -B ls --catalog="${catalog}" | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
datea=$(${bin} -B ls --catalog="${catalog}" | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
echo "after: free:${freea} | du:${dua} | date:${datea}"
# test they are all different

@ -2,7 +2,7 @@ pycodestyle; python_version >= '3.0'
pyflakes; python_version >= '3.0'
nose2; python_version >= '3.0'
coverage; python_version >= '3.0'
coveralls; python_version >= '3.0'
pylint; python_version > '3.0'
mypy; python_version > '3.0'
pytest; python_version > '3.0'
pytype; python_version > '3.0'

@ -59,11 +59,23 @@ pylint -sn setup.py
# mypy
echo "[+] mypy"
mypy --strict catcli/
mypy --version
mypy --config-file=.mypy.ini catcli/
# pytype
echo "[+] pytype"
pytype --version
pytype catcli/
set +e
grep -R 'TODO' catcli/ && echo "TODO found" && exit 1
grep -R 'FIXME' catcli/ && echo "FIXME found" && exit 1
set -e
# unittest
echo "[+] unittests"
coverage run -p -m pytest tests
mkdir -p coverages/
coverage run -p --data-file coverages/coverage -m pytest tests
# tests-ng
echo "[+] tests-ng"
@ -87,7 +99,8 @@ done
# merge coverage
echo "[+] coverage merge"
coverage combine
coverage combine coverages/*
coverage xml
echo "ALL TESTS DONE OK"
exit 0

@ -149,21 +149,18 @@ FAKECATALOG = """
{
"md5": null,
"name": "7544G",
"relpath": "tmpj5602ih7.catcli/7544G",
"size": 100,
"type": "file"
},
{
"md5": null,
"name": "KF2ZC",
"relpath": "tmpj5602ih7.catcli/KF2ZC",
"size": 100,
"type": "file"
},
{
"md5": null,
"name": "Z9OII",
"relpath": "tmpj5602ih7.catcli/Z9OII",
"size": 100,
"type": "file"
},
@ -172,14 +169,12 @@ FAKECATALOG = """
{
"md5": null,
"name": "M592O9",
"relpath": "tmpj5602ih7.catcli/VNN/M592O9",
"size": 100,
"type": "file"
}
],
"md5": null,
"name": "VNN",
"relpath": "VNN",
"size": 100,
"type": "dir"
},
@ -188,21 +183,18 @@ FAKECATALOG = """
{
"md5": null,
"name": "X37H",
"relpath": "tmpj5602ih7.catcli/P4C/X37H",
"size": 100,
"type": "file"
},
{
"md5": null,
"name": "I566",
"relpath": "tmpj5602ih7.catcli/P4C/I566",
"size": 100,
"type": "file"
}
],
"md5": null,
"name": "P4C",
"relpath": "P4C",
"size": 200,
"type": "dir"
}

@ -50,7 +50,7 @@ class TestIndexing(unittest.TestCase):
tmpdirname = 'tmpdir'
args = {'<path>': dirpath, '<name>': tmpdirname,
'--hash': True, '--meta': ['some meta'],
'--no-subsize': False, '--verbose': True}
'--verbose': True}
# index the directory
cmd_index(args, noder, catalog, top)

@ -46,7 +46,7 @@ class TestRm(unittest.TestCase):
top = cmd_rm(args, noder, catalog, top)
# ensure there no children anymore
self.assertTrue(len(top.children) == 0)
self.assertEqual(len(top.children), 0)
def main():

@ -62,8 +62,7 @@ class TestUpdate(unittest.TestCase):
tmpdirname = 'tmpdir'
args = {'<path>': dirpath, '<name>': tmpdirname,
'--hash': True, '--meta': ['some meta'],
'--no-subsize': False, '--verbose': True,
'--lpath': None}
'--verbose': True, '--lpath': None}
# index the directory
unix_tree(dirpath)
@ -71,11 +70,11 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(os.stat(catalogpath).st_size != 0)
# ensure md5 sum are in
nods = noder.find_name(top, os.path.basename(file4))
self.assertTrue(len(nods) == 1)
nods = noder.find(top, os.path.basename(file4))
self.assertEqual(len(nods), 1)
nod = nods[0]
self.assertTrue(nod)
self.assertTrue(nod.md5 == f4_md5)
self.assertEqual(nod.md5, f4_md5)
# print catalog
noder.print_tree(top)
@ -125,10 +124,10 @@ class TestUpdate(unittest.TestCase):
# explore the top node to find all nodes
self.assertEqual(len(top.children), 1)
storage = top.children[0]
self.assertTrue(len(storage.children) == 8)
self.assertEqual(len(storage.children), 8)
# ensure d1f1 md5 sum has changed in catalog
nods = noder.find_name(top, os.path.basename(d1f1))
nods = noder.find(top, os.path.basename(d1f1))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
@ -136,7 +135,7 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(nod.md5 == d1f1_md5_new)
# ensure f4 md5 sum has changed in catalog
nods = noder.find_name(top, os.path.basename(file4))
nods = noder.find(top, os.path.basename(file4))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
@ -144,7 +143,7 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(nod.md5 == f4_md5_new)
# ensure d2f2 md5 sum has changed in catalog
nods = noder.find_name(top, os.path.basename(d2f2))
nods = noder.find(top, os.path.basename(d2f2))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)

Loading…
Cancel
Save