diff --git a/.coveragerc b/.coveragerc index 75c5eea5..8dd56205 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,5 +1,4 @@ [run] +concurrency = multiprocessing branch = True source = /usr/lib/python3.8/site-packages/keymapper -concurrency = multiprocessing - diff --git a/README.md b/README.md index 52823c52..a9d7bdee 100644 --- a/README.md +++ b/README.md @@ -99,5 +99,6 @@ sudo dpkg -i python3-key-mapper_0.1.0-1_all.deb ```bash pylint keymapper --extension-pkg-whitelist=evdev -sudo pip install . && python3 tests/test.py +sudo pip install . && coverage run tests/test.py +coverage combine && coverage report -m ``` diff --git a/keymapper/config.py b/keymapper/config.py index 987d67fd..6a8311c1 100644 --- a/keymapper/config.py +++ b/keymapper/config.py @@ -41,7 +41,8 @@ INITIAL_CONFIG = { 'keystroke_sleep_ms': 10 }, 'gamepad': { - 'non_linearity': 4 + 'non_linearity': 4, + 'pointer_speed': 80 } } diff --git a/keymapper/dev/injector.py b/keymapper/dev/injector.py index d83e5120..12a97844 100644 --- a/keymapper/dev/injector.py +++ b/keymapper/dev/injector.py @@ -29,6 +29,7 @@ import subprocess import multiprocessing import evdev +from evdev.ecodes import EV_KEY, EV_ABS, EV_REL from keymapper.logger import logger from keymapper.config import config @@ -69,9 +70,9 @@ def toggle_numlock(): name=f'key-mapper numlock-control', phys='key-mapper', ) - device.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_NUMLOCK, 1) + device.write(EV_KEY, evdev.ecodes.KEY_NUMLOCK, 1) device.syn() - device.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_NUMLOCK, 0) + device.write(EV_KEY, evdev.ecodes.KEY_NUMLOCK, 0) device.syn() @@ -129,12 +130,14 @@ class KeycodeInjector: capabilities = device.capabilities(absinfo=False) needed = False - for keycode, _ in self.mapping: - if keycode - KEYCODE_OFFSET in capabilities[evdev.ecodes.EV_KEY]: - needed = True - break - # TODO only if map ABS to REL keep ABS devics - if capabilities.get(evdev.ecodes.EV_REL) is not None: + if capabilities.get(EV_KEY) is not None: + for keycode, _ in self.mapping: + if keycode - KEYCODE_OFFSET in capabilities[EV_KEY]: + needed = True + break + + can_do_abs = evdev.ecodes.ABS_X in capabilities.get(EV_ABS, []) + if self.map_abs_to_rel() and can_do_abs: needed = True if not needed: @@ -155,14 +158,14 @@ class KeycodeInjector: break except IOError: attempts += 1 + # it might take a little time until the device is free if + # it was previously grabbed. logger.debug('Failed attemts to grab %s: %d', path, attempts) if attempts >= 4: logger.error('Cannot grab %s, it is possibly in use', path) return None - # it might take a little time until the device is free if - # it was previously grabbed. time.sleep(0.15) return device @@ -240,14 +243,30 @@ class KeycodeInjector: events=self._modify_capabilities(input_device) ) - coroutine = self._injection_loop(input_device, uinput) + # keycode injection + coroutine = self._keycode_loop(input_device, uinput) coroutines.append(coroutine) + # mouse movement injection + if self.map_abs_to_rel(): + self.abs_x = 0 + self.abs_y = 0 + # events only take ints, so a movement of 0.3 needs to add + # up to 1.2 to affect the cursor. + self.pending_x_rel = 0 + self.pending_y_rel = 0 + coroutine = self._movement_loop(input_device, uinput) + coroutines.append(coroutine) + if len(coroutines) == 0: - raise OSError('Could not grab any device') + logger.error('Did not grab any device') + return loop.run_until_complete(asyncio.gather(*coroutines)) + if len(coroutines) > 0: + logger.debug('asyncio coroutines ended') + def _write(self, device, type, keycode, value): """Actually inject.""" device.write(type, keycode, value) @@ -262,15 +281,20 @@ class KeycodeInjector: ) self._write( keymapper_device, - evdev.ecodes.EV_KEY, + EV_KEY, keycode - KEYCODE_OFFSET, value ) - async def spam_mouse_movements(self, keymapper_device, input_device): + async def _movement_loop(self, input_device, keymapper_device): """Keep writing mouse movements based on the gamepad stick position.""" - max_value = input_device.absinfo(evdev.ecodes.EV_ABS).max + logger.info('Mapping gamepad to mouse movements') + max_value = input_device.absinfo(EV_ABS).max max_speed = ((max_value ** 2) * 2) ** 0.5 + + pointer_speed = config.get('gamepad.pointer_speed', 80) + non_linearity = config.get('gamepad.non_linearity', 4) + while True: # this is part of the spawned process, so terminating that one # will also stop this loop @@ -279,7 +303,6 @@ class KeycodeInjector: abs_y = self.abs_y abs_x = self.abs_x - non_linearity = config.get('gamepad.non_linearity', 4) if non_linearity != 1: # to make small movements smaller for more precision speed = (abs_x ** 2 + abs_y ** 2) ** 0.5 @@ -287,8 +310,8 @@ class KeycodeInjector: else: factor = 1 - rel_x = abs_x * factor * 80 / max_value - rel_y = abs_y * factor * 80 / max_value + rel_x = abs_x * factor * pointer_speed / max_value + rel_y = abs_y * factor * pointer_speed / max_value self.pending_x_rel += rel_x self.pending_y_rel += rel_y @@ -300,7 +323,7 @@ class KeycodeInjector: if rel_y != 0: self._write( keymapper_device, - evdev.ecodes.EV_REL, + EV_REL, evdev.ecodes.ABS_Y, rel_y ) @@ -308,12 +331,12 @@ class KeycodeInjector: if rel_x != 0: self._write( keymapper_device, - evdev.ecodes.EV_REL, + EV_REL, evdev.ecodes.ABS_X, rel_x ) - async def _injection_loop(self, device, keymapper_device): + async def _keycode_loop(self, device, keymapper_device): """Inject keycodes for one of the virtual devices. Parameters @@ -339,20 +362,8 @@ class KeycodeInjector: keymapper_device.device.path, keymapper_device.fd ) - if self.map_abs_to_rel(): - self.abs_x = 0 - self.abs_y = 0 - # events only take ints, so a movement of 0.3 needs to add up to - # 1.2 to affect the cursor. - self.pending_x_rel = 0 - self.pending_y_rel = 0 - asyncio.ensure_future(self.spam_mouse_movements( - keymapper_device, - device - )) - async for event in device.async_read_loop(): - if self.map_abs_to_rel() and event.type == evdev.ecodes.EV_ABS: + if self.map_abs_to_rel() and event.type == EV_ABS: if event.code not in [evdev.ecodes.ABS_X, evdev.ecodes.ABS_Y]: continue if event.code == evdev.ecodes.ABS_X: @@ -361,7 +372,7 @@ class KeycodeInjector: self.abs_y = event.value continue - if event.type != evdev.ecodes.EV_KEY: + if event.type != EV_KEY: keymapper_device.write(event.type, event.code, event.value) # this already includes SYN events, so need to syn here again continue @@ -426,6 +437,6 @@ class KeycodeInjector: @ensure_numlock def stop_injecting(self): """Stop injecting keycodes.""" - logger.info('Stopping injecting keycodes for device %s', self.device) + logger.info('Stopping injecting keycodes for device "%s"', self.device) if self._process is not None and self._process.is_alive(): self._process.terminate() diff --git a/keymapper/getdevices.py b/keymapper/getdevices.py index faffd369..e3898639 100644 --- a/keymapper/getdevices.py +++ b/keymapper/getdevices.py @@ -28,6 +28,7 @@ import time import asyncio import evdev +from evdev.ecodes import EV_KEY, EV_ABS from keymapper.logger import logger @@ -81,7 +82,7 @@ class _GetDevices(threading.Thread): # only keyboard devices # https://www.kernel.org/doc/html/latest/input/event-codes.html capabilities = device.capabilities().keys() - if evdev.ecodes.EV_KEY not in capabilities: + if EV_KEY not in capabilities and EV_ABS not in capabilities: continue usb = device.phys.split('/')[0] diff --git a/tests/test.py b/tests/test.py index 5bf0b715..c7def240 100644 --- a/tests/test.py +++ b/tests/test.py @@ -43,6 +43,8 @@ sys.path = [os.path.abspath('.')] + sys.path # is still running EVENT_READ_TIMEOUT = 0.01 +MAX_ABS = 2 ** 15 + tmp = '/tmp/key-mapper-test' uinput_write_history = [] @@ -55,7 +57,7 @@ pending_events = {} fixtures = { # device 1 '/dev/input/event11': { - 'capabilities': {evdev.ecodes.EV_KEY: [], evdev.ecodes.EV_ABS: []}, + 'capabilities': {evdev.ecodes.EV_KEY: [], evdev.ecodes.EV_REL: []}, 'phys': 'usb-0000:03:00.0-1/input2', 'name': 'device 1 foo' }, @@ -82,12 +84,13 @@ fixtures = { 'name': 'device 2' }, - # devices that are completely ignored '/dev/input/event30': { - 'capabilities': {evdev.ecodes.EV_SYN: []}, + 'capabilities': {evdev.ecodes.EV_SYN: [], evdev.ecodes.EV_ABS: [0, 1]}, 'phys': 'usb-0000:03:00.0-3/input1', - 'name': 'device 3' + 'name': 'gamepad' }, + + # device that is completely ignored '/dev/input/event31': { 'capabilities': {evdev.ecodes.EV_SYN: []}, 'phys': 'usb-0000:03:00.0-4/input1', @@ -188,7 +191,7 @@ def patch_evdev(): return { evdev.ecodes.EV_ABS: evdev.AbsInfo( value=None, min=None, fuzz=None, flat=None, - resolution=None, max=2**15 + resolution=None, max=MAX_ABS ) }[axis] @@ -239,10 +242,12 @@ def patch_evdev(): class UInput: def __init__(self, *args, **kwargs): self.fd = 0 + self.write_count = 0 self.device = InputDevice('/dev/input/event40') pass def write(self, type, code, value): + self.write_count += 1 event = Event(type, code, value) uinput_write_history.append(event) uinput_write_history_pipe[1].send(event) diff --git a/tests/testcases/getdevices.py b/tests/testcases/getdevices.py index 990fdc0e..a3a0de09 100644 --- a/tests/testcases/getdevices.py +++ b/tests/testcases/getdevices.py @@ -52,6 +52,10 @@ class TestGetDevices(unittest.TestCase): 'paths': ['/dev/input/event20'], 'devices': ['device 2'] }, + 'gamepad': { + 'paths': ['/dev/input/event30'], + 'devices': ['gamepad'] + }, 'key-mapper device 2': { 'paths': ['/dev/input/event40'], 'devices': ['key-mapper device 2'] @@ -76,7 +80,11 @@ class TestGetDevices(unittest.TestCase): 'device 2': { 'paths': ['/dev/input/event20'], 'devices': ['device 2'] - } + }, + 'gamepad': { + 'paths': ['/dev/input/event30'], + 'devices': ['gamepad'] + }, }) diff --git a/tests/testcases/injector.py b/tests/testcases/injector.py index cd0e20d7..46c7e67f 100644 --- a/tests/testcases/injector.py +++ b/tests/testcases/injector.py @@ -23,15 +23,18 @@ import unittest import time import evdev +from evdev.ecodes import EV_REL, EV_KEY, EV_ABS from keymapper.dev.injector import is_numlock_on, toggle_numlock,\ ensure_numlock, KeycodeInjector from keymapper.state import custom_mapping, system_mapping, \ clear_system_mapping, KEYCODE_OFFSET from keymapper.mapping import Mapping +from keymapper.config import config from test import uinput_write_history, Event, pending_events, fixtures, \ - clear_write_history, EVENT_READ_TIMEOUT, uinput_write_history_pipe + clear_write_history, EVENT_READ_TIMEOUT, uinput_write_history_pipe, \ + MAX_ABS class TestInjector(unittest.TestCase): @@ -79,8 +82,8 @@ class TestInjector(unittest.TestCase): self.injector = KeycodeInjector('foo', mapping) capabilities = self.injector._modify_capabilities(FakeDevice()) - self.assertIn(evdev.ecodes.EV_KEY, capabilities) - keys = capabilities[evdev.ecodes.EV_KEY] + self.assertIn(EV_KEY, capabilities) + keys = capabilities[EV_KEY] self.assertEqual(keys[0], maps_to) self.assertNotIn(evdev.ecodes.EV_SYN, capabilities) @@ -114,10 +117,12 @@ class TestInjector(unittest.TestCase): path = '/dev/input/event11' device = self.injector._prepare_device(path) - # make sure the test uses a fixture without capabilities + # make sure the test uses a fixture without interesting capabilities capabilities = evdev.InputDevice(path).capabilities() - self.assertEqual(len(capabilities[evdev.ecodes.EV_KEY]), 0) + self.assertEqual(len(capabilities.get(EV_KEY, [])), 0) + self.assertEqual(len(capabilities.get(EV_ABS, [])), 0) + # skips the device alltogether, so no grab attempts fail self.assertEqual(self.failed, 0) self.assertIsNone(device) @@ -141,7 +146,55 @@ class TestInjector(unittest.TestCase): def test_abs_to_rel(self): # maps gamepad joystick events to mouse events # TODO enable this somewhere so that map_abs_to_rel returns true - pass + # in the .json file of the mapping. + config.set('gamepad.non_linearity', 1) + pointer_speed = 80 + config.set('gamepad.pointer_speed', pointer_speed) + + # same for ABS, 0 for x, 1 for y + rel_x = evdev.ecodes.REL_X + rel_y = evdev.ecodes.REL_Y + + # they need to sum up before something is written + divisor = 10 + x = MAX_ABS / pointer_speed / divisor + y = MAX_ABS / pointer_speed / divisor + pending_events['gamepad'] = [ + Event(EV_ABS, rel_x, x), + Event(EV_ABS, rel_y, y), + Event(EV_ABS, rel_x, -x), + Event(EV_ABS, rel_y, -y), + ] + + self.injector = KeycodeInjector('gamepad', custom_mapping) + self.injector.start_injecting() + + # wait for the injector to start sending, at most 1s + uinput_write_history_pipe[0].poll(1) + + # wait a bit more for it to sum up + sleep = 0.5 + time.sleep(sleep) + + # convert the write history to some easier to manage list + history = [] + while uinput_write_history_pipe[0].poll(): + event = uinput_write_history_pipe[0].recv() + history.append((event.type, event.code, event.value)) + + # movement is written at 60hz and it takes `divisor` steps to + # move 1px. take it times 2 for both x and y events. + self.assertGreater(len(history), 60 * sleep * 0.9 * 2 / divisor) + self.assertLess(len(history), 60 * sleep * 1.1 * 2 / divisor) + + # those may be in arbitrary order, the injector happens to write + # y first + self.assertEqual(history[-1][0], EV_REL) + self.assertEqual(history[-1][1], rel_x) + self.assertAlmostEqual(history[-1][2], -1) + self.assertEqual(history[-2][0], EV_REL) + self.assertEqual(history[-2][1], rel_y) + self.assertAlmostEqual(history[-2][2], -1) def test_injector(self): custom_mapping.change(8, 'k(q).k(w)') @@ -161,14 +214,14 @@ class TestInjector(unittest.TestCase): # keycode used in X and in the mappings pending_events['device 2'] = [ # should execute a macro - Event(evdev.events.EV_KEY, 0, 1), - Event(evdev.events.EV_KEY, 0, 0), + Event(EV_KEY, 0, 1), + Event(EV_KEY, 0, 0), # normal keystroke - Event(evdev.events.EV_KEY, 1, 1), - Event(evdev.events.EV_KEY, 1, 0), + Event(EV_KEY, 1, 1), + Event(EV_KEY, 1, 0), # ignored because unknown to the system - Event(evdev.events.EV_KEY, 2, 1), - Event(evdev.events.EV_KEY, 2, 0), + Event(EV_KEY, 2, 1), + Event(EV_KEY, 2, 0), # just pass those over without modifying Event(3124, 3564, 6542), ] @@ -192,7 +245,7 @@ class TestInjector(unittest.TestCase): # since the macro takes a little bit of time to execute, its # keystrokes are all over the place. # just check if they are there and if so, remove them from the list. - ev_key = evdev.events.EV_KEY + ev_key = EV_KEY self.assertIn((ev_key, code_q - KEYCODE_OFFSET, 1), history) self.assertIn((ev_key, code_q - KEYCODE_OFFSET, 0), history) self.assertIn((ev_key, code_w - KEYCODE_OFFSET, 1), history)