macros.py split, improved type checking

xkb
sezanzeb 3 years ago
parent fc57ccb361
commit f812710bd0

@ -56,7 +56,7 @@ from keymapper.gui.helper import is_helper_running
from keymapper.injection.injector import RUNNING, FAILED, NO_GRAB from keymapper.injection.injector import RUNNING, FAILED, NO_GRAB
from keymapper.daemon import Daemon from keymapper.daemon import Daemon
from keymapper.config import config from keymapper.config import config
from keymapper.injection.macros import is_this_a_macro, parse from keymapper.injection.macros.parse import is_this_a_macro, parse
def gtk_iteration(): def gtk_iteration():

@ -23,7 +23,7 @@
from keymapper.logger import logger from keymapper.logger import logger
from keymapper.injection.macros import parse, is_this_a_macro from keymapper.injection.macros.parse import parse, is_this_a_macro
from keymapper.system_mapping import system_mapping from keymapper.system_mapping import system_mapping
from keymapper.config import NONE, MOUSE, WHEEL, BUTTONS from keymapper.config import NONE, MOUSE, WHEEL, BUTTONS
@ -59,7 +59,7 @@ class Context:
This is needed to query keycodes more efficiently without having This is needed to query keycodes more efficiently without having
to search mapping each time. to search mapping each time.
macros : dict macros : dict
Mapping of ((type, code, value),) to _Macro objects. Mapping of ((type, code, value),) to Macro objects.
Combinations work similar as in key_to_code Combinations work similar as in key_to_code
uinput : evdev.UInput uinput : evdev.UInput
Where to inject stuff to. This is an extra node in /dev so that Where to inject stuff to. This is an extra node in /dev so that

