diff --git a/keymapper/linux.py b/keymapper/linux.py index 9dd3648b..cef5efc4 100644 --- a/keymapper/linux.py +++ b/keymapper/linux.py @@ -98,7 +98,62 @@ class KeycodeReader: # keycode_reader = KeycodeReader() -def _get_devices(pipe): +_devices = None + + +class GetDevicesProcess(multiprocessing.Process): + """Process to get the devices that can be worked with. + + Since InputDevice destructors take quite some time, do this + asynchronously so that they can take as much time as they want without + slowing down the initialization. To avoid evdevs asyncio stuff spamming + errors, do this with multiprocessing and not multithreading. + """ + def __init__(self, pipe): + """Construct the process. + + Parameters + ---------- + pipe : multiprocessing.Pipe + used to communicate the result + """ + self.pipe = pipe + super().__init__() + + def run(self): + """Do what get_devices describes.""" + devices = [evdev.InputDevice(path) for path in evdev.list_devices()] + + # group them together by usb device because there could be stuff like + # "Logitech USB Keyboard" and "Logitech USB Keyboard Consumer Control" + grouped = {} + for device in devices: + # only keyboard devices + # https://www.kernel.org/doc/html/latest/input/event-codes.html + if evdev.ecodes.EV_KEY not in device.capabilities().keys(): + continue + + usb = device.phys.split('/')[0] + if grouped.get(usb) is None: + grouped[usb] = [] + grouped[usb].append((device.name, device.path)) + + # now write down all the paths of that group + result = {} + for group in grouped.values(): + names = [entry[0] for entry in group] + devs = [entry[1] for entry in group] + shortest_name = sorted(names, key=len)[0] + result[shortest_name] = { + 'paths': devs, + 'devices': names + } + + self.pipe.send(result) + return result + + +def get_devices(): """Group devices and get relevant infos per group. Returns a list containing mappings of @@ -110,61 +165,11 @@ def _get_devices(pipe): They are grouped by usb port. """ - """ - evdev.list_devices -> string[] dev/input/event# paths - device = evdev.InputDevice(path) - device.capabilities().keys() ein array mit evdev.ecodes.EV_KEY oder - irgendn stuff der nicht interessiert - - device.phys -> - - device.phys usb-0000:03:00.0-4/input2 - device.phys usb-0000:03:00.0-4/input1 - device.phys usb-0000:03:00.0-4/input0 - device.phys usb-0000:03:00.0-3/input1 - device.phys usb-0000:03:00.0-3/input1 - device.phys usb-0000:03:00.0-3/input0 - """ - - devices = [evdev.InputDevice(path) for path in evdev.list_devices()] - - # group them together by usb device because there could be stuff like - # "Logitech USB Keyboard" and "Logitech USB Keyboard Consumer Control" - grouped = {} - for device in devices: - # only keyboard devices - # https://www.kernel.org/doc/html/latest/input/event-codes.html - if evdev.ecodes.EV_KEY not in device.capabilities().keys(): - continue - - usb = device.phys.split('/')[0] - if grouped.get(usb) is None: - grouped[usb] = [] - grouped[usb].append((device.name, device.path)) - - # now write down all the paths of that group - result = {} - for group in grouped.values(): - names = [entry[0] for entry in group] - devs = [entry[1] for entry in group] - shortest_name = sorted(names, key=len)[0] - result[shortest_name] = { - 'paths': devs, - 'devices': names - } - - pipe.send(result) - - -# populate once for the whole app. Since InputDevice destructors take -# quite some time, do this in a process that can take as much time as it -# wants after piping the result. -pipe = multiprocessing.Pipe() -multiprocessing.Process(target=_get_devices, args=(pipe[1],)).start() -# block until devices are available -_devices = pipe[0].recv() -logger.info('Found %s', ', '.join([f'"{name}"' for name in _devices])) - - -def get_devices(): + global _devices + if _devices is None: + pipe = multiprocessing.Pipe() + GetDevicesProcess(pipe[1]).start() + # block until devices are available + _devices = pipe[0].recv() + logger.info('Found %s', ', '.join([f'"{name}"' for name in _devices])) return _devices diff --git a/tests/test.py b/tests/test.py index f8f44b2f..19171ea3 100644 --- a/tests/test.py +++ b/tests/test.py @@ -24,48 +24,97 @@ import sys import unittest +from keymapper.logger import update_verbosity -# quickly fake some stuff before any other file gets a chance to import -# the original version -from keymapper import paths -paths.X11_SYMBOLS = '/tmp/key-mapper-test/X11/symbols' -paths.USERS_SYMBOLS = '/tmp/key-mapper-test/X11/symbols/key-mapper/user' -paths.DEFAULT_SYMBOLS = '/tmp/key-mapper-test/X11/symbols/key-mapper/user/default' -paths.KEYCODES_PATH = '/tmp/key-mapper-test/X11/keycodes/key-mapper' - -from keymapper import linux -linux._devices = { - 'device 1': { - 'paths': [ - '/dev/input/event10', - '/dev/input/event11', - '/dev/input/event13' - ], - 'names': [ - 'device 1 something', - 'device 1', - 'device 1' - ] - }, - 'device 2': { - 'paths': ['/dev/input/event3'], - 'names': ['device 2'] + +tmp = '/tmp/key-mapper-test' + + +def patch_paths(): + from keymapper import paths + prefix = '/tmp/key-mapper-test/X11/' + paths.X11_SYMBOLS = prefix + 'symbols' + paths.USERS_SYMBOLS = prefix + 'symbols/key-mapper/user' + paths.DEFAULT_SYMBOLS = prefix + 'symbols/key-mapper/user/default' + paths.KEYCODES_PATH = prefix + 'keycodes/key-mapper' + + +def patch_linux(): + from keymapper import linux + linux.KeycodeReader.start_reading = lambda *args: None + linux.KeycodeReader.read = lambda *args: None + + +def patch_evdev(): + import evdev + # key-mapper is only interested in devices that have EV_KEY, add some + # random other stuff to test that they are ignored. + fixtures = { + # device 1 + '/dev/input/event11': { + 'capabilities': {evdev.ecodes.EV_KEY: [], evdev.ecodes.EV_ABS: []}, + 'phys': 'usb-0000:03:00.0-1/input2', + 'name': 'device 1 foo' + }, + '/dev/input/event10': { + 'capabilities': {evdev.ecodes.EV_KEY: []}, + 'phys': 'usb-0000:03:00.0-1/input3', + 'name': 'device 1' + }, + '/dev/input/event13': { + 'capabilities': {evdev.ecodes.EV_KEY: [], evdev.ecodes.EV_SYN: []}, + 'phys': 'usb-0000:03:00.0-1/input1', + 'name': 'device 1' + }, + '/dev/input/event14': { + 'capabilities': {evdev.ecodes.EV_SYN: []}, + 'phys': 'usb-0000:03:00.0-1/input0', + 'name': 'device 1 qux' + }, + + # device 2 + '/dev/input/event20': { + 'capabilities': {evdev.ecodes.EV_KEY: []}, + 'phys': 'usb-0000:03:00.0-2/input1', + 'name': 'device 2' + }, + + # something that is completely ignored + '/dev/input/event30': { + 'capabilities': {evdev.ecodes.EV_SYN: []}, + 'phys': 'usb-0000:03:00.0-3/input1', + 'name': 'device 3' + }, } -} -linux.get_devices = lambda: linux._devices -# don't block tests -from keymapper.gtk import unsaved -unsaved.unsaved_changes_dialog = lambda: unsaved.CONTINUE + def list_devices(): + return fixtures.keys() -from keymapper.logger import update_verbosity + class InputDevice: + def __init__(self, path): + self.path = path + self.phys = fixtures[path]['phys'] + self.name = fixtures[path]['name'] -# some class function stubs. -# can be overwritten in tests as well at any time. -linux.KeycodeReader.start_reading = lambda *args: None -linux.KeycodeReader.read = lambda *args: None + def capabilities(self): + return fixtures[self.path]['capabilities'] -tmp = '/tmp/key-mapper-test' + evdev.list_devices = list_devices + evdev.InputDevice = InputDevice + + +def patch_unsaved(): + # don't block tests + from keymapper.gtk import unsaved + unsaved.unsaved_changes_dialog = lambda: unsaved.CONTINUE + + +# quickly fake some stuff before any other file gets a chance to import +# the original versions +patch_paths() +patch_evdev() +patch_linux() +patch_unsaved() if __name__ == "__main__": diff --git a/tests/testcases/linux.py b/tests/testcases/linux.py new file mode 100644 index 00000000..90a318be --- /dev/null +++ b/tests/testcases/linux.py @@ -0,0 +1,56 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# key-mapper - GUI for device specific keyboard mappings +# Copyright (C) 2020 sezanzeb +# +# This file is part of key-mapper. +# +# key-mapper is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# key-mapper is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with key-mapper. If not, see . + + +import unittest + +from keymapper.linux import GetDevicesProcess + + +class TestLinux(unittest.TestCase): + def test_create_preset_1(self): + class FakePipe: + def send(self, stuff): + pass + + # don't actually start the process, just use the `run` function. + # otherwise the coverage tool can't keep track. + devices = GetDevicesProcess(FakePipe()).run() + self.assertDictEqual(devices, { + 'device 1': { + 'paths': [ + '/dev/input/event11', + '/dev/input/event10', + '/dev/input/event13'], + 'devices': [ + 'device 1 foo', + 'device 1', + 'device 1' + ] + }, + 'device 2': { + 'paths': ['/dev/input/event20'], + 'devices': ['device 2'] + } + }) + + +if __name__ == "__main__": + unittest.main()