more tests

This commit is contained in:
sezanzeb 2020-11-30 22:42:53 +01:00
parent 1af1ee7bf7
commit d1aceaf777
8 changed files with 139 additions and 60 deletions

View File

@ -1,5 +1,4 @@
[run] [run]
concurrency = multiprocessing
branch = True branch = True
source = /usr/lib/python3.8/site-packages/keymapper source = /usr/lib/python3.8/site-packages/keymapper
concurrency = multiprocessing

View File

@ -99,5 +99,6 @@ sudo dpkg -i python3-key-mapper_0.1.0-1_all.deb
```bash ```bash
pylint keymapper --extension-pkg-whitelist=evdev pylint keymapper --extension-pkg-whitelist=evdev
sudo pip install . && python3 tests/test.py sudo pip install . && coverage run tests/test.py
coverage combine && coverage report -m
``` ```

View File

@ -41,7 +41,8 @@ INITIAL_CONFIG = {
'keystroke_sleep_ms': 10 'keystroke_sleep_ms': 10
}, },
'gamepad': { 'gamepad': {
'non_linearity': 4 'non_linearity': 4,
'pointer_speed': 80
} }
} }

View File

@ -29,6 +29,7 @@ import subprocess
import multiprocessing import multiprocessing
import evdev import evdev
from evdev.ecodes import EV_KEY, EV_ABS, EV_REL
from keymapper.logger import logger from keymapper.logger import logger
from keymapper.config import config from keymapper.config import config
@ -69,9 +70,9 @@ def toggle_numlock():
name=f'key-mapper numlock-control', name=f'key-mapper numlock-control',
phys='key-mapper', phys='key-mapper',
) )
device.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_NUMLOCK, 1) device.write(EV_KEY, evdev.ecodes.KEY_NUMLOCK, 1)
device.syn() device.syn()
device.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_NUMLOCK, 0) device.write(EV_KEY, evdev.ecodes.KEY_NUMLOCK, 0)
device.syn() device.syn()
@ -129,12 +130,14 @@ class KeycodeInjector:
capabilities = device.capabilities(absinfo=False) capabilities = device.capabilities(absinfo=False)
needed = False needed = False
if capabilities.get(EV_KEY) is not None:
for keycode, _ in self.mapping: for keycode, _ in self.mapping:
if keycode - KEYCODE_OFFSET in capabilities[evdev.ecodes.EV_KEY]: if keycode - KEYCODE_OFFSET in capabilities[EV_KEY]:
needed = True needed = True
break break
# TODO only if map ABS to REL keep ABS devics
if capabilities.get(evdev.ecodes.EV_REL) is not None: can_do_abs = evdev.ecodes.ABS_X in capabilities.get(EV_ABS, [])
if self.map_abs_to_rel() and can_do_abs:
needed = True needed = True
if not needed: if not needed:
@ -155,14 +158,14 @@ class KeycodeInjector:
break break
except IOError: except IOError:
attempts += 1 attempts += 1
# it might take a little time until the device is free if
# it was previously grabbed.
logger.debug('Failed attemts to grab %s: %d', path, attempts) logger.debug('Failed attemts to grab %s: %d', path, attempts)
if attempts >= 4: if attempts >= 4:
logger.error('Cannot grab %s, it is possibly in use', path) logger.error('Cannot grab %s, it is possibly in use', path)
return None return None
# it might take a little time until the device is free if
# it was previously grabbed.
time.sleep(0.15) time.sleep(0.15)
return device return device
@ -240,14 +243,30 @@ class KeycodeInjector:
events=self._modify_capabilities(input_device) events=self._modify_capabilities(input_device)
) )
coroutine = self._injection_loop(input_device, uinput) # keycode injection
coroutine = self._keycode_loop(input_device, uinput)
coroutines.append(coroutine)
# mouse movement injection
if self.map_abs_to_rel():
self.abs_x = 0
self.abs_y = 0
# events only take ints, so a movement of 0.3 needs to add
# up to 1.2 to affect the cursor.
self.pending_x_rel = 0
self.pending_y_rel = 0
coroutine = self._movement_loop(input_device, uinput)
coroutines.append(coroutine) coroutines.append(coroutine)
if len(coroutines) == 0: if len(coroutines) == 0:
raise OSError('Could not grab any device') logger.error('Did not grab any device')
return
loop.run_until_complete(asyncio.gather(*coroutines)) loop.run_until_complete(asyncio.gather(*coroutines))
if len(coroutines) > 0:
logger.debug('asyncio coroutines ended')
def _write(self, device, type, keycode, value): def _write(self, device, type, keycode, value):
"""Actually inject.""" """Actually inject."""
device.write(type, keycode, value) device.write(type, keycode, value)
@ -262,15 +281,20 @@ class KeycodeInjector:
) )
self._write( self._write(
keymapper_device, keymapper_device,
evdev.ecodes.EV_KEY, EV_KEY,
keycode - KEYCODE_OFFSET, keycode - KEYCODE_OFFSET,
value value
) )
async def spam_mouse_movements(self, keymapper_device, input_device): async def _movement_loop(self, input_device, keymapper_device):
"""Keep writing mouse movements based on the gamepad stick position.""" """Keep writing mouse movements based on the gamepad stick position."""
max_value = input_device.absinfo(evdev.ecodes.EV_ABS).max logger.info('Mapping gamepad to mouse movements')
max_value = input_device.absinfo(EV_ABS).max
max_speed = ((max_value ** 2) * 2) ** 0.5 max_speed = ((max_value ** 2) * 2) ** 0.5
pointer_speed = config.get('gamepad.pointer_speed', 80)
non_linearity = config.get('gamepad.non_linearity', 4)
while True: while True:
# this is part of the spawned process, so terminating that one # this is part of the spawned process, so terminating that one
# will also stop this loop # will also stop this loop
@ -279,7 +303,6 @@ class KeycodeInjector:
abs_y = self.abs_y abs_y = self.abs_y
abs_x = self.abs_x abs_x = self.abs_x
non_linearity = config.get('gamepad.non_linearity', 4)
if non_linearity != 1: if non_linearity != 1:
# to make small movements smaller for more precision # to make small movements smaller for more precision
speed = (abs_x ** 2 + abs_y ** 2) ** 0.5 speed = (abs_x ** 2 + abs_y ** 2) ** 0.5
@ -287,8 +310,8 @@ class KeycodeInjector:
else: else:
factor = 1 factor = 1
rel_x = abs_x * factor * 80 / max_value rel_x = abs_x * factor * pointer_speed / max_value
rel_y = abs_y * factor * 80 / max_value rel_y = abs_y * factor * pointer_speed / max_value
self.pending_x_rel += rel_x self.pending_x_rel += rel_x
self.pending_y_rel += rel_y self.pending_y_rel += rel_y
@ -300,7 +323,7 @@ class KeycodeInjector:
if rel_y != 0: if rel_y != 0:
self._write( self._write(
keymapper_device, keymapper_device,
evdev.ecodes.EV_REL, EV_REL,
evdev.ecodes.ABS_Y, evdev.ecodes.ABS_Y,
rel_y rel_y
) )
@ -308,12 +331,12 @@ class KeycodeInjector:
if rel_x != 0: if rel_x != 0:
self._write( self._write(
keymapper_device, keymapper_device,
evdev.ecodes.EV_REL, EV_REL,
evdev.ecodes.ABS_X, evdev.ecodes.ABS_X,
rel_x rel_x
) )
async def _injection_loop(self, device, keymapper_device): async def _keycode_loop(self, device, keymapper_device):
"""Inject keycodes for one of the virtual devices. """Inject keycodes for one of the virtual devices.
Parameters Parameters
@ -339,20 +362,8 @@ class KeycodeInjector:
keymapper_device.device.path, keymapper_device.fd keymapper_device.device.path, keymapper_device.fd
) )
if self.map_abs_to_rel():
self.abs_x = 0
self.abs_y = 0
# events only take ints, so a movement of 0.3 needs to add up to
# 1.2 to affect the cursor.
self.pending_x_rel = 0
self.pending_y_rel = 0
asyncio.ensure_future(self.spam_mouse_movements(
keymapper_device,
device
))
async for event in device.async_read_loop(): async for event in device.async_read_loop():
if self.map_abs_to_rel() and event.type == evdev.ecodes.EV_ABS: if self.map_abs_to_rel() and event.type == EV_ABS:
if event.code not in [evdev.ecodes.ABS_X, evdev.ecodes.ABS_Y]: if event.code not in [evdev.ecodes.ABS_X, evdev.ecodes.ABS_Y]:
continue continue
if event.code == evdev.ecodes.ABS_X: if event.code == evdev.ecodes.ABS_X:
@ -361,7 +372,7 @@ class KeycodeInjector:
self.abs_y = event.value self.abs_y = event.value
continue continue
if event.type != evdev.ecodes.EV_KEY: if event.type != EV_KEY:
keymapper_device.write(event.type, event.code, event.value) keymapper_device.write(event.type, event.code, event.value)
# this already includes SYN events, so need to syn here again # this already includes SYN events, so need to syn here again
continue continue
@ -426,6 +437,6 @@ class KeycodeInjector:
@ensure_numlock @ensure_numlock
def stop_injecting(self): def stop_injecting(self):
"""Stop injecting keycodes.""" """Stop injecting keycodes."""
logger.info('Stopping injecting keycodes for device %s', self.device) logger.info('Stopping injecting keycodes for device "%s"', self.device)
if self._process is not None and self._process.is_alive(): if self._process is not None and self._process.is_alive():
self._process.terminate() self._process.terminate()

