2016-10-26 19:38:35 +00:00
|
|
|
"""TREZOR-related code (see http://bitcointrezor.com/)."""
|
|
|
|
|
|
|
|
import binascii
|
|
|
|
import logging
|
2017-01-06 10:44:58 +00:00
|
|
|
import os
|
2017-05-21 18:03:04 +00:00
|
|
|
import sys
|
2017-01-06 10:37:14 +00:00
|
|
|
|
2016-10-26 19:38:35 +00:00
|
|
|
import semver
|
|
|
|
|
|
|
|
from . import interface
|
|
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class Trezor(interface.Device):
|
|
|
|
"""Connection to TREZOR device."""
|
|
|
|
|
2016-12-27 10:34:07 +00:00
|
|
|
@property
|
|
|
|
def _defs(self):
|
|
|
|
from . import trezor_defs
|
2017-04-21 18:25:46 +00:00
|
|
|
# Allow using TREZOR bridge transport (instead of the HID default)
|
|
|
|
trezor_defs.Transport = {
|
|
|
|
'bridge': trezor_defs.BridgeTransport,
|
|
|
|
}.get(os.environ.get('TREZOR_TRANSPORT'), trezor_defs.HidTransport)
|
2016-12-27 10:34:07 +00:00
|
|
|
return trezor_defs
|
2016-10-26 19:38:35 +00:00
|
|
|
|
|
|
|
required_version = '>=1.4.0'
|
2017-01-06 10:44:58 +00:00
|
|
|
passphrase = os.environ.get('TREZOR_PASSPHRASE', '')
|
2016-10-26 19:38:35 +00:00
|
|
|
|
|
|
|
def connect(self):
|
|
|
|
"""Enumerate and connect to the first USB HID interface."""
|
2017-01-06 09:59:57 +00:00
|
|
|
def passphrase_handler(_):
|
2017-01-06 10:44:58 +00:00
|
|
|
log.debug('using %s passphrase for %s',
|
|
|
|
'non-empty' if self.passphrase else 'empty', self)
|
2017-01-06 09:59:57 +00:00
|
|
|
return self._defs.PassphraseAck(passphrase=self.passphrase)
|
2016-10-26 19:38:35 +00:00
|
|
|
|
2017-05-21 14:29:57 +00:00
|
|
|
def create_pin_handler(conn):
|
2017-08-31 14:00:27 +00:00
|
|
|
if not sys.stdin.closed and os.isatty(sys.stdin.fileno()):
|
2017-05-21 14:29:57 +00:00
|
|
|
return conn.callback_PinMatrixRequest # CLI-based PIN handler
|
|
|
|
|
|
|
|
def qt_handler(_):
|
2017-05-21 18:03:04 +00:00
|
|
|
# pylint: disable=import-error
|
|
|
|
from PyQt5.QtWidgets import QApplication, QInputDialog, QLineEdit
|
2017-05-21 14:29:57 +00:00
|
|
|
label = ('Use the numeric keypad to describe number positions.\n'
|
|
|
|
'The layout is:\n'
|
|
|
|
' 7 8 9\n'
|
|
|
|
' 4 5 6\n'
|
|
|
|
' 1 2 3\n'
|
|
|
|
'Please enter PIN:')
|
|
|
|
app = QApplication([])
|
|
|
|
qd = QInputDialog()
|
|
|
|
qd.setTextEchoMode(QLineEdit.Password)
|
|
|
|
qd.setLabelText(label)
|
|
|
|
qd.show()
|
|
|
|
app.exec_()
|
|
|
|
return self._defs.PinMatrixAck(pin=qd.textValue())
|
|
|
|
|
|
|
|
return qt_handler
|
|
|
|
|
2017-04-21 18:25:46 +00:00
|
|
|
for d in self._defs.Transport.enumerate():
|
2016-10-26 19:38:35 +00:00
|
|
|
log.debug('endpoint: %s', d)
|
2017-04-21 18:25:46 +00:00
|
|
|
transport = self._defs.Transport(d)
|
2016-12-27 10:34:07 +00:00
|
|
|
connection = self._defs.Client(transport)
|
2017-01-06 09:59:57 +00:00
|
|
|
connection.callback_PassphraseRequest = passphrase_handler
|
2017-05-21 14:29:57 +00:00
|
|
|
connection.callback_PinMatrixRequest = create_pin_handler(connection)
|
2016-10-26 19:38:35 +00:00
|
|
|
f = connection.features
|
|
|
|
log.debug('connected to %s %s', self, f.device_id)
|
|
|
|
log.debug('label : %s', f.label)
|
|
|
|
log.debug('vendor : %s', f.vendor)
|
|
|
|
current_version = '{}.{}.{}'.format(f.major_version,
|
|
|
|
f.minor_version,
|
|
|
|
f.patch_version)
|
|
|
|
log.debug('version : %s', current_version)
|
|
|
|
log.debug('revision : %s', binascii.hexlify(f.revision))
|
|
|
|
if not semver.match(current_version, self.required_version):
|
|
|
|
fmt = ('Please upgrade your {} firmware to {} version'
|
|
|
|
' (current: {})')
|
|
|
|
raise ValueError(fmt.format(self, self.required_version,
|
|
|
|
current_version))
|
|
|
|
connection.ping(msg='', pin_protection=True) # unlock PIN
|
|
|
|
return connection
|
|
|
|
raise interface.NotFoundError('{} not connected'.format(self))
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
"""Close connection."""
|
|
|
|
self.conn.close()
|
|
|
|
|
2016-11-03 18:19:24 +00:00
|
|
|
def pubkey(self, identity, ecdh=False):
|
2016-10-26 19:38:35 +00:00
|
|
|
"""Return public key."""
|
2016-11-03 18:19:24 +00:00
|
|
|
curve_name = identity.get_curve_name(ecdh=ecdh)
|
2016-10-26 19:38:35 +00:00
|
|
|
log.debug('"%s" getting public key (%s) from %s',
|
2017-10-07 16:18:58 +00:00
|
|
|
identity.to_string(), curve_name, self)
|
2016-11-03 18:19:24 +00:00
|
|
|
addr = identity.get_bip32_address(ecdh=ecdh)
|
2016-10-26 19:38:35 +00:00
|
|
|
result = self.conn.get_public_node(n=addr,
|
|
|
|
ecdsa_curve_name=curve_name)
|
|
|
|
log.debug('result: %s', result)
|
|
|
|
return result.node.public_key
|
|
|
|
|
2016-11-03 18:19:24 +00:00
|
|
|
def _identity_proto(self, identity):
|
2016-12-27 10:34:07 +00:00
|
|
|
result = self._defs.IdentityType()
|
2016-11-03 18:19:24 +00:00
|
|
|
for name, value in identity.items():
|
2016-10-26 19:38:35 +00:00
|
|
|
setattr(result, name, value)
|
|
|
|
return result
|
|
|
|
|
2016-11-03 18:19:24 +00:00
|
|
|
def sign(self, identity, blob):
|
2016-10-26 19:38:35 +00:00
|
|
|
"""Sign given blob and return the signature (as bytes)."""
|
2016-11-03 18:19:24 +00:00
|
|
|
curve_name = identity.get_curve_name(ecdh=False)
|
2016-10-26 19:38:35 +00:00
|
|
|
log.debug('"%s" signing %r (%s) on %s',
|
2017-10-07 16:18:58 +00:00
|
|
|
identity.to_string(), blob, curve_name, self)
|
2016-10-26 19:38:35 +00:00
|
|
|
try:
|
|
|
|
result = self.conn.sign_identity(
|
2016-11-03 18:19:24 +00:00
|
|
|
identity=self._identity_proto(identity),
|
2016-10-26 19:38:35 +00:00
|
|
|
challenge_hidden=blob,
|
|
|
|
challenge_visual='',
|
|
|
|
ecdsa_curve_name=curve_name)
|
|
|
|
log.debug('result: %s', result)
|
|
|
|
assert len(result.signature) == 65
|
|
|
|
assert result.signature[:1] == b'\x00'
|
|
|
|
return result.signature[1:]
|
2017-01-06 10:37:14 +00:00
|
|
|
except self._defs.Error as e:
|
2016-10-26 19:38:35 +00:00
|
|
|
msg = '{} error: {}'.format(self, e)
|
|
|
|
log.debug(msg, exc_info=True)
|
|
|
|
raise interface.DeviceError(msg)
|
|
|
|
|
2016-11-03 18:19:24 +00:00
|
|
|
def ecdh(self, identity, pubkey):
|
2016-10-26 19:38:35 +00:00
|
|
|
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
|
2016-11-03 18:19:24 +00:00
|
|
|
curve_name = identity.get_curve_name(ecdh=True)
|
2016-10-26 19:38:35 +00:00
|
|
|
log.debug('"%s" shared session key (%s) for %r from %s',
|
2017-10-07 16:18:58 +00:00
|
|
|
identity.to_string(), curve_name, pubkey, self)
|
2016-10-26 19:38:35 +00:00
|
|
|
try:
|
|
|
|
result = self.conn.get_ecdh_session_key(
|
2016-11-03 18:19:24 +00:00
|
|
|
identity=self._identity_proto(identity),
|
2016-10-26 19:38:35 +00:00
|
|
|
peer_public_key=pubkey,
|
|
|
|
ecdsa_curve_name=curve_name)
|
|
|
|
log.debug('result: %s', result)
|
|
|
|
assert len(result.session_key) in {65, 33} # NIST256 or Curve25519
|
|
|
|
assert result.session_key[:1] == b'\x04'
|
|
|
|
return result.session_key
|
2017-01-06 10:37:14 +00:00
|
|
|
except self._defs.Error as e:
|
2016-10-26 19:38:35 +00:00
|
|
|
msg = '{} error: {}'.format(self, e)
|
|
|
|
log.debug(msg, exc_info=True)
|
|
|
|
raise interface.DeviceError(msg)
|