improve re-indexing and add more tests

pull/6/head
deadc0de6 6 years ago
parent 8eed881f5d
commit d7ea943c5f

@ -24,12 +24,14 @@ class Walker:
def index(self, path, parent, name):
'''index a directory and store in tree'''
self._debug('indexing starting at {}'.format(path))
if not parent:
parent = noder.dir_node(name, path, parent)
cnt = 0
for (root, dirs, files) in os.walk(path):
for f in files:
self._debug('found file {} under {}'.format(f, path))
sub = os.path.join(root, f)
self._log(f)
self._debug('index file {}'.format(sub))
@ -37,6 +39,7 @@ class Walker:
parent, path)
cnt += 1
for d in dirs:
self._debug('found dir {} under {}'.format(f, path))
base = os.path.basename(d)
sub = os.path.join(root, d)
self._debug('index directory {}'.format(sub))
@ -50,10 +53,11 @@ class Walker:
def reindex(self, path, parent, top):
'''reindex a directory and store in tree'''
self._debug('reindexing starting at {}'.format(path))
cnt = 0
for (root, dirs, files) in os.walk(path):
for f in files:
self._debug('found file {}'.format(f))
self._debug('found file {} under {}'.format(f, path))
sub = os.path.join(root, f)
maccess = os.path.getmtime(sub)
reindex, _ = self._need_reindex(parent, f, maccess)
@ -66,7 +70,7 @@ class Walker:
parent, path)
cnt += 1
for d in dirs:
self._debug('found dir {}'.format(d))
self._debug('found dir {} under {}'.format(d, path))
base = os.path.basename(d)
sub = os.path.join(root, d)
maccess = os.path.getmtime(sub)
@ -74,8 +78,9 @@ class Walker:
if reindex:
self._debug('\tre-index directory {}'.format(sub))
dummy = self.noder.dir_node(base, sub, parent, path)
cnt2 = self.reindex(sub, dummy, top)
cnt += cnt2
self._debug('reindexing deeper under {}'.format(sub))
cnt2 = self.reindex(sub, dummy, top)
cnt += cnt2
break
self._log(None)
return cnt

@ -10,6 +10,7 @@ import string
import random
import tempfile
import shutil
import subprocess
TMPSUFFIX = '.catcli'
@ -35,6 +36,21 @@ def clean(path):
else:
os.remove(path)
def edit_file(path, newcontent):
if not os.path.exists(path):
write_to_file(path, newcontent)
else:
write_to_file(path, newcontent)
def unix_tree(path):
if not os.path.exists(path):
return
cmd = ['tree', path]
subprocess.call(cmd)
############################################################
# catcli specific
############################################################
@ -82,9 +98,21 @@ def create_rnd_file(path, filename, content=None):
if not content:
content = get_rnd_string(100)
fpath = os.path.join(path, filename)
with open(fpath, 'w') as f:
return write_to_file(fpath, content)
def write_to_file(path, content):
with open(path, 'w') as f:
f.write(content)
return fpath
return path
def read_from_file(path):
if not os.path.exists(path):
return ''
with open(path, 'r') as f:
content = f.read()
return content
############################################################

@ -0,0 +1,112 @@
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2017, deadc0de6
Basic unittest for updating an index
"""
import unittest
from catcli.catcli import *
from catcli.noder import Noder
from catcli.walker import Walker
from catcli.catalog import Catalog
from tests.helpers import *
class TestIndexing(unittest.TestCase):
def test_index(self):
# init
workingdir = get_tempdir()
catalogpath = create_rnd_file(workingdir, 'catalog.json', content='')
self.addCleanup(clean, workingdir)
dirpath = get_tempdir()
self.addCleanup(clean, dirpath)
# create 3 files
f1 = create_rnd_file(dirpath, 'file1')
f2 = create_rnd_file(dirpath, 'file2')
f3 = create_rnd_file(dirpath, 'file3')
# create 2 directories
d1 = create_dir(dirpath, 'dir1')
d2 = create_dir(dirpath, 'dir2')
# fill directories with files
d1f1 = create_rnd_file(d1, 'dir1file1')
d1f2 = create_rnd_file(d1, 'dir1file2')
d2f1 = create_rnd_file(d2, 'dir2file1')
noder = Noder()
top = noder.new_top_node()
walker = Walker(noder)
catalog = Catalog(catalogpath, force=True, verbose=False)
# create fake args
tmpdirname = 'tmpdir'
args = {'<path>': dirpath, '<name>': tmpdirname,
'--hash': True, '--meta': 'some meta',
'--subsize': True, '--verbose': True}
# index the directory
unix_tree(dirpath)
cmd_index(args, noder, catalog, top, debug=True)
self.assertTrue(os.stat(catalogpath).st_size != 0)
# print catalog
noder.print_tree(top)
# add some files and directories
new1 = create_rnd_file(d1, 'newf1')
new2 = create_rnd_file(dirpath, 'newf2')
new3 = create_dir(dirpath, 'newd3')
new4 = create_dir(d2, 'newd4')
new5 = create_rnd_file(new4, 'newf5')
unix_tree(dirpath)
# modify files
EDIT = 'edited'
edit_file(d1f1, EDIT)
# update storage
cmd_update(args, noder, catalog, top, debug=True)
# print catalog
# print(read_from_file(catalogpath))
noder.print_tree(top)
# explore the top node to find all nodes
self.assertTrue(len(top.children) == 1)
storage = top.children[0]
self.assertTrue(len(storage.children) == 7)
# ensures files and directories are in
names = [x.name for x in storage.children]
self.assertTrue(os.path.basename(f1) in names)
self.assertTrue(os.path.basename(f2) in names)
self.assertTrue(os.path.basename(f3) in names)
self.assertTrue(os.path.basename(d1) in names)
self.assertTrue(os.path.basename(d2) in names)
self.assertTrue(os.path.basename(new3) in names)
self.assertTrue(os.path.basename(new2) in names)
for node in storage.children:
if node.name == os.path.basename(d1):
self.assertTrue(len(node.children) == 3)
elif node.name == os.path.basename(d2):
self.assertTrue(len(node.children) == 2)
elif node.name == os.path.basename(new3):
self.assertTrue(len(node.children) == 0)
elif node.name == os.path.basename(new4):
self.assertTrue(len(node.children) == 1)
self.assertTrue(read_from_file(d1f1) == EDIT)
def main():
unittest.main()
if __name__ == '__main__':
main()
Loading…
Cancel
Save