From d7913a84d5cfe132957a8c1cd950892766066406 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sun, 24 Apr 2016 12:22:02 +0300 Subject: [PATCH] gpg: pydocstyle fixes --- trezor_agent/gpg/__init__.py | 9 +++++++ trezor_agent/gpg/check.py | 4 +++ trezor_agent/gpg/decode.py | 44 ++++++++++++++++++++++----------- trezor_agent/gpg/git_wrapper.py | 2 ++ trezor_agent/gpg/signer.py | 35 +++++++++++++++++++++++--- trezor_agent/util.py | 1 + 6 files changed, 76 insertions(+), 19 deletions(-) diff --git a/trezor_agent/gpg/__init__.py b/trezor_agent/gpg/__init__.py index e69de29..f0448b4 100644 --- a/trezor_agent/gpg/__init__.py +++ b/trezor_agent/gpg/__init__.py @@ -0,0 +1,9 @@ +""" +TREZOR support for ECDSA GPG signatures. + +See these links for more details: + - https://www.gnupg.org/faq/whats-new-in-2.1.html + - https://tools.ietf.org/html/rfc4880 + - https://tools.ietf.org/html/rfc6637 + - https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05 +""" diff --git a/trezor_agent/gpg/check.py b/trezor_agent/gpg/check.py index 15d4ac6..f179582 100755 --- a/trezor_agent/gpg/check.py +++ b/trezor_agent/gpg/check.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +"""Check GPG v2 signature for a given public key.""" import argparse import base64 import io @@ -11,6 +12,7 @@ log = logging.getLogger(__name__) def original_data(filename): + """Locate and load original file data, whose signature is provided.""" parts = filename.rsplit('.', 1) if len(parts) == 2 and parts[1] in ('sig', 'asc'): log.debug('loading file %s', parts[0]) @@ -21,6 +23,7 @@ def verify(pubkey, sig_file): d = open(sig_file, 'rb') if d.name.endswith('.asc'): lines = d.readlines()[3:-1] + """Verify correctness of public key and signature.""" data = base64.b64decode(''.join(lines)) payload, checksum = data[:-3], data[-3:] assert util.crc24(payload) == checksum @@ -33,6 +36,7 @@ def verify(pubkey, sig_file): def main(): + """Main function.""" logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)-10s %(message)s') p = argparse.ArgumentParser() diff --git a/trezor_agent/gpg/decode.py b/trezor_agent/gpg/decode.py index 726b9e4..75fd030 100644 --- a/trezor_agent/gpg/decode.py +++ b/trezor_agent/gpg/decode.py @@ -1,3 +1,4 @@ +"""Decoders for GPG v2 data structures.""" import binascii import contextlib import hashlib @@ -14,31 +15,39 @@ log = logging.getLogger(__name__) def bit(value, i): + """Extract the i-th bit out of value.""" return 1 if value & (1 << i) else 0 def low_bits(value, n): + """Extract the lowest n bits out of value.""" return value & ((1 << n) - 1) def readfmt(stream, fmt): + """Read and unpack an object from stream, using a struct format string.""" size = struct.calcsize(fmt) blob = stream.read(size) return struct.unpack(fmt, blob) class Reader(object): + """Read basic type objects out of given stream.""" + def __init__(self, stream): + """Create a non-capturing reader.""" self.s = stream self._captured = None def readfmt(self, fmt): + """Read a specified object, using a struct format string.""" size = struct.calcsize(fmt) blob = self.read(size) obj, = struct.unpack(fmt, blob) return obj def read(self, size=None): + """Read `size` bytes from stream.""" blob = self.s.read(size) if size is not None and len(blob) < size: raise EOFError @@ -48,6 +57,7 @@ class Reader(object): @contextlib.contextmanager def capture(self, stream): + """Capture all data read during this context.""" self._captured = stream try: yield @@ -58,6 +68,7 @@ length_types = {0: '>B', 1: '>H', 2: '>L'} def parse_subpackets(s): + """See https://tools.ietf.org/html/rfc4880#section-5.2.3.1 for details.""" subpackets = [] total_size = s.readfmt('>H') data = s.read(total_size) @@ -75,12 +86,18 @@ def parse_subpackets(s): def parse_mpi(s): + """See https://tools.ietf.org/html/rfc4880#section-3.2 for details.""" bits = s.readfmt('>H') blob = bytearray(s.read(int((bits + 7) // 8))) return sum(v << (8 * i) for i, v in enumerate(reversed(blob))) def split_bits(value, *bits): + """ + Split integer value into list of ints, according to `bits` list. + + For example, split_bits(0x1234, 4, 8, 4) == [0x1, 0x23, 0x4] + """ result = [] for b in reversed(bits): mask = (1 << b) - 1 @@ -125,11 +142,13 @@ SUPPORTED_CURVES = { class Parser(object): + """Parse GPG packets from a given stream.""" + def __init__(self, stream, to_hash=None): + """Create an empty parser.""" self.stream = stream self.packet_types = { 2: self.signature, - 4: self.onepass, 6: self.pubkey, 11: self.literal, 13: self.user_id, @@ -139,21 +158,11 @@ class Parser(object): self.to_hash.write(to_hash) def __iter__(self): + """Support iterative parsing of available GPG packets.""" return self - def onepass(self, stream): - # pylint: disable=no-self-use - p = {'type': 'onepass'} - p['version'] = stream.readfmt('B') - p['sig_type'] = stream.readfmt('B') - p['hash_alg'] = stream.readfmt('B') - p['pubkey_alg'] = stream.readfmt('B') - p['key_id'] = stream.readfmt('8s') - p['nested'] = stream.readfmt('B') - assert not stream.read() - return p - def literal(self, stream): + """See https://tools.ietf.org/html/rfc4880#section-5.9 for details.""" p = {'type': 'literal'} p['format'] = stream.readfmt('c') filename_len = stream.readfmt('B') @@ -164,6 +173,7 @@ class Parser(object): return p def signature(self, stream): + """See https://tools.ietf.org/html/rfc4880#section-5.2 for details.""" p = {'type': 'signature'} to_hash = io.BytesIO() @@ -195,6 +205,7 @@ class Parser(object): return p def pubkey(self, stream): + """See https://tools.ietf.org/html/rfc4880#section-5.5 for details.""" p = {'type': 'pubkey'} packet = io.BytesIO() with stream.capture(packet): @@ -224,14 +235,15 @@ class Parser(object): return p def user_id(self, stream): + """See https://tools.ietf.org/html/rfc4880#section-5.11 for details.""" value = stream.read() self.to_hash.write(b'\xb4' + struct.pack('>L', len(value))) self.to_hash.write(value) return {'type': 'user_id', 'value': value} def __next__(self): + """See https://tools.ietf.org/html/rfc4880#section-4.2 for details.""" try: - # https://tools.ietf.org/html/rfc4880#section-4.2 value = self.stream.readfmt('B') except EOFError: raise StopIteration @@ -252,7 +264,7 @@ class Parser(object): if packet_type: p = packet_type(Reader(io.BytesIO(packet_data))) else: - p = {'type': 'UNKNOWN'} + raise ValueError('Unknown packet type: {}'.format(packet_type)) p['tag'] = tag log.debug('packet "%s": %s', p['type'], p) return p @@ -261,6 +273,7 @@ class Parser(object): def load_public_key(stream): + """Parse and validate GPG public key from an input stream.""" parser = Parser(Reader(stream)) pubkey, userid, signature = list(parser) log.debug('loaded public key "%s"', userid['value']) @@ -270,6 +283,7 @@ def load_public_key(stream): def verify_digest(pubkey, digest, signature, label): + """Verify a digest signature from a specified public key.""" verifier = pubkey['verifier'] try: verifier(signature, digest) diff --git a/trezor_agent/gpg/git_wrapper.py b/trezor_agent/gpg/git_wrapper.py index 24f85bf..7b6d9a2 100755 --- a/trezor_agent/gpg/git_wrapper.py +++ b/trezor_agent/gpg/git_wrapper.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +"""A simple wrapper for Git commit/tag GPG signing.""" import logging import subprocess as sp import sys @@ -9,6 +10,7 @@ log = logging.getLogger(__name__) def main(): + """Main function.""" logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)-10s %(message)s') diff --git a/trezor_agent/gpg/signer.py b/trezor_agent/gpg/signer.py index 4480615..b77a626 100755 --- a/trezor_agent/gpg/signer.py +++ b/trezor_agent/gpg/signer.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +"""Create GPG ECDSA signatures and public keys using TREZOR device.""" import argparse import base64 import binascii @@ -16,10 +17,12 @@ log = logging.getLogger(__name__) def prefix_len(fmt, blob): + """Prefix `blob` with its size, serialized using `fmt` format.""" return struct.pack(fmt, len(blob)) + blob def packet(tag, blob): + """Create small GPG packet.""" assert len(blob) < 256 length_type = 0 # : 1 byte for length leading_byte = 0x80 | (tag << 2) | (length_type) @@ -27,28 +30,34 @@ def packet(tag, blob): def subpacket(subpacket_type, fmt, *values): + """Create GPG subpacket.""" blob = struct.pack(fmt, *values) if values else fmt return struct.pack('>B', subpacket_type) + blob def subpacket_long(subpacket_type, value): + """Create GPG subpacket with 32-bit unsigned integer.""" return subpacket(subpacket_type, '>L', value) def subpacket_time(value): + """Create GPG subpacket with time in seconds (since Epoch).""" return subpacket_long(2, value) def subpacket_byte(subpacket_type, value): + """Create GPG subpacket with 8-bit unsigned integer.""" return subpacket(subpacket_type, '>B', value) def subpackets(*items): + """Serialize several GPG subpackets.""" prefixed = [prefix_len('>B', item) for item in items] return prefix_len('>H', b''.join(prefixed)) def mpi(value): + """Serialize multipresicion integer using GPG format.""" bits = value.bit_length() data_size = (bits + 7) // 8 data_bytes = [0] * data_size @@ -61,10 +70,12 @@ def mpi(value): def time_format(t): + """Utility for consistent time formatting.""" return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t)) def hexlify(blob): + """Utility for consistent hexadecimal formatting.""" return binascii.hexlify(blob).decode('ascii').upper() @@ -94,15 +105,17 @@ SUPPORTED_CURVES = { } -def find_curve_by_algo_id(algo_id): +def _find_curve_by_algo_id(algo_id): curve_name, = [name for name, info in SUPPORTED_CURVES.items() if info['algo_id'] == algo_id] return curve_name class Signer(object): + """Performs GPG operations with the TREZOR.""" def __init__(self, user_id, created, curve_name): + """Construct and loads a public key from the device.""" self.user_id = user_id assert curve_name in formats.SUPPORTED_CURVES self.curve_name = curve_name @@ -126,9 +139,15 @@ class Signer(object): @classmethod def from_public_key(cls, pubkey, user_id): + """ + Create from an existing GPG public key. + + `pubkey` should be loaded via `load_from_gpg(user_id)` + from the local GPG keyring. + """ s = Signer(user_id=user_id, created=pubkey['created'], - curve_name=find_curve_by_algo_id(pubkey['algo'])) + curve_name=_find_curve_by_algo_id(pubkey['algo'])) assert s.key_id() == pubkey['key_id'] return s @@ -149,16 +168,20 @@ class Signer(object): return hashlib.sha1(self._pubkey_data_to_hash()).digest() def key_id(self): + """Short (8 byte) GPG key ID.""" return self._fingerprint()[-8:] def hex_short_key_id(self): + """Short (8 hexadecimal digits) GPG key ID.""" return hexlify(self.key_id()[-4:]) def close(self): + """Close connection and turn off the screen of the device.""" self.client_wrapper.connection.clear_session() self.client_wrapper.connection.close() def export(self): + """Export GPG public key, ready for "gpg2 --import".""" pubkey_packet = packet(tag=6, blob=self._pubkey_data()) user_id_packet = packet(tag=13, blob=self.user_id) @@ -180,6 +203,7 @@ class Signer(object): return pubkey_packet + user_id_packet + sign_packet def sign(self, msg, sign_time=None): + """Sign GPG message at specified time.""" if sign_time is None: sign_time = int(time.time()) @@ -223,7 +247,7 @@ class Signer(object): sig) # actual ECDSA signature -def split_lines(body, size): +def _split_lines(body, size): lines = [] for i in range(0, len(body), size): lines.append(body[i:i+size] + '\n') @@ -231,14 +255,16 @@ def split_lines(body, size): def armor(blob, type_str): + """See https://tools.ietf.org/html/rfc4880#section-6 for details.""" head = '-----BEGIN PGP {}-----\nVersion: GnuPG v2\n\n'.format(type_str) body = base64.b64encode(blob) checksum = base64.b64encode(util.crc24(blob)) tail = '-----END PGP {}-----\n'.format(type_str) - return head + split_lines(body, 64) + '=' + checksum + '\n' + tail + return head + _split_lines(body, 64) + '=' + checksum + '\n' + tail def load_from_gpg(user_id): + """Load existing GPG public key for `user_id` from local keyring.""" pubkey_bytes = subprocess.check_output(['gpg2', '--export', user_id]) if pubkey_bytes: return decode.load_public_key(io.BytesIO(pubkey_bytes)) @@ -248,6 +274,7 @@ def load_from_gpg(user_id): def main(): + """Main function.""" p = argparse.ArgumentParser() p.add_argument('user_id') p.add_argument('filename', nargs='?') diff --git a/trezor_agent/util.py b/trezor_agent/util.py index 350114b..7e96428 100644 --- a/trezor_agent/util.py +++ b/trezor_agent/util.py @@ -78,6 +78,7 @@ def frame(*msgs): def crc24(blob): + """See https://tools.ietf.org/html/rfc4880#section-6.1 for details.""" CRC24_INIT = 0x0B704CE CRC24_POLY = 0x1864CFB