#194 Fixes nested if_tap macros

xkb
sezanzeb 3 years ago
parent b41f1fe3ab
commit 0f2129712c

@ -97,15 +97,15 @@ class ConsumerControl:
for consumer in self._consumers:
# copy so that the consumer doesn't screw this up for
# all other future consumers
event = evdev.InputEvent(
event_copy = evdev.InputEvent(
sec=event.sec,
usec=event.usec,
type=event.type,
code=event.code,
value=event.value,
)
if consumer.is_handled(event):
await consumer.notify(event)
if consumer.is_handled(event_copy):
await consumer.notify(event_copy)
handled = True
if not handled:

@ -422,7 +422,7 @@ class KeycodeMapper(Consumer):
if active_macro is not None and active_macro.is_holding():
# Tell the macro for that keycode that the key is released and
# let it decide what to do with that information.
active_macro.release_key()
active_macro.release_trigger()
logger.key_spam(key, "releasing macro")
if type_and_code in unreleased:
@ -476,6 +476,7 @@ class KeycodeMapper(Consumer):
# not finished, especially since gamepad-triggers report a ton
# of events with a positive value.
logger.key_spam(key, "macro already running")
self.context.macros[key].press_trigger()
return
"""starting new macros or injecting new keys"""
@ -489,7 +490,7 @@ class KeycodeMapper(Consumer):
macro = self.context.macros[key]
active_macros[type_and_code] = macro
Unreleased((None, None), (*type_and_code, action), key)
macro.press_key()
macro.press_trigger()
logger.key_spam(key, "maps to macro %s", macro.code)
asyncio.ensure_future(macro.run(self.macro_write))
return

@ -192,8 +192,11 @@ class Macro:
self.tasks = []
# can be used to wait for the release of the event
self._holding_event = asyncio.Event()
self._holding_event.set() # released by default
self._trigger_release_event = asyncio.Event()
self._trigger_press_event = asyncio.Event()
# released by default
self._trigger_release_event.set()
self._trigger_press_event.clear()
self.running = False
@ -220,7 +223,7 @@ class Macro:
self._newest_action = action
self._new_event_arrived.set()
async def wait_for_event(self, filter=None):
async def _wait_for_event(self, filter=None):
"""Wait until a specific event arrives.
The parameters can be used to provide a filter. It will block
@ -243,7 +246,7 @@ class Macro:
def is_holding(self):
"""Check if the macro is waiting for a key to be released."""
return not self._holding_event.is_set()
return not self._trigger_release_event.is_set()
def get_capabilities(self):
"""Get the merged capabilities of the macro and its children."""
@ -290,23 +293,25 @@ class Macro:
# done
self.running = False
def press_key(self):
"""The user pressed the key down."""
def press_trigger(self):
"""The user pressed the trigger key down."""
if self.is_holding():
logger.error("Already holding")
return
self._holding_event.clear()
self._trigger_release_event.clear()
self._trigger_press_event.set()
for macro in self.child_macros:
macro.press_key()
macro.press_trigger()
def release_key(self):
"""The user released the key."""
self._holding_event.set()
def release_trigger(self):
"""The user released the trigger key."""
self._trigger_release_event.set()
self._trigger_press_event.clear()
for macro in self.child_macros:
macro.release_key()
macro.release_trigger()
async def _keycode_pause(self, _=None):
"""To add a pause between keystrokes."""
@ -329,7 +334,7 @@ class Macro:
_type_check(macro, [Macro, str, None], "h (hold)", 1)
if macro is None:
self.tasks.append(lambda _: self._holding_event.wait())
self.tasks.append(lambda _: self._trigger_release_event.wait())
return
if not isinstance(macro, Macro):
@ -341,7 +346,7 @@ class Macro:
resolved_code = _resolve(code, [int])
self.capabilities[EV_KEY].add(resolved_code)
handler(EV_KEY, resolved_code, 1)
await self._holding_event.wait()
await self._trigger_release_event.wait()
handler(EV_KEY, resolved_code, 0)
self.capabilities[EV_KEY].add(code)
@ -575,7 +580,13 @@ class Macro:
self.tasks.append(task)
def add_if_tap(self, then=None, _else=None, timeout=300):
"""If a key was pressed quickly."""
"""If a key was pressed quickly.
macro key pressed -> if_tap starts -> key released -> then
macro key pressed -> released (does other stuff in the meantime)
-> if_tap starts -> pressed -> released -> then
"""
_type_check(then, [Macro, None], "if_tap", 1)
_type_check(_else, [Macro, None], "if_tap", 2)
timeout = _type_check(timeout, [int, float], "if_tap", 3)
@ -585,11 +596,18 @@ class Macro:
if isinstance(_else, Macro):
self.child_macros.append(_else)
async def wait():
"""Wait for a release, or if nothing pressed yet, a press and release."""
if self.is_holding():
await self._trigger_release_event.wait()
else:
await self._trigger_press_event.wait()
await self._trigger_release_event.wait()
async def task(handler):
coroutine = self._holding_event.wait()
resolved_timeout = _resolve(timeout, [int, float]) / 1000
try:
await asyncio.wait_for(coroutine, resolved_timeout)
await asyncio.wait_for(wait(), resolved_timeout)
if then:
await then.run(handler)
except asyncio.TimeoutError:
@ -621,7 +639,7 @@ class Macro:
if action in (PRESS, PRESS_NEGATIVE):
return True
coroutine = self.wait_for_event(event_filter)
coroutine = self._wait_for_event(event_filter)
resolved_timeout = _resolve(timeout, allowed_types=[int, float, None])
try:
if resolved_timeout is not None:

@ -11,7 +11,7 @@ Examples for particular devices and/or use cases:
- Mouse scroll `wheel(down, 1)` `wheel(up, 1)`
- Mouse move `mouse(left, 1)` `mouse(right, 1)` `mouse(up, 1)` `mouse(down, 1)`
## Macros
## Quick Overview of Macros
- `k(BTN_LEFT)` a single mouse-click
- `k(1).k(2)` 1, 2
@ -31,6 +31,12 @@ Examples for particular devices and/or use cases:
- `if_single(k(a), k(b))` writes b if another key is pressed, or a if the key is released
and no other key was pressed in the meantime.
## Double Tap
`if_tap(if_tap(k(a), k(b)), k(c))`
Will write "a" if tapped twice, "b" if tapped once and "c" if held down long enough
## Combinations Spanning Multiple Devices
For regular combinations on only single devices it is not required to

@ -106,7 +106,7 @@ class TestKeycodeMapper(unittest.IsolatedAsyncioTestCase):
for macro in active_macros.values():
if macro.is_holding():
macro.release_key()
macro.release_trigger()
asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.01))
self.assertFalse(macro.is_holding())
self.assertFalse(macro.running)

@ -271,7 +271,7 @@ class TestMacros(MacroTestBase):
},
)
macro.press_key()
macro.press_trigger()
asyncio.ensure_future(macro.run(self.handler))
await asyncio.sleep(0.2)
self.assertTrue(macro.is_holding())
@ -283,7 +283,7 @@ class TestMacros(MacroTestBase):
self.assertEqual(self.result[3], (EV_KEY, system_mapping.get("d"), 1))
# and then releases starting with the previously pressed key
macro.release_key()
macro.release_trigger()
await asyncio.sleep(0.2)
self.assertFalse(macro.is_holding())
self.assertEqual(self.result[4], (EV_KEY, system_mapping.get("d"), 0))
@ -515,11 +515,11 @@ class TestMacros(MacroTestBase):
"""down"""
macro.press_key()
macro.press_trigger()
await (asyncio.sleep(0.05))
self.assertTrue(macro.is_holding())
macro.press_key() # redundantly calling doesn't break anything
macro.press_trigger() # redundantly calling doesn't break anything
asyncio.ensure_future(macro.run(self.handler))
await asyncio.sleep(0.2)
self.assertTrue(macro.is_holding())
@ -527,7 +527,7 @@ class TestMacros(MacroTestBase):
"""up"""
macro.release_key()
macro.release_trigger()
await (asyncio.sleep(0.05))
self.assertFalse(macro.is_holding())
@ -549,7 +549,7 @@ class TestMacros(MacroTestBase):
asyncio.ensure_future(macro.run(self.handler))
await asyncio.sleep(0.2)
self.assertFalse(macro.is_holding())
# press_key was never called, so the macro completes right away
# press_trigger was never called, so the macro completes right away
# and the child macro of hold is never called.
self.assertEqual(len(self.result), 4)
@ -567,7 +567,7 @@ class TestMacros(MacroTestBase):
"""down"""
macro.press_key()
macro.press_trigger()
asyncio.ensure_future(macro.run(self.handler))
await (asyncio.sleep(0.1))
self.assertTrue(macro.is_holding())
@ -578,7 +578,7 @@ class TestMacros(MacroTestBase):
"""up"""
macro.release_key()
macro.release_trigger()
await (asyncio.sleep(0.05))
self.assertFalse(macro.is_holding())
self.assertEqual(len(self.result), 4)
@ -598,7 +598,7 @@ class TestMacros(MacroTestBase):
asyncio.ensure_future(macro.run(self.handler))
await (asyncio.sleep(0.1))
self.assertFalse(macro.is_holding())
# since press_key was never called it just does the macro
# since press_trigger was never called it just does the macro
# completely
self.assertEqual(len(self.result), 4)
@ -620,12 +620,12 @@ class TestMacros(MacroTestBase):
"""down"""
macro.press_key()
macro.press_trigger()
await (asyncio.sleep(0.05))
self.assertTrue(macro.is_holding())
asyncio.ensure_future(macro.run(self.handler))
macro.press_key() # redundantly calling doesn't break anything
macro.press_trigger() # redundantly calling doesn't break anything
await asyncio.sleep(0.2)
self.assertTrue(macro.is_holding())
self.assertEqual(len(self.result), 1)
@ -633,7 +633,7 @@ class TestMacros(MacroTestBase):
"""up"""
macro.release_key()
macro.release_trigger()
await (asyncio.sleep(0.05))
self.assertFalse(macro.is_holding())
@ -776,7 +776,7 @@ class TestMacros(MacroTestBase):
async def test_duplicate_run(self):
# it won't restart the macro, because that may screw up the
# internal state (in particular the _holding_event).
# internal state (in particular the _trigger_release_event).
# I actually don't know at all what kind of bugs that might produce,
# lets just avoid it. It might cause it to be held down forever.
a = system_mapping.get("a")
@ -790,14 +790,14 @@ class TestMacros(MacroTestBase):
asyncio.ensure_future(macro.run(self.handler)) # ignored
self.assertFalse(macro.is_holding())
macro.press_key()
macro.press_trigger()
await asyncio.sleep(0.2)
self.assertTrue(macro.is_holding())
asyncio.ensure_future(macro.run(self.handler)) # ignored
self.assertTrue(macro.is_holding())
macro.release_key()
macro.release_trigger()
await asyncio.sleep(0.2)
self.assertFalse(macro.is_holding())
@ -814,10 +814,10 @@ class TestMacros(MacroTestBase):
"""not ignored, since previous run is over"""
asyncio.ensure_future(macro.run(self.handler))
macro.press_key()
macro.press_trigger()
await asyncio.sleep(0.2)
self.assertTrue(macro.is_holding())
macro.release_key()
macro.release_trigger()
await asyncio.sleep(0.2)
self.assertFalse(macro.is_holding())
@ -834,15 +834,15 @@ class TestMacros(MacroTestBase):
async def test_mouse(self):
macro_1 = parse("mouse(up, 4)", self.context)
macro_2 = parse("wheel(left, 3)", self.context)
macro_1.press_key()
macro_2.press_key()
macro_1.press_trigger()
macro_2.press_trigger()
asyncio.ensure_future(macro_1.run(self.handler))
asyncio.ensure_future(macro_2.run(self.handler))
await (asyncio.sleep(0.1))
self.assertTrue(macro_1.is_holding())
self.assertTrue(macro_2.is_holding())
macro_1.release_key()
macro_2.release_key()
macro_1.release_trigger()
macro_2.release_trigger()
self.assertIn((EV_REL, REL_Y, -4), self.result)
self.assertIn((EV_REL, REL_HWHEEL, 1), self.result)
@ -876,34 +876,36 @@ class TestMacros(MacroTestBase):
self.assertListEqual(self.result, [(5421, code, 154)])
self.assertEqual(len(macro.child_macros), 1)
async def test_wait_for_event(self):
async def test__wait_for_event(self):
macro = parse("h(a)", self.context)
try:
# should timeout, no event known
await asyncio.wait_for(macro.wait_for_event(), 0.1)
await asyncio.wait_for(macro._wait_for_event(), 0.1)
raise AssertionError("Expected asyncio.TimeoutError")
except asyncio.TimeoutError:
pass
# should not timeout because a new event arrived
macro.notify(new_event(EV_KEY, 1, 1), PRESS)
await asyncio.wait_for(macro.wait_for_event(), 0.1)
await asyncio.wait_for(macro._wait_for_event(), 0.1)
try:
# should timeout, because the previous event doesn't match the filter
await asyncio.wait_for(macro.wait_for_event(lambda e, a: e.value == 3), 0.1)
await asyncio.wait_for(
macro._wait_for_event(lambda e, a: e.value == 3), 0.1
)
raise AssertionError("Expected asyncio.TimeoutError")
except asyncio.TimeoutError:
pass
# should not timeout because a new event arrived
macro.notify(new_event(EV_KEY, 1, 3), RELEASE)
await asyncio.wait_for(macro.wait_for_event(), 0.1)
await asyncio.wait_for(macro._wait_for_event(), 0.1)
try:
# should timeout, because the previous event doesn't match the filter
await asyncio.wait_for(macro.wait_for_event(lambda _, a: a == PRESS), 0.1)
await asyncio.wait_for(macro._wait_for_event(lambda _, a: a == PRESS), 0.1)
raise AssertionError("Expected asyncio.TimeoutError")
except asyncio.TimeoutError:
pass
@ -1228,25 +1230,87 @@ class TestIfTap(MacroTestBase):
y = system_mapping.get("y")
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {x, y})
macro.press_key()
# this is the regular routine of how a macro is started. the tigger is pressed
# already when the macro runs, and released during if_tap within the timeout.
macro.press_trigger()
asyncio.ensure_future(macro.run(self.handler))
await asyncio.sleep(0.05)
macro.release_key()
macro.release_trigger()
await asyncio.sleep(0.05)
self.assertListEqual(self.result, [(EV_KEY, x, 1), (EV_KEY, x, 0)])
self.assertFalse(macro.running)
async def test_if_tap_2(self):
# when the press arrives shortly after run.
# a tap will happen within the timeout even if the tigger is not pressed when
# it does into if_tap
macro = parse("if_tap(k(a), k(b), 100)", self.context)
asyncio.ensure_future(macro.run(self.handler))
await asyncio.sleep(0.01)
macro.press_trigger()
await asyncio.sleep(0.01)
macro.release_trigger()
await asyncio.sleep(0.2)
self.assertListEqual(self.result, [(EV_KEY, KEY_A, 1), (EV_KEY, KEY_A, 0)])
self.assertFalse(macro.running)
self.result.clear()
async def test_if_double_tap(self):
macro = parse("if_tap(if_tap(k(a), k(b), 100), k(c), 100)", self.context)
self.assertEqual(len(macro.child_macros), 2)
self.assertEqual(len(macro.child_macros[0].child_macros), 2)
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {KEY_A, KEY_B, KEY_C})
asyncio.ensure_future(macro.run(self.handler))
# first tap
macro.press_trigger()
await asyncio.sleep(0.05)
macro.release_trigger()
# second tap
await asyncio.sleep(0.04)
macro.press_trigger()
await asyncio.sleep(0.04)
macro.release_trigger()
await asyncio.sleep(0.05)
self.assertListEqual(self.result, [(EV_KEY, KEY_A, 1), (EV_KEY, KEY_A, 0)])
self.assertFalse(macro.running)
self.result.clear()
"""If the second tap takes too long, runs else there"""
asyncio.ensure_future(macro.run(self.handler))
# first tap
macro.press_trigger()
await asyncio.sleep(0.05)
macro.release_trigger()
# second tap
await asyncio.sleep(0.06)
macro.press_trigger()
await asyncio.sleep(0.06)
macro.release_trigger()
await asyncio.sleep(0.05)
self.assertListEqual(self.result, [(EV_KEY, KEY_B, 1), (EV_KEY, KEY_B, 0)])
self.assertFalse(macro.running)
self.result.clear()
async def test_if_tap_none(self):
# first param none
macro = parse("if_tap(, k(y), 100)", self.context)
self.assertEqual(len(macro.child_macros), 1)
y = system_mapping.get("y")
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {y})
macro.press_key()
macro.press_trigger()
asyncio.ensure_future(macro.run(self.handler))
await asyncio.sleep(0.05)
macro.release_key()
macro.release_trigger()
await asyncio.sleep(0.05)
self.assertListEqual(self.result, [])
@ -1255,10 +1319,10 @@ class TestIfTap(MacroTestBase):
self.assertEqual(len(macro.child_macros), 1)
y = system_mapping.get("y")
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {y})
macro.press_key()
macro.press_trigger()
asyncio.ensure_future(macro.run(self.handler))
await asyncio.sleep(0.1)
macro.release_key()
macro.release_trigger()
await asyncio.sleep(0.05)
self.assertListEqual(self.result, [])
@ -1272,10 +1336,10 @@ class TestIfTap(MacroTestBase):
y = system_mapping.get("y")
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {x, y})
macro.press_key()
macro.press_trigger()
asyncio.ensure_future(macro.run(self.handler))
await asyncio.sleep(0.1)
macro.release_key()
macro.release_trigger()
await asyncio.sleep(0.05)
self.assertListEqual(self.result, [(EV_KEY, y, 1), (EV_KEY, y, 0)])
@ -1289,10 +1353,10 @@ class TestIfTap(MacroTestBase):
y = system_mapping.get("y")
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {x, y})
macro.press_key()
macro.press_trigger()
asyncio.ensure_future(macro.run(self.handler))
await asyncio.sleep(0.1)
macro.release_key()
macro.release_trigger()
await asyncio.sleep(0.05)
self.assertListEqual(self.result, [(EV_KEY, y, 1), (EV_KEY, y, 0)])

Loading…
Cancel
Save