diff --git a/tubearchivist/config/management/commands/ta_startup.py b/tubearchivist/config/management/commands/ta_startup.py index a74358de..0b40a2bf 100644 --- a/tubearchivist/config/management/commands/ta_startup.py +++ b/tubearchivist/config/management/commands/ta_startup.py @@ -14,6 +14,7 @@ from home.src.es.snapshot import ElasticSnapshot from home.src.ta.config import AppConfig, ReleaseVersion from home.src.ta.helper import clear_dl_cache from home.src.ta.ta_redis import RedisArchivist +from home.src.ta.task_manager import TaskManager TOPIC = """ @@ -35,6 +36,7 @@ class Command(BaseCommand): self._sync_redis_state() self._make_folders() self._release_locks() + self._clear_tasks() self._clear_dl_cache() self._version_check() self._mig_index_setup() @@ -96,9 +98,23 @@ class Command(BaseCommand): if not has_changed: self.stdout.write(self.style.SUCCESS(" no locks found")) + def _clear_tasks(self): + """clear tasks and messages""" + self.stdout.write("[4] clear task leftovers") + TaskManager().fail_pending() + redis_con = RedisArchivist() + to_delete = redis_con.list_keys("message:") + if to_delete: + for key in to_delete: + redis_con.del_message(key) + + self.stdout.write( + self.style.SUCCESS(f" ✓ cleared {len(to_delete)} messages") + ) + def _clear_dl_cache(self): """clear leftover files from dl cache""" - self.stdout.write("[4] clear leftover files from dl cache") + self.stdout.write("[5] clear leftover files from dl cache") config = AppConfig().config leftover_files = clear_dl_cache(config) if leftover_files: @@ -110,7 +126,7 @@ class Command(BaseCommand): def _version_check(self): """remove new release key if updated now""" - self.stdout.write("[5] check for first run after update") + self.stdout.write("[6] check for first run after update") new_version = ReleaseVersion().is_updated() if new_version: self.stdout.write( diff --git a/tubearchivist/home/src/ta/ta_redis.py b/tubearchivist/home/src/ta/ta_redis.py index 9c04fb0f..829c9ba8 100644 --- a/tubearchivist/home/src/ta/ta_redis.py +++ b/tubearchivist/home/src/ta/ta_redis.py @@ -57,18 +57,23 @@ class RedisArchivist(RedisBase): return json_str - def list_items(self, query): - """list all matches""" + def list_keys(self, query): + """return all key matches""" reply = self.conn.execute_command( "KEYS", self.NAME_SPACE + query + "*" ) - all_matches = [i.decode().lstrip(self.NAME_SPACE) for i in reply] - all_results = [] - for match in all_matches: - json_str = self.get_message(match) - all_results.append(json_str) + if not reply: + return False + + return [i.decode().lstrip(self.NAME_SPACE) for i in reply] + + def list_items(self, query): + """list all matches""" + all_matches = self.list_keys(query) + if not all_matches: + return False - return all_results + return [self.get_message(i) for i in all_matches] def del_message(self, key): """delete key from redis""" diff --git a/tubearchivist/home/src/ta/task_manager.py b/tubearchivist/home/src/ta/task_manager.py index 62fff923..c3d1a2a1 100644 --- a/tubearchivist/home/src/ta/task_manager.py +++ b/tubearchivist/home/src/ta/task_manager.py @@ -67,6 +67,17 @@ class TaskManager: } TaskRedis().set_key(task.request.id, message) + def fail_pending(self): + """ + mark all pending as failed, + run at startup to recover from hard reset + """ + all_results = self.get_all_results() + for result in all_results: + if result.get("status") == "PENDING": + result["status"] = "FAILED" + TaskRedis().set_key(result["task_id"], result, expire=True) + class TaskCommand: """run commands on task"""