efficiently hold forever until released with h()

pull/31/head
sezanzeb 4 years ago
parent 7e26528bdb
commit 3ab1869827

@ -271,7 +271,7 @@ def handle_keycode(key_to_code, macros, event, uinput, forward=True):
"""Releasing keys and macros""" """Releasing keys and macros"""
if is_key_up(event): if is_key_up(event):
if active_macro is not None and active_macro.holding: if active_macro is not None and active_macro.is_holding():
# Tell the macro for that keycode that the key is released and # Tell the macro for that keycode that the key is released and
# let it decide what to do with that information. # let it decide what to do with that information.
active_macro.release_key() active_macro.release_key()

@ -79,8 +79,8 @@ class _Macro:
self.code = code self.code = code
self.mapping = mapping self.mapping = mapping
# supposed to be True between key event values 1 (down) and 0 (up) # is a lock so that h() can be realized
self.holding = False self._holding_lock = asyncio.Lock()
self.running = False self.running = False
@ -89,6 +89,10 @@ class _Macro:
self.child_macros = [] self.child_macros = []
def is_holding(self):
"""Check if the macro is waiting for a key to be released."""
return self._holding_lock.locked()
def get_capabilities(self): def get_capabilities(self):
"""Resolve all capabilities of the macro and those of its children.""" """Resolve all capabilities of the macro and those of its children."""
capabilities = self.capabilities.copy() capabilities = self.capabilities.copy()
@ -123,31 +127,52 @@ class _Macro:
def press_key(self): def press_key(self):
"""The user pressed the key down.""" """The user pressed the key down."""
self.holding = True if self.is_holding():
logger.error('Already holding')
return
asyncio.ensure_future(self._holding_lock.acquire())
for macro in self.child_macros: for macro in self.child_macros:
macro.press_key() macro.press_key()
def release_key(self): def release_key(self):
"""The user released the key.""" """The user released the key."""
self.holding = False if self._holding_lock is not None:
self._holding_lock.release()
for macro in self.child_macros: for macro in self.child_macros:
macro.release_key() macro.release_key()
def hold(self, macro): def hold(self, macro=None):
"""Loops the execution until key release.""" """Loops the execution until key release."""
if not isinstance(macro, _Macro): if macro is None:
raise ValueError( # no parameters: block until released
'Expected the param for h (hold) to be ' async def task():
f'a macro (like k(a)), but got "{macro}"' # wait until the key is released. Only then it will be
) # able to acquire the lock. Release it right after so that
# it can be acquired by press_key again.
async def task(): await self._holding_lock.acquire()
while self.holding: self._holding_lock.release()
await macro.run() # this seems to be much more efficient on the CPU than
# looping `await asyncio.sleep(0.001)`
self.tasks.append((1234, task))
else:
if not isinstance(macro, _Macro):
raise ValueError(
'Expected the param for h (hold) to be '
f'a macro (like k(a)), but got "{macro}"'
)
self.tasks.append((REPEAT, task)) async def task():
while self.is_holding():
# run the child macro completely to avoid
# not-releasing any key
await macro.run()
self.child_macros.append(macro) self.tasks.append((REPEAT, task))
self.child_macros.append(macro)
return self return self
@ -335,6 +360,9 @@ def _parse_recurse(macro, mapping, macro_instance=None, depth=0):
assert isinstance(macro, str) assert isinstance(macro, str)
assert isinstance(depth, int) assert isinstance(depth, int)
if macro == '':
return None
if macro_instance is None: if macro_instance is None:
macro_instance = _Macro(macro, mapping) macro_instance = _Macro(macro, mapping)
else: else:
@ -354,7 +382,7 @@ def _parse_recurse(macro, mapping, macro_instance=None, depth=0):
'r': (macro_instance.repeat, 2, 2), 'r': (macro_instance.repeat, 2, 2),
'k': (macro_instance.keycode, 1, 1), 'k': (macro_instance.keycode, 1, 1),
'w': (macro_instance.wait, 1, 1), 'w': (macro_instance.wait, 1, 1),
'h': (macro_instance.hold, 1, 1) 'h': (macro_instance.hold, 0, 1)
} }
function = functions.get(call) function = functions.get(call)

