better reindex_type handler, fix off by one

master
Simon 1 month ago
parent 2a35b42d88
commit 1d9c274390
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4

@ -8,6 +8,7 @@ import json
import os
from datetime import datetime
from time import sleep
from typing import Callable, TypedDict
from home.models import CustomPeriodicTask
from home.src.download.subscriptions import ChannelSubscription
@ -23,10 +24,19 @@ from home.src.ta.settings import EnvironmentSettings
from home.src.ta.ta_redis import RedisQueue
class ReindexConfigType(TypedDict):
"""represents config type"""
index_name: str
queue_name: str
active_key: str
refresh_key: str
class ReindexBase:
"""base config class for reindex task"""
REINDEX_CONFIG = {
REINDEX_CONFIG: dict[str, ReindexConfigType] = {
"video": {
"index_name": "ta_video",
"queue_name": "reindex:ta_video",
@ -54,7 +64,7 @@ class ReindexBase:
self.config = AppConfig().config
self.now = int(datetime.now().timestamp())
def populate(self, all_ids, reindex_config):
def populate(self, all_ids, reindex_config: ReindexConfigType):
"""add all to reindex ids to redis queue"""
if not all_ids:
return
@ -65,13 +75,13 @@ class ReindexBase:
class ReindexPopulate(ReindexBase):
"""add outdated and recent documents to reindex queue"""
INTERVAL_DEFAIULT = 90
INTERVAL_DEFAIULT: int = 90
def __init__(self):
super().__init__()
self.interval = self.INTERVAL_DEFAIULT
def get_interval(self):
def get_interval(self) -> None:
"""get reindex days interval from task"""
try:
task = CustomPeriodicTask.objects.get(name="check_reindex")
@ -82,7 +92,7 @@ class ReindexPopulate(ReindexBase):
if task_config.get("days"):
self.interval = task_config.get("days")
def add_recent(self):
def add_recent(self) -> None:
"""add recent videos to refresh"""
gte = datetime.fromtimestamp(self.now - self.DAYS3).date().isoformat()
must_list = [
@ -100,10 +110,10 @@ class ReindexPopulate(ReindexBase):
return
all_ids = [i["_source"]["youtube_id"] for i in hits]
reindex_config = self.REINDEX_CONFIG.get("video")
reindex_config: ReindexConfigType = self.REINDEX_CONFIG["video"]
self.populate(all_ids, reindex_config)
def add_outdated(self):
def add_outdated(self) -> None:
"""add outdated documents"""
for reindex_config in self.REINDEX_CONFIG.values():
total_hits = self._get_total_hits(reindex_config)
@ -112,7 +122,7 @@ class ReindexPopulate(ReindexBase):
self.populate(all_ids, reindex_config)
@staticmethod
def _get_total_hits(reindex_config):
def _get_total_hits(reindex_config: ReindexConfigType) -> int:
"""get total hits from index"""
index_name = reindex_config["index_name"]
active_key = reindex_config["active_key"]
@ -124,7 +134,7 @@ class ReindexPopulate(ReindexBase):
return len(total)
def _get_daily_should(self, total_hits):
def _get_daily_should(self, total_hits: int) -> int:
"""calc how many should reindex daily"""
daily_should = int((total_hits // self.interval + 1) * self.MULTIPLY)
if daily_should >= 10000:
@ -132,7 +142,9 @@ class ReindexPopulate(ReindexBase):
return daily_should
def _get_outdated_ids(self, reindex_config, daily_should):
def _get_outdated_ids(
self, reindex_config: ReindexConfigType, daily_should: int
) -> list[str]:
"""get outdated from index_name"""
index_name = reindex_config["index_name"]
refresh_key = reindex_config["refresh_key"]
@ -169,7 +181,7 @@ class ReindexManual(ReindexBase):
self.extract_videos = extract_videos
self.data = False
def extract_data(self, data):
def extract_data(self, data) -> None:
"""process data"""
self.data = data
for key, values in self.data.items():
@ -180,7 +192,9 @@ class ReindexManual(ReindexBase):
self.process_index(reindex_config, values)
def process_index(self, index_config, values):
def process_index(
self, index_config: ReindexConfigType, values: list[str]
) -> None:
"""process values per index"""
index_name = index_config["index_name"]
if index_name == "ta_video":
@ -190,7 +204,7 @@ class ReindexManual(ReindexBase):
elif index_name == "ta_playlist":
self._add_playlists(values)
def _add_videos(self, values):
def _add_videos(self, values: list[str]) -> None:
"""add list of videos to reindex queue"""
if not values:
return
@ -198,7 +212,7 @@ class ReindexManual(ReindexBase):
queue_name = self.REINDEX_CONFIG["video"]["queue_name"]
RedisQueue(queue_name).add_list(values)
def _add_channels(self, values):
def _add_channels(self, values: list[str]) -> None:
"""add list of channels to reindex queue"""
queue_name = self.REINDEX_CONFIG["channel"]["queue_name"]
RedisQueue(queue_name).add_list(values)
@ -208,7 +222,7 @@ class ReindexManual(ReindexBase):
all_videos = self._get_channel_videos(channel_id)
self._add_videos(all_videos)
def _add_playlists(self, values):
def _add_playlists(self, values: list[str]) -> None:
"""add list of playlists to reindex queue"""
queue_name = self.REINDEX_CONFIG["playlist"]["queue_name"]
RedisQueue(queue_name).add_list(values)
@ -218,7 +232,7 @@ class ReindexManual(ReindexBase):
all_videos = self._get_playlist_videos(playlist_id)
self._add_videos(all_videos)
def _get_channel_videos(self, channel_id):
def _get_channel_videos(self, channel_id: str) -> list[str]:
"""get all videos from channel"""
data = {
"query": {"term": {"channel.channel_id": {"value": channel_id}}},
@ -227,7 +241,7 @@ class ReindexManual(ReindexBase):
all_results = IndexPaginate("ta_video", data).get_results()
return [i["youtube_id"] for i in all_results]
def _get_playlist_videos(self, playlist_id):
def _get_playlist_videos(self, playlist_id: str) -> list[str]:
"""get all videos from playlist"""
data = {
"query": {"term": {"playlist.keyword": {"value": playlist_id}}},
@ -249,7 +263,7 @@ class Reindex(ReindexBase):
"playlists": 0,
}
def reindex_all(self):
def reindex_all(self) -> None:
"""reindex all in queue"""
if not self.cookie_is_valid():
print("[reindex] cookie invalid, exiting...")
@ -259,27 +273,26 @@ class Reindex(ReindexBase):
if not RedisQueue(index_config["queue_name"]).length():
continue
while True:
has_next = self.reindex_index(name, index_config)
if not has_next:
break
self.reindex_type(name, index_config)
def reindex_index(self, name, index_config):
def reindex_type(self, name: str, index_config: ReindexConfigType) -> None:
"""reindex all of a single index"""
reindex = self._get_reindex_map(index_config["index_name"])
queue = RedisQueue(index_config["queue_name"])
total = queue.max_score()
youtube_id, idx = queue.get_next()
if youtube_id:
while True:
total = queue.max_score()
youtube_id, idx = queue.get_next()
if not youtube_id or not idx or not total:
break
if self.task:
self._notify(name, total, idx)
reindex(youtube_id)
sleep_interval = self.config["downloads"].get("sleep_interval", 0)
sleep(sleep_interval)
return bool(youtube_id)
def _get_reindex_map(self, index_name):
def _get_reindex_map(self, index_name: str) -> Callable:
"""return def to run for index"""
def_map = {
"ta_video": self._reindex_single_video,
@ -287,15 +300,15 @@ class Reindex(ReindexBase):
"ta_playlist": self._reindex_single_playlist,
}
return def_map.get(index_name)
return def_map[index_name]
def _notify(self, name, total, idx):
def _notify(self, name: str, total: int, idx: int) -> None:
"""send notification back to task"""
message = [f"Reindexing {name.title()}s {idx}/{total}"]
progress = idx / total
self.task.send_progress(message, progress=progress)
def _reindex_single_video(self, youtube_id):
def _reindex_single_video(self, youtube_id: str) -> None:
"""refresh data for single video"""
video = YoutubeVideo(youtube_id)
@ -334,9 +347,7 @@ class Reindex(ReindexBase):
Comments(youtube_id, config=self.config).reindex_comments()
self.processed["videos"] += 1
return
def _reindex_single_channel(self, channel_id):
def _reindex_single_channel(self, channel_id: str) -> None:
"""refresh channel data and sync to videos"""
# read current state
channel = YoutubeChannel(channel_id)
@ -367,7 +378,7 @@ class Reindex(ReindexBase):
ChannelFullScan(channel_id).scan()
self.processed["channels"] += 1
def _reindex_single_playlist(self, playlist_id):
def _reindex_single_playlist(self, playlist_id: str) -> None:
"""refresh playlist data"""
playlist = YoutubePlaylist(playlist_id)
playlist.get_from_es()
@ -383,9 +394,8 @@ class Reindex(ReindexBase):
return
self.processed["playlists"] += 1
return
def cookie_is_valid(self):
def cookie_is_valid(self) -> bool:
"""return true if cookie is enabled and valid"""
if not self.config["downloads"]["cookie_import"]:
# is not activated, continue reindex
@ -394,7 +404,7 @@ class Reindex(ReindexBase):
valid = CookieHandler(self.config).validate()
return valid
def build_message(self):
def build_message(self) -> str:
"""build progress message"""
message = ""
for key, value in self.processed.items():
@ -424,7 +434,7 @@ class ReindexProgress(ReindexBase):
self.request_type = request_type
self.request_id = request_id
def get_progress(self):
def get_progress(self) -> dict:
"""get progress from task"""
queue_name, request_type = self._get_queue_name()
total = self._get_total_in_queue(queue_name)

@ -155,13 +155,13 @@ class RedisQueue(RedisBase):
mapping = {i[1]: next_score + i[0] for i in enumerate(to_add)}
self.conn.zadd(self.key, mapping)
def max_score(self) -> float | None:
def max_score(self) -> int | None:
"""get max score"""
last = self.conn.zrange(self.key, -1, -1, withscores=True)
if not last:
return None
return last[0][1]
return int(last[0][1])
def _get_next_score(self) -> float:
"""get next score in queue to append"""
@ -171,13 +171,13 @@ class RedisQueue(RedisBase):
return last[0][1] + 1
def get_next(self) -> tuple[str | None, float | None]:
def get_next(self) -> tuple[str | None, int | None]:
"""return next element in the queue, if available"""
result = self.conn.zpopmin(self.key)
if not result:
return None, None
item, idx = result[0]
item, idx = result[0][0], int(result[0][1]) + 1
return item, idx

Loading…
Cancel
Save