refactor download post processing to redis queue

master
Simon 4 weeks ago
parent 6ab70c7602
commit 5235af3d91
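
The change replaces the in-memory bookkeeping sets (self.channels / self.videos) on VideoDownloader with dedicated Redis queues (download:channel, download:video, download:playlist:full, download:playlist:quick), so DownloadPostProcess can consume the collected IDs independently of the downloader instance. Judging only from the calls made in this diff, the RedisQueue helper imported from home.src.ta.ta_redis exposes add, add_list, get_next, get_all and length; the sketch below is an assumed, simplified stand-in for illustration, not the project's actual implementation.

# Assumed shape of RedisQueue as used in this commit: a FIFO list in
# Redis keyed by queue name. Illustrative sketch only.
import redis


class RedisQueue:
    """minimal FIFO queue backed by a Redis list"""

    def __init__(self, queue_name: str):
        self.key = queue_name
        self.conn = redis.Redis(decode_responses=True)

    def add(self, item: str) -> None:
        """append a single item to the end of the queue"""
        self.conn.rpush(self.key, item)

    def add_list(self, items: list[str]) -> None:
        """append multiple items at once"""
        if items:
            self.conn.rpush(self.key, *items)

    def get_next(self) -> str | None:
        """pop the next item from the front, None when empty"""
        return self.conn.lpop(self.key)

    def get_all(self) -> list[str]:
        """return all remaining items without consuming them"""
        return self.conn.lrange(self.key, 0, -1)

    def length(self) -> int:
        """number of items left in the queue"""
        return self.conn.llen(self.key)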

@ -22,16 +22,18 @@ from home.src.index.video_constants import VideoTypeEnum
 from home.src.ta.config import AppConfig
 from home.src.ta.helper import get_channel_overwrites, ignore_filelist
 from home.src.ta.settings import EnvironmentSettings
+from home.src.ta.ta_redis import RedisQueue


 class VideoDownloader:
-    """
-    handle the video download functionality
-    if not initiated with list, take from queue
-    """
+    """handle the video download functionality"""

     CACHE_DIR = EnvironmentSettings.CACHE_DIR
     MEDIA_DIR = EnvironmentSettings.MEDIA_DIR
+    CHANNEL_QUEUE = "download:channel"
+    PLAYLIST_QUEUE = "download:playlist:full"
+    PLAYLIST_QUICK = "download:playlist:quick"
+    VIDEO_QUEUE = "download:video"

     def __init__(self, task=False):
         self.obs = False
@ -39,11 +41,10 @@ class VideoDownloader:
         self.task = task
         self.config = AppConfig().config
         self._build_obs()
-        self.channels = set()
-        self.videos = set()

-    def run_queue(self, auto_only=False):
+    def run_queue(self, auto_only=False) -> int:
         """setup download queue in redis loop until no more items"""
+        downloaded = 0
         while True:
             video_data = self._get_next(auto_only)
             if self.task.is_stopped() or not video_data:
@ -62,18 +63,18 @@ class VideoDownloader:
             self._notify(video_data, "Add video metadata to index", progress=1)
             video_type = VideoTypeEnum(video_data["vid_type"])
             vid_dict = index_new_video(youtube_id, video_type=video_type)
-            self.channels.add(vid_dict["channel"]["channel_id"])
-            self.videos.add(vid_dict["youtube_id"])
+            RedisQueue(self.CHANNEL_QUEUE).add(channel_id)
+            RedisQueue(self.VIDEO_QUEUE).add(youtube_id)

             self._notify(video_data, "Move downloaded file to archive")
             self.move_to_archive(vid_dict)
             self._delete_from_pending(youtube_id)
+            downloaded += 1

         # post processing
-        self._add_subscribed_channels()
         DownloadPostProcess(self).run()

-        return self.videos
+        return downloaded

     def _notify(self, video_data, message, progress=False):
         """send progress notification to task"""
@ -263,18 +264,6 @@ class VideoDownloader:
path = f"ta_download/_doc/{youtube_id}?refresh=true"
_, _ = ElasticWrap(path).delete()
def _add_subscribed_channels(self):
"""add all channels subscribed to refresh"""
all_subscribed = PlaylistSubscription().get_playlists()
if not all_subscribed:
return
channel_ids = [i["playlist_channel_id"] for i in all_subscribed]
for channel_id in channel_ids:
self.channels.add(channel_id)
return
def _reset_auto(self):
"""reset autostart to defaults after queue stop"""
path = "ta_download/_update_by_query"
@ -304,8 +293,8 @@ class DownloadPostProcess:
         self.channel_overwrites = get_channel_overwrites()
         self.auto_delete_all()
         self.auto_delete_overwrites()
-        to_refresh = self.refresh_playlist()
-        self.match_videos(to_refresh)
+        self.refresh_playlist()
+        self.match_videos()
         self.get_comments()

     def auto_delete_all(self):
@ -357,66 +346,90 @@ class DownloadPostProcess:
         pending.parse_url_list()
         _ = pending.add_to_pending(status="ignore")

-    def refresh_playlist(self) -> list[str]:
+    def refresh_playlist(self) -> None:
         """match videos with playlists"""
-        to_refresh = self._get_to_refresh_playlists()
-
-        total_playlist = len(to_refresh)
-        for idx, playlist_id in enumerate(to_refresh):
+        total_playlist = self.add_playlists_to_refresh()
+
+        queue = RedisQueue(self.download.PLAYLIST_QUEUE)
+        while True:
+            playlist_id = queue.get_next()
+            if not playlist_id:
+                break
+
             playlist = YoutubePlaylist(playlist_id)
             playlist.update_playlist(skip_on_empty=True)
             if not self.download.task:
                 continue

+            idx = total_playlist - queue.length()
             channel_name = playlist.json_data["playlist_channel"]
             playlist_title = playlist.json_data["playlist_name"]
             message = [
                 f"Post Processing Playlists for: {channel_name}",
-                f"{playlist_title} [{idx + 1}/{total_playlist}]",
+                f"{playlist_title} [{idx}/{total_playlist}]",
             ]
-            progress = (idx + 1) / total_playlist
+            progress = (idx) / total_playlist
             self.download.task.send_progress(message, progress=progress)

-        return to_refresh
-
-    def _get_to_refresh_playlists(self) -> list[str]:
-        """get playlists to refresh"""
+    def add_playlists_to_refresh(self) -> int:
+        """add playlists to refresh"""
         if self.download.task:
             message = ["Post Processing Playlists", "Scanning for Playlists"]
             self.download.task.send_progress(message)

-        to_refresh = []
-        for channel_id in self.download.channels:
+        self._add_playlist_sub()
+        self._add_channel_playlists()
+        self._add_video_playlists()
+
+        return RedisQueue(self.download.PLAYLIST_QUEUE).length()
+
+    def _add_playlist_sub(self):
+        """add subscribed playlists to refresh"""
+        subs = PlaylistSubscription().get_playlists()
+        to_add = [i["playlist_id"] for i in subs]
+        RedisQueue(self.download.PLAYLIST_QUEUE).add_list(to_add)
+
+    def _add_channel_playlists(self):
+        """add playlists from channels to refresh"""
+        queue = RedisQueue(self.download.CHANNEL_QUEUE)
+        while True:
+            channel_id = queue.get_next()
+            if not channel_id:
+                break
+
             channel = YoutubeChannel(channel_id)
             channel.get_from_es()
             overwrites = channel.get_overwrites()
             if "index_playlists" in overwrites:
                 channel.get_all_playlists()
-                to_refresh.extend([i[0] for i in channel.all_playlists])
-
-        subs = PlaylistSubscription().get_playlists()
-        for playlist in subs:
-            playlist_id = playlist["playlist_id"]
-            if playlist_id not in to_refresh:
-                to_refresh.append(playlist_id)
-
-        return to_refresh
-
-    def match_videos(self, to_refresh: list[str]) -> None:
-        """scan rest of indexed playlists to match videos"""
-        must_not = [{"terms": {"playlist_id": to_refresh}}]
-        video_ids = list(self.download.videos)
+                to_add = [i[0] for i in channel.all_playlists]
+                RedisQueue(self.download.PLAYLIST_QUEUE).add_list(to_add)
+
+    def _add_video_playlists(self):
+        """add other playlists for quick sync"""
+        all_playlists = RedisQueue(self.download.PLAYLIST_QUEUE).get_all()
+        must_not = [{"terms": {"playlist_id": all_playlists}}]
+        video_ids = RedisQueue(self.download.VIDEO_QUEUE).get_all()
         must = [{"terms": {"playlist_entries.youtube_id": video_ids}}]
         data = {
             "query": {"bool": {"must_not": must_not, "must": must}},
             "_source": ["playlist_id"],
         }
         playlists = IndexPaginate("ta_playlist", data).get_results()
+        to_add = [i["playlist_id"] for i in playlists]
+        RedisQueue(self.download.PLAYLIST_QUICK).add_list(to_add)

-        total_playlist = len(playlists)
-        for idx, to_match in enumerate(playlists):
-            playlist_id = to_match["playlist_id"]
+    def match_videos(self) -> None:
+        """scan rest of indexed playlists to match videos"""
+        queue = RedisQueue(self.download.PLAYLIST_QUICK)
+        total_playlist = queue.length()
+        while True:
+            playlist_id = queue.get_next()
+            if not playlist_id:
+                break
+
             playlist = YoutubePlaylist(playlist_id)
             playlist.get_from_es()
             playlist.add_vids_to_playlist()
@ -425,13 +438,15 @@ class DownloadPostProcess:
             if not self.download.task:
                 continue

+            idx = total_playlist - queue.length()
             message = [
                 "Post Processing Playlists.",
-                f"Validate Playlists: - {idx + 1}/{total_playlist}",
+                f"Validate Playlists: - {idx}/{total_playlist}",
             ]
-            progress = (idx + 1) / total_playlist
+            progress = (idx) / total_playlist
             self.download.task.send_progress(message, progress=progress)

     def get_comments(self):
         """get comments from youtube"""
-        CommentList(self.download.videos, task=self.download.task).index()
+        videos = RedisQueue(self.download.VIDEO_QUEUE).get_all()
+        CommentList(videos, task=self.download.task).index()
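
Because items are popped from Redis instead of iterated with enumerate(), refresh_playlist and match_videos above both derive their progress counter from the remaining queue length (idx = total_playlist - queue.length()). A generic sketch of that drain-and-report pattern under the same assumed RedisQueue interface follows: queue and task.send_progress mirror the names used in the diff, while process is a hypothetical stand-in for the per-item work.

# Illustrative only: drain a queue and report progress from the
# remaining length, mirroring refresh_playlist() / match_videos().
def drain_with_progress(queue, task, process):
    total = queue.length()
    while True:
        item = queue.get_next()
        if not item:
            break

        process(item)
        if not task:
            continue

        done = total - queue.length()  # items handled so far
        message = ["Post Processing", f"{item} [{done}/{total}]"]
        task.send_progress(message, progress=done / total)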