View File

@ -28,6 +28,7 @@ import time
import asyncio import asyncio
import evdev import evdev
from evdev.ecodes import EV_KEY, EV_ABS
from keymapper.logger import logger from keymapper.logger import logger
@ -81,7 +82,7 @@ class _GetDevices(threading.Thread):
# only keyboard devices # only keyboard devices
# https://www.kernel.org/doc/html/latest/input/event-codes.html # https://www.kernel.org/doc/html/latest/input/event-codes.html
capabilities = device.capabilities().keys() capabilities = device.capabilities().keys()
if evdev.ecodes.EV_KEY not in capabilities: if EV_KEY not in capabilities and EV_ABS not in capabilities:
continue continue
usb = device.phys.split('/')[0] usb = device.phys.split('/')[0]

View File

@ -43,6 +43,8 @@ sys.path = [os.path.abspath('.')] + sys.path
# is still running # is still running
EVENT_READ_TIMEOUT = 0.01 EVENT_READ_TIMEOUT = 0.01
MAX_ABS = 2 ** 15
tmp = '/tmp/key-mapper-test' tmp = '/tmp/key-mapper-test'
uinput_write_history = [] uinput_write_history = []
@ -55,7 +57,7 @@ pending_events = {}
fixtures = { fixtures = {
# device 1 # device 1
'/dev/input/event11': { '/dev/input/event11': {
'capabilities': {evdev.ecodes.EV_KEY: [], evdev.ecodes.EV_ABS: []}, 'capabilities': {evdev.ecodes.EV_KEY: [], evdev.ecodes.EV_REL: []},
'phys': 'usb-0000:03:00.0-1/input2', 'phys': 'usb-0000:03:00.0-1/input2',
'name': 'device 1 foo' 'name': 'device 1 foo'
}, },
@ -82,12 +84,13 @@ fixtures = {
'name': 'device 2' 'name': 'device 2'
}, },
# devices that are completely ignored
'/dev/input/event30': { '/dev/input/event30': {
'capabilities': {evdev.ecodes.EV_SYN: []}, 'capabilities': {evdev.ecodes.EV_SYN: [], evdev.ecodes.EV_ABS: [0, 1]},
'phys': 'usb-0000:03:00.0-3/input1', 'phys': 'usb-0000:03:00.0-3/input1',
'name': 'device 3' 'name': 'gamepad'
}, },
# device that is completely ignored
'/dev/input/event31': { '/dev/input/event31': {
'capabilities': {evdev.ecodes.EV_SYN: []}, 'capabilities': {evdev.ecodes.EV_SYN: []},
'phys': 'usb-0000:03:00.0-4/input1', 'phys': 'usb-0000:03:00.0-4/input1',
@ -188,7 +191,7 @@ def patch_evdev():
return { return {
evdev.ecodes.EV_ABS: evdev.AbsInfo( evdev.ecodes.EV_ABS: evdev.AbsInfo(
value=None, min=None, fuzz=None, flat=None, value=None, min=None, fuzz=None, flat=None,
resolution=None, max=2**15 resolution=None, max=MAX_ABS
) )
}[axis] }[axis]
@ -239,10 +242,12 @@ def patch_evdev():
class UInput: class UInput:
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.fd = 0 self.fd = 0
self.write_count = 0
self.device = InputDevice('/dev/input/event40') self.device = InputDevice('/dev/input/event40')
pass pass
def write(self, type, code, value): def write(self, type, code, value):
self.write_count += 1
event = Event(type, code, value) event = Event(type, code, value)
uinput_write_history.append(event) uinput_write_history.append(event)
uinput_write_history_pipe[1].send(event) uinput_write_history_pipe[1].send(event)

View File

@ -52,6 +52,10 @@ class TestGetDevices(unittest.TestCase):
'paths': ['/dev/input/event20'], 'paths': ['/dev/input/event20'],
'devices': ['device 2'] 'devices': ['device 2']
}, },
'gamepad': {
'paths': ['/dev/input/event30'],
'devices': ['gamepad']
},
'key-mapper device 2': { 'key-mapper device 2': {
'paths': ['/dev/input/event40'], 'paths': ['/dev/input/event40'],
'devices': ['key-mapper device 2'] 'devices': ['key-mapper device 2']
@ -76,7 +80,11 @@ class TestGetDevices(unittest.TestCase):
'device 2': { 'device 2': {
'paths': ['/dev/input/event20'], 'paths': ['/dev/input/event20'],
'devices': ['device 2'] 'devices': ['device 2']
} },
'gamepad': {
'paths': ['/dev/input/event30'],
'devices': ['gamepad']
},
}) })

