|
|
|
@ -342,6 +342,80 @@ class Reindex(ReindexBase):
|
|
|
|
|
return valid
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ReindexProgress(ReindexBase):
|
|
|
|
|
"""
|
|
|
|
|
get progress of reindex task
|
|
|
|
|
request_type: key of self.REINDEX_CONFIG
|
|
|
|
|
request_id: id of request_type
|
|
|
|
|
return = {
|
|
|
|
|
"state": "running" | "queued" | False
|
|
|
|
|
"total_queued": int
|
|
|
|
|
"in_queue_name": "queue_name"
|
|
|
|
|
}
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, request_type=False, request_id=False):
|
|
|
|
|
super().__init__()
|
|
|
|
|
self.request_type = request_type
|
|
|
|
|
self.request_id = request_id
|
|
|
|
|
|
|
|
|
|
def get_progress(self):
|
|
|
|
|
"""get progress from task"""
|
|
|
|
|
queue_name, request_type = self._get_queue_name()
|
|
|
|
|
total = self._get_total_in_queue(queue_name)
|
|
|
|
|
|
|
|
|
|
progress = {
|
|
|
|
|
"total_queued": total,
|
|
|
|
|
"type": request_type,
|
|
|
|
|
}
|
|
|
|
|
state = self._get_state(total, queue_name)
|
|
|
|
|
progress.update(state)
|
|
|
|
|
|
|
|
|
|
return progress
|
|
|
|
|
|
|
|
|
|
def _get_queue_name(self):
|
|
|
|
|
"""return queue_name, queue_type, raise exception on error"""
|
|
|
|
|
if not self.request_type:
|
|
|
|
|
return "all", "all"
|
|
|
|
|
|
|
|
|
|
reindex_config = self.REINDEX_CONFIG.get(self.request_type)
|
|
|
|
|
if not reindex_config:
|
|
|
|
|
print(f"reindex_config not found: {self.request_type}")
|
|
|
|
|
raise ValueError
|
|
|
|
|
|
|
|
|
|
return reindex_config["queue_name"], self.request_type
|
|
|
|
|
|
|
|
|
|
def _get_total_in_queue(self, queue_name):
|
|
|
|
|
"""get all items in queue"""
|
|
|
|
|
total = 0
|
|
|
|
|
if queue_name == "all":
|
|
|
|
|
queues = [i["queue_name"] for i in self.REINDEX_CONFIG.values()]
|
|
|
|
|
for queue in queues:
|
|
|
|
|
total += len(RedisQueue(queue).get_all())
|
|
|
|
|
else:
|
|
|
|
|
total += len(RedisQueue(queue_name).get_all())
|
|
|
|
|
|
|
|
|
|
return total
|
|
|
|
|
|
|
|
|
|
def _get_state(self, total, queue_name):
|
|
|
|
|
"""get state based on request_id"""
|
|
|
|
|
state_dict = {}
|
|
|
|
|
if self.request_id:
|
|
|
|
|
state = RedisQueue(queue_name).in_queue(self.request_id)
|
|
|
|
|
state_dict.update({"id": self.request_id, "state": state})
|
|
|
|
|
|
|
|
|
|
return state_dict
|
|
|
|
|
|
|
|
|
|
if total:
|
|
|
|
|
state = "running"
|
|
|
|
|
else:
|
|
|
|
|
state = "empty"
|
|
|
|
|
|
|
|
|
|
state_dict.update({"state": state})
|
|
|
|
|
|
|
|
|
|
return state_dict
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ChannelUrlFixer:
|
|
|
|
|
"""fix not matching channel names in reindex"""
|
|
|
|
|
|
|
|
|
|