@ -78,9 +78,9 @@ class TestKeycodeMapper(unittest.TestCase):
def tearDown(self): def tearDown(self):
# make sure all macros are stopped by tests # make sure all macros are stopped by tests
for macro in active_macros.values(): for macro in active_macros.values():
if macro.holding: if macro.is_holding():
macro.release_key() macro.release_key()
self.assertFalse(macro.holding) self.assertFalse(macro.is_holding())
self.assertFalse(macro.running) self.assertFalse(macro.running)
cleanup() cleanup()
@ -451,7 +451,7 @@ class TestKeycodeMapper(unittest.TestCase):
keystroke_sleep = config.get('macros.keystroke_sleep_ms', 10) keystroke_sleep = config.get('macros.keystroke_sleep_ms', 10)
loop.run_until_complete(asyncio.sleep(sleeptime / 1000)) loop.run_until_complete(asyncio.sleep(sleeptime / 1000))
self.assertTrue(active_macros[(EV_KEY, 1)].holding) self.assertTrue(active_macros[(EV_KEY, 1)].is_holding())
self.assertTrue(active_macros[(EV_KEY, 1)].running) self.assertTrue(active_macros[(EV_KEY, 1)].running)
"""stop macro""" """stop macro"""
@ -480,7 +480,7 @@ class TestKeycodeMapper(unittest.TestCase):
count_after = len(history) count_after = len(history)
self.assertEqual(count_before, count_after) self.assertEqual(count_before, count_after)
self.assertFalse(active_macros[(EV_KEY, 1)].holding) self.assertFalse(active_macros[(EV_KEY, 1)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 1)].running) self.assertFalse(active_macros[(EV_KEY, 1)].running)
def test_hold_2(self): def test_hold_2(self):
@ -525,11 +525,11 @@ class TestKeycodeMapper(unittest.TestCase):
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None) handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 1), None) handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 1), None)
loop.run_until_complete(asyncio.sleep(0.05)) loop.run_until_complete(asyncio.sleep(0.05))
self.assertTrue(active_macros[(EV_KEY, 1)].holding) self.assertTrue(active_macros[(EV_KEY, 1)].is_holding())
self.assertTrue(active_macros[(EV_KEY, 1)].running) self.assertTrue(active_macros[(EV_KEY, 1)].running)
self.assertTrue(active_macros[(EV_KEY, 2)].holding) self.assertTrue(active_macros[(EV_KEY, 2)].is_holding())
self.assertTrue(active_macros[(EV_KEY, 2)].running) self.assertTrue(active_macros[(EV_KEY, 2)].running)
self.assertTrue(active_macros[(EV_KEY, 3)].holding) self.assertTrue(active_macros[(EV_KEY, 3)].is_holding())
self.assertTrue(active_macros[(EV_KEY, 3)].running) self.assertTrue(active_macros[(EV_KEY, 3)].running)
# there should only be one code_c in the events, because no key # there should only be one code_c in the events, because no key
@ -573,11 +573,11 @@ class TestKeycodeMapper(unittest.TestCase):
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None) handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 0), None) handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 0), None)
loop.run_until_complete(asyncio.sleep(0.05)) loop.run_until_complete(asyncio.sleep(0.05))
self.assertFalse(active_macros[(EV_KEY, 1)].holding) self.assertFalse(active_macros[(EV_KEY, 1)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 1)].running) self.assertFalse(active_macros[(EV_KEY, 1)].running)
self.assertTrue(active_macros[(EV_KEY, 2)].holding) self.assertTrue(active_macros[(EV_KEY, 2)].is_holding())
self.assertTrue(active_macros[(EV_KEY, 2)].running) self.assertTrue(active_macros[(EV_KEY, 2)].running)
self.assertFalse(active_macros[(EV_KEY, 3)].holding) self.assertFalse(active_macros[(EV_KEY, 3)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 3)].running) self.assertFalse(active_macros[(EV_KEY, 3)].running)
# stop macro 2 # stop macro 2
@ -601,11 +601,11 @@ class TestKeycodeMapper(unittest.TestCase):
count_after = len(history) count_after = len(history)
self.assertEqual(count_before, count_after) self.assertEqual(count_before, count_after)
self.assertFalse(active_macros[(EV_KEY, 1)].holding) self.assertFalse(active_macros[(EV_KEY, 1)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 1)].running) self.assertFalse(active_macros[(EV_KEY, 1)].running)
self.assertFalse(active_macros[(EV_KEY, 2)].holding) self.assertFalse(active_macros[(EV_KEY, 2)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 2)].running) self.assertFalse(active_macros[(EV_KEY, 2)].running)
self.assertFalse(active_macros[(EV_KEY, 3)].holding) self.assertFalse(active_macros[(EV_KEY, 3)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 3)].running) self.assertFalse(active_macros[(EV_KEY, 3)].running)
def test_hold_3(self): def test_hold_3(self):
@ -634,7 +634,7 @@ class TestKeycodeMapper(unittest.TestCase):
loop.run_until_complete(asyncio.sleep(0.1)) loop.run_until_complete(asyncio.sleep(0.1))
for _ in range(5): for _ in range(5):
self.assertTrue(active_macros[(EV_KEY, 1)].holding) self.assertTrue(active_macros[(EV_KEY, 1)].is_holding())
self.assertTrue(active_macros[(EV_KEY, 1)].running) self.assertTrue(active_macros[(EV_KEY, 1)].running)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None) handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None)
loop.run_until_complete(asyncio.sleep(0.05)) loop.run_until_complete(asyncio.sleep(0.05))
@ -652,7 +652,7 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(history.count((code_a, 0)), 1) self.assertEqual(history.count((code_a, 0)), 1)
self.assertEqual(history.count((code_c, 1)), 1) self.assertEqual(history.count((code_c, 1)), 1)
self.assertEqual(history.count((code_c, 0)), 1) self.assertEqual(history.count((code_c, 0)), 1)
self.assertFalse(active_macros[(EV_KEY, 1)].holding) self.assertFalse(active_macros[(EV_KEY, 1)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 1)].running) self.assertFalse(active_macros[(EV_KEY, 1)].running)
# it's stopped and won't write stuff anymore # it's stopped and won't write stuff anymore
@ -727,9 +727,9 @@ class TestKeycodeMapper(unittest.TestCase):
loop.run_until_complete(asyncio.sleep(sleeptime / 1000)) loop.run_until_complete(asyncio.sleep(sleeptime / 1000))
self.assertEqual(len(active_macros), 2) self.assertEqual(len(active_macros), 2)
self.assertTrue(active_macros[key_1].holding) self.assertTrue(active_macros[key_1].is_holding())
self.assertTrue(active_macros[key_1].running) self.assertTrue(active_macros[key_1].running)
self.assertTrue(active_macros[key_2].holding) self.assertTrue(active_macros[key_2].is_holding())
self.assertTrue(active_macros[key_2].running) self.assertTrue(active_macros[key_2].running)
self.assertIn(down_0[:2], unreleased) self.assertIn(down_0[:2], unreleased)
@ -748,9 +748,9 @@ class TestKeycodeMapper(unittest.TestCase):
loop.run_until_complete(asyncio.sleep(keystroke_sleep * 10 / 1000)) loop.run_until_complete(asyncio.sleep(keystroke_sleep * 10 / 1000))
self.assertFalse(active_macros[key_1].holding) self.assertFalse(active_macros[key_1].is_holding())
self.assertFalse(active_macros[key_1].running) self.assertFalse(active_macros[key_1].running)
self.assertFalse(active_macros[key_2].holding) self.assertFalse(active_macros[key_2].is_holding())
self.assertFalse(active_macros[key_2].running) self.assertFalse(active_macros[key_2].running)
events = calculate_event_number(sleeptime, 1, 1) * 2 events = calculate_event_number(sleeptime, 1, 1) * 2

