trezor-agent/trezor_agent/gpg/agent.py

137 lines
4.2 KiB
Python
Raw Normal View History

2016-05-30 14:49:21 +00:00
"""GPG-agent utilities."""
2016-05-28 20:02:45 +00:00
import binascii
import contextlib
import logging
2016-06-21 23:56:37 +00:00
import os
2016-05-28 20:02:45 +00:00
2016-05-30 14:49:21 +00:00
from . import decode, encode, keyring
2016-06-04 06:36:48 +00:00
from .. import util
2016-05-28 20:02:45 +00:00
log = logging.getLogger(__name__)
def yield_connections(sock):
"""Run a server on the specified socket."""
while True:
log.debug('waiting for connection on %s', sock.getsockname())
2016-06-03 11:42:40 +00:00
try:
conn, _ = sock.accept()
except KeyboardInterrupt:
return
2016-05-28 20:02:45 +00:00
conn.settimeout(None)
log.debug('accepted connection on %s', sock.getsockname())
yield conn
def serialize(data):
2016-05-30 14:49:21 +00:00
"""Serialize data according to ASSUAN protocol."""
2016-05-28 20:02:45 +00:00
for c in ['%', '\n', '\r']:
data = data.replace(c, '%{:02X}'.format(ord(c)))
return data
2016-05-30 14:49:21 +00:00
def sig_encode(r, s):
"""Serialize ECDSA signature data into GPG S-expression."""
2016-05-28 20:02:45 +00:00
r = serialize(util.num2bytes(r, 32))
s = serialize(util.num2bytes(s, 32))
2016-06-01 15:34:22 +00:00
return '(7:sig-val(5:ecdsa(1:r32:{})(1:s32:{})))'.format(r, s)
2016-05-28 20:02:45 +00:00
2016-05-30 14:49:21 +00:00
def pksign(keygrip, digest, algo):
"""Sign a message digest using a private EC key."""
assert algo == '8'
2016-06-21 23:56:37 +00:00
user_id = os.environ['TREZOR_GPG_USER_ID']
2016-06-11 12:06:35 +00:00
pubkey_dict = decode.load_public_key(
2016-06-21 23:56:37 +00:00
pubkey_bytes=keyring.export_public_key(user_id=user_id),
use_custom=True, ecdh=False)
2016-06-11 12:06:35 +00:00
pubkey, conn = encode.load_from_public_key(pubkey_dict=pubkey_dict)
with contextlib.closing(conn):
assert pubkey.keygrip == binascii.unhexlify(keygrip)
r, s = conn.sign(binascii.unhexlify(digest))
2016-05-30 14:49:21 +00:00
result = sig_encode(r, s)
log.debug('result: %r', result)
return result
2016-05-28 20:02:45 +00:00
def _serialize_point(data):
data = '{}:'.format(len(data)) + data
# https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
for c in ['%', '\n', '\r']:
data = data.replace(c, '%{:02X}'.format(ord(c)))
return '(5:value' + data + ')'
2016-06-03 11:39:16 +00:00
def parse_ecdh(line):
2016-06-03 19:44:25 +00:00
"""Parse ECDH request and return remote public key."""
2016-06-03 11:39:16 +00:00
prefix, line = line.split(' ', 1)
assert prefix == 'D'
exp, leftover = keyring.parse(keyring.unescape(line))
log.debug('ECDH s-exp: %r', exp)
assert not leftover
label, exp = exp
assert label == b'enc-val'
assert exp[0] == b'ecdh'
items = exp[1:]
log.debug('ECDH parameters: %r', items)
return dict(items)['e']
def pkdecrypt(keygrip, conn):
2016-06-03 19:44:25 +00:00
"""Handle decryption using ECDH."""
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
keyring.sendline(conn, msg)
line = keyring.recvline(conn)
2016-06-03 11:39:16 +00:00
assert keyring.recvline(conn) == b'END'
remote_pubkey = parse_ecdh(line)
2016-06-21 23:56:37 +00:00
user_id = os.environ['TREZOR_GPG_USER_ID']
2016-06-03 11:39:16 +00:00
local_pubkey = decode.load_public_key(
2016-06-21 23:56:37 +00:00
pubkey_bytes=keyring.export_public_key(user_id=user_id),
use_custom=True, ecdh=True)
2016-06-11 12:06:35 +00:00
pubkey, conn = encode.load_from_public_key(pubkey_dict=local_pubkey)
with contextlib.closing(conn):
assert pubkey.keygrip == binascii.unhexlify(keygrip)
shared_secret = conn.ecdh(remote_pubkey)
assert len(shared_secret) == 65
assert shared_secret[:1] == b'\x04'
return _serialize_point(shared_secret)
2016-05-28 20:02:45 +00:00
def handle_connection(conn):
2016-05-30 14:49:21 +00:00
"""Handle connection from GPG binary using the ASSUAN protocol."""
2016-05-28 20:02:45 +00:00
keygrip = None
digest = None
algo = None
2016-06-04 16:45:03 +00:00
version = keyring.gpg_version()
2016-05-28 20:02:45 +00:00
2016-06-01 15:34:22 +00:00
keyring.sendline(conn, b'OK')
2016-07-26 14:50:49 +00:00
for line in keyring.iterlines(conn):
2016-05-28 20:02:45 +00:00
parts = line.split(' ')
command = parts[0]
args = parts[1:]
if command in {'RESET', 'OPTION', 'HAVEKEY', 'SETKEYDESC'}:
pass # reply with OK
elif command == 'GETINFO':
2016-06-04 16:45:03 +00:00
keyring.sendline(conn, b'D ' + version)
2016-05-28 20:02:45 +00:00
elif command == 'AGENT_ID':
2016-06-01 15:34:22 +00:00
keyring.sendline(conn, b'D TREZOR')
elif command in {'SIGKEY', 'SETKEY'}:
2016-05-28 20:02:45 +00:00
keygrip, = args
elif command == 'SETHASH':
algo, digest = args
elif command == 'PKSIGN':
2016-05-30 14:49:21 +00:00
sig = pksign(keygrip, digest, algo)
2016-06-01 15:34:22 +00:00
keyring.sendline(conn, b'D ' + sig)
elif command == 'PKDECRYPT':
sec = pkdecrypt(keygrip, conn)
keyring.sendline(conn, b'D ' + sec)
2016-06-03 14:43:46 +00:00
elif command == 'BYE':
return
2016-05-28 20:02:45 +00:00
else:
log.error('unknown request: %r', line)
return
2016-06-01 15:34:22 +00:00
keyring.sendline(conn, b'OK')