mirror of https://github.com/deadc0de6/catcli
Compare commits
No commits in common. 'master' and 'v0.3' have entirely different histories.
@ -1 +0,0 @@
|
||||
ko_fi: deadc0de6
|
@ -1,7 +0,0 @@
|
||||
coverage:
|
||||
status:
|
||||
project:
|
||||
default:
|
||||
target: 90%
|
||||
threshold: 1%
|
||||
patch: off
|
@ -1,26 +0,0 @@
|
||||
name: Release to PyPI
|
||||
on:
|
||||
release:
|
||||
types: [created]
|
||||
jobs:
|
||||
pypi_publish:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Set up Python 3.8
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: 3.8
|
||||
- name: Install Tools
|
||||
run: |
|
||||
sudo apt update
|
||||
python -m pip install --upgrade pip
|
||||
pip install setuptools wheel twine
|
||||
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
|
||||
- name: Build and Publish
|
||||
env:
|
||||
TWINE_USERNAME: __token__
|
||||
TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }}
|
||||
run: |
|
||||
python setup.py sdist bdist_wheel
|
||||
twine upload dist/*
|
@ -1,29 +0,0 @@
|
||||
name: tests
|
||||
on: [push, pull_request, workflow_dispatch]
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Set up Python ${{ matrix.python-version }}
|
||||
uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install -r tests-requirements.txt
|
||||
pip install -r requirements.txt
|
||||
sudo apt-get -y install shellcheck jq
|
||||
- name: Run tests
|
||||
run: |
|
||||
./tests.sh
|
||||
- name: Upload coverage reports to Codecov
|
||||
uses: codecov/codecov-action@v3
|
||||
with:
|
||||
files: coverage.xml
|
||||
env:
|
||||
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
|
@ -1,5 +0,0 @@
|
||||
[mypy]
|
||||
strict = true
|
||||
disable_error_code = import-untyped,import-not-found
|
||||
ignore_missing_imports = True
|
||||
warn_unused_ignores = False
|
@ -0,0 +1,17 @@
|
||||
language: python
|
||||
python:
|
||||
- "3.3"
|
||||
- "3.4"
|
||||
- "3.5"
|
||||
- "3.6"
|
||||
- "nightly"
|
||||
install:
|
||||
- "pip install pycodestyle"
|
||||
- "pip install nose"
|
||||
- "pip install coverage"
|
||||
- "pip install coveralls"
|
||||
- "pip install -r requirements.txt"
|
||||
script:
|
||||
./tests.sh
|
||||
after_success:
|
||||
coveralls
|
@ -1,44 +0,0 @@
|
||||
"""
|
||||
author: deadc0de6 (https://github.com/deadc0de6)
|
||||
Copyright (c) 2022, deadc0de6
|
||||
|
||||
shell colors
|
||||
"""
|
||||
|
||||
from typing import TypeVar, Type
|
||||
|
||||
|
||||
CLASSTYPE = TypeVar('CLASSTYPE', bound='Colors')
|
||||
|
||||
|
||||
class Colors:
|
||||
"""shell colors"""
|
||||
|
||||
RED = '\033[91m'
|
||||
GREEN = '\033[92m'
|
||||
YELLOW = '\033[93m'
|
||||
PURPLE = '\033[1;35m'
|
||||
BLUE = '\033[94m'
|
||||
GRAY = '\033[0;37m'
|
||||
CYAN = '\033[36m'
|
||||
MAGENTA = '\033[95m'
|
||||
WHITE = '\033[97m'
|
||||
RESET = '\033[0m'
|
||||
EMPH = '\033[33m'
|
||||
BOLD = '\033[1m'
|
||||
UND = '\033[4m'
|
||||
|
||||
@classmethod
|
||||
def no_color(cls: Type[CLASSTYPE]) -> None:
|
||||
"""disable colors"""
|
||||
Colors.RED = ''
|
||||
Colors.GREEN = ''
|
||||
Colors.YELLOW = ''
|
||||
Colors.PURPLE = ''
|
||||
Colors.BLUE = ''
|
||||
Colors.GRAY = ''
|
||||
Colors.MAGENTA = ''
|
||||
Colors.RESET = ''
|
||||
Colors.EMPH = ''
|
||||
Colors.BOLD = ''
|
||||
Colors.UND = ''
|
@ -1,57 +0,0 @@
|
||||
"""
|
||||
author: deadc0de6 (https://github.com/deadc0de6)
|
||||
Copyright (c) 2017, deadc0de6
|
||||
|
||||
Catcli generic compressed data lister
|
||||
"""
|
||||
|
||||
import os
|
||||
import tarfile
|
||||
import zipfile
|
||||
from typing import List
|
||||
|
||||
|
||||
class Decomp:
|
||||
"""decompressor"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.ext = {
|
||||
'tar': self._tar,
|
||||
'tgz': self._tar,
|
||||
'gz': self._tar,
|
||||
'tar.gz': self._tar,
|
||||
'xz': self._tar,
|
||||
'tar.xz': self._tar,
|
||||
'lzma': self._tar,
|
||||
'tar.lzma': self._tar,
|
||||
'tlz': self._tar,
|
||||
'bz2': self._tar,
|
||||
'tar.bz2': self._tar,
|
||||
'zip': self._zip}
|
||||
|
||||
def get_formats(self) -> List[str]:
|
||||
"""return list of supported extensions"""
|
||||
return list(self.ext.keys())
|
||||
|
||||
def get_names(self, path: str) -> List[str]:
|
||||
"""get tree of compressed archive"""
|
||||
ext = os.path.splitext(path)[1][1:].lower()
|
||||
if ext in list(self.ext):
|
||||
return self.ext[ext](path)
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def _tar(path: str) -> List[str]:
|
||||
"""return list of file names in tar"""
|
||||
if not tarfile.is_tarfile(path):
|
||||
return []
|
||||
with tarfile.open(path, "r") as tar:
|
||||
return tar.getnames()
|
||||
|
||||
@staticmethod
|
||||
def _zip(path: str) -> List[str]:
|
||||
"""return list of file names in zip"""
|
||||
if not zipfile.is_zipfile(path):
|
||||
return []
|
||||
with zipfile.ZipFile(path) as file:
|
||||
return file.namelist()
|
@ -1,14 +0,0 @@
|
||||
"""
|
||||
author: deadc0de6 (https://github.com/deadc0de6)
|
||||
Copyright (c) 2022, deadc0de6
|
||||
|
||||
Catcli exceptions
|
||||
"""
|
||||
|
||||
|
||||
class CatcliException(Exception):
|
||||
"""generic catcli exception"""
|
||||
|
||||
|
||||
class BadFormatException(CatcliException):
|
||||
"""use of bad format"""
|
@ -1,133 +0,0 @@
|
||||
"""
|
||||
author: deadc0de6 (https://github.com/deadc0de6)
|
||||
Copyright (c) 2023, deadc0de6
|
||||
|
||||
fuse for catcli
|
||||
"""
|
||||
|
||||
import os
|
||||
from time import time
|
||||
from stat import S_IFDIR, S_IFREG
|
||||
from typing import List, Dict, Any, Optional
|
||||
try:
|
||||
import fuse
|
||||
except ModuleNotFoundError:
|
||||
pass
|
||||
|
||||
# local imports
|
||||
from catcli.noder import Noder
|
||||
from catcli.nodes import NodeTop, NodeAny
|
||||
from catcli.nodes_utils import path_to_search_all, path_to_top
|
||||
from catcli import nodes
|
||||
|
||||
|
||||
class Fuser:
|
||||
"""fuse filesystem mounter"""
|
||||
|
||||
def __init__(self, mountpoint: str,
|
||||
top: NodeTop,
|
||||
noder: Noder,
|
||||
debug: bool = False):
|
||||
"""fuse filesystem"""
|
||||
filesystem = CatcliFilesystem(top, noder)
|
||||
fuse.FUSE(filesystem,
|
||||
mountpoint,
|
||||
foreground=debug,
|
||||
nothreads=True,
|
||||
debug=debug)
|
||||
|
||||
|
||||
class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
|
||||
"""in-memory filesystem for catcli catalog"""
|
||||
|
||||
def __init__(self, top: NodeTop,
|
||||
noder: Noder):
|
||||
"""init fuse filesystem"""
|
||||
self.top = top
|
||||
self.noder = noder
|
||||
|
||||
def _get_entry(self, path: str) -> Optional[NodeAny]:
|
||||
"""return the node pointed by path"""
|
||||
path = path_to_top(path)
|
||||
found = self.noder.list(self.top, path,
|
||||
rec=False,
|
||||
fmt='native',
|
||||
raw=True)
|
||||
if found:
|
||||
return found[0]
|
||||
return None
|
||||
|
||||
def _get_entries(self, path: str) -> List[NodeAny]:
|
||||
"""return nodes pointed by path"""
|
||||
path = path_to_search_all(path)
|
||||
found = self.noder.list(self.top, path,
|
||||
rec=False,
|
||||
fmt='native',
|
||||
raw=True)
|
||||
return found
|
||||
|
||||
def _getattr(self, path: str) -> Dict[str, Any]:
|
||||
entry = self._get_entry(path)
|
||||
if not entry:
|
||||
return {}
|
||||
|
||||
maccess = time()
|
||||
mode: Any = S_IFREG
|
||||
nodesize: int = 0
|
||||
if entry.type == nodes.TYPE_ARCHIVED:
|
||||
mode = S_IFREG
|
||||
nodesize = entry.nodesize
|
||||
elif entry.type == nodes.TYPE_DIR:
|
||||
mode = S_IFDIR
|
||||
nodesize = entry.nodesize
|
||||
maccess = entry.maccess
|
||||
elif entry.type == nodes.TYPE_FILE:
|
||||
mode = S_IFREG
|
||||
nodesize = entry.nodesize
|
||||
maccess = entry.maccess
|
||||
elif entry.type == nodes.TYPE_STORAGE:
|
||||
mode = S_IFDIR
|
||||
nodesize = entry.nodesize
|
||||
maccess = entry.ts
|
||||
elif entry.type == nodes.TYPE_META:
|
||||
mode = S_IFREG
|
||||
elif entry.type == nodes.TYPE_TOP:
|
||||
mode = S_IFREG
|
||||
mode = mode | 0o777
|
||||
return {
|
||||
'st_mode': (mode), # file type
|
||||
'st_nlink': 1, # count hard link
|
||||
'st_size': nodesize,
|
||||
'st_ctime': maccess, # attr last modified
|
||||
'st_mtime': maccess, # content last modified
|
||||
'st_atime': maccess, # access time
|
||||
'st_uid': os.getuid(),
|
||||
'st_gid': os.getgid(),
|
||||
}
|
||||
|
||||
def getattr(self, path: str, _fh: Any = None) -> Dict[str, Any]:
|
||||
"""return attr of file pointed by path"""
|
||||
if path == os.path.sep:
|
||||
# mountpoint
|
||||
curt = time()
|
||||
meta = {
|
||||
'st_mode': (S_IFDIR | 0o777),
|
||||
'st_nlink': 1,
|
||||
'st_size': 0,
|
||||
'st_ctime': curt,
|
||||
'st_mtime': curt,
|
||||
'st_atime': curt,
|
||||
'st_uid': os.getuid(),
|
||||
'st_gid': os.getgid(),
|
||||
}
|
||||
return meta
|
||||
meta = self._getattr(path)
|
||||
return meta
|
||||
|
||||
def readdir(self, path: str, _fh: Any) -> List[str]:
|
||||
"""read directory content"""
|
||||
content = ['.', '..']
|
||||
entries = self._get_entries(path)
|
||||
for entry in entries:
|
||||
content.append(entry.get_name())
|
||||
return content
|
@ -1,339 +0,0 @@
|
||||
"""
|
||||
author: deadc0de6 (https://github.com/deadc0de6)
|
||||
Copyright (c) 2023, deadc0de6
|
||||
|
||||
Class that represents a node in the catalog tree
|
||||
"""
|
||||
# pylint: disable=W0622
|
||||
|
||||
import os
|
||||
from typing import Dict, Any, cast
|
||||
from anytree import NodeMixin
|
||||
|
||||
from catcli.exceptions import CatcliException
|
||||
from catcli.utils import fix_badchars
|
||||
|
||||
|
||||
TYPE_TOP = 'top'
|
||||
TYPE_FILE = 'file'
|
||||
TYPE_DIR = 'dir'
|
||||
TYPE_ARCHIVED = 'arc'
|
||||
TYPE_STORAGE = 'storage'
|
||||
TYPE_META = 'meta'
|
||||
|
||||
NAME_TOP = 'top'
|
||||
NAME_META = 'meta'
|
||||
|
||||
|
||||
def typcast_node(node: Any) -> None:
|
||||
"""typecast node to its sub type"""
|
||||
if node.type == TYPE_TOP:
|
||||
node.__class__ = NodeTop
|
||||
elif node.type == TYPE_FILE:
|
||||
node.__class__ = NodeFile
|
||||
elif node.type == TYPE_DIR:
|
||||
node.__class__ = NodeDir
|
||||
elif node.type == TYPE_ARCHIVED:
|
||||
node.__class__ = NodeArchived
|
||||
elif node.type == TYPE_STORAGE:
|
||||
node.__class__ = NodeStorage
|
||||
elif node.type == TYPE_META:
|
||||
node.__class__ = NodeMeta
|
||||
else:
|
||||
raise CatcliException(f"bad node: {node}")
|
||||
|
||||
|
||||
class NodeAny(NodeMixin): # type: ignore
|
||||
"""generic node"""
|
||||
|
||||
def __init__(self, # type: ignore[no-untyped-def]
|
||||
name=None,
|
||||
size=0,
|
||||
parent=None,
|
||||
children=None):
|
||||
"""build generic node"""
|
||||
super().__init__()
|
||||
self.name = name
|
||||
self.nodesize = size
|
||||
self.parent = parent
|
||||
if children:
|
||||
self.children = children
|
||||
|
||||
def get_name(self) -> str:
|
||||
"""get node name"""
|
||||
return fix_badchars(self.name)
|
||||
|
||||
def set_name(self, name: str) -> None:
|
||||
"""set node name"""
|
||||
self.name = fix_badchars(name)
|
||||
|
||||
def has_attr(self, attr: str) -> bool:
|
||||
"""return True if node has attr as attribute"""
|
||||
return attr in self.__dict__
|
||||
|
||||
def may_have_children(self) -> bool:
|
||||
"""can node contains sub"""
|
||||
raise NotImplementedError
|
||||
|
||||
def _to_str(self) -> str:
|
||||
ret = str(self.__class__) + ": " + str(self.__dict__)
|
||||
if self.children:
|
||||
ret += '\n'
|
||||
for child in self.children:
|
||||
ret += f' child => {child}\n'
|
||||
return ret
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self._to_str()
|
||||
|
||||
def get_fullpath(self) -> str:
|
||||
"""return full path to this node"""
|
||||
path = self.get_name()
|
||||
if self.parent:
|
||||
typcast_node(self.parent)
|
||||
ppath = self.parent.get_fullpath()
|
||||
path = os.path.join(ppath, path)
|
||||
return fix_badchars(path)
|
||||
|
||||
def get_rec_size(self) -> int:
|
||||
"""recursively traverse tree and return size"""
|
||||
totsize: int = self.nodesize
|
||||
for node in self.children:
|
||||
typcast_node(node)
|
||||
totsize += node.get_rec_size()
|
||||
return totsize
|
||||
|
||||
def get_storage_node(self) -> NodeMixin:
|
||||
"""recursively traverse up to find storage"""
|
||||
return None
|
||||
|
||||
def flagged(self) -> bool:
|
||||
"""is flagged"""
|
||||
if not hasattr(self, '_flagged'):
|
||||
return False
|
||||
return self._flagged
|
||||
|
||||
def flag(self) -> None:
|
||||
"""flag a node"""
|
||||
self._flagged = True # pylint: disable=W0201
|
||||
|
||||
def unflag(self) -> None:
|
||||
"""unflag node"""
|
||||
self._flagged = False # pylint: disable=W0201
|
||||
delattr(self, '_flagged')
|
||||
|
||||
|
||||
class NodeTop(NodeAny):
|
||||
"""a top node"""
|
||||
|
||||
def __init__(self, # type: ignore[no-untyped-def]
|
||||
name: str,
|
||||
children=None):
|
||||
"""build a top node"""
|
||||
super().__init__() # type: ignore[no-untyped-call]
|
||||
self.name = name
|
||||
self.type = TYPE_TOP
|
||||
self.parent = None
|
||||
if children:
|
||||
self.children = children
|
||||
|
||||
def get_fullpath(self) -> str:
|
||||
"""return full path to this node"""
|
||||
return ''
|
||||
|
||||
def may_have_children(self) -> bool:
|
||||
"""can node contains sub"""
|
||||
return True
|
||||
|
||||
def get_rec_size(self) -> int:
|
||||
"""
|
||||
recursively traverse tree and return size
|
||||
also ensure to update the size on the way
|
||||
"""
|
||||
size = super().get_rec_size()
|
||||
self.nodesize = size
|
||||
return size
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self._to_str()
|
||||
|
||||
|
||||
class NodeFile(NodeAny):
|
||||
"""a file node"""
|
||||
|
||||
def __init__(self, # type: ignore[no-untyped-def]
|
||||
name: str,
|
||||
nodesize: int,
|
||||
md5: str,
|
||||
maccess: float,
|
||||
parent=None,
|
||||
children=None):
|
||||
"""build a file node"""
|
||||
super().__init__() # type: ignore[no-untyped-call]
|
||||
self.name = name
|
||||
self.type = TYPE_FILE
|
||||
self.nodesize = nodesize
|
||||
self.md5 = md5
|
||||
self.maccess = maccess
|
||||
self.parent = parent
|
||||
if children:
|
||||
self.children = children
|
||||
|
||||
def may_have_children(self) -> bool:
|
||||
"""can node contains sub"""
|
||||
return False
|
||||
|
||||
def get_storage_node(self) -> NodeAny:
|
||||
"""recursively traverse up to find storage"""
|
||||
return cast(NodeStorage, self.ancestors[1])
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self._to_str()
|
||||
|
||||
|
||||
class NodeDir(NodeAny):
|
||||
"""a directory node"""
|
||||
|
||||
def __init__(self, # type: ignore[no-untyped-def]
|
||||
name: str,
|
||||
nodesize: int,
|
||||
maccess: float,
|
||||
parent=None,
|
||||
children=None):
|
||||
"""build a directory node"""
|
||||
super().__init__() # type: ignore[no-untyped-call]
|
||||
self.name = name
|
||||
self.type = TYPE_DIR
|
||||
self.nodesize = nodesize
|
||||
self.maccess = maccess
|
||||
self.parent = parent
|
||||
if children:
|
||||
self.children = children
|
||||
|
||||
def may_have_children(self) -> bool:
|
||||
"""can node contains sub"""
|
||||
return True
|
||||
|
||||
def get_rec_size(self) -> int:
|
||||
"""
|
||||
recursively traverse tree and return size
|
||||
also ensure to update the size on the way
|
||||
"""
|
||||
size = super().get_rec_size()
|
||||
self.nodesize = size
|
||||
return size
|
||||
|
||||
def get_storage_node(self) -> NodeAny:
|
||||
"""recursively traverse up to find storage"""
|
||||
return cast(NodeStorage, self.ancestors[1])
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self._to_str()
|
||||
|
||||
|
||||
class NodeArchived(NodeAny):
|
||||
"""an archived node"""
|
||||
|
||||
def __init__(self, # type: ignore[no-untyped-def]
|
||||
name: str,
|
||||
nodesize: int,
|
||||
md5: str,
|
||||
archive: str,
|
||||
parent=None,
|
||||
children=None):
|
||||
"""build an archived node"""
|
||||
super().__init__() # type: ignore[no-untyped-call]
|
||||
self.name = name
|
||||
self.type = TYPE_ARCHIVED
|
||||
self.nodesize = nodesize
|
||||
self.md5 = md5
|
||||
self.archive = archive
|
||||
self.parent = parent
|
||||
if children:
|
||||
self.children = children
|
||||
|
||||
def may_have_children(self) -> bool:
|
||||
"""can node contains sub"""
|
||||
return False
|
||||
|
||||
def get_storage_node(self) -> NodeAny:
|
||||
"""recursively traverse up to find storage"""
|
||||
return cast(NodeStorage, self.ancestors[1])
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self._to_str()
|
||||
|
||||
|
||||
class NodeStorage(NodeAny):
|
||||
"""a storage node"""
|
||||
|
||||
def __init__(self, # type: ignore[no-untyped-def]
|
||||
name: str,
|
||||
free: int,
|
||||
total: int,
|
||||
nodesize: int,
|
||||
ts: float,
|
||||
attr: str,
|
||||
parent=None,
|
||||
children=None):
|
||||
"""build a storage node"""
|
||||
super().__init__() # type: ignore[no-untyped-call]
|
||||
self.name = name
|
||||
self.type = TYPE_STORAGE
|
||||
self.free = free
|
||||
self.total = total
|
||||
self.attr = attr
|
||||
self.nodesize = nodesize
|
||||
self.ts = ts # pylint: disable=C0103
|
||||
self.parent = parent
|
||||
if children:
|
||||
self.children = children
|
||||
|
||||
def may_have_children(self) -> bool:
|
||||
"""can node contains sub"""
|
||||
return True
|
||||
|
||||
def get_rec_size(self) -> int:
|
||||
"""
|
||||
recursively traverse tree and return size
|
||||
also ensure to update the size on the way
|
||||
"""
|
||||
size = super().get_rec_size()
|
||||
self.nodesize = size
|
||||
return size
|
||||
|
||||
def get_storage_node(self) -> NodeAny:
|
||||
"""recursively traverse up to find storage"""
|
||||
return self
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self._to_str()
|
||||
|
||||
|
||||
class NodeMeta(NodeAny):
|
||||
"""a meta node"""
|
||||
|
||||
def __init__(self, # type: ignore[no-untyped-def]
|
||||
name: str,
|
||||
attr: Dict[str, Any],
|
||||
parent=None,
|
||||
children=None):
|
||||
"""build a meta node"""
|
||||
super().__init__() # type: ignore[no-untyped-call]
|
||||
self.name = name
|
||||
self.type = TYPE_META
|
||||
self.attr = attr
|
||||
self.parent = parent
|
||||
if children:
|
||||
self.children = children
|
||||
|
||||
def may_have_children(self) -> bool:
|
||||
"""can node contains sub"""
|
||||
return False
|
||||
|
||||
def get_rec_size(self) -> int:
|
||||
"""recursively traverse tree and return size"""
|
||||
return 0
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self._to_str()
|
@ -1,39 +0,0 @@
|
||||
"""
|
||||
author: deadc0de6 (https://github.com/deadc0de6)
|
||||
Copyright (c) 2024, deadc0de6
|
||||
|
||||
nodes helpers
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
# local imports
|
||||
from catcli import nodes
|
||||
|
||||
|
||||
def path_to_top(path: str) -> str:
|
||||
"""path pivot under top"""
|
||||
pre = f"{os.path.sep}{nodes.NAME_TOP}"
|
||||
if not path.startswith(pre):
|
||||
# prepend with top node path
|
||||
path = pre + path
|
||||
return path
|
||||
|
||||
|
||||
def path_to_search_all(path: str) -> str:
|
||||
"""path to search for all subs"""
|
||||
if not path:
|
||||
path = os.path.sep
|
||||
if not path.startswith(os.path.sep):
|
||||
path = os.path.sep + path
|
||||
pre = f"{os.path.sep}{nodes.NAME_TOP}"
|
||||
if not path.startswith(pre):
|
||||
# prepend with top node path
|
||||
path = pre + path
|
||||
# if not path.endswith(os.path.sep):
|
||||
# # ensure ends with a separator
|
||||
# path += os.path.sep
|
||||
# if not path.endswith(WILD):
|
||||
# # add wild card
|
||||
# path += WILD
|
||||
return path
|
@ -1,81 +0,0 @@
|
||||
"""
|
||||
author: deadc0de6 (https://github.com/deadc0de6)
|
||||
Copyright (c) 2024, deadc0de6
|
||||
|
||||
Class for printing nodes in csv format
|
||||
"""
|
||||
|
||||
import sys
|
||||
from typing import List
|
||||
|
||||
from catcli.nodes import NodeAny, NodeStorage, TYPE_DIR
|
||||
from catcli.utils import size_to_str, epoch_to_str
|
||||
|
||||
|
||||
class CsvPrinter:
|
||||
"""a node printer class"""
|
||||
|
||||
DEFSEP = ','
|
||||
CSV_HEADER = ('name,type,path,size,indexed_at,'
|
||||
'maccess,md5,nbfiles,free_space,'
|
||||
'total_space,meta')
|
||||
|
||||
def _print_entries(self, entries: List[str], sep: str = DEFSEP) -> None:
|
||||
line = sep.join(['"' + o + '"' for o in entries])
|
||||
if len(line) > 0:
|
||||
sys.stdout.write(f'{line}\n')
|
||||
|
||||
def print_header(self) -> None:
|
||||
"""print csv header"""
|
||||
sys.stdout.write(f'{self.CSV_HEADER}\n')
|
||||
|
||||
def print_storage(self, node: NodeStorage,
|
||||
sep: str = DEFSEP,
|
||||
raw: bool = False) -> None:
|
||||
"""print a storage node"""
|
||||
out = []
|
||||
out.append(node.get_name()) # name
|
||||
out.append(node.type) # type
|
||||
out.append('') # fake full path
|
||||
size = node.get_rec_size()
|
||||
out.append(size_to_str(size, raw=raw)) # size
|
||||
out.append(epoch_to_str(node.ts)) # indexed_at
|
||||
out.append('') # fake maccess
|
||||
out.append('') # fake md5
|
||||
out.append(str(len(node.children))) # nbfiles
|
||||
# fake free_space
|
||||
out.append(size_to_str(node.free, raw=raw))
|
||||
# fake total_space
|
||||
out.append(size_to_str(node.total, raw=raw))
|
||||
out.append(node.attr) # meta
|
||||
self._print_entries(out, sep=sep)
|
||||
|
||||
def print_node(self, node: NodeAny,
|
||||
sep: str = DEFSEP,
|
||||
raw: bool = False) -> None:
|
||||
"""print other nodes"""
|
||||
out = []
|
||||
out.append(node.get_name().replace('"', '""')) # name
|
||||
out.append(node.type) # type
|
||||
fullpath = node.get_fullpath()
|
||||
out.append(fullpath.replace('"', '""')) # full path
|
||||
|
||||
out.append(size_to_str(node.nodesize, raw=raw)) # size
|
||||
storage = node.get_storage_node()
|
||||
out.append(epoch_to_str(storage.ts)) # indexed_at
|
||||
if node.has_attr('maccess'):
|
||||
out.append(epoch_to_str(node.maccess)) # maccess
|
||||
else:
|
||||
out.append('') # fake maccess
|
||||
if node.has_attr('md5'):
|
||||
out.append(node.md5) # md5
|
||||
else:
|
||||
out.append('') # fake md5
|
||||
if node.type == TYPE_DIR:
|
||||
out.append(str(len(node.children))) # nbfiles
|
||||
else:
|
||||
out.append('') # fake nbfiles
|
||||
out.append('') # fake free_space
|
||||
out.append('') # fake total_space
|
||||
out.append('') # fake meta
|
||||
self._print_entries(out, sep=sep)
|
@ -1,165 +0,0 @@
|
||||
"""
|
||||
author: deadc0de6 (https://github.com/deadc0de6)
|
||||
Copyright (c) 2022, deadc0de6
|
||||
|
||||
Class for printing nodes in native format
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
from catcli.nodes import NodeFile, NodeDir, \
|
||||
NodeStorage, NodeAny, typcast_node
|
||||
from catcli.colors import Colors
|
||||
from catcli.logger import Logger
|
||||
from catcli.utils import fix_badchars, size_to_str, \
|
||||
epoch_to_str
|
||||
|
||||
|
||||
COLOR_STORAGE = Colors.YELLOW
|
||||
COLOR_FILE = Colors.WHITE
|
||||
COLOR_DIRECTORY = Colors.BLUE
|
||||
COLOR_ARCHIVE = Colors.PURPLE
|
||||
COLOR_TS = Colors.CYAN
|
||||
COLOR_SIZE = Colors.GREEN
|
||||
|
||||
FULLPATH_IN_NAME = True
|
||||
|
||||
|
||||
class NativePrinter:
|
||||
"""a node printer class"""
|
||||
|
||||
STORAGE = 'storage'
|
||||
ARCHIVE = 'archive'
|
||||
NBFILES = 'nbfiles'
|
||||
|
||||
def print_du(self, node: NodeAny,
|
||||
raw: bool = False) -> None:
|
||||
"""print du style"""
|
||||
typcast_node(node)
|
||||
name = node.get_fullpath()
|
||||
size = node.nodesize
|
||||
|
||||
line = size_to_str(size, raw=raw).ljust(10, ' ')
|
||||
out = f'{COLOR_SIZE}{line}{Colors.RESET}'
|
||||
out += ' '
|
||||
out += f'{COLOR_FILE}{name}{Colors.RESET}'
|
||||
sys.stdout.write(f'{out}\n')
|
||||
|
||||
def print_top(self, pre: str, name: str) -> None:
|
||||
"""print top node"""
|
||||
sys.stdout.write(f'{pre}{name}\n')
|
||||
|
||||
def print_storage(self, pre: str,
|
||||
node: NodeStorage,
|
||||
raw: bool = False) -> None:
|
||||
"""print a storage node"""
|
||||
# construct name
|
||||
name = node.get_name()
|
||||
# construct attrs
|
||||
attrs = []
|
||||
# nb files
|
||||
attrs.append(f'nbfiles:{len(node.children)}')
|
||||
# the children size
|
||||
recsize = node.get_rec_size()
|
||||
sizestr = size_to_str(recsize, raw=raw)
|
||||
attrs.append(f'totsize:{sizestr}')
|
||||
# free
|
||||
pcent = 0.0
|
||||
if node.total > 0:
|
||||
pcent = node.free * 100 / node.total
|
||||
attrs.append(f'free:{pcent:.1f}%')
|
||||
# du
|
||||
sztotal = size_to_str(node.total, raw=raw)
|
||||
szused = size_to_str(node.total - node.free, raw=raw)
|
||||
attrs.append(f'du:{szused}/{sztotal}')
|
||||
# timestamp
|
||||
if node.has_attr('ts'):
|
||||
attrs.append(f'date:{epoch_to_str(node.ts)}')
|
||||
|
||||
# print
|
||||
out = f'{pre}{Colors.UND}{self.STORAGE}{Colors.RESET}: '
|
||||
out += f'{COLOR_STORAGE}{name}{Colors.RESET}'
|
||||
if attrs:
|
||||
out += f' [{Colors.WHITE}{"|".join(attrs)}{Colors.RESET}]'
|
||||
sys.stdout.write(f'{out}\n')
|
||||
|
||||
def print_file(self, pre: str,
|
||||
node: NodeFile,
|
||||
withpath: bool = False,
|
||||
withstorage: bool = False,
|
||||
raw: bool = False) -> None:
|
||||
"""print a file node"""
|
||||
# construct name
|
||||
name = node.get_name()
|
||||
storage = node.get_storage_node()
|
||||
if withpath:
|
||||
name = node.get_fullpath()
|
||||
# construct attributes
|
||||
attrs = []
|
||||
if node.md5:
|
||||
attrs.append(f'md5:{node.md5}')
|
||||
if withstorage:
|
||||
content = Logger.get_bold_text(storage.get_name())
|
||||
attrs.append(f'storage:{content}')
|
||||
# print
|
||||
out = []
|
||||
out.append(f'{pre}')
|
||||
out.append(f'{COLOR_FILE}{name}{Colors.RESET}')
|
||||
size = 0
|
||||
if node.nodesize:
|
||||
size = node.nodesize
|
||||
line = size_to_str(size, raw=raw)
|
||||
out.append(f'{COLOR_SIZE}{line}{Colors.RESET}')
|
||||
if node.has_attr('maccess'):
|
||||
line = epoch_to_str(node.maccess)
|
||||
out.append(f'{COLOR_TS}{line}{Colors.RESET}')
|
||||
if attrs:
|
||||
out.append(f'{Colors.GRAY}[{",".join(attrs)}]{Colors.RESET}')
|
||||
|
||||
out = [x for x in out if x]
|
||||
sys.stdout.write(f'{" ".join(out)}\n')
|
||||
|
||||
def print_dir(self, pre: str,
|
||||
node: NodeDir,
|
||||
withpath: bool = False,
|
||||
withstorage: bool = False,
|
||||
withnbchildren: bool = False,
|
||||
raw: bool = False) -> None:
|
||||
"""print a directory node"""
|
||||
# construct name
|
||||
name = node.get_name()
|
||||
storage = node.get_storage_node()
|
||||
if withpath:
|
||||
name = node.get_fullpath()
|
||||
# construct attrs
|
||||
attrs = []
|
||||
if withnbchildren:
|
||||
nbchildren = len(node.children)
|
||||
attrs.append(f'{self.NBFILES}:{nbchildren}')
|
||||
if withstorage:
|
||||
attrs.append(f"storage:{Logger.get_bold_text(storage.get_name())}")
|
||||
# print
|
||||
out = []
|
||||
out.append(f'{pre}')
|
||||
out.append(f'{COLOR_DIRECTORY}{name}{Colors.RESET}')
|
||||
size = 0
|
||||
if node.nodesize:
|
||||
size = node.nodesize
|
||||
line = size_to_str(size, raw=raw)
|
||||
out.append(f'{COLOR_SIZE}{line}{Colors.RESET}')
|
||||
if node.has_attr('maccess'):
|
||||
line = epoch_to_str(node.maccess)
|
||||
out.append(f'{COLOR_TS}{line}{Colors.RESET}')
|
||||
if attrs:
|
||||
out.append(f'{Colors.GRAY}[{",".join(attrs)}]{Colors.RESET}')
|
||||
|
||||
out = [x for x in out if x]
|
||||
sys.stdout.write(f'{" ".join(out)}\n')
|
||||
|
||||
def print_archive(self, pre: str,
|
||||
name: str, archive: str) -> None:
|
||||
"""print an archive"""
|
||||
name = fix_badchars(name)
|
||||
out = f'{pre}{COLOR_ARCHIVE}{name}{Colors.RESET} '
|
||||
out += f'{Colors.GRAY}[{self.ARCHIVE}:{archive}]{Colors.RESET}'
|
||||
sys.stdout.write(f'{out}\n')
|
@ -1,6 +0,0 @@
|
||||
"""
|
||||
author: deadc0de6 (https://github.com/deadc0de6)
|
||||
Copyright (c) 2022, deadc0de6
|
||||
"""
|
||||
|
||||
__version__ = '1.0'
|
@ -1,8 +1,3 @@
|
||||
docopt; python_version >= '3.0'
|
||||
types-docopt; python_version >= '3.0'
|
||||
anytree; python_version >= '3.0'
|
||||
pyfzf; python_version >= '3.0'
|
||||
fusepy; python_version >= '3.0'
|
||||
natsort; python_version >= '3.0'
|
||||
cmd2; python_version >= '3.0'
|
||||
gnureadline; python_version >= '3.0'
|
||||
psutil; python_version >= '3.0'
|
||||
|
@ -0,0 +1,11 @@
|
||||
[metadata]
|
||||
description-file = README.md
|
||||
license_file = LICENSE
|
||||
|
||||
[bdist_wheel]
|
||||
python-tag = py3
|
||||
|
||||
[files]
|
||||
extra_files =
|
||||
LICENSE
|
||||
README.md
|
@ -1,22 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# run me from the root of the package
|
||||
|
||||
source tests-ng/helper
|
||||
|
||||
|
||||
rm -f tests-ng/assets/github.catalog.json
|
||||
python3 -m catcli.catcli index -c github .github --catalog=tests-ng/assets/github.catalog.json
|
||||
|
||||
# edit catalog
|
||||
clean_catalog "tests-ng/assets/github.catalog.json"
|
||||
|
||||
# native
|
||||
python3 -m catcli.catcli ls -r -s -B --catalog=tests-ng/assets/github.catalog.json | \
|
||||
sed -e 's/free:.*%/free:0.0%/g' \
|
||||
-e 's/....-..-.. ..:..:../2023-03-09 16:20:59/g' \
|
||||
-e 's#du:[^|]* |#du:0/0 |#g' > tests-ng/assets/github.catalog.native.txt
|
||||
|
||||
# csv
|
||||
python3 -m catcli.catcli ls -r -s -B --catalog=tests-ng/assets/github.catalog.json --format=csv | \
|
||||
sed -e 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' | \
|
||||
sed 's/20..-..-.. ..:..:..//g' > tests-ng/assets/github.catalog.csv.txt
|
@ -1,6 +0,0 @@
|
||||
"github","storage","","4865","","","","3","0","0",""
|
||||
"FUNDING.yml","file","github/FUNDING.yml","17","","","0c6407a84d412c514007313fb3bca4de","","","",""
|
||||
"codecov.yml","file","github/codecov.yml","104","","","4203204f75b43cd4bf032402beb3359d","","","",""
|
||||
"workflows","dir","github/workflows","3082","","","","2","","",""
|
||||
"pypi-release.yml","file","github/workflows/pypi-release.yml","691","","","57699a7a6a03e20e864f220e19f8e197","","","",""
|
||||
"testing.yml","file","github/workflows/testing.yml","850","","","691df1a4d2f254b5cd04c152e7c6ccaf","","","",""
|
@ -1,65 +0,0 @@
|
||||
{
|
||||
"children": [
|
||||
{
|
||||
"attr": "",
|
||||
"children": [
|
||||
{
|
||||
"maccess": 1666206037.0786593,
|
||||
"md5": "0c6407a84d412c514007313fb3bca4de",
|
||||
"name": "FUNDING.yml",
|
||||
"size": 17,
|
||||
"type": "file"
|
||||
},
|
||||
{
|
||||
"maccess": 1704320710.7056112,
|
||||
"md5": "4203204f75b43cd4bf032402beb3359d",
|
||||
"name": "codecov.yml",
|
||||
"size": 104,
|
||||
"type": "file"
|
||||
},
|
||||
{
|
||||
"children": [
|
||||
{
|
||||
"maccess": 1666206037.078865,
|
||||
"md5": "57699a7a6a03e20e864f220e19f8e197",
|
||||
"name": "pypi-release.yml",
|
||||
"size": 691,
|
||||
"type": "file"
|
||||
},
|
||||
{
|
||||
"maccess": 1704403569.24789,
|
||||
"md5": "691df1a4d2f254b5cd04c152e7c6ccaf",
|
||||
"name": "testing.yml",
|
||||
"size": 850,
|
||||
"type": "file"
|
||||
}
|
||||
],
|
||||
"maccess": 1704320727.2641916,
|
||||
"name": "workflows",
|
||||
"size": 1541,
|
||||
"type": "dir"
|
||||
}
|
||||
],
|
||||
"free": 0,
|
||||
"name": "github",
|
||||
"size": 1662,
|
||||
"total": 0,
|
||||
"ts": 1704923096,
|
||||
"type": "storage"
|
||||
},
|
||||
{
|
||||
"attr": {
|
||||
"access": 1704923096,
|
||||
"access_version": "0.9.6",
|
||||
"created": 1704923096,
|
||||
"created_version": "0.9.6"
|
||||
},
|
||||
"name": "meta",
|
||||
"size": null,
|
||||
"type": "meta"
|
||||
}
|
||||
],
|
||||
"name": "top",
|
||||
"size": null,
|
||||
"type": "top"
|
||||
}
|
@ -1,136 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# author: deadc0de6 (https://github.com/deadc0de6)
|
||||
# Copyright (c) 2023, deadc0de6
|
||||
|
||||
set -e
|
||||
cur=$(cd "$(dirname "${0}")" && pwd)
|
||||
prev="${cur}/.."
|
||||
cd "${prev}"
|
||||
|
||||
# coverage
|
||||
bin="python3 -m catcli.catcli"
|
||||
if command -v coverage 2>/dev/null; then
|
||||
mkdir -p coverages/
|
||||
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
|
||||
fi
|
||||
|
||||
echo "current dir: $(pwd)"
|
||||
echo "pythonpath: ${PYTHONPATH}"
|
||||
echo "bin: ${bin}"
|
||||
${bin} --version
|
||||
|
||||
# get the helpers
|
||||
# shellcheck source=tests-ng/helper
|
||||
source "${cur}"/helper
|
||||
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
|
||||
|
||||
##########################################################
|
||||
# the test
|
||||
##########################################################
|
||||
|
||||
# create temp dirs
|
||||
tmpd=$(mktemp -d)
|
||||
clear_on_exit "${tmpd}"
|
||||
|
||||
catalog="${tmpd}/catalog"
|
||||
|
||||
# index
|
||||
${bin} -B index -c --catalog="${catalog}" github .github
|
||||
clean_catalog "${catalog}"
|
||||
ls -laR .github
|
||||
cat "${catalog}"
|
||||
|
||||
#cat "${catalog}"
|
||||
echo ""
|
||||
|
||||
# compare keys
|
||||
echo "[+] compare keys"
|
||||
src="tests-ng/assets/github.catalog.json"
|
||||
src_keys="${tmpd}/src-keys"
|
||||
dst_keys="${tmpd}/dst-keys"
|
||||
cat "${src}" | jq '.. | keys?' | jq '.[]' | sort > "${src_keys}"
|
||||
cat "${catalog}" | jq '.. | keys?' | jq '.[]' | sort > "${dst_keys}"
|
||||
echo "src:"
|
||||
cat "${src_keys}"
|
||||
echo "dst:"
|
||||
cat "${dst_keys}"
|
||||
diff "${src_keys}" "${dst_keys}"
|
||||
echo "ok!"
|
||||
|
||||
# compare children 1
|
||||
echo "[+] compare children 1"
|
||||
src_keys="${tmpd}/src-child1"
|
||||
dst_keys="${tmpd}/dst-child1"
|
||||
cat "${src}" | jq '. | select(.type=="top") | .children | .[].name' | sort > "${src_keys}"
|
||||
cat "${catalog}" | jq '. | select(.type=="top") | .children | .[].name' | sort > "${dst_keys}"
|
||||
echo "src:"
|
||||
cat "${src_keys}"
|
||||
echo "dst:"
|
||||
cat "${dst_keys}"
|
||||
diff "${src_keys}" "${dst_keys}"
|
||||
echo "ok!"
|
||||
|
||||
# compare children 2
|
||||
echo "[+] compare children 2"
|
||||
src_keys="${tmpd}/src-child2"
|
||||
dst_keys="${tmpd}/dst-child2"
|
||||
cat "${src}" | jq '. | select(.type=="top") | .children | .[] | select(.name=="github") | .children | .[].name' | sort > "${src_keys}"
|
||||
cat "${catalog}" | jq '. | select(.type=="top") | .children | .[] | select(.name=="github") | .children | .[].name' | sort > "${dst_keys}"
|
||||
echo "src:"
|
||||
cat "${src_keys}"
|
||||
echo "dst:"
|
||||
cat "${dst_keys}"
|
||||
diff "${src_keys}" "${dst_keys}"
|
||||
echo "ok!"
|
||||
|
||||
# compare children 3
|
||||
echo "[+] compare children 3"
|
||||
src_keys="${tmpd}/src-child3"
|
||||
dst_keys="${tmpd}/dst-child3"
|
||||
cat "${src}" | jq '. | select(.type=="top") | .children | .[] | select(.name=="github") | .children | .[] | select(.name=="workflows") | .children | .[].name' | sort > "${src_keys}"
|
||||
cat "${catalog}" | jq '. | select(.type=="top") | .children | .[] | select(.name=="github") | .children | .[] | select(.name=="workflows") | .children | .[].name' | sort > "${dst_keys}"
|
||||
echo "src:"
|
||||
cat "${src_keys}"
|
||||
echo "dst:"
|
||||
cat "${dst_keys}"
|
||||
diff "${src_keys}" "${dst_keys}"
|
||||
echo "ok!"
|
||||
|
||||
# native
|
||||
echo "[+] compare native output"
|
||||
native="${tmpd}/native.txt"
|
||||
${bin} -B ls -s -r --format=native --catalog="${catalog}" > "${native}"
|
||||
mod="${tmpd}/native.mod.txt"
|
||||
cat "${native}" | sed -e 's/free:.*%/free:0.0%/g' \
|
||||
-e 's/....-..-.. ..:..:../2023-03-09 16:20:59/g' \
|
||||
-e 's#du:[^|]* |#du:0/0 |#g' > "${mod}"
|
||||
if command -v delta >/dev/null; then
|
||||
delta -s "tests-ng/assets/github.catalog.native.txt" "${mod}"
|
||||
fi
|
||||
diff --color=always "tests-ng/assets/github.catalog.native.txt" "${mod}"
|
||||
echo "ok!"
|
||||
|
||||
# csv
|
||||
echo "[+] compare csv output"
|
||||
csv="${tmpd}/csv.txt"
|
||||
${bin} -B ls -s -r --format=csv --catalog="${catalog}" > "${csv}"
|
||||
# modify created csv
|
||||
mod="${tmpd}/csv.mod.txt"
|
||||
cat "${csv}" | \
|
||||
sed -e 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' | \
|
||||
sed 's/20..-..-.. ..:..:..//g' > "${mod}"
|
||||
# modify original
|
||||
ori="${tmpd}/ori.mod.txt"
|
||||
cat "tests-ng/assets/github.catalog.csv.txt" | \
|
||||
sed 's/....-..-.. ..:..:..//g' | \
|
||||
sed 's/"3","[^"]*","[^"]*",""/"3","0","0",""/g' > "${ori}"
|
||||
if command -v delta >/dev/null; then
|
||||
delta -s "${ori}" "${mod}"
|
||||
fi
|
||||
diff "${ori}" "${mod}"
|
||||
echo "ok!"
|
||||
|
||||
# the end
|
||||
echo "test \"$(basename "$0")\" success"
|
||||
cd "${cur}"
|
||||
exit 0
|
@ -1,61 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# author: deadc0de6 (https://github.com/deadc0de6)
|
||||
# Copyright (c) 2023, deadc0de6
|
||||
|
||||
set -e
|
||||
cur=$(cd "$(dirname "${0}")" && pwd)
|
||||
prev="${cur}/.."
|
||||
cd "${prev}"
|
||||
|
||||
# coverage
|
||||
bin="python3 -m catcli.catcli"
|
||||
if command -v coverage 2>/dev/null; then
|
||||
mkdir -p coverages/
|
||||
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
|
||||
fi
|
||||
|
||||
echo "current dir: $(pwd)"
|
||||
echo "pythonpath: ${PYTHONPATH}"
|
||||
echo "bin: ${bin}"
|
||||
${bin} --version
|
||||
|
||||
# get the helpers
|
||||
# shellcheck source=tests-ng/helper
|
||||
source "${cur}"/helper
|
||||
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
|
||||
|
||||
##########################################################
|
||||
# the test
|
||||
##########################################################
|
||||
|
||||
# create temp dirs
|
||||
tmpd=$(mktemp -d)
|
||||
clear_on_exit "${tmpd}"
|
||||
|
||||
catalog="${tmpd}/catalog"
|
||||
|
||||
# index
|
||||
${bin} -B index -c -f --catalog="${catalog}" github1 .github
|
||||
${bin} -B index -c -f --catalog="${catalog}" github2 .github
|
||||
clean_catalog "${catalog}"
|
||||
|
||||
#cat "${catalog}"
|
||||
echo ""
|
||||
|
||||
${bin} -B ls -r --catalog="${catalog}"
|
||||
|
||||
echo "finding \"testing.yml\""
|
||||
${bin} -B find --catalog="${catalog}" testing.yml
|
||||
cnt=$(${bin} -B find --catalog="${catalog}" testing.yml | wc -l)
|
||||
[ "${cnt}" != "2" ] && echo "should return 2 (not ${cnt})" && exit 1
|
||||
|
||||
echo "finding \"*.yml\""
|
||||
${bin} -B find --catalog="${catalog}" '*.yml'
|
||||
cnt=$(${bin} -B find --catalog="${catalog}" '*.yml' | wc -l)
|
||||
[ "${cnt}" != "8" ] && echo "should return 8 (not ${cnt})" && exit 1
|
||||
|
||||
# the end
|
||||
echo ""
|
||||
echo "test \"$(basename "$0")\" success"
|
||||
cd "${cur}"
|
||||
exit 0
|
@ -1,76 +0,0 @@
|
||||
# author: deadc0de6 (https://github.com/deadc0de6)
|
||||
# Copyright (c) 2023, deadc0de6
|
||||
#
|
||||
# file to be sourced from test scripts
|
||||
#
|
||||
|
||||
declare -a to_be_cleared
|
||||
|
||||
# add a file/directory to be cleared
|
||||
# on exit
|
||||
#
|
||||
# $1: file path to clear
|
||||
clear_on_exit()
|
||||
{
|
||||
local len="${#to_be_cleared[*]}"
|
||||
# shellcheck disable=SC2004
|
||||
to_be_cleared[${len}]="$1"
|
||||
if [ "${len}" = "0" ]; then
|
||||
# set trap
|
||||
trap on_exit EXIT
|
||||
fi
|
||||
}
|
||||
|
||||
# clear catalog stuff for testing
|
||||
# $1: catalog path
|
||||
clean_catalog()
|
||||
{
|
||||
sed -i 's/"free": .*,/"free": 0,/g' "${1}"
|
||||
sed -i 's/"total": .*,/"total": 0,/g' "${1}"
|
||||
}
|
||||
|
||||
# clear files
|
||||
on_exit()
|
||||
{
|
||||
for i in "${to_be_cleared[@]}"; do
|
||||
rm -rf "${i}"
|
||||
done
|
||||
}
|
||||
|
||||
# osx tricks
|
||||
# brew install coreutils gnu-sed
|
||||
if [[ $OSTYPE == 'darwin'* ]]; then
|
||||
mktemp() {
|
||||
gmktemp "$@"
|
||||
}
|
||||
stat() {
|
||||
gstat "$@"
|
||||
}
|
||||
sed() {
|
||||
gsed "$@"
|
||||
}
|
||||
wc() {
|
||||
gwc "$@"
|
||||
}
|
||||
date() {
|
||||
gdate "$@"
|
||||
}
|
||||
chmod() {
|
||||
gchmod "$@"
|
||||
}
|
||||
readlink() {
|
||||
greadlink "$@"
|
||||
}
|
||||
realpath() {
|
||||
grealpath "$@"
|
||||
}
|
||||
|
||||
export -f mktemp
|
||||
export -f stat
|
||||
export -f sed
|
||||
export -f wc
|
||||
export -f date
|
||||
export -f chmod
|
||||
export -f readlink
|
||||
export -f realpath
|
||||
fi
|
@ -1,59 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# author: deadc0de6 (https://github.com/deadc0de6)
|
||||
# Copyright (c) 2024, deadc0de6
|
||||
|
||||
set -e
|
||||
cur=$(cd "$(dirname "${0}")" && pwd)
|
||||
prev="${cur}/.."
|
||||
cd "${prev}"
|
||||
|
||||
# coverage
|
||||
bin="python3 -m catcli.catcli"
|
||||
if command -v coverage 2>/dev/null; then
|
||||
mkdir -p coverages/
|
||||
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
|
||||
fi
|
||||
|
||||
echo "current dir: $(pwd)"
|
||||
echo "pythonpath: ${PYTHONPATH}"
|
||||
echo "bin: ${bin}"
|
||||
${bin} --version
|
||||
|
||||
# get the helpers
|
||||
# shellcheck source=tests-ng/helper
|
||||
source "${cur}"/helper
|
||||
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
|
||||
|
||||
##########################################################
|
||||
# the test
|
||||
##########################################################
|
||||
|
||||
# create temp dirs
|
||||
tmpd=$(mktemp -d)
|
||||
clear_on_exit "${tmpd}"
|
||||
|
||||
catalog="${tmpd}/catalog"
|
||||
|
||||
# index
|
||||
${bin} -B index -c -f --catalog="${catalog}" github1 .github
|
||||
${bin} -B index -c -f --catalog="${catalog}" github2 .github
|
||||
clean_catalog "${catalog}"
|
||||
|
||||
#cat "${catalog}"
|
||||
echo ""
|
||||
|
||||
${bin} -B ls -r --catalog="${catalog}"
|
||||
|
||||
${bin} -B ls --catalog="${catalog}" 'github1/*.yml'
|
||||
cnt=$(${bin} -B ls --catalog="${catalog}" 'github1/*.yml' | wc -l)
|
||||
[ "${cnt}" != "2" ] && echo "should return 2 (not ${cnt})" && exit 1
|
||||
|
||||
${bin} -B ls --catalog="${catalog}" 'github*/*.yml'
|
||||
cnt=$(${bin} -B ls --catalog="${catalog}" 'github*/*.yml' | wc -l)
|
||||
[ "${cnt}" != "4" ] && echo "should return 4 (not ${cnt})" && exit 1
|
||||
|
||||
# the end
|
||||
echo ""
|
||||
echo "test \"$(basename "$0")\" success"
|
||||
cd "${cur}"
|
||||
exit 0
|
@ -1,80 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# author: deadc0de6 (https://github.com/deadc0de6)
|
||||
# Copyright (c) 2021, deadc0de6
|
||||
|
||||
set -e
|
||||
cur=$(cd "$(dirname "${0}")" && pwd)
|
||||
prev="${cur}/.."
|
||||
cd "${prev}"
|
||||
|
||||
# coverage
|
||||
bin="python3 -m catcli.catcli"
|
||||
if command -v coverage 2>/dev/null; then
|
||||
mkdir -p coverages/
|
||||
bin="coverage run -p --data-file coverages/coverage --source=catcli -m catcli.catcli"
|
||||
fi
|
||||
|
||||
echo "current dir: $(pwd)"
|
||||
echo "pythonpath: ${PYTHONPATH}"
|
||||
echo "bin: ${bin}"
|
||||
${bin} --version
|
||||
|
||||
# get the helpers
|
||||
# shellcheck source=tests-ng/helper
|
||||
source "${cur}"/helper
|
||||
echo -e "$(tput setaf 6)==> RUNNING $(basename "${BASH_SOURCE[0]}") <==$(tput sgr0)"
|
||||
|
||||
##########################################################
|
||||
# the test
|
||||
##########################################################
|
||||
|
||||
# create temp dirs
|
||||
tmpd=$(mktemp -d)
|
||||
clear_on_exit "${tmpd}"
|
||||
tmpu="${tmpd}/dir2"
|
||||
mkdir -p "${tmpu}"
|
||||
|
||||
catalog="${tmpd}/catalog"
|
||||
|
||||
mkdir -p "${tmpd}/dir"
|
||||
echo "abc" > "${tmpd}/dir/a"
|
||||
|
||||
# index
|
||||
${bin} -B index --catalog="${catalog}" dir "${tmpd}/dir"
|
||||
${bin} -B ls --catalog="${catalog}"
|
||||
|
||||
# get attributes
|
||||
freeb=$(${bin} -B ls --catalog="${catalog}" | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
|
||||
dub=$(${bin} -B ls --catalog="${catalog}" | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
|
||||
dateb=$(${bin} -B ls --catalog="${catalog}" | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
|
||||
echo "before: free:${freeb} | du:${dub} | date:${dateb}"
|
||||
|
||||
# change content
|
||||
echo "abc" >> "${tmpd}/dir/a"
|
||||
echo "abc" > "${tmpd}/dir/b"
|
||||
|
||||
# move dir
|
||||
cp -r "${tmpd}/dir" "${tmpu}/"
|
||||
|
||||
# sleep to force date change
|
||||
sleep 1
|
||||
|
||||
# update
|
||||
${bin} -B update -f --catalog="${catalog}" dir "${tmpu}/dir"
|
||||
${bin} -B ls --catalog="${catalog}"
|
||||
|
||||
# get new attributes
|
||||
freea=$(${bin} -B ls --catalog="${catalog}" | grep free: | sed 's/^.*,free:\([^ ]*\).*$/\1/g')
|
||||
dua=$(${bin} -B ls --catalog="${catalog}" | grep du: | sed 's/^.*,du:\([^ ]*\).*$/\1/g')
|
||||
datea=$(${bin} -B ls --catalog="${catalog}" | grep date: | sed 's/^.*,date: \(.*\)$/\1/g')
|
||||
echo "after: free:${freea} | du:${dua} | date:${datea}"
|
||||
|
||||
# test they are all different
|
||||
[ "${freeb}" = "${freea}" ] && echo "WARNING free didn't change!"
|
||||
[ "${dub}" = "${dua}" ] && echo "WARNING du didn't change!"
|
||||
[ "${dateb}" = "${datea}" ] && echo "WARNING date didn't change!" && exit 1
|
||||
|
||||
# the end
|
||||
echo "test \"$(basename "$0")\" success"
|
||||
cd "${cur}"
|
||||
exit 0
|
@ -1,8 +0,0 @@
|
||||
pycodestyle; python_version >= '3.0'
|
||||
pyflakes; python_version >= '3.0'
|
||||
nose2; python_version >= '3.0'
|
||||
coverage; python_version >= '3.0'
|
||||
pylint; python_version > '3.0'
|
||||
mypy; python_version > '3.0'
|
||||
pytest; python_version > '3.0'
|
||||
pytype; python_version > '3.0'
|
@ -1,29 +0,0 @@
|
||||
"""
|
||||
author: deadc0de6 (https://github.com/deadc0de6)
|
||||
Copyright (c) 2017, deadc0de6
|
||||
|
||||
Basic unittest for ls
|
||||
"""
|
||||
|
||||
import unittest
|
||||
|
||||
from catcli.decomp import Decomp
|
||||
|
||||
|
||||
class TestDecomp(unittest.TestCase):
|
||||
"""test ls"""
|
||||
|
||||
def test_list(self):
|
||||
"""test decomp formats"""
|
||||
dec = Decomp()
|
||||
formats = dec.get_formats()
|
||||
self.assertTrue('zip' in formats)
|
||||
|
||||
|
||||
def main():
|
||||
"""entry point"""
|
||||
unittest.main()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,222 +0,0 @@
|
||||
"""
|
||||
author: deadc0de6 (https://github.com/deadc0de6)
|
||||
Copyright (c) 2017, deadc0de6
|
||||
|
||||
Basic unittest for updating an index
|
||||
"""
|
||||
|
||||
import unittest
|
||||
import os
|
||||
import anytree
|
||||
|
||||
from catcli.catcli import cmd_index, cmd_update
|
||||
from catcli.noder import Noder
|
||||
from catcli.catalog import Catalog
|
||||
from tests.helpers import create_dir, create_rnd_file, get_tempdir, \
|
||||
clean, unix_tree, edit_file, read_from_file, md5sum
|
||||
|
||||
|
||||
class TestUpdate(unittest.TestCase):
|
||||
"""test update"""
|
||||
|
||||
def test_update(self):
|
||||
"""test update"""
|
||||
# init
|
||||
workingdir = get_tempdir()
|
||||
catalogpath = create_rnd_file(workingdir, 'catalog.json', content='')
|
||||
self.addCleanup(clean, workingdir)
|
||||
|
||||
dirpath = get_tempdir()
|
||||
self.addCleanup(clean, dirpath)
|
||||
|
||||
# create 3 files
|
||||
file1 = create_rnd_file(dirpath, 'file1')
|
||||
file2 = create_rnd_file(dirpath, 'file2')
|
||||
file3 = create_rnd_file(dirpath, 'file3')
|
||||
file4 = create_rnd_file(dirpath, 'file4')
|
||||
|
||||
# create 2 directories
|
||||
dir1 = create_dir(dirpath, 'dir1')
|
||||
dir2 = create_dir(dirpath, 'dir2')
|
||||
|
||||
# fill directories with files
|
||||
d1f1 = create_rnd_file(dir1, 'dir1file1')
|
||||
d1f2 = create_rnd_file(dir1, 'dir1file2')
|
||||
d2f1 = create_rnd_file(dir2, 'dir2file1')
|
||||
d2f2 = create_rnd_file(dir2, 'dir2file2')
|
||||
|
||||
noder = Noder(debug=True)
|
||||
noder.do_hashing(True)
|
||||
top = noder.new_top_node()
|
||||
catalog = Catalog(catalogpath, force=True, debug=False)
|
||||
|
||||
# get checksums
|
||||
f4_md5 = md5sum(file4)
|
||||
self.assertTrue(f4_md5)
|
||||
d1f1_md5 = md5sum(d1f1)
|
||||
self.assertTrue(d1f1_md5)
|
||||
d2f2_md5 = md5sum(d2f2)
|
||||
self.assertTrue(d2f2_md5)
|
||||
|
||||
# create fake args
|
||||
tmpdirname = 'tmpdir'
|
||||
args = {'<path>': dirpath, '<name>': tmpdirname,
|
||||
'--hash': True, '--meta': ['some meta'],
|
||||
'--verbose': True, '--lpath': None}
|
||||
|
||||
# index the directory
|
||||
unix_tree(dirpath)
|
||||
cmd_index(args, noder, catalog, top)
|
||||
self.assertTrue(os.stat(catalogpath).st_size != 0)
|
||||
|
||||
# ensure md5 sum are in
|
||||
nods = noder.find(top, os.path.basename(file4))
|
||||
self.assertEqual(len(nods), 1)
|
||||
nod = nods[0]
|
||||
self.assertTrue(nod)
|
||||
self.assertEqual(nod.md5, f4_md5)
|
||||
|
||||
# print catalog
|
||||
noder.print_tree(top)
|
||||
|
||||
# add some files and directories
|
||||
new1 = create_rnd_file(dir1, 'newf1')
|
||||
new2 = create_rnd_file(dirpath, 'newf2')
|
||||
new3 = create_dir(dirpath, 'newd3')
|
||||
new4 = create_dir(dir2, 'newd4')
|
||||
new5 = create_rnd_file(new4, 'newf5')
|
||||
unix_tree(dirpath)
|
||||
|
||||
# modify files
|
||||
editval = 'edited'
|
||||
edit_file(d1f1, editval)
|
||||
d1f1_md5_new = md5sum(d1f1)
|
||||
self.assertTrue(d1f1_md5_new)
|
||||
self.assertTrue(d1f1_md5_new != d1f1_md5)
|
||||
|
||||
# change file without mtime
|
||||
maccess = os.path.getmtime(file4)
|
||||
editval = 'edited'
|
||||
edit_file(file4, editval)
|
||||
# reset edit time
|
||||
os.utime(file4, (maccess, maccess))
|
||||
f4_md5_new = md5sum(d1f1)
|
||||
self.assertTrue(f4_md5_new)
|
||||
self.assertTrue(f4_md5_new != f4_md5)
|
||||
|
||||
# change file without mtime
|
||||
maccess = os.path.getmtime(d2f2)
|
||||
editval = 'edited'
|
||||
edit_file(d2f2, editval)
|
||||
# reset edit time
|
||||
os.utime(d2f2, (maccess, maccess))
|
||||
d2f2_md5_new = md5sum(d2f2)
|
||||
self.assertTrue(d2f2_md5_new)
|
||||
self.assertTrue(d2f2_md5_new != d2f2_md5)
|
||||
|
||||
# update storage
|
||||
cmd_update(args, noder, catalog, top)
|
||||
|
||||
# print catalog
|
||||
# print(read_from_file(catalogpath))
|
||||
noder.print_tree(top)
|
||||
|
||||
# explore the top node to find all nodes
|
||||
self.assertEqual(len(top.children), 1)
|
||||
storage = top.children[0]
|
||||
self.assertEqual(len(storage.children), 8)
|
||||
|
||||
# ensure d1f1 md5 sum has changed in catalog
|
||||
nods = noder.find(top, os.path.basename(d1f1))
|
||||
self.assertTrue(len(nods) == 1)
|
||||
nod = nods[0]
|
||||
self.assertTrue(nod)
|
||||
self.assertTrue(nod.md5 != d1f1_md5)
|
||||
self.assertTrue(nod.md5 == d1f1_md5_new)
|
||||
|
||||
# ensure f4 md5 sum has changed in catalog
|
||||
nods = noder.find(top, os.path.basename(file4))
|
||||
self.assertTrue(len(nods) == 1)
|
||||
nod = nods[0]
|
||||
self.assertTrue(nod)
|
||||
self.assertTrue(nod.md5 != f4_md5)
|
||||
self.assertTrue(nod.md5 == f4_md5_new)
|
||||
|
||||
# ensure d2f2 md5 sum has changed in catalog
|
||||
nods = noder.find(top, os.path.basename(d2f2))
|
||||
self.assertTrue(len(nods) == 1)
|
||||
nod = nods[0]
|
||||
self.assertTrue(nod)
|
||||
self.assertTrue(nod.md5 != d2f2_md5)
|
||||
self.assertTrue(nod.md5 == d2f2_md5_new)
|
||||
|
||||
# ensures files and directories are in
|
||||
names = [node.get_name() for node in anytree.PreOrderIter(storage)]
|
||||
print(names)
|
||||
self.assertTrue(os.path.basename(file1) in names)
|
||||
self.assertTrue(os.path.basename(file2) in names)
|
||||
self.assertTrue(os.path.basename(file3) in names)
|
||||
self.assertTrue(os.path.basename(file4) in names)
|
||||
self.assertTrue(os.path.basename(dir1) in names)
|
||||
self.assertTrue(os.path.basename(d1f1) in names)
|
||||
self.assertTrue(os.path.basename(d1f2) in names)
|
||||
self.assertTrue(os.path.basename(dir2) in names)
|
||||
self.assertTrue(os.path.basename(d2f1) in names)
|
||||
self.assertTrue(os.path.basename(new1) in names)
|
||||
self.assertTrue(os.path.basename(new2) in names)
|
||||
self.assertTrue(os.path.basename(new3) in names)
|
||||
self.assertTrue(os.path.basename(new4) in names)
|
||||
self.assertTrue(os.path.basename(new5) in names)
|
||||
|
||||
for node in storage.children:
|
||||
if node.get_name() == os.path.basename(dir1):
|
||||
self.assertTrue(len(node.children) == 3)
|
||||
elif node.get_name() == os.path.basename(dir2):
|
||||
self.assertTrue(len(node.children) == 3)
|
||||
elif node.get_name() == os.path.basename(new3):
|
||||
self.assertTrue(len(node.children) == 0)
|
||||
elif node.get_name() == os.path.basename(new4):
|
||||
self.assertTrue(len(node.children) == 1)
|
||||
self.assertTrue(read_from_file(d1f1) == editval)
|
||||
|
||||
# remove some files
|
||||
clean(d1f1)
|
||||
clean(dir2)
|
||||
clean(new2)
|
||||
clean(new4)
|
||||
|
||||
# update storage
|
||||
cmd_update(args, noder, catalog, top)
|
||||
|
||||
# ensures files and directories are (not) in
|
||||
names = [node.get_name() for node in anytree.PreOrderIter(storage)]
|
||||
print(names)
|
||||
self.assertTrue(os.path.basename(file1) in names)
|
||||
self.assertTrue(os.path.basename(file2) in names)
|
||||
self.assertTrue(os.path.basename(file3) in names)
|
||||
self.assertTrue(os.path.basename(file4) in names)
|
||||
self.assertTrue(os.path.basename(dir1) in names)
|
||||
self.assertTrue(os.path.basename(d1f1) not in names)
|
||||
self.assertTrue(os.path.basename(d1f2) in names)
|
||||
self.assertTrue(os.path.basename(dir2) not in names)
|
||||
self.assertTrue(os.path.basename(d2f1) not in names)
|
||||
self.assertTrue(os.path.basename(d2f1) not in names)
|
||||
self.assertTrue(os.path.basename(new1) in names)
|
||||
self.assertTrue(os.path.basename(new2) not in names)
|
||||
self.assertTrue(os.path.basename(new3) in names)
|
||||
self.assertTrue(os.path.basename(new4) not in names)
|
||||
self.assertTrue(os.path.basename(new5) not in names)
|
||||
for node in storage.children:
|
||||
if node.get_name() == os.path.basename(dir1):
|
||||
self.assertTrue(len(node.children) == 2)
|
||||
elif node.get_name() == os.path.basename(new3):
|
||||
self.assertTrue(len(node.children) == 0)
|
||||
|
||||
|
||||
def main():
|
||||
"""entry point"""
|
||||
unittest.main()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Reference in New Issue