no redundant macro parsings

xkb
sezanzeb 3 years ago committed by sezanzeb
parent a983cf9126
commit 5e150a3209

@ -412,6 +412,20 @@ class Injector:
self._event_producer = EventProducer(self.mapping)
logger.debug('Parsing macros')
macros = {}
for key, output in self.mapping:
if is_this_a_macro(output):
macro = parse(output, self.mapping)
if macro is None:
continue
for permutation in key.get_permutations():
macros[permutation.keys] = macro
if len(macros) == 0:
logger.debug('No macros configured')
# Watch over each one of the potentially multiple devices per hardware
for path in paths:
source = self._grab_device(path)
@ -420,21 +434,6 @@ class Injector:
# it doesn't provide the events needed to execute the mapping
continue
# each device needs own macro instances to add a custom handler
logger.debug('Parsing macros for %s', path)
macros = {}
for key, output in self.mapping:
if is_this_a_macro(output):
macro = parse(output, self.mapping)
if macro is None:
continue
for permutation in key.get_permutations():
macros[permutation.keys] = macro
if len(macros) == 0:
logger.debug('No macros configured')
logger.spam(
'Original capabilities for "%s": %s',
path, source.capabilities(verbose=True)
@ -455,14 +454,6 @@ class Injector:
path, uinput.capabilities(verbose=True)
)
def handler(*args, uinput=uinput):
# this ensures that the right uinput is used for macro_write,
# because this is within a loop
self._macro_write(*args, uinput)
for macro in macros.values():
macro.set_handler(handler)
# actual reading of events
coroutines.append(self._event_consumer(macros, source, uinput))

