trezor-agent/trezor_agent/gpg/keyring.py

161 lines
4.3 KiB
Python
Raw Normal View History

2016-04-27 18:01:21 +00:00
"""Tools for doing signature using gpg-agent."""
import binascii
import io
import logging
import os
import re
import socket
import subprocess
2016-04-27 18:01:21 +00:00
from .. import util
log = logging.getLogger(__name__)
def connect_to_agent(sock_path='~/.gnupg/S.gpg-agent', sp=subprocess):
    """Connect to GPG agent's UNIX socket.

    ``gpg-connect-agent /bye`` is run first (presumably so that the agent is
    up before its socket is used), then a stream connection to ``sock_path``
    is opened.

    :param sock_path: path of the agent's socket; a leading ``~`` is expanded.
    :param sp: ``subprocess``-like object providing ``check_call``
        (injectable for testing).
    :returns: the connected socket object; the caller is responsible for
        closing it.
    :raises: whatever ``sp.check_call`` raises when the command fails, and
        ``socket.error`` when the connection cannot be established.
    """
    sock_path = os.path.expanduser(sock_path)
    sp.check_call(['gpg-connect-agent', '/bye'])
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.connect(sock_path)
    except Exception:
        # Do not leak the file descriptor when the connection attempt fails.
        sock.close()
        raise
    return sock
def _communicate(sock, msg):
    """Send one command line to the agent and return its reply line.

    A newline terminator is appended to ``msg``; the resulting line is
    ASCII-encoded, written to ``sock`` and logged at debug level. The reply
    is the next line read from ``sock`` by ``_recvline``.
    """
    line = msg + '\n'
    sock.sendall(line.encode('ascii'))
    log.debug('-> %r', line)
    return _recvline(sock)
def _recvline(sock):
    """Read a single newline-terminated line from ``sock``.

    The line is read one byte at a time so that nothing past the newline is
    consumed from the socket.

    :returns: the received bytes, without the terminating newline.
    :raises EOFError: if the peer closes the connection before a newline
        is received.
    """
    reply = io.BytesIO()
    while True:
        c = sock.recv(1)
        if not c:
            # An empty read means the peer closed the connection; without
            # this check the loop would spin forever on b''.
            raise EOFError('connection closed while reading a line '
                           '(got {!r})'.format(reply.getvalue()))
        if c == b'\n':
            break
        reply.write(c)
    result = reply.getvalue()
    log.debug('<- %r', result)
    return result
2016-04-27 18:01:21 +00:00
2016-05-26 20:16:08 +00:00
def unescape(s):
    """Unescape ASSUAN message (0xAB <-> '%AB').

    Every ``%XX`` sequence (``XX`` being two hexadecimal digits) is replaced
    by the single byte it encodes; all other bytes are copied unchanged.
    A decoded byte is never re-interpreted, so ``%2541`` yields ``%41``.

    :param s: escaped message (``bytes`` or ``bytearray``).
    :returns: the unescaped message as ``bytes``.
    :raises ValueError: if a ``%`` is not followed by exactly two hex digits
        (e.g. a truncated escape at the end of the message).
    """
    hex_digits = bytearray(b'0123456789abcdefABCDEF')
    percent = ord('%')
    data = bytearray(s)
    result = bytearray()
    i = 0
    # Single left-to-right pass building a new buffer (no in-place splicing).
    while i < len(data):
        if data[i] == percent:
            pair = data[i + 1:i + 3]
            # Validate explicitly: int() alone would accept truncated or
            # signed/whitespace-padded input such as b'A', b'+1' or b' 1'.
            if len(pair) != 2 or not all(c in hex_digits for c in pair):
                raise ValueError('invalid escape sequence at offset {}: {!r}'
                                 .format(i, bytes(data[i:i + 3])))
            result.append(int(pair.decode('ascii'), 16))
            i += 3
        else:
            result.append(data[i])
            i += 1
    return bytes(result)
