add generic task command handler

This commit is contained in:
simon 2023-03-08 12:31:03 +07:00
parent 1f08ea9eea
commit 16440a4170
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
2 changed files with 51 additions and 0 deletions

View File

@ -164,6 +164,7 @@ class TaskRedis(RedisBase):
BASE = "celery-task-meta-" BASE = "celery-task-meta-"
EXPIRE = 60 * 60 * 24 EXPIRE = 60 * 60 * 24
COMMANDS = ["STOP", "KILL"]
def get_all(self): def get_all(self):
"""return all tasks""" """return all tasks"""
@ -183,6 +184,16 @@ class TaskRedis(RedisBase):
if expire: if expire:
self.conn.execute_command("EXPIRE", key, self.EXPIRE) self.conn.execute_command("EXPIRE", key, self.EXPIRE)
def set_command(self, task_id, command):
"""set task command"""
if command not in self.COMMANDS:
print(f"{command} not in valid commands {self.COMMANDS}")
raise ValueError
message = self.get_single(task_id)
message.update({"command": command})
self.set_key(task_id, message)
def del_task(self, task_id): def del_task(self, task_id):
"""delete task result by id""" """delete task result by id"""
self.conn.execute_command("DEL", f"{self.BASE}{task_id}") self.conn.execute_command("DEL", f"{self.BASE}{task_id}")

View File

@ -36,6 +36,14 @@ class TaskManager:
tasks = self.get_tasks_by_name(task.name) tasks = self.get_tasks_by_name(task.name)
return bool([i for i in tasks if i.get("status") == "PENDING"]) return bool([i for i in tasks if i.get("status") == "PENDING"])
def get_pending(self, task_name):
"""get all pending tasks of task_name"""
tasks = self.get_tasks_by_name(task_name)
if not tasks:
return False
return [i for i in tasks if i.get("status") == "PENDING"]
def init(self, task): def init(self, task):
"""pass task object from bind task to set initial pending message""" """pass task object from bind task to set initial pending message"""
message = { message = {
@ -47,3 +55,35 @@ class TaskManager:
"task_id": task.request.id, "task_id": task.request.id,
} }
TaskRedis().set_key(task.request.id, message) TaskRedis().set_key(task.request.id, message)
class TaskCommand:
"""send command pending task"""
def __init__(self, command="STOP"):
self.command = command
def by_id(self, task_id):
"""run command on single task id"""
self._set_command(task_id)
def by_name(self, task_name):
"""run command on all tasks by name"""
pending = TaskManager().get_pending(task_name)
if not pending:
return
for task in pending:
self._set_command(task.get("task_id"))
def _set_command(self, task_id):
"""stop single task by id"""
TaskRedis().set_command(task_id, self.command)
if self.command == "KILL":
self._kill(task_id)
def _kill(self, task_id):
"""kill task by id"""
from home.tasks import app as CeleryApp
CeleryApp.control.revoke(task_id, terminate=True)