From 2c719ae1ae779c555a3c8243a382a08d3f5b1019 Mon Sep 17 00:00:00 2001
From: simon
Date: Wed, 15 Mar 2023 13:28:19 +0700
Subject: [PATCH] refactor download_pending task

---
 .../home/src/download/yt_dlp_handler.py  | 110 ++++++------------
 tubearchivist/home/src/index/comments.py |  62 +++-------
 tubearchivist/home/tasks.py              |  10 +-
 3 files changed, 61 insertions(+), 121 deletions(-)

diff --git a/tubearchivist/home/src/download/yt_dlp_handler.py b/tubearchivist/home/src/download/yt_dlp_handler.py
index be7c71af..cb85dfb1 100644
--- a/tubearchivist/home/src/download/yt_dlp_handler.py
+++ b/tubearchivist/home/src/download/yt_dlp_handler.py
@@ -22,7 +22,7 @@ from home.src.index.video import YoutubeVideo, index_new_video
 from home.src.index.video_constants import VideoTypeEnum
 from home.src.ta.config import AppConfig
 from home.src.ta.helper import clean_string, ignore_filelist
-from home.src.ta.ta_redis import RedisArchivist, RedisQueue
+from home.src.ta.ta_redis import RedisQueue
 
 
 class DownloadPostProcess:
@@ -125,28 +125,23 @@ class DownloadPostProcess:
 
     def _notify_playlist_progress(self, all_channel_playlist, id_c, id_p):
         """notify to UI"""
-        title = (
-            "Processing playlists for channels: "
-            + f"{id_c + 1}/{len(self.download.channels)}"
-        )
-        message = f"Progress: {id_p + 1}/{len(all_channel_playlist)}"
-        key = "message:download"
-        mess_dict = {
-            "status": key,
-            "level": "info",
-            "title": title,
-            "message": message,
-        }
-        if id_p + 1 == len(all_channel_playlist):
-            expire = 4
-        else:
-            expire = True
+        if not self.download.task:
+            return
 
-        RedisArchivist().set_message(key, mess_dict, expire=expire)
+        total_channel = len(self.download.channels)
+        total_playlist = len(all_channel_playlist)
+
+        message = [f"Validate Playlists {id_p + 1}/{total_playlist}"]
+        title = f"Post Processing Channels: {id_c + 1}/{total_channel}"
+        progress = (id_c + 1) / total_channel
+
+        self.download.task.send_progress(
+            message, progress=progress, title=title
+        )
 
     def get_comments(self):
         """get comments from youtube"""
-        CommentList(self.download.videos).index(notify=True)
+        CommentList(self.download.videos, task=self.download.task).index()
 
 
 class VideoDownloader:
@@ -157,10 +152,11 @@ class VideoDownloader:
 
     MSG = "message:download"
 
-    def __init__(self, youtube_id_list=False):
+    def __init__(self, youtube_id_list=False, task=False):
         self.obs = False
         self.video_overwrites = False
         self.youtube_id_list = youtube_id_list
+        self.task = task
         self.config = AppConfig().config
         self._build_obs()
         self.channels = set()
@@ -181,11 +177,7 @@ class VideoDownloader:
             if not youtube_data:
                 break
 
