From 9b30c7df6eaa20186eaf18e6baed7b1837ec96b3 Mon Sep 17 00:00:00 2001 From: simon Date: Sat, 20 May 2023 16:07:33 +0700 Subject: [PATCH] refacter filesystem scanner --- tubearchivist/home/src/index/filesystem.py | 257 +++++++-------------- tubearchivist/home/tasks.py | 6 +- 2 files changed, 92 insertions(+), 171 deletions(-) diff --git a/tubearchivist/home/src/index/filesystem.py b/tubearchivist/home/src/index/filesystem.py index 75f4724c..7594ee04 100644 --- a/tubearchivist/home/src/index/filesystem.py +++ b/tubearchivist/home/src/index/filesystem.py @@ -1,198 +1,85 @@ """ Functionality: -- reindexing old documents -- syncing updated values between indexes - scan the filesystem to delete or index """ -import json import os -from home.src.download.queue import PendingList -from home.src.es.connect import ElasticWrap +from home.src.es.connect import ElasticWrap, IndexPaginate from home.src.index.comments import CommentList -from home.src.index.video import index_new_video +from home.src.index.video import YoutubeVideo, index_new_video from home.src.ta.config import AppConfig -from home.src.ta.helper import clean_string, ignore_filelist -from PIL import ImageFile +from home.src.ta.helper import ignore_filelist -ImageFile.LOAD_TRUNCATED_IMAGES = True +class Scanner: + """scan index and filesystem""" -class ScannerBase: - """scan the filesystem base class""" + VIDEOS = AppConfig().config["application"]["videos"] - CONFIG = AppConfig().config - VIDEOS = CONFIG["application"]["videos"] - - def __init__(self): - self.to_index = False + def __init__(self, task=False): + self.task = task self.to_delete = False - self.mismatch = False - self.to_rename = False + self.to_index = False def scan(self): - """entry point, scan and compare""" - all_downloaded = self._get_all_downloaded() - all_indexed = self._get_all_indexed() - self.list_comarison(all_downloaded, all_indexed) - - def _get_all_downloaded(self): - """get a list of all video files downloaded""" - channels = os.listdir(self.VIDEOS) - all_channels = ignore_filelist(channels) - all_channels.sort() - all_downloaded = [] - for channel_name in all_channels: - channel_path = os.path.join(self.VIDEOS, channel_name) - channel_files = os.listdir(channel_path) - channel_files_clean = ignore_filelist(channel_files) - all_videos = [i for i in channel_files_clean if i.endswith(".mp4")] - for video in all_videos: - youtube_id = video[9:20] - all_downloaded.append((channel_name, video, youtube_id)) - - return all_downloaded - - @staticmethod - def _get_all_indexed(): - """get a list of all indexed videos""" - index_handler = PendingList() - index_handler.get_download() - index_handler.get_indexed() - - all_indexed = [] - for video in index_handler.all_videos: - youtube_id = video["youtube_id"] - media_url = video["media_url"] - published = video["published"] - title = video["title"] - all_indexed.append((youtube_id, media_url, published, title)) - return all_indexed - - def list_comarison(self, all_downloaded, all_indexed): - """compare the lists to figure out what to do""" - self._find_unindexed(all_downloaded, all_indexed) - self._find_missing(all_downloaded, all_indexed) - self._find_bad_media_url(all_downloaded, all_indexed) - - def _find_unindexed(self, all_downloaded, all_indexed): - """find video files without a matching document indexed""" - all_indexed_ids = [i[0] for i in all_indexed] - self.to_index = [] - for downloaded in all_downloaded: - if downloaded[2] not in all_indexed_ids: - self.to_index.append(downloaded) - - def _find_missing(self, all_downloaded, all_indexed): - """find indexed videos without matching media file""" - all_downloaded_ids = [i[2] for i in all_downloaded] - self.to_delete = [] - for video in all_indexed: - youtube_id = video[0] - if youtube_id not in all_downloaded_ids: - self.to_delete.append(video) - - def _find_bad_media_url(self, all_downloaded, all_indexed): - """rename media files not matching the indexed title""" - self.mismatch = [] - self.to_rename = [] - - for downloaded in all_downloaded: - channel, filename, downloaded_id = downloaded - # find in indexed - for indexed in all_indexed: - indexed_id, media_url, published, title = indexed - if indexed_id == downloaded_id: - # found it - pub = published.replace("-", "") - expected = f"{pub}_{indexed_id}_{clean_string(title)}.mp4" - new_url = os.path.join(channel, expected) - if expected != filename: - # file to rename - self.to_rename.append((channel, filename, expected)) - if media_url != new_url: - # media_url to update in es - self.mismatch.append((indexed_id, new_url)) - - break - - -class Filesystem(ScannerBase): - """handle scanning and fixing from filesystem""" + """scan the filesystem""" + downloaded = self._get_downloaded() + indexed = self._get_indexed() + self.to_index = downloaded - indexed + self.to_delete = indexed - downloaded + + def _get_downloaded(self): + """get downloaded ids""" + if self.task: + self.task.send_progress(["Scan your filesystem for videos."]) - def __init__(self, task=False): - super().__init__() - self.task = task + downloaded = set() + channels = ignore_filelist(os.listdir(self.VIDEOS)) + for channel in channels: + folder = os.path.join(self.VIDEOS, channel) + files = ignore_filelist(os.listdir(folder)) + downloaded.update(set(i.split(".")[0] for i in files)) - def process(self): - """entry point""" - if self.task: - self.task.send_progress(["Scanning your archive and index."]) - self.scan() - self.rename_files() - self.send_mismatch_bulk() - self.delete_from_index() - self.add_missing() - - def rename_files(self): - """rename media files as identified by find_bad_media_url""" - if not self.to_rename: - return + return downloaded - total = len(self.to_rename) + def _get_indexed(self): + """get all indexed ids""" if self.task: - self.task.send_progress([f"Rename {total} media files."]) - for bad_filename in self.to_rename: - channel, filename, expected_filename = bad_filename - print(f"renaming [{filename}] to [{expected_filename}]") - old_path = os.path.join(self.VIDEOS, channel, filename) - new_path = os.path.join(self.VIDEOS, channel, expected_filename) - os.rename(old_path, new_path) - - def send_mismatch_bulk(self): - """build bulk update""" - if not self.mismatch: - return + self.task.send_progress(["Get all videos indexed."]) - total = len(self.mismatch) - if self.task: - self.task.send_progress([f"Fix media urls for {total} files"]) - bulk_list = [] - for video_mismatch in self.mismatch: - youtube_id, media_url = video_mismatch - print(f"{youtube_id}: fixing media url {media_url}") - action = {"update": {"_id": youtube_id, "_index": "ta_video"}} - source = {"doc": {"media_url": media_url}} - bulk_list.append(json.dumps(action)) - bulk_list.append(json.dumps(source)) - # add last newline - bulk_list.append("\n") - data = "\n".join(bulk_list) - _, _ = ElasticWrap("_bulk").post(data=data, ndjson=True) - - def delete_from_index(self): - """find indexed but deleted mediafile""" + data = {"query": {"match_all": {}}, "_source": ["youtube_id"]} + response = IndexPaginate("ta_video", data).get_results() + return set(i["youtube_id"] for i in response) + + def apply(self): + """apply all changes""" + self.delete() + self.index() + self.url_fix() + + def delete(self): + """delete videos from index""" if not self.to_delete: + print("nothing to delete") return - total = len(self.to_delete) if self.task: - self.task.send_progress([f"Clean up {total} items from index."]) - for indexed in self.to_delete: - youtube_id = indexed[0] - print(f"deleting {youtube_id} from index") - path = f"ta_video/_doc/{youtube_id}" - _, _ = ElasticWrap(path).delete() - - def add_missing(self): - """add missing videos to index""" - video_ids = [i[2] for i in self.to_index] - if not video_ids: + self.task.send_progress( + [f"Remove {len(self.to_delete)} videos from index."] + ) + + for youtube_id in self.to_delete: + YoutubeVideo(youtube_id).delete_media_file() + + def index(self): + """index new""" + if not self.to_index: + print("nothing to index") return - total = len(video_ids) - for idx, youtube_id in enumerate(video_ids): + total = len(self.to_index) + for idx, youtube_id in enumerate(self.to_index): if self.task: self.task.send_progress( message_lines=[ @@ -202,4 +89,36 @@ class Filesystem(ScannerBase): ) index_new_video(youtube_id) - CommentList(video_ids, task=self.task).index() + CommentList(self.to_index, task=self.task).index() + + def url_fix(self): + """ + update path v0.3.6 to v0.3.7 + fix url not matching channel-videoid pattern + """ + bool_must = ( + "doc['media_url'].value == " + + "(doc['channel.channel_id'].value + '/' + " + + "doc['youtube_id'].value) + '.mp4'" + ) + to_update = ( + "ctx._source['media_url'] = " + + "ctx._source.channel['channel_id'] + '/' + " + + "ctx._source['youtube_id'] + '.mp4'" + ) + data = { + "query": { + "bool": { + "must_not": [{"script": {"script": {"source": bool_must}}}] + } + }, + "script": {"source": to_update}, + } + response, _ = ElasticWrap("ta_video/_update_by_query").post(data=data) + updated = response.get("updates") + if updated: + print(f"updated {updated} bad media_url") + if self.task: + self.task.send_progress( + [f"Updated {updated} wrong media urls."] + ) diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index 1251891f..c5907839 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -19,7 +19,7 @@ from home.src.download.yt_dlp_handler import VideoDownloader from home.src.es.backup import ElasticBackup from home.src.es.index_setup import ElasitIndexWrap from home.src.index.channel import YoutubeChannel -from home.src.index.filesystem import Filesystem +from home.src.index.filesystem import Scanner from home.src.index.manual import ImportFolderScanner from home.src.index.reindex import Reindex, ReindexManual, ReindexPopulate from home.src.ta.config import AppConfig, ReleaseVersion, ScheduleBuilder @@ -290,7 +290,9 @@ def rescan_filesystem(self): return manager.init(self) - Filesystem(task=self).process() + handler = Scanner(task=self) + handler.scan() + handler.apply() ThumbValidator(task=self).validate()