mirror of
https://github.com/tubearchivist/tubearchivist
synced 2024-11-19 15:25:51 +00:00
refactor Reindex to use RedisQueue
This commit is contained in:
parent
abc3150f59
commit
0d21bfe929
@ -14,11 +14,9 @@ import subprocess
|
||||
from home.src.download.queue import PendingList
|
||||
from home.src.download.thumbnails import ThumbManager
|
||||
from home.src.es.connect import ElasticWrap
|
||||
from home.src.index.reindex import Reindex
|
||||
from home.src.index.video import YoutubeVideo, index_new_video
|
||||
from home.src.ta.config import AppConfig
|
||||
from home.src.ta.helper import clean_string, ignore_filelist
|
||||
from home.src.ta.ta_redis import RedisArchivist
|
||||
from PIL import Image, ImageFile
|
||||
from yt_dlp.utils import ISO639Utils
|
||||
|
||||
@ -606,11 +604,3 @@ def scan_filesystem():
|
||||
for missing_vid in filesystem_handler.to_index:
|
||||
youtube_id = missing_vid[2]
|
||||
index_new_video(youtube_id)
|
||||
|
||||
|
||||
def reindex_old_documents():
    """Run one refresh pass over outdated documents.

    Builds a ``Reindex`` handler, calls ``check_outdated()`` and then
    ``reindex()`` on it, and finally writes the handler's ``now`` value
    to the ``last_reindex`` message key.
    """
    reindexer = Reindex()
    reindexer.check_outdated()
    reindexer.reindex()

    message_store = RedisArchivist()
    message_store.set_message("last_reindex", reindexer.now)
