@ -10,7 +10,7 @@ import os
import zipfile
from datetime import datetime
import requests
from home . src . es . connect import ElasticWrap , IndexPaginate
from home . src . ta . config import AppConfig
from home . src . ta . helper import ignore_filelist
@ -20,11 +20,6 @@ class ElasticIndex:
handle mapping and settings on elastic search for a given index
"""
CONFIG = AppConfig ( ) . config
ES_URL = CONFIG [ " application " ] [ " es_url " ]
ES_AUTH = CONFIG [ " application " ] [ " es_auth " ]
HEADERS = { " Content-type " : " application/json " }
def __init__ ( self , index_name , expected_map , expected_set ) :
self . index_name = index_name
self . expected_map = expected_map
@ -33,15 +28,9 @@ class ElasticIndex:
def index_exists(self):
    """check if index already exists and return mapping if it does

    Sends one GET for ``ta_<index_name>`` through ElasticWrap.

    returns a tuple ``(exists, details)``:
    exists is True when the status code is 200,
    details is the value stored under the index name in the response,
    or False when the response has no such key.
    """
    index_path = f"ta_{self.index_name}"
    # assumes ElasticWrap.get() returns (dict, int) -- TODO confirm
    response, status_code = ElasticWrap(index_path).get()
    exists = status_code == 200
    details = response.get(index_path, False)

    return exists, details
@ -110,63 +99,41 @@ class ElasticIndex:
def rebuild_index ( self ) :
""" rebuild with new mapping

