2
0
mirror of https://github.com/ComradCollective/Comrad synced 2024-11-01 21:40:32 +00:00
Comrad/komrade/utils.py

148 lines
4.3 KiB
Python
Raw Normal View History

2020-09-05 16:26:37 +00:00
class KomradeException(Exception):
    """Exception subclass that adds no behaviour of its own to ``Exception``."""
# Make sure komrade is on the path: append the directory containing this
# file to the import search path (runs once, when the module is imported).
import sys,os
sys.path.append(os.path.dirname(__file__))
2020-09-05 21:21:29 +00:00
def logger():
    """Return the ``'komrade'`` logger, set to DEBUG and writing to a stream.

    The logger gets a single ``logging.StreamHandler`` whose records are
    formatted as a bracketed timestamp line followed by the message.

    ``logging.getLogger`` hands back the same object for the same name, so
    the handler is attached only on the first call; later calls reuse it
    instead of stacking duplicates (which would print every record once
    per call made to this function).

    :return: the configured ``logging.Logger`` named ``'komrade'``
    """
    import logging

    komrade_logger = logging.getLogger('komrade')

    # Recognise our own handler by a private marker so that handlers
    # attached by other code neither block nor duplicate ours.
    already_attached = any(
        getattr(existing, '_komrade_stream', False)
        for existing in komrade_logger.handlers
    )
    if not already_attached:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter('[%(asctime)s]\n%(message)s\n'))
        handler._komrade_stream = True
        komrade_logger.addHandler(handler)

    komrade_logger.setLevel(logging.DEBUG)
    return komrade_logger
# Cached logging callable; filled in by log() the first time it is needed.
LOG = None


def log(*x):
    """Join the arguments with single spaces and send the text to ``LOG``.

    Each argument is converted with ``str``. When ``LOG`` is still unset
    (falsy) it is first bound to the ``debug`` method of the logger that
    ``logger()`` returns.
    """
    global LOG
    if not LOG:
        LOG = logger().debug
    message = ' '.join(map(str, x))
    LOG(message)
2020-09-09 18:31:36 +00:00
def clear_screen():
    """Clear the terminal by running ``cls`` on Windows, ``clear`` elsewhere."""
    import os

    if os.name == 'nt':
        command = 'cls'
    else:
        command = 'clear'
    os.system(command)
def pause():
    """Block until one line is read from standard input; the line is discarded."""
    _ = input('')
2020-09-05 21:21:29 +00:00
2020-09-05 16:26:37 +00:00
import inspect
class Logger(object):
    """Base class giving instances a ``log`` method that tags each message
    with the instance's class name and the name of the calling function."""

    def log(self,*x):
        """Log ``x`` prefixed with ``[ClassName.caller()]``, then wait and clear.

        After the message is handed to the module-level ``log`` function,
        this calls ``pause()`` and then ``clear_screen()``. A
        ``KeyboardInterrupt`` raised during either step ends the program
        by raising ``SystemExit``.

        :param x: values to log; forwarded unchanged to ``log``
        """
        # Only the immediate caller's name is needed, so step back a single
        # frame instead of building the whole outer-frame list (which also
        # reads source files to collect context lines for every frame).
        frame = inspect.currentframe()
        try:
            caller = frame.f_back.f_code.co_name
        except AttributeError:
            # No frame support in this interpreter, or no calling frame.
            caller = '<unknown>'
        finally:
            # Drop the frame reference promptly to avoid a reference cycle.
            del frame

        mytype = type(self).__name__
        log(f'\n[{mytype}.{caller}()]',*x)

        try:
            pause()
            clear_screen()
        except KeyboardInterrupt:
            # The ``exit`` helper is injected by the ``site`` module and is
            # not guaranteed to exist; raising SystemExit has the same effect.
            raise SystemExit
