Compare commits

...

223 Commits

Author SHA1 Message Date
deadc0de6 3a082ee5ad bump version 4 months ago
deadc0de6 1e12caa770 update readme 4 months ago
deadc0de6 c9b4043e5f fix bugs 4 months ago
deadc0de6 e6ca6e2fcc bump version 5 months ago
deadc0de6 1f1ea8d37c env variables 5 months ago
deadc0de6 eb0a628ab6 fix dependencies 5 months ago
deadc0de d8eae3024e
Merge pull request #47 from michalkielan/add_env_var
Add option to override default catalog path
5 months ago
Michal Kielan bbfb60fe2e use optional env variable for catalog path 5 months ago
deadc0de6 8788ba8b86 bump version 5 months ago
deadc0de 06a36c42fd
Merge pull request #45 from deadc0de6/features-42
add features for #42
5 months ago
deadc0de6 6b9e00f11b fix linting 5 months ago
deadc0de6 e63f5b8d79 pytype checks 5 months ago
deadc0de6 6164930088 linting 5 months ago
deadc0de6 638085ce98 repl 5 months ago
deadc0de6 f3c4a86b1d tests 6 months ago
deadc0de6 963a8b0518 update ensures dir size are correct 6 months ago
deadc0de6 ea0cb2f9da linting 6 months ago
deadc0de6 340ab62d77 du 6 months ago
deadc0de6 ff02f9bb97 mypy fix 6 months ago
deadc0de6 4aaa073603 mypy 6 months ago
deadc0de6 8111327b53 fix find 6 months ago
deadc0de6 bed81ffee4 fix ls 6 months ago
deadc0de6 e58da8b6bf wildcard and refactoring 6 months ago
deadc0de6 0dcbfa94bd refactoring 6 months ago
deadc0de6 691396c96a add tree command 6 months ago
deadc0de6 b7d6f21cc2 ls sorting 6 months ago
deadc0de6 9326911824 coverage badge 6 months ago
deadc0de6 b0876f3382 add mypy.ini 6 months ago
deadc0de6 5c06e36cc6 mypy version 6 months ago
deadc0de6 5301126d90 fix mypy 6 months ago
deadc0de6 9d6bba0127 remove python 3.6 6 months ago
deadc0de6 982f69d410 add python 3.11 6 months ago
deadc0de6 c61ce59b35 fix actions 6 months ago
deadc0de6 07d323f0e6 fix tests 6 months ago
deadc0de6 ddefc662db coverage 6 months ago
deadc0de6 3ccaf81abd fix find 6 months ago
deadc0de6 e610273dc3 cleaning 6 months ago
deadc0de6 f918ea5ae4 handle file listing and glob 6 months ago
deadc0de6 9050c6bcf6 typecheck 6 months ago
deadc0de6 9dfc4da8bf check for TODO/FIXME 6 months ago
deadc0de6 35d1d0d9c4 depth to nbchildren 6 months ago
deadc0de6 1c74290fa1 update 6 months ago
deadc0de6 ac4ba145fe clear relpath and add full path 6 months ago
deadc0de6 3cf3031af2 bump version 11 months ago
deadc0de 1c2d5f378e
Merge pull request #40 from deadc0de6/fix-39
Fix 39
11 months ago
deadc0de6 57d865bb71 Merge branch 'fix-39' of github.com:deadc0de6/catcli into fix-39 11 months ago
deadc0de6 b7bd2ecc5d fix casting 11 months ago
deadc0de6 e2be5136d4 fix for #39 11 months ago
deadc0de6 b0572bf119 decomp bare test 11 months ago
deadc0de6 001c03ca28 fix casting 11 months ago
deadc0de6 b809850fa0 fix for #39 11 months ago
deadc0de6 b057bde35d remove allow_other in fuse 11 months ago
deadc0de6 055fe8a0ca bump version 12 months ago
deadc0de 9dbc8f7342
Merge pull request #37 from deadc0de6/fix-36
size attr fix for #36
12 months ago
deadc0de6 7e9a6806ee size attr fix for #36 12 months ago
deadc0de6 3dbec74e96 bump version 1 year ago
deadc0de 6321d04417
Merge pull request #35 from deadc0de6/fix-34
Fix 34
1 year ago
deadc0de6 a16838299f debug logs 1 year ago
deadc0de6 d00964483e fix tests 1 year ago
deadc0de6 226448b334 add test for #34 1 year ago
deadc0de6 205f4e2148 fix bug #34 1 year ago
deadc0de6 b4723b6277 badge 1 year ago
deadc0de6 f738644a52 bump version 1 year ago
deadc0de6 59cb7bc953 do not depend on fusepy and pyfzf 1 year ago
deadc0de6 59333441ee bump version 1 year ago
deadc0de6 4147e35c28 fix dependencies in setup.py 1 year ago
deadc0de6 8de3eec0fa bump version 1 year ago
deadc0de6 b9ca63f8bc fix setup.py 1 year ago
deadc0de6 7d374ca2fb bump version 1 year ago
deadc0de 511a32edfa
Merge pull request #30 from deadc0de6/fuse
Fuse
1 year ago
deadc0de6 ee2cf80d9d fix tests 1 year ago
deadc0de6 49160f2b1d fix recursive size bug 1 year ago
deadc0de6 44a13b526f update readme 1 year ago
deadc0de6 e501668102 sort 1 year ago
deadc0de6 611ddd64a8 tests 1 year ago
deadc0de6 ac79557d5d verbosity 1 year ago
deadc0de6 30c1af0d15 add coveralls 1 year ago
deadc0de6 8fdaba5aea fix tests 1 year ago
deadc0de6 9deed263d3 compare 1 year ago
deadc0de6 5e27cc40ba more tests 1 year ago
deadc0de6 b76dff46d6 shellcheck linting 1 year ago
deadc0de6 3ac55e7294 new test 1 year ago
deadc0de6 0da1adcca0 test assets 1 year ago
deadc0de6 b838166a37 deps 1 year ago
deadc0de6 0740626ef8 linting 1 year ago
deadc0de6 933f6efd2d percentage 1 year ago
deadc0de6 8ed43d5262 remove compare 1 year ago
deadc0de6 c318a62941 logging 1 year ago
deadc0de6 a69a751544 add pylint as deps 1 year ago
deadc0de6 3c6cce6b4c update doc 1 year ago
deadc0de6 63a052fc00 update ignored files 1 year ago
deadc0de6 e4cda7948c fix fuse 1 year ago
deadc0de6 e39088f6cb fix tests 1 year ago
deadc0de6 d294aa59e1 fix formats 1 year ago
deadc0de6 a333f60fa7 refactoring 1 year ago
deadc0de6 45d3b096f3 refactor node types 1 year ago
deadc0de6 45c2599a95 tests 1 year ago
deadc0de6 599852eb7a ignore coverage 1 year ago
deadc0de6 bdf05e9322 linting 1 year ago
deadc0de6 bb0835cd46 update test 1 year ago
deadc0de6 54eb365ee4 linting 1 year ago
deadc0de6 50eb5cf9fd add custom node type 1 year ago
deadc0de6 4a9e565e74 typing 1 year ago
deadc0de6 047c9bf4ab type hinting 1 year ago
deadc0de6 8659bfb348 gitignore 1 year ago
deadc0de6 04b8d6e770 fuse memory filesystem 1 year ago
deadc0de6 7590ad02c3 update readme for fzf 1 year ago
deadc0de 761ea54e1e
Merge pull request #29 from deadc0de6/fzf
Fzf
1 year ago
deadc0de6 37d2397139 move to ubuntu20.04 on gh actions 1 year ago
deadc0de6 3fd93fe23b add more python versions 1 year ago
deadc0de6 6ec86cc143 fix testing 1 year ago
deadc0de6 517aed8b7a ignore catalog files 2 years ago
deadc0de6 58e533f69e R0201 2 years ago
deadc0de6 7775a941a7 linting 2 years ago
deadc0de6 173dca1d34 refactoring 2 years ago
deadc0de6 362d824787 fix readme 2 years ago
deadc0de6 d9390a3288 replace tree with "ls -r" 2 years ago
deadc0de6 6175230892 refactoring 2 years ago
deadc0de6 eb9a2d3213 comments 2 years ago
deadc0de6 101f2d217b refactoring 2 years ago
deadc0de6 34fb8d4894 logs and comments 2 years ago
deadc0de6 b3101a1dfa drop 3.5 support 2 years ago
deadc0de6 caebd209e0 refactoring 2 years ago
deadc0de6 d8a360d3b5 fix fzf find 2 years ago
deadc0de6 f782078c3d linting 2 years ago
deadc0de6 3d13218486 fzf not for ls 2 years ago
deadc0de6 94f419b025 du correctly output used/total 2 years ago
deadc0de6 7ca20be2e5 split csv and csv-with-header formats 2 years ago
deadc0de6 c0afb0feeb linting 2 years ago
deadc0de6 cf3e465525 linting and f-string 2 years ago
deadc0de6 2176ac6127 first attempt at fzf for find and ls 2 years ago
deadc0de6 0422ad9229 bump version 2 years ago
deadc0de 0fa48ade13
Merge pull request #24 from mwalker/add-raw-size-option
Add --raw-size (-s) option to print raw size
2 years ago
Matthew Walker 49dd45e84c Add --raw-size (-s) option to print raw size
When used the raw size in bytes will be printed, rather than a
summary in human readable form.
2 years ago
deadc0de6 53d463cab6 bump version 2 years ago
deadc0de6 d0bda4b679 adding ci/cd on PR 2 years ago
deadc0de 34f6bc80e8
Merge pull request #23 from mwalker/escape-double-quote-in-csv
Escape double quotes in csv format
2 years ago
Matthew Walker b08548f276 Escape double quotes in csv format 2 years ago
deadc0de6 582fa83c8c update readme 2 years ago
deadc0de6 8385b0c0fd update readme 2 years ago
deadc0de6 674880d807 bump version 2 years ago
deadc0de6 7c06889aec linting 2 years ago
deadc0de6 bcfd8a8374 bump version 2 years ago
deadc0de6 4be5047d40 fix regression bug 2 years ago
deadc0de6 50fdcb3609 update readme 2 years ago
deadc0de6 3ed5f05e31 bump version 2 years ago
deadc0de6 f49c5364cd fix bug #21 on Windows 2 years ago
deadc0de6 797200f376 bump version 2 years ago
deadc0de6 0979871ce8 remove setup.cfg 2 years ago
deadc0de6 ba227e2318 bump version 2 years ago
deadc0de6 f52f196a1f update doc 2 years ago
deadc0de fd942d9d10
Merge pull request #19 from deadc0de6/csv-extended
adding new fileds to CSV for #18
2 years ago
deadc0de6 5d4f22595a adding new fileds for #18 2 years ago
deadc0de6 695c3525fb Merge branch 'master' of github.com:deadc0de6/catcli 2 years ago
deadc0de6 8e918d34e2 bump version 2 years ago
deadc0de 8d7e4bb3a8
Merge pull request #17 from deadc0de6/csv
Csv
2 years ago
deadc0de6 56d40c12cc adding header switch for tree 2 years ago
deadc0de6 93f9cf560d refactor print_supported_formats 2 years ago
deadc0de6 ecd15e66bd refactor and replace export with tree 2 years ago
deadc0de6 a3ebd2d5a4 revert to nose 2 years ago
deadc0de6 1ac6f01b6c add print_supported_formats to list available formats 2 years ago
deadc0de6 ee8260b524 update readme 2 years ago
deadc0de6 47602fe32d fix tests 2 years ago
deadc0de6 1b2c52fb3e add csv format to ls and find 2 years ago
deadc0de6 f5b20c023f add CSV doc 2 years ago
deadc0de6 a7a7626b13 linting 2 years ago
deadc0de6 cef572d7eb refactor storage args output 2 years ago
deadc0de6 f181f33753 refactor logging 2 years ago
deadc0de6 38bc83d3b9 adding indexed at and maccess to csv 2 years ago
deadc0de6 8453468459 add csv export for #16 2 years ago
deadc0de6 9275050c62 bump version 3 years ago
deadc0de6 bb5ded2208 fix test 3 years ago
deadc0de6 9b77b2a360 add tests-ng 3 years ago
deadc0de6 bd5d504997 fix bug for #15 3 years ago
deadc0de6 613a9092c1 bump version 3 years ago
deadc0de6 5657a0eeb4 fix issue #14 3 years ago
deadc0de6 031cc1a331 bump version 3 years ago
deadc0de6 9f6f1bb3a4 fix archive bug #13 3 years ago
deadc0de6 03f82f1514 bump version 3 years ago
deadc0de6 a596f39ac2 handle capital archive extension for #13 3 years ago
deadc0de6 7e1336c57f adding ko-fi 3 years ago
deadc0de6 ff929302c8 bump version 3 years ago
deadc0de6 37621f6ef6 fix tests 3 years ago
deadc0de6 73fd4df412 refactor storage display on two lines 3 years ago
deadc0de6 51b33e6e42 implement --no-color for #12 3 years ago
deadc0de6 3284113c5b implement --no-banner for #12 3 years ago
deadc0de b4ddb50242
Create FUNDING.yml 3 years ago
deadc0de6 430653c89f bump version 3 years ago
deadc0de6 b38f7e9f0e drop 3.4 in tests 3 years ago
deadc0de6 52a05bb748 format storage attributes 3 years ago
deadc0de6 e989ab9f31 bump version 3 years ago
deadc0de6 271ef75f74 fix help 3 years ago
deadc0de6 d019a2717c adding gh actions 3 years ago
deadc0de6 ee1e3de79e bump version 4 years ago
deadc0de6 76eab12499 merge hashdiff 4 years ago
deadc0de6 09d86f0902 refactoring 4 years ago
deadc0de6 c2510924b1 log updates to file for #10 4 years ago
deadc0de6 4af351d144 doc 4 years ago
deadc0de6 bf0ecf83cc update tests 4 years ago
deadc0de6 e10a65f91c update feedback without verbose 4 years ago
deadc0de6 7d77061141 fix pep8 4 years ago
deadc0de6 818e4fa5e0 ignore maccess change on directory for faster re-indexing 4 years ago
deadc0de6 6e4c3784fb fix re-indexing and refactor debug output 4 years ago
deadc0de6 99d29c272f fix re-indexing and refactor debug output 4 years ago
deadc0de6 7cb0e2050a fix tests 4 years ago
deadc0de6 74e74a6f9f refactoring 4 years ago
deadc0de6 a8e3c3f77d fix maccess comparison 4 years ago
deadc0de6 f23549786d fix typo 4 years ago
deadc0de6 c4a1320961 fix typo 4 years ago
deadc0de6 16bf5f99d4 refactor verbose in debug in code 4 years ago
deadc0de6 f0495e6d00 refactor logging 4 years ago
deadc0de6 6d7f95a6ae fix oserror in md5sum 4 years ago
deadc0de6 aec8950380 fix reindexing for #10 4 years ago
deadc0de 17edfcd0d0
Merge pull request #11 from deadc0de6/recursion
fix for #9
4 years ago
deadc0de6 5d0114d0e9 hash diff for #10 4 years ago
deadc0de6 40da582079 fix for #9 4 years ago
deadc0de6 a519d0a36c bump version 4 years ago
deadc0de6 382062ea4e fix update bug clearing other nodes 4 years ago
deadc0de6 c83c94bafb update README 4 years ago
deadc0de6 b8bd90d7e3 remove nightly tests 5 years ago
deadc0de6 5926239d89 bump version 5 years ago
deadc0de6 99e49624c9 replace psutil.disk_usage with shutil.disk_usage (#8) 5 years ago
deadc0de6 4faf1e6dec update pip install commands 5 years ago

@ -0,0 +1 @@
ko_fi: deadc0de6

@ -0,0 +1,7 @@
coverage:
status:
project:
default:
target: 90%
threshold: 1%
patch: off

@ -0,0 +1,26 @@
name: Release to PyPI
on:
release:
types: [created]
jobs:
pypi_publish:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python 3.8
uses: actions/setup-python@v2
with:
python-version: 3.8
- name: Install Tools
run: |
sudo apt update
python -m pip install --upgrade pip
pip install setuptools wheel twine
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Build and Publish
env:
TWINE_USERNAME: __token__
TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }}
run: |
python setup.py sdist bdist_wheel
twine upload dist/*

@ -0,0 +1,29 @@
name: tests
on: [push, pull_request, workflow_dispatch]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r tests-requirements.txt
pip install -r requirements.txt
sudo apt-get -y install shellcheck jq
- name: Run tests
run: |
./tests.sh
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
with:
files: coverage.xml
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

10
.gitignore vendored

@ -1,5 +1,15 @@
*.pyc
.coverage
.coverage*
coverages/
coverage.xml
dist/
build/
*.egg-info/
*.catalog
.vscode/
.mypy_cache
.pytest_cache
__pycache__
.pyre
.pytype

@ -0,0 +1,5 @@
[mypy]
strict = true
disable_error_code = import-untyped,import-not-found
ignore_missing_imports = True
warn_unused_ignores = False

@ -1,14 +0,0 @@
language: python
python:
- "3.4"
- "3.5"
- "3.6"
- "nightly"
install:
- "pip install pip --upgrade"
- "pip install -r tests-requirements.txt"
- "pip install -r requirements.txt"
script:
./tests.sh
after_success:
coveralls

@ -1,14 +1,21 @@
# CATCLI
[![Build Status](https://travis-ci.org/deadc0de6/catcli.svg?branch=master)](https://travis-ci.org/deadc0de6/catcli)
[![Tests Status](https://github.com/deadc0de6/catcli/workflows/tests/badge.svg?branch=master)](https://github.com/deadc0de6/catcli/actions)
[![License: GPL v3](https://img.shields.io/badge/License-GPL%20v3-blue.svg)](http://www.gnu.org/licenses/gpl-3.0)
[![Coverage Status](https://coveralls.io/repos/github/deadc0de6/catcli/badge.svg?branch=master)](https://coveralls.io/github/deadc0de6/catcli?branch=master)
[![Coverage](https://codecov.io/gh/deadc0de6/catcli/graph/badge.svg?token=t5dF7UL7K1)](https://codecov.io/gh/deadc0de6/catcli)
[![PyPI version](https://badge.fury.io/py/catcli.svg)](https://badge.fury.io/py/catcli)
[![AUR](https://img.shields.io/aur/version/catcli-git.svg)](https://aur.archlinux.org/packages/catcli-git)
[![Python](https://img.shields.io/pypi/pyversions/catcli.svg)](https://pypi.python.org/pypi/catcli)
[![Donate](https://img.shields.io/badge/donate-KoFi-blue.svg)](https://ko-fi.com/deadc0de6)
*The command line catalog tool for your offline data*
> [!WARNING]
> catcli has been superseded by [gocatcli](https://github.com/deadc0de6/gocatcli/)
> which provides all features of catcli and more...
Did you ever wanted to find back that specific file that should be on one of your
backup DVDs or one of your external hard drives? You usually go through all
of them hoping to find the right one on the first try?
@ -21,13 +28,16 @@ Features:
* Index any directories in a catalog
* Ability to search for files by name in the catalog
* Ability to navigate through indexed data à la `ls`
* Support for fuse to mount the indexed data as a virtual filesystem
* Handle archive files (zip, tar, ...) and index their content
* Save catalog to json for easy versioning with git
* Command line interface FTW
* Store files and directories sizes
* Store md5 hash of files
* Ability to update the catalog
* Support for `fzf` for finding files
* Tag your different storages with additional information
* Export catalog to CSV
<a href="https://asciinema.org/a/hRE22qbVtBGxOM1yxw2y4fBy8"><img src="https://asciinema.org/a/hRE22qbVtBGxOM1yxw2y4fBy8.png" width="50%" height="50%"></a>
@ -35,15 +45,17 @@ Quick start:
```bash
# install catcli with pip
sudo pip3 install catcli
pip3 install catcli --user
# index a directory in the catalog
catcli index -u --meta='some description' log /var/log
catcli index --meta='some description' log /var/log
# display the content
catcli tree
catcli ls -r
# navigate
catcli ls log
# find files/directories named '*log*'
catcli find log
# show directories sizes
catcli du log
```
see [usage](#usage) for specific info
@ -68,19 +80,23 @@ See the [examples](#examples) for an overview of the available features.
* [Index archive files](#index-archive-files)
* [Walk indexed files with ls](#walk-indexed-files-with-ls)
* [Find files](#find-files)
* [Display entire tree](#display-entire-tree)
* [Mount catalog](#mount-catalog)
* [Display entire hierarchy](#display-entire-hierarchy)
* [Disk usage](#disk-usage)
* [Catalog graph](#catalog-graph)
* [Edit storage](#edit-storage)
* [Update catalog](#update-catalog)
* [CSV format](#csv-format)
* [Examples](#examples)
* [Contribution](#contribution)
* [Thank you](#thank-you)
# Installation
To install run:
Install from Pypi
```bash
$ sudo pip3 install catcli
$ pip3 install catcli --user
```
Or from github directly
@ -93,7 +109,7 @@ $ catcli --help
To work with catcli without installing it, you can do the following
```bash
$ cd /tmp; git clone https://github.com/deadc0de6/catcli && cd catcli
$ sudo pip3 install -r requirements.txt
$ pip3 install -r requirements.txt --user
$ python3 -m catcli.catcli --help
```
@ -115,13 +131,20 @@ and they are all available through the command line interface of catcli.
Five different types of entry are present in a catalog:
* **top node**: this is the root of the tree
* **top node**: this is the root of the hierarchy
* **storage node**: this represents an indexed storage (a DVD, an external
hard drive, an USB drive, some arbitrary directory, etc).
* **dir node**: this is a directory
* **file node**: this is a file
* **archive node**: this is a file contained in an archive (tar, zip, etc)
Following environment variables are supported:
* `CATCLI_CATALOG_PATH`: define the catalog path (`--catalog=<path>`)
* `CATCLI_NO_BANNER`: disable the banner (`--no-banner`)
* `CATCLI_VERBOSE`: enable verbose mode (`--verbose`)
* `CATCLI_FORMAT`: define the output format (`-F --format=<fmt>`)
## Index data
Let's say the DVD or external hard drive that needs to be indexed
@ -130,7 +153,7 @@ will index the entire directory `/media/mnt`
and store that in your catalog under the name `<short-name>`.
```bash
$ catcli index --meta=<some-description> -u <short-name> /media/mnt
$ catcli index --meta=<some-description> <short-name> /media/mnt
```
If not specified otherwise (with the switch `--catalog`), the catalog is saved in the current
@ -139,9 +162,6 @@ directory under `catcli.catalog`.
The `--meta` switch allows to add any additional information to store along in
the catalog like for example `the blue disk in my office`.
Catcli will calculate and store the total size of each node (directories, storages, etc)
unless the `-n --no-subsize` switch is used.
Using the `-a --archive` switch allows to also index archive files as explained
[below](#index-archive-files).
@ -169,15 +189,48 @@ See the [examples](#examples) for more.
Files and directories can be found based on their names
using the `find` command.
`Find` support two formats that allow to use `fzf` for
searching:
* `--format=fzf-native`: display the result in native format
* `--format=fzf-csv`: display the result in csv
See the [examples](#examples) for more.
## Display entire tree
## Mount catalog
The catalog can be mounted with [fuse](https://www.kernel.org/doc/html/next/filesystems/fuse.html)
and navigate like any filesystem.
```bash
$ mkdir /tmp/mnt
$ catcli index -c github .github
$ catcli mount /tmp/mnt
$ ls -laR /tmp/mnt
drwxrwxrwx - user 8 Mar 22:08 github
mnt/github:
.rwxrwxrwx 17 user 19 Oct 2022 FUNDING.yml
drwxrwxrwx - user 2 Mar 10:15 workflows
mnt/github/workflows:
.rwxrwxrwx 691 user 19 Oct 2022 pypi-release.yml
.rwxrwxrwx 635 user 8 Mar 21:08 testing.yml
```
## Display entire hierarchy
The entire catalog can be shown using the `tree` command.
The entire catalog can be shown using the `ls -r` command.
Resulting files can be sorted by size using the `-S --sortsize` switch.
See the [examples](#examples) for more.
## Disk usage
You can get the disk usage with the `du` command.
Resulting files can be sorted by size using the `-S --sortsize` switch.
## Catalog graph
The catalog can be exported in a dot file that can be used to
@ -200,8 +253,28 @@ Storage entry can be edited with following catcli commands:
## Update catalog
The catalog can be updated with the `update` command.
Updates are based on the access time of each of the files. If using
`-c --hash`, only new files are re-hashed.
Updates are based on the access time of each of the files and on the
hash checksum if present (catalog was indexed with `-c --hash` and
`update` is called with the switch `-c --hash`).
## CSV format
Results can be printed to CSV using `--format=csv`.
Fields are separated by a comma (`,`) and are quoted with double quotes (`"`).
Each line contains the following fields:
* **name**: the entry name
* **type**: the entry type (file, directory, storage, etc)
* **path**: the entry path
* **size**: the entry size
* **indexed_at**: when this entry was indexed
* **maccess**: the entry modification date/time
* **md5**: the entry checksum (if any)
* **nbfiles**: the number of children (empty for nodes that are not storage or directory)
* **free_space**: free space (empty for not storage nodes)
* **total_space**: total space (empty for not storage nodes)
* **meta**: meta information (empty for not storage nodes)
# Examples
@ -241,10 +314,10 @@ $ catcli index --meta='my test directory' tmptest /tmp/test
Catcli creates its catalog file in the current directory as `catcli.catalog`.
Printing the entire catalog as a tree is done with the command `tree`
Printing the entire catalog as a tree is done with the command `ls -r`
```
$ catcli tree
$ catcli ls -r
top
└── storage: tmptest (my test directory) (nbfiles:3, free:3.7G/3.7G, date:2019-01-26 19:59:47)
├── a [nbfiles:3, totsize:72]
@ -365,8 +438,6 @@ $ catcli ls -ar some-name/v0.3.1.zip
└── catcli-0.3.1/ [archive:v0.3.1.zip]
```
All commands handle archive files (like `tree` or `find`).
# Contribution
If you are having trouble installing or using catcli, open an issue.
@ -375,6 +446,10 @@ If you want to contribute, feel free to do a PR (please follow PEP8).
The `tests.sh` script can be run to check the code.
# Thank you
If you like catcli, [buy me a coffee](https://ko-fi.com/deadc0de6).
# License
This project is licensed under the terms of the GPLv3 license.

@ -3,12 +3,12 @@ author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2017, deadc0de6
"""
# pylint: disable=C0415
import sys
__version__ = '0.5.11'
def main():
def main() -> None:
"""entry point"""
import catcli.catcli
if catcli.catcli.main():
sys.exit(0)

@ -6,88 +6,150 @@ Class that represents the catcli catalog
"""
import os
import pickle
from anytree.exporter import JsonExporter
from typing import Optional, List, Dict, Tuple, Union, Any
from anytree.exporter import JsonExporter, DictExporter
from anytree.importer import JsonImporter
from anytree import AnyNode
# local imports
import catcli.utils as utils
from catcli import nodes
from catcli.nodes import NodeMeta, NodeTop
from catcli.utils import ask
from catcli.logger import Logger
class Catalog:
"""the catalog"""
def __init__(self, path, pickle=False, verbose=False, force=False):
self.path = path # catalog path
self.verbose = verbose # verbosity
self.force = force # force overwrite if exists
self.metanode = None
self.pickle = pickle
def __init__(self, path: str,
debug: bool = False,
force: bool = False) -> None:
"""
@path: catalog path
@usepickle: use pickle
@debug: debug mode
@force: force overwrite if exists
"""
self.path = os.path.expanduser(path)
self.debug = debug
self.force = force
self.metanode: Optional[NodeMeta] = None
def set_metanode(self, metanode):
'''remove the metanode until tree is re-written'''
def set_metanode(self, metanode: NodeMeta) -> None:
"""remove the metanode until tree is re-written"""
self.metanode = metanode
self.metanode.parent = None
if self.metanode:
self.metanode.parent = None
def exists(self) -> bool:
"""does catalog exist"""
if not self.path:
return False
if os.path.exists(self.path):
return True
return False
def restore(self):
'''restore the catalog'''
def restore(self) -> Optional[NodeTop]:
"""restore the catalog"""
if not self.path:
return None
if not os.path.exists(self.path):
return None
if self.pickle:
return self._restore_pickle()
return self._restore_json(open(self.path, 'r').read())
with open(self.path, 'r', encoding='UTF-8') as file:
content = file.read()
return self._restore_json(content)
def save(self, node):
'''save the catalog'''
def save(self, node: NodeTop) -> bool:
"""save the catalog"""
if not self.path:
Logger.err('Path not defined')
return False
d = os.path.dirname(self.path)
if d and not os.path.exists(d):
os.makedirs(d)
directory = os.path.dirname(self.path)
if directory and not os.path.exists(directory):
os.makedirs(directory)
elif os.path.exists(self.path) and not self.force:
if not utils.ask('Update catalog \"{}\"'.format(self.path)):
if not ask(f'Update catalog \"{self.path}\"'):
Logger.info('Catalog not saved')
return False
if d and not os.path.exists(d):
Logger.err('Cannot write to \"{}\"'.format(d))
if directory and not os.path.exists(directory):
Logger.err(f'Cannot write to \"{directory}\"')
return False
if self.metanode:
self.metanode.parent = node
if self.pickle:
return self._save_pickle(node)
return self._save_json(node)
def _save_pickle(self, node):
'''pickle the catalog'''
pickle.dump(node, open(self.path, 'wb'))
if self.verbose:
Logger.info('Catalog saved to pickle \"{}\"'.format(self.path))
return True
def _debug(self, text: str) -> None:
if not self.debug:
return
Logger.debug(text)
def _restore_pickle(self):
'''restore the pickled tree'''
root = pickle.load(open(self.path, 'rb'))
if self.verbose:
m = 'Catalog imported from pickle \"{}\"'.format(self.path)
Logger.info(m)
return root
def _save_json(self, node):
'''export the catalog in json'''
exp = JsonExporter(indent=2, sort_keys=True)
with open(self.path, 'w') as f:
exp.write(node, f)
if self.verbose:
Logger.info('Catalog saved to json \"{}\"'.format(self.path))
def _save_json(self, top: NodeTop) -> bool:
"""export the catalog in json"""
self._debug(f'saving {top} to json...')
dexporter = DictExporter(attriter=attriter)
exp = JsonExporter(dictexporter=dexporter, indent=2, sort_keys=True)
with open(self.path, 'w', encoding='UTF-8') as file:
exp.write(top, file)
self._debug(f'Catalog saved to json \"{self.path}\"')
return True
def _restore_json(self, string):
'''restore the tree from json'''
imp = JsonImporter()
def _restore_json(self, string: str) -> Optional[NodeTop]:
"""restore the tree from json"""
imp = JsonImporter(dictimporter=_DictImporter(debug=self.debug))
root = imp.import_(string)
if self.verbose:
Logger.info('Catalog imported from json \"{}\"'.format(self.path))
return root
self._debug(f'Catalog imported from json \"{self.path}\"')
self._debug(f'root imported: {root}')
if root.type != nodes.TYPE_TOP:
return None
top = NodeTop(root.name, children=root.children)
self._debug(f'top imported: {top.get_name()}')
return top
class _DictImporter():
def __init__(self,
nodecls: AnyNode = AnyNode,
debug: bool = False):
self.nodecls = nodecls
self.debug = debug
def import_(self, data: Dict[str, str]) -> AnyNode:
"""Import tree from `data`."""
return self.__import(data)
def __import(self, data: Union[str, Any],
parent: AnyNode = None) -> AnyNode:
"""overwrite parent imoprt"""
assert isinstance(data, dict)
assert "parent" not in data
attrs = dict(data)
# replace attr
attrs = back_attriter(attrs)
children: Union[str, Any] = attrs.pop("children", [])
node = self.nodecls(parent=parent, **attrs)
for child in children:
self.__import(child, parent=node)
return node
def back_attriter(adict: Dict[str, str]) -> Dict[str, str]:
"""replace attribute on json restore"""
attrs = {}
for k, val in adict.items():
newk = k
if k == 'size':
newk = 'nodesize'
attrs[newk] = val
return attrs
def attriter(attrs: List[Tuple[str, Any]]) -> List[Tuple[str, Any]]:
"""replace attribute on json save"""
newattr = []
for attr in attrs:
k, val = attr
if k == 'nodesize':
k = 'size'
newattr.append((k, val))
return newattr

@ -11,229 +11,430 @@ Catcli command line interface
import sys
import os
import datetime
from typing import Dict, Any, List, \
Tuple
from docopt import docopt
import cmd2
# local imports
from . import __version__ as VERSION
from .logger import Logger
from .catalog import Catalog
from .walker import Walker
from .noder import Noder
from .utils import ask, edit
from catcli.version import __version__ as VERSION
from catcli.nodes import NodeTop, NodeAny
from catcli.logger import Logger
from catcli.printer_csv import CsvPrinter
from catcli.colors import Colors
from catcli.catalog import Catalog
from catcli.walker import Walker
from catcli.noder import Noder
from catcli.utils import ask, edit
from catcli.nodes_utils import path_to_search_all
from catcli.exceptions import BadFormatException, CatcliException
NAME = 'catcli'
CUR = os.path.dirname(os.path.abspath(__file__))
CATALOGPATH = '{}.catalog'.format(NAME)
GRAPHPATH = '/tmp/{}.dot'.format(NAME)
SEPARATOR = '/'
WILD = '*'
BANNER = """ +-+-+-+-+-+-+
FORMATS = ['native', 'csv', 'csv-with-header', 'fzf-native', 'fzf-csv']
# env variables
ENV_CATALOG_PATH = 'CATCLI_CATALOG_PATH'
ENV_NOBANNER = 'CATCLI_NO_BANNER'
ENV_VERBOSE = 'CATCLI_VERBOSE'
ENV_FORMAT = 'CATCLI_FORMAT'
# default paths
DEFAULT_CATALOGPATH = os.getenv(ENV_CATALOG_PATH, default=f'{NAME}.catalog')
DEFAULT_GRAPHPATH = f'/tmp/{NAME}.dot'
DEFAULT_NOBANNER = os.getenv(ENV_NOBANNER) is not None
DEFAULT_VERBOSEMODE = os.getenv(ENV_VERBOSE) is not None
DEFAULT_FORMAT = os.getenv(ENV_FORMAT, default='native')
BANNER = f""" +-+-+-+-+-+-+
|c|a|t|c|l|i|
+-+-+-+-+-+-+ v{}""".format(VERSION)
+-+-+-+-+-+-+ v{VERSION}"""
USAGE = """
{0}
USAGE = f"""
{BANNER}
Usage:
{1} index [--catalog=<path>] [--meta=<meta>...] [-acfnV] <name> <path>
{1} update [--catalog=<path>] [-acfnV] <name> <path>
{1} ls [--catalog=<path>] [-arVS] [<path>]
{1} find [--catalog=<path>] [-abdVP] [--path=<path>] <term>
{1} rm [--catalog=<path>] [-fV] <storage>
{1} tree [--catalog=<path>] [-aVS] [<path>]
{1} rename [--catalog=<path>] [-fV] <storage> <name>
{1} edit [--catalog=<path>] [-fV] <storage>
{1} graph [--catalog=<path>] [-V] [<path>]
{1} help
{1} --help
{1} --version
{NAME} ls [--catalog=<path>] [--format=<fmt>] [-aBCrVSs] [<path>]
{NAME} tree [--catalog=<path>] [-aBCVSs] [<path>]
{NAME} find [--catalog=<path>] [--format=<fmt>]
[-aBCbdVs] [--path=<path>] [<term>]
{NAME} index [--catalog=<path>] [--meta=<meta>...]
[-aBCcfV] <name> <path>
{NAME} update [--catalog=<path>] [-aBCcfV]
[--lpath=<path>] <name> <path>
{NAME} mount [--catalog=<path>] [-V] <mountpoint>
{NAME} du [--catalog=<path>] [-BCVSs] [<path>]
{NAME} rm [--catalog=<path>] [-BCfV] <storage>
{NAME} rename [--catalog=<path>] [-BCfV] <storage> <name>
{NAME} edit [--catalog=<path>] [-BCfV] <storage>
{NAME} graph [--catalog=<path>] [-BCV] [<path>]
{NAME} [--catalog=<path>]
{NAME} fixsizes [--catalog=<path>]
{NAME} print_supported_formats
{NAME} help
{NAME} --help
{NAME} --version
Options:
--catalog=<path> Path to the catalog [default: {2}].
--meta=<meta> Additional attribute to store [default: ].
-p --path=<path> Start path.
-n --no-subsize Do not store size of directories [default: False].
-a --archive Handle archive file [default: False].
-f --force Do not ask when updating the catalog [default: False].
-d --directory Only directory (default: False).
-b --script Output script to manage found file(s) [default: False].
-S --sortsize Sort by size, largest first [default: False].
-c --hash Calculate md5 hash [default: False].
-r --recursive Recursive [default: False].
-P --parent Ignore stored relpath [default: True].
-V --verbose Be verbose [default: False].
-v --version Show version.
-h --help Show this screen.
""".format(BANNER, NAME, CATALOGPATH)
def cmd_index(args, noder, catalog, top, debug=False):
--catalog=<path> Path to the catalog [default: {DEFAULT_CATALOGPATH}].
--meta=<meta> Additional attribute to store [default: ].
-a --archive Handle archive file [default: False].
-B --no-banner Do not display the banner [default: {str(DEFAULT_NOBANNER)}].
-b --script Output script to manage found file(s) [default: False].
-C --no-color Do not output colors [default: False].
-c --hash Calculate md5 hash [default: False].
-d --directory Only directory [default: False].
-F --format=<fmt> see \"print_supported_formats\" [default: {DEFAULT_FORMAT}].
-f --force Do not ask when updating the catalog [default: False].
-l --lpath=<path> Path where changes are logged [default: ]
-p --path=<path> Start path.
-r --recursive Recursive [default: False].
-s --raw-size Print raw size [default: False].
-S --sortsize Sort by size, largest first [default: False].
-V --verbose Be verbose [default: {str(DEFAULT_VERBOSEMODE)}].
-v --version Show version.
-h --help Show this screen.
""" # nopep8
def cmd_mount(args: Dict[str, Any],
top: NodeTop,
noder: Noder) -> bool:
"""mount action"""
mountpoint = args['<mountpoint>']
debug = args['--verbose']
try:
from catcli.fuser import Fuser # pylint: disable=C0415
Fuser(mountpoint, top, noder,
debug=debug)
except ModuleNotFoundError:
Logger.err('install fusepy to use mount')
return False
return True
def cmd_index(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: NodeTop) -> None:
"""index action"""
path = args['<path>']
name = args['<name>']
nohash = not args['--hash']
subsize = not args['--no-subsize']
usehash = args['--hash']
debug = args['--verbose']
if not os.path.exists(path):
Logger.err('\"{}\" does not exist'.format(path))
Logger.err(f'\"{path}\" does not exist')
return
if name in noder.get_storage_names(top):
try:
if not ask('Overwrite storage \"{}\"'.format(name)):
Logger.err('storage named \"{}\" already exist'.format(name))
if not ask(f'Overwrite storage \"{name}\"'):
Logger.err(f'storage named \"{name}\" already exist')
return
except KeyboardInterrupt:
Logger.err('aborted')
return
node = noder.get_storage_node(top, name)
node.parent = None
node = top.get_storage_node()
if node:
node.parent = None
start = datetime.datetime.now()
walker = Walker(noder, nohash=nohash, debug=debug)
attr = noder.format_storage_attr(args['--meta'])
root = noder.storage_node(name, path, parent=top, attr=attr)
if debug:
Logger.debug('debug mode enabled')
walker = Walker(noder, usehash=usehash, debug=debug)
attr = args['--meta']
root = noder.new_storage_node(name, path, top, attr)
_, cnt = walker.index(path, root, name)
if subsize:
noder.rec_size(root)
root.nodesize = root.get_rec_size()
stop = datetime.datetime.now()
Logger.info('Indexed {} file(s) in {}'.format(cnt, stop - start))
diff = stop - start
Logger.info(f'Indexed {cnt} file(s) in {diff}')
if cnt > 0:
catalog.save(top)
def cmd_update(args, noder, catalog, top, debug=False):
def cmd_update(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: NodeTop) -> None:
"""update action"""
path = args['<path>']
name = args['<name>']
nohash = not args['--hash']
subsize = not args['--no-subsize']
usehash = args['--hash']
logpath = args['--lpath']
debug = args['--verbose']
if not os.path.exists(path):
Logger.err('\"{}\" does not exist'.format(path))
Logger.err(f'\"{path}\" does not exist')
return
root = noder.get_storage_node(top, name)
if not root:
Logger.err('storage named \"{}\" does not exist'.format(name))
storage = noder.find_storage_node_by_name(top, name)
if not storage:
Logger.err(f'storage named \"{name}\" does not exist')
return
noder.update_storage_path(top, name, path)
start = datetime.datetime.now()
walker = Walker(noder, nohash=nohash, debug=debug)
cnt = walker.reindex(path, root, top)
if subsize:
noder.rec_size(root)
walker = Walker(noder, usehash=usehash, debug=debug,
logpath=logpath)
cnt = walker.reindex(path, storage, top)
storage.nodesize = storage.get_rec_size()
stop = datetime.datetime.now()
Logger.info('updated {} file(s) in {}'.format(cnt, stop - start))
diff = stop - start
Logger.info(f'updated {cnt} file(s) in {diff}')
if cnt > 0:
catalog.save(top)
def cmd_ls(args, noder, top):
path = args['<path>']
if not path:
path = SEPARATOR
if not path.startswith(SEPARATOR):
path = SEPARATOR + path
pre = '{}{}'.format(SEPARATOR, noder.TOPNAME)
if not path.startswith(pre):
path = pre + path
if not path.endswith(SEPARATOR):
path += SEPARATOR
if not path.endswith(WILD):
path += WILD
found = noder.walk(top, path, rec=args['--recursive'])
def cmd_du(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> List[NodeAny]:
"""du action"""
path = path_to_search_all(args['<path>'])
found = noder.diskusage(top,
path,
raw=args['--raw-size'])
if not found:
path = args['<path>']
Logger.err(f'\"{path}\": nothing found')
return found
def cmd_ls(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> List[NodeAny]:
"""ls action"""
path = path_to_search_all(args['<path>'])
fmt = args['--format']
if fmt.startswith('fzf'):
raise BadFormatException('fzf is not supported in ls, use find')
found = noder.list(top,
path,
fmt=fmt,
rec=args['--recursive'],
raw=args['--raw-size'])
if not found:
Logger.err('\"{}\": nothing found'.format(args['<path>']))
path = args['<path>']
Logger.err(f'\"{path}\": nothing found')
return found
def cmd_rm(args, noder, catalog, top):
def cmd_rm(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: NodeTop) -> NodeTop:
"""rm action"""
name = args['<storage>']
node = noder.get_storage_node(top, name)
node = noder.find_storage_node_by_name(top, name)
if node:
node.parent = None
if catalog.save(top):
Logger.info('Storage \"{}\" removed'.format(name))
Logger.info(f'Storage \"{name}\" removed')
else:
Logger.err('Storage named \"{}\" does not exist'.format(name))
Logger.err(f'Storage named \"{name}\" does not exist')
return top
def cmd_find(args, noder, top):
fromtree = args['--parent']
def cmd_find(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> List[NodeAny]:
"""find action"""
directory = args['--directory']
startpath = args['--path']
return noder.find_name(top, args['<term>'], script=args['--script'],
startpath=startpath, directory=directory,
parentfromtree=fromtree)
def cmd_tree(args, noder, top):
path = args['<path>']
node = top
if path:
node = noder.get_node(top, path)
if node:
noder.print_tree(node)
fmt = args['--format']
raw = args['--raw-size']
script = args['--script']
search_for = args['<term>']
if args['--verbose']:
Logger.debug(f'search for "{search_for}" under "{top.get_name()}"')
found = noder.find(top, search_for,
script=script,
startnode=startpath,
only_dir=directory,
fmt=fmt,
raw=raw)
return found
def cmd_graph(args, noder, top):
def cmd_graph(args: Dict[str, Any],
noder: Noder,
top: NodeTop) -> None:
"""graph action"""
path = args['<path>']
if not path:
path = GRAPHPATH
path = DEFAULT_GRAPHPATH
cmd = noder.to_dot(top, path)
Logger.info('create graph with \"{}\" (you need graphviz)'.format(cmd))
Logger.info(f'create graph with \"{cmd}\" (you need graphviz)')
def cmd_fixsizes(top: NodeTop,
noder: Noder,
catalog: Catalog) -> None:
"""
fix each node size by re-calculating
recursively their size
"""
noder.fixsizes(top)
catalog.save(top)
def cmd_rename(args, noder, catalog, top):
def cmd_rename(args: Dict[str, Any],
catalog: Catalog,
top: NodeTop) -> None:
"""rename action"""
storage = args['<storage>']
new = args['<name>']
storages = list(x.name for x in top.children)
storages = list(x.get_name() for x in top.children)
if storage in storages:
node = next(filter(lambda x: x.name == storage, top.children))
node.name = new
node = next(filter(lambda x: x.get_name() == storage, top.children))
node.set_name(new)
if catalog.save(top):
m = 'Storage \"{}\" renamed to \"{}\"'.format(storage, new)
Logger.info(m)
msg = f'Storage \"{storage}\" renamed to \"{new}\"'
Logger.info(msg)
else:
Logger.err('Storage named \"{}\" does not exist'.format(storage))
return top
Logger.err(f'Storage named \"{storage}\" does not exist')
def cmd_edit(args, noder, catalog, top):
def cmd_edit(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: NodeTop) -> None:
"""edit action"""
storage = args['<storage>']
storages = list(x.name for x in top.children)
storages = list(x.get_name() for x in top.children)
if storage in storages:
node = next(filter(lambda x: x.name == storage, top.children))
node = next(filter(lambda x: x.get_name() == storage, top.children))
attr = node.attr
if not attr:
attr = ''
new = edit(attr)
node.attr = noder.clean_storage_attr(new)
node.attr = noder.attrs_to_string(new)
if catalog.save(top):
Logger.info('Storage \"{}\" edited'.format(storage))
Logger.info(f'Storage \"{storage}\" edited')
else:
Logger.err('Storage named \"{}\" does not exist'.format(storage))
return top
Logger.err(f'Storage named \"{storage}\" does not exist')
class CatcliRepl(cmd2.Cmd): # type: ignore
"""catcli repl"""
prompt = 'catcli> '
intro = ''
def __init__(self) -> None:
super().__init__()
# remove built-ins
del cmd2.Cmd.do_alias
del cmd2.Cmd.do_edit
del cmd2.Cmd.do_macro
del cmd2.Cmd.do_run_pyscript
del cmd2.Cmd.do_run_script
del cmd2.Cmd.do_set
del cmd2.Cmd.do_shell
del cmd2.Cmd.do_shortcuts
self.hidden_commands.append('EOF')
def cmdloop(self, intro: Any = None) -> Any:
return cmd2.Cmd.cmdloop(self, intro)
@cmd2.with_argument_list # type: ignore
def do_ls(self, arglist: List[str]) -> bool:
"""ls <path>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'ls')
args, noder, _, _, top = init(arglist)
cmd_ls(args, noder, top)
return False
@cmd2.with_argument_list # type: ignore
def do_tree(self, arglist: List[str]) -> bool:
"""tree <path>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'tree')
args, noder, _, _, top = init(arglist)
cmd_ls(args, noder, top)
return False
@cmd2.with_argument_list # type: ignore
def do_find(self, arglist: List[str]) -> bool:
"""find <term>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'find')
args, noder, _, _, top = init(arglist)
cmd_find(args, noder, top)
return False
@cmd2.with_argument_list # type: ignore
def do_du(self, arglist: List[str]) -> bool:
"""du <path>"""
arglist.insert(0, '--no-banner')
arglist.insert(0, 'du')
args, noder, _, _, top = init(arglist)
cmd_du(args, noder, top)
return False
def do_help(self, _: Any) -> bool:
"""help"""
print(USAGE)
return False
# pylint: disable=C0103
def do_EOF(self, _: Any) -> bool:
"""exit repl"""
return True
def banner():
Logger.log(BANNER)
Logger.log("")
def banner() -> None:
"""print banner"""
Logger.stderr_nocolor(BANNER)
Logger.stderr_nocolor("")
def main():
args = docopt(USAGE, version=VERSION)
def print_supported_formats() -> None:
"""print all supported formats to stdout"""
print('"native" : native format')
print('"csv" : CSV format')
print(f' {CsvPrinter.CSV_HEADER}')
print('"fzf-native" : fzf to native (only valid for find)')
print('"fzf-csv" : fzf to csv (only valid for find)')
if args['help']:
def init(argv: List[str]) -> Tuple[Dict[str, Any],
Noder,
Catalog,
str,
NodeTop]:
"""parse catcli arguments"""
args = docopt(USAGE, argv=argv, version=VERSION)
if args['help'] or args['--help']:
print(USAGE)
return True
sys.exit(0)
if args['--verbose']:
print(args)
if args['print_supported_formats']:
print_supported_formats()
sys.exit(0)
fmt = args['--format']
if fmt not in FORMATS:
Logger.err(f'bad format: {fmt}')
print_supported_formats()
sys.exit(0)
if args['--verbose'] or DEFAULT_VERBOSEMODE:
print('verbose mode enabled')
print(f'args: {args}')
# print banner
banner()
if not args['--no-banner'] and DEFAULT_NOBANNER:
banner()
# set colors
if args['--no-color']:
Colors.no_color()
# init noder
noder = Noder(verbose=args['--verbose'], sortsize=args['--sortsize'],
noder = Noder(debug=args['--verbose'], sortsize=args['--sortsize'],
arc=args['--archive'])
# init catalog
catalog = Catalog(args['--catalog'], verbose=args['--verbose'],
catalog_path = args['--catalog']
catalog = Catalog(catalog_path, debug=args['--verbose'],
force=args['--force'])
# init top node
top = catalog.restore()
@ -241,34 +442,88 @@ def main():
top = noder.new_top_node()
# handle the meta node
meta = noder.update_metanode(noder.get_meta_node(top))
meta = noder.update_metanode(top)
catalog.set_metanode(meta)
return args, noder, catalog, catalog_path, top
def main() -> bool:
"""entry point"""
args, noder, catalog, catalog_path, top = init(sys.argv[1:])
# parse command
if args['index']:
cmd_index(args, noder, catalog, top, debug=args['--verbose'])
if args['update']:
cmd_update(args, noder, catalog, top, debug=args['--verbose'])
elif args['find']:
cmd_find(args, noder, top)
elif args['tree']:
cmd_tree(args, noder, top)
elif args['ls']:
cmd_ls(args, noder, top)
elif args['rm']:
cmd_rm(args, noder, catalog, top)
elif args['graph']:
cmd_graph(args, noder, top)
elif args['rename']:
cmd_rename(args, noder, catalog, top)
elif args['edit']:
cmd_edit(args, noder, catalog, top)
try:
if args['index']:
cmd_index(args, noder, catalog, top)
elif args['update']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_update(args, noder, catalog, top)
cmd_fixsizes(top, noder, catalog)
elif args['find']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_find(args, noder, top)
elif args['ls']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_ls(args, noder, top)
elif args['tree']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
args['--recursive'] = True
cmd_ls(args, noder, top)
elif args['mount']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
if not cmd_mount(args, top, noder):
return False
elif args['rm']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_rm(args, noder, catalog, top)
elif args['graph']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_graph(args, noder, top)
elif args['rename']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_rename(args, catalog, top)
elif args['edit']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_edit(args, noder, catalog, top)
elif args['du']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_du(args, noder, top)
elif args['fixsizes']:
if not catalog.exists():
Logger.err(f'no such catalog: {catalog_path}')
return False
cmd_fixsizes(top, noder, catalog)
else:
CatcliRepl().cmdloop()
except CatcliException as exc:
Logger.stderr_nocolor('ERROR ' + str(exc))
return False
return True
if __name__ == '__main__':
'''entry point'''
if main():
sys.exit(0)
sys.exit(1)

@ -0,0 +1,44 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
shell colors
"""
from typing import TypeVar, Type
CLASSTYPE = TypeVar('CLASSTYPE', bound='Colors')
class Colors:
"""shell colors"""
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
PURPLE = '\033[1;35m'
BLUE = '\033[94m'
GRAY = '\033[0;37m'
CYAN = '\033[36m'
MAGENTA = '\033[95m'
WHITE = '\033[97m'
RESET = '\033[0m'
EMPH = '\033[33m'
BOLD = '\033[1m'
UND = '\033[4m'
@classmethod
def no_color(cls: Type[CLASSTYPE]) -> None:
"""disable colors"""
Colors.RED = ''
Colors.GREEN = ''
Colors.YELLOW = ''
Colors.PURPLE = ''
Colors.BLUE = ''
Colors.GRAY = ''
Colors.MAGENTA = ''
Colors.RESET = ''
Colors.EMPH = ''
Colors.BOLD = ''
Colors.UND = ''

@ -8,11 +8,13 @@ Catcli generic compressed data lister
import os
import tarfile
import zipfile
from typing import List
class Decomp:
"""decompressor"""
def __init__(self):
def __init__(self) -> None:
self.ext = {
'tar': self._tar,
'tgz': self._tar,
@ -27,27 +29,29 @@ class Decomp:
'tar.bz2': self._tar,
'zip': self._zip}
def get_format(self):
'''return list of supported extensions'''
def get_formats(self) -> List[str]:
"""return list of supported extensions"""
return list(self.ext.keys())
def get_names(self, path):
'''get tree of compressed archive'''
ext = os.path.splitext(path)[1][1:]
if ext in list(self.ext.keys()):
def get_names(self, path: str) -> List[str]:
"""get tree of compressed archive"""
ext = os.path.splitext(path)[1][1:].lower()
if ext in list(self.ext):
return self.ext[ext](path)
return None
return []
def _tar(self, path):
'''return list of file names in tar'''
@staticmethod
def _tar(path: str) -> List[str]:
"""return list of file names in tar"""
if not tarfile.is_tarfile(path):
return None
tar = tarfile.open(path, "r")
return tar.getnames()
return []
with tarfile.open(path, "r") as tar:
return tar.getnames()
def _zip(self, path):
'''return list of file names in zip'''
@staticmethod
def _zip(path: str) -> List[str]:
"""return list of file names in zip"""
if not zipfile.is_zipfile(path):
return None
z = zipfile.ZipFile(path)
return z.namelist()
return []
with zipfile.ZipFile(path) as file:
return file.namelist()

@ -0,0 +1,14 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
Catcli exceptions
"""
class CatcliException(Exception):
"""generic catcli exception"""
class BadFormatException(CatcliException):
"""use of bad format"""

@ -0,0 +1,133 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2023, deadc0de6
fuse for catcli
"""
import os
from time import time
from stat import S_IFDIR, S_IFREG
from typing import List, Dict, Any, Optional
try:
import fuse
except ModuleNotFoundError:
pass
# local imports
from catcli.noder import Noder
from catcli.nodes import NodeTop, NodeAny
from catcli.nodes_utils import path_to_search_all, path_to_top
from catcli import nodes
class Fuser:
"""fuse filesystem mounter"""
def __init__(self, mountpoint: str,
top: NodeTop,
noder: Noder,
debug: bool = False):
"""fuse filesystem"""
filesystem = CatcliFilesystem(top, noder)
fuse.FUSE(filesystem,
mountpoint,
foreground=debug,
nothreads=True,
debug=debug)
class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
"""in-memory filesystem for catcli catalog"""
def __init__(self, top: NodeTop,
noder: Noder):
"""init fuse filesystem"""
self.top = top
self.noder = noder
def _get_entry(self, path: str) -> Optional[NodeAny]:
"""return the node pointed by path"""
path = path_to_top(path)
found = self.noder.list(self.top, path,
rec=False,
fmt='native',
raw=True)
if found:
return found[0]
return None
def _get_entries(self, path: str) -> List[NodeAny]:
"""return nodes pointed by path"""
path = path_to_search_all(path)
found = self.noder.list(self.top, path,
rec=False,
fmt='native',
raw=True)
return found
def _getattr(self, path: str) -> Dict[str, Any]:
entry = self._get_entry(path)
if not entry:
return {}
maccess = time()
mode: Any = S_IFREG
nodesize: int = 0
if entry.type == nodes.TYPE_ARCHIVED:
mode = S_IFREG
nodesize = entry.nodesize
elif entry.type == nodes.TYPE_DIR:
mode = S_IFDIR
nodesize = entry.nodesize
maccess = entry.maccess
elif entry.type == nodes.TYPE_FILE:
mode = S_IFREG
nodesize = entry.nodesize
maccess = entry.maccess
elif entry.type == nodes.TYPE_STORAGE:
mode = S_IFDIR
nodesize = entry.nodesize
maccess = entry.ts
elif entry.type == nodes.TYPE_META:
mode = S_IFREG
elif entry.type == nodes.TYPE_TOP:
mode = S_IFREG
mode = mode | 0o777
return {
'st_mode': (mode), # file type
'st_nlink': 1, # count hard link
'st_size': nodesize,
'st_ctime': maccess, # attr last modified
'st_mtime': maccess, # content last modified
'st_atime': maccess, # access time
'st_uid': os.getuid(),
'st_gid': os.getgid(),
}
def getattr(self, path: str, _fh: Any = None) -> Dict[str, Any]:
"""return attr of file pointed by path"""
if path == os.path.sep:
# mountpoint
curt = time()
meta = {
'st_mode': (S_IFDIR | 0o777),
'st_nlink': 1,
'st_size': 0,
'st_ctime': curt,
'st_mtime': curt,
'st_atime': curt,
'st_uid': os.getuid(),
'st_gid': os.getgid(),
}
return meta
meta = self._getattr(path)
return meta
def readdir(self, path: str, _fh: Any) -> List[str]:
"""read directory content"""
content = ['.', '..']
entries = self._get_entries(path)
for entry in entries:
content.append(entry.get_name())
return content

@ -6,93 +6,66 @@ Logging helper
"""
import sys
from typing import TypeVar, Type
# local imports
from catcli.colors import Colors
from catcli.utils import fix_badchars
class Logger:
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
PURPLE = '\033[1;35m'
BLUE = '\033[94m'
GRAY = '\033[0;37m'
MAGENTA = '\033[95m'
RESET = '\033[0m'
EMPH = '\033[33m'
BOLD = '\033[1m'
UND = '\033[4m'
STORAGE = 'storage'
ARCHIVE = 'archive'
NBFILES = 'nbfiles'
def __init__(self):
pass
######################################################################
# node specific output
######################################################################
def storage(pre, name, args, attr):
'''print a storage node'''
end = ''
if attr:
end = ' {}({}){}'.format(Logger.GRAY, attr, Logger.RESET)
s = '{}{}{}{}:'.format(pre, Logger.UND, Logger.STORAGE, Logger.RESET)
s += ' {}{}{}{}'.format(Logger.PURPLE, name, Logger.RESET, end)
s += ' {}{}{}'.format(Logger.GRAY, args, Logger.RESET)
sys.stdout.write('{}\n'.format(s))
def file(pre, name, attr):
'''print a file node'''
s = '{}{}'.format(pre, name)
s += ' {}[{}]{}'.format(Logger.GRAY, attr, Logger.RESET)
sys.stdout.write('{}\n'.format(s))
def dir(pre, name, depth='', attr=None):
'''print a directory node'''
end = []
if depth != '':
end.append('{}:{}'.format(Logger.NBFILES, depth))
if attr:
end.append(' '.join(['{}:{}'.format(x, y) for x, y in attr]))
if end:
end = ' [{}]'.format(', '.join(end))
s = '{}{}{}{}'.format(pre, Logger.BLUE, name, Logger.RESET)
s += '{}{}{}'.format(Logger.GRAY, end, Logger.RESET)
sys.stdout.write('{}\n'.format(s))
CLASSTYPE = TypeVar('CLASSTYPE', bound='Logger')
def arc(pre, name, archive):
s = '{}{}{}{}'.format(pre, Logger.YELLOW, name, Logger.RESET)
s += ' {}[{}:{}]{}'.format(Logger.GRAY, Logger.ARCHIVE,
archive, Logger.RESET)
sys.stdout.write('{}\n'.format(s))
######################################################################
# generic output
######################################################################
def out(string):
'''to stdout'''
sys.stdout.write('{}\n'.format(string))
def log(string):
'''to stderr'''
sys.stderr.write('{}\n'.format(string))
def info(string):
'''to stderr in color'''
s = '{}{}{}'.format(Logger.MAGENTA, string, Logger.RESET)
sys.stderr.write('{}\n'.format(s))
def err(string):
'''to stderr in RED'''
s = '{}{}{}'.format(Logger.RED, string, Logger.RESET)
sys.stderr.write('{}\n'.format(s))
def progr(string):
'''print progress'''
sys.stderr.write('{}\r'.format(string))
class Logger:
"""log to stdout/stderr"""
@classmethod
def stdout_nocolor(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stdout no color"""
string = fix_badchars(string)
sys.stdout.write(f'{string}\n')
@classmethod
def stderr_nocolor(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stderr no color"""
string = fix_badchars(string)
sys.stderr.write(f'{string}\n')
@classmethod
def debug(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stderr no color"""
cls.stderr_nocolor(f'[DBG] {string}')
@classmethod
def info(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stdout in color"""
string = fix_badchars(string)
out = f'{Colors.MAGENTA}{string}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def err(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stderr in RED"""
string = fix_badchars(string)
out = f'{Colors.RED}{string}{Colors.RESET}'
sys.stderr.write(f'{out}\n')
@classmethod
def progr(cls: Type[CLASSTYPE],
string: str) -> None:
"""print progress"""
string = fix_badchars(string)
sys.stderr.write(f'{string}\r')
sys.stderr.flush()
def bold(string):
'''make it bold'''
return '{}{}{}'.format(Logger.BOLD, string, Logger.RESET)
@classmethod
def get_bold_text(cls: Type[CLASSTYPE],
string: str) -> str:
"""make it bold"""
string = fix_badchars(string)
return f'{Colors.BOLD}{string}{Colors.RESET}'

File diff suppressed because it is too large Load Diff

@ -0,0 +1,339 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2023, deadc0de6
Class that represents a node in the catalog tree
"""
# pylint: disable=W0622
import os
from typing import Dict, Any, cast
from anytree import NodeMixin
from catcli.exceptions import CatcliException
from catcli.utils import fix_badchars
TYPE_TOP = 'top'
TYPE_FILE = 'file'
TYPE_DIR = 'dir'
TYPE_ARCHIVED = 'arc'
TYPE_STORAGE = 'storage'
TYPE_META = 'meta'
NAME_TOP = 'top'
NAME_META = 'meta'
def typcast_node(node: Any) -> None:
"""typecast node to its sub type"""
if node.type == TYPE_TOP:
node.__class__ = NodeTop
elif node.type == TYPE_FILE:
node.__class__ = NodeFile
elif node.type == TYPE_DIR:
node.__class__ = NodeDir
elif node.type == TYPE_ARCHIVED:
node.__class__ = NodeArchived
elif node.type == TYPE_STORAGE:
node.__class__ = NodeStorage
elif node.type == TYPE_META:
node.__class__ = NodeMeta
else:
raise CatcliException(f"bad node: {node}")
class NodeAny(NodeMixin): # type: ignore
"""generic node"""
def __init__(self, # type: ignore[no-untyped-def]
name=None,
size=0,
parent=None,
children=None):
"""build generic node"""
super().__init__()
self.name = name
self.nodesize = size
self.parent = parent
if children:
self.children = children
def get_name(self) -> str:
"""get node name"""
return fix_badchars(self.name)
def set_name(self, name: str) -> None:
"""set node name"""
self.name = fix_badchars(name)
def has_attr(self, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in self.__dict__
def may_have_children(self) -> bool:
"""can node contains sub"""
raise NotImplementedError
def _to_str(self) -> str:
ret = str(self.__class__) + ": " + str(self.__dict__)
if self.children:
ret += '\n'
for child in self.children:
ret += f' child => {child}\n'
return ret
def __str__(self) -> str:
return self._to_str()
def get_fullpath(self) -> str:
"""return full path to this node"""
path = self.get_name()
if self.parent:
typcast_node(self.parent)
ppath = self.parent.get_fullpath()
path = os.path.join(ppath, path)
return fix_badchars(path)
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
totsize: int = self.nodesize
for node in self.children:
typcast_node(node)
totsize += node.get_rec_size()
return totsize
def get_storage_node(self) -> NodeMixin:
"""recursively traverse up to find storage"""
return None
def flagged(self) -> bool:
"""is flagged"""
if not hasattr(self, '_flagged'):
return False
return self._flagged
def flag(self) -> None:
"""flag a node"""
self._flagged = True # pylint: disable=W0201
def unflag(self) -> None:
"""unflag node"""
self._flagged = False # pylint: disable=W0201
delattr(self, '_flagged')
class NodeTop(NodeAny):
"""a top node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
children=None):
"""build a top node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_TOP
self.parent = None
if children:
self.children = children
def get_fullpath(self) -> str:
"""return full path to this node"""
return ''
def may_have_children(self) -> bool:
"""can node contains sub"""
return True
def get_rec_size(self) -> int:
"""
recursively traverse tree and return size
also ensure to update the size on the way
"""
size = super().get_rec_size()
self.nodesize = size
return size
def __str__(self) -> str:
return self._to_str()
class NodeFile(NodeAny):
"""a file node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
nodesize: int,
md5: str,
maccess: float,
parent=None,
children=None):
"""build a file node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_FILE
self.nodesize = nodesize
self.md5 = md5
self.maccess = maccess
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return False
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def __str__(self) -> str:
return self._to_str()
class NodeDir(NodeAny):
"""a directory node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
nodesize: int,
maccess: float,
parent=None,
children=None):
"""build a directory node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_DIR
self.nodesize = nodesize
self.maccess = maccess
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return True
def get_rec_size(self) -> int:
"""
recursively traverse tree and return size
also ensure to update the size on the way
"""
size = super().get_rec_size()
self.nodesize = size
return size
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def __str__(self) -> str:
return self._to_str()
class NodeArchived(NodeAny):
"""an archived node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
nodesize: int,
md5: str,
archive: str,
parent=None,
children=None):
"""build an archived node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_ARCHIVED
self.nodesize = nodesize
self.md5 = md5
self.archive = archive
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return False
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return cast(NodeStorage, self.ancestors[1])
def __str__(self) -> str:
return self._to_str()
class NodeStorage(NodeAny):
"""a storage node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
free: int,
total: int,
nodesize: int,
ts: float,
attr: str,
parent=None,
children=None):
"""build a storage node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_STORAGE
self.free = free
self.total = total
self.attr = attr
self.nodesize = nodesize
self.ts = ts # pylint: disable=C0103
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return True
def get_rec_size(self) -> int:
"""
recursively traverse tree and return size
also ensure to update the size on the way
"""
size = super().get_rec_size()
self.nodesize = size
return size
def get_storage_node(self) -> NodeAny:
"""recursively traverse up to find storage"""
return self
def __str__(self) -> str:
return self._to_str()
class NodeMeta(NodeAny):
"""a meta node"""
def __init__(self, # type: ignore[no-untyped-def]
name: str,
attr: Dict[str, Any],
parent=None,
children=None):
"""build a meta node"""
super().__init__() # type: ignore[no-untyped-call]
self.name = name
self.type = TYPE_META
self.attr = attr
self.parent = parent
if children:
self.children = children
def may_have_children(self) -> bool:
"""can node contains sub"""
return False
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""
return 0
def __str__(self) -> str:
return self._to_str()

@ -0,0 +1,39 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2024, deadc0de6
nodes helpers
"""
import os
# local imports
from catcli import nodes
def path_to_top(path: str) -> str:
"""path pivot under top"""
pre = f"{os.path.sep}{nodes.NAME_TOP}"
if not path.startswith(pre):
# prepend with top node path
path = pre + path
return path
def path_to_search_all(path: str) -> str:
"""path to search for all subs"""
if not path:
path = os.path.sep
if not path.startswith(os.path.sep):
path = os.path.sep + path
pre = f"{os.path.sep}{nodes.NAME_TOP}"
if not path.startswith(pre):
# prepend with top node path
path = pre + path
# if not path.endswith(os.path.sep):
# # ensure ends with a separator
# path += os.path.sep
# if not path.endswith(WILD):
# # add wild card
# path += WILD
return path

@ -0,0 +1,81 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2024, deadc0de6
Class for printing nodes in csv format
"""
import sys
from typing import List
from catcli.nodes import NodeAny, NodeStorage, TYPE_DIR
from catcli.utils import size_to_str, epoch_to_str
class CsvPrinter:
"""a node printer class"""
DEFSEP = ','
CSV_HEADER = ('name,type,path,size,indexed_at,'
'maccess,md5,nbfiles,free_space,'
'total_space,meta')
def _print_entries(self, entries: List[str], sep: str = DEFSEP) -> None:
line = sep.join(['"' + o + '"' for o in entries])
if len(line) > 0:
sys.stdout.write(f'{line}\n')
def print_header(self) -> None:
"""print csv header"""
sys.stdout.write(f'{self.CSV_HEADER}\n')
def print_storage(self, node: NodeStorage,
sep: str = DEFSEP,
raw: bool = False) -> None:
"""print a storage node"""
out = []
out.append(node.get_name()) # name
out.append(node.type) # type
out.append('') # fake full path
size = node.get_rec_size()
out.append(size_to_str(size, raw=raw)) # size
out.append(epoch_to_str(node.ts)) # indexed_at
out.append('') # fake maccess
out.append('') # fake md5
out.append(str(len(node.children))) # nbfiles
# fake free_space
out.append(size_to_str(node.free, raw=raw))
# fake total_space
out.append(size_to_str(node.total, raw=raw))
out.append(node.attr) # meta
self._print_entries(out, sep=sep)
def print_node(self, node: NodeAny,
sep: str = DEFSEP,
raw: bool = False) -> None:
"""print other nodes"""
out = []
out.append(node.get_name().replace('"', '""')) # name
out.append(node.type) # type
fullpath = node.get_fullpath()
out.append(fullpath.replace('"', '""')) # full path
out.append(size_to_str(node.nodesize, raw=raw)) # size
storage = node.get_storage_node()
out.append(epoch_to_str(storage.ts)) # indexed_at
if node.has_attr('maccess'):
out.append(epoch_to_str(node.maccess)) # maccess
else:
out.append('') # fake maccess
if node.has_attr('md5'):
out.append(node.md5) # md5
else:
out.append('') # fake md5
if node.type == TYPE_DIR:
out.append(str(len(node.children))) # nbfiles
else:
out.append('') # fake nbfiles
out.append('') # fake free_space
out.append('') # fake total_space
out.append('') # fake meta
self._print_entries(out, sep=sep)

@ -0,0 +1,165 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
Class for printing nodes in native format
"""
import sys
from catcli.nodes import NodeFile, NodeDir, \
NodeStorage, NodeAny, typcast_node
from catcli.colors import Colors
from catcli.logger import Logger
from catcli.utils import fix_badchars, size_to_str, \
epoch_to_str
COLOR_STORAGE = Colors.YELLOW
COLOR_FILE = Colors.WHITE
COLOR_DIRECTORY = Colors.BLUE
COLOR_ARCHIVE = Colors.PURPLE
COLOR_TS = Colors.CYAN
COLOR_SIZE = Colors.GREEN
FULLPATH_IN_NAME = True
class NativePrinter:
"""a node printer class"""
STORAGE = 'storage'
ARCHIVE = 'archive'
NBFILES = 'nbfiles'
def print_du(self, node: NodeAny,
raw: bool = False) -> None:
"""print du style"""
typcast_node(node)
name = node.get_fullpath()
size = node.nodesize
line = size_to_str(size, raw=raw).ljust(10, ' ')
out = f'{COLOR_SIZE}{line}{Colors.RESET}'
out += ' '
out += f'{COLOR_FILE}{name}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
def print_top(self, pre: str, name: str) -> None:
"""print top node"""
sys.stdout.write(f'{pre}{name}\n')
def print_storage(self, pre: str,
node: NodeStorage,
raw: bool = False) -> None:
"""print a storage node"""
# construct name
name = node.get_name()
# construct attrs
attrs = []
# nb files
attrs.append(f'nbfiles:{len(node.children)}')
# the children size
recsize = node.get_rec_size()
sizestr = size_to_str(recsize, raw=raw)
attrs.append(f'totsize:{sizestr}')
# free
pcent = 0.0
if node.total > 0:
pcent = node.free * 100 / node.total
attrs.append(f'free:{pcent:.1f}%')
# du
sztotal = size_to_str(node.total, raw=raw)
szused = size_to_str(node.total - node.free, raw=raw)
attrs.append(f'du:{szused}/{sztotal}')
# timestamp
if node.has_attr('ts'):
attrs.append(f'date:{epoch_to_str(node.ts)}')
# print
out = f'{pre}{Colors.UND}{self.STORAGE}{Colors.RESET}: '
out += f'{COLOR_STORAGE}{name}{Colors.RESET}'
if attrs:
out += f' [{Colors.WHITE}{"|".join(attrs)}{Colors.RESET}]'
sys.stdout.write(f'{out}\n')
def print_file(self, pre: str,
node: NodeFile,
withpath: bool = False,
withstorage: bool = False,
raw: bool = False) -> None:
"""print a file node"""
# construct name
name = node.get_name()
storage = node.get_storage_node()
if withpath:
name = node.get_fullpath()
# construct attributes
attrs = []
if node.md5:
attrs.append(f'md5:{node.md5}')
if withstorage:
content = Logger.get_bold_text(storage.get_name())
attrs.append(f'storage:{content}')
# print
out = []
out.append(f'{pre}')
out.append(f'{COLOR_FILE}{name}{Colors.RESET}')
size = 0
if node.nodesize:
size = node.nodesize
line = size_to_str(size, raw=raw)
out.append(f'{COLOR_SIZE}{line}{Colors.RESET}')
if node.has_attr('maccess'):
line = epoch_to_str(node.maccess)
out.append(f'{COLOR_TS}{line}{Colors.RESET}')
if attrs:
out.append(f'{Colors.GRAY}[{",".join(attrs)}]{Colors.RESET}')
out = [x for x in out if x]
sys.stdout.write(f'{" ".join(out)}\n')
def print_dir(self, pre: str,
node: NodeDir,
withpath: bool = False,
withstorage: bool = False,
withnbchildren: bool = False,
raw: bool = False) -> None:
"""print a directory node"""
# construct name
name = node.get_name()
storage = node.get_storage_node()
if withpath:
name = node.get_fullpath()
# construct attrs
attrs = []
if withnbchildren:
nbchildren = len(node.children)
attrs.append(f'{self.NBFILES}:{nbchildren}')
if withstorage:
attrs.append(f"storage:{Logger.get_bold_text(storage.get_name())}")
# print
out = []
out.append(f'{pre}')
out.append(f'{COLOR_DIRECTORY}{name}{Colors.RESET}')
size = 0
if node.nodesize:
size = node.nodesize
line = size_to_str(size, raw=raw)
out.append(f'{COLOR_SIZE}{line}{Colors.RESET}')
if node.has_attr('maccess'):
line = epoch_to_str(node.maccess)
out.append(f'{COLOR_TS}{line}{Colors.RESET}')
if attrs:
out.append(f'{Colors.GRAY}[{",".join(attrs)}]{Colors.RESET}')
out = [x for x in out if x]
sys.stdout.write(f'{" ".join(out)}\n')
def print_archive(self, pre: str,
name: str, archive: str) -> None:
"""print an archive"""
name = fix_badchars(name)
out = f'{pre}{COLOR_ARCHIVE}{name}{Colors.RESET} '
out += f'{Colors.GRAY}[{self.ARCHIVE}:{archive}]{Colors.RESET}'
sys.stdout.write(f'{out}\n')

@ -10,65 +10,83 @@ import hashlib
import tempfile
import subprocess
import datetime
import string
# local imports
from catcli.logger import Logger
from catcli.exceptions import CatcliException
def md5sum(path):
'''calculate md5 sum of a file'''
p = os.path.realpath(path)
if not os.path.exists(p):
Logger.err('\nunable to get md5sum on {}'.format(path))
return None
WILD = '*'
def md5sum(path: str) -> str:
"""
calculate md5 sum of a file
may raise exception
"""
rpath = os.path.realpath(path)
if not os.path.exists(rpath):
raise CatcliException(f'md5sum - file does not exist: {rpath}')
try:
with open(p, mode='rb') as f:
d = hashlib.md5()
with open(rpath, mode='rb') as file:
hashv = hashlib.md5()
while True:
buf = f.read(4096)
buf = file.read(4096)
if not buf:
break
d.update(buf)
return d.hexdigest()
hashv.update(buf)
return hashv.hexdigest()
except PermissionError:
pass
return None
except OSError as exc:
raise CatcliException(f'md5sum error: {exc}') from exc
return ''
def human(size):
'''human readable size'''
def size_to_str(size: float,
raw: bool = True) -> str:
"""convert size to string, optionally human readable"""
div = 1024.
suf = ['B', 'K', 'M', 'G', 'T', 'P']
if size < div:
return '{}'.format(size)
if raw or size < div:
return f'{size}'
for i in suf:
if size < div:
return '{:.1f}{}'.format(size, i)
return f'{size:.1f}{i}'
size = size / div
return '{:.1f}{}'.format(size, suf[-1])
sufix = suf[-1]
return f'{size:.1f}{sufix}'
def epoch_to_str(epoch):
'''convert epoch to string'''
def epoch_to_str(epoch: float) -> str:
"""convert epoch to string"""
if not epoch:
return ''
fmt = '%Y-%m-%d %H:%M:%S'
t = datetime.datetime.fromtimestamp(float(epoch))
return t.strftime(fmt)
timestamp = datetime.datetime.fromtimestamp(epoch)
return timestamp.strftime(fmt)
def ask(question):
'''ask the user what to do'''
resp = input('{} [y|N] ? '.format(question))
def ask(question: str) -> bool:
"""ask the user what to do"""
resp = input(f'{question} [y|N] ? ')
return resp.lower() == 'y'
def edit(string):
'''edit the information with the default EDITOR'''
string = string.encode('utf-8')
EDITOR = os.environ.get('EDITOR', 'vim')
with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as f:
f.write(string)
f.flush()
subprocess.call([EDITOR, f.name])
f.seek(0)
new = f.read()
def edit(data: str) -> str:
"""edit the information with the default EDITOR"""
content = fix_badchars(data)
editor = os.environ.get('EDITOR', 'vim')
with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as file:
file.write(content.encode('utf-8'))
file.flush()
subprocess.call([editor, file.get_name()])
file.seek(0)
new = file.read()
return new.decode('utf-8')
def fix_badchars(data: str) -> str:
"""fix none utf-8 chars in string"""
data = "".join(x for x in data if x in string.printable)
return data.encode("utf-8", "ignore").decode("utf-8")

@ -0,0 +1,6 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
"""
__version__ = '1.0'

@ -6,42 +6,83 @@ Catcli filesystem indexer
"""
import os
from typing import Tuple, Optional
# local imports
from catcli.noder import Noder
from catcli.logger import Logger
from catcli.nodes import NodeAny, NodeTop
class Walker:
"""a filesystem walker"""
MAXLINE = 80 - 15
MAXLINELEN = 80 - 15
def __init__(self, noder, nohash=False, debug=False):
def __init__(self, noder: Noder,
usehash: bool = True,
debug: bool = False,
logpath: str = ''):
"""
@noder: the noder to use
@hash: calculate hash of nodes
@debug: debug mode
@logpath: path where to log catalog changes on reindex
"""
self.noder = noder
self.noder.set_hashing(not nohash)
self.usehash = usehash
self.noder.do_hashing(self.usehash)
self.debug = debug
self.lpath = logpath
def index(self, path, parent, name, storagepath=''):
'''index a directory and store in tree'''
self._debug('indexing starting at {}'.format(path))
def index(self,
path: str,
parent: NodeAny,
name: str,
storagepath: str = '') -> Tuple[str, int]:
"""
index a directory and store in tree
@path: path to index
@parent: parent node
@name: this stoarge name
"""
self._debug(f'indexing starting at {path}')
if not parent:
parent = self.noder.dir_node(name, path, parent)
# create the parent
parent = self.noder.new_dir_node(name,
path,
parent)
if os.path.islink(path):
rel = os.readlink(path)
abspath = os.path.join(path, rel)
if os.path.isdir(abspath):
return parent, 0
cnt = 0
for (root, dirs, files) in os.walk(path):
for f in files:
self._debug('found file {} under {}'.format(f, path))
sub = os.path.join(root, f)
self._log(f)
self._debug('index file {}'.format(sub))
self.noder.file_node(os.path.basename(f), sub,
parent, storagepath)
cnt += 1
for d in dirs:
self._debug('found dir {} under {}'.format(d, path))
base = os.path.basename(d)
sub = os.path.join(root, d)
self._debug('index directory {}'.format(sub))
dummy = self.noder.dir_node(base, sub, parent, storagepath)
for file in files:
self._debug(f'found file {file} under {path}')
sub = os.path.join(root, file)
if not os.path.exists(sub):
continue
self._progress(file)
self._debug(f'index file {sub}')
node = self.noder.new_file_node(os.path.basename(file),
sub,
parent)
if node:
cnt += 1
for adir in dirs:
self._debug(f'found dir {adir} under {path}')
base = os.path.basename(adir)
sub = os.path.join(root, adir)
self._debug(f'index directory {sub}')
if not os.path.exists(sub):
continue
dummy = self.noder.new_dir_node(base, sub, parent)
if not dummy:
continue
cnt += 1
nstoragepath = os.sep.join([storagepath, base])
if not storagepath:
@ -49,86 +90,105 @@ class Walker:
_, cnt2 = self.index(sub, dummy, base, nstoragepath)
cnt += cnt2
break
self._log(None)
self._progress('')
return parent, cnt
def reindex(self, path, parent, top):
'''reindex a directory and store in tree'''
cnt = self._reindex(path, parent, top, '')
cnt += self.noder.clean_not_flagged(top)
def reindex(self, path: str, parent: NodeAny, top: NodeTop) -> int:
"""reindex a directory and store in tree"""
cnt = self._reindex(path, parent, top)
cnt += self.noder.clean_not_flagged(parent)
return cnt
def _reindex(self, path, parent, top, storagepath):
'''reindex a directory and store in tree'''
self._debug('reindexing starting at {}'.format(path))
def _reindex(self, path: str,
parent: NodeAny,
top: NodeTop,
storagepath: str = '') -> int:
"""
reindex a directory and store in tree
@path: directory path to re-index
@top: top node (storage)
@storagepath: rel path relative to indexed directory
"""
self._debug(f'reindexing starting at {path}')
cnt = 0
for (root, dirs, files) in os.walk(path):
for f in files:
self._debug('found file {} under {}'.format(f, path))
sub = os.path.join(root, f)
maccess = os.path.getmtime(sub)
reindex, n = self._need_reindex(parent, f, maccess)
for file in files:
self._debug(f'found file \"{file}\" under {path}')
sub = os.path.join(root, file)
treepath = os.path.join(storagepath, file)
reindex, node = self._need_reindex(parent, sub, treepath)
if not reindex:
self._debug('\tignore file {}'.format(sub))
self.noder.flag(n)
self._debug(f'\tskip file {sub}')
if node:
node.flag()
continue
self._debug('\tre-index file {}'.format(sub))
self._log(f)
n = self.noder.file_node(os.path.basename(f), sub,
parent, storagepath)
self.noder.flag(n)
cnt += 1
for d in dirs:
self._debug('found dir {} under {}'.format(d, path))
base = os.path.basename(d)
sub = os.path.join(root, d)
maccess = os.path.getmtime(sub)
reindex, dummy = self._need_reindex(parent, base, maccess)
node = self.noder.new_file_node(os.path.basename(file),
sub,
parent)
if node:
node.flag()
cnt += 1
for adir in dirs:
self._debug(f'found dir \"{adir}\" under {path}')
base = os.path.basename(adir)
sub = os.path.join(root, adir)
treepath = os.path.join(storagepath, adir)
reindex, dummy = self._need_reindex(parent, sub, treepath)
if reindex:
self._debug('\tre-index directory {}'.format(sub))
dummy = self.noder.dir_node(base, sub, parent, storagepath)
dummy = self.noder.new_dir_node(base, sub,
parent)
cnt += 1
self.noder.flag(dummy)
self._debug('reindexing deeper under {}'.format(sub))
if dummy:
dummy.flag()
self._debug(f'reindexing deeper under {sub}')
nstoragepath = os.sep.join([storagepath, base])
if not storagepath:
nstoragepath = base
cnt2 = self._reindex(sub, dummy, top, nstoragepath)
cnt += cnt2
if dummy:
cnt2 = self._reindex(sub, dummy, top, nstoragepath)
cnt += cnt2
break
self._log(None)
return cnt
def _need_reindex(self, top, path, maccess):
'''test if node needs re-indexing'''
cnode, newer = self.noder.get_node_if_newer(top, path, maccess)
if not cnode:
self._debug('\tdoes not exist')
return True, cnode
if cnode and not newer:
def _need_reindex(self,
top: NodeTop,
path: str,
treepath: str) -> Tuple[bool, Optional[NodeTop]]:
"""
test if node needs re-indexing
@top: top node (storage)
@path: abs path to file
@treepath: rel path from indexed directory
"""
node, changed = self.noder.get_node_if_changed(top, path, treepath)
if not node:
self._debug(f'\t{path} does not exist')
return True, node
if node and not changed:
# ignore this node
self._debug('\tis not newer')
return False, cnode
if cnode and newer:
self._debug(f'\t{path} has not changed')
return False, node
if node and changed:
# remove this node and re-add
self._debug('\tis newer')
self._debug('\tremoving node {}'.format(cnode))
cnode.parent = None
self._debug('\tis to be re-indexed')
return True, cnode
self._debug(f'\t{path} has changed')
self._debug(f"\tremoving node {node.get_name()} for {path}")
node.parent = None
return True, node
def _debug(self, string):
def _debug(self, string: str) -> None:
"""print to debug"""
if not self.debug:
return
Logger.log(string)
Logger.debug(string)
def _log(self, string):
def _progress(self, string: str) -> None:
"""print progress"""
if self.debug:
return
if not string:
# clean
Logger.progr('{:80}'.format(' '))
Logger.progr(' ' * 80)
return
if len(string) > self.MAXLINE:
string = string[:self.MAXLINE] + '...'
Logger.progr('indexing: {:80}'.format(string))
if len(string) > self.MAXLINELEN:
string = string[:self.MAXLINELEN] + '...'
Logger.progr(f'indexing: {string:80}')

@ -1,3 +1,8 @@
docopt; python_version >= '3.0'
types-docopt; python_version >= '3.0'
anytree; python_version >= '3.0'
psutil; python_version >= '3.0'
pyfzf; python_version >= '3.0'
fusepy; python_version >= '3.0'
natsort; python_version >= '3.0'
cmd2; python_version >= '3.0'
gnureadline; python_version >= '3.0'

@ -1,11 +0,0 @@
[metadata]
description-file = README.md
license_file = LICENSE
[bdist_wheel]
python-tag = py3
[files]
extra_files =
LICENSE
README.md

@ -1,29 +1,34 @@
from setuptools import setup, find_packages
from codecs import open
"""setup.py"""
from os import path
import catcli
from setuptools import setup, find_packages
from catcli import version
readme = 'README.md'
README = 'README.md'
here = path.abspath(path.dirname(__file__))
VERSION = version.__version__
REQUIRES_PYTHON = '>=3'
try:
from pypandoc import convert_file
read_readme = lambda f: convert_file(f, 'rst')
except ImportError:
print('\n[WARNING] pypandoc not found, could not convert \"{}\"\n'.format(readme))
read_readme = lambda f: open(f, 'r').read()
VERSION = catcli.__version__
REQUIRES_PYTHON = '>=3'
def read_readme(readme_path):
"""read readme content"""
with open(readme_path, encoding="utf-8") as file:
return file.read()
URL = f'https://github.com/deadc0de6/catcli/archive/v{VERSION}.tar.gz'
setup(
name='catcli',
version=VERSION,
description='The command line catalog tool for your offline data',
long_description=read_readme(readme),
long_description=read_readme(README),
long_description_content_type='text/markdown',
license_files=('LICENSE',),
url='https://github.com/deadc0de6/catcli',
download_url = 'https://github.com/deadc0de6/catcli/archive/v'+VERSION+'.tar.gz',
download_url=URL,
options={"bdist_wheel": {"python_tag": "py3"}},
# include anything from MANIFEST.in
include_package_data=True,
author='deadc0de6',
author_email='deadc0de6@foo.bar',
@ -32,16 +37,19 @@ setup(
python_requires=REQUIRES_PYTHON,
classifiers=[
'Development Status :: 5 - Production/Stable',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
],
keywords='catalog commandline indexer offline',
packages=find_packages(exclude=['tests*']),
install_requires=['docopt', 'anytree','psutil'],
install_requires=['docopt', 'types-docopt', 'anytree',
'pyfzf', 'fusepy', 'natsort', 'cmd2',
'gnureadline'],
extras_require={
'dev': ['check-manifest'],

@ -0,0 +1,22 @@
#!/usr/bin/env bash
# run me from the root of the package
source tests-ng/helper
rm -f tests-ng/assets/github.catalog.json
python3 -m catcli.catcli index -c github .github --catalog=tests-ng/assets/github.catalog.json
# edit catalog
clean_catalog "tests-ng/assets/github.catalog.json"
# native
python3 -m catcli.catcli ls -r -s -B --catalog=tests-ng/assets/github.catalog.json | \
sed -e 's/free:.*%/free:0.0%/g' \
-e 's/....-..-.. ..:..:../2023-03-09 16:20:59/g' \
-e 's#du:[^|]* |#du:0/0 |#g' > tests-ng/assets/github.catalog.native.txt
# csv
python3 -m catcli.catcli ls -r -s -B --catalog=tests-ng/assets/github.catalog.json --format=csv | \
sed -e 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' | \
sed 's/20..-..-.. ..:..:..//g' > tests-ng/assets/github.catalog.csv.txt

@ -0,0 +1,6 @@
"github","storage","","4865","","","","3","0","0",""
"FUNDING.yml","file","github/FUNDING.yml","17","","","0c6407a84d412c514007313fb3bca4de","","","",""
"codecov.yml","file","github/codecov.yml","104","","","4203204f75b43cd4bf032402beb3359d","","","",""
"workflows","dir","github/workflows","3082","","","","2","","",""
"pypi-release.yml","file","github/workflows/pypi-release.yml","691","","","57699a7a6a03e20e864f220e19f8e197","","","",""
"testing.yml","file","github/workflows/testing.yml","850","","","691df1a4d2f254b5cd04c152e7c6ccaf","","","",""

@ -0,0 +1,65 @@
{
"children": [
{
"attr": "",
"children": [
{
"maccess": 1666206037.0786593,
"md5": "0c6407a84d412c514007313fb3bca4de",
"name": "FUNDING.yml",
"size": 17,
"type": "file"
},
{
"maccess": 1704320710.7056112,
"md5": "4203204f75b43cd4bf032402beb3359d",
"name": "codecov.yml",
"size": 104,
"type": "file"
},
{
"children": [
{
"maccess": 1666206037.078865,
"md5": "57699a7a6a03e20e864f220e19f8e197",
"name": "pypi-release.yml",
"size": 691,
"type": "file"
},
{
"maccess": 1704403569.24789,
"md5": "691df1a4d2f254b5cd04c152e7c6ccaf",
"name": "testing.yml",
"size": 850,
"type": "file"
}
],
"maccess": 1704320727.2641916,
"name": "workflows",
"size": 1541,
"type": "dir"
}
],
"free": 0,
"name": "github",
"size": 1662,
"total": 0,
"ts": 1704923096,
"type": "storage"
},
{
"attr": {
"access": 1704923096,
"access_version": "0.9.6",
"created": 1704923096,
"created_version": "0.9.6"
},
"name": "meta",
"size": null,
"type": "meta"
}
],
"name": "top",
"size": null,
"type": "top"
}

@ -0,0 +1,7 @@
top
└── storage: github [nbfiles:3|totsize:4865|free:0.0%|du:0/0|date:2023-03-09 16:20:59]
├── FUNDING.yml 17 2023-03-09 16:20:59 [md5:0c6407a84d412c514007313fb3bca4de]
├── codecov.yml 104 2023-03-09 16:20:59 [md5:4203204f75b43cd4bf032402beb3359d]
└── workflows 3082 2023-03-09 16:20:59 [nbfiles:2]
├── pypi-release.yml 691 2023-03-09 16:20:59 [md5:57699a7a6a03e20e864f220e19f8e197]
└── testing.yml 850 2023-03-09 16:20:59 [md5:691df1a4d2f254b5cd04c152e7c6ccaf]

@ -0,0 +1,136 @@
#!/usr/bin/env bash
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2023, deadc0de6
set -e
cur=$(cd "$(dirname "${0}")" && pwd)
prev="${cur}/.."
cd "${prev}"
# coverage
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
echo "pythonpath: ${PYTHONPATH}"
echo "bin: ${bin}"
${bin} --version
# get the helpers
# shellcheck source=tests-ng/helper
source "${cur}"/helper
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
##########################################################
# the test
##########################################################
# create temp dirs
tmpd=$(mktemp -d)
clear_on_exit "${tmpd}"
catalog="${tmpd}/catalog"
# index
${bin} -B index -c --catalog="${catalog}" github .github
clean_catalog "${catalog}"
ls -laR .github
cat "${catalog}"
#cat "${catalog}"
echo ""
# compare keys
echo "[+] compare keys"
src="tests-ng/assets/github.catalog.json"
src_keys="${tmpd}/src-keys"
dst_keys="${tmpd}/dst-keys"
cat "${src}" | jq '.. | keys?' | jq '.[]' | sort > "${src_keys}"
cat "${catalog}" | jq '.. | keys?' | jq '.[]' | sort > "${dst_keys}"
echo "src:"
cat "${src_keys}"
echo "dst:"
cat "${dst_keys}"
diff "${src_keys}" "${dst_keys}"
echo "ok!"
# compare children 1
echo "[+] compare children 1"
src_keys="${tmpd}/src-child1"
dst_keys="${tmpd}/dst-child1"
cat "${src}" | jq '. | select(.type=="top") | .children | .[].name' | sort > "${src_keys}"
cat "${catalog}" | jq '. | select(.type=="top") | .children | .[].name' | sort > "${dst_keys}"
echo "src:"
cat "${src_keys}"
echo "dst:"
cat "${dst_keys}"
diff "${src_keys}" "${dst_keys}"
echo "ok!"
# compare children 2
echo "[+] compare children 2"
src_keys="${tmpd}/src-child2"
dst_keys="${tmpd}/dst-child2"
cat "${src}" | jq '. | select(.type=="top") | .children | .[] | select(.name=="github") | .children | .[].name' | sort > "${src_keys}"
cat "${catalog}" | jq '. | select(.type=="top") | .children | .[] | select(.name=="github") | .children | .[].name' | sort > "${dst_keys}"
echo "src:"
cat "${src_keys}"
echo "dst:"
cat "${dst_keys}"
diff "${src_keys}" "${dst_keys}"
echo "ok!"
# compare children 3
echo "[+] compare children 3"
src_keys="${tmpd}/src-child3"
dst_keys="${tmpd}/dst-child3"
cat "${src}" | jq '. | select(.type=="top") | .children | .[] | select(.name=="github") | .children | .[] | select(.name=="workflows") | .children | .[].name' | sort > "${src_keys}"
cat "${catalog}" | jq '. | select(.type=="top") | .children | .[] | select(.name=="github") | .children | .[] | select(.name=="workflows") | .children | .[].name' | sort > "${dst_keys}"
echo "src:"
cat "${src_keys}"
echo "dst:"
cat "${dst_keys}"
diff "${src_keys}" "${dst_keys}"
echo "ok!"
# native
echo "[+] compare native output"
native="${tmpd}/native.txt"
${bin} -B ls -s -r --format=native --catalog="${catalog}" > "${native}"
mod="${tmpd}/native.mod.txt"
cat "${native}" | sed -e 's/free:.*%/free:0.0%/g' \
-e 's/....-..-.. ..:..:../2023-03-09 16:20:59/g' \
-e 's#du:[^|]* |#du:0/0 |#g' > "${mod}"
if command -v delta >/dev/null; then
delta -s "tests-ng/assets/github.catalog.native.txt" "${mod}"
fi
diff --color=always "tests-ng/assets/github.catalog.native.txt" "${mod}"
echo "ok!"
# csv
echo "[+] compare csv output"
csv="${tmpd}/csv.txt"
${bin} -B ls -s -r --format=csv --catalog="${catalog}" > "${csv}"
# modify created csv
mod="${tmpd}/csv.mod.txt"
cat "${csv}" | \
sed -e 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' | \
sed 's/20..-..-.. ..:..:..//g' > "${mod}"
# modify original
ori="${tmpd}/ori.mod.txt"
cat "tests-ng/assets/github.catalog.csv.txt" | \
sed 's/....-..-.. ..:..:..//g' | \
sed 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' > "${ori}"
if command -v delta >/dev/null; then
delta -s "${ori}" "${mod}"
fi
diff "${ori}" "${mod}"
echo "ok!"
# the end
echo "test \"$(basename "$0")\" success"
cd "${cur}"
exit 0

@ -0,0 +1,61 @@
#!/usr/bin/env bash
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2023, deadc0de6
set -e
cur=$(cd "$(dirname "${0}")" && pwd)
prev="${cur}/.."
cd "${prev}"
# coverage
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
echo "pythonpath: ${PYTHONPATH}"
echo "bin: ${bin}"
${bin} --version
# get the helpers
# shellcheck source=tests-ng/helper
source "${cur}"/helper
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
##########################################################
# the test
##########################################################
# create temp dirs
tmpd=$(mktemp -d)
clear_on_exit "${tmpd}"
catalog="${tmpd}/catalog"
# index
${bin} -B index -c -f --catalog="${catalog}" github1 .github
${bin} -B index -c -f --catalog="${catalog}" github2 .github
clean_catalog "${catalog}"
#cat "${catalog}"
echo ""
${bin} -B ls -r --catalog="${catalog}"
echo "finding \"testing.yml\""
${bin} -B find --catalog="${catalog}" testing.yml
cnt=$(${bin} -B find --catalog="${catalog}" testing.yml | wc -l)
[ "${cnt}" != "2" ] && echo "should return 2 (not ${cnt})" && exit 1
echo "finding \"*.yml\""
${bin} -B find --catalog="${catalog}" '*.yml'
cnt=$(${bin} -B find --catalog="${catalog}" '*.yml' | wc -l)
[ "${cnt}" != "8" ] && echo "should return 8 (not ${cnt})" && exit 1
# the end
echo ""
echo "test \"$(basename "$0")\" success"
cd "${cur}"
exit 0

@ -0,0 +1,76 @@
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2023, deadc0de6
#
# file to be sourced from test scripts
#
declare -a to_be_cleared
# add a file/directory to be cleared
# on exit
#
# $1: file path to clear
clear_on_exit()
{
local len="${#to_be_cleared[*]}"
# shellcheck disable=SC2004
to_be_cleared[${len}]="$1"
if [ "${len}" = "0" ]; then
# set trap
trap on_exit EXIT
fi
}
# clear catalog stuff for testing
# $1: catalog path
clean_catalog()
{
sed -i 's/"free": .*,/"free": 0,/g' "${1}"
sed -i 's/"total": .*,/"total": 0,/g' "${1}"
}
# clear files
on_exit()
{
for i in "${to_be_cleared[@]}"; do
rm -rf "${i}"
done
}
# osx tricks
# brew install coreutils gnu-sed
if [[ $OSTYPE == 'darwin'* ]]; then
mktemp() {
gmktemp "$@"
}
stat() {
gstat "$@"
}
sed() {
gsed "$@"
}
wc() {
gwc "$@"
}
date() {
gdate "$@"
}
chmod() {
gchmod "$@"
}
readlink() {
greadlink "$@"
}
realpath() {
grealpath "$@"
}
export -f mktemp
export -f stat
export -f sed
export -f wc
export -f date
export -f chmod
export -f readlink
export -f realpath
fi

@ -0,0 +1,59 @@
#!/usr/bin/env bash
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2024, deadc0de6
set -e
cur=$(cd "$(dirname "${0}")" && pwd)
prev="${cur}/.."
cd "${prev}"
# coverage
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
echo "pythonpath: ${PYTHONPATH}"
echo "bin: ${bin}"
${bin} --version
# get the helpers
# shellcheck source=tests-ng/helper
source "${cur}"/helper
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
##########################################################
# the test
##########################################################
# create temp dirs
tmpd=$(mktemp -d)
clear_on_exit "${tmpd}"
catalog="${tmpd}/catalog"
# index
${bin} -B index -c -f --catalog="${catalog}" github1 .github
${bin} -B index -c -f --catalog="${catalog}" github2 .github
clean_catalog "${catalog}"
#cat "${catalog}"
echo ""
${bin} -B ls -r --catalog="${catalog}"
${bin} -B ls --catalog="${catalog}" 'github1/*.yml'
cnt=$(${bin} -B ls --catalog="${catalog}" 'github1/*.yml' | wc -l)
[ "${cnt}" != "2" ] && echo "should return 2 (not ${cnt})" && exit 1
${bin} -B ls --catalog="${catalog}" 'github*/*.yml'
cnt=$(${bin} -B ls --catalog="${catalog}" 'github*/*.yml' | wc -l)
[ "${cnt}" != "4" ] && echo "should return 4 (not ${cnt})" && exit 1
# the end
echo ""
echo "test \"$(basename "$0")\" success"
cd "${cur}"
exit 0

