more tests

pull/14/head
sezanzeb 4 years ago
parent d75082bdf7
commit 6d6a7b25d7

@ -64,16 +64,7 @@ def toggle_numlock():
subprocess.check_output(['numlockx', 'toggle']) subprocess.check_output(['numlockx', 'toggle'])
except FileNotFoundError: except FileNotFoundError:
# doesn't seem to be installed everywhere # doesn't seem to be installed everywhere
logger.debug('numlockx not found, trying to inject a keycode') logger.debug('numlockx not found')
# and this doesn't always work.
device = evdev.UInput(
name=f'{DEV_NAME} numlock-control',
phys=DEV_NAME,
)
device.write(EV_KEY, evdev.ecodes.KEY_NUMLOCK, 1)
device.syn()
device.write(EV_KEY, evdev.ecodes.KEY_NUMLOCK, 0)
device.syn()
def ensure_numlock(func): def ensure_numlock(func):
@ -98,7 +89,6 @@ class KeycodeInjector:
make running multiple injector easier. There is one procss per make running multiple injector easier. There is one procss per
hardware-device that is being mapped. hardware-device that is being mapped.
""" """
@ensure_numlock
def __init__(self, device, mapping): def __init__(self, device, mapping):
"""Start injecting keycodes based on custom_mapping. """Start injecting keycodes based on custom_mapping.
@ -134,9 +124,9 @@ class KeycodeInjector:
because the capabilities of the returned device are changed because the capabilities of the returned device are changed
so this cannot be checked later anymore. so this cannot be checked later anymore.
""" """
device = evdev.InputDevice(path) try:
device = evdev.InputDevice(path)
if device is None: except FileNotFoundError:
return None, False return None, False
capabilities = device.capabilities(absinfo=False) capabilities = device.capabilities(absinfo=False)
@ -360,8 +350,6 @@ class KeycodeInjector:
async for event in device.async_read_loop(): async for event in device.async_read_loop():
if abs_to_rel and event.type == EV_ABS and event.code in JOYSTICK: if abs_to_rel and event.type == EV_ABS and event.code in JOYSTICK:
if event.code not in [evdev.ecodes.ABS_X, evdev.ecodes.ABS_Y]:
continue
if event.code == evdev.ecodes.ABS_X: if event.code == evdev.ecodes.ABS_X:
self.abs_state[0] = event.value self.abs_state[0] = event.value
if event.code == evdev.ecodes.ABS_Y: if event.code == evdev.ecodes.ABS_Y:

@ -23,6 +23,7 @@
import os import os
import time
import logging import logging
import pkg_resources import pkg_resources
@ -41,6 +42,8 @@ def spam(self, message, *args, **kwargs):
logging.addLevelName(SPAM, "SPAM") logging.addLevelName(SPAM, "SPAM")
logging.Logger.spam = spam logging.Logger.spam = spam
start = time.time()
class Formatter(logging.Formatter): class Formatter(logging.Formatter):
"""Overwritten Formatter to print nicer logs.""" """Overwritten Formatter to print nicer logs."""

