2
0
mirror of https://github.com/ComradCollective/Comrad synced 2024-11-17 21:25:37 +00:00
Comrad/komrade/backend/keymaker.py

477 lines
17 KiB
Python
Raw Normal View History

2020-09-05 16:26:37 +00:00
import os,sys; sys.path.append(os.path.abspath(os.path.join(os.path.abspath(os.path.join(os.path.dirname(__file__),'..')),'..')))
from komrade import *
2020-09-05 21:11:42 +00:00
from komrade.backend.crypt import *
2020-09-06 14:45:40 +00:00
from abc import ABC, abstractmethod
class KomradeKey(ABC):
@abstractmethod
def encrypt(self,msg,**kwargs): pass
@abstractmethod
def decrypt(self,msg,**kwargs): pass
@abstractmethod
def data(self): pass
2020-09-06 13:07:27 +00:00
2020-09-06 14:45:40 +00:00
class KomradeSymmetricKey(KomradeKey):
2020-09-06 13:07:27 +00:00
@property
def cell(self):
if not hasattr(self,'_cell'):
if hasattr(self,'passphrase') and self.passphrase:
2020-09-06 14:45:40 +00:00
self._cell = SCellSeal(passphrase=self.passphrase)
2020-09-06 13:07:27 +00:00
elif hasattr(self,'key') and self.key:
2020-09-06 14:45:40 +00:00
self._cell = SCellSeal(key=self.key)
return self._cell
def encrypt(self,msg,**kwargs):
if issubclass(type(msg), KomradeKey): msg=msg.data
return self.cell.encrypt(msg,**kwargs)
def decrypt(self,msg,**kwargs):
return self.cell.decrypt(msg,**kwargs)
2020-09-06 13:07:27 +00:00
2020-09-06 14:45:40 +00:00
class KomradeSymmetricKeyWithPassphrase(KomradeSymmetricKey):
2020-09-08 07:20:42 +00:00
def __init__(self,passphrase=DEBUG_DEFAULT_PASSPHRASE, why=WHY_MSG):
2020-09-06 13:07:27 +00:00
self.passphrase=passphrase
if not self.passphrase:
self.passphrase=getpass.getpass(why)
2020-09-06 14:45:40 +00:00
#return self.passphrase
@property
def data(self): return KEY_TYPE_SYMMETRIC_WITH_PASSPHRASE.encode('utf-8')
2020-09-06 13:07:27 +00:00
2020-09-06 14:45:40 +00:00
class KomradeSymmetricKeyWithoutPassphrase(KomradeSymmetricKey):
2020-09-07 14:51:59 +00:00
def __init__(self,key=None):
self.key = GenerateSymmetricKey() if not key else key
2020-09-06 14:45:40 +00:00
@property
def data(self): return self.key
class KomradeAsymmetricKey(KomradeKey):
def __init__(self,pubkey,privkey):
self.pubkey=pubkey
self.privkey=privkey
def encrypt(self,msg,pubkey=None,privkey=None):
if issubclass(type(msg), KomradeKey): msg=msg.data
pubkey=pubkey if pubkey else self.pubkey
privkey=privkey if privkey else self.privkey
return SMessage(privkey,pubkey).wrap(msg)
def decrypt(self,msg,pubkey=None,privkey=None):
pubkey=pubkey if pubkey else self.pubkey
privkey=privkey if privkey else self.privkey
return SMessage(privkey,pubkey).unwrap(msg)
@property
def data(self): return self.key
class KomradeAsymmetricPublicKey(KomradeAsymmetricKey):
@property
def key(self): return self.pubkey
class KomradeAsymmetricPrivateKey(KomradeAsymmetricKey):
@property
def key(self): return self.privkey
2020-09-06 13:07:27 +00:00
class Keymaker(Logger):
2020-09-08 07:01:35 +00:00
def __init__(self,
name=None,
2020-09-08 07:20:42 +00:00
passphrase=DEBUG_DEFAULT_PASSPHRASE,
2020-09-08 07:01:35 +00:00
uri_id=None,
keychain={},
path_crypt_keys=None,
path_crypt_data=None):
# set defaults
2020-09-05 14:09:31 +00:00
self.name=name
2020-09-08 08:13:35 +00:00
self._uri_id=uri_id
2020-09-08 11:23:41 +00:00
self._pubkey=None
2020-09-07 17:11:52 +00:00
self._keychain=keychain
2020-09-05 14:09:31 +00:00
self.passphrase=passphrase
2020-09-06 09:48:01 +00:00
self.path_crypt_keys=path_crypt_keys
self.path_crypt_data=path_crypt_data
2020-09-05 14:09:31 +00:00
2020-09-09 14:38:37 +00:00
def find_pubkey(self):
global TELEPHONE_KEYCHAIN,OPERATOR_KEYCHAIN
2020-09-08 15:31:16 +00:00
#self.log('keychain?',self.keychain())
if 'pubkey' in self._keychain and self._keychain['pubkey']:
return self._keychain['pubkey']
2020-09-09 14:38:37 +00:00
2020-09-08 15:31:16 +00:00
res = self.crypt_keys.get(self.name, prefix='/pubkey/')
if res: return res
2020-09-09 15:41:33 +00:00
res = self.load_qr(self.name)
2020-09-08 15:31:16 +00:00
if res: return res
self.log('I don\'t know my public key!')
2020-09-09 14:38:37 +00:00
raise KomradeException(f'I don\'t know my public key!\n{self}\n{self._keychain}')
#return None
2020-09-09 18:31:36 +00:00
@property
def keys(self):
return sorted(list(self.keychain().keys()))
@property
def top_keys(self):
return [k for k in self.keys if k.count('_')==0]
2020-09-08 11:23:41 +00:00
def keychain(self,look_for=KEYMAKER_DEFAULT_ALL_KEY_NAMES):
2020-09-09 14:38:37 +00:00
# load existing keychain
2020-09-08 15:31:16 +00:00
keys = self._keychain #self._keychain = keys = {**self._keychain}
2020-09-09 14:38:37 +00:00
# make sure we have the pubkey
if not 'pubkey' in self._keychain: self._keychain['pubkey']=self.find_pubkey()
pubkey=self._keychain['pubkey']
# get uri
uri = b64encode(pubkey)
2020-09-08 11:23:41 +00:00
# get from cache
for keyname in look_for:
2020-09-08 15:14:48 +00:00
if keyname in keys and keys[keyname]: continue
2020-09-08 11:23:41 +00:00
key = self.crypt_keys.get(uri,prefix=f'/{keyname}/')
2020-09-08 15:14:48 +00:00
if key: keys[keyname]=key
2020-09-08 11:23:41 +00:00
# try to assemble
keys = self.assemble(self.assemble(keys))
2020-09-09 14:38:37 +00:00
#store to existing set
self._keychain = keys
#return
2020-09-08 11:23:41 +00:00
return keys
2020-09-09 14:38:37 +00:00
@property
def pubkey(self): return self.keychain().get('pubkey')
2020-09-08 11:23:41 +00:00
@property
2020-09-08 15:14:48 +00:00
def privkey(self): return self.keychain().get('privkey')
2020-09-08 11:23:41 +00:00
@property
2020-09-08 15:14:48 +00:00
def adminkey(self): return self.keychain().get('adminkey')
2020-09-09 17:34:19 +00:00
@property
def pubkey_encr(self): return self.keychain().get('pubkey_encr')
@property
def privkey_encr(self): return self.keychain().get('privkey_encr')
@property
def adminkey_encr(self): return self.keychain().get('adminkey_encr')
@property
def pubkey_decr(self): return self.keychain().get('pubkey_decr')
@property
def privkey_decr(self): return self.keychain().get('privkey_decr')
@property
def adminkey_decr(self): return self.keychain().get('adminkey_decr')
2020-09-08 11:23:41 +00:00
def load_qr(self,name):
# try to load?
contact_fnfn = os.path.join(PATH_QRCODES,name+'.png')
2020-09-08 15:31:16 +00:00
if not os.path.exists(contact_fnfn): return ''
2020-09-08 11:23:41 +00:00
# with open(contact_fnfn,'rb') as f: dat=f.read()
from pyzbar.pyzbar import decode
from PIL import Image
2020-09-08 15:14:48 +00:00
res= decode(Image.open(contact_fnfn))[0].data
2020-09-08 11:23:41 +00:00
2020-09-08 15:14:48 +00:00
# self.log('QR??',res,b64decode(res))
return b64decode(res)
2020-09-08 11:23:41 +00:00
2020-09-08 08:13:35 +00:00
@property
def uri_id(self):
2020-09-08 15:14:48 +00:00
if not self._uri_id:
2020-09-09 14:38:37 +00:00
pubkey = self.find_pubkey()
self._uri_id = b64encode(pubkey)
2020-09-08 08:13:35 +00:00
return self._uri_id
2020-09-05 14:09:31 +00:00
2020-09-04 12:46:22 +00:00
### BASE STORAGE
@property
def crypt_keys(self):
if not hasattr(self,'_crypt_keys'):
2020-09-06 09:48:01 +00:00
self._crypt_keys = Crypt(fn=self.path_crypt_keys)
2020-09-04 12:46:22 +00:00
return self._crypt_keys
2020-09-06 16:29:27 +00:00
@property
def crypt_keys_mem(self):
if not hasattr(self,'_crypt_keys_mem'):
self._crypt_keys_mem = CryptMemory()
return self._crypt_keys_mem
2020-09-04 12:46:22 +00:00
@property
def crypt_data(self):
if not hasattr(self,'_crypt_data'):
2020-09-06 09:48:01 +00:00
self._crypt_data = Crypt(fn=self.path_crypt_data)
2020-09-04 12:46:22 +00:00
return self._crypt_data
2020-09-09 17:34:19 +00:00
def can_log_in(self):
if not self.pubkey: return False
if not (self.privkey or self.privkey_encr): return False
return True
2020-09-04 15:50:08 +00:00
### CREATING KEYS
2020-09-06 13:07:27 +00:00
def get_new_keys(self):
raise KomradeException('Every keymaker must make their own get_new_keys() !')
2020-09-07 23:34:27 +00:00
2020-09-08 07:20:42 +00:00
def gen_keys_from_types(self,key_types=KEYMAKER_DEFAULT_KEY_TYPES,passphrase=DEBUG_DEFAULT_PASSPHRASE):
2020-09-08 06:58:54 +00:00
"""
Get new asymmetric/symmetric keys, given a dictionary of constants describing their type
"""
2020-09-06 13:07:27 +00:00
asymmetric_pubkey=None
asymmetric_privkey=None
2020-09-06 17:25:03 +00:00
keychain = {}
2020-09-06 14:45:40 +00:00
for key_name,key_type_desc in key_types.items():
if key_type_desc in {KEY_TYPE_ASYMMETRIC_PUBKEY,KEY_TYPE_ASYMMETRIC_PRIVKEY}:
2020-09-06 13:07:27 +00:00
if not asymmetric_privkey or not asymmetric_pubkey:
keypair = GenerateKeyPair(KEY_PAIR_TYPE.EC)
asymmetric_privkey = keypair.export_private_key()
asymmetric_pubkey = keypair.export_public_key()
2020-09-06 14:45:40 +00:00
if key_type_desc==KEY_TYPE_ASYMMETRIC_PRIVKEY:
keychain[key_name] = KomradeAsymmetricPrivateKey(asymmetric_pubkey,asymmetric_privkey)
elif key_type_desc==KEY_TYPE_ASYMMETRIC_PUBKEY:
keychain[key_name] = KomradeAsymmetricPublicKey(asymmetric_pubkey,asymmetric_privkey)
2020-09-06 13:07:27 +00:00
elif key_type_desc==KEY_TYPE_SYMMETRIC_WITHOUT_PASSPHRASE:
2020-09-06 14:45:40 +00:00
keychain[key_name]=KomradeSymmetricKeyWithoutPassphrase()
2020-09-06 13:07:27 +00:00
elif key_type_desc==KEY_TYPE_SYMMETRIC_WITH_PASSPHRASE:
2020-09-06 14:45:40 +00:00
if not passphrase and not self.passphrase: self.passphrase=getpass.getpass(WHY_MSG)
passphrase=passphrase if passphrase else self.passphrase
keychain[key_name]=KomradeSymmetricKeyWithPassphrase(passphrase=passphrase)
return keychain
2020-09-06 13:07:27 +00:00
2020-09-07 23:34:27 +00:00
2020-09-08 07:20:42 +00:00
def gen_encr_keys(self,keychain,keys_to_gen,passphrase=DEBUG_DEFAULT_PASSPHRASE):
2020-09-08 06:58:54 +00:00
"""
Encrypt other keys with still other keys!
"""
# generate encrypted keys too
for key_name in keys_to_gen:
if key_name.endswith('_encr') and key_name not in keychain:
# encrypt it with the associated decr
self.log(f'let\'s encrypt {key_name}!')
name_of_what_to_encrypt = key_name[:-len('_encr')]
the_key_to_encrypt_it_with = name_of_what_to_encrypt + '_decr'
if the_key_to_encrypt_it_with in keychain and name_of_what_to_encrypt in keychain:
_key_decr = keychain[the_key_to_encrypt_it_with]
_key = keychain[name_of_what_to_encrypt]
self.log(f'about to encrypt key {name_of_what_to_encrypt}, using {the_key_to_encrypt_it_with}, which is a type {KEYMAKER_DEFAULT_KEY_TYPES[the_key_to_encrypt_it_with]} and has value {keychain[the_key_to_encrypt_it_with]}')
_key_encr = _key_decr.encrypt(_key)
# self.log(f'{_key}\n-- encrypting ----->\n{_key_encr}')
keychain[key_name]=_key_encr
return keychain
2020-09-06 13:07:27 +00:00
2020-09-07 14:02:46 +00:00
2020-09-06 13:07:27 +00:00
def forge_new_keys(self,
2020-09-06 14:45:40 +00:00
name=None,
2020-09-08 07:20:42 +00:00
passphrase=DEBUG_DEFAULT_PASSPHRASE,
2020-09-08 09:17:09 +00:00
keys_to_save = KEYMAKER_DEFAULT_KEYS_TO_SAVE_ON_SERVER,
keys_to_return = KEYMAKER_DEFAULT_KEYS_TO_SAVE_ON_CLIENT,
2020-09-06 14:45:40 +00:00
keys_to_gen = KEYMAKER_DEFAULT_KEYS_TO_GEN,
2020-09-06 13:07:27 +00:00
key_types = KEYMAKER_DEFAULT_KEY_TYPES):
self.log('forging new keys...',name,self.name)
self.log('keys_to_save:',keys_to_save)
self.log('keys_to_return',keys_to_return)
2020-09-08 06:58:54 +00:00
# name
2020-09-06 14:45:40 +00:00
if not name: name=self.name
keys_to_gen = set(keys_to_gen) | set(keys_to_save) | set(keys_to_return)
keys_to_gen = sorted(list(keys_to_gen),key=lambda x: x.count('_'))
self.log('keys_to_gen =',keys_to_gen)
2020-09-06 14:45:40 +00:00
key_types = dict([(k,key_types[k]) for k in keys_to_gen])
self.log('key_types =',key_types)
2020-09-06 14:45:40 +00:00
# get decryptor keys!
keychain = self.gen_keys_from_types(key_types,passphrase=passphrase)
self.log('keychain 1 =',keychain)
# gen encrypted keys!
keychain = self.gen_encr_keys(keychain,keys_to_gen,passphrase=passphrase)
self.log('keychain 2 =',keychain)
2020-09-08 09:14:42 +00:00
# save keys!
2020-09-08 06:58:54 +00:00
# get URI id to save under (except for pubkeys, accessible by name)
2020-09-08 07:16:22 +00:00
uri_id,keys_saved,keychain = self.save_keychain(name,keychain,keys_to_save)
2020-09-08 06:58:54 +00:00
self.log('uri_id =',uri_id)
self.log('keys_saved =',keys_saved)
2020-09-08 07:16:22 +00:00
self.log('keychain =',keychain)
# return keys!
keys_returned = self.return_keychain(keychain,keys_to_return)
2020-09-08 06:59:09 +00:00
return {'uri_id':uri_id,'_keychain':keys_returned}
2020-09-07 23:34:27 +00:00
def return_keychain(self,keychain,keys_to_return=None):
keychain_toreturn = {}
2020-09-08 07:18:07 +00:00
if not keys_to_return: keys_to_return = list(keychain.keys())
for key in keys_to_return:
if key in keychain:
keychain_toreturn[key]=keychain[key]
return keychain_toreturn
2020-09-08 07:26:49 +00:00
def save_uri_as_qrcode(self,name=None,uri_id=None,odir=None):
2020-09-08 07:13:48 +00:00
if not uri_id: uri_id = get_random_id() + get_random_id()
uri_id = self.uri_id
if not uri_id and not self.uri_id: raise KomradeException('Need URI id to save!')
# gen
import pyqrcode
2020-09-08 07:20:42 +00:00
qr = pyqrcode.create(uri_id)
2020-09-08 07:26:49 +00:00
if not odir: odir = PATH_QRCODES
2020-09-08 07:30:10 +00:00
ofnfn = os.path.join(odir,self.name+'.png')
2020-09-08 07:32:44 +00:00
qr.png(ofnfn,scale=5)
2020-09-08 07:13:48 +00:00
self.log('>> saved:',ofnfn)
2020-09-08 06:58:54 +00:00
def save_keychain(self,name,keychain,keys_to_save=None,uri_id=None):
2020-09-07 23:34:27 +00:00
if not keys_to_save: keys_to_save = list(keychain.keys())
2020-09-08 08:38:54 +00:00
if not uri_id: uri_id = b64encode(keychain['pubkey'].data).decode() #uri_id = get_random_id() + get_random_id()
self.log(f'SAVING KEYCHAIN FOR {name} under URI {uri_id}')
2020-09-08 08:13:35 +00:00
self._uri_id = uri_id
2020-09-06 14:45:40 +00:00
# filter for transfer
for k,v in keychain.items():
if issubclass(type(v),KomradeKey):
v=v.data
keychain[k]=v
2020-09-07 23:34:27 +00:00
# save keychain
keys_saved_d={}
for keyname in keys_to_save:
2020-09-08 08:34:01 +00:00
if not '_' in keyname and keyname!='pubkey':
2020-09-07 23:34:27 +00:00
raise KomradeException('there is no private property in a socialist network! all keys must be split between komrades')
if keyname in keychain:
2020-09-08 09:14:42 +00:00
# uri = uri_id
2020-09-09 17:52:54 +00:00
uri = uri_id if keyname!='pubkey' else name
2020-09-08 09:14:42 +00:00
self.crypt_keys.set(uri,keychain[keyname],prefix=f'/{keyname}/')
2020-09-07 23:34:27 +00:00
keys_saved_d[keyname] = keychain[keyname]
2020-09-09 15:35:38 +00:00
# save pubkey as QR
if not 'pubkey' in keys_saved_d:
self.log('did not save pubkey in crypt, storing as QR...')
self.save_uri_as_qrcode(name=name, uri_id=uri_id, odir=PATH_QRCODES)
2020-09-08 07:16:22 +00:00
return (uri_id,keys_saved_d,keychain)
2020-09-06 18:02:18 +00:00
2020-09-07 22:27:29 +00:00
def assemble(self,_keychain):
# last minute assemblies?
encr_keys = [k for k in _keychain if k.endswith('_encr')]
for ekey in encr_keys:
eval=_keychain[ekey]
if not eval: continue
unencrkey = ekey[:-len('_encr')]
if unencrkey in _keychain: continue
decrkey = unencrkey+'_decr'
if decrkey not in _keychain: continue
dval=_keychain[decrkey]
if not dval: continue
2020-09-09 15:24:20 +00:00
# self.log(ekey,decrkey,'??')
# self.log(eval,dval,'????')
2020-09-07 22:27:29 +00:00
new_val = self.assemble_key(eval,dval)
2020-09-09 15:24:20 +00:00
# self.log('!!#!',new_val)
2020-09-07 22:27:29 +00:00
if new_val:
_keychain[unencrkey] = new_val
return _keychain
2020-09-07 23:34:27 +00:00
def assemble_key(self, key_encr, key_decr, key_encr_name=None, key_decr_name=None):
# self.log(f'assembling key: {key_decr} decrypting {key_encr}')
# need the encrypted half
if not key_encr:
# self.log('!! encrypted half not given')
return
if not key_decr:
if self.passphrase:
key_decr = self.passphrase
else:
# self.log('!! decryptor half not given')
return
# need some way to regenerate the decryptor
decr_cell = self.get_cell(key_decr)
# need the decryptor half
if not decr_cell:
# self.log('!! decryptor cell not regenerable')
return
# decrypt!
try:
2020-09-09 15:24:20 +00:00
# self.log(f'>> decrypting {key_encr_name} with {key_decr_name}\n({key_encr} with cell {decr_cell}')
2020-09-07 23:34:27 +00:00
key = decr_cell.decrypt(key_encr)
# self.log('assembled_key built:',key)
return key
except ThemisError as e:
2020-09-09 15:24:20 +00:00
# self.log('!! decryption failed:',e)
2020-09-07 23:34:27 +00:00
return
def get_cell(self, str_or_key_or_cell):
# self.log('getting decr cell for',str_or_key_or_cell)
if type(str_or_key_or_cell)==SCellSeal:
return str_or_key_or_cell
elif type(str_or_key_or_cell)==str:
return SCellSeal(passphrase=str_or_key_or_cell)
elif type(str_or_key_or_cell)==bytes:
return SCellSeal(key=str_or_key_or_cell)
2020-09-07 22:27:29 +00:00
2020-09-08 11:23:41 +00:00
# def keychain(self,
# passphrase=DEBUG_DEFAULT_PASSPHRASE,
# extra_keys={},
# keys_to_gen=KEYMAKER_DEFAULT_KEYS_TO_GEN,
# uri_id=None,
# **kwargs):
# # assemble as many keys as we can!
# self.log(f'''keychain(
# passphrase={passphrase},
# extra_keys={extra_keys},
# keys_to_gen={keys_to_gen},
# uri_id={uri_id},
# **kwargs = {kwargs}
# )''')
# if not uri_id: uri_id = self.uri_id
# if not uri_id and not self.uri_id:
# raise KomradeException('Need URI id to complete finding of keys!')
# self.log('getting keychain for uri ID:',uri_id)
# # if not force and hasattr(self,'_keychain') and self._keychain: return self._keychain
# if passphrase: self.passphrase=passphrase
# # start off keychain
# _keychain = {**extra_keys, **self._keychain}
# self.log('_keychain at start of keychain() =',_keychain)
2020-09-07 22:27:29 +00:00
2020-09-08 11:23:41 +00:00
# # find
# for keyname in keys_to_gen:
# if keyname in _keychain and _keychain[keyname]: continue
# # self.log('??',keyname,keyname in self._keychain,'...')
# newkey = self.crypt_keys.get(uri_id,prefix=f'/{keyname}/')
# if newkey: _keychain[keyname] = newkey
2020-09-07 22:27:29 +00:00
2020-09-08 11:23:41 +00:00
# # return
# _keychain = self.assemble(_keychain)
# self._keychain = _keychain
# return _keychain
2020-09-07 22:27:29 +00:00
return _keychain
2020-09-06 14:45:40 +00:00
if __name__ == '__main__':
keymaker = Keymaker('marx69')
keychain = keymaker.forge_new_keys()
print(keychain)