@ -0,0 +1,80 @@
#!/usr/bin/env bash
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2021, deadc0de6
set -e
cur=$(cd "$(dirname "${0}")" && pwd)
prev="${cur}/.."
cd "${prev}"
# coverage
bin="python3 -m catcli.catcli"
if command -v coverage 2>/dev/null; then
mkdir -p coverages/
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
fi
echo "current dir: $(pwd)"
echo "pythonpath: ${PYTHONPATH}"
echo "bin: ${bin}"
${bin} --version
# get the helpers
# shellcheck source=tests-ng/helper
source "${cur}"/helper
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
##########################################################
# the test
##########################################################
# create temp dirs
tmpd=$(mktemp -d)
clear_on_exit "${tmpd}"
tmpu="${tmpd}/dir2"
mkdir -p "${tmpu}"
catalog="${tmpd}/catalog"
mkdir -p "${tmpd}/dir"
echo "abc" > "${tmpd}/dir/a"
# index
${bin} -B index --catalog="${catalog}" dir "${tmpd}/dir"
${bin} -B ls --catalog="${catalog}"
# get attributes
freeb=$(${bin} -B ls --catalog="${catalog}" | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dub=$(${bin} -B ls --catalog="${catalog}" | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
dateb=$(${bin} -B ls --catalog="${catalog}" | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
echo "before: free:${freeb} | du:${dub} | date:${dateb}"
# change content
echo "abc" >> "${tmpd}/dir/a"
echo "abc" > "${tmpd}/dir/b"
# move dir
cp -r "${tmpd}/dir" "${tmpu}/"
# sleep to force date change
sleep 1
# update
${bin} -B update -f --catalog="${catalog}" dir "${tmpu}/dir"
${bin} -B ls --catalog="${catalog}"
# get new attributes
freea=$(${bin} -B ls --catalog="${catalog}" | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
dua=$(${bin} -B ls --catalog="${catalog}" | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
datea=$(${bin} -B ls --catalog="${catalog}" | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
echo "after: free:${freea} | du:${dua} | date:${datea}"
# test they are all different
[ "${freeb}" = "${freea}" ] && echo "WARNING free didn't change!"
[ "${dub}" = "${dua}" ] && echo "WARNING du didn't change!"
[ "${dateb}" = "${datea}" ] && echo "WARNING date didn't change!" && exit 1
# the end
echo "test \"$(basename "$0")\" success"
cd "${cur}"
exit 0

@ -1,5 +1,8 @@
pycodestyle; python_version >= '3.0'
pyflakes; python_version >= '3.0'
nose; python_version >= '3.0'
nose2; python_version >= '3.0'
coverage; python_version >= '3.0'
coveralls; python_version >= '3.0'
pylint; python_version > '3.0'
mypy; python_version > '3.0'
pytest; python_version > '3.0'
pytype; python_version > '3.0'

@ -2,13 +2,105 @@
# author: deadc0de6 (https://github.com/deadc0de6)
# Copyright (c) 2017, deadc0de6
cur=$(dirname "$(readlink -f "${0}")")
# stop on first error
set -ev
set -e
#set -v
pycodestyle --ignore=W605 catcli/
# pycodestyle
echo "[+] pycodestyle"
pycodestyle --version
pycodestyle catcli/
pycodestyle tests/
pycodestyle setup.py
# pyflakes
echo "[+] pyflakes"
pyflakes --version
pyflakes catcli/
pyflakes tests/
pyflakes setup.py
# pylint
# R0914: Too many local variables
# R0913: Too many arguments
# R0912: Too many branches
# R0915: Too many statements
# R0911: Too many return statements
# R0903: Too few public methods
# R0902: Too many instance attributes
# R0201: no-self-used
echo "[+] pylint"
pylint --version
pylint -sn \
--disable=R0914 \
--disable=R0913 \
--disable=R0912 \
--disable=R0915 \
--disable=R0911 \
--disable=R0903 \
--disable=R0902 \
--disable=R0201 \
--disable=R0022 \
catcli/
# R0801: Similar lines in 2 files
# W0212: Access to a protected member
# R0914: Too many local variables
# R0915: Too many statements
pylint -sn \
--disable=R0801 \
--disable=W0212 \
--disable=R0914 \
--disable=R0915 \
tests/
pylint -sn setup.py
# mypy
echo "[+] mypy"
mypy --version
mypy --config-file=.mypy.ini catcli/
# pytype
echo "[+] pytype"
pytype --version
pytype catcli/
set +e
grep -R 'TODO' catcli/ && echo "TODO found" && exit 1
grep -R 'FIXME' catcli/ && echo "FIXME found" && exit 1
set -e
# unittest
echo "[+] unittests"
mkdir -p coverages/
coverage run -p --data-file coverages/coverage -m pytest tests
# tests-ng
echo "[+] tests-ng"
for t in "${cur}"/tests-ng/*.sh; do
echo "running test \"$(basename "${t}")\""
${t}
done
# check shells
echo "[+] shellcheck"
if ! command -v shellcheck >/dev/null 2>&1; then
echo "Install shellcheck"
exit 1
fi
shellcheck --version
find . -iname '*.sh' | while read -r script; do
shellcheck -x \
--exclude SC2002 \
"${script}"
done
# merge coverage
echo "[+] coverage merge"
coverage combine coverages/*
coverage xml
PYTHONPATH=catcli python3 -m nose -s --with-coverage --cover-package=catcli
echo "ALL TESTS DONE OK"
exit 0

@ -11,6 +11,7 @@ import random
import tempfile
import shutil
import subprocess
import hashlib
TMPSUFFIX = '.catcli'
@ -20,13 +21,32 @@ TMPSUFFIX = '.catcli'
def get_rnd_string(length):
'''Get a random string of specific length '''
"""Get a random string of specific length """
alpha = string.ascii_uppercase + string.digits
return ''.join(random.choice(alpha) for _ in range(length))
def md5sum(path):
"""calculate md5 sum of a file"""
rpath = os.path.realpath(path)
if not os.path.exists(rpath):
return None
try:
with open(rpath, mode='rb') as file:
val = hashlib.md5()
while True:
buf = file.read(4096)
if not buf:
break
val.update(buf)
return val.hexdigest()
except PermissionError:
pass
return None
def clean(path):
'''Delete file or folder.'''
"""Delete file or folder."""
if not os.path.exists(path):
return
if os.path.islink(path):
@ -38,13 +58,12 @@ def clean(path):
def edit_file(path, newcontent):
if not os.path.exists(path):
write_to_file(path, newcontent)
else:
write_to_file(path, newcontent)
"""edit file content"""
return write_to_file(path, newcontent)
def unix_tree(path):
"""print using unix tree tool"""
if not os.path.exists(path):
return
# cmd = ['tree', path]
@ -58,7 +77,7 @@ def unix_tree(path):
def create_tree():
''' create a random tree of files and directories '''
""" create a random tree of files and directories """
dirpath = get_tempdir()
# create 3 files
create_rnd_file(dirpath, get_rnd_string(5))
@ -66,13 +85,13 @@ def create_tree():
create_rnd_file(dirpath, get_rnd_string(5))
# create 2 directories
d1 = create_dir(dirpath, get_rnd_string(3))
d2 = create_dir(dirpath, get_rnd_string(3))
dir1 = create_dir(dirpath, get_rnd_string(3))
dir2 = create_dir(dirpath, get_rnd_string(3))
# fill directories
create_rnd_file(d1, get_rnd_string(4))
create_rnd_file(d1, get_rnd_string(4))
create_rnd_file(d2, get_rnd_string(6))
create_rnd_file(dir1, get_rnd_string(4))
create_rnd_file(dir1, get_rnd_string(4))
create_rnd_file(dir2, get_rnd_string(6))
return dirpath
@ -82,12 +101,12 @@ def create_tree():
def get_tempdir():
'''Get a temporary directory '''
"""Get a temporary directory """
return tempfile.mkdtemp(suffix=TMPSUFFIX)
def create_dir(path, dirname):
'''Create a directory '''
"""Create a directory """
fpath = os.path.join(path, dirname)
if not os.path.exists(fpath):
os.mkdir(fpath)
@ -95,7 +114,7 @@ def create_dir(path, dirname):
def create_rnd_file(path, filename, content=None):
'''Create the file filename in path with random content if None '''
"""Create the file filename in path with random content if None """
if not content:
content = get_rnd_string(100)
fpath = os.path.join(path, filename)
@ -103,23 +122,25 @@ def create_rnd_file(path, filename, content=None):
def write_to_file(path, content):
with open(path, 'w') as f:
f.write(content)
"""write content to file"""
with open(path, 'w', encoding='utf-8') as file:
file.write(content)
return path
def read_from_file(path):
"""read file content"""
if not os.path.exists(path):
return ''
with open(path, 'r') as f:
content = f.read()
with open(path, 'r', encoding='utf-8') as file:
content = file.read()
return content
############################################################
# fake tree in json
############################################################
FAKECATALOG = '''
FAKECATALOG = """
{
"children": [
{
@ -128,21 +149,18 @@ FAKECATALOG = '''
{
"md5": null,
"name": "7544G",
"relpath": "tmpj5602ih7.catcli/7544G",
"size": 100,
"type": "file"
},
{
"md5": null,
"name": "KF2ZC",
"relpath": "tmpj5602ih7.catcli/KF2ZC",
"size": 100,
"type": "file"
},
{
"md5": null,
"name": "Z9OII",
"relpath": "tmpj5602ih7.catcli/Z9OII",
"size": 100,
"type": "file"
},
@ -151,14 +169,12 @@ FAKECATALOG = '''
{
"md5": null,
"name": "M592O9",
"relpath": "tmpj5602ih7.catcli/VNN/M592O9",
"size": 100,
"type": "file"
}
],
"md5": null,
"name": "VNN",
"relpath": "VNN",
"size": 100,
"type": "dir"
},
@ -167,21 +183,18 @@ FAKECATALOG = '''
{
"md5": null,
"name": "X37H",
"relpath": "tmpj5602ih7.catcli/P4C/X37H",
"size": 100,
"type": "file"
},
{
"md5": null,
"name": "I566",
"relpath": "tmpj5602ih7.catcli/P4C/I566",
"size": 100,
"type": "file"
}
],
"md5": null,
"name": "P4C",
"relpath": "P4C",
"size": 200,
"type": "dir"
}
@ -197,9 +210,9 @@ FAKECATALOG = '''
"name": "top",
"type": "top"
}
'''
"""
def get_fakecatalog():
# catalog constructed through test_index
"""catalog constructed through test_index"""
return FAKECATALOG

@ -0,0 +1,29 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2017, deadc0de6
Basic unittest for ls
"""
import unittest
from catcli.decomp import Decomp
class TestDecomp(unittest.TestCase):
"""test ls"""
def test_list(self):
"""test decomp formats"""
dec = Decomp()
formats = dec.get_formats()
self.assertTrue('zip' in formats)
def main():
"""entry point"""
unittest.main()
if __name__ == '__main__':
main()

@ -14,17 +14,20 @@ from tests.helpers import get_fakecatalog
class TestFind(unittest.TestCase):
"""test find"""
def test_find(self):
"""test find"""
# init
catalog = Catalog('fake', force=True, verbose=False)
catalog = Catalog('fake', force=True, debug=False)
top = catalog._restore_json(get_fakecatalog())
noder = Noder()
# create fake args
args = {'<term>': '7544G', '--script': True,
'--verbose': True, '--parent': False,
'--directory': False, '--path': None}
'--directory': False, '--path': None,
'--format': 'native', '--raw-size': False}
# try to find something
found = cmd_find(args, noder, top)
@ -37,6 +40,7 @@ class TestFind(unittest.TestCase):
def main():
"""entry point"""
unittest.main()

@ -16,14 +16,16 @@ from tests.helpers import clean, get_fakecatalog
class TestGraph(unittest.TestCase):
"""test graph"""
def test_graph(self):
"""test graph"""
# init
path = 'fake'
gpath = tempfile.gettempdir() + os.sep + 'graph.dot'
self.addCleanup(clean, path)
self.addCleanup(clean, gpath)
catalog = Catalog(path, force=True, verbose=False)
catalog = Catalog(path, force=True, debug=False)
top = catalog._restore_json(get_fakecatalog())
noder = Noder()
@ -38,6 +40,7 @@ class TestGraph(unittest.TestCase):
def main():
"""entry point"""
unittest.main()

@ -16,8 +16,10 @@ from tests.helpers import get_tempdir, create_rnd_file, clean, \
class TestIndexing(unittest.TestCase):
"""test index"""
def test_index(self):
"""test index"""
# init
workingdir = get_tempdir()
catalogpath = create_rnd_file(workingdir, 'catalog.json', content='')
@ -27,28 +29,28 @@ class TestIndexing(unittest.TestCase):
self.addCleanup(clean, dirpath)
# create 3 files
f1 = create_rnd_file(dirpath, get_rnd_string(5))
f2 = create_rnd_file(dirpath, get_rnd_string(5))
f3 = create_rnd_file(dirpath, get_rnd_string(5))
file1 = create_rnd_file(dirpath, get_rnd_string(5))
file2 = create_rnd_file(dirpath, get_rnd_string(5))
file3 = create_rnd_file(dirpath, get_rnd_string(5))
# create 2 directories
d1 = create_dir(dirpath, get_rnd_string(3))
d2 = create_dir(dirpath, get_rnd_string(3))
dir1 = create_dir(dirpath, get_rnd_string(3))
dir2 = create_dir(dirpath, get_rnd_string(3))
# fill directories with files
_ = create_rnd_file(d1, get_rnd_string(4))
_ = create_rnd_file(d1, get_rnd_string(4))
_ = create_rnd_file(d2, get_rnd_string(6))
_ = create_rnd_file(dir1, get_rnd_string(4))
_ = create_rnd_file(dir1, get_rnd_string(4))
_ = create_rnd_file(dir2, get_rnd_string(6))
noder = Noder()
top = noder.new_top_node()
catalog = Catalog(catalogpath, force=True, verbose=False)
catalog = Catalog(catalogpath, force=True, debug=False)
# create fake args
tmpdirname = 'tmpdir'
args = {'<path>': dirpath, '<name>': tmpdirname,
'--hash': True, '--meta': ['some meta'],
'--no-subsize': False, '--verbose': True}
'--verbose': True}
# index the directory
cmd_index(args, noder, catalog, top)
@ -60,21 +62,22 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(len(storage.children) == 5)
# ensures files and directories are in
names = [x.name for x in storage.children]
self.assertTrue(os.path.basename(f1) in names)
self.assertTrue(os.path.basename(f2) in names)
self.assertTrue(os.path.basename(f3) in names)
self.assertTrue(os.path.basename(d1) in names)
self.assertTrue(os.path.basename(d2) in names)
names = [x.get_name() for x in storage.children]
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
self.assertTrue(os.path.basename(file3) in names)
self.assertTrue(os.path.basename(dir1) in names)
self.assertTrue(os.path.basename(dir2) in names)
for node in storage.children:
if node.name == os.path.basename(d1):
if node.get_name() == os.path.basename(dir1):
self.assertTrue(len(node.children) == 2)
elif node.name == os.path.basename(d2):
elif node.get_name() == os.path.basename(dir2):
self.assertTrue(len(node.children) == 1)
def main():
"""entry point"""
unittest.main()

@ -14,18 +14,22 @@ from tests.helpers import get_fakecatalog, clean
class TestWalking(unittest.TestCase):
"""test ls"""
def test_ls(self):
"""test ls"""
# init
path = 'fake'
self.addCleanup(clean, path)
catalog = Catalog(path, force=True, verbose=False)
catalog = Catalog(path, force=True, debug=False)
top = catalog._restore_json(get_fakecatalog())
noder = Noder()
# create fake args
args = {'<path>': '', '--recursive': False,
'--verbose': True}
'--verbose': True,
'--format': 'native',
'--raw-size': False}
# list root
args['<path>'] = ''
@ -54,6 +58,7 @@ class TestWalking(unittest.TestCase):
def main():
"""entry point"""
unittest.main()

@ -14,18 +14,22 @@ from tests.helpers import clean, get_fakecatalog
class TestRm(unittest.TestCase):
"""test rm"""
def test_rm(self):
"""test rm"""
# init
path = 'fake'
self.addCleanup(clean, path)
catalog = Catalog(path, force=True, verbose=False)
catalog = Catalog(path, force=True, debug=False)
top = catalog._restore_json(get_fakecatalog())
noder = Noder()
# create fake args dict
args = {'<path>': '', '--recursive': False,
'--verbose': True}
'--verbose': True,
'--format': 'native',
'--raw-size': False}
# list files and make sure there are children
args['<path>'] = ''
@ -42,10 +46,11 @@ class TestRm(unittest.TestCase):
top = cmd_rm(args, noder, catalog, top)
# ensure there no children anymore
self.assertTrue(len(top.children) == 0)
self.assertEqual(len(top.children), 0)
def main():
"""entry point"""
unittest.main()

@ -7,30 +7,40 @@ Basic unittest for tree
import unittest
from catcli.catcli import cmd_tree
from catcli.catcli import cmd_ls
from catcli.noder import Noder
from catcli.catalog import Catalog
from tests.helpers import clean, get_fakecatalog
class TestTree(unittest.TestCase):
"""Test the tree"""
def test_tree(self):
"""test the tree"""
# init
path = 'fake'
self.addCleanup(clean, path)
catalog = Catalog(path, force=True, verbose=False)
catalog = Catalog(path, force=True, debug=False)
top = catalog._restore_json(get_fakecatalog())
noder = Noder()
# create fake args dict
args = {'<path>': path, '--verbose': True}
args = {
'<path>': path,
'--verbose': True,
'--format': 'native',
'--header': False,
'--raw-size': False,
'--recursive': True,
}
# print tree and wait for any errors
cmd_tree(args, noder, top)
cmd_ls(args, noder, top)
def main():
"""entry point"""
unittest.main()

@ -7,18 +7,20 @@ Basic unittest for updating an index
import unittest
import os
import anytree
from catcli.catcli import cmd_index, cmd_update
from catcli.noder import Noder
from catcli.catalog import Catalog
from tests.helpers import create_dir, create_rnd_file, get_tempdir, \
clean, unix_tree, edit_file, read_from_file
import anytree
clean, unix_tree, edit_file, read_from_file, md5sum
class TestIndexing(unittest.TestCase):
class TestUpdate(unittest.TestCase):
"""test update"""
def test_index(self):
def test_update(self):
"""test update"""
# init
workingdir = get_tempdir()
catalogpath = create_rnd_file(workingdir, 'catalog.json', content='')
@ -28,71 +30,137 @@ class TestIndexing(unittest.TestCase):
self.addCleanup(clean, dirpath)
# create 3 files
f1 = create_rnd_file(dirpath, 'file1')
f2 = create_rnd_file(dirpath, 'file2')
f3 = create_rnd_file(dirpath, 'file3')
file1 = create_rnd_file(dirpath, 'file1')
file2 = create_rnd_file(dirpath, 'file2')
file3 = create_rnd_file(dirpath, 'file3')
file4 = create_rnd_file(dirpath, 'file4')
# create 2 directories
d1 = create_dir(dirpath, 'dir1')
d2 = create_dir(dirpath, 'dir2')
dir1 = create_dir(dirpath, 'dir1')
dir2 = create_dir(dirpath, 'dir2')
# fill directories with files
d1f1 = create_rnd_file(d1, 'dir1file1')
d1f2 = create_rnd_file(d1, 'dir1file2')
d2f1 = create_rnd_file(d2, 'dir2file1')
d1f1 = create_rnd_file(dir1, 'dir1file1')
d1f2 = create_rnd_file(dir1, 'dir1file2')
d2f1 = create_rnd_file(dir2, 'dir2file1')
d2f2 = create_rnd_file(dir2, 'dir2file2')
noder = Noder()
noder = Noder(debug=True)
noder.do_hashing(True)
top = noder.new_top_node()
catalog = Catalog(catalogpath, force=True, verbose=False)
catalog = Catalog(catalogpath, force=True, debug=False)
# get checksums
f4_md5 = md5sum(file4)
self.assertTrue(f4_md5)
d1f1_md5 = md5sum(d1f1)
self.assertTrue(d1f1_md5)
d2f2_md5 = md5sum(d2f2)
self.assertTrue(d2f2_md5)
# create fake args
tmpdirname = 'tmpdir'
args = {'<path>': dirpath, '<name>': tmpdirname,
'--hash': True, '--meta': ['some meta'],
'--no-subsize': False, '--verbose': True}
'--verbose': True, '--lpath': None}
# index the directory
unix_tree(dirpath)
cmd_index(args, noder, catalog, top, debug=True)
cmd_index(args, noder, catalog, top)
self.assertTrue(os.stat(catalogpath).st_size != 0)
# ensure md5 sum are in
nods = noder.find(top, os.path.basename(file4))
self.assertEqual(len(nods), 1)
nod = nods[0]
self.assertTrue(nod)
self.assertEqual(nod.md5, f4_md5)
# print catalog
noder.print_tree(top)
# add some files and directories
new1 = create_rnd_file(d1, 'newf1')
new1 = create_rnd_file(dir1, 'newf1')
new2 = create_rnd_file(dirpath, 'newf2')
new3 = create_dir(dirpath, 'newd3')
new4 = create_dir(d2, 'newd4')
new4 = create_dir(dir2, 'newd4')
new5 = create_rnd_file(new4, 'newf5')
unix_tree(dirpath)
# modify files
EDIT = 'edited'
edit_file(d1f1, EDIT)
editval = 'edited'
edit_file(d1f1, editval)
d1f1_md5_new = md5sum(d1f1)
self.assertTrue(d1f1_md5_new)
self.assertTrue(d1f1_md5_new != d1f1_md5)
# change file without mtime
maccess = os.path.getmtime(file4)
editval = 'edited'
edit_file(file4, editval)
# reset edit time
os.utime(file4, (maccess, maccess))
f4_md5_new = md5sum(d1f1)
self.assertTrue(f4_md5_new)
self.assertTrue(f4_md5_new != f4_md5)
# change file without mtime
maccess = os.path.getmtime(d2f2)
editval = 'edited'
edit_file(d2f2, editval)
# reset edit time
os.utime(d2f2, (maccess, maccess))
d2f2_md5_new = md5sum(d2f2)
self.assertTrue(d2f2_md5_new)
self.assertTrue(d2f2_md5_new != d2f2_md5)
# update storage
cmd_update(args, noder, catalog, top, debug=True)
cmd_update(args, noder, catalog, top)
# print catalog
# print(read_from_file(catalogpath))
noder.print_tree(top)
# explore the top node to find all nodes
self.assertTrue(len(top.children) == 1)
self.assertEqual(len(top.children), 1)
storage = top.children[0]
self.assertTrue(len(storage.children) == 7)
self.assertEqual(len(storage.children), 8)
# ensure d1f1 md5 sum has changed in catalog
nods = noder.find(top, os.path.basename(d1f1))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
self.assertTrue(nod.md5 != d1f1_md5)
self.assertTrue(nod.md5 == d1f1_md5_new)
# ensure f4 md5 sum has changed in catalog
nods = noder.find(top, os.path.basename(file4))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
self.assertTrue(nod.md5 != f4_md5)
self.assertTrue(nod.md5 == f4_md5_new)
# ensure d2f2 md5 sum has changed in catalog
nods = noder.find(top, os.path.basename(d2f2))
self.assertTrue(len(nods) == 1)
nod = nods[0]
self.assertTrue(nod)
self.assertTrue(nod.md5 != d2f2_md5)
self.assertTrue(nod.md5 == d2f2_md5_new)
# ensures files and directories are in
names = [node.name for node in anytree.PreOrderIter(storage)]
names = [node.get_name() for node in anytree.PreOrderIter(storage)]
print(names)
self.assertTrue(os.path.basename(f1) in names)
self.assertTrue(os.path.basename(f2) in names)
self.assertTrue(os.path.basename(f3) in names)
self.assertTrue(os.path.basename(d1) in names)
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
self.assertTrue(os.path.basename(file3) in names)
self.assertTrue(os.path.basename(file4) in names)
self.assertTrue(os.path.basename(dir1) in names)
self.assertTrue(os.path.basename(d1f1) in names)
self.assertTrue(os.path.basename(d1f2) in names)
self.assertTrue(os.path.basename(d2) in names)
self.assertTrue(os.path.basename(dir2) in names)
self.assertTrue(os.path.basename(d2f1) in names)
self.assertTrue(os.path.basename(new1) in names)
self.assertTrue(os.path.basename(new2) in names)
@ -101,35 +169,37 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(os.path.basename(new5) in names)
for node in storage.children:
if node.name == os.path.basename(d1):
if node.get_name() == os.path.basename(dir1):
self.assertTrue(len(node.children) == 3)
elif node.name == os.path.basename(d2):
self.assertTrue(len(node.children) == 2)
elif node.name == os.path.basename(new3):
elif node.get_name() == os.path.basename(dir2):
self.assertTrue(len(node.children) == 3)
elif node.get_name() == os.path.basename(new3):
self.assertTrue(len(node.children) == 0)
elif node.name == os.path.basename(new4):
elif node.get_name() == os.path.basename(new4):
self.assertTrue(len(node.children) == 1)
self.assertTrue(read_from_file(d1f1) == EDIT)
self.assertTrue(read_from_file(d1f1) == editval)
# remove some files
clean(d1f1)
clean(d2)
clean(dir2)
clean(new2)
clean(new4)
# update storage
cmd_update(args, noder, catalog, top, debug=True)
cmd_update(args, noder, catalog, top)
# ensures files and directories are (not) in
names = [node.name for node in anytree.PreOrderIter(storage)]
names = [node.get_name() for node in anytree.PreOrderIter(storage)]
print(names)
self.assertTrue(os.path.basename(f1) in names)
self.assertTrue(os.path.basename(f2) in names)
self.assertTrue(os.path.basename(f3) in names)
self.assertTrue(os.path.basename(d1) in names)
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
self.assertTrue(os.path.basename(file3) in names)
self.assertTrue(os.path.basename(file4) in names)
self.assertTrue(os.path.basename(dir1) in names)
self.assertTrue(os.path.basename(d1f1) not in names)
self.assertTrue(os.path.basename(d1f2) in names)
self.assertTrue(os.path.basename(d2) not in names)
self.assertTrue(os.path.basename(dir2) not in names)
self.assertTrue(os.path.basename(d2f1) not in names)
self.assertTrue(os.path.basename(d2f1) not in names)
self.assertTrue(os.path.basename(new1) in names)
self.assertTrue(os.path.basename(new2) not in names)
@ -137,13 +207,14 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(os.path.basename(new4) not in names)
self.assertTrue(os.path.basename(new5) not in names)
for node in storage.children:
if node.name == os.path.basename(d1):
if node.get_name() == os.path.basename(dir1):
self.assertTrue(len(node.children) == 2)
elif node.name == os.path.basename(new3):
elif node.get_name() == os.path.basename(new3):
self.assertTrue(len(node.children) == 0)
def main():
"""entry point"""
unittest.main()

Loading…
Cancel
Save