more tests

xkb
sezanzeb 4 years ago committed by sezanzeb
parent 13555a94ce
commit 51b6ccfebf

@ -1,5 +1,4 @@
[run]
concurrency = multiprocessing
branch = True
source = /usr/lib/python3.8/site-packages/keymapper
concurrency = multiprocessing

@ -99,5 +99,6 @@ sudo dpkg -i python3-key-mapper_0.1.0-1_all.deb
```bash
pylint keymapper --extension-pkg-whitelist=evdev
sudo pip install . && python3 tests/test.py
sudo pip install . && coverage run tests/test.py
coverage combine && coverage report -m
```

@ -41,7 +41,8 @@ INITIAL_CONFIG = {
'keystroke_sleep_ms': 10
},
'gamepad': {
'non_linearity': 4
'non_linearity': 4,
'pointer_speed': 80
}
}

@ -29,6 +29,7 @@ import subprocess
import multiprocessing
import evdev
from evdev.ecodes import EV_KEY, EV_ABS, EV_REL
from keymapper.logger import logger
from keymapper.config import config
@ -69,9 +70,9 @@ def toggle_numlock():
name=f'key-mapper numlock-control',
phys='key-mapper',
)
device.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_NUMLOCK, 1)
device.write(EV_KEY, evdev.ecodes.KEY_NUMLOCK, 1)
device.syn()
device.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_NUMLOCK, 0)
device.write(EV_KEY, evdev.ecodes.KEY_NUMLOCK, 0)
device.syn()
@ -129,12 +130,14 @@ class KeycodeInjector:
capabilities = device.capabilities(absinfo=False)
needed = False
for keycode, _ in self.mapping:
if keycode - KEYCODE_OFFSET in capabilities[evdev.ecodes.EV_KEY]:
needed = True
break
# TODO only if map ABS to REL keep ABS devics
if capabilities.get(evdev.ecodes.EV_REL) is not None:
if capabilities.get(EV_KEY) is not None:
for keycode, _ in self.mapping:
if keycode - KEYCODE_OFFSET in capabilities[EV_KEY]:
needed = True
break
can_do_abs = evdev.ecodes.ABS_X in capabilities.get(EV_ABS, [])
if self.map_abs_to_rel() and can_do_abs:
needed = True
if not needed:
@ -155,14 +158,14 @@ class KeycodeInjector:
break
except IOError:
attempts += 1
# it might take a little time until the device is free if
# it was previously grabbed.
logger.debug('Failed attemts to grab %s: %d', path, attempts)
if attempts >= 4:
logger.error('Cannot grab %s, it is possibly in use', path)
return None
# it might take a little time until the device is free if
# it was previously grabbed.
time.sleep(0.15)
return device
@ -240,14 +243,30 @@ class KeycodeInjector:
events=self._modify_capabilities(input_device)
)
coroutine = self._injection_loop(input_device, uinput)
# keycode injection
coroutine = self._keycode_loop(input_device, uinput)
coroutines.append(coroutine)
# mouse movement injection
if self.map_abs_to_rel():
self.abs_x = 0
self.abs_y = 0
# events only take ints, so a movement of 0.3 needs to add
# up to 1.2 to affect the cursor.
self.pending_x_rel = 0
self.pending_y_rel = 0
coroutine = self._movement_loop(input_device, uinput)
coroutines.append(coroutine)
if len(coroutines) == 0:
raise OSError('Could not grab any device')
logger.error('Did not grab any device')
return
loop.run_until_complete(asyncio.gather(*coroutines))
if len(coroutines) > 0:
logger.debug('asyncio coroutines ended')
def _write(self, device, type, keycode, value):
"""Actually inject."""
device.write(type, keycode, value)
@ -262,15 +281,20 @@ class KeycodeInjector:
)
self._write(
keymapper_device,
evdev.ecodes.EV_KEY,
EV_KEY,
keycode - KEYCODE_OFFSET,
value
)
async def spam_mouse_movements(self, keymapper_device, input_device):
async def _movement_loop(self, input_device, keymapper_device):
"""Keep writing mouse movements based on the gamepad stick position."""
max_value = input_device.absinfo(evdev.ecodes.EV_ABS).max
logger.info('Mapping gamepad to mouse movements')
max_value = input_device.absinfo(EV_ABS).max
max_speed = ((max_value ** 2) * 2) ** 0.5
pointer_speed = config.get('gamepad.pointer_speed', 80)
non_linearity = config.get('gamepad.non_linearity', 4)
while True:
# this is part of the spawned process, so terminating that one
# will also stop this loop
@ -279,7 +303,6 @@ class KeycodeInjector:
abs_y = self.abs_y
abs_x = self.abs_x
non_linearity = config.get('gamepad.non_linearity', 4)
if non_linearity != 1:
# to make small movements smaller for more precision
speed = (abs_x ** 2 + abs_y ** 2) ** 0.5
@ -287,8 +310,8 @@ class KeycodeInjector:
else:
factor = 1
rel_x = abs_x * factor * 80 / max_value
rel_y = abs_y * factor * 80 / max_value
rel_x = abs_x * factor * pointer_speed / max_value
rel_y = abs_y * factor * pointer_speed / max_value
self.pending_x_rel += rel_x
self.pending_y_rel += rel_y
@ -300,7 +323,7 @@ class KeycodeInjector:
if rel_y != 0:
self._write(
keymapper_device,
evdev.ecodes.EV_REL,
EV_REL,
evdev.ecodes.ABS_Y,
rel_y
)
@ -308,12 +331,12 @@ class KeycodeInjector:
if rel_x != 0:
self._write(
keymapper_device,
evdev.ecodes.EV_REL,
EV_REL,
evdev.ecodes.ABS_X,
rel_x
)
async def _injection_loop(self, device, keymapper_device):
async def _keycode_loop(self, device, keymapper_device):
"""Inject keycodes for one of the virtual devices.
Parameters
@ -339,20 +362,8 @@ class KeycodeInjector:
keymapper_device.device.path, keymapper_device.fd
)
if self.map_abs_to_rel():
self.abs_x = 0
self.abs_y = 0
# events only take ints, so a movement of 0.3 needs to add up to
# 1.2 to affect the cursor.
self.pending_x_rel = 0
self.pending_y_rel = 0
asyncio.ensure_future(self.spam_mouse_movements(
keymapper_device,
device
))
async for event in device.async_read_loop():
if self.map_abs_to_rel() and event.type == evdev.ecodes.EV_ABS:
if self.map_abs_to_rel() and event.type == EV_ABS:
if event.code not in [evdev.ecodes.ABS_X, evdev.ecodes.ABS_Y]:
continue
if event.code == evdev.ecodes.ABS_X:
@ -361,7 +372,7 @@ class KeycodeInjector:
self.abs_y = event.value
continue
if event.type != evdev.ecodes.EV_KEY:
if event.type != EV_KEY:
keymapper_device.write(event.type, event.code, event.value)
# this already includes SYN events, so need to syn here again
continue
@ -426,6 +437,6 @@ class KeycodeInjector:
@ensure_numlock
def stop_injecting(self):
"""Stop injecting keycodes."""
logger.info('Stopping injecting keycodes for device %s', self.device)
logger.info('Stopping injecting keycodes for device "%s"', self.device)
if self._process is not None and self._process.is_alive():
self._process.terminate()

@ -28,6 +28,7 @@ import time
import asyncio
import evdev
from evdev.ecodes import EV_KEY, EV_ABS
from keymapper.logger import logger
@ -81,7 +82,7 @@ class _GetDevices(threading.Thread):
# only keyboard devices
# https://www.kernel.org/doc/html/latest/input/event-codes.html
capabilities = device.capabilities().keys()
if evdev.ecodes.EV_KEY not in capabilities:
if EV_KEY not in capabilities and EV_ABS not in capabilities:
continue
usb = device.phys.split('/')[0]

@ -43,6 +43,8 @@ sys.path = [os.path.abspath('.')] + sys.path
# is still running
EVENT_READ_TIMEOUT = 0.01
MAX_ABS = 2 ** 15
tmp = '/tmp/key-mapper-test'
uinput_write_history = []
@ -55,7 +57,7 @@ pending_events = {}
fixtures = {
# device 1
'/dev/input/event11': {
'capabilities': {evdev.ecodes.EV_KEY: [], evdev.ecodes.EV_ABS: []},
'capabilities': {evdev.ecodes.EV_KEY: [], evdev.ecodes.EV_REL: []},
'phys': 'usb-0000:03:00.0-1/input2',
'name': 'device 1 foo'
},
@ -82,12 +84,13 @@ fixtures = {
'name': 'device 2'
},
# devices that are completely ignored
'/dev/input/event30': {
'capabilities': {evdev.ecodes.EV_SYN: []},
'capabilities': {evdev.ecodes.EV_SYN: [], evdev.ecodes.EV_ABS: [0, 1]},
'phys': 'usb-0000:03:00.0-3/input1',
'name': 'device 3'
'name': 'gamepad'
},
# device that is completely ignored
'/dev/input/event31': {
'capabilities': {evdev.ecodes.EV_SYN: []},
'phys': 'usb-0000:03:00.0-4/input1',
@ -188,7 +191,7 @@ def patch_evdev():
return {
evdev.ecodes.EV_ABS: evdev.AbsInfo(
value=None, min=None, fuzz=None, flat=None,
resolution=None, max=2**15
resolution=None, max=MAX_ABS
)
}[axis]
@ -239,10 +242,12 @@ def patch_evdev():
class UInput:
def __init__(self, *args, **kwargs):
self.fd = 0
self.write_count = 0
self.device = InputDevice('/dev/input/event40')
pass
def write(self, type, code, value):
self.write_count += 1
event = Event(type, code, value)
uinput_write_history.append(event)
uinput_write_history_pipe[1].send(event)

@ -52,6 +52,10 @@ class TestGetDevices(unittest.TestCase):
'paths': ['/dev/input/event20'],
'devices': ['device 2']
},
'gamepad': {
'paths': ['/dev/input/event30'],
'devices': ['gamepad']
},
'key-mapper device 2': {
'paths': ['/dev/input/event40'],
'devices': ['key-mapper device 2']
@ -76,7 +80,11 @@ class TestGetDevices(unittest.TestCase):
'device 2': {
'paths': ['/dev/input/event20'],
'devices': ['device 2']
}
},
'gamepad': {
'paths': ['/dev/input/event30'],
'devices': ['gamepad']
},
})

@ -23,15 +23,18 @@ import unittest
import time
import evdev
from evdev.ecodes import EV_REL, EV_KEY, EV_ABS
from keymapper.dev.injector import is_numlock_on, toggle_numlock,\
ensure_numlock, KeycodeInjector
from keymapper.state import custom_mapping, system_mapping, \
clear_system_mapping, KEYCODE_OFFSET
from keymapper.mapping import Mapping
from keymapper.config import config
from test import uinput_write_history, Event, pending_events, fixtures, \
clear_write_history, EVENT_READ_TIMEOUT, uinput_write_history_pipe
clear_write_history, EVENT_READ_TIMEOUT, uinput_write_history_pipe, \
MAX_ABS
class TestInjector(unittest.TestCase):
@ -79,8 +82,8 @@ class TestInjector(unittest.TestCase):
self.injector = KeycodeInjector('foo', mapping)
capabilities = self.injector._modify_capabilities(FakeDevice())
self.assertIn(evdev.ecodes.EV_KEY, capabilities)
keys = capabilities[evdev.ecodes.EV_KEY]
self.assertIn(EV_KEY, capabilities)
keys = capabilities[EV_KEY]
self.assertEqual(keys[0], maps_to)
self.assertNotIn(evdev.ecodes.EV_SYN, capabilities)
@ -114,10 +117,12 @@ class TestInjector(unittest.TestCase):
path = '/dev/input/event11'
device = self.injector._prepare_device(path)
# make sure the test uses a fixture without capabilities
# make sure the test uses a fixture without interesting capabilities
capabilities = evdev.InputDevice(path).capabilities()
self.assertEqual(len(capabilities[evdev.ecodes.EV_KEY]), 0)
self.assertEqual(len(capabilities.get(EV_KEY, [])), 0)
self.assertEqual(len(capabilities.get(EV_ABS, [])), 0)
# skips the device alltogether, so no grab attempts fail
self.assertEqual(self.failed, 0)
self.assertIsNone(device)
@ -141,7 +146,55 @@ class TestInjector(unittest.TestCase):
def test_abs_to_rel(self):
# maps gamepad joystick events to mouse events
# TODO enable this somewhere so that map_abs_to_rel returns true
pass
# in the .json file of the mapping.
config.set('gamepad.non_linearity', 1)
pointer_speed = 80
config.set('gamepad.pointer_speed', pointer_speed)
# same for ABS, 0 for x, 1 for y
rel_x = evdev.ecodes.REL_X
rel_y = evdev.ecodes.REL_Y
# they need to sum up before something is written
divisor = 10
x = MAX_ABS / pointer_speed / divisor
y = MAX_ABS / pointer_speed / divisor
pending_events['gamepad'] = [
Event(EV_ABS, rel_x, x),
Event(EV_ABS, rel_y, y),
Event(EV_ABS, rel_x, -x),
Event(EV_ABS, rel_y, -y),
]
self.injector = KeycodeInjector('gamepad', custom_mapping)
self.injector.start_injecting()
# wait for the injector to start sending, at most 1s
uinput_write_history_pipe[0].poll(1)
# wait a bit more for it to sum up
sleep = 0.5
time.sleep(sleep)
# convert the write history to some easier to manage list
history = []
while uinput_write_history_pipe[0].poll():
event = uinput_write_history_pipe[0].recv()
history.append((event.type, event.code, event.value))
# movement is written at 60hz and it takes `divisor` steps to
# move 1px. take it times 2 for both x and y events.
self.assertGreater(len(history), 60 * sleep * 0.9 * 2 / divisor)
self.assertLess(len(history), 60 * sleep * 1.1 * 2 / divisor)
# those may be in arbitrary order, the injector happens to write
# y first
self.assertEqual(history[-1][0], EV_REL)
self.assertEqual(history[-1][1], rel_x)
self.assertAlmostEqual(history[-1][2], -1)
self.assertEqual(history[-2][0], EV_REL)
self.assertEqual(history[-2][1], rel_y)
self.assertAlmostEqual(history[-2][2], -1)
def test_injector(self):
custom_mapping.change(8, 'k(q).k(w)')
@ -161,14 +214,14 @@ class TestInjector(unittest.TestCase):
# keycode used in X and in the mappings
pending_events['device 2'] = [
# should execute a macro
Event(evdev.events.EV_KEY, 0, 1),
Event(evdev.events.EV_KEY, 0, 0),
Event(EV_KEY, 0, 1),
Event(EV_KEY, 0, 0),
# normal keystroke
Event(evdev.events.EV_KEY, 1, 1),
Event(evdev.events.EV_KEY, 1, 0),
Event(EV_KEY, 1, 1),
Event(EV_KEY, 1, 0),
# ignored because unknown to the system
Event(evdev.events.EV_KEY, 2, 1),
Event(evdev.events.EV_KEY, 2, 0),
Event(EV_KEY, 2, 1),
Event(EV_KEY, 2, 0),
# just pass those over without modifying
Event(3124, 3564, 6542),
]
@ -192,7 +245,7 @@ class TestInjector(unittest.TestCase):
# since the macro takes a little bit of time to execute, its
# keystrokes are all over the place.
# just check if they are there and if so, remove them from the list.
ev_key = evdev.events.EV_KEY
ev_key = EV_KEY
self.assertIn((ev_key, code_q - KEYCODE_OFFSET, 1), history)
self.assertIn((ev_key, code_q - KEYCODE_OFFSET, 0), history)
self.assertIn((ev_key, code_w - KEYCODE_OFFSET, 1), history)

Loading…
Cancel
Save