2016-05-26 20:16:08 +00:00
def parse_term(s):
    """Parse single s-expr term from bytes.

    A term is encoded as ``<decimal length>:<payload>``.

    :param s: bytes starting with an encoded term.
    :returns: ``(payload, rest)`` where ``payload`` is exactly ``length``
        bytes long and ``rest`` is whatever follows it.
    :raises ValueError: if the ``:`` separator is missing, the length is not
        a non-negative integer, or fewer than ``length`` payload bytes are
        available (truncated input).
    """
    size, sep, rest = s.partition(b':')
    if not sep:
        raise ValueError('missing length separator in term: {!r}'.format(s))
    size = int(size)
    # Slicing alone would silently return a short payload for a truncated
    # term (or a bogus one for a negative length), so check the bounds.
    if size < 0 or size > len(rest):
        raise ValueError('invalid term length {} for {} available bytes'
                         .format(size, len(rest)))
    return rest[:size], rest[size:]
2016-05-26 20:16:08 +00:00
def parse(s):
    """Parse full s-expr from bytes.

    Input starting with ``(`` is parsed as a list: its first element is a
    term (the name), followed by nested expressions until the matching
    ``)``. Any other input is parsed as a single term.

    :returns: ``(value, leftover)`` where ``value`` is either a term's bytes
        or a list ``[name, child, ...]``, and ``leftover`` is the unparsed
        remainder of ``s``.
    """
    if not s.startswith(b'('):
        return parse_term(s)

    head, rest = parse_term(s[1:])
    items = [head]
    while not rest.startswith(b')'):
        item, rest = parse(rest)
        items.append(item)
    # Skip the closing parenthesis.
    return items, rest[1:]
2016-04-27 18:01:21 +00:00
def _parse_ecdsa_sig(args):
    """Extract the ``(r, s)`` integers of an ECDSA signature.

    ``args`` must hold exactly two ``(label, value)`` pairs, labelled
    ``b'r'`` and ``b's'`` in that order; each value is converted with
    ``util.bytes2num``.
    """
    r_pair, s_pair = args
    r_label, r_bytes = r_pair
    s_label, s_bytes = s_pair
    assert r_label == b'r'
    assert s_label == b's'
    r_value = util.bytes2num(r_bytes)
    s_value = util.bytes2num(s_bytes)
    return (r_value, s_value)
2016-05-26 20:16:08 +00:00
# DSA signatures happen to have the same structure as ECDSA signatures
# (an `r` pair followed by an `s` pair), so the ECDSA parser is reused as-is.
_parse_dsa_sig = _parse_ecdsa_sig
2016-04-29 14:45:16 +00:00
def _parse_rsa_sig(args):
    """Extract the single integer of an RSA signature as a 1-tuple.

    ``args`` must hold exactly one ``(label, value)`` pair labelled ``b's'``;
    the value is converted with ``util.bytes2num``.
    """
    [pair] = args
    label, sig_bytes = pair
    assert label == b's'
    return (util.bytes2num(sig_bytes),)
2016-05-26 20:16:08 +00:00
def parse_sig(sig):
    """Parse signature integer values from s-expr.

    ``sig`` is a ``[b'sig-val', [algo_name, *pairs]]`` structure. The pairs
    are handed to the parser registered for ``algo_name`` (``b'rsa'``,
    ``b'ecdsa'`` or ``b'dsa'``); an unknown algorithm raises ``KeyError``.
    """
    label, body = sig
    assert label == b'sig-val'
    algo_name = body[0]
    parsers = {
        b'rsa': _parse_rsa_sig,
        b'ecdsa': _parse_ecdsa_sig,
        b'dsa': _parse_dsa_sig,
    }
    parse_values = parsers[algo_name]
    return parse_values(args=body[1:])
