gpg: pydocstyle fixes

This commit is contained in:
Roman Zeyde 2016-04-24 12:22:02 +03:00
parent a114242243
commit d7913a84d5
6 changed files with 76 additions and 19 deletions

View File

@ -0,0 +1,9 @@
"""
TREZOR support for ECDSA GPG signatures.
See these links for more details:
- https://www.gnupg.org/faq/whats-new-in-2.1.html
- https://tools.ietf.org/html/rfc4880
- https://tools.ietf.org/html/rfc6637
- https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05
"""

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python
"""Check GPG v2 signature for a given public key."""
import argparse
import base64
import io
@ -11,6 +12,7 @@ log = logging.getLogger(__name__)
def original_data(filename):
"""Locate and load original file data, whose signature is provided."""
parts = filename.rsplit('.', 1)
if len(parts) == 2 and parts[1] in ('sig', 'asc'):
log.debug('loading file %s', parts[0])
@ -21,6 +23,7 @@ def verify(pubkey, sig_file):
d = open(sig_file, 'rb')
if d.name.endswith('.asc'):
lines = d.readlines()[3:-1]
"""Verify correctness of public key and signature."""
data = base64.b64decode(''.join(lines))
payload, checksum = data[:-3], data[-3:]
assert util.crc24(payload) == checksum
@ -33,6 +36,7 @@ def verify(pubkey, sig_file):
def main():
"""Main function."""
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(levelname)-10s %(message)s')
p = argparse.ArgumentParser()

View File

