From d4f0bf8d5ca7efcd54fff59f23182fdf958f0682 Mon Sep 17 00:00:00 2001 From: sezanzeb Date: Fri, 12 Feb 2021 20:29:26 +0100 Subject: [PATCH] fixed autoloading via udev --- keymapper/config.py | 6 ++-- keymapper/daemon.py | 55 ++++++++++++++++++++++++++--- keymapper/getdevices.py | 5 +-- readme/pylint.svg | 4 +-- tests/test.py | 1 + tests/testcases/test_control.py | 12 +++---- tests/testcases/test_daemon.py | 43 ++++++++++++++++++++-- tests/testcases/test_injector.py | 1 - tests/testcases/test_integration.py | 5 +++ 9 files changed, 113 insertions(+), 19 deletions(-) diff --git a/keymapper/config.py b/keymapper/config.py index bf1b40b4..30a68a77 100644 --- a/keymapper/config.py +++ b/keymapper/config.py @@ -26,6 +26,7 @@ import os import sys import json import shutil +import copy from keymapper.paths import CONFIG_PATH, USER, touch from keymapper.logger import logger @@ -162,7 +163,8 @@ class ConfigBase: if resolved is None and log_unknown: logger.error('Unknown config key "%s"', path) - return resolved + # modifications are only allowed via set + return copy.deepcopy(resolved) def clear_config(self): """Remove all configurations in memory.""" @@ -236,7 +238,7 @@ class GlobalConfig(ConfigBase): # treated like an empty config logger.debug('Config "%s" doesn\'t exist yet', self.path) self.clear_config() - self._config = INITIAL_CONFIG + self._config = copy.deepcopy(INITIAL_CONFIG) self.save_config() return diff --git a/keymapper/daemon.py b/keymapper/daemon.py index 2300b5ff..63a2b395 100644 --- a/keymapper/daemon.py +++ b/keymapper/daemon.py @@ -30,6 +30,7 @@ import subprocess import json import time +import evdev from pydbus import SystemBus from gi.repository import GLib @@ -215,6 +216,33 @@ class Daemon: self.config_dir = None self.autoload_history = AutoloadHistory() + self.refreshed_devices_at = 0 + + def refresh_devices(self, device=None): + """Keep the devices up to date.""" + now = time.time() + if now - 10 > self.refreshed_devices_at: + logger.debug('Refreshing because last info is too old') + refresh_devices() + self.refreshed_devices_at = now + return + + if device is not None: + if device.startswith('/dev/input/'): + for group in get_devices().values(): + if device in group['paths']: + break + else: + logger.debug('Refreshing because path unknown') + refresh_devices() + self.refreshed_devices_at = now + return + else: + if device not in get_devices(): + logger.debug('Refreshing because name unknown') + refresh_devices() + self.refreshed_devices_at = now + return def stop_injecting(self, device): """Stop injecting the mapping for a single device.""" @@ -262,7 +290,15 @@ class Daemon: Device name. Expects a key that is present in get_devices(). Can also be a path starting with /dev/input/ """ + self.refresh_devices(device) + device = path_to_device_name(device) + if device not in get_devices(): + # even after refresh_devices, the device is not in + # get_devices(), so it's either not relevant for key-mapper, + # or not connected yet + return + preset = config.get(['autoload', device], log_unknown=False) if preset is None: @@ -297,6 +333,19 @@ class Daemon: device : str The name of the device as indexed in get_devices() """ + if device.startswith('/dev/input/'): + # this is only here to avoid confusing console output, + # block invalid requests before any logs are written. + # Those requests are rejected later anyway. + try: + name = evdev.InputDevice(device).name + if 'key-mapper' in name: + return + except OSError: + return + + logger.info('Request to autoload for "%s"', device) + if self.config_dir is None: logger.error( 'Tried to autoload %s without configuring the daemon first ' @@ -343,6 +392,8 @@ class Daemon: preset : string The name of the preset """ + self.refresh_devices(device) + device = path_to_device_name(device) if self.config_dir is None: @@ -352,10 +403,6 @@ class Daemon: ) return - if device not in get_devices(): - logger.debug('Devices possibly outdated, refreshing') - refresh_devices() - if device not in get_devices(): logger.error('Could not find device "%s"', device) return diff --git a/keymapper/getdevices.py b/keymapper/getdevices.py index 25ef4bbe..ab015bf9 100644 --- a/keymapper/getdevices.py +++ b/keymapper/getdevices.py @@ -102,12 +102,13 @@ class _GetDevices(threading.Thread): asyncio.set_event_loop(loop) logger.debug('Discovering device paths') - devices = [evdev.InputDevice(path) for path in evdev.list_devices()] # group them together by usb device because there could be stuff like # "Logitech USB Keyboard" and "Logitech USB Keyboard Consumer Control" grouped = {} - for device in devices: + for path in evdev.list_devices(): + device = evdev.InputDevice(path) + if device.name == 'Power Button': continue diff --git a/readme/pylint.svg b/readme/pylint.svg index 958382ca..6d3f49fe 100644 --- a/readme/pylint.svg +++ b/readme/pylint.svg @@ -17,7 +17,7 @@ pylint - 9.73 - 9.73 + 9.74 + 9.74 \ No newline at end of file diff --git a/tests/test.py b/tests/test.py index 83d01d52..a1b6341f 100644 --- a/tests/test.py +++ b/tests/test.py @@ -268,6 +268,7 @@ class InputDevice: ret = [e.copy() for e in pending_events.get(self.group, [])] if ret is not None: # consume all of them + self.log('read all', self.group) pending_events[self.group] = [] return ret diff --git a/tests/testcases/test_control.py b/tests/testcases/test_control.py index d63b9f8e..a38fe13c 100644 --- a/tests/testcases/test_control.py +++ b/tests/testcases/test_control.py @@ -66,8 +66,8 @@ class TestControl(unittest.TestCase): cleanup() def test_autoload(self): - devices = ['device 1234', 'device 2345'] - presets = ['preset', 'bar', 'bar2'] + devices = ['device 1', 'device 2'] + presets = ['bar0', 'bar', 'bar2'] paths = [ get_preset_path(devices[0], presets[0]), get_preset_path(devices[1], presets[1]), @@ -152,9 +152,9 @@ class TestControl(unittest.TestCase): self.assertTrue(daemon.autoload_history.may_autoload(devices[1], presets[2])) def test_autoload_other_path(self): - devices = ['device 1234', 'device 2345'] - presets = ['preset', 'bar'] - config_dir = os.path.join(tmp, 'foo', 'bar') + devices = ['device 1', 'device 2'] + presets = ['bar123', 'bar2'] + config_dir = os.path.join(tmp, 'qux', 'quux') paths = [ os.path.join(config_dir, 'presets', devices[0], presets[0] + '.json'), os.path.join(config_dir, 'presets', devices[1], presets[1] + '.json') @@ -182,7 +182,7 @@ class TestControl(unittest.TestCase): def test_start_stop(self): device = 'device 1234' - preset = 'preset' + preset = 'preset9' daemon = Daemon() diff --git a/tests/testcases/test_daemon.py b/tests/testcases/test_daemon.py index d994183f..deda85f5 100644 --- a/tests/testcases/test_daemon.py +++ b/tests/testcases/test_daemon.py @@ -255,6 +255,30 @@ class TestDaemon(unittest.TestCase): self.daemon.stop_injecting(device) self.assertEqual(self.daemon.get_state(device), STOPPED) + def test_refresh_devices_for_unknown_paths(self): + device = '9876 name' + # this test only makes sense if this device is unknown yet + self.assertIsNone(get_devices().get(device)) + + self.daemon = Daemon() + + # make sure the devices are populated + get_devices() + + self.daemon.refresh_devices() + + fixtures[self.new_fixture] = { + 'capabilities': {evdev.ecodes.EV_KEY: [evdev.ecodes.KEY_A]}, + 'phys': '9876 phys', + 'info': evdev.device.DeviceInfo(4, 5, 6, 7), + 'name': device + } + + self.daemon._autoload(self.new_fixture) + + # test if the injector called refresh_devices successfully + self.assertIsNotNone(get_devices().get(device)) + def test_xmodmap_file(self): from_keycode = evdev.ecodes.KEY_A to_name = 'qux' @@ -302,7 +326,7 @@ class TestDaemon(unittest.TestCase): def test_start_stop(self): device = 'device 1' - preset = 'preset' + preset = 'preset8' path = '/dev/input/event11' daemon = Daemon() @@ -369,7 +393,7 @@ class TestDaemon(unittest.TestCase): def test_autoload(self): device = 'device 1' - preset = 'preset' + preset = 'preset7' path = '/dev/input/event11' daemon = Daemon() @@ -389,10 +413,13 @@ class TestDaemon(unittest.TestCase): config.set_autoload_preset(device, preset) config.save_config() self.daemon.set_config_dir(get_config_path()) + len_before = len(self.daemon.autoload_history._autoload_history) self.daemon._autoload(path) + len_after = len(self.daemon.autoload_history._autoload_history) self.assertEqual(daemon.autoload_history._autoload_history[device][1], preset) self.assertFalse(daemon.autoload_history.may_autoload(device, preset)) injector = daemon.injectors[device] + self.assertEqual(len_before + 1, len_after) # calling duplicate _autoload does nothing self.daemon._autoload(path) @@ -404,6 +431,18 @@ class TestDaemon(unittest.TestCase): self.daemon.start_injecting(device, preset) self.assertTrue(daemon.autoload_history.may_autoload(device, preset)) + # calling autoload for (yet) unknown devices does nothing + len_before = len(self.daemon.autoload_history._autoload_history) + self.daemon._autoload('/dev/input/qux') + len_after = len(self.daemon.autoload_history._autoload_history) + self.assertEqual(len_before, len_after) + + # autoloading key-mapper devices does nothing + len_before = len(self.daemon.autoload_history._autoload_history) + self.daemon.autoload_single('/dev/input/event40') + len_after = len(self.daemon.autoload_history._autoload_history) + self.assertEqual(len_before, len_after) + if __name__ == "__main__": unittest.main() diff --git a/tests/testcases/test_injector.py b/tests/testcases/test_injector.py index 95ade8f5..510bbf93 100644 --- a/tests/testcases/test_injector.py +++ b/tests/testcases/test_injector.py @@ -420,7 +420,6 @@ class TestInjector(unittest.TestCase): # convert the write history to some easier to manage list history = read_write_history_pipe() - print(history) self.assertEqual(history.count((EV_KEY, 77, 1)), 1) self.assertEqual(history.count((EV_ABS, ABS_RZ, value)), 1) diff --git a/tests/testcases/test_integration.py b/tests/testcases/test_integration.py index 6d77c386..809a7bc8 100644 --- a/tests/testcases/test_integration.py +++ b/tests/testcases/test_integration.py @@ -951,6 +951,11 @@ class TestIntegration(unittest.TestCase): custom_mapping.set('gamepad.joystick.non_linearity', 1) self.assertEqual(speed, 2 ** 6) + # don't consume the events in the reader, they are used to test + # the injection + keycode_reader.stop_reading() + time.sleep(0.1) + pending_events['gamepad'] = [ new_event(EV_ABS, ABS_RX, -MAX_ABS), new_event(EV_ABS, ABS_X, MAX_ABS)