|
||||
|
@ -7,7 +7,6 @@ functionality:
|
||||
import os
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
from math import ceil
|
||||
from time import sleep
|
||||
|
||||
from home.src.download.queue import PendingList
|
||||
@ -20,135 +19,156 @@ from home.src.index.comments import Comments
|
||||
from home.src.index.playlist import YoutubePlaylist
|
||||
from home.src.index.video import YoutubeVideo
|
||||
from home.src.ta.config import AppConfig
|
||||
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
|
||||
|
||||
|
||||
class Reindex:
|
||||
"""check for outdated documents and refresh data from youtube"""
|
||||
class ReindexBase:
|
||||
"""base config class for reindex task"""
|
||||
|
||||
REINDEX_CONFIG = [
|
||||
{
|
||||
"index_name": "ta_video",
|
||||
"queue_name": "reindex:ta_video",
|
||||
"active_key": "active",
|
||||
"refresh_key": "vid_last_refresh",
|
||||
},
|
||||
{
|
||||
"index_name": "ta_channel",
|
||||
"queue_name": "reindex:ta_channel",
|
||||
"active_key": "channel_active",
|
||||
"refresh_key": "channel_last_refresh",
|
||||
},
|
||||
{
|
||||
"index_name": "ta_playlist",
|
||||
"queue_name": "reindex:ta_playlist",
|
||||
"active_key": "playlist_active",
|
||||
"refresh_key": "playlist_last_refresh",
|
||||
},
|
||||
]
|
||||
|
||||
MATCH_FIELD = {
|
||||
"ta_video": "active",
|
||||
"ta_channel": "channel_active",
|
||||
"ta_playlist": "playlist_active",
|
||||
}
|
||||
MULTIPLY = 1.2
|
||||
|
||||
def __init__(self):
|
||||
# config
|
||||
self.now = int(datetime.now().strftime("%s"))
|
||||
self.config = AppConfig().config
|
||||
self.interval = self.config["scheduler"]["check_reindex_days"]
|
||||
# scan
|
||||
self.all_youtube_ids = False
|
||||
self.all_channel_ids = False
|
||||
self.all_playlist_ids = False
|
||||
self.now = int(datetime.now().strftime("%s"))
|
||||
|
||||
def check_cookie(self):
|
||||
"""validate cookie if enabled"""
|
||||
if self.config["downloads"]["cookie_import"]:
|
||||
valid = CookieHandler(self.config).validate()
|
||||
if not valid:
|
||||
def populate(self, all_ids, reindex_config):
|
||||
"""add all to reindex ids to redis queue"""
|
||||
if not all_ids:
|
||||
return
|
||||
|
||||
def _get_daily(self):
|
||||
"""get daily refresh values"""
|
||||
total_videos = self._get_total_hits("ta_video")
|
||||
video_daily = ceil(total_videos / self.interval * self.MULTIPLY)
|
||||
if video_daily >= 10000:
|
||||
video_daily = 9999
|
||||
RedisQueue(queue_name=reindex_config["queue_name"]).add_list(all_ids)
|
||||
|
||||
total_channels = self._get_total_hits("ta_channel")
|
||||
channel_daily = ceil(total_channels / self.interval * self.MULTIPLY)
|
||||
total_playlists = self._get_total_hits("ta_playlist")
|
||||
playlist_daily = ceil(total_playlists / self.interval * self.MULTIPLY)
|
||||
return (video_daily, channel_daily, playlist_daily)
|
||||
|
||||
def _get_total_hits(self, index):
|
||||
class ReindexOutdated(ReindexBase):
|
||||
"""add outdated documents to reindex queue"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.interval = self.config["scheduler"]["check_reindex_days"]
|
||||
|
||||
def add_outdated(self):
    """Queue outdated documents for every entry in ``REINDEX_CONFIG``.

    For each config entry the total hit count is looked up, converted
    into a per-run quota, used to fetch that many outdated IDs, and the
    resulting IDs are handed to ``populate()`` together with the entry.
    """
    for entry in self.REINDEX_CONFIG:
        quota = self._get_daily_should(self._get_total_hits(entry))
        outdated_ids = self._get_outdated_ids(entry, quota)
        self.populate(outdated_ids, entry)
|
||||
|
||||
@staticmethod
|
||||
def _get_total_hits(reindex_config):
|
||||
"""get total hits from index"""
|
||||
match_field = self.MATCH_FIELD[index]
|
||||
path = f"{index}/_search?filter_path=hits.total"
|
||||
data = {"query": {"match": {match_field: True}}}
|
||||
index_name = reindex_config["index_name"]
|
||||
active_key = reindex_config["active_key"]
|
||||
path = f"{index_name}/_search?filter_path=hits.total"
|
||||
data = {"query": {"match": {active_key: True}}}
|
||||
response, _ = ElasticWrap(path).post(data=data)
|
||||
total_hits = response["hits"]["total"]["value"]
|
||||
return total_hits
|
||||
|
||||
def _get_unrated_vids(self):
|
||||
"""get max 200 videos without rating if ryd integration is enabled"""
|
||||
must_not_list = [
|
||||
{"exists": {"field": "stats.average_rating"}},
|
||||
{"term": {"active": {"value": False}}},
|
||||
]
|
||||
data = {"size": 200, "query": {"bool": {"must_not": must_not_list}}}
|
||||
response, _ = ElasticWrap("ta_video/_search").get(data=data)
|
||||
def _get_daily_should(self, total_hits):
|
||||
"""calc how many should reindex daily"""
|
||||
daily_should = int((total_hits // self.interval + 1) * self.MULTIPLY)
|
||||
if daily_should >= 10000:
|
||||
daily_should = 9999
|
||||
|
||||
missing_rating = [i["_id"] for i in response["hits"]["hits"]]
|
||||
self.all_youtube_ids = self.all_youtube_ids + missing_rating
|
||||
return daily_should
|
||||
|
||||
def _get_outdated_vids(self, size):
|
||||
"""get daily videos to refresh"""
|
||||
def _get_outdated_ids(self, reindex_config, daily_should):
|
||||
"""get outdated from index_name"""
|
||||
index_name = reindex_config["index_name"]
|
||||
refresh_key = reindex_config["refresh_key"]
|
||||
now_lte = self.now - self.interval * 24 * 60 * 60
|
||||
must_list = [
|
||||
{"match": {"active": True}},
|
||||
{"range": {"vid_last_refresh": {"lte": now_lte}}},
|
||||
{"range": {refresh_key: {"lte": now_lte}}},
|
||||
]
|
||||
data = {
|
||||
"size": size,
|
||||
"size": daily_should,
|
||||
"query": {"bool": {"must": must_list}},
|
||||
"sort": [{"vid_last_refresh": {"order": "asc"}}],
|
||||
"sort": [{refresh_key: {"order": "asc"}}],
|
||||
"_source": False,
|
||||
}
|
||||
response, _ = ElasticWrap("ta_video/_search").get(data=data)
|
||||
response, _ = ElasticWrap(f"{index_name}/_search").get(data=data)
|
||||
|
||||
all_youtube_ids = [i["_id"] for i in response["hits"]["hits"]]
|
||||
return all_youtube_ids
|
||||
all_ids = [i["_id"] for i in response["hits"]["hits"]]
|
||||
return all_ids
|
||||
|
||||
def _get_outdated_channels(self, size):
|
||||
"""get daily channels to refresh"""
|
||||
now_lte = self.now - self.interval * 24 * 60 * 60
|
||||
must_list = [
|
||||
{"match": {"channel_active": True}},
|
||||
{"range": {"channel_last_refresh": {"lte": now_lte}}},
|
||||
]
|
||||
data = {
|
||||
"size": size,
|
||||
"query": {"bool": {"must": must_list}},
|
||||
"sort": [{"channel_last_refresh": {"order": "asc"}}],
|
||||
"_source": False,
|
||||
|
||||
class Reindex(ReindexBase):
|
||||
"""reindex all documents from redis queue"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.all_indexed_ids = False
|
||||
|
||||
def reindex_all(self):
    """Drain every reindex queue listed in ``REINDEX_CONFIG``.

    Prints a notice and returns immediately when ``cookie_invalid()``
    is truthy. Otherwise each queue that reports items is processed by
    calling ``reindex_index()`` until it returns a falsy value, and
    ``self.now`` is then stored under the ``last_reindex`` message key.
    """
    if self.cookie_invalid():
        print("[reindex] cookie invalid, exiting...")
        return

    for index_config in self.REINDEX_CONFIG:
        queue = RedisQueue(index_config["queue_name"])
        if queue.has_item():
            # reindex_index() returns False once nothing was processed
            while self.reindex_index(index_config):
                pass

    # NOTE(review): nesting was reconstructed from a listing without
    # indentation; the timestamp is assumed to be written once, after
    # all queues have been handled - confirm.
    RedisArchivist().set_message("last_reindex", self.now)
