add type hints

pull/474/head
simon 1 year ago
parent e9d7523a1f
commit ee4dbf99b3
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4

@ -2,6 +2,7 @@
functionality: functionality:
- interact with redis - interact with redis
- hold temporary download queue in redis - hold temporary download queue in redis
- interact with celery tasks results
""" """
import json import json
@ -13,9 +14,9 @@ import redis
class RedisBase: class RedisBase:
"""connection base for redis""" """connection base for redis"""
REDIS_HOST = os.environ.get("REDIS_HOST") REDIS_HOST: str = str(os.environ.get("REDIS_HOST"))
REDIS_PORT = os.environ.get("REDIS_PORT") or 6379 REDIS_PORT: int = int(os.environ.get("REDIS_PORT") or 6379)
NAME_SPACE = "ta:" NAME_SPACE: str = "ta:"
def __init__(self): def __init__(self):
self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT) self.conn = redis.Redis(host=self.REDIS_HOST, port=self.REDIS_PORT)
@ -24,7 +25,7 @@ class RedisBase:
class RedisArchivist(RedisBase): class RedisArchivist(RedisBase):
"""collection of methods to interact with redis""" """collection of methods to interact with redis"""
CHANNELS = [ CHANNELS: list[str] = [
"download", "download",
"add", "add",
"rescan", "rescan",
@ -34,7 +35,13 @@ class RedisArchivist(RedisBase):
"setting", "setting",
] ]
def set_message(self, key, message, path=".", expire=False): def set_message(
self,
key: str,
message: dict,
path: str = ".",
expire: bool | int = False,
) -> None:
"""write new message to redis""" """write new message to redis"""
self.conn.execute_command( self.conn.execute_command(
"JSON.SET", self.NAME_SPACE + key, path, json.dumps(message) "JSON.SET", self.NAME_SPACE + key, path, json.dumps(message)
@ -42,32 +49,30 @@ class RedisArchivist(RedisBase):
if expire: if expire:
if isinstance(expire, bool): if isinstance(expire, bool):
secs = 20 secs: int = 20
else: else:
secs = expire secs = expire
self.conn.execute_command("EXPIRE", self.NAME_SPACE + key, secs) self.conn.execute_command("EXPIRE", self.NAME_SPACE + key, secs)
def get_message(self, key): def get_message(self, key: str) -> dict:
"""get message dict from redis""" """get message dict from redis"""
reply = self.conn.execute_command("JSON.GET", self.NAME_SPACE + key) reply = self.conn.execute_command("JSON.GET", self.NAME_SPACE + key)
if reply: if reply:
json_str = json.loads(reply) return json.loads(reply)
else:
json_str = {"status": False}
return json_str return {"status": False}
def list_keys(self, query): def list_keys(self, query: str) -> list:
"""return all key matches""" """return all key matches"""
reply = self.conn.execute_command( reply = self.conn.execute_command(
"KEYS", self.NAME_SPACE + query + "*" "KEYS", self.NAME_SPACE + query + "*"
) )
if not reply: if not reply:
return False return []
return [i.decode().lstrip(self.NAME_SPACE) for i in reply] return [i.decode().lstrip(self.NAME_SPACE) for i in reply]
def list_items(self, query): def list_items(self, query: str) -> list:
"""list all matches""" """list all matches"""
all_matches = self.list_keys(query) all_matches = self.list_keys(query)
if not all_matches: if not all_matches:
@ -75,41 +80,16 @@ class RedisArchivist(RedisBase):
return [self.get_message(i) for i in all_matches] return [self.get_message(i) for i in all_matches]
def del_message(self, key): def del_message(self, key: str) -> bool:
"""delete key from redis""" """delete key from redis"""
response = self.conn.execute_command("DEL", self.NAME_SPACE + key) response = self.conn.execute_command("DEL", self.NAME_SPACE + key)
return response return response
def get_lock(self, lock_key):
"""handle lock for task management"""
redis_lock = self.conn.lock(self.NAME_SPACE + lock_key)
return redis_lock
def is_locked(self, lock_key):
"""check if lock is set"""
lock_name = self.NAME_SPACE + lock_key
lock_status = bool(self.conn.execute_command("GET", lock_name))
return lock_status
def get_progress(self):
"""get a list of all progress messages"""
all_messages = []
for channel in self.CHANNELS:
key = "message:" + channel
reply = self.conn.execute_command(
"JSON.GET", self.NAME_SPACE + key
)
if reply:
json_str = json.loads(reply)
all_messages.append(json_str)
return all_messages
class RedisQueue(RedisBase): class RedisQueue(RedisBase):
"""dynamically interact with queues in redis""" """dynamically interact with queues in redis"""
def __init__(self, queue_name): def __init__(self, queue_name: str):
super().__init__() super().__init__()
self.key = f"{self.NAME_SPACE}{queue_name}" self.key = f"{self.NAME_SPACE}{queue_name}"
@ -119,11 +99,11 @@ class RedisQueue(RedisBase):
all_elements = [i.decode() for i in result] all_elements = [i.decode() for i in result]
return all_elements return all_elements
def length(self): def length(self) -> int:
"""return total elements in list""" """return total elements in list"""
return self.conn.execute_command("LLEN", self.key) return self.conn.execute_command("LLEN", self.key)
def in_queue(self, element): def in_queue(self, element) -> str | bool:
"""check if element is in list""" """check if element is in list"""
result = self.conn.execute_command("LPOS", self.key, element) result = self.conn.execute_command("LPOS", self.key, element)
if result is not None: if result is not None:
@ -135,13 +115,13 @@ class RedisQueue(RedisBase):
"""add list to queue""" """add list to queue"""
self.conn.execute_command("RPUSH", self.key, *to_add) self.conn.execute_command("RPUSH", self.key, *to_add)
def add_priority(self, to_add): def add_priority(self, to_add: str) -> None:
"""add single video to front of queue""" """add single video to front of queue"""
item = json.dumps(to_add) item: str = json.dumps(to_add)
self.clear_item(item) self.clear_item(item)
self.conn.execute_command("LPUSH", self.key, item) self.conn.execute_command("LPUSH", self.key, item)
def get_next(self): def get_next(self) -> str | bool:
"""return next element in the queue, False if none""" """return next element in the queue, False if none"""
result = self.conn.execute_command("LPOP", self.key) result = self.conn.execute_command("LPOP", self.key)
if not result: if not result:
@ -150,19 +130,19 @@ class RedisQueue(RedisBase):
next_element = result.decode() next_element = result.decode()
return next_element return next_element
def clear(self): def clear(self) -> None:
"""delete list from redis""" """delete list from redis"""
self.conn.execute_command("DEL", self.key) self.conn.execute_command("DEL", self.key)
def clear_item(self, to_clear): def clear_item(self, to_clear: str) -> None:
"""remove single item from list if it's there""" """remove single item from list if it's there"""
self.conn.execute_command("LREM", self.key, 0, to_clear) self.conn.execute_command("LREM", self.key, 0, to_clear)
def trim(self, size): def trim(self, size: int) -> None:
"""trim the queue based on settings amount""" """trim the queue based on settings amount"""
self.conn.execute_command("LTRIM", self.key, 0, size) self.conn.execute_command("LTRIM", self.key, 0, size)
def has_item(self): def has_item(self) -> bool:
"""check if queue as at least one pending item""" """check if queue as at least one pending item"""
result = self.conn.execute_command("LRANGE", self.key, 0, 0) result = self.conn.execute_command("LRANGE", self.key, 0, 0)
return bool(result) return bool(result)
@ -171,32 +151,34 @@ class RedisQueue(RedisBase):
class TaskRedis(RedisBase): class TaskRedis(RedisBase):
"""interact with redis tasks""" """interact with redis tasks"""
BASE = "celery-task-meta-" BASE: str = "celery-task-meta-"
EXPIRE = 60 * 60 * 24 EXPIRE: int = 60 * 60 * 24
COMMANDS = ["STOP", "KILL"] COMMANDS: list[str] = ["STOP", "KILL"]
def get_all(self): def get_all(self) -> list:
"""return all tasks""" """return all tasks"""
all_keys = self.conn.execute_command("KEYS", f"{self.BASE}*") all_keys = self.conn.execute_command("KEYS", f"{self.BASE}*")
return [i.decode().replace(self.BASE, "") for i in all_keys] return [i.decode().replace(self.BASE, "") for i in all_keys]
def get_single(self, task_id): def get_single(self, task_id: str) -> dict:
"""return content of single task""" """return content of single task"""
result = self.conn.execute_command("GET", self.BASE + task_id) result = self.conn.execute_command("GET", self.BASE + task_id)
if not result: if not result:
return False return {}
return json.loads(result.decode()) return json.loads(result.decode())
def set_key(self, task_id, message, expire=False): def set_key(
self, task_id: str, message: dict, expire: bool | int = False
) -> None:
"""set value for lock, initial or update""" """set value for lock, initial or update"""
key = f"{self.BASE}{task_id}" key: str = f"{self.BASE}{task_id}"
self.conn.execute_command("SET", key, json.dumps(message)) self.conn.execute_command("SET", key, json.dumps(message))
if expire: if expire:
self.conn.execute_command("EXPIRE", key, self.EXPIRE) self.conn.execute_command("EXPIRE", key, self.EXPIRE)
def set_command(self, task_id, command): def set_command(self, task_id: str, command: str) -> None:
"""set task command""" """set task command"""
if command not in self.COMMANDS: if command not in self.COMMANDS:
print(f"{command} not in valid commands {self.COMMANDS}") print(f"{command} not in valid commands {self.COMMANDS}")
@ -210,11 +192,11 @@ class TaskRedis(RedisBase):
message.update({"command": command}) message.update({"command": command})
self.set_key(task_id, message) self.set_key(task_id, message)
def del_task(self, task_id): def del_task(self, task_id: str) -> None:
"""delete task result by id""" """delete task result by id"""
self.conn.execute_command("DEL", f"{self.BASE}{task_id}") self.conn.execute_command("DEL", f"{self.BASE}{task_id}")
def del_all(self): def del_all(self) -> None:
"""delete all task results""" """delete all task results"""
all_tasks = self.get_all() all_tasks = self.get_all()
for task_id in all_tasks: for task_id in all_tasks:

Loading…
Cancel
Save