Compare commits

...

66 Commits

Author SHA1 Message Date
deadc0de6 3a082ee5ad bump version 3 months ago
deadc0de6 1e12caa770 update readme 3 months ago
deadc0de6 c9b4043e5f fix bugs 3 months ago
deadc0de6 e6ca6e2fcc bump version 4 months ago
deadc0de6 1f1ea8d37c env variables 4 months ago
deadc0de6 eb0a628ab6 fix dependencies 4 months ago
deadc0de d8eae3024e
Merge pull request #47 from michalkielan/add_env_var
Add option to override default catalog path
4 months ago
Michal Kielan bbfb60fe2e use optional env variable for catalog path 4 months ago
deadc0de6 8788ba8b86 bump version 4 months ago
deadc0de 06a36c42fd
Merge pull request #45 from deadc0de6/features-42
add features for #42
4 months ago
deadc0de6 6b9e00f11b fix linting 5 months ago
deadc0de6 e63f5b8d79 pytype checks 5 months ago
deadc0de6 6164930088 linting 5 months ago
deadc0de6 638085ce98 repl 5 months ago
deadc0de6 f3c4a86b1d tests 5 months ago
deadc0de6 963a8b0518 update ensures dir size are correct 5 months ago
deadc0de6 ea0cb2f9da linting 5 months ago
deadc0de6 340ab62d77 du 5 months ago
deadc0de6 ff02f9bb97 mypy fix 5 months ago
deadc0de6 4aaa073603 mypy 5 months ago
deadc0de6 8111327b53 fix find 5 months ago
deadc0de6 bed81ffee4 fix ls 5 months ago
deadc0de6 e58da8b6bf wildcard and refactoring 5 months ago
deadc0de6 0dcbfa94bd refactoring 5 months ago
deadc0de6 691396c96a add tree command 5 months ago
deadc0de6 b7d6f21cc2 ls sorting 5 months ago
deadc0de6 9326911824 coverage badge 5 months ago
deadc0de6 b0876f3382 add mypy.ini 5 months ago
deadc0de6 5c06e36cc6 mypy version 5 months ago
deadc0de6 5301126d90 fix mypy 5 months ago
deadc0de6 9d6bba0127 remove python 3.6 5 months ago
deadc0de6 982f69d410 add python 3.11 5 months ago
deadc0de6 c61ce59b35 fix actions 5 months ago
deadc0de6 07d323f0e6 fix tests 5 months ago
deadc0de6 ddefc662db coverage 5 months ago
deadc0de6 3ccaf81abd fix find 5 months ago
deadc0de6 e610273dc3 cleaning 5 months ago
deadc0de6 f918ea5ae4 handle file listing and glob 5 months ago
deadc0de6 9050c6bcf6 typecheck 5 months ago
deadc0de6 9dfc4da8bf check for TODO/FIXME 5 months ago
deadc0de6 35d1d0d9c4 depth to nbchildren 5 months ago
deadc0de6 1c74290fa1 update 5 months ago
deadc0de6 ac4ba145fe clear relpath and add full path 5 months ago
deadc0de6 3cf3031af2 bump version 10 months ago
deadc0de 1c2d5f378e
Merge pull request #40 from deadc0de6/fix-39
Fix 39
10 months ago
deadc0de6 57d865bb71 Merge branch 'fix-39' of github.com:deadc0de6/catcli into fix-39 10 months ago
deadc0de6 b7bd2ecc5d fix casting 10 months ago
deadc0de6 e2be5136d4 fix for #39 10 months ago
deadc0de6 b0572bf119 decomp bare test 10 months ago
deadc0de6 001c03ca28 fix casting 10 months ago
deadc0de6 b809850fa0 fix for #39 10 months ago
deadc0de6 b057bde35d remove allow_other in fuse 10 months ago
deadc0de6 055fe8a0ca bump version 11 months ago
deadc0de 9dbc8f7342
Merge pull request #37 from deadc0de6/fix-36
size attr fix for #36
11 months ago
deadc0de6 7e9a6806ee size attr fix for #36 11 months ago
deadc0de6 3dbec74e96 bump version 1 year ago
deadc0de 6321d04417
Merge pull request #35 from deadc0de6/fix-34
Fix 34
1 year ago
deadc0de6 a16838299f debug logs 1 year ago
deadc0de6 d00964483e fix tests 1 year ago
deadc0de6 226448b334 add test for #34 1 year ago
deadc0de6 205f4e2148 fix bug #34 1 year ago
deadc0de6 b4723b6277 badge 1 year ago
deadc0de6 f738644a52 bump version 1 year ago
deadc0de6 59cb7bc953 do not depend on fusepy and pyfzf 1 year ago
deadc0de6 59333441ee bump version 1 year ago
deadc0de6 4147e35c28 fix dependencies in setup.py 1 year ago

@ -0,0 +1,7 @@
coverage:
status:
project:
default:
target: 90%
threshold: 1%
patch: off

@ -1,11 +1,11 @@
name: tests
on: [push, pull_request]
on: [push, pull_request, workflow_dispatch]
jobs:
test:
runs-on: ubuntu-20.04
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"]
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
@ -21,9 +21,9 @@ jobs:
- name: Run tests
run: |
./tests.sh
- name: Coveralls
run: |
pip install coveralls
coveralls --service=github
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
with:
files: coverage.xml
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

4
.gitignore vendored

@ -1,6 +1,8 @@
*.pyc
.coverage
.coverage*
coverages/
coverage.xml
dist/
build/
*.egg-info/
@ -9,3 +11,5 @@ build/
.mypy_cache
.pytest_cache
__pycache__
.pyre
.pytype

@ -0,0 +1,5 @@
[mypy]
strict = true
disable_error_code = import-untyped,import-not-found
ignore_missing_imports = True
warn_unused_ignores = False