@ -240,6 +240,11 @@ class KeycodeMapper:
self.key_to_code = key_to_code
self.macros = macros
def macro_write(self, code, value):
"""Handler for macros."""
self.uinput.write(EV_KEY, code, value)
self.uinput.syn()
def _get_key(self, key):
"""If the event triggers stuff, get the key for that.
@ -419,7 +424,7 @@ class KeycodeMapper:
Unreleased((None, None), event_tuple, key)
macro.press_key()
logger.key_spam(key, 'maps to macro %s', macro.code)
asyncio.ensure_future(macro.run())
asyncio.ensure_future(macro.run(self.macro_write))
return
if key in self.key_to_code:

@ -104,25 +104,18 @@ class _Macro:
capabilities.update(macro.get_capabilities())
return capabilities
def set_handler(self, handler):
"""Set the handler function.
async def run(self, handler):
"""Run the macro.
Parameters
----------
handler : func
A function that accepts keycodes as the first parameter and the
key-press state as the second. 1 for down and 0 for up. The
macro will write to this function once executed with `.run()`.
handler : function
Will receive int code and value for an EV_KEY event to write
"""
self.handler = handler
for macro in self.child_macros:
macro.set_handler(handler)
async def run(self):
"""Run the macro."""
# TODO test handler
self.running = True
for _, task in self.tasks:
coroutine = task()
coroutine = task(handler)
if asyncio.iscoroutine(coroutine):
await coroutine
@ -152,7 +145,7 @@ class _Macro:
"""Loops the execution until key release."""
if macro is None:
# no parameters: block until released
async def task():
async def task(_):
# wait until the key is released. Only then it will be
# able to acquire the lock. Release it right after so that
# it can be acquired by press_key again.
@ -169,11 +162,11 @@ class _Macro:
f'a macro (like k(a)), but got "{macro}"'
)
async def task():
async def task(handler):
while self.is_holding():
# run the child macro completely to avoid
# not-releasing any key
await macro.run()
await macro.run(handler)
self.tasks.append((REPEAT, task))
self.child_macros.append(macro)
@ -204,11 +197,11 @@ class _Macro:
self.child_macros.append(macro)
self.tasks.append((MODIFIER, lambda: self.handler(code, 1)))
self.tasks.append((MODIFIER, lambda handler: handler(code, 1)))
self.add_keycode_pause()
self.tasks.append((CHILD_MACRO, macro.run))
self.add_keycode_pause()
self.tasks.append((MODIFIER, lambda: self.handler(code, 0)))
self.tasks.append((MODIFIER, lambda handler: handler(code, 0)))
self.add_keycode_pause()
return self
@ -245,7 +238,7 @@ class _Macro:
"""To add a pause between keystrokes."""
sleeptime = self.mapping.get('macros.keystroke_sleep_ms') / 1000
async def sleep():
async def sleep(_):
await asyncio.sleep(sleeptime)
self.tasks.append((SLEEP, sleep))
@ -260,9 +253,9 @@ class _Macro:
self.capabilities.add(code)
self.tasks.append((KEYSTROKE, lambda: self.handler(code, 1)))
self.tasks.append((KEYSTROKE, lambda handler: handler(code, 1)))
self.add_keycode_pause()
self.tasks.append((KEYSTROKE, lambda: self.handler(code, 0)))
self.tasks.append((KEYSTROKE, lambda handler: handler(code, 0)))
self.add_keycode_pause()
return self
@ -278,7 +271,7 @@ class _Macro:
sleeptime /= 1000
async def sleep():
async def sleep(_):
await asyncio.sleep(sleeptime)
self.tasks.append((SLEEP, sleep))
@ -476,9 +469,8 @@ def handle_plus_syntax(macro):
def parse(macro, mapping, return_errors=False):
"""parse and generate a _Macro that can be run as often as you want.
You need to use set_handler on it before running. If it could not
be parsed, possibly due to syntax errors, will log the error and
return None.
If it could not be parsed, possibly due to syntax errors, will log the
error and return None.
Parameters
----------

@ -332,6 +332,7 @@ class UInput:
self.device = InputDevice('justdoit')
self.name = name
self.events = events
self.write_history = []
def capabilities(self, *args, **kwargs):
return self.events
@ -341,6 +342,7 @@ class UInput:
event = new_event(type, code, value)
uinput_write_history.append(event)
uinput_write_history_pipe[1].send(event)
self.write_history.append(event)
print(
f'\033[90m' # color
f'{(type, code, value)} written'

@ -478,14 +478,14 @@ class TestKeycodeMapper(unittest.TestCase):
((EV_KEY, 2, 1),): parse('r(5, k(b))', self.mapping)
}
macro_mapping[((EV_KEY, 1, 1),)].set_handler(lambda *args: history.append(args))
macro_mapping[((EV_KEY, 2, 1),)].set_handler(lambda *args: history.append(args))
keycode_mapper = KeycodeMapper(
self.source, self.mapping, None,
{}, macro_mapping
)
keycode_mapper.macro_write = lambda *args: history.append(args)
keycode_mapper.macro_write = lambda *args: history.append(args)
keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1))
keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 1))
@ -531,13 +531,13 @@ class TestKeycodeMapper(unittest.TestCase):
def handler(*args):
history.append(args)
macro_mapping[((EV_KEY, 1, 1),)].set_handler(handler)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, None,
{}, macro_mapping
)
keycode_mapper.macro_write = handler
"""start macro"""
keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1))
@ -604,15 +604,15 @@ class TestKeycodeMapper(unittest.TestCase):
def handler(*args):
history.append(args)
macro_mapping[((EV_KEY, 1, 1),)].set_handler(handler)
macro_mapping[((EV_KEY, 2, 1),)].set_handler(handler)
macro_mapping[((EV_KEY, 3, 1),)].set_handler(handler)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, None,
{}, macro_mapping
)
keycode_mapper.macro_write = handler
keycode_mapper.macro_write = handler
keycode_mapper.macro_write = handler
"""start macro 2"""
keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 1))
@ -730,13 +730,13 @@ class TestKeycodeMapper(unittest.TestCase):
def handler(*args):
history.append(args)
macro_mapping[((EV_KEY, 1, 1),)].set_handler(handler)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, None,
{}, macro_mapping
)
keycode_mapper.macro_write = handler
keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1))
loop = asyncio.get_event_loop()
@ -806,19 +806,17 @@ class TestKeycodeMapper(unittest.TestCase):
def handler(*args):
history.append(args)
macro_mapping[(down_0, down_1)].set_handler(handler)
macro_mapping[(down_2,)].set_handler(handler)
loop = asyncio.get_event_loop()
macros_uinput = UInput()
keys_uinput = UInput()
uinput_1 = UInput()
keycode_mapper = KeycodeMapper(
self.source, self.mapping, macros_uinput,
self.source, self.mapping, uinput_1,
{}, macro_mapping
)
keycode_mapper.macro_write = handler
# key up won't do anything
keycode_mapper.handle_keycode(new_event(*up_0))
keycode_mapper.handle_keycode(new_event(*up_1))
@ -828,16 +826,20 @@ class TestKeycodeMapper(unittest.TestCase):
"""start macros"""
uinput_2 = UInput()
keycode_mapper = KeycodeMapper(
self.source, self.mapping, keys_uinput,
self.source, self.mapping, uinput_2,
{}, macro_mapping
)
keycode_mapper.macro_write = handler
keycode_mapper.handle_keycode(new_event(*down_0))
self.assertEqual(keys_uinput.write_count, 1)
self.assertEqual(uinput_2.write_count, 1)
keycode_mapper.handle_keycode(new_event(*down_1))
keycode_mapper.handle_keycode(new_event(*down_2))
self.assertEqual(keys_uinput.write_count, 1)
self.assertEqual(uinput_2.write_count, 1)
# let the mainloop run for some time so that the macro does its stuff
sleeptime = 500
@ -932,14 +934,14 @@ class TestKeycodeMapper(unittest.TestCase):
def handler(*args):
history.append(args)
macro_mapping[(right,)].set_handler(handler)
macro_mapping[(left,)].set_handler(handler)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, None,
{}, macro_mapping
)
keycode_mapper.macro_write = handler
keycode_mapper.macro_write = handler
keycode_mapper.handle_keycode(new_event(*right))
self.assertIn((EV_ABS, ABS_HAT0X), unreleased)
keycode_mapper.handle_keycode(new_event(*release))
@ -1145,7 +1147,6 @@ class TestKeycodeMapper(unittest.TestCase):
macro_history = []
def handler(*args):
macro_history.append(args)
macro_mapping[(down_1,)].set_handler(handler)
uinput = UInput()
@ -1156,6 +1157,8 @@ class TestKeycodeMapper(unittest.TestCase):
_key_to_code, macro_mapping
)
keycode_mapper.macro_write = handler
# macro starts
keycode_mapper.handle_keycode(new_event(*down_1))
loop.run_until_complete(asyncio.sleep(0.05))
@ -1217,7 +1220,7 @@ class TestKeycodeMapper(unittest.TestCase):
k2c = {combination: 30}
uinput = UInput()
keycode_mapper = KeycodeMapper(
self.source, self.mapping,
uinput, k2c, {}

@ -79,7 +79,6 @@ class TestMacros(unittest.TestCase):
def test_run_plus_syntax(self):
macro = parse('a + b + c + d', self.mapping)
macro.set_handler(self.handler)
self.assertSetEqual(macro.get_capabilities(), {
system_mapping.get('a'),
system_mapping.get('b'),
@ -88,11 +87,9 @@ class TestMacros(unittest.TestCase):
})
macro.press_key()
asyncio.ensure_future(macro.run())
asyncio.ensure_future(macro.run(self.handler))
self.loop.run_until_complete(asyncio.sleep(0.2))
self.assertTrue(macro.is_holding())
print(self.mapping.get('macros.keystroke_sleep_ms'))
print(self.result)
# starting from the left, presses each one down
self.assertEqual(self.result[0], (system_mapping.get('a'), 1))
@ -104,7 +101,6 @@ class TestMacros(unittest.TestCase):
macro.release_key()
self.loop.run_until_complete(asyncio.sleep(0.2))
self.assertFalse(macro.is_holding())
print(self.result)
self.assertEqual(self.result[4], (system_mapping.get('d'), 0))
self.assertEqual(self.result[5], (system_mapping.get('c'), 0))
self.assertEqual(self.result[6], (system_mapping.get('b'), 0))
@ -133,18 +129,6 @@ class TestMacros(unittest.TestCase):
expect(',', ['', ''])
expect(',,', ['', '', ''])
def test_set_handler(self):
macro = parse('r(1, r(1, k(1)))', self.mapping)
one_code = system_mapping.get('1')
self.assertSetEqual(macro.get_capabilities(), {one_code})
self.loop.run_until_complete(macro.run())
self.assertListEqual(self.result, [])
macro.set_handler(self.handler)
self.loop.run_until_complete(macro.run())
self.assertListEqual(self.result, [(one_code, 1), (one_code, 0)])
def test_fails(self):
self.assertIsNone(parse('r(1, a)', self.mapping))
self.assertIsNone(parse('r(a, k(b))', self.mapping))
@ -157,24 +141,22 @@ class TestMacros(unittest.TestCase):
def test_0(self):
macro = parse('k(1)', self.mapping)
macro.set_handler(self.handler)
one_code = system_mapping.get('1')
self.assertSetEqual(macro.get_capabilities(), {one_code})
self.loop.run_until_complete(macro.run())
self.loop.run_until_complete(macro.run(self.handler))
self.assertListEqual(self.result, [(one_code, 1), (one_code, 0)])
self.assertEqual(len(macro.child_macros), 0)
def test_1(self):
macro = parse('k(1).k(a).k(3)', self.mapping)
macro.set_handler(self.handler)
self.assertSetEqual(macro.get_capabilities(), {
system_mapping.get('1'),
system_mapping.get('a'),
system_mapping.get('3')
})
self.loop.run_until_complete(macro.run())
self.loop.run_until_complete(macro.run(self.handler))
self.assertListEqual(self.result, [
(system_mapping.get('1'), 1), (system_mapping.get('1'), 0),
(system_mapping.get('a'), 1), (system_mapping.get('a'), 0),
@ -214,7 +196,6 @@ class TestMacros(unittest.TestCase):
def test_hold(self):
macro = parse('k(1).h(k(a)).k(3)', self.mapping)
macro.set_handler(self.handler)
self.assertSetEqual(macro.get_capabilities(), {
system_mapping.get('1'),
system_mapping.get('a'),
@ -222,7 +203,7 @@ class TestMacros(unittest.TestCase):
})
macro.press_key()
asyncio.ensure_future(macro.run())
asyncio.ensure_future(macro.run(self.handler))
self.loop.run_until_complete(asyncio.sleep(0.2))
self.assertTrue(macro.is_holding())
self.assertGreater(len(self.result), 2)
@ -241,14 +222,13 @@ class TestMacros(unittest.TestCase):
def test_dont_hold(self):
macro = parse('k(1).h(k(a)).k(3)', self.mapping)
macro.set_handler(self.handler)
self.assertSetEqual(macro.get_capabilities(), {
system_mapping.get('1'),
system_mapping.get('a'),
system_mapping.get('3')
})
asyncio.ensure_future(macro.run())
asyncio.ensure_future(macro.run(self.handler))
self.loop.run_until_complete(asyncio.sleep(0.2))
self.assertFalse(macro.is_holding())
# press_key was never called, so the macro completes right away
@ -262,14 +242,13 @@ class TestMacros(unittest.TestCase):
def test_just_hold(self):
macro = parse('k(1).h().k(3)', self.mapping)
macro.set_handler(self.handler)
self.assertSetEqual(macro.get_capabilities(), {
system_mapping.get('1'),
system_mapping.get('3')
})
macro.press_key()
asyncio.ensure_future(macro.run())
asyncio.ensure_future(macro.run(self.handler))
self.loop.run_until_complete(asyncio.sleep(0.1))
self.assertTrue(macro.is_holding())
self.assertEqual(len(self.result), 2)
@ -289,13 +268,12 @@ class TestMacros(unittest.TestCase):
def test_dont_just_hold(self):
macro = parse('k(1).h().k(3)', self.mapping)
macro.set_handler(self.handler)
self.assertSetEqual(macro.get_capabilities(), {
system_mapping.get('1'),
system_mapping.get('3')
})
asyncio.ensure_future(macro.run())
asyncio.ensure_future(macro.run(self.handler))
self.loop.run_until_complete(asyncio.sleep(0.1))
self.assertFalse(macro.is_holding())
# since press_key was never called it just does the macro
@ -312,11 +290,10 @@ class TestMacros(unittest.TestCase):
repeats = 20
macro = parse(f'r({repeats}, k(k)).r(1, k(k))', self.mapping)
macro.set_handler(self.handler)
k_code = system_mapping.get('k')
self.assertSetEqual(macro.get_capabilities(), {k_code})
self.loop.run_until_complete(macro.run())
self.loop.run_until_complete(macro.run(self.handler))
keystroke_sleep = self.mapping.get('macros.keystroke_sleep_ms')
sleep_time = 2 * repeats * keystroke_sleep / 1000
self.assertGreater(time.time() - start, sleep_time * 0.9)
@ -333,10 +310,9 @@ class TestMacros(unittest.TestCase):
def test_3(self):
start = time.time()
macro = parse('r(3, k(m).w(100))', self.mapping)
macro.set_handler(self.handler)
m_code = system_mapping.get('m')
self.assertSetEqual(macro.get_capabilities(), {m_code})
self.loop.run_until_complete(macro.run())
self.loop.run_until_complete(macro.run(self.handler))
keystroke_time = 6 * self.mapping.get('macros.keystroke_sleep_ms')
total_time = keystroke_time + 300
@ -354,7 +330,6 @@ class TestMacros(unittest.TestCase):
def test_4(self):
macro = parse(' r(2,\nk(\nr ).k(minus\n )).k(m) ', self.mapping)
macro.set_handler(self.handler)
r = system_mapping.get('r')
minus = system_mapping.get('minus')
@ -362,7 +337,7 @@ class TestMacros(unittest.TestCase):
self.assertSetEqual(macro.get_capabilities(), {r, minus, m})
self.loop.run_until_complete(macro.run())
self.loop.run_until_complete(macro.run(self.handler))
self.assertListEqual(self.result, [
(r, 1), (r, 0),
(minus, 1), (minus, 0),
@ -376,7 +351,6 @@ class TestMacros(unittest.TestCase):
def test_5(self):
start = time.time()
macro = parse('w(200).r(2,m(w,\nr(2,\tk(BtN_LeFt))).w(10).k(k))', self.mapping)
macro.set_handler(self.handler)
self.assertEqual(len(macro.child_macros), 1)
self.assertEqual(len(macro.child_macros[0].child_macros), 1)
@ -387,7 +361,7 @@ class TestMacros(unittest.TestCase):
self.assertSetEqual(macro.get_capabilities(), {w, left, k})
self.loop.run_until_complete(macro.run())
self.loop.run_until_complete(macro.run(self.handler))
num_pauses = 8 + 6 + 4
keystroke_time = num_pauses * self.mapping.get('macros.keystroke_sleep_ms')
@ -406,7 +380,6 @@ class TestMacros(unittest.TestCase):
def test_6(self):
# does nothing without .run
macro = parse('k(a).r(3, k(b))', self.mapping)
macro.set_handler(self.handler)
self.assertIsInstance(macro, _Macro)
self.assertListEqual(self.result, [])
@ -415,7 +388,7 @@ class TestMacros(unittest.TestCase):
config.set('macros.keystroke_sleep_ms', 100)
start = time.time()
macro = parse('k(a).k(b)', self.mapping)
self.loop.run_until_complete(macro.run())
self.loop.run_until_complete(macro.run(self.handler))
delta = time.time() - start
# is currently over 400, k(b) adds another sleep afterwards
# that doesn't do anything
@ -425,7 +398,7 @@ class TestMacros(unittest.TestCase):
self.mapping.set('macros.keystroke_sleep_ms', 50)
start = time.time()
macro = parse('k(a).k(b)', self.mapping)
self.loop.run_until_complete(macro.run())
self.loop.run_until_complete(macro.run(self.handler))
delta = time.time() - start
self.assertGreater(delta, 0.150)
self.assertLess(delta, 0.300)

Loading…
Cancel
Save