|
||||
|
||||
def reindex_index(self, index_config):
    """Process the next queued ID of one index.

    Looks up the reindex callable for ``index_config["index_name"]``,
    takes the next ID from the queue named ``index_config["queue_name"]``
    and, if there is one, reindexes it and sleeps for the configured
    ``sleep_interval`` (0 when the key is missing).

    Returns True when an ID was processed, otherwise False.
    """
    handler = self.get_reindex_map(index_config["index_name"])
    next_id = RedisQueue(index_config["queue_name"]).get_next()
    if not next_id:
        return False

    # NOTE(review): nesting was reconstructed from a listing without
    # indentation; the sleep is assumed to happen only after an item
    # was actually reindexed - confirm.
    handler(next_id)
    sleep(self.config["downloads"].get("sleep_interval", 0))
    return True
|
||||
|
||||
def get_reindex_map(self, index_name):
|
||||
"""return def to run for index"""
|
||||
def_map = {
|
||||
"ta_video": self._reindex_single_video,
|
||||
"ta_channel": self._reindex_single_channel,
|
||||
"ta_playlist": self._reindex_single_playlist,
|
||||
}
|
||||
response, _ = ElasticWrap("ta_channel/_search").get(data=data)
|
||||
|
||||
all_channel_ids = [i["_id"] for i in response["hits"]["hits"]]
|
||||
return all_channel_ids
|
||||
|
||||
def _get_outdated_playlists(self, size):
|
||||
"""get daily outdated playlists to refresh"""
|
||||
now_lte = self.now - self.interval * 24 * 60 * 60
|
||||
must_list = [
|
||||
{"match": {"playlist_active": True}},
|
||||
{"range": {"playlist_last_refresh": {"lte": now_lte}}},
|
||||
]
|
||||
data = {
|
||||
"size": size,
|
||||
"query": {"bool": {"must": must_list}},
|
||||
"sort": [{"playlist_last_refresh": {"order": "asc"}}],
|
||||
"_source": False,
|
||||
}
|
||||
response, _ = ElasticWrap("ta_playlist/_search").get(data=data)
|
||||
|
||||
all_playlist_ids = [i["_id"] for i in response["hits"]["hits"]]
|
||||
return all_playlist_ids
|
||||
|
||||
def check_outdated(self):
|
||||
"""add missing vids and channels"""
|
||||
video_daily, channel_daily, playlist_daily = self._get_daily()
|
||||
self.all_youtube_ids = self._get_outdated_vids(video_daily)
|
||||
self.all_channel_ids = self._get_outdated_channels(channel_daily)
|
||||
self.all_playlist_ids = self._get_outdated_playlists(playlist_daily)
|
||||
|
||||
integrate_ryd = self.config["downloads"]["integrate_ryd"]
|
||||
if integrate_ryd:
|
||||
self._get_unrated_vids()
|
||||
return def_map.get(index_name)
|
||||
|
||||
def _reindex_single_video(self, youtube_id):
    """Reindex one video, retrying once after a channel URL fix.

    Calls ``_reindex_single_video_call()``; when that raises
    ``FileNotFoundError`` the ``ChannelUrlFixer`` is run for the video
    and the reindex call is attempted a second time. A failure of the
    second attempt propagates to the caller.
    """
    try:
        self._reindex_single_video_call(youtube_id)
    except FileNotFoundError:
        # The fixer has to be executed, not just constructed: the earlier
        # call site of this logic invoked .run() on it before retrying.
        # NOTE(review): confirm ChannelUrlFixer does no work in __init__.
        ChannelUrlFixer(youtube_id, self.config).run()
        self._reindex_single_video_call(youtube_id)
|
||||
|
||||
def _reindex_single_video_call(self, youtube_id):
|
||||
"""refresh data for single video"""
|
||||
video = YoutubeVideo(youtube_id)
|
||||
|
||||
@ -206,13 +226,13 @@ class Reindex:
|
||||
channel.upload_to_es()
|
||||
channel.sync_to_videos()
|
||||
|
||||
@staticmethod
|
||||
def _reindex_single_playlist(playlist_id, all_indexed_ids):
|
||||
def _reindex_single_playlist(self, playlist_id):
|
||||
"""refresh playlist data"""
|
||||
self._get_all_videos()
|
||||
playlist = YoutubePlaylist(playlist_id)
|
||||
playlist.get_from_es()
|
||||
subscribed = playlist.json_data["playlist_subscribed"]
|
||||
playlist.all_youtube_ids = all_indexed_ids
|
||||
playlist.all_youtube_ids = self.all_indexed_ids
|
||||
playlist.build_json(scrape=True)
|
||||
if not playlist.json_data:
|
||||
playlist.deactivate()
|
||||
@ -222,37 +242,29 @@ class Reindex:
|
||||
playlist.upload_to_es()
|
||||
return
|
||||
|
||||
def reindex(self):
|
||||
"""reindex what's needed"""
|
||||
sleep_interval = self.config["downloads"]["sleep_interval"]
|
||||
# videos
|
||||
print(f"reindexing {len(self.all_youtube_ids)} videos")
|
||||
for youtube_id in self.all_youtube_ids:
|
||||
try:
|
||||
self._reindex_single_video(youtube_id)
|
||||
except FileNotFoundError:
|
||||
# handle channel name change here
|
||||
ChannelUrlFixer(youtube_id, self.config).run()
|
||||
self._reindex_single_video(youtube_id)
|
||||
if sleep_interval:
|
||||
sleep(sleep_interval)
|
||||
# channels
|
||||
print(f"reindexing {len(self.all_channel_ids)} channels")
|
||||
for channel_id in self.all_channel_ids:
|
||||
self._reindex_single_channel(channel_id)
|
||||
if sleep_interval:
|
||||
sleep(sleep_interval)
|
||||
# playlist
|
||||
print(f"reindexing {len(self.all_playlist_ids)} playlists")
|
||||
if self.all_playlist_ids:
|
||||
def _get_all_videos(self):
|
||||
"""add all videos for playlist index validation"""
|
||||
if self.all_indexed_ids:
|
||||
return
|
||||
|
||||
handler = PendingList()
|
||||
handler.get_download()
|
||||
handler.get_indexed()
|
||||
all_indexed_ids = [i["youtube_id"] for i in handler.all_videos]
|
||||
for playlist_id in self.all_playlist_ids:
|
||||
self._reindex_single_playlist(playlist_id, all_indexed_ids)
|
||||
if sleep_interval:
|
||||
sleep(sleep_interval)
|
||||
self.all_indexed_ids = [i["youtube_id"] for i in handler.all_videos]
|
||||
|
||||
def cookie_invalid(self):
    """Return True if cookie import is enabled and the cookie is invalid.

    Returns False without validating anything when
    ``config["downloads"]["cookie_import"]`` is falsy. Otherwise the
    cookie is validated through ``CookieHandler`` and the negated
    result is returned, so callers can abort on a truthy value.
    """
    if not self.config["downloads"]["cookie_import"]:
        return False

    valid = CookieHandler(self.config).validate()
    # Report invalidity, not validity: the caller exits when this is True.
    return not valid