def sign_digest(sock, keygrip, digest, sp=subprocess):
    """Sign a digest using specified key using GPG agent.

    :param sock: socket connected to the agent.
    :param keygrip: keygrip of the signing key, sent with ``SIGKEY``.
    :param digest: 32-byte digest, sent with ``SETHASH`` as hash algorithm 8.
    :param sp: ``subprocess``-like object providing ``check_output``
        (injectable for testing).
    :returns: the signature integers, as produced by ``parse_sig``.
    :raises AssertionError: if the agent's reply to a command is not the
        expected ``OK``, or if data is left over after the signature s-expr.
    :raises ValueError: if the signature line does not start with ``D``.
    """
    hash_algo = 8  # SHA256
    assert len(digest) == 32

    _expect_ok(sock, 'RESET', exact=False)

    ttyname = sp.check_output(['tty']).strip()
    if isinstance(ttyname, bytes):
        # check_output() returns bytes on Python 3; formatting them directly
        # would send "ttyname=b'/dev/...'" to the agent.
        ttyname = ttyname.decode('ascii')
    options = ['ttyname={}'.format(ttyname)]  # set TTY for passphrase entry
    display = os.environ.get('DISPLAY')
    if display is not None:
        options.append('display={}'.format(display))
    for opt in options:
        _expect_ok(sock, 'OPTION {}'.format(opt))

    _expect_ok(sock, 'SIGKEY {}'.format(keygrip))
    hex_digest = binascii.hexlify(digest).upper().decode('ascii')
    _expect_ok(sock, 'SETHASH {} {}'.format(hash_algo, hex_digest))

    desc = ('Please+enter+the+passphrase+to+unlock+the+OpenPGP%0A'
            'secret+key,+to+sign+a+new+TREZOR-based+subkey')
    _expect_ok(sock, 'SETKEYDESC {}'.format(desc))
    _expect_ok(sock, 'PKSIGN')

    # The signature follows as a percent-escaped data line: "D <s-expr>".
    line = _recvline(sock).strip()
    line = unescape(line)
    log.debug('unescaped: %r', line)
    prefix, sig = line.split(b' ', 1)
    if prefix != b'D':
        raise ValueError(prefix)

    sig, leftover = parse(sig)
    assert not leftover, leftover
    return parse_sig(sig)


def _expect_ok(sock, msg, exact=True):
    """Send ``msg`` to the agent and require an ``OK`` reply.

    The command is sent outside of any ``assert`` statement, so it is still
    issued when Python runs with ``-O`` (which strips asserts together with
    their side effects).

    :param exact: require the reply to be exactly ``b'OK'``; when false, a
        reply merely starting with ``b'OK'`` is accepted.
    :returns: the agent's reply line.
    :raises AssertionError: on any other reply (the exception type is kept
        for backward compatibility with the former ``assert`` statements).
    """
    reply = _communicate(sock, msg)
    accepted = (reply == b'OK') if exact else reply.startswith(b'OK')
    if not accepted:
        raise AssertionError('unexpected reply to {!r}: {!r}'
                             .format(msg, reply))
    return reply
2016-04-27 18:01:21 +00:00
def get_keygrip(user_id, sp=subprocess):
    """Get a keygrip of the primary GPG key of the specified user.

    Runs ``gpg2 --list-keys --with-keygrip <user_id>`` and returns the first
    ``Keygrip = ...`` value found in its output.

    :param user_id: user ID passed to ``gpg2`` to select the key.
    :param sp: ``subprocess``-like object providing ``check_output``
        (injectable for testing).
    :returns: the keygrip as a text string.
    :raises IndexError: if the output contains no ``Keygrip`` line.
    """
    args = ['gpg2', '--list-keys', '--with-keygrip', user_id]
    # The listing also contains user IDs, which may hold non-ASCII text;
    # decode leniently so such names cannot break the keygrip lookup.
    output = sp.check_output(args).decode('utf-8', 'replace')
    return re.findall(r'Keygrip = (\w+)', output)[0]
def export_public_key(user_id, sp=subprocess):
    """Export GPG public key for specified `user_id`.

    Runs ``gpg2 --export``, adding ``user_id`` as an argument only when it
    is truthy, and returns the command's output. Empty output is logged as
    an error and reported as ``KeyError(user_id)``.
    """
    args = ['gpg2', '--export']
    if user_id:
        args = args + [user_id]
    exported = sp.check_output(args=args)
    if exported:
        return exported
    log.error('could not find public key %r in local GPG keyring', user_id)
    raise KeyError(user_id)