@ -36,119 +36,47 @@ w(1000).m(Shift_L, r(2, k(a))).w(10).k(b): <1s> A A <10ms> b
import asyncio import asyncio
import re
import traceback
import copy import copy
import multiprocessing
import atexit
import select
from evdev.ecodes import ecodes, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL, REL_HWHEEL from evdev.ecodes import ecodes, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL, REL_HWHEEL
from keymapper.logger import logger from keymapper.logger import logger
from keymapper.system_mapping import system_mapping from keymapper.system_mapping import system_mapping
from keymapper.ipc.shared_dict import SharedDict
from keymapper.utils import PRESS, PRESS_NEGATIVE from keymapper.utils import PRESS, PRESS_NEGATIVE
class SharedDict:
"""Share a dictionary across processes."""
# because unittests terminate all child processes in cleanup I can't use
# multiprocessing.Manager
def __init__(self):
"""Create a shared dictionary."""
super().__init__()
self.pipe = multiprocessing.Pipe()
self.process = None
atexit.register(self._stop)
self._start()
# To avoid blocking forever if something goes wrong. The maximum
# observed time communication takes was 0.001 for me on a slow pc
self._timeout = 0.02
def _start(self):
"""Ensure the process to manage the dictionary is running."""
if self.process is not None and self.process.is_alive():
return
# if the manager has already been running in the past but stopped
# for some reason, the dictionary contents are lost
self.process = multiprocessing.Process(target=self.manage)
self.process.start()
def manage(self):
"""Manage the dictionary, handle read and write requests."""
shared_dict = dict()
while True:
message = self.pipe[0].recv()
logger.spam("SharedDict got %s", message)
if message[0] == "stop":
return
if message[0] == "set":
shared_dict[message[1]] = message[2]
if message[0] == "get":
self.pipe[0].send(shared_dict.get(message[1]))
if message[0] == "ping":
self.pipe[0].send("pong")
def _stop(self):
"""Stop the managing process."""
self.pipe[1].send(("stop",))
def get(self, key):
"""Get a value from the dictionary."""
return self.__getitem__(key)
def is_alive(self, timeout=None):
"""Check if the manager process is running."""
self.pipe[1].send(("ping",))
select.select([self.pipe[1]], [], [], timeout or self._timeout)
if self.pipe[1].poll():
return self.pipe[1].recv() == "pong"
return False
def __setitem__(self, key, value):
self.pipe[1].send(("set", key, value))
def __getitem__(self, key):
self.pipe[1].send(("get", key))
select.select([self.pipe[1]], [], [], self._timeout)
if self.pipe[1].poll():
return self.pipe[1].recv()
logger.error("select.select timed out")
return None
def __del__(self):
self._stop()
macro_variables = SharedDict() macro_variables = SharedDict()
def is_this_a_macro(output): def type_check(display_name, value, allowed_types, position):
"""Figure out if this is a macro.""" """Validate a parameter used in a macro."""
if not isinstance(output, str): for allowed_type in allowed_types:
return False if allowed_type is None:
if value is None:
return value
else:
continue
# try to parse "1" as 1 if possible
try:
return allowed_type(value)
except (TypeError, ValueError):
pass
if "+" in output.strip(): if isinstance(value, allowed_type):
# for example "a + b" return value
return True
return "(" in output and ")" in output and len(output) >= 4 raise TypeError(
f"Expected parameter {position} for {display_name} to be "
f"one of {allowed_types}, but got {value}"
)
class _Macro: class Macro:
"""Supports chaining and preparing actions. """Supports chaining and preparing actions.
Calling functions like keycode on _Macro doesn't inject any events yet, Calling functions like keycode on Macro doesn't inject any events yet,
it means that once .run is used it will be executed along with all other it means that once .run is used it will be executed along with all other
queued tasks. queued tasks.
@ -296,7 +224,7 @@ class _Macro:
self.tasks.append(lambda _: self._holding_event.wait()) self.tasks.append(lambda _: self._holding_event.wait())
return return
if not isinstance(macro, _Macro): if not isinstance(macro, Macro):
# if macro is a key name, hold down the key while the # if macro is a key name, hold down the key while the
# keyboard key is physically held down # keyboard key is physically held down
symbol = str(macro) symbol = str(macro)
@ -311,7 +239,7 @@ class _Macro:
self.tasks.append(lambda handler: handler(EV_KEY, code, 0)) self.tasks.append(lambda handler: handler(EV_KEY, code, 0))
return return
if isinstance(macro, _Macro): if isinstance(macro, Macro):
# repeat the macro forever while the key is held down # repeat the macro forever while the key is held down
async def task(handler): async def task(handler):
while self.is_holding(): while self.is_holding():
@ -328,13 +256,9 @@ class _Macro:
Parameters Parameters
---------- ----------
modifier : str modifier : str
macro : _Macro macro : Macro
""" """
if not isinstance(macro, _Macro): type_check("m (modify)", macro, [Macro], 2)
raise ValueError(
"Expected the second param for m (modify) to be "
f"a macro (like k(a)), but got {macro}"
)
modifier = str(modifier) modifier = str(modifier)
code = system_mapping.get(modifier) code = system_mapping.get(modifier)
@ -358,22 +282,11 @@ class _Macro:
Parameters Parameters
---------- ----------
repeats : int or _Macro repeats : int or Macro
macro : _Macro macro : Macro
""" """
if not isinstance(macro, _Macro): repeats = type_check("r (repeat)", repeats, [int], 1)
raise ValueError( type_check("r (repeat)", macro, [Macro], 2)
"Expected the second param for r (repeat) to be "
f'a macro (like k(a)), but got "{macro}"'
)
try:
repeats = int(repeats)
except ValueError as error:
raise ValueError(
"Expected the first param for r (repeat) to be "
f'a number, but got "{repeats}"'
) from error
async def repeat(handler): async def repeat(handler):
for _ in range(repeats): for _ in range(repeats):
@ -437,6 +350,9 @@ class _Macro:
def mouse(self, direction, speed): def mouse(self, direction, speed):
"""Shortcut for h(e(...)).""" """Shortcut for h(e(...))."""
type_check("mouse", direction, [str], 1)
speed = type_check("mouse", speed, [int], 2)
code, value = { code, value = {
"up": (REL_Y, -1), "up": (REL_Y, -1),
"down": (REL_Y, 1), "down": (REL_Y, 1),
@ -444,34 +360,29 @@ class _Macro:
"right": (REL_X, 1), "right": (REL_X, 1),
}[direction.lower()] }[direction.lower()]
value *= speed value *= speed
child_macro = _Macro(None, self.context) child_macro = Macro(None, self.context)
child_macro.event(EV_REL, code, value) child_macro.event(EV_REL, code, value)
self.hold(child_macro) self.hold(child_macro)
def wheel(self, direction, speed): def wheel(self, direction, speed):
"""Shortcut for h(e(...)).""" """Shortcut for h(e(...))."""
type_check("wheel", direction, [str], 1)
speed = type_check("wheel", speed, [int], 2)
code, value = { code, value = {
"up": (REL_WHEEL, 1), "up": (REL_WHEEL, 1),
"down": (REL_WHEEL, -1), "down": (REL_WHEEL, -1),
"left": (REL_HWHEEL, 1), "left": (REL_HWHEEL, 1),
"right": (REL_HWHEEL, -1), "right": (REL_HWHEEL, -1),
}[direction.lower()] }[direction.lower()]
child_macro = _Macro(None, self.context) child_macro = Macro(None, self.context)
child_macro.event(EV_REL, code, value) child_macro.event(EV_REL, code, value)
child_macro.wait(100 / speed) child_macro.wait(100 / speed)
self.hold(child_macro) self.hold(child_macro)
def wait(self, sleeptime): def wait(self, sleeptime):
"""Wait time in milliseconds.""" """Wait time in milliseconds."""
try: sleeptime = type_check("wait", sleeptime, [int, float], 1) / 1000
sleeptime = int(sleeptime)
except ValueError as error:
raise ValueError(
"Expected the param for w (wait) to be "
f'a number, but got "{sleeptime}"'
) from error
sleeptime /= 1000
async def sleep(_): async def sleep(_):
await asyncio.sleep(sleeptime) await asyncio.sleep(sleeptime)
@ -494,20 +405,11 @@ class _Macro:
---------- ----------
variable : string variable : string
value : string | number value : string | number
then : _Macro | None then : Macro | None
otherwise : _Macro | None otherwise : Macro | None
""" """
if then and not isinstance(then, _Macro): type_check("ifeq", then, [Macro, None], 1)
raise ValueError( type_check("ifeq", otherwise, [Macro, None], 2)
"Expected the third param for ifeq to be "
f'a macro (like k(a)), but got "{then}"'
)
if otherwise and not isinstance(otherwise, _Macro):
raise ValueError(
"Expected the fourth param for ifeq to be "
f'a macro (like k(a)), but got "{otherwise}"'
)
async def ifeq(handler): async def ifeq(handler):
set_value = macro_variables.get(variable) set_value = macro_variables.get(variable)
@ -518,9 +420,9 @@ class _Macro:
elif otherwise is not None: elif otherwise is not None:
await otherwise.run(handler) await otherwise.run(handler)
if isinstance(then, _Macro): if isinstance(then, Macro):
self.child_macros.append(then) self.child_macros.append(then)
if isinstance(otherwise, _Macro): if isinstance(otherwise, Macro):
self.child_macros.append(otherwise) self.child_macros.append(otherwise)
self.tasks.append(ifeq) self.tasks.append(ifeq)
@ -530,25 +432,17 @@ class _Macro:
Parameters Parameters
---------- ----------
then : _Macro | None then : Macro | None
otherwise : _Macro | None otherwise : Macro | None
timeout : int timeout : int
""" """
if then and not isinstance(then, _Macro): type_check("if_tap", then, [Macro, None], 1)
raise ValueError( type_check("if_tap", otherwise, [Macro, None], 2)
"Expected the first param for if_tap to be " timeout = type_check("if_tap", timeout, [int], 3)
f'a macro (like k(a)), but got "{then}"'
) if isinstance(then, Macro):
if otherwise and not isinstance(otherwise, _Macro):
raise ValueError(
"Expected the second param for if_tap to be "
f'a macro (like k(a)), but got "{otherwise}"'
)
if isinstance(then, _Macro):
self.child_macros.append(then) self.child_macros.append(then)
if isinstance(otherwise, _Macro): if isinstance(otherwise, Macro):
self.child_macros.append(otherwise) self.child_macros.append(otherwise)
async def if_tap(handler): async def if_tap(handler):
@ -568,24 +462,15 @@ class _Macro:
Parameters Parameters
---------- ----------
then : _Macro | None then : Macro | None
otherwise : _Macro | None otherwise : Macro | None
""" """
if then and not isinstance(then, _Macro): type_check("if_single", then, [Macro, None], 1)
raise ValueError( type_check("if_single", otherwise, [Macro, None], 2)
"Expected the first param for if_tap to be "
f'a macro (like k(a)), but got "{then}"' if isinstance(then, Macro):
)
if otherwise and not isinstance(otherwise, _Macro):
raise ValueError(
"Expected the second param for if_tap to be "
f'a macro (like k(a)), but got "{otherwise}"'
)
if isinstance(then, _Macro):
self.child_macros.append(then) self.child_macros.append(then)
if isinstance(otherwise, _Macro): if isinstance(otherwise, Macro):
self.child_macros.append(otherwise) self.child_macros.append(otherwise)
async def if_single(handler): async def if_single(handler):
@ -612,232 +497,3 @@ class _Macro:
await otherwise.run(handler) await otherwise.run(handler)
self.tasks.append(if_single) self.tasks.append(if_single)
def _extract_params(inner):
"""Extract parameters from the inner contents of a call.
This does not parse them.
Parameters
----------
inner : string
for example '1, r, r(2, k(a))' should result in ['1', 'r', 'r(2, k(a))']
"""
inner = inner.strip()
brackets = 0
params = []
start = 0
for position, char in enumerate(inner):
if char == "(":
brackets += 1
if char == ")":
brackets -= 1
if char == "," and brackets == 0:
# , potentially starts another parameter, but only if
# the current brackets are all closed.
params.append(inner[start:position].strip())
# skip the comma
start = position + 1
# one last parameter
params.append(inner[start:].strip())
return params
def _count_brackets(macro):
"""Find where the first opening bracket closes."""
openings = macro.count("(")
closings = macro.count(")")
if openings != closings:
raise Exception(
f"You entered {openings} opening and {closings} " "closing brackets"
)
brackets = 0
position = 0
for char in macro:
position += 1
if char == "(":
brackets += 1
continue
if char == ")":
brackets -= 1
if brackets == 0:
# the closing bracket of the call
break
return position
def _parse_recurse(macro, context, macro_instance=None, depth=0):
"""Handle a subset of the macro, e.g. one parameter or function call.
Parameters
----------
macro : string
Just like parse
context : Context
macro_instance : _Macro or None
A macro instance to add tasks to
depth : int
"""
# not using eval for security reasons
assert isinstance(macro, str)
assert isinstance(depth, int)
if macro == "":
return None
if macro_instance is None:
macro_instance = _Macro(macro, context)
else:
assert isinstance(macro_instance, _Macro)
macro = macro.strip()
space = " " * depth
# is it another macro?
call_match = re.match(r"^(\w+)\(", macro)
call = call_match[1] if call_match else None
if call is not None:
# available functions in the macro and the minimum and maximum number
# of their parameters
functions = {
"m": (macro_instance.modify, 2, 2),
"r": (macro_instance.repeat, 2, 2),
"k": (macro_instance.keycode, 1, 1),
"e": (macro_instance.event, 3, 3),
"w": (macro_instance.wait, 1, 1),
"h": (macro_instance.hold, 0, 1),
"mouse": (macro_instance.mouse, 2, 2),
"wheel": (macro_instance.wheel, 2, 2),
"ifeq": (macro_instance.ifeq, 3, 4),
"set": (macro_instance.set, 2, 2),
"if_tap": (macro_instance.if_tap, 1, 3),
"if_single": (macro_instance.if_single, 1, 2),
}
function = functions.get(call)
if function is None:
raise Exception(f"Unknown function {call}")
# get all the stuff inbetween
position = _count_brackets(macro)
inner = macro[macro.index("(") + 1 : position - 1]
# split "3, k(a).w(10)" into parameters
string_params = _extract_params(inner)
logger.spam("%scalls %s with %s", space, call, string_params)
# evaluate the params
params = [
_parse_recurse(param.strip(), context, None, depth + 1)
for param in string_params
]
logger.spam("%sadd call to %s with %s", space, call, params)
if len(params) < function[1] or len(params) > function[2]:
if function[1] != function[2]:
msg = (
f"{call} takes between {function[1]} and {function[2]}, "
f"not {len(params)} parameters"
)
else:
msg = f"{call} takes {function[1]}, " f"not {len(params)} parameters"
raise ValueError(msg)
function[0](*params)
# is after this another call? Chain it to the macro_instance
if len(macro) > position and macro[position] == ".":
chain = macro[position + 1 :]
logger.spam("%sfollowed by %s", space, chain)
_parse_recurse(chain, context, macro_instance, depth)
return macro_instance
# probably a parameter for an outer function
try:
# if possible, parse as int
macro = int(macro)
except ValueError:
# use as string instead
pass
logger.spam("%s%s %s", space, type(macro), macro)
return macro
def handle_plus_syntax(macro):
"""transform a + b + c to m(a, m(b, m(c, h())))"""
if "+" not in macro:
return macro
if "(" in macro or ")" in macro:
logger.error('Mixing "+" and macros is unsupported: "%s"', macro)
return macro
chunks = [chunk.strip() for chunk in macro.split("+")]
output = ""
depth = 0
for chunk in chunks:
if chunk == "":
# invalid syntax
logger.error('Invalid syntax for "%s"', macro)
return macro
depth += 1
output += f"m({chunk},"
output += "h()"
output += depth * ")"
logger.debug('Transformed "%s" to "%s"', macro, output)
return output
def parse(macro, context, return_errors=False):
"""parse and generate a _Macro that can be run as often as you want.
If it could not be parsed, possibly due to syntax errors, will log the
error and return None.
Parameters
----------
macro : string
"r(3, k(a).w(10))"
"r(2, k(a).k(-)).k(b)"
"w(1000).m(Shift_L, r(2, k(a))).w(10, 20).k(b)"
context : Context
return_errors : bool
If True, returns errors as a string or None if parsing worked.
If False, returns the parsed macro.
"""
macro = handle_plus_syntax(macro)
# whitespaces, tabs, newlines and such don't serve a purpose. make
# the log output clearer and the parsing easier.
macro = re.sub(r"\s", "", macro)
if '"' in macro or "'" in macro:
logger.info("Quotation marks in macros are not needed")
macro = macro.replace('"', "").replace("'", "")
if return_errors:
logger.spam("checking the syntax of %s", macro)
else:
logger.spam("preparing macro %s for later execution", macro)
try:
macro_object = _parse_recurse(macro, context)
return macro_object if not return_errors else None
except Exception as error:
logger.error('Failed to parse macro "%s": %s', macro, error.__repr__())
# print the traceback in case this is a bug of key-mapper
logger.debug("".join(traceback.format_tb(error.__traceback__)).strip())
return f"{error.__class__.__name__}: {str(error)}" if return_errors else None

