diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index f1d292dc..3b5b808c 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -8,7 +8,7 @@ Functionality: import os -from celery import Celery, shared_task +from celery import Celery, Task, shared_task from home.src.download.queue import PendingList from home.src.download.subscriptions import ( SubscriptionHandler, @@ -42,6 +42,95 @@ app.autodiscover_tasks() app.conf.timezone = os.environ.get("TZ") or "UTC" +class BaseTask(Task): + """base class to inherit each class from""" + + TASK_CONFIG = { + "update_subscribed": { + "title": "Rescan your Subscriptions", + "group": "message:download:scan", + }, + "download_pending": { + "title": "Downloading", + "group": "message:download:run", + }, + "extract_download": { + "title": "Add to download queue", + "group": "message:download:add", + }, + "subscribe_to": { + "title": "Add Subscription", + "group": "message:download:subscribe", + }, + "check_reindex": { + "title": "Reindex old documents", + "group": "message:settings:reindex", + }, + "manual_import": { + "title": "Manual video import", + "group": "message:settings:import", + }, + "run_backup": { + "title": "Index Backup", + "group": "message:settings:backup", + }, + "restore_backup": { + "title": "Restore Backup", + "group": "message:settings:restore", + }, + "rescan_filesystem": { + "title": "Rescan your Filesystem", + "group": "message:settings:filesystemscan", + }, + "thumbnail_check": { + "title": "Check your Thumbnails", + "group": "message:settings:thumbnailcheck", + }, + "resync_thumbs": { + "title": "Sync Thumbnails to Media Files", + "group": "message:settings:thumbnailsync", + }, + "index_playlists": { + "title": "Index Channel Playlist", + "group": "message:channel:indexplaylist", + }, + } + + def on_failure(self, exc, task_id, args, kwargs, einfo): + """callback for task failure""" + print(f"{task_id} Failed callback") + + def on_success(self, retval, task_id, args, kwargs): + """callback task completed successfully""" + print(f"{task_id} success callback") + message, key = self._build_message(task_id) + message.update({"message": "Task completed successfully"}) + RedisArchivist().set_message(key, message, expire=5) + + def before_start(self, task_id, args, kwargs): + """callback before initiating task""" + print(f"{self.name} create callback") + message, key = self._build_message(task_id) + message.update({"message": "New task received."}) + RedisArchivist().set_message(key, message) + + def print_progress(self, task_id, progress): + """print progress""" + print(f"{task_id}: {progress}") + + def _build_message(self, task_id, level="info"): + """build message dict""" + config = self.TASK_CONFIG.get(self.name) + message = { + "status": config.get("group"), + "title": config.get("title"), + "level": level, + "id": task_id, + } + key = f"{config.get('group')}:{task_id.split('-')[0]}" + return message, key + + @shared_task(name="update_subscribed", bind=True) def update_subscribed(self): """look for missing videos and add to pending"""