gpg: handle multiple packets

nistp521
Roman Zeyde 9 years ago
parent ab192619f4
commit 3bf926620b

@ -8,6 +8,7 @@ import subprocess
import ecdsa
import ed25519
from . import proto
from .. import util
log = logging.getLogger(__name__)
@ -134,6 +135,8 @@ def _parse_signature(stream):
log.info('embedded sigs: %s', embedded)
p['embedded'] = embedded
p['_is_custom'] = (proto.CUSTOM_SUBPACKET in p['unhashed_subpackets'])
p['hash_prefix'] = stream.readfmt('2s')
if p['pubkey_alg'] in ECDSA_ALGO_IDS:
p['sig'] = (parse_mpi(stream), parse_mpi(stream))
@ -273,23 +276,28 @@ def digest_packets(packets):
return hashlib.sha256(data_to_hash.getvalue()).digest()
def load_public_key(stream):
def load_public_key(stream, use_custom=False):
"""Parse and validate GPG public key from an input stream."""
packets = list(parse_packets(util.Reader(stream)))
subkey = subsig = None
if len(packets) == 5:
pubkey, userid, signature, subkey, subsig = packets
log.debug('subkey: %s', subkey)
log.debug('subsig: %s', subsig)
else:
pubkey, userid, signature = packets
pubkey, userid, signature = packets[:3]
packets = packets[3:]
digest = digest_packets([pubkey, userid, signature])
assert signature['hash_prefix'] == digest[:2]
log.debug('loaded public key "%s"', userid['value'])
verify_digest(pubkey=pubkey, digest=digest,
signature=signature['sig'], label='GPG public key')
return subkey or pubkey
packet = pubkey
while use_custom:
if packet['type'] in ('pubkey', 'subkey') and signature['_is_custom']:
log.debug('found custom %s', packet['type'])
break
packet, signature = packets[:2]
packets = packets[2:]
return packet
def load_signature(stream, original_data):
@ -300,11 +308,11 @@ def load_signature(stream, original_data):
return signature, digest
def load_from_gpg(user_id):
def load_from_gpg(user_id, use_custom=False):
"""Load existing GPG public key for `user_id` from local keyring."""
pubkey_bytes = subprocess.check_output(['gpg2', '--export', user_id])
if pubkey_bytes:
return load_public_key(io.BytesIO(pubkey_bytes))
return load_public_key(io.BytesIO(pubkey_bytes), use_custom=use_custom)
else:
log.error('could not find public key %r in local GPG keyring', user_id)
raise KeyError(user_id)

@ -163,7 +163,7 @@ class Signer(object):
proto.subpacket_byte(0x17, 0x80)] # key server prefs (no-modify)
unhashed_subpackets = [
proto.subpacket(16, self.pubkey.key_id()), # issuer key id
proto.MARKER_SUBPACKET]
proto.CUSTOM_SUBPACKET]
signature = _make_signature(
signer_func=self.conn.sign,
@ -204,7 +204,7 @@ class Signer(object):
unhashed_subpackets = [
proto.subpacket(16, primary['key_id']), # issuer key id
proto.subpacket(32, embedded_sig),
proto.MARKER_SUBPACKET]
proto.CUSTOM_SUBPACKET]
gpg_agent = AgentSigner(self.user_id)
signature = _make_signature(signer_func=gpg_agent.sign,
data_to_sign=data_to_sign,

@ -22,7 +22,7 @@ def main():
command = args[0]
user_id = ' '.join(args[1:])
assert command == '-bsau' # --detach-sign --sign --armor --local-user
pubkey = decode.load_from_gpg(user_id)
pubkey = decode.load_from_gpg(user_id, use_custom=True)
s = encode.Signer.from_public_key(user_id=user_id, pubkey=pubkey)
data = sys.stdin.read()

@ -0,0 +1,100 @@
"""GPG protocol utilities."""
import logging
import struct
from .. import formats, util
log = logging.getLogger(__name__)
def packet(tag, blob):
"""Create small GPG packet."""
assert len(blob) < 2**32
if len(blob) < 2**8:
length_type = 0
elif len(blob) < 2**16:
length_type = 1
else:
length_type = 2
fmt = ['>B', '>H', '>L'][length_type]
leading_byte = 0x80 | (tag << 2) | (length_type)
return struct.pack('>B', leading_byte) + util.prefix_len(fmt, blob)
def subpacket(subpacket_type, fmt, *values):
"""Create GPG subpacket."""
blob = struct.pack(fmt, *values) if values else fmt
return struct.pack('>B', subpacket_type) + blob
def subpacket_long(subpacket_type, value):
"""Create GPG subpacket with 32-bit unsigned integer."""
return subpacket(subpacket_type, '>L', value)
def subpacket_time(value):
"""Create GPG subpacket with time in seconds (since Epoch)."""
return subpacket_long(2, value)
def subpacket_byte(subpacket_type, value):
"""Create GPG subpacket with 8-bit unsigned integer."""
return subpacket(subpacket_type, '>B', value)
def subpackets(*items):
"""Serialize several GPG subpackets."""
prefixed = [util.prefix_len('>B', item) for item in items]
return util.prefix_len('>H', b''.join(prefixed))
def mpi(value):
"""Serialize multipresicion integer using GPG format."""
bits = value.bit_length()
data_size = (bits + 7) // 8
data_bytes = bytearray(data_size)
for i in range(data_size):
data_bytes[i] = value & 0xFF
value = value >> 8
data_bytes.reverse()
return struct.pack('>H', bits) + bytes(data_bytes)
def _serialize_nist256(vk):
return mpi((4 << 512) |
(vk.pubkey.point.x() << 256) |
(vk.pubkey.point.y()))
def _serialize_ed25519(vk):
return mpi((0x40 << 256) |
util.bytes2num(vk.to_bytes()))
SUPPORTED_CURVES = {
formats.CURVE_NIST256: {
# https://tools.ietf.org/html/rfc6637#section-11
'oid': b'\x2A\x86\x48\xCE\x3D\x03\x01\x07',
'algo_id': 19,
'serialize': _serialize_nist256
},
formats.CURVE_ED25519: {
'oid': b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01',
'algo_id': 22,
'serialize': _serialize_ed25519
}
}
CUSTOM_SUBPACKET = subpacket(100, b'TREZOR-GPG') # marks "our" pubkey
def find_curve_by_algo_id(algo_id):
"""Find curve name that matches a public key algorith ID."""
curve_name, = [name for name, info in SUPPORTED_CURVES.items()
if info['algo_id'] == algo_id]
return curve_name

@ -61,7 +61,7 @@ def main():
else:
_open_output(filename).write(pubkey)
else:
pubkey = decode.load_from_gpg(user_id)
pubkey = decode.load_from_gpg(user_id, use_custom=True)
s = encode.Signer.from_public_key(pubkey=pubkey, user_id=user_id)
data = open(args.filename, 'rb').read()
sig, ext = s.sign(data), '.sig'

Loading…
Cancel
Save