From 4335740abebe7de0312c3a9fdacfe6458581287b Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sat, 16 Apr 2016 21:21:12 +0300 Subject: [PATCH] Add experimental support for GPG signing via TREZOR In order to use this feature, GPG "modern" (v2.1) is required [1]. Also, since TREZOR protocol does not support arbitrary long fields, TREZOR firmware needs to be adapted with the following patch [2], to support signing fixed-size digests of GPG messages of arbitrary size. [1] https://gist.github.com/vt0r/a2f8c0bcb1400131ff51 [2] https://gist.github.com/romanz/b66f5df1ca8ef15641df8ea5bb09fd47 --- gpg/check.py | 290 ++++++++++++++++++++++++++++++++++++++++++++++++++ gpg/demo.sh | 14 +++ gpg/signer.py | 222 ++++++++++++++++++++++++++++++++++++++ gpg/util.py | 18 ++++ 4 files changed, 544 insertions(+) create mode 100755 gpg/check.py create mode 100755 gpg/demo.sh create mode 100755 gpg/signer.py create mode 100644 gpg/util.py diff --git a/gpg/check.py b/gpg/check.py new file mode 100755 index 0000000..88d5423 --- /dev/null +++ b/gpg/check.py @@ -0,0 +1,290 @@ +#!/usr/bin/env python +import argparse +import base64 +import binascii +import contextlib +import hashlib +import io +import logging +import struct + +import ecdsa + +import util + +log = logging.getLogger(__name__) + + +def bit(value, i): + return 1 if value & (1 << i) else 0 + + +def low_bits(value, n): + return value & ((1 << n) - 1) + + +def readfmt(stream, fmt): + size = struct.calcsize(fmt) + blob = stream.read(size) + return struct.unpack(fmt, blob) + + +class Reader(object): + def __init__(self, stream): + self.s = stream + self._captured = None + + def readfmt(self, fmt): + size = struct.calcsize(fmt) + blob = self.read(size) + obj, = struct.unpack(fmt, blob) + return obj + + def read(self, size=None): + blob = self.s.read(size) + if size is not None and len(blob) < size: + raise EOFError + if self._captured: + self._captured.write(blob) + return blob + + @contextlib.contextmanager + def capture(self, stream): + self._captured = stream + try: + yield + finally: + self._captured = None + +length_types = {0: '>B', 1: '>H', 2: '>L'} + + +def parse_subpackets(s): + subpackets = [] + total_size = s.readfmt('>H') + data = s.read(total_size) + s = Reader(io.BytesIO(data)) + + while True: + try: + subpacket_len = s.readfmt('B') + except EOFError: + break + + subpackets.append(s.read(subpacket_len)) + + return subpackets + + +def parse_mpi(s): + bits = s.readfmt('>H') + blob = bytearray(s.read(int((bits + 7) // 8))) + return sum(v << (8 * i) for i, v in enumerate(reversed(blob))) + + +def split_bits(value, *bits): + result = [] + for b in reversed(bits): + mask = (1 << b) - 1 + result.append(value & mask) + value = value >> b + assert value == 0 + return reversed(result) + + +class Parser(object): + def __init__(self, stream, to_hash=None): + self.stream = stream + self.packet_types = { + 2: self.signature, + 4: self.onepass, + 6: self.pubkey, + 11: self.literal, + 13: self.user_id, + } + self.to_hash = io.BytesIO() + if to_hash: + self.to_hash.write(to_hash) + + def __iter__(self): + return self + + def onepass(self, stream): + # pylint: disable=no-self-use + p = {'type': 'onepass'} + p['version'] = stream.readfmt('B') + p['sig_type'] = stream.readfmt('B') + p['hash_alg'] = stream.readfmt('B') + p['pubkey_alg'] = stream.readfmt('B') + p['key_id'] = stream.readfmt('8s') + p['nested'] = stream.readfmt('B') + assert not stream.read() + return p + + def literal(self, stream): + p = {'type': 'literal'} + p['format'] = stream.readfmt('c') + filename_len = stream.readfmt('B') + p['filename'] = stream.read(filename_len) + p['date'] = stream.readfmt('>L') + with stream.capture(self.to_hash): + p['content'] = stream.read() + return p + + def signature(self, stream): + p = {'type': 'signature'} + + to_hash = io.BytesIO() + with stream.capture(to_hash): + p['version'] = stream.readfmt('B') + p['sig_type'] = stream.readfmt('B') + p['pubkey_alg'] = stream.readfmt('B') + p['hash_alg'] = stream.readfmt('B') + p['hashed_subpackets'] = parse_subpackets(stream) + self.to_hash.write(to_hash.getvalue()) + + # https://tools.ietf.org/html/rfc4880#section-5.2.4 + self.to_hash.write(b'\x04\xff' + struct.pack('>L', to_hash.tell())) + data_to_sign = self.to_hash.getvalue() + log.debug('hashing %d bytes for signature: %r', + len(data_to_sign), data_to_sign) + digest = hashlib.sha256(data_to_sign).digest() + + p['unhashed_subpackets'] = parse_subpackets(stream) + p['hash_prefix'] = stream.readfmt('2s') + if p['hash_prefix'] != digest[:2]: + log.warning('Bad hash prefix: %r (expected %r)', + digest[:2], p['hash_prefix']) + else: + p['digest'] = digest + p['sig'] = (parse_mpi(stream), parse_mpi(stream)) + assert not stream.read() + + return p + + def pubkey(self, stream): + p = {'type': 'pubkey'} + packet = io.BytesIO() + with stream.capture(packet): + p['version'] = stream.readfmt('B') + p['created'] = stream.readfmt('>L') + p['algo'] = stream.readfmt('B') + + # https://tools.ietf.org/html/rfc6637#section-11 + oid_size = stream.readfmt('B') + oid = stream.read(oid_size) + assert oid == b'\x2A\x86\x48\xCE\x3D\x03\x01\x07' # NIST P-256 + + mpi = parse_mpi(stream) + log.debug('mpi: %x', mpi) + prefix, x, y = split_bits(mpi, 4, 256, 256) + assert prefix == 4 + p['point'] = (x, y) + assert not stream.read() + + # https://tools.ietf.org/html/rfc4880#section-12.2 + packet_data = packet.getvalue() + data_to_hash = (b'\x99' + struct.pack('>H', len(packet_data)) + + packet_data) + p['key_id'] = hashlib.sha1(data_to_hash).digest()[-8:] + log.debug('key ID: %s', binascii.hexlify(p['key_id']).decode('ascii')) + self.to_hash.write(data_to_hash) + + return p + + def user_id(self, stream): + value = stream.read() + self.to_hash.write(b'\xb4' + struct.pack('>L', len(value))) + self.to_hash.write(value) + return {'type': 'user_id', 'value': value} + + def __next__(self): + try: + # https://tools.ietf.org/html/rfc4880#section-4.2 + value = self.stream.readfmt('B') + except EOFError: + raise StopIteration + + log.debug('prefix byte: %02x', value) + assert bit(value, 7) == 1 + assert bit(value, 6) == 0 # new format not supported yet + + tag = low_bits(value, 6) + length_type = low_bits(tag, 2) + tag = tag >> 2 + fmt = length_types[length_type] + log.debug('length_type: %s', fmt) + packet_size = self.stream.readfmt(fmt) + log.debug('packet length: %d', packet_size) + packet_data = self.stream.read(packet_size) + packet_type = self.packet_types.get(tag) + if packet_type: + p = packet_type(Reader(io.BytesIO(packet_data))) + else: + p = {'type': 'UNKNOWN'} + p['tag'] = tag + log.debug('packet "%s": %s', p['type'], p) + return p + + next = __next__ + + +def original_data(filename): + parts = filename.rsplit('.', 1) + if len(parts) == 2 and parts[1] in ('sig', 'asc'): + log.debug('loading file %s', parts[0]) + return open(parts[0], 'rb').read() + + +def load_public_key(filename): + parser = Parser(Reader(open(filename, 'rb'))) + pubkey, userid, signature = list(parser) + log.info('loaded %s public key', userid['value']) + verify_digest(pubkey=pubkey, digest=signature['digest'], + signature=signature['sig'], label=filename) + return pubkey + + +def check(pubkey, sig_file): + d = open(sig_file, 'rb') + if d.name.endswith('.asc'): + lines = d.readlines()[3:-1] + data = base64.b64decode(''.join(lines)) + payload, checksum = data[:-3], data[-3:] + assert util.crc24(payload) == checksum + d = io.BytesIO(payload) + parser = Parser(Reader(d), original_data(sig_file)) + signature, = list(parser) + verify_digest(pubkey=pubkey, digest=signature['digest'], + signature=signature['sig'], label=sig_file) + + +def verify_digest(pubkey, digest, signature, label): + coords = pubkey['point'] + point = ecdsa.ellipticcurve.Point(curve=ecdsa.NIST256p.curve, + x=coords[0], y=coords[1]) + v = ecdsa.VerifyingKey.from_public_point(point=point, + curve=ecdsa.curves.NIST256p, + hashfunc=hashlib.sha256) + try: + v.verify_digest(signature=signature, + digest=digest, + sigdecode=lambda rs, order: rs) + log.info('%s is OK', label) + except ecdsa.keys.BadSignatureError: + log.error('%s has bad signature!', label) + raise + + +def main(): + logging.basicConfig(level=logging.INFO, + format='%(asctime)s %(levelname)-10s %(message)s') + p = argparse.ArgumentParser() + p.add_argument('pubkey') + p.add_argument('signature') + args = p.parse_args() + check(pubkey=load_public_key(args.pubkey), + sig_file=args.signature) + +if __name__ == '__main__': + main() diff --git a/gpg/demo.sh b/gpg/demo.sh new file mode 100755 index 0000000..6bc7480 --- /dev/null +++ b/gpg/demo.sh @@ -0,0 +1,14 @@ +#!/bin/bash +set -x +CREATED=1460731897 # needed for consistent public key creation +NAME="trezor_key" # will be used as GPG user id and public key name + +echo "Hello GPG World!" > EXAMPLE +./signer.py $NAME --time $CREATED --public-key --file EXAMPLE --verbose +./check.py $NAME.pub EXAMPLE.sig # pure Python verification + +# Install GPG v2.1 (modern) and verify the signature +gpg2 --import $NAME.pub +gpg2 --list-keys $NAME +# gpg2 --edit-key trezor_key trust # optional: mark it as trusted +gpg2 --verify EXAMPLE.sig diff --git a/gpg/signer.py b/gpg/signer.py new file mode 100755 index 0000000..1faf0a5 --- /dev/null +++ b/gpg/signer.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python +import argparse +import base64 +import binascii +import hashlib +import logging +import struct +import time + +import ecdsa + +import trezor_agent.client +import trezor_agent.formats +import trezor_agent.util +import util + +log = logging.getLogger(__name__) + + +def prefix_len(fmt, blob): + return struct.pack(fmt, len(blob)) + blob + + +def packet(tag, blob): + assert len(blob) < 256 + length_type = 0 # : 1 byte for length + leading_byte = 0x80 | (tag << 2) | (length_type) + return struct.pack('>B', leading_byte) + prefix_len('>B', blob) + + +def subpacket(subpacket_type, fmt, *values): + blob = struct.pack(fmt, *values) if values else fmt + return struct.pack('>B', subpacket_type) + blob + + +def subpacket_long(subpacket_type, value): + return subpacket(subpacket_type, '>L', value) + + +def subpacket_time(value): + return subpacket_long(2, value) + + +def subpacket_byte(subpacket_type, value): + return subpacket(subpacket_type, '>B', value) + + +def subpackets(*items): + prefixed = [prefix_len('>B', item) for item in items] + return prefix_len('>H', b''.join(prefixed)) + + +def mpi(value): + bits = value.bit_length() + data_size = (bits + 7) // 8 + data_bytes = [0] * data_size + for i in range(data_size): + data_bytes[i] = value & 0xFF + value = value >> 8 + + data_bytes.reverse() + return struct.pack('>H', bits) + bytearray(data_bytes) + + +def time_format(t): + return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t)) + + +def hexlify(blob): + return binascii.hexlify(blob).decode('ascii').upper() + + +def split_lines(body, size): + lines = [] + for i in range(0, len(body), size): + lines.append(body[i:i+size] + '\n') + return ''.join(lines) + +def armor_sig(blob): + head = '-----BEGIN PGP SIGNATURE-----\nVersion: GnuPG v2\n\n' + body = base64.b64encode(blob) + checksum = base64.b64encode(util.crc24(blob)) + tail = '-----END PGP SIGNATURE-----\n' + return head + split_lines(body + '=' + checksum, 64) + tail + +class Signer(object): + + curve = ecdsa.NIST256p + ecdsa_curve_name = trezor_agent.formats.CURVE_NIST256 + + def __init__(self, user_id, created): + self.user_id = user_id + self.client_wrapper = trezor_agent.factory.load() + + # This requires the following patch to trezor-mcu to work: + # https://gist.github.com/romanz/b66f5df1ca8ef15641df8ea5bb09fd47 + self.identity = self.client_wrapper.identity_type() + self.identity.proto = 'gpg' + self.identity.host = user_id + addr = trezor_agent.client.get_address(self.identity) + public_node = self.client_wrapper.connection.get_public_node( + n=addr, ecdsa_curve_name=self.ecdsa_curve_name) + + verifying_key = trezor_agent.formats.decompress_pubkey( + pubkey=public_node.node.public_key, + curve_name=self.ecdsa_curve_name) + + self.created = int(created) + header = struct.pack('>BLB', + 4, # version + self.created, # creation + 19) # ECDSA + + # https://tools.ietf.org/html/rfc6637#section-11 (NIST P-256 OID) + oid = prefix_len('>B', b'\x2A\x86\x48\xCE\x3D\x03\x01\x07') + + point = verifying_key.pubkey.point + self.pubkey_data = header + oid + mpi((4 << 512) | + (point.x() << 256) | + (point.y())) + + self.data_to_hash = b'\x99' + prefix_len('>H', self.pubkey_data) + fingerprint = hashlib.sha1(self.data_to_hash).digest() + self.key_id = fingerprint[-8:] + log.info('key %s created at %s', + hexlify(fingerprint[-4:]), time_format(self.created)) + + def close(self): + self.client_wrapper.connection.clear_session() + self.client_wrapper.connection.close() + + def export(self): + pubkey_packet = packet(tag=6, blob=self.pubkey_data) + user_id_packet = packet(tag=13, blob=self.user_id) + + user_id_to_hash = user_id_packet[:1] + prefix_len('>L', self.user_id) + data_to_sign = self.data_to_hash + user_id_to_hash + log.info('signing user_id: %r', self.user_id.decode('ascii')) + hashed_subpackets = [ + subpacket_time(self.created), # signature creaion time + subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign) + subpacket_byte(0x15, 8), # preferred hash (SHA256) + subpacket_byte(0x16, 0), # preferred compression (none) + subpacket_byte(0x17, 0x80)] # key server prefs (no-modify) + signature = self._make_signature(visual='Sign GPG public key', + data_to_sign=data_to_sign, + sig_type=0x13, # user id & public key + hashed_subpackets=hashed_subpackets) + + sign_packet = packet(tag=2, blob=signature) + return pubkey_packet + user_id_packet + sign_packet + + def sign(self, msg, sign_time=None): + if sign_time is None: + sign_time = int(time.time()) + + log.info('signing message %r at %s', msg, + time_format(sign_time)) + hashed_subpackets = [subpacket_time(sign_time)] + blob = self._make_signature( + visual='Sign GPG message', + data_to_sign=msg, hashed_subpackets=hashed_subpackets) + return packet(tag=2, blob=blob) + + def _make_signature(self, visual, data_to_sign, + hashed_subpackets, sig_type=0): + header = struct.pack('>BBBB', + 4, # version + sig_type, # rfc4880 (section-5.2.1) + 19, # pubkey_alg (ECDSA) + 8) # hash_alg (SHA256) + hashed = subpackets(*hashed_subpackets) + unhashed = subpackets( + subpacket(16, self.key_id) # issuer key id + ) + tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed)) + data_to_hash = data_to_sign + header + hashed + tail + + log.debug('hashing %d bytes', len(data_to_hash)) + digest = hashlib.sha256(data_to_hash).digest() + + result = self.client_wrapper.connection.sign_identity( + identity=self.identity, + challenge_hidden=hashlib.sha256(data_to_hash).digest(), + challenge_visual=visual, + ecdsa_curve_name=self.ecdsa_curve_name) + assert result.signature[:1] == b'\x00' + sig = result.signature[1:] + sig = [trezor_agent.util.bytes2num(sig[:32]), + trezor_agent.util.bytes2num(sig[32:])] + + hash_prefix = digest[:2] # used for decoder's sanity check + signature = mpi(sig[0]) + mpi(sig[1]) # actual ECDSA signature + return header + hashed + unhashed + hash_prefix + signature + + +def main(): + p = argparse.ArgumentParser() + p.add_argument('user_id') + p.add_argument('-t', '--time', type=int, default=int(time.time())) + p.add_argument('-f', '--filename') + p.add_argument('-a', '--armor', action='store_true', default=False) + p.add_argument('-p', '--public-key', action='store_true', default=False) + p.add_argument('-v', '--verbose', action='store_true', default=False) + + args = p.parse_args() + logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO, + format='%(asctime)s %(levelname)-10s %(message)s') + s = Signer(user_id=args.user_id.encode('ascii'), created=args.time) + if args.public_key: + open(args.user_id + '.pub', 'wb').write(s.export()) + if args.filename: + data = open(args.filename, 'rb').read() + sig, ext = s.sign(data), '.sig' + if args.armor: + sig, ext = armor_sig(sig), '.asc' + open(args.filename + ext, 'wb').write(sig) + s.close() + + +if __name__ == '__main__': + main() diff --git a/gpg/util.py b/gpg/util.py new file mode 100644 index 0000000..a2b1f57 --- /dev/null +++ b/gpg/util.py @@ -0,0 +1,18 @@ +import struct + + +def crc24(blob): + CRC24_INIT = 0xB704CEL + CRC24_POLY = 0x1864CFBL + + crc = CRC24_INIT + for octet in bytearray(blob): + crc ^= (octet << 16) + for _ in range(8): + crc <<= 1 + if crc & 0x1000000: + crc ^= CRC24_POLY + assert 0 <= crc < 0x1000000 + crc_bytes = struct.pack('>L', crc) + assert crc_bytes[0] == b'\x00' + return crc_bytes[1:]