diff --git a/README.md b/README.md index 92498518..9138bc27 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,6 @@ sudo python3 setup.py install && python3 tests/test.py - [x] highlight changes and alert before discarding unsaved changes - [x] automatically load presets on login for plugged in devices - [x] make sure it works on wayland +- [ ] support timed macros, maybe using some sort of syntax - [ ] add to the AUR, provide .deb and .appimage files - [ ] automatically load presets when devices get plugged in after login -- [ ] support timed macros, maybe using some sort of syntax diff --git a/keymapper/daemon.py b/keymapper/daemon.py index a637ae49..924d6134 100644 --- a/keymapper/daemon.py +++ b/keymapper/daemon.py @@ -84,11 +84,11 @@ class Daemon(service.Object): self.injectors = {} if autoload: for device, preset in config.iterate_autoload_presets(): - print(device, preset) mapping = Mapping() mapping.load(device, preset) try: injector = KeycodeInjector(device, mapping) + injector.start_injecting() self.injectors[device] = injector except OSError as error: logger.error(error) @@ -133,7 +133,9 @@ class Daemon(service.Object): mapping = Mapping() mapping.load(device, preset) try: - self.injectors[device] = KeycodeInjector(device, mapping) + injector = KeycodeInjector(device, mapping) + injector.start_injecting() + self.injectors[device] = injector except OSError: return False diff --git a/keymapper/dev/injector.py b/keymapper/dev/injector.py index a212f038..7626e1fe 100644 --- a/keymapper/dev/injector.py +++ b/keymapper/dev/injector.py @@ -26,9 +26,6 @@ import re import asyncio import time import subprocess -# By using processes instead of threads, the mappings are -# automatically copied, so that they can be worked with in the ui -# without breaking the device. And it's possible to terminate processes. import multiprocessing import evdev @@ -47,152 +44,6 @@ DEVICE_SKIPPED = 3 KEYCODE_OFFSET = 8 -def _grab(path): - """Try to grab, repeat a few times with time inbetween on failure.""" - attempts = 0 - while True: - device = evdev.InputDevice(path) - try: - # grab to avoid e.g. the disabled keycode of 10 to confuse - # X, especially when one of the buttons of your mouse also - # uses 10. This also avoids having to load an empty xkb - # symbols file to prevent writing any unwanted keys. - device.grab() - break - except IOError: - attempts += 1 - logger.debug('Failed attemt to grab %s %d', path, attempts) - - if attempts >= 4: - logger.error('Cannot grab %s, it is possibly in use', path) - return None - - # it might take a little time until the device is free if - # it was previously grabbed. - time.sleep(0.15) - return device - - -def _modify_capabilities(device): - """Adds all keycode into a copy of a devices capabilities.""" - # copy the capabilities because the keymapper_device is going - # to act like the device. - capabilities = device.capabilities(absinfo=False) - # However, make sure that it supports all keycodes, not just some - # random ones, because the mapping could contain anything. - # That's why I avoid from_device for this - capabilities[evdev.ecodes.EV_KEY] = list(evdev.ecodes.keys.keys()) - - # just like what python-evdev does in from_device - if evdev.ecodes.EV_SYN in capabilities: - del capabilities[evdev.ecodes.EV_SYN] - if evdev.ecodes.EV_FF in capabilities: - del capabilities[evdev.ecodes.EV_FF] - - return capabilities - - -def _is_device_mapped(device, mapping): - """Check if this device has capabilities that are being mapped. - - Parameters - ---------- - device : evdev.InputDevice - mapping : Mapping - """ - capabilities = device.capabilities(absinfo=False)[evdev.ecodes.EV_KEY] - needed = False - for keycode, _ in mapping: - if keycode - KEYCODE_OFFSET in capabilities: - needed = True - if not needed: - logger.debug('No need to grab %s', device.path) - return needed - - -def _start_injecting_worker(path, pipe, mapping): - """Inject keycodes for one of the virtual devices. - - Parameters - ---------- - path : string - path in /dev to read keycodes from - pipe : multiprocessing.Pipe - pipe to send status codes over - """ - # evdev needs asyncio to work - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - try: - device = _grab(path) - except Exception as error: - logger.error(error) - pipe.send(FAILED) - return - - if device is None: - pipe.send(FAILED) - return - - if not _is_device_mapped(device, mapping): - # skipping reading and checking on events from those devices - # may be beneficial for performance. - pipe.send(DEVICE_SKIPPED) - return - - keymapper_device = evdev.UInput( - name=f'key-mapper {device.name}', - phys='key-mapper', - events=_modify_capabilities(device) - ) - - pipe.send(DEVICE_CREATED) - - logger.debug( - 'Started injecting into %s, fd %s', - keymapper_device.device.path, keymapper_device.fd - ) - - for event in device.read_loop(): - if event.type != evdev.ecodes.EV_KEY: - keymapper_device.write(event.type, event.code, event.value) - # this already includes SYN events, so need to syn here again - continue - - if event.value == 2: - # linux does them itself, no need to trigger them - continue - - input_keycode = event.code + KEYCODE_OFFSET - - character = mapping.get_character(input_keycode) - - if character is None: - # unknown keycode, forward it - target_keycode = input_keycode - else: - target_keycode = system_mapping.get_keycode(character) - if target_keycode is None: - logger.error( - 'Cannot find character %s in the internal mapping', - character - ) - continue - - logger.spam( - 'got code:%s value:%s, maps to code:%s char:%s', - event.code + KEYCODE_OFFSET, event.value, target_keycode, character - ) - - keymapper_device.write( - evdev.ecodes.EV_KEY, - target_keycode - KEYCODE_OFFSET, - event.value - ) - keymapper_device.syn() - - def is_numlock_on(): """Get the current state of the numlock.""" xset_q = subprocess.check_output(['xset', 'q']).decode() @@ -241,13 +92,106 @@ def ensure_numlock(func): class KeycodeInjector: - """Keeps injecting keycodes in the background based on the mapping.""" + """Keeps injecting keycodes in the background based on the mapping. + + Is a process to make it non-blocking for the rest of the code and to + make running multiple injector easier. There is one procss per + hardware-device that is being mapped. + """ @ensure_numlock - def __init__(self, device, mapping=custom_mapping): - """Start injecting keycodes based on custom_mapping.""" + def __init__(self, device, mapping): + """Start injecting keycodes based on custom_mapping. + + Parameters + ---------- + device : string + the name of the device as available in get_device + """ self.device = device - self.virtual_devices = [] - self.processes = [] + self.mapping = mapping + self._process = None + + def start_injecting(self): + """Start injecting keycodes.""" + self._process = multiprocessing.Process(target=self._start_injecting) + self._process.start() + + def _prepare_device(self, path): + """Try to grab the device, return if not needed/possible.""" + device = evdev.InputDevice(path) + + if device is None: + return None + + capabilities = device.capabilities(absinfo=False)[evdev.ecodes.EV_KEY] + + needed = False + for keycode, _ in self.mapping: + if keycode - KEYCODE_OFFSET in capabilities: + needed = True + break + + if not needed: + # skipping reading and checking on events from those devices + # may be beneficial for performance. + logger.debug('No need to grab %s', device.path) + return None + + attempts = 0 + while True: + device = evdev.InputDevice(path) + try: + # grab to avoid e.g. the disabled keycode of 10 to confuse + # X, especially when one of the buttons of your mouse also + # uses 10. This also avoids having to load an empty xkb + # symbols file to prevent writing any unwanted keys. + device.grab() + break + except IOError: + attempts += 1 + logger.debug('Failed attemt to grab %s %d', path, attempts) + + if attempts >= 4: + logger.error('Cannot grab %s, it is possibly in use', path) + return None + + # it might take a little time until the device is free if + # it was previously grabbed. + time.sleep(0.15) + + return device + + def _modify_capabilities(self, input_device): + """Adds all keycode into a copy of a devices capabilities. + + Prameters + --------- + input_device : evdev.InputDevice + """ + # copy the capabilities because the keymapper_device is going + # to act like the device. + capabilities = input_device.capabilities(absinfo=False) + # However, make sure that it supports all keycodes, not just some + # random ones, because the mapping could contain anything. + # That's why I avoid from_device for this + capabilities[evdev.ecodes.EV_KEY] = list(evdev.ecodes.keys.keys()) + + # just like what python-evdev does in from_device + if evdev.ecodes.EV_SYN in capabilities: + del capabilities[evdev.ecodes.EV_SYN] + if evdev.ecodes.EV_FF in capabilities: + del capabilities[evdev.ecodes.EV_FF] + + return capabilities + + def _start_injecting(self): + """The injection worker that keeps injecting until terminated. + + Stuff is non-blocking by using asyncio in order to do multiple things + somewhat concurrently. + """ + loop = asyncio.get_event_loop() + coroutines = [] paths = get_devices()[self.device]['paths'] @@ -255,31 +199,84 @@ class KeycodeInjector: # Watch over each one of the potentially multiple devices per hardware for path in paths: - pipe = multiprocessing.Pipe() - worker = multiprocessing.Process( - target=_start_injecting_worker, - args=(path, pipe[1], mapping) + input_device = self._prepare_device(path) + if input_device is None: + continue + + uinput = evdev.UInput( + name=f'key-mapper {input_device.name}', + phys='key-mapper', + events=self._modify_capabilities(input_device) ) - worker.start() - # wait for the process to notify creation of the new injection - # device, to keep the logs in order. - status = pipe[0].recv() - if status == DEVICE_CREATED: - self.processes.append(worker) - else: - worker.join() + coroutine = self._injection_loop(input_device, uinput) + coroutines.append(coroutine) - if len(self.processes) == 0: + if len(coroutines) == 0: raise OSError('Could not grab any device') + loop.run_until_complete(asyncio.gather(*coroutines)) + + async def _injection_loop(self, device, keymapper_device): + """Inject keycodes for one of the virtual devices. + + Parameters + ---------- + device : evdev.InputDevice + where to read keycodes from + keymapper_device : evdev.UInput + where to write keycodes to + mapping : Mapping + to figure out which keycodes to write + """ + logger.debug( + 'Started injecting into %s, fd %s', + keymapper_device.device.path, keymapper_device.fd + ) + + async for event in device.async_read_loop(): + if event.type != evdev.ecodes.EV_KEY: + keymapper_device.write(event.type, event.code, event.value) + # this already includes SYN events, so need to syn here again + continue + + if event.value == 2: + # linux does them itself, no need to trigger them + continue + + input_keycode = event.code + KEYCODE_OFFSET + + character = self.mapping.get_character(input_keycode) + + if character is None: + # unknown keycode, forward it + target_keycode = input_keycode + else: + target_keycode = system_mapping.get_keycode(character) + if target_keycode is None: + logger.error( + 'Cannot find character %s in the internal mapping', + character + ) + continue + + logger.spam( + 'got code:%s value:%s, maps to code:%s char:%s', + event.code + KEYCODE_OFFSET, + event.value, + target_keycode, + character + ) + + keymapper_device.write( + evdev.ecodes.EV_KEY, + target_keycode - KEYCODE_OFFSET, + event.value + ) + keymapper_device.syn() + @ensure_numlock def stop_injecting(self): """Stop injecting keycodes.""" logger.info('Stopping injecting keycodes for device %s', self.device) - for i, process in enumerate(self.processes): - if process is None: - continue - - if process.is_alive(): - process.terminate() - self.processes[i] = None + if self._process is not None and self._process.is_alive(): + self._process.terminate() diff --git a/keymapper/dev/macros.py b/keymapper/dev/macros.py index dc40d7e9..4c8cd279 100644 --- a/keymapper/dev/macros.py +++ b/keymapper/dev/macros.py @@ -67,10 +67,15 @@ class Macro: self.tasks = [] def run(self): - """Run the macro""" + """Run the macro.""" for task in self.tasks: + # TODO async, don't block the rest of the application task() + def stop(self): + """Stop the macro.""" + # TODO + def m(self, modifier, macro): """Do stuff while a modifier is activated. @@ -96,12 +101,8 @@ class Macro: self.tasks.append(macro.run) return self - def k(self, character, value=None): - """Write the character. - - Parameters - ---------- - """ + def k(self, character): + """Write the character.""" # TODO write character self.tasks.append(lambda: print(character)) return self @@ -125,3 +126,5 @@ print() w(400).m('SHIFT_L', r(2, k('a'))).w(10).k('b').run() print() +# prints nothing yet +k('a').r(3, k('b')) diff --git a/keymapper/mapping.py b/keymapper/mapping.py index 89750bd8..0ae6a747 100644 --- a/keymapper/mapping.py +++ b/keymapper/mapping.py @@ -25,6 +25,7 @@ import os import json import shutil +import copy from keymapper.logger import logger from keymapper.paths import get_config_path @@ -157,6 +158,15 @@ class Mapping: self.changed = False + def clone(self): + """Create a copy of the mapping.""" + # TODO test + mapping = Mapping() + mapping._mapping = copy.deepcopy(self._mapping) + mapping.update_reverse_mapping() + mapping.changed = self.changed + return Mapping() + def save(self, device, preset): """Dump as JSON into home.""" path = get_config_path(device, preset) diff --git a/tests/test.py b/tests/test.py index 052da715..a75a446a 100644 --- a/tests/test.py +++ b/tests/test.py @@ -26,6 +26,7 @@ import sys import time import unittest import multiprocessing +import asyncio import evdev @@ -169,6 +170,15 @@ def patch_evdev(): # is still running time.sleep(0.01) + async def async_read_loop(self): + """Read all prepared events at once.""" + if pending_events.get(self.name) is None: + return + + while len(pending_events[self.name]) > 0: + yield pending_events[self.name].pop(0) + await asyncio.sleep(0.01) + def capabilities(self, absinfo=True): return fixtures[self.path]['capabilities'] diff --git a/tests/testcases/injector.py b/tests/testcases/injector.py index 6c90c1e2..2c5255b2 100644 --- a/tests/testcases/injector.py +++ b/tests/testcases/injector.py @@ -23,11 +23,10 @@ import unittest import evdev -from keymapper.dev.injector import _start_injecting_worker, _grab, \ - is_numlock_on, toggle_numlock, ensure_numlock, _modify_capabilities, \ - KeycodeInjector -from keymapper.getdevices import get_devices +from keymapper.dev.injector import is_numlock_on, toggle_numlock,\ + ensure_numlock, KeycodeInjector from keymapper.state import custom_mapping, system_mapping +from keymapper.mapping import Mapping from test import uinput_write_history, Event, pending_events, fixtures @@ -65,7 +64,8 @@ class TestInjector(unittest.TestCase): evdev.ecodes.EV_FF: [1, 2, 3] } - capabilities = _modify_capabilities(FakeDevice()) + self.injector = KeycodeInjector('foo', Mapping()) + capabilities = self.injector._modify_capabilities(FakeDevice()) self.assertIn(evdev.ecodes.EV_KEY, capabilities) self.assertIsInstance(capabilities[evdev.ecodes.EV_KEY], list) @@ -76,12 +76,39 @@ class TestInjector(unittest.TestCase): def test_grab(self): # path is from the fixtures - path = '/dev/input/event11' - device = _grab(path) + custom_mapping.change(10, 'a') + + self.injector = KeycodeInjector('device 1', custom_mapping) + path = '/dev/input/event10' + # this test needs to pass around all other constraints of + # _prepare_device + device = self.injector._prepare_device(path) self.assertEqual(self.failed, 2) # success on the third try device.name = fixtures[path]['name'] + def test_skip_unused_device(self): + # skips a device because its capabilities are not used in the mapping + custom_mapping.change(10, 'a') + self.injector = KeycodeInjector('device 1', custom_mapping) + path = '/dev/input/event11' + device = self.injector._prepare_device(path) + self.assertEqual(self.failed, 0) + self.assertIsNone(device) + + def test_skip_unknown_device(self): + # skips a device because its capabilities are not used in the mapping + self.injector = KeycodeInjector('device 1', custom_mapping) + path = '/dev/input/event11' + device = self.injector._prepare_device(path) + + # make sure the test uses a fixture without capabilities + capabilities = evdev.InputDevice(path).capabilities() + self.assertEqual(len(capabilities[evdev.ecodes.EV_KEY]), 0) + + self.assertEqual(self.failed, 0) + self.assertIsNone(device) + def test_numlock(self): before = is_numlock_on() @@ -99,28 +126,7 @@ class TestInjector(unittest.TestCase): toggle_numlock() self.assertEqual(before, is_numlock_on()) - def test_injector_constructor(self): - # this buys some time to test if the process is alive. 2 (20ms) would - # already be enough - pending_events['device 2'] = [Event(1, 31, 1)] * 10 - - injector2 = None - try: - injector2 = KeycodeInjector('device 2') - self.assertEqual(len(injector2.processes), 1) - self.assertEqual(injector2.processes[0].is_alive(), True) - injector2.processes[0].join() - except Exception as error: - # make sure to not cause race conditions for other tests - # if this test fails - if injector2 is not None: - for p in injector2.processes: - p.join() - raise error - def test_injector(self): - device = get_devices()['device 2'] - custom_mapping.change(9, 'a') # one mapping that is unknown in the system_mapping on purpose custom_mapping.change(10, 'b') @@ -140,15 +146,11 @@ class TestInjector(unittest.TestCase): Event(3124, 3564, 6542), ] - class FakePipe: - def send(self, message): - pass + self.injector = KeycodeInjector('device 2', custom_mapping) + # don't start the process for coverage testing purposes + self.injector._start_injecting() - _start_injecting_worker( - path=device['paths'][0], - pipe=FakePipe(), - mapping=custom_mapping - ) + self.assertEqual(len(uinput_write_history), 3) self.assertEqual(uinput_write_history[0].type, evdev.events.EV_KEY) self.assertEqual(uinput_write_history[0].code, 92)