efficiently hold forever until released with h()

xkb
sezanzeb 4 years ago committed by sezanzeb
parent b502bd2b8c
commit d5f1405e28

@ -271,7 +271,7 @@ def handle_keycode(key_to_code, macros, event, uinput, forward=True):
"""Releasing keys and macros"""
if is_key_up(event):
if active_macro is not None and active_macro.holding:
if active_macro is not None and active_macro.is_holding():
# Tell the macro for that keycode that the key is released and
# let it decide what to do with that information.
active_macro.release_key()

@ -79,8 +79,8 @@ class _Macro:
self.code = code
self.mapping = mapping
# supposed to be True between key event values 1 (down) and 0 (up)
self.holding = False
# is a lock so that h() can be realized
self._holding_lock = asyncio.Lock()
self.running = False
@ -89,6 +89,10 @@ class _Macro:
self.child_macros = []
def is_holding(self):
"""Check if the macro is waiting for a key to be released."""
return self._holding_lock.locked()
def get_capabilities(self):
"""Resolve all capabilities of the macro and those of its children."""
capabilities = self.capabilities.copy()
@ -123,31 +127,52 @@ class _Macro:
def press_key(self):
"""The user pressed the key down."""
self.holding = True
if self.is_holding():
logger.error('Already holding')
return
asyncio.ensure_future(self._holding_lock.acquire())
for macro in self.child_macros:
macro.press_key()
def release_key(self):
"""The user released the key."""
self.holding = False
if self._holding_lock is not None:
self._holding_lock.release()
for macro in self.child_macros:
macro.release_key()
def hold(self, macro):
def hold(self, macro=None):
"""Loops the execution until key release."""
if not isinstance(macro, _Macro):
raise ValueError(
'Expected the param for h (hold) to be '
f'a macro (like k(a)), but got "{macro}"'
)
async def task():
while self.holding:
await macro.run()
if macro is None:
# no parameters: block until released
async def task():
# wait until the key is released. Only then it will be
# able to acquire the lock. Release it right after so that
# it can be acquired by press_key again.
await self._holding_lock.acquire()
self._holding_lock.release()
# this seems to be much more efficient on the CPU than
# looping `await asyncio.sleep(0.001)`
self.tasks.append((1234, task))
else:
if not isinstance(macro, _Macro):
raise ValueError(
'Expected the param for h (hold) to be '
f'a macro (like k(a)), but got "{macro}"'
)
self.tasks.append((REPEAT, task))
async def task():
while self.is_holding():
# run the child macro completely to avoid
# not-releasing any key
await macro.run()
self.child_macros.append(macro)
self.tasks.append((REPEAT, task))
self.child_macros.append(macro)
return self
@ -335,6 +360,9 @@ def _parse_recurse(macro, mapping, macro_instance=None, depth=0):
assert isinstance(macro, str)
assert isinstance(depth, int)
if macro == '':
return None
if macro_instance is None:
macro_instance = _Macro(macro, mapping)
else:
@ -354,7 +382,7 @@ def _parse_recurse(macro, mapping, macro_instance=None, depth=0):
'r': (macro_instance.repeat, 2, 2),
'k': (macro_instance.keycode, 1, 1),
'w': (macro_instance.wait, 1, 1),
'h': (macro_instance.hold, 1, 1)
'h': (macro_instance.hold, 0, 1)
}
function = functions.get(call)