2020-09-05 17:55:12 +00:00
2020-09-05 21:54:07 +00:00
import binascii,base64
2020-09-05 21:53:40 +00:00
def isBase64(sb):
    """Return True when ``sb`` round-trips through Base64 unchanged.

    The value is decoded and re-encoded; it counts as Base64 only if the
    re-encoded bytes equal the input exactly.

    :param sb: ``str`` or ``bytes`` to test
    :return: ``True`` if ``sb`` is Base64, ``False`` otherwise (including
        any ``str`` that contains non-ASCII characters)
    :raises ValueError: if ``sb`` is neither ``str`` nor ``bytes``
    """
    if isinstance(sb, str):
        try:
            sb_bytes = sb.encode('ascii')
        except UnicodeEncodeError:
            # The Base64 alphabet is pure ASCII, so text that cannot be
            # encoded as ASCII is not Base64. This is caught explicitly
            # because UnicodeEncodeError is not a binascii.Error.
            return False
    elif isinstance(sb, bytes):
        sb_bytes = sb
    else:
        raise ValueError("Argument must be string or bytes")

    try:
        return base64.b64encode(base64.b64decode(sb_bytes)) == sb_bytes
    except binascii.Error:
        # Raised for malformed input such as incorrect padding.
        return False
2020-09-05 17:55:12 +00:00
def hashish(binary_data):
    """Return the SHA-256 digest of ``binary_data`` as a hexadecimal string."""
    import hashlib

    digest = hashlib.sha256()
    digest.update(binary_data)
    return digest.hexdigest()
2020-09-05 17:55:12 +00:00
from base64 import b64encode,b64decode
import ujson as json
2020-09-07 22:43:35 +00:00
import pickle
def package_for_transmission(data_json):
    """Pickle ``data_json`` and return the pickle bytes encoded as Base64.

    :param data_json: any picklable object
    :return: Base64-encoded ``bytes``
    """
    pickled = pickle.dumps(data_json)
    encoded = b64encode(pickled)
    return encoded
2020-09-07 21:45:01 +00:00
def dejsonize(dict):
    """Convert Base64-looking string values to bytes, in place and recursively.

    Every ``str`` value for which ``isBase64`` returns true is replaced by
    its encoded ``bytes`` form; nested dictionaries are processed the same
    way. All other values are left alone.

    :param dict: dictionary to convert; it is modified in place
    :return: the same dictionary object that was passed in
    """
    # The parameter name hides the built-in ``dict`` type inside this
    # function, so the type has to be reached through ``builtins``.
    # Comparing ``type(v)`` with the parameter itself is never true, which
    # would leave nested dictionaries unprocessed.
    import builtins

    for k, v in dict.items():
        if isinstance(v, str) and isBase64(v):
            dict[k] = v.encode()
        elif isinstance(v, builtins.dict):
            dict[k] = dejsonize(v)
    return dict
def unpackage_from_transmission(data_json_b64):
    """Decode a Base64 payload and unpickle the resulting bytes.

    NOTE(review): ``pickle.loads`` can run arbitrary code while loading.
    If ``data_json_b64`` can come from an untrusted peer this is unsafe;
    confirm against the callers.

    :param data_json_b64: Base64-encoded pickle data
    :return: the unpickled object
    """
    return pickle.loads(b64decode(data_json_b64))
2020-09-05 17:55:12 +00:00
2020-09-08 06:58:54 +00:00
def get_random_id():
    """Return the 32-character hex form of a newly generated version-4 UUID."""
    import uuid

    fresh = uuid.uuid4()
    return fresh.hex
def get_random_binary_id():
    """Return the string from ``get_random_id()``, encoded to bytes and then
    Base64-encoded."""
    import base64

    raw = get_random_id().encode()
    return base64.b64encode(raw)
2020-09-05 17:55:12 +00:00
# Recursive dictionary merge
# https://gist.github.com/angstwad/bf22d1822c38a92ec0a9
#
# Copyright (C) 2016 Paul Durivage <pauldurivage+github@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import collections
def dict_merge(dct, merge_dct):
    """ Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
    updating only top-level keys, dict_merge recurses down into dicts nested
    to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
    ``dct``.

    A key is merged recursively only when it already exists in ``dct`` with
    a ``dict`` value and the incoming value is a mapping; in every other
    case the incoming value replaces (or creates) the entry.

    :param dct: dict onto which the merge is executed (modified in place)
    :param merge_dct: dct merged into dct
    :return: None
    """
    # The ``collections.Mapping`` alias was removed in Python 3.10; the
    # abstract base class lives in ``collections.abc``.
    from collections.abc import Mapping

    for k, v in merge_dct.items():
        if (k in dct and isinstance(dct[k], dict)
                and isinstance(v, Mapping)):
            dict_merge(dct[k], v)
        else:
            dct[k] = v