Runs five steps in order: copy the index into its backup, delete the
original, create a blank index, copy the backup back, delete the backup.
"""
# backup: copy the current index into the _backup index
self . reindex ( " backup " )
# delete original; backup=False targets the index itself, not the backup
self . delete_index ( backup = False )
# create new blank index from the expected settings and mapping
self . create_blank ( )
# copy the documents from the backup into the new index
self . reindex ( " restore " )
# delete backup (delete_index defaults to backup=True)
self . delete_index ( )
def reindex(self, method):
    """copy all documents between the index and its backup

    method: "backup" copies ta_<index_name> into ta_<index_name>_backup,
    "restore" copies the backup back into ta_<index_name>.

    raises ValueError for any other method instead of failing later on
    unbound source/destination names.
    """
    index = f"ta_{self.index_name}"
    backup = f"{index}_backup"

    if method == "backup":
        source, destination = index, backup
    elif method == "restore":
        source, destination = backup, index
    else:
        raise ValueError(f"unknown reindex method: {method}")

    data = {"source": {"index": source}, "dest": {"index": destination}}
    # NOTE(review): the legacy requests call printed the response text on
    # failure; the result is discarded here, so confirm that ElasticWrap
    # reports errors on its own.
    _, _ = ElasticWrap("_reindex?refresh=true").post(data=data)
def delete_index(self, backup=True):
    """delete the backup index, or the index itself if backup is False

    backup: when True (default) the target is ta_<index_name>_backup,
    otherwise ta_<index_name>.
    """
    path = f"ta_{self.index_name}"
    if backup:
        path = f"{path}_backup"

    # a single delete request; the response and status code are not used
    _, _ = ElasticWrap(path).delete()
def create_blank(self):
    """apply new mapping and settings for blank new index

    Builds the payload from expected_set and expected_map, skipping
    whichever is empty, and sends it with a single PUT to
    ta_<index_name>.
    """
    data = {}
    if self.expected_set:
        data["settings"] = self.expected_set
    if self.expected_map:
        data["mappings"] = {"properties": self.expected_map}

    # the response and status code are not used
    _, _ = ElasticWrap(f"ta_{self.index_name}").put(data)
class ElasticBackup :
@ -174,52 +141,21 @@ class ElasticBackup:
def __init__ ( self , index_config , reason ) :
""" store the config values and name parts used by the backup methods """
# application configuration as exposed by AppConfig
self . config = AppConfig ( ) . config
# cache directory from the config; backup paths are joined onto it
self . cache_dir = self . config [ " application " ] [ " cache_dir " ]
# NOTE(review): shape of index_config is not visible here -- confirm with caller
self . index_config = index_config
# reason ends up in the zip file name built by zip_it
self . reason = reason
# date stamp taken once at construction, reused in every backup file name
self . timestamp = datetime . now ( ) . strftime ( " % Y % m %d " )
self . backup_files = [ ]
@staticmethod
def get_all_documents(index_name):
    """export all documents of a single index

    Runs a match_all query against ta_<index_name> through IndexPaginate
    with keep_source=True and returns what get_results() produces.
    """
    data = {
        "query": {"match_all": {}},
        # exactly one sort key: a second "sort" entry in the same dict
        # literal would silently replace the first
        "sort": [{"_doc": {"order": "desc"}}],
    }
    # NOTE(review): no "size" or "pit" keys are set here; presumably
    # IndexPaginate handles paging itself -- confirm.
    paginate = IndexPaginate(f"ta_{index_name}", data, keep_source=True)
    all_results = paginate.get_results()

    return all_results
@ -244,9 +180,8 @@ class ElasticBackup:
def write_es_json ( self , file_content , index_name ) :
""" write nd-json file for es _bulk API to disk """
cache_dir = self . config [ " application " ] [ " cache_dir " ]
file_name = f " es_ { index_name } - { self . timestamp } .json "
file_path = os . path . join ( cache_dir , " backup " , file_name )
file_path = os . path . join ( self . cache_dir , " backup " , file_name )
with open ( file_path , " w " , encoding = " utf-8 " ) as f :
f . write ( file_content )
@ -254,9 +189,8 @@ class ElasticBackup:
def write_ta_json ( self , all_results , index_name ) :
""" write generic json file to disk """
cache_dir = self . config [ " application " ] [ " cache_dir " ]
file_name = f " ta_ { index_name } - { self . timestamp } .json "
file_path = os . path . join ( cache_dir , " backup " , file_name )
file_path = os . path . join ( self . cache_dir , " backup " , file_name )
to_write = [ i [ " _source " ] for i in all_results ]
file_content = json . dumps ( to_write )
with open ( file_path , " w " , encoding = " utf-8 " ) as f :
@ -266,9 +200,8 @@ class ElasticBackup:
def zip_it ( self ) :
""" pack it up into single zip file """
cache_dir = self . config [ " application " ] [ " cache_dir " ]
file_name = f " ta_backup- { self . timestamp } - { self . reason } .zip "
backup_folder = os . path . join ( cache_dir , " backup " )
backup_folder = os . path . join ( self . cache_dir , " backup " )
backup_file = os . path . join ( backup_folder , file_name )
with zipfile . ZipFile (
@ -283,29 +216,18 @@ class ElasticBackup:
def post_bulk_restore(self, file_name):
    """send bulk to es

    file_name: path of the nd-json file, joined onto self.cache_dir.
    Returns without sending anything when the file holds only whitespace.
    """
    file_path = os.path.join(self.cache_dir, file_name)
    # read once: a second read() on the same handle returns an empty string
    with open(file_path, "r", encoding="utf-8") as f:
        data = f.read()

    if not data.strip():
        return

    # the response and status code are not used
    _, _ = ElasticWrap("_bulk").post(data=data, ndjson=True)
def get_all_backup_files ( self ) :
""" build all available backup files for view """
cache_dir = self . config [ " application " ] [ " cache_dir " ]
backup_dir = os . path . join ( cache_dir , " backup " )
backup_dir = os . path . join ( self . cache_dir , " backup " )
backup_files = os . listdir ( backup_dir )
all_backup_files = ignore_filelist ( backup_files )
all_available_backups = [
@ -336,8 +258,7 @@ class ElasticBackup:
def unpack_zip_backup ( self , filename ) :
""" extract backup zip and return filelist """
cache_dir = self . config [ " application " ] [ " cache_dir " ]
backup_dir = os . path . join ( cache_dir , " backup " )
backup_dir = os . path . join ( self . cache_dir , " backup " )
file_path = os . path . join ( backup_dir , filename )
with zipfile . ZipFile ( file_path , " r " ) as z :
@ -348,9 +269,7 @@ class ElasticBackup:
def restore_json_files ( self , zip_content ) :
""" go through the unpacked files and restore """
cache_dir = self . config [ " application " ] [ " cache_dir " ]
backup_dir = os . path . join ( cache_dir , " backup " )
backup_dir = os . path . join ( self . cache_dir , " backup " )
for json_f in zip_content :
@ -364,14 +283,13 @@ class ElasticBackup:
self . post_bulk_restore ( file_name )
os . remove ( file_name )
@staticmethod
def index_exists(index_name):
    """check if index already exists to skip

    returns True when a GET for ta_<index_name> through ElasticWrap
    comes back with status code 200, otherwise False.
    """
    _, status_code = ElasticWrap(f"ta_{index_name}").get()
    exists = status_code == 200

    return exists
def rotate_backup ( self ) :
""" delete old backups if needed """
@ -386,8 +304,7 @@ class ElasticBackup:
print ( " no backup files to rotate " )
return
cache_dir = self . config [ " application " ] [ " cache_dir " ]
backup_dir = os . path . join ( cache_dir , " backup " )
backup_dir = os . path . join ( self . cache_dir , " backup " )
all_to_delete = auto [ rotate : ]
for to_delete in all_to_delete :