@ -0,0 +1,280 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# key-mapper - GUI for device specific keyboard mappings
# Copyright (C) 2021 sezanzeb <proxima@sezanzeb.de>
#
# This file is part of key-mapper.
#
# key-mapper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# key-mapper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with key-mapper. If not, see <https://www.gnu.org/licenses/>.
"""Parse macro code"""
import re
import traceback
import inspect
from keymapper.logger import logger
from keymapper.injection.macros.macro import Macro
def is_this_a_macro(output):
"""Figure out if this is a macro."""
if not isinstance(output, str):
return False
if "+" in output.strip():
# for example "a + b"
return True
return "(" in output and ")" in output and len(output) >= 4
FUNCTIONS = {
"m": Macro.modify,
"r": Macro.repeat,
"k": Macro.keycode,
"e": Macro.event,
"w": Macro.wait,
"h": Macro.hold,
"mouse": Macro.mouse,
"wheel": Macro.wheel,
"ifeq": Macro.ifeq,
"set": Macro.set,
"if_tap": Macro.if_tap,
"if_single": Macro.if_single,
}
def get_num_parameters(function):
"""Get the number of required parameters and the maximum number of parameters."""
fullargspec = inspect.getfullargspec(function)
num_args = len(fullargspec.args) - 1 # one is `self`
return num_args - len(fullargspec.defaults or ()), num_args
def _extract_params(inner):
"""Extract parameters from the inner contents of a call.
This does not parse them.
Parameters
----------
inner : string
for example '1, r, r(2, k(a))' should result in ['1', 'r', 'r(2, k(a))']
"""
inner = inner.strip()
brackets = 0
params = []
start = 0
for position, char in enumerate(inner):
if char == "(":
brackets += 1
if char == ")":
brackets -= 1
if char == "," and brackets == 0:
# , potentially starts another parameter, but only if
# the current brackets are all closed.
params.append(inner[start:position].strip())
# skip the comma
start = position + 1
# one last parameter
params.append(inner[start:].strip())
return params
def _count_brackets(macro):
"""Find where the first opening bracket closes."""
openings = macro.count("(")
closings = macro.count(")")
if openings != closings:
raise Exception(
f"You entered {openings} opening and {closings} " "closing brackets"
)
brackets = 0
position = 0
for char in macro:
position += 1
if char == "(":
brackets += 1
continue
if char == ")":
brackets -= 1
if brackets == 0:
# the closing bracket of the call
break
return position
def _parse_recurse(macro, context, macro_instance=None, depth=0):
"""Handle a subset of the macro, e.g. one parameter or function call.
Parameters
----------
macro : string
Just like parse
context : Context
macro_instance : Macro or None
A macro instance to add tasks to
depth : int
"""
# not using eval for security reasons
assert isinstance(macro, str)
assert isinstance(depth, int)
if macro == "":
return None
if macro_instance is None:
macro_instance = Macro(macro, context)
else:
assert isinstance(macro_instance, Macro)
macro = macro.strip()
space = " " * depth
# is it another macro?
call_match = re.match(r"^(\w+)\(", macro)
call = call_match[1] if call_match else None
if call is not None:
# available functions in the macro and the minimum and maximum number
# of their parameters
function = FUNCTIONS.get(call)
if function is None:
raise Exception(f"Unknown function {call}")
# get all the stuff inbetween
position = _count_brackets(macro)
inner = macro[macro.index("(") + 1 : position - 1]
# split "3, k(a).w(10)" into parameters
string_params = _extract_params(inner)
logger.spam("%scalls %s with %s", space, call, string_params)
# evaluate the params
params = [
_parse_recurse(param.strip(), context, None, depth + 1)
for param in string_params
]
logger.spam("%sadd call to %s with %s", space, call, params)
min_params, max_params = get_num_parameters(function)
if len(params) < min_params or len(params) > max_params:
if min_params != max_params:
msg = (
f"{call} takes between {min_params} and {max_params}, "
f"not {len(params)} parameters"
)
else:
msg = f"{call} takes {min_params}, " f"not {len(params)} parameters"
raise ValueError(msg)
function(macro_instance, *params)
# is after this another call? Chain it to the macro_instance
if len(macro) > position and macro[position] == ".":
chain = macro[position + 1 :]
logger.spam("%sfollowed by %s", space, chain)
_parse_recurse(chain, context, macro_instance, depth)
return macro_instance
# probably a parameter for an outer function
try:
# if possible, parse as int
macro = int(macro)
except ValueError:
# use as string instead
pass
logger.spam("%s%s %s", space, type(macro), macro)
return macro
def handle_plus_syntax(macro):
"""transform a + b + c to m(a, m(b, m(c, h())))"""
if "+" not in macro:
return macro
if "(" in macro or ")" in macro:
logger.error('Mixing "+" and macros is unsupported: "%s"', macro)
return macro
chunks = [chunk.strip() for chunk in macro.split("+")]
output = ""
depth = 0
for chunk in chunks:
if chunk == "":
# invalid syntax
logger.error('Invalid syntax for "%s"', macro)
return macro
depth += 1
output += f"m({chunk},"
output += "h()"
output += depth * ")"
logger.debug('Transformed "%s" to "%s"', macro, output)
return output
def parse(macro, context, return_errors=False):
"""parse and generate a Macro that can be run as often as you want.
If it could not be parsed, possibly due to syntax errors, will log the
error and return None.
Parameters
----------
macro : string
"r(3, k(a).w(10))"
"r(2, k(a).k(-)).k(b)"
"w(1000).m(Shift_L, r(2, k(a))).w(10, 20).k(b)"
context : Context
return_errors : bool
If True, returns errors as a string or None if parsing worked.
If False, returns the parsed macro.
"""
macro = handle_plus_syntax(macro)
# whitespaces, tabs, newlines and such don't serve a purpose. make
# the log output clearer and the parsing easier.
macro = re.sub(r"\s", "", macro)
if '"' in macro or "'" in macro:
logger.info("Quotation marks in macros are not needed")
macro = macro.replace('"', "").replace("'", "")
if return_errors:
logger.spam("checking the syntax of %s", macro)
else:
logger.spam("preparing macro %s for later execution", macro)
try:
macro_object = _parse_recurse(macro, context)
return macro_object if not return_errors else None
except Exception as error:
logger.error('Failed to parse macro "%s": %s', macro, error.__repr__())
# print the traceback in case this is a bug of key-mapper
logger.debug("".join(traceback.format_tb(error.__traceback__)).strip())
return f"{error.__class__.__name__}: {str(error)}" if return_errors else None

