diff --git a/trezor_agent/__main__.py b/trezor_agent/__main__.py index a56264b..6fae2c3 100644 --- a/trezor_agent/__main__.py +++ b/trezor_agent/__main__.py @@ -7,14 +7,14 @@ import re import subprocess import sys -from . import client, formats, protocol, server +from . import client, formats, protocol, server, util log = logging.getLogger(__name__) def ssh_args(label): """Create SSH command for connecting specified server.""" - identity = client.string_to_identity(label, identity_type=dict) + identity = util.string_to_identity(label, identity_type=dict) args = [] if 'port' in identity: diff --git a/trezor_agent/client.py b/trezor_agent/client.py index dd7ec6a..f3fe760 100644 --- a/trezor_agent/client.py +++ b/trezor_agent/client.py @@ -6,8 +6,6 @@ It is used for getting SSH public keys and ECDSA signing of server requests. import binascii import io import logging -import re -import struct from . import factory, formats, util @@ -39,7 +37,7 @@ class Client(object): def get_identity(self, label, index=0): """Parse label string into Identity protobuf.""" - identity = string_to_identity(label, self.identity_type) + identity = util.string_to_identity(label, self.identity_type) identity.proto = 'ssh' identity.index = index return identity @@ -47,10 +45,10 @@ class Client(object): def get_public_key(self, label): """Get SSH public key corresponding to specified by label.""" identity = self.get_identity(label=label) - label = identity_to_string(identity) # canonize key label + label = util.identity_to_string(identity) # canonize key label log.info('getting "%s" public key (%s) from %s...', label, self.curve, self.device_name) - addr = get_address(identity) + addr = util.get_bip32_address(identity) node = self.client.get_public_node(n=addr, ecdsa_curve_name=self.curve) @@ -92,55 +90,6 @@ class Client(object): return result.signature[1:] -_identity_regexp = re.compile(''.join([ - '^' - r'(?:(?P.*)://)?', - r'(?:(?P.*)@)?', - r'(?P.*?)', - r'(?::(?P\w*))?', - r'(?P/.*)?', - '$' -])) - - -def string_to_identity(s, identity_type): - """Parse string into Identity protobuf.""" - m = _identity_regexp.match(s) - result = m.groupdict() - log.debug('parsed identity: %s', result) - kwargs = {k: v for k, v in result.items() if v} - return identity_type(**kwargs) - - -def identity_to_string(identity): - """Dump Identity protobuf into its string representation.""" - result = [] - if identity.proto: - result.append(identity.proto + '://') - if identity.user: - result.append(identity.user + '@') - result.append(identity.host) - if identity.port: - result.append(':' + identity.port) - if identity.path: - result.append(identity.path) - return ''.join(result) - - -def get_address(identity, ecdh=False): - """Compute BIP32 derivation address according to SLIP-0013/0017.""" - index = struct.pack('I", pathElement) + return result + + @staticmethod + def convert_public_key(ecdsa_curve_name, result): + from trezorlib.messages_pb2 import PublicKey # pylint: disable=import-error + if ecdsa_curve_name == "nist256p1": + if (result[64] & 1) != 0: + result = bytearray([0x03]) + result[1:33] + else: + result = bytearray([0x02]) + result[1:33] + else: + result = result[1:] + keyX = bytearray(result[0:32]) + keyY = bytearray(result[32:][::-1]) + if (keyX[31] & 1) != 0: + keyY[31] |= 0x80 + result = chr(0) + str(keyY) + publicKey = PublicKey() + publicKey.node.public_key = str(result) + return publicKey + + # pylint: disable=unused-argument + def get_public_node(self, n, ecdsa_curve_name="secp256k1", show_display=False): + donglePath = LedgerClientConnection.expand_path(n) + if ecdsa_curve_name == "nist256p1": + p2 = "01" + else: + p2 = "02" + apdu = "800200" + p2 + apdu = apdu.decode('hex') + apdu += chr(len(donglePath) + 1) + chr(len(donglePath) / 4) + apdu += donglePath + result = bytearray(self.dongle.exchange(bytes(apdu)))[1:] + return LedgerClientConnection.convert_public_key(ecdsa_curve_name, result) + + # pylint: disable=too-many-locals + def sign_identity(self, identity, challenge_hidden, challenge_visual, + ecdsa_curve_name="secp256k1"): + from trezorlib.messages_pb2 import SignedIdentity # pylint: disable=import-error + n = util.get_bip32_address(identity) + donglePath = LedgerClientConnection.expand_path(n) + if identity.proto == 'ssh': + ins = "04" + p1 = "00" + else: + ins = "08" + p1 = "00" + if ecdsa_curve_name == "nist256p1": + p2 = "81" if identity.proto == 'ssh' else "01" + else: + p2 = "82" if identity.proto == 'ssh' else "02" + apdu = "80" + ins + p1 + p2 + apdu = apdu.decode('hex') + apdu += chr(len(challenge_hidden) + len(donglePath) + 1) + apdu += chr(len(donglePath) / 4) + donglePath + apdu += challenge_hidden + result = bytearray(self.dongle.exchange(bytes(apdu))) + if ecdsa_curve_name == "nist256p1": + offset = 3 + length = result[offset] + r = result[offset+1:offset+1+length] + if r[0] == 0: + r = r[1:] + offset = offset + 1 + length + 1 + length = result[offset] + s = result[offset+1:offset+1+length] + if s[0] == 0: + s = s[1:] + offset = offset + 1 + length + signature = SignedIdentity() + signature.signature = chr(0) + str(r) + str(s) + if identity.proto == 'ssh': + keyData = result[offset:] + pk = LedgerClientConnection.convert_public_key(ecdsa_curve_name, keyData) + signature.public_key = pk.node.public_key + return signature + else: + signature = SignedIdentity() + signature.signature = chr(0) + str(result[0:64]) + if identity.proto == 'ssh': + keyData = result[64:] + pk = LedgerClientConnection.convert_public_key(ecdsa_curve_name, keyData) + signature.public_key = pk.node.public_key + return signature + + def get_ecdh_session_key(self, identity, peer_public_key, ecdsa_curve_name="secp256k1"): + from trezorlib.messages_pb2 import ECDHSessionKey # pylint: disable=import-error + n = util.get_bip32_address(identity, True) + donglePath = LedgerClientConnection.expand_path(n) + if ecdsa_curve_name == "nist256p1": + p2 = "01" + else: + p2 = "02" + apdu = "800a00" + p2 + apdu = apdu.decode('hex') + apdu += chr(len(peer_public_key) + len(donglePath) + 1) + apdu += chr(len(donglePath) / 4) + donglePath + apdu += peer_public_key + result = bytearray(self.dongle.exchange(bytes(apdu))) + sessionKey = ECDHSessionKey() + sessionKey.session_key = str(result) + return sessionKey + + def clear_session(self): + pass + + def close(self): + self.dongle.close() + + # pylint: disable=unused-argument + # pylint: disable=no-self-use + def ping(self, msg, button_protection=False, pin_protection=False, + passphrase_protection=False): + return msg + + class CallException(Exception): + def __init__(self, code, message): + super(CallException, self).__init__() + self.args = [code, message] + try: + from ledgerblue.comm import getDongle + except ImportError: + log.exception('Missing module: install via "pip install ledgerblue"') + # pylint: disable=bare-except + try: + from trezorlib.types_pb2 import IdentityType # pylint: disable=import-error + dongle = getDongle() + except: + return + yield ClientWrapper(connection=LedgerClientConnection(dongle), + identity_type=IdentityType, + device_name="ledger", + call_exception=CallException) + LOADERS = [ _load_trezor, - _load_keepkey + _load_keepkey, + _load_ledger ] diff --git a/trezor_agent/gpg/agent.py b/trezor_agent/gpg/agent.py index 63cac3b..cd6f8be 100644 --- a/trezor_agent/gpg/agent.py +++ b/trezor_agent/gpg/agent.py @@ -39,7 +39,7 @@ def sig_encode(r, s): def pksign(keygrip, digest, algo): """Sign a message digest using a private EC key.""" - assert algo == '8' + assert algo == '8', 'Unsupported hash algorithm ID {}'.format(algo) user_id = os.environ['TREZOR_GPG_USER_ID'] pubkey_dict = decode.load_public_key( pubkey_bytes=keyring.export_public_key(user_id=user_id), diff --git a/trezor_agent/gpg/encode.py b/trezor_agent/gpg/encode.py index 2287cc8..c25962d 100644 --- a/trezor_agent/gpg/encode.py +++ b/trezor_agent/gpg/encode.py @@ -3,7 +3,7 @@ import logging import time from . import decode, keyring, protocol -from .. import client, factory, formats, util +from .. import factory, formats, util log = logging.getLogger(__name__) @@ -21,7 +21,7 @@ class HardwareSigner(object): def pubkey(self, ecdh=False): """Return public key as VerifyingKey object.""" - addr = client.get_address(identity=self.identity, ecdh=ecdh) + addr = util.get_bip32_address(identity=self.identity, ecdh=ecdh) public_node = self.client_wrapper.connection.get_public_node( n=addr, ecdsa_curve_name=self.curve_name) diff --git a/trezor_agent/tests/test_client.py b/trezor_agent/tests/test_client.py index 022a398..6a8273f 100644 --- a/trezor_agent/tests/test_client.py +++ b/trezor_agent/tests/test_client.py @@ -87,8 +87,8 @@ def test_ssh_agent(): def ssh_sign_identity(identity, challenge_hidden, challenge_visual, ecdsa_curve_name): - assert (client.identity_to_string(identity) == - client.identity_to_string(ident)) + assert (util.identity_to_string(identity) == + util.identity_to_string(ident)) assert challenge_hidden == BLOB assert challenge_visual == '' assert ecdsa_curve_name == 'nist256p1' @@ -133,4 +133,4 @@ def test_utils(): identity.path = '/path' url = 'https://user@host:443/path' - assert client.identity_to_string(identity) == url + assert util.identity_to_string(identity) == url diff --git a/trezor_agent/util.py b/trezor_agent/util.py index 2d8a5ef..a067089 100644 --- a/trezor_agent/util.py +++ b/trezor_agent/util.py @@ -1,9 +1,14 @@ """Various I/O and serialization utilities.""" import binascii import contextlib +import hashlib import io +import logging +import re import struct +log = logging.getLogger(__name__) + def send(conn, data): """Send data blob to connection socket.""" @@ -173,3 +178,52 @@ class Reader(object): yield finally: self._captured = None + + +_identity_regexp = re.compile(''.join([ + '^' + r'(?:(?P.*)://)?', + r'(?:(?P.*)@)?', + r'(?P.*?)', + r'(?::(?P\w*))?', + r'(?P/.*)?', + '$' +])) + + +def string_to_identity(s, identity_type): + """Parse string into Identity protobuf.""" + m = _identity_regexp.match(s) + result = m.groupdict() + log.debug('parsed identity: %s', result) + kwargs = {k: v for k, v in result.items() if v} + return identity_type(**kwargs) + + +def identity_to_string(identity): + """Dump Identity protobuf into its string representation.""" + result = [] + if identity.proto: + result.append(identity.proto + '://') + if identity.user: + result.append(identity.user + '@') + result.append(identity.host) + if identity.port: + result.append(':' + identity.port) + if identity.path: + result.append(identity.path) + return ''.join(result) + + +def get_bip32_address(identity, ecdh=False): + """Compute BIP32 derivation address according to SLIP-0013/0017.""" + index = struct.pack('