@ -207,10 +207,14 @@ def patch_evdev():
path = None path = None
def __init__(self, path): def __init__(self, path):
if path not in fixtures:
raise FileNotFoundError()
self.path = path self.path = path
self.phys = fixtures[path]['phys'] self.phys = fixtures[path]['phys']
self.name = fixtures[path]['name'] self.name = fixtures[path]['name']
self.fd = self.name self.fd = self.name
self.capa = copy.deepcopy(fixtures[self.path]['capabilities'])
def absinfo(axis): def absinfo(axis):
return { return {
@ -262,7 +266,7 @@ def patch_evdev():
await asyncio.sleep(0.01) await asyncio.sleep(0.01)
def capabilities(self, absinfo=True): def capabilities(self, absinfo=True):
return copy.deepcopy(fixtures[self.path]['capabilities']) return self.capa
class UInput: class UInput:
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):

@ -26,7 +26,7 @@ import time
import evdev import evdev
from evdev.ecodes import EV_REL, EV_KEY, EV_ABS, ABS_HAT0X from evdev.ecodes import EV_REL, EV_KEY, EV_ABS, ABS_HAT0X
from keymapper.dev.injector import is_numlock_on, toggle_numlock,\ from keymapper.dev.injector import is_numlock_on, toggle_numlock, \
ensure_numlock, KeycodeInjector ensure_numlock, KeycodeInjector
from keymapper.dev.keycode_mapper import handle_keycode from keymapper.dev.keycode_mapper import handle_keycode
from keymapper.state import custom_mapping, system_mapping, \ from keymapper.state import custom_mapping, system_mapping, \
@ -48,9 +48,10 @@ class TestInjector(unittest.TestCase):
def setUp(self): def setUp(self):
self.failed = 0 self.failed = 0
self.make_it_fail = 2
def grab_fail_twice(_): def grab_fail_twice(_):
if self.failed < 2: if self.failed < self.make_it_fail:
self.failed += 1 self.failed += 1
raise OSError() raise OSError()
@ -79,6 +80,9 @@ class TestInjector(unittest.TestCase):
mapping = Mapping() mapping = Mapping()
mapping.change((EV_KEY, 80), 'a') mapping.change((EV_KEY, 80), 'a')
# going to be ignored
mapping.change((EV_REL, 1234), 'b')
maps_to = system_mapping['a'] maps_to = system_mapping['a']
self.injector = KeycodeInjector('foo', mapping) self.injector = KeycodeInjector('foo', mapping)
@ -94,6 +98,7 @@ class TestInjector(unittest.TestCase):
self.assertNotIn(evdev.ecodes.EV_SYN, capabilities) self.assertNotIn(evdev.ecodes.EV_SYN, capabilities)
self.assertNotIn(evdev.ecodes.EV_FF, capabilities) self.assertNotIn(evdev.ecodes.EV_FF, capabilities)
self.assertNotIn(evdev.ecodes.EV_REL, capabilities)
def test_grab(self): def test_grab(self):
# path is from the fixtures # path is from the fixtures
@ -109,6 +114,23 @@ class TestInjector(unittest.TestCase):
# success on the third try # success on the third try
device.name = fixtures[path]['name'] device.name = fixtures[path]['name']
def test_fail_grab(self):
self.make_it_fail = 10
custom_mapping.change((EV_KEY, 10), 'a')
self.injector = KeycodeInjector('device 1', custom_mapping)
path = '/dev/input/event10'
device, abs_to_rel = self.injector._prepare_device(path)
self.assertFalse(abs_to_rel)
self.assertGreaterEqual(self.failed, 1)
self.assertIsNone(device)
self.injector.start_injecting()
# since none can be grabbed, the process will terminate. But that
# actually takes quite some time.
time.sleep(1.5)
self.assertFalse(self.injector._process.is_alive())
def test_prepare_device_1(self): def test_prepare_device_1(self):
# according to the fixtures, /dev/input/event30 can do ABS_HAT0X # according to the fixtures, /dev/input/event30 can do ABS_HAT0X
custom_mapping.change((EV_ABS, ABS_HAT0X), 'a') custom_mapping.change((EV_ABS, ABS_HAT0X), 'a')
@ -118,6 +140,13 @@ class TestInjector(unittest.TestCase):
self.assertIsNone(_prepare_device('/dev/input/event10')[0]) self.assertIsNone(_prepare_device('/dev/input/event10')[0])
self.assertIsNotNone(_prepare_device('/dev/input/event30')[0]) self.assertIsNotNone(_prepare_device('/dev/input/event30')[0])
def test_prepare_device_non_existing(self):
custom_mapping.change((EV_ABS, ABS_HAT0X), 'a')
self.injector = KeycodeInjector('foobar', custom_mapping)
_prepare_device = self.injector._prepare_device
self.assertIsNone(_prepare_device('/dev/input/event1234')[0])
def test_gamepad_capabilities(self): def test_gamepad_capabilities(self):
self.injector = KeycodeInjector('gamepad', custom_mapping) self.injector = KeycodeInjector('gamepad', custom_mapping)
@ -166,10 +195,17 @@ class TestInjector(unittest.TestCase):
self.assertEqual(not before, is_numlock_on()) self.assertEqual(not before, is_numlock_on())
@ensure_numlock @ensure_numlock
def wrapped(): def wrapped_1():
toggle_numlock() toggle_numlock()
wrapped() # should not change @ensure_numlock
def wrapped_2():
pass
# should not change
wrapped_1()
self.assertEqual(not before, is_numlock_on())
wrapped_2()
self.assertEqual(not before, is_numlock_on()) self.assertEqual(not before, is_numlock_on())
# toggle one more time to restore the previous configuration # toggle one more time to restore the previous configuration
@ -331,6 +367,10 @@ class TestInjector(unittest.TestCase):
uinput_write_history_pipe[0].poll(timeout=1) uinput_write_history_pipe[0].poll(timeout=1)
time.sleep(EVENT_READ_TIMEOUT * 10) time.sleep(EVENT_READ_TIMEOUT * 10)
# sending anything arbitrary does not stop the process
# (is_alive checked later after some time)
self.injector._msg_pipe[1].send(1234)
# convert the write history to some easier to manage list # convert the write history to some easier to manage list
history = [] history = []
while uinput_write_history_pipe[0].poll(): while uinput_write_history_pipe[0].poll():
@ -372,6 +412,9 @@ class TestInjector(unittest.TestCase):
self.assertEqual(history[3], (ev_key, input_b, 0)) self.assertEqual(history[3], (ev_key, input_b, 0))
self.assertEqual(history[4], (3124, 3564, 6542)) self.assertEqual(history[4], (3124, 3564, 6542))
time.sleep(0.1)
self.assertTrue(self.injector._process.is_alive())
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

Loading…
Cancel
Save