From a97c44ebd421674507b67d0c29273f11898e770e Mon Sep 17 00:00:00 2001 From: sezanzeb Date: Mon, 30 Nov 2020 18:59:34 +0100 Subject: [PATCH] improved tests, some fixes --- .coveragerc | 5 +++++ keymapper/dev/injector.py | 35 ++++++++++++++--------------- keymapper/dev/reader.py | 8 +++---- keymapper/state.py | 3 ++- tests/test.py | 40 ++++++++++++++++++++++++++++++--- tests/testcases/getdevices.py | 2 -- tests/testcases/injector.py | 33 ++++++++++++++++++--------- tests/testcases/integration.py | 9 ++++---- tests/testcases/mapping.py | 7 +++--- tests/testcases/reader.py | 41 +++++++++++++++++++++------------- 10 files changed, 120 insertions(+), 63 deletions(-) create mode 100644 .coveragerc diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 00000000..92e0ec74 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,5 @@ +[run] +branch = True +source = /usr/lib/python3.8/site-packages/keymapper +# concurrency = multiprocessing + diff --git a/keymapper/dev/injector.py b/keymapper/dev/injector.py index c004303b..3a906ecc 100644 --- a/keymapper/dev/injector.py +++ b/keymapper/dev/injector.py @@ -154,7 +154,7 @@ class KeycodeInjector: break except IOError: attempts += 1 - logger.debug('Failed attemt to grab %s %d', path, attempts) + logger.debug('Failed attemts to grab %s: %d', path, attempts) if attempts >= 4: logger.error('Cannot grab %s, it is possibly in use', path) @@ -182,7 +182,11 @@ class KeycodeInjector: # copy the capabilities because the keymapper_device is going # to act like the device. capabilities = input_device.capabilities(absinfo=False) + # Furthermore, support all injected keycodes + if len(self.mapping) > 0 and capabilities.get(ecodes.EV_KEY) is None: + capabilities[ecodes.EV_KEY] = [] + for _, character in self.mapping: keycode = system_mapping.get(character) if keycode is not None: @@ -257,8 +261,8 @@ class KeycodeInjector: ) self._write( keymapper_device, - evdev.ecodes.EV_KEY - KEYCODE_OFFSET, - keycode, + evdev.ecodes.EV_KEY, + keycode - KEYCODE_OFFSET, value ) @@ -294,17 +298,12 @@ class KeycodeInjector: if event.code not in [evdev.ecodes.ABS_X, evdev.ecodes.ABS_Y]: continue # TODO somehow the injector has to keep injecting EV_REL - # codes to keep the mouse moving + # codes with the most recent value to keep the mouse moving # code 0:X, 1:Y # TODO get absinfo beforehand value = event.value // 2000 if value == 0: continue - print( - evdev.ecodes.EV_REL, - event.code, - value - ) self._write( keymapper_device, evdev.ecodes.EV_REL, @@ -346,21 +345,21 @@ class KeycodeInjector: else: # TODO compile int-int mapping instead of going this route. # I think that makes the reverse mapping obsolete. - target_keycode = system_mapping[character] + target_keycode = system_mapping.get(character) if target_keycode is None: logger.error( - 'Cannot find character %s in the internal mapping', + 'Don\'t know what %s maps to', character ) continue - logger.spam( - 'got code:%s value:%s, maps to code:%s char:%s', - event.code + KEYCODE_OFFSET, - event.value, - target_keycode, - character - ) + logger.spam( + 'got code:%s value:%s, maps to code:%s char:%s', + event.code + KEYCODE_OFFSET, + event.value, + target_keycode, + character + ) self._write( keymapper_device, diff --git a/keymapper/dev/reader.py b/keymapper/dev/reader.py index 062d794c..59be3837 100644 --- a/keymapper/dev/reader.py +++ b/keymapper/dev/reader.py @@ -55,7 +55,7 @@ class _KeycodeReader: if self._pipe is not None: logger.debug('Closing reader pipe') - self._pipe.close() + self._pipe[0].close() self._pipe = None def clear(self): @@ -104,7 +104,7 @@ class _KeycodeReader: args=(pipe[1],) ) self._process.start() - self._pipe = pipe[0] + self._pipe = pipe def _consume_event(self, event, pipe): """Write the event code into the pipe if it is a key-down press.""" @@ -142,8 +142,8 @@ class _KeycodeReader: return None newest_keycode = None - while self._pipe.poll(): - newest_keycode = self._pipe.recv() + while self._pipe[0].poll(): + newest_keycode = self._pipe[0].recv() return newest_keycode diff --git a/keymapper/state.py b/keymapper/state.py index 2700bc46..3e7c4154 100644 --- a/keymapper/state.py +++ b/keymapper/state.py @@ -52,7 +52,8 @@ def populate_system_mapping(): def clear_system_mapping(): """Remove all mapped keys. Only needed for tests.""" - for key in system_mapping: + keys = list(system_mapping.keys()) + for key in keys: del system_mapping[key] diff --git a/tests/test.py b/tests/test.py index 4a22f050..eae57fa8 100644 --- a/tests/test.py +++ b/tests/test.py @@ -39,6 +39,10 @@ assert not os.getcwd().endswith('tests') sys.path = [os.path.abspath('.')] + sys.path +# give tests some time to test stuff while the process +# is still running +EVENT_READ_TIMEOUT = 0.01 + tmp = '/tmp/key-mapper-test' uinput_write_history = [] @@ -143,10 +147,32 @@ def patch_paths(): paths.CONFIG = '/tmp/key-mapper-test/' +def patch_select(): + # goes hand in hand with patch_evdev, which makes InputDevices return + # their names for `.fd`. + # rlist contains device names therefore, so select.select returns the + # name of the device for which events are pending. + import select + + def new_select(rlist, *args): + return ([ + device for device in rlist + if len(pending_events.get(device, [])) > 0 + ],) + + select.select = new_select + + def patch_evdev(): def list_devices(): return fixtures.keys() + """ + rlist = {device.fd: device for device in self.virtual_devices} + while True: + ready = select.select(rlist, [], [])[0] + """ + class InputDevice: # expose as existing attribute, otherwise the patch for # evdev < 1.0.0 will crash the test @@ -156,10 +182,19 @@ def patch_evdev(): self.path = path self.phys = fixtures[path]['phys'] self.name = fixtures[path]['name'] + self.fd = self.name def grab(self): pass + def read(self): + ret = pending_events.get(self.name, []) + if ret is not None: + # consume all of them + pending_events[self.name] = [] + + return ret + def read_one(self): if pending_events.get(self.name) is None: return None @@ -177,9 +212,7 @@ def patch_evdev(): while len(pending_events[self.name]) > 0: yield pending_events[self.name].pop(0) - # give tests some time to test stuff while the process - # is still running - time.sleep(0.01) + time.sleep(EVENT_READ_TIMEOUT) async def async_read_loop(self): """Read all prepared events at once.""" @@ -243,6 +276,7 @@ is_service_running() patch_paths() patch_evdev() patch_unsaved() +patch_select() def main(): diff --git a/tests/testcases/getdevices.py b/tests/testcases/getdevices.py index 2df94d1e..990fdc0e 100644 --- a/tests/testcases/getdevices.py +++ b/tests/testcases/getdevices.py @@ -33,8 +33,6 @@ class FakePipe: class TestGetDevices(unittest.TestCase): def test_get_devices(self): - # don't actually start the process, just use the `run` function. - # otherwise the coverage tool can't keep track. pipe = FakePipe() _GetDevices(pipe).run() self.assertDictEqual(pipe.devices, { diff --git a/tests/testcases/injector.py b/tests/testcases/injector.py index 17738195..26ba8488 100644 --- a/tests/testcases/injector.py +++ b/tests/testcases/injector.py @@ -20,6 +20,7 @@ import unittest +import time import evdev @@ -30,7 +31,7 @@ from keymapper.state import custom_mapping, system_mapping, \ from keymapper.mapping import Mapping from test import uinput_write_history, Event, pending_events, fixtures, \ - clear_write_history + clear_write_history, EVENT_READ_TIMEOUT, uinput_write_history_pipe class TestInjector(unittest.TestCase): @@ -67,12 +68,20 @@ class TestInjector(unittest.TestCase): evdev.ecodes.EV_FF: [1, 2, 3] } - self.injector = KeycodeInjector('foo', Mapping()) + mapping = Mapping() + mapping.change( + new_keycode=80, + character='a' + ) + + maps_to = system_mapping['a'] - KEYCODE_OFFSET + + self.injector = KeycodeInjector('foo', mapping) capabilities = self.injector._modify_capabilities(FakeDevice()) self.assertIn(evdev.ecodes.EV_KEY, capabilities) - self.assertIsInstance(capabilities[evdev.ecodes.EV_KEY], list) - self.assertIsInstance(capabilities[evdev.ecodes.EV_KEY][0], int) + keys = capabilities[evdev.ecodes.EV_KEY] + self.assertEqual(keys[0], maps_to) self.assertNotIn(evdev.ecodes.EV_SYN, capabilities) self.assertNotIn(evdev.ecodes.EV_FF, capabilities) @@ -160,16 +169,18 @@ class TestInjector(unittest.TestCase): ] self.injector = KeycodeInjector('device 2', custom_mapping) - # don't start as process for coverage testing purposes - self.injector._start_injecting() + self.injector.start_injecting() - self.assertEqual(len(uinput_write_history), 7) + uinput_write_history_pipe[0].poll(timeout=1) + time.sleep(EVENT_READ_TIMEOUT * 10) # convert the write history to some easier to manage list - history = [ - (event.type, event.code, event.value) - for event in uinput_write_history - ] + history = [] + while uinput_write_history_pipe[0].poll(): + event = uinput_write_history_pipe[0].recv() + history.append((event.type, event.code, event.value)) + + self.assertEqual(len(history), 7) # since the macro takes a little bit of time to execute, its # keystrokes are all over the place. diff --git a/tests/testcases/integration.py b/tests/testcases/integration.py index 09a0c218..ce762162 100644 --- a/tests/testcases/integration.py +++ b/tests/testcases/integration.py @@ -38,6 +38,7 @@ from keymapper.state import custom_mapping, system_mapping, \ clear_system_mapping from keymapper.paths import CONFIG, get_config_path from keymapper.config import config +from keymapper.dev.reader import keycode_reader from test import tmp, pending_events, Event, uinput_write_history_pipe, \ clear_write_history @@ -222,11 +223,9 @@ class TestIntegration(unittest.TestCase): if code: # modifies the keycode in the row not by writing into the input, # but by sending an event - pending_events[self.window.selected_device] = [ - Event(evdev.events.EV_KEY, code - 8, 1) - ] - self.window.on_window_event(None, None) - + keycode_reader._pipe[1].send(code) + time.sleep(0.1) + gtk_iteration() if success: self.assertEqual(row.get_keycode(), code) self.assertIn( diff --git a/tests/testcases/mapping.py b/tests/testcases/mapping.py index ac8dfec7..5a873eae 100644 --- a/tests/testcases/mapping.py +++ b/tests/testcases/mapping.py @@ -31,11 +31,10 @@ class TestMapping(unittest.TestCase): self.assertFalse(self.mapping.changed) def test_populate_system_mapping(self): - populate_system_mapping(self.mapping) - self.assertGreater(len(self.mapping), 100) + mapping = populate_system_mapping() + self.assertGreater(len(mapping), 100) # keycode 10 is typically mapped to '1' - self.assertEqual(self.mapping.get_keycode('1'), 10) - self.assertTrue(self.mapping.get_character(10).startswith('1')) + self.assertEqual(mapping['1'], 10) def test_clone(self): mapping1 = Mapping() diff --git a/tests/testcases/reader.py b/tests/testcases/reader.py index dd049b09..016d7a69 100644 --- a/tests/testcases/reader.py +++ b/tests/testcases/reader.py @@ -22,10 +22,11 @@ import unittest import evdev +import time from keymapper.dev.reader import keycode_reader -from test import Event, pending_events +from test import Event, pending_events, EVENT_READ_TIMEOUT CODE_1 = 100 @@ -34,60 +35,70 @@ CODE_3 = 102 class TestReader(unittest.TestCase): + def setUp(self): + # verify that tearDown properly cleared the reader + self.assertIsNone(keycode_reader.read()) + def tearDown(self): - keycode_reader.clear() - if pending_events.get('device 1') is not None: - del pending_events['device 1'] - if pending_events.get('device 2') is not None: - del pending_events['device 2'] + keycode_reader.stop_reading() + keys = list(pending_events.keys()) + for key in keys: + del pending_events[key] def test_reading(self): - keycode_reader.start_reading('device 1') pending_events['device 1'] = [ Event(evdev.events.EV_KEY, CODE_1, 1), Event(evdev.events.EV_KEY, CODE_2, 1), Event(evdev.events.EV_KEY, CODE_3, 1) ] + keycode_reader.start_reading('device 1') + time.sleep(EVENT_READ_TIMEOUT * 5) self.assertEqual(keycode_reader.read(), CODE_3 + 8) self.assertIsNone(keycode_reader.read()) - def test_specific_device(self): - keycode_reader.start_reading('device 2') + def test_wrong_device(self): pending_events['device 1'] = [ Event(evdev.events.EV_KEY, CODE_1, 1), Event(evdev.events.EV_KEY, CODE_2, 1), Event(evdev.events.EV_KEY, CODE_3, 1) ] + keycode_reader.start_reading('device 2') + time.sleep(EVENT_READ_TIMEOUT * 5) self.assertIsNone(keycode_reader.read()) def test_keymapper_devices(self): - # key-mapper creates devices in /dev, which are also used for - # reading since the original device is in grab-mode - keycode_reader.start_reading('device 2') + # In order to show pressed keycodes on the ui while the device is + # grabbed, read from that as well. pending_events['key-mapper device 2'] = [ Event(evdev.events.EV_KEY, CODE_1, 1), Event(evdev.events.EV_KEY, CODE_2, 1), Event(evdev.events.EV_KEY, CODE_3, 1) ] + keycode_reader.start_reading('device 2') + time.sleep(EVENT_READ_TIMEOUT * 5) self.assertEqual(keycode_reader.read(), CODE_3 + 8) self.assertIsNone(keycode_reader.read()) def test_clear(self): - keycode_reader.start_reading('device 1') pending_events['device 1'] = [ Event(evdev.events.EV_KEY, CODE_1, 1), Event(evdev.events.EV_KEY, CODE_2, 1), Event(evdev.events.EV_KEY, CODE_3, 1) ] + keycode_reader.start_reading('device 1') + time.sleep(EVENT_READ_TIMEOUT * 5) keycode_reader.clear() self.assertIsNone(keycode_reader.read()) def test_switch_device(self): - keycode_reader.start_reading('device 2') pending_events['device 2'] = [Event(evdev.events.EV_KEY, CODE_1, 1)] + pending_events['device 1'] = [Event(evdev.events.EV_KEY, CODE_3, 1)] + + keycode_reader.start_reading('device 2') + time.sleep(EVENT_READ_TIMEOUT * 5) keycode_reader.start_reading('device 1') - pending_events['device 1'] = [Event(evdev.events.EV_KEY, CODE_3, 1)] + time.sleep(EVENT_READ_TIMEOUT * 5) self.assertEqual(keycode_reader.read(), CODE_3 + 8) self.assertIsNone(keycode_reader.read())