@ -78,9 +78,9 @@ class TestKeycodeMapper(unittest.TestCase):
def tearDown(self):
# make sure all macros are stopped by tests
for macro in active_macros.values():
if macro.holding:
if macro.is_holding():
macro.release_key()
self.assertFalse(macro.holding)
self.assertFalse(macro.is_holding())
self.assertFalse(macro.running)
cleanup()
@ -451,7 +451,7 @@ class TestKeycodeMapper(unittest.TestCase):
keystroke_sleep = config.get('macros.keystroke_sleep_ms', 10)
loop.run_until_complete(asyncio.sleep(sleeptime / 1000))
self.assertTrue(active_macros[(EV_KEY, 1)].holding)
self.assertTrue(active_macros[(EV_KEY, 1)].is_holding())
self.assertTrue(active_macros[(EV_KEY, 1)].running)
"""stop macro"""
@ -480,7 +480,7 @@ class TestKeycodeMapper(unittest.TestCase):
count_after = len(history)
self.assertEqual(count_before, count_after)
self.assertFalse(active_macros[(EV_KEY, 1)].holding)
self.assertFalse(active_macros[(EV_KEY, 1)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 1)].running)
def test_hold_2(self):
@ -525,11 +525,11 @@ class TestKeycodeMapper(unittest.TestCase):
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 1), None)
loop.run_until_complete(asyncio.sleep(0.05))
self.assertTrue(active_macros[(EV_KEY, 1)].holding)
self.assertTrue(active_macros[(EV_KEY, 1)].is_holding())
self.assertTrue(active_macros[(EV_KEY, 1)].running)
self.assertTrue(active_macros[(EV_KEY, 2)].holding)
self.assertTrue(active_macros[(EV_KEY, 2)].is_holding())
self.assertTrue(active_macros[(EV_KEY, 2)].running)
self.assertTrue(active_macros[(EV_KEY, 3)].holding)
self.assertTrue(active_macros[(EV_KEY, 3)].is_holding())
self.assertTrue(active_macros[(EV_KEY, 3)].running)
# there should only be one code_c in the events, because no key
@ -573,11 +573,11 @@ class TestKeycodeMapper(unittest.TestCase):
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 0), None)
loop.run_until_complete(asyncio.sleep(0.05))
self.assertFalse(active_macros[(EV_KEY, 1)].holding)
self.assertFalse(active_macros[(EV_KEY, 1)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 1)].running)
self.assertTrue(active_macros[(EV_KEY, 2)].holding)
self.assertTrue(active_macros[(EV_KEY, 2)].is_holding())
self.assertTrue(active_macros[(EV_KEY, 2)].running)
self.assertFalse(active_macros[(EV_KEY, 3)].holding)
self.assertFalse(active_macros[(EV_KEY, 3)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 3)].running)
# stop macro 2
@ -601,11 +601,11 @@ class TestKeycodeMapper(unittest.TestCase):
count_after = len(history)
self.assertEqual(count_before, count_after)
self.assertFalse(active_macros[(EV_KEY, 1)].holding)
self.assertFalse(active_macros[(EV_KEY, 1)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 1)].running)
self.assertFalse(active_macros[(EV_KEY, 2)].holding)
self.assertFalse(active_macros[(EV_KEY, 2)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 2)].running)
self.assertFalse(active_macros[(EV_KEY, 3)].holding)
self.assertFalse(active_macros[(EV_KEY, 3)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 3)].running)
def test_hold_3(self):
@ -634,7 +634,7 @@ class TestKeycodeMapper(unittest.TestCase):
loop.run_until_complete(asyncio.sleep(0.1))
for _ in range(5):
self.assertTrue(active_macros[(EV_KEY, 1)].holding)
self.assertTrue(active_macros[(EV_KEY, 1)].is_holding())
self.assertTrue(active_macros[(EV_KEY, 1)].running)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None)
loop.run_until_complete(asyncio.sleep(0.05))
@ -652,7 +652,7 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(history.count((code_a, 0)), 1)
self.assertEqual(history.count((code_c, 1)), 1)
self.assertEqual(history.count((code_c, 0)), 1)
self.assertFalse(active_macros[(EV_KEY, 1)].holding)
self.assertFalse(active_macros[(EV_KEY, 1)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 1)].running)
# it's stopped and won't write stuff anymore
@ -727,9 +727,9 @@ class TestKeycodeMapper(unittest.TestCase):
loop.run_until_complete(asyncio.sleep(sleeptime / 1000))
self.assertEqual(len(active_macros), 2)
self.assertTrue(active_macros[key_1].holding)
self.assertTrue(active_macros[key_1].is_holding())
self.assertTrue(active_macros[key_1].running)
self.assertTrue(active_macros[key_2].holding)
self.assertTrue(active_macros[key_2].is_holding())
self.assertTrue(active_macros[key_2].running)
self.assertIn(down_0[:2], unreleased)
@ -748,9 +748,9 @@ class TestKeycodeMapper(unittest.TestCase):
loop.run_until_complete(asyncio.sleep(keystroke_sleep * 10 / 1000))
self.assertFalse(active_macros[key_1].holding)
self.assertFalse(active_macros[key_1].is_holding())
self.assertFalse(active_macros[key_1].running)
self.assertFalse(active_macros[key_2].holding)
self.assertFalse(active_macros[key_2].is_holding())
self.assertFalse(active_macros[key_2].running)
events = calculate_event_number(sleeptime, 1, 1) * 2

@ -164,23 +164,48 @@ class TestMacros(unittest.TestCase):
macro.press_key()
asyncio.ensure_future(macro.run())
self.loop.run_until_complete(asyncio.sleep(0.2))
self.assertTrue(macro.is_holding())
self.assertGreater(len(self.result), 2)
macro.release_key()
self.loop.run_until_complete(asyncio.sleep(0.05))
self.assertFalse(macro.is_holding())
self.assertEqual(
self.result[0],
(system_mapping.get('1'), 1)
)
self.assertEqual(
self.result[-1],
(system_mapping.get('3'), 0)
)
self.assertEqual(self.result[0], (system_mapping.get('1'), 1))
self.assertEqual(self.result[-1], (system_mapping.get('3'), 0))
code_a = system_mapping.get('a')
self.assertGreater(self.result.count((code_a, 1)), 2)
self.assertEqual(len(macro.child_macros), 1)
def test_hold_forever(self):
macro = parse('k(1).h().k(3)', self.mapping)
macro.set_handler(self.handler)
self.assertSetEqual(macro.get_capabilities(), {
system_mapping.get('1'),
system_mapping.get('3')
})
macro.press_key()
asyncio.ensure_future(macro.run())
self.loop.run_until_complete(asyncio.sleep(0.1))
self.assertTrue(macro.is_holding())
self.assertEqual(len(self.result), 2)
self.loop.run_until_complete(asyncio.sleep(0.1))
# doesn't do fancy stuff, is blocking until the release
self.assertEqual(len(self.result), 2)
macro.release_key()
self.loop.run_until_complete(asyncio.sleep(0.05))
self.assertFalse(macro.is_holding())
self.assertEqual(len(self.result), 4)
self.assertEqual(self.result[0], (system_mapping.get('1'), 1))
self.assertEqual(self.result[-1], (system_mapping.get('3'), 0))
self.assertEqual(len(macro.child_macros), 0)
def test_2(self):
start = time.time()
repeats = 20

Loading…
Cancel
Save