HACK: add preliminary gpg support

This commit is contained in:
Roman Zeyde 2016-05-30 17:49:21 +03:00
parent d9b07e2ac6
commit 92649b290f
5 changed files with 72 additions and 67 deletions

View File

@ -7,7 +7,7 @@ import os
import sys
import time
from . import decode, encode, keyring, proto
from . import encode, keyring, proto
log = logging.getLogger(__name__)
@ -28,36 +28,6 @@ def run_create(args):
sys.stdout.write(proto.armor(result, 'PUBLIC KEY BLOCK'))
def run_sign(args):
"""Generate a GPG signature using hardware-based device."""
pubkey = decode.load_public_key(keyring.export_public_key(user_id=None),
use_custom=True)
f = encode.Factory.from_public_key(pubkey=pubkey,
user_id=pubkey['user_id'])
with contextlib.closing(f):
if args.filename:
data = open(args.filename, 'rb').read()
else:
data = sys.stdin.read()
sig = f.sign_message(data)
sig = proto.armor(sig, 'SIGNATURE').encode('ascii')
decode.verify(pubkey=pubkey, signature=sig, original_data=data)
filename = '-' # write to stdout
if args.output:
filename = args.output
elif args.filename:
filename = args.filename + '.asc'
if filename == '-':
output = sys.stdout
else:
output = open(filename, 'wb')
output.write(sig)
def main():
"""Main function."""
p = argparse.ArgumentParser()
@ -72,13 +42,6 @@ def main():
create.add_argument('-t', '--time', type=int, default=int(time.time()))
create.set_defaults(run=run_create)
sign = subparsers.add_parser('sign')
sign.add_argument('filename', nargs='?',
help='Use stdin, if not specified.')
sign.add_argument('-o', '--output', default=None,
help='Use stdout, if equals to "-".')
sign.set_defaults(run=run_sign)
args = p.parse_args()
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO,
format='%(asctime)s %(levelname)-10s %(message)s')

View File

@ -1,15 +1,10 @@
"""GPG-agent utilities."""
import binascii
import contextlib
import hashlib
import logging
import os
import select
import subprocess
import threading
import ecdsa
from . import keyring
from . import decode, encode, keyring
from .. import server, util
log = logging.getLogger(__name__)
@ -26,29 +21,36 @@ def yield_connections(sock):
def serialize(data):
"""Serialize data according to ASSUAN protocol."""
for c in ['%', '\n', '\r']:
data = data.replace(c, '%{:02X}'.format(ord(c)))
return data
def sig_encode(r, s, o):
def sig_encode(r, s):
"""Serialize ECDSA signature data into GPG S-expression."""
r = serialize(util.num2bytes(r, 32))
s = serialize(util.num2bytes(s, 32))
return '(7:sig-val(5:ecdsa(1:r32:{})(1:s32:{})))\n'.format(r, s)
def pksign(keygrip, digest):
pk = 0x99ddae8aee45de830e08889f76ce1f7f993d80a1a05e843ae950b5ba62c16efb
sk = ecdsa.SigningKey.from_secret_exponent(pk, curve=ecdsa.NIST256p,
hashfunc=hashlib.sha256)
digest = binascii.unhexlify(digest)
result = sk.sign_digest_deterministic(digest, hashfunc=hashlib.sha256,
sigencode=sig_encode)
log.debug('result: %r', result)
return result
def pksign(keygrip, digest, algo):
"""Sign a message digest using a private EC key."""
assert algo == '8'
pubkey = decode.load_public_key(keyring.export_public_key(user_id=None),
use_custom=True)
f = encode.Factory.from_public_key(pubkey=pubkey,
user_id=pubkey['user_id'])
with contextlib.closing(f):
assert f.pubkey.keygrip == binascii.unhexlify(keygrip)
r, s = f.conn.sign(binascii.unhexlify(digest))
result = sig_encode(r, s)
log.debug('result: %r', result)
return result
def handle_connection(conn):
"""Handle connection from GPG binary using the ASSUAN protocol."""
keygrip = None
digest = None
algo = None
@ -70,7 +72,7 @@ def handle_connection(conn):
elif command == 'SETHASH':
algo, digest = args
elif command == 'PKSIGN':
sig = pksign(keygrip, digest)
sig = pksign(keygrip, digest, algo)
conn.sendall('D ' + sig)
else:
log.error('unknown request: %r', line)
@ -80,7 +82,8 @@ def handle_connection(conn):
def main():
logging.basicConfig(level=logging.DEBUG,
"""Run a simple GPG-agent server."""
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(levelname)-10s %(message)s')
sock_path = os.path.expanduser('~/.gnupg/S.gpg-agent')

View File

@ -38,8 +38,7 @@ class HardwareSigner(object):
ecdsa_curve_name=self.curve_name)
assert result.signature[:1] == b'\x00'
sig = result.signature[1:]
return (proto.mpi(util.bytes2num(sig[:32])) +
proto.mpi(util.bytes2num(sig[32:])))
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
def close(self):
"""Close the connection to the device."""
@ -57,9 +56,8 @@ class AgentSigner(object):
def sign(self, digest):
"""Sign the digest and return an ECDSA/RSA/DSA signature."""
params = keyring.sign_digest(sock=self.sock,
keygrip=self.keygrip, digest=digest)
return b''.join(proto.mpi(p) for p in params)
return keyring.sign_digest(sock=self.sock,
keygrip=self.keygrip, digest=digest)
def close(self):
"""Close the connection to gpg-agent."""