@ -0,0 +1,109 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# key-mapper - GUI for device specific keyboard mappings
# Copyright (C) 2021 sezanzeb <proxima@sezanzeb.de>
#
# This file is part of key-mapper.
#
# key-mapper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# key-mapper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with key-mapper. If not, see <https://www.gnu.org/licenses/>.
"""Share a dictionary across processes."""
import multiprocessing
import atexit
import select
from keymapper.logger import logger
class SharedDict:
"""Share a dictionary across processes."""
# because unittests terminate all child processes in cleanup I can't use
# multiprocessing.Manager
def __init__(self):
"""Create a shared dictionary."""
super().__init__()
self.pipe = multiprocessing.Pipe()
self.process = None
atexit.register(self._stop)
self._start()
# To avoid blocking forever if something goes wrong. The maximum
# observed time communication takes was 0.001 for me on a slow pc
self._timeout = 0.02
def _start(self):
"""Ensure the process to manage the dictionary is running."""
if self.process is not None and self.process.is_alive():
return
# if the manager has already been running in the past but stopped
# for some reason, the dictionary contents are lost
self.process = multiprocessing.Process(target=self.manage)
self.process.start()
def manage(self):
"""Manage the dictionary, handle read and write requests."""
shared_dict = dict()
while True:
message = self.pipe[0].recv()
logger.spam("SharedDict got %s", message)
if message[0] == "stop":
return
if message[0] == "set":
shared_dict[message[1]] = message[2]
if message[0] == "get":
self.pipe[0].send(shared_dict.get(message[1]))
if message[0] == "ping":
self.pipe[0].send("pong")
def _stop(self):
"""Stop the managing process."""
self.pipe[1].send(("stop",))
def get(self, key):
"""Get a value from the dictionary."""
return self.__getitem__(key)
def is_alive(self, timeout=None):
"""Check if the manager process is running."""
self.pipe[1].send(("ping",))
select.select([self.pipe[1]], [], [], timeout or self._timeout)
if self.pipe[1].poll():
return self.pipe[1].recv() == "pong"
return False
def __setitem__(self, key, value):
self.pipe[1].send(("set", key, value))
def __getitem__(self, key):
self.pipe[1].send(("get", key))
select.select([self.pipe[1]], [], [], self._timeout)
if self.pipe[1].poll():
return self.pipe[1].recv()
logger.error("select.select timed out")
return None
def __del__(self):
self._stop()

