diff --git a/keymapper/gui/window.py b/keymapper/gui/window.py index 8a77c0ce..47d45091 100755 --- a/keymapper/gui/window.py +++ b/keymapper/gui/window.py @@ -56,7 +56,7 @@ from keymapper.gui.helper import is_helper_running from keymapper.injection.injector import RUNNING, FAILED, NO_GRAB from keymapper.daemon import Daemon from keymapper.config import config -from keymapper.injection.macros import is_this_a_macro, parse +from keymapper.injection.macros.parse import is_this_a_macro, parse def gtk_iteration(): diff --git a/keymapper/injection/context.py b/keymapper/injection/context.py index 00efc38c..5752664e 100644 --- a/keymapper/injection/context.py +++ b/keymapper/injection/context.py @@ -23,7 +23,7 @@ from keymapper.logger import logger -from keymapper.injection.macros import parse, is_this_a_macro +from keymapper.injection.macros.parse import parse, is_this_a_macro from keymapper.system_mapping import system_mapping from keymapper.config import NONE, MOUSE, WHEEL, BUTTONS @@ -59,7 +59,7 @@ class Context: This is needed to query keycodes more efficiently without having to search mapping each time. macros : dict - Mapping of ((type, code, value),) to _Macro objects. + Mapping of ((type, code, value),) to Macro objects. Combinations work similar as in key_to_code uinput : evdev.UInput Where to inject stuff to. This is an extra node in /dev so that diff --git a/keymapper/injection/macros/__init__.py b/keymapper/injection/macros/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/keymapper/injection/macros.py b/keymapper/injection/macros/macro.py similarity index 51% rename from keymapper/injection/macros.py rename to keymapper/injection/macros/macro.py index ebe3f5e7..4b9c6992 100644 --- a/keymapper/injection/macros.py +++ b/keymapper/injection/macros/macro.py @@ -36,119 +36,47 @@ w(1000).m(Shift_L, r(2, k(a))).w(10).k(b): <1s> A A <10ms> b import asyncio -import re -import traceback import copy -import multiprocessing -import atexit -import select from evdev.ecodes import ecodes, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL, REL_HWHEEL from keymapper.logger import logger from keymapper.system_mapping import system_mapping +from keymapper.ipc.shared_dict import SharedDict from keymapper.utils import PRESS, PRESS_NEGATIVE -class SharedDict: - """Share a dictionary across processes.""" - - # because unittests terminate all child processes in cleanup I can't use - # multiprocessing.Manager - def __init__(self): - """Create a shared dictionary.""" - super().__init__() - self.pipe = multiprocessing.Pipe() - self.process = None - atexit.register(self._stop) - self._start() - - # To avoid blocking forever if something goes wrong. The maximum - # observed time communication takes was 0.001 for me on a slow pc - self._timeout = 0.02 - - def _start(self): - """Ensure the process to manage the dictionary is running.""" - if self.process is not None and self.process.is_alive(): - return - - # if the manager has already been running in the past but stopped - # for some reason, the dictionary contents are lost - self.process = multiprocessing.Process(target=self.manage) - self.process.start() - - def manage(self): - """Manage the dictionary, handle read and write requests.""" - shared_dict = dict() - while True: - message = self.pipe[0].recv() - logger.spam("SharedDict got %s", message) - - if message[0] == "stop": - return - - if message[0] == "set": - shared_dict[message[1]] = message[2] - - if message[0] == "get": - self.pipe[0].send(shared_dict.get(message[1])) - - if message[0] == "ping": - self.pipe[0].send("pong") - - def _stop(self): - """Stop the managing process.""" - self.pipe[1].send(("stop",)) - - def get(self, key): - """Get a value from the dictionary.""" - return self.__getitem__(key) - - def is_alive(self, timeout=None): - """Check if the manager process is running.""" - self.pipe[1].send(("ping",)) - select.select([self.pipe[1]], [], [], timeout or self._timeout) - if self.pipe[1].poll(): - return self.pipe[1].recv() == "pong" - - return False - - def __setitem__(self, key, value): - self.pipe[1].send(("set", key, value)) - - def __getitem__(self, key): - self.pipe[1].send(("get", key)) - - select.select([self.pipe[1]], [], [], self._timeout) - if self.pipe[1].poll(): - return self.pipe[1].recv() - - logger.error("select.select timed out") - return None - - def __del__(self): - self._stop() - - macro_variables = SharedDict() -def is_this_a_macro(output): - """Figure out if this is a macro.""" - if not isinstance(output, str): - return False +def type_check(display_name, value, allowed_types, position): + """Validate a parameter used in a macro.""" + for allowed_type in allowed_types: + if allowed_type is None: + if value is None: + return value + else: + continue + + # try to parse "1" as 1 if possible + try: + return allowed_type(value) + except (TypeError, ValueError): + pass - if "+" in output.strip(): - # for example "a + b" - return True + if isinstance(value, allowed_type): + return value - return "(" in output and ")" in output and len(output) >= 4 + raise TypeError( + f"Expected parameter {position} for {display_name} to be " + f"one of {allowed_types}, but got {value}" + ) -class _Macro: +class Macro: """Supports chaining and preparing actions. - Calling functions like keycode on _Macro doesn't inject any events yet, + Calling functions like keycode on Macro doesn't inject any events yet, it means that once .run is used it will be executed along with all other queued tasks. @@ -296,7 +224,7 @@ class _Macro: self.tasks.append(lambda _: self._holding_event.wait()) return - if not isinstance(macro, _Macro): + if not isinstance(macro, Macro): # if macro is a key name, hold down the key while the # keyboard key is physically held down symbol = str(macro) @@ -311,7 +239,7 @@ class _Macro: self.tasks.append(lambda handler: handler(EV_KEY, code, 0)) return - if isinstance(macro, _Macro): + if isinstance(macro, Macro): # repeat the macro forever while the key is held down async def task(handler): while self.is_holding(): @@ -328,13 +256,9 @@ class _Macro: Parameters ---------- modifier : str - macro : _Macro + macro : Macro """ - if not isinstance(macro, _Macro): - raise ValueError( - "Expected the second param for m (modify) to be " - f"a macro (like k(a)), but got {macro}" - ) + type_check("m (modify)", macro, [Macro], 2) modifier = str(modifier) code = system_mapping.get(modifier) @@ -358,22 +282,11 @@ class _Macro: Parameters ---------- - repeats : int or _Macro - macro : _Macro + repeats : int or Macro + macro : Macro """ - if not isinstance(macro, _Macro): - raise ValueError( - "Expected the second param for r (repeat) to be " - f'a macro (like k(a)), but got "{macro}"' - ) - - try: - repeats = int(repeats) - except ValueError as error: - raise ValueError( - "Expected the first param for r (repeat) to be " - f'a number, but got "{repeats}"' - ) from error + repeats = type_check("r (repeat)", repeats, [int], 1) + type_check("r (repeat)", macro, [Macro], 2) async def repeat(handler): for _ in range(repeats): @@ -437,6 +350,9 @@ class _Macro: def mouse(self, direction, speed): """Shortcut for h(e(...)).""" + type_check("mouse", direction, [str], 1) + speed = type_check("mouse", speed, [int], 2) + code, value = { "up": (REL_Y, -1), "down": (REL_Y, 1), @@ -444,34 +360,29 @@ class _Macro: "right": (REL_X, 1), }[direction.lower()] value *= speed - child_macro = _Macro(None, self.context) + child_macro = Macro(None, self.context) child_macro.event(EV_REL, code, value) self.hold(child_macro) def wheel(self, direction, speed): """Shortcut for h(e(...)).""" + type_check("wheel", direction, [str], 1) + speed = type_check("wheel", speed, [int], 2) + code, value = { "up": (REL_WHEEL, 1), "down": (REL_WHEEL, -1), "left": (REL_HWHEEL, 1), "right": (REL_HWHEEL, -1), }[direction.lower()] - child_macro = _Macro(None, self.context) + child_macro = Macro(None, self.context) child_macro.event(EV_REL, code, value) child_macro.wait(100 / speed) self.hold(child_macro) def wait(self, sleeptime): """Wait time in milliseconds.""" - try: - sleeptime = int(sleeptime) - except ValueError as error: - raise ValueError( - "Expected the param for w (wait) to be " - f'a number, but got "{sleeptime}"' - ) from error - - sleeptime /= 1000 + sleeptime = type_check("wait", sleeptime, [int, float], 1) / 1000 async def sleep(_): await asyncio.sleep(sleeptime) @@ -494,20 +405,11 @@ class _Macro: ---------- variable : string value : string | number - then : _Macro | None - otherwise : _Macro | None + then : Macro | None + otherwise : Macro | None """ - if then and not isinstance(then, _Macro): - raise ValueError( - "Expected the third param for ifeq to be " - f'a macro (like k(a)), but got "{then}"' - ) - - if otherwise and not isinstance(otherwise, _Macro): - raise ValueError( - "Expected the fourth param for ifeq to be " - f'a macro (like k(a)), but got "{otherwise}"' - ) + type_check("ifeq", then, [Macro, None], 1) + type_check("ifeq", otherwise, [Macro, None], 2) async def ifeq(handler): set_value = macro_variables.get(variable) @@ -518,9 +420,9 @@ class _Macro: elif otherwise is not None: await otherwise.run(handler) - if isinstance(then, _Macro): + if isinstance(then, Macro): self.child_macros.append(then) - if isinstance(otherwise, _Macro): + if isinstance(otherwise, Macro): self.child_macros.append(otherwise) self.tasks.append(ifeq) @@ -530,25 +432,17 @@ class _Macro: Parameters ---------- - then : _Macro | None - otherwise : _Macro | None + then : Macro | None + otherwise : Macro | None timeout : int """ - if then and not isinstance(then, _Macro): - raise ValueError( - "Expected the first param for if_tap to be " - f'a macro (like k(a)), but got "{then}"' - ) - - if otherwise and not isinstance(otherwise, _Macro): - raise ValueError( - "Expected the second param for if_tap to be " - f'a macro (like k(a)), but got "{otherwise}"' - ) - - if isinstance(then, _Macro): + type_check("if_tap", then, [Macro, None], 1) + type_check("if_tap", otherwise, [Macro, None], 2) + timeout = type_check("if_tap", timeout, [int], 3) + + if isinstance(then, Macro): self.child_macros.append(then) - if isinstance(otherwise, _Macro): + if isinstance(otherwise, Macro): self.child_macros.append(otherwise) async def if_tap(handler): @@ -568,24 +462,15 @@ class _Macro: Parameters ---------- - then : _Macro | None - otherwise : _Macro | None + then : Macro | None + otherwise : Macro | None """ - if then and not isinstance(then, _Macro): - raise ValueError( - "Expected the first param for if_tap to be " - f'a macro (like k(a)), but got "{then}"' - ) - - if otherwise and not isinstance(otherwise, _Macro): - raise ValueError( - "Expected the second param for if_tap to be " - f'a macro (like k(a)), but got "{otherwise}"' - ) - - if isinstance(then, _Macro): + type_check("if_single", then, [Macro, None], 1) + type_check("if_single", otherwise, [Macro, None], 2) + + if isinstance(then, Macro): self.child_macros.append(then) - if isinstance(otherwise, _Macro): + if isinstance(otherwise, Macro): self.child_macros.append(otherwise) async def if_single(handler): @@ -612,232 +497,3 @@ class _Macro: await otherwise.run(handler) self.tasks.append(if_single) - - -def _extract_params(inner): - """Extract parameters from the inner contents of a call. - - This does not parse them. - - Parameters - ---------- - inner : string - for example '1, r, r(2, k(a))' should result in ['1', 'r', 'r(2, k(a))'] - """ - inner = inner.strip() - brackets = 0 - params = [] - start = 0 - for position, char in enumerate(inner): - if char == "(": - brackets += 1 - if char == ")": - brackets -= 1 - if char == "," and brackets == 0: - # , potentially starts another parameter, but only if - # the current brackets are all closed. - params.append(inner[start:position].strip()) - # skip the comma - start = position + 1 - - # one last parameter - params.append(inner[start:].strip()) - - return params - - -def _count_brackets(macro): - """Find where the first opening bracket closes.""" - openings = macro.count("(") - closings = macro.count(")") - if openings != closings: - raise Exception( - f"You entered {openings} opening and {closings} " "closing brackets" - ) - - brackets = 0 - position = 0 - for char in macro: - position += 1 - if char == "(": - brackets += 1 - continue - - if char == ")": - brackets -= 1 - if brackets == 0: - # the closing bracket of the call - break - - return position - - -def _parse_recurse(macro, context, macro_instance=None, depth=0): - """Handle a subset of the macro, e.g. one parameter or function call. - - Parameters - ---------- - macro : string - Just like parse - context : Context - macro_instance : _Macro or None - A macro instance to add tasks to - depth : int - """ - # not using eval for security reasons - assert isinstance(macro, str) - assert isinstance(depth, int) - - if macro == "": - return None - - if macro_instance is None: - macro_instance = _Macro(macro, context) - else: - assert isinstance(macro_instance, _Macro) - - macro = macro.strip() - space = " " * depth - - # is it another macro? - call_match = re.match(r"^(\w+)\(", macro) - call = call_match[1] if call_match else None - if call is not None: - # available functions in the macro and the minimum and maximum number - # of their parameters - functions = { - "m": (macro_instance.modify, 2, 2), - "r": (macro_instance.repeat, 2, 2), - "k": (macro_instance.keycode, 1, 1), - "e": (macro_instance.event, 3, 3), - "w": (macro_instance.wait, 1, 1), - "h": (macro_instance.hold, 0, 1), - "mouse": (macro_instance.mouse, 2, 2), - "wheel": (macro_instance.wheel, 2, 2), - "ifeq": (macro_instance.ifeq, 3, 4), - "set": (macro_instance.set, 2, 2), - "if_tap": (macro_instance.if_tap, 1, 3), - "if_single": (macro_instance.if_single, 1, 2), - } - - function = functions.get(call) - if function is None: - raise Exception(f"Unknown function {call}") - - # get all the stuff inbetween - position = _count_brackets(macro) - - inner = macro[macro.index("(") + 1 : position - 1] - - # split "3, k(a).w(10)" into parameters - string_params = _extract_params(inner) - logger.spam("%scalls %s with %s", space, call, string_params) - # evaluate the params - params = [ - _parse_recurse(param.strip(), context, None, depth + 1) - for param in string_params - ] - - logger.spam("%sadd call to %s with %s", space, call, params) - - if len(params) < function[1] or len(params) > function[2]: - if function[1] != function[2]: - msg = ( - f"{call} takes between {function[1]} and {function[2]}, " - f"not {len(params)} parameters" - ) - else: - msg = f"{call} takes {function[1]}, " f"not {len(params)} parameters" - - raise ValueError(msg) - - function[0](*params) - - # is after this another call? Chain it to the macro_instance - if len(macro) > position and macro[position] == ".": - chain = macro[position + 1 :] - logger.spam("%sfollowed by %s", space, chain) - _parse_recurse(chain, context, macro_instance, depth) - - return macro_instance - - # probably a parameter for an outer function - try: - # if possible, parse as int - macro = int(macro) - except ValueError: - # use as string instead - pass - - logger.spam("%s%s %s", space, type(macro), macro) - return macro - - -def handle_plus_syntax(macro): - """transform a + b + c to m(a, m(b, m(c, h())))""" - if "+" not in macro: - return macro - - if "(" in macro or ")" in macro: - logger.error('Mixing "+" and macros is unsupported: "%s"', macro) - return macro - - chunks = [chunk.strip() for chunk in macro.split("+")] - output = "" - depth = 0 - for chunk in chunks: - if chunk == "": - # invalid syntax - logger.error('Invalid syntax for "%s"', macro) - return macro - - depth += 1 - output += f"m({chunk}," - - output += "h()" - output += depth * ")" - - logger.debug('Transformed "%s" to "%s"', macro, output) - return output - - -def parse(macro, context, return_errors=False): - """parse and generate a _Macro that can be run as often as you want. - - If it could not be parsed, possibly due to syntax errors, will log the - error and return None. - - Parameters - ---------- - macro : string - "r(3, k(a).w(10))" - "r(2, k(a).k(-)).k(b)" - "w(1000).m(Shift_L, r(2, k(a))).w(10, 20).k(b)" - context : Context - return_errors : bool - If True, returns errors as a string or None if parsing worked. - If False, returns the parsed macro. - """ - macro = handle_plus_syntax(macro) - - # whitespaces, tabs, newlines and such don't serve a purpose. make - # the log output clearer and the parsing easier. - macro = re.sub(r"\s", "", macro) - - if '"' in macro or "'" in macro: - logger.info("Quotation marks in macros are not needed") - macro = macro.replace('"', "").replace("'", "") - - if return_errors: - logger.spam("checking the syntax of %s", macro) - else: - logger.spam("preparing macro %s for later execution", macro) - - try: - macro_object = _parse_recurse(macro, context) - return macro_object if not return_errors else None - except Exception as error: - logger.error('Failed to parse macro "%s": %s', macro, error.__repr__()) - # print the traceback in case this is a bug of key-mapper - logger.debug("".join(traceback.format_tb(error.__traceback__)).strip()) - return f"{error.__class__.__name__}: {str(error)}" if return_errors else None diff --git a/keymapper/injection/macros/parse.py b/keymapper/injection/macros/parse.py new file mode 100644 index 00000000..11543ab5 --- /dev/null +++ b/keymapper/injection/macros/parse.py @@ -0,0 +1,280 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# key-mapper - GUI for device specific keyboard mappings +# Copyright (C) 2021 sezanzeb +# +# This file is part of key-mapper. +# +# key-mapper is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# key-mapper is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with key-mapper. If not, see . + + +"""Parse macro code""" + + +import re +import traceback +import inspect + +from keymapper.logger import logger +from keymapper.injection.macros.macro import Macro + + +def is_this_a_macro(output): + """Figure out if this is a macro.""" + if not isinstance(output, str): + return False + + if "+" in output.strip(): + # for example "a + b" + return True + + return "(" in output and ")" in output and len(output) >= 4 + + +FUNCTIONS = { + "m": Macro.modify, + "r": Macro.repeat, + "k": Macro.keycode, + "e": Macro.event, + "w": Macro.wait, + "h": Macro.hold, + "mouse": Macro.mouse, + "wheel": Macro.wheel, + "ifeq": Macro.ifeq, + "set": Macro.set, + "if_tap": Macro.if_tap, + "if_single": Macro.if_single, +} + + +def get_num_parameters(function): + """Get the number of required parameters and the maximum number of parameters.""" + fullargspec = inspect.getfullargspec(function) + num_args = len(fullargspec.args) - 1 # one is `self` + return num_args - len(fullargspec.defaults or ()), num_args + + +def _extract_params(inner): + """Extract parameters from the inner contents of a call. + + This does not parse them. + + Parameters + ---------- + inner : string + for example '1, r, r(2, k(a))' should result in ['1', 'r', 'r(2, k(a))'] + """ + inner = inner.strip() + brackets = 0 + params = [] + start = 0 + for position, char in enumerate(inner): + if char == "(": + brackets += 1 + if char == ")": + brackets -= 1 + if char == "," and brackets == 0: + # , potentially starts another parameter, but only if + # the current brackets are all closed. + params.append(inner[start:position].strip()) + # skip the comma + start = position + 1 + + # one last parameter + params.append(inner[start:].strip()) + + return params + + +def _count_brackets(macro): + """Find where the first opening bracket closes.""" + openings = macro.count("(") + closings = macro.count(")") + if openings != closings: + raise Exception( + f"You entered {openings} opening and {closings} " "closing brackets" + ) + + brackets = 0 + position = 0 + for char in macro: + position += 1 + if char == "(": + brackets += 1 + continue + + if char == ")": + brackets -= 1 + if brackets == 0: + # the closing bracket of the call + break + + return position + + +def _parse_recurse(macro, context, macro_instance=None, depth=0): + """Handle a subset of the macro, e.g. one parameter or function call. + + Parameters + ---------- + macro : string + Just like parse + context : Context + macro_instance : Macro or None + A macro instance to add tasks to + depth : int + """ + # not using eval for security reasons + assert isinstance(macro, str) + assert isinstance(depth, int) + + if macro == "": + return None + + if macro_instance is None: + macro_instance = Macro(macro, context) + else: + assert isinstance(macro_instance, Macro) + + macro = macro.strip() + space = " " * depth + + # is it another macro? + call_match = re.match(r"^(\w+)\(", macro) + call = call_match[1] if call_match else None + if call is not None: + # available functions in the macro and the minimum and maximum number + # of their parameters + function = FUNCTIONS.get(call) + if function is None: + raise Exception(f"Unknown function {call}") + + # get all the stuff inbetween + position = _count_brackets(macro) + + inner = macro[macro.index("(") + 1 : position - 1] + + # split "3, k(a).w(10)" into parameters + string_params = _extract_params(inner) + logger.spam("%scalls %s with %s", space, call, string_params) + # evaluate the params + params = [ + _parse_recurse(param.strip(), context, None, depth + 1) + for param in string_params + ] + + logger.spam("%sadd call to %s with %s", space, call, params) + + min_params, max_params = get_num_parameters(function) + if len(params) < min_params or len(params) > max_params: + if min_params != max_params: + msg = ( + f"{call} takes between {min_params} and {max_params}, " + f"not {len(params)} parameters" + ) + else: + msg = f"{call} takes {min_params}, " f"not {len(params)} parameters" + + raise ValueError(msg) + + function(macro_instance, *params) + + # is after this another call? Chain it to the macro_instance + if len(macro) > position and macro[position] == ".": + chain = macro[position + 1 :] + logger.spam("%sfollowed by %s", space, chain) + _parse_recurse(chain, context, macro_instance, depth) + + return macro_instance + + # probably a parameter for an outer function + try: + # if possible, parse as int + macro = int(macro) + except ValueError: + # use as string instead + pass + + logger.spam("%s%s %s", space, type(macro), macro) + return macro + + +def handle_plus_syntax(macro): + """transform a + b + c to m(a, m(b, m(c, h())))""" + if "+" not in macro: + return macro + + if "(" in macro or ")" in macro: + logger.error('Mixing "+" and macros is unsupported: "%s"', macro) + return macro + + chunks = [chunk.strip() for chunk in macro.split("+")] + output = "" + depth = 0 + for chunk in chunks: + if chunk == "": + # invalid syntax + logger.error('Invalid syntax for "%s"', macro) + return macro + + depth += 1 + output += f"m({chunk}," + + output += "h()" + output += depth * ")" + + logger.debug('Transformed "%s" to "%s"', macro, output) + return output + + +def parse(macro, context, return_errors=False): + """parse and generate a Macro that can be run as often as you want. + + If it could not be parsed, possibly due to syntax errors, will log the + error and return None. + + Parameters + ---------- + macro : string + "r(3, k(a).w(10))" + "r(2, k(a).k(-)).k(b)" + "w(1000).m(Shift_L, r(2, k(a))).w(10, 20).k(b)" + context : Context + return_errors : bool + If True, returns errors as a string or None if parsing worked. + If False, returns the parsed macro. + """ + macro = handle_plus_syntax(macro) + + # whitespaces, tabs, newlines and such don't serve a purpose. make + # the log output clearer and the parsing easier. + macro = re.sub(r"\s", "", macro) + + if '"' in macro or "'" in macro: + logger.info("Quotation marks in macros are not needed") + macro = macro.replace('"', "").replace("'", "") + + if return_errors: + logger.spam("checking the syntax of %s", macro) + else: + logger.spam("preparing macro %s for later execution", macro) + + try: + macro_object = _parse_recurse(macro, context) + return macro_object if not return_errors else None + except Exception as error: + logger.error('Failed to parse macro "%s": %s', macro, error.__repr__()) + # print the traceback in case this is a bug of key-mapper + logger.debug("".join(traceback.format_tb(error.__traceback__)).strip()) + return f"{error.__class__.__name__}: {str(error)}" if return_errors else None diff --git a/keymapper/ipc/shared_dict.py b/keymapper/ipc/shared_dict.py new file mode 100644 index 00000000..4131ba94 --- /dev/null +++ b/keymapper/ipc/shared_dict.py @@ -0,0 +1,109 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# key-mapper - GUI for device specific keyboard mappings +# Copyright (C) 2021 sezanzeb +# +# This file is part of key-mapper. +# +# key-mapper is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# key-mapper is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with key-mapper. If not, see . + + +"""Share a dictionary across processes.""" + + +import multiprocessing +import atexit +import select + +from keymapper.logger import logger + + +class SharedDict: + """Share a dictionary across processes.""" + + # because unittests terminate all child processes in cleanup I can't use + # multiprocessing.Manager + def __init__(self): + """Create a shared dictionary.""" + super().__init__() + self.pipe = multiprocessing.Pipe() + self.process = None + atexit.register(self._stop) + self._start() + + # To avoid blocking forever if something goes wrong. The maximum + # observed time communication takes was 0.001 for me on a slow pc + self._timeout = 0.02 + + def _start(self): + """Ensure the process to manage the dictionary is running.""" + if self.process is not None and self.process.is_alive(): + return + + # if the manager has already been running in the past but stopped + # for some reason, the dictionary contents are lost + self.process = multiprocessing.Process(target=self.manage) + self.process.start() + + def manage(self): + """Manage the dictionary, handle read and write requests.""" + shared_dict = dict() + while True: + message = self.pipe[0].recv() + logger.spam("SharedDict got %s", message) + + if message[0] == "stop": + return + + if message[0] == "set": + shared_dict[message[1]] = message[2] + + if message[0] == "get": + self.pipe[0].send(shared_dict.get(message[1])) + + if message[0] == "ping": + self.pipe[0].send("pong") + + def _stop(self): + """Stop the managing process.""" + self.pipe[1].send(("stop",)) + + def get(self, key): + """Get a value from the dictionary.""" + return self.__getitem__(key) + + def is_alive(self, timeout=None): + """Check if the manager process is running.""" + self.pipe[1].send(("ping",)) + select.select([self.pipe[1]], [], [], timeout or self._timeout) + if self.pipe[1].poll(): + return self.pipe[1].recv() == "pong" + + return False + + def __setitem__(self, key, value): + self.pipe[1].send(("set", key, value)) + + def __getitem__(self, key): + self.pipe[1].send(("get", key)) + + select.select([self.pipe[1]], [], [], self._timeout) + if self.pipe[1].poll(): + return self.pipe[1].recv() + + logger.error("select.select timed out") + return None + + def __del__(self): + self._stop() diff --git a/tests/test.py b/tests/test.py index aed476a5..59198e32 100644 --- a/tests/test.py +++ b/tests/test.py @@ -522,7 +522,7 @@ from keymapper.groups import groups from keymapper.system_mapping import system_mapping from keymapper.gui.custom_mapping import custom_mapping from keymapper.paths import get_config_path -from keymapper.injection.macros import macro_variables +from keymapper.injection.macros.macro import macro_variables from keymapper.injection.consumers.keycode_mapper import active_macros, unreleased # no need for a high number in tests diff --git a/tests/testcases/test_injector.py b/tests/testcases/test_injector.py index a151f93c..668b2265 100644 --- a/tests/testcases/test_injector.py +++ b/tests/testcases/test_injector.py @@ -62,7 +62,7 @@ from keymapper.gui.custom_mapping import custom_mapping from keymapper.mapping import Mapping, DISABLE_CODE, DISABLE_NAME from keymapper.config import config, NONE, MOUSE, WHEEL, BUTTONS from keymapper.key import Key -from keymapper.injection.macros import parse +from keymapper.injection.macros.parse import parse from keymapper.injection.context import Context from keymapper.groups import groups, classify, GAMEPAD diff --git a/tests/testcases/test_keycode_mapper.py b/tests/testcases/test_keycode_mapper.py index 64ae7c3c..6f694f4b 100644 --- a/tests/testcases/test_keycode_mapper.py +++ b/tests/testcases/test_keycode_mapper.py @@ -42,7 +42,7 @@ from keymapper.injection.consumers.keycode_mapper import ( subsets, ) from keymapper.system_mapping import system_mapping -from keymapper.injection.macros import parse +from keymapper.injection.macros.parse import parse from keymapper.injection.context import Context from keymapper.utils import RELEASE, PRESS from keymapper.config import config, BUTTONS diff --git a/tests/testcases/test_macros.py b/tests/testcases/test_macros.py index c71fb9a8..8fe73fbb 100644 --- a/tests/testcases/test_macros.py +++ b/tests/testcases/test_macros.py @@ -26,9 +26,9 @@ import multiprocessing from evdev.ecodes import EV_REL, EV_KEY, REL_Y, REL_X, REL_WHEEL, REL_HWHEEL -from keymapper.injection.macros import ( +from keymapper.injection.macros.macro import Macro, type_check +from keymapper.injection.macros.parse import ( parse, - _Macro, _extract_params, is_this_a_macro, _parse_recurse, @@ -68,6 +68,23 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): print(f"\033[90mmacro wrote{(ev_type, code, value)}\033[0m") self.result.append((ev_type, code, value)) + def test_type_check(self): + # allows params that can be cast to the target type + self.assertEqual(type_check("foo", 1, [str, None], 0), "1") + self.assertEqual(type_check("foo", "1", [int, None], 1), 1) + self.assertEqual(type_check("foo", 1.2, [str], 2), "1.2") + + self.assertRaises(TypeError, lambda: type_check("foo", "1.2", [int], 3), 1.2) + self.assertRaises(TypeError, lambda: type_check("foo", "a", [None], 0)) + self.assertRaises(TypeError, lambda: type_check("foo", "a", [int], 1)) + self.assertRaises(TypeError, lambda: type_check("foo", "a", [int, float], 2)) + self.assertRaises(TypeError, lambda: type_check("foo", "a", [int, float, None], 3)) + self.assertEqual(type_check("foo", "a", [int, float, None, str], 4), "a") + + self.assertRaises(TypeError, lambda: type_check("foo", "a", [Macro], 0)) + self.assertRaises(TypeError, lambda: type_check("foo", 1, [Macro], 0)) + self.assertEqual(type_check("foo", "1", [Macro, int], 4), 1) + async def test_is_this_a_macro(self): self.assertTrue(is_this_a_macro("k(1)")) self.assertTrue(is_this_a_macro("k(1).k(2)")) @@ -214,36 +231,80 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): self.assertIn("bracket", error) error = parse("k((1).k)", self.context, return_errors=True) self.assertIsNotNone(error) - error = parse("r(a, k(1))", self.context, return_errors=True) - self.assertIsNotNone(error) error = parse("k()", self.context, return_errors=True) self.assertIsNotNone(error) error = parse("k(1)", self.context, return_errors=True) self.assertIsNone(error) error = parse("k(1, 1)", self.context, return_errors=True) self.assertIsNotNone(error) + error = parse("h(1, 1)", self.context, return_errors=True) self.assertIsNotNone(error) error = parse("h(h(h(1, 1)))", self.context, return_errors=True) self.assertIsNotNone(error) + error = parse("r(1)", self.context, return_errors=True) self.assertIsNotNone(error) + error = parse("r(a, k(1))", self.context, return_errors=True) + self.assertIsNotNone(error) error = parse("r(1, 1)", self.context, return_errors=True) self.assertIsNotNone(error) error = parse("r(k(1), 1)", self.context, return_errors=True) self.assertIsNotNone(error) + error = parse("r(1.2, k(1))", self.context, return_errors=True) + self.assertIsNotNone(error) error = parse("r(1, k(1))", self.context, return_errors=True) self.assertIsNone(error) + error = parse("m(asdf, k(a))", self.context, return_errors=True) self.assertIsNotNone(error) + error = parse("if_tap(, k(a), 1000)", self.context, return_errors=True) self.assertIsNone(error) + error = parse("if_tap(, k(a))", self.context, return_errors=True) + self.assertIsNone(error) + error = parse("if_tap(k(a),)", self.context, return_errors=True) + self.assertIsNone(error) + error = parse("if_tap(k(a), b)", self.context, return_errors=True) + self.assertIsNotNone(error) + error = parse("if_single(k(a),)", self.context, return_errors=True) self.assertIsNone(error) + error = parse("if_single(1,)", self.context, return_errors=True) + self.assertIsNotNone(error) + error = parse("if_single(,1)", self.context, return_errors=True) + self.assertIsNotNone(error) + + error = parse("mouse(up, 3)", self.context, return_errors=True) + self.assertIsNone(error) + error = parse("mouse(3, up)", self.context, return_errors=True) + self.assertIsNotNone(error) + + error = parse("wheel(left, 3)", self.context, return_errors=True) + self.assertIsNone(error) + error = parse("wheel(3, left)", self.context, return_errors=True) + self.assertIsNotNone(error) + + error = parse("w(2)", self.context, return_errors=True) + self.assertIsNone(error) + error = parse("w(a)", self.context, return_errors=True) + self.assertIsNotNone(error) + + error = parse("ifeq(a, 2, k(a),)", self.context, return_errors=True) + self.assertIsNone(error) + error = parse("ifeq(a, 2, , k(a))", self.context, return_errors=True) + self.assertIsNone(error) + error = parse("ifeq(a, 2, 1,)", self.context, return_errors=True) + self.assertIsNotNone(error) + error = parse("ifeq(a, 2, , 2)", self.context, return_errors=True) + self.assertIsNotNone(error) + error = parse("foo(a)", self.context, return_errors=True) self.assertIn("unknown", error.lower()) self.assertIn("foo", error) + + async def test_hold(self): # repeats k(a) as long as the key is held down macro = parse("k(1).h(k(a)).k(3)", self.context) @@ -490,7 +551,7 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): async def test_6(self): # does nothing without .run macro = parse("k(a).r(3, k(b))", self.context) - self.assertIsInstance(macro, _Macro) + self.assertIsInstance(macro, Macro) self.assertListEqual(self.result, []) async def test_keystroke_sleep_config(self):