python-trezor/trezorlib/transport.py

134 lines
4.2 KiB
Python
Raw Normal View History

2012-11-13 14:09:39 +00:00
import struct
2012-11-15 10:42:19 +00:00
import mapping
2012-11-13 14:09:39 +00:00
2013-01-05 14:42:09 +00:00
class NotImplementedException(Exception):
    """Raised by ``Transport`` hook methods that a subclass has not overridden."""
class ConnectionError(Exception):
    """Connection-related error type exported by this module.

    Nothing in this module raises it; presumably concrete transports do
    (verify against callers).

    NOTE: on Python 3 this name shadows the builtin ``ConnectionError``
    (an ``OSError`` subclass). This class derives directly from
    ``Exception``, so ``except OSError`` will not catch it.
    """
2012-11-13 14:09:39 +00:00
class Transport(object):
    """Base class for a framed message channel to a device.

    Subclasses provide the low-level hooks ``_open``, ``_close``, ``_write``,
    ``_read`` and ``ready_to_read`` (and optionally ``_session_begin`` /
    ``_session_end``). This class adds message framing and nested-session
    bookkeeping on top of them.

    Wire header, as produced by ``write`` and parsed by ``_read_headers``:
    two ``#`` magic bytes followed by ``struct`` format ``">HL"`` (big-endian
    unsigned 16-bit message type, then unsigned 32-bit payload length).
    """

    # struct layout of the fixed-size header that follows the "##" magic.
    _HEADER_FORMAT = ">HL"

    def __init__(self, device, *args, **kwargs):
        """Remember ``device`` and open the transport immediately via ``_open``.

        Extra positional and keyword arguments are accepted and ignored here.
        """
        self.device = device
        # Nesting counter for session_begin()/session_end().
        self.session_depth = 0
        self._open()

    # ------------------------------------------------------------------
    # Hooks to be overridden by concrete transports
    # ------------------------------------------------------------------

    def _open(self):
        """Open the underlying channel. Must be overridden."""
        raise NotImplementedException("Not implemented")

    def _close(self):
        """Close the underlying channel. Must be overridden."""
        raise NotImplementedException("Not implemented")

    def _write(self, msg, protobuf_msg):
        """Send one message. Must be overridden.

        ``msg`` is the framed byte string built by ``write``; ``protobuf_msg``
        is the original message object passed to ``write``.
        """
        raise NotImplementedException("Not implemented")

    def _read(self):
        """Receive one message. Must be overridden.

        ``read`` and ``read_blocking`` expect either ``None`` (nothing
        available) or a ``(msg_type, data)`` pair accepted by
        ``_parse_message``.
        """
        raise NotImplementedException("Not implemented")

    def _session_begin(self):
        """Hook run when the outermost session starts. No-op by default."""
        pass

    def _session_end(self):
        """Hook run when the session depth returns to zero. No-op by default."""
        pass

    def ready_to_read(self):
        """
        Returns True if there is data to be read from the transport. Otherwise, False.
        """
        raise NotImplementedException("Not implemented")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def session_begin(self):
        """
        Apply a lock to the device in order to perform synchronous multistep
        "conversations" with the device. For example, before entering the
        transaction signing workflow, one begins a session. After the
        transaction is complete, the session may be ended.

        Sessions nest: ``_session_begin`` runs only for the outermost call.
        """
        if self.session_depth == 0:
            self._session_begin()
        self.session_depth += 1

    def session_end(self):
        """
        End a session. See session_begin for an in-depth description of
        TREZOR sessions.

        ``_session_end`` runs once the depth is back to zero. The depth is
        clamped at zero, so an unmatched call still invokes ``_session_end``.
        """
        self.session_depth = max(0, self.session_depth - 1)
        if self.session_depth == 0:
            self._session_end()

    def close(self):
        """
        Close the connection to the physical device or file descriptor
        represented by the Transport.
        """
        self._close()

    def write(self, msg):
        """
        Write message to transport. msg should be a member of a valid
        `protobuf class <https://developers.google.com/protocol-buffers/docs/pythontutorial>`_
        with a SerializeToString() method.
        """
        ser = msg.SerializeToString()
        header = struct.pack(self._HEADER_FORMAT, mapping.get_type(msg), len(ser))
        # Concatenate bytes rather than %-formatting: on Python 3 "%s" would
        # embed the repr of the bytes objects and corrupt the frame. On
        # Python 2 the result is identical to the old "##%s%s" form.
        self._write(b"##" + header + ser, msg)

    def read(self):
        """
        If there is data available to be read from the transport, reads the
        data and tries to parse it as a protobuf message. If the parsing
        succeeds, return a protobuf object.
        Otherwise, returns None.
        """
        if not self.ready_to_read():
            return None

        data = self._read()
        if data is None:
            return None

        return self._parse_message(data)

    def read_blocking(self):
        """
        Same as read, except blocks until data is available to be read.

        Calls ``_read`` repeatedly until it returns something other than None.
        """
        while True:
            data = self._read()
            if data is not None:
                return self._parse_message(data)

    def _parse_message(self, data):
        """Turn a ``(msg_type, payload)`` pair from ``_read`` into a message.

        When ``msg_type`` is the string ``'protobuf'`` the payload is returned
        as-is. Otherwise ``msg_type`` is looked up with ``mapping.get_class``
        and the payload is parsed into a fresh instance of that class.
        """
        (msg_type, payload) = data
        if msg_type == 'protobuf':
            return payload

        inst = mapping.get_class(msg_type)()
        inst.ParseFromString(payload)
        return inst

    def _read_headers(self, read_f):
        """Consume a message header from ``read_f`` and return its fields.

        ``read_f`` must expose ``read(n)`` returning byte strings. Bytes are
        skipped until the first ``#`` (giving up after 64 single-byte reads),
        the next byte must also be ``#``, and the following header is unpacked.

        Returns:
            ``(msg_type, datalen)`` as integers.

        Raises:
            Exception: if no magic byte shows up within 64 reads, if the
                second magic byte is wrong, or if the header is truncated or
                otherwise cannot be unpacked. Errors raised by
                ``read_f.read`` itself propagate unchanged.
        """
        # Align cursor to the beginning of the header ("##"). Compare against
        # bytes literals so this works on Python 3 as well as Python 2.
        for _ in range(64):
            if read_f.read(1) == b"#":
                break
        else:
            # timeout
            raise Exception("Timed out while waiting for the magic character")

        if read_f.read(1) != b"#":
            # Second character must be # to be valid header
            raise Exception("Second magic character is broken")

        # Now we're most likely on the beginning of the header
        raw_header = read_f.read(struct.calcsize(self._HEADER_FORMAT))
        try:
            (msg_type, datalen) = struct.unpack(self._HEADER_FORMAT, raw_header)
        except (struct.error, TypeError):
            # Short read or non-bytes data. A bare except is avoided here so
            # KeyboardInterrupt and SystemExit are not swallowed.
            raise Exception("Cannot parse header length")

        return (msg_type, datalen)