2
0
mirror of https://github.com/ComradCollective/Comrad synced 2024-11-05 21:20:51 +00:00
Comrad/komrade/utils.py

367 lines
11 KiB
Python
Raw Normal View History

2020-09-05 16:26:37 +00:00
class KomradeException(Exception): pass
# make sure komrade is on path
import sys,os
sys.path.append(os.path.dirname(__file__))
2020-09-05 21:21:29 +00:00
def logger():
import logging
handler = logging.StreamHandler()
formatter = logging.Formatter('[%(asctime)s]\n%(message)s\n')
handler.setFormatter(formatter)
2020-09-07 17:11:52 +00:00
logger = logging.getLogger('komrade')
2020-09-05 21:21:29 +00:00
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
return logger
LOG = None
def log(*x):
global LOG
2020-09-13 13:57:06 +00:00
#if not LOG: LOG=logger().debug
if not LOG: LOG=print
2020-09-05 21:21:29 +00:00
tolog=' '.join(str(_) for _ in x)
LOG(tolog)
2020-09-09 18:31:36 +00:00
def clear_screen():
import os
2020-09-11 14:35:47 +00:00
# pass
2020-09-09 18:31:36 +00:00
os.system('cls' if os.name == 'nt' else 'clear')
2020-09-10 13:57:56 +00:00
def do_pause():
2020-09-11 14:35:47 +00:00
try:
input('')
except KeyboardInterrupt:
exit('\n\nGoodbye.')
2020-09-05 21:21:29 +00:00
2020-09-12 19:07:27 +00:00
import textwrap as tw
2020-09-09 18:43:53 +00:00
def dict_format(d, tab=0):
2020-09-10 21:32:59 +00:00
def reppr(v):
2020-09-12 19:07:27 +00:00
if type(v)==bytes:
if not isBase64(v):
v=b64encode(v)
v=v.decode()
2020-09-12 21:01:59 +00:00
# v='\n'.join(tw.wrap(v,10))
2020-09-10 14:26:39 +00:00
return v
2020-09-10 14:46:09 +00:00
s = ['{\n\n']
2020-09-10 09:31:32 +00:00
for k,v in sorted(d.items()):
2020-09-12 07:55:23 +00:00
v=reppr(v)
#print(k,v,type(v))
2020-09-09 18:43:53 +00:00
if isinstance(v, dict):
2020-09-09 19:15:35 +00:00
v = dict_format(v, tab+1)
2020-09-09 18:43:53 +00:00
else:
v = repr(v)
2020-09-12 07:55:23 +00:00
2020-09-10 07:44:03 +00:00
# s.append('%s%r: %s (%s),\n' % (' '*tab, k, v, type(v).__name__))
2020-09-10 21:32:59 +00:00
s.append('%s%r: %s,\n\n' % (' '*tab, k, reppr(v)))
2020-09-11 14:35:47 +00:00
s.append('%s}' % (' '*(tab-2)))
2020-09-09 18:43:53 +00:00
return ''.join(s)
2020-09-10 17:52:07 +00:00
import inspect,time
2020-09-11 14:35:47 +00:00
from komrade.constants import *
2020-09-05 16:26:37 +00:00
class Logger(object):
2020-09-15 08:51:52 +00:00
@property
def off(self):
x=os.environ.get('KOMRADE_SHOW_LOG')
if x is not None:
x=x.strip().lower()
return x in {'n','0','false'}
return not SHOW_LOG
def hide_log(self):
os.environ['KOMRADE_SHOW_LOG']='0'
def show_log(self):
os.environ['KOMRADE_SHOW_LOG']='1'
def toggle_log(self):
self.show_log() if self.off else self.hide_log()
2020-09-13 18:20:19 +00:00
def log(self,*x,pause=PAUSE_LOGGER,clear=CLEAR_LOGGER):
2020-09-15 08:51:52 +00:00
if self.off: return
2020-09-05 16:26:37 +00:00
curframe = inspect.currentframe()
calframe = inspect.getouterframes(curframe, 2)
mytype = type(self).__name__
caller = calframe[1][3]
2020-09-13 13:57:06 +00:00
log(f'[{mytype}.{caller}()]'.center(CLI_WIDTH) + '\n\n',*x)
2020-09-09 20:03:39 +00:00
2020-09-10 16:11:27 +00:00
# try:
if pause: do_pause()
2020-09-13 18:55:14 +00:00
if clear: clear_screen()
2020-09-10 16:11:27 +00:00
# except KeyboardInterrupt:
2020-09-11 14:35:47 +00:00
# exit()
2020-09-05 17:55:12 +00:00
2020-09-12 09:25:09 +00:00
def printt(*x,width=STATUS_LINE_WIDTH,end='\n',indent=1,ret=False,scan=False,**y):
2020-09-11 14:35:47 +00:00
if not scan and not width:
print(*x,end=end,**y)
else:
import textwrap as tw
xs=end.join(str(xx) for xx in x if type(xx)==str)
if width:
xw = [_.strip() for _ in tw.wrap(xs,width=width)]
# xw = [_ for _ in tw.wrap(xs,width=width)]
xs=end.join(xw)
xs = tw.indent(xs,' '*indent)
2020-09-11 17:17:21 +00:00
if ret: return xs
2020-09-11 14:35:47 +00:00
print(xs) if scan==False else scan_print(xs)
def status(self,*msg,pause=True,clear=False,ticks=[],tab=2,speed=10,end=None,indent=0,width=80,scan=False):
2020-09-10 21:32:59 +00:00
import random
2020-09-10 17:54:49 +00:00
if not SHOW_STATUS: return
2020-09-10 17:52:07 +00:00
# if len(msg)==1 and type(msg[0])==str:
# msg=[x for x in msg[0].split('\n\n')]
if clear: clear_screen()
2020-09-10 14:16:53 +00:00
paras=[]
2020-09-10 17:52:07 +00:00
res={}
for para in msg:
plen = para if type(para)==int or type(para)==float else None
if type(para) in {int,float}:
plen=int(para)
2020-09-11 14:35:47 +00:00
# print()
print(' '*indent,end='',flush=True)
for i in range(plen):
2020-09-10 17:52:07 +00:00
tick = ticks[i] if i<len(ticks) else '.'
2020-09-11 14:35:47 +00:00
print(tick,end=end if end else ' ',flush=True) #,scan=scan)
# time.sleep(random.random() / speed)
time.sleep(random.uniform(0.05,0.2))
# print()
2020-09-10 17:52:07 +00:00
elif para is None:
clear_screen()
elif para is False:
2020-09-12 07:55:23 +00:00
do_pause()
2020-09-11 14:35:47 +00:00
elif para is True:
2020-09-12 07:55:23 +00:00
print()
2020-09-11 14:35:47 +00:00
elif type(para) is set: # logo/image
pl = [x for x in para if type(x)==str]
txt=pl[0]
2020-09-11 17:17:21 +00:00
speed =[x for x in para if type(x) in {int,float}]
2020-09-11 14:35:47 +00:00
speed = speed[0] if speed else 1
if True in para:
scan_print(txt,speed=speed)
else:
print(txt,end=end)
2020-09-10 17:52:07 +00:00
elif type(para) is tuple:
k=para[0]
q=para[1]
f=para[2] if len(para)>2 else input
ans=None
while not ans:
ans=f(q).strip()
res[k]=ans
elif type(para) is dict:
print(dict_format(para,tab=tab))
elif pause:
2020-09-12 09:25:09 +00:00
self.printt(para,flush=True,end=end if end else '\n',scan=scan,indent=indent)
2020-09-10 17:52:07 +00:00
paras+=[para]
do_pause()
else:
2020-09-12 09:25:09 +00:00
self.printt(para,flush=True,end=end if end else '\n',scan=scan,indent=indent)
2020-09-10 17:52:07 +00:00
paras+=[para]
return {'paras':paras, 'vals':res}
2020-09-10 13:57:56 +00:00
2020-09-05 21:54:07 +00:00
import binascii,base64
2020-09-05 21:53:40 +00:00
def isBase64(sb):
try:
if isinstance(sb, str):
# If there's any unicode here, an exception will be thrown and the function will return false
sb_bytes = bytes(sb, 'ascii')
elif isinstance(sb, bytes):
sb_bytes = sb
else:
2020-09-12 22:02:25 +00:00
return False
2020-09-05 21:53:40 +00:00
raise ValueError("Argument must be string or bytes")
return base64.b64encode(base64.b64decode(sb_bytes)) == sb_bytes
except binascii.Error:
return False
2020-09-05 17:55:12 +00:00
2020-09-12 18:18:37 +00:00
import inspect,functools
def get_class_that_defined_method(meth):
if isinstance(meth, functools.partial):
return get_class_that_defined_method(meth.func)
if inspect.ismethod(meth) or (inspect.isbuiltin(meth) and getattr(meth, '__self__', None) is not None and getattr(meth.__self__, '__class__', None)):
for cls in inspect.getmro(meth.__self__.__class__):
if meth.__name__ in cls.__dict__:
return cls
meth = getattr(meth, '__func__', meth) # fallback to __qualname__ parsing
if inspect.isfunction(meth):
cls = getattr(inspect.getmodule(meth),
meth.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0],
None)
if isinstance(cls, type):
return cls
return getattr(meth, '__objclass__', None) # handle special descriptor objects
2020-09-05 17:55:12 +00:00
2020-09-12 21:50:28 +00:00
def d2b64(d):
d2={}
for k,v in d.items():
if type(v)==bytes and not isBase64(v):
d2[k]=b64encode(v)
else:
d2[k]=v
return d2
2020-09-05 17:55:12 +00:00
2020-09-13 06:28:25 +00:00
def b64enc(x):
2020-09-13 07:11:30 +00:00
if type(x) not in {str,bytes}: return x
2020-09-13 06:28:25 +00:00
if type(x)==str: x=x.encode()
if not isBase64(x): x=b64encode(x)
return x
2020-09-13 08:10:22 +00:00
def b64dec(x):
if type(x) not in {str,bytes}: return x
if type(x)==str: x=x.encode()
if isBase64(x): x=b64decode(x)
return x
2020-09-13 06:28:25 +00:00
2020-09-13 07:11:30 +00:00
def b64enc_s(x):
return b64enc(x).decode()
2020-09-13 06:28:25 +00:00
def hashish(binary_data):
import hashlib
return hashlib.sha256(binary_data).hexdigest()
2020-09-05 17:55:12 +00:00
2020-09-12 20:48:26 +00:00
def create_secret():
if not os.path.exists(PATH_CRYPT_SECRET):
secret = get_random_binary_id()
from komrade.backend.keymaker import make_key_discreet
print('shhh! creating secret:',make_key_discreet(secret))
with open(PATH_CRYPT_SECRET,'wb') as of:
of.write(secret)
2020-09-12 14:32:03 +00:00
def hasher(dat,secret=None):
import hashlib
if not secret:
2020-09-12 20:48:26 +00:00
create_secret()
2020-09-12 14:32:03 +00:00
with open(PATH_CRYPT_SECRET,'rb') as f:
secret = f.read()
if type(dat)==str:
dat=dat.encode()
# print(dat,secret[:10])
return b64encode(hashlib.sha256(dat + secret).hexdigest().encode()).decode()
2020-09-05 17:55:12 +00:00
from base64 import b64encode,b64decode
import ujson as json
2020-09-07 22:43:35 +00:00
import pickle
def package_for_transmission(data_json):
2020-09-07 21:50:22 +00:00
# print('package_for_transmission.data_json =',data_json)
2020-09-09 19:15:35 +00:00
2020-09-07 22:43:35 +00:00
data_json_b = pickle.dumps(data_json)
2020-09-09 19:15:35 +00:00
# print('data_json_b??')
2020-09-07 21:50:22 +00:00
# print('package_for_transmission.data_json_b =',data_json_bstr)
2020-09-09 19:22:13 +00:00
return data_json_b
2020-09-07 21:45:01 +00:00
def dejsonize(dict):
for k,v in dict.items():
if type(v)==str and isBase64(v):
2020-09-07 21:46:07 +00:00
dict[k]=v.encode()
# if type(v)==bytes and isBase64(v):
# dict[k]=b64decode(v)
elif type(v)==dict:
2020-09-07 21:45:01 +00:00
dict[k]=dejsonize(v)
return dict
def unpackage_from_transmission(data_json_b64):
2020-09-07 22:43:35 +00:00
# print('unpackage_from_transmission.data_json_b64 =',data_json_b64)
data_json_b = b64decode(data_json_b64)
2020-09-09 17:39:30 +00:00
# print('unpackage_from_transmission.data_json_bstr =',data_json_b)
2020-09-07 21:45:01 +00:00
2020-09-07 22:43:35 +00:00
data_json = pickle.loads(data_json_b)
# print('unpackage_from_transmission.data_json =',data_json)
2020-09-07 21:45:01 +00:00
2020-09-07 22:43:35 +00:00
# data_json_dejson = dejsonize(data_json)
# print('unpackage_from_transmission.data_json =',data_json_dejson)
2020-09-07 21:45:01 +00:00
return data_json
2020-09-05 17:55:12 +00:00
2020-09-08 06:58:54 +00:00
def get_random_id():
import uuid
return uuid.uuid4().hex
def get_random_binary_id():
import base64
idstr = get_random_id()
return base64.b64encode(idstr.encode())
2020-09-05 17:55:12 +00:00
# Recursive dictionary merge
# https://gist.github.com/angstwad/bf22d1822c38a92ec0a9
#
# Copyright (C) 2016 Paul Durivage <pauldurivage+github@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import collections
def dict_merge(dct, merge_dct):
""" Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
updating only top-level keys, dict_merge recurses down into dicts nested
to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
``dct``.
:param dct: dict onto which the merge is executed
:param merge_dct: dct merged into dct
:return: None
"""
2020-09-06 21:12:49 +00:00
for k, v in merge_dct.items():
2020-09-05 17:55:12 +00:00
if (k in dct and isinstance(dct[k], dict)
and isinstance(merge_dct[k], collections.Mapping)):
dict_merge(dct[k], merge_dct[k])
else:
2020-09-10 12:28:36 +00:00
dct[k] = merge_dct[k]
def capture_stdout(func):
import io
from contextlib import redirect_stdout
f = io.StringIO()
with redirect_stdout(f):
func()
out = f.getvalue()
2020-09-11 14:35:47 +00:00
return out
2020-09-12 07:55:23 +00:00
def scan_print(xstr,min_pause=0,max_pause=.01,speed=1):
2020-09-11 14:35:47 +00:00
import random,time
for c in xstr:
print(c,end='',flush=True)
2020-09-11 17:17:21 +00:00
naptime=random.uniform(min_pause, max_pause / speed)
time.sleep(naptime)
# time.sleep()
2020-09-11 14:35:47 +00:00
def get_qr_str(data):
import qrcode
qr=qrcode.QRCode()
qr.add_data(data)
ascii = capture_stdout(qr.print_ascii)
ascii = ascii[:-1] # removing last line break
2020-09-11 17:17:21 +00:00
return '\n ' + ascii.strip()
def indent_str(x,n):
import textwrap as tw
return tw.indent(x,' '*n)