@ -1,3 +1,4 @@
"""Decoders for GPG v2 data structures."""
import binascii
import contextlib
import hashlib
@ -14,31 +15,39 @@ log = logging.getLogger(__name__)
def bit(value, i):
"""Extract the i-th bit out of value."""
return 1 if value & (1 << i) else 0
def low_bits(value, n):
"""Extract the lowest n bits out of value."""
return value & ((1 << n) - 1)
def readfmt(stream, fmt):
"""Read and unpack an object from stream, using a struct format string."""
size = struct.calcsize(fmt)
blob = stream.read(size)
return struct.unpack(fmt, blob)
class Reader(object):
"""Read basic type objects out of given stream."""
def __init__(self, stream):
"""Create a non-capturing reader."""
self.s = stream
self._captured = None
def readfmt(self, fmt):
"""Read a specified object, using a struct format string."""
size = struct.calcsize(fmt)
blob = self.read(size)
obj, = struct.unpack(fmt, blob)
return obj
def read(self, size=None):
"""Read `size` bytes from stream."""
blob = self.s.read(size)
if size is not None and len(blob) < size:
raise EOFError
@ -48,6 +57,7 @@ class Reader(object):
@contextlib.contextmanager
def capture(self, stream):
"""Capture all data read during this context."""
self._captured = stream
try:
yield
@ -58,6 +68,7 @@ length_types = {0: '>B', 1: '>H', 2: '>L'}
def parse_subpackets(s):
"""See https://tools.ietf.org/html/rfc4880#section-5.2.3.1 for details."""
subpackets = []
total_size = s.readfmt('>H')
data = s.read(total_size)
@ -75,12 +86,18 @@ def parse_subpackets(s):
def parse_mpi(s):
"""See https://tools.ietf.org/html/rfc4880#section-3.2 for details."""
bits = s.readfmt('>H')
blob = bytearray(s.read(int((bits + 7) // 8)))
return sum(v << (8 * i) for i, v in enumerate(reversed(blob)))
def split_bits(value, *bits):
"""
Split integer value into list of ints, according to `bits` list.
For example, split_bits(0x1234, 4, 8, 4) == [0x1, 0x23, 0x4]
"""
result = []
for b in reversed(bits):
mask = (1 << b) - 1
@ -125,11 +142,13 @@ SUPPORTED_CURVES = {
class Parser(object):
"""Parse GPG packets from a given stream."""
def __init__(self, stream, to_hash=None):
"""Create an empty parser."""
self.stream = stream
self.packet_types = {
2: self.signature,
4: self.onepass,
6: self.pubkey,
11: self.literal,
13: self.user_id,
@ -139,21 +158,11 @@ class Parser(object):
self.to_hash.write(to_hash)
def __iter__(self):
"""Support iterative parsing of available GPG packets."""
return self
def onepass(self, stream):
# pylint: disable=no-self-use
p = {'type': 'onepass'}
p['version'] = stream.readfmt('B')
p['sig_type'] = stream.readfmt('B')
p['hash_alg'] = stream.readfmt('B')
p['pubkey_alg'] = stream.readfmt('B')
p['key_id'] = stream.readfmt('8s')
p['nested'] = stream.readfmt('B')
assert not stream.read()
return p
def literal(self, stream):
"""See https://tools.ietf.org/html/rfc4880#section-5.9 for details."""
p = {'type': 'literal'}
p['format'] = stream.readfmt('c')
filename_len = stream.readfmt('B')
@ -164,6 +173,7 @@ class Parser(object):
return p
def signature(self, stream):
"""See https://tools.ietf.org/html/rfc4880#section-5.2 for details."""
p = {'type': 'signature'}
to_hash = io.BytesIO()
@ -195,6 +205,7 @@ class Parser(object):
return p
def pubkey(self, stream):
"""See https://tools.ietf.org/html/rfc4880#section-5.5 for details."""
p = {'type': 'pubkey'}
packet = io.BytesIO()
with stream.capture(packet):
@ -224,14 +235,15 @@ class Parser(object):
return p
def user_id(self, stream):
"""See https://tools.ietf.org/html/rfc4880#section-5.11 for details."""
value = stream.read()
self.to_hash.write(b'\xb4' + struct.pack('>L', len(value)))
self.to_hash.write(value)
return {'type': 'user_id', 'value': value}
def __next__(self):
"""See https://tools.ietf.org/html/rfc4880#section-4.2 for details."""
try:
# https://tools.ietf.org/html/rfc4880#section-4.2
value = self.stream.readfmt('B')
except EOFError:
raise StopIteration
@ -252,7 +264,7 @@ class Parser(object):
if packet_type:
p = packet_type(Reader(io.BytesIO(packet_data)))
else:
p = {'type': 'UNKNOWN'}
raise ValueError('Unknown packet type: {}'.format(packet_type))
p['tag'] = tag
log.debug('packet "%s": %s', p['type'], p)
return p
@ -261,6 +273,7 @@ class Parser(object):
def load_public_key(stream):
"""Parse and validate GPG public key from an input stream."""
parser = Parser(Reader(stream))
pubkey, userid, signature = list(parser)
log.debug('loaded public key "%s"', userid['value'])
@ -270,6 +283,7 @@ def load_public_key(stream):
def verify_digest(pubkey, digest, signature, label):
"""Verify a digest signature from a specified public key."""
verifier = pubkey['verifier']
try:
verifier(signature, digest)

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python
"""A simple wrapper for Git commit/tag GPG signing."""
import logging
import subprocess as sp
import sys
@ -9,6 +10,7 @@ log = logging.getLogger(__name__)
def main():
"""Main function."""
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(levelname)-10s %(message)s')

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python
"""Create GPG ECDSA signatures and public keys using TREZOR device."""
import argparse
import base64
import binascii
@ -16,10 +17,12 @@ log = logging.getLogger(__name__)
def prefix_len(fmt, blob):
"""Prefix `blob` with its size, serialized using `fmt` format."""
return struct.pack(fmt, len(blob)) + blob
def packet(tag, blob):
"""Create small GPG packet."""
assert len(blob) < 256
length_type = 0 # : 1 byte for length
leading_byte = 0x80 | (tag << 2) | (length_type)
@ -27,28 +30,34 @@ def packet(tag, blob):
def subpacket(subpacket_type, fmt, *values):
"""Create GPG subpacket."""
blob = struct.pack(fmt, *values) if values else fmt
return struct.pack('>B', subpacket_type) + blob
def subpacket_long(subpacket_type, value):
"""Create GPG subpacket with 32-bit unsigned integer."""
return subpacket(subpacket_type, '>L', value)
def subpacket_time(value):
"""Create GPG subpacket with time in seconds (since Epoch)."""
return subpacket_long(2, value)
def subpacket_byte(subpacket_type, value):
"""Create GPG subpacket with 8-bit unsigned integer."""
return subpacket(subpacket_type, '>B', value)
def subpackets(*items):
"""Serialize several GPG subpackets."""
prefixed = [prefix_len('>B', item) for item in items]
return prefix_len('>H', b''.join(prefixed))
def mpi(value):
"""Serialize multipresicion integer using GPG format."""
bits = value.bit_length()
data_size = (bits + 7) // 8
data_bytes = [0] * data_size
@ -61,10 +70,12 @@ def mpi(value):
def time_format(t):
"""Utility for consistent time formatting."""
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t))
def hexlify(blob):
"""Utility for consistent hexadecimal formatting."""
return binascii.hexlify(blob).decode('ascii').upper()
@ -94,15 +105,17 @@ SUPPORTED_CURVES = {
}
def find_curve_by_algo_id(algo_id):
def _find_curve_by_algo_id(algo_id):
curve_name, = [name for name, info in SUPPORTED_CURVES.items()
if info['algo_id'] == algo_id]
return curve_name
class Signer(object):
"""Performs GPG operations with the TREZOR."""
def __init__(self, user_id, created, curve_name):
"""Construct and loads a public key from the device."""
self.user_id = user_id
assert curve_name in formats.SUPPORTED_CURVES
self.curve_name = curve_name
@ -126,9 +139,15 @@ class Signer(object):
@classmethod
def from_public_key(cls, pubkey, user_id):
"""
Create from an existing GPG public key.
`pubkey` should be loaded via `load_from_gpg(user_id)`
from the local GPG keyring.
"""
s = Signer(user_id=user_id,
created=pubkey['created'],
curve_name=find_curve_by_algo_id(pubkey['algo']))
curve_name=_find_curve_by_algo_id(pubkey['algo']))
assert s.key_id() == pubkey['key_id']
return s
@ -149,16 +168,20 @@ class Signer(object):
return hashlib.sha1(self._pubkey_data_to_hash()).digest()
def key_id(self):
"""Short (8 byte) GPG key ID."""
return self._fingerprint()[-8:]
def hex_short_key_id(self):
"""Short (8 hexadecimal digits) GPG key ID."""
return hexlify(self.key_id()[-4:])
def close(self):
"""Close connection and turn off the screen of the device."""
self.client_wrapper.connection.clear_session()
self.client_wrapper.connection.close()
def export(self):
"""Export GPG public key, ready for "gpg2 --import"."""
pubkey_packet = packet(tag=6, blob=self._pubkey_data())
user_id_packet = packet(tag=13, blob=self.user_id)
@ -180,6 +203,7 @@ class Signer(object):
return pubkey_packet + user_id_packet + sign_packet
def sign(self, msg, sign_time=None):
"""Sign GPG message at specified time."""
if sign_time is None:
sign_time = int(time.time())
@ -223,7 +247,7 @@ class Signer(object):
sig) # actual ECDSA signature
def split_lines(body, size):
def _split_lines(body, size):
lines = []
for i in range(0, len(body), size):
lines.append(body[i:i+size] + '\n')
@ -231,14 +255,16 @@ def split_lines(body, size):
def armor(blob, type_str):
"""See https://tools.ietf.org/html/rfc4880#section-6 for details."""
head = '-----BEGIN PGP {}-----\nVersion: GnuPG v2\n\n'.format(type_str)
body = base64.b64encode(blob)
checksum = base64.b64encode(util.crc24(blob))
tail = '-----END PGP {}-----\n'.format(type_str)
return head + split_lines(body, 64) + '=' + checksum + '\n' + tail
return head + _split_lines(body, 64) + '=' + checksum + '\n' + tail
def load_from_gpg(user_id):
"""Load existing GPG public key for `user_id` from local keyring."""
pubkey_bytes = subprocess.check_output(['gpg2', '--export', user_id])
if pubkey_bytes:
return decode.load_public_key(io.BytesIO(pubkey_bytes))
@ -248,6 +274,7 @@ def load_from_gpg(user_id):
def main():
"""Main function."""
p = argparse.ArgumentParser()
p.add_argument('user_id')
p.add_argument('filename', nargs='?')

View File

@ -78,6 +78,7 @@ def frame(*msgs):
def crc24(blob):
"""See https://tools.ietf.org/html/rfc4880#section-6.1 for details."""
CRC24_INIT = 0x0B704CE
CRC24_POLY = 0x1864CFB