inserted the unfinished macro parsing and running into the injector

xkb
sezanzeb 4 years ago committed by sezanzeb
parent 82fcafe8a1
commit f0e57c5276

@ -33,7 +33,8 @@ import evdev
from keymapper.logger import logger from keymapper.logger import logger
from keymapper.getdevices import get_devices from keymapper.getdevices import get_devices
from keymapper.state import custom_mapping, system_mapping from keymapper.state import system_mapping
from keymapper.dev.macros import parse
DEV_NAME = 'key-mapper' DEV_NAME = 'key-mapper'
@ -217,6 +218,15 @@ class KeycodeInjector:
loop.run_until_complete(asyncio.gather(*coroutines)) loop.run_until_complete(asyncio.gather(*coroutines))
def _write(self, device, keycode, value):
"""Actually inject."""
device.write(
evdev.ecodes.EV_KEY,
keycode - KEYCODE_OFFSET,
value
)
device.syn()
async def _injection_loop(self, device, keymapper_device): async def _injection_loop(self, device, keymapper_device):
"""Inject keycodes for one of the virtual devices. """Inject keycodes for one of the virtual devices.
@ -250,16 +260,24 @@ class KeycodeInjector:
# unknown keycode, forward it # unknown keycode, forward it
target_keycode = input_keycode target_keycode = input_keycode
elif '(' in character: elif '(' in character:
# must be a macro. Only allow it if the injector is not # must be a macro
# running as root. logger.spam(
if os.geteuid() == 0: 'got code:%s value:%s, maps to macro %s',
logger.error( event.code + KEYCODE_OFFSET,
'Cannot allow running macros as root to avoid ' event.value,
'injecting arbitrary code' character
)
# TODO prepare this beforehand, not on each keystroke
parse(
character,
handler=lambda keycode, value: (
self._write(
keymapper_device,
target_keycode,
event.value
)
) )
continue ).run()
Macro()
else: else:
target_keycode = system_mapping.get_keycode(character) target_keycode = system_mapping.get_keycode(character)
if target_keycode is None: if target_keycode is None:
@ -277,12 +295,7 @@ class KeycodeInjector:
character character
) )
keymapper_device.write( self._write(keymapper_device, target_keycode, event.value)
evdev.ecodes.EV_KEY,
target_keycode - KEYCODE_OFFSET,
event.value
)
keymapper_device.syn()
# this should only ever happen in tests to avoid blocking them # this should only ever happen in tests to avoid blocking them
# forever, as soon as all events are consumed. In normal operation # forever, as soon as all events are consumed. In normal operation

