From 3ab1869827b7950f9563f89e1a69501002dab240 Mon Sep 17 00:00:00 2001 From: sezanzeb Date: Sun, 24 Jan 2021 18:36:31 +0100 Subject: [PATCH] efficiently hold forever until released with h() --- keymapper/dev/keycode_mapper.py | 2 +- keymapper/dev/macros.py | 62 +++++++++++++++++++------- tests/testcases/test_keycode_mapper.py | 38 ++++++++-------- tests/testcases/test_macros.py | 41 +++++++++++++---- 4 files changed, 98 insertions(+), 45 deletions(-) diff --git a/keymapper/dev/keycode_mapper.py b/keymapper/dev/keycode_mapper.py index aad0bc3e..8f4f8c5c 100644 --- a/keymapper/dev/keycode_mapper.py +++ b/keymapper/dev/keycode_mapper.py @@ -271,7 +271,7 @@ def handle_keycode(key_to_code, macros, event, uinput, forward=True): """Releasing keys and macros""" if is_key_up(event): - if active_macro is not None and active_macro.holding: + if active_macro is not None and active_macro.is_holding(): # Tell the macro for that keycode that the key is released and # let it decide what to do with that information. active_macro.release_key() diff --git a/keymapper/dev/macros.py b/keymapper/dev/macros.py index 0a1b595b..02b5795f 100644 --- a/keymapper/dev/macros.py +++ b/keymapper/dev/macros.py @@ -79,8 +79,8 @@ class _Macro: self.code = code self.mapping = mapping - # supposed to be True between key event values 1 (down) and 0 (up) - self.holding = False + # is a lock so that h() can be realized + self._holding_lock = asyncio.Lock() self.running = False @@ -89,6 +89,10 @@ class _Macro: self.child_macros = [] + def is_holding(self): + """Check if the macro is waiting for a key to be released.""" + return self._holding_lock.locked() + def get_capabilities(self): """Resolve all capabilities of the macro and those of its children.""" capabilities = self.capabilities.copy() @@ -123,31 +127,52 @@ class _Macro: def press_key(self): """The user pressed the key down.""" - self.holding = True + if self.is_holding(): + logger.error('Already holding') + return + + asyncio.ensure_future(self._holding_lock.acquire()) + for macro in self.child_macros: macro.press_key() def release_key(self): """The user released the key.""" - self.holding = False + if self._holding_lock is not None: + self._holding_lock.release() + for macro in self.child_macros: macro.release_key() - def hold(self, macro): + def hold(self, macro=None): """Loops the execution until key release.""" - if not isinstance(macro, _Macro): - raise ValueError( - 'Expected the param for h (hold) to be ' - f'a macro (like k(a)), but got "{macro}"' - ) - - async def task(): - while self.holding: - await macro.run() + if macro is None: + # no parameters: block until released + async def task(): + # wait until the key is released. Only then it will be + # able to acquire the lock. Release it right after so that + # it can be acquired by press_key again. + await self._holding_lock.acquire() + self._holding_lock.release() + # this seems to be much more efficient on the CPU than + # looping `await asyncio.sleep(0.001)` + + self.tasks.append((1234, task)) + else: + if not isinstance(macro, _Macro): + raise ValueError( + 'Expected the param for h (hold) to be ' + f'a macro (like k(a)), but got "{macro}"' + ) - self.tasks.append((REPEAT, task)) + async def task(): + while self.is_holding(): + # run the child macro completely to avoid + # not-releasing any key + await macro.run() - self.child_macros.append(macro) + self.tasks.append((REPEAT, task)) + self.child_macros.append(macro) return self @@ -335,6 +360,9 @@ def _parse_recurse(macro, mapping, macro_instance=None, depth=0): assert isinstance(macro, str) assert isinstance(depth, int) + if macro == '': + return None + if macro_instance is None: macro_instance = _Macro(macro, mapping) else: @@ -354,7 +382,7 @@ def _parse_recurse(macro, mapping, macro_instance=None, depth=0): 'r': (macro_instance.repeat, 2, 2), 'k': (macro_instance.keycode, 1, 1), 'w': (macro_instance.wait, 1, 1), - 'h': (macro_instance.hold, 1, 1) + 'h': (macro_instance.hold, 0, 1) } function = functions.get(call) diff --git a/tests/testcases/test_keycode_mapper.py b/tests/testcases/test_keycode_mapper.py index b7bfa258..2ae353a0 100644 --- a/tests/testcases/test_keycode_mapper.py +++ b/tests/testcases/test_keycode_mapper.py @@ -78,9 +78,9 @@ class TestKeycodeMapper(unittest.TestCase): def tearDown(self): # make sure all macros are stopped by tests for macro in active_macros.values(): - if macro.holding: + if macro.is_holding(): macro.release_key() - self.assertFalse(macro.holding) + self.assertFalse(macro.is_holding()) self.assertFalse(macro.running) cleanup() @@ -451,7 +451,7 @@ class TestKeycodeMapper(unittest.TestCase): keystroke_sleep = config.get('macros.keystroke_sleep_ms', 10) loop.run_until_complete(asyncio.sleep(sleeptime / 1000)) - self.assertTrue(active_macros[(EV_KEY, 1)].holding) + self.assertTrue(active_macros[(EV_KEY, 1)].is_holding()) self.assertTrue(active_macros[(EV_KEY, 1)].running) """stop macro""" @@ -480,7 +480,7 @@ class TestKeycodeMapper(unittest.TestCase): count_after = len(history) self.assertEqual(count_before, count_after) - self.assertFalse(active_macros[(EV_KEY, 1)].holding) + self.assertFalse(active_macros[(EV_KEY, 1)].is_holding()) self.assertFalse(active_macros[(EV_KEY, 1)].running) def test_hold_2(self): @@ -525,11 +525,11 @@ class TestKeycodeMapper(unittest.TestCase): handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None) handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 1), None) loop.run_until_complete(asyncio.sleep(0.05)) - self.assertTrue(active_macros[(EV_KEY, 1)].holding) + self.assertTrue(active_macros[(EV_KEY, 1)].is_holding()) self.assertTrue(active_macros[(EV_KEY, 1)].running) - self.assertTrue(active_macros[(EV_KEY, 2)].holding) + self.assertTrue(active_macros[(EV_KEY, 2)].is_holding()) self.assertTrue(active_macros[(EV_KEY, 2)].running) - self.assertTrue(active_macros[(EV_KEY, 3)].holding) + self.assertTrue(active_macros[(EV_KEY, 3)].is_holding()) self.assertTrue(active_macros[(EV_KEY, 3)].running) # there should only be one code_c in the events, because no key @@ -573,11 +573,11 @@ class TestKeycodeMapper(unittest.TestCase): handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None) handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 0), None) loop.run_until_complete(asyncio.sleep(0.05)) - self.assertFalse(active_macros[(EV_KEY, 1)].holding) + self.assertFalse(active_macros[(EV_KEY, 1)].is_holding()) self.assertFalse(active_macros[(EV_KEY, 1)].running) - self.assertTrue(active_macros[(EV_KEY, 2)].holding) + self.assertTrue(active_macros[(EV_KEY, 2)].is_holding()) self.assertTrue(active_macros[(EV_KEY, 2)].running) - self.assertFalse(active_macros[(EV_KEY, 3)].holding) + self.assertFalse(active_macros[(EV_KEY, 3)].is_holding()) self.assertFalse(active_macros[(EV_KEY, 3)].running) # stop macro 2 @@ -601,11 +601,11 @@ class TestKeycodeMapper(unittest.TestCase): count_after = len(history) self.assertEqual(count_before, count_after) - self.assertFalse(active_macros[(EV_KEY, 1)].holding) + self.assertFalse(active_macros[(EV_KEY, 1)].is_holding()) self.assertFalse(active_macros[(EV_KEY, 1)].running) - self.assertFalse(active_macros[(EV_KEY, 2)].holding) + self.assertFalse(active_macros[(EV_KEY, 2)].is_holding()) self.assertFalse(active_macros[(EV_KEY, 2)].running) - self.assertFalse(active_macros[(EV_KEY, 3)].holding) + self.assertFalse(active_macros[(EV_KEY, 3)].is_holding()) self.assertFalse(active_macros[(EV_KEY, 3)].running) def test_hold_3(self): @@ -634,7 +634,7 @@ class TestKeycodeMapper(unittest.TestCase): loop.run_until_complete(asyncio.sleep(0.1)) for _ in range(5): - self.assertTrue(active_macros[(EV_KEY, 1)].holding) + self.assertTrue(active_macros[(EV_KEY, 1)].is_holding()) self.assertTrue(active_macros[(EV_KEY, 1)].running) handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None) loop.run_until_complete(asyncio.sleep(0.05)) @@ -652,7 +652,7 @@ class TestKeycodeMapper(unittest.TestCase): self.assertEqual(history.count((code_a, 0)), 1) self.assertEqual(history.count((code_c, 1)), 1) self.assertEqual(history.count((code_c, 0)), 1) - self.assertFalse(active_macros[(EV_KEY, 1)].holding) + self.assertFalse(active_macros[(EV_KEY, 1)].is_holding()) self.assertFalse(active_macros[(EV_KEY, 1)].running) # it's stopped and won't write stuff anymore @@ -727,9 +727,9 @@ class TestKeycodeMapper(unittest.TestCase): loop.run_until_complete(asyncio.sleep(sleeptime / 1000)) self.assertEqual(len(active_macros), 2) - self.assertTrue(active_macros[key_1].holding) + self.assertTrue(active_macros[key_1].is_holding()) self.assertTrue(active_macros[key_1].running) - self.assertTrue(active_macros[key_2].holding) + self.assertTrue(active_macros[key_2].is_holding()) self.assertTrue(active_macros[key_2].running) self.assertIn(down_0[:2], unreleased) @@ -748,9 +748,9 @@ class TestKeycodeMapper(unittest.TestCase): loop.run_until_complete(asyncio.sleep(keystroke_sleep * 10 / 1000)) - self.assertFalse(active_macros[key_1].holding) + self.assertFalse(active_macros[key_1].is_holding()) self.assertFalse(active_macros[key_1].running) - self.assertFalse(active_macros[key_2].holding) + self.assertFalse(active_macros[key_2].is_holding()) self.assertFalse(active_macros[key_2].running) events = calculate_event_number(sleeptime, 1, 1) * 2 diff --git a/tests/testcases/test_macros.py b/tests/testcases/test_macros.py index 153ff2b4..dccb883e 100644 --- a/tests/testcases/test_macros.py +++ b/tests/testcases/test_macros.py @@ -164,23 +164,48 @@ class TestMacros(unittest.TestCase): macro.press_key() asyncio.ensure_future(macro.run()) self.loop.run_until_complete(asyncio.sleep(0.2)) + self.assertTrue(macro.is_holding()) + self.assertGreater(len(self.result), 2) + macro.release_key() self.loop.run_until_complete(asyncio.sleep(0.05)) + self.assertFalse(macro.is_holding()) - self.assertEqual( - self.result[0], - (system_mapping.get('1'), 1) - ) - self.assertEqual( - self.result[-1], - (system_mapping.get('3'), 0) - ) + self.assertEqual(self.result[0], (system_mapping.get('1'), 1)) + self.assertEqual(self.result[-1], (system_mapping.get('3'), 0)) code_a = system_mapping.get('a') self.assertGreater(self.result.count((code_a, 1)), 2) self.assertEqual(len(macro.child_macros), 1) + def test_hold_forever(self): + macro = parse('k(1).h().k(3)', self.mapping) + macro.set_handler(self.handler) + self.assertSetEqual(macro.get_capabilities(), { + system_mapping.get('1'), + system_mapping.get('3') + }) + + macro.press_key() + asyncio.ensure_future(macro.run()) + self.loop.run_until_complete(asyncio.sleep(0.1)) + self.assertTrue(macro.is_holding()) + self.assertEqual(len(self.result), 2) + self.loop.run_until_complete(asyncio.sleep(0.1)) + # doesn't do fancy stuff, is blocking until the release + self.assertEqual(len(self.result), 2) + + macro.release_key() + self.loop.run_until_complete(asyncio.sleep(0.05)) + self.assertFalse(macro.is_holding()) + self.assertEqual(len(self.result), 4) + + self.assertEqual(self.result[0], (system_mapping.get('1'), 1)) + self.assertEqual(self.result[-1], (system_mapping.get('3'), 0)) + + self.assertEqual(len(macro.child_macros), 0) + def test_2(self): start = time.time() repeats = 20