@ -522,7 +522,7 @@ from keymapper.groups import groups
from keymapper.system_mapping import system_mapping from keymapper.system_mapping import system_mapping
from keymapper.gui.custom_mapping import custom_mapping from keymapper.gui.custom_mapping import custom_mapping
from keymapper.paths import get_config_path from keymapper.paths import get_config_path
from keymapper.injection.macros import macro_variables from keymapper.injection.macros.macro import macro_variables
from keymapper.injection.consumers.keycode_mapper import active_macros, unreleased from keymapper.injection.consumers.keycode_mapper import active_macros, unreleased
# no need for a high number in tests # no need for a high number in tests

@ -62,7 +62,7 @@ from keymapper.gui.custom_mapping import custom_mapping
from keymapper.mapping import Mapping, DISABLE_CODE, DISABLE_NAME from keymapper.mapping import Mapping, DISABLE_CODE, DISABLE_NAME
from keymapper.config import config, NONE, MOUSE, WHEEL, BUTTONS from keymapper.config import config, NONE, MOUSE, WHEEL, BUTTONS
from keymapper.key import Key from keymapper.key import Key
from keymapper.injection.macros import parse from keymapper.injection.macros.parse import parse
from keymapper.injection.context import Context from keymapper.injection.context import Context
from keymapper.groups import groups, classify, GAMEPAD from keymapper.groups import groups, classify, GAMEPAD

