diff --git a/keymapper/dev/injector.py b/keymapper/dev/injector.py index 7f9469a5..8b86d197 100644 --- a/keymapper/dev/injector.py +++ b/keymapper/dev/injector.py @@ -33,7 +33,8 @@ import evdev from keymapper.logger import logger from keymapper.getdevices import get_devices -from keymapper.state import custom_mapping, system_mapping +from keymapper.state import system_mapping +from keymapper.dev.macros import parse DEV_NAME = 'key-mapper' @@ -217,6 +218,15 @@ class KeycodeInjector: loop.run_until_complete(asyncio.gather(*coroutines)) + def _write(self, device, keycode, value): + """Actually inject.""" + device.write( + evdev.ecodes.EV_KEY, + keycode - KEYCODE_OFFSET, + value + ) + device.syn() + async def _injection_loop(self, device, keymapper_device): """Inject keycodes for one of the virtual devices. @@ -250,16 +260,24 @@ class KeycodeInjector: # unknown keycode, forward it target_keycode = input_keycode elif '(' in character: - # must be a macro. Only allow it if the injector is not - # running as root. - if os.geteuid() == 0: - logger.error( - 'Cannot allow running macros as root to avoid ' - 'injecting arbitrary code' + # must be a macro + logger.spam( + 'got code:%s value:%s, maps to macro %s', + event.code + KEYCODE_OFFSET, + event.value, + character + ) + # TODO prepare this beforehand, not on each keystroke + parse( + character, + handler=lambda keycode, value: ( + self._write( + keymapper_device, + target_keycode, + event.value + ) ) - continue - - Macro() + ).run() else: target_keycode = system_mapping.get_keycode(character) if target_keycode is None: @@ -277,12 +295,7 @@ class KeycodeInjector: character ) - keymapper_device.write( - evdev.ecodes.EV_KEY, - target_keycode - KEYCODE_OFFSET, - event.value - ) - keymapper_device.syn() + self._write(keymapper_device, target_keycode, event.value) # this should only ever happen in tests to avoid blocking them # forever, as soon as all events are consumed. In normal operation diff --git a/keymapper/dev/macros.py b/keymapper/dev/macros.py index dc951d4a..ea838852 100644 --- a/keymapper/dev/macros.py +++ b/keymapper/dev/macros.py @@ -45,7 +45,7 @@ import random from keymapper.logger import logger -class Macro: +class _Macro: """Supports chaining and preparing actions.""" def __init__(self, handler): """Create a macro instance that can be populated with tasks. @@ -76,7 +76,7 @@ class Macro: Parameters ---------- modifier : str - macro : Macro + macro : _Macro """ self.tasks.append(lambda: self.handler(modifier, 1)) self.tasks.append(macro.run) @@ -89,7 +89,7 @@ class Macro: Parameters ---------- repeats : int - macro : Macro + macro : _Macro """ for _ in range(repeats): self.tasks.append(macro.run) @@ -108,25 +108,7 @@ class Macro: return self -def parse(macro, handler): - """parse and generate a Macro that can be run as often as you want. - - Parameters - ---------- - macro : string - "r(3, k(a).w(10))" - "r(2, k(a).k(-)).k(b)" - "w(1000).m(SHIFT_L, r(2, k(a))).w(10, 20).k(b)" - handler : func - A function that accepts keycodes as the first parameter and the - key-press state as the second. 1 for down and 0 for up. The - macro will write to this function once executed with `.run()`. - """ - # simpler function prototype and docstring than parse_recurse - return parse_recurse(macro, handler) - - -def extract_params(inner): +def _extract_params(inner): """Extract parameters from the inner contents of a call. Parameters @@ -157,7 +139,7 @@ def extract_params(inner): return params -def parse_recurse(macro, handler, macro_instance=None, depth=0): +def _parse_recurse(macro, handler, macro_instance=None, depth=0): """Handle a subset of the macro, e.g. one parameter or function call. Parameters @@ -165,8 +147,8 @@ def parse_recurse(macro, handler, macro_instance=None, depth=0): macro : string Just like parse handler : function - passed to Macro constructors - macro_instance : Macro or None + passed to _Macro constructors + macro_instance : _Macro or None A macro instance to add tasks to depth : int For logging and debugging purposes @@ -182,9 +164,9 @@ def parse_recurse(macro, handler, macro_instance=None, depth=0): assert isinstance(depth, int) if macro_instance is None: - macro_instance = Macro(handler) + macro_instance = _Macro(handler) else: - assert isinstance(macro_instance, Macro) + assert isinstance(macro_instance, _Macro) macro = macro.strip() logger.spam('%sinput %s', ' ' * depth, macro) @@ -230,11 +212,11 @@ def parse_recurse(macro, handler, macro_instance=None, depth=0): inner = macro[2:position - 1] # split "3, k(a).w(10)" into parameters - string_params = extract_params(inner) + string_params = _extract_params(inner) logger.spam('%scalls %s with %s', space, call, string_params) # evaluate the params params = [ - parse_recurse(param.strip(), handler, None, depth + 1) + _parse_recurse(param.strip(), handler, None, depth + 1) for param in string_params ] @@ -245,7 +227,7 @@ def parse_recurse(macro, handler, macro_instance=None, depth=0): if len(macro) > position and macro[position] == '.': chain = macro[position + 1:] logger.spam('%sfollowed by %s', space, chain) - parse_recurse(chain, handler, macro_instance, depth) + _parse_recurse(chain, handler, macro_instance, depth) return macro_instance else: @@ -255,3 +237,24 @@ def parse_recurse(macro, handler, macro_instance=None, depth=0): except ValueError: pass return macro + + +def parse(macro, handler): + """parse and generate a _Macro that can be run as often as you want. + + Parameters + ---------- + macro : string + "r(3, k(a).w(10))" + "r(2, k(a).k(-)).k(b)" + "w(1000).m(SHIFT_L, r(2, k(a))).w(10, 20).k(b)" + handler : func + A function that accepts keycodes as the first parameter and the + key-press state as the second. 1 for down and 0 for up. The + macro will write to this function once executed with `.run()`. + """ + try: + return _parse_recurse(macro, handler) + except Exception as e: + logger.error('Failed to parse macro "%s": %s', macro, e) + return None diff --git a/tests/testcases/macros.py b/tests/testcases/macros.py index 43d17086..5f0c15a2 100644 --- a/tests/testcases/macros.py +++ b/tests/testcases/macros.py @@ -52,6 +52,7 @@ class TestMacros(unittest.TestCase): def test_3(self): parse('r(3, k(m).w(200))', self.handler).run() + # TODO test passed time self.assertListEqual(self.result, [ ('m', 1), ('m', 0), ('m', 1), ('m', 0), @@ -75,6 +76,8 @@ class TestMacros(unittest.TestCase): expected += [('w', 0)] expected += [('k', 1), ('k', 0)] expected *= 2 + # TODO test passed time + # TODO test asyncio self.assertListEqual(self.result, expected) def test_6(self):