-            try:
-                youtube_data = json.loads(youtube_data)
-            except json.JSONDecodeError:  # This many not be necessary
-                continue
-
+            youtube_data = json.loads(youtube_data)
             youtube_id = youtube_data.get("youtube_id")
 
             tmp_vid_type = youtube_data.get(
@@ -198,13 +190,8 @@ class VideoDownloader:
             if not success:
                 continue
 
-            mess_dict = {
-                "status": self.MSG,
-                "level": "info",
-                "title": "Indexing....",
-                "message": "Add video metadata to index.",
-            }
-            RedisArchivist().set_message(self.MSG, mess_dict, expire=120)
+            if self.task:
+                self.task.send_progress(["Add video metadata to index."])
 
             vid_dict = index_new_video(
                 youtube_id,
@@ -213,29 +200,20 @@ class VideoDownloader:
             )
             self.channels.add(vid_dict["channel"]["channel_id"])
             self.videos.add(vid_dict["youtube_id"])
-            mess_dict = {
-                "status": self.MSG,
-                "level": "info",
-                "title": "Moving....",
-                "message": "Moving downloaded file to storage folder",
-            }
-            RedisArchivist().set_message(self.MSG, mess_dict)
+
+            if self.task:
+                self.task.send_progress(["Move downloaded file to archive."])
+
+            self.move_to_archive(vid_dict)
 
             if queue.has_item():
                 message = "Continue with next video."
-                expire = False
             else:
                 message = "Download queue is finished."
-                expire = 10
 
-            self.move_to_archive(vid_dict)
-            mess_dict = {
-                "status": self.MSG,
-                "level": "info",
-                "title": "Completed",
-                "message": message,
-            }
-            RedisArchivist().set_message(self.MSG, mess_dict, expire=expire)
+            if self.task:
+                self.task.send_progress([message])
+
             self._delete_from_pending(youtube_id)
 
         # post processing
@@ -256,13 +234,9 @@ class VideoDownloader:
 
     def add_pending(self):
         """add pending videos to download queue"""
-        mess_dict = {
-            "status": self.MSG,
-            "level": "info",
-            "title": "Looking for videos to download",
-            "message": "Scanning your download queue.",
-        }
-        RedisArchivist().set_message(self.MSG, mess_dict)
+        if self.task:
+            self.task.send_progress(["Scanning your download queue."])
+
         pending = PendingList()
         pending.get_download()
         to_add = [
@@ -279,40 +253,32 @@ class VideoDownloader:
         if not to_add:
             # there is nothing pending
             print("download queue is empty")
-            mess_dict = {
-                "status": self.MSG,
-                "level": "error",
-                "title": "Download queue is empty",
-                "message": "Add some videos to the queue first.",
-            }
-            RedisArchivist().set_message(self.MSG, mess_dict, expire=True)
+            if self.task:
+                self.task.send_progress(["Download queue is empty."])
+
             return
 
         RedisQueue(queue_name="dl_queue").add_list(to_add)
 
     def _progress_hook(self, response):
         """process the progress_hooks from yt_dlp"""
-        title = "Downloading: " + response["info_dict"]["title"]
-
+        progress = False
         try:
             size = response.get("_total_bytes_str")
             if size.strip() == "N/A":
                 size = response.get("_total_bytes_estimate_str", "N/A")
 
             percent = response["_percent_str"]
+            progress = float(percent.strip("%")) / 100
             speed = response["_speed_str"]
             eta = response["_eta_str"]
             message = f"{percent} of {size} at {speed} - time left: {eta}"
         except KeyError:
             message = "processing"
 
-        mess_dict = {
-            "status": self.MSG,
-            "level": "info",
-            "title": title,
-            "message": message,
-        }
-        RedisArchivist().set_message(self.MSG, mess_dict, expire=True)
+        if self.task:
+            title = response["info_dict"]["title"]
+            self.task.send_progress([title, message], progress=progress)
 
     def _build_obs(self):
         """collection to build all obs passed to yt-dlp"""
diff --git a/tubearchivist/home/src/index/comments.py b/tubearchivist/home/src/index/comments.py
index 32cea557..a8a8795c 100644
--- a/tubearchivist/home/src/index/comments.py
+++ b/tubearchivist/home/src/index/comments.py
@@ -10,7 +10,6 @@ from datetime import datetime
 from home.src.download.yt_dlp_base import YtWrap
 from home.src.es.connect import ElasticWrap
 from home.src.ta.config import AppConfig
-from home.src.ta.ta_redis import RedisArchivist
 
 
 class Comments:
@@ -24,14 +23,13 @@ class Comments:
         self.is_activated = False
         self.comments_format = False
 
-    def build_json(self, notify=False):
+    def build_json(self):
         """build json document for es"""
         print(f"{self.youtube_id}: get comments")
         self.check_config()
         if not self.is_activated:
             return
 
-        self._send_notification(notify)
         comments_raw, channel_id = self.get_yt_comments()
         if not comments_raw and not channel_id:
             return
@@ -52,23 +50,6 @@ class Comments:
 
         self.is_activated = bool(self.config["downloads"]["comment_max"])
 
-    @staticmethod
-    def _send_notification(notify):
-        """send notification for download post process message"""
-        if not notify:
-            return
-
-        key = "message:download"
-        idx, total_videos = notify
-        message = {
-            "status": key,
-            "level": "info",
-            "title": "Download and index comments",
-            "message": f"Progress: {idx + 1}/{total_videos}",
-        }
-
-        RedisArchivist().set_message(key, message)
-
     def build_yt_obs(self):
         """
         get extractor config
@@ -200,38 +181,29 @@ class Comments:
 class CommentList:
     """interact with comments in group"""
 
-    def __init__(self, video_ids):
+    def __init__(self, video_ids, task=False):
         self.video_ids = video_ids
+        self.task = task
         self.config = AppConfig().config
 
-    def index(self, notify=False):
-        """index group of videos"""
+    def index(self):
+        """index comments for list, init with task object to notify"""
         if not self.config["downloads"].get("comment_max"):
             return
 
         total_videos = len(self.video_ids)
-        if notify:
-            self._notify(f"add comments for {total_videos} videos", False)
-
-        for idx, video_id in enumerate(self.video_ids):
-            comment = Comments(video_id, config=self.config)
-            if notify:
-                notify = (idx, total_videos)
-            comment.build_json(notify=notify)
+        for idx, youtube_id in enumerate(self.video_ids):
+            if self.task:
+                self.notify(idx, total_videos)
+
+            comment = Comments(youtube_id, config=self.config)
+            comment.build_json()
             if comment.json_data:
                 comment.upload_comments()
 
-        if notify:
-            self._notify(f"added comments for {total_videos} videos", 5)
-
-    @staticmethod
-    def _notify(message, expire):
-        """send notification"""
-        key = "message:download"
-        message = {
-            "status": key,
-            "level": "info",
-            "title": "Download and index comments finished",
-            "message": message,
-        }
-        RedisArchivist().set_message(key, message, expire=expire)
+    def notify(self, idx, total_videos):
+        """send notification on task"""
+        message = [f"Add comments for new videos {idx + 1}/{total_videos}"]
+        progress = (idx + 1) / total_videos
+        title = "Index Comments"
+        self.task.send_progress(message, progress=progress, title=title)
diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py
index f0d1d55b..d65cddb0 100644
--- a/tubearchivist/home/tasks.py
+++ b/tubearchivist/home/tasks.py
@@ -114,9 +114,8 @@ class BaseTask(Task):
         message.update({"messages": ["New task received."]})
         RedisArchivist().set_message(key, message)
 
-    def send_progress(self, message_lines, progress=False):
+    def send_progress(self, message_lines, progress=False, title=False):
         """send progress message"""
-        print(f"{self.request.id}: {progress}")
         message, key = self._build_message()
         message.update(
             {
@@ -124,6 +123,9 @@ class BaseTask(Task):
                 "progress": progress,
             }
         )
+        if title:
+            message["title"] = title
+
         RedisArchivist().set_message(key, message)
 
     def _build_message(self, level="info"):
@@ -162,7 +164,7 @@ def update_subscribed(self):
     extrac_dl.delay(missing_videos)
 
 
-@shared_task(name="download_pending", bind=True)
+@shared_task(name="download_pending", bind=True, base=BaseTask)
 def download_pending(self, from_queue=True):
     """download latest pending videos"""
     manager = TaskManager()
@@ -171,7 +173,7 @@ def download_pending(self, from_queue=True):
         return
 
     manager.init(self)
-    downloader = VideoDownloader()
+    downloader = VideoDownloader(task=self)
     if from_queue:
         downloader.add_pending()
     downloader.run_queue()