@ -42,7 +42,7 @@ from keymapper.injection.consumers.keycode_mapper import (
subsets, subsets,
) )
from keymapper.system_mapping import system_mapping from keymapper.system_mapping import system_mapping
from keymapper.injection.macros import parse from keymapper.injection.macros.parse import parse
from keymapper.injection.context import Context from keymapper.injection.context import Context
from keymapper.utils import RELEASE, PRESS from keymapper.utils import RELEASE, PRESS
from keymapper.config import config, BUTTONS from keymapper.config import config, BUTTONS

@ -26,9 +26,9 @@ import multiprocessing
from evdev.ecodes import EV_REL, EV_KEY, REL_Y, REL_X, REL_WHEEL, REL_HWHEEL from evdev.ecodes import EV_REL, EV_KEY, REL_Y, REL_X, REL_WHEEL, REL_HWHEEL
from keymapper.injection.macros import ( from keymapper.injection.macros.macro import Macro, type_check
from keymapper.injection.macros.parse import (
parse, parse,
_Macro,
_extract_params, _extract_params,
is_this_a_macro, is_this_a_macro,
_parse_recurse, _parse_recurse,
@ -68,6 +68,23 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
print(f"\033[90mmacro wrote{(ev_type, code, value)}\033[0m") print(f"\033[90mmacro wrote{(ev_type, code, value)}\033[0m")
self.result.append((ev_type, code, value)) self.result.append((ev_type, code, value))
def test_type_check(self):
# allows params that can be cast to the target type
self.assertEqual(type_check("foo", 1, [str, None], 0), "1")
self.assertEqual(type_check("foo", "1", [int, None], 1), 1)
self.assertEqual(type_check("foo", 1.2, [str], 2), "1.2")
self.assertRaises(TypeError, lambda: type_check("foo", "1.2", [int], 3), 1.2)
self.assertRaises(TypeError, lambda: type_check("foo", "a", [None], 0))
self.assertRaises(TypeError, lambda: type_check("foo", "a", [int], 1))
self.assertRaises(TypeError, lambda: type_check("foo", "a", [int, float], 2))
self.assertRaises(TypeError, lambda: type_check("foo", "a", [int, float, None], 3))
self.assertEqual(type_check("foo", "a", [int, float, None, str], 4), "a")
self.assertRaises(TypeError, lambda: type_check("foo", "a", [Macro], 0))
self.assertRaises(TypeError, lambda: type_check("foo", 1, [Macro], 0))
self.assertEqual(type_check("foo", "1", [Macro, int], 4), 1)
async def test_is_this_a_macro(self): async def test_is_this_a_macro(self):
self.assertTrue(is_this_a_macro("k(1)")) self.assertTrue(is_this_a_macro("k(1)"))
self.assertTrue(is_this_a_macro("k(1).k(2)")) self.assertTrue(is_this_a_macro("k(1).k(2)"))
@ -214,36 +231,80 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
self.assertIn("bracket", error) self.assertIn("bracket", error)
error = parse("k((1).k)", self.context, return_errors=True) error = parse("k((1).k)", self.context, return_errors=True)
self.assertIsNotNone(error) self.assertIsNotNone(error)
error = parse("r(a, k(1))", self.context, return_errors=True)
self.assertIsNotNone(error)
error = parse("k()", self.context, return_errors=True) error = parse("k()", self.context, return_errors=True)
self.assertIsNotNone(error) self.assertIsNotNone(error)
error = parse("k(1)", self.context, return_errors=True) error = parse("k(1)", self.context, return_errors=True)
self.assertIsNone(error) self.assertIsNone(error)
error = parse("k(1, 1)", self.context, return_errors=True) error = parse("k(1, 1)", self.context, return_errors=True)
self.assertIsNotNone(error) self.assertIsNotNone(error)
error = parse("h(1, 1)", self.context, return_errors=True) error = parse("h(1, 1)", self.context, return_errors=True)
self.assertIsNotNone(error) self.assertIsNotNone(error)
error = parse("h(h(h(1, 1)))", self.context, return_errors=True) error = parse("h(h(h(1, 1)))", self.context, return_errors=True)
self.assertIsNotNone(error) self.assertIsNotNone(error)
error = parse("r(1)", self.context, return_errors=True) error = parse("r(1)", self.context, return_errors=True)
self.assertIsNotNone(error) self.assertIsNotNone(error)
error = parse("r(a, k(1))", self.context, return_errors=True)
self.assertIsNotNone(error)
error = parse("r(1, 1)", self.context, return_errors=True) error = parse("r(1, 1)", self.context, return_errors=True)
self.assertIsNotNone(error) self.assertIsNotNone(error)
error = parse("r(k(1), 1)", self.context, return_errors=True) error = parse("r(k(1), 1)", self.context, return_errors=True)
self.assertIsNotNone(error) self.assertIsNotNone(error)
error = parse("r(1.2, k(1))", self.context, return_errors=True)
self.assertIsNotNone(error)
error = parse("r(1, k(1))", self.context, return_errors=True) error = parse("r(1, k(1))", self.context, return_errors=True)
self.assertIsNone(error) self.assertIsNone(error)
error = parse("m(asdf, k(a))", self.context, return_errors=True) error = parse("m(asdf, k(a))", self.context, return_errors=True)
self.assertIsNotNone(error) self.assertIsNotNone(error)
error = parse("if_tap(, k(a), 1000)", self.context, return_errors=True) error = parse("if_tap(, k(a), 1000)", self.context, return_errors=True)
self.assertIsNone(error) self.assertIsNone(error)
error = parse("if_tap(, k(a))", self.context, return_errors=True)
self.assertIsNone(error)
error = parse("if_tap(k(a),)", self.context, return_errors=True)
self.assertIsNone(error)
error = parse("if_tap(k(a), b)", self.context, return_errors=True)
self.assertIsNotNone(error)
error = parse("if_single(k(a),)", self.context, return_errors=True) error = parse("if_single(k(a),)", self.context, return_errors=True)
self.assertIsNone(error) self.assertIsNone(error)
error = parse("if_single(1,)", self.context, return_errors=True)
self.assertIsNotNone(error)
error = parse("if_single(,1)", self.context, return_errors=True)
self.assertIsNotNone(error)
error = parse("mouse(up, 3)", self.context, return_errors=True)
self.assertIsNone(error)
error = parse("mouse(3, up)", self.context, return_errors=True)
self.assertIsNotNone(error)
error = parse("wheel(left, 3)", self.context, return_errors=True)
self.assertIsNone(error)
error = parse("wheel(3, left)", self.context, return_errors=True)
self.assertIsNotNone(error)
error = parse("w(2)", self.context, return_errors=True)
self.assertIsNone(error)
error = parse("w(a)", self.context, return_errors=True)
self.assertIsNotNone(error)
error = parse("ifeq(a, 2, k(a),)", self.context, return_errors=True)
self.assertIsNone(error)
error = parse("ifeq(a, 2, , k(a))", self.context, return_errors=True)
self.assertIsNone(error)
error = parse("ifeq(a, 2, 1,)", self.context, return_errors=True)
self.assertIsNotNone(error)
error = parse("ifeq(a, 2, , 2)", self.context, return_errors=True)
self.assertIsNotNone(error)
error = parse("foo(a)", self.context, return_errors=True) error = parse("foo(a)", self.context, return_errors=True)
self.assertIn("unknown", error.lower()) self.assertIn("unknown", error.lower())
self.assertIn("foo", error) self.assertIn("foo", error)
async def test_hold(self): async def test_hold(self):
# repeats k(a) as long as the key is held down # repeats k(a) as long as the key is held down
macro = parse("k(1).h(k(a)).k(3)", self.context) macro = parse("k(1).h(k(a)).k(3)", self.context)
@ -490,7 +551,7 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
async def test_6(self): async def test_6(self):
# does nothing without .run # does nothing without .run
macro = parse("k(a).r(3, k(b))", self.context) macro = parse("k(a).r(3, k(b))", self.context)
self.assertIsInstance(macro, _Macro) self.assertIsInstance(macro, Macro)
self.assertListEqual(self.result, []) self.assertListEqual(self.result, [])
async def test_keystroke_sleep_config(self): async def test_keystroke_sleep_config(self):

Loading…
Cancel
Save