From 6328e316f4a55b5c512fb0658e34ca1fed0962ff Mon Sep 17 00:00:00 2001 From: simon Date: Wed, 1 Mar 2023 18:13:51 +0700 Subject: [PATCH] add task manager integration --- tubearchivist/home/src/ta/ta_redis.py | 25 ++++++++++++ tubearchivist/home/src/ta/task_manager.py | 46 +++++++++++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 tubearchivist/home/src/ta/task_manager.py diff --git a/tubearchivist/home/src/ta/ta_redis.py b/tubearchivist/home/src/ta/ta_redis.py index 14e90a11..eaf8fff4 100644 --- a/tubearchivist/home/src/ta/ta_redis.py +++ b/tubearchivist/home/src/ta/ta_redis.py @@ -156,3 +156,28 @@ class RedisQueue(RedisBase): """check if queue as at least one pending item""" result = self.conn.execute_command("LRANGE", self.key, 0, 0) return bool(result) + + +class TaskRedis(RedisBase): + """interact with redis tasks""" + + BASE = "celery-task-meta-" + EXPIRE = 60 * 60 * 24 + + def get_all(self): + """return all tasks""" + all_keys = self.conn.execute_command("KEYS", f"{self.BASE}*") + return [i.decode().replace(self.BASE, "") for i in all_keys] + + def get_single(self, task_id): + """return content of single task""" + result = self.conn.execute_command("GET", self.BASE + task_id).decode() + return json.loads(result) + + def set_key(self, task_id, message, expire=False): + """set value for lock, initial or update""" + key = f"{self.BASE}{task_id}" + self.conn.execute_command("SET", key, json.dumps(message)) + + if expire: + self.conn.execute_command("EXPIRE", key, self.EXPIRE) diff --git a/tubearchivist/home/src/ta/task_manager.py b/tubearchivist/home/src/ta/task_manager.py new file mode 100644 index 00000000..2a4f0ba8 --- /dev/null +++ b/tubearchivist/home/src/ta/task_manager.py @@ -0,0 +1,46 @@ +""" +functionality: +- interact with in redis stored task results +- handle threads and locks +""" + +from home.src.ta.ta_redis import TaskRedis + + +class TaskManager: + """manage tasks""" + + def get_all_results(self): + """return all task results""" + handler = TaskRedis() + all_keys = handler.get_all() + if not all_keys: + return False + + return [handler.get_single(i) for i in all_keys] + + def get_tasks_by_name(self, task_name): + """get all tasks by name""" + all_results = self.get_all_results() + + return [i for i in all_results if i.get("name") == task_name] + + def get_task(self, task_id): + """get single task""" + return TaskRedis().get_single(task_id) + + def is_pending(self, task): + """check if task_name is pending, pass task object""" + tasks = self.get_tasks_by_name(task.name) + return bool([i for i in tasks if i.get("status") == "PENDING"]) + + def init(self, task): + """pass task object from bind task to set initial pending message""" + message = { + "status": "PENDING", + "result": None, + "traceback": None, + "name": task.name, + "task_id": task.request.id, + } + TaskRedis().set_key(task.request.id, message)