From 0f2129712ccd8d34f7947e8bf96fd9718550c51c Mon Sep 17 00:00:00 2001 From: sezanzeb Date: Sat, 16 Oct 2021 11:38:34 +0200 Subject: [PATCH] #194 Fixes nested if_tap macros --- keymapper/injection/consumer_control.py | 6 +- .../injection/consumers/keycode_mapper.py | 5 +- keymapper/injection/macros/macro.py | 54 ++++--- readme/examples.md | 8 +- tests/testcases/test_keycode_mapper.py | 2 +- tests/testcases/test_macros.py | 138 +++++++++++++----- 6 files changed, 151 insertions(+), 62 deletions(-) diff --git a/keymapper/injection/consumer_control.py b/keymapper/injection/consumer_control.py index cd26161f..cacdaffe 100644 --- a/keymapper/injection/consumer_control.py +++ b/keymapper/injection/consumer_control.py @@ -97,15 +97,15 @@ class ConsumerControl: for consumer in self._consumers: # copy so that the consumer doesn't screw this up for # all other future consumers - event = evdev.InputEvent( + event_copy = evdev.InputEvent( sec=event.sec, usec=event.usec, type=event.type, code=event.code, value=event.value, ) - if consumer.is_handled(event): - await consumer.notify(event) + if consumer.is_handled(event_copy): + await consumer.notify(event_copy) handled = True if not handled: diff --git a/keymapper/injection/consumers/keycode_mapper.py b/keymapper/injection/consumers/keycode_mapper.py index 73a700c1..291772f0 100644 --- a/keymapper/injection/consumers/keycode_mapper.py +++ b/keymapper/injection/consumers/keycode_mapper.py @@ -422,7 +422,7 @@ class KeycodeMapper(Consumer): if active_macro is not None and active_macro.is_holding(): # Tell the macro for that keycode that the key is released and # let it decide what to do with that information. - active_macro.release_key() + active_macro.release_trigger() logger.key_spam(key, "releasing macro") if type_and_code in unreleased: @@ -476,6 +476,7 @@ class KeycodeMapper(Consumer): # not finished, especially since gamepad-triggers report a ton # of events with a positive value. logger.key_spam(key, "macro already running") + self.context.macros[key].press_trigger() return """starting new macros or injecting new keys""" @@ -489,7 +490,7 @@ class KeycodeMapper(Consumer): macro = self.context.macros[key] active_macros[type_and_code] = macro Unreleased((None, None), (*type_and_code, action), key) - macro.press_key() + macro.press_trigger() logger.key_spam(key, "maps to macro %s", macro.code) asyncio.ensure_future(macro.run(self.macro_write)) return diff --git a/keymapper/injection/macros/macro.py b/keymapper/injection/macros/macro.py index fb51dceb..15093d74 100644 --- a/keymapper/injection/macros/macro.py +++ b/keymapper/injection/macros/macro.py @@ -192,8 +192,11 @@ class Macro: self.tasks = [] # can be used to wait for the release of the event - self._holding_event = asyncio.Event() - self._holding_event.set() # released by default + self._trigger_release_event = asyncio.Event() + self._trigger_press_event = asyncio.Event() + # released by default + self._trigger_release_event.set() + self._trigger_press_event.clear() self.running = False @@ -220,7 +223,7 @@ class Macro: self._newest_action = action self._new_event_arrived.set() - async def wait_for_event(self, filter=None): + async def _wait_for_event(self, filter=None): """Wait until a specific event arrives. The parameters can be used to provide a filter. It will block @@ -243,7 +246,7 @@ class Macro: def is_holding(self): """Check if the macro is waiting for a key to be released.""" - return not self._holding_event.is_set() + return not self._trigger_release_event.is_set() def get_capabilities(self): """Get the merged capabilities of the macro and its children.""" @@ -290,23 +293,25 @@ class Macro: # done self.running = False - def press_key(self): - """The user pressed the key down.""" + def press_trigger(self): + """The user pressed the trigger key down.""" if self.is_holding(): logger.error("Already holding") return - self._holding_event.clear() + self._trigger_release_event.clear() + self._trigger_press_event.set() for macro in self.child_macros: - macro.press_key() + macro.press_trigger() - def release_key(self): - """The user released the key.""" - self._holding_event.set() + def release_trigger(self): + """The user released the trigger key.""" + self._trigger_release_event.set() + self._trigger_press_event.clear() for macro in self.child_macros: - macro.release_key() + macro.release_trigger() async def _keycode_pause(self, _=None): """To add a pause between keystrokes.""" @@ -329,7 +334,7 @@ class Macro: _type_check(macro, [Macro, str, None], "h (hold)", 1) if macro is None: - self.tasks.append(lambda _: self._holding_event.wait()) + self.tasks.append(lambda _: self._trigger_release_event.wait()) return if not isinstance(macro, Macro): @@ -341,7 +346,7 @@ class Macro: resolved_code = _resolve(code, [int]) self.capabilities[EV_KEY].add(resolved_code) handler(EV_KEY, resolved_code, 1) - await self._holding_event.wait() + await self._trigger_release_event.wait() handler(EV_KEY, resolved_code, 0) self.capabilities[EV_KEY].add(code) @@ -575,7 +580,13 @@ class Macro: self.tasks.append(task) def add_if_tap(self, then=None, _else=None, timeout=300): - """If a key was pressed quickly.""" + """If a key was pressed quickly. + + macro key pressed -> if_tap starts -> key released -> then + + macro key pressed -> released (does other stuff in the meantime) + -> if_tap starts -> pressed -> released -> then + """ _type_check(then, [Macro, None], "if_tap", 1) _type_check(_else, [Macro, None], "if_tap", 2) timeout = _type_check(timeout, [int, float], "if_tap", 3) @@ -585,11 +596,18 @@ class Macro: if isinstance(_else, Macro): self.child_macros.append(_else) + async def wait(): + """Wait for a release, or if nothing pressed yet, a press and release.""" + if self.is_holding(): + await self._trigger_release_event.wait() + else: + await self._trigger_press_event.wait() + await self._trigger_release_event.wait() + async def task(handler): - coroutine = self._holding_event.wait() resolved_timeout = _resolve(timeout, [int, float]) / 1000 try: - await asyncio.wait_for(coroutine, resolved_timeout) + await asyncio.wait_for(wait(), resolved_timeout) if then: await then.run(handler) except asyncio.TimeoutError: @@ -621,7 +639,7 @@ class Macro: if action in (PRESS, PRESS_NEGATIVE): return True - coroutine = self.wait_for_event(event_filter) + coroutine = self._wait_for_event(event_filter) resolved_timeout = _resolve(timeout, allowed_types=[int, float, None]) try: if resolved_timeout is not None: diff --git a/readme/examples.md b/readme/examples.md index b65f24c7..4ff50531 100644 --- a/readme/examples.md +++ b/readme/examples.md @@ -11,7 +11,7 @@ Examples for particular devices and/or use cases: - Mouse scroll `wheel(down, 1)` `wheel(up, 1)` - Mouse move `mouse(left, 1)` `mouse(right, 1)` `mouse(up, 1)` `mouse(down, 1)` -## Macros +## Quick Overview of Macros - `k(BTN_LEFT)` a single mouse-click - `k(1).k(2)` 1, 2 @@ -31,6 +31,12 @@ Examples for particular devices and/or use cases: - `if_single(k(a), k(b))` writes b if another key is pressed, or a if the key is released and no other key was pressed in the meantime. +## Double Tap + +`if_tap(if_tap(k(a), k(b)), k(c))` + +Will write "a" if tapped twice, "b" if tapped once and "c" if held down long enough + ## Combinations Spanning Multiple Devices For regular combinations on only single devices it is not required to diff --git a/tests/testcases/test_keycode_mapper.py b/tests/testcases/test_keycode_mapper.py index 6f694f4b..ab74406c 100644 --- a/tests/testcases/test_keycode_mapper.py +++ b/tests/testcases/test_keycode_mapper.py @@ -106,7 +106,7 @@ class TestKeycodeMapper(unittest.IsolatedAsyncioTestCase): for macro in active_macros.values(): if macro.is_holding(): - macro.release_key() + macro.release_trigger() asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.01)) self.assertFalse(macro.is_holding()) self.assertFalse(macro.running) diff --git a/tests/testcases/test_macros.py b/tests/testcases/test_macros.py index e9f44804..e7b05406 100644 --- a/tests/testcases/test_macros.py +++ b/tests/testcases/test_macros.py @@ -271,7 +271,7 @@ class TestMacros(MacroTestBase): }, ) - macro.press_key() + macro.press_trigger() asyncio.ensure_future(macro.run(self.handler)) await asyncio.sleep(0.2) self.assertTrue(macro.is_holding()) @@ -283,7 +283,7 @@ class TestMacros(MacroTestBase): self.assertEqual(self.result[3], (EV_KEY, system_mapping.get("d"), 1)) # and then releases starting with the previously pressed key - macro.release_key() + macro.release_trigger() await asyncio.sleep(0.2) self.assertFalse(macro.is_holding()) self.assertEqual(self.result[4], (EV_KEY, system_mapping.get("d"), 0)) @@ -515,11 +515,11 @@ class TestMacros(MacroTestBase): """down""" - macro.press_key() + macro.press_trigger() await (asyncio.sleep(0.05)) self.assertTrue(macro.is_holding()) - macro.press_key() # redundantly calling doesn't break anything + macro.press_trigger() # redundantly calling doesn't break anything asyncio.ensure_future(macro.run(self.handler)) await asyncio.sleep(0.2) self.assertTrue(macro.is_holding()) @@ -527,7 +527,7 @@ class TestMacros(MacroTestBase): """up""" - macro.release_key() + macro.release_trigger() await (asyncio.sleep(0.05)) self.assertFalse(macro.is_holding()) @@ -549,7 +549,7 @@ class TestMacros(MacroTestBase): asyncio.ensure_future(macro.run(self.handler)) await asyncio.sleep(0.2) self.assertFalse(macro.is_holding()) - # press_key was never called, so the macro completes right away + # press_trigger was never called, so the macro completes right away # and the child macro of hold is never called. self.assertEqual(len(self.result), 4) @@ -567,7 +567,7 @@ class TestMacros(MacroTestBase): """down""" - macro.press_key() + macro.press_trigger() asyncio.ensure_future(macro.run(self.handler)) await (asyncio.sleep(0.1)) self.assertTrue(macro.is_holding()) @@ -578,7 +578,7 @@ class TestMacros(MacroTestBase): """up""" - macro.release_key() + macro.release_trigger() await (asyncio.sleep(0.05)) self.assertFalse(macro.is_holding()) self.assertEqual(len(self.result), 4) @@ -598,7 +598,7 @@ class TestMacros(MacroTestBase): asyncio.ensure_future(macro.run(self.handler)) await (asyncio.sleep(0.1)) self.assertFalse(macro.is_holding()) - # since press_key was never called it just does the macro + # since press_trigger was never called it just does the macro # completely self.assertEqual(len(self.result), 4) @@ -620,12 +620,12 @@ class TestMacros(MacroTestBase): """down""" - macro.press_key() + macro.press_trigger() await (asyncio.sleep(0.05)) self.assertTrue(macro.is_holding()) asyncio.ensure_future(macro.run(self.handler)) - macro.press_key() # redundantly calling doesn't break anything + macro.press_trigger() # redundantly calling doesn't break anything await asyncio.sleep(0.2) self.assertTrue(macro.is_holding()) self.assertEqual(len(self.result), 1) @@ -633,7 +633,7 @@ class TestMacros(MacroTestBase): """up""" - macro.release_key() + macro.release_trigger() await (asyncio.sleep(0.05)) self.assertFalse(macro.is_holding()) @@ -776,7 +776,7 @@ class TestMacros(MacroTestBase): async def test_duplicate_run(self): # it won't restart the macro, because that may screw up the - # internal state (in particular the _holding_event). + # internal state (in particular the _trigger_release_event). # I actually don't know at all what kind of bugs that might produce, # lets just avoid it. It might cause it to be held down forever. a = system_mapping.get("a") @@ -790,14 +790,14 @@ class TestMacros(MacroTestBase): asyncio.ensure_future(macro.run(self.handler)) # ignored self.assertFalse(macro.is_holding()) - macro.press_key() + macro.press_trigger() await asyncio.sleep(0.2) self.assertTrue(macro.is_holding()) asyncio.ensure_future(macro.run(self.handler)) # ignored self.assertTrue(macro.is_holding()) - macro.release_key() + macro.release_trigger() await asyncio.sleep(0.2) self.assertFalse(macro.is_holding()) @@ -814,10 +814,10 @@ class TestMacros(MacroTestBase): """not ignored, since previous run is over""" asyncio.ensure_future(macro.run(self.handler)) - macro.press_key() + macro.press_trigger() await asyncio.sleep(0.2) self.assertTrue(macro.is_holding()) - macro.release_key() + macro.release_trigger() await asyncio.sleep(0.2) self.assertFalse(macro.is_holding()) @@ -834,15 +834,15 @@ class TestMacros(MacroTestBase): async def test_mouse(self): macro_1 = parse("mouse(up, 4)", self.context) macro_2 = parse("wheel(left, 3)", self.context) - macro_1.press_key() - macro_2.press_key() + macro_1.press_trigger() + macro_2.press_trigger() asyncio.ensure_future(macro_1.run(self.handler)) asyncio.ensure_future(macro_2.run(self.handler)) await (asyncio.sleep(0.1)) self.assertTrue(macro_1.is_holding()) self.assertTrue(macro_2.is_holding()) - macro_1.release_key() - macro_2.release_key() + macro_1.release_trigger() + macro_2.release_trigger() self.assertIn((EV_REL, REL_Y, -4), self.result) self.assertIn((EV_REL, REL_HWHEEL, 1), self.result) @@ -876,34 +876,36 @@ class TestMacros(MacroTestBase): self.assertListEqual(self.result, [(5421, code, 154)]) self.assertEqual(len(macro.child_macros), 1) - async def test_wait_for_event(self): + async def test__wait_for_event(self): macro = parse("h(a)", self.context) try: # should timeout, no event known - await asyncio.wait_for(macro.wait_for_event(), 0.1) + await asyncio.wait_for(macro._wait_for_event(), 0.1) raise AssertionError("Expected asyncio.TimeoutError") except asyncio.TimeoutError: pass # should not timeout because a new event arrived macro.notify(new_event(EV_KEY, 1, 1), PRESS) - await asyncio.wait_for(macro.wait_for_event(), 0.1) + await asyncio.wait_for(macro._wait_for_event(), 0.1) try: # should timeout, because the previous event doesn't match the filter - await asyncio.wait_for(macro.wait_for_event(lambda e, a: e.value == 3), 0.1) + await asyncio.wait_for( + macro._wait_for_event(lambda e, a: e.value == 3), 0.1 + ) raise AssertionError("Expected asyncio.TimeoutError") except asyncio.TimeoutError: pass # should not timeout because a new event arrived macro.notify(new_event(EV_KEY, 1, 3), RELEASE) - await asyncio.wait_for(macro.wait_for_event(), 0.1) + await asyncio.wait_for(macro._wait_for_event(), 0.1) try: # should timeout, because the previous event doesn't match the filter - await asyncio.wait_for(macro.wait_for_event(lambda _, a: a == PRESS), 0.1) + await asyncio.wait_for(macro._wait_for_event(lambda _, a: a == PRESS), 0.1) raise AssertionError("Expected asyncio.TimeoutError") except asyncio.TimeoutError: pass @@ -1228,25 +1230,87 @@ class TestIfTap(MacroTestBase): y = system_mapping.get("y") self.assertSetEqual(macro.get_capabilities()[EV_KEY], {x, y}) - macro.press_key() + # this is the regular routine of how a macro is started. the tigger is pressed + # already when the macro runs, and released during if_tap within the timeout. + macro.press_trigger() asyncio.ensure_future(macro.run(self.handler)) await asyncio.sleep(0.05) - macro.release_key() + macro.release_trigger() await asyncio.sleep(0.05) self.assertListEqual(self.result, [(EV_KEY, x, 1), (EV_KEY, x, 0)]) self.assertFalse(macro.running) + async def test_if_tap_2(self): + # when the press arrives shortly after run. + # a tap will happen within the timeout even if the tigger is not pressed when + # it does into if_tap + macro = parse("if_tap(k(a), k(b), 100)", self.context) + asyncio.ensure_future(macro.run(self.handler)) + + await asyncio.sleep(0.01) + macro.press_trigger() + await asyncio.sleep(0.01) + macro.release_trigger() + await asyncio.sleep(0.2) + self.assertListEqual(self.result, [(EV_KEY, KEY_A, 1), (EV_KEY, KEY_A, 0)]) + self.assertFalse(macro.running) + self.result.clear() + + async def test_if_double_tap(self): + macro = parse("if_tap(if_tap(k(a), k(b), 100), k(c), 100)", self.context) + self.assertEqual(len(macro.child_macros), 2) + self.assertEqual(len(macro.child_macros[0].child_macros), 2) + self.assertSetEqual(macro.get_capabilities()[EV_KEY], {KEY_A, KEY_B, KEY_C}) + + asyncio.ensure_future(macro.run(self.handler)) + + # first tap + macro.press_trigger() + await asyncio.sleep(0.05) + macro.release_trigger() + + # second tap + await asyncio.sleep(0.04) + macro.press_trigger() + await asyncio.sleep(0.04) + macro.release_trigger() + + await asyncio.sleep(0.05) + self.assertListEqual(self.result, [(EV_KEY, KEY_A, 1), (EV_KEY, KEY_A, 0)]) + self.assertFalse(macro.running) + self.result.clear() + + """If the second tap takes too long, runs else there""" + + asyncio.ensure_future(macro.run(self.handler)) + + # first tap + macro.press_trigger() + await asyncio.sleep(0.05) + macro.release_trigger() + + # second tap + await asyncio.sleep(0.06) + macro.press_trigger() + await asyncio.sleep(0.06) + macro.release_trigger() + + await asyncio.sleep(0.05) + self.assertListEqual(self.result, [(EV_KEY, KEY_B, 1), (EV_KEY, KEY_B, 0)]) + self.assertFalse(macro.running) + self.result.clear() + async def test_if_tap_none(self): # first param none macro = parse("if_tap(, k(y), 100)", self.context) self.assertEqual(len(macro.child_macros), 1) y = system_mapping.get("y") self.assertSetEqual(macro.get_capabilities()[EV_KEY], {y}) - macro.press_key() + macro.press_trigger() asyncio.ensure_future(macro.run(self.handler)) await asyncio.sleep(0.05) - macro.release_key() + macro.release_trigger() await asyncio.sleep(0.05) self.assertListEqual(self.result, []) @@ -1255,10 +1319,10 @@ class TestIfTap(MacroTestBase): self.assertEqual(len(macro.child_macros), 1) y = system_mapping.get("y") self.assertSetEqual(macro.get_capabilities()[EV_KEY], {y}) - macro.press_key() + macro.press_trigger() asyncio.ensure_future(macro.run(self.handler)) await asyncio.sleep(0.1) - macro.release_key() + macro.release_trigger() await asyncio.sleep(0.05) self.assertListEqual(self.result, []) @@ -1272,10 +1336,10 @@ class TestIfTap(MacroTestBase): y = system_mapping.get("y") self.assertSetEqual(macro.get_capabilities()[EV_KEY], {x, y}) - macro.press_key() + macro.press_trigger() asyncio.ensure_future(macro.run(self.handler)) await asyncio.sleep(0.1) - macro.release_key() + macro.release_trigger() await asyncio.sleep(0.05) self.assertListEqual(self.result, [(EV_KEY, y, 1), (EV_KEY, y, 0)]) @@ -1289,10 +1353,10 @@ class TestIfTap(MacroTestBase): y = system_mapping.get("y") self.assertSetEqual(macro.get_capabilities()[EV_KEY], {x, y}) - macro.press_key() + macro.press_trigger() asyncio.ensure_future(macro.run(self.handler)) await asyncio.sleep(0.1) - macro.release_key() + macro.release_trigger() await asyncio.sleep(0.05) self.assertListEqual(self.result, [(EV_KEY, y, 1), (EV_KEY, y, 0)])