From 201e6510f80646179dbe4ccf63b3696f57699623 Mon Sep 17 00:00:00 2001 From: scito Date: Thu, 29 Dec 2022 21:29:20 +0100 Subject: [PATCH] add type hints (Python 3.11) --- .github/workflows/ci.yml | 2 +- README.md | 4 +- conftest.py | 5 +- extract_otp_secret_keys.py | 105 +++++++++-------- protobuf_generated_python/google_auth_pb2.pyi | 108 ++++++++++++++++++ test_extract_otp_secret_keys_pytest.py | 74 ++++++------ test_extract_otp_secret_keys_unittest.py | 30 ++--- test_extract_qrcode_unittest.py | 14 +-- upgrade_deps.sh | 22 +++- utils.py | 43 +++---- 10 files changed, 272 insertions(+), 135 deletions(-) create mode 100644 protobuf_generated_python/google_auth_pb2.pyi diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 580a262..8edd517 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,7 +47,7 @@ jobs: flake8 . --count --exit-zero --max-complexity=10 --max-line-length=200 --statistics - name: Type checking with mypy run: | - mypy *.py + mypy --strict *.py if: matrix.python-version == "3.x" - name: Test with pytest run: pytest diff --git a/README.md b/README.md index 6e97788..62dd072 100644 --- a/README.md +++ b/README.md @@ -239,10 +239,12 @@ The data parameter is a base64 encoded proto3 message (Google Protocol Buffers). Command for regeneration of Python code from proto3 message definition file (only necessary in case of changes of the proto3 message definition or new protobuf versions): - protoc --python_out=protobuf_generated_python google_auth.proto + protoc --python_out=protobuf_generated_python google_auth.proto --mypy_out=protobuf_generated_python The generated protobuf Python code was generated by protoc 21.12 (https://github.com/protocolbuffers/protobuf/releases/tag/v21.12). +https://github.com/nipunn1313/mypy-protobuf + ## References * Proto3 documentation: https://developers.google.com/protocol-buffers/docs/pythontutorial diff --git a/conftest.py b/conftest.py index f17de12..46c17f3 100644 --- a/conftest.py +++ b/conftest.py @@ -1,10 +1,11 @@ import pytest +from typing import Any -def pytest_addoption(parser): +def pytest_addoption(parser: pytest.Parser) -> None: parser.addoption("--relaxed", action='store_true', help="run tests in relaxed mode") @pytest.fixture -def relaxed(request): +def relaxed(request: pytest.FixtureRequest) -> Any: return request.config.getoption("--relaxed") diff --git a/extract_otp_secret_keys.py b/extract_otp_secret_keys.py index c1adbf5..5bbc445 100644 --- a/extract_otp_secret_keys.py +++ b/extract_otp_secret_keys.py @@ -52,15 +52,16 @@ import sys import urllib.parse as urlparse from enum import Enum from operator import add +from typing import TextIO, Any, TypedDict from qrcode import QRCode # type: ignore -import protobuf_generated_python.google_auth_pb2 # type: ignore +import protobuf_generated_python.google_auth_pb2 as migration_protobuf try: import cv2 # type: ignore - import numpy # type: ignore + import numpy try: import pyzbar.pyzbar as zbar # type: ignore @@ -75,8 +76,17 @@ Exception: {e}""") except ImportError: qreader_available = False -verbose: int -quiet: bool +# Types +Args = argparse.Namespace +OtpUrl = str +Otp = TypedDict('Otp', {'name': str, 'secret': str, 'issuer': str, 'type': str, 'counter': int | None, 'url': OtpUrl}) +Otps = list[Otp] +OtpUrls = list[OtpUrl] + + +# Global variable declaration +verbose: int = 0 +quiet: bool = False def sys_main() -> None: @@ -95,7 +105,7 @@ def main(sys_args: list[str]) -> None: write_json(args, otps) -def parse_args(sys_args: list[str]) -> argparse.Namespace: +def parse_args(sys_args: list[str]) -> Args: global verbose, quiet description_text = "Extracts one time password (OTP) secret keys from QR codes, e.g. from Google Authenticator app." if qreader_available: @@ -133,17 +143,17 @@ b) image file containing a QR code or = for stdin for an image containing a QR c return args -def extract_otps(args): +def extract_otps(args: Args) -> Otps: if not args.infile: return extract_otps_from_camera(args) else: return extract_otps_from_files(args) -def extract_otps_from_camera(args): +def extract_otps_from_camera(args: Args) -> Otps: if verbose: print("Capture QR codes from camera") - otp_urls = [] - otps = [] + otp_urls: OtpUrls = [] + otps: Otps = [] QRMode = Enum('QRMode', ['QREADER', 'DEEP_QREADER', 'CV2'], start=0) qr_mode = QRMode.QREADER @@ -215,7 +225,7 @@ def extract_otps_from_camera(args): return otps -def extract_otps_from_otp_url(otp_url, otp_urls, otps, args): +def extract_otps_from_otp_url(otp_url: str, otp_urls: OtpUrls, otps: Otps, args: Args) -> None: if otp_url and verbose: print(otp_url) if otp_url and otp_url not in otp_urls: otp_urls.append(otp_url) @@ -223,8 +233,8 @@ def extract_otps_from_otp_url(otp_url, otp_urls, otps, args): if verbose: print(f"{len(otps)} otp{'s'[:len(otps) != 1]} from {len(otp_urls)} QR code{'s'[:len(otp_urls) != 1]} extracted") -def extract_otps_from_files(args): - otps = [] +def extract_otps_from_files(args: Args) -> Otps: + otps: Otps = [] i = j = k = 0 if verbose: print(f"Input files: {args.infile}") @@ -240,7 +250,7 @@ def extract_otps_from_files(args): return otps -def get_otp_urls_from_file(filename): +def get_otp_urls_from_file(filename: str) -> OtpUrls: # stdin stream cannot be rewinded, thus distinguish, use - for utf-8 stdin and = for binary image stdin if filename != '=': check_file_exists(filename) @@ -255,7 +265,7 @@ def get_otp_urls_from_file(filename): return [] -def read_lines_from_text_file(filename): +def read_lines_from_text_file(filename: str) -> list[str]: if verbose: print(f"Reading lines of {filename}") finput = fileinput.input(filename) try: @@ -268,18 +278,18 @@ def read_lines_from_text_file(filename): lines.append(line) if not lines: eprint(f"WARN: {filename.replace('-', 'stdin')} is empty") - return lines except UnicodeDecodeError: if filename == '-': abort("\nERROR: Unable to open text file form stdin. " "In case you want read an image file from stdin, you must use '=' instead of '-'.") else: # The file is probably an image, process below - return None + return [] finally: finput.close() + return lines -def extract_otp_from_otp_url(otpauth_migration_url, otps, i, j, infile, args): +def extract_otp_from_otp_url(otpauth_migration_url: str, otps: Otps, i: int, j: int, infile: str, args: Args) -> int: payload = get_payload_from_otp_url(otpauth_migration_url, i, infile) # pylint: disable=no-member @@ -290,7 +300,7 @@ def extract_otp_from_otp_url(otpauth_migration_url, otps, i, j, infile, args): if verbose: print('OTP enum type:', get_enum_name_by_number(raw_otp, 'type')) otp_type = get_otp_type_str_from_code(raw_otp.type) otp_url = build_otp_url(secret, raw_otp) - otp = { + otp: Otp = { "name": raw_otp.name, "secret": secret, "issuer": raw_otp.issuer, @@ -311,7 +321,7 @@ def extract_otp_from_otp_url(otpauth_migration_url, otps, i, j, infile, args): return j -def convert_img_to_otp_url(filename): +def convert_img_to_otp_url(filename: str) -> OtpUrls: if verbose: print(f"Reading image {filename}") try: if filename != '=': @@ -321,7 +331,7 @@ def convert_img_to_otp_url(filename): stdin = sys.stdin.buffer.read() except AttributeError: # Workaround for pytest, since pytest cannot monkeypatch sys.stdin.buffer - stdin = sys.stdin.read() + stdin = sys.stdin.read() # type: ignore # Workaround for pytest fixtures if not stdin: eprint("WARN: stdin is empty") try: @@ -338,13 +348,12 @@ def convert_img_to_otp_url(filename): decoded_text = QReader().detect_and_decode(img) if decoded_text is None: abort(f"\nERROR: Unable to read QR Code from file.\ninput file: {filename}") - - return [decoded_text] except Exception as e: abort(f"\nERROR: Encountered exception '{e}'.\ninput file: {filename}") + return [decoded_text] -def get_payload_from_otp_url(otpauth_migration_url, i, input_source): +def get_payload_from_otp_url(otpauth_migration_url: str, i: int, input_source: str) -> migration_protobuf.MigrationPayload: if not otpauth_migration_url.startswith('otpauth-migration://'): eprint(f"\nWARN: line is not a otpauth-migration:// URL\ninput: {input_source}\nline '{otpauth_migration_url}'\nProbably a wrong file was given") parsed_url = urlparse.urlparse(otpauth_migration_url) @@ -352,7 +361,7 @@ def get_payload_from_otp_url(otpauth_migration_url, i, input_source): try: params = urlparse.parse_qs(parsed_url.query, strict_parsing=True) except Exception: # Not necessary for Python >= 3.11 - params = [] + params = {} if verbose > 2: print(f"\nDEBUG: querystring params={params}") if 'data' not in params: abort(f"\nERROR: no data query parameter in input URL\ninput file: {input_source}\nline '{otpauth_migration_url}'\nProbably a wrong file was given") @@ -361,7 +370,7 @@ def get_payload_from_otp_url(otpauth_migration_url, i, input_source): data_base64_fixed = data_base64.replace(' ', '+') if verbose > 2: print(f"\nDEBUG: data_base64_fixed={data_base64_fixed}") data = base64.b64decode(data_base64_fixed, validate=True) - payload = protobuf_generated_python.google_auth_pb2.MigrationPayload() + payload = migration_protobuf.MigrationPayload() try: payload.ParseFromString(data) except Exception: @@ -374,28 +383,28 @@ def get_payload_from_otp_url(otpauth_migration_url, i, input_source): # https://stackoverflow.com/questions/40226049/find-enums-listed-in-python-descriptor-for-protobuf -def get_enum_name_by_number(parent, field_name): +def get_enum_name_by_number(parent: Any, field_name: str) -> str: field_value = getattr(parent, field_name) - return parent.DESCRIPTOR.fields_by_name[field_name].enum_type.values_by_number.get(field_value).name + return parent.DESCRIPTOR.fields_by_name[field_name].enum_type.values_by_number.get(field_value).name # type: ignore # generic code -def get_otp_type_str_from_code(otp_type): +def get_otp_type_str_from_code(otp_type: int) -> str: return 'totp' if otp_type == 2 else 'hotp' -def convert_secret_from_bytes_to_base32_str(bytes): +def convert_secret_from_bytes_to_base32_str(bytes: bytes) -> str: return str(base64.b32encode(bytes), 'utf-8').replace('=', '') -def build_otp_url(secret, raw_otp): +def build_otp_url(secret: str, raw_otp: migration_protobuf.MigrationPayload.OtpParameters) -> str: url_params = {'secret': secret} - if raw_otp.type == 1: url_params['counter'] = raw_otp.counter + if raw_otp.type == 1: url_params['counter'] = str(raw_otp.counter) if raw_otp.issuer: url_params['issuer'] = raw_otp.issuer otp_url = f"otpauth://{get_otp_type_str_from_code(raw_otp.type)}/{urlparse.quote(raw_otp.name)}?" + urlparse.urlencode(url_params) return otp_url -def print_otp(otp): +def print_otp(otp: Otp) -> None: print(f"Name: {otp['name']}") print(f"Secret: {otp['secret']}") if otp['issuer']: print(f"Issuer: {otp['issuer']}") @@ -406,7 +415,7 @@ def print_otp(otp): print(otp['url']) -def save_qr(otp, args, j): +def save_qr(otp: Otp, args: Args, j: int) -> str: dir = args.saveqr if not (os.path.exists(dir)): os.makedirs(dir, exist_ok=True) pattern = re.compile(r'[\W_]+') @@ -416,21 +425,21 @@ def save_qr(otp, args, j): return file_otp_issuer -def save_qr_file(args, data, name): +def save_qr_file(args: Args, otp_url: OtpUrl, name: str) -> None: qr = QRCode() - qr.add_data(data) + qr.add_data(otp_url) img = qr.make_image(fill_color='black', back_color='white') if verbose: print(f"Saving to {name}") img.save(name) -def print_qr(args, data): +def print_qr(args: Args, otp_url: str) -> None: qr = QRCode() - qr.add_data(data) + qr.add_data(otp_url) qr.print_ascii() -def write_csv(args, otps): +def write_csv(args: Args, otps: Otps) -> None: if args.csv and len(otps) > 0: with open_file_or_stdout_for_csv(args.csv) as outfile: writer = csv.DictWriter(outfile, otps[0].keys()) @@ -439,7 +448,7 @@ def write_csv(args, otps): if not quiet: print(f"Exported {len(otps)} otp{'s'[:len(otps) != 1]} to csv {args.csv}") -def write_keepass_csv(args, otps): +def write_keepass_csv(args: Args, otps: Otps) -> None: if args.keepass and len(otps) > 0: has_totp = has_otp_type(otps, 'totp') has_hotp = has_otp_type(otps, 'hotp') @@ -479,34 +488,34 @@ def write_keepass_csv(args, otps): if count_hotp_entries > 0: print(f"Exported {count_hotp_entries} hotp entrie{'s'[:count_hotp_entries != 1]} to keepass csv file {otp_filename_hotp}") -def write_json(args, otps): +def write_json(args: Args, otps: Otps) -> None: if args.json: with open_file_or_stdout(args.json) as outfile: json.dump(otps, outfile, indent=4) if not quiet: print(f"Exported {len(otps)} otp{'s'[:len(otps) != 1]} to json {args.json}") -def has_otp_type(otps, otp_type): +def has_otp_type(otps: Otps, otp_type: str) -> bool: for otp in otps: if otp['type'] == otp_type: return True return False -def add_pre_suffix(file, pre_suffix): +def add_pre_suffix(file: str, pre_suffix: str) -> str: '''filename.ext, pre -> filename.pre.ext''' name, ext = os.path.splitext(file) return name + "." + pre_suffix + (ext if ext else "") -def open_file_or_stdout(filename): +def open_file_or_stdout(filename: str) -> TextIO: '''stdout is denoted as "-". Note: Set before the following line: sys.stdout.close = lambda: None''' return open(filename, "w", encoding='utf-8') if filename != '-' else sys.stdout -def open_file_or_stdout_for_csv(filename): +def open_file_or_stdout_for_csv(filename: str) -> TextIO: '''stdout is denoted as "-". newline='' Note: Set before the following line: @@ -514,13 +523,13 @@ def open_file_or_stdout_for_csv(filename): return open(filename, "w", encoding='utf-8', newline='') if filename != '-' else sys.stdout -def check_file_exists(filename): +def check_file_exists(filename: str) -> None: if filename != '-' and not os.path.isfile(filename): abort(f"\nERROR: Input file provided is non-existent or not a file." f"\ninput file: {filename}") -def is_binary(line): +def is_binary(line: str) -> bool: try: line.startswith('#') return False @@ -528,12 +537,12 @@ def is_binary(line): return True -def eprint(*args, **kwargs): +def eprint(*args: Any, **kwargs: Any) -> None: '''Print to stderr.''' print(*args, file=sys.stderr, **kwargs) -def abort(*args, **kwargs): +def abort(*args: Any, **kwargs: Any) -> None: eprint(*args, **kwargs) sys.exit(1) diff --git a/protobuf_generated_python/google_auth_pb2.pyi b/protobuf_generated_python/google_auth_pb2.pyi new file mode 100644 index 0000000..f4fb74c --- /dev/null +++ b/protobuf_generated_python/google_auth_pb2.pyi @@ -0,0 +1,108 @@ +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +""" +import builtins +import collections.abc +import google.protobuf.descriptor +import google.protobuf.internal.containers +import google.protobuf.internal.enum_type_wrapper +import google.protobuf.message +import sys +import typing + +if sys.version_info >= (3, 10): + import typing as typing_extensions +else: + import typing_extensions + +DESCRIPTOR: google.protobuf.descriptor.FileDescriptor + +@typing_extensions.final +class MigrationPayload(google.protobuf.message.Message): + """Copied from: https://github.com/beemdevelopment/Aegis/blob/master/app/src/main/proto/google_auth.proto""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + class _Algorithm: + ValueType = typing.NewType("ValueType", builtins.int) + V: typing_extensions.TypeAlias = ValueType + + class _AlgorithmEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[MigrationPayload._Algorithm.ValueType], builtins.type): # noqa: F821 + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + ALGO_INVALID: MigrationPayload._Algorithm.ValueType # 0 + ALGO_SHA1: MigrationPayload._Algorithm.ValueType # 1 + + class Algorithm(_Algorithm, metaclass=_AlgorithmEnumTypeWrapper): ... + ALGO_INVALID: MigrationPayload.Algorithm.ValueType # 0 + ALGO_SHA1: MigrationPayload.Algorithm.ValueType # 1 + + class _OtpType: + ValueType = typing.NewType("ValueType", builtins.int) + V: typing_extensions.TypeAlias = ValueType + + class _OtpTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[MigrationPayload._OtpType.ValueType], builtins.type): # noqa: F821 + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + OTP_INVALID: MigrationPayload._OtpType.ValueType # 0 + OTP_HOTP: MigrationPayload._OtpType.ValueType # 1 + OTP_TOTP: MigrationPayload._OtpType.ValueType # 2 + + class OtpType(_OtpType, metaclass=_OtpTypeEnumTypeWrapper): ... + OTP_INVALID: MigrationPayload.OtpType.ValueType # 0 + OTP_HOTP: MigrationPayload.OtpType.ValueType # 1 + OTP_TOTP: MigrationPayload.OtpType.ValueType # 2 + + @typing_extensions.final + class OtpParameters(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + SECRET_FIELD_NUMBER: builtins.int + NAME_FIELD_NUMBER: builtins.int + ISSUER_FIELD_NUMBER: builtins.int + ALGORITHM_FIELD_NUMBER: builtins.int + DIGITS_FIELD_NUMBER: builtins.int + TYPE_FIELD_NUMBER: builtins.int + COUNTER_FIELD_NUMBER: builtins.int + secret: builtins.bytes + name: builtins.str + issuer: builtins.str + algorithm: global___MigrationPayload.Algorithm.ValueType + digits: builtins.int + type: global___MigrationPayload.OtpType.ValueType + counter: builtins.int + def __init__( + self, + *, + secret: builtins.bytes = ..., + name: builtins.str = ..., + issuer: builtins.str = ..., + algorithm: global___MigrationPayload.Algorithm.ValueType = ..., + digits: builtins.int = ..., + type: global___MigrationPayload.OtpType.ValueType = ..., + counter: builtins.int = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["algorithm", b"algorithm", "counter", b"counter", "digits", b"digits", "issuer", b"issuer", "name", b"name", "secret", b"secret", "type", b"type"]) -> None: ... + + OTP_PARAMETERS_FIELD_NUMBER: builtins.int + VERSION_FIELD_NUMBER: builtins.int + BATCH_SIZE_FIELD_NUMBER: builtins.int + BATCH_INDEX_FIELD_NUMBER: builtins.int + BATCH_ID_FIELD_NUMBER: builtins.int + @property + def otp_parameters(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___MigrationPayload.OtpParameters]: ... + version: builtins.int + batch_size: builtins.int + batch_index: builtins.int + batch_id: builtins.int + def __init__( + self, + *, + otp_parameters: collections.abc.Iterable[global___MigrationPayload.OtpParameters] | None = ..., + version: builtins.int = ..., + batch_size: builtins.int = ..., + batch_index: builtins.int = ..., + batch_id: builtins.int = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["batch_id", b"batch_id", "batch_index", b"batch_index", "batch_size", b"batch_size", "otp_parameters", b"otp_parameters", "version", b"version"]) -> None: ... + +global___MigrationPayload = MigrationPayload diff --git a/test_extract_otp_secret_keys_pytest.py b/test_extract_otp_secret_keys_pytest.py index e1f5515..7348978 100644 --- a/test_extract_otp_secret_keys_pytest.py +++ b/test_extract_otp_secret_keys_pytest.py @@ -21,8 +21,10 @@ import io import os import sys +import pathlib import pytest +from pytest_mock import MockerFixture import extract_otp_secret_keys from utils import (file_exits, quick_and_dirty_workaround_encoding_problem, @@ -30,10 +32,10 @@ from utils import (file_exits, quick_and_dirty_workaround_encoding_problem, read_file_to_str, read_json, read_json_str, replace_escaped_octal_utf8_bytes_with_str) -qreader_available = extract_otp_secret_keys.qreader_available +qreader_available: bool = extract_otp_secret_keys.qreader_available -def test_extract_stdout(capsys): +def test_extract_stdout(capsys: pytest.CaptureFixture[str]) -> None: # Act extract_otp_secret_keys.main(['example_export.txt']) @@ -44,7 +46,7 @@ def test_extract_stdout(capsys): assert captured.err == '' -def test_extract_non_existent_file(capsys): +def test_extract_non_existent_file(capsys: pytest.CaptureFixture[str]) -> None: # Act with pytest.raises(SystemExit) as e: extract_otp_secret_keys.main(['test/non_existent_file.txt']) @@ -60,7 +62,7 @@ def test_extract_non_existent_file(capsys): assert e.type == SystemExit -def test_extract_stdin_stdout(capsys, monkeypatch): +def test_extract_stdin_stdout(capsys: pytest.CaptureFixture[str], monkeypatch: pytest.MonkeyPatch) -> None: # Arrange monkeypatch.setattr('sys.stdin', io.StringIO(read_file_to_str('example_export.txt'))) @@ -74,7 +76,7 @@ def test_extract_stdin_stdout(capsys, monkeypatch): assert captured.err == '' -def test_extract_stdin_empty(capsys, monkeypatch): +def test_extract_stdin_empty(capsys: pytest.CaptureFixture[str], monkeypatch: pytest.MonkeyPatch) -> None: # Arrange monkeypatch.setattr('sys.stdin', io.StringIO()) @@ -89,7 +91,7 @@ def test_extract_stdin_empty(capsys, monkeypatch): # @pytest.mark.skipif(not qreader_available, reason='Test if cv2 and qreader are not available.') -def test_extract_empty_file_no_qreader(capsys): +def test_extract_empty_file_no_qreader(capsys: pytest.CaptureFixture[str]) -> None: if qreader_available: # Act with pytest.raises(SystemExit) as e: @@ -116,7 +118,7 @@ def test_extract_empty_file_no_qreader(capsys): @pytest.mark.qreader -def test_extract_stdin_img_empty(capsys, monkeypatch): +def test_extract_stdin_img_empty(capsys: pytest.CaptureFixture[str], monkeypatch: pytest.MonkeyPatch) -> None: # Arrange monkeypatch.setattr('sys.stdin', io.BytesIO()) @@ -130,7 +132,7 @@ def test_extract_stdin_img_empty(capsys, monkeypatch): assert captured.err == 'WARN: stdin is empty\n' -def test_extract_csv(capsys, tmp_path): +def test_extract_csv(capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path) -> None: # Arrange output_file = str(tmp_path / 'test_example_output.csv') @@ -149,7 +151,7 @@ def test_extract_csv(capsys, tmp_path): assert captured.err == '' -def test_extract_csv_stdout(capsys): +def test_extract_csv_stdout(capsys: pytest.CaptureFixture[str]) -> None: # Act extract_otp_secret_keys.main(['-c', '-', 'example_export.txt']) @@ -165,7 +167,7 @@ def test_extract_csv_stdout(capsys): assert captured.err == '' -def test_extract_stdin_and_csv_stdout(capsys, monkeypatch): +def test_extract_stdin_and_csv_stdout(capsys: pytest.CaptureFixture[str], monkeypatch: pytest.MonkeyPatch) -> None: # Arrange monkeypatch.setattr('sys.stdin', io.StringIO(read_file_to_str('example_export.txt'))) @@ -184,7 +186,7 @@ def test_extract_stdin_and_csv_stdout(capsys, monkeypatch): assert captured.err == '' -def test_keepass_csv(capsys, tmp_path): +def test_keepass_csv(capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path) -> None: '''Two csv files .totp and .htop are generated.''' # Arrange file_name = str(tmp_path / 'test_example_keepass_output.csv') @@ -208,7 +210,7 @@ def test_keepass_csv(capsys, tmp_path): assert captured.err == '' -def test_keepass_csv_stdout(capsys): +def test_keepass_csv_stdout(capsys: pytest.CaptureFixture[str]) -> None: '''Two csv files .totp and .htop are generated.''' # Act extract_otp_secret_keys.main(['-k', '-', 'test/example_export_only_totp.txt']) @@ -226,7 +228,7 @@ def test_keepass_csv_stdout(capsys): assert captured.err == '' -def test_single_keepass_csv(capsys, tmp_path): +def test_single_keepass_csv(capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path) -> None: '''Does not add .totp or .hotp pre-suffix''' # Act extract_otp_secret_keys.main(['-q', '-k', str(tmp_path / 'test_example_keepass_output.csv'), 'test/example_export_only_totp.txt']) @@ -245,7 +247,7 @@ def test_single_keepass_csv(capsys, tmp_path): assert captured.err == '' -def test_extract_json(capsys, tmp_path): +def test_extract_json(capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path) -> None: # Arrange output_file = str(tmp_path / 'test_example_output.json') @@ -264,7 +266,7 @@ def test_extract_json(capsys, tmp_path): assert captured.err == '' -def test_extract_json_stdout(capsys): +def test_extract_json_stdout(capsys: pytest.CaptureFixture[str]) -> None: # Act extract_otp_secret_keys.main(['-j', '-', 'example_export.txt']) @@ -278,7 +280,7 @@ def test_extract_json_stdout(capsys): assert captured.err == '' -def test_extract_not_encoded_plus(capsys): +def test_extract_not_encoded_plus(capsys: pytest.CaptureFixture[str]) -> None: # Act extract_otp_secret_keys.main(['test/test_plus_problem_export.txt']) @@ -311,7 +313,7 @@ Type: totp assert captured.err == '' -def test_extract_printqr(capsys): +def test_extract_printqr(capsys: pytest.CaptureFixture[str]) -> None: # Act extract_otp_secret_keys.main(['-p', 'example_export.txt']) @@ -324,7 +326,7 @@ def test_extract_printqr(capsys): assert captured.err == '' -def test_extract_saveqr(capsys, tmp_path): +def test_extract_saveqr(capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path) -> None: # Act extract_otp_secret_keys.main(['-q', '-s', str(tmp_path), 'example_export.txt']) @@ -340,12 +342,12 @@ def test_extract_saveqr(capsys, tmp_path): assert os.path.isfile(tmp_path / '4-piraspberrypi-raspberrypi.png') -def test_normalize_bytes(): +def test_normalize_bytes() -> None: assert replace_escaped_octal_utf8_bytes_with_str( 'Before\\\\302\\\\277\\\\303\nname: enc: \\302\\277\\303\\244\\303\\204\\303\\251\\303\\211?\nAfter') == 'Before\\\\302\\\\277\\\\303\nname: enc: ¿äÄéÉ?\nAfter' -def test_extract_verbose(capsys, relaxed): +def test_extract_verbose(capsys: pytest.CaptureFixture[str], relaxed: bool) -> None: # Act extract_otp_secret_keys.main(['-v', 'example_export.txt']) @@ -367,7 +369,7 @@ def test_extract_verbose(capsys, relaxed): assert captured.err == '' -def test_extract_debug(capsys): +def test_extract_debug(capsys: pytest.CaptureFixture[str]) -> None: # Act extract_otp_secret_keys.main(['-vvv', 'example_export.txt']) @@ -381,7 +383,7 @@ def test_extract_debug(capsys): assert captured.err == '' -def test_extract_help(capsys): +def test_extract_help(capsys: pytest.CaptureFixture[str]) -> None: with pytest.raises(SystemExit) as e: # Act extract_otp_secret_keys.main(['-h']) @@ -396,7 +398,7 @@ def test_extract_help(capsys): assert e.value.code == 0 -def test_extract_no_arguments(capsys, mocker): +def test_extract_no_arguments(capsys: pytest.CaptureFixture[str], mocker: MockerFixture) -> None: if qreader_available: # Arrange otps = read_json('example_output.json') @@ -429,7 +431,7 @@ def test_extract_no_arguments(capsys, mocker): assert e.type == SystemExit -def test_verbose_and_quiet(capsys): +def test_verbose_and_quiet(capsys: pytest.CaptureFixture[str]) -> None: with pytest.raises(SystemExit) as e: # Act extract_otp_secret_keys.main(['-v', '-q', 'example_export.txt']) @@ -444,7 +446,7 @@ def test_verbose_and_quiet(capsys): assert e.type == SystemExit -def test_wrong_data(capsys): +def test_wrong_data(capsys: pytest.CaptureFixture[str]) -> None: with pytest.raises(SystemExit) as e: # Act extract_otp_secret_keys.main(['test/test_export_wrong_data.txt']) @@ -463,7 +465,7 @@ data=XXXX assert e.type == SystemExit -def test_wrong_content(capsys): +def test_wrong_content(capsys: pytest.CaptureFixture[str]) -> None: with pytest.raises(SystemExit) as e: # Act extract_otp_secret_keys.main(['test/test_export_wrong_content.txt']) @@ -489,7 +491,7 @@ Probably a wrong file was given assert e.type == SystemExit -def test_wrong_prefix(capsys): +def test_wrong_prefix(capsys: pytest.CaptureFixture[str]) -> None: # Act extract_otp_secret_keys.main(['test/test_export_wrong_prefix.txt']) @@ -514,14 +516,14 @@ Type: totp assert captured.err == expected_stderr -def test_add_pre_suffix(capsys): +def test_add_pre_suffix(capsys: pytest.CaptureFixture[str]) -> None: assert extract_otp_secret_keys.add_pre_suffix("name.csv", "totp") == "name.totp.csv" assert extract_otp_secret_keys.add_pre_suffix("name.csv", "") == "name..csv" assert extract_otp_secret_keys.add_pre_suffix("name", "totp") == "name.totp" @pytest.mark.qreader -def test_img_qr_reader_from_file_happy_path(capsys): +def test_img_qr_reader_from_file_happy_path(capsys: pytest.CaptureFixture[str]) -> None: # Act extract_otp_secret_keys.main(['test/test_googleauth_export.png']) @@ -533,7 +535,7 @@ def test_img_qr_reader_from_file_happy_path(capsys): @pytest.mark.qreader -def test_extract_multiple_files_and_mixed(capsys): +def test_extract_multiple_files_and_mixed(capsys: pytest.CaptureFixture[str]) -> None: # Act extract_otp_secret_keys.main([ 'example_export.txt', @@ -549,7 +551,7 @@ def test_extract_multiple_files_and_mixed(capsys): @pytest.mark.qreader -def test_img_qr_reader_from_stdin(capsys, monkeypatch): +def test_img_qr_reader_from_stdin(capsys: pytest.CaptureFixture[str], monkeypatch: pytest.MonkeyPatch) -> None: # Arrange # sys.stdin.buffer should be monkey patched, but it does not work monkeypatch.setattr('sys.stdin', read_binary_file_as_stream('test/test_googleauth_export.png')) @@ -582,7 +584,7 @@ Type: totp @pytest.mark.qreader -def test_img_qr_reader_from_stdin_wrong_symbol(capsys, monkeypatch): +def test_img_qr_reader_from_stdin_wrong_symbol(capsys: pytest.CaptureFixture[str], monkeypatch: pytest.MonkeyPatch) -> None: # Arrange # sys.stdin.buffer should be monkey patched, but it does not work monkeypatch.setattr('sys.stdin', read_binary_file_as_stream('test/test_googleauth_export.png')) @@ -603,7 +605,7 @@ def test_img_qr_reader_from_stdin_wrong_symbol(capsys, monkeypatch): @pytest.mark.qreader -def test_extract_stdin_stdout_wrong_symbol(capsys, monkeypatch): +def test_extract_stdin_stdout_wrong_symbol(capsys: pytest.CaptureFixture[str], monkeypatch: pytest.MonkeyPatch) -> None: # Arrange monkeypatch.setattr('sys.stdin', io.StringIO(read_file_to_str('example_export.txt'))) @@ -623,7 +625,7 @@ def test_extract_stdin_stdout_wrong_symbol(capsys, monkeypatch): @pytest.mark.qreader -def test_img_qr_reader_no_qr_code_in_image(capsys): +def test_img_qr_reader_no_qr_code_in_image(capsys: pytest.CaptureFixture[str]) -> None: # Act with pytest.raises(SystemExit) as e: extract_otp_secret_keys.main(['test/lena_std.tif']) @@ -640,7 +642,7 @@ def test_img_qr_reader_no_qr_code_in_image(capsys): @pytest.mark.qreader -def test_img_qr_reader_nonexistent_file(capsys): +def test_img_qr_reader_nonexistent_file(capsys: pytest.CaptureFixture[str]) -> None: # Act with pytest.raises(SystemExit) as e: extract_otp_secret_keys.main(['test/nonexistent.bmp']) @@ -656,7 +658,7 @@ def test_img_qr_reader_nonexistent_file(capsys): assert e.type == SystemExit -def test_non_image_file(capsys): +def test_non_image_file(capsys: pytest.CaptureFixture[str]) -> None: # Act with pytest.raises(SystemExit) as e: extract_otp_secret_keys.main(['test/text_masquerading_as_image.jpeg']) diff --git a/test_extract_otp_secret_keys_unittest.py b/test_extract_otp_secret_keys_unittest.py index acdd851..25cfd5a 100644 --- a/test_extract_otp_secret_keys_unittest.py +++ b/test_extract_otp_secret_keys_unittest.py @@ -30,7 +30,7 @@ import extract_otp_secret_keys class TestExtract(unittest.TestCase): - def test_extract_csv(self): + def test_extract_csv(self) -> None: extract_otp_secret_keys.main(['-q', '-c', 'test_example_output.csv', 'example_export.txt']) expected_csv = read_csv('example_output.csv') @@ -38,7 +38,7 @@ class TestExtract(unittest.TestCase): self.assertEqual(actual_csv, expected_csv) - def test_extract_json(self): + def test_extract_json(self) -> None: extract_otp_secret_keys.main(['-q', '-j', 'test_example_output.json', 'example_export.txt']) expected_json = read_json('example_output.json') @@ -46,7 +46,7 @@ class TestExtract(unittest.TestCase): self.assertEqual(actual_json, expected_json) - def test_extract_stdout_1(self): + def test_extract_stdout_1(self) -> None: with Capturing() as output: extract_otp_secret_keys.main(['example_export.txt']) @@ -82,7 +82,7 @@ class TestExtract(unittest.TestCase): self.assertEqual(output, expected_output) # Ref for capturing https://stackoverflow.com/a/40984270 - def test_extract_stdout_2(self): + def test_extract_stdout_2(self) -> None: out = io.StringIO() with redirect_stdout(out): extract_otp_secret_keys.main(['example_export.txt']) @@ -118,7 +118,7 @@ Type: totp ''' self.assertEqual(actual_output, expected_output) - def test_extract_not_encoded_plus(self): + def test_extract_not_encoded_plus(self) -> None: out = io.StringIO() with redirect_stdout(out): extract_otp_secret_keys.main(['test/test_plus_problem_export.txt']) @@ -147,7 +147,7 @@ Type: totp ''' self.assertEqual(actual_output, expected_output) - def test_extract_printqr(self): + def test_extract_printqr(self) -> None: out = io.StringIO() with redirect_stdout(out): extract_otp_secret_keys.main(['-p', 'example_export.txt']) @@ -157,7 +157,7 @@ Type: totp self.assertEqual(actual_output, expected_output) - def test_extract_saveqr(self): + def test_extract_saveqr(self) -> None: extract_otp_secret_keys.main(['-q', '-s', 'testout/qr/', 'example_export.txt']) self.assertTrue(os.path.isfile('testout/qr/1-piraspberrypi-raspberrypi.png')) @@ -165,7 +165,7 @@ Type: totp self.assertTrue(os.path.isfile('testout/qr/3-piraspberrypi.png')) self.assertTrue(os.path.isfile('testout/qr/4-piraspberrypi-raspberrypi.png')) - def test_extract_verbose(self): + def test_extract_verbose(self) -> None: if sys.implementation.name == 'pypy': self.skipTest("Encoding problems in verbose mode in pypy.") out = io.StringIO() with redirect_stdout(out): @@ -176,7 +176,7 @@ Type: totp self.assertEqual(actual_output, expected_output) - def test_extract_debug(self): + def test_extract_debug(self) -> None: out = io.StringIO() with redirect_stdout(out): extract_otp_secret_keys.main(['-vvv', 'example_export.txt']) @@ -187,7 +187,7 @@ Type: totp self.assertGreater(len(actual_output), len(expected_stdout)) self.assertTrue("DEBUG: " in actual_output) - def test_extract_help_1(self): + def test_extract_help_1(self) -> None: out = io.StringIO() with redirect_stdout(out): try: @@ -201,7 +201,7 @@ Type: totp self.assertGreater(len(actual_output), 0) self.assertTrue("-h, --help" in actual_output and "--verbose, -v" in actual_output) - def test_extract_help_2(self): + def test_extract_help_2(self) -> None: out = io.StringIO() with redirect_stdout(out): with self.assertRaises(SystemExit) as context: @@ -213,7 +213,7 @@ Type: totp self.assertTrue("-h, --help" in actual_output and "--verbose, -v" in actual_output) self.assertEqual(context.exception.code, 0) - def test_extract_help_3(self): + def test_extract_help_3(self) -> None: with Capturing() as actual_output: with self.assertRaises(SystemExit) as context: extract_otp_secret_keys.main(['-h']) @@ -222,13 +222,13 @@ Type: totp self.assertTrue("-h, --help" in "\n".join(actual_output) and "--verbose, -v" in "\n".join(actual_output)) self.assertEqual(context.exception.code, 0) - def setUp(self): + def setUp(self) -> None: self.cleanup() - def tearDown(self): + def tearDown(self) -> None: self.cleanup() - def cleanup(self): + def cleanup(self) -> None: remove_file('test_example_output.csv') remove_file('test_example_output.json') remove_dir_with_files('testout/') diff --git a/test_extract_qrcode_unittest.py b/test_extract_qrcode_unittest.py index 422d370..aacc976 100644 --- a/test_extract_qrcode_unittest.py +++ b/test_extract_qrcode_unittest.py @@ -25,7 +25,7 @@ import extract_otp_secret_keys class TestQRImageExtract(unittest.TestCase): - def test_img_qr_reader_happy_path(self): + def test_img_qr_reader_happy_path(self) -> None: with Capturing() as actual_output: extract_otp_secret_keys.main(['test/test_googleauth_export.png']) @@ -36,7 +36,7 @@ class TestQRImageExtract(unittest.TestCase): self.assertEqual(actual_output, expected_output) - def test_img_qr_reader_no_qr_code_in_image(self): + def test_img_qr_reader_no_qr_code_in_image(self) -> None: with Capturing() as actual_output: with self.assertRaises(SystemExit) as context: extract_otp_secret_keys.main(['test/lena_std.tif']) @@ -46,7 +46,7 @@ class TestQRImageExtract(unittest.TestCase): self.assertEqual(actual_output, expected_output) self.assertEqual(context.exception.code, 1) - def test_img_qr_reader_nonexistent_file(self): + def test_img_qr_reader_nonexistent_file(self) -> None: with Capturing() as actual_output: with self.assertRaises(SystemExit) as context: extract_otp_secret_keys.main(['test/nonexistent.bmp']) @@ -56,7 +56,7 @@ class TestQRImageExtract(unittest.TestCase): self.assertEqual(actual_output, expected_output) self.assertEqual(context.exception.code, 1) - def test_img_qr_reader_non_image_file(self): + def test_img_qr_reader_non_image_file(self) -> None: with Capturing() as actual_output: with self.assertRaises(SystemExit) as context: extract_otp_secret_keys.main(['test/text_masquerading_as_image.jpeg']) @@ -77,13 +77,13 @@ class TestQRImageExtract(unittest.TestCase): self.assertEqual(actual_output, expected_output) self.assertEqual(context.exception.code, 1) - def setUp(self): + def setUp(self) -> None: self.cleanup() - def tearDown(self): + def tearDown(self) -> None: self.cleanup() - def cleanup(self): + def cleanup(self) -> None: pass diff --git a/upgrade_deps.sh b/upgrade_deps.sh index 2f36e72..67d8a05 100755 --- a/upgrade_deps.sh +++ b/upgrade_deps.sh @@ -79,7 +79,7 @@ BASEVERSION=4 echo interactive=true -check_version=true +ignore_version_check=true while test $# -gt 0; do case $1 in @@ -99,7 +99,7 @@ while test $# -gt 0; do shift ;; -C) - check_version=false + ignore_version_check=false shift ;; esac @@ -122,7 +122,7 @@ OLDVERSION=$(cat $BIN/$DEST/.VERSION.txt || echo "") echo -e "\nProtoc remote version $VERSION\n" echo -e "Protoc local version: $OLDVERSION\n" -if [ "$OLDVERSION" != "$VERSION" ]; then +if [ "$OLDVERSION" != "$VERSION" ] || ! $ignore_version_check; then echo "Upgrade protoc from $OLDVERSION to $VERSION" NAME="protoc-$VERSION" @@ -162,7 +162,7 @@ if [ "$OLDVERSION" != "$VERSION" ]; then if $interactive ; then askContinueYn "$cmd"; else echo -e "${cyan}$cmd${reset}";fi eval "$cmd" - cmd="$BIN/$DEST/bin/protoc --python_out=protobuf_generated_python google_auth.proto" + cmd="$BIN/$DEST/bin/protoc --plugin=protoc-gen-mypy=/home/rkurmann/.local/bin/protoc-gen-mypy --python_out=protobuf_generated_python --mypy_out=protobuf_generated_python google_auth.proto" if $interactive ; then askContinueYn "$cmd"; else echo -e "${cyan}$cmd${reset}";fi eval "$cmd" @@ -196,6 +196,18 @@ cmd="$PIP install -U pipenv" if $interactive ; then askContinueYn "$cmd"; else echo -e "${cyan}$cmd${reset}";fi eval "$cmd" +cmd="sudo $PIP install --use-pep517 -U -r requirements.txt" +if $interactive ; then askContinueYn "$cmd"; else echo -e "${cyan}$cmd${reset}";fi +eval "$cmd" + +cmd="sudo $PIP install --use-pep517 -U -r requirements-dev.txt" +if $interactive ; then askContinueYn "$cmd"; else echo -e "${cyan}$cmd${reset}";fi +eval "$cmd" + +cmd="sudo $PIP install -U pipenv" +if $interactive ; then askContinueYn "$cmd"; else echo -e "${cyan}$cmd${reset}";fi +eval "$cmd" + $PIPENV --version cmd="$PIPENV update && $PIPENV --rm && $PIPENV install" @@ -220,7 +232,7 @@ cmd="$MYPY --install-types --non-interactive" if $interactive ; then askContinueYn "$cmd"; else echo -e "${cyan}$cmd${reset}";fi eval "$cmd" -cmd="$MYPY *.py" +cmd="$MYPY --strict *.py" if $interactive ; then askContinueYn "$cmd"; else echo -e "${cyan}$cmd${reset}";fi eval "$cmd" diff --git a/utils.py b/utils.py index a7edfd8..9c60695 100644 --- a/utils.py +++ b/utils.py @@ -21,23 +21,26 @@ import os import re import shutil import sys +import pathlib +from typing import BinaryIO, Any # Ref. https://stackoverflow.com/a/16571630 -class Capturing(list): +class Capturing(list[Any]): '''Capture stdout and stderr Usage: with Capturing() as output: print("Output") ''' - def __enter__(self): + # TODO remove type ignore when fixed, see https://github.com/python/mypy/issues/11871, https://stackoverflow.com/questions/72174409/type-hinting-the-return-value-of-a-class-method-that-returns-self + def __enter__(self): # type: ignore self._stdout = sys.stdout sys.stdout = self._stringio_std = io.StringIO() self._stderr = sys.stderr sys.stderr = self._stringio_err = io.StringIO() return self - def __exit__(self, *args): + def __exit__(self, *args: Any) -> None: self.extend(self._stringio_std.getvalue().splitlines()) del self._stringio_std # free up some memory sys.stdout = self._stdout @@ -47,71 +50,71 @@ with Capturing() as output: sys.stderr = self._stderr -def file_exits(file): +def file_exits(file: str | pathlib.Path) -> bool: return os.path.isfile(file) -def remove_file(file): +def remove_file(file: str | pathlib.Path) -> None: if file_exits(file): os.remove(file) -def remove_files(glob_pattern): +def remove_files(glob_pattern: str) -> None: for f in glob.glob(glob_pattern): os.remove(f) -def remove_dir_with_files(dir): +def remove_dir_with_files(dir: str | pathlib.Path) -> None: if os.path.exists(dir): shutil.rmtree(dir) -def read_csv(filename): +def read_csv(filename: str) -> list[list[str]]: """Returns a list of lines.""" with open(filename, "r", encoding="utf-8", newline='') as infile: - lines = [] + lines: list[list[str]] = [] reader = csv.reader(infile) for line in reader: lines.append(line) return lines -def read_csv_str(str): +def read_csv_str(data_str: str) -> list[list[str]]: """Returns a list of lines.""" - lines = [] - reader = csv.reader(str.splitlines()) + lines: list[list[str]] = [] + reader = csv.reader(data_str.splitlines()) for line in reader: lines.append(line) return lines -def read_json(filename): +def read_json(filename: str) -> Any: """Returns a list or a dictionary.""" with open(filename, "r", encoding="utf-8") as infile: return json.load(infile) -def read_json_str(str): +def read_json_str(data_str: str) -> Any: """Returns a list or a dictionary.""" - return json.loads(str) + return json.loads(data_str) -def read_file_to_list(filename): +def read_file_to_list(filename: str) -> list[str]: """Returns a list of lines.""" with open(filename, "r", encoding="utf-8") as infile: return infile.readlines() -def read_file_to_str(filename): +def read_file_to_str(filename: str) -> str: """Returns a str.""" return "".join(read_file_to_list(filename)) -def read_binary_file_as_stream(filename): +def read_binary_file_as_stream(filename: str) -> BinaryIO: """Returns binary file content.""" with open(filename, "rb",) as infile: return io.BytesIO(infile.read()) -def replace_escaped_octal_utf8_bytes_with_str(str): +def replace_escaped_octal_utf8_bytes_with_str(str: str) -> str: encoded_name_strings = re.findall(r'name: .*$', str, flags=re.MULTILINE) for encoded_name_string in encoded_name_strings: escaped_bytes = re.findall(r'((?:\\[0-9]+)+)', encoded_name_string) @@ -122,5 +125,5 @@ def replace_escaped_octal_utf8_bytes_with_str(str): return str -def quick_and_dirty_workaround_encoding_problem(str): +def quick_and_dirty_workaround_encoding_problem(str: str) -> str: return re.sub(r'name: "encoding: .*$', '', str, flags=re.MULTILINE)