From 0d21bfe9296c1c906dbe7362709a48603be8121a Mon Sep 17 00:00:00 2001 From: simon Date: Sun, 11 Dec 2022 12:03:21 +0700 Subject: [PATCH] refactor Reindex to use RedisQueue --- tubearchivist/home/src/index/filesystem.py | 10 - tubearchivist/home/src/index/reindex.py | 280 +++++++++++---------- tubearchivist/home/tasks.py | 9 +- 3 files changed, 149 insertions(+), 150 deletions(-) diff --git a/tubearchivist/home/src/index/filesystem.py b/tubearchivist/home/src/index/filesystem.py index bbe69fc4..38ca3e98 100644 --- a/tubearchivist/home/src/index/filesystem.py +++ b/tubearchivist/home/src/index/filesystem.py @@ -14,11 +14,9 @@ import subprocess from home.src.download.queue import PendingList from home.src.download.thumbnails import ThumbManager from home.src.es.connect import ElasticWrap -from home.src.index.reindex import Reindex from home.src.index.video import YoutubeVideo, index_new_video from home.src.ta.config import AppConfig from home.src.ta.helper import clean_string, ignore_filelist -from home.src.ta.ta_redis import RedisArchivist from PIL import Image, ImageFile from yt_dlp.utils import ISO639Utils @@ -606,11 +604,3 @@ def scan_filesystem(): for missing_vid in filesystem_handler.to_index: youtube_id = missing_vid[2] index_new_video(youtube_id) - - -def reindex_old_documents(): - """daily refresh of old documents""" - handler = Reindex() - handler.check_outdated() - handler.reindex() - RedisArchivist().set_message("last_reindex", handler.now) diff --git a/tubearchivist/home/src/index/reindex.py b/tubearchivist/home/src/index/reindex.py index a372b5bc..1bf9cb61 100644 --- a/tubearchivist/home/src/index/reindex.py +++ b/tubearchivist/home/src/index/reindex.py @@ -7,7 +7,6 @@ functionality: import os import shutil from datetime import datetime -from math import ceil from time import sleep from home.src.download.queue import PendingList @@ -20,135 +19,156 @@ from home.src.index.comments import Comments from home.src.index.playlist import YoutubePlaylist from 
home.src.index.video import YoutubeVideo from home.src.ta.config import AppConfig +from home.src.ta.ta_redis import RedisArchivist, RedisQueue + + +class ReindexBase: + """base config class for reindex task""" + + REINDEX_CONFIG = [ + { + "index_name": "ta_video", + "queue_name": "reindex:ta_video", + "active_key": "active", + "refresh_key": "vid_last_refresh", + }, + { + "index_name": "ta_channel", + "queue_name": "reindex:ta_channel", + "active_key": "channel_active", + "refresh_key": "channel_last_refresh", + }, + { + "index_name": "ta_playlist", + "queue_name": "reindex:ta_playlist", + "active_key": "playlist_active", + "refresh_key": "playlist_last_refresh", + }, + ] - -class Reindex: - """check for outdated documents and refresh data from youtube""" - - MATCH_FIELD = { - "ta_video": "active", - "ta_channel": "channel_active", - "ta_playlist": "playlist_active", - } MULTIPLY = 1.2 def __init__(self): - # config - self.now = int(datetime.now().strftime("%s")) self.config = AppConfig().config + self.now = int(datetime.now().timestamp()) + + def populate(self, all_ids, reindex_config): + """add all ids to reindex to the redis queue""" + if not all_ids: + return + + RedisQueue(queue_name=reindex_config["queue_name"]).add_list(all_ids) + + +class ReindexOutdated(ReindexBase): + """add outdated documents to reindex queue""" + + def __init__(self): + super().__init__() self.interval = self.config["scheduler"]["check_reindex_days"] - # scan - self.all_youtube_ids = False - self.all_channel_ids = False - self.all_playlist_ids = False - - def check_cookie(self): - """validate cookie if enabled""" - if self.config["downloads"]["cookie_import"]: - valid = CookieHandler(self.config).validate() - if not valid: - return - - def _get_daily(self): - """get daily refresh values""" - total_videos = self._get_total_hits("ta_video") - video_daily = ceil(total_videos / self.interval * self.MULTIPLY) - if video_daily >= 10000: - video_daily = 9999 - - total_channels = 
self._get_total_hits("ta_channel") - channel_daily = ceil(total_channels / self.interval * self.MULTIPLY) - total_playlists = self._get_total_hits("ta_playlist") - playlist_daily = ceil(total_playlists / self.interval * self.MULTIPLY) - return (video_daily, channel_daily, playlist_daily) - - def _get_total_hits(self, index): + + def add_outdated(self): + """add outdated documents""" + for reindex_config in self.REINDEX_CONFIG: + total_hits = self._get_total_hits(reindex_config) + daily_should = self._get_daily_should(total_hits) + all_ids = self._get_outdated_ids(reindex_config, daily_should) + self.populate(all_ids, reindex_config) + + @staticmethod + def _get_total_hits(reindex_config): """get total hits from index""" - match_field = self.MATCH_FIELD[index] - path = f"{index}/_search?filter_path=hits.total" - data = {"query": {"match": {match_field: True}}} + index_name = reindex_config["index_name"] + active_key = reindex_config["active_key"] + path = f"{index_name}/_search?filter_path=hits.total" + data = {"query": {"match": {active_key: True}}} response, _ = ElasticWrap(path).post(data=data) total_hits = response["hits"]["total"]["value"] return total_hits - def _get_unrated_vids(self): - """get max 200 videos without rating if ryd integration is enabled""" - must_not_list = [ - {"exists": {"field": "stats.average_rating"}}, - {"term": {"active": {"value": False}}}, - ] - data = {"size": 200, "query": {"bool": {"must_not": must_not_list}}} - response, _ = ElasticWrap("ta_video/_search").get(data=data) + def _get_daily_should(self, total_hits): + """calc how many should reindex daily""" + daily_should = int((total_hits // self.interval + 1) * self.MULTIPLY) + if daily_should >= 10000: + daily_should = 9999 - missing_rating = [i["_id"] for i in response["hits"]["hits"]] - self.all_youtube_ids = self.all_youtube_ids + missing_rating + return daily_should - def _get_outdated_vids(self, size): - """get daily videos to refresh""" + def _get_outdated_ids(self, 
reindex_config, daily_should): + """get outdated from index_name""" + index_name = reindex_config["index_name"] + refresh_key = reindex_config["refresh_key"] now_lte = self.now - self.interval * 24 * 60 * 60 must_list = [ - {"match": {"active": True}}, - {"range": {"vid_last_refresh": {"lte": now_lte}}}, + {"match": {reindex_config["active_key"]: True}}, + {"range": {refresh_key: {"lte": now_lte}}}, ] data = { - "size": size, + "size": daily_should, "query": {"bool": {"must": must_list}}, - "sort": [{"vid_last_refresh": {"order": "asc"}}], + "sort": [{refresh_key: {"order": "asc"}}], "_source": False, } - response, _ = ElasticWrap("ta_video/_search").get(data=data) + response, _ = ElasticWrap(f"{index_name}/_search").get(data=data) - all_youtube_ids = [i["_id"] for i in response["hits"]["hits"]] - return all_youtube_ids + all_ids = [i["_id"] for i in response["hits"]["hits"]] + return all_ids - def _get_outdated_channels(self, size): - """get daily channels to refresh""" - now_lte = self.now - self.interval * 24 * 60 * 60 - must_list = [ - {"match": {"channel_active": True}}, - {"range": {"channel_last_refresh": {"lte": now_lte}}}, - ] - data = { - "size": size, - "query": {"bool": {"must": must_list}}, - "sort": [{"channel_last_refresh": {"order": "asc"}}], - "_source": False, - } - response, _ = ElasticWrap("ta_channel/_search").get(data=data) - all_channel_ids = [i["_id"] for i in response["hits"]["hits"]] - return all_channel_ids +class Reindex(ReindexBase): + """reindex all documents from redis queue""" - def _get_outdated_playlists(self, size): - """get daily outdated playlists to refresh""" - now_lte = self.now - self.interval * 24 * 60 * 60 - must_list = [ - {"match": {"playlist_active": True}}, - {"range": {"playlist_last_refresh": {"lte": now_lte}}}, - ] - data = { - "size": size, - "query": {"bool": {"must": must_list}}, - "sort": [{"playlist_last_refresh": {"order": "asc"}}], - "_source": False, - } - response, _ = ElasticWrap("ta_playlist/_search").get(data=data) + def __init__(self): + 
super().__init__() + self.all_indexed_ids = False - all_playlist_ids = [i["_id"] for i in response["hits"]["hits"]] - return all_playlist_ids + def reindex_all(self): + """reindex all in queue""" + if self.cookie_invalid(): + print("[reindex] cookie invalid, exiting...") + return - def check_outdated(self): - """add missing vids and channels""" - video_daily, channel_daily, playlist_daily = self._get_daily() - self.all_youtube_ids = self._get_outdated_vids(video_daily) - self.all_channel_ids = self._get_outdated_channels(channel_daily) - self.all_playlist_ids = self._get_outdated_playlists(playlist_daily) + for index_config in self.REINDEX_CONFIG: + if not RedisQueue(index_config["queue_name"]).has_item(): + continue + + while True: + has_next = self.reindex_index(index_config) + if not has_next: + break + + RedisArchivist().set_message("last_reindex", self.now) + + def reindex_index(self, index_config): + """reindex next id from queue, return false if queue is empty""" + reindex = self.get_reindex_map(index_config["index_name"]) + youtube_id = RedisQueue(index_config["queue_name"]).get_next() + if youtube_id: + reindex(youtube_id) + sleep_interval = self.config["downloads"].get("sleep_interval", 0) + sleep(sleep_interval) + + return bool(youtube_id) + + def get_reindex_map(self, index_name): + """return def to run for index""" + def_map = { + "ta_video": self._reindex_single_video, + "ta_channel": self._reindex_single_channel, + "ta_playlist": self._reindex_single_playlist, + } - integrate_ryd = self.config["downloads"]["integrate_ryd"] - if integrate_ryd: - self._get_unrated_vids() + return def_map.get(index_name) def _reindex_single_video(self, youtube_id): + """wrapper to handle channel name changes""" + try: + self._reindex_single_video_call(youtube_id) + except FileNotFoundError: + ChannelUrlFixer(youtube_id, self.config).run() + self._reindex_single_video_call(youtube_id) + + def _reindex_single_video_call(self, youtube_id): """refresh data for single video""" video = YoutubeVideo(youtube_id) 
@@ -206,13 +226,13 @@ class Reindex: channel.upload_to_es() channel.sync_to_videos() - @staticmethod - def _reindex_single_playlist(playlist_id, all_indexed_ids): + def _reindex_single_playlist(self, playlist_id): """refresh playlist data""" + self._get_all_videos() playlist = YoutubePlaylist(playlist_id) playlist.get_from_es() subscribed = playlist.json_data["playlist_subscribed"] - playlist.all_youtube_ids = all_indexed_ids + playlist.all_youtube_ids = self.all_indexed_ids playlist.build_json(scrape=True) if not playlist.json_data: playlist.deactivate() @@ -222,37 +242,29 @@ class Reindex: playlist.upload_to_es() return - def reindex(self): - """reindex what's needed""" - sleep_interval = self.config["downloads"]["sleep_interval"] - # videos - print(f"reindexing {len(self.all_youtube_ids)} videos") - for youtube_id in self.all_youtube_ids: - try: - self._reindex_single_video(youtube_id) - except FileNotFoundError: - # handle channel name change here - ChannelUrlFixer(youtube_id, self.config).run() - self._reindex_single_video(youtube_id) - if sleep_interval: - sleep(sleep_interval) - # channels - print(f"reindexing {len(self.all_channel_ids)} channels") - for channel_id in self.all_channel_ids: - self._reindex_single_channel(channel_id) - if sleep_interval: - sleep(sleep_interval) - # playlist - print(f"reindexing {len(self.all_playlist_ids)} playlists") - if self.all_playlist_ids: - handler = PendingList() - handler.get_download() - handler.get_indexed() - all_indexed_ids = [i["youtube_id"] for i in handler.all_videos] - for playlist_id in self.all_playlist_ids: - self._reindex_single_playlist(playlist_id, all_indexed_ids) - if sleep_interval: - sleep(sleep_interval) + def _get_all_videos(self): + """add all videos for playlist index validation""" + if self.all_indexed_ids: + return + + handler = PendingList() + handler.get_download() + handler.get_indexed() + self.all_indexed_ids = [i["youtube_id"] for i in handler.all_videos] + + def cookie_invalid(self): + 
"""return true if cookie is enabled and invalid""" + if not self.config["downloads"]["cookie_import"]: + return False + + valid = CookieHandler(self.config).validate() + return not valid + + +def reindex_outdated(): + """reindex all outdated""" + ReindexOutdated().add_outdated() + Reindex().reindex_all() class ChannelUrlFixer: diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index d7200510..ecadf8e8 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -20,11 +20,8 @@ from home.src.download.yt_dlp_handler import VideoDownloader from home.src.es.backup import ElasticBackup from home.src.es.index_setup import ElasitIndexWrap from home.src.index.channel import YoutubeChannel -from home.src.index.filesystem import ( - ImportFolderScanner, - reindex_old_documents, - scan_filesystem, -) +from home.src.index.filesystem import ImportFolderScanner, scan_filesystem +from home.src.index.reindex import reindex_outdated from home.src.ta.config import AppConfig, ScheduleBuilder from home.src.ta.helper import UrlListParser, clear_dl_cache from home.src.ta.ta_redis import RedisArchivist, RedisQueue @@ -138,7 +135,7 @@ def extrac_dl(youtube_ids): @shared_task(name="check_reindex") def check_reindex(): """run the reindex main command""" - reindex_old_documents() + reindex_outdated() @shared_task