From 16440a41701a59b984aa4f545ba2be1e1f4969db Mon Sep 17 00:00:00 2001 From: simon Date: Wed, 8 Mar 2023 12:31:03 +0700 Subject: [PATCH] add generic task command handler --- tubearchivist/home/src/ta/ta_redis.py | 11 +++++++ tubearchivist/home/src/ta/task_manager.py | 40 +++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/tubearchivist/home/src/ta/ta_redis.py b/tubearchivist/home/src/ta/ta_redis.py index e7e6e590..5e7bd12d 100644 --- a/tubearchivist/home/src/ta/ta_redis.py +++ b/tubearchivist/home/src/ta/ta_redis.py @@ -164,6 +164,7 @@ class TaskRedis(RedisBase): BASE = "celery-task-meta-" EXPIRE = 60 * 60 * 24 + COMMANDS = ["STOP", "KILL"] def get_all(self): """return all tasks""" @@ -183,6 +184,16 @@ class TaskRedis(RedisBase): if expire: self.conn.execute_command("EXPIRE", key, self.EXPIRE) + def set_command(self, task_id, command): + """set task command""" + if command not in self.COMMANDS: + print(f"{command} not in valid commands {self.COMMANDS}") + raise ValueError + + message = self.get_single(task_id) + message.update({"command": command}) + self.set_key(task_id, message) + def del_task(self, task_id): """delete task result by id""" self.conn.execute_command("DEL", f"{self.BASE}{task_id}") diff --git a/tubearchivist/home/src/ta/task_manager.py b/tubearchivist/home/src/ta/task_manager.py index 16e56e8b..b4705b03 100644 --- a/tubearchivist/home/src/ta/task_manager.py +++ b/tubearchivist/home/src/ta/task_manager.py @@ -36,6 +36,14 @@ class TaskManager: tasks = self.get_tasks_by_name(task.name) return bool([i for i in tasks if i.get("status") == "PENDING"]) + def get_pending(self, task_name): + """get all pending tasks of task_name""" + tasks = self.get_tasks_by_name(task_name) + if not tasks: + return False + + return [i for i in tasks if i.get("status") == "PENDING"] + def init(self, task): """pass task object from bind task to set initial pending message""" message = { @@ -47,3 +55,35 @@ class TaskManager: "task_id": task.request.id, } TaskRedis().set_key(task.request.id, message) + + +class TaskCommand: + """send command pending task""" + + def __init__(self, command="STOP"): + self.command = command + + def by_id(self, task_id): + """run command on single task id""" + self._set_command(task_id) + + def by_name(self, task_name): + """run command on all tasks by name""" + pending = TaskManager().get_pending(task_name) + if not pending: + return + + for task in pending: + self._set_command(task.get("task_id")) + + def _set_command(self, task_id): + """stop single task by id""" + TaskRedis().set_command(task_id, self.command) + if self.command == "KILL": + self._kill(task_id) + + def _kill(self, task_id): + """kill task by id""" + from home.tasks import app as CeleryApp + + CeleryApp.control.revoke(task_id, terminate=True)