diff --git a/tubearchivist/home/config.json b/tubearchivist/home/config.json index 2a30cedb..d45a2a10 100644 --- a/tubearchivist/home/config.json +++ b/tubearchivist/home/config.json @@ -47,8 +47,11 @@ }, "scheduler": { "update_subscribed": false, + "update_subscribed_notify": false, "download_pending": false, + "download_pending_notify": false, "check_reindex": {"minute": "0", "hour": "12", "day_of_week": "*"}, + "check_reindex_notify": false, "check_reindex_days": 90, "thumbnail_check": {"minute": "0", "hour": "17", "day_of_week": "*"}, "run_backup": false, diff --git a/tubearchivist/home/src/download/yt_dlp_handler.py b/tubearchivist/home/src/download/yt_dlp_handler.py index abd3a949..75aa7923 100644 --- a/tubearchivist/home/src/download/yt_dlp_handler.py +++ b/tubearchivist/home/src/download/yt_dlp_handler.py @@ -191,6 +191,8 @@ class VideoDownloader: self._add_subscribed_channels() DownloadPostProcess(self).run() + return self.videos + def _notify(self, video_data, message): """send progress notification to task""" if not self.task: diff --git a/tubearchivist/home/src/frontend/forms.py b/tubearchivist/home/src/frontend/forms.py index edde00ce..fe5f5b26 100644 --- a/tubearchivist/home/src/frontend/forms.py +++ b/tubearchivist/home/src/frontend/forms.py @@ -157,9 +157,41 @@ class ApplicationSettingsForm(forms.Form): class SchedulerSettingsForm(forms.Form): """handle scheduler settings""" + HELP_TEXT = "Add Apprise notification URLs, one per line" + update_subscribed = forms.CharField(required=False) + update_subscribed_notify = forms.CharField( + label=False, + widget=forms.Textarea( + attrs={ + "rows": 2, + "placeholder": HELP_TEXT, + } + ), + required=False, + ) download_pending = forms.CharField(required=False) + download_pending_notify = forms.CharField( + label=False, + widget=forms.Textarea( + attrs={ + "rows": 2, + "placeholder": HELP_TEXT, + } + ), + required=False, + ) check_reindex = forms.CharField(required=False) + check_reindex_notify = forms.CharField( + label=False, + widget=forms.Textarea( + attrs={ + "rows": 2, + "placeholder": HELP_TEXT, + } + ), + required=False, + ) check_reindex_days = forms.IntegerField(required=False) thumbnail_check = forms.CharField(required=False) run_backup = forms.CharField(required=False) diff --git a/tubearchivist/home/src/index/reindex.py b/tubearchivist/home/src/index/reindex.py index 7c69b495..b89c00d9 100644 --- a/tubearchivist/home/src/index/reindex.py +++ b/tubearchivist/home/src/index/reindex.py @@ -227,6 +227,11 @@ class Reindex(ReindexBase): super().__init__() self.task = task self.all_indexed_ids = False + self.processed = { + "videos": 0, + "channels": 0, + "playlists": 0, + } def reindex_all(self): """reindex all in queue""" @@ -316,6 +321,7 @@ class Reindex(ReindexBase): thumb_handler.download_video_thumb(video.json_data["vid_thumb_url"]) Comments(youtube_id, config=self.config).reindex_comments() + self.processed["videos"] += 1 return @@ -327,8 +333,7 @@ class Reindex(ReindexBase): new_path = os.path.join(videos, media_url_should) os.rename(old_path, new_path) - @staticmethod - def _reindex_single_channel(channel_id): + def _reindex_single_channel(self, channel_id): """refresh channel data and sync to videos""" # read current state channel = YoutubeChannel(channel_id) @@ -354,6 +359,7 @@ class Reindex(ReindexBase): channel.upload_to_es() ChannelFullScan(channel_id).scan() + self.processed["channels"] += 1 def _reindex_single_playlist(self, playlist_id): """refresh playlist data""" @@ -369,6 +375,7 @@ class Reindex(ReindexBase): playlist.json_data["playlist_subscribed"] = subscribed playlist.upload_to_es() + self.processed["playlists"] += 1 return def _get_all_videos(self): @@ -390,6 +397,18 @@ class Reindex(ReindexBase): valid = CookieHandler(self.config).validate() return valid + def build_message(self): + """build progress message""" + message = "" + for key, value in self.processed.items(): + if value: + message = message + f"{value} {key}, " + + if message: + message = f"reindexed {message.rstrip(', ')}" + + return message + class ReindexProgress(ReindexBase): """ diff --git a/tubearchivist/home/src/ta/config.py b/tubearchivist/home/src/ta/config.py index 671602cb..d1cccced 100644 --- a/tubearchivist/home/src/ta/config.py +++ b/tubearchivist/home/src/ta/config.py @@ -184,6 +184,11 @@ class ScheduleBuilder: "version_check": "0 11 *", } CONFIG = ["check_reindex_days", "run_backup_rotate"] + NOTIFY = [ + "update_subscribed_notify", + "download_pending_notify", + "check_reindex_notify", + ] MSG = "message:setting" def __init__(self): @@ -213,6 +218,13 @@ class ScheduleBuilder: redis_config["scheduler"][key] = to_write if key in self.CONFIG and value: redis_config["scheduler"][key] = int(value) + if key in self.NOTIFY and value: + if value == "0": + to_write = False + else: + to_write = value + redis_config["scheduler"][key] = to_write + RedisArchivist().set_message("config", redis_config) mess_dict = { "status": self.MSG, diff --git a/tubearchivist/home/src/ta/notify.py b/tubearchivist/home/src/ta/notify.py new file mode 100644 index 00000000..d8821285 --- /dev/null +++ b/tubearchivist/home/src/ta/notify.py @@ -0,0 +1,55 @@ +"""send notifications using apprise""" + +import apprise +from home.src.ta.config import AppConfig +from home.src.ta.task_manager import TaskManager + + +class Notifications: + """notification handler""" + + def __init__(self, name, task_id, task_title): + self.name = name + self.task_id = task_id + self.task_title = task_title + + def send(self): + """send notifications""" + apobj = apprise.Apprise() + hooks: str | None = self.get_url() + if not hooks: + return + + hook_list: list[str] = self.parse_hooks(hooks=hooks) + title, body = self.build_message() + + if not body: + return + + for hook in hook_list: + apobj.add(hook) + + apobj.notify(body=body, title=title) + + def get_url(self) -> str | None: + """get apprise urls for task""" + config = AppConfig().config + hooks: str = config["scheduler"].get(f"{self.name}_notify") + + return hooks + + def parse_hooks(self, hooks: str) -> list[str]: + """create list of hooks""" + + hook_list: list[str] = [i.strip() for i in hooks.split()] + + return hook_list + + def build_message(self) -> tuple[str, str | None]: + """build message to send notification""" + task = TaskManager().get_task(self.task_id) + status = task.get("status") + title: str = f"[TA] {self.task_title} process ended with {status}" + body: str | None = task.get("result") + + return title, body diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index 76bc888c..5ce1d627 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -23,6 +23,7 @@ from home.src.index.filesystem import Scanner from home.src.index.manual import ImportFolderScanner from home.src.index.reindex import Reindex, ReindexManual, ReindexPopulate from home.src.ta.config import AppConfig, ReleaseVersion, ScheduleBuilder +from home.src.ta.notify import Notifications from home.src.ta.ta_redis import RedisArchivist from home.src.ta.task_manager import TaskManager from home.src.ta.urlparser import Parser @@ -130,6 +131,12 @@ class BaseTask(Task): message.update({"messages": ["New task received."]}) RedisArchivist().set_message(key, message) + def after_return(self, status, retval, task_id, args, kwargs, einfo): + """callback after task returns""" + print(f"{task_id} return callback") + task_title = self.TASK_CONFIG.get(self.name).get("title") + Notifications(self.name, task_id, task_title).send() + def send_progress(self, message_lines, progress=False, title=False): """send progress message""" message, key = self._build_message() @@ -169,7 +176,7 @@ def update_subscribed(self): if manager.is_pending(self): print(f"[task][{self.name}] rescan already running") self.send_progress("Rescan already in progress.") - return + return None manager.init(self) handler = SubscriptionScanner(task=self) @@ -178,6 +185,10 @@ def update_subscribed(self): if missing_videos: print(missing_videos) extrac_dl.delay(missing_videos, auto_start=auto_start) + message = f"Found {len(missing_videos)} videos to add to the queue." + return message + + return None @shared_task(name="download_pending", bind=True, base=BaseTask) @@ -187,10 +198,16 @@ def download_pending(self, auto_only=False): if manager.is_pending(self): print(f"[task][{self.name}] download queue already running") self.send_progress("Download Queue is already running.") - return + return None manager.init(self) - VideoDownloader(task=self).run_queue(auto_only=auto_only) + downloader = VideoDownloader(task=self) + videos_downloaded = downloader.run_queue(auto_only=auto_only) + + if videos_downloaded: + return f"downloaded {len(videos_downloaded)} videos." + + return None @shared_task(name="extract_download", bind=True, base=BaseTask) @@ -235,7 +252,10 @@ def check_reindex(self, data=False, extract_videos=False): self.send_progress("Add outdated documents to the reindex Queue.") populate.add_outdated() - Reindex(task=self).reindex_all() + handler = Reindex(task=self) + handler.reindex_all() + + return handler.build_message() @shared_task(bind=True, name="manual_import", base=BaseTask) diff --git a/tubearchivist/home/templates/home/settings.html b/tubearchivist/home/templates/home/settings.html index 6805ba89..73a0bc33 100644 --- a/tubearchivist/home/templates/home/settings.html +++ b/tubearchivist/home/templates/home/settings.html @@ -250,6 +250,11 @@
Periodically rescan your subscriptions:
{{ scheduler_form.update_subscribed }} +Send notification on task completed:
+Current notification urls: {{ config.scheduler.update_subscribed_notify }}
+ {{ scheduler_form.update_subscribed_notify }} +Automatic video download schedule:
{{ scheduler_form.download_pending }}Send notification on task completed:
+Current notification urls: {{ config.scheduler.download_pending_notify }}
+ {{ scheduler_form.download_pending_notify }} +Refresh older than x days, recommended 90:
{{ scheduler_form.check_reindex_days }}Send notification on task completed:
+Current notification urls: {{ config.scheduler.check_reindex_notify }}
+ {{ scheduler_form.check_reindex_notify }} +