From 488711ee8f8b73a8466030bb6b5b4ea09c044672 Mon Sep 17 00:00:00 2001
From: simon
Date: Tue, 14 Mar 2023 16:40:05 +0700
Subject: [PATCH] refactor extract dl task

---
 tubearchivist/home/src/download/queue.py       | 75 ++++++++++---------
 .../home/src/download/subscriptions.py         | 38 ++++------
 tubearchivist/home/tasks.py                    | 11 ++-
 3 files changed, 64 insertions(+), 60 deletions(-)

diff --git a/tubearchivist/home/src/download/queue.py b/tubearchivist/home/src/download/queue.py
index 9bf4492a..52e2e66c 100644
--- a/tubearchivist/home/src/download/queue.py
+++ b/tubearchivist/home/src/download/queue.py
@@ -18,7 +18,7 @@ from home.src.index.playlist import YoutubePlaylist
 from home.src.index.video_constants import VideoTypeEnum
 from home.src.ta.config import AppConfig
 from home.src.ta.helper import DurationConverter, is_shorts
-from home.src.ta.ta_redis import RedisArchivist, RedisQueue
+from home.src.ta.ta_redis import RedisQueue
 
 
 class PendingIndex:
@@ -163,10 +163,11 @@ class PendingList(PendingIndex):
         "simulate": True,
     }
 
-    def __init__(self, youtube_ids=False):
+    def __init__(self, youtube_ids=False, task=False):
         super().__init__()
         self.config = AppConfig().config
         self.youtube_ids = youtube_ids
+        self.task = task
         self.to_skip = False
         self.missing_videos = False
 
@@ -175,16 +176,16 @@ class PendingList(PendingIndex):
         self.missing_videos = []
         self.get_download()
         self.get_indexed()
-        for entry in self.youtube_ids:
-            # notify
-            mess_dict = {
-                "status": "message:add",
-                "level": "info",
-                "title": "Adding to download queue.",
-                "message": "Extracting lists",
-            }
-            RedisArchivist().set_message("message:add", mess_dict, expire=True)
+        total = len(self.youtube_ids)
+        for idx, entry in enumerate(self.youtube_ids):
             self._process_entry(entry)
+            if not self.task:
+                continue
+
+            self.task.send_progress(
+                message_lines=[f"Extracting items {idx + 1}/{total}"],
+                progress=(idx + 1) / total,
+            )
 
     def _process_entry(self, entry):
         """process single entry from url list"""
@@ -238,9 +239,10 @@ class PendingList(PendingIndex):
         self.get_channels()
         bulk_list = []
 
+        total = len(self.missing_videos)
         for idx, (youtube_id, vid_type) in enumerate(self.missing_videos):
-            print(f"{youtube_id} ({vid_type}): add to download queue")
-            self._notify_add(idx)
+            print(f"{youtube_id}: [{idx + 1}/{total}]: add to queue")
+            self._notify_add(idx, total)
             video_details = self.get_youtube_details(youtube_id, vid_type)
             if not video_details:
                 continue
@@ -253,29 +255,34 @@ class PendingList(PendingIndex):
             url = video_details["vid_thumb_url"]
             ThumbManager(youtube_id).download_video_thumb(url)
 
-        if bulk_list:
-            # add last newline
-            bulk_list.append("\n")
-            query_str = "\n".join(bulk_list)
-            _, _ = ElasticWrap("_bulk").post(query_str, ndjson=True)
+            if len(bulk_list) >= 20:
+                self._ingest_bulk(bulk_list)
+                bulk_list = []
 
-    def _notify_add(self, idx):
-        """send notification for adding videos to download queue"""
-        progress = f"{idx + 1}/{len(self.missing_videos)}"
-        mess_dict = {
-            "status": "message:add",
-            "level": "info",
-            "title": "Adding new videos to download queue.",
-            "message": "Progress: " + progress,
-        }
-        if idx + 1 == len(self.missing_videos):
-            expire = 4
-        else:
-            expire = True
+        self._ingest_bulk(bulk_list)
+
+    def _ingest_bulk(self, bulk_list):
+        """add items to queue in bulk"""
+        if not bulk_list:
+            return
+
+        # add last newline
+        bulk_list.append("\n")
+        query_str = "\n".join(bulk_list)
+        _, _ = ElasticWrap("_bulk").post(query_str, ndjson=True)
 