View File

@ -23,15 +23,18 @@ import unittest
import time import time
import evdev import evdev
from evdev.ecodes import EV_REL, EV_KEY, EV_ABS
from keymapper.dev.injector import is_numlock_on, toggle_numlock,\ from keymapper.dev.injector import is_numlock_on, toggle_numlock,\
ensure_numlock, KeycodeInjector ensure_numlock, KeycodeInjector
from keymapper.state import custom_mapping, system_mapping, \ from keymapper.state import custom_mapping, system_mapping, \
clear_system_mapping, KEYCODE_OFFSET clear_system_mapping, KEYCODE_OFFSET
from keymapper.mapping import Mapping from keymapper.mapping import Mapping
from keymapper.config import config
from test import uinput_write_history, Event, pending_events, fixtures, \ from test import uinput_write_history, Event, pending_events, fixtures, \
clear_write_history, EVENT_READ_TIMEOUT, uinput_write_history_pipe clear_write_history, EVENT_READ_TIMEOUT, uinput_write_history_pipe, \
MAX_ABS
class TestInjector(unittest.TestCase): class TestInjector(unittest.TestCase):
@ -79,8 +82,8 @@ class TestInjector(unittest.TestCase):
self.injector = KeycodeInjector('foo', mapping) self.injector = KeycodeInjector('foo', mapping)
capabilities = self.injector._modify_capabilities(FakeDevice()) capabilities = self.injector._modify_capabilities(FakeDevice())
self.assertIn(evdev.ecodes.EV_KEY, capabilities) self.assertIn(EV_KEY, capabilities)
keys = capabilities[evdev.ecodes.EV_KEY] keys = capabilities[EV_KEY]
self.assertEqual(keys[0], maps_to) self.assertEqual(keys[0], maps_to)
self.assertNotIn(evdev.ecodes.EV_SYN, capabilities) self.assertNotIn(evdev.ecodes.EV_SYN, capabilities)
@ -114,10 +117,12 @@ class TestInjector(unittest.TestCase):
path = '/dev/input/event11' path = '/dev/input/event11'
device = self.injector._prepare_device(path) device = self.injector._prepare_device(path)
# make sure the test uses a fixture without capabilities # make sure the test uses a fixture without interesting capabilities
capabilities = evdev.InputDevice(path).capabilities() capabilities = evdev.InputDevice(path).capabilities()
self.assertEqual(len(capabilities[evdev.ecodes.EV_KEY]), 0) self.assertEqual(len(capabilities.get(EV_KEY, [])), 0)
self.assertEqual(len(capabilities.get(EV_ABS, [])), 0)
# skips the device alltogether, so no grab attempts fail
self.assertEqual(self.failed, 0) self.assertEqual(self.failed, 0)
self.assertIsNone(device) self.assertIsNone(device)
@ -141,7 +146,55 @@ class TestInjector(unittest.TestCase):
def test_abs_to_rel(self): def test_abs_to_rel(self):
# maps gamepad joystick events to mouse events # maps gamepad joystick events to mouse events
# TODO enable this somewhere so that map_abs_to_rel returns true # TODO enable this somewhere so that map_abs_to_rel returns true
pass # in the .json file of the mapping.
config.set('gamepad.non_linearity', 1)
pointer_speed = 80
config.set('gamepad.pointer_speed', pointer_speed)
# same for ABS, 0 for x, 1 for y
rel_x = evdev.ecodes.REL_X
rel_y = evdev.ecodes.REL_Y
# they need to sum up before something is written
divisor = 10
x = MAX_ABS / pointer_speed / divisor
y = MAX_ABS / pointer_speed / divisor
pending_events['gamepad'] = [
Event(EV_ABS, rel_x, x),
Event(EV_ABS, rel_y, y),
Event(EV_ABS, rel_x, -x),
Event(EV_ABS, rel_y, -y),
]
self.injector = KeycodeInjector('gamepad', custom_mapping)
self.injector.start_injecting()
# wait for the injector to start sending, at most 1s
uinput_write_history_pipe[0].poll(1)
# wait a bit more for it to sum up
sleep = 0.5
time.sleep(sleep)
# convert the write history to some easier to manage list
history = []
while uinput_write_history_pipe[0].poll():
event = uinput_write_history_pipe[0].recv()
history.append((event.type, event.code, event.value))
# movement is written at 60hz and it takes `divisor` steps to
# move 1px. take it times 2 for both x and y events.
self.assertGreater(len(history), 60 * sleep * 0.9 * 2 / divisor)
self.assertLess(len(history), 60 * sleep * 1.1 * 2 / divisor)
# those may be in arbitrary order, the injector happens to write
# y first
self.assertEqual(history[-1][0], EV_REL)
self.assertEqual(history[-1][1], rel_x)
self.assertAlmostEqual(history[-1][2], -1)
self.assertEqual(history[-2][0], EV_REL)
self.assertEqual(history[-2][1], rel_y)
self.assertAlmostEqual(history[-2][2], -1)
def test_injector(self): def test_injector(self):
custom_mapping.change(8, 'k(q).k(w)') custom_mapping.change(8, 'k(q).k(w)')
@ -161,14 +214,14 @@ class TestInjector(unittest.TestCase):
# keycode used in X and in the mappings # keycode used in X and in the mappings
pending_events['device 2'] = [ pending_events['device 2'] = [
# should execute a macro # should execute a macro
Event(evdev.events.EV_KEY, 0, 1), Event(EV_KEY, 0, 1),
Event(evdev.events.EV_KEY, 0, 0), Event(EV_KEY, 0, 0),
# normal keystroke # normal keystroke
Event(evdev.events.EV_KEY, 1, 1), Event(EV_KEY, 1, 1),
Event(evdev.events.EV_KEY, 1, 0), Event(EV_KEY, 1, 0),
# ignored because unknown to the system # ignored because unknown to the system
Event(evdev.events.EV_KEY, 2, 1), Event(EV_KEY, 2, 1),
Event(evdev.events.EV_KEY, 2, 0), Event(EV_KEY, 2, 0),
# just pass those over without modifying # just pass those over without modifying
Event(3124, 3564, 6542), Event(3124, 3564, 6542),
] ]
@ -192,7 +245,7 @@ class TestInjector(unittest.TestCase):
# since the macro takes a little bit of time to execute, its # since the macro takes a little bit of time to execute, its
# keystrokes are all over the place. # keystrokes are all over the place.
# just check if they are there and if so, remove them from the list. # just check if they are there and if so, remove them from the list.
ev_key = evdev.events.EV_KEY ev_key = EV_KEY
self.assertIn((ev_key, code_q - KEYCODE_OFFSET, 1), history) self.assertIn((ev_key, code_q - KEYCODE_OFFSET, 1), history)
self.assertIn((ev_key, code_q - KEYCODE_OFFSET, 0), history) self.assertIn((ev_key, code_q - KEYCODE_OFFSET, 0), history)
self.assertIn((ev_key, code_w - KEYCODE_OFFSET, 1), history) self.assertIn((ev_key, code_w - KEYCODE_OFFSET, 1), history)