View File

@ -23,6 +23,7 @@ def connect_to_agent(sock_path='~/.gnupg/S.gpg-agent', sp=subprocess):
def communicate(sock, msg):
"""Send a message and receive a single line."""
msg += '\n'
sock.sendall(msg.encode('ascii'))
log.debug('-> %r', msg)
@ -30,6 +31,7 @@ def communicate(sock, msg):
def recvline(sock):
"""Receive a single line from the socket."""
reply = io.BytesIO()
while True:
@ -127,10 +129,10 @@ def sign_digest(sock, keygrip, digest, sp=subprocess, environ=None):
assert communicate(sock, 'SIGKEY {}'.format(keygrip)) == b'OK'
hex_digest = binascii.hexlify(digest).upper().decode('ascii')
assert communicate(sock, 'SETHASH {} {}'.format(hash_algo,
hex_digest)) == b'OK'
hex_digest)) == b'OK'
assert communicate(sock, 'SETKEYDESC '
'Sign+a+new+TREZOR-based+subkey') == b'OK'
'Sign+a+new+TREZOR-based+subkey') == b'OK'
assert communicate(sock, 'PKSIGN') == b'OK'
line = recvline(sock).strip()
line = unescape(line)

View File

@ -77,17 +77,54 @@ def _serialize_ed25519(vk):
util.bytes2num(vk.to_bytes()))
def _compute_keygrip(params):
exp = ''.join('(1:{}{}:{})'.format(name, len(value), value)
for name, value in params)
return hashlib.sha1(exp).digest()
def _keygrip_nist256(vk):
curve = vk.curve.curve
gen = vk.curve.generator
g = (4 << 512) | (gen.x() << 256) | gen.y()
point = vk.pubkey.point
q = (4 << 512) | (point.x() << 256) | point.y()
return _compute_keygrip([
['p', util.num2bytes(curve.p(), size=32)],
['a', util.num2bytes(curve.a() % curve.p(), size=32)],
['b', util.num2bytes(curve.b() % curve.p(), size=32)],
['g', util.num2bytes(g, size=65)],
['n', util.num2bytes(vk.curve.order, size=32)],
['q', util.num2bytes(q, size=65)],
])
def _keygrip_ed25519(vk):
# pylint: disable=line-too-long
return _compute_keygrip([
['p', util.num2bytes(0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFED, size=32)], # nopep8
['a', b'\x01'],
['b', util.num2bytes(0x2DFC9311D490018C7338BF8688861767FF8FF5B2BEBE27548A14B235ECA6874A, size=32)], # nopep8
['g', util.num2bytes(0x04216936D3CD6E53FEC0A4E231FDD6DC5C692CC7609525A7B2C9562D608F25D51A6666666666666666666666666666666666666666666666666666666666666658, size=65)], # nopep8
['n', util.num2bytes(0x1000000000000000000000000000000014DEF9DEA2F79CD65812631A5CF5D3ED, size=32)], # nopep8
['q', vk.to_bytes()],
])
SUPPORTED_CURVES = {
formats.CURVE_NIST256: {
# https://tools.ietf.org/html/rfc6637#section-11
'oid': b'\x2A\x86\x48\xCE\x3D\x03\x01\x07',
'algo_id': 19,
'serialize': _serialize_nist256
'serialize': _serialize_nist256,
'keygrip': _keygrip_nist256,
},
formats.CURVE_ED25519: {
'oid': b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01',
'algo_id': 22,
'serialize': _serialize_ed25519
'serialize': _serialize_ed25519,
'keygrip': _keygrip_ed25519,
}
}
@ -111,6 +148,7 @@ class PublicKey(object):
self.created = int(created) # time since Epoch
self.verifying_key = verifying_key
self.algo_id = self.curve_info['algo_id']
self.keygrip = self.curve_info['keygrip'](verifying_key)
hex_key_id = util.hexlify(self.key_id())[-8:]
self.desc = 'GPG public key {}/{}'.format(curve_name, hex_key_id)
@ -175,7 +213,8 @@ def make_signature(signer_func, data_to_sign, public_algo,
log.debug('hashing %d bytes', len(data_to_hash))
digest = hashlib.sha256(data_to_hash).digest()
log.debug('signing digest: %s', util.hexlify(digest))
sig = signer_func(digest=digest)
params = signer_func(digest=digest)
sig = b''.join(mpi(p) for p in params)
return bytes(header + hashed + unhashed +
digest[:2] + # used for decoder's sanity check