From f1b89345e24451ffedeba391bf8428231d4ae302 Mon Sep 17 00:00:00 2001 From: simon Date: Sun, 27 Nov 2022 15:41:59 +0700 Subject: [PATCH] refactor ElasticBackup and ElasticIndex for better startup handling --- tubearchivist/home/apps.py | 6 +- tubearchivist/home/src/es/backup.py | 224 +++++++++++++++ tubearchivist/home/src/es/index_setup.py | 334 +++++------------------ tubearchivist/home/src/es/snapshot.py | 2 +- tubearchivist/home/src/ta/helper.py | 9 + tubearchivist/home/tasks.py | 8 +- tubearchivist/home/views.py | 4 +- 7 files changed, 306 insertions(+), 281 deletions(-) create mode 100644 tubearchivist/home/src/es/backup.py diff --git a/tubearchivist/home/apps.py b/tubearchivist/home/apps.py index ef98c800..bd823c0b 100644 --- a/tubearchivist/home/apps.py +++ b/tubearchivist/home/apps.py @@ -5,7 +5,7 @@ import sys from django.apps import AppConfig from home.src.es.connect import ElasticWrap -from home.src.es.index_setup import index_check +from home.src.es.index_setup import ElasitIndexWrap from home.src.es.snapshot import ElasticSnapshot from home.src.ta.config import AppConfig as ArchivistConfig from home.src.ta.helper import clear_dl_cache @@ -28,11 +28,11 @@ class StartupCheck: print("run startup checks") self.es_version_check() self.release_lock() - clear_dl_cache(self.config_handler.config) - index_check() + ElasitIndexWrap().setup() self.sync_redis_state() self.set_redis_conf() self.make_folders() + clear_dl_cache(self.config_handler.config) self.snapshot_check() self.set_has_run() diff --git a/tubearchivist/home/src/es/backup.py b/tubearchivist/home/src/es/backup.py new file mode 100644 index 00000000..41326b69 --- /dev/null +++ b/tubearchivist/home/src/es/backup.py @@ -0,0 +1,224 @@ +""" +Functionality: +- Handle json zip file based backup +- create backup +- restore backup +""" + +import json +import os +import zipfile +from datetime import datetime + +from home.src.es.connect import ElasticWrap, IndexPaginate +from home.src.ta.config import AppConfig +from home.src.ta.helper import get_mapping, ignore_filelist + + +class ElasticBackup: + """dump index to nd-json files for later bulk import""" + + def __init__(self, reason=False): + self.config = AppConfig().config + self.cache_dir = self.config["application"]["cache_dir"] + self.timestamp = datetime.now().strftime("%Y%m%d") + self.index_config = get_mapping() + self.reason = reason + + def backup_all_indexes(self): + """backup all indexes, add reason to init""" + print("backup all indexes") + if not self.reason: + raise ValueError("missing backup reason in ElasticBackup") + + for index in self.index_config: + index_name = index["index_name"] + print(f"backup: export in progress for {index_name}") + if not self.index_exists(index_name): + print(f"skip backup for not yet existing index {index_name}") + continue + + self.backup_index(index_name) + + self.zip_it() + if self.reason == "auto": + self.rotate_backup() + + @staticmethod + def backup_index(index_name): + """export all documents of a single index""" + data = { + "query": {"match_all": {}}, + "sort": [{"_doc": {"order": "desc"}}], + } + paginate = IndexPaginate( + f"ta_{index_name}", data, keep_source=True, callback=BackupCallback + ) + _ = paginate.get_results() + + def zip_it(self): + """pack it up into single zip file""" + file_name = f"ta_backup-{self.timestamp}-{self.reason}.zip" + folder = os.path.join(self.cache_dir, "backup") + + to_backup = [] + for file in os.listdir(folder): + if file.endswith(".json"): + to_backup.append(os.path.join(folder, file)) + + 
backup_file = os.path.join(folder, file_name) + + comp = zipfile.ZIP_DEFLATED + with zipfile.ZipFile(backup_file, "w", compression=comp) as zip_f: + for backup_file in to_backup: + zip_f.write(backup_file, os.path.basename(backup_file)) + + # cleanup + for backup_file in to_backup: + os.remove(backup_file) + + def post_bulk_restore(self, file_name): + """send bulk to es""" + file_path = os.path.join(self.cache_dir, file_name) + with open(file_path, "r", encoding="utf-8") as f: + data = f.read() + + if not data.strip(): + return + + _, _ = ElasticWrap("_bulk").post(data=data, ndjson=True) + + def get_all_backup_files(self): + """build all available backup files for view""" + backup_dir = os.path.join(self.cache_dir, "backup") + backup_files = os.listdir(backup_dir) + all_backup_files = ignore_filelist(backup_files) + all_available_backups = [ + i + for i in all_backup_files + if i.startswith("ta_") and i.endswith(".zip") + ] + all_available_backups.sort(reverse=True) + + backup_dicts = [] + for backup_file in all_available_backups: + file_split = backup_file.split("-") + if len(file_split) == 2: + timestamp = file_split[1].strip(".zip") + reason = False + elif len(file_split) == 3: + timestamp = file_split[1] + reason = file_split[2].strip(".zip") + + to_add = { + "filename": backup_file, + "timestamp": timestamp, + "reason": reason, + } + backup_dicts.append(to_add) + + return backup_dicts + + def restore(self, filename): + """ + restore from backup zip file + call reset from ElasitIndexWrap first to start blank + """ + zip_content = self._unpack_zip_backup(filename) + self._restore_json_files(zip_content) + + def _unpack_zip_backup(self, filename): + """extract backup zip and return filelist""" + backup_dir = os.path.join(self.cache_dir, "backup") + file_path = os.path.join(backup_dir, filename) + + with zipfile.ZipFile(file_path, "r") as z: + zip_content = z.namelist() + z.extractall(backup_dir) + + return zip_content + + def _restore_json_files(self, zip_content): + """go through the unpacked files and restore""" + backup_dir = os.path.join(self.cache_dir, "backup") + + for json_f in zip_content: + + file_name = os.path.join(backup_dir, json_f) + + if not json_f.startswith("es_") or not json_f.endswith(".json"): + os.remove(file_name) + continue + + print("restoring: " + json_f) + self.post_bulk_restore(file_name) + os.remove(file_name) + + @staticmethod + def index_exists(index_name): + """check if index already exists to skip""" + _, status_code = ElasticWrap(f"ta_{index_name}").get() + exists = status_code == 200 + + return exists + + def rotate_backup(self): + """delete old backups if needed""" + rotate = self.config["scheduler"]["run_backup_rotate"] + if not rotate: + return + + all_backup_files = self.get_all_backup_files() + auto = [i for i in all_backup_files if i["reason"] == "auto"] + + if len(auto) <= rotate: + print("no backup files to rotate") + return + + backup_dir = os.path.join(self.cache_dir, "backup") + + all_to_delete = auto[rotate:] + for to_delete in all_to_delete: + file_path = os.path.join(backup_dir, to_delete["filename"]) + print(f"remove old backup file: {file_path}") + os.remove(file_path) + + +class BackupCallback: + """handle backup ndjson writer as callback for IndexPaginate""" + + def __init__(self, source, index_name): + self.source = source + self.index_name = index_name + self.timestamp = datetime.now().strftime("%Y%m%d") + + def run(self): + """run the junk task""" + file_content = self._build_bulk() + self._write_es_json(file_content) + + def 
_build_bulk(self): + """build bulk query data from all_results""" + bulk_list = [] + + for document in self.source: + document_id = document["_id"] + es_index = document["_index"] + action = {"index": {"_index": es_index, "_id": document_id}} + source = document["_source"] + bulk_list.append(json.dumps(action)) + bulk_list.append(json.dumps(source)) + + # add last newline + bulk_list.append("\n") + file_content = "\n".join(bulk_list) + + return file_content + + def _write_es_json(self, file_content): + """write nd-json file for es _bulk API to disk""" + cache_dir = AppConfig().config["application"]["cache_dir"] + file_name = f"es_{self.index_name.lstrip('ta_')}-{self.timestamp}.json" + file_path = os.path.join(cache_dir, "backup", file_name) + with open(file_path, "a+", encoding="utf-8") as f: + f.write(file_content) diff --git a/tubearchivist/home/src/es/index_setup.py b/tubearchivist/home/src/es/index_setup.py index d7e2d033..64cc9ed9 100644 --- a/tubearchivist/home/src/es/index_setup.py +++ b/tubearchivist/home/src/es/index_setup.py @@ -5,22 +5,15 @@ functionality: - backup and restore metadata """ -import json -import os -import zipfile -from datetime import datetime - -from home.src.es.connect import ElasticWrap, IndexPaginate -from home.src.ta.config import AppConfig -from home.src.ta.helper import ignore_filelist +from home.src.es.backup import ElasticBackup +from home.src.es.connect import ElasticWrap +from home.src.ta.helper import get_mapping class ElasticIndex: - """ - handle mapping and settings on elastic search for a given index - """ + """interact with a single index""" - def __init__(self, index_name, expected_map, expected_set): + def __init__(self, index_name, expected_map=False, expected_set=False): self.index_name = index_name self.expected_map = expected_map self.expected_set = expected_set @@ -61,23 +54,23 @@ class ElasticIndex: if list(value.keys()) == ["properties"]: for key_n, value_n in value["properties"].items(): if key not in now_map: - print(key_n, value_n) + print(f"detected mapping change: {key_n}, {value_n}") return True if key_n not in now_map[key]["properties"].keys(): - print(key_n, value_n) + print(f"detected mapping change: {key_n}, {value_n}") return True if not value_n == now_map[key]["properties"][key_n]: - print(key_n, value_n) + print(f"detected mapping change: {key_n}, {value_n}") return True continue # not nested if key not in now_map.keys(): - print(key, value) + print(f"detected mapping change: {key}, {value}") return True if not value == now_map[key]: - print(key, value) + print(f"detected mapping change: {key}, {value}") return True return False @@ -100,6 +93,7 @@ class ElasticIndex: def rebuild_index(self): """rebuild with new mapping""" + print(f"applying new mappings to index ta_{self.index_name}...") self.create_blank(for_backup=True) self.reindex("backup") self.delete_index(backup=False) @@ -129,6 +123,7 @@ class ElasticIndex: def create_blank(self, for_backup=False): """apply new mapping and settings for blank new index""" + print(f"create new blank index with name ta_{self.index_name}...") path = f"ta_{self.index_name}" if for_backup: path = f"{path}_backup" @@ -142,270 +137,65 @@ class ElasticIndex: _, _ = ElasticWrap(path).put(data) -class BackupCallback: - """handle backup ndjson writer as callback for IndexPaginate""" +class ElasitIndexWrap: + """interact with all index mapping and setup""" - def __init__(self, source, index_name): - self.source = source - self.index_name = index_name - self.timestamp = 
datetime.now().strftime("%Y%m%d") + def __init__(self): + self.index_config = get_mapping() + self.backup_run = False - def run(self): - """run the junk task""" - file_content = self._build_bulk() - self._write_es_json(file_content) - - def _build_bulk(self): - """build bulk query data from all_results""" - bulk_list = [] - - for document in self.source: - document_id = document["_id"] - es_index = document["_index"] - action = {"index": {"_index": es_index, "_id": document_id}} - source = document["_source"] - bulk_list.append(json.dumps(action)) - bulk_list.append(json.dumps(source)) - - # add last newline - bulk_list.append("\n") - file_content = "\n".join(bulk_list) - - return file_content - - def _write_es_json(self, file_content): - """write nd-json file for es _bulk API to disk""" - cache_dir = AppConfig().config["application"]["cache_dir"] - file_name = f"es_{self.index_name.lstrip('ta_')}-{self.timestamp}.json" - file_path = os.path.join(cache_dir, "backup", file_name) - with open(file_path, "a+", encoding="utf-8") as f: - f.write(file_content) - - -class ElasticBackup: - """dump index to nd-json files for later bulk import""" - - def __init__(self, index_config, reason): - self.config = AppConfig().config - self.cache_dir = self.config["application"]["cache_dir"] - self.timestamp = datetime.now().strftime("%Y%m%d") - self.index_config = index_config - self.reason = reason - - @staticmethod - def backup_index(index_name): - """export all documents of a single index""" - data = { - "query": {"match_all": {}}, - "sort": [{"_doc": {"order": "desc"}}], - } - paginate = IndexPaginate( - f"ta_{index_name}", data, keep_source=True, callback=BackupCallback - ) - _ = paginate.get_results() - - def zip_it(self): - """pack it up into single zip file""" - file_name = f"ta_backup-{self.timestamp}-{self.reason}.zip" - folder = os.path.join(self.cache_dir, "backup") - - to_backup = [] - for file in os.listdir(folder): - if file.endswith(".json"): - to_backup.append(os.path.join(folder, file)) - - backup_file = os.path.join(folder, file_name) - - comp = zipfile.ZIP_DEFLATED - with zipfile.ZipFile(backup_file, "w", compression=comp) as zip_f: - for backup_file in to_backup: - zip_f.write(backup_file, os.path.basename(backup_file)) - - # cleanup - for backup_file in to_backup: - os.remove(backup_file) - - def post_bulk_restore(self, file_name): - """send bulk to es""" - file_path = os.path.join(self.cache_dir, file_name) - with open(file_path, "r", encoding="utf-8") as f: - data = f.read() - - if not data.strip(): - return - - _, _ = ElasticWrap("_bulk").post(data=data, ndjson=True) - - def get_all_backup_files(self): - """build all available backup files for view""" - backup_dir = os.path.join(self.cache_dir, "backup") - backup_files = os.listdir(backup_dir) - all_backup_files = ignore_filelist(backup_files) - all_available_backups = [ - i - for i in all_backup_files - if i.startswith("ta_") and i.endswith(".zip") - ] - all_available_backups.sort(reverse=True) - - backup_dicts = [] - for backup_file in all_available_backups: - file_split = backup_file.split("-") - if len(file_split) == 2: - timestamp = file_split[1].strip(".zip") - reason = False - elif len(file_split) == 3: - timestamp = file_split[1] - reason = file_split[2].strip(".zip") - - to_add = { - "filename": backup_file, - "timestamp": timestamp, - "reason": reason, - } - backup_dicts.append(to_add) - - return backup_dicts - - def unpack_zip_backup(self, filename): - """extract backup zip and return filelist""" - backup_dir = 
os.path.join(self.cache_dir, "backup") - file_path = os.path.join(backup_dir, filename) - - with zipfile.ZipFile(file_path, "r") as z: - zip_content = z.namelist() - z.extractall(backup_dir) - - return zip_content - - def restore_json_files(self, zip_content): - """go through the unpacked files and restore""" - backup_dir = os.path.join(self.cache_dir, "backup") - - for json_f in zip_content: - - file_name = os.path.join(backup_dir, json_f) - - if not json_f.startswith("es_") or not json_f.endswith(".json"): - os.remove(file_name) + def setup(self): + """setup elastic index, run at startup""" + for index in self.index_config: + index_name, expected_map, expected_set = self._config_split(index) + handler = ElasticIndex(index_name, expected_map, expected_set) + if not handler.exists: + handler.create_blank() continue - print("restoring: " + json_f) - self.post_bulk_restore(file_name) - os.remove(file_name) + rebuild = handler.validate() + if rebuild: + self._check_backup() + handler.rebuild_index() + continue + + # else all good + print(f"ta_{index_name} index is created and up to date...") + + def reset(self): + """reset all indexes to blank""" + self.delete_all() + self.create_all_blank() + + def delete_all(self): + """delete all indexes""" + print("reset elastic index") + for index in self.index_config: + index_name, _, _ = self._config_split(index) + handler = ElasticIndex(index_name) + handler.delete_index(backup=False) + + def create_all_blank(self): + """create all blank indexes""" + print("create all new indexes in elastic from template") + for index in self.index_config: + index_name, expected_map, expected_set = self._config_split(index) + handler = ElasticIndex(index_name, expected_map, expected_set) + handler.create_blank() @staticmethod - def index_exists(index_name): - """check if index already exists to skip""" - _, status_code = ElasticWrap(f"ta_{index_name}").get() - exists = status_code == 200 - - return exists - - def rotate_backup(self): - """delete old backups if needed""" - rotate = self.config["scheduler"]["run_backup_rotate"] - if not rotate: - return - - all_backup_files = self.get_all_backup_files() - auto = [i for i in all_backup_files if i["reason"] == "auto"] - - if len(auto) <= rotate: - print("no backup files to rotate") - return - - backup_dir = os.path.join(self.cache_dir, "backup") - - all_to_delete = auto[rotate:] - for to_delete in all_to_delete: - file_path = os.path.join(backup_dir, to_delete["filename"]) - print(f"remove old backup file: {file_path}") - os.remove(file_path) - - -def get_mapping(): - """read index_mapping.json and get expected mapping and settings""" - with open("home/src/es/index_mapping.json", "r", encoding="utf-8") as f: - index_config = json.load(f).get("index_config") - - return index_config - - -def index_check(force_restore=False): - """check if all indexes are created and have correct mapping""" - - backed_up = False - index_config = get_mapping() - - for index in index_config: + def _config_split(index): + """split index config keys""" index_name = index["index_name"] expected_map = index["expected_map"] expected_set = index["expected_set"] - handler = ElasticIndex(index_name, expected_map, expected_set) - # force restore - if force_restore: - handler.delete_index(backup=False) - handler.create_blank() - continue - # create new - if not handler.exists: - print(f"create new blank index with name ta_{index_name}...") - handler.create_blank() - continue + return index_name, expected_map, expected_set - # validate index - rebuild = 
handler.validate() - if rebuild: - # make backup before rebuild - if not backed_up: - print("running backup first") - backup_all_indexes(reason="update") - backed_up = True + def _check_backup(self): + """create backup if needed""" + if self.backup_run: + return - print(f"applying new mappings to index ta_{index_name}...") - handler.rebuild_index() - continue - - # else all good - print(f"ta_{index_name} index is created and up to date...") - - -def get_available_backups(): - """return dict of available backups for settings view""" - index_config = get_mapping() - backup_handler = ElasticBackup(index_config, reason=False) - all_backup_files = backup_handler.get_all_backup_files() - return all_backup_files - - -def backup_all_indexes(reason): - """backup all es indexes to disk""" - index_config = get_mapping() - backup_handler = ElasticBackup(index_config, reason) - - for index in backup_handler.index_config: - index_name = index["index_name"] - print(f"backup: export in progress for {index_name}") - if not backup_handler.index_exists(index_name): - print(f"skip backup for not yet existing index {index_name}") - continue - - backup_handler.backup_index(index_name) - - backup_handler.zip_it() - - if reason == "auto": - backup_handler.rotate_backup() - - -def restore_from_backup(filename): - """restore indexes from backup file""" - # delete - index_check(force_restore=True) - # recreate - index_config = get_mapping() - backup_handler = ElasticBackup(index_config, reason=False) - zip_content = backup_handler.unpack_zip_backup(filename) - backup_handler.restore_json_files(zip_content) + ElasticBackup(reason="update").backup_all_indexes() + self.backup_run = True diff --git a/tubearchivist/home/src/es/snapshot.py b/tubearchivist/home/src/es/snapshot.py index e403fd89..19e49e21 100644 --- a/tubearchivist/home/src/es/snapshot.py +++ b/tubearchivist/home/src/es/snapshot.py @@ -8,7 +8,7 @@ from os import environ from zoneinfo import ZoneInfo from home.src.es.connect import ElasticWrap -from home.src.es.index_setup import get_mapping +from home.src.ta.helper import get_mapping class ElasticSnapshot: diff --git a/tubearchivist/home/src/ta/helper.py b/tubearchivist/home/src/ta/helper.py index 68d49b88..af17fc06 100644 --- a/tubearchivist/home/src/ta/helper.py +++ b/tubearchivist/home/src/ta/helper.py @@ -3,6 +3,7 @@ Loose collection of helper functions - don't import AppConfig class here to avoid circular imports """ +import json import os import random import re @@ -127,6 +128,14 @@ def clear_dl_cache(config): os.remove(to_delete) +def get_mapping(): + """read index_mapping.json and get expected mapping and settings""" + with open("home/src/es/index_mapping.json", "r", encoding="utf-8") as f: + index_config = json.load(f).get("index_config") + + return index_config + + class UrlListParser: """take a multi line string and detect valid youtube ids""" diff --git a/tubearchivist/home/tasks.py b/tubearchivist/home/tasks.py index 7bd4dea8..b868da50 100644 --- a/tubearchivist/home/tasks.py +++ b/tubearchivist/home/tasks.py @@ -17,7 +17,8 @@ from home.src.download.subscriptions import ( ) from home.src.download.thumbnails import ThumbFilesystem, ThumbValidator from home.src.download.yt_dlp_handler import VideoDownloader -from home.src.es.index_setup import backup_all_indexes, restore_from_backup +from home.src.es.backup import ElasticBackup +from home.src.es.index_setup import ElasitIndexWrap from home.src.index.channel import YoutubeChannel from home.src.index.filesystem import ( ImportFolderScanner, @@ 
-168,7 +169,7 @@ def run_backup(reason="auto"): try: have_lock = my_lock.acquire(blocking=False) if have_lock: - backup_all_indexes(reason) + ElasticBackup(reason=reason).backup_all_indexes() else: print("Did not acquire lock for backup task.") finally: @@ -180,7 +181,8 @@ def run_backup(reason="auto"): @shared_task def run_restore_backup(filename): """called from settings page, dump backup to zip file""" - restore_from_backup(filename) + ElasitIndexWrap().reset() + ElasticBackup().restore(filename) print("index restore finished") diff --git a/tubearchivist/home/views.py b/tubearchivist/home/views.py index f5788d09..18810ad9 100644 --- a/tubearchivist/home/views.py +++ b/tubearchivist/home/views.py @@ -16,8 +16,8 @@ from django.http import JsonResponse from django.shortcuts import redirect, render from django.views import View from home.src.download.yt_dlp_base import CookieHandler +from home.src.es.backup import ElasticBackup from home.src.es.connect import ElasticWrap -from home.src.es.index_setup import get_available_backups from home.src.es.snapshot import ElasticSnapshot from home.src.frontend.api_calls import PostData from home.src.frontend.forms import ( @@ -939,7 +939,7 @@ class SettingsView(View): config_handler = AppConfig(request.user.id) colors = config_handler.colors - available_backups = get_available_backups() + available_backups = ElasticBackup().get_all_backup_files() user_form = UserSettingsForm() app_form = ApplicationSettingsForm() scheduler_form = SchedulerSettingsForm()
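
Usage sketch: how the refactored entry points are wired together after this
patch, mirroring the call sites in home/apps.py, home/tasks.py and
home/views.py above. This is a minimal sketch, not part of the patch: it
assumes a fully configured TubeArchivist environment (Django settings loaded,
Redis and Elasticsearch reachable), and the backup zip filename is only an
illustration of the ta_backup-&lt;date&gt;-&lt;reason&gt;.zip pattern built in
ElasticBackup.zip_it().

    from home.src.es.backup import ElasticBackup
    from home.src.es.index_setup import ElasitIndexWrap

    # startup check (home/apps.py): create missing indexes and rebuild any
    # index whose mapping or settings changed, taking an "update" backup first
    ElasitIndexWrap().setup()

    # scheduled backup (home/tasks.py, run_backup): a falsy reason raises
    # ValueError, and reason "auto" also triggers rotate_backup() afterwards
    ElasticBackup(reason="auto").backup_all_indexes()

    # settings page (home/views.py): list the available backup zip files
    available_backups = ElasticBackup().get_all_backup_files()

    # restore (home/tasks.py, run_restore_backup): reset all indexes to blank,
    # then bulk-import the documents from the chosen backup zip
    ElasitIndexWrap().reset()
    ElasticBackup().restore("ta_backup-20221127-auto.zip")  # illustrative name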
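
The backup files themselves are plain Elasticsearch _bulk ndjson:
BackupCallback._build_bulk() emits one action line and one source line per
document, _write_es_json() appends them to
&lt;cache_dir&gt;/backup/es_&lt;index&gt;-&lt;date&gt;.json, zip_it() bundles those json files
into the dated zip, and post_bulk_restore() later POSTs the file content back
to the _bulk endpoint. An export of the video index, for example, would hold
pairs of lines like the two below; the index name and document fields are
illustrative and not defined in this diff.

    {"index": {"_index": "ta_video", "_id": "dQw4w9WgXcQ"}}
    {"youtube_id": "dQw4w9WgXcQ", "title": "example video"}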