|
||||
|
||||
|
||||
def reindex_outdated():
    """Queue outdated documents, then reindex everything in the queues.

    First ``ReindexOutdated.add_outdated()`` fills the queues, then
    ``Reindex.reindex_all()`` works through them.
    """
    collector = ReindexOutdated()
    collector.add_outdated()

    worker = Reindex()
    worker.reindex_all()
|
||||
|
||||
|
||||
class ChannelUrlFixer:
|
||||
|
@ -20,11 +20,8 @@ from home.src.download.yt_dlp_handler import VideoDownloader
|
||||
from home.src.es.backup import ElasticBackup
|
||||
from home.src.es.index_setup import ElasitIndexWrap
|
||||
from home.src.index.channel import YoutubeChannel
|
||||
from home.src.index.filesystem import (
|
||||
ImportFolderScanner,
|
||||
reindex_old_documents,
|
||||
scan_filesystem,
|
||||
)
|
||||
from home.src.index.filesystem import ImportFolderScanner, scan_filesystem
|
||||
from home.src.index.reindex import reindex_outdated
|
||||
from home.src.ta.config import AppConfig, ScheduleBuilder
|
||||
from home.src.ta.helper import UrlListParser, clear_dl_cache
|
||||
from home.src.ta.ta_redis import RedisArchivist, RedisQueue
|
||||
@ -138,7 +135,7 @@ def extrac_dl(youtube_ids):
|
||||
@shared_task(name="check_reindex")
|
||||
def check_reindex():
|
||||
"""run the reindex main command"""
|
||||
reindex_old_documents()
|
||||
reindex_outdated()
|
||||
|
||||
|
||||
@shared_task
|
||||
|
Loading…
Reference in New Issue
Block a user