2
0
mirror of https://github.com/ComradCollective/Comrad synced 2024-11-11 13:10:45 +00:00
Comrad/komrade/backend/the_telephone.py
quadrismegistus e1d38700df updates
2020-09-12 20:00:33 +01:00

88 lines
2.8 KiB
Python

# internal imports
import os,sys; sys.path.append(os.path.abspath(os.path.join(os.path.abspath(os.path.join(os.path.dirname(__file__),'..')),'..')))
from komrade import *
from komrade.backend import *
from komrade.backend.phonelines import *
### ACTUAL PHONE CONNECTIONS
class TheTelephone(Operator):
    """
    Client-side line through which a Caller reaches The Operator's API.
    """

    def __init__(self, caller=None):
        """Set up the telephone under TELEPHONE_NAME and load its keychain.

        caller: optional object stored as-is on ``self.caller``.
        """
        super().__init__(name=TELEPHONE_NAME)
        self.caller = caller

        # Imported here, inside the constructor, exactly as the module has
        # always done (presumably to sidestep an import cycle -- confirm).
        from komrade.backend.phonelines import check_phonelines

        # check_phonelines() is indexed by name; the telephone's entry is
        # handed to load_keychain_from_bytes and the result is kept.
        telephone_keys = check_phonelines()[TELEPHONE_NAME]
        self._keychain = self.load_keychain_from_bytes(telephone_keys)

    def send_and_receive(self, msg_d, **y):
        """Send one payload to the Operator API and return the unsealed reply.

        msg_d: mapping whose ``"msg"`` entry is the payload to transmit;
            no other entry is read here.
        **y: accepted for signature compatibility; not used in this method.

        Returns whatever ``self.unseal_msg`` yields for the base64-decoded
        response body, or ``None`` when the response status is not 200.
        No decryption is performed here.
        """
        # Base64-encode the payload and turn it into text for the URL.
        msg_b64_str = b64encode(msg_d["msg"]).decode()
        self.log(f'But because I am @Telephone, and @Operator communicates only with me directly,\n I will omit all metadata and send only the encrypted packcage:\n\n{msg_b64_str}')

        # Swap '/' for '_' before embedding the payload in the URL
        # (presumably so it stays a single path segment -- confirm with
        # the server-side route).
        url_safe_payload = msg_b64_str.replace('/', '_')

        # Place the call.
        dial_url = OPERATOR_API_URL + url_safe_payload + '/'
        self.log("DIALING THE OPERATOR:", dial_url)
        phonecall = komrade_request(dial_url)

        # Anything but HTTP 200 is logged and yields no result.
        if phonecall.status_code != 200:
            self.log('!! error in request', phonecall.status_code, phonecall.text)
            return None

        # The response body is base64 text; recover the raw bytes.
        reply_bytes = b64decode(phonecall.text.encode())

        # Unseal the raw bytes into a message object and hand it back.
        resp_msg_obj = self.unseal_msg(reply_bytes)
        self.log('unsealed resp_msg_obj', resp_msg_obj)
        return resp_msg_obj

    def ring_ring(self, msg, **y):
        """Forward ``msg`` to the parent ``ring_ring``.

        The call is addressed to ``self.op`` and uses
        ``self.send_and_receive`` as the response getter; any extra keyword
        arguments are passed through unchanged.
        """
        parent = super()
        return parent.ring_ring(
            msg,
            to_whom=self.op,
            get_resp_from=self.send_and_receive,
            **y
        )
def test_call():
    """Manual smoke test: build a TheTelephone and pretty-print its keychain."""
    pprint(TheTelephone().keychain())
# caller = Caller('marx33') #Caller('marx')
# caller.boot(create=True)
# print(caller.keychain())
# phone = TheTelephone()
# req_json = {'_route':'forge_new_keys','name':name, 'pubkey_is_public':pubkey_is_public}}
# req_json_s = jsonify(req_json)
# res = phone.req({'forge_new_keys':{'name':'marx', 'pubkey_is_public':True}})
# print(res)
# asyncio.run(caller.get_new_keys())
# x=caller.get_new_keys(passphrase='1869')
# print('YEAH COOL',x)
## main
# Run the manual smoke test only when this file is executed as a script.
if __name__ == '__main__':
    test_call()