@ -45,7 +45,7 @@ import random
from keymapper.logger import logger from keymapper.logger import logger
class Macro: class _Macro:
"""Supports chaining and preparing actions.""" """Supports chaining and preparing actions."""
def __init__(self, handler): def __init__(self, handler):
"""Create a macro instance that can be populated with tasks. """Create a macro instance that can be populated with tasks.
@ -76,7 +76,7 @@ class Macro:
Parameters Parameters
---------- ----------
modifier : str modifier : str
macro : Macro macro : _Macro
""" """
self.tasks.append(lambda: self.handler(modifier, 1)) self.tasks.append(lambda: self.handler(modifier, 1))
self.tasks.append(macro.run) self.tasks.append(macro.run)
@ -89,7 +89,7 @@ class Macro:
Parameters Parameters
---------- ----------
repeats : int repeats : int
macro : Macro macro : _Macro
""" """
for _ in range(repeats): for _ in range(repeats):
self.tasks.append(macro.run) self.tasks.append(macro.run)
@ -108,25 +108,7 @@ class Macro:
return self return self
def parse(macro, handler): def _extract_params(inner):
"""parse and generate a Macro that can be run as often as you want.
Parameters
----------
macro : string
"r(3, k(a).w(10))"
"r(2, k(a).k(-)).k(b)"
"w(1000).m(SHIFT_L, r(2, k(a))).w(10, 20).k(b)"
handler : func
A function that accepts keycodes as the first parameter and the
key-press state as the second. 1 for down and 0 for up. The
macro will write to this function once executed with `.run()`.
"""
# simpler function prototype and docstring than parse_recurse
return parse_recurse(macro, handler)
def extract_params(inner):
"""Extract parameters from the inner contents of a call. """Extract parameters from the inner contents of a call.
Parameters Parameters
@ -157,7 +139,7 @@ def extract_params(inner):
return params return params
def parse_recurse(macro, handler, macro_instance=None, depth=0): def _parse_recurse(macro, handler, macro_instance=None, depth=0):
"""Handle a subset of the macro, e.g. one parameter or function call. """Handle a subset of the macro, e.g. one parameter or function call.
Parameters Parameters
@ -165,8 +147,8 @@ def parse_recurse(macro, handler, macro_instance=None, depth=0):
macro : string macro : string
Just like parse Just like parse
handler : function handler : function
passed to Macro constructors passed to _Macro constructors
macro_instance : Macro or None macro_instance : _Macro or None
A macro instance to add tasks to A macro instance to add tasks to
depth : int depth : int
For logging and debugging purposes For logging and debugging purposes
@ -182,9 +164,9 @@ def parse_recurse(macro, handler, macro_instance=None, depth=0):
assert isinstance(depth, int) assert isinstance(depth, int)
if macro_instance is None: if macro_instance is None:
macro_instance = Macro(handler) macro_instance = _Macro(handler)
else: else:
assert isinstance(macro_instance, Macro) assert isinstance(macro_instance, _Macro)
macro = macro.strip() macro = macro.strip()
logger.spam('%sinput %s', ' ' * depth, macro) logger.spam('%sinput %s', ' ' * depth, macro)
@ -230,11 +212,11 @@ def parse_recurse(macro, handler, macro_instance=None, depth=0):
inner = macro[2:position - 1] inner = macro[2:position - 1]
# split "3, k(a).w(10)" into parameters # split "3, k(a).w(10)" into parameters
string_params = extract_params(inner) string_params = _extract_params(inner)
logger.spam('%scalls %s with %s', space, call, string_params) logger.spam('%scalls %s with %s', space, call, string_params)
# evaluate the params # evaluate the params
params = [ params = [
parse_recurse(param.strip(), handler, None, depth + 1) _parse_recurse(param.strip(), handler, None, depth + 1)
for param in string_params for param in string_params
] ]
@ -245,7 +227,7 @@ def parse_recurse(macro, handler, macro_instance=None, depth=0):
if len(macro) > position and macro[position] == '.': if len(macro) > position and macro[position] == '.':
chain = macro[position + 1:] chain = macro[position + 1:]
logger.spam('%sfollowed by %s', space, chain) logger.spam('%sfollowed by %s', space, chain)
parse_recurse(chain, handler, macro_instance, depth) _parse_recurse(chain, handler, macro_instance, depth)
return macro_instance return macro_instance
else: else:
@ -255,3 +237,24 @@ def parse_recurse(macro, handler, macro_instance=None, depth=0):
except ValueError: except ValueError:
pass pass
return macro return macro
def parse(macro, handler):
"""parse and generate a _Macro that can be run as often as you want.
Parameters
----------
macro : string
"r(3, k(a).w(10))"
"r(2, k(a).k(-)).k(b)"
"w(1000).m(SHIFT_L, r(2, k(a))).w(10, 20).k(b)"
handler : func
A function that accepts keycodes as the first parameter and the
key-press state as the second. 1 for down and 0 for up. The
macro will write to this function once executed with `.run()`.
"""
try:
return _parse_recurse(macro, handler)
except Exception as e:
logger.error('Failed to parse macro "%s": %s', macro, e)
return None

@ -52,6 +52,7 @@ class TestMacros(unittest.TestCase):
def test_3(self): def test_3(self):
parse('r(3, k(m).w(200))', self.handler).run() parse('r(3, k(m).w(200))', self.handler).run()
# TODO test passed time
self.assertListEqual(self.result, [ self.assertListEqual(self.result, [
('m', 1), ('m', 0), ('m', 1), ('m', 0),
('m', 1), ('m', 0), ('m', 1), ('m', 0),
@ -75,6 +76,8 @@ class TestMacros(unittest.TestCase):
expected += [('w', 0)] expected += [('w', 0)]
expected += [('k', 1), ('k', 0)] expected += [('k', 1), ('k', 0)]
expected *= 2 expected *= 2
# TODO test passed time
# TODO test asyncio
self.assertListEqual(self.result, expected) self.assertListEqual(self.result, expected)
def test_6(self): def test_6(self):

Loading…
Cancel
Save