Mirror of https://github.com/tubearchivist/tubearchivist, synced 2024-11-04 12:00:21 +00:00
Desc preview, nginx fix, #build
Changed:
- Requires new env var: TA_HOST, hostname of your application
- Showing preview for description text, by @p0358
- Fix nginx user permission, by @p0358
- Refactor IndexPaginate to take loop callback
- Fix backup for large index using loop callback
- Fix deactivating missing channels on reindex
Commit 393e00866b
@@ -13,6 +13,7 @@ required="Missing required environment variable"
 [[ -f $lockfile ]] || : "${TA_USERNAME:?$required}"
 : "${TA_PASSWORD:?$required}"
 : "${ELASTIC_PASSWORD:?$required}"
+: "${TA_HOST:?$required}"

 # ugly nginx and uwsgi port overwrite with env vars
 if [[ -n "$TA_PORT" ]]; then
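The `${TA_HOST:?$required}` expansion makes the container entrypoint exit with the "Missing required environment variable" message when TA_HOST is unset or empty. For illustration only (not part of the commit, and ignoring the lockfile shortcut that applies to TA_USERNAME), a rough Python equivalent of the same fail-fast guard:

import os
import sys

# Sketch: abort startup when any required variable is unset or empty.
required = "Missing required environment variable"
for var in ("TA_USERNAME", "TA_PASSWORD", "ELASTIC_PASSWORD", "TA_HOST"):
    if not os.environ.get(var):
        sys.exit(f"{var}: {required}")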
@@ -30,7 +30,7 @@ SECRET_KEY = PW_HASH.hexdigest()
 # SECURITY WARNING: don't run with debug turned on in production!
 DEBUG = bool(environ.get("DJANGO_DEBUG"))

-ALLOWED_HOSTS = ["*"]
+ALLOWED_HOSTS = [i.strip() for i in environ.get("TA_HOST").split()]


 # Application definition
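ALLOWED_HOSTS is now built from the space-separated TA_HOST value instead of accepting any host. A minimal sketch of what the comprehension produces, using a made-up two-host value:

from os import environ

# Hypothetical value: TA_HOST takes one or more space-separated hostnames.
environ["TA_HOST"] = "tubearchivist.local 192.168.1.10"

allowed_hosts = [i.strip() for i in environ.get("TA_HOST").split()]
print(allowed_hosts)  # ['tubearchivist.local', '192.168.1.10']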
@@ -91,16 +91,22 @@ class ElasticWrap:


 class IndexPaginate:
-    """use search_after to go through whole index"""
+    """use search_after to go through whole index
+    kwargs:
+    - size: int, overwrite DEFAULT_SIZE
+    - keep_source: bool, keep _source key from es results
+    - callback: obj, Class with run method callback for every loop
+    """

     DEFAULT_SIZE = 500

-    def __init__(self, index_name, data, size=False, keep_source=False):
+    def __init__(self, index_name, data, **kwargs):
         self.index_name = index_name
         self.data = data
         self.pit_id = False
-        self.size = size
-        self.keep_source = keep_source
+        self.size = kwargs.get("size")
+        self.keep_source = kwargs.get("keep_source")
+        self.callback = kwargs.get("callback")

     def get_results(self):
         """get all results"""
@@ -122,14 +128,13 @@ class IndexPaginate:
             print(self.data)
             raise ValueError("missing sort key in data")

-        size = self.size or self.DEFAULT_SIZE
-
-        self.data["size"] = size
+        self.data["size"] = self.size or self.DEFAULT_SIZE
         self.data["pit"] = {"id": self.pit_id, "keep_alive": "10m"}

     def run_loop(self):
         """loop through results until last hit"""
         all_results = []
+        counter = 0
         while True:
             response, _ = ElasticWrap("_search").get(data=self.data)
             all_hits = response["hits"]["hits"]
@@ -139,10 +144,18 @@ class IndexPaginate:
                         source = hit
                     else:
                         source = hit["_source"]
-                    search_after = hit["sort"]
-                    all_results.append(source)
+
+                    if not self.callback:
+                        all_results.append(source)
+
+                if self.callback:
+                    self.callback(all_hits, self.index_name).run()
+                    if counter % 10 == 0:
+                        print(f"{self.index_name}: processing page {counter}")
+                    counter = counter + 1
+
                 # update search_after with last hit data
-                self.data["search_after"] = search_after
+                self.data["search_after"] = all_hits[-1]["sort"]
             else:
                 break

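With the new callback kwarg, IndexPaginate hands every page of raw hits to a class exposing a run() method instead of collecting all results in memory. A minimal usage sketch (not part of the commit) with a hypothetical callback that only counts hits per page; the constructor signature mirrors how run_loop instantiates the callback, and "ta_video" is just an example index name:

class CountCallback:
    """hypothetical callback: gets one page of raw hits and the index name"""

    def __init__(self, source, index_name):
        self.source = source
        self.index_name = index_name

    def run(self):
        """called once per page by IndexPaginate.run_loop"""
        print(f"{self.index_name}: got {len(self.source)} hits")


data = {"query": {"match_all": {}}, "sort": [{"_doc": {"order": "desc"}}]}
IndexPaginate("ta_video", data, keep_source=True, callback=CountCallback).get_results()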
@@ -137,35 +137,24 @@ class ElasticIndex:
         _, _ = ElasticWrap(f"ta_{self.index_name}").put(data)


-class ElasticBackup:
-    """dump index to nd-json files for later bulk import"""
+class BackupCallback:
+    """handle backup ndjson writer as callback for IndexPaginate"""

-    def __init__(self, index_config, reason):
-        self.config = AppConfig().config
-        self.cache_dir = self.config["application"]["cache_dir"]
-        self.index_config = index_config
-        self.reason = reason
+    def __init__(self, source, index_name):
+        self.source = source
+        self.index_name = index_name
         self.timestamp = datetime.now().strftime("%Y%m%d")
-        self.backup_files = []

-    @staticmethod
-    def get_all_documents(index_name):
-        """export all documents of a single index"""
-        data = {
-            "query": {"match_all": {}},
-            "sort": [{"_doc": {"order": "desc"}}],
-        }
-        paginate = IndexPaginate(f"ta_{index_name}", data, keep_source=True)
-        all_results = paginate.get_results()
+    def run(self):
+        """run the junk task"""
+        file_content = self._build_bulk()
+        self._write_es_json(file_content)

-        return all_results
-
-    @staticmethod
-    def build_bulk(all_results):
+    def _build_bulk(self):
         """build bulk query data from all_results"""
         bulk_list = []

-        for document in all_results:
+        for document in self.source:
             document_id = document["_id"]
             es_index = document["_index"]
             action = {"index": {"_index": es_index, "_id": document_id}}
@@ -179,40 +168,56 @@ class ElasticBackup:

         return file_content

-    def write_es_json(self, file_content, index_name):
+    def _write_es_json(self, file_content):
         """write nd-json file for es _bulk API to disk"""
-        file_name = f"es_{index_name}-{self.timestamp}.json"
-        file_path = os.path.join(self.cache_dir, "backup", file_name)
-        with open(file_path, "w", encoding="utf-8") as f:
+        cache_dir = AppConfig().config["application"]["cache_dir"]
+        file_name = f"es_{self.index_name.lstrip('ta_')}-{self.timestamp}.json"
+        file_path = os.path.join(cache_dir, "backup", file_name)
+        with open(file_path, "a+", encoding="utf-8") as f:
             f.write(file_content)

-        self.backup_files.append(file_path)
-
-    def write_ta_json(self, all_results, index_name):
-        """write generic json file to disk"""
-        file_name = f"ta_{index_name}-{self.timestamp}.json"
-        file_path = os.path.join(self.cache_dir, "backup", file_name)
-        to_write = [i["_source"] for i in all_results]
-        file_content = json.dumps(to_write)
-        with open(file_path, "w", encoding="utf-8") as f:
-            f.write(file_content)
-
-        self.backup_files.append(file_path)
+
+class ElasticBackup:
+    """dump index to nd-json files for later bulk import"""
+
+    def __init__(self, index_config, reason):
+        self.config = AppConfig().config
+        self.cache_dir = self.config["application"]["cache_dir"]
+        self.timestamp = datetime.now().strftime("%Y%m%d")
+        self.index_config = index_config
+        self.reason = reason
+
+    @staticmethod
+    def backup_index(index_name):
+        """export all documents of a single index"""
+        data = {
+            "query": {"match_all": {}},
+            "sort": [{"_doc": {"order": "desc"}}],
+        }
+        paginate = IndexPaginate(
+            f"ta_{index_name}", data, keep_source=True, callback=BackupCallback
+        )
+        _ = paginate.get_results()

     def zip_it(self):
         """pack it up into single zip file"""
         file_name = f"ta_backup-{self.timestamp}-{self.reason}.zip"
-        backup_folder = os.path.join(self.cache_dir, "backup")
-        backup_file = os.path.join(backup_folder, file_name)
+        folder = os.path.join(self.cache_dir, "backup")
+
+        to_backup = []
+        for file in os.listdir(folder):
+            if file.endswith(".json"):
+                to_backup.append(os.path.join(folder, file))
+
+        backup_file = os.path.join(folder, file_name)

-        with zipfile.ZipFile(
-            backup_file, "w", compression=zipfile.ZIP_DEFLATED
-        ) as zip_f:
-            for backup_file in self.backup_files:
+        comp = zipfile.ZIP_DEFLATED
+        with zipfile.ZipFile(backup_file, "w", compression=comp) as zip_f:
+            for backup_file in to_backup:
                 zip_f.write(backup_file, os.path.basename(backup_file))

         # cleanup
-        for backup_file in self.backup_files:
+        for backup_file in to_backup:
             os.remove(backup_file)

     def post_bulk_restore(self, file_name):
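BackupCallback streams each page straight into an nd-json file rather than building one large list first, which is what fixes backups for large indexes. For illustration (not from the repo), the action/source line pairs the es _bulk format expects, built the same way _build_bulk does from a raw hit:

import json

# Hypothetical raw hit as passed through with keep_source=True.
hit = {"_index": "ta_video", "_id": "abc123", "_source": {"title": "example"}}

action = {"index": {"_index": hit["_index"], "_id": hit["_id"]}}
print(json.dumps(action))          # {"index": {"_index": "ta_video", "_id": "abc123"}}
print(json.dumps(hit["_source"]))  # {"title": "example"}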
@@ -380,10 +385,7 @@ def backup_all_indexes(reason):
         print(f"backup: export in progress for {index_name}")
         if not backup_handler.index_exists(index_name):
             continue
-        all_results = backup_handler.get_all_documents(index_name)
-        file_content = backup_handler.build_bulk(all_results)
-        backup_handler.write_es_json(file_content, index_name)
-        backup_handler.write_ta_json(all_results, index_name)
+        backup_handler.backup_index(index_name)

     backup_handler.zip_it()

@@ -37,6 +37,9 @@ class ChannelScraper:
         """main method to return channel dict"""
         self.get_soup()
         self._extract_yt_json()
+        if self._is_deactivated():
+            return False
+
         self._parse_channel_main()
         self._parse_channel_meta()
         return self.json_data
@@ -68,6 +71,16 @@ class ChannelScraper:
         json_raw = script_content.rstrip(";</script>")
         self.yt_json = json.loads(json_raw)

+    def _is_deactivated(self):
+        """check if channel is deactivated"""
+        alert_text = "This channel does not exist."
+        alerts = self.yt_json.get("alerts")
+        if alerts and alert_text in str(alerts):
+            print(f"{self.channel_id}: {alert_text}")
+            return True
+
+        return False
+
     def _parse_channel_main(self):
         """extract maintab values from scraped channel json data"""
         main_tab = self.yt_json["header"]["c4TabbedHeaderRenderer"]
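The new _is_deactivated() check only looks for the alert text anywhere in the stringified alerts value, so the exact alert structure does not matter. A rough sketch with a made-up alerts payload (the nested structure shown here is an assumption, not taken from YouTube or the repo):

# Hypothetical alerts payload; only the contained text matters for the check.
yt_json = {
    "alerts": [
        {"alertRenderer": {"text": {"simpleText": "This channel does not exist."}}}
    ]
}
alert_text = "This channel does not exist."
alerts = yt_json.get("alerts")
print(bool(alerts and alert_text in str(alerts)))  # True -> scraper returns False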
@@ -195,6 +195,10 @@ class Reindex:
         subscribed = channel.json_data["channel_subscribed"]
         overwrites = channel.json_data.get("channel_overwrites", False)
         channel.get_from_youtube()
+        if not channel.json_data:
+            channel.deactivate()
+            return
+
         channel.json_data["channel_subscribed"] = subscribed
         if overwrites:
             channel.json_data["channel_overwrites"] = overwrites