class ElasticSnapshot:
    """Manage the Elasticsearch snapshot repository and SLM policy.

    The class registers a filesystem snapshot repository (``REPO``) and a
    snapshot lifecycle policy (``POLICY``), and offers helpers to trigger,
    list and restore snapshots. All requests go through ``ElasticWrap``,
    whose calls return a ``(response, statuscode)`` tuple.
    """

    # name of the snapshot repository registered in Elasticsearch
    REPO = "ta_snapshot"
    # settings sent when registering the repository; also compared against
    # what Elasticsearch reports to detect drift
    REPO_SETTINGS = {
        "compress": "true",
        "chunk_size": "1g",
        "location": "/usr/share/elasticsearch/data/snapshot",
    }
    # id of the snapshot lifecycle policy
    POLICY = "ta_daily"

    def __init__(self):
        self.all_indices = self._get_all_indices()

    def _get_all_indices(self):
        """Return the ``ta_`` prefixed names of all indices in the mapping."""
        return [f"ta_{i['index_name']}" for i in get_mapping()]

    def setup(self):
        """Create or update the repository and the policy when needed."""
        if not self._check_repo_exists():
            self.create_repo()

        if not self._check_policy_exists():
            self.create_policy()

    def _check_repo_exists(self):
        """Return True if the repo exists with the expected settings."""
        path = f"_snapshot/{self.REPO}"
        response, statuscode = ElasticWrap(path).get()
        if statuscode == 200:
            print(f"snapshot: repo {self.REPO} already created")
            matching = response[self.REPO]["settings"] == self.REPO_SETTINGS
            if not matching:
                print(f"snapshot: update repo settings {self.REPO_SETTINGS}")

            return matching

        print(f"snapshot: setup repo {self.REPO} config {self.REPO_SETTINGS}")
        return False

    def create_repo(self):
        """Register the filesystem snapshot repository."""
        path = f"_snapshot/{self.REPO}"
        data = {
            "type": "fs",
            "settings": self.REPO_SETTINGS,
        }
        response, statuscode = ElasticWrap(path).post(data=data)
        if statuscode == 200:
            print(f"snapshot: repo setup correctly: {response}")
        else:
            # surface the failure instead of silently continuing
            print(f"snapshot: failed to setup repo, {statuscode} {response}")

    def _check_policy_exists(self):
        """Return True if the policy exists and matches the expected data."""
        path = f"_slm/policy/{self.POLICY}"
        response, statuscode = ElasticWrap(path).get()
        expected_policy = self._build_policy_data()
        if statuscode == 200:
            print(f"snapshot: policy {self.POLICY} exists")
            # look the policy up by its configured id, not a literal copy
            matching = response[self.POLICY]["policy"] == expected_policy
            if not matching:
                print(f"snapshot: update policy settings {expected_policy}")

            return matching

        print(f"snapshot: create policy {self.POLICY} {expected_policy}")
        return False

    def create_policy(self):
        """Create or overwrite the snapshot lifecycle policy."""
        path = f"_slm/policy/{self.POLICY}"
        data = self._build_policy_data()
        response, statuscode = ElasticWrap(path).put(data)
        if statuscode == 200:
            print(f"snapshot: policy setup correctly: {response}")
        else:
            print(f"snapshot: failed to setup policy, {statuscode} {response}")

    def _build_policy_data(self):
        """Build the policy dict sent to and compared with Elasticsearch."""
        return {
            "schedule": "0 30 1 * * ?",
            # doubled braces keep the literal {now/d} date math expression
            "name": f"<{self.POLICY}_{{now/d}}>",
            "repository": self.REPO,
            "config": {
                "indices": self.all_indices,
                "include_global_state": True,
            },
            "retention": {
                "expire_after": "30d",
                "min_count": 5,
                "max_count": 50,
            },
        }

    def take_snapshot_now(self):
        """Trigger the lifecycle policy immediately."""
        path = f"_slm/policy/{self.POLICY}/_execute"
        response, statuscode = ElasticWrap(path).post()
        if statuscode == 200:
            print(f"snapshot: executing now: {response}")
        else:
            print(f"snapshot: failed to execute, {statuscode} {response}")

    def get_all_snapshots(self):
        """Return all snapshots in the repo, newest first.

        Returns a list of dicts with the keys ``id``, ``start``, ``end`` and
        ``duration_s``, or ``False`` when the repo is not configured, the
        request fails, or no snapshots exist.
        """
        path = f"_snapshot/{self.REPO}/*?sort=start_time&order=desc"
        response, statuscode = ElasticWrap(path).get()
        if statuscode == 404:
            print("snapshots: not configured")
            return False

        if statuscode != 200:
            # any other error response carries no "snapshots" key
            print(f"snapshots: failed to list, {statuscode} {response}")
            return False

        all_snapshots = response.get("snapshots")
        if not all_snapshots:
            print("snapshots: no snapshots found")
            return False

        return [
            {
                "id": snapshot["snapshot"],
                "start": self._date_converter(snapshot["start_time"]),
                "end": self._date_converter(snapshot["end_time"]),
                "duration_s": snapshot["duration_in_millis"] // 1000,
            }
            for snapshot in all_snapshots
        ]

    @staticmethod
    def _date_converter(date_utc):
        """Convert a UTC timestamp string to ``YYYY-MM-DD HH:MM`` in ``TZ``.

        ``date_utc`` must look like ``2022-10-26T16:17:55.000Z``. The target
        zone is read from the ``TZ`` environment variable and defaults to
        UTC when the variable is unset or empty.
        """
        expected_format = "%Y-%m-%dT%H:%M:%S.%fZ"
        parsed = datetime.strptime(date_utc, expected_format)
        # the trailing Z marks the value as UTC, so tag it as such before
        # converting; labelling it with the host zone shifts the result
        utc_datetime = parsed.replace(tzinfo=ZoneInfo("UTC"))
        target_zone = ZoneInfo(environ.get("TZ") or "UTC")

        return utc_datetime.astimezone(target_zone).strftime("%Y-%m-%d %H:%M")

    def restore_all(self, snapshot_name):
        """Restore all indices from the snapshot called ``snapshot_name``.

        The existing indices are deleted first, so the snapshot is looked up
        beforehand and nothing is deleted when it cannot be found.
        """
        snapshot_path = f"_snapshot/{self.REPO}/{snapshot_name}"
        _, statuscode = ElasticWrap(snapshot_path).get()
        if statuscode != 200:
            print(f"snapshot: {snapshot_name} not found, skip restore")
            return

        for index in self.all_indices:
            _, _ = ElasticWrap(index).delete()

        data = {"indices": "*"}
        response, statuscode = ElasticWrap(f"{snapshot_path}/_restore").post(
            data=data
        )
        if statuscode == 200:
            print(f"snapshot: restore started: {response}")
        else:
            print(f"snapshot: failed to restore, {statuscode} {response}")