diff --git a/data/key-mapper.glade b/data/key-mapper.glade index 44cb5d63..ec53d623 100644 --- a/data/key-mapper.glade +++ b/data/key-mapper.glade @@ -277,7 +277,6 @@ 350 mouse - True diff --git a/keymapper/dev/reader.py b/keymapper/dev/reader.py index 1de936e5..be8dfcff 100644 --- a/keymapper/dev/reader.py +++ b/keymapper/dev/reader.py @@ -23,6 +23,8 @@ import evdev +import select +import multiprocessing from keymapper.logger import logger from keymapper.getdevices import get_devices, refresh_devices @@ -41,20 +43,37 @@ class _KeycodeReader: """ def __init__(self): self.virtual_devices = [] + self._pipe = None + self._process = None + + def __del__(self): + self.stop_reading() + + def stop_reading(self): + # TODO something like this for the injector? + if self._process is not None: + logger.debug('Terminating reader process') + self._process.terminate() + self._process = None + + if self._pipe is not None: + logger.debug('Closing reader pipe') + self._pipe.close() + self._pipe = None def clear(self): """Next time when reading don't return the previous keycode.""" - # read all of them to clear the buffer or whatever - for virtual_device in self.virtual_devices: - while virtual_device.read_one(): - pass + # just call read to clear the pipe + self.read() - def start_reading(self, device): + def start_reading(self, device_name): """Tell the evdev lib to start looking for keycodes. If read is called without prior start_reading, no keycodes will be available. """ + self.stop_reading() + # make sure this sees up to date devices, including those created # by key-mapper refresh_devices() @@ -63,48 +82,72 @@ class _KeycodeReader: for name, group in get_devices(include_keymapper=True).items(): # also find stuff like "key-mapper {device}" - if device not in name: + if device_name not in name: continue # Watch over each one of the potentially multiple devices per # hardware for path in group['paths']: try: - self.virtual_devices.append(evdev.InputDevice(path)) + device = evdev.InputDevice(path) except FileNotFoundError: continue + if evdev.ecodes.EV_KEY in device.capabilities(): + self.virtual_devices.append(device) + logger.debug( 'Starting reading keycodes from "%s"', '", "'.join([device.name for device in self.virtual_devices]) ) - def read(self): - """Get the newest keycode or None if none was pressed.""" - newest_keycode = None - for virtual_device in self.virtual_devices: - while True: + pipe = multiprocessing.Pipe() + self._process = multiprocessing.Process( + target=self._read_worker, + args=(pipe[1],) + ) + self._process.start() + self._pipe = pipe[0] + + def _consume_event(self, event, pipe): + """Write the event code into the pipe if it is a key-down press.""" + # value: 1 for down, 0 for up, 2 for hold. + if event.type == evdev.ecodes.EV_KEY and event.value == 1: + logger.spam( + 'got code:%s value:%s', + event.code + KEYCODE_OFFSET, event.value + ) + pipe.send(event.code + KEYCODE_OFFSET) + + def _read_worker(self, pipe): + """Process that reads keycodes and buffers them into a pipe.""" + # using a process that blocks instead of read_one made it easier + # to debug via the logs, because the UI was not polling properly + # at some point which caused logs for events not to be written. + rlist = {device.fd: device for device in self.virtual_devices} + while True: + ready = select.select(rlist, [], [])[0] + for fd in ready: try: - event = virtual_device.read_one() + for event in rlist[fd].read(): + self._consume_event(event, pipe) except OSError: - # can happen if a device disappears logger.debug( - '%s cannot be read anymore', - virtual_device.name + 'Device "%s" disappeared from the reader', + rlist[fd].path ) - self.virtual_devices.remove(virtual_device) - break + del rlist[fd] - if event is None: - break + def read(self): + """Get the newest keycode or None if none was pressed.""" + if self._pipe is None: + logger.debug('No pipe available to read from') + return None + + newest_keycode = None + while self._pipe.poll(): + newest_keycode = self._pipe.recv() - if event.type == evdev.ecodes.EV_KEY and event.value == 1: - logger.spam( - 'got code:%s value:%s', - event.code + KEYCODE_OFFSET, event.value - ) - # value: 1 for down, 0 for up, 2 for hold. - newest_keycode = event.code + KEYCODE_OFFSET return newest_keycode diff --git a/keymapper/gtk/window.py b/keymapper/gtk/window.py index ab343c4c..77a1ba03 100755 --- a/keymapper/gtk/window.py +++ b/keymapper/gtk/window.py @@ -113,7 +113,10 @@ class Window: self.select_newest_preset() - self.timeout = GLib.timeout_add(100, self.check_add_row) + self.timeouts = [ + GLib.timeout_add(100, self.check_add_row), + GLib.timeout_add(1000 / 30, self.consume_newest_keycode) + ] # now show the proper finished content of the window self.get('vertical-wrapper').set_opacity(1) @@ -124,7 +127,9 @@ class Window: def on_close(self, *_): """Safely close the application.""" - GLib.source_remove(self.timeout) + for timeout in self.timeouts: + GLib.source_remove(timeout) + keycode_reader.stop_reading() Gtk.main_quit() def check_add_row(self): @@ -159,7 +164,10 @@ class Window: device_selection.append(device, device) def populate_presets(self): - """Show the available presets for the selected device.""" + """Show the available presets for the selected device. + + This will destroy unsaved changes in the custom_mapping. + """ self.get('preset_name_input').set_text('') device = self.selected_device @@ -167,6 +175,7 @@ class Window: if len(presets) == 0: new_preset = get_available_preset_name(self.selected_device) + custom_mapping.empty() custom_mapping.save(self.selected_device, new_preset) presets = [new_preset] else: @@ -195,22 +204,20 @@ class Window: key_list = self.get('key_list') key_list.forall(lambda row: row.unhighlight()) - def on_window_event(self, *args): - """Write down the pressed key in the UI. - - Triggered from any mouse and keyboard event. - """ - # to capture regular keyboard keys or extra-mouse keys + def consume_newest_keycode(self): + """To capture events from keyboard, mice and gamepads.""" + # the "event" event of Gtk.Window wouldn't trigger on gamepad + # events, so it became a GLib timeout keycode = keycode_reader.read() if keycode is None: - return + return True if keycode in [280, 333]: # disable mapping the left mouse button because it would break # the mouse. Also it is emitted right when focusing the row # which breaks the current workflow. - return + return True self.get('keycode').set_text(str(keycode)) @@ -220,6 +227,8 @@ class Window: if isinstance(focused, Gtk.ToggleButton) and isinstance(row, Row): row.set_new_keycode(keycode) + return True + def on_apply_system_layout_clicked(self, _): """Load the mapping.""" self.dbus.stop_injecting(self.selected_device) diff --git a/tests/test.py b/tests/test.py index 4d635467..4a22f050 100644 --- a/tests/test.py +++ b/tests/test.py @@ -148,6 +148,10 @@ def patch_evdev(): return fixtures.keys() class InputDevice: + # expose as existing attribute, otherwise the patch for + # evdev < 1.0.0 will crash the test + path = None + def __init__(self, path): self.path = path self.phys = fixtures[path]['phys'] diff --git a/tests/testcases/integration.py b/tests/testcases/integration.py index b78cc1b4..754c34da 100644 --- a/tests/testcases/integration.py +++ b/tests/testcases/integration.py @@ -24,6 +24,7 @@ import time import os import unittest import evdev +import json from unittest.mock import patch from importlib.util import spec_from_loader, module_from_spec from importlib.machinery import SourceFileLoader @@ -34,7 +35,7 @@ gi.require_version('Gtk', '3.0') from gi.repository import Gtk from keymapper.state import custom_mapping, system_mapping -from keymapper.paths import CONFIG +from keymapper.paths import CONFIG, get_config_path from keymapper.config import config from test import tmp, pending_events, Event, uinput_write_history_pipe, \ @@ -143,6 +144,24 @@ class TestIntegration(unittest.TestCase): [('device 1', 'new preset')] ) + def test_select_device(self): + # creates a new empty preset when no preset exists for the device + self.window.on_select_device(FakeDropdown('device 1')) + custom_mapping.change(50, 'q') + custom_mapping.change(51, 'u') + custom_mapping.change(52, 'x') + self.assertEqual(len(custom_mapping), 3) + self.window.on_select_device(FakeDropdown('device 2')) + self.assertEqual(len(custom_mapping), 0) + # it creates the file for that right away. It may have been possible + # to write it such that it doesn't (its empty anyway), but it does, + # so use that to test it in more detail. + path = get_config_path('device 2', 'new preset') + self.assertTrue(os.path.exists(path)) + with open(path, 'r') as file: + preset = json.load(file) + self.assertEqual(len(preset['mapping']), 0) + def test_can_start(self): self.assertIsNotNone(self.window) self.assertTrue(self.window.window.get_visible())