Merge pull request #30 from deadc0de6/fuse

Fuse
fix-36
deadc0de 1 year ago committed by GitHub
commit 511a32edfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,6 +17,13 @@ jobs:
python -m pip install --upgrade pip
pip install -r tests-requirements.txt
pip install -r requirements.txt
sudo apt-get -y install shellcheck jq
- name: Run tests
run: |
./tests.sh
- name: Coveralls
run: |
pip install coveralls
coveralls --service=github
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

5
.gitignore vendored

@ -1,6 +1,11 @@
*.pyc
.coverage
.coverage*
dist/
build/
*.egg-info/
*.catalog
.vscode/
.mypy_cache
.pytest_cache
__pycache__

@ -2,7 +2,7 @@
[![Build Status](https://travis-ci.org/deadc0de6/catcli.svg?branch=master)](https://travis-ci.org/deadc0de6/catcli)
[![License: GPL v3](https://img.shields.io/badge/License-GPL%20v3-blue.svg)](http://www.gnu.org/licenses/gpl-3.0)
[![Coverage Status](https://coveralls.io/repos/github/deadc0de6/catcli/badge.svg?branch=master)](https://coveralls.io/github/deadc0de6/catcli?branch=master)
[![Coveralls](https://img.shields.io/coveralls/github/deadc0de6/catcli)](https://coveralls.io/github/deadc0de6/catcli?branch=master)
[![PyPI version](https://badge.fury.io/py/catcli.svg)](https://badge.fury.io/py/catcli)
[![AUR](https://img.shields.io/aur/version/catcli-git.svg)](https://aur.archlinux.org/packages/catcli-git)
@ -24,6 +24,7 @@ Features:
* Index any directories in a catalog
* Ability to search for files by name in the catalog
* Ability to navigate through indexed data à la `ls`
* Support for fuse to mount the indexed data as a virtual filesystem
* Handle archive files (zip, tar, ...) and index their content
* Save catalog to json for easy versioning with git
* Command line interface FTW
@ -73,6 +74,7 @@ See the [examples](#examples) for an overview of the available features.
* [Index archive files](#index-archive-files)
* [Walk indexed files with ls](#walk-indexed-files-with-ls)
* [Find files](#find-files)
* [Mount catalog](#mount-catalog)
* [Display entire hierarchy](#display-entire-hierarchy)
* [Catalog graph](#catalog-graph)
* [Edit storage](#edit-storage)
@ -185,6 +187,27 @@ searching:
See the [examples](#examples) for more.
## Mount catalog
The catalog can be mounted with [fuse](https://www.kernel.org/doc/html/next/filesystems/fuse.html)
and navigate like any filesystem.
```bash
$ mkdir /tmp/mnt
$ catcli index -c github .github
$ catcli mount /tmp/mnt
$ ls -laR /tmp/mnt
drwxrwxrwx - user 8 Mar 22:08 github
mnt/github:
.rwxrwxrwx 17 user 19 Oct 2022 FUNDING.yml
drwxrwxrwx - user 2 Mar 10:15 workflows
mnt/github/workflows:
.rwxrwxrwx 691 user 19 Oct 2022 pypi-release.yml
.rwxrwxrwx 635 user 8 Mar 21:08 testing.yml
```
## Display entire hierarchy
The entire catalog can be shown using the `ls -r` command.
@ -232,7 +255,7 @@ Each line contains the following fields:
* **indexed_at**: when this entry was indexed
* **maccess**: the entry modification date/time
* **md5**: the entry checksum (if any)
* **nbfiles**: the number of children (empty for not storage or directory nodes)
* **nbfiles**: the number of children (empty for nodes that are not storage or directory)
* **free_space**: free space (empty for not storage nodes)
* **total_space**: total space (empty for not storage nodes)
* **meta**: meta information (empty for not storage nodes)

@ -7,7 +7,7 @@ Copyright (c) 2017, deadc0de6
import sys
def main():
def main() -> None:
"""entry point"""
import catcli.catcli
if catcli.catcli.main():

@ -6,11 +6,13 @@ Class that represents the catcli catalog
"""
import os
import pickle
from anytree.exporter import JsonExporter
from anytree.importer import JsonImporter
from typing import Optional
from anytree.exporter import JsonExporter # type: ignore
from anytree.importer import JsonImporter # type: ignore
# local imports
from catcli import nodes
from catcli.nodes import NodeMeta, NodeTop
from catcli.utils import ask
from catcli.logger import Logger
@ -18,7 +20,9 @@ from catcli.logger import Logger
class Catalog:
"""the catalog"""
def __init__(self, path, usepickle=False, debug=False, force=False):
def __init__(self, path: str,
debug: bool = False,
force: bool = False) -> None:
"""
@path: catalog path
@usepickle: use pickle
@ -28,15 +32,15 @@ class Catalog:
self.path = path
self.debug = debug
self.force = force
self.metanode = None
self.pickle = usepickle
self.metanode: Optional[NodeMeta] = None
def set_metanode(self, metanode):
def set_metanode(self, metanode: NodeMeta) -> None:
"""remove the metanode until tree is re-written"""
self.metanode = metanode
self.metanode.parent = None
if self.metanode:
self.metanode.parent = None
def exists(self):
def exists(self) -> bool:
"""does catalog exist"""
if not self.path:
return False
@ -44,19 +48,17 @@ class Catalog:
return True
return False
def restore(self):
def restore(self) -> Optional[NodeTop]:
"""restore the catalog"""
if not self.path:
return None
if not os.path.exists(self.path):
return None
if self.pickle:
return self._restore_pickle()
with open(self.path, 'r', encoding='UTF-8') as file:
content = file.read()
return self._restore_json(content)
def save(self, node):
def save(self, node: NodeTop) -> bool:
"""save the catalog"""
if not self.path:
Logger.err('Path not defined')
@ -73,41 +75,31 @@ class Catalog:
return False
if self.metanode:
self.metanode.parent = node
if self.pickle:
return self._save_pickle(node)
return self._save_json(node)
def _debug(self, text):
def _debug(self, text: str) -> None:
if not self.debug:
return
Logger.debug(text)
def _save_pickle(self, node):
"""pickle the catalog"""
with open(self.path, 'wb') as file:
pickle.dump(node, file)
self._debug(f'Catalog saved to pickle \"{self.path}\"')
return True
def _restore_pickle(self):
"""restore the pickled tree"""
with open(self.path, 'rb') as file:
root = pickle.load(file)
msg = f'Catalog imported from pickle \"{self.path}\"'
self._debug(msg)
return root
def _save_json(self, node):
def _save_json(self, top: NodeTop) -> bool:
"""export the catalog in json"""
self._debug(f'saving {top} to json...')
exp = JsonExporter(indent=2, sort_keys=True)
with open(self.path, 'w', encoding='UTF-8') as file:
exp.write(node, file)
exp.write(top, file)
self._debug(f'Catalog saved to json \"{self.path}\"')
return True
def _restore_json(self, string):
def _restore_json(self, string: str) -> Optional[NodeTop]:
"""restore the tree from json"""
imp = JsonImporter()
self._debug(f'import from string: {string}')
root = imp.import_(string)
self._debug(f'Catalog imported from json \"{self.path}\"')
return root
self._debug(f'root imported: {root}')
if root.type != nodes.TYPE_TOP:
return None
top = NodeTop(root.name, children=root.children)
self._debug(f'top imported: {top}')
return top

@ -11,24 +11,25 @@ Catcli command line interface
import sys
import os
import datetime
from typing import Dict, Any, List
from docopt import docopt
# local imports
from .version import __version__ as VERSION
from .logger import Logger
from .colors import Colors
from .catalog import Catalog
from .walker import Walker
from .noder import Noder
from .utils import ask, edit
from .exceptions import BadFormatException, CatcliException
from catcli.version import __version__ as VERSION
from catcli.nodes import NodeTop, NodeAny
from catcli.logger import Logger
from catcli.colors import Colors
from catcli.catalog import Catalog
from catcli.walker import Walker
from catcli.noder import Noder
from catcli.utils import ask, edit, path_to_search_all
from catcli.fuser import Fuser
from catcli.exceptions import BadFormatException, CatcliException
NAME = 'catcli'
CUR = os.path.dirname(os.path.abspath(__file__))
CATALOGPATH = f'{NAME}.catalog'
GRAPHPATH = f'/tmp/{NAME}.dot'
SEPARATOR = '/'
WILD = '*'
FORMATS = ['native', 'csv', 'csv-with-header', 'fzf-native', 'fzf-csv']
BANNER = f""" +-+-+-+-+-+-+
@ -40,9 +41,12 @@ USAGE = f"""
Usage:
{NAME} ls [--catalog=<path>] [--format=<fmt>] [-aBCrVSs] [<path>]
{NAME} find [--catalog=<path>] [--format=<fmt>] [-aBCbdVsP] [--path=<path>] [<term>]
{NAME} index [--catalog=<path>] [--meta=<meta>...] [-aBCcfnV] <name> <path>
{NAME} find [--catalog=<path>] [--format=<fmt>]
[-aBCbdVsP] [--path=<path>] [<term>]
{NAME} index [--catalog=<path>] [--meta=<meta>...]
[-aBCcfnV] <name> <path>
{NAME} update [--catalog=<path>] [-aBCcfnV] [--lpath=<path>] <name> <path>
{NAME} mount [--catalog=<path>] [-V] <mountpoint>
{NAME} rm [--catalog=<path>] [-BCfV] <storage>
{NAME} rename [--catalog=<path>] [-BCfV] <storage> <name>
{NAME} edit [--catalog=<path>] [-BCfV] <storage>
@ -61,14 +65,14 @@ Options:
-C --no-color Do not output colors [default: False].
-c --hash Calculate md5 hash [default: False].
-d --directory Only directory [default: False].
-F --format=<fmt> Print format, see command \"print_supported_formats\" [default: native].
-F --format=<fmt> see \"print_supported_formats\" [default: native].
-f --force Do not ask when updating the catalog [default: False].
-l --lpath=<path> Path where changes are logged [default: ]
-n --no-subsize Do not store size of directories [default: False].
-P --parent Ignore stored relpath [default: True].
-p --path=<path> Start path.
-r --recursive Recursive [default: False].
-s --raw-size Print raw size rather than human readable [default: False].
-s --raw-size Print raw size [default: False].
-S --sortsize Sort by size, largest first [default: False].
-V --verbose Be verbose [default: False].
-v --version Show version.
@ -76,7 +80,20 @@ Options:
""" # nopep8
def cmd_index(args, noder, catalog, top):
def cmd_mount(args: Dict[str, Any],
top: NodeTop,
noder: Noder) -> None:
"""mount action"""
mountpoint = args['<mountpoint>']
debug = args['--verbose']
Fuser(mountpoint, top, noder,
debug=debug)
def cmd_index(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: NodeTop) -> None:
"""index action"""
path = args['<path>']
name = args['<name>']
@ -99,8 +116,8 @@ def cmd_index(args, noder, catalog, top):
start = datetime.datetime.now()
walker = Walker(noder, usehash=usehash, debug=debug)
attr = noder.format_storage_attr(args['--meta'])
root = noder.new_storage_node(name, path, parent=top, attr=attr)
attr = args['--meta']
root = noder.new_storage_node(name, path, top, attr)
_, cnt = walker.index(path, root, name)
if subsize:
noder.rec_size(root, store=True)
@ -111,7 +128,10 @@ def cmd_index(args, noder, catalog, top):
catalog.save(top)
def cmd_update(args, noder, catalog, top):
def cmd_update(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: NodeTop) -> None:
"""update action"""
path = args['<path>']
name = args['<name>']
@ -122,7 +142,7 @@ def cmd_update(args, noder, catalog, top):
if not os.path.exists(path):
Logger.err(f'\"{path}\" does not exist')
return
root = noder.get_storage_node(top, name, path=path)
root = noder.get_storage_node(top, name, newpath=path)
if not root:
Logger.err(f'storage named \"{name}\" does not exist')
return
@ -139,28 +159,16 @@ def cmd_update(args, noder, catalog, top):
catalog.save(top)
def cmd_ls(args, noder, top):
def cmd_ls(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> List[NodeAny]:
"""ls action"""
path = args['<path>']
if not path:
path = SEPARATOR
if not path.startswith(SEPARATOR):
path = SEPARATOR + path
# prepend with top node path
pre = f'{SEPARATOR}{noder.NAME_TOP}'
if not path.startswith(pre):
path = pre + path
# ensure ends with a separator
if not path.endswith(SEPARATOR):
path += SEPARATOR
# add wild card
if not path.endswith(WILD):
path += WILD
path = path_to_search_all(args['<path>'])
fmt = args['--format']
if fmt.startswith('fzf'):
raise BadFormatException('fzf is not supported in ls, use find')
found = noder.list(top, path,
found = noder.list(top,
path,
rec=args['--recursive'],
fmt=fmt,
raw=args['--raw-size'])
@ -170,7 +178,10 @@ def cmd_ls(args, noder, top):
return found
def cmd_rm(args, noder, catalog, top):
def cmd_rm(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: NodeTop) -> NodeTop:
"""rm action"""
name = args['<storage>']
node = noder.get_storage_node(top, name)
@ -183,7 +194,9 @@ def cmd_rm(args, noder, catalog, top):
return top
def cmd_find(args, noder, top):
def cmd_find(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> List[NodeAny]:
"""find action"""
fromtree = args['--parent']
directory = args['--directory']
@ -192,12 +205,18 @@ def cmd_find(args, noder, top):
raw = args['--raw-size']
script = args['--script']
search_for = args['<term>']
return noder.find_name(top, search_for, script=script,
startpath=startpath, only_dir=directory,
parentfromtree=fromtree, fmt=fmt, raw=raw)
found = noder.find_name(top, search_for,
script=script,
startnode=startpath,
only_dir=directory,
parentfromtree=fromtree,
fmt=fmt, raw=raw)
return found
def cmd_graph(args, noder, top):
def cmd_graph(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> None:
"""graph action"""
path = args['<path>']
if not path:
@ -206,7 +225,9 @@ def cmd_graph(args, noder, top):
Logger.info(f'create graph with \"{cmd}\" (you need graphviz)')
def cmd_rename(args, catalog, top):
def cmd_rename(args: Dict[str, Any],
catalog: Catalog,
top: NodeTop) -> None:
"""rename action"""
storage = args['<storage>']
new = args['<name>']
@ -219,10 +240,12 @@ def cmd_rename(args, catalog, top):
Logger.info(msg)
else:
Logger.err(f'Storage named \"{storage}\" does not exist')
return top
def cmd_edit(args, noder, catalog, top):
def cmd_edit(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: NodeTop) -> None:
"""edit action"""
storage = args['<storage>']
storages = list(x.name for x in top.children)
@ -232,30 +255,29 @@ def cmd_edit(args, noder, catalog, top):
if not attr:
attr = ''
new = edit(attr)
node.attr = noder.format_storage_attr(new)
node.attr = noder.attrs_to_string(new)
if catalog.save(top):
Logger.info(f'Storage \"{storage}\" edited')
else:
Logger.err(f'Storage named \"{storage}\" does not exist')
return top
def banner():
def banner() -> None:
"""print banner"""
Logger.stderr_nocolor(BANNER)
Logger.stderr_nocolor("")
def print_supported_formats():
def print_supported_formats() -> None:
"""print all supported formats to stdout"""
print('"native" : native format')
print('"csv" : CSV format')
print(f' {Noder.CSV_HEADER}')
print('"fzf-native" : fzf to native (only for find)')
print('"fzf-csv" : fzf to csv (only for find)')
print('"fzf-native" : fzf to native (only valid for find)')
print('"fzf-csv" : fzf to csv (only valid for find)')
def main():
def main() -> bool:
"""entry point"""
args = docopt(USAGE, version=VERSION)
@ -320,6 +342,11 @@ def main():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_ls(args, noder, top)
elif args['mount']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_mount(args, top, noder)
elif args['rm']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')

@ -5,6 +5,11 @@ Copyright (c) 2022, deadc0de6
shell colors
"""
from typing import TypeVar, Type
CLASSTYPE = TypeVar('CLASSTYPE', bound='Colors')
class Colors:
"""shell colors"""
@ -22,7 +27,7 @@ class Colors:
UND = '\033[4m'
@classmethod
def no_color(cls):
def no_color(cls: Type[CLASSTYPE]) -> None:
"""disable colors"""
Colors.RED = ''
Colors.GREEN = ''

@ -8,12 +8,13 @@ Catcli generic compressed data lister
import os
import tarfile
import zipfile
from typing import List
class Decomp:
"""decompressor"""
def __init__(self):
def __init__(self) -> None:
self.ext = {
'tar': self._tar,
'tgz': self._tar,
@ -28,29 +29,29 @@ class Decomp:
'tar.bz2': self._tar,
'zip': self._zip}
def get_formats(self):
def get_formats(self) -> List[str]:
"""return list of supported extensions"""
return list(self.ext.keys())
def get_names(self, path):
def get_names(self, path: str) -> List[str]:
"""get tree of compressed archive"""
ext = os.path.splitext(path)[1][1:].lower()
if ext in list(self.ext):
return self.ext[ext](path)
return None
return []
@staticmethod
def _tar(path):
def _tar(path: str) -> List[str]:
"""return list of file names in tar"""
if not tarfile.is_tarfile(path):
return None
return []
with tarfile.open(path, "r") as tar:
return tar.getnames()
@staticmethod
def _zip(path):
def _zip(path: str) -> List[str]:
"""return list of file names in zip"""
if not zipfile.is_zipfile(path):
return None
return []
with zipfile.ZipFile(path) as file:
return file.namelist()

@ -0,0 +1,131 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2023, deadc0de6
fuse for catcli
"""
import os
from time import time
from stat import S_IFDIR, S_IFREG
from typing import List, Dict, Any, Optional
import fuse # type: ignore
# local imports
from catcli.noder import Noder
from catcli.nodes import NodeTop, NodeAny
from catcli.utils import path_to_search_all, path_to_top
from catcli import nodes
class Fuser:
"""fuse filesystem mounter"""
def __init__(self, mountpoint: str,
top: NodeTop,
noder: Noder,
debug: bool = False):
"""fuse filesystem"""
filesystem = CatcliFilesystem(top, noder)
fuse.FUSE(filesystem,
mountpoint,
foreground=debug,
allow_other=True,
nothreads=True,
debug=debug)
class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
"""in-memory filesystem for catcli catalog"""
def __init__(self, top: NodeTop,
noder: Noder):
"""init fuse filesystem"""
self.top = top
self.noder = noder
def _get_entry(self, path: str) -> Optional[NodeAny]:
"""return the node pointed by path"""
path = path_to_top(path)
found = self.noder.list(self.top, path,
rec=False,
fmt='native',
raw=True)
if found:
return found[0]
return None
def _get_entries(self, path: str) -> List[NodeAny]:
"""return nodes pointed by path"""
path = path_to_search_all(path)
found = self.noder.list(self.top, path,
rec=False,
fmt='native',
raw=True)
return found
def _getattr(self, path: str) -> Dict[str, Any]:
entry = self._get_entry(path)
if not entry:
return {}
maccess = time()
mode: Any = S_IFREG
size: int = 0
if entry.type == nodes.TYPE_ARCHIVED:
mode = S_IFREG
size = entry.size
elif entry.type == nodes.TYPE_DIR:
mode = S_IFDIR
size = entry.size
maccess = entry.maccess
elif entry.type == nodes.TYPE_FILE:
mode = S_IFREG
size = entry.size
maccess = entry.maccess
elif entry.type == nodes.TYPE_STORAGE:
mode = S_IFDIR
size = entry.size
maccess = entry.ts
elif entry.type == nodes.TYPE_META:
mode = S_IFREG
elif entry.type == nodes.TYPE_TOP:
mode = S_IFREG
mode = mode | 0o777
return {
'st_mode': (mode), # file type
'st_nlink': 1, # count hard link
'st_size': size,
'st_ctime': maccess, # attr last modified
'st_mtime': maccess, # content last modified
'st_atime': maccess, # access time
'st_uid': os.getuid(),
'st_gid': os.getgid(),
}
def getattr(self, path: str, _fh: Any = None) -> Dict[str, Any]:
"""return attr of file pointed by path"""
if path == '/':
# mountpoint
curt = time()
meta = {
'st_mode': (S_IFDIR | 0o777),
'st_nlink': 1,
'st_size': 0,
'st_ctime': curt,
'st_mtime': curt,
'st_atime': curt,
'st_uid': os.getuid(),
'st_gid': os.getgid(),
}
return meta
meta = self._getattr(path)
return meta
def readdir(self, path: str, _fh: Any) -> List[str]:
"""read directory content"""
content = ['.', '..']
entries = self._get_entries(path)
for entry in entries:
content.append(entry.name)
return content

@ -6,65 +6,66 @@ Logging helper
"""
import sys
from typing import TypeVar, Type
# local imports
from catcli.colors import Colors
from catcli.utils import fix_badchars
CLASSTYPE = TypeVar('CLASSTYPE', bound='Logger')
class Logger:
"""log to stdout/stderr"""
@classmethod
def stdout_nocolor(cls, string):
def stdout_nocolor(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stdout no color"""
string = fix_badchars(string)
sys.stdout.write(f'{string}\n')
@classmethod
def stderr_nocolor(cls, string):
def stderr_nocolor(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stderr no color"""
string = fix_badchars(string)
sys.stderr.write(f'{string}\n')
@classmethod
def debug(cls, string):
def debug(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stderr no color"""
cls.stderr_nocolor(f'[DBG] {string}\n')
cls.stderr_nocolor(f'[DBG] {string}')
@classmethod
def info(cls, string):
def info(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stdout in color"""
string = fix_badchars(string)
out = f'{Colors.MAGENTA}{string}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def err(cls, string):
def err(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stderr in RED"""
string = fix_badchars(string)
out = f'{Colors.RED}{string}{Colors.RESET}'
sys.stderr.write(f'{out}\n')
@classmethod
def progr(cls, string):
def progr(cls: Type[CLASSTYPE],
string: str) -> None:
"""print progress"""
string = fix_badchars(string)
sys.stderr.write(f'{string}\r')
sys.stderr.flush()
@classmethod
def get_bold_text(cls, string):
def get_bold_text(cls: Type[CLASSTYPE],
string: str) -> str:
"""make it bold"""
string = fix_badchars(string)
return f'{Colors.BOLD}{string}{Colors.RESET}'
@classmethod
def log_to_file(cls, path, string, append=True):
"""log to file"""
string = fix_badchars(string)
mode = 'w'
if append:
mode = 'a'
with open(path, mode, encoding='UTF-8') as file:
file.write(string)

@ -6,11 +6,16 @@ Class for printing nodes
"""
import sys
from typing import TypeVar, Type, Optional, Tuple, List, \
Dict, Any
from catcli.colors import Colors
from catcli.utils import fix_badchars
CLASSTYPE = TypeVar('CLASSTYPE', bound='NodePrinter')
class NodePrinter:
"""a node printer class"""
@ -19,7 +24,9 @@ class NodePrinter:
NBFILES = 'nbfiles'
@classmethod
def print_storage_native(cls, pre, name, args, attr):
def print_storage_native(cls: Type[CLASSTYPE], pre: str,
name: str, args: str,
attr: Dict[str, Any]) -> None:
"""print a storage node"""
end = ''
if attr:
@ -31,7 +38,8 @@ class NodePrinter:
sys.stdout.write(f'{out}\n')
@classmethod
def print_file_native(cls, pre, name, attr):
def print_file_native(cls: Type[CLASSTYPE], pre: str,
name: str, attr: str) -> None:
"""print a file node"""
nobad = fix_badchars(name)
out = f'{pre}{nobad}'
@ -39,22 +47,26 @@ class NodePrinter:
sys.stdout.write(f'{out}\n')
@classmethod
def print_dir_native(cls, pre, name, depth='', attr=None):
def print_dir_native(cls: Type[CLASSTYPE], pre: str,
name: str,
depth: int = 0,
attr: Optional[List[Tuple[str, str]]] = None) -> None:
"""print a directory node"""
end = []
if depth != '':
if depth > 0:
end.append(f'{cls.NBFILES}:{depth}')
if attr:
end.append(' '.join([f'{x}:{y}' for x, y in attr]))
end_string = ''
if end:
endstring = ', '.join(end)
end = f' [{endstring}]'
end_string = f' [{", ".join(end)}]'
out = pre + Colors.BLUE + fix_badchars(name) + Colors.RESET
out += f'{Colors.GRAY}{end}{Colors.RESET}'
out += f'{Colors.GRAY}{end_string}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_archive_native(cls, pre, name, archive):
def print_archive_native(cls: Type[CLASSTYPE], pre: str,
name: str, archive: str) -> None:
"""archive to stdout"""
out = pre + Colors.YELLOW + fix_badchars(name) + Colors.RESET
out += f' {Colors.GRAY}[{cls.ARCHIVE}:{archive}]{Colors.RESET}'

@ -2,16 +2,20 @@
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2017, deadc0de6
Class that represents a node in the catalog tree
Class that process nodes in the catalog tree
"""
import os
import shutil
import time
import anytree
from pyfzf.pyfzf import FzfPrompt
from typing import List, Union, Tuple, Any, Optional, Dict, cast
import anytree # type: ignore
from pyfzf.pyfzf import FzfPrompt # type: ignore
# local imports
from catcli import nodes
from catcli.nodes import NodeAny, NodeStorage, \
NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta
from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars
from catcli.logger import Logger
from catcli.nodeprinter import NodePrinter
@ -30,21 +34,13 @@ class Noder:
* "file" node representing a file
"""
NAME_TOP = 'top'
NAME_META = 'meta'
TYPE_TOP = 'top'
TYPE_FILE = 'file'
TYPE_DIR = 'dir'
TYPE_ARC = 'arc'
TYPE_STORAGE = 'storage'
TYPE_META = 'meta'
CSV_HEADER = ('name,type,path,size,indexed_at,'
'maccess,md5,nbfiles,free_space,'
'total_space,meta')
def __init__(self, debug=False, sortsize=False, arc=False):
def __init__(self, debug: bool = False,
sortsize: bool = False,
arc: bool = False) -> None:
"""
@debug: debug mode
@sortsize: sort nodes by size
@ -58,41 +54,49 @@ class Noder:
self.decomp = Decomp()
@staticmethod
def get_storage_names(top):
def get_storage_names(top: NodeTop) -> List[str]:
"""return a list of all storage names"""
return [x.name for x in list(top.children)]
def get_storage_node(self, top, name, path=None):
def get_storage_node(self, top: NodeTop,
name: str,
newpath: str = '') -> NodeStorage:
"""
return the storage node if any
if path is submitted, it will update the media info
if newpath is submitted, it will update the media info
"""
found = None
for node in top.children:
if node.type != self.TYPE_STORAGE:
if node.type != nodes.TYPE_STORAGE:
continue
if node.name == name:
found = node
break
if found and path and os.path.exists(path):
found.free = shutil.disk_usage(path).free
found.total = shutil.disk_usage(path).total
if found and newpath and os.path.exists(newpath):
found.free = shutil.disk_usage(newpath).free
found.total = shutil.disk_usage(newpath).total
found.ts = int(time.time())
return found
return cast(NodeStorage, found)
@staticmethod
def get_node(top, path, quiet=False):
def get_node(top: NodeTop,
path: str,
quiet: bool = False) -> Optional[NodeAny]:
"""get the node by internal tree path"""
resolv = anytree.resolver.Resolver('name')
try:
bpath = os.path.basename(path)
return resolv.get(top, bpath)
the_node = resolv.get(top, bpath)
return cast(NodeAny, the_node)
except anytree.resolver.ChildResolverError:
if not quiet:
Logger.err(f'No node at path \"{bpath}\"')
return None
def get_node_if_changed(self, top, path, treepath):
def get_node_if_changed(self,
top: NodeTop,
path: str,
treepath: str) -> Tuple[Optional[NodeAny], bool]:
"""
return the node (if any) and if it has changed
@top: top node (storage)
@ -128,59 +132,70 @@ class Noder:
self._debug(f'\tchange: no change for \"{path}\"')
return node, False
def rec_size(self, node, store=True):
def rec_size(self, node: Union[NodeDir, NodeStorage],
store: bool = True) -> int:
"""
recursively traverse tree and return size
@store: store the size in the node
"""
if node.type == self.TYPE_FILE:
self._debug(f'getting node size for \"{node.name}\"')
if node.type == nodes.TYPE_FILE:
self._debug(f'size of {node.type} \"{node.name}\": {node.size}')
return node.size
msg = f'getting node size recursively for \"{node.name}\"'
self._debug(msg)
size = 0
size: int = 0
for i in node.children:
if node.type == self.TYPE_DIR:
size = self.rec_size(i, store=store)
if node.type == nodes.TYPE_DIR:
sub_size = self.rec_size(i, store=store)
if store:
i.size = size
size += size
if node.type == self.TYPE_STORAGE:
size = self.rec_size(i, store=store)
i.size = sub_size
size += sub_size
continue
if node.type == nodes.TYPE_STORAGE:
sub_size = self.rec_size(i, store=store)
if store:
i.size = size
size += size
else:
i.size = sub_size
size += sub_size
continue
self._debug(f'skipping {node.name}')
if store:
node.size = size
self._debug(f'size of {node.type} \"{node.name}\": {size}')
return size
###############################################################
# public helpers
###############################################################
@staticmethod
def format_storage_attr(attr):
def attrs_to_string(attr: Union[List[str], Dict[str, str], str]) -> str:
"""format the storage attr for saving"""
if not attr:
return ''
if isinstance(attr, list):
return ', '.join(attr)
if isinstance(attr, dict):
ret = []
for key, val in attr.items():
ret.append(f'{key}={val}')
return ', '.join(ret)
attr = attr.rstrip()
return attr
def set_hashing(self, val):
def do_hashing(self, val: bool) -> None:
"""hash files when indexing"""
self.hash = val
###############################################################
# node creation
###############################################################
def new_top_node(self):
def new_top_node(self) -> NodeTop:
"""create a new top node"""
return anytree.AnyNode(name=self.NAME_TOP, type=self.TYPE_TOP)
top = NodeTop(nodes.NAME_TOP)
self._debug(f'new top node: {top}')
return top
def new_file_node(self, name, path, parent, storagepath):
def new_file_node(self, name: str, path: str,
parent: NodeAny, storagepath: str) -> Optional[NodeFile]:
"""create a new node representing a file"""
if not os.path.exists(path):
Logger.err(f'File \"{path}\" does not exist')
@ -191,15 +206,18 @@ class Noder:
except OSError as exc:
Logger.err(f'OSError: {exc}')
return None
md5 = None
md5 = ''
if self.hash:
md5 = self._get_hash(path)
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
node = self._new_generic_node(name, self.TYPE_FILE, relpath, parent,
size=stat.st_size, md5=md5,
maccess=maccess)
node = NodeFile(name,
relpath,
stat.st_size,
md5,
maccess,
parent=parent)
if self.arc:
ext = os.path.splitext(path)[1][1:]
if ext.lower() in self.decomp.get_formats():
@ -210,90 +228,93 @@ class Noder:
self._debug(f'{path} is NOT an archive')
return node
def new_dir_node(self, name, path, parent, storagepath):
def new_dir_node(self, name: str, path: str,
parent: NodeAny, storagepath: str) -> NodeDir:
"""create a new node representing a directory"""
path = os.path.abspath(path)
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
return self._new_generic_node(name, self.TYPE_DIR, relpath,
parent, maccess=maccess)
def new_storage_node(self, name, path, parent, attr=None):
return NodeDir(name,
relpath,
0,
maccess,
parent=parent)
def new_storage_node(self, name: str,
path: str,
parent: str,
attrs: Dict[str, Any]) \
-> NodeStorage:
"""create a new node representing a storage"""
path = os.path.abspath(path)
free = shutil.disk_usage(path).free
total = shutil.disk_usage(path).total
epoch = int(time.time())
return anytree.AnyNode(name=name, type=self.TYPE_STORAGE, free=free,
total=total, parent=parent, attr=attr, ts=epoch)
def new_archive_node(self, name, path, parent, archive):
return NodeStorage(name,
free,
total,
0,
epoch,
self.attrs_to_string(attrs),
parent=parent)
def new_archive_node(self, name: str, path: str,
parent: str, archive: str) -> NodeArchived:
"""create a new node for archive data"""
return anytree.AnyNode(name=name, type=self.TYPE_ARC, relpath=path,
parent=parent, size=0, md5=None,
archive=archive)
@staticmethod
def _new_generic_node(name, nodetype, relpath, parent,
size=None, md5=None, maccess=None):
"""generic node creation"""
return anytree.AnyNode(name=name, type=nodetype, relpath=relpath,
parent=parent, size=size,
md5=md5, maccess=maccess)
return NodeArchived(name=name, relpath=path,
parent=parent, size=0, md5='',
archive=archive)
###############################################################
# node management
###############################################################
def update_metanode(self, top):
def update_metanode(self, top: NodeTop) -> NodeMeta:
"""create or update meta node information"""
meta = self._get_meta_node(top)
epoch = int(time.time())
if not meta:
attr = {}
attr['created'] = epoch
attr['created_version'] = VERSION
meta = anytree.AnyNode(name=self.NAME_META, type=self.TYPE_META,
attr=attr)
attrs: Dict[str, Any] = {}
attrs['created'] = epoch
attrs['created_version'] = VERSION
meta = NodeMeta(name=nodes.NAME_META,
attr=attrs)
meta.attr['access'] = epoch
meta.attr['access_version'] = VERSION
return meta
def _get_meta_node(self, top):
def _get_meta_node(self, top: NodeTop) -> Optional[NodeMeta]:
"""return the meta node if any"""
try:
return next(filter(lambda x: x.type == self.TYPE_META,
top.children))
found = next(filter(lambda x: x.type == nodes.TYPE_META,
top.children))
return cast(NodeMeta, found)
except StopIteration:
return None
def clean_not_flagged(self, top):
def clean_not_flagged(self, top: NodeTop) -> int:
"""remove any node not flagged and clean flags"""
cnt = 0
for node in anytree.PreOrderIter(top):
if node.type not in [self.TYPE_FILE, self.TYPE_DIR]:
if node.type not in [nodes.TYPE_DIR, nodes.TYPE_FILE]:
continue
if self._clean(node):
cnt += 1
return cnt
@staticmethod
def flag(node):
"""flag a node"""
node.flag = True
def _clean(self, node):
def _clean(self, node: NodeAny) -> bool:
"""remove node if not flagged"""
if not self._has_attr(node, 'flag') or \
not node.flag:
if not node.flagged():
node.parent = None
return True
del node.flag
node.unflag()
return False
###############################################################
# printing
###############################################################
def _node_to_csv(self, node, sep=',', raw=False):
def _node_to_csv(self, node: NodeAny,
sep: str = ',',
raw: bool = False) -> None:
"""
print a node to csv
@node: the node to consider
@ -302,11 +323,11 @@ class Noder:
"""
if not node:
return
if node.type == self.TYPE_TOP:
if node.type == nodes.TYPE_TOP:
return
out = []
if node.type == self.TYPE_STORAGE:
if node.type == nodes.TYPE_STORAGE:
# handle storage
out.append(node.name) # name
out.append(node.type) # type
@ -337,11 +358,11 @@ class Noder:
out.append(epoch_to_str(node.maccess)) # maccess
else:
out.append('') # fake maccess
if node.md5:
if self._has_attr(node, 'md5'):
out.append(node.md5) # md5
else:
out.append('') # fake md5
if node.type == self.TYPE_DIR:
if node.type == nodes.TYPE_DIR:
out.append(str(len(node.children))) # nbfiles
else:
out.append('') # fake nbfiles
@ -353,9 +374,13 @@ class Noder:
if len(line) > 0:
Logger.stdout_nocolor(line)
def _print_node_native(self, node, pre='', withpath=False,
withdepth=False, withstorage=False,
recalcparent=False, raw=False):
def _print_node_native(self, node: NodeAny,
pre: str = '',
withpath: bool = False,
withdepth: bool = False,
withstorage: bool = False,
recalcparent: bool = False,
raw: bool = False) -> None:
"""
print a node
@node: the node to print
@ -366,10 +391,10 @@ class Noder:
@recalcparent: get relpath from tree instead of relpath field
@raw: print raw size rather than human readable
"""
if node.type == self.TYPE_TOP:
if node.type == nodes.TYPE_TOP:
# top node
Logger.stdout_nocolor(f'{pre}{node.name}')
elif node.type == self.TYPE_FILE:
elif node.type == nodes.TYPE_FILE:
# node of type file
name = node.name
if withpath:
@ -380,16 +405,16 @@ class Noder:
name = name.lstrip(os.sep)
if withstorage:
storage = self._get_storage(node)
attr = ''
attr_str = ''
if node.md5:
attr = f', md5:{node.md5}'
attr_str = f', md5:{node.md5}'
size = size_to_str(node.size, raw=raw)
compl = f'size:{size}{attr}'
compl = f'size:{size}{attr_str}'
if withstorage:
content = Logger.get_bold_text(storage.name)
compl += f', storage:{content}'
NodePrinter.print_file_native(pre, name, compl)
elif node.type == self.TYPE_DIR:
elif node.type == nodes.TYPE_DIR:
# node of type directory
name = node.name
if withpath:
@ -398,23 +423,25 @@ class Noder:
else:
name = node.relpath
name = name.lstrip(os.sep)
depth = ''
depth = 0
if withdepth:
depth = len(node.children)
if withstorage:
storage = self._get_storage(node)
attr = []
attr: List[Tuple[str, str]] = []
if node.size:
attr.append(['totsize', size_to_str(node.size, raw=raw)])
attr.append(('totsize', size_to_str(node.size, raw=raw)))
if withstorage:
attr.append(['storage', Logger.get_bold_text(storage.name)])
attr.append(('storage', Logger.get_bold_text(storage.name)))
NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr)
elif node.type == self.TYPE_STORAGE:
elif node.type == nodes.TYPE_STORAGE:
# node of type storage
sztotal = size_to_str(node.total, raw=raw)
szused = size_to_str(node.total - node.free, raw=raw)
nbchildren = len(node.children)
pcent = node.free * 100 / node.total
pcent = 0
if node.total > 0:
pcent = node.free * 100 / node.total
freepercent = f'{pcent:.1f}%'
# get the date
timestamp = ''
@ -423,9 +450,9 @@ class Noder:
timestamp += epoch_to_str(node.ts)
disksize = ''
# the children size
size = self.rec_size(node, store=False)
size = size_to_str(size, raw=raw)
disksize = 'totsize:' + f'{size}'
recsize = self.rec_size(node, store=False)
sizestr = size_to_str(recsize, raw=raw)
disksize = 'totsize:' + f'{sizestr}'
# format the output
name = node.name
args = [
@ -439,16 +466,16 @@ class Noder:
name,
argsstring,
node.attr)
elif node.type == self.TYPE_ARC:
elif node.type == nodes.TYPE_ARCHIVED:
# archive node
if self.arc:
NodePrinter.print_archive_native(pre, node.name, node.archive)
else:
Logger.err(f'bad node encountered: {node}')
def print_tree(self, node,
fmt='native',
raw=False):
def print_tree(self, node: NodeAny,
fmt: str = 'native',
raw: bool = False) -> None:
"""
print the tree in different format
@node: start node
@ -470,27 +497,28 @@ class Noder:
Logger.stdout_nocolor(self.CSV_HEADER)
self._to_csv(node, raw=raw)
def _to_csv(self, node, raw=False):
def _to_csv(self, node: NodeAny,
raw: bool = False) -> None:
"""print the tree to csv"""
rend = anytree.RenderTree(node, childiter=self._sort_tree)
for _, _, item in rend:
self._node_to_csv(item, raw=raw)
@staticmethod
def _fzf_prompt(strings):
def _fzf_prompt(strings: Any) -> Any:
# prompt with fzf
fzf = FzfPrompt()
selected = fzf.prompt(strings)
return selected
def _to_fzf(self, node, fmt):
def _to_fzf(self, node: NodeAny, fmt: str) -> None:
"""
fzf prompt with list and print selected node(s)
@node: node to start with
@fmt: output format for selected nodes
"""
rendered = anytree.RenderTree(node, childiter=self._sort_tree)
nodes = {}
the_nodes = {}
# construct node names list
for _, _, rend in rendered:
if not rend:
@ -498,33 +526,38 @@ class Noder:
parents = self._get_parents(rend)
storage = self._get_storage(rend)
fullpath = os.path.join(storage.name, parents)
nodes[fullpath] = rend
the_nodes[fullpath] = rend
# prompt with fzf
paths = self._fzf_prompt(nodes.keys())
paths = self._fzf_prompt(the_nodes.keys())
# print the resulting tree
subfmt = fmt.replace('fzf-', '')
for path in paths:
if not path:
continue
if path not in nodes:
if path not in the_nodes:
continue
rend = nodes[path]
rend = the_nodes[path]
self.print_tree(rend, fmt=subfmt)
@staticmethod
def to_dot(node, path='tree.dot'):
def to_dot(top: NodeTop,
path: str = 'tree.dot') -> str:
"""export to dot for graphing"""
anytree.exporter.DotExporter(node).to_dotfile(path)
anytree.exporter.DotExporter(top).to_dotfile(path)
Logger.info(f'dot file created under \"{path}\"')
return f'dot {path} -T png -o /tmp/tree.png'
###############################################################
# searching
###############################################################
def find_name(self, top, key,
script=False, only_dir=False,
startpath=None, parentfromtree=False,
fmt='native', raw=False):
def find_name(self, top: NodeTop,
key: str,
script: bool = False,
only_dir: bool = False,
startnode: Optional[NodeAny] = None,
parentfromtree: bool = False,
fmt: str = 'native',
raw: bool = False) -> List[NodeAny]:
"""
find files based on their names
@top: top node
@ -540,9 +573,9 @@ class Noder:
self._debug(f'searching for \"{key}\"')
# search for nodes based on path
start = top
if startpath:
start = self.get_node(top, startpath)
start: Optional[NodeAny] = top
if startnode:
start = self.get_node(top, startnode)
filterfunc = self._callback_find_name(key, only_dir)
found = anytree.findall(start, filter_=filterfunc)
nbfound = len(found)
@ -551,7 +584,9 @@ class Noder:
# compile found nodes
paths = {}
for item in found:
item = self._sanitize(item)
item.name = fix_badchars(item.name)
if hasattr(item, 'relpath'):
item.relpath = fix_badchars(item.relpath)
if parentfromtree:
paths[self._get_parents(item)] = item
else:
@ -591,19 +626,19 @@ class Noder:
return list(paths.values())
def _callback_find_name(self, term, only_dir):
def _callback_find_name(self, term: str, only_dir: bool) -> Any:
"""callback for finding files"""
def find_name(node):
if node.type == self.TYPE_STORAGE:
def find_name(node: NodeAny) -> bool:
if node.type == nodes.TYPE_STORAGE:
# ignore storage nodes
return False
if node.type == self.TYPE_TOP:
if node.type == nodes.TYPE_TOP:
# ignore top nodes
return False
if node.type == self.TYPE_META:
if node.type == nodes.TYPE_META:
# ignore meta nodes
return False
if only_dir and node.type != self.TYPE_DIR:
if only_dir and node.type == nodes.TYPE_DIR:
# ignore non directory
return False
@ -620,10 +655,11 @@ class Noder:
###############################################################
# ls
###############################################################
def list(self, top, path,
rec=False,
fmt='native',
raw=False):
def list(self, top: NodeTop,
path: str,
rec: bool = False,
fmt: str = 'native',
raw: bool = False) -> List[NodeAny]:
"""
list nodes for "ls"
@top: top node
@ -684,7 +720,9 @@ class Noder:
###############################################################
# tree creation
###############################################################
def _add_entry(self, name, top, resolv):
def _add_entry(self, name: str,
top: NodeTop,
resolv: Any) -> None:
"""add an entry to the tree"""
entries = name.rstrip(os.sep).split(os.sep)
if len(entries) == 1:
@ -698,7 +736,7 @@ class Noder:
except anytree.resolver.ChildResolverError:
self.new_archive_node(nodename, name, top, top.name)
def list_to_tree(self, parent, names):
def list_to_tree(self, parent: NodeAny, names: List[str]) -> None:
"""convert list of files to a tree"""
if not names:
return
@ -710,70 +748,64 @@ class Noder:
###############################################################
# diverse
###############################################################
def _sort_tree(self, items):
def _sort_tree(self,
items: List[NodeAny]) -> List[NodeAny]:
"""sorting a list of items"""
return sorted(items, key=self._sort, reverse=self.sortsize)
def _sort(self, lst):
def _sort(self, lst: NodeAny) -> Any:
"""sort a list"""
if self.sortsize:
return self._sort_size(lst)
return self._sort_fs(lst)
@staticmethod
def _sort_fs(node):
def _sort_fs(node: NodeAny) -> Tuple[str, str]:
"""sorting nodes dir first and alpha"""
return (node.type, node.name.lstrip('.').lower())
@staticmethod
def _sort_size(node):
def _sort_size(node: NodeAny) -> float:
"""sorting nodes by size"""
try:
if not node.size:
return 0
return node.size
return float(node.size)
except AttributeError:
return 0
def _get_storage(self, node):
def _get_storage(self, node: NodeAny) -> NodeStorage:
"""recursively traverse up to find storage"""
if node.type == self.TYPE_STORAGE:
if node.type == nodes.TYPE_STORAGE:
return node
return node.ancestors[1]
return cast(NodeStorage, node.ancestors[1])
@staticmethod
def _has_attr(node, attr):
def _has_attr(node: NodeAny, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in node.__dict__.keys()
def _get_parents(self, node):
def _get_parents(self, node: NodeAny) -> str:
"""get all parents recursively"""
if node.type == self.TYPE_STORAGE:
if node.type == nodes.TYPE_STORAGE:
return ''
if node.type == self.TYPE_TOP:
if node.type == nodes.TYPE_TOP:
return ''
parent = self._get_parents(node.parent)
if parent:
return os.sep.join([parent, node.name])
return node.name
return str(node.name)
@staticmethod
def _get_hash(path):
def _get_hash(path: str) -> str:
"""return md5 hash of node"""
try:
return md5sum(path)
except CatcliException as exc:
Logger.err(str(exc))
return None
@staticmethod
def _sanitize(node):
"""sanitize node strings"""
node.name = fix_badchars(node.name)
node.relpath = fix_badchars(node.relpath)
return node
return ''
def _debug(self, string):
def _debug(self, string: str) -> None:
"""print debug"""
if not self.debug:
return

@ -0,0 +1,207 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2023, deadc0de6
Class that represents a node in the catalog tree
"""
# pylint: disable=W0622
from typing import Dict, Any
from anytree import NodeMixin # type: ignore
TYPE_TOP = 'top'
TYPE_FILE = 'file'
TYPE_DIR = 'dir'
TYPE_ARCHIVED = 'arc'
TYPE_STORAGE = 'storage'
TYPE_META = 'meta'
NAME_TOP = 'top'
NAME_META = 'meta'
class NodeAny(NodeMixin): # type: ignore
"""generic node"""
def __init__(self, # type: ignore[no-untyped-def]
parent=None,
children=None):
"""build generic node"""
super().__init__()
self.parent = parent
if children:
self.children = children
def _to_str(self) -> str:
ret = str(self.__class__) + ": " + str(self.__dict__)
if self.children:
ret += '\n'
for child in self.children:
ret += ' child => ' + str(child)
return ret
def __str__(self) -> str:
return self._to_str()
def flagged(self) -> bool:
"""is flagged"""
if not hasattr(self, '_flagged'):
return False
return self._flagged
def flag(self) -> None:
"""flag a node"""
self._flagged = True # pylint: disable=W0201
def unflag(self) -> None:
"""unflag node"""
self._flagged = False # pylint: disable=W0201
delattr(self, '_flagged')
class NodeTop(NodeAny):
"""a top node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
children=None):
"""build a top node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_TOP
self.parent = None
if children:
self.children = children
def __str__(self) -> str:
return self._to_str()
class NodeFile(NodeAny):
"""a file node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
relpath: str,
size: int,
md5: str,
maccess: float,
parent=None,
children=None):
"""build a file node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_FILE
self.relpath = relpath
self.size = size
self.md5 = md5
self.maccess = maccess
self.parent = parent
if children:
self.children = children
def __str__(self) -> str:
return self._to_str()
class NodeDir(NodeAny):
"""a directory node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
relpath: str,
size: int,
maccess: float,
parent=None,
children=None):
"""build a directory node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_DIR
self.relpath = relpath
self.size = size
self.maccess = maccess
self.parent = parent
if children:
self.children = children
def __str__(self) -> str:
return self._to_str()
class NodeArchived(NodeAny):
"""an archived node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
relpath: str,
size: int,
md5: str,
archive: str,
parent=None,
children=None):
"""build an archived node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_ARCHIVED
self.relpath = relpath
self.size = size
self.md5 = md5
self.archive = archive
self.parent = parent
if children:
self.children = children
def __str__(self) -> str:
return self._to_str()
class NodeStorage(NodeAny):
"""a storage node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
free: int,
total: int,
size: int,
ts: float,
attr: str,
parent=None,
children=None):
"""build a storage node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_STORAGE
self.free = free
self.total = total
self.attr = attr
self.size = size
self.ts = ts # pylint: disable=C0103
self.parent = parent
if children:
self.children = children
def __str__(self) -> str:
return self._to_str()
class NodeMeta(NodeAny):
"""a meta node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
attr: Dict[str, Any],
parent=None,
children=None):
"""build a meta node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_META
self.attr = attr
self.parent = parent
if children:
self.children = children
def __str__(self) -> str:
return self._to_str()

@ -12,10 +12,43 @@ import subprocess
import datetime
# local imports
from catcli import nodes
from catcli.exceptions import CatcliException
def md5sum(path):
SEPARATOR = '/'
WILD = '*'
def path_to_top(path: str) -> str:
"""path pivot under top"""
pre = f'{SEPARATOR}{nodes.NAME_TOP}'
if not path.startswith(pre):
# prepend with top node path
path = pre + path
return path
def path_to_search_all(path: str) -> str:
"""path to search for all subs"""
if not path:
path = SEPARATOR
if not path.startswith(SEPARATOR):
path = SEPARATOR + path
pre = f'{SEPARATOR}{nodes.NAME_TOP}'
if not path.startswith(pre):
# prepend with top node path
path = pre + path
if not path.endswith(SEPARATOR):
# ensure ends with a separator
path += SEPARATOR
if not path.endswith(WILD):
# add wild card
path += WILD
return path
def md5sum(path: str) -> str:
"""
calculate md5 sum of a file
may raise exception
@ -36,10 +69,11 @@ def md5sum(path):
pass
except OSError as exc:
raise CatcliException(f'md5sum error: {exc}') from exc
return None
return ''
def size_to_str(size, raw=True):
def size_to_str(size: float,
raw: bool = True) -> str:
"""convert size to string, optionally human readable"""
div = 1024.
suf = ['B', 'K', 'M', 'G', 'T', 'P']
@ -53,27 +87,27 @@ def size_to_str(size, raw=True):
return f'{size:.1f}{sufix}'
def epoch_to_str(epoch):
def epoch_to_str(epoch: float) -> str:
"""convert epoch to string"""
if not epoch:
return ''
fmt = '%Y-%m-%d %H:%M:%S'
timestamp = datetime.datetime.fromtimestamp(float(epoch))
timestamp = datetime.datetime.fromtimestamp(epoch)
return timestamp.strftime(fmt)
def ask(question):
def ask(question: str) -> bool:
"""ask the user what to do"""
resp = input(f'{question} [y|N] ? ')
return resp.lower() == 'y'
def edit(string):
def edit(string: str) -> str:
"""edit the information with the default EDITOR"""
string = string.encode('utf-8')
data = string.encode('utf-8')
editor = os.environ.get('EDITOR', 'vim')
with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as file:
file.write(string)
file.write(data)
file.flush()
subprocess.call([editor, file.name])
file.seek(0)
@ -81,6 +115,6 @@ def edit(string):
return new.decode('utf-8')
def fix_badchars(string):
def fix_badchars(string: str) -> str:
"""fix none utf-8 chars in string"""
return string.encode('utf-8', 'ignore').decode('utf-8')

@ -6,9 +6,12 @@ Catcli filesystem indexer
"""
import os
from typing import Tuple, Optional
# local imports
from catcli.noder import Noder
from catcli.logger import Logger
from catcli.nodes import NodeAny, NodeTop
class Walker:
@ -16,8 +19,10 @@ class Walker:
MAXLINELEN = 80 - 15
def __init__(self, noder, usehash=True, debug=False,
logpath=None):
def __init__(self, noder: Noder,
usehash: bool = True,
debug: bool = False,
logpath: str = ''):
"""
@noder: the noder to use
@hash: calculate hash of nodes
@ -26,11 +31,14 @@ class Walker:
"""
self.noder = noder
self.usehash = usehash
self.noder.set_hashing(self.usehash)
self.noder.do_hashing(self.usehash)
self.debug = debug
self.lpath = logpath
def index(self, path, parent, name, storagepath=''):
def index(self, path: str,
parent: NodeAny,
name: str,
storagepath: str = '') -> Tuple[str, int]:
"""
index a directory and store in tree
@path: path to index
@ -39,7 +47,8 @@ class Walker:
"""
self._debug(f'indexing starting at {path}')
if not parent:
parent = self.noder.new_dir_node(name, path, parent)
parent = self.noder.new_dir_node(name, path,
parent, storagepath)
if os.path.islink(path):
rel = os.readlink(path)
@ -77,16 +86,19 @@ class Walker:
_, cnt2 = self.index(sub, dummy, base, nstoragepath)
cnt += cnt2
break
self._progress(None)
self._progress('')
return parent, cnt
def reindex(self, path, parent, top):
def reindex(self, path: str, parent: NodeAny, top: NodeTop) -> int:
"""reindex a directory and store in tree"""
cnt = self._reindex(path, parent, top)
cnt += self.noder.clean_not_flagged(parent)
return cnt
def _reindex(self, path, parent, top, storagepath=''):
def _reindex(self, path: str,
parent: NodeAny,
top: NodeTop,
storagepath: str = '') -> int:
"""
reindex a directory and store in tree
@path: directory path to re-index
@ -103,13 +115,14 @@ class Walker:
reindex, node = self._need_reindex(parent, sub, treepath)
if not reindex:
self._debug(f'\tskip file {sub}')
self.noder.flag(node)
if node:
node.flag()
continue
self._log2file(f'update catalog for \"{sub}\"')
node = self.noder.new_file_node(os.path.basename(file), sub,
parent, storagepath)
self.noder.flag(node)
cnt += 1
if node:
node.flag()
cnt += 1
for adir in dirs:
self._debug(f'found dir \"{adir}\" under {path}')
base = os.path.basename(adir)
@ -117,49 +130,53 @@ class Walker:
treepath = os.path.join(storagepath, adir)
reindex, dummy = self._need_reindex(parent, sub, treepath)
if reindex:
self._log2file(f'update catalog for \"{sub}\"')
dummy = self.noder.new_dir_node(base, sub,
parent, storagepath)
cnt += 1
self.noder.flag(dummy)
if dummy:
dummy.flag()
self._debug(f'reindexing deeper under {sub}')
nstoragepath = os.sep.join([storagepath, base])
if not storagepath:
nstoragepath = base
cnt2 = self._reindex(sub, dummy, top, nstoragepath)
cnt += cnt2
if dummy:
cnt2 = self._reindex(sub, dummy, top, nstoragepath)
cnt += cnt2
break
return cnt
def _need_reindex(self, top, path, treepath):
def _need_reindex(self,
top: NodeTop,
path: str,
treepath: str) -> Tuple[bool, Optional[NodeTop]]:
"""
test if node needs re-indexing
@top: top node (storage)
@path: abs path to file
@treepath: rel path from indexed directory
"""
cnode, changed = self.noder.get_node_if_changed(top, path, treepath)
if not cnode:
node, changed = self.noder.get_node_if_changed(top, path, treepath)
if not node:
self._debug(f'\t{path} does not exist')
return True, cnode
if cnode and not changed:
return True, node
if node and not changed:
# ignore this node
self._debug(f'\t{path} has not changed')
return False, cnode
if cnode and changed:
return False, node
if node and changed:
# remove this node and re-add
self._debug(f'\t{path} has changed')
self._debug(f'\tremoving node {cnode.name} for {path}')
cnode.parent = None
return True, cnode
self._debug(f'\tremoving node {node.name} for {path}')
node.parent = None
return True, node
def _debug(self, string):
def _debug(self, string: str) -> None:
"""print to debug"""
if not self.debug:
return
Logger.debug(string)
def _progress(self, string):
def _progress(self, string: str) -> None:
"""print progress"""
if self.debug:
return
@ -170,10 +187,3 @@ class Walker:
if len(string) > self.MAXLINELEN:
string = string[:self.MAXLINELEN] + '...'
Logger.progr(f'indexing: {string:80}')
def _log2file(self, string):
"""log to file"""
if not self.lpath:
return
line = f'{string}\n'
Logger.log_to_file(self.lpath, line, append=True)

@ -1,3 +1,5 @@
docopt; python_version >= '3.0'
types-docopt; python_version >= '3.0'
anytree; python_version >= '3.0'
pyfzf; python_version >= '3.0'
fusepy; python_version >= '3.0'

@ -1,25 +1,31 @@
from setuptools import setup, find_packages
from codecs import open
"""setup.py"""
from os import path
from setuptools import setup, find_packages
import catcli
readme = 'README.md'
README = 'README.md'
here = path.abspath(path.dirname(__file__))
read_readme = lambda f: open(f, 'r').read()
VERSION = catcli.__version__
VERSION = catcli.version.__version__
REQUIRES_PYTHON = '>=3'
def read_readme(readme_path):
"""read readme content"""
with open(readme_path, encoding="utf-8") as file:
return file.read()
URL = f'https://github.com/deadc0de6/catcli/archive/v{VERSION}.tar.gz'
setup(
name='catcli',
version=VERSION,
description='The command line catalog tool for your offline data',
long_description=read_readme(readme),
long_description=read_readme(README),
long_description_content_type='text/markdown',
license_files = ('LICENSE',),
license_files=('LICENSE',),
url='https://github.com/deadc0de6/catcli',
download_url = 'https://github.com/deadc0de6/catcli/archive/v'+VERSION+'.tar.gz',
download_url=URL,
options={"bdist_wheel": {"python_tag": "py3"}},
# include anything from MANIFEST.in
include_package_data=True,

@ -0,0 +1,5 @@
"github","storage","","1510","2023-03-09 16:20:59","","","2","0","0",""
"workflows","dir","github/workflows","1493","2023-03-09 16:20:59","2023-03-09 16:20:44","","2","","",""
"pypi-release.yml","file","github/workflows/pypi-release.yml","691","2023-03-09 16:20:59","2022-10-19 21:00:37","57699a7a6a03e20e864f220e19f8e197","","","",""
"testing.yml","file","github/workflows/testing.yml","802","2023-03-09 16:20:59","2023-03-09 16:20:44","7144a119ef43adb634654522c12ec250","","","",""
"FUNDING.yml","file","github/FUNDING.yml","17","2023-03-09 16:20:59","2022-10-19 21:00:37","0c6407a84d412c514007313fb3bca4de","","","",""

@ -0,0 +1,60 @@
{
"children": [
{
"attr": "",
"children": [
{
"maccess": 1666206037.0786593,
"md5": "0c6407a84d412c514007313fb3bca4de",
"name": "FUNDING.yml",
"relpath": "/FUNDING.yml",
"size": 17,
"type": "file"
},
{
"children": [
{
"maccess": 1666206037.078865,
"md5": "57699a7a6a03e20e864f220e19f8e197",
"name": "pypi-release.yml",
"relpath": "workflows/pypi-release.yml",
"size": 691,
"type": "file"
},
{
"maccess": 1678375244.4870229,
"md5": "7144a119ef43adb634654522c12ec250",
"name": "testing.yml",
"relpath": "workflows/testing.yml",
"size": 802,
"type": "file"
}
],
"maccess": 1678375244.4865956,
"name": "workflows",
"relpath": "/workflows",
"size": 1493,
"type": "dir"
}
],
"free": 0,
"name": "github",
"size": 1510,
"total": 0,
"ts": 1678375259,
"type": "storage"
},
{
"attr": {
"access": 1678375259,
"access_version": "0.8.7",
"created": 1678375259,
"created_version": "0.8.7"
},
"name": "meta",
"type": "meta"
}
],
"name": "top",
"type": "top"
}

@ -0,0 +1,7 @@
top
└── storage: github
nbfiles:2 | totsize:1510 | free:0.0% | du:0/0 | date:2023-03-09 16:20:59
├── workflows [nbfiles:2, totsize:1493]
│ ├── pypi-release.yml [size:691, md5:57699a7a6a03e20e864f220e19f8e197]
│ └── testing.yml [size:802, md5:7144a119ef43adb634654522c12ec250]
└── FUNDING.yml [size:17, md5:0c6407a84d412c514007313fb3bca4de]

@ -0,0 +1,148 @@
#!/usr/bin/env bash
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2023, deadc0de6
# exit on first error
set -e
# get current path
rl="readlink -f"
if ! ${rl} "${0}" >/dev/null 2>&1; then
rl="realpath"
if ! command -v ${rl}; then
echo "\"${rl}\" not found !" && exit 1
fi
fi
cur=$(dirname "$(${rl} "${0}")")
# pivot
prev="${cur}/.."
cd "${prev}"
# coverage
#export PYTHONPATH=".:${PYTHONPATH}"
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
bin="coverage run -p --source=catcli -m catcli.catcli"
#bin="coverage run -p --source=${prev}/catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
echo "pythonpath: ${PYTHONPATH}"
echo "bin: ${bin}"
${bin} --version
# get the helpers
# shellcheck source=tests-ng/helper
source "${cur}"/helper
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
##########################################################
# the test
##########################################################
# create temp dirs
tmpd=$(mktemp -d)
clear_on_exit "${tmpd}"
catalog="${tmpd}/catalog"
# index
${bin} -B index -c --catalog="${catalog}" github .github
ls -laR .github
cat "${catalog}"
#cat "${catalog}"
echo ""
# compare keys
echo "[+] compare keys"
src="tests-ng/assets/github.catalog.json"
src_keys="${tmpd}/src-keys"
dst_keys="${tmpd}/dst-keys"
cat "${src}" | jq '.. | keys?' | jq '.[]' | sort > "${src_keys}"
cat "${catalog}" | jq '.. | keys?' | jq '.[]' | sort > "${dst_keys}"
echo "src:"
cat "${src_keys}"
echo "dst:"
cat "${dst_keys}"
diff "${src_keys}" "${dst_keys}"
echo "ok!"
# compare children 1
echo "[+] compare children 1"
src_keys="${tmpd}/src-child1"
dst_keys="${tmpd}/dst-child1"
cat "${src}" | jq '. | select(.type=="top") | .children | .[].name' | sort > "${src_keys}"
cat "${catalog}" | jq '. | select(.type=="top") | .children | .[].name' | sort > "${dst_keys}"
echo "src:"
cat "${src_keys}"
echo "dst:"
cat "${dst_keys}"
diff "${src_keys}" "${dst_keys}"
echo "ok!"
# compare children 2
echo "[+] compare children 2"
src_keys="${tmpd}/src-child2"
dst_keys="${tmpd}/dst-child2"
cat "${src}" | jq '. | select(.type=="top") | .children | .[] | select(.name=="github") | .children | .[].name' | sort > "${src_keys}"
cat "${catalog}" | jq '. | select(.type=="top") | .children | .[] | select(.name=="github") | .children | .[].name' | sort > "${dst_keys}"
echo "src:"
cat "${src_keys}"
echo "dst:"
cat "${dst_keys}"
diff "${src_keys}" "${dst_keys}"
echo "ok!"
# compare children 3
echo "[+] compare children 3"
src_keys="${tmpd}/src-child3"
dst_keys="${tmpd}/dst-child3"
cat "${src}" | jq '. | select(.type=="top") | .children | .[] | select(.name=="github") | .children | .[] | select(.name=="workflows") | .children | .[].name' | sort > "${src_keys}"
cat "${catalog}" | jq '. | select(.type=="top") | .children | .[] | select(.name=="github") | .children | .[] | select(.name=="workflows") | .children | .[].name' | sort > "${dst_keys}"
echo "src:"
cat "${src_keys}"
echo "dst:"
cat "${dst_keys}"
diff "${src_keys}" "${dst_keys}"
echo "ok!"
# native
echo "[+] compare native output"
native="${tmpd}/native.txt"
${bin} -B ls -s -r --format=native --catalog="${catalog}" > "${native}"
mod="${tmpd}/native.mod.txt"
cat "${native}" | sed -e 's/free:.*%/free:0.0%/g' \
-e 's/date:....-..-.. ..:..:../date:2023-03-09 16:20:59/g' \
-e 's#du:[^|]* |#du:0/0 |#g' > "${mod}"
if command -v delta >/dev/null; then
delta -s "tests-ng/assets/github.catalog.native.txt" "${mod}"
fi
diff --color=always "tests-ng/assets/github.catalog.native.txt" "${mod}"
echo "ok!"
# csv
echo "[+] compare csv output"
csv="${tmpd}/csv.txt"
${bin} -B ls -s -r --format=csv --catalog="${catalog}" > "${csv}"
# modify created csv
mod="${tmpd}/csv.mod.txt"
cat "${csv}" | sed -e 's/"2","[^"]*","[^"]*",""/"2","0","0",""/g' | \
sed 's/20..-..-.. ..:..:..//g' > "${mod}"
# modify original
ori="${tmpd}/ori.mod.txt"
cat "tests-ng/assets/github.catalog.csv.txt" | \
sed 's/....-..-.. ..:..:..//g' | \
sed 's/"2","[^"]*","[^"]*",""/"2","0","0",""/g' > "${ori}"
if command -v delta >/dev/null; then
delta -s "${ori}" "${mod}"
fi
diff "${ori}" "${mod}"
echo "ok!"
# the end
echo "test \"$(basename "$0")\" success"
cd "${cur}"
exit 0

@ -0,0 +1,68 @@
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2023, deadc0de6
#
# file to be sourced from test scripts
#
declare -a to_be_cleared
# add a file/directory to be cleared
# on exit
#
# $1: file path to clear
clear_on_exit()
{
local len="${#to_be_cleared[*]}"
# shellcheck disable=SC2004
to_be_cleared[${len}]="$1"
if [ "${len}" = "0" ]; then
# set trap
trap on_exit EXIT
fi
}
# clear files
on_exit()
{
for i in "${to_be_cleared[@]}"; do
rm -rf "${i}"
done
}
# osx tricks
# brew install coreutils gnu-sed
if [[ $OSTYPE == 'darwin'* ]]; then
mktemp() {
gmktemp "$@"
}
stat() {
gstat "$@"
}
sed() {
gsed "$@"
}
wc() {
gwc "$@"
}
date() {
gdate "$@"
}
chmod() {
gchmod "$@"
}
readlink() {
greadlink "$@"
}
realpath() {
grealpath "$@"
}
export -f mktemp
export -f stat
export -f sed
export -f wc
export -f date
export -f chmod
export -f readlink
export -f realpath
fi

@ -2,72 +2,93 @@
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2021, deadc0de6
cur=$(dirname "$(readlink -f "${0}")")
cwd=`pwd`
# exit on first error
set -e
# get current path
rl="readlink -f"
if ! ${rl} "${0}" >/dev/null 2>&1; then
rl="realpath"
if ! command -v ${rl}; then
echo "\"${rl}\" not found !" && exit 1
fi
fi
cur=$(dirname "$(${rl} "${0}")")
# pivot
cd ${cur}/../
python3 -m catcli.catcli --version
prev="${cur}/.."
cd "${prev}"
# coverage
#export PYTHONPATH=".:${PYTHONPATH}"
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
bin="coverage run -p --source=catcli -m catcli.catcli"
#bin="coverage run -p --source=${prev}/catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
echo "pythonpath: ${PYTHONPATH}"
echo "bin: ${bin}"
${bin} --version
# get the helpers
# shellcheck source=tests-ng/helper
source "${cur}"/helper
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
##########################################################
# the test
##########################################################
# create temp dirs
tmpd=`mktemp -d`
tmpd=$(mktemp -d)
clear_on_exit "${tmpd}"
tmpu="${tmpd}/dir2"
mkdir -p ${tmpu}
# setup cleaning
clean() {
# clean
rm -rf ${tmpd} ${tmpu}
}
trap clean EXIT
mkdir -p "${tmpu}"
catalog="${tmpd}/catalog"
mkdir -p ${tmpd}/dir
echo "abc" > ${tmpd}/dir/a
mkdir -p "${tmpd}/dir"
echo "abc" > "${tmpd}/dir/a"
# index
python3 -m catcli.catcli -B index --catalog=${catalog} dir ${tmpd}/dir
python3 -m catcli.catcli -B ls --catalog=${catalog} dir
${bin} -B index --catalog="${catalog}" dir "${tmpd}/dir"
${bin} -B ls --catalog="${catalog}" dir
# get attributes
freeb=`python3 -m catcli.catcli -B ls --catalog=${catalog} dir | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g'`
dub=`python3 -m catcli.catcli -B ls --catalog=${catalog} dir | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g'`
dateb=`python3 -m catcli.catcli -B ls --catalog=${catalog} dir | grep date: | sed 's/^.*,date: \(.*\)$/\1/g'`
freeb=$(${bin} -B ls --catalog="${catalog}" dir | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dub=$(${bin} -B ls --catalog="${catalog}" dir | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
dateb=$(${bin} -B ls --catalog="${catalog}" dir | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
echo "before: free:${freeb} | du:${dub} | date:${dateb}"
# change content
echo "abc" >> ${tmpd}/dir/a
echo "abc" > ${tmpd}/dir/b
echo "abc" >> "${tmpd}/dir/a"
echo "abc" > "${tmpd}/dir/b"
# move dir
cp -r ${tmpd}/dir ${tmpu}/
cp -r "${tmpd}/dir" "${tmpu}/"
# sleep to force date change
sleep 1
# update
python3 -m catcli.catcli -B update -f --catalog=${catalog} dir ${tmpu}/dir
python3 -m catcli.catcli -B ls --catalog=${catalog} dir
${bin} -B update -f --catalog="${catalog}" dir "${tmpu}/dir"
${bin} -B ls --catalog="${catalog}" dir
# get new attributes
freea=`python3 -m catcli.catcli -B ls --catalog=${catalog} dir | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g'`
dua=`python3 -m catcli.catcli -B ls --catalog=${catalog} dir | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g'`
datea=`python3 -m catcli.catcli -B ls --catalog=${catalog} dir | grep date: | sed 's/^.*,date: \(.*\)$/\1/g'`
freea=$(${bin} -B ls --catalog="${catalog}" dir | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dua=$(${bin} -B ls --catalog="${catalog}" dir | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
datea=$(${bin} -B ls --catalog="${catalog}" dir | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
echo "after: free:${freea} | du:${dua} | date:${datea}"
# test they are all different
[ "${freeb}" = "${freea}" ] && echo "WARNING free didn't change!"
[ "${dub}" = "${dua}" ] && echo "WARNING du didn't change!"
[ "${dateb}" = "${datea}" ] && echo "date didn't change!" && exit 1
# pivot back
cd ${cwd}
[ "${dateb}" = "${datea}" ] && echo "WARNING date didn't change!" && exit 1
# the end
echo "test \"`basename $0`\" success"
echo "test \"$(basename "$0")\" success"
cd "${cur}"
exit 0

@ -4,3 +4,5 @@ nose2; python_version >= '3.0'
coverage; python_version >= '3.0'
coveralls; python_version >= '3.0'
pylint; python_version > '3.0'
mypy; python_version > '3.0'
pytest; python_version > '3.0'

@ -5,44 +5,91 @@
cur=$(dirname "$(readlink -f "${0}")")
# stop on first error
set -ev
set -e
#set -v
# pycodestyle
echo "[+] pycodestyle"
pycodestyle --version
pycodestyle --ignore=W605 catcli/
pycodestyle catcli/
pycodestyle tests/
pycodestyle setup.py
# pyflakes
echo "[+] pyflakes"
pyflakes --version
pyflakes catcli/
pyflakes tests/
pyflakes setup.py
# pylint
# R0914: Too many local variables
# R0913: Too many arguments
# R0912: Too many branches
# R0915: Too many statements
# R0911: Too many return statements
# R0903: Too few public methods
# R0902: Too many instance attributes
# R0201: no-self-used
echo "[+] pylint"
pylint --version
pylint \
pylint -sn \
--disable=R0914 \
--disable=R0913 \
--disable=R0912 \
--disable=R0915 \
--disable=R0911 \
--disable=R0903 \
--disable=R0902 \
--disable=R0201 \
--disable=R0022 \
catcli/
pylint \
# R0801: Similar lines in 2 files
# W0212: Access to a protected member
# R0914: Too many local variables
# R0915: Too many statements
pylint -sn \
--disable=R0801 \
--disable=W0212 \
--disable=R0914 \
--disable=R0915 \
--disable=R0801 \
tests/
pylint -sn setup.py
# mypy
echo "[+] mypy"
mypy --strict catcli/
nosebin="nose2"
PYTHONPATH=catcli ${nosebin} --with-coverage --coverage=catcli
# unittest
echo "[+] unittests"
coverage run -p -m pytest tests
for t in ${cur}/tests-ng/*; do
echo "running test \"`basename ${t}`\""
# tests-ng
echo "[+] tests-ng"
for t in "${cur}"/tests-ng/*.sh; do
echo "running test \"$(basename "${t}")\""
${t}
done
# check shells
echo "[+] shellcheck"
if ! command -v shellcheck >/dev/null 2>&1; then
echo "Install shellcheck"
exit 1
fi
shellcheck --version
find . -iname '*.sh' | while read -r script; do
shellcheck -x \
--exclude SC2002 \
"${script}"
done
# merge coverage
echo "[+] coverage merge"
coverage combine
echo "ALL TESTS DONE OK"
exit 0

@ -46,7 +46,7 @@ class TestUpdate(unittest.TestCase):
d2f2 = create_rnd_file(dir2, 'dir2file2')
noder = Noder(debug=True)
noder.set_hashing(True)
noder.do_hashing(True)
top = noder.new_top_node()
catalog = Catalog(catalogpath, force=True, debug=False)

Loading…
Cancel
Save