From 2afde0039f33ca1717c03ae998c988d71e43f9eb Mon Sep 17 00:00:00 2001 From: sezanzeb Date: Sun, 22 Nov 2020 16:54:40 +0100 Subject: [PATCH] wip reading keycodes even though the device is grabbed --- keymapper/config.py | 7 +++++- keymapper/getdevices.py | 32 ++++++++++++++++-------- keymapper/gtk/row.py | 6 +++-- keymapper/injector.py | 40 +++++++++++++++++++++++------- keymapper/reader.py | 46 +++++++++++++++++++++-------------- tests/testcases/getdevices.py | 40 ++++++++++++++++++++++++------ 6 files changed, 124 insertions(+), 47 deletions(-) diff --git a/keymapper/config.py b/keymapper/config.py index 2679a247..f98d8205 100644 --- a/keymapper/config.py +++ b/keymapper/config.py @@ -64,7 +64,12 @@ class _Config: self._config['map_EV_REL_devices'] = active def may_modify_movement_devices(self): - """Get if devices that control movements may be modified as well.""" + """Get if devices that control movements may be modified as well. + + Since movement events happen quite often and fast, I'd like to + add the option to disabling mapping those if it affects their + performance. TODO figure out which devices to inject to instead? + """ return self._config['map_EV_REL_devices'] def load_config(self): diff --git a/keymapper/getdevices.py b/keymapper/getdevices.py index 7ede72a4..f1629188 100644 --- a/keymapper/getdevices.py +++ b/keymapper/getdevices.py @@ -23,6 +23,8 @@ import multiprocessing +import time +import copy import evdev @@ -62,10 +64,6 @@ class _GetDevicesProcess(multiprocessing.Process): # "Logitech USB Keyboard" and "Logitech USB Keyboard Consumer Control" grouped = {} for device in devices: - if device.phys.startswith('key-mapper'): - # injector device, not really periphery - continue - # only keyboard devices # https://www.kernel.org/doc/html/latest/input/event-codes.html capabilities = device.capabilities().keys() @@ -76,8 +74,6 @@ class _GetDevicesProcess(multiprocessing.Process): not config.may_modify_movement_devices() and evdev.ecodes.EV_REL in capabilities ): - # skip devices that control movement to avoid affecting - # their performance due to the amount of their events. # TODO add checkbox to automatically load # a preset on login logger.debug( @@ -90,7 +86,7 @@ class _GetDevicesProcess(multiprocessing.Process): if grouped.get(usb) is None: grouped[usb] = [] - logger.debug('Found "%s", %s, %s', device.name, device.path, usb) + logger.spam('Found "%s", %s, %s', device.name, device.path, usb) grouped[usb].append((device.name, device.path)) @@ -109,13 +105,19 @@ class _GetDevicesProcess(multiprocessing.Process): def refresh_devices(): - """Get new devices, e.g. new ones created by key-mapper.""" + """Get new devices, e.g. new ones created by key-mapper. + + This should be called whenever devices in /dev are added or removed. + """ + # it may take a little bit of time until devices are visible after + # changes + time.sleep(0.1) global _devices _devices = None return get_devices() -def get_devices(): +def get_devices(include_keymapper=False): """Group devices and get relevant infos per group. Returns a list containing mappings of @@ -138,4 +140,14 @@ def get_devices(): else: names = [f'"{name}"' for name in _devices] logger.info('Found %s', ', '.join(names)) - return _devices + + # filter the result + result = {} + for device in _devices.keys(): + if include_keymapper and device.startswith('key-mapper'): + result[device] = _devices[device] + continue + + result[device] = _devices[device] + + return result diff --git a/keymapper/gtk/row.py b/keymapper/gtk/row.py index 39f0a879..cfc12aab 100644 --- a/keymapper/gtk/row.py +++ b/keymapper/gtk/row.py @@ -86,8 +86,10 @@ class Row(Gtk.ListBoxRow): self.keycode.set_label(str(new_keycode)) # switch to the character, don't require mouse input because # that would overwrite the key with the mouse-button key if - # the current device is a mouse - self.window.window.set_focus(self.character_input) + # the current device is a mouse. idle_add this so that the + # keycode event won't write into the character input as well. + window = self.window.window + GLib.idle_add(lambda: window.set_focus(self.character_input)) self.highlight() # the character is empty and therefore the mapping is not complete diff --git a/keymapper/injector.py b/keymapper/injector.py index 42a55c28..f20e424d 100644 --- a/keymapper/injector.py +++ b/keymapper/injector.py @@ -34,13 +34,14 @@ import multiprocessing import evdev from keymapper.logger import logger -from keymapper.getdevices import get_devices +from keymapper.getdevices import get_devices, refresh_devices from keymapper.state import custom_mapping, system_mapping DEV_NAME = 'key-mapper' DEVICE_CREATED = 1 FAILED = 2 +DEVICE_SKIPPED = 3 def _grab(path): @@ -88,6 +89,24 @@ def _modify_capabilities(device): return capabilities +def _is_device_mapped(device, mapping): + """Check if this device has capabilities that are being mapped. + + Parameters + ---------- + device : evdev.InputDevice + mapping : Mapping + """ + capabilities = device.capabilities(absinfo=False)[evdev.ecodes.EV_KEY] + needed = False + for keycode, _ in custom_mapping: + if keycode in capabilities: + needed = True + if not needed: + logger.debug('No need to grab %s', device.path) + return needed + + def _start_injecting_worker(path, pipe, mapping): """Inject keycodes for one of the virtual devices. @@ -108,19 +127,21 @@ def _start_injecting_worker(path, pipe, mapping): pipe.send(FAILED) return - capabilities = _modify_capabilities(device) + if not _is_device_mapped(device, mapping): + pipe.send(DEVICE_SKIPPED) + return keymapper_device = evdev.UInput( name=f'key-mapper {device.name}', phys='key-mapper', - events=capabilities + events=_modify_capabilities(device) ) pipe.send(DEVICE_CREATED) logger.debug( 'Started injecting into %s, fd %s', - device.path, keymapper_device.fd + keymapper_device.device.path, keymapper_device.fd ) for event in device.read_loop(): @@ -222,11 +243,7 @@ class KeycodeInjector: paths = get_devices()[self.device]['paths'] - logger.info( - 'Starting injecting the mapping for %s on %s', - self.device, - ', '.join(paths) - ) + logger.info('Starting injecting the mapping for %s', self.device) # Watch over each one of the potentially multiple devices per hardware for path in paths: @@ -247,6 +264,9 @@ class KeycodeInjector: if len(self.processes) == 0: raise OSError('Could not grab any device') + # key-mapper devices were added, freshly scan /dev/input + refresh_devices() + @ensure_numlock def stop_injecting(self): """Stop injecting keycodes.""" @@ -258,3 +278,5 @@ class KeycodeInjector: if process.is_alive(): process.terminate() self.processes[i] = None + + refresh_devices() diff --git a/keymapper/reader.py b/keymapper/reader.py index 84041b5b..a915c4ff 100644 --- a/keymapper/reader.py +++ b/keymapper/reader.py @@ -50,20 +50,31 @@ class _KeycodeReader: If read is called without prior start_reading, no keycodes will be available. - """ - paths = get_devices()[device]['paths'] - - logger.debug( - 'Starting reading keycodes for %s on %s', - device, - ', '.join(paths) - ) - # Watch over each one of the potentially multiple devices per hardware - self.virtual_devices = [ - evdev.InputDevice(path) - for path in paths - ] + Parameters + ---------- + device : string + The name of the device. + """ + groups = get_devices(include_keymapper=True) + for name, group in groups.items(): + # also find stuff like "key-mapper {device}" + if device not in name: + return + + paths = group['paths'] + + # Watch over each one of the potentially multiple devices per + # hardware + self.virtual_devices = [ + evdev.InputDevice(path) + for path in paths + ] + + logger.debug( + 'Starting reading keycodes from "%s"', + '", "'.join([device.name for device in self.virtual_devices]) + ) def read(self): """Get the newest keycode or None if none was pressed.""" @@ -74,12 +85,11 @@ class _KeycodeReader: if event is None: break - logger.spam( - 'got code:%s value:%s', - event.code + 8, event.value - ) - if event.type == evdev.ecodes.EV_KEY and event.value == 1: + logger.spam( + 'got code:%s value:%s', + event.code + 8, event.value + ) # value: 1 for down, 0 for up, 2 for hold. # this happens to report key codes that are 8 lower # than the ones reported by evtest and used in xkb files diff --git a/tests/testcases/getdevices.py b/tests/testcases/getdevices.py index 1e8732cc..1c36c789 100644 --- a/tests/testcases/getdevices.py +++ b/tests/testcases/getdevices.py @@ -22,16 +22,18 @@ import unittest from keymapper.getdevices import _GetDevicesProcess +from keymapper.config import config -class TestGetDevices(unittest.TestCase): - def test_get_devices(self): - class FakePipe: - devices = None +class FakePipe: + devices = None + + def send(self, devices): + self.devices = devices - def send(self, devices): - self.devices = devices +class TestGetDevices(unittest.TestCase): + def test_get_devices(self): # don't actually start the process, just use the `run` function. # otherwise the coverage tool can't keep track. pipe = FakePipe() @@ -41,7 +43,8 @@ class TestGetDevices(unittest.TestCase): 'paths': [ '/dev/input/event11', '/dev/input/event10', - '/dev/input/event13'], + '/dev/input/event13' + ], 'devices': [ 'device 1 foo', 'device 1', @@ -54,6 +57,29 @@ class TestGetDevices(unittest.TestCase): } }) + def test_map_movement_devices(self): + pipe = FakePipe() + config.set_modify_movement_devices(False) + _GetDevicesProcess(pipe).run() + self.assertDictEqual(pipe.devices, { + 'device 1': { + 'paths': [ + '/dev/input/event11', + '/dev/input/event10', + '/dev/input/event13' + ], + 'devices': [ + 'device 1 foo', + 'device 1', + 'device 1' + ] + }, + 'device 2': { + 'paths': ['/dev/input/event20'], + 'devices': ['device 2'] + } + }) + if __name__ == "__main__": unittest.main()