diff --git a/tubearchivist/home/src/ta/ta_redis.py b/tubearchivist/home/src/ta/ta_redis.py index 7081fdc5..46b1b5c0 100644 --- a/tubearchivist/home/src/ta/ta_redis.py +++ b/tubearchivist/home/src/ta/ta_redis.py @@ -2,6 +2,7 @@ functionality: - interact with redis - hold temporary download queue in redis +- interact with celery tasks results """ import json @@ -13,9 +14,9 @@ import redis class RedisBase: """connection base for redis""" - REDIS_HOST = os.environ.get("REDIS_HOST") - REDIS_PORT = os.environ.get("REDIS_PORT") or 6379 - NAME_SPACE = "ta:" + REDIS_HOST: str = str(os.environ.get("REDIS_HOST")) + REDIS_PORT: int = int(os.environ.get("REDIS_PORT") or 6379) + NAME_SPACE: str = "ta:" def __init__(self): self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT) @@ -24,7 +25,7 @@ class RedisBase: class RedisArchivist(RedisBase): """collection of methods to interact with redis""" - CHANNELS = [ + CHANNELS: list[str] = [ "download", "add", "rescan", @@ -34,7 +35,13 @@ class RedisArchivist(RedisBase): "setting", ] - def set_message(self, key, message, path=".", expire=False): + def set_message( + self, + key: str, + message: dict, + path: str = ".", + expire: bool | int = False, + ) -> None: """write new message to redis""" self.conn.execute_command( "JSON.SET", self.NAME_SPACE + key, path, json.dumps(message) @@ -42,32 +49,30 @@ class RedisArchivist(RedisBase): if expire: if isinstance(expire, bool): - secs = 20 + secs: int = 20 else: secs = expire self.conn.execute_command("EXPIRE", self.NAME_SPACE + key, secs) - def get_message(self, key): + def get_message(self, key: str) -> dict: """get message dict from redis""" reply = self.conn.execute_command("JSON.GET", self.NAME_SPACE + key) if reply: - json_str = json.loads(reply) - else: - json_str = {"status": False} + return json.loads(reply) - return json_str + return {"status": False} - def list_keys(self, query): + def list_keys(self, query: str) -> list: """return all key matches""" reply = self.conn.execute_command( "KEYS", self.NAME_SPACE + query + "*" ) if not reply: - return False + return [] return [i.decode().lstrip(self.NAME_SPACE) for i in reply] - def list_items(self, query): + def list_items(self, query: str) -> list: """list all matches""" all_matches = self.list_keys(query) if not all_matches: @@ -75,41 +80,16 @@ class RedisArchivist(RedisBase): return [self.get_message(i) for i in all_matches] - def del_message(self, key): + def del_message(self, key: str) -> bool: """delete key from redis""" response = self.conn.execute_command("DEL", self.NAME_SPACE + key) return response - def get_lock(self, lock_key): - """handle lock for task management""" - redis_lock = self.conn.lock(self.NAME_SPACE + lock_key) - return redis_lock - - def is_locked(self, lock_key): - """check if lock is set""" - lock_name = self.NAME_SPACE + lock_key - lock_status = bool(self.conn.execute_command("GET", lock_name)) - return lock_status - - def get_progress(self): - """get a list of all progress messages""" - all_messages = [] - for channel in self.CHANNELS: - key = "message:" + channel - reply = self.conn.execute_command( - "JSON.GET", self.NAME_SPACE + key - ) - if reply: - json_str = json.loads(reply) - all_messages.append(json_str) - - return all_messages - class RedisQueue(RedisBase): """dynamically interact with queues in redis""" - def __init__(self, queue_name): + def __init__(self, queue_name: str): super().__init__() self.key = f"{self.NAME_SPACE}{queue_name}" @@ -119,11 +99,11 @@ class RedisQueue(RedisBase): all_elements = [i.decode() for i in result] return all_elements - def length(self): + def length(self) -> int: """return total elements in list""" return self.conn.execute_command("LLEN", self.key) - def in_queue(self, element): + def in_queue(self, element) -> str | bool: """check if element is in list""" result = self.conn.execute_command("LPOS", self.key, element) if result is not None: @@ -135,13 +115,13 @@ class RedisQueue(RedisBase): """add list to queue""" self.conn.execute_command("RPUSH", self.key, *to_add) - def add_priority(self, to_add): + def add_priority(self, to_add: str) -> None: """add single video to front of queue""" - item = json.dumps(to_add) + item: str = json.dumps(to_add) self.clear_item(item) self.conn.execute_command("LPUSH", self.key, item) - def get_next(self): + def get_next(self) -> str | bool: """return next element in the queue, False if none""" result = self.conn.execute_command("LPOP", self.key) if not result: @@ -150,19 +130,19 @@ class RedisQueue(RedisBase): next_element = result.decode() return next_element - def clear(self): + def clear(self) -> None: """delete list from redis""" self.conn.execute_command("DEL", self.key) - def clear_item(self, to_clear): + def clear_item(self, to_clear: str) -> None: """remove single item from list if it's there""" self.conn.execute_command("LREM", self.key, 0, to_clear) - def trim(self, size): + def trim(self, size: int) -> None: """trim the queue based on settings amount""" self.conn.execute_command("LTRIM", self.key, 0, size) - def has_item(self): + def has_item(self) -> bool: """check if queue as at least one pending item""" result = self.conn.execute_command("LRANGE", self.key, 0, 0) return bool(result) @@ -171,32 +151,34 @@ class RedisQueue(RedisBase): class TaskRedis(RedisBase): """interact with redis tasks""" - BASE = "celery-task-meta-" - EXPIRE = 60 * 60 * 24 - COMMANDS = ["STOP", "KILL"] + BASE: str = "celery-task-meta-" + EXPIRE: int = 60 * 60 * 24 + COMMANDS: list[str] = ["STOP", "KILL"] - def get_all(self): + def get_all(self) -> list: """return all tasks""" all_keys = self.conn.execute_command("KEYS", f"{self.BASE}*") return [i.decode().replace(self.BASE, "") for i in all_keys] - def get_single(self, task_id): + def get_single(self, task_id: str) -> dict: """return content of single task""" result = self.conn.execute_command("GET", self.BASE + task_id) if not result: - return False + return {} return json.loads(result.decode()) - def set_key(self, task_id, message, expire=False): + def set_key( + self, task_id: str, message: dict, expire: bool | int = False + ) -> None: """set value for lock, initial or update""" - key = f"{self.BASE}{task_id}" + key: str = f"{self.BASE}{task_id}" self.conn.execute_command("SET", key, json.dumps(message)) if expire: self.conn.execute_command("EXPIRE", key, self.EXPIRE) - def set_command(self, task_id, command): + def set_command(self, task_id: str, command: str) -> None: """set task command""" if command not in self.COMMANDS: print(f"{command} not in valid commands {self.COMMANDS}") @@ -210,11 +192,11 @@ class TaskRedis(RedisBase): message.update({"command": command}) self.set_key(task_id, message) - def del_task(self, task_id): + def del_task(self, task_id: str) -> None: """delete task result by id""" self.conn.execute_command("DEL", f"{self.BASE}{task_id}") - def del_all(self): + def del_all(self) -> None: """delete all task results""" all_tasks = self.get_all() for task_id in all_tasks: