diff --git a/tubearchivist/api/README.md b/tubearchivist/api/README.md index eb182de3..b87980f7 100644 --- a/tubearchivist/api/README.md +++ b/tubearchivist/api/README.md @@ -36,9 +36,13 @@ Note: - [Snapshot List](#snapshot-list-view) - [Snapshot Single](#snapshot-item-view) +**Task management** +- [Task Name List](#task-name-list-view) +- [Task Name Single](#task-name-item-view) +- [Task ID](#task-id-view) + **Additional** - [Login](#login-view) -- [Task](#task-view) WIP - [Refresh](#refresh-view) - [Cookie](#cookie-view) - [Search](#search-view) @@ -252,7 +256,7 @@ POST /api/snapshot/ Create new snapshot now, will return immediately, task will run async in the background, will return snapshot name: ```json { - "snapshot_name": "ta_daily_ + "snapshot_name": "ta_daily_" } ``` @@ -261,7 +265,7 @@ GET /api/snapshot/\/ Return metadata of a single snapshot ```json { - "id": "ta_daily_, + "id": "ta_daily_", "state": "SUCCESS", "es_version": "0.0.0", "start_date": "date_str", @@ -277,6 +281,29 @@ Restore this snapshot DELETE /api/snapshot/\/ Remove this snapshot from index +## Task Name List View +GET /api/task-name/ +Return all task results + +## Task Name Item View +GET /api/task-name/\/ +Return all ask results by task name + +POST /api/task-name/\/ +Start a new task by task name, only tasks without arguments can be started like that, see `home.tasks.BaseTask.TASK_CONFIG` for more info. + +## Task ID view +GET /api/task-id/\/ +Return task status by task ID + +POST /api/task-id/\/ +```json +{ + "command": "stop|kill" +} +``` +Send command to a task, valid commands: `stop` and `kill`. + ## Login View Return token and user ID for username and password: POST /api/login/ @@ -295,33 +322,6 @@ after successful login returns } ``` -## Task View -GET /api/task/ -POST /api/task/ - -Check if there is an ongoing task: -GET /api/task/ - -Returns: -```json -{ - "rescan": false, - "downloading": false -} -``` - -Start a background task -POST /api/task/ -```json -{ - "run": "task_name" -} -``` - -List of valid task names: -- **download_pending**: Start the download queue -- **rescan_pending**: Rescan your subscriptions - ## Refresh View GET /api/refresh/ parameters: diff --git a/tubearchivist/api/src/task_processor.py b/tubearchivist/api/src/task_processor.py deleted file mode 100644 index dd42ee07..00000000 --- a/tubearchivist/api/src/task_processor.py +++ /dev/null @@ -1,54 +0,0 @@ -""" -Functionality: -- process tasks from API -- validate -- handover to celery -""" - -from home.src.ta.ta_redis import RedisArchivist -from home.tasks import download_pending, update_subscribed - - -class TaskHandler: - """handle tasks from api""" - - def __init__(self, data): - self.data = data - - def run_task(self): - """map data and run""" - task_name = self.data["run"] - try: - to_run = self.exec_map(task_name) - except KeyError as err: - print(f"invalid task name {task_name}") - raise ValueError from err - - response = to_run() - response.update({"task": task_name}) - return response - - def exec_map(self, task_name): - """map dict key and return function to execute""" - exec_map = { - "download_pending": self._download_pending, - "rescan_pending": self._rescan_pending, - } - - return exec_map[task_name] - - @staticmethod - def _rescan_pending(): - """look for new items in subscribed channels""" - print("rescan subscribed channels") - update_subscribed.delay() - return {"success": True} - - @staticmethod - def _download_pending(): - """start the download queue""" - print("download pending") - running = download_pending.delay() - print("set task id: " + running.id) - RedisArchivist().set_message("dl_queue_id", running.id) - return {"success": True} diff --git a/tubearchivist/api/urls.py b/tubearchivist/api/urls.py index 6f400653..e87d2aee 100644 --- a/tubearchivist/api/urls.py +++ b/tubearchivist/api/urls.py @@ -81,11 +81,6 @@ urlpatterns = [ views.RefreshView.as_view(), name="api-refresh", ), - path( - "task/", - views.TaskApiView.as_view(), - name="api-task", - ), path( "snapshot/", views.SnapshotApiListView.as_view(), @@ -96,6 +91,21 @@ urlpatterns = [ views.SnapshotApiView.as_view(), name="api-snapshot", ), + path( + "task-name/", + views.TaskListView.as_view(), + name="api-task-list", + ), + path( + "task-name//", + views.TaskNameListView.as_view(), + name="api-task-name-list", + ), + path( + "task-id//", + views.TaskIDView.as_view(), + name="api-task-id", + ), path( "cookie/", views.CookieView.as_view(), diff --git a/tubearchivist/api/views.py b/tubearchivist/api/views.py index 8bd954bb..cbdd86b0 100644 --- a/tubearchivist/api/views.py +++ b/tubearchivist/api/views.py @@ -1,7 +1,6 @@ """all API views""" from api.src.search_processor import SearchProcess -from api.src.task_processor import TaskHandler from home.src.download.queue import PendingInteract from home.src.download.yt_dlp_base import CookieHandler from home.src.es.connect import ElasticWrap @@ -14,8 +13,15 @@ from home.src.index.reindex import ReindexProgress from home.src.index.video import SponsorBlock, YoutubeVideo from home.src.ta.config import AppConfig from home.src.ta.ta_redis import RedisArchivist, RedisQueue +from home.src.ta.task_manager import TaskCommand, TaskManager from home.src.ta.urlparser import Parser -from home.tasks import check_reindex, download_pending, extrac_dl, subscribe_to +from home.tasks import ( + BaseTask, + check_reindex, + download_pending, + extrac_dl, + subscribe_to, +) from rest_framework.authentication import ( SessionAuthentication, TokenAuthentication, @@ -555,29 +561,6 @@ class LoginApiView(ObtainAuthToken): return Response({"token": token.key, "user_id": user.pk}) -class TaskApiView(ApiBaseView): - """resolves to /api/task/ - GET: check if ongoing background task - POST: start a new background task - """ - - @staticmethod - def get(request): - """handle get request""" - # pylint: disable=unused-argument - response = {"rescan": False, "downloading": False} - for key in response.keys(): - response[key] = RedisArchivist().is_locked(key) - - return Response(response) - - def post(self, request): - """handle post request""" - response = TaskHandler(request.data).run_task() - - return Response(response) - - class SnapshotApiListView(ApiBaseView): """resolves to /api/snapshot/ GET: returns snashot config plus list of existing snapshots @@ -642,6 +625,103 @@ class SnapshotApiView(ApiBaseView): return Response(response) +class TaskListView(ApiBaseView): + """resolves to /api/task-name/ + GET: return a list of all stored task results + """ + + def get(self, request): + """handle get request""" + # pylint: disable=unused-argument + all_results = TaskManager().get_all_results() + + return Response(all_results) + + +class TaskNameListView(ApiBaseView): + """resolves to /api/task-name// + GET: return a list of stored results of task + POST: start new background process + """ + + def get(self, request, task_name): + """handle get request""" + # pylint: disable=unused-argument + if task_name not in BaseTask.TASK_CONFIG: + message = {"message": "invalid task name"} + return Response(message, status=404) + + all_results = TaskManager().get_tasks_by_name(task_name) + + return Response(all_results) + + def post(self, request, task_name): + """ + handle post request + 404 for invalid task_name + 400 if task can't be started here without argument + """ + # pylint: disable=unused-argument + task_config = BaseTask.TASK_CONFIG.get(task_name) + if not task_config: + message = {"message": "invalid task name"} + return Response(message, status=404) + + if not task_config.get("api-start"): + message = {"message": "can not start task through this endpoint"} + return Response(message, status=400) + + message = TaskCommand().start(task_name) + + return Response({"message": message}) + + +class TaskIDView(ApiBaseView): + """resolves to /api/task-id// + GET: return details of task id + """ + + valid_commands = ["stop", "kill"] + + def get(self, request, task_id): + """handle get request""" + # pylint: disable=unused-argument + task_result = TaskManager().get_task(task_id) + if not task_result: + message = {"message": "task id not found"} + return Response(message, status=404) + + return Response(task_result) + + def post(self, request, task_id): + """post command to task""" + command = request.data.get("command") + if not command or command not in self.valid_commands: + message = {"message": "no valid command found"} + return Response(message, status=400) + + task_result = TaskManager().get_task(task_id) + if not task_result: + message = {"message": "task id not found"} + return Response(message, status=404) + + task_conf = BaseTask.TASK_CONFIG.get(task_result.get("name")) + if command == "stop": + if not task_conf.get("api-stop"): + message = {"message": "task can not be stopped"} + return Response(message, status=400) + + TaskCommand().stop(task_id) + if command == "kill": + if not task_conf.get("api-stop"): + message = {"message": "task can not be killed"} + return Response(message, status=400) + + TaskCommand().kill(task_id) + + return Response({"message": "command sent"}) + + class RefreshView(ApiBaseView): """resolves to /api/refresh/ GET: get refresh progress diff --git a/tubearchivist/home/src/ta/task_manager.py b/tubearchivist/home/src/ta/task_manager.py index 29868071..517318a6 100644 --- a/tubearchivist/home/src/ta/task_manager.py +++ b/tubearchivist/home/src/ta/task_manager.py @@ -4,6 +4,7 @@ functionality: - handle threads and locks """ +from home import tasks as ta_tasks from home.src.ta.ta_redis import TaskRedis @@ -61,32 +62,32 @@ class TaskManager: class TaskCommand: - """send command pending task""" + """run commands on task""" - def __init__(self, command="STOP"): - self.command = command - - def by_id(self, task_id): - """run command on single task id""" - self._set_command(task_id) + def start(self, task_name): + """start task by task_name, only pass task that don't take args""" + task = ta_tasks.app.tasks.get(task_name).delay() + message = { + "task_id": task.id, + "status": task.status, + "task_name": task.name, + } - def by_name(self, task_name): - """run command on all tasks by name""" - pending = TaskManager().get_pending(task_name) - if not pending: - return + return message - for task in pending: - self._set_command(task.get("task_id")) + def stop(self, task_id): + """ + send stop signal to task_id, + needs to be implemented in task to take effect + """ + handler = TaskRedis() - def _set_command(self, task_id): - """stop single task by id""" - TaskRedis().set_command(task_id, self.command) - if self.command == "KILL": - self._kill(task_id) + task = handler.get_single(task_id) + if not task["name"] in ta_tasks.BaseTask.TASK_CONFIG: + raise ValueError - def _kill(self, task_id): - """kill task by id""" - from home.tasks import app as CeleryApp + handler.set_command(task_id, "STOP") - CeleryApp.control.revoke(task_id, terminate=True) + def kill(self, task_id): + """send kill signal to task_id""" + ta_tasks.app.control.revoke(task_id, terminate=True) diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index a588d6c8..f044b1f6 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -46,14 +46,18 @@ app.conf.timezone = os.environ.get("TZ") or "UTC" class BaseTask(Task): """base class to inherit each class from""" + # pylint: disable=abstract-method + TASK_CONFIG = { "update_subscribed": { "title": "Rescan your Subscriptions", "group": "download:scan", + "api-start": True, }, "download_pending": { "title": "Downloading", "group": "download:run", + "api-start": True, }, "extract_download": { "title": "Add to download queue", @@ -66,10 +70,12 @@ class BaseTask(Task): "manual_import": { "title": "Manual video import", "group": "setting:import", + "api-start": True, }, "run_backup": { "title": "Index Backup", "group": "setting:backup", + "api-start": True, }, "restore_backup": { "title": "Restore Backup", @@ -78,14 +84,17 @@ class BaseTask(Task): "rescan_filesystem": { "title": "Rescan your Filesystem", "group": "setting:filesystemscan", + "api-start": True, }, "thumbnail_check": { "title": "Check your Thumbnails", "group": "setting:thumbnailcheck", + "api-start": True, }, "resync_thumbs": { "title": "Sync Thumbnails to Media Files", "group": "setting:thumbnailsync", + "api-start": True, }, "index_playlists": { "title": "Index Channel Playlist", @@ -132,14 +141,9 @@ class BaseTask(Task): def _build_message(self, level="info"): """build message dict""" task_id = self.request.id - config = self.TASK_CONFIG.get(self.name) - message = { - "status": config.get("group"), - "title": config.get("title"), - "level": level, - "id": task_id, - } - key = f"message:{config.get('group')}:{task_id.split('-')[0]}" + message = self.TASK_CONFIG.get(self.name) + message.update({"level": level, "id": task_id}) + key = f"message:{message.get('group')}:{task_id.split('-')[0]}" return message, key