-        RedisArchivist().set_message("message:add", mess_dict, expire=expire)
-        if idx + 1 % 25 == 0:
-            print("adding to queue progress: " + progress)
+    def _notify_add(self, idx, total):
+        """send notification for adding videos to download queue"""
+        if not self.task:
+            return
+
+        self.task.send_progress(
+            message_lines=[
+                "Adding new videos to download queue.",
+                f"Extracting items {idx + 1}/{total}",
+            ],
+            progress=(idx + 1) / total,
+        )
 
     def get_youtube_details(self, youtube_id, vid_type=VideoTypeEnum.VIDEOS):
         """get details from youtubedl for single pending video"""
diff --git a/tubearchivist/home/src/download/subscriptions.py b/tubearchivist/home/src/download/subscriptions.py
index 46d96873..41cf1821 100644
--- a/tubearchivist/home/src/download/subscriptions.py
+++ b/tubearchivist/home/src/download/subscriptions.py
@@ -46,7 +46,7 @@ class ChannelSubscription:
 
         last_videos = []
 
-        for vid_type, limit_amount in queries:
+        for vid_type_enum, limit_amount in queries:
             obs = {
                 "skip_download": True,
                 "extract_flat": True,
@@ -54,9 +54,9 @@ class ChannelSubscription:
             if limit:
                 obs["playlistend"] = limit_amount
 
-            path = vid_type.value
+            vid_type = vid_type_enum.value
             channel = YtWrap(obs, self.config).extract(
-                f"https://www.youtube.com/channel/{channel_id}/{path}"
+                f"https://www.youtube.com/channel/{channel_id}/{vid_type}"
             )
             if not channel:
                 continue
@@ -278,20 +278,16 @@ class SubscriptionScanner:
 
     def scan(self):
         """scan channels and playlists"""
-        self.missing_videos = []
-        self._notify()
-        self._scan_channels()
-        self._scan_playlists()
-        if not self.missing_videos:
-            return
+        if self.task:
+            self.task.send_progress(["Rescanning channels and playlists."])
 
-        self.add_to_pending()
+        self.missing_videos = []
+        self.scan_channels()
+        self.scan_playlists()
 
-    def _notify(self):
-        """set redis notification"""
-        self.task.send_progress(["Rescanning channels and playlists."])
+        return self.missing_videos
 
-    def _scan_channels(self):
+    def scan_channels(self):
         """get missing from channels"""
         channel_handler = ChannelSubscription(task=self.task)
         missing = channel_handler.find_missing()
@@ -303,7 +299,7 @@ class SubscriptionScanner:
                 {"type": "video", "vid_type": vid_type, "url": vid_id}
             )
 
-    def _scan_playlists(self):
+    def scan_playlists(self):
         """get missing from playlists"""
         playlist_handler = PlaylistSubscription(task=self.task)
         missing = playlist_handler.find_missing()
@@ -312,15 +308,13 @@ class SubscriptionScanner:
 
         for i in missing:
             self.missing_videos.append(
-                {"type": "video", "vid_type": VideoTypeEnum.VIDEOS, "url": i}
+                {
+                    "type": "video",
+                    "vid_type": VideoTypeEnum.VIDEOS.value,
+                    "url": i,
+                }
             )
 
-    def add_to_pending(self):
-        """add missing videos to pending queue"""
-        pending_handler = queue.PendingList(youtube_ids=self.missing_videos)
-        pending_handler.parse_url_list()
-        pending_handler.add_to_pending()
-
 
 class SubscriptionHandler:
     """subscribe to channels and playlists from url_str"""
diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py
index 5ec16d34..f0d1d55b 100644
--- a/tubearchivist/home/tasks.py
+++ b/tubearchivist/home/tasks.py
@@ -156,7 +156,10 @@ def update_subscribed(self):
         return
 
     manager.init(self)
-    SubscriptionScanner(task=self).scan()
+    missing_videos = SubscriptionScanner(task=self).scan()
+    if missing_videos:
+        print(missing_videos)
+        extrac_dl.delay(missing_videos)
 
 
 @shared_task(name="download_pending", bind=True)
@@ -174,10 +177,10 @@ def download_pending(self, from_queue=True):
     downloader.run_queue()
 
 
-@shared_task(name="extract_download")
-def extrac_dl(youtube_ids):
+@shared_task(name="extract_download", bind=True, base=BaseTask)
+def extrac_dl(self, youtube_ids):
     """parse list passed and add to pending"""
-    pending_handler = PendingList(youtube_ids=youtube_ids)
+    pending_handler = PendingList(youtube_ids=youtube_ids, task=self)
     pending_handler.parse_url_list()
     pending_handler.add_to_pending()