Compare commits

...

8 Commits

Author SHA1 Message Date
deadc0de6 3a082ee5ad bump version 4 months ago
deadc0de6 1e12caa770 update readme 4 months ago
deadc0de6 c9b4043e5f fix bugs 4 months ago
deadc0de6 e6ca6e2fcc bump version 5 months ago
deadc0de6 1f1ea8d37c env variables 5 months ago
deadc0de6 eb0a628ab6 fix dependencies 5 months ago
deadc0de d8eae3024e
Merge pull request #47 from michalkielan/add_env_var
Add option to override default catalog path
5 months ago
Michal Kielan bbfb60fe2e use optional env variable for catalog path 5 months ago

@ -12,6 +12,10 @@
*The command line catalog tool for your offline data*
> [!WARNING]
> catcli has been superseded by [gocatcli](https://github.com/deadc0de6/gocatcli/)
> which provides all features of catcli and more...
Did you ever wanted to find back that specific file that should be on one of your
backup DVDs or one of your external hard drives? You usually go through all
of them hoping to find the right one on the first try?
@ -134,6 +138,13 @@ Five different types of entry are present in a catalog:
* **file node**: this is a file
* **archive node**: this is a file contained in an archive (tar, zip, etc)
Following environment variables are supported:
* `CATCLI_CATALOG_PATH`: define the catalog path (`--catalog=<path>`)
* `CATCLI_NO_BANNER`: disable the banner (`--no-banner`)
* `CATCLI_VERBOSE`: enable verbose mode (`--verbose`)
* `CATCLI_FORMAT`: define the output format (`-F --format=<fmt>`)
## Index data
Let's say the DVD or external hard drive that needs to be indexed

@ -102,7 +102,7 @@ class Catalog:
if root.type != nodes.TYPE_TOP:
return None
top = NodeTop(root.name, children=root.children)
self._debug(f'top imported: {top.name}')
self._debug(f'top imported: {top.get_name()}')
return top

@ -25,15 +25,27 @@ from catcli.colors import Colors
from catcli.catalog import Catalog
from catcli.walker import Walker
from catcli.noder import Noder
from catcli.utils import ask, edit, path_to_search_all
from catcli.utils import ask, edit
from catcli.nodes_utils import path_to_search_all
from catcli.exceptions import BadFormatException, CatcliException
NAME = 'catcli'
CUR = os.path.dirname(os.path.abspath(__file__))
CATALOGPATH = f'{NAME}.catalog'
GRAPHPATH = f'/tmp/{NAME}.dot'
FORMATS = ['native', 'csv', 'csv-with-header', 'fzf-native', 'fzf-csv']
# env variables
ENV_CATALOG_PATH = 'CATCLI_CATALOG_PATH'
ENV_NOBANNER = 'CATCLI_NO_BANNER'
ENV_VERBOSE = 'CATCLI_VERBOSE'
ENV_FORMAT = 'CATCLI_FORMAT'
# default paths
DEFAULT_CATALOGPATH = os.getenv(ENV_CATALOG_PATH, default=f'{NAME}.catalog')
DEFAULT_GRAPHPATH = f'/tmp/{NAME}.dot'
DEFAULT_NOBANNER = os.getenv(ENV_NOBANNER) is not None
DEFAULT_VERBOSEMODE = os.getenv(ENV_VERBOSE) is not None
DEFAULT_FORMAT = os.getenv(ENV_FORMAT, default='native')
BANNER = f""" +-+-+-+-+-+-+
|c|a|t|c|l|i|
+-+-+-+-+-+-+ v{VERSION}"""
@ -64,22 +76,22 @@ Usage:
{NAME} --version
Options:
--catalog=<path> Path to the catalog [default: {CATALOGPATH}].
--catalog=<path> Path to the catalog [default: {DEFAULT_CATALOGPATH}].
--meta=<meta> Additional attribute to store [default: ].
-a --archive Handle archive file [default: False].
-B --no-banner Do not display the banner [default: False].
-B --no-banner Do not display the banner [default: {str(DEFAULT_NOBANNER)}].
-b --script Output script to manage found file(s) [default: False].
-C --no-color Do not output colors [default: False].
-c --hash Calculate md5 hash [default: False].
-d --directory Only directory [default: False].
-F --format=<fmt> see \"print_supported_formats\" [default: native].
-F --format=<fmt> see \"print_supported_formats\" [default: {DEFAULT_FORMAT}].
-f --force Do not ask when updating the catalog [default: False].
-l --lpath=<path> Path where changes are logged [default: ]
-p --path=<path> Start path.
-r --recursive Recursive [default: False].
-s --raw-size Print raw size [default: False].
-S --sortsize Sort by size, largest first [default: False].
-V --verbose Be verbose [default: False].
-V --verbose Be verbose [default: {str(DEFAULT_VERBOSEMODE)}].
-v --version Show version.
-h --help Show this screen.
""" # nopep8
@ -126,6 +138,8 @@ def cmd_index(args: Dict[str, Any],
node.parent = None
start = datetime.datetime.now()
if debug:
Logger.debug('debug mode enabled')
walker = Walker(noder, usehash=usehash, debug=debug)
attr = args['--meta']
root = noder.new_storage_node(name, path, top, attr)
@ -228,7 +242,7 @@ def cmd_find(args: Dict[str, Any],
script = args['--script']
search_for = args['<term>']
if args['--verbose']:
Logger.debug(f'search for \"{search_for}\" under \"{top.name}\"')
Logger.debug(f'search for "{search_for}" under "{top.get_name()}"')
found = noder.find(top, search_for,
script=script,
startnode=startpath,
@ -244,7 +258,7 @@ def cmd_graph(args: Dict[str, Any],
"""graph action"""
path = args['<path>']
if not path:
path = GRAPHPATH
path = DEFAULT_GRAPHPATH
cmd = noder.to_dot(top, path)
Logger.info(f'create graph with \"{cmd}\" (you need graphviz)')
@ -266,10 +280,10 @@ def cmd_rename(args: Dict[str, Any],
"""rename action"""
storage = args['<storage>']
new = args['<name>']
storages = list(x.name for x in top.children)
storages = list(x.get_name() for x in top.children)
if storage in storages:
node = next(filter(lambda x: x.name == storage, top.children))
node.name = new
node = next(filter(lambda x: x.get_name() == storage, top.children))
node.set_name(new)
if catalog.save(top):
msg = f'Storage \"{storage}\" renamed to \"{new}\"'
Logger.info(msg)
@ -283,9 +297,9 @@ def cmd_edit(args: Dict[str, Any],
top: NodeTop) -> None:
"""edit action"""
storage = args['<storage>']
storages = list(x.name for x in top.children)
storages = list(x.get_name() for x in top.children)
if storage in storages:
node = next(filter(lambda x: x.name == storage, top.children))
node = next(filter(lambda x: x.get_name() == storage, top.children))
attr = node.attr
if not attr:
attr = ''
@ -403,11 +417,12 @@ def init(argv: List[str]) -> Tuple[Dict[str, Any],
print_supported_formats()
sys.exit(0)
if args['--verbose']:
if args['--verbose'] or DEFAULT_VERBOSEMODE:
print('verbose mode enabled')
print(f'args: {args}')
# print banner
if not args['--no-banner']:
if not args['--no-banner'] and DEFAULT_NOBANNER:
banner()
# set colors

@ -17,7 +17,7 @@ except ModuleNotFoundError:
# local imports
from catcli.noder import Noder
from catcli.nodes import NodeTop, NodeAny
from catcli.utils import path_to_search_all, path_to_top
from catcli.nodes_utils import path_to_search_all, path_to_top
from catcli import nodes
@ -129,5 +129,5 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
content = ['.', '..']
entries = self._get_entries(path)
for entry in entries:
content.append(entry.name)
content.append(entry.get_name())
return content

@ -18,7 +18,7 @@ from catcli import nodes
from catcli.nodes import NodeAny, NodeStorage, \
NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta, \
typcast_node
from catcli.utils import md5sum, fix_badchars, has_attr
from catcli.utils import md5sum
from catcli.logger import Logger
from catcli.printer_native import NativePrinter
from catcli.printer_csv import CsvPrinter
@ -117,7 +117,7 @@ class Noder:
return node, False
# force re-indexing if no maccess
maccess = os.path.getmtime(path)
if not has_attr(node, 'maccess') or \
if not node.has_attr('maccess') or \
not node.maccess:
self._debug('\tchange: no maccess found')
return node, True
@ -336,7 +336,7 @@ class Noder:
typcast_node(node)
if node.type == nodes.TYPE_TOP:
# top node
self.native_printer.print_top(pre, node.name)
self.native_printer.print_top(pre, node.get_name())
elif node.type == nodes.TYPE_FILE:
# node of type file
self.native_printer.print_file(pre, node,
@ -420,7 +420,7 @@ class Noder:
continue
parents = rend.get_fullpath()
storage = rend.get_storage_node()
fullpath = os.path.join(storage.name, parents)
fullpath = os.path.join(storage.get_name(), parents)
the_nodes[fullpath] = rend
# prompt with fzf
paths = self._fzf_prompt(the_nodes.keys())
@ -477,7 +477,7 @@ class Noder:
paths = {}
for item in found:
typcast_node(item)
item.name = fix_badchars(item.name)
item.set_name(item.get_name())
key = item.get_fullpath()
paths[key] = item
@ -574,7 +574,7 @@ class Noder:
@fmt: output format
@raw: print raw size
"""
self._debug(f'ls walking path: \"{path}\" from \"{top.name}\"')
self._debug(f'ls walking path: \"{path}\" from \"{top.get_name()}\"')
resolv = anytree.resolver.Resolver('name')
found = []
try:
@ -633,7 +633,7 @@ class Noder:
path: str,
raw: bool = False) -> List[NodeAny]:
"""disk usage"""
self._debug(f'du walking path: \"{path}\" from \"{top.name}\"')
self._debug(f'du walking path: \"{path}\" from \"{top.get_name()}\"')
resolv = anytree.resolver.Resolver('name')
found: NodeAny
try:
@ -660,15 +660,15 @@ class Noder:
"""add an entry to the tree"""
entries = name.rstrip(os.sep).split(os.sep)
if len(entries) == 1:
self.new_archive_node(name, top, top.name)
self.new_archive_node(name, top, top.get_name())
return
sub = os.sep.join(entries[:-1])
nodename = entries[-1]
try:
parent = resolv.get(top, sub)
parent = self.new_archive_node(nodename, parent, top.name)
parent = self.new_archive_node(nodename, parent, top.get_name())
except anytree.resolver.ChildResolverError:
self.new_archive_node(nodename, top, top.name)
self.new_archive_node(nodename, top, top.get_name())
def list_to_tree(self, parent: NodeAny, names: List[str]) -> None:
"""convert list of files to a tree"""