@ -1,8 +1,8 @@
# CATCLI
[![Build Status](https://travis-ci.org/deadc0de6/catcli.svg?branch=master)](https://travis-ci.org/deadc0de6/catcli)
[![Tests Status](https://github.com/deadc0de6/catcli/workflows/tests/badge.svg?branch=master)](https://github.com/deadc0de6/catcli/actions)
[![License: GPL v3](https://img.shields.io/badge/License-GPL%20v3-blue.svg)](http://www.gnu.org/licenses/gpl-3.0)
[![Coveralls](https://img.shields.io/coveralls/github/deadc0de6/catcli)](https://coveralls.io/github/deadc0de6/catcli?branch=master)
[![Coverage](https://codecov.io/gh/deadc0de6/catcli/graph/badge.svg?token=t5dF7UL7K1)](https://codecov.io/gh/deadc0de6/catcli)
[![PyPI version](https://badge.fury.io/py/catcli.svg)](https://badge.fury.io/py/catcli)
[![AUR](https://img.shields.io/aur/version/catcli-git.svg)](https://aur.archlinux.org/packages/catcli-git)
@ -12,6 +12,10 @@
*The command line catalog tool for your offline data*
> [!WARNING]
> catcli has been superseded by [gocatcli](https://github.com/deadc0de6/gocatcli/)
> which provides all features of catcli and more...
Did you ever wanted to find back that specific file that should be on one of your
backup DVDs or one of your external hard drives? You usually go through all
of them hoping to find the right one on the first try?
@ -50,6 +54,8 @@ catcli ls -r
catcli ls log
# find files/directories named '*log*'
catcli find log
# show directories sizes
catcli du log
```
see [usage](#usage) for specific info
@ -76,6 +82,7 @@ See the [examples](#examples) for an overview of the available features.
* [Find files](#find-files)
* [Mount catalog](#mount-catalog)
* [Display entire hierarchy](#display-entire-hierarchy)
* [Disk usage](#disk-usage)
* [Catalog graph](#catalog-graph)
* [Edit storage](#edit-storage)
* [Update catalog](#update-catalog)
@ -131,6 +138,13 @@ Five different types of entry are present in a catalog:
* **file node**: this is a file
* **archive node**: this is a file contained in an archive (tar, zip, etc)
Following environment variables are supported:
* `CATCLI_CATALOG_PATH`: define the catalog path (`--catalog=<path>`)
* `CATCLI_NO_BANNER`: disable the banner (`--no-banner`)
* `CATCLI_VERBOSE`: enable verbose mode (`--verbose`)
* `CATCLI_FORMAT`: define the output format (`-F --format=<fmt>`)
## Index data
Let's say the DVD or external hard drive that needs to be indexed
@ -148,9 +162,6 @@ directory under `catcli.catalog`.
The `--meta` switch allows to add any additional information to store along in
the catalog like for example `the blue disk in my office`.
Catcli will calculate and store the total size of each node (directories, storages, etc)
unless the `-n --no-subsize` switch is used.
Using the `-a --archive` switch allows to also index archive files as explained
[below](#index-archive-files).
@ -215,6 +226,11 @@ Resulting files can be sorted by size using the `-S --sortsize` switch.
See the [examples](#examples) for more.
## Disk usage
You can get the disk usage with the `du` command.
Resulting files can be sorted by size using the `-S --sortsize` switch.
## Catalog graph
The catalog can be exported in a dot file that can be used to

@ -6,9 +6,10 @@ Class that represents the catcli catalog
"""
import os
from typing import Optional
from anytree.exporter import JsonExporter # type: ignore
from anytree.importer import JsonImporter # type: ignore
from typing import Optional, List, Dict, Tuple, Union, Any
from anytree.exporter import JsonExporter, DictExporter
from anytree.importer import JsonImporter
from anytree import AnyNode
# local imports
from catcli import nodes
@ -29,7 +30,7 @@ class Catalog:
@debug: debug mode
@force: force overwrite if exists
"""
self.path = path
self.path = os.path.expanduser(path)
self.debug = debug
self.force = force
self.metanode: Optional[NodeMeta] = None
@ -85,7 +86,8 @@ class Catalog:
def _save_json(self, top: NodeTop) -> bool:
"""export the catalog in json"""
self._debug(f'saving {top} to json...')
exp = JsonExporter(indent=2, sort_keys=True)
dexporter = DictExporter(attriter=attriter)
exp = JsonExporter(dictexporter=dexporter, indent=2, sort_keys=True)
with open(self.path, 'w', encoding='UTF-8') as file:
exp.write(top, file)
self._debug(f'Catalog saved to json \"{self.path}\"')
@ -93,13 +95,61 @@ class Catalog:
def _restore_json(self, string: str) -> Optional[NodeTop]:
"""restore the tree from json"""
imp = JsonImporter()
self._debug(f'import from string: {string}')
imp = JsonImporter(dictimporter=_DictImporter(debug=self.debug))
root = imp.import_(string)
self._debug(f'Catalog imported from json \"{self.path}\"')
self._debug(f'root imported: {root}')
if root.type != nodes.TYPE_TOP:
return None
top = NodeTop(root.name, children=root.children)
self._debug(f'top imported: {top}')
self._debug(f'top imported: {top.get_name()}')
return top
class _DictImporter():
def __init__(self,
nodecls: AnyNode = AnyNode,
debug: bool = False):
self.nodecls = nodecls
self.debug = debug
def import_(self, data: Dict[str, str]) -> AnyNode:
"""Import tree from `data`."""
return self.__import(data)
def __import(self, data: Union[str, Any],
parent: AnyNode = None) -> AnyNode:
"""overwrite parent imoprt"""
assert isinstance(data, dict)
assert "parent" not in data
attrs = dict(data)
# replace attr
attrs = back_attriter(attrs)
children: Union[str, Any] = attrs.pop("children", [])
node = self.nodecls(parent=parent, **attrs)
for child in children:
self.__import(child, parent=node)
return node
def back_attriter(adict: Dict[str, str]) -> Dict[str, str]:
"""replace attribute on json restore"""
attrs = {}
for k, val in adict.items():
newk = k
if k == 'size':
newk = 'nodesize'
attrs[newk] = val
return attrs
def attriter(attrs: List[Tuple[str, Any]]) -> List[Tuple[str, Any]]:
"""replace attribute on json save"""
newattr = []
for attr in attrs:
k, val = attr
if k == 'nodesize':
k = 'size'
newattr.append((k, val))
return newattr

@ -11,27 +11,41 @@ Catcli command line interface
import sys
import os
import datetime
from typing import Dict, Any, List
from typing import Dict, Any, List, \
Tuple
from docopt import docopt
import cmd2
# local imports
from catcli.version import __version__ as VERSION
from catcli.nodes import NodeTop, NodeAny
from catcli.logger import Logger
from catcli.printer_csv import CsvPrinter
from catcli.colors import Colors
from catcli.catalog import Catalog
from catcli.walker import Walker
from catcli.noder import Noder
from catcli.utils import ask, edit, path_to_search_all
from catcli.fuser import Fuser
from catcli.utils import ask, edit
from catcli.nodes_utils import path_to_search_all
from catcli.exceptions import BadFormatException, CatcliException
NAME = 'catcli'
CUR = os.path.dirname(os.path.abspath(__file__))
CATALOGPATH = f'{NAME}.catalog'
GRAPHPATH = f'/tmp/{NAME}.dot'
FORMATS = ['native', 'csv', 'csv-with-header', 'fzf-native', 'fzf-csv']
# env variables
ENV_CATALOG_PATH = 'CATCLI_CATALOG_PATH'
ENV_NOBANNER = 'CATCLI_NO_BANNER'
ENV_VERBOSE = 'CATCLI_VERBOSE'
ENV_FORMAT = 'CATCLI_FORMAT'
# default paths
DEFAULT_CATALOGPATH = os.getenv(ENV_CATALOG_PATH, default=f'{NAME}.catalog')
DEFAULT_GRAPHPATH = f'/tmp/{NAME}.dot'
DEFAULT_NOBANNER = os.getenv(ENV_NOBANNER) is not None
DEFAULT_VERBOSEMODE = os.getenv(ENV_VERBOSE) is not None
DEFAULT_FORMAT = os.getenv(ENV_FORMAT, default='native')
BANNER = f""" +-+-+-+-+-+-+
|c|a|t|c|l|i|
+-+-+-+-+-+-+ v{VERSION}"""
@ -40,41 +54,44 @@ USAGE = f"""
{BANNER}
Usage:
{NAME} ls [--catalog=<path>] [--format=<fmt>] [-aBCrVSs] [<path>]
{NAME} find [--catalog=<path>] [--format=<fmt>]
[-aBCbdVsP] [--path=<path>] [<term>]
{NAME} index [--catalog=<path>] [--meta=<meta>...]
[-aBCcfnV] <name> <path>
{NAME} update [--catalog=<path>] [-aBCcfnV] [--lpath=<path>] <name> <path>
{NAME} mount [--catalog=<path>] [-V] <mountpoint>
{NAME} rm [--catalog=<path>] [-BCfV] <storage>
{NAME} rename [--catalog=<path>] [-BCfV] <storage> <name>
{NAME} edit [--catalog=<path>] [-BCfV] <storage>
{NAME} graph [--catalog=<path>] [-BCV] [<path>]
{NAME} ls [--catalog=<path>] [--format=<fmt>] [-aBCrVSs] [<path>]
{NAME} tree [--catalog=<path>] [-aBCVSs] [<path>]
{NAME} find [--catalog=<path>] [--format=<fmt>]
[-aBCbdVs] [--path=<path>] [<term>]
{NAME} index [--catalog=<path>] [--meta=<meta>...]
[-aBCcfV] <name> <path>
{NAME} update [--catalog=<path>] [-aBCcfV]
[--lpath=<path>] <name> <path>
{NAME} mount [--catalog=<path>] [-V] <mountpoint>
{NAME} du [--catalog=<path>] [-BCVSs] [<path>]
{NAME} rm [--catalog=<path>] [-BCfV] <storage>
{NAME} rename [--catalog=<path>] [-BCfV] <storage> <name>
{NAME} edit [--catalog=<path>] [-BCfV] <storage>
{NAME} graph [--catalog=<path>] [-BCV] [<path>]
{NAME} [--catalog=<path>]
{NAME} fixsizes [--catalog=<path>]
{NAME} print_supported_formats
{NAME} help
{NAME} --help
{NAME} --version
Options:
--catalog=<path> Path to the catalog [default: {CATALOGPATH}].
--catalog=<path> Path to the catalog [default: {DEFAULT_CATALOGPATH}].
--meta=<meta> Additional attribute to store [default: ].
-a --archive Handle archive file [default: False].
-B --no-banner Do not display the banner [default: False].
-B --no-banner Do not display the banner [default: {str(DEFAULT_NOBANNER)}].
-b --script Output script to manage found file(s) [default: False].
-C --no-color Do not output colors [default: False].
-c --hash Calculate md5 hash [default: False].
-d --directory Only directory [default: False].
-F --format=<fmt> see \"print_supported_formats\" [default: native].
-F --format=<fmt> see \"print_supported_formats\" [default: {DEFAULT_FORMAT}].
-f --force Do not ask when updating the catalog [default: False].
-l --lpath=<path> Path where changes are logged [default: ]
-n --no-subsize Do not store size of directories [default: False].
-P --parent Ignore stored relpath [default: True].
-p --path=<path> Start path.
-r --recursive Recursive [default: False].
-s --raw-size Print raw size [default: False].
-S --sortsize Sort by size, largest first [default: False].
-V --verbose Be verbose [default: False].
-V --verbose Be verbose [default: {str(DEFAULT_VERBOSEMODE)}].
-v --version Show version.
-h --help Show this screen.
""" # nopep8
@ -82,12 +99,18 @@ Options:
def cmd_mount(args: Dict[str, Any],
top: NodeTop,
noder: Noder) -> None:
noder: Noder) -> bool:
"""mount action"""
mountpoint = args['<mountpoint>']
debug = args['--verbose']
Fuser(mountpoint, top, noder,
debug=debug)
try:
from catcli.fuser import Fuser # pylint: disable=C0415
Fuser(mountpoint, top, noder,
debug=debug)
except ModuleNotFoundError:
Logger.err('install fusepy to use mount')
return False
return True
def cmd_index(args: Dict[str, Any],
@ -99,7 +122,6 @@ def cmd_index(args: Dict[str, Any],
name = args['<name>']
usehash = args['--hash']
debug = args['--verbose']
subsize = not args['--no-subsize']
if not os.path.exists(path):
Logger.err(f'\"{path}\" does not exist')
return
@ -111,16 +133,18 @@ def cmd_index(args: Dict[str, Any],
except KeyboardInterrupt:
Logger.err('aborted')
return
node = noder.get_storage_node(top, name)
node.parent = None
node = top.get_storage_node()
if node:
node.parent = None
start = datetime.datetime.now()
if debug:
Logger.debug('debug mode enabled')
walker = Walker(noder, usehash=usehash, debug=debug)
attr = args['--meta']
root = noder.new_storage_node(name, path, top, attr)
_, cnt = walker.index(path, root, name)
if subsize:
noder.rec_size(root, store=True)
root.nodesize = root.get_rec_size()
stop = datetime.datetime.now()
diff = stop - start
Logger.info(f'Indexed {cnt} file(s) in {diff}')
@ -138,20 +162,19 @@ def cmd_update(args: Dict[str, Any],
usehash = args['--hash']
logpath = args['--lpath']
debug = args['--verbose']
subsize = not args['--no-subsize']
if not os.path.exists(path):
Logger.err(f'\"{path}\" does not exist')
return
root = noder.get_storage_node(top, name, newpath=path)
if not root:
storage = noder.find_storage_node_by_name(top, name)
if not storage:
Logger.err(f'storage named \"{name}\" does not exist')
return
noder.update_storage_path(top, name, path)
start = datetime.datetime.now()
walker = Walker(noder, usehash=usehash, debug=debug,
logpath=logpath)
cnt = walker.reindex(path, root, top)
if subsize:
noder.rec_size(root, store=True)
cnt = walker.reindex(path, storage, top)
storage.nodesize = storage.get_rec_size()
stop = datetime.datetime.now()
diff = stop - start
Logger.info(f'updated {cnt} file(s) in {diff}')
@ -159,6 +182,20 @@ def cmd_update(args: Dict[str, Any],
catalog.save(top)
def cmd_du(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> List[NodeAny]:
"""du action"""
path = path_to_search_all(args['<path>'])
found = noder.diskusage(top,
path,
raw=args['--raw-size'])
if not found:
path = args['<path>']
Logger.err(f'\"{path}\": nothing found')
return found
def cmd_ls(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> List[NodeAny]:
@ -169,8 +206,8 @@ def cmd_ls(args: Dict[str, Any],
raise BadFormatException('fzf is not supported in ls, use find')
found = noder.list(top,
path,
rec=args['--recursive'],
fmt=fmt,
rec=args['--recursive'],
raw=args['--raw-size'])
if not found:
path = args['<path>']
@ -184,7 +221,7 @@ def cmd_rm(args: Dict[str, Any],
top: NodeTop) -> NodeTop:
"""rm action"""
name = args['<storage>']
node = noder.get_storage_node(top, name)
node = noder.find_storage_node_by_name(top, name)
if node:
node.parent = None
if catalog.save(top):
@ -198,19 +235,20 @@ def cmd_find(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> List[NodeAny]:
"""find action"""
fromtree = args['--parent']
directory = args['--directory']
startpath = args['--path']
fmt = args['--format']
raw = args['--raw-size']
script = args['--script']
search_for = args['<term>']
found = noder.find_name(top, search_for,
script=script,
startnode=startpath,
only_dir=directory,
parentfromtree=fromtree,
fmt=fmt, raw=raw)
if args['--verbose']:
Logger.debug(f'search for "{search_for}" under "{top.get_name()}"')
found = noder.find(top, search_for,
script=script,
startnode=startpath,
only_dir=directory,
fmt=fmt,
raw=raw)
return found
@ -220,21 +258,32 @@ def cmd_graph(args: Dict[str, Any],
"""graph action"""
path = args['<path>']
if not path:
path = GRAPHPATH
path = DEFAULT_GRAPHPATH
cmd = noder.to_dot(top, path)
Logger.info(f'create graph with \"{cmd}\" (you need graphviz)')
def cmd_fixsizes(top: NodeTop,
noder: Noder,
catalog: Catalog) -> None:
"""
fix each node size by re-calculating
recursively their size
"""
noder.fixsizes(top)
catalog.save(top)
def cmd_rename(args: Dict[str, Any],
catalog: Catalog,
top: NodeTop) -> None:
"""rename action"""
storage = args['<storage>']
new = args['<name>']
storages = list(x.name for x in top.children)
storages = list(x.get_name() for x in top.children)
if storage in storages:
node = next(filter(lambda x: x.name == storage, top.children))
node.name = new
node = next(filter(lambda x: x.get_name() == storage, top.children))
node.set_name(new)
if catalog.save(top):
msg = f'Storage \"{storage}\" renamed to \"{new}\"'
Logger.info(msg)
@ -248,9 +297,9 @@ def cmd_edit(args: Dict[str, Any],
top: NodeTop) -> None:
"""edit action"""
storage = args['<storage>']
storages = list(x.name for x in top.children)
storages = list(x.get_name() for x in top.children)
if storage in storages:
node = next(filter(lambda x: x.name == storage, top.children))
node = next(filter(lambda x: x.get_name() == storage, top.children))
attr = node.attr
if not attr:
attr = ''
@ -262,6 +311,75 @@ def cmd_edit(args: Dict[str, Any],
Logger.err(f'Storage named \"{storage}\" does not exist')
class CatcliRepl(cmd2.Cmd): # type: ignore
"""catcli repl"""
prompt = 'catcli> '
intro = ''
def __init__(self) -> None:
super().__init__()
# remove built-ins
del cmd2.Cmd.do_alias
del cmd2.Cmd.do_edit
del cmd2.Cmd.do_macro
del cmd2.Cmd.do_run_pyscript
del cmd2.Cmd.do_run_script
del cmd2.Cmd.do_set
del cmd2.Cmd.do_shell
del cmd2.Cmd.do_shortcuts
self.hidden_commands.append('EOF')
def cmdloop(self, intro: Any = None) -> Any:
return cmd2.Cmd.cmdloop(self, intro)
@cmd2.with_argument_list # type: ignore
def do_ls(self, arglist: List[str]) -> bool:
"""ls <path>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'ls')
args, noder, _, _, top = init(arglist)
cmd_ls(args, noder, top)
return False
@cmd2.with_argument_list # type: ignore
def do_tree(self, arglist: List[str]) -> bool:
"""tree <path>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'tree')
args, noder, _, _, top = init(arglist)
cmd_ls(args, noder, top)
return False
@cmd2.with_argument_list # type: ignore
def do_find(self, arglist: List[str]) -> bool:
"""find <term>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'find')
args, noder, _, _, top = init(arglist)
cmd_find(args, noder, top)
return False
@cmd2.with_argument_list # type: ignore
def do_du(self, arglist: List[str]) -> bool:
"""du <path>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'du')
args, noder, _, _, top = init(arglist)
cmd_du(args, noder, top)
return False
def do_help(self, _: Any) -> bool:
"""help"""
print(USAGE)
return False
# pylint: disable=C0103
def do_EOF(self, _: Any) -> bool:
"""exit repl"""
return True
def banner() -> None:
"""print banner"""
Logger.stderr_nocolor(BANNER)
@ -272,35 +390,39 @@ def print_supported_formats() -> None:
"""print all supported formats to stdout"""
print('"native" : native format')
print('"csv" : CSV format')
print(f' {Noder.CSV_HEADER}')
print(f' {CsvPrinter.CSV_HEADER}')
print('"fzf-native" : fzf to native (only valid for find)')
print('"fzf-csv" : fzf to csv (only valid for find)')
def main() -> bool:
"""entry point"""
args = docopt(USAGE, version=VERSION)
def init(argv: List[str]) -> Tuple[Dict[str, Any],
Noder,
Catalog,
str,
NodeTop]:
"""parse catcli arguments"""
args = docopt(USAGE, argv=argv, version=VERSION)
if args['help'] or args['--help']:
print(USAGE)
return True
sys.exit(0)
if args['print_supported_formats']:
print_supported_formats()
return True
sys.exit(0)
# check format
fmt = args['--format']
if fmt not in FORMATS:
Logger.err(f'bad format: {fmt}')
print_supported_formats()
return False
sys.exit(0)
if args['--verbose']:
print(args)
if args['--verbose'] or DEFAULT_VERBOSEMODE:
print('verbose mode enabled')
print(f'args: {args}')
# print banner
if not args['--no-banner']:
if not args['--no-banner'] and DEFAULT_NOBANNER:
banner()
# set colors
@ -323,15 +445,23 @@ def main() -> bool:
meta = noder.update_metanode(top)
catalog.set_metanode(meta)
return args, noder, catalog, catalog_path, top
def main() -> bool:
"""entry point"""
args, noder, catalog, catalog_path, top = init(sys.argv[1:])
# parse command
try:
if args['index']:
cmd_index(args, noder, catalog, top)
if args['update']:
elif args['update']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_update(args, noder, catalog, top)
cmd_fixsizes(top, noder, catalog)
elif args['find']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
@ -342,11 +472,18 @@ def main() -> bool:
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_ls(args, noder, top)
elif args['tree']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
args['--recursive'] = True
cmd_ls(args, noder, top)
elif args['mount']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_mount(args, top, noder)
if not cmd_mount(args, top, noder):
return False
elif args['rm']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
@ -367,6 +504,18 @@ def main() -> bool:
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_edit(args, noder, catalog, top)
elif args['du']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_du(args, noder, top)
elif args['fixsizes']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_fixsizes(top, noder, catalog)
else:
CatcliRepl().cmdloop()
except CatcliException as exc:
Logger.stderr_nocolor('ERROR ' + str(exc))
return False

@ -20,7 +20,9 @@ class Colors:
PURPLE = '\033[1;35m'
BLUE = '\033[94m'
GRAY = '\033[0;37m'
CYAN = '\033[36m'
MAGENTA = '\033[95m'
WHITE = '\033[97m'
RESET = '\033[0m'
EMPH = '\033[33m'
BOLD = '\033[1m'

@ -9,12 +9,15 @@ import os
from time import time
from stat import S_IFDIR, S_IFREG
from typing import List, Dict, Any, Optional
import fuse # type: ignore
try:
import fuse
except ModuleNotFoundError:
pass
# local imports
from catcli.noder import Noder
from catcli.nodes import NodeTop, NodeAny
from catcli.utils import path_to_search_all, path_to_top
from catcli.nodes_utils import path_to_search_all, path_to_top
from catcli import nodes
@ -30,7 +33,6 @@ class Fuser:
fuse.FUSE(filesystem,
mountpoint,
foreground=debug,
allow_other=True,
nothreads=True,
debug=debug)
@ -71,21 +73,21 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
maccess = time()
mode: Any = S_IFREG
size: int = 0
nodesize: int = 0
if entry.type == nodes.TYPE_ARCHIVED:
mode = S_IFREG
size = entry.size
nodesize = entry.nodesize
elif entry.type == nodes.TYPE_DIR:
mode = S_IFDIR
size = entry.size
nodesize = entry.nodesize
maccess = entry.maccess
elif entry.type == nodes.TYPE_FILE:
mode = S_IFREG
size = entry.size
nodesize = entry.nodesize
maccess = entry.maccess
elif entry.type == nodes.TYPE_STORAGE:
mode = S_IFDIR
size = entry.size
nodesize = entry.nodesize
maccess = entry.ts
elif entry.type == nodes.TYPE_META:
mode = S_IFREG
@ -95,7 +97,7 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
return {
'st_mode': (mode), # file type
'st_nlink': 1, # count hard link
'st_size': size,
'st_size': nodesize,
'st_ctime': maccess, # attr last modified
'st_mtime': maccess, # content last modified
'st_atime': maccess, # access time
@ -105,7 +107,7 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
def getattr(self, path: str, _fh: Any = None) -> Dict[str, Any]:
"""return attr of file pointed by path"""
if path == '/':
if path == os.path.sep:
# mountpoint
curt = time()
meta = {
@ -127,5 +129,5 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
content = ['.', '..']
entries = self._get_entries(path)
for entry in entries:
content.append(entry.name)
content.append(entry.get_name())
return content

@ -1,73 +0,0 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
Class for printing nodes
"""
import sys
from typing import TypeVar, Type, Optional, Tuple, List, \
Dict, Any
from catcli.colors import Colors
from catcli.utils import fix_badchars
CLASSTYPE = TypeVar('CLASSTYPE', bound='NodePrinter')
class NodePrinter:
"""a node printer class"""
STORAGE = 'storage'
ARCHIVE = 'archive'
NBFILES = 'nbfiles'
@classmethod
def print_storage_native(cls: Type[CLASSTYPE], pre: str,
name: str, args: str,
attr: Dict[str, Any]) -> None:
"""print a storage node"""
end = ''
if attr:
end = f' {Colors.GRAY}({attr}){Colors.RESET}'
out = f'{pre}{Colors.UND}{cls.STORAGE}{Colors.RESET}:'
out += ' ' + Colors.PURPLE + fix_badchars(name) + \
Colors.RESET + end + '\n'
out += f' {Colors.GRAY}{args}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_file_native(cls: Type[CLASSTYPE], pre: str,
name: str, attr: str) -> None:
"""print a file node"""
nobad = fix_badchars(name)
out = f'{pre}{nobad}'
out += f' {Colors.GRAY}[{attr}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_dir_native(cls: Type[CLASSTYPE], pre: str,
name: str,
depth: int = 0,
attr: Optional[List[Tuple[str, str]]] = None) -> None:
"""print a directory node"""
end = []
if depth > 0:
end.append(f'{cls.NBFILES}:{depth}')
if attr:
end.append(' '.join([f'{x}:{y}' for x, y in attr]))
end_string = ''
if end:
end_string = f' [{", ".join(end)}]'
out = pre + Colors.BLUE + fix_badchars(name) + Colors.RESET
out += f'{Colors.GRAY}{end_string}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_archive_native(cls: Type[CLASSTYPE], pre: str,
name: str, archive: str) -> None:
"""archive to stdout"""
out = pre + Colors.YELLOW + fix_badchars(name) + Colors.RESET
out += f' {Colors.GRAY}[{cls.ARCHIVE}:{archive}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')

@ -9,16 +9,19 @@ import os
import shutil
import time
from typing import List, Union, Tuple, Any, Optional, Dict, cast
import anytree # type: ignore
from pyfzf.pyfzf import FzfPrompt # type: ignore
import fnmatch
import anytree
from natsort import os_sort_keygen
# local imports
from catcli import nodes
from catcli.nodes import NodeAny, NodeStorage, \
NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta
from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars
NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta, \
typcast_node
from catcli.utils import md5sum
from catcli.logger import Logger
from catcli.nodeprinter import NodePrinter
from catcli.printer_native import NativePrinter
from catcli.printer_csv import CsvPrinter
from catcli.decomp import Decomp
from catcli.version import __version__ as VERSION
from catcli.exceptions import CatcliException
@ -33,10 +36,7 @@ class Noder:
* "dir" node representing a directory
* "file" node representing a file
"""
CSV_HEADER = ('name,type,path,size,indexed_at,'
'maccess,md5,nbfiles,free_space,'
'total_space,meta')
# pylint: disable=R0904
def __init__(self, debug: bool = False,
sortsize: bool = False,
@ -52,31 +52,33 @@ class Noder:
self.arc = arc
if self.arc:
self.decomp = Decomp()
self.csv_printer = CsvPrinter()
self.native_printer = NativePrinter()
@staticmethod
def get_storage_names(top: NodeTop) -> List[str]:
"""return a list of all storage names"""
return [x.name for x in list(top.children)]
def get_storage_node(self, top: NodeTop,
name: str,
newpath: str = '') -> NodeStorage:
"""
return the storage node if any
if newpath is submitted, it will update the media info
"""
found = None
def find_storage_node_by_name(self, top: NodeTop,
name: str) -> Optional[NodeStorage]:
"""find a storage node by name"""
for node in top.children:
if node.type != nodes.TYPE_STORAGE:
continue
if node.name == name:
found = node
break
if found and newpath and os.path.exists(newpath):
found.free = shutil.disk_usage(newpath).free
found.total = shutil.disk_usage(newpath).total
found.ts = int(time.time())
return cast(NodeStorage, found)
return cast(NodeStorage, node)
return None
def update_storage_path(self, top: NodeTop,
name: str,
newpath: str) -> None:
"""find and update storage path on update"""
storage = self.find_storage_node_by_name(top, name)
if storage and newpath and os.path.exists(newpath):
storage.free = shutil.disk_usage(newpath).free
storage.total = shutil.disk_usage(newpath).total
storage.ts = int(time.time())
@staticmethod
def get_node(top: NodeTop,
@ -84,9 +86,11 @@ class Noder:
quiet: bool = False) -> Optional[NodeAny]:
"""get the node by internal tree path"""
resolv = anytree.resolver.Resolver('name')
bpath = ''
try:
bpath = os.path.basename(path)
the_node = resolv.get(top, bpath)
typcast_node(the_node)
return cast(NodeAny, the_node)
except anytree.resolver.ChildResolverError:
if not quiet:
@ -113,7 +117,7 @@ class Noder:
return node, False
# force re-indexing if no maccess
maccess = os.path.getmtime(path)
if not self._has_attr(node, 'maccess') or \
if not node.has_attr('maccess') or \
not node.maccess:
self._debug('\tchange: no maccess found')
return node, True
@ -132,37 +136,6 @@ class Noder:
self._debug(f'\tchange: no change for \"{path}\"')
return node, False
def rec_size(self, node: Union[NodeDir, NodeStorage],
store: bool = True) -> int:
"""
recursively traverse tree and return size
@store: store the size in the node
"""
if node.type == nodes.TYPE_FILE:
self._debug(f'size of {node.type} \"{node.name}\": {node.size}')
return node.size
msg = f'getting node size recursively for \"{node.name}\"'
self._debug(msg)
size: int = 0
for i in node.children:
if node.type == nodes.TYPE_DIR:
sub_size = self.rec_size(i, store=store)
if store:
i.size = sub_size
size += sub_size
continue
if node.type == nodes.TYPE_STORAGE:
sub_size = self.rec_size(i, store=store)
if store:
i.size = sub_size
size += sub_size
continue
self._debug(f'skipping {node.name}')
if store:
node.size = size
self._debug(f'size of {node.type} \"{node.name}\": {size}')
return size
###############################################################
# public helpers
###############################################################
@ -195,7 +168,7 @@ class Noder:
return top
def new_file_node(self, name: str, path: str,
parent: NodeAny, storagepath: str) -> Optional[NodeFile]:
parent: NodeAny) -> Optional[NodeFile]:
"""create a new node representing a file"""
if not os.path.exists(path):
Logger.err(f'File \"{path}\" does not exist')
@ -209,11 +182,9 @@ class Noder:
md5 = ''
if self.hash:
md5 = self._get_hash(path)
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
node = NodeFile(name,
relpath,
stat.st_size,
md5,
maccess,
@ -229,13 +200,11 @@ class Noder:
return node
def new_dir_node(self, name: str, path: str,
parent: NodeAny, storagepath: str) -> NodeDir:
parent: NodeAny) -> NodeDir:
"""create a new node representing a directory"""
path = os.path.abspath(path)
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
return NodeDir(name,
relpath,
0,
maccess,
parent=parent)
@ -258,11 +227,13 @@ class Noder:
self.attrs_to_string(attrs),
parent=parent)
def new_archive_node(self, name: str, path: str,
parent: str, archive: str) -> NodeArchived:
def new_archive_node(self,
name: str,
parent: str,
archive: str) -> NodeArchived:
"""create a new node for archive data"""
return NodeArchived(name=name, relpath=path,
parent=parent, size=0, md5='',
return NodeArchived(name=name,
parent=parent, nodesize=0, md5='',
archive=archive)
###############################################################
@ -295,6 +266,7 @@ class Noder:
"""remove any node not flagged and clean flags"""
cnt = 0
for node in anytree.PreOrderIter(top):
typcast_node(node)
if node.type not in [nodes.TYPE_DIR, nodes.TYPE_FILE]:
continue
if self._clean(node):
@ -312,164 +284,82 @@ class Noder:
###############################################################
# printing
###############################################################
def _node_to_csv(self, node: NodeAny,
sep: str = ',',
raw: bool = False) -> None:
def _print_node_csv(self, node: NodeAny,
sep: str = ',',
raw: bool = False) -> None:
"""
print a node to csv
@node: the node to consider
@sep: CSV separator character
@raw: print raw size rather than human readable
"""
typcast_node(node)
if not node:
return
if node.type == nodes.TYPE_TOP:
return
out = []
if node.type == nodes.TYPE_STORAGE:
# handle storage
out.append(node.name) # name
out.append(node.type) # type
out.append('') # fake full path
size = self.rec_size(node, store=False)
out.append(size_to_str(size, raw=raw)) # size
out.append(epoch_to_str(node.ts)) # indexed_at
out.append('') # fake maccess
out.append('') # fake md5
out.append(str(len(node.children))) # nbfiles
# fake free_space
out.append(size_to_str(node.free, raw=raw))
# fake total_space
out.append(size_to_str(node.total, raw=raw))
out.append(node.attr) # meta
self.csv_printer.print_storage(node,
sep=sep,
raw=raw)
else:
# handle other nodes
out.append(node.name.replace('"', '""')) # name
out.append(node.type) # type
parents = self._get_parents(node)
storage = self._get_storage(node)
fullpath = os.path.join(storage.name, parents)
out.append(fullpath.replace('"', '""')) # full path
out.append(size_to_str(node.size, raw=raw)) # size
out.append(epoch_to_str(storage.ts)) # indexed_at
if self._has_attr(node, 'maccess'):
out.append(epoch_to_str(node.maccess)) # maccess
else:
out.append('') # fake maccess
if self._has_attr(node, 'md5'):
out.append(node.md5) # md5
else:
out.append('') # fake md5
if node.type == nodes.TYPE_DIR:
out.append(str(len(node.children))) # nbfiles
else:
out.append('') # fake nbfiles
out.append('') # fake free_space
out.append('') # fake total_space
out.append('') # fake meta
self.csv_printer.print_node(node,
sep=sep,
raw=raw)
line = sep.join(['"' + o + '"' for o in out])
if len(line) > 0:
Logger.stdout_nocolor(line)
def _print_node_du(self, node: NodeAny,
raw: bool = False) -> None:
"""
print node du style
"""
typcast_node(node)
thenodes = self._get_entire_tree(node,
dironly=True)
for thenode in thenodes:
self.native_printer.print_du(thenode, raw=raw)
def _print_node_native(self, node: NodeAny,
pre: str = '',
withpath: bool = False,
withdepth: bool = False,
withnbchildren: bool = False,
withstorage: bool = False,
recalcparent: bool = False,
raw: bool = False) -> None:
"""
print a node
@node: the node to print
@pre: string to print before node
@withpath: print the node path
@withdepth: print the node depth info
@withnbchildren: print the node nb children
@withstorage: print the node storage it belongs to
@recalcparent: get relpath from tree instead of relpath field
@raw: print raw size rather than human readable
"""
typcast_node(node)
if node.type == nodes.TYPE_TOP:
# top node
Logger.stdout_nocolor(f'{pre}{node.name}')
self.native_printer.print_top(pre, node.get_name())
elif node.type == nodes.TYPE_FILE:
# node of type file
name = node.name
if withpath:
if recalcparent:
name = os.sep.join([self._get_parents(node.parent), name])
else:
name = node.relpath
name = name.lstrip(os.sep)
if withstorage:
storage = self._get_storage(node)
attr_str = ''
if node.md5:
attr_str = f', md5:{node.md5}'
size = size_to_str(node.size, raw=raw)
compl = f'size:{size}{attr_str}'
if withstorage:
content = Logger.get_bold_text(storage.name)
compl += f', storage:{content}'
NodePrinter.print_file_native(pre, name, compl)
self.native_printer.print_file(pre, node,
withpath=withpath,
withstorage=withstorage,
raw=raw)
elif node.type == nodes.TYPE_DIR:
# node of type directory
name = node.name
if withpath:
if recalcparent:
name = os.sep.join([self._get_parents(node.parent), name])
else:
name = node.relpath
name = name.lstrip(os.sep)
depth = 0
if withdepth:
depth = len(node.children)
if withstorage:
storage = self._get_storage(node)
attr: List[Tuple[str, str]] = []
if node.size:
attr.append(('totsize', size_to_str(node.size, raw=raw)))
if withstorage:
attr.append(('storage', Logger.get_bold_text(storage.name)))
NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr)
self.native_printer.print_dir(pre,
node,
withpath=withpath,
withstorage=withstorage,
withnbchildren=withnbchildren,
raw=raw)
elif node.type == nodes.TYPE_STORAGE:
# node of type storage
sztotal = size_to_str(node.total, raw=raw)
szused = size_to_str(node.total - node.free, raw=raw)
nbchildren = len(node.children)
pcent = 0
if node.total > 0:
pcent = node.free * 100 / node.total
freepercent = f'{pcent:.1f}%'
# get the date
timestamp = ''
if self._has_attr(node, 'ts'):
timestamp = 'date:'
timestamp += epoch_to_str(node.ts)
disksize = ''
# the children size
recsize = self.rec_size(node, store=False)
sizestr = size_to_str(recsize, raw=raw)
disksize = 'totsize:' + f'{sizestr}'
# format the output
name = node.name
args = [
'nbfiles:' + f'{nbchildren}',
disksize,
f'free:{freepercent}',
'du:' + f'{szused}/{sztotal}',
timestamp]
argsstring = ' | '.join(args)
NodePrinter.print_storage_native(pre,
name,
argsstring,
node.attr)
self.native_printer.print_storage(pre,
node,
raw=raw)
elif node.type == nodes.TYPE_ARCHIVED:
# archive node
if self.arc:
NodePrinter.print_archive_native(pre, node.name, node.archive)
self.native_printer.print_archive(pre, node.name, node.archive)
else:
Logger.err(f'bad node encountered: {node}')
@ -488,28 +378,33 @@ class Noder:
rend = anytree.RenderTree(node, childiter=self._sort_tree)
for pre, _, thenode in rend:
self._print_node_native(thenode, pre=pre,
withdepth=True, raw=raw)
withnbchildren=True, raw=raw)
elif fmt == 'csv':
# csv output
self._to_csv(node, raw=raw)
self._print_nodes_csv(node, raw=raw)
elif fmt == 'csv-with-header':
# csv output
Logger.stdout_nocolor(self.CSV_HEADER)
self._to_csv(node, raw=raw)
self.csv_printer.print_header()
self._print_nodes_csv(node, raw=raw)
def _to_csv(self, node: NodeAny,
raw: bool = False) -> None:
def _print_nodes_csv(self, node: NodeAny,
raw: bool = False) -> None:
"""print the tree to csv"""
rend = anytree.RenderTree(node, childiter=self._sort_tree)
for _, _, item in rend:
self._node_to_csv(item, raw=raw)
self._print_node_csv(item, raw=raw)
@staticmethod
def _fzf_prompt(strings: Any) -> Any:
# prompt with fzf
fzf = FzfPrompt()
selected = fzf.prompt(strings)
return selected
"""prompt with fzf"""
try:
from pyfzf.pyfzf import FzfPrompt # pylint: disable=C0415 # noqa
fzf = FzfPrompt()
selected = fzf.prompt(strings)
return selected
except ModuleNotFoundError:
Logger.err('install pyfzf to use fzf')
return None
def _to_fzf(self, node: NodeAny, fmt: str) -> None:
"""
@ -523,9 +418,9 @@ class Noder:
for _, _, rend in rendered:
if not rend:
continue
parents = self._get_parents(rend)
storage = self._get_storage(rend)
fullpath = os.path.join(storage.name, parents)
parents = rend.get_fullpath()
storage = rend.get_storage_node()
fullpath = os.path.join(storage.get_name(), parents)
the_nodes[fullpath] = rend
# prompt with fzf
paths = self._fzf_prompt(the_nodes.keys())
@ -550,14 +445,13 @@ class Noder:
###############################################################
# searching
###############################################################
def find_name(self, top: NodeTop,
key: str,
script: bool = False,
only_dir: bool = False,
startnode: Optional[NodeAny] = None,
parentfromtree: bool = False,
fmt: str = 'native',
raw: bool = False) -> List[NodeAny]:
def find(self, top: NodeTop,
key: str,
script: bool = False,
only_dir: bool = False,
startnode: Optional[NodeAny] = None,
fmt: str = 'native',
raw: bool = False) -> List[NodeAny]:
"""
find files based on their names
@top: top node
@ -565,7 +459,6 @@ class Noder:
@script: output script
@directory: only search for directories
@startpath: node to start with
@parentfromtree: get path from parent instead of stored relpath
@fmt: output format
@raw: raw size output
returns the found nodes
@ -578,19 +471,15 @@ class Noder:
start = self.get_node(top, startnode)
filterfunc = self._callback_find_name(key, only_dir)
found = anytree.findall(start, filter_=filterfunc)
nbfound = len(found)
self._debug(f'found {nbfound} node(s)')
self._debug(f'found {len(found)} node(s)')
# compile found nodes
paths = {}
for item in found:
item.name = fix_badchars(item.name)
if hasattr(item, 'relpath'):
item.relpath = fix_badchars(item.relpath)
if parentfromtree:
paths[self._get_parents(item)] = item
else:
paths[item.relpath] = item
typcast_node(item)
item.set_name(item.get_name())
key = item.get_fullpath()
paths[key] = item
# handle fzf mode
if fmt.startswith('fzf'):
@ -606,16 +495,16 @@ class Noder:
else:
if fmt == 'native':
for _, item in paths.items():
self._print_node_native(item, withpath=True,
withdepth=True,
self._print_node_native(item,
withpath=True,
withnbchildren=True,
withstorage=True,
recalcparent=parentfromtree,
raw=raw)
elif fmt.startswith('csv'):
if fmt == 'csv-with-header':
Logger.stdout_nocolor(self.CSV_HEADER)
self.csv_printer.print_header()
for _, item in paths.items():
self._node_to_csv(item, raw=raw)
self._print_node_csv(item, raw=raw)
# execute script if any
if script:
@ -629,6 +518,8 @@ class Noder:
def _callback_find_name(self, term: str, only_dir: bool) -> Any:
"""callback for finding files"""
def find_name(node: NodeAny) -> bool:
typcast_node(node)
path = node.get_fullpath()
if node.type == nodes.TYPE_STORAGE:
# ignore storage nodes
return False
@ -645,13 +536,28 @@ class Noder:
# filter
if not term:
return True
if term.lower() in node.name.lower():
if term in path:
return True
if self.debug:
Logger.debug(f'match \"{path}\" with \"{term}\"')
if fnmatch.fnmatch(path, term):
return True
# ignore
return False
return find_name
###############################################################
# fixsizes
###############################################################
def fixsizes(self, top: NodeTop) -> None:
"""fix node sizes"""
typcast_node(top)
rend = anytree.RenderTree(top)
for _, _, thenode in rend:
typcast_node(thenode)
thenode.nodesize = thenode.get_rec_size()
###############################################################
# ls
###############################################################
@ -668,13 +574,27 @@ class Noder:
@fmt: output format
@raw: print raw size
"""
self._debug(f'walking path: \"{path}\" from {top}')
self._debug(f'ls walking path: \"{path}\" from \"{top.get_name()}\"')
resolv = anytree.resolver.Resolver('name')
found = []
try:
# resolve the path in the tree
found = resolv.glob(top, path)
if '*' in path or '?' in path:
# we need to handle glob
self._debug('glob ls...')
found = resolv.glob(top, path)
else:
# we have a canonical path
self._debug('get ls...')
foundone = resolv.get(top, path)
cast(NodeAny, foundone)
typcast_node(foundone)
if foundone and foundone.may_have_children():
# let's find its children as well
modpath = os.path.join(path, '*')
found = resolv.glob(top, modpath)
else:
found = [foundone]
if len(found) < 1:
# nothing found
self._debug('nothing found')
@ -686,30 +606,19 @@ class Noder:
return found
# sort found nodes
found = sorted(found, key=self._sort, reverse=self.sortsize)
# print the parent
if fmt == 'native':
self._print_node_native(found[0].parent,
withpath=False,
withdepth=True,
raw=raw)
elif fmt.startswith('csv'):
self._node_to_csv(found[0].parent, raw=raw)
elif fmt.startswith('fzf'):
pass
found = sorted(found, key=os_sort_keygen(self._sort))
# print all found nodes
if fmt == 'csv-with-header':
Logger.stdout_nocolor(self.CSV_HEADER)
self.csv_printer.print_header()
for item in found:
if fmt == 'native':
self._print_node_native(item, withpath=False,
pre='- ',
withdepth=True,
self._print_node_native(item,
withpath=True,
withnbchildren=True,
raw=raw)
elif fmt.startswith('csv'):
self._node_to_csv(item, raw=raw)
self._print_node_csv(item, raw=raw)
elif fmt.startswith('fzf'):
self._to_fzf(item, fmt)
@ -717,6 +626,31 @@ class Noder:
pass
return found
###############################################################
# du
###############################################################
def diskusage(self, top: NodeTop,
path: str,
raw: bool = False) -> List[NodeAny]:
"""disk usage"""
self._debug(f'du walking path: \"{path}\" from \"{top.get_name()}\"')
resolv = anytree.resolver.Resolver('name')
found: NodeAny
try:
# we have a canonical path
self._debug('get du...')
found = resolv.get(top, path)
if not found:
# nothing found
self._debug('nothing found')
return []
self._debug(f'du found: {found}')
self._print_node_du(found, raw=raw)
except anytree.resolver.ChildResolverError:
pass
return found
###############################################################
# tree creation
###############################################################
@ -726,15 +660,15 @@ class Noder:
"""add an entry to the tree"""
entries = name.rstrip(os.sep).split(os.sep)
if len(entries) == 1:
self.new_archive_node(name, name, top, top.name)
self.new_archive_node(name, top, top.get_name())
return
sub = os.sep.join(entries[:-1])
nodename = entries[-1]
try:
parent = resolv.get(top, sub)
parent = self.new_archive_node(nodename, name, parent, top.name)
parent = self.new_archive_node(nodename, parent, top.get_name())
except anytree.resolver.ChildResolverError:
self.new_archive_node(nodename, name, top, top.name)
self.new_archive_node(nodename, top, top.get_name())
def list_to_tree(self, parent: NodeAny, names: List[str]) -> None:
"""convert list of files to a tree"""
@ -748,6 +682,23 @@ class Noder:
###############################################################
# diverse
###############################################################
def _get_entire_tree(self, start: NodeAny,
dironly: bool = False) -> List[NodeAny]:
"""
get entire tree and sort it
"""
typcast_node(start)
rend = anytree.RenderTree(start)
thenodes = []
if dironly:
for _, _, thenode in rend:
typcast_node(thenode)
if thenode.type == nodes.TYPE_DIR:
thenodes.append(thenode)
else:
thenodes = [x for _, _, x in rend]
return sorted(thenodes, key=os_sort_keygen(self._sort))
def _sort_tree(self,
items: List[NodeAny]) -> List[NodeAny]:
"""sorting a list of items"""
@ -760,42 +711,21 @@ class Noder:
return self._sort_fs(lst)
@staticmethod
def _sort_fs(node: NodeAny) -> Tuple[str, str]:
"""sorting nodes dir first and alpha"""
return (node.type, node.name.lstrip('.').lower())
def _sort_fs(node: NodeAny) -> str:
"""sort by name"""
# to sort by types then name
return str(node.name)
@staticmethod
def _sort_size(node: NodeAny) -> float:
"""sorting nodes by size"""
try:
if not node.size:
if not node.nodesize:
return 0
return float(node.size)
return float(node.nodesize)
except AttributeError:
return 0
def _get_storage(self, node: NodeAny) -> NodeStorage:
"""recursively traverse up to find storage"""
if node.type == nodes.TYPE_STORAGE:
return node
return cast(NodeStorage, node.ancestors[1])
@staticmethod
def _has_attr(node: NodeAny, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in node.__dict__.keys()
def _get_parents(self, node: NodeAny) -> str:
"""get all parents recursively"""
if node.type == nodes.TYPE_STORAGE:
return ''
if node.type == nodes.TYPE_TOP:
return ''
parent = self._get_parents(node.parent)
if parent:
return os.sep.join([parent, node.name])
return str(node.name)
@staticmethod
def _get_hash(path: str) -> str:
"""return md5 hash of node"""

@ -6,8 +6,12 @@ Class that represents a node in the catalog tree
"""
# pylint: disable=W0622
from typing import Dict, Any
from anytree import NodeMixin # type: ignore
import os
from typing import Dict, Any, cast
from anytree import NodeMixin
from catcli.exceptions import CatcliException
from catcli.utils import fix_badchars
TYPE_TOP = 'top'
@ -21,29 +25,88 @@ NAME_TOP = 'top'
NAME_META = 'meta'
def typcast_node(node: Any) -> None:
"""typecast node to its sub type"""
if node.type == TYPE_TOP:
node.__class__ = NodeTop
elif node.type == TYPE_FILE:
node.__class__ = NodeFile
elif node.type == TYPE_DIR:
node.__class__ = NodeDir
elif node.type == TYPE_ARCHIVED:
node.__class__ = NodeArchived
elif node.type == TYPE_STORAGE:
node.__class__ = NodeStorage
elif node.type == TYPE_META:
node.__class__ = NodeMeta
else:
raise CatcliException(f"bad node: {node}")
class NodeAny(NodeMixin): # type: ignore
"""generic node"""
def __init__(self, # type: ignore[no-untyped-def]
name=None,
size=0,
parent=None,
children=None):
"""build generic node"""
super().__init__()
self.name = name
self.nodesize = size
self.parent = parent
if children:
self.children = children
def get_name(self) -> str:
"""get node name"""
return fix_badchars(self.name)
def set_name(self, name: str) -> None:
"""set node name"""
self.name = fix_badchars(name)
def has_attr(self, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in self.__dict__
def may_have_children(self) -> bool:
"""can node contains sub"""
raise NotImplementedError
def _to_str(self) -> str:
ret = str(self.__class__) + ": " + str(self.__dict__)
if self.children:
ret += '\n'
for child in self.children:
ret += ' child => ' + str(child)
ret += f' child => {child}\n'
return ret
def __str__(self) -> str:
return self._to_str()
def get_fullpath(self) -> str:
"""return full path to this node"""
path = self.get_name()
if self.parent:
typcast_node(self.parent)
ppath = self.parent.get_fullpath()
path = os.path.join(ppath, path)
return fix_badchars(path)
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
totsize: int = self.nodesize
for node in self.children:
typcast_node(node)
totsize += node.get_rec_size()
return totsize
def get_storage_node(self) -> NodeMixin:
"""recursively traverse up to find storage"""
return None
def flagged(self) -> bool:
"""is flagged"""
if not hasattr(self, '_flagged'):
@ -74,6 +137,23 @@ class NodeTop(NodeAny):
if children:
self.children = children
def get_fullpath(self) -> str:
"""return full path to this node"""
return ''
def may_have_children(self) -> bool:
"""can node contains sub"""
return True
def get_rec_size(self) -> int:
"""
recursively traverse tree and return size
also ensure to update the size on the way
"""
size = super().get_rec_size()
self.nodesize = size
return size
def __str__(self) -> str:
return self._to_str()
@ -83,8 +163,7 @@ class NodeFile(NodeAny):
def __init__(self, # type: ignore[no-untyped-def]
name: str,
relpath: str,
size: int,
nodesize: int,
md5: str,
maccess: float,
parent=None,
@ -93,14 +172,21 @@ class NodeFile(NodeAny):
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_FILE
self.relpath = relpath
self.size = size
self.nodesize = nodesize
self.md5 = md5
self.maccess = maccess
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return False
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def __str__(self) -> str:
return self._to_str()
@ -110,8 +196,7 @@ class NodeDir(NodeAny):
def __init__(self, # type: ignore[no-untyped-def]
name: str,
relpath: str,
size: int,
nodesize: int,
maccess: float,
parent=None,
children=None):
@ -119,13 +204,29 @@ class NodeDir(NodeAny):
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_DIR
self.relpath = relpath
self.size = size
self.nodesize = nodesize
self.maccess = maccess
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return True
def get_rec_size(self) -> int:
"""
recursively traverse tree and return size
also ensure to update the size on the way
"""
size = super().get_rec_size()
self.nodesize = size
return size
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def __str__(self) -> str:
return self._to_str()
@ -135,8 +236,7 @@ class NodeArchived(NodeAny):
def __init__(self, # type: ignore[no-untyped-def]
name: str,
relpath: str,
size: int,
nodesize: int,
md5: str,
archive: str,
parent=None,
@ -145,14 +245,21 @@ class NodeArchived(NodeAny):
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_ARCHIVED
self.relpath = relpath
self.size = size
self.nodesize = nodesize
self.md5 = md5
self.archive = archive
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return False
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def __str__(self) -> str:
return self._to_str()
@ -164,7 +271,7 @@ class NodeStorage(NodeAny):
name: str,
free: int,
total: int,
size: int,
nodesize: int,
ts: float,
attr: str,
parent=None,
@ -176,12 +283,29 @@ class NodeStorage(NodeAny):
self.free = free
self.total = total
self.attr = attr
self.size = size
self.nodesize = nodesize
self.ts = ts # pylint: disable=C0103
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return True
def get_rec_size(self) -> int:
"""
recursively traverse tree and return size
also ensure to update the size on the way
"""
size = super().get_rec_size()
self.nodesize = size
return size
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return self
def __str__(self) -> str:
return self._to_str()
@ -203,5 +327,13 @@ class NodeMeta(NodeAny):
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return False
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
return 0
def __str__(self) -> str:
return self._to_str()

@ -0,0 +1,39 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2024, deadc0de6
nodes helpers
"""
import os
# local imports
from catcli import nodes
def path_to_top(path: str) -> str:
"""path pivot under top"""
pre = f"{os.path.sep}{nodes.NAME_TOP}"
if not path.startswith(pre):
# prepend with top node path
path = pre + path
return path
def path_to_search_all(path: str) -> str:
"""path to search for all subs"""
if not path:
path = os.path.sep
if not path.startswith(os.path.sep):
path = os.path.sep + path
pre = f"{os.path.sep}{nodes.NAME_TOP}"
if not path.startswith(pre):
# prepend with top node path
path = pre + path
# if not path.endswith(os.path.sep):
# # ensure ends with a separator
# path += os.path.sep
# if not path.endswith(WILD):
# # add wild card
# path += WILD
return path

@ -0,0 +1,81 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2024, deadc0de6
Class for printing nodes in csv format
"""
import sys
from typing import List
from catcli.nodes import NodeAny, NodeStorage, TYPE_DIR
from catcli.utils import size_to_str, epoch_to_str
class CsvPrinter:
"""a node printer class"""
DEFSEP = ','
CSV_HEADER = ('name,type,path,size,indexed_at,'
'maccess,md5,nbfiles,free_space,'
'total_space,meta')
def _print_entries(self, entries: List[str], sep: str = DEFSEP) -> None:
line = sep.join(['"' + o + '"' for o in entries])
if len(line) > 0:
sys.stdout.write(f'{line}\n')
def print_header(self) -> None:
"""print csv header"""
sys.stdout.write(f'{self.CSV_HEADER}\n')
def print_storage(self, node: NodeStorage,
sep: str = DEFSEP,
raw: bool = False) -> None:
"""print a storage node"""
out = []
out.append(node.get_name()) # name
out.append(node.type) # type
out.append('') # fake full path
size = node.get_rec_size()
out.append(size_to_str(size, raw=raw)) # size
out.append(epoch_to_str(node.ts)) # indexed_at
out.append('') # fake maccess
out.append('') # fake md5
out.append(str(len(node.children))) # nbfiles
# fake free_space
out.append(size_to_str(node.free, raw=raw))
# fake total_space
out.append(size_to_str(node.total, raw=raw))
out.append(node.attr) # meta
self._print_entries(out, sep=sep)
def print_node(self, node: NodeAny,
sep: str = DEFSEP,
raw: bool = False) -> None:
"""print other nodes"""
out = []
out.append(node.get_name().replace('"', '""')) # name
out.append(node.type) # type
fullpath = node.get_fullpath()
out.append(fullpath.replace('"', '""')) # full path
out.append(size_to_str(node.nodesize, raw=raw)) # size
storage = node.get_storage_node()
out.append(epoch_to_str(storage.ts)) # indexed_at
if node.has_attr('maccess'):
out.append(epoch_to_str(node.maccess)) # maccess
else:
out.append('') # fake maccess
if node.has_attr('md5'):
out.append(node.md5) # md5
else:
out.append('') # fake md5
if node.type == TYPE_DIR:
out.append(str(len(node.children))) # nbfiles
else:
out.append('') # fake nbfiles
out.append('') # fake free_space
out.append('') # fake total_space
out.append('') # fake meta
self._print_entries(out, sep=sep)

@ -0,0 +1,165 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
Class for printing nodes in native format
"""
import sys
from catcli.nodes import NodeFile, NodeDir, \
NodeStorage, NodeAny, typcast_node
from catcli.colors import Colors
from catcli.logger import Logger
from catcli.utils import fix_badchars, size_to_str, \
epoch_to_str
COLOR_STORAGE = Colors.YELLOW
COLOR_FILE = Colors.WHITE
COLOR_DIRECTORY = Colors.BLUE
COLOR_ARCHIVE = Colors.PURPLE
COLOR_TS = Colors.CYAN
COLOR_SIZE = Colors.GREEN
FULLPATH_IN_NAME = True
class NativePrinter:
"""a node printer class"""
STORAGE = 'storage'
ARCHIVE = 'archive'
NBFILES = 'nbfiles'
def print_du(self, node: NodeAny,
raw: bool = False) -> None:
"""print du style"""
typcast_node(node)
name = node.get_fullpath()
size = node.nodesize
line = size_to_str(size, raw=raw).ljust(10, ' ')
out = f'{COLOR_SIZE}{line}{Colors.RESET}'
out += ' '
out += f'{COLOR_FILE}{name}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
def print_top(self, pre: str, name: str) -> None:
"""print top node"""
sys.stdout.write(f'{pre}{name}\n')
def print_storage(self, pre: str,
node: NodeStorage,
raw: bool = False) -> None:
"""print a storage node"""
# construct name
name = node.get_name()
# construct attrs
attrs = []
# nb files
attrs.append(f'nbfiles:{len(node.children)}')
# the children size
recsize = node.get_rec_size()
sizestr = size_to_str(recsize, raw=raw)
attrs.append(f'totsize:{sizestr}')
# free
pcent = 0.0
if node.total > 0:
pcent = node.free * 100 / node.total
attrs.append(f'free:{pcent:.1f}%')
# du
sztotal = size_to_str(node.total, raw=raw)
szused = size_to_str(node.total - node.free, raw=raw)
attrs.append(f'du:{szused}/{sztotal}')
# timestamp
if node.has_attr('ts'):
attrs.append(f'date:{epoch_to_str(node.ts)}')
# print
out = f'{pre}{Colors.UND}{self.STORAGE}{Colors.RESET}: '
out += f'{COLOR_STORAGE}{name}{Colors.RESET}'
if attrs:
out += f' [{Colors.WHITE}{"|".join(attrs)}{Colors.RESET}]'
sys.stdout.write(f'{out}\n')
def print_file(self, pre: str,
node: NodeFile,
withpath: bool = False,
withstorage: bool = False,
raw: bool = False) -> None:
"""print a file node"""
# construct name
name = node.get_name()
storage = node.get_storage_node()
if withpath:
name = node.get_fullpath()
# construct attributes
attrs = []
if node.md5:
attrs.append(f'md5:{node.md5}')
if withstorage:
content = Logger.get_bold_text(storage.get_name())
attrs.append(f'storage:{content}')
# print
out = []
out.append(f'{pre}')
out.append(f'{COLOR_FILE}{name}{Colors.RESET}')
size = 0
if node.nodesize:
size = node.nodesize
line = size_to_str(size, raw=raw)
out.append(f'{COLOR_SIZE}{line}{Colors.RESET}')
if node.has_attr('maccess'):
line = epoch_to_str(node.maccess)
out.append(f'{COLOR_TS}{line}{Colors.RESET}')
if attrs:
out.append(f'{Colors.GRAY}[{",".join(attrs)}]{Colors.RESET}')
out = [x for x in out if x]
sys.stdout.write(f'{" ".join(out)}\n')
def print_dir(self, pre: str,
node: NodeDir,
withpath: bool = False,
withstorage: bool = False,
withnbchildren: bool = False,
raw: bool = False) -> None:
"""print a directory node"""
# construct name
name = node.get_name()
storage = node.get_storage_node()
if withpath:
name = node.get_fullpath()
# construct attrs
attrs = []
if withnbchildren:
nbchildren = len(node.children)
attrs.append(f'{self.NBFILES}:{nbchildren}')
if withstorage:
attrs.append(f"storage:{Logger.get_bold_text(storage.get_name())}")
# print
out = []
out.append(f'{pre}')
out.append(f'{COLOR_DIRECTORY}{name}{Colors.RESET}')
size = 0
if node.nodesize:
size = node.nodesize
line = size_to_str(size, raw=raw)
out.append(f'{COLOR_SIZE}{line}{Colors.RESET}')
if node.has_attr('maccess'):
line = epoch_to_str(node.maccess)
out.append(f'{COLOR_TS}{line}{Colors.RESET}')
if attrs:
out.append(f'{Colors.GRAY}[{",".join(attrs)}]{Colors.RESET}')
out = [x for x in out if x]
sys.stdout.write(f'{" ".join(out)}\n')
def print_archive(self, pre: str,
name: str, archive: str) -> None:
"""print an archive"""
name = fix_badchars(name)
out = f'{pre}{COLOR_ARCHIVE}{name}{Colors.RESET} '
out += f'{Colors.GRAY}[{self.ARCHIVE}:{archive}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')

@ -10,44 +10,15 @@ import hashlib
import tempfile
import subprocess
import datetime
import string
# local imports
from catcli import nodes
from catcli.exceptions import CatcliException
SEPARATOR = '/'
WILD = '*'
def path_to_top(path: str) -> str:
"""path pivot under top"""
pre = f'{SEPARATOR}{nodes.NAME_TOP}'
if not path.startswith(pre):
# prepend with top node path
path = pre + path
return path
def path_to_search_all(path: str) -> str:
"""path to search for all subs"""
if not path:
path = SEPARATOR
if not path.startswith(SEPARATOR):
path = SEPARATOR + path
pre = f'{SEPARATOR}{nodes.NAME_TOP}'
if not path.startswith(pre):
# prepend with top node path
path = pre + path
if not path.endswith(SEPARATOR):
# ensure ends with a separator
path += SEPARATOR
if not path.endswith(WILD):
# add wild card
path += WILD
return path
def md5sum(path: str) -> str:
"""
calculate md5 sum of a file
@ -102,19 +73,20 @@ def ask(question: str) -> bool:
return resp.lower() == 'y'
def edit(string: str) -> str:
def edit(data: str) -> str:
"""edit the information with the default EDITOR"""
data = string.encode('utf-8')
content = fix_badchars(data)
editor = os.environ.get('EDITOR', 'vim')
with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as file:
file.write(data)
file.write(content.encode('utf-8'))
file.flush()
subprocess.call([editor, file.name])
subprocess.call([editor, file.get_name()])
file.seek(0)
new = file.read()
return new.decode('utf-8')
def fix_badchars(string: str) -> str:
def fix_badchars(data: str) -> str:
"""fix none utf-8 chars in string"""
return string.encode('utf-8', 'ignore').decode('utf-8')
data = "".join(x for x in data if x in string.printable)
return data.encode("utf-8", "ignore").decode("utf-8")

@ -3,4 +3,4 @@ author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
"""
__version__ = '0.9.1'
__version__ = '1.0'

@ -35,7 +35,8 @@ class Walker:
self.debug = debug
self.lpath = logpath
def index(self, path: str,
def index(self,
path: str,
parent: NodeAny,
name: str,
storagepath: str = '') -> Tuple[str, int]:
@ -47,8 +48,10 @@ class Walker:
"""
self._debug(f'indexing starting at {path}')
if not parent:
parent = self.noder.new_dir_node(name, path,
parent, storagepath)
# create the parent
parent = self.noder.new_dir_node(name,
path,
parent)
if os.path.islink(path):
rel = os.readlink(path)
@ -65,8 +68,9 @@ class Walker:
continue
self._progress(file)
self._debug(f'index file {sub}')
node = self.noder.new_file_node(os.path.basename(file), sub,
parent, storagepath)
node = self.noder.new_file_node(os.path.basename(file),
sub,
parent)
if node:
cnt += 1
for adir in dirs:
@ -76,7 +80,7 @@ class Walker:
self._debug(f'index directory {sub}')
if not os.path.exists(sub):
continue
dummy = self.noder.new_dir_node(base, sub, parent, storagepath)
dummy = self.noder.new_dir_node(base, sub, parent)
if not dummy:
continue
cnt += 1
@ -118,8 +122,9 @@ class Walker:
if node:
node.flag()
continue
node = self.noder.new_file_node(os.path.basename(file), sub,
parent, storagepath)
node = self.noder.new_file_node(os.path.basename(file),
sub,
parent)
if node:
node.flag()
cnt += 1
@ -131,7 +136,7 @@ class Walker:
reindex, dummy = self._need_reindex(parent, sub, treepath)
if reindex:
dummy = self.noder.new_dir_node(base, sub,
parent, storagepath)
parent)
cnt += 1
if dummy:
dummy.flag()
@ -166,7 +171,7 @@ class Walker:
if node and changed:
# remove this node and re-add
self._debug(f'\t{path} has changed')
self._debug(f'\tremoving node {node.name} for {path}')
self._debug(f"\tremoving node {node.get_name()} for {path}")
node.parent = None
return True, node

@ -3,3 +3,6 @@ types-docopt; python_version >= '3.0'
anytree; python_version >= '3.0'
pyfzf; python_version >= '3.0'
fusepy; python_version >= '3.0'
natsort; python_version >= '3.0'
cmd2; python_version >= '3.0'
gnureadline; python_version >= '3.0'

@ -37,17 +37,19 @@ setup(
python_requires=REQUIRES_PYTHON,
classifiers=[
'Development Status :: 5 - Production/Stable',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
],
keywords='catalog commandline indexer offline',
packages=find_packages(exclude=['tests*']),
install_requires=['docopt', 'anytree'],
install_requires=['docopt', 'types-docopt', 'anytree',
'pyfzf', 'fusepy', 'natsort', 'cmd2',
'gnureadline'],
extras_require={
'dev': ['check-manifest'],

@ -0,0 +1,22 @@
#!/usr/bin/env bash
# run me from the root of the package
source tests-ng/helper
rm -f tests-ng/assets/github.catalog.json
python3 -m catcli.catcli index -c github .github --catalog=tests-ng/assets/github.catalog.json
# edit catalog
clean_catalog "tests-ng/assets/github.catalog.json"
# native
python3 -m catcli.catcli ls -r -s -B --catalog=tests-ng/assets/github.catalog.json | \
sed -e 's/free:.*%/free:0.0%/g' \
-e 's/....-..-.. ..:..:../2023-03-09 16:20:59/g' \
-e 's#du:[^|]* |#du:0/0 |#g' > tests-ng/assets/github.catalog.native.txt
# csv
python3 -m catcli.catcli ls -r -s -B --catalog=tests-ng/assets/github.catalog.json --format=csv | \
sed -e 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' | \
sed 's/20..-..-.. ..:..:..//g' > tests-ng/assets/github.catalog.csv.txt

@ -1,5 +1,6 @@
"github","storage","","1510","2023-03-09 16:20:59","","","2","0","0",""
"workflows","dir","github/workflows","1493","2023-03-09 16:20:59","2023-03-09 16:20:44","","2","","",""
"pypi-release.yml","file","github/workflows/pypi-release.yml","691","2023-03-09 16:20:59","2022-10-19 21:00:37","57699a7a6a03e20e864f220e19f8e197","","","",""
"testing.yml","file","github/workflows/testing.yml","802","2023-03-09 16:20:59","2023-03-09 16:20:44","7144a119ef43adb634654522c12ec250","","","",""
"FUNDING.yml","file","github/FUNDING.yml","17","2023-03-09 16:20:59","2022-10-19 21:00:37","0c6407a84d412c514007313fb3bca4de","","","",""
"github","storage","","4865","","","","3","0","0",""
"FUNDING.yml","file","github/FUNDING.yml","17","","","0c6407a84d412c514007313fb3bca4de","","","",""
"codecov.yml","file","github/codecov.yml","104","","","4203204f75b43cd4bf032402beb3359d","","","",""
"workflows","dir","github/workflows","3082","","","","2","","",""
"pypi-release.yml","file","github/workflows/pypi-release.yml","691","","","57699a7a6a03e20e864f220e19f8e197","","","",""
"testing.yml","file","github/workflows/testing.yml","850","","","691df1a4d2f254b5cd04c152e7c6ccaf","","","",""

@ -7,54 +7,59 @@
"maccess": 1666206037.0786593,
"md5": "0c6407a84d412c514007313fb3bca4de",
"name": "FUNDING.yml",
"relpath": "/FUNDING.yml",
"size": 17,
"type": "file"
},
{
"maccess": 1704320710.7056112,
"md5": "4203204f75b43cd4bf032402beb3359d",
"name": "codecov.yml",
"size": 104,
"type": "file"
},
{
"children": [
{
"maccess": 1666206037.078865,
"md5": "57699a7a6a03e20e864f220e19f8e197",
"name": "pypi-release.yml",
"relpath": "workflows/pypi-release.yml",
"size": 691,
"type": "file"
},
{
"maccess": 1678375244.4870229,
"md5": "7144a119ef43adb634654522c12ec250",
"maccess": 1704403569.24789,
"md5": "691df1a4d2f254b5cd04c152e7c6ccaf",
"name": "testing.yml",
"relpath": "workflows/testing.yml",
"size": 802,
"size": 850,
"type": "file"
}
],
"maccess": 1678375244.4865956,
"maccess": 1704320727.2641916,
"name": "workflows",
"relpath": "/workflows",
"size": 1493,
"size": 1541,
"type": "dir"
}
],
"free": 0,
"name": "github",
"size": 1510,
"size": 1662,
"total": 0,
"ts": 1678375259,
"ts": 1704923096,
"type": "storage"
},
{
"attr": {
"access": 1678375259,
"access_version": "0.8.7",
"created": 1678375259,
"created_version": "0.8.7"
"access": 1704923096,
"access_version": "0.9.6",
"created": 1704923096,
"created_version": "0.9.6"
},
"name": "meta",
"size": null,
"type": "meta"
}
],
"name": "top",
"size": null,
"type": "top"
}

@ -1,7 +1,7 @@
top
└── storage: github
nbfiles:2 | totsize:1510 | free:0.0% | du:0/0 | date:2023-03-09 16:20:59
├── workflows [nbfiles:2, totsize:1493]
│ ├── pypi-release.yml [size:691, md5:57699a7a6a03e20e864f220e19f8e197]
│ └── testing.yml [size:802, md5:7144a119ef43adb634654522c12ec250]
└── FUNDING.yml [size:17, md5:0c6407a84d412c514007313fb3bca4de]
└── storage: github [nbfiles:3|totsize:4865|free:0.0%|du:0/0|date:2023-03-09 16:20:59]
├── FUNDING.yml 17 2023-03-09 16:20:59 [md5:0c6407a84d412c514007313fb3bca4de]
├── codecov.yml 104 2023-03-09 16:20:59 [md5:4203204f75b43cd4bf032402beb3359d]
└── workflows 3082 2023-03-09 16:20:59 [nbfiles:2]
├── pypi-release.yml 691 2023-03-09 16:20:59 [md5:57699a7a6a03e20e864f220e19f8e197]
└── testing.yml 850 2023-03-09 16:20:59 [md5:691df1a4d2f254b5cd04c152e7c6ccaf]

@ -2,30 +2,16 @@
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2023, deadc0de6
# exit on first error
set -e
# get current path
rl="readlink -f"
if ! ${rl} "${0}" >/dev/null 2>&1; then
rl="realpath"
if ! command -v ${rl}; then
echo "\"${rl}\" not found !" && exit 1
fi
fi
cur=$(dirname "$(${rl} "${0}")")
# pivot
cur=$(cd "$(dirname "${0}")" && pwd)
prev="${cur}/.."
cd "${prev}"
# coverage
#export PYTHONPATH=".:${PYTHONPATH}"
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
bin="coverage run -p --source=catcli -m catcli.catcli"
#bin="coverage run -p --source=${prev}/catcli -m catcli.catcli"
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
@ -50,6 +36,7 @@ catalog="${tmpd}/catalog"
# index
${bin} -B index -c --catalog="${catalog}" github .github
clean_catalog "${catalog}"
ls -laR .github
cat "${catalog}"
@ -115,7 +102,7 @@ native="${tmpd}/native.txt"
${bin} -B ls -s -r --format=native --catalog="${catalog}" > "${native}"
mod="${tmpd}/native.mod.txt"
cat "${native}" | sed -e 's/free:.*%/free:0.0%/g' \
-e 's/date:....-..-.. ..:..:../date:2023-03-09 16:20:59/g' \
-e 's/....-..-.. ..:..:../2023-03-09 16:20:59/g' \
-e 's#du:[^|]* |#du:0/0 |#g' > "${mod}"
if command -v delta >/dev/null; then
delta -s "tests-ng/assets/github.catalog.native.txt" "${mod}"
@ -129,13 +116,14 @@ csv="${tmpd}/csv.txt"
${bin} -B ls -s -r --format=csv --catalog="${catalog}" > "${csv}"
# modify created csv
mod="${tmpd}/csv.mod.txt"
cat "${csv}" | sed -e 's/"2","[^"]*","[^"]*",""/"2","0","0",""/g' | \
cat "${csv}" | \
sed -e 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' | \
sed 's/20..-..-.. ..:..:..//g' > "${mod}"
# modify original
ori="${tmpd}/ori.mod.txt"
cat "tests-ng/assets/github.catalog.csv.txt" | \
sed 's/....-..-.. ..:..:..//g' | \
sed 's/"2","[^"]*","[^"]*",""/"2","0","0",""/g' > "${ori}"
sed 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' > "${ori}"
if command -v delta >/dev/null; then
delta -s "${ori}" "${mod}"
fi

@ -0,0 +1,61 @@
#!/usr/bin/env bash
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2023, deadc0de6
set -e
cur=$(cd "$(dirname "${0}")" && pwd)
prev="${cur}/.."
cd "${prev}"
# coverage
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
echo "pythonpath: ${PYTHONPATH}"
echo "bin: ${bin}"
${bin} --version
# get the helpers
# shellcheck source=tests-ng/helper
source "${cur}"/helper
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
##########################################################
# the test
##########################################################
# create temp dirs
tmpd=$(mktemp -d)
clear_on_exit "${tmpd}"
catalog="${tmpd}/catalog"
# index
${bin} -B index -c -f --catalog="${catalog}" github1 .github
${bin} -B index -c -f --catalog="${catalog}" github2 .github
clean_catalog "${catalog}"
#cat "${catalog}"
echo ""
${bin} -B ls -r --catalog="${catalog}"
echo "finding \"testing.yml\""
${bin} -B find --catalog="${catalog}" testing.yml
cnt=$(${bin} -B find --catalog="${catalog}" testing.yml | wc -l)
[ "${cnt}" != "2" ] && echo "should return 2 (not ${cnt})" && exit 1
echo "finding \"*.yml\""
${bin} -B find --catalog="${catalog}" '*.yml'
cnt=$(${bin} -B find --catalog="${catalog}" '*.yml' | wc -l)
[ "${cnt}" != "8" ] && echo "should return 8 (not ${cnt})" && exit 1
# the end
echo ""
echo "test \"$(basename "$0")\" success"
cd "${cur}"
exit 0

@ -21,6 +21,14 @@ clear_on_exit()
fi
}
# clear catalog stuff for testing
# $1: catalog path
clean_catalog()
{
sed -i 's/"free": .*,/"free": 0,/g' "${1}"
sed -i 's/"total": .*,/"total": 0,/g' "${1}"
}
# clear files
on_exit()
{

@ -0,0 +1,59 @@
#!/usr/bin/env bash
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2024, deadc0de6
set -e
cur=$(cd "$(dirname "${0}")" && pwd)
prev="${cur}/.."
cd "${prev}"
# coverage
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
echo "pythonpath: ${PYTHONPATH}"
echo "bin: ${bin}"
${bin} --version
# get the helpers
# shellcheck source=tests-ng/helper
source "${cur}"/helper
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
##########################################################
# the test
##########################################################
# create temp dirs
tmpd=$(mktemp -d)
clear_on_exit "${tmpd}"
catalog="${tmpd}/catalog"
# index
${bin} -B index -c -f --catalog="${catalog}" github1 .github
${bin} -B index -c -f --catalog="${catalog}" github2 .github
clean_catalog "${catalog}"
#cat "${catalog}"
echo ""
${bin} -B ls -r --catalog="${catalog}"
${bin} -B ls --catalog="${catalog}" 'github1/*.yml'
cnt=$(${bin} -B ls --catalog="${catalog}" 'github1/*.yml' | wc -l)
[ "${cnt}" != "2" ] && echo "should return 2 (not ${cnt})" && exit 1
${bin} -B ls --catalog="${catalog}" 'github*/*.yml'
cnt=$(${bin} -B ls --catalog="${catalog}" 'github*/*.yml' | wc -l)
[ "${cnt}" != "4" ] && echo "should return 4 (not ${cnt})" && exit 1
# the end
echo ""
echo "test \"$(basename "$0")\" success"
cd "${cur}"
exit 0

@ -2,30 +2,16 @@
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2021, deadc0de6
# exit on first error
set -e
# get current path
rl="readlink -f"
if ! ${rl} "${0}" >/dev/null 2>&1; then
rl="realpath"
if ! command -v ${rl}; then
echo "\"${rl}\" not found !" && exit 1
fi
fi
cur=$(dirname "$(${rl} "${0}")")
# pivot
cur=$(cd "$(dirname "${0}")" && pwd)
prev="${cur}/.."
cd "${prev}"
# coverage
#export PYTHONPATH=".:${PYTHONPATH}"
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
bin="coverage run -p --source=catcli -m catcli.catcli"
#bin="coverage run -p --source=${prev}/catcli -m catcli.catcli"
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
@ -55,12 +41,12 @@ echo "abc" > "${tmpd}/dir/a"
# index
${bin} -B index --catalog="${catalog}" dir "${tmpd}/dir"
${bin} -B ls --catalog="${catalog}" dir
${bin} -B ls --catalog="${catalog}"
# get attributes
freeb=$(${bin} -B ls --catalog="${catalog}" dir | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dub=$(${bin} -B ls --catalog="${catalog}" dir | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
dateb=$(${bin} -B ls --catalog="${catalog}" dir | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
freeb=$(${bin} -B ls --catalog="${catalog}" | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dub=$(${bin} -B ls --catalog="${catalog}" | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
dateb=$(${bin} -B ls --catalog="${catalog}" | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
echo "before: free:${freeb} | du:${dub} | date:${dateb}"
# change content
@ -75,12 +61,12 @@ sleep 1
# update
${bin} -B update -f --catalog="${catalog}" dir "${tmpu}/dir"
${bin} -B ls --catalog="${catalog}" dir
${bin} -B ls --catalog="${catalog}"
# get new attributes
freea=$(${bin} -B ls --catalog="${catalog}" dir | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dua=$(${bin} -B ls --catalog="${catalog}" dir | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
datea=$(${bin} -B ls --catalog="${catalog}" dir | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
freea=$(${bin} -B ls --catalog="${catalog}" | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dua=$(${bin} -B ls --catalog="${catalog}" | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
datea=$(${bin} -B ls --catalog="${catalog}" | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
echo "after: free:${freea} | du:${dua} | date:${datea}"
# test they are all different

@ -2,7 +2,7 @@ pycodestyle; python_version >= '3.0'
pyflakes; python_version >= '3.0'
nose2; python_version >= '3.0'
coverage; python_version >= '3.0'
coveralls; python_version >= '3.0'
pylint; python_version > '3.0'
mypy; python_version > '3.0'
pytest; python_version > '3.0'
pytype; python_version > '3.0'

@ -45,8 +45,6 @@ pylint -sn \
--disable=R0022 \
catcli/
# R0801: Similar lines in 2 files
# W0212: Access to a protected member
# R0914: Too many local variables
@ -61,11 +59,23 @@ pylint -sn setup.py
# mypy
echo "[+] mypy"
mypy --strict catcli/
mypy --version
mypy --config-file=.mypy.ini catcli/
# pytype
echo "[+] pytype"
pytype --version
pytype catcli/
set +e
grep -R 'TODO' catcli/ && echo "TODO found" && exit 1
grep -R 'FIXME' catcli/ && echo "FIXME found" && exit 1
set -e
# unittest
echo "[+] unittests"
coverage run -p -m pytest tests
mkdir -p coverages/
coverage run -p --data-file coverages/coverage -m pytest tests
# tests-ng
echo "[+] tests-ng"
@ -89,7 +99,8 @@ done
# merge coverage
echo "[+] coverage merge"
coverage combine
coverage combine coverages/*
coverage xml
echo "ALL TESTS DONE OK"
exit 0

@ -149,21 +149,18 @@ FAKECATALOG = """
{
"md5": null,
"name": "7544G",
"relpath": "tmpj5602ih7.catcli/7544G",
"size": 100,
"type": "file"
},
{
"md5": null,
"name": "KF2ZC",
"relpath": "tmpj5602ih7.catcli/KF2ZC",
"size": 100,
"type": "file"
},
{
"md5": null,
"name": "Z9OII",
"relpath": "tmpj5602ih7.catcli/Z9OII",
"size": 100,
"type": "file"
},
@ -172,14 +169,12 @@ FAKECATALOG = """
{
"md5": null,
"name": "M592O9",
"relpath": "tmpj5602ih7.catcli/VNN/M592O9",
"size": 100,
"type": "file"
}
],
"md5": null,
"name": "VNN",
"relpath": "VNN",
"size": 100,
"type": "dir"
},
@ -188,21 +183,18 @@ FAKECATALOG = """
{
"md5": null,
"name": "X37H",
"relpath": "tmpj5602ih7.catcli/P4C/X37H",
"size": 100,
"type": "file"
},
{
"md5": null,
"name": "I566",
"relpath": "tmpj5602ih7.catcli/P4C/I566",
"size": 100,
"type": "file"
}
],
"md5": null,
"name": "P4C",
"relpath": "P4C",
"size": 200,
"type": "dir"
}

@ -0,0 +1,29 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2017, deadc0de6
Basic unittest for ls
"""
import unittest
from catcli.decomp import Decomp
class TestDecomp(unittest.TestCase):
"""test ls"""
def test_list(self):
"""test decomp formats"""
dec = Decomp()
formats = dec.get_formats()
self.assertTrue('zip' in formats)
def main():
"""entry point"""
unittest.main()
if __name__ == '__main__':
main()

@ -50,7 +50,7 @@ class TestIndexing(unittest.TestCase):
tmpdirname = 'tmpdir'
args = {'<path>': dirpath, '<name>': tmpdirname,
'--hash': True, '--meta': ['some meta'],
'--no-subsize': False, '--verbose': True}
'--verbose': True}
# index the directory
cmd_index(args, noder, catalog, top)
@ -62,7 +62,7 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(len(storage.children) == 5)
# ensures files and directories are in
names = [x.name for x in storage.children]
names = [x.get_name() for x in storage.children]
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
self.assertTrue(os.path.basename(file3) in names)
@ -70,9 +70,9 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(os.path.basename(dir2) in names)
for node in storage.children:
if node.name == os.path.basename(dir1):
if node.get_name() == os.path.basename(dir1):
self.assertTrue(len(node.children) == 2)
elif node.name == os.path.basename(dir2):
elif node.get_name() == os.path.basename(dir2):
self.assertTrue(len(node.children) == 1)

@ -46,7 +46,7 @@ class TestRm(unittest.TestCase):
top = cmd_rm(args, noder, catalog, top)
# ensure there no children anymore
self.assertTrue(len(top.children) == 0)
self.assertEqual(len(top.children), 0)
def main():

@ -62,8 +62,7 @@ class TestUpdate(unittest.TestCase):
tmpdirname = 'tmpdir'
args = {'<path>': dirpath, '<name>': tmpdirname,
'--hash': True, '--meta': ['some meta'],
'--no-subsize': False, '--verbose': True,
'--lpath': None}
'--verbose': True, '--lpath': None}
# index the directory
unix_tree(dirpath)
@ -71,11 +70,11 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(os.stat(catalogpath).st_size != 0)
# ensure md5 sum are in
nods = noder.find_name(top, os.path.basename(file4))
self.assertTrue(len(nods) == 1)
nods = noder.find(top, os.path.basename(file4))
self.assertEqual(len(nods), 1)
nod = nods[0]
self.assertTrue(nod)
self.assertTrue(nod.md5 == f4_md5)
self.assertEqual(nod.md5, f4_md5)
# print catalog
noder.print_tree(top)
@ -123,12 +122,12 @@ class TestUpdate(unittest.TestCase):
noder.print_tree(top)
# explore the top node to find all nodes
self.assertTrue(len(top.children) == 1)
self.assertEqual(len(top.children), 1)
storage = top.children[0]
self.assertTrue(len(storage.children) == 8)
self.assertEqual(len(storage.children), 8)
# ensure d1f1 md5 sum has changed in catalog
nods = noder.find_name(top, os.path.basename(d1f1))
nods = noder.find(top, os.path.basename(d1f1))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
@ -136,7 +135,7 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(nod.md5 == d1f1_md5_new)
# ensure f4 md5 sum has changed in catalog
nods = noder.find_name(top, os.path.basename(file4))
nods = noder.find(top, os.path.basename(file4))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
@ -144,7 +143,7 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(nod.md5 == f4_md5_new)
# ensure d2f2 md5 sum has changed in catalog
nods = noder.find_name(top, os.path.basename(d2f2))
nods = noder.find(top, os.path.basename(d2f2))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
@ -152,7 +151,7 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(nod.md5 == d2f2_md5_new)
# ensures files and directories are in
names = [node.name for node in anytree.PreOrderIter(storage)]
names = [node.get_name() for node in anytree.PreOrderIter(storage)]
print(names)
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
@ -170,13 +169,13 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(os.path.basename(new5) in names)
for node in storage.children:
if node.name == os.path.basename(dir1):
if node.get_name() == os.path.basename(dir1):
self.assertTrue(len(node.children) == 3)
elif node.name == os.path.basename(dir2):
elif node.get_name() == os.path.basename(dir2):
self.assertTrue(len(node.children) == 3)
elif node.name == os.path.basename(new3):
elif node.get_name() == os.path.basename(new3):
self.assertTrue(len(node.children) == 0)
elif node.name == os.path.basename(new4):
elif node.get_name() == os.path.basename(new4):
self.assertTrue(len(node.children) == 1)
self.assertTrue(read_from_file(d1f1) == editval)
@ -190,7 +189,7 @@ class TestUpdate(unittest.TestCase):
cmd_update(args, noder, catalog, top)
# ensures files and directories are (not) in
names = [node.name for node in anytree.PreOrderIter(storage)]
names = [node.get_name() for node in anytree.PreOrderIter(storage)]
print(names)
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
@ -208,9 +207,9 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(os.path.basename(new4) not in names)
self.assertTrue(os.path.basename(new5) not in names)
for node in storage.children:
if node.name == os.path.basename(dir1):
if node.get_name() == os.path.basename(dir1):
self.assertTrue(len(node.children) == 2)
elif node.name == os.path.basename(new3):
elif node.get_name() == os.path.basename(new3):
self.assertTrue(len(node.children) == 0)

Loading…
Cancel
Save