improved tests, some fixes

pull/14/head
sezanzeb 4 years ago
parent ef6710877c
commit a97c44ebd4

@ -0,0 +1,5 @@
[run]
branch = True
source = /usr/lib/python3.8/site-packages/keymapper
# concurrency = multiprocessing

@ -154,7 +154,7 @@ class KeycodeInjector:
break break
except IOError: except IOError:
attempts += 1 attempts += 1
logger.debug('Failed attemt to grab %s %d', path, attempts) logger.debug('Failed attemts to grab %s: %d', path, attempts)
if attempts >= 4: if attempts >= 4:
logger.error('Cannot grab %s, it is possibly in use', path) logger.error('Cannot grab %s, it is possibly in use', path)
@ -182,7 +182,11 @@ class KeycodeInjector:
# copy the capabilities because the keymapper_device is going # copy the capabilities because the keymapper_device is going
# to act like the device. # to act like the device.
capabilities = input_device.capabilities(absinfo=False) capabilities = input_device.capabilities(absinfo=False)
# Furthermore, support all injected keycodes # Furthermore, support all injected keycodes
if len(self.mapping) > 0 and capabilities.get(ecodes.EV_KEY) is None:
capabilities[ecodes.EV_KEY] = []
for _, character in self.mapping: for _, character in self.mapping:
keycode = system_mapping.get(character) keycode = system_mapping.get(character)
if keycode is not None: if keycode is not None:
@ -257,8 +261,8 @@ class KeycodeInjector:
) )
self._write( self._write(
keymapper_device, keymapper_device,
evdev.ecodes.EV_KEY - KEYCODE_OFFSET, evdev.ecodes.EV_KEY,
keycode, keycode - KEYCODE_OFFSET,
value value
) )
@ -294,17 +298,12 @@ class KeycodeInjector:
if event.code not in [evdev.ecodes.ABS_X, evdev.ecodes.ABS_Y]: if event.code not in [evdev.ecodes.ABS_X, evdev.ecodes.ABS_Y]:
continue continue
# TODO somehow the injector has to keep injecting EV_REL # TODO somehow the injector has to keep injecting EV_REL
# codes to keep the mouse moving # codes with the most recent value to keep the mouse moving
# code 0:X, 1:Y # code 0:X, 1:Y
# TODO get absinfo beforehand # TODO get absinfo beforehand
value = event.value // 2000 value = event.value // 2000
if value == 0: if value == 0:
continue continue
print(
evdev.ecodes.EV_REL,
event.code,
value
)
self._write( self._write(
keymapper_device, keymapper_device,
evdev.ecodes.EV_REL, evdev.ecodes.EV_REL,
@ -346,21 +345,21 @@ class KeycodeInjector:
else: else:
# TODO compile int-int mapping instead of going this route. # TODO compile int-int mapping instead of going this route.
# I think that makes the reverse mapping obsolete. # I think that makes the reverse mapping obsolete.
target_keycode = system_mapping[character] target_keycode = system_mapping.get(character)
if target_keycode is None: if target_keycode is None:
logger.error( logger.error(
'Cannot find character %s in the internal mapping', 'Don\'t know what %s maps to',
character character
) )
continue continue
logger.spam( logger.spam(
'got code:%s value:%s, maps to code:%s char:%s', 'got code:%s value:%s, maps to code:%s char:%s',
event.code + KEYCODE_OFFSET, event.code + KEYCODE_OFFSET,
event.value, event.value,
target_keycode, target_keycode,
character character
) )
self._write( self._write(
keymapper_device, keymapper_device,

@ -55,7 +55,7 @@ class _KeycodeReader:
if self._pipe is not None: if self._pipe is not None:
logger.debug('Closing reader pipe') logger.debug('Closing reader pipe')
self._pipe.close() self._pipe[0].close()
self._pipe = None self._pipe = None
def clear(self): def clear(self):
@ -104,7 +104,7 @@ class _KeycodeReader:
args=(pipe[1],) args=(pipe[1],)
) )
self._process.start() self._process.start()
self._pipe = pipe[0] self._pipe = pipe
def _consume_event(self, event, pipe): def _consume_event(self, event, pipe):
"""Write the event code into the pipe if it is a key-down press.""" """Write the event code into the pipe if it is a key-down press."""
@ -142,8 +142,8 @@ class _KeycodeReader:
return None return None
newest_keycode = None newest_keycode = None
while self._pipe.poll(): while self._pipe[0].poll():
newest_keycode = self._pipe.recv() newest_keycode = self._pipe[0].recv()
return newest_keycode return newest_keycode

@ -52,7 +52,8 @@ def populate_system_mapping():
def clear_system_mapping(): def clear_system_mapping():
"""Remove all mapped keys. Only needed for tests.""" """Remove all mapped keys. Only needed for tests."""
for key in system_mapping: keys = list(system_mapping.keys())
for key in keys:
del system_mapping[key] del system_mapping[key]

@ -39,6 +39,10 @@ assert not os.getcwd().endswith('tests')
sys.path = [os.path.abspath('.')] + sys.path sys.path = [os.path.abspath('.')] + sys.path
# give tests some time to test stuff while the process
# is still running
EVENT_READ_TIMEOUT = 0.01
tmp = '/tmp/key-mapper-test' tmp = '/tmp/key-mapper-test'
uinput_write_history = [] uinput_write_history = []
@ -143,10 +147,32 @@ def patch_paths():
paths.CONFIG = '/tmp/key-mapper-test/' paths.CONFIG = '/tmp/key-mapper-test/'
def patch_select():
# goes hand in hand with patch_evdev, which makes InputDevices return
# their names for `.fd`.
# rlist contains device names therefore, so select.select returns the
# name of the device for which events are pending.
import select
def new_select(rlist, *args):
return ([
device for device in rlist
if len(pending_events.get(device, [])) > 0
],)
select.select = new_select
def patch_evdev(): def patch_evdev():
def list_devices(): def list_devices():
return fixtures.keys() return fixtures.keys()
"""
rlist = {device.fd: device for device in self.virtual_devices}
while True:
ready = select.select(rlist, [], [])[0]
"""
class InputDevice: class InputDevice:
# expose as existing attribute, otherwise the patch for # expose as existing attribute, otherwise the patch for
# evdev < 1.0.0 will crash the test # evdev < 1.0.0 will crash the test
@ -156,10 +182,19 @@ def patch_evdev():
self.path = path self.path = path
self.phys = fixtures[path]['phys'] self.phys = fixtures[path]['phys']
self.name = fixtures[path]['name'] self.name = fixtures[path]['name']
self.fd = self.name
def grab(self): def grab(self):
pass pass
def read(self):
ret = pending_events.get(self.name, [])
if ret is not None:
# consume all of them
pending_events[self.name] = []
return ret
def read_one(self): def read_one(self):
if pending_events.get(self.name) is None: if pending_events.get(self.name) is None:
return None return None
@ -177,9 +212,7 @@ def patch_evdev():
while len(pending_events[self.name]) > 0: while len(pending_events[self.name]) > 0:
yield pending_events[self.name].pop(0) yield pending_events[self.name].pop(0)
# give tests some time to test stuff while the process time.sleep(EVENT_READ_TIMEOUT)
# is still running
time.sleep(0.01)
async def async_read_loop(self): async def async_read_loop(self):
"""Read all prepared events at once.""" """Read all prepared events at once."""
@ -243,6 +276,7 @@ is_service_running()
patch_paths() patch_paths()
patch_evdev() patch_evdev()
patch_unsaved() patch_unsaved()
patch_select()
def main(): def main():

@ -33,8 +33,6 @@ class FakePipe:
class TestGetDevices(unittest.TestCase): class TestGetDevices(unittest.TestCase):
def test_get_devices(self): def test_get_devices(self):
# don't actually start the process, just use the `run` function.
# otherwise the coverage tool can't keep track.
pipe = FakePipe() pipe = FakePipe()
_GetDevices(pipe).run() _GetDevices(pipe).run()
self.assertDictEqual(pipe.devices, { self.assertDictEqual(pipe.devices, {

@ -20,6 +20,7 @@
import unittest import unittest
import time
import evdev import evdev
@ -30,7 +31,7 @@ from keymapper.state import custom_mapping, system_mapping, \
from keymapper.mapping import Mapping from keymapper.mapping import Mapping
from test import uinput_write_history, Event, pending_events, fixtures, \ from test import uinput_write_history, Event, pending_events, fixtures, \
clear_write_history clear_write_history, EVENT_READ_TIMEOUT, uinput_write_history_pipe
class TestInjector(unittest.TestCase): class TestInjector(unittest.TestCase):
@ -67,12 +68,20 @@ class TestInjector(unittest.TestCase):
evdev.ecodes.EV_FF: [1, 2, 3] evdev.ecodes.EV_FF: [1, 2, 3]
} }
self.injector = KeycodeInjector('foo', Mapping()) mapping = Mapping()
mapping.change(
new_keycode=80,
character='a'
)
maps_to = system_mapping['a'] - KEYCODE_OFFSET
self.injector = KeycodeInjector('foo', mapping)
capabilities = self.injector._modify_capabilities(FakeDevice()) capabilities = self.injector._modify_capabilities(FakeDevice())
self.assertIn(evdev.ecodes.EV_KEY, capabilities) self.assertIn(evdev.ecodes.EV_KEY, capabilities)
self.assertIsInstance(capabilities[evdev.ecodes.EV_KEY], list) keys = capabilities[evdev.ecodes.EV_KEY]
self.assertIsInstance(capabilities[evdev.ecodes.EV_KEY][0], int) self.assertEqual(keys[0], maps_to)
self.assertNotIn(evdev.ecodes.EV_SYN, capabilities) self.assertNotIn(evdev.ecodes.EV_SYN, capabilities)
self.assertNotIn(evdev.ecodes.EV_FF, capabilities) self.assertNotIn(evdev.ecodes.EV_FF, capabilities)
@ -160,16 +169,18 @@ class TestInjector(unittest.TestCase):
] ]
self.injector = KeycodeInjector('device 2', custom_mapping) self.injector = KeycodeInjector('device 2', custom_mapping)
# don't start as process for coverage testing purposes self.injector.start_injecting()
self.injector._start_injecting()
self.assertEqual(len(uinput_write_history), 7) uinput_write_history_pipe[0].poll(timeout=1)
time.sleep(EVENT_READ_TIMEOUT * 10)
# convert the write history to some easier to manage list # convert the write history to some easier to manage list
history = [ history = []
(event.type, event.code, event.value) while uinput_write_history_pipe[0].poll():
for event in uinput_write_history event = uinput_write_history_pipe[0].recv()
] history.append((event.type, event.code, event.value))
self.assertEqual(len(history), 7)
# since the macro takes a little bit of time to execute, its # since the macro takes a little bit of time to execute, its
# keystrokes are all over the place. # keystrokes are all over the place.

@ -38,6 +38,7 @@ from keymapper.state import custom_mapping, system_mapping, \
clear_system_mapping clear_system_mapping
from keymapper.paths import CONFIG, get_config_path from keymapper.paths import CONFIG, get_config_path
from keymapper.config import config from keymapper.config import config
from keymapper.dev.reader import keycode_reader
from test import tmp, pending_events, Event, uinput_write_history_pipe, \ from test import tmp, pending_events, Event, uinput_write_history_pipe, \
clear_write_history clear_write_history
@ -222,11 +223,9 @@ class TestIntegration(unittest.TestCase):
if code: if code:
# modifies the keycode in the row not by writing into the input, # modifies the keycode in the row not by writing into the input,
# but by sending an event # but by sending an event
pending_events[self.window.selected_device] = [ keycode_reader._pipe[1].send(code)
Event(evdev.events.EV_KEY, code - 8, 1) time.sleep(0.1)
] gtk_iteration()
self.window.on_window_event(None, None)
if success: if success:
self.assertEqual(row.get_keycode(), code) self.assertEqual(row.get_keycode(), code)
self.assertIn( self.assertIn(

@ -31,11 +31,10 @@ class TestMapping(unittest.TestCase):
self.assertFalse(self.mapping.changed) self.assertFalse(self.mapping.changed)
def test_populate_system_mapping(self): def test_populate_system_mapping(self):
populate_system_mapping(self.mapping) mapping = populate_system_mapping()
self.assertGreater(len(self.mapping), 100) self.assertGreater(len(mapping), 100)
# keycode 10 is typically mapped to '1' # keycode 10 is typically mapped to '1'
self.assertEqual(self.mapping.get_keycode('1'), 10) self.assertEqual(mapping['1'], 10)
self.assertTrue(self.mapping.get_character(10).startswith('1'))
def test_clone(self): def test_clone(self):
mapping1 = Mapping() mapping1 = Mapping()

@ -22,10 +22,11 @@
import unittest import unittest
import evdev import evdev
import time
from keymapper.dev.reader import keycode_reader from keymapper.dev.reader import keycode_reader
from test import Event, pending_events from test import Event, pending_events, EVENT_READ_TIMEOUT
CODE_1 = 100 CODE_1 = 100
@ -34,60 +35,70 @@ CODE_3 = 102
class TestReader(unittest.TestCase): class TestReader(unittest.TestCase):
def setUp(self):
# verify that tearDown properly cleared the reader
self.assertIsNone(keycode_reader.read())
def tearDown(self): def tearDown(self):
keycode_reader.clear() keycode_reader.stop_reading()
if pending_events.get('device 1') is not None: keys = list(pending_events.keys())
del pending_events['device 1'] for key in keys:
if pending_events.get('device 2') is not None: del pending_events[key]
del pending_events['device 2']
def test_reading(self): def test_reading(self):
keycode_reader.start_reading('device 1')
pending_events['device 1'] = [ pending_events['device 1'] = [
Event(evdev.events.EV_KEY, CODE_1, 1), Event(evdev.events.EV_KEY, CODE_1, 1),
Event(evdev.events.EV_KEY, CODE_2, 1), Event(evdev.events.EV_KEY, CODE_2, 1),
Event(evdev.events.EV_KEY, CODE_3, 1) Event(evdev.events.EV_KEY, CODE_3, 1)
] ]
keycode_reader.start_reading('device 1')
time.sleep(EVENT_READ_TIMEOUT * 5)
self.assertEqual(keycode_reader.read(), CODE_3 + 8) self.assertEqual(keycode_reader.read(), CODE_3 + 8)
self.assertIsNone(keycode_reader.read()) self.assertIsNone(keycode_reader.read())
def test_specific_device(self): def test_wrong_device(self):
keycode_reader.start_reading('device 2')
pending_events['device 1'] = [ pending_events['device 1'] = [
Event(evdev.events.EV_KEY, CODE_1, 1), Event(evdev.events.EV_KEY, CODE_1, 1),
Event(evdev.events.EV_KEY, CODE_2, 1), Event(evdev.events.EV_KEY, CODE_2, 1),
Event(evdev.events.EV_KEY, CODE_3, 1) Event(evdev.events.EV_KEY, CODE_3, 1)
] ]
keycode_reader.start_reading('device 2')
time.sleep(EVENT_READ_TIMEOUT * 5)
self.assertIsNone(keycode_reader.read()) self.assertIsNone(keycode_reader.read())
def test_keymapper_devices(self): def test_keymapper_devices(self):
# key-mapper creates devices in /dev, which are also used for # In order to show pressed keycodes on the ui while the device is
# reading since the original device is in grab-mode # grabbed, read from that as well.
keycode_reader.start_reading('device 2')
pending_events['key-mapper device 2'] = [ pending_events['key-mapper device 2'] = [
Event(evdev.events.EV_KEY, CODE_1, 1), Event(evdev.events.EV_KEY, CODE_1, 1),
Event(evdev.events.EV_KEY, CODE_2, 1), Event(evdev.events.EV_KEY, CODE_2, 1),
Event(evdev.events.EV_KEY, CODE_3, 1) Event(evdev.events.EV_KEY, CODE_3, 1)
] ]
keycode_reader.start_reading('device 2')
time.sleep(EVENT_READ_TIMEOUT * 5)
self.assertEqual(keycode_reader.read(), CODE_3 + 8) self.assertEqual(keycode_reader.read(), CODE_3 + 8)
self.assertIsNone(keycode_reader.read()) self.assertIsNone(keycode_reader.read())
def test_clear(self): def test_clear(self):
keycode_reader.start_reading('device 1')
pending_events['device 1'] = [ pending_events['device 1'] = [
Event(evdev.events.EV_KEY, CODE_1, 1), Event(evdev.events.EV_KEY, CODE_1, 1),
Event(evdev.events.EV_KEY, CODE_2, 1), Event(evdev.events.EV_KEY, CODE_2, 1),
Event(evdev.events.EV_KEY, CODE_3, 1) Event(evdev.events.EV_KEY, CODE_3, 1)
] ]
keycode_reader.start_reading('device 1')
time.sleep(EVENT_READ_TIMEOUT * 5)
keycode_reader.clear() keycode_reader.clear()
self.assertIsNone(keycode_reader.read()) self.assertIsNone(keycode_reader.read())
def test_switch_device(self): def test_switch_device(self):
keycode_reader.start_reading('device 2')
pending_events['device 2'] = [Event(evdev.events.EV_KEY, CODE_1, 1)] pending_events['device 2'] = [Event(evdev.events.EV_KEY, CODE_1, 1)]
pending_events['device 1'] = [Event(evdev.events.EV_KEY, CODE_3, 1)]
keycode_reader.start_reading('device 2')
time.sleep(EVENT_READ_TIMEOUT * 5)
keycode_reader.start_reading('device 1') keycode_reader.start_reading('device 1')
pending_events['device 1'] = [Event(evdev.events.EV_KEY, CODE_3, 1)] time.sleep(EVENT_READ_TIMEOUT * 5)
self.assertEqual(keycode_reader.read(), CODE_3 + 8) self.assertEqual(keycode_reader.read(), CODE_3 + 8)
self.assertIsNone(keycode_reader.read()) self.assertIsNone(keycode_reader.read())

Loading…
Cancel
Save