From 1d9c27439085f9f8b9705c12055c3427ff1952f9 Mon Sep 17 00:00:00 2001 From: Simon Date: Wed, 15 May 2024 13:18:28 +0200 Subject: [PATCH] better reindex_type handler, fix off by one --- tubearchivist/home/src/index/reindex.py | 90 ++++++++++++++----------- tubearchivist/home/src/ta/ta_redis.py | 8 +-- 2 files changed, 54 insertions(+), 44 deletions(-) diff --git a/tubearchivist/home/src/index/reindex.py b/tubearchivist/home/src/index/reindex.py index 441f033b..1db975e5 100644 --- a/tubearchivist/home/src/index/reindex.py +++ b/tubearchivist/home/src/index/reindex.py @@ -8,6 +8,7 @@ import json import os from datetime import datetime from time import sleep +from typing import Callable, TypedDict from home.models import CustomPeriodicTask from home.src.download.subscriptions import ChannelSubscription @@ -23,10 +24,19 @@ from home.src.ta.settings import EnvironmentSettings from home.src.ta.ta_redis import RedisQueue +class ReindexConfigType(TypedDict): + """represents config type""" + + index_name: str + queue_name: str + active_key: str + refresh_key: str + + class ReindexBase: """base config class for reindex task""" - REINDEX_CONFIG = { + REINDEX_CONFIG: dict[str, ReindexConfigType] = { "video": { "index_name": "ta_video", "queue_name": "reindex:ta_video", @@ -54,7 +64,7 @@ class ReindexBase: self.config = AppConfig().config self.now = int(datetime.now().timestamp()) - def populate(self, all_ids, reindex_config): + def populate(self, all_ids, reindex_config: ReindexConfigType): """add all to reindex ids to redis queue""" if not all_ids: return @@ -65,13 +75,13 @@ class ReindexBase: class ReindexPopulate(ReindexBase): """add outdated and recent documents to reindex queue""" - INTERVAL_DEFAIULT = 90 + INTERVAL_DEFAIULT: int = 90 def __init__(self): super().__init__() self.interval = self.INTERVAL_DEFAIULT - def get_interval(self): + def get_interval(self) -> None: """get reindex days interval from task""" try: task = CustomPeriodicTask.objects.get(name="check_reindex") @@ -82,7 +92,7 @@ class ReindexPopulate(ReindexBase): if task_config.get("days"): self.interval = task_config.get("days") - def add_recent(self): + def add_recent(self) -> None: """add recent videos to refresh""" gte = datetime.fromtimestamp(self.now - self.DAYS3).date().isoformat() must_list = [ @@ -100,10 +110,10 @@ class ReindexPopulate(ReindexBase): return all_ids = [i["_source"]["youtube_id"] for i in hits] - reindex_config = self.REINDEX_CONFIG.get("video") + reindex_config: ReindexConfigType = self.REINDEX_CONFIG["video"] self.populate(all_ids, reindex_config) - def add_outdated(self): + def add_outdated(self) -> None: """add outdated documents""" for reindex_config in self.REINDEX_CONFIG.values(): total_hits = self._get_total_hits(reindex_config) @@ -112,7 +122,7 @@ class ReindexPopulate(ReindexBase): self.populate(all_ids, reindex_config) @staticmethod - def _get_total_hits(reindex_config): + def _get_total_hits(reindex_config: ReindexConfigType) -> int: """get total hits from index""" index_name = reindex_config["index_name"] active_key = reindex_config["active_key"] @@ -124,7 +134,7 @@ class ReindexPopulate(ReindexBase): return len(total) - def _get_daily_should(self, total_hits): + def _get_daily_should(self, total_hits: int) -> int: """calc how many should reindex daily""" daily_should = int((total_hits // self.interval + 1) * self.MULTIPLY) if daily_should >= 10000: @@ -132,7 +142,9 @@ class ReindexPopulate(ReindexBase): return daily_should - def _get_outdated_ids(self, reindex_config, daily_should): + def _get_outdated_ids( + self, reindex_config: ReindexConfigType, daily_should: int + ) -> list[str]: """get outdated from index_name""" index_name = reindex_config["index_name"] refresh_key = reindex_config["refresh_key"] @@ -169,7 +181,7 @@ class ReindexManual(ReindexBase): self.extract_videos = extract_videos self.data = False - def extract_data(self, data): + def extract_data(self, data) -> None: """process data""" self.data = data for key, values in self.data.items(): @@ -180,7 +192,9 @@ class ReindexManual(ReindexBase): self.process_index(reindex_config, values) - def process_index(self, index_config, values): + def process_index( + self, index_config: ReindexConfigType, values: list[str] + ) -> None: """process values per index""" index_name = index_config["index_name"] if index_name == "ta_video": @@ -190,7 +204,7 @@ class ReindexManual(ReindexBase): elif index_name == "ta_playlist": self._add_playlists(values) - def _add_videos(self, values): + def _add_videos(self, values: list[str]) -> None: """add list of videos to reindex queue""" if not values: return @@ -198,7 +212,7 @@ class ReindexManual(ReindexBase): queue_name = self.REINDEX_CONFIG["video"]["queue_name"] RedisQueue(queue_name).add_list(values) - def _add_channels(self, values): + def _add_channels(self, values: list[str]) -> None: """add list of channels to reindex queue""" queue_name = self.REINDEX_CONFIG["channel"]["queue_name"] RedisQueue(queue_name).add_list(values) @@ -208,7 +222,7 @@ class ReindexManual(ReindexBase): all_videos = self._get_channel_videos(channel_id) self._add_videos(all_videos) - def _add_playlists(self, values): + def _add_playlists(self, values: list[str]) -> None: """add list of playlists to reindex queue""" queue_name = self.REINDEX_CONFIG["playlist"]["queue_name"] RedisQueue(queue_name).add_list(values) @@ -218,7 +232,7 @@ class ReindexManual(ReindexBase): all_videos = self._get_playlist_videos(playlist_id) self._add_videos(all_videos) - def _get_channel_videos(self, channel_id): + def _get_channel_videos(self, channel_id: str) -> list[str]: """get all videos from channel""" data = { "query": {"term": {"channel.channel_id": {"value": channel_id}}}, @@ -227,7 +241,7 @@ class ReindexManual(ReindexBase): all_results = IndexPaginate("ta_video", data).get_results() return [i["youtube_id"] for i in all_results] - def _get_playlist_videos(self, playlist_id): + def _get_playlist_videos(self, playlist_id: str) -> list[str]: """get all videos from playlist""" data = { "query": {"term": {"playlist.keyword": {"value": playlist_id}}}, @@ -249,7 +263,7 @@ class Reindex(ReindexBase): "playlists": 0, } - def reindex_all(self): + def reindex_all(self) -> None: """reindex all in queue""" if not self.cookie_is_valid(): print("[reindex] cookie invalid, exiting...") @@ -259,27 +273,26 @@ class Reindex(ReindexBase): if not RedisQueue(index_config["queue_name"]).length(): continue - while True: - has_next = self.reindex_index(name, index_config) - if not has_next: - break + self.reindex_type(name, index_config) - def reindex_index(self, name, index_config): + def reindex_type(self, name: str, index_config: ReindexConfigType) -> None: """reindex all of a single index""" reindex = self._get_reindex_map(index_config["index_name"]) queue = RedisQueue(index_config["queue_name"]) - total = queue.max_score() - youtube_id, idx = queue.get_next() - if youtube_id: + while True: + total = queue.max_score() + youtube_id, idx = queue.get_next() + if not youtube_id or not idx or not total: + break + if self.task: self._notify(name, total, idx) + reindex(youtube_id) sleep_interval = self.config["downloads"].get("sleep_interval", 0) sleep(sleep_interval) - return bool(youtube_id) - - def _get_reindex_map(self, index_name): + def _get_reindex_map(self, index_name: str) -> Callable: """return def to run for index""" def_map = { "ta_video": self._reindex_single_video, @@ -287,15 +300,15 @@ class Reindex(ReindexBase): "ta_playlist": self._reindex_single_playlist, } - return def_map.get(index_name) + return def_map[index_name] - def _notify(self, name, total, idx): + def _notify(self, name: str, total: int, idx: int) -> None: """send notification back to task""" message = [f"Reindexing {name.title()}s {idx}/{total}"] progress = idx / total self.task.send_progress(message, progress=progress) - def _reindex_single_video(self, youtube_id): + def _reindex_single_video(self, youtube_id: str) -> None: """refresh data for single video""" video = YoutubeVideo(youtube_id) @@ -334,9 +347,7 @@ class Reindex(ReindexBase): Comments(youtube_id, config=self.config).reindex_comments() self.processed["videos"] += 1 - return - - def _reindex_single_channel(self, channel_id): + def _reindex_single_channel(self, channel_id: str) -> None: """refresh channel data and sync to videos""" # read current state channel = YoutubeChannel(channel_id) @@ -367,7 +378,7 @@ class Reindex(ReindexBase): ChannelFullScan(channel_id).scan() self.processed["channels"] += 1 - def _reindex_single_playlist(self, playlist_id): + def _reindex_single_playlist(self, playlist_id: str) -> None: """refresh playlist data""" playlist = YoutubePlaylist(playlist_id) playlist.get_from_es() @@ -383,9 +394,8 @@ class Reindex(ReindexBase): return self.processed["playlists"] += 1 - return - def cookie_is_valid(self): + def cookie_is_valid(self) -> bool: """return true if cookie is enabled and valid""" if not self.config["downloads"]["cookie_import"]: # is not activated, continue reindex @@ -394,7 +404,7 @@ class Reindex(ReindexBase): valid = CookieHandler(self.config).validate() return valid - def build_message(self): + def build_message(self) -> str: """build progress message""" message = "" for key, value in self.processed.items(): @@ -424,7 +434,7 @@ class ReindexProgress(ReindexBase): self.request_type = request_type self.request_id = request_id - def get_progress(self): + def get_progress(self) -> dict: """get progress from task""" queue_name, request_type = self._get_queue_name() total = self._get_total_in_queue(queue_name) diff --git a/tubearchivist/home/src/ta/ta_redis.py b/tubearchivist/home/src/ta/ta_redis.py index 94859e2c..4e590407 100644 --- a/tubearchivist/home/src/ta/ta_redis.py +++ b/tubearchivist/home/src/ta/ta_redis.py @@ -155,13 +155,13 @@ class RedisQueue(RedisBase): mapping = {i[1]: next_score + i[0] for i in enumerate(to_add)} self.conn.zadd(self.key, mapping) - def max_score(self) -> float | None: + def max_score(self) -> int | None: """get max score""" last = self.conn.zrange(self.key, -1, -1, withscores=True) if not last: return None - return last[0][1] + return int(last[0][1]) def _get_next_score(self) -> float: """get next score in queue to append""" @@ -171,13 +171,13 @@ class RedisQueue(RedisBase): return last[0][1] + 1 - def get_next(self) -> tuple[str | None, float | None]: + def get_next(self) -> tuple[str | None, int | None]: """return next element in the queue, if available""" result = self.conn.zpopmin(self.key) if not result: return None, None - item, idx = result[0] + item, idx = result[0][0], int(result[0][1]) + 1 return item, idx