@ -11,6 +11,7 @@ from typing import Dict, Any, cast
from anytree import NodeMixin
from catcli.exceptions import CatcliException
from catcli.utils import fix_badchars
TYPE_TOP = 'top'
@ -58,6 +59,18 @@ class NodeAny(NodeMixin): # type: ignore
if children:
self.children = children
def get_name(self) -> str:
"""get node name"""
return fix_badchars(self.name)
def set_name(self, name: str) -> None:
"""set node name"""
self.name = fix_badchars(name)
def has_attr(self, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in self.__dict__
def may_have_children(self) -> bool:
"""can node contains sub"""
raise NotImplementedError
@ -75,12 +88,12 @@ class NodeAny(NodeMixin): # type: ignore
def get_fullpath(self) -> str:
"""return full path to this node"""
path = self.name
path = self.get_name()
if self.parent:
typcast_node(self.parent)
ppath = self.parent.get_fullpath()
path = os.path.join(ppath, path)
return str(path)
return fix_badchars(path)
def get_rec_size(self) -> int:
"""recursively traverse tree and return size"""

@ -0,0 +1,39 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2024, deadc0de6
nodes helpers
"""
import os
# local imports
from catcli import nodes
def path_to_top(path: str) -> str:
"""path pivot under top"""
pre = f"{os.path.sep}{nodes.NAME_TOP}"
if not path.startswith(pre):
# prepend with top node path
path = pre + path
return path
def path_to_search_all(path: str) -> str:
"""path to search for all subs"""
if not path:
path = os.path.sep
if not path.startswith(os.path.sep):
path = os.path.sep + path
pre = f"{os.path.sep}{nodes.NAME_TOP}"
if not path.startswith(pre):
# prepend with top node path
path = pre + path
# if not path.endswith(os.path.sep):
# # ensure ends with a separator
# path += os.path.sep
# if not path.endswith(WILD):
# # add wild card
# path += WILD
return path

@ -9,8 +9,7 @@ import sys
from typing import List
from catcli.nodes import NodeAny, NodeStorage, TYPE_DIR
from catcli.utils import size_to_str, epoch_to_str, \
has_attr
from catcli.utils import size_to_str, epoch_to_str
class CsvPrinter:
@ -35,7 +34,7 @@ class CsvPrinter:
raw: bool = False) -> None:
"""print a storage node"""
out = []
out.append(node.name) # name
out.append(node.get_name()) # name
out.append(node.type) # type
out.append('') # fake full path
size = node.get_rec_size()
@ -56,7 +55,7 @@ class CsvPrinter:
raw: bool = False) -> None:
"""print other nodes"""
out = []
out.append(node.name.replace('"', '""')) # name
out.append(node.get_name().replace('"', '""')) # name
out.append(node.type) # type
fullpath = node.get_fullpath()
out.append(fullpath.replace('"', '""')) # full path
@ -64,11 +63,11 @@ class CsvPrinter:
out.append(size_to_str(node.nodesize, raw=raw)) # size
storage = node.get_storage_node()
out.append(epoch_to_str(storage.ts)) # indexed_at
if has_attr(node, 'maccess'):
if node.has_attr('maccess'):
out.append(epoch_to_str(node.maccess)) # maccess
else:
out.append('') # fake maccess
if has_attr(node, 'md5'):
if node.has_attr('md5'):
out.append(node.md5) # md5
else:
out.append('') # fake md5

@ -12,7 +12,7 @@ from catcli.nodes import NodeFile, NodeDir, \
from catcli.colors import Colors
from catcli.logger import Logger
from catcli.utils import fix_badchars, size_to_str, \
has_attr, epoch_to_str
epoch_to_str
COLOR_STORAGE = Colors.YELLOW
@ -54,8 +54,7 @@ class NativePrinter:
raw: bool = False) -> None:
"""print a storage node"""
# construct name
name = node.name
name = fix_badchars(name)
name = node.get_name()
# construct attrs
attrs = []
# nb files
@ -74,7 +73,7 @@ class NativePrinter:
szused = size_to_str(node.total - node.free, raw=raw)
attrs.append(f'du:{szused}/{sztotal}')
# timestamp
if has_attr(node, 'ts'):
if node.has_attr('ts'):
attrs.append(f'date:{epoch_to_str(node.ts)}')
# print
@ -91,7 +90,7 @@ class NativePrinter:
raw: bool = False) -> None:
"""print a file node"""
# construct name
name = node.name
name = node.get_name()
storage = node.get_storage_node()
if withpath:
name = node.get_fullpath()
@ -100,7 +99,7 @@ class NativePrinter:
if node.md5:
attrs.append(f'md5:{node.md5}')
if withstorage:
content = Logger.get_bold_text(storage.name)
content = Logger.get_bold_text(storage.get_name())
attrs.append(f'storage:{content}')
# print
out = []
@ -111,7 +110,7 @@ class NativePrinter:
size = node.nodesize
line = size_to_str(size, raw=raw)
out.append(f'{COLOR_SIZE}{line}{Colors.RESET}')
if has_attr(node, 'maccess'):
if node.has_attr('maccess'):
line = epoch_to_str(node.maccess)
out.append(f'{COLOR_TS}{line}{Colors.RESET}')
if attrs:
@ -128,7 +127,7 @@ class NativePrinter:
raw: bool = False) -> None:
"""print a directory node"""
# construct name
name = node.name
name = node.get_name()
storage = node.get_storage_node()
if withpath:
name = node.get_fullpath()
@ -138,7 +137,7 @@ class NativePrinter:
nbchildren = len(node.children)
attrs.append(f'{self.NBFILES}:{nbchildren}')
if withstorage:
attrs.append(f'storage:{Logger.get_bold_text(storage.name)}')
attrs.append(f"storage:{Logger.get_bold_text(storage.get_name())}")
# print
out = []
out.append(f'{pre}')
@ -148,7 +147,7 @@ class NativePrinter:
size = node.nodesize
line = size_to_str(size, raw=raw)
out.append(f'{COLOR_SIZE}{line}{Colors.RESET}')
if has_attr(node, 'maccess'):
if node.has_attr('maccess'):
line = epoch_to_str(node.maccess)
out.append(f'{COLOR_TS}{line}{Colors.RESET}')
if attrs:

@ -10,43 +10,15 @@ import hashlib
import tempfile
import subprocess
import datetime
import string
# local imports
from catcli import nodes
from catcli.exceptions import CatcliException
WILD = '*'
def path_to_top(path: str) -> str:
"""path pivot under top"""
pre = f'{os.path.sep}{nodes.NAME_TOP}'
if not path.startswith(pre):
# prepend with top node path
path = pre + path
return path
def path_to_search_all(path: str) -> str:
"""path to search for all subs"""
if not path:
path = os.path.sep
if not path.startswith(os.path.sep):
path = os.path.sep + path
pre = f'{os.path.sep}{nodes.NAME_TOP}'
if not path.startswith(pre):
# prepend with top node path
path = pre + path
# if not path.endswith(os.path.sep):
# # ensure ends with a separator
# path += os.path.sep
# if not path.endswith(WILD):
# # add wild card
# path += WILD
return path
def md5sum(path: str) -> str:
"""
calculate md5 sum of a file
@ -101,24 +73,20 @@ def ask(question: str) -> bool:
return resp.lower() == 'y'
def edit(string: str) -> str:
def edit(data: str) -> str:
"""edit the information with the default EDITOR"""
data = string.encode('utf-8')
content = fix_badchars(data)
editor = os.environ.get('EDITOR', 'vim')
with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as file:
file.write(data)
file.write(content.encode('utf-8'))
file.flush()
subprocess.call([editor, file.name])
subprocess.call([editor, file.get_name()])
file.seek(0)
new = file.read()
return new.decode('utf-8')
def fix_badchars(string: str) -> str:
def fix_badchars(data: str) -> str:
"""fix none utf-8 chars in string"""
return string.encode('utf-8', 'ignore').decode('utf-8')
def has_attr(node: nodes.NodeAny, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in node.__dict__.keys()
data = "".join(x for x in data if x in string.printable)
return data.encode("utf-8", "ignore").decode("utf-8")

@ -3,4 +3,4 @@ author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2022, deadc0de6
"""
__version__ = '0.10.0'
__version__ = '1.0'

@ -171,7 +171,7 @@ class Walker:
if node and changed:
# remove this node and re-add
self._debug(f'\t{path} has changed')
self._debug(f'\tremoving node {node.name} for {path}')
self._debug(f"\tremoving node {node.get_name()} for {path}")
node.parent = None
return True, node

@ -47,9 +47,9 @@ setup(
keywords='catalog commandline indexer offline',
packages=find_packages(exclude=['tests*']),
install_requires=['docopt', 'anytree',
'types-docopt', 'pyfzf',
'fusepy'],
install_requires=['docopt', 'types-docopt', 'anytree',
'pyfzf', 'fusepy', 'natsort', 'cmd2',
'gnureadline'],
extras_require={
'dev': ['check-manifest'],

@ -62,7 +62,7 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(len(storage.children) == 5)
# ensures files and directories are in
names = [x.name for x in storage.children]
names = [x.get_name() for x in storage.children]
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
self.assertTrue(os.path.basename(file3) in names)
@ -70,9 +70,9 @@ class TestIndexing(unittest.TestCase):
self.assertTrue(os.path.basename(dir2) in names)
for node in storage.children:
if node.name == os.path.basename(dir1):
if node.get_name() == os.path.basename(dir1):
self.assertTrue(len(node.children) == 2)
elif node.name == os.path.basename(dir2):
elif node.get_name() == os.path.basename(dir2):
self.assertTrue(len(node.children) == 1)

@ -151,7 +151,7 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(nod.md5 == d2f2_md5_new)
# ensures files and directories are in
names = [node.name for node in anytree.PreOrderIter(storage)]
names = [node.get_name() for node in anytree.PreOrderIter(storage)]
print(names)
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
@ -169,13 +169,13 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(os.path.basename(new5) in names)
for node in storage.children:
if node.name == os.path.basename(dir1):
if node.get_name() == os.path.basename(dir1):
self.assertTrue(len(node.children) == 3)
elif node.name == os.path.basename(dir2):
elif node.get_name() == os.path.basename(dir2):
self.assertTrue(len(node.children) == 3)
elif node.name == os.path.basename(new3):
elif node.get_name() == os.path.basename(new3):
self.assertTrue(len(node.children) == 0)
elif node.name == os.path.basename(new4):
elif node.get_name() == os.path.basename(new4):
self.assertTrue(len(node.children) == 1)
self.assertTrue(read_from_file(d1f1) == editval)
@ -189,7 +189,7 @@ class TestUpdate(unittest.TestCase):
cmd_update(args, noder, catalog, top)
# ensures files and directories are (not) in
names = [node.name for node in anytree.PreOrderIter(storage)]
names = [node.get_name() for node in anytree.PreOrderIter(storage)]
print(names)
self.assertTrue(os.path.basename(file1) in names)
self.assertTrue(os.path.basename(file2) in names)
@ -207,9 +207,9 @@ class TestUpdate(unittest.TestCase):
self.assertTrue(os.path.basename(new4) not in names)
self.assertTrue(os.path.basename(new5) not in names)
for node in storage.children:
if node.name == os.path.basename(dir1):
if node.get_name() == os.path.basename(dir1):
self.assertTrue(len(node.children) == 2)
elif node.name == os.path.basename(new3):
elif node.get_name() == os.path.basename(new3):
self.assertTrue(len(node.children) == 0)

Loading…
Cancel
Save