@ -164,23 +164,48 @@ class TestMacros(unittest.TestCase):
macro.press_key() macro.press_key()
asyncio.ensure_future(macro.run()) asyncio.ensure_future(macro.run())
self.loop.run_until_complete(asyncio.sleep(0.2)) self.loop.run_until_complete(asyncio.sleep(0.2))
self.assertTrue(macro.is_holding())
self.assertGreater(len(self.result), 2)
macro.release_key() macro.release_key()
self.loop.run_until_complete(asyncio.sleep(0.05)) self.loop.run_until_complete(asyncio.sleep(0.05))
self.assertFalse(macro.is_holding())
self.assertEqual( self.assertEqual(self.result[0], (system_mapping.get('1'), 1))
self.result[0], self.assertEqual(self.result[-1], (system_mapping.get('3'), 0))
(system_mapping.get('1'), 1)
)
self.assertEqual(
self.result[-1],
(system_mapping.get('3'), 0)
)
code_a = system_mapping.get('a') code_a = system_mapping.get('a')
self.assertGreater(self.result.count((code_a, 1)), 2) self.assertGreater(self.result.count((code_a, 1)), 2)
self.assertEqual(len(macro.child_macros), 1) self.assertEqual(len(macro.child_macros), 1)
def test_hold_forever(self):
macro = parse('k(1).h().k(3)', self.mapping)
macro.set_handler(self.handler)
self.assertSetEqual(macro.get_capabilities(), {
system_mapping.get('1'),
system_mapping.get('3')
})
macro.press_key()
asyncio.ensure_future(macro.run())
self.loop.run_until_complete(asyncio.sleep(0.1))
self.assertTrue(macro.is_holding())
self.assertEqual(len(self.result), 2)
self.loop.run_until_complete(asyncio.sleep(0.1))
# doesn't do fancy stuff, is blocking until the release
self.assertEqual(len(self.result), 2)
macro.release_key()
self.loop.run_until_complete(asyncio.sleep(0.05))
self.assertFalse(macro.is_holding())
self.assertEqual(len(self.result), 4)
self.assertEqual(self.result[0], (system_mapping.get('1'), 1))
self.assertEqual(self.result[-1], (system_mapping.get('3'), 0))
self.assertEqual(len(macro.child_macros), 0)
def test_2(self): def test_2(self):
start = time.time() start = time.time()
repeats = 20 repeats = 20

Loading…
Cancel
Save