From 5235af3d91d8d6dd89b74fcf27dcd7d130c0eb53 Mon Sep 17 00:00:00 2001 From: Simon Date: Tue, 14 May 2024 19:17:13 +0200 Subject: [PATCH] refactor download post processing to redis queue --- .../home/src/download/yt_dlp_handler.py | 127 ++++++++++-------- 1 file changed, 71 insertions(+), 56 deletions(-) diff --git a/tubearchivist/home/src/download/yt_dlp_handler.py b/tubearchivist/home/src/download/yt_dlp_handler.py index b7c2b80c..85d152b4 100644 --- a/tubearchivist/home/src/download/yt_dlp_handler.py +++ b/tubearchivist/home/src/download/yt_dlp_handler.py @@ -22,16 +22,18 @@ from home.src.index.video_constants import VideoTypeEnum from home.src.ta.config import AppConfig from home.src.ta.helper import get_channel_overwrites, ignore_filelist from home.src.ta.settings import EnvironmentSettings +from home.src.ta.ta_redis import RedisQueue class VideoDownloader: - """ - handle the video download functionality - if not initiated with list, take from queue - """ + """handle the video download functionality""" CACHE_DIR = EnvironmentSettings.CACHE_DIR MEDIA_DIR = EnvironmentSettings.MEDIA_DIR + CHANNEL_QUEUE = "download:channel" + PLAYLIST_QUEUE = "download:playlist:full" + PLAYLIST_QUICK = "download:playlist:quick" + VIDEO_QUEUE = "download:video" def __init__(self, task=False): self.obs = False @@ -39,11 +41,10 @@ class VideoDownloader: self.task = task self.config = AppConfig().config self._build_obs() - self.channels = set() - self.videos = set() - def run_queue(self, auto_only=False): + def run_queue(self, auto_only=False) -> int: """setup download queue in redis loop until no more items""" + downloaded = 0 while True: video_data = self._get_next(auto_only) if self.task.is_stopped() or not video_data: @@ -62,18 +63,18 @@ class VideoDownloader: self._notify(video_data, "Add video metadata to index", progress=1) video_type = VideoTypeEnum(video_data["vid_type"]) vid_dict = index_new_video(youtube_id, video_type=video_type) - self.channels.add(vid_dict["channel"]["channel_id"]) - self.videos.add(vid_dict["youtube_id"]) + RedisQueue(self.CHANNEL_QUEUE).add(channel_id) + RedisQueue(self.VIDEO_QUEUE).add(youtube_id) self._notify(video_data, "Move downloaded file to archive") self.move_to_archive(vid_dict) self._delete_from_pending(youtube_id) + downloaded += 1 # post processing - self._add_subscribed_channels() DownloadPostProcess(self).run() - return self.videos + return downloaded def _notify(self, video_data, message, progress=False): """send progress notification to task""" @@ -263,18 +264,6 @@ class VideoDownloader: path = f"ta_download/_doc/{youtube_id}?refresh=true" _, _ = ElasticWrap(path).delete() - def _add_subscribed_channels(self): - """add all channels subscribed to refresh""" - all_subscribed = PlaylistSubscription().get_playlists() - if not all_subscribed: - return - - channel_ids = [i["playlist_channel_id"] for i in all_subscribed] - for channel_id in channel_ids: - self.channels.add(channel_id) - - return - def _reset_auto(self): """reset autostart to defaults after queue stop""" path = "ta_download/_update_by_query" @@ -304,8 +293,8 @@ class DownloadPostProcess: self.channel_overwrites = get_channel_overwrites() self.auto_delete_all() self.auto_delete_overwrites() - to_refresh = self.refresh_playlist() - self.match_videos(to_refresh) + self.refresh_playlist() + self.match_videos() self.get_comments() def auto_delete_all(self): @@ -357,66 +346,90 @@ class DownloadPostProcess: pending.parse_url_list() _ = pending.add_to_pending(status="ignore") - def refresh_playlist(self) -> list[str]: + def refresh_playlist(self) -> None: """match videos with playlists""" - to_refresh = self._get_to_refresh_playlists() + total_playlist = self.add_playlists_to_refresh() + + queue = RedisQueue(self.download.PLAYLIST_QUEUE) + while True: + playlist_id = queue.get_next() + if not playlist_id: + break - total_playlist = len(to_refresh) - for idx, playlist_id in enumerate(to_refresh): playlist = YoutubePlaylist(playlist_id) playlist.update_playlist(skip_on_empty=True) if not self.download.task: continue + idx = total_playlist - queue.length() + channel_name = playlist.json_data["playlist_channel"] playlist_title = playlist.json_data["playlist_name"] message = [ f"Post Processing Playlists for: {channel_name}", - f"{playlist_title} [{idx + 1}/{total_playlist}]", + f"{playlist_title} [{idx}/{total_playlist}]", ] - progress = (idx + 1) / total_playlist + progress = (idx) / total_playlist self.download.task.send_progress(message, progress=progress) - return to_refresh - - def _get_to_refresh_playlists(self) -> list[str]: - """get playlists to refresh""" + def add_playlists_to_refresh(self) -> int: + """add playlists to refresh""" if self.download.task: message = ["Post Processing Playlists", "Scanning for Playlists"] self.download.task.send_progress(message) - to_refresh = [] - for channel_id in self.download.channels: + self._add_playlist_sub() + self._add_channel_playlists() + self._add_video_playlists() + + return RedisQueue(self.download.PLAYLIST_QUEUE).length() + + def _add_playlist_sub(self): + """add subscribed playlists to refresh""" + subs = PlaylistSubscription().get_playlists() + to_add = [i["playlist_id"] for i in subs] + RedisQueue(self.download.PLAYLIST_QUEUE).add_list(to_add) + + def _add_channel_playlists(self): + """add playlists from channels to refresh""" + queue = RedisQueue(self.download.CHANNEL_QUEUE) + while True: + channel_id = queue.get_next() + if not channel_id: + break + channel = YoutubeChannel(channel_id) channel.get_from_es() overwrites = channel.get_overwrites() if "index_playlists" in overwrites: channel.get_all_playlists() - to_refresh.extend([i[0] for i in channel.all_playlists]) + to_add = [i[0] for i in channel.all_playlists] + RedisQueue(self.download.PLAYLIST_QUEUE).add_list(to_add) - subs = PlaylistSubscription().get_playlists() - for playlist in subs: - playlist_id = playlist["playlist_id"] - if playlist_id not in to_refresh: - to_refresh.append(playlist_id) - - return to_refresh - - def match_videos(self, to_refresh: list[str]) -> None: - """scan rest of indexed playlists to match videos""" - must_not = [{"terms": {"playlist_id": to_refresh}}] - video_ids = list(self.download.videos) + def _add_video_playlists(self): + """add other playlists for quick sync""" + all_playlists = RedisQueue(self.download.PLAYLIST_QUEUE).get_all() + must_not = [{"terms": {"playlist_id": all_playlists}}] + video_ids = RedisQueue(self.download.VIDEO_QUEUE).get_all() must = [{"terms": {"playlist_entries.youtube_id": video_ids}}] data = { "query": {"bool": {"must_not": must_not, "must": must}}, "_source": ["playlist_id"], } playlists = IndexPaginate("ta_playlist", data).get_results() + to_add = [i["playlist_id"] for i in playlists] + RedisQueue(self.download.PLAYLIST_QUICK).add_list(to_add) + + def match_videos(self) -> None: + """scan rest of indexed playlists to match videos""" + queue = RedisQueue(self.download.PLAYLIST_QUICK) + total_playlist = queue.length() + while True: + playlist_id = queue.get_next() + if not playlist_id: + break - total_playlist = len(playlists) - for idx, to_match in enumerate(playlists): - playlist_id = to_match["playlist_id"] playlist = YoutubePlaylist(playlist_id) playlist.get_from_es() playlist.add_vids_to_playlist() @@ -425,13 +438,15 @@ class DownloadPostProcess: if not self.download.task: continue + idx = total_playlist - queue.length() message = [ "Post Processing Playlists.", - f"Validate Playlists: - {idx + 1}/{total_playlist}", + f"Validate Playlists: - {idx}/{total_playlist}", ] - progress = (idx + 1) / total_playlist + progress = (idx) / total_playlist self.download.task.send_progress(message, progress=progress) def get_comments(self): """get comments from youtube""" - CommentList(self.download.videos, task=self.download.task).index() + videos = RedisQueue(self.download.VIDEO_QUEUE).get_all() + CommentList(videos, task=self.download.task).index()