Compare commits

..

No commits in common. 'master' and 'v0.9.2' have entirely different histories.

@ -1,7 +0,0 @@
coverage:
status:
project:
default:
target: 90%
threshold: 1%
patch: off

@ -1,11 +1,11 @@
name: tests
on: [push, pull_request, workflow_dispatch]
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
@ -21,9 +21,9 @@ jobs:
- name: Run tests
run: |
./tests.sh
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
with:
files: coverage.xml
- name: Coveralls
run: |
pip install coveralls
coveralls --service=github
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

4
.gitignore vendored

@ -1,8 +1,6 @@
*.pyc
.coverage
.coverage*
coverages/
coverage.xml
dist/
build/
*.egg-info/
@ -11,5 +9,3 @@ build/
.mypy_cache
.pytest_cache
__pycache__
.pyre
.pytype

@ -1,5 +0,0 @@
[mypy]
strict = true
disable_error_code = import-untyped,import-not-found
ignore_missing_imports = True
warn_unused_ignores = False

@ -1,8 +1,8 @@
# CATCLI
[![Tests Status](https://github.com/deadc0de6/catcli/workflows/tests/badge.svg?branch=master)](https://github.com/deadc0de6/catcli/actions)
[![Build Status](https://travis-ci.org/deadc0de6/catcli.svg?branch=master)](https://travis-ci.org/deadc0de6/catcli)
[![License: GPL v3](https://img.shields.io/badge/License-GPL%20v3-blue.svg)](http://www.gnu.org/licenses/gpl-3.0)
[![Coverage](https://codecov.io/gh/deadc0de6/catcli/graph/badge.svg?token=t5dF7UL7K1)](https://codecov.io/gh/deadc0de6/catcli)
[![Coveralls](https://img.shields.io/coveralls/github/deadc0de6/catcli)](https://coveralls.io/github/deadc0de6/catcli?branch=master)
[![PyPI version](https://badge.fury.io/py/catcli.svg)](https://badge.fury.io/py/catcli)
[![AUR](https://img.shields.io/aur/version/catcli-git.svg)](https://aur.archlinux.org/packages/catcli-git)
@ -12,10 +12,6 @@
*The command line catalog tool for your offline data*
> [!WARNING]
> catcli has been superseded by [gocatcli](https://github.com/deadc0de6/gocatcli/)
> which provides all features of catcli and more...
Did you ever wanted to find back that specific file that should be on one of your
backup DVDs or one of your external hard drives? You usually go through all
of them hoping to find the right one on the first try?
@ -54,8 +50,6 @@ catcli ls -r
catcli ls log
# find files/directories named '*log*'
catcli find log
# show directories sizes
catcli du log
```
see [usage](#usage) for specific info
@ -82,7 +76,6 @@ See the [examples](#examples) for an overview of the available features.
* [Find files](#find-files)
* [Mount catalog](#mount-catalog)
* [Display entire hierarchy](#display-entire-hierarchy)
* [Disk usage](#disk-usage)
* [Catalog graph](#catalog-graph)
* [Edit storage](#edit-storage)
* [Update catalog](#update-catalog)
@ -138,13 +131,6 @@ Five different types of entry are present in a catalog:
* **file node**: this is a file
* **archive node**: this is a file contained in an archive (tar, zip, etc)
Following environment variables are supported:
* `CATCLI_CATALOG_PATH`: define the catalog path (`--catalog=<path>`)
* `CATCLI_NO_BANNER`: disable the banner (`--no-banner`)
* `CATCLI_VERBOSE`: enable verbose mode (`--verbose`)
* `CATCLI_FORMAT`: define the output format (`-F --format=<fmt>`)
## Index data
Let's say the DVD or external hard drive that needs to be indexed
@ -162,6 +148,9 @@ directory under `catcli.catalog`.
The `--meta` switch allows to add any additional information to store along in
the catalog like for example `the blue disk in my office`.
Catcli will calculate and store the total size of each node (directories, storages, etc)
unless the `-n --no-subsize` switch is used.
Using the `-a --archive` switch allows to also index archive files as explained
[below](#index-archive-files).
@ -226,11 +215,6 @@ Resulting files can be sorted by size using the `-S --sortsize` switch.
See the [examples](#examples) for more.
## Disk usage
You can get the disk usage with the `du` command.
Resulting files can be sorted by size using the `-S --sortsize` switch.
## Catalog graph
The catalog can be exported in a dot file that can be used to

@ -6,10 +6,9 @@ Class that represents the catcli catalog
"""
import os
from typing import Optional, List, Dict, Tuple, Union, Any
from anytree.exporter import JsonExporter, DictExporter
from anytree.importer import JsonImporter
from anytree import AnyNode
from typing import Optional
from anytree.exporter import JsonExporter # type: ignore
from anytree.importer import JsonImporter # type: ignore
# local imports
from catcli import nodes
@ -30,7 +29,7 @@ class Catalog:
@debug: debug mode
@force: force overwrite if exists
"""
self.path = os.path.expanduser(path)
self.path = path
self.debug = debug
self.force = force
self.metanode: Optional[NodeMeta] = None
@ -86,8 +85,7 @@ class Catalog:
def _save_json(self, top: NodeTop) -> bool:
"""export the catalog in json"""
self._debug(f'saving {top} to json...')
dexporter = DictExporter(attriter=attriter)
exp = JsonExporter(dictexporter=dexporter, indent=2, sort_keys=True)
exp = JsonExporter(indent=2, sort_keys=True)
with open(self.path, 'w', encoding='UTF-8') as file:
exp.write(top, file)
self._debug(f'Catalog saved to json \"{self.path}\"')
@ -95,61 +93,13 @@ class Catalog:
def _restore_json(self, string: str) -> Optional[NodeTop]:
"""restore the tree from json"""
imp = JsonImporter(dictimporter=_DictImporter(debug=self.debug))
imp = JsonImporter()
self._debug(f'import from string: {string}')
root = imp.import_(string)
self._debug(f'Catalog imported from json \"{self.path}\"')
self._debug(f'root imported: {root}')
if root.type != nodes.TYPE_TOP:
return None
top = NodeTop(root.name, children=root.children)
self._debug(f'top imported: {top.get_name()}')
self._debug(f'top imported: {top}')
return top
class _DictImporter():
def __init__(self,
nodecls: AnyNode = AnyNode,
debug: bool = False):
self.nodecls = nodecls
self.debug = debug
def import_(self, data: Dict[str, str]) -> AnyNode:
"""Import tree from `data`."""
return self.__import(data)
def __import(self, data: Union[str, Any],
parent: AnyNode = None) -> AnyNode:
"""overwrite parent imoprt"""
assert isinstance(data, dict)
assert "parent" not in data
attrs = dict(data)
# replace attr
attrs = back_attriter(attrs)
children: Union[str, Any] = attrs.pop("children", [])
node = self.nodecls(parent=parent, **attrs)
for child in children:
self.__import(child, parent=node)
return node
def back_attriter(adict: Dict[str, str]) -> Dict[str, str]:
"""replace attribute on json restore"""
attrs = {}
for k, val in adict.items():
newk = k
if k == 'size':
newk = 'nodesize'
attrs[newk] = val
return attrs
def attriter(attrs: List[Tuple[str, Any]]) -> List[Tuple[str, Any]]:
"""replace attribute on json save"""
newattr = []
for attr in attrs:
k, val = attr
if k == 'nodesize':
k = 'size'
newattr.append((k, val))
return newattr

@ -11,41 +11,27 @@ Catcli command line interface
import sys
import os
import datetime
from typing import Dict, Any, List, \
Tuple
from typing import Dict, Any, List
from docopt import docopt
import cmd2
# local imports
from catcli.version import __version__ as VERSION
from catcli.nodes import NodeTop, NodeAny
from catcli.logger import Logger
from catcli.printer_csv import CsvPrinter
from catcli.colors import Colors
from catcli.catalog import Catalog
from catcli.walker import Walker
from catcli.noder import Noder
from catcli.utils import ask, edit
from catcli.nodes_utils import path_to_search_all
from catcli.utils import ask, edit, path_to_search_all
from catcli.fuser import Fuser
from catcli.exceptions import BadFormatException, CatcliException
NAME = 'catcli'
CUR = os.path.dirname(os.path.abspath(__file__))
CATALOGPATH = f'{NAME}.catalog'
GRAPHPATH = f'/tmp/{NAME}.dot'
FORMATS = ['native', 'csv', 'csv-with-header', 'fzf-native', 'fzf-csv']
# env variables
ENV_CATALOG_PATH = 'CATCLI_CATALOG_PATH'
ENV_NOBANNER = 'CATCLI_NO_BANNER'
ENV_VERBOSE = 'CATCLI_VERBOSE'
ENV_FORMAT = 'CATCLI_FORMAT'
# default paths
DEFAULT_CATALOGPATH = os.getenv(ENV_CATALOG_PATH, default=f'{NAME}.catalog')
DEFAULT_GRAPHPATH = f'/tmp/{NAME}.dot'
DEFAULT_NOBANNER = os.getenv(ENV_NOBANNER) is not None
DEFAULT_VERBOSEMODE = os.getenv(ENV_VERBOSE) is not None
DEFAULT_FORMAT = os.getenv(ENV_FORMAT, default='native')
BANNER = f""" +-+-+-+-+-+-+
|c|a|t|c|l|i|
+-+-+-+-+-+-+ v{VERSION}"""
@ -54,44 +40,41 @@ USAGE = f"""
{BANNER}
Usage:
{NAME} ls [--catalog=<path>] [--format=<fmt>] [-aBCrVSs] [<path>]
{NAME} tree [--catalog=<path>] [-aBCVSs] [<path>]
{NAME} find [--catalog=<path>] [--format=<fmt>]
[-aBCbdVs] [--path=<path>] [<term>]
{NAME} index [--catalog=<path>] [--meta=<meta>...]
[-aBCcfV] <name> <path>
{NAME} update [--catalog=<path>] [-aBCcfV]
[--lpath=<path>] <name> <path>
{NAME} mount [--catalog=<path>] [-V] <mountpoint>
{NAME} du [--catalog=<path>] [-BCVSs] [<path>]
{NAME} rm [--catalog=<path>] [-BCfV] <storage>
{NAME} rename [--catalog=<path>] [-BCfV] <storage> <name>
{NAME} edit [--catalog=<path>] [-BCfV] <storage>
{NAME} graph [--catalog=<path>] [-BCV] [<path>]
{NAME} [--catalog=<path>]
{NAME} fixsizes [--catalog=<path>]
{NAME} ls [--catalog=<path>] [--format=<fmt>] [-aBCrVSs] [<path>]
{NAME} find [--catalog=<path>] [--format=<fmt>]
[-aBCbdVsP] [--path=<path>] [<term>]
{NAME} index [--catalog=<path>] [--meta=<meta>...]
[-aBCcfnV] <name> <path>
{NAME} update [--catalog=<path>] [-aBCcfnV] [--lpath=<path>] <name> <path>
{NAME} mount [--catalog=<path>] [-V] <mountpoint>
{NAME} rm [--catalog=<path>] [-BCfV] <storage>
{NAME} rename [--catalog=<path>] [-BCfV] <storage> <name>
{NAME} edit [--catalog=<path>] [-BCfV] <storage>
{NAME} graph [--catalog=<path>] [-BCV] [<path>]
{NAME} print_supported_formats
{NAME} help
{NAME} --help
{NAME} --version
Options:
--catalog=<path> Path to the catalog [default: {DEFAULT_CATALOGPATH}].
--catalog=<path> Path to the catalog [default: {CATALOGPATH}].
--meta=<meta> Additional attribute to store [default: ].
-a --archive Handle archive file [default: False].
-B --no-banner Do not display the banner [default: {str(DEFAULT_NOBANNER)}].
-B --no-banner Do not display the banner [default: False].
-b --script Output script to manage found file(s) [default: False].
-C --no-color Do not output colors [default: False].
-c --hash Calculate md5 hash [default: False].
-d --directory Only directory [default: False].
-F --format=<fmt> see \"print_supported_formats\" [default: {DEFAULT_FORMAT}].
-F --format=<fmt> see \"print_supported_formats\" [default: native].
-f --force Do not ask when updating the catalog [default: False].
-l --lpath=<path> Path where changes are logged [default: ]
-n --no-subsize Do not store size of directories [default: False].
-P --parent Ignore stored relpath [default: True].
-p --path=<path> Start path.
-r --recursive Recursive [default: False].
-s --raw-size Print raw size [default: False].
-S --sortsize Sort by size, largest first [default: False].
-V --verbose Be verbose [default: {str(DEFAULT_VERBOSEMODE)}].
-V --verbose Be verbose [default: False].
-v --version Show version.
-h --help Show this screen.
""" # nopep8
@ -99,18 +82,12 @@ Options:
def cmd_mount(args: Dict[str, Any],
top: NodeTop,
noder: Noder) -> bool:
noder: Noder) -> None:
"""mount action"""
mountpoint = args['<mountpoint>']
debug = args['--verbose']
try:
from catcli.fuser import Fuser # pylint: disable=C0415
Fuser(mountpoint, top, noder,
debug=debug)
except ModuleNotFoundError:
Logger.err('install fusepy to use mount')
return False
return True
Fuser(mountpoint, top, noder,
debug=debug)
def cmd_index(args: Dict[str, Any],
@ -122,6 +99,7 @@ def cmd_index(args: Dict[str, Any],
name = args['<name>']
usehash = args['--hash']
debug = args['--verbose']
subsize = not args['--no-subsize']
if not os.path.exists(path):
Logger.err(f'\"{path}\" does not exist')
return
@ -133,18 +111,16 @@ def cmd_index(args: Dict[str, Any],
except KeyboardInterrupt:
Logger.err('aborted')
return
node = top.get_storage_node()
if node:
node.parent = None
node = noder.get_storage_node(top, name)
node.parent = None
start = datetime.datetime.now()
if debug:
Logger.debug('debug mode enabled')
walker = Walker(noder, usehash=usehash, debug=debug)
attr = args['--meta']
root = noder.new_storage_node(name, path, top, attr)
_, cnt = walker.index(path, root, name)
root.nodesize = root.get_rec_size()
if subsize:
noder.rec_size(root, store=True)
stop = datetime.datetime.now()
diff = stop - start
Logger.info(f'Indexed {cnt} file(s) in {diff}')
@ -162,19 +138,20 @@ def cmd_update(args: Dict[str, Any],
usehash = args['--hash']
logpath = args['--lpath']
debug = args['--verbose']
subsize = not args['--no-subsize']
if not os.path.exists(path):
Logger.err(f'\"{path}\" does not exist')
return
storage = noder.find_storage_node_by_name(top, name)
if not storage:
root = noder.get_storage_node(top, name, newpath=path)
if not root:
Logger.err(f'storage named \"{name}\" does not exist')
return
noder.update_storage_path(top, name, path)
start = datetime.datetime.now()
walker = Walker(noder, usehash=usehash, debug=debug,
logpath=logpath)
cnt = walker.reindex(path, storage, top)
storage.nodesize = storage.get_rec_size()
cnt = walker.reindex(path, root, top)
if subsize:
noder.rec_size(root, store=True)
stop = datetime.datetime.now()
diff = stop - start
Logger.info(f'updated {cnt} file(s) in {diff}')
@ -182,20 +159,6 @@ def cmd_update(args: Dict[str, Any],
catalog.save(top)
def cmd_du(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> List[NodeAny]:
"""du action"""
path = path_to_search_all(args['<path>'])
found = noder.diskusage(top,
path,
raw=args['--raw-size'])
if not found:
path = args['<path>']
Logger.err(f'\"{path}\": nothing found')
return found
def cmd_ls(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> List[NodeAny]:
@ -206,8 +169,8 @@ def cmd_ls(args: Dict[str, Any],
raise BadFormatException('fzf is not supported in ls, use find')
found = noder.list(top,
path,
fmt=fmt,
rec=args['--recursive'],
fmt=fmt,
raw=args['--raw-size'])
if not found:
path = args['<path>']
@ -221,7 +184,7 @@ def cmd_rm(args: Dict[str, Any],
top: NodeTop) -> NodeTop:
"""rm action"""
name = args['<storage>']
node = noder.find_storage_node_by_name(top, name)
node = noder.get_storage_node(top, name)
if node:
node.parent = None
if catalog.save(top):
@ -235,20 +198,19 @@ def cmd_find(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> List[NodeAny]:
"""find action"""
fromtree = args['--parent']
directory = args['--directory']
startpath = args['--path']
fmt = args['--format']
raw = args['--raw-size']
script = args['--script']
search_for = args['<term>']
if args['--verbose']:
Logger.debug(f'search for "{search_for}" under "{top.get_name()}"')
found = noder.find(top, search_for,
script=script,
startnode=startpath,
only_dir=directory,
fmt=fmt,
raw=raw)
found = noder.find_name(top, search_for,
script=script,
startnode=startpath,
only_dir=directory,
parentfromtree=fromtree,
fmt=fmt, raw=raw)
return found
@ -258,32 +220,21 @@ def cmd_graph(args: Dict[str, Any],
"""graph action"""
path = args['<path>']
if not path:
path = DEFAULT_GRAPHPATH
path = GRAPHPATH
cmd = noder.to_dot(top, path)
Logger.info(f'create graph with \"{cmd}\" (you need graphviz)')
def cmd_fixsizes(top: NodeTop,
noder: Noder,
catalog: Catalog) -> None:
"""
fix each node size by re-calculating
recursively their size
"""
noder.fixsizes(top)
catalog.save(top)
def cmd_rename(args: Dict[str, Any],
catalog: Catalog,
top: NodeTop) -> None:
"""rename action"""
storage = args['<storage>']
new = args['<name>']
storages = list(x.get_name() for x in top.children)
storages = list(x.name for x in top.children)
if storage in storages:
node = next(filter(lambda x: x.get_name() == storage, top.children))
node.set_name(new)
node = next(filter(lambda x: x.name == storage, top.children))
node.name = new
if catalog.save(top):
msg = f'Storage \"{storage}\" renamed to \"{new}\"'
Logger.info(msg)
@ -297,9 +248,9 @@ def cmd_edit(args: Dict[str, Any],
top: NodeTop) -> None:
"""edit action"""
storage = args['<storage>']
storages = list(x.get_name() for x in top.children)
storages = list(x.name for x in top.children)
if storage in storages:
node = next(filter(lambda x: x.get_name() == storage, top.children))
node = next(filter(lambda x: x.name == storage, top.children))
attr = node.attr
if not attr:
attr = ''
@ -311,75 +262,6 @@ def cmd_edit(args: Dict[str, Any],
Logger.err(f'Storage named \"{storage}\" does not exist')
class CatcliRepl(cmd2.Cmd): # type: ignore
"""catcli repl"""
prompt = 'catcli> '
intro = ''
def __init__(self) -> None:
super().__init__()
# remove built-ins
del cmd2.Cmd.do_alias
del cmd2.Cmd.do_edit
del cmd2.Cmd.do_macro
del cmd2.Cmd.do_run_pyscript
del cmd2.Cmd.do_run_script
del cmd2.Cmd.do_set
del cmd2.Cmd.do_shell
del cmd2.Cmd.do_shortcuts
self.hidden_commands.append('EOF')
def cmdloop(self, intro: Any = None) -> Any:
return cmd2.Cmd.cmdloop(self, intro)
@cmd2.with_argument_list # type: ignore
def do_ls(self, arglist: List[str]) -> bool:
"""ls <path>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'ls')
args, noder, _, _, top = init(arglist)
cmd_ls(args, noder, top)
return False
@cmd2.with_argument_list # type: ignore
def do_tree(self, arglist: List[str]) -> bool:
"""tree <path>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'tree')
args, noder, _, _, top = init(arglist)
cmd_ls(args, noder, top)
return False
@cmd2.with_argument_list # type: ignore
def do_find(self, arglist: List[str]) -> bool:
"""find <term>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'find')
args, noder, _, _, top = init(arglist)
cmd_find(args, noder, top)
return False
@cmd2.with_argument_list # type: ignore
def do_du(self, arglist: List[str]) -> bool:
"""du <path>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'du')
args, noder, _, _, top = init(arglist)
cmd_du(args, noder, top)
return False
def do_help(self, _: Any) -> bool:
"""help"""
print(USAGE)
return False
# pylint: disable=C0103
def do_EOF(self, _: Any) -> bool:
"""exit repl"""
return True
def banner() -> None:
"""print banner"""
Logger.stderr_nocolor(BANNER)
@ -390,39 +272,35 @@ def print_supported_formats() -> None:
"""print all supported formats to stdout"""
print('"native" : native format')
print('"csv" : CSV format')
print(f' {CsvPrinter.CSV_HEADER}')
print(f' {Noder.CSV_HEADER}')
print('"fzf-native" : fzf to native (only valid for find)')
print('"fzf-csv" : fzf to csv (only valid for find)')
def init(argv: List[str]) -> Tuple[Dict[str, Any],
Noder,
Catalog,
str,
NodeTop]:
"""parse catcli arguments"""
args = docopt(USAGE, argv=argv, version=VERSION)
def main() -> bool:
"""entry point"""
args = docopt(USAGE, version=VERSION)
if args['help'] or args['--help']:
print(USAGE)
sys.exit(0)
return True
if args['print_supported_formats']:
print_supported_formats()
sys.exit(0)
return True
# check format
fmt = args['--format']
if fmt not in FORMATS:
Logger.err(f'bad format: {fmt}')
print_supported_formats()
sys.exit(0)
return False
if args['--verbose'] or DEFAULT_VERBOSEMODE:
print('verbose mode enabled')
print(f'args: {args}')
if args['--verbose']:
print(args)
# print banner
if not args['--no-banner'] and DEFAULT_NOBANNER:
if not args['--no-banner']:
banner()
# set colors
@ -445,23 +323,15 @@ def init(argv: List[str]) -> Tuple[Dict[str, Any],
meta = noder.update_metanode(top)
catalog.set_metanode(meta)
return args, noder, catalog, catalog_path, top
def main() -> bool:
"""entry point"""
args, noder, catalog, catalog_path, top = init(sys.argv[1:])
# parse command
try:
if args['index']:
cmd_index(args, noder, catalog, top)
elif args['update']:
if args['update']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_update(args, noder, catalog, top)
cmd_fixsizes(top, noder, catalog)
elif args['find']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
@ -472,18 +342,11 @@ def main() -> bool:
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_ls(args, noder, top)
elif args['tree']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
args['--recursive'] = True
cmd_ls(args, noder, top)
elif args['mount']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
if not cmd_mount(args, top, noder):
return False
cmd_mount(args, top, noder)
elif args['rm']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
@ -504,18 +367,6 @@ def main() -> bool:
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_edit(args, noder, catalog, top)
elif args['du']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_du(args, noder, top)
elif args['fixsizes']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_fixsizes(top, noder, catalog)
else:
CatcliRepl().cmdloop()
except CatcliException as exc:
Logger.stderr_nocolor('ERROR ' + str(exc))
return False

@ -20,9 +20,7 @@ class Colors:
PURPLE = '\033[1;35m'
BLUE = '\033[94m'
GRAY = '\033[0;37m'
CYAN = '\033[36m'
MAGENTA = '\033[95m'
WHITE = '\033[97m'
RESET = '\033[0m'
EMPH = '\033[33m'
BOLD = '\033[1m'

@ -9,15 +9,12 @@ import os
from time import time
from stat import S_IFDIR, S_IFREG
from typing import List, Dict, Any, Optional
try:
import fuse
except ModuleNotFoundError:
pass
import fuse # type: ignore
# local imports
from catcli.noder import Noder
from catcli.nodes import NodeTop, NodeAny
from catcli.nodes_utils import path_to_search_all, path_to_top
from catcli.utils import path_to_search_all, path_to_top
from catcli import nodes
@ -33,6 +30,7 @@ class Fuser:
fuse.FUSE(filesystem,
mountpoint,
foreground=debug,
allow_other=True,
nothreads=True,
debug=debug)
@ -73,21 +71,21 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
maccess = time()
mode: Any = S_IFREG
nodesize: int = 0
size: int = 0
if entry.type == nodes.TYPE_ARCHIVED:
mode = S_IFREG
nodesize = entry.nodesize
size = entry.size
elif entry.type == nodes.TYPE_DIR:
mode = S_IFDIR
nodesize = entry.nodesize
size = entry.size
maccess = entry.maccess
elif entry.type == nodes.TYPE_FILE:
mode = S_IFREG
nodesize = entry.nodesize
size = entry.size
maccess = entry.maccess
elif entry.type == nodes.TYPE_STORAGE:
mode = S_IFDIR
nodesize = entry.nodesize
size = entry.size
maccess = entry.ts
elif entry.type == nodes.TYPE_META:
mode = S_IFREG
@ -97,7 +95,7 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
return {
'st_mode': (mode), # file type
'st_nlink': 1, # count hard link
'st_size': nodesize,
'st_size': size,
'st_ctime': maccess, # attr last modified
'st_mtime': maccess, # content last modified
'st_atime': maccess, # access time
@ -107,7 +105,7 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
def getattr(self, path: str, _fh: Any = None) -> Dict[str, Any]:
"""return attr of file pointed by path"""
if path == os.path.sep:
if path == '/':
# mountpoint
curt = time()
meta = {
@ -129,5 +127,5 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
content = ['.', '..']
entries = self._get_entries(path)
for entry in entries:
content.append(entry.get_name())
content.append(entry.name)
return content

@ -0,0 +1,73 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
Class for printing nodes
"""
import sys
from typing import TypeVar, Type, Optional, Tuple, List, \
Dict, Any
from catcli.colors import Colors
from catcli.utils import fix_badchars
CLASSTYPE = TypeVar('CLASSTYPE', bound='NodePrinter')
class NodePrinter:
"""a node printer class"""
STORAGE = 'storage'
ARCHIVE = 'archive'
NBFILES = 'nbfiles'
@classmethod
def print_storage_native(cls: Type[CLASSTYPE], pre: str,
name: str, args: str,
attr: Dict[str, Any]) -> None:
"""print a storage node"""
end = ''
if attr:
end = f' {Colors.GRAY}({attr}){Colors.RESET}'
out = f'{pre}{Colors.UND}{cls.STORAGE}{Colors.RESET}:'
out += ' ' + Colors.PURPLE + fix_badchars(name) + \
Colors.RESET + end + '\n'
out += f' {Colors.GRAY}{args}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_file_native(cls: Type[CLASSTYPE], pre: str,
name: str, attr: str) -> None:
"""print a file node"""
nobad = fix_badchars(name)
out = f'{pre}{nobad}'
out += f' {Colors.GRAY}[{attr}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_dir_native(cls: Type[CLASSTYPE], pre: str,
name: str,
depth: int = 0,
attr: Optional[List[Tuple[str, str]]] = None) -> None:
"""print a directory node"""
end = []
if depth > 0:
end.append(f'{cls.NBFILES}:{depth}')
if attr:
end.append(' '.join([f'{x}:{y}' for x, y in attr]))
end_string = ''
if end:
end_string = f' [{", ".join(end)}]'
out = pre + Colors.BLUE + fix_badchars(name) + Colors.RESET
out += f'{Colors.GRAY}{end_string}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_archive_native(cls: Type[CLASSTYPE], pre: str,
name: str, archive: str) -> None:
"""archive to stdout"""
out = pre + Colors.YELLOW + fix_badchars(name) + Colors.RESET
out += f' {Colors.GRAY}[{cls.ARCHIVE}:{archive}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')

@ -9,19 +9,16 @@ import os
import shutil
import time
from typing import List, Union, Tuple, Any, Optional, Dict, cast
import fnmatch
import anytree
from natsort import os_sort_keygen
import anytree # type: ignore
from pyfzf.pyfzf import FzfPrompt # type: ignore
# local imports
from catcli import nodes
from catcli.nodes import NodeAny, NodeStorage, \
NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta, \
typcast_node
from catcli.utils import md5sum
NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta
from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars
from catcli.logger import Logger
from catcli.printer_native import NativePrinter
from catcli.printer_csv import CsvPrinter
from catcli.nodeprinter import NodePrinter
from catcli.decomp import Decomp
from catcli.version import __version__ as VERSION
from catcli.exceptions import CatcliException
@ -36,7 +33,10 @@ class Noder:
* "dir" node representing a directory
* "file" node representing a file
"""
# pylint: disable=R0904
CSV_HEADER = ('name,type,path,size,indexed_at,'
'maccess,md5,nbfiles,free_space,'
'total_space,meta')
def __init__(self, debug: bool = False,
sortsize: bool = False,
@ -52,33 +52,31 @@ class Noder:
self.arc = arc
if self.arc:
self.decomp = Decomp()
self.csv_printer = CsvPrinter()
self.native_printer = NativePrinter()
@staticmethod
def get_storage_names(top: NodeTop) -> List[str]:
"""return a list of all storage names"""
return [x.name for x in list(top.children)]
def find_storage_node_by_name(self, top: NodeTop,
name: str) -> Optional[NodeStorage]:
"""find a storage node by name"""
def get_storage_node(self, top: NodeTop,
name: str,
newpath: str = '') -> NodeStorage:
"""
return the storage node if any
if newpath is submitted, it will update the media info
"""
found = None
for node in top.children:
if node.type != nodes.TYPE_STORAGE:
continue
if node.name == name:
return cast(NodeStorage, node)
return None
def update_storage_path(self, top: NodeTop,
name: str,
newpath: str) -> None:
"""find and update storage path on update"""
storage = self.find_storage_node_by_name(top, name)
if storage and newpath and os.path.exists(newpath):
storage.free = shutil.disk_usage(newpath).free
storage.total = shutil.disk_usage(newpath).total
storage.ts = int(time.time())
found = node
break
if found and newpath and os.path.exists(newpath):
found.free = shutil.disk_usage(newpath).free
found.total = shutil.disk_usage(newpath).total
found.ts = int(time.time())
return cast(NodeStorage, found)
@staticmethod
def get_node(top: NodeTop,
@ -86,11 +84,9 @@ class Noder:
quiet: bool = False) -> Optional[NodeAny]:
"""get the node by internal tree path"""
resolv = anytree.resolver.Resolver('name')
bpath = ''
try:
bpath = os.path.basename(path)
the_node = resolv.get(top, bpath)
typcast_node(the_node)
return cast(NodeAny, the_node)
except anytree.resolver.ChildResolverError:
if not quiet:
@ -117,7 +113,7 @@ class Noder:
return node, False
# force re-indexing if no maccess
maccess = os.path.getmtime(path)
if not node.has_attr('maccess') or \
if not self._has_attr(node, 'maccess') or \
not node.maccess:
self._debug('\tchange: no maccess found')
return node, True
@ -136,6 +132,37 @@ class Noder:
self._debug(f'\tchange: no change for \"{path}\"')
return node, False
def rec_size(self, node: Union[NodeDir, NodeStorage],
store: bool = True) -> int:
"""
recursively traverse tree and return size
@store: store the size in the node
"""
if node.type == nodes.TYPE_FILE:
self._debug(f'size of {node.type} \"{node.name}\": {node.size}')
return node.size
msg = f'getting node size recursively for \"{node.name}\"'
self._debug(msg)
size: int = 0
for i in node.children:
if node.type == nodes.TYPE_DIR:
sub_size = self.rec_size(i, store=store)
if store:
i.size = sub_size
size += sub_size
continue
if node.type == nodes.TYPE_STORAGE:
sub_size = self.rec_size(i, store=store)
if store:
i.size = sub_size
size += sub_size
continue
self._debug(f'skipping {node.name}')
if store:
node.size = size
self._debug(f'size of {node.type} \"{node.name}\": {size}')
return size
###############################################################
# public helpers
###############################################################
@ -168,7 +195,7 @@ class Noder:
return top
def new_file_node(self, name: str, path: str,
parent: NodeAny) -> Optional[NodeFile]:
parent: NodeAny, storagepath: str) -> Optional[NodeFile]:
"""create a new node representing a file"""
if not os.path.exists(path):
Logger.err(f'File \"{path}\" does not exist')
@ -182,9 +209,11 @@ class Noder:
md5 = ''
if self.hash:
md5 = self._get_hash(path)
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
node = NodeFile(name,
relpath,
stat.st_size,
md5,
maccess,
@ -200,11 +229,13 @@ class Noder:
return node
def new_dir_node(self, name: str, path: str,
parent: NodeAny) -> NodeDir:
parent: NodeAny, storagepath: str) -> NodeDir:
"""create a new node representing a directory"""
path = os.path.abspath(path)
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
return NodeDir(name,
relpath,
0,
maccess,
parent=parent)
@ -227,13 +258,11 @@ class Noder:
self.attrs_to_string(attrs),
parent=parent)
def new_archive_node(self,
name: str,
parent: str,
archive: str) -> NodeArchived:
def new_archive_node(self, name: str, path: str,
parent: str, archive: str) -> NodeArchived:
"""create a new node for archive data"""
return NodeArchived(name=name,
parent=parent, nodesize=0, md5='',
return NodeArchived(name=name, relpath=path,
parent=parent, size=0, md5='',
archive=archive)
###############################################################
@ -266,7 +295,6 @@ class Noder:
"""remove any node not flagged and clean flags"""
cnt = 0
for node in anytree.PreOrderIter(top):
typcast_node(node)
if node.type not in [nodes.TYPE_DIR, nodes.TYPE_FILE]:
continue
if self._clean(node):
@ -284,82 +312,164 @@ class Noder:
###############################################################
# printing
###############################################################
def _print_node_csv(self, node: NodeAny,
sep: str = ',',
raw: bool = False) -> None:
def _node_to_csv(self, node: NodeAny,
sep: str = ',',
raw: bool = False) -> None:
"""
print a node to csv
@node: the node to consider
@sep: CSV separator character
@raw: print raw size rather than human readable
"""
typcast_node(node)
if not node:
return
if node.type == nodes.TYPE_TOP:
return
out = []
if node.type == nodes.TYPE_STORAGE:
self.csv_printer.print_storage(node,
sep=sep,
raw=raw)
# handle storage
out.append(node.name) # name
out.append(node.type) # type
out.append('') # fake full path
size = self.rec_size(node, store=False)
out.append(size_to_str(size, raw=raw)) # size
out.append(epoch_to_str(node.ts)) # indexed_at
out.append('') # fake maccess
out.append('') # fake md5
out.append(str(len(node.children))) # nbfiles
# fake free_space
out.append(size_to_str(node.free, raw=raw))
# fake total_space
out.append(size_to_str(node.total, raw=raw))
out.append(node.attr) # meta
else:
self.csv_printer.print_node(node,
sep=sep,
raw=raw)
# handle other nodes
out.append(node.name.replace('"', '""')) # name
out.append(node.type) # type
parents = self._get_parents(node)
storage = self._get_storage(node)
fullpath = os.path.join(storage.name, parents)
out.append(fullpath.replace('"', '""')) # full path
out.append(size_to_str(node.size, raw=raw)) # size
out.append(epoch_to_str(storage.ts)) # indexed_at
if self._has_attr(node, 'maccess'):
out.append(epoch_to_str(node.maccess)) # maccess
else:
out.append('') # fake maccess
if self._has_attr(node, 'md5'):
out.append(node.md5) # md5
else:
out.append('') # fake md5
if node.type == nodes.TYPE_DIR:
out.append(str(len(node.children))) # nbfiles
else:
out.append('') # fake nbfiles
out.append('') # fake free_space
out.append('') # fake total_space
out.append('') # fake meta
def _print_node_du(self, node: NodeAny,
raw: bool = False) -> None:
"""
print node du style
"""
typcast_node(node)
thenodes = self._get_entire_tree(node,
dironly=True)
for thenode in thenodes:
self.native_printer.print_du(thenode, raw=raw)
line = sep.join(['"' + o + '"' for o in out])
if len(line) > 0:
Logger.stdout_nocolor(line)
def _print_node_native(self, node: NodeAny,
pre: str = '',
withpath: bool = False,
withnbchildren: bool = False,
withdepth: bool = False,
withstorage: bool = False,
recalcparent: bool = False,
raw: bool = False) -> None:
"""
print a node
@node: the node to print
@pre: string to print before node
@withpath: print the node path
@withnbchildren: print the node nb children
@withdepth: print the node depth info
@withstorage: print the node storage it belongs to
@recalcparent: get relpath from tree instead of relpath field
@raw: print raw size rather than human readable
"""
typcast_node(node)
if node.type == nodes.TYPE_TOP:
# top node
self.native_printer.print_top(pre, node.get_name())
Logger.stdout_nocolor(f'{pre}{node.name}')
elif node.type == nodes.TYPE_FILE:
# node of type file
self.native_printer.print_file(pre, node,
withpath=withpath,
withstorage=withstorage,
raw=raw)
name = node.name
if withpath:
if recalcparent:
name = os.sep.join([self._get_parents(node.parent), name])
else:
name = node.relpath
name = name.lstrip(os.sep)
if withstorage:
storage = self._get_storage(node)
attr_str = ''
if node.md5:
attr_str = f', md5:{node.md5}'
size = size_to_str(node.size, raw=raw)
compl = f'size:{size}{attr_str}'
if withstorage:
content = Logger.get_bold_text(storage.name)
compl += f', storage:{content}'
NodePrinter.print_file_native(pre, name, compl)
elif node.type == nodes.TYPE_DIR:
# node of type directory
self.native_printer.print_dir(pre,
node,
withpath=withpath,
withstorage=withstorage,
withnbchildren=withnbchildren,
raw=raw)
name = node.name
if withpath:
if recalcparent:
name = os.sep.join([self._get_parents(node.parent), name])
else:
name = node.relpath
name = name.lstrip(os.sep)
depth = 0
if withdepth:
depth = len(node.children)
if withstorage:
storage = self._get_storage(node)
attr: List[Tuple[str, str]] = []
if node.size:
attr.append(('totsize', size_to_str(node.size, raw=raw)))
if withstorage:
attr.append(('storage', Logger.get_bold_text(storage.name)))
NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr)
elif node.type == nodes.TYPE_STORAGE:
# node of type storage
self.native_printer.print_storage(pre,
node,
raw=raw)
sztotal = size_to_str(node.total, raw=raw)
szused = size_to_str(node.total - node.free, raw=raw)
nbchildren = len(node.children)
pcent = 0
if node.total > 0:
pcent = node.free * 100 / node.total
freepercent = f'{pcent:.1f}%'
# get the date
timestamp = ''
if self._has_attr(node, 'ts'):
timestamp = 'date:'
timestamp += epoch_to_str(node.ts)
disksize = ''
# the children size
recsize = self.rec_size(node, store=False)
sizestr = size_to_str(recsize, raw=raw)
disksize = 'totsize:' + f'{sizestr}'
# format the output
name = node.name
args = [
'nbfiles:' + f'{nbchildren}',
disksize,
f'free:{freepercent}',
'du:' + f'{szused}/{sztotal}',
timestamp]
argsstring = ' | '.join(args)
NodePrinter.print_storage_native(pre,
name,
argsstring,
node.attr)
elif node.type == nodes.TYPE_ARCHIVED:
# archive node
if self.arc:
self.native_printer.print_archive(pre, node.name, node.archive)
NodePrinter.print_archive_native(pre, node.name, node.archive)
else:
Logger.err(f'bad node encountered: {node}')
@ -378,33 +488,28 @@ class Noder:
rend = anytree.RenderTree(node, childiter=self._sort_tree)
for pre, _, thenode in rend:
self._print_node_native(thenode, pre=pre,
withnbchildren=True, raw=raw)
withdepth=True, raw=raw)
elif fmt == 'csv':
# csv output
self._print_nodes_csv(node, raw=raw)
self._to_csv(node, raw=raw)
elif fmt == 'csv-with-header':
# csv output
self.csv_printer.print_header()
self._print_nodes_csv(node, raw=raw)
Logger.stdout_nocolor(self.CSV_HEADER)
self._to_csv(node, raw=raw)
def _print_nodes_csv(self, node: NodeAny,
raw: bool = False) -> None:
def _to_csv(self, node: NodeAny,
raw: bool = False) -> None:
"""print the tree to csv"""
rend = anytree.RenderTree(node, childiter=self._sort_tree)
for _, _, item in rend:
self._print_node_csv(item, raw=raw)
self._node_to_csv(item, raw=raw)
@staticmethod
def _fzf_prompt(strings: Any) -> Any:
"""prompt with fzf"""
try:
from pyfzf.pyfzf import FzfPrompt # pylint: disable=C0415 # noqa
fzf = FzfPrompt()
selected = fzf.prompt(strings)
return selected
except ModuleNotFoundError:
Logger.err('install pyfzf to use fzf')
return None
# prompt with fzf
fzf = FzfPrompt()
selected = fzf.prompt(strings)
return selected
def _to_fzf(self, node: NodeAny, fmt: str) -> None:
"""
@ -418,9 +523,9 @@ class Noder:
for _, _, rend in rendered:
if not rend:
continue
parents = rend.get_fullpath()
storage = rend.get_storage_node()
fullpath = os.path.join(storage.get_name(), parents)
parents = self._get_parents(rend)
storage = self._get_storage(rend)
fullpath = os.path.join(storage.name, parents)
the_nodes[fullpath] = rend
# prompt with fzf
paths = self._fzf_prompt(the_nodes.keys())
@ -445,13 +550,14 @@ class Noder:
###############################################################
# searching
###############################################################
def find(self, top: NodeTop,
key: str,
script: bool = False,
only_dir: bool = False,
startnode: Optional[NodeAny] = None,
fmt: str = 'native',
raw: bool = False) -> List[NodeAny]:
def find_name(self, top: NodeTop,
key: str,
script: bool = False,
only_dir: bool = False,
startnode: Optional[NodeAny] = None,
parentfromtree: bool = False,
fmt: str = 'native',
raw: bool = False) -> List[NodeAny]:
"""
find files based on their names
@top: top node
@ -459,6 +565,7 @@ class Noder:
@script: output script
@directory: only search for directories
@startpath: node to start with
@parentfromtree: get path from parent instead of stored relpath
@fmt: output format
@raw: raw size output
returns the found nodes
@ -471,15 +578,19 @@ class Noder:
start = self.get_node(top, startnode)
filterfunc = self._callback_find_name(key, only_dir)
found = anytree.findall(start, filter_=filterfunc)
self._debug(f'found {len(found)} node(s)')
nbfound = len(found)
self._debug(f'found {nbfound} node(s)')
# compile found nodes
paths = {}
for item in found:
typcast_node(item)
item.set_name(item.get_name())
key = item.get_fullpath()
paths[key] = item
item.name = fix_badchars(item.name)
if hasattr(item, 'relpath'):
item.relpath = fix_badchars(item.relpath)
if parentfromtree:
paths[self._get_parents(item)] = item
else:
paths[item.relpath] = item
# handle fzf mode
if fmt.startswith('fzf'):
@ -495,16 +606,16 @@ class Noder:
else:
if fmt == 'native':
for _, item in paths.items():
self._print_node_native(item,
withpath=True,
withnbchildren=True,
self._print_node_native(item, withpath=True,
withdepth=True,
withstorage=True,
recalcparent=parentfromtree,
raw=raw)
elif fmt.startswith('csv'):
if fmt == 'csv-with-header':
self.csv_printer.print_header()
Logger.stdout_nocolor(self.CSV_HEADER)
for _, item in paths.items():
self._print_node_csv(item, raw=raw)
self._node_to_csv(item, raw=raw)
# execute script if any
if script:
@ -518,8 +629,6 @@ class Noder:
def _callback_find_name(self, term: str, only_dir: bool) -> Any:
"""callback for finding files"""
def find_name(node: NodeAny) -> bool:
typcast_node(node)
path = node.get_fullpath()
if node.type == nodes.TYPE_STORAGE:
# ignore storage nodes
return False
@ -536,28 +645,13 @@ class Noder:
# filter
if not term:
return True
if term in path:
return True
if self.debug:
Logger.debug(f'match \"{path}\" with \"{term}\"')
if fnmatch.fnmatch(path, term):
if term.lower() in node.name.lower():
return True
# ignore
return False
return find_name
###############################################################
# fixsizes
###############################################################
def fixsizes(self, top: NodeTop) -> None:
"""fix node sizes"""
typcast_node(top)
rend = anytree.RenderTree(top)
for _, _, thenode in rend:
typcast_node(thenode)
thenode.nodesize = thenode.get_rec_size()
###############################################################
# ls
###############################################################
@ -574,27 +668,13 @@ class Noder:
@fmt: output format
@raw: print raw size
"""
self._debug(f'ls walking path: \"{path}\" from \"{top.get_name()}\"')
self._debug(f'walking path: \"{path}\" from {top}')
resolv = anytree.resolver.Resolver('name')
found = []
try:
if '*' in path or '?' in path:
# we need to handle glob
self._debug('glob ls...')
found = resolv.glob(top, path)
else:
# we have a canonical path
self._debug('get ls...')
foundone = resolv.get(top, path)
cast(NodeAny, foundone)
typcast_node(foundone)
if foundone and foundone.may_have_children():
# let's find its children as well
modpath = os.path.join(path, '*')
found = resolv.glob(top, modpath)
else:
found = [foundone]
# resolve the path in the tree
found = resolv.glob(top, path)
if len(found) < 1:
# nothing found
self._debug('nothing found')
@ -606,19 +686,30 @@ class Noder:
return found
# sort found nodes
found = sorted(found, key=os_sort_keygen(self._sort))
found = sorted(found, key=self._sort, reverse=self.sortsize)
# print the parent
if fmt == 'native':
self._print_node_native(found[0].parent,
withpath=False,
withdepth=True,
raw=raw)
elif fmt.startswith('csv'):
self._node_to_csv(found[0].parent, raw=raw)
elif fmt.startswith('fzf'):
pass
# print all found nodes
if fmt == 'csv-with-header':
self.csv_printer.print_header()
Logger.stdout_nocolor(self.CSV_HEADER)
for item in found:
if fmt == 'native':
self._print_node_native(item,
withpath=True,
withnbchildren=True,
self._print_node_native(item, withpath=False,
pre='- ',
withdepth=True,
raw=raw)
elif fmt.startswith('csv'):
self._print_node_csv(item, raw=raw)
self._node_to_csv(item, raw=raw)
elif fmt.startswith('fzf'):
self._to_fzf(item, fmt)
@ -626,31 +717,6 @@ class Noder:
pass
return found
###############################################################
# du
###############################################################
def diskusage(self, top: NodeTop,
path: str,
raw: bool = False) -> List[NodeAny]:
"""disk usage"""
self._debug(f'du walking path: \"{path}\" from \"{top.get_name()}\"')
resolv = anytree.resolver.Resolver('name')
found: NodeAny
try:
# we have a canonical path
self._debug('get du...')
found = resolv.get(top, path)
if not found:
# nothing found
self._debug('nothing found')
return []
self._debug(f'du found: {found}')
self._print_node_du(found, raw=raw)
except anytree.resolver.ChildResolverError:
pass
return found
###############################################################
# tree creation
###############################################################
@ -660,15 +726,15 @@ class Noder:
"""add an entry to the tree"""
entries = name.rstrip(os.sep).split(os.sep)
if len(entries) == 1:
self.new_archive_node(name, top, top.get_name())
self.new_archive_node(name, name, top, top.name)
return
sub = os.sep.join(entries[:-1])
nodename = entries[-1]
try:
parent = resolv.get(top, sub)
parent = self.new_archive_node(nodename, parent, top.get_name())
parent = self.new_archive_node(nodename, name, parent, top.name)
except anytree.resolver.ChildResolverError:
self.new_archive_node(nodename, top, top.get_name())
self.new_archive_node(nodename, name, top, top.name)
def list_to_tree(self, parent: NodeAny, names: List[str]) -> None:
"""convert list of files to a tree"""
@ -682,23 +748,6 @@ class Noder:
###############################################################
# diverse
###############################################################
def _get_entire_tree(self, start: NodeAny,
dironly: bool = False) -> List[NodeAny]:
"""
get entire tree and sort it
"""
typcast_node(start)
rend = anytree.RenderTree(start)
thenodes = []
if dironly:
for _, _, thenode in rend:
typcast_node(thenode)
if thenode.type == nodes.TYPE_DIR:
thenodes.append(thenode)
else:
thenodes = [x for _, _, x in rend]
return sorted(thenodes, key=os_sort_keygen(self._sort))
def _sort_tree(self,
items: List[NodeAny]) -> List[NodeAny]:
"""sorting a list of items"""
@ -711,21 +760,42 @@ class Noder:
return self._sort_fs(lst)
@staticmethod
def _sort_fs(node: NodeAny) -> str:
"""sort by name"""
# to sort by types then name
return str(node.name)
def _sort_fs(node: NodeAny) -> Tuple[str, str]:
"""sorting nodes dir first and alpha"""
return (node.type, node.name.lstrip('.').lower())
@staticmethod
def _sort_size(node: NodeAny) -> float:
"""sorting nodes by size"""
try:
if not node.nodesize:
if not node.size:
return 0
return float(node.nodesize)
return float(node.size)
except AttributeError:
return 0
def _get_storage(self, node: NodeAny) -> NodeStorage:
"""recursively traverse up to find storage"""
if node.type == nodes.TYPE_STORAGE:
return node
return cast(NodeStorage, node.ancestors[1])
@staticmethod
def _has_attr(node: NodeAny, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in node.__dict__.keys()
def _get_parents(self, node: NodeAny) -> str:
"""get all parents recursively"""
if node.type == nodes.TYPE_STORAGE:
return ''
if node.type == nodes.TYPE_TOP:
return ''
parent = self._get_parents(node.parent)
if parent:
return os.sep.join([parent, node.name])
return str(node.name)
@staticmethod
def _get_hash(path: str) -> str:
"""return md5 hash of node"""

@ -6,12 +6,8 @@ Class that represents a node in the catalog tree
"""
# pylint: disable=W0622
import os
from typing import Dict, Any, cast
from anytree import NodeMixin
from catcli.exceptions import CatcliException
from catcli.utils import fix_badchars
from typing import Dict, Any
from anytree import NodeMixin # type: ignore
TYPE_TOP = 'top'
@ -25,88 +21,29 @@ NAME_TOP = 'top'
NAME_META = 'meta'
def typcast_node(node: Any) -> None:
"""typecast node to its sub type"""
if node.type == TYPE_TOP:
node.__class__ = NodeTop
elif node.type == TYPE_FILE:
node.__class__ = NodeFile
elif node.type == TYPE_DIR:
node.__class__ = NodeDir
elif node.type == TYPE_ARCHIVED:
node.__class__ = NodeArchived
elif node.type == TYPE_STORAGE:
node.__class__ = NodeStorage
elif node.type == TYPE_META:
node.__class__ = NodeMeta
else:
raise CatcliException(f"bad node: {node}")
class NodeAny(NodeMixin): # type: ignore
"""generic node"""
def __init__(self, # type: ignore[no-untyped-def]
name=None,
size=0,
parent=None,
children=None):
"""build generic node"""
super().__init__()
self.name = name
self.nodesize = size
self.parent = parent
if children:
self.children = children
def get_name(self) -> str:
"""get node name"""
return fix_badchars(self.name)
def set_name(self, name: str) -> None:
"""set node name"""
self.name = fix_badchars(name)
def has_attr(self, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in self.__dict__
def may_have_children(self) -> bool:
"""can node contains sub"""
raise NotImplementedError
def _to_str(self) -> str:
ret = str(self.__class__) + ": " + str(self.__dict__)
if self.children:
ret += '\n'
for child in self.children:
ret += f' child => {child}\n'
ret += ' child => ' + str(child)
return ret
def __str__(self) -> str:
return self._to_str()
def get_fullpath(self) -> str:
"""return full path to this node"""
path = self.get_name()
if self.parent:
typcast_node(self.parent)
ppath = self.parent.get_fullpath()
path = os.path.join(ppath, path)
return fix_badchars(path)
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
totsize: int = self.nodesize
for node in self.children:
typcast_node(node)
totsize += node.get_rec_size()
return totsize
def get_storage_node(self) -> NodeMixin:
"""recursively traverse up to find storage"""
return None
def flagged(self) -> bool:
"""is flagged"""
if not hasattr(self, '_flagged'):
@ -137,23 +74,6 @@ class NodeTop(NodeAny):
if children:
self.children = children
def get_fullpath(self) -> str:
"""return full path to this node"""
return ''
def may_have_children(self) -> bool:
"""can node contains sub"""
return True
def get_rec_size(self) -> int:
"""
recursively traverse tree and return size
also ensure to update the size on the way
"""
size = super().get_rec_size()
self.nodesize = size
return size
def __str__(self) -> str:
return self._to_str()
@ -163,7 +83,8 @@ class NodeFile(NodeAny):
def __init__(self, # type: ignore[no-untyped-def]
name: str,
nodesize: int,
relpath: str,
size: int,
md5: str,
maccess: float,
parent=None,
@ -172,21 +93,14 @@ class NodeFile(NodeAny):
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_FILE
self.nodesize = nodesize
self.relpath = relpath
self.size = size
self.md5 = md5
self.maccess = maccess
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return False
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def __str__(self) -> str:
return self._to_str()
@ -196,7 +110,8 @@ class NodeDir(NodeAny):
def __init__(self, # type: ignore[no-untyped-def]
name: str,
nodesize: int,
relpath: str,
size: int,
maccess: float,
parent=None,
children=None):
@ -204,29 +119,13 @@ class NodeDir(NodeAny):
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_DIR
self.nodesize = nodesize
self.relpath = relpath
self.size = size
self.maccess = maccess
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return True
def get_rec_size(self) -> int:
"""
recursively traverse tree and return size
also ensure to update the size on the way
"""
size = super().get_rec_size()
self.nodesize = size
return size
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def __str__(self) -> str:
return self._to_str()
@ -236,7 +135,8 @@ class NodeArchived(NodeAny):
def __init__(self, # type: ignore[no-untyped-def]
name: str,
nodesize: int,
relpath: str,
size: int,
md5: str,
archive: str,
parent=None,
@ -245,21 +145,14 @@ class NodeArchived(NodeAny):
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_ARCHIVED
self.nodesize = nodesize
self.relpath = relpath
self.size = size
self.md5 = md5
self.archive = archive
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return False
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def __str__(self) -> str:
return self._to_str()
@ -271,7 +164,7 @@ class NodeStorage(NodeAny):
name: str,
free: int,
total: int,
nodesize: int,
size: int,
ts: float,
attr: str,
parent=None,
@ -283,29 +176,12 @@ class NodeStorage(NodeAny):
self.free = free
self.total = total
self.attr = attr
self.nodesize = nodesize
self.size = size
self.ts = ts # pylint: disable=C0103
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return True
def get_rec_size(self) -> int:
"""
recursively traverse tree and return size
also ensure to update the size on the way
"""
size = super().get_rec_size()
self.nodesize = size
return size
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return self
def __str__(self) -> str:
return self._to_str()
@ -327,13 +203,5 @@ class NodeMeta(NodeAny):
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return False
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
return 0
def __str__(self) -> str:
return self._to_str()

@ -1,39 +0,0 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2024, deadc0de6
nodes helpers
"""
import os
# local imports
from catcli import nodes
def path_to_top(path: str) -> str:
"""path pivot under top"""
pre = f"{os.path.sep}{nodes.NAME_TOP}"
if not path.startswith(pre):
# prepend with top node path
path = pre + path
return path
def path_to_search_all(path: str) -> str:
"""path to search for all subs"""
if not path:
path = os.path.sep
if not path.startswith(os.path.sep):
path = os.path.sep + path
pre = f"{os.path.sep}{nodes.NAME_TOP}"
if not path.startswith(pre):
# prepend with top node path
path = pre + path
# if not path.endswith(os.path.sep):
# # ensure ends with a separator
# path += os.path.sep
# if not path.endswith(WILD):
# # add wild card
# path += WILD
return path

@ -1,81 +0,0 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2024, deadc0de6
Class for printing nodes in csv format
"""
import sys
from typing import List
from catcli.nodes import NodeAny, NodeStorage, TYPE_DIR
from catcli.utils import size_to_str, epoch_to_str
class CsvPrinter:
"""a node printer class"""
DEFSEP = ','
CSV_HEADER = ('name,type,path,size,indexed_at,'
'maccess,md5,nbfiles,free_space,'
'total_space,meta')
def _print_entries(self, entries: List[str], sep: str = DEFSEP) -> None:
line = sep.join(['"' + o + '"' for o in entries])
if len(line) > 0:
sys.stdout.write(f'{line}\n')
def print_header(self) -> None:
"""print csv header"""
sys.stdout.write(f'{self.CSV_HEADER}\n')
def print_storage(self, node: NodeStorage,
sep: str = DEFSEP,
raw: bool = False) -> None:
"""print a storage node"""
out = []
out.append(node.get_name()) # name
out.append(node.type) # type
out.append('') # fake full path
size = node.get_rec_size()
out.append(size_to_str(size, raw=raw)) # size
out.append(epoch_to_str(node.ts)) # indexed_at
out.append('') # fake maccess
out.append('') # fake md5
out.append(str(len(node.children))) # nbfiles
# fake free_space
out.append(size_to_str(node.free, raw=raw))
# fake total_space
out.append(size_to_str(node.total, raw=raw))
out.append(node.attr) # meta
self._print_entries(out, sep=sep)
def print_node(self, node: NodeAny,
sep: str = DEFSEP,
raw: bool = False) -> None:
"""print other nodes"""
out = []
out.append(node.get_name().replace('"', '""')) # name
out.append(node.type) # type
fullpath = node.get_fullpath()
out.append(fullpath.replace('"', '""')) # full path
out.append(size_to_str(node.nodesize, raw=raw)) # size
storage = node.get_storage_node()
out.append(epoch_to_str(storage.ts)) # indexed_at
if node.has_attr('maccess'):
out.append(epoch_to_str(node.maccess)) # maccess
else:
out.append('') # fake maccess
if node.has_attr('md5'):
out.append(node.md5) # md5
else:
out.append('') # fake md5
if node.type == TYPE_DIR:
out.append(str(len(node.children))) # nbfiles
else:
out.append('') # fake nbfiles
out.append('') # fake free_space
out.append('') # fake total_space
out.append('') # fake meta
self._print_entries(out, sep=sep)

@ -1,165 +0,0 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
Class for printing nodes in native format
"""
import sys
from catcli.nodes import NodeFile, NodeDir, \
NodeStorage, NodeAny, typcast_node
from catcli.colors import Colors
from catcli.logger import Logger
from catcli.utils import fix_badchars, size_to_str, \
epoch_to_str
COLOR_STORAGE = Colors.YELLOW
COLOR_FILE = Colors.WHITE
COLOR_DIRECTORY = Colors.BLUE
COLOR_ARCHIVE = Colors.PURPLE
COLOR_TS = Colors.CYAN
COLOR_SIZE = Colors.GREEN
FULLPATH_IN_NAME = True
class NativePrinter:
"""a node printer class"""
STORAGE = 'storage'
ARCHIVE = 'archive'
NBFILES = 'nbfiles'
def print_du(self, node: NodeAny,
raw: bool = False) -> None:
"""print du style"""
typcast_node(node)
name = node.get_fullpath()
size = node.nodesize
line = size_to_str(size, raw=raw).ljust(10, ' ')
out = f'{COLOR_SIZE}{line}{Colors.RESET}'
out += ' '
out += f'{COLOR_FILE}{name}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
def print_top(self, pre: str, name: str) -> None:
"""print top node"""
sys.stdout.write(f'{pre}{name}\n')
def print_storage(self, pre: str,
node: NodeStorage,
raw: bool = False) -> None:
"""print a storage node"""
# construct name
name = node.get_name()
# construct attrs
attrs = []
# nb files
attrs.append(f'nbfiles:{len(node.children)}')
# the children size
recsize = node.get_rec_size()
sizestr = size_to_str(recsize, raw=raw)
attrs.append(f'totsize:{sizestr}')
# free
pcent = 0.0
if node.total > 0:
pcent = node.free * 100 / node.total
attrs.append(f'free:{pcent:.1f}%')
# du
sztotal = size_to_str(node.total, raw=raw)
szused = size_to_str(node.total - node.free, raw=raw)
attrs.append(f'du:{szused}/{sztotal}')
# timestamp
if node.has_attr('ts'):
attrs.append(f'date:{epoch_to_str(node.ts)}')
# print
out = f'{pre}{Colors.UND}{self.STORAGE}{Colors.RESET}: '
out += f'{COLOR_STORAGE}{name}{Colors.RESET}'
if attrs:
out += f' [{Colors.WHITE}{"|".join(attrs)}{Colors.RESET}]'
sys.stdout.write(f'{out}\n')
def print_file(self, pre: str,
node: NodeFile,
withpath: bool = False,
withstorage: bool = False,
raw: bool = False) -> None:
"""print a file node"""
# construct name
name = node.get_name()
storage = node.get_storage_node()
if withpath:
name = node.get_fullpath()
# construct attributes
attrs = []
if node.md5:
attrs.append(f'md5:{node.md5}')
if withstorage:
content = Logger.get_bold_text(storage.get_name())
attrs.append(f'storage:{content}')
# print
out = []
out.append(f'{pre}')
out.append(f'{COLOR_FILE}{name}{Colors.RESET}')
size = 0
if node.nodesize:
size = node.nodesize
line = size_to_str(size, raw=raw)
out.append(f'{COLOR_SIZE}{line}{Colors.RESET}')
if node.has_attr('maccess'):
line = epoch_to_str(node.maccess)
out.append(f'{COLOR_TS}{line}{Colors.RESET}')
if attrs:
out.append(f'{Colors.GRAY}[{",".join(attrs)}]{Colors.RESET}')
out = [x for x in out if x]
sys.stdout.write(f'{" ".join(out)}\n')
def print_dir(self, pre: str,
node: NodeDir,
withpath: bool = False,
withstorage: bool = False,
withnbchildren: bool = False,
raw: bool = False) -> None:
"""print a directory node"""
# construct name
name = node.get_name()
storage = node.get_storage_node()
if withpath:
name = node.get_fullpath()
# construct attrs
attrs = []
if withnbchildren:
nbchildren = len(node.children)
attrs.append(f'{self.NBFILES}:{nbchildren}')
if withstorage:
attrs.append(f"storage:{Logger.get_bold_text(storage.get_name())}")
# print
out = []
out.append(f'{pre}')
out.append(f'{COLOR_DIRECTORY}{name}{Colors.RESET}')
size = 0
if node.nodesize:
size = node.nodesize
line = size_to_str(size, raw=raw)
out.append(f'{COLOR_SIZE}{line}{Colors.RESET}')
if node.has_attr('maccess'):
line = epoch_to_str(node.maccess)
out.append(f'{COLOR_TS}{line}{Colors.RESET}')
if attrs:
out.append(f'{Colors.GRAY}[{",".join(attrs)}]{Colors.RESET}')
out = [x for x in out if x]
sys.stdout.write(f'{" ".join(out)}\n')
def print_archive(self, pre: str,
name: str, archive: str) -> None:
"""print an archive"""
name = fix_badchars(name)
out = f'{pre}{COLOR_ARCHIVE}{name}{Colors.RESET} '
out += f'{Colors.GRAY}[{self.ARCHIVE}:{archive}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')

@ -10,15 +10,44 @@ import hashlib
import tempfile
import subprocess
import datetime
import string
# local imports
from catcli import nodes
from catcli.exceptions import CatcliException
SEPARATOR = '/'
WILD = '*'
def path_to_top(path: str) -> str:
"""path pivot under top"""
pre = f'{SEPARATOR}{nodes.NAME_TOP}'
if not path.startswith(pre):
# prepend with top node path
path = pre + path
return path
def path_to_search_all(path: str) -> str:
"""path to search for all subs"""
if not path:
path = SEPARATOR
if not path.startswith(SEPARATOR):
path = SEPARATOR + path
pre = f'{SEPARATOR}{nodes.NAME_TOP}'
if not path.startswith(pre):
# prepend with top node path
path = pre + path
if not path.endswith(SEPARATOR):
# ensure ends with a separator
path += SEPARATOR
if not path.endswith(WILD):
# add wild card
path += WILD
return path
def md5sum(path: str) -> str:
"""
calculate md5 sum of a file
@ -73,20 +102,19 @@ def ask(question: str) -> bool:
return resp.lower() == 'y'
def edit(data: str) -> str:
def edit(string: str) -> str:
"""edit the information with the default EDITOR"""
content = fix_badchars(data)
data = string.encode('utf-8')
editor = os.environ.get('EDITOR', 'vim')
with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as file:
file.write(content.encode('utf-8'))
file.write(data)
file.flush()
subprocess.call([editor, file.get_name()])
subprocess.call([editor, file.name])
file.seek(0)
new = file.read()
return new.decode('utf-8')
def fix_badchars(data: str) -> str:
def fix_badchars(string: str) -> str:
"""fix none utf-8 chars in string"""
data = "".join(x for x in data if x in string.printable)
return data.encode("utf-8", "ignore").decode("utf-8")
return string.encode('utf-8', 'ignore').decode('utf-8')

@ -3,4 +3,4 @@ author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
"""
__version__ = '1.0'
__version__ = '0.9.2'

@ -35,8 +35,7 @@ class Walker:
self.debug = debug
self.lpath = logpath
def index(self,
path: str,
def index(self, path: str,
parent: NodeAny,
name: str,
storagepath: str = '') -> Tuple[str, int]:
@ -48,10 +47,8 @@ class Walker:
"""
self._debug(f'indexing starting at {path}')
if not parent:
# create the parent
parent = self.noder.new_dir_node(name,
path,
parent)
parent = self.noder.new_dir_node(name, path,
parent, storagepath)
if os.path.islink(path):
rel = os.readlink(path)
@ -68,9 +65,8 @@ class Walker:
continue
self._progress(file)
self._debug(f'index file {sub}')
node = self.noder.new_file_node(os.path.basename(file),
sub,
parent)
node = self.noder.new_file_node(os.path.basename(file), sub,
parent, storagepath)
if node:
cnt += 1
for adir in dirs:
@ -80,7 +76,7 @@ class Walker:
self._debug(f'index directory {sub}')
if not os.path.exists(sub):
continue
dummy = self.noder.new_dir_node(base, sub, parent)
dummy = self.noder.new_dir_node(base, sub, parent, storagepath)
if not dummy:
continue
cnt += 1
@ -122,9 +118,8 @@ class Walker:
if node:
node.flag()
continue
node = self.noder.new_file_node(os.path.basename(file),
sub,
parent)
node = self.noder.new_file_node(os.path.basename(file), sub,
parent, storagepath)
if node:
node.flag()
cnt += 1
@ -136,7 +131,7 @@ class Walker:
reindex, dummy = self._need_reindex(parent, sub, treepath)
if reindex:
dummy = self.noder.new_dir_node(base, sub,
parent)
parent, storagepath)
cnt += 1
if dummy:
dummy.flag()
@ -171,7 +166,7 @@ class Walker:
if node and changed:
# remove this node and re-add
self._debug(f'\t{path} has changed')
self._debug(f"\tremoving node {node.get_name()} for {path}")
self._debug(f'\tremoving node {node.name} for {path}')
node.parent = None
return True, node

@ -3,6 +3,3 @@ types-docopt; python_version >= '3.0'
anytree; python_version >= '3.0'
pyfzf; python_version >= '3.0'
fusepy; python_version >= '3.0'
natsort; python_version >= '3.0'
cmd2; python_version >= '3.0'
gnureadline; python_version >= '3.0'

@ -37,19 +37,19 @@ setup(
python_requires=REQUIRES_PYTHON,
classifiers=[
'Development Status :: 5 - Production/Stable',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
],
keywords='catalog commandline indexer offline',
packages=find_packages(exclude=['tests*']),
install_requires=['docopt', 'types-docopt', 'anytree',
'pyfzf', 'fusepy', 'natsort', 'cmd2',
'gnureadline'],
install_requires=['docopt', 'anytree',
'types-docopt', 'pyfzf',
'fusepy'],
extras_require={
'dev': ['check-manifest'],

@ -1,22 +0,0 @@
#!/usr/bin/env bash
# run me from the root of the package
source tests-ng/helper
rm -f tests-ng/assets/github.catalog.json
python3 -m catcli.catcli index -c github .github --catalog=tests-ng/assets/github.catalog.json
# edit catalog
clean_catalog "tests-ng/assets/github.catalog.json"
# native
python3 -m catcli.catcli ls -r -s -B --catalog=tests-ng/assets/github.catalog.json | \
sed -e 's/free:.*%/free:0.0%/g' \
-e 's/....-..-.. ..:..:../2023-03-09 16:20:59/g' \
-e 's#du:[^|]* |#du:0/0 |#g' > tests-ng/assets/github.catalog.native.txt
# csv
python3 -m catcli.catcli ls -r -s -B --catalog=tests-ng/assets/github.catalog.json --format=csv | \
sed -e 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' | \
sed 's/20..-..-.. ..:..:..//g' > tests-ng/assets/github.catalog.csv.txt

@ -1,6 +1,5 @@
"github","storage","","4865","","","","3","0","0",""
"FUNDING.yml","file","github/FUNDING.yml","17","","","0c6407a84d412c514007313fb3bca4de","","","",""
"codecov.yml","file","github/codecov.yml","104","","","4203204f75b43cd4bf032402beb3359d","","","",""
"workflows","dir","github/workflows","3082","","","","2","","",""
"pypi-release.yml","file","github/workflows/pypi-release.yml","691","","","57699a7a6a03e20e864f220e19f8e197","","","",""
"testing.yml","file","github/workflows/testing.yml","850","","","691df1a4d2f254b5cd04c152e7c6ccaf","","","",""
"github","storage","","1510","2023-03-09 16:20:59","","","2","0","0",""
"workflows","dir","github/workflows","1493","2023-03-09 16:20:59","2023-03-09 16:20:44","","2","","",""
"pypi-release.yml","file","github/workflows/pypi-release.yml","691","2023-03-09 16:20:59","2022-10-19 21:00:37","57699a7a6a03e20e864f220e19f8e197","","","",""
"testing.yml","file","github/workflows/testing.yml","802","2023-03-09 16:20:59","2023-03-09 16:20:44","7144a119ef43adb634654522c12ec250","","","",""
"FUNDING.yml","file","github/FUNDING.yml","17","2023-03-09 16:20:59","2022-10-19 21:00:37","0c6407a84d412c514007313fb3bca4de","","","",""

@ -7,59 +7,54 @@
"maccess": 1666206037.0786593,
"md5": "0c6407a84d412c514007313fb3bca4de",
"name": "FUNDING.yml",
"relpath": "/FUNDING.yml",
"size": 17,
"type": "file"
},
{
"maccess": 1704320710.7056112,
"md5": "4203204f75b43cd4bf032402beb3359d",
"name": "codecov.yml",
"size": 104,
"type": "file"
},
{
"children": [
{
"maccess": 1666206037.078865,
"md5": "57699a7a6a03e20e864f220e19f8e197",
"name": "pypi-release.yml",
"relpath": "workflows/pypi-release.yml",
"size": 691,
"type": "file"
},
{
"maccess": 1704403569.24789,
"md5": "691df1a4d2f254b5cd04c152e7c6ccaf",
"maccess": 1678375244.4870229,
"md5": "7144a119ef43adb634654522c12ec250",
"name": "testing.yml",
"size": 850,
"relpath": "workflows/testing.yml",
"size": 802,
"type": "file"
}
],
"maccess": 1704320727.2641916,
"maccess": 1678375244.4865956,
"name": "workflows",
"size": 1541,
"relpath": "/workflows",
"size": 1493,
"type": "dir"
}
],
"free": 0,
"name": "github",
"size": 1662,
"size": 1510,
"total": 0,
"ts": 1704923096,
"ts": 1678375259,
"type": "storage"
},
{
"attr": {
"access": 1704923096,
"access_version": "0.9.6",
"created": 1704923096,
"created_version": "0.9.6"
"access": 1678375259,
"access_version": "0.8.7",
"created": 1678375259,
"created_version": "0.8.7"
},
"name": "meta",
"size": null,
"type": "meta"
}
],
"name": "top",
"size": null,
"type": "top"
}

@ -1,7 +1,7 @@
top
└── storage: github [nbfiles:3|totsize:4865|free:0.0%|du:0/0|date:2023-03-09 16:20:59]
├── FUNDING.yml 17 2023-03-09 16:20:59 [md5:0c6407a84d412c514007313fb3bca4de]
├── codecov.yml 104 2023-03-09 16:20:59 [md5:4203204f75b43cd4bf032402beb3359d]
└── workflows 3082 2023-03-09 16:20:59 [nbfiles:2]
├── pypi-release.yml 691 2023-03-09 16:20:59 [md5:57699a7a6a03e20e864f220e19f8e197]
└── testing.yml 850 2023-03-09 16:20:59 [md5:691df1a4d2f254b5cd04c152e7c6ccaf]
└── storage: github
nbfiles:2 | totsize:1510 | free:0.0% | du:0/0 | date:2023-03-09 16:20:59
├── workflows [nbfiles:2, totsize:1493]
│ ├── pypi-release.yml [size:691, md5:57699a7a6a03e20e864f220e19f8e197]
│ └── testing.yml [size:802, md5:7144a119ef43adb634654522c12ec250]
└── FUNDING.yml [size:17, md5:0c6407a84d412c514007313fb3bca4de]

@ -2,16 +2,30 @@
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2023, deadc0de6
# exit on first error
set -e
cur=$(cd "$(dirname "${0}")" && pwd)
# get current path
rl="readlink -f"
if ! ${rl} "${0}" >/dev/null 2>&1; then
rl="realpath"
if ! command -v ${rl}; then
echo "\"${rl}\" not found !" && exit 1
fi
fi
cur=$(dirname "$(${rl} "${0}")")
# pivot
prev="${cur}/.."
cd "${prev}"
# coverage
#export PYTHONPATH=".:${PYTHONPATH}"
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
bin="coverage run -p --source=catcli -m catcli.catcli"
#bin="coverage run -p --source=${prev}/catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
@ -36,7 +50,6 @@ catalog="${tmpd}/catalog"
# index
${bin} -B index -c --catalog="${catalog}" github .github
clean_catalog "${catalog}"
ls -laR .github
cat "${catalog}"
@ -102,7 +115,7 @@ native="${tmpd}/native.txt"
${bin} -B ls -s -r --format=native --catalog="${catalog}" > "${native}"
mod="${tmpd}/native.mod.txt"
cat "${native}" | sed -e 's/free:.*%/free:0.0%/g' \
-e 's/....-..-.. ..:..:../2023-03-09 16:20:59/g' \
-e 's/date:....-..-.. ..:..:../date:2023-03-09 16:20:59/g' \
-e 's#du:[^|]* |#du:0/0 |#g' > "${mod}"
if command -v delta >/dev/null; then
delta -s "tests-ng/assets/github.catalog.native.txt" "${mod}"
@ -116,14 +129,13 @@ csv="${tmpd}/csv.txt"
${bin} -B ls -s -r --format=csv --catalog="${catalog}" > "${csv}"
# modify created csv
mod="${tmpd}/csv.mod.txt"
cat "${csv}" | \
sed -e 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' | \
cat "${csv}" | sed -e 's/"2","[^"]*","[^"]*",""/"2","0","0",""/g' | \
sed 's/20..-..-.. ..:..:..//g' > "${mod}"
# modify original
ori="${tmpd}/ori.mod.txt"
cat "tests-ng/assets/github.catalog.csv.txt" | \
sed 's/....-..-.. ..:..:..//g' | \
sed 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' > "${ori}"
sed 's/"2","[^"]*","[^"]*",""/"2","0","0",""/g' > "${ori}"
if command -v delta >/dev/null; then
delta -s "${ori}" "${mod}"
fi

@ -1,61 +0,0 @@
#!/usr/bin/env bash
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2023, deadc0de6
set -e
cur=$(cd "$(dirname "${0}")" && pwd)
prev="${cur}/.."
cd "${prev}"
# coverage
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
echo "pythonpath: ${PYTHONPATH}"
echo "bin: ${bin}"
${bin} --version
# get the helpers
# shellcheck source=tests-ng/helper
source "${cur}"/helper
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
##########################################################
# the test
##########################################################
# create temp dirs
tmpd=$(mktemp -d)
clear_on_exit "${tmpd}"
catalog="${tmpd}/catalog"
# index
${bin} -B index -c -f --catalog="${catalog}" github1 .github
${bin} -B index -c -f --catalog="${catalog}" github2 .github
clean_catalog "${catalog}"
#cat "${catalog}"
echo ""
${bin} -B ls -r --catalog="${catalog}"
echo "finding \"testing.yml\""
${bin} -B find --catalog="${catalog}" testing.yml
cnt=$(${bin} -B find --catalog="${catalog}" testing.yml | wc -l)
[ "${cnt}" != "2" ] && echo "should return 2 (not ${cnt})" && exit 1
echo "finding \"*.yml\""
${bin} -B find --catalog="${catalog}" '*.yml'
cnt=$(${bin} -B find --catalog="${catalog}" '*.yml' | wc -l)
[ "${cnt}" != "8" ] && echo "should return 8 (not ${cnt})" && exit 1
# the end
echo ""
echo "test \"$(basename "$0")\" success"
cd "${cur}"
exit 0

@ -21,14 +21,6 @@ clear_on_exit()
fi
}
# clear catalog stuff for testing
# $1: catalog path
clean_catalog()
{
sed -i 's/"free": .*,/"free": 0,/g' "${1}"
sed -i 's/"total": .*,/"total": 0,/g' "${1}"
}
# clear files
on_exit()
{

@ -1,59 +0,0 @@
#!/usr/bin/env bash
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2024, deadc0de6
set -e
cur=$(cd "$(dirname "${0}")" && pwd)
prev="${cur}/.."
cd "${prev}"
# coverage
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
echo "pythonpath: ${PYTHONPATH}"
echo "bin: ${bin}"
${bin} --version
# get the helpers
# shellcheck source=tests-ng/helper
source "${cur}"/helper
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
##########################################################
# the test
##########################################################
# create temp dirs
tmpd=$(mktemp -d)
clear_on_exit "${tmpd}"
catalog="${tmpd}/catalog"
# index
${bin} -B index -c -f --catalog="${catalog}" github1 .github
${bin} -B index -c -f --catalog="${catalog}" github2 .github
clean_catalog "${catalog}"
#cat "${catalog}"
echo ""
${bin} -B ls -r --catalog="${catalog}"
${bin} -B ls --catalog="${catalog}" 'github1/*.yml'
cnt=$(${bin} -B ls --catalog="${catalog}" 'github1/*.yml' | wc -l)
[ "${cnt}" != "2" ] && echo "should return 2 (not ${cnt})" && exit 1
${bin} -B ls --catalog="${catalog}" 'github*/*.yml'
cnt=$(${bin} -B ls --catalog="${catalog}" 'github*/*.yml' | wc -l)
[ "${cnt}" != "4" ] && echo "should return 4 (not ${cnt})" && exit 1
# the end
echo ""
echo "test \"$(basename "$0")\" success"
cd "${cur}"
exit 0

@ -2,16 +2,30 @@
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2021, deadc0de6
# exit on first error
set -e
cur=$(cd "$(dirname "${0}")" && pwd)
# get current path
rl="readlink -f"
if ! ${rl} "${0}" >/dev/null 2>&1; then
rl="realpath"
if ! command -v ${rl}; then
echo "\"${rl}\" not found !" && exit 1
fi
fi
cur=$(dirname "$(${rl} "${0}")")
# pivot
prev="${cur}/.."
cd "${prev}"
# coverage
#export PYTHONPATH=".:${PYTHONPATH}"
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
bin="coverage run -p --source=catcli -m catcli.catcli"
#bin="coverage run -p --source=${prev}/catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
@ -41,12 +55,12 @@ echo "abc" > "${tmpd}/dir/a"
# index
${bin} -B index --catalog="${catalog}" dir "${tmpd}/dir"
${bin} -B ls --catalog="${catalog}"
${bin} -B ls --catalog="${catalog}" dir
# get attributes
freeb=$(${bin} -B ls --catalog="${catalog}" | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dub=$(${bin} -B ls --catalog="${catalog}" | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
dateb=$(${bin} -B ls --catalog="${catalog}" | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
freeb=$(${bin} -B ls --catalog="${catalog}" dir | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dub=$(${bin} -B ls --catalog="${catalog}" dir | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
dateb=$(${bin} -B ls --catalog="${catalog}" dir | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
echo "before: free:${freeb} | du:${dub} | date:${dateb}"
# change content
@ -61,12 +75,12 @@ sleep 1
# update
${bin} -B update -f --catalog="${catalog}" dir "${tmpu}/dir"
${bin} -B ls --catalog="${catalog}"
${bin} -B ls --catalog="${catalog}" dir
# get new attributes
freea=$(${bin} -B ls --catalog="${catalog}" | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dua=$(${bin} -B ls --catalog="${catalog}" | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
datea=$(${bin} -B ls --catalog="${catalog}" | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
freea=$(${bin} -B ls --catalog="${catalog}" dir | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dua=$(${bin} -B ls --catalog="${catalog}" dir | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
datea=$(${bin} -B ls --catalog="${catalog}" dir | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
echo "after: free:${freea} | du:${dua} | date:${datea}"
# test they are all different

@ -2,7 +2,7 @@ pycodestyle; python_version >= '3.0'
pyflakes; python_version >= '3.0'
nose2; python_version >= '3.0'
coverage; python_version >= '3.0'
coveralls; python_version >= '3.0'
pylint; python_version > '3.0'
mypy; python_version > '3.0'
pytest; python_version > '3.0'
pytype; python_version > '3.0'

@ -45,6 +45,8 @@ pylint -sn \
--disable=R0022 \
catcli/
# R0801: Similar lines in 2 files
# W0212: Access to a protected member
# R0914: Too many local variables
@ -59,23 +61,11 @@ pylint -sn setup.py
# mypy
echo "[+] mypy"
mypy --version
mypy --config-file=.mypy.ini catcli/
# pytype
echo "[+] pytype"
pytype --version
pytype catcli/
set +e
grep -R 'TODO' catcli/ && echo "TODO found" && exit 1
grep -R 'FIXME' catcli/ && echo "FIXME found" && exit 1
set -e
mypy --strict catcli/
# unittest
echo "[+] unittests"
mkdir -p coverages/
coverage run -p --data-file coverages/coverage -m pytest tests
coverage run -p -m pytest tests
# tests-ng
echo "[+] tests-ng"
@ -99,8 +89,7 @@ done
# merge coverage
echo "[+] coverage merge"
coverage combine coverages/*
coverage xml
coverage combine
echo "ALL TESTS DONE OK"
exit 0

@ -149,18 +149,21 @@ FAKECATALOG = """
{
"md5": null,
"name": "7544G",
"relpath": "tmpj5602ih7.catcli/7544G",
"size": 100,
"type": "file"
},
{
"md5": null,
"name": "KF2ZC",
"relpath": "tmpj5602ih7.catcli/KF2ZC",
"size": 100,
"type": "file"
},
{
"md5": null,
"name": "Z9OII",
"relpath": "tmpj5602ih7.catcli/Z9OII",
"size": 100,
"type": "file"
},
@ -169,12 +172,14 @@ FAKECATALOG = """
{
"md5": null,
"name": "M592O9",
"relpath": "tmpj5602ih7.catcli/VNN/M592O9",
"size": 100,
"type": "file"
}
],
"md5": null,
"name": "VNN",
"relpath": "VNN",
"size": 100,
"type": "dir"
},
@ -183,18 +188,21 @@ FAKECATALOG = """
{
"md5": null,
"name": "X37H",
"relpath": "tmpj5602ih7.catcli/P4C/X37H",
"size": 100,
"type": "file"
},
{
"md5": null,
"name": "I566",
"relpath": "tmpj5602ih7.catcli/P4C/I566",
"size": 100,
"type": "file"
}
],
"md5": null,
"name": "P4C",
"relpath": "P4C",
"size": 200,
"type": "dir"
}

@ -1,29 +0,0 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2017, deadc0de6
Basic unittest for ls
"""
import unittest
from catcli.decomp import Decomp
class TestDecomp(unittest.TestCase):
"""test ls"""
def test_list(self):
"""test decomp formats"""
dec = Decomp()
formats = dec.get_formats()
self.assertTrue('zip' in formats)
def main():
"""entry point"""
unittest.main()
if __name__ == '__main__':
main()

@ -50,7 +50,7 @@ class TestIndexing(unittest.TestCase):
tmpdirname = 'tmpdir'
args = {'<path>': dirpath, '<name>': tmpdirname,
'--hash': True, '--meta': ['some meta'],
'--verbose': True}
'--no-subsize': False, '--verbose': True}
# index the directory
cmd_index(args, noder, catalog, top)
@ -62,7 +62,7 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(len(storage.children) == 5)
# ensures files and directories are in
names = [x.get_name() for x in storage.children]
names = [x.name for x in storage.children]
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
self.assertTrue(os.path.basename(file3) in names)
@ -70,9 +70,9 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(os.path.basename(dir2) in names)
for node in storage.children:
if node.get_name() == os.path.basename(dir1):
if node.name == os.path.basename(dir1):
self.assertTrue(len(node.children) == 2)
elif node.get_name() == os.path.basename(dir2):
elif node.name == os.path.basename(dir2):
self.assertTrue(len(node.children) == 1)

@ -46,7 +46,7 @@ class TestRm(unittest.TestCase):
top = cmd_rm(args, noder, catalog, top)
# ensure there no children anymore
self.assertEqual(len(top.children), 0)
self.assertTrue(len(top.children) == 0)
def main():

@ -62,7 +62,8 @@ class TestUpdate(unittest.TestCase):
tmpdirname = 'tmpdir'
args = {'<path>': dirpath, '<name>': tmpdirname,
'--hash': True, '--meta': ['some meta'],
'--verbose': True, '--lpath': None}
'--no-subsize': False, '--verbose': True,
'--lpath': None}
# index the directory
unix_tree(dirpath)
@ -70,11 +71,11 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(os.stat(catalogpath).st_size != 0)
# ensure md5 sum are in
nods = noder.find(top, os.path.basename(file4))
self.assertEqual(len(nods), 1)
nods = noder.find_name(top, os.path.basename(file4))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
self.assertEqual(nod.md5, f4_md5)
self.assertTrue(nod.md5 == f4_md5)
# print catalog
noder.print_tree(top)
@ -122,12 +123,12 @@ class TestUpdate(unittest.TestCase):
noder.print_tree(top)
# explore the top node to find all nodes
self.assertEqual(len(top.children), 1)
self.assertTrue(len(top.children) == 1)
storage = top.children[0]
self.assertEqual(len(storage.children), 8)
self.assertTrue(len(storage.children) == 8)
# ensure d1f1 md5 sum has changed in catalog
nods = noder.find(top, os.path.basename(d1f1))
nods = noder.find_name(top, os.path.basename(d1f1))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
@ -135,7 +136,7 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(nod.md5 == d1f1_md5_new)
# ensure f4 md5 sum has changed in catalog
nods = noder.find(top, os.path.basename(file4))
nods = noder.find_name(top, os.path.basename(file4))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
@ -143,7 +144,7 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(nod.md5 == f4_md5_new)
# ensure d2f2 md5 sum has changed in catalog
nods = noder.find(top, os.path.basename(d2f2))
nods = noder.find_name(top, os.path.basename(d2f2))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
@ -151,7 +152,7 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(nod.md5 == d2f2_md5_new)
# ensures files and directories are in
names = [node.get_name() for node in anytree.PreOrderIter(storage)]
names = [node.name for node in anytree.PreOrderIter(storage)]
print(names)
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
@ -169,13 +170,13 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(os.path.basename(new5) in names)
for node in storage.children:
if node.get_name() == os.path.basename(dir1):
if node.name == os.path.basename(dir1):
self.assertTrue(len(node.children) == 3)
elif node.get_name() == os.path.basename(dir2):
elif node.name == os.path.basename(dir2):
self.assertTrue(len(node.children) == 3)
elif node.get_name() == os.path.basename(new3):
elif node.name == os.path.basename(new3):
self.assertTrue(len(node.children) == 0)
elif node.get_name() == os.path.basename(new4):
elif node.name == os.path.basename(new4):
self.assertTrue(len(node.children) == 1)
self.assertTrue(read_from_file(d1f1) == editval)
@ -189,7 +190,7 @@ class TestUpdate(unittest.TestCase):
cmd_update(args, noder, catalog, top)
# ensures files and directories are (not) in
names = [node.get_name() for node in anytree.PreOrderIter(storage)]
names = [node.name for node in anytree.PreOrderIter(storage)]
print(names)
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
@ -207,9 +208,9 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(os.path.basename(new4) not in names)
self.assertTrue(os.path.basename(new5) not in names)
for node in storage.children:
if node.get_name() == os.path.basename(dir1):
if node.name == os.path.basename(dir1):
self.assertTrue(len(node.children) == 2)
elif node.get_name() == os.path.basename(new3):
elif node.name == os.path.basename(new3):
self.assertTrue(len(node.children) == 0)

Loading…
Cancel
Save