diff --git a/keymapper/dev/event_producer.py b/keymapper/dev/event_producer.py index 55df38f5..8247c49d 100644 --- a/keymapper/dev/event_producer.py +++ b/keymapper/dev/event_producer.py @@ -99,7 +99,6 @@ class EventProducer: except OverflowError: # screwed up the calculation of mouse movements logger.error('OverflowError (%s, %s, %s)', ev_type, keycode, value) - pass def debounce(self, debounce_id, func, args, ticks): """Debounce a function call. diff --git a/keymapper/dev/injector.py b/keymapper/dev/injector.py index 5f40d4ae..f97419a5 100644 --- a/keymapper/dev/injector.py +++ b/keymapper/dev/injector.py @@ -33,7 +33,7 @@ from evdev.ecodes import EV_KEY, EV_REL from keymapper.logger import logger from keymapper.getdevices import get_devices, is_gamepad -from keymapper.dev.keycode_mapper import handle_keycode +from keymapper.dev.keycode_mapper import KeycodeMapper from keymapper.dev import utils from keymapper.dev.event_producer import EventProducer from keymapper.dev.macros import parse, is_this_a_macro @@ -515,6 +515,11 @@ class Injector: source.path, source.fd ) + keycode_handler = KeycodeMapper( + source, self.mapping, uinput, + self._key_to_code, macros + ) + async for event in source.async_read_loop(): if self._event_producer.is_handled(event): # the event_producer will take care of it @@ -522,14 +527,11 @@ class Injector: continue # for mapped stuff - if utils.should_map_event_as_btn(source, event, self.mapping): + if utils.should_map_event_as_btn(event, self.mapping): will_report_key_up = utils.will_report_key_up(event) - handle_keycode( - self._key_to_code, - macros, + keycode_handler.handle_keycode( event, - uinput, ) if not will_report_key_up: @@ -538,11 +540,9 @@ class Injector: release = evdev.InputEvent(0, 0, event.type, event.code, 0) self._event_producer.debounce( debounce_id=(event.type, event.code, event.value), - func=handle_keycode, + func=keycode_handler.handle_keycode, args=( - self._key_to_code, macros, release, - uinput, False ), ticks=3, @@ -551,7 +551,6 @@ class Injector: continue # forward the rest - # TODO triggers should retain their original value if not mapped uinput.write(event.type, event.code, event.value) # this already includes SYN events, so need to syn here again diff --git a/keymapper/dev/keycode_mapper.py b/keymapper/dev/keycode_mapper.py index 8f4f8c5c..9b9c6fa5 100644 --- a/keymapper/dev/keycode_mapper.py +++ b/keymapper/dev/keycode_mapper.py @@ -29,43 +29,46 @@ from evdev.ecodes import EV_KEY, EV_ABS from keymapper.logger import logger from keymapper.mapping import DISABLE_CODE +from keymapper.dev import utils -# maps mouse buttons to macro instances that have been executed. They may -# still be running or already be done. Just like unreleased, this is a -# mapping of (type, code). The value is not included in the key, because -# a key release event with a value of 0 needs to be able to find the -# running macro. The downside is that a d-pad cannot execute two macros at -# once, one for each direction. Only sequentially.W +# this state is shared by all KeycodeMappers of this process + +# maps mouse buttons to macro instances that have been executed. +# They may still be running or already be done. Just like unreleased, +# this is a mapping of (type, code). The value is not included in the +# key, because a key release event with a value of 0 needs to be able +# to find the running macro. The downside is that a d-pad cannot +# execute two macros at once, one for each direction. +# Only sequentially. active_macros = {} -# mapping of future release event (type, code) to (output, input event), -# with output being a tuple of (type, code) as well. All key-up events -# have a value of 0, so it is not added to the tuple. -# This is needed in order to release the correct event mapped on a -# D-Pad. Each direction on one D-Pad axis reports the same type and -# code, but different values. There cannot be both at the same time, -# as pressing one side of a D-Pad forces the other side to go up. -# If both sides of a D-Pad are mapped to different event-codes, this data -# structure helps to figure out which of those two to release on an event -# of value 0. Same goes for the Wheel. -# The input event is remembered to make sure no duplicate down-events are -# written. Since wheels report a lot of "down" events that don't serve -# any purpose when mapped to a key, those duplicate down events should be -# removed. If the same type and code arrives but with a different value -# (direction), there must be a way to check if the event is actually a -# duplicate and not a different event. +# mapping of future release event (type, code) to an Unreleased object, +# All key-up events have a value of 0, so it is not added to +# the tuple. This is needed in order to release the correct event +# mapped on a D-Pad. Each direction on one D-Pad axis reports the +# same type and code, but different values. There cannot be both at +# the same time, as pressing one side of a D-Pad forces the other +# side to go up. If both sides of a D-Pad are mapped to different +# event-codes, this data structure helps to figure out which of those +# two to release on an event of value 0. Same goes for the Wheel. +# The input event is remembered to make sure no duplicate down-events +# are written. Since wheels report a lot of "down" events that don't +# serve any purpose when mapped to a key, those duplicate down events +# should be removed. If the same type and code arrives but with a +# different value (direction), there must be a way to check if the +# event is actually a duplicate and not a different event. unreleased = {} -def is_key_down(event): - """Is this event a key press.""" - return event.value != 0 +def is_key_down(value): + """Is this event value a key press.""" + return value != 0 -def is_key_up(event): - """Is this event a key release.""" - return event.value == 0 +def is_key_up(value): + """Is this event value a key release.""" + return value == 0 def write(uinput, key): @@ -84,6 +87,8 @@ def subsets(combination): If combination is only one element long it returns an empty list, because it's not a combination and there is no reason to iterate. + Includes the complete input as well. + Parameters ----------- combination : tuple @@ -121,27 +126,41 @@ class Unreleased: self.target_type_code = target_type_code self.input_event_tuple = input_event_tuple self.key = key + + if ( + not isinstance(input_event_tuple[0], int) or + len(input_event_tuple) != 3 + ): + raise ValueError( + 'Expected input_event_tuple to be a 3-tuple of ints, but ' + f'got {input_event_tuple}' + ) + unreleased[input_event_tuple[:2]] = self def __str__(self): return ( - f'target{self.target_type_code} ' - f'input{self.input_event_tuple} ' - f'key{self.key}' + 'Unreleased(' + f'target{self.target_type_code},' + f'input{self.input_event_tuple},' + f'key{"(None)" if self.key is None else self.key}' + ')' ) + def __repr__(self): + return self.__str__() + -def find_by_event(event): +def find_by_event(key): """Find an unreleased entry by an event. - If such an entry exists, it was created by an event that is exactly like - the input parameter (except for the timestamp). + If such an entry exists, it was created by an event that is exactly + like the input parameter (except for the timestamp). That doesn't mean it triggered something, only that it was seen before. """ - unreleased_entry = unreleased.get((event.type, event.code)) - event_tuple = (event.type, event.code, event.value) - if unreleased_entry and unreleased_entry.input_event_tuple == event_tuple: + unreleased_entry = unreleased.get(key[:2]) + if unreleased_entry and unreleased_entry.input_event_tuple == key: return unreleased_entry return None @@ -150,9 +169,9 @@ def find_by_event(event): def find_by_key(key): """Find an unreleased entry by a combination of keys. - If such an entry exist, it was created when a combination of keys (which - matches the parameter) (can also be of len 1 = single key) ended - up triggering something. + If such an entry exist, it was created when a combination of keys + (which matches the parameter) (can also be of len 1 = single key) + ended up triggering something. Parameters ---------- @@ -165,65 +184,6 @@ def find_by_key(key): return None -def _get_key(event, key_to_code, macros): - """If the event triggers stuff, get the key for that. - - This key can be used to index `key_to_code` and `macros` and it might - be a combination of keys. - - Otherwise, for unmapped events, returns the input. - - The return format is always a tuple of 3-tuples, each 3-tuple being - type, code, value (int, int, int) - """ - # The key used to index the mappings `key_to_code` and `macros`. - # If the key triggers a combination, the returned key will be that one - # instead - key = ((event.type, event.code, event.value),) - - unreleased_entry = find_by_event(event) - if unreleased_entry is not None and unreleased_entry.key is not None: - # seen before. If this key triggered a combination, - # use the combination that was triggered by this as key. - return unreleased_entry.key - - if is_key_down(event): - # get the key/combination that the key-down would trigger - - # the triggering key-down has to be the last element in combination, - # all others can have any arbitrary order. By checking all unreleased - # keys, a + b + c takes priority over b + c, if both mappings exist. - # WARNING! the combination-down triggers, but a single key-up releases. - # Do not check if key in macros and such, if it is an up event. It's - # going to be False. - combination = tuple([ - value.input_event_tuple for value - in unreleased.values() - ]) - if key[0] not in combination: # might be a duplicate-down event - combination += key - - # find any triggered combination. macros and key_to_code contain - # every possible equivalent permutation of possible macros. The last - # key in the combination needs to remain the newest key though. - for subset in subsets(combination): - if subset[-1] != key[0]: - # only combinations that are completed and triggered by the - # newest input are of interest - continue - - if subset in macros or subset in key_to_code: - key = subset - break - else: - # no subset found, just use the key. all indices are tuples of - # tuples, both for combinations and single keys. - if event.value == 1 and len(combination) > 1: - logger.key_spam(combination, 'unknown combination') - - return key - - def print_unreleased(): """For debugging purposes.""" logger.debug('unreleased:') @@ -232,139 +192,258 @@ def print_unreleased(): ])) -def handle_keycode(key_to_code, macros, event, uinput, forward=True): - """Write mapped keycodes, forward unmapped ones and manage macros. +class KeycodeMapper: + def __init__(self, source, mapping, uinput, key_to_code, macros): + """Create a keycode mapper for one virtual device. - As long as the provided event is mapped it will handle it, it won't - check any type, code or capability anymore. Otherwise it forwards - it as it is. + There may be multiple KeycodeMappers for one hardware device. They + share some state (unreleased and active_macros) with each other. - Parameters - ---------- - key_to_code : dict - mapping of ((type, code, value),) to linux-keycode - or multiple of those like ((...), (...), ...) for combinations - combinations need to be present in every possible valid ordering. - e.g. shift + alt + a and alt + shift + a - macros : dict - mapping of ((type, code, value),) to _Macro objects. - Combinations work similar as in key_to_code - event : evdev.InputEvent - forward : bool - if False, will not forward the event if it didn't trigger any mapping - """ - if event.type == EV_KEY and event.value == 2: - # button-hold event. Linux creates them on its own for the - # injection-fake-device if the release event won't appear, - # no need to forward or map them. - return - - # the tuple of the actual input event. Used to forward the event if it is - # not mapped, and to index unreleased and active_macros. stays constant - event_tuple = (event.type, event.code, event.value) - type_code = (event.type, event.code) - active_macro = active_macros.get(type_code) - - key = _get_key(event, key_to_code, macros) - is_mapped = key in macros or key in key_to_code - - """Releasing keys and macros""" - - if is_key_up(event): - if active_macro is not None and active_macro.is_holding(): - # Tell the macro for that keycode that the key is released and - # let it decide what to do with that information. - active_macro.release_key() - logger.key_spam(key, 'releasing macro') - - if type_code in unreleased: - # figure out what this release event was for - target_type, target_code = unreleased[type_code].target_type_code - del unreleased[type_code] - - if target_code == DISABLE_CODE: - logger.key_spam(key, 'releasing disabled key') - elif target_code is None: - logger.key_spam(key, 'releasing key') - elif type_code != (target_type, target_code): - # release what the input is mapped to - logger.key_spam(key, 'releasing %s', target_code) - write(uinput, (target_type, target_code, 0)) - elif forward: - # forward the release event - logger.key_spam(key, 'forwarding release') - write(uinput, (target_type, target_code, 0)) + Parameters + ---------- + source : InputDevice + where events used in handle_keycode come from + mapping : Mapping + the mapping that is the source of key_to_code and macros, + only used to query config values. + uinput : UInput: + where to inject events to + key_to_code : dict + mapping of ((type, code, value),) to linux-keycode + or multiple of those like ((...), (...), ...) for combinations + combinations need to be present in every possible valid ordering. + e.g. shift + alt + a and alt + shift + a. + This is needed to query keycodes more efficiently without having + to search mapping each time. + macros : dict + mapping of ((type, code, value),) to _Macro objects. + Combinations work similar as in key_to_code + """ + self.source = source + self.max_abs = utils.get_max_abs(source) + self.mapping = mapping + self.uinput = uinput + + # some type checking, prevents me from forgetting what that stuff + # is supposed to be when writing tests. + # TODO create that stuff (including macros) from mapping here instead + # of the injector + for key in key_to_code: + for sub_key in key: + if abs(sub_key[2]) > 1: + raise ValueError( + f'Expected values to be one of -1, 0 or 1, ' + f'but got {key}' + ) + + self.key_to_code = key_to_code + self.macros = macros + + def _get_key(self, key): + """If the event triggers stuff, get the key for that. + + This key can be used to index `key_to_code` and `macros` and it might + be a combination of keys. + + Otherwise, for unmapped events, returns the input. + + The return format is always a tuple of 3-tuples, each 3-tuple being + type, code, value (int, int, int) + + Parameters + ---------- + key : int, int, int + 3-tuple of type, code, value + Value should be one of -1, 0 or 1 + """ + unreleased_entry = find_by_event(key) + + # The key used to index the mappings `key_to_code` and `macros`. + # If the key triggers a combination, the returned key will be that one + # instead + value = key[2] + key = (key,) + + if unreleased_entry is not None and unreleased_entry.key is not None: + # seen before. If this key triggered a combination, + # use the combination that was triggered by this as key. + return unreleased_entry.key + + if is_key_down(value): + # get the key/combination that the key-down would trigger + + # the triggering key-down has to be the last element in + # combination, all others can have any arbitrary order. By + # checking all unreleased keys, a + b + c takes priority over + # b + c, if both mappings exist. + # WARNING! the combination-down triggers, but a single key-up + # releases. Do not check if key in macros and such, if it is an + # up event. It's going to be False. + combination = tuple([ + value.input_event_tuple for value + in unreleased.values() + ]) + if key[0] not in combination: # might be a duplicate-down event + combination += key + + # find any triggered combination. macros and key_to_code contain + # every possible equivalent permutation of possible macros. The + # last key in the combination needs to remain the newest key + # though. + for subset in subsets(combination): + if subset[-1] != key[0]: + # only combinations that are completed and triggered by + # the newest input are of interest + continue + + if subset in self.macros or subset in self.key_to_code: + key = subset + break else: - logger.key_spam(key, 'not forwarding release') - elif event.type != EV_ABS: - # ABS events might be spammed like crazy every time the position - # slightly changes - logger.key_spam(key, 'unexpected key up') - - # everything that can be released is released now - return - - """Filtering duplicate key downs""" - - if is_mapped and is_key_down(event): - # unmapped keys should not be filtered here, they should just - # be forwarded to populate unreleased and then be written. - - if find_by_key(key) is not None: - # this key/combination triggered stuff before. - # duplicate key-down. skip this event. Avoid writing millions of - # key-down events when a continuous value is reported, for example - # for gamepad triggers or mouse-wheel-side buttons - logger.key_spam(key, 'duplicate key down') + # no subset found, just use the key. all indices are tuples of + # tuples, both for combinations and single keys. + if value == 1 and len(combination) > 1: + logger.key_spam(combination, 'unknown combination') + + return key + + def handle_keycode(self, event, forward=True): + """Write mapped keycodes, forward unmapped ones and manage macros. + + As long as the provided event is mapped it will handle it, it won't + check any type, code or capability anymore. Otherwise it forwards + it as it is. + + Parameters + ---------- + event : evdev.InputEvent + forward : bool + if False, will not forward the event if it didn't trigger any + mapping + """ + if event.type == EV_KEY and event.value == 2: + # button-hold event. Linux creates them on its own for the + # injection-fake-device if the release event won't appear, + # no need to forward or map them. return - # it would start a macro usually - if key in macros and active_macro is not None and active_macro.running: - # for key-down events and running macros, don't do anything. - # This avoids spawning a second macro while the first one is not - # finished, especially since gamepad-triggers report a ton of - # events with a positive value. - logger.key_spam(key, 'macro already running') + # normalize event numbers to one of -1, 0, +1. Otherwise + # mapping trigger values that are between 1 and 255 is not + # possible, because they might skip the 1 when pressed fast + # enough. + original_tuple = (event.type, event.code, event.value) + event.value = utils.normalize_value(event, self.max_abs) + + # the tuple of the actual input event. Used to forward the event if + # it is not mapped, and to index unreleased and active_macros. stays + # constant + event_tuple = (event.type, event.code, event.value) + type_code = (event.type, event.code) + active_macro = active_macros.get(type_code) + + key = self._get_key(event_tuple) + is_mapped = key in self.macros or key in self.key_to_code + + """Releasing keys and macros""" + + if is_key_up(event.value): + if active_macro is not None and active_macro.is_holding(): + # Tell the macro for that keycode that the key is released and + # let it decide what to do with that information. + active_macro.release_key() + logger.key_spam(key, 'releasing macro') + + if type_code in unreleased: + # figure out what this release event was for + target_type, target_code = ( + unreleased[type_code].target_type_code + ) + del unreleased[type_code] + + if target_code == DISABLE_CODE: + logger.key_spam(key, 'releasing disabled key') + elif target_code is None: + logger.key_spam(key, 'releasing key') + elif type_code != (target_type, target_code): + # release what the input is mapped to + logger.key_spam(key, 'releasing %s', target_code) + write(self.uinput, (target_type, target_code, 0)) + elif forward: + # forward the release event + logger.key_spam((original_tuple,), 'forwarding release') + write(self.uinput, original_tuple) + else: + logger.key_spam(key, 'not forwarding release') + elif event.type != EV_ABS: + # ABS events might be spammed like crazy every time the + # position slightly changes + logger.key_spam(key, 'unexpected key up') + + # everything that can be released is released now return - """starting new macros or injecting new keys""" + """Filtering duplicate key downs""" - if is_key_down(event): - # also enter this for unmapped keys, as they might end up triggering - # a combination, so they should be remembered in unreleased + if is_mapped and is_key_down(event.value): + # unmapped keys should not be filtered here, they should just + # be forwarded to populate unreleased and then be written. - if key in macros: - macro = macros[key] - active_macros[type_code] = macro - Unreleased((None, None), event_tuple, key) - macro.press_key() - logger.key_spam(key, 'maps to macro %s', macro.code) - asyncio.ensure_future(macro.run()) - return + if find_by_key(key) is not None: + # this key/combination triggered stuff before. + # duplicate key-down. skip this event. Avoid writing millions + # of key-down events when a continuous value is reported, for + # example for gamepad triggers or mouse-wheel-side buttons + logger.key_spam(key, 'duplicate key down') + return + + # it would start a macro usually + if key in self.macros and active_macro and active_macro.running: + # for key-down events and running macros, don't do anything. + # This avoids spawning a second macro while the first one is + # not finished, especially since gamepad-triggers report a ton + # of events with a positive value. + logger.key_spam(key, 'macro already running') + return + + """starting new macros or injecting new keys""" - if key in key_to_code: - target_code = key_to_code[key] - # remember the key that triggered this - # (this combination or this single key) - Unreleased((EV_KEY, target_code), event_tuple, key) + if is_key_down(event.value): + # also enter this for unmapped keys, as they might end up + # triggering a combination, so they should be remembered in + # unreleased - if target_code == DISABLE_CODE: - logger.key_spam(key, 'disabled') + if key in self.macros: + macro = self.macros[key] + active_macros[type_code] = macro + Unreleased((None, None), event_tuple, key) + macro.press_key() + logger.key_spam(key, 'maps to macro %s', macro.code) + asyncio.ensure_future(macro.run()) return - logger.key_spam(key, 'maps to %s', target_code) - write(uinput, (EV_KEY, target_code, 1)) - return + if key in self.key_to_code: + target_code = self.key_to_code[key] + # remember the key that triggered this + # (this combination or this single key) + Unreleased((EV_KEY, target_code), event_tuple, key) - if forward: - logger.key_spam(key, 'forwarding') - write(uinput, event_tuple) - else: - logger.key_spam(key, 'not forwarding') + if target_code == DISABLE_CODE: + logger.key_spam(key, 'disabled') + return - # unhandled events may still be important for triggering combinations - # later, so remember them as well. - Unreleased((event_tuple[:2]), event_tuple, None) - return + logger.key_spam(key, 'maps to %s', target_code) + write(self.uinput, (EV_KEY, target_code, 1)) + return + + if forward: + logger.key_spam((original_tuple,), 'forwarding') + write(self.uinput, original_tuple) + else: + logger.key_spam((event_tuple,), 'not forwarding') + + # unhandled events may still be important for triggering + # combinations later, so remember them as well. + Unreleased((event_tuple[:2]), event_tuple, None) + return - logger.error('%s unhandled', key) + logger.error('%s unhandled', key) diff --git a/keymapper/dev/reader.py b/keymapper/dev/reader.py index 5b0169f2..4317d1aa 100644 --- a/keymapper/dev/reader.py +++ b/keymapper/dev/reader.py @@ -174,9 +174,12 @@ class _KeycodeReader: # which breaks the current workflow. return - if not utils.should_map_event_as_btn(device, event, custom_mapping): + if not utils.should_map_event_as_btn(event, custom_mapping): return + max_abs = utils.get_max_abs(device) + event.value = utils.normalize_value(event, max_abs) + self._pipe[1].send(event) def _read_worker(self): @@ -267,7 +270,6 @@ class _KeycodeReader: # have to trigger anything, manage any macros and only # reports key-down events. This function is called periodically # by the window. - if self._pipe is None: self.fail_counter += 1 if self.fail_counter % 10 == 0: # spam less @@ -293,11 +295,8 @@ class _KeycodeReader: self._release(type_code) continue - key_down_received = True - if self._unreleased.get(type_code) == event_tuple: - if event.type != EV_ABS: # spams a lot - logger.key_spam(event_tuple, 'duplicate key down') + logger.key_spam(event_tuple, 'duplicate key down') self._debounce_start(event_tuple) continue @@ -318,7 +317,10 @@ class _KeycodeReader: previous_event.value ) if prev_tuple[:2] in self._unreleased: - logger.key_spam(prev_tuple, 'ignoring previous event') + logger.key_spam( + event_tuple, + 'ignoring previous event %s', prev_tuple + ) self._release(prev_tuple[:2]) # to keep track of combinations. @@ -326,6 +328,7 @@ class _KeycodeReader: # event for a D-Pad axis might be any direction, hence this maps # from release to input in order to remember it. Since all release # events have value 0, the value is not used in the key. + key_down_received = True logger.key_spam(event_tuple, 'down') self._unreleased[type_code] = event_tuple self._debounce_start(event_tuple) diff --git a/keymapper/dev/utils.py b/keymapper/dev/utils.py index 2282091c..86460add 100644 --- a/keymapper/dev/utils.py +++ b/keymapper/dev/utils.py @@ -41,12 +41,13 @@ JOYSTICK = [ ] -# a third of a quarter circle +# a third of a quarter circle, so that each quarter is divided in 3 areas: +# up, left and up-left. That makes up/down/left/right larger than the +# overlapping sections though, maybe it should be 8 equal areas though, idk JOYSTICK_BUTTON_THRESHOLD = math.sin((math.pi / 2) / 3 * 1) def sign(value): - """Get the sign of the value, or 0 if 0.""" if value > 0: return 1 @@ -56,6 +57,23 @@ def sign(value): return 0 +def normalize_value(event, max_abs): + """Fit the event value to one of 0, 1 or -1.""" + if event.type == EV_ABS and event.code in JOYSTICK: + if max_abs is None: + logger.error( + 'Got %s, but max_abs is %s', + (event.type, event.code, event.value), max_abs + ) + return event.value + + threshold = max_abs * JOYSTICK_BUTTON_THRESHOLD + triggered = abs(event.value) > threshold + return sign(event.value) if triggered else 0 + + return sign(event.value) + + def is_wheel(event): """Check if this is a wheel event.""" return event.type == EV_REL and event.code in [REL_WHEEL, REL_HWHEEL] @@ -66,7 +84,7 @@ def will_report_key_up(event): return not is_wheel(event) -def should_map_event_as_btn(device, event, mapping): +def should_map_event_as_btn(event, mapping): """Does this event describe a button. If it does, this function will make sure its value is one of [-1, 0, 1], @@ -93,30 +111,12 @@ def should_map_event_as_btn(device, event, mapping): l_purpose = mapping.get('gamepad.joystick.left_purpose') r_purpose = mapping.get('gamepad.joystick.right_purpose') - max_abs = get_max_abs(device) - - if max_abs is None: - logger.error( - 'Got %s, but max_abs is %s', - (event.type, event.code, event.value), max_abs - ) - return False - - threshold = max_abs * JOYSTICK_BUTTON_THRESHOLD - triggered = abs(event.value) > threshold - if event.code in [ABS_X, ABS_Y] and l_purpose == BUTTONS: - event.value = sign(event.value) if triggered else 0 return True if event.code in [ABS_RX, ABS_RY] and r_purpose == BUTTONS: - event.value = sign(event.value) if triggered else 0 return True else: - # normalize event numbers to one of -1, 0, +1. Otherwise mapping - # trigger values that are between 1 and 255 is not possible, - # because they might skip the 1 when pressed fast enough. - event.value = sign(event.value) return True return False diff --git a/keymapper/key.py b/keymapper/key.py index b6777c55..4d31bb7e 100644 --- a/keymapper/key.py +++ b/keymapper/key.py @@ -100,6 +100,10 @@ class Key: def __str__(self): return f'Key{str(self.keys)}' + def __repr__(self): + # used in the AssertionError output of tests + return self.__str__() + def __hash__(self): if len(self.keys) == 1: return hash(self.keys[0]) diff --git a/keymapper/logger.py b/keymapper/logger.py index 352cab1b..2b7acf31 100644 --- a/keymapper/logger.py +++ b/keymapper/logger.py @@ -32,6 +32,8 @@ SPAM = 5 start = time.time() +previous_key_spam = None + def spam(self, message, *args, **kwargs): """Log a more-verbose message than debug.""" @@ -52,6 +54,9 @@ def key_spam(self, key, msg, *args): """ if not self.isEnabledFor(SPAM): return + + global previous_key_spam + msg = msg % args str_key = str(key) str_key = str_key.replace(',)', ')') @@ -59,6 +64,13 @@ def key_spam(self, key, msg, *args): if len(spacing) == 1: spacing = '' msg = f'{str_key}{spacing} {msg}' + + if msg == previous_key_spam: + # avoid some super spam from EV_ABS events + return + + previous_key_spam = msg + self._log(SPAM, msg, args=None) diff --git a/readme/pylint.svg b/readme/pylint.svg index 958382ca..2162ea00 100644 --- a/readme/pylint.svg +++ b/readme/pylint.svg @@ -17,7 +17,7 @@ pylint - 9.73 - 9.73 + 9.72 + 9.72 \ No newline at end of file diff --git a/tests/testcases/test_dev_utils.py b/tests/testcases/test_dev_utils.py index 459b9046..d3b0c2ec 100644 --- a/tests/testcases/test_dev_utils.py +++ b/tests/testcases/test_dev_utils.py @@ -28,7 +28,6 @@ from evdev.ecodes import EV_KEY, EV_ABS, ABS_HAT0X, KEY_A, \ from keymapper.config import config, BUTTONS from keymapper.mapping import Mapping from keymapper.dev import utils -from keymapper.key import Key from tests.test import new_event, InputDevice, MAX_ABS @@ -51,12 +50,11 @@ class TestDevUtils(unittest.TestCase): self.assertFalse(utils.is_wheel(new_event(EV_ABS, ABS_HAT0X, -1))) def test_should_map_event_as_btn(self): - device = InputDevice('/dev/input/event30') mapping = Mapping() # the function name is so horribly long def do(event): - return utils.should_map_event_as_btn(device, event, mapping) + return utils.should_map_event_as_btn(event, mapping) """D-Pad""" @@ -86,29 +84,31 @@ class TestDevUtils(unittest.TestCase): self.assertFalse(do(new_event(EV_ABS, ecodes.ABS_RX, 1234))) self.assertFalse(do(new_event(EV_ABS, ecodes.ABS_Y, -1))) + self.assertFalse(do(new_event(EV_ABS, ecodes.ABS_RY, -1))) - mapping.set('gamepad.joystick.left_purpose', BUTTONS) + mapping.set('gamepad.joystick.right_purpose', BUTTONS) + config.set('gamepad.joystick.left_purpose', BUTTONS) + + self.assertTrue(do(new_event(EV_ABS, ecodes.ABS_Y, -1))) + self.assertTrue(do(new_event(EV_ABS, ecodes.ABS_RY, -1))) + + def test_normalize_value(self): + def do(event): + return utils.normalize_value(event, MAX_ABS) - # the event.value should be modified for the left joystick - # to one of 0, -1 or 1 event = new_event(EV_ABS, ecodes.ABS_RX, MAX_ABS) - self.assertFalse(do(event)) - self.assertEqual(event.value, MAX_ABS) + self.assertEqual(do(event), 1) event = new_event(EV_ABS, ecodes.ABS_Y, -MAX_ABS) - self.assertTrue(do(event)) - self.assertEqual(event.value, -1) + self.assertEqual(do(event), -1) event = new_event(EV_ABS, ecodes.ABS_X, -MAX_ABS // 4) - self.assertTrue(do(event)) - self.assertEqual(event.value, 0) - - config.set('gamepad.joystick.right_purpose', BUTTONS) - + self.assertEqual(do(event), 0) event = new_event(EV_ABS, ecodes.ABS_RX, MAX_ABS) - self.assertTrue(do(event)) - self.assertEqual(event.value, 1) + self.assertEqual(do(event), 1) event = new_event(EV_ABS, ecodes.ABS_Y, MAX_ABS) - self.assertTrue(do(event)) - self.assertEqual(event.value, 1) + self.assertEqual(do(event), 1) event = new_event(EV_ABS, ecodes.ABS_X, MAX_ABS // 4) - self.assertTrue(do(event)) - self.assertEqual(event.value, 0) + self.assertEqual(do(event), 0) + + # if none, it just forwards the value + event = new_event(EV_ABS, ecodes.ABS_RX, MAX_ABS) + self.assertEqual(utils.normalize_value(event, None), MAX_ABS) diff --git a/tests/testcases/test_injector.py b/tests/testcases/test_injector.py index ed4b6995..60e7905d 100644 --- a/tests/testcases/test_injector.py +++ b/tests/testcases/test_injector.py @@ -25,7 +25,8 @@ import copy import evdev from evdev.ecodes import EV_REL, EV_KEY, EV_ABS, ABS_HAT0X, BTN_LEFT, KEY_A, \ - REL_X, REL_Y, REL_WHEEL, REL_HWHEEL, BTN_A, ABS_X, ABS_Y + REL_X, REL_Y, REL_WHEEL, REL_HWHEEL, BTN_A, ABS_X, ABS_Y, BTN_NORTH,\ + ABS_Z, ABS_RZ from keymapper.dev.injector import is_numlock_on, set_numlock, \ ensure_numlock, Injector, is_in_capabilities, \ @@ -396,6 +397,33 @@ class TestInjector(unittest.TestCase): self.assertEqual(history.count((EV_KEY, 77, 1)), 2) self.assertEqual(history.count((EV_KEY, 77, 0)), 2) + def test_gamepad_trigger(self): + # map one of the triggers to BTN_NORTH, while the other one + # should be forwarded unchanged + value = MAX_ABS // 2 + pending_events['gamepad'] = [ + new_event(EV_ABS, ABS_Z, value), + new_event(EV_ABS, ABS_RZ, value), + ] + + # ABS_Z -> 77 + # ABS_RZ is not mapped + custom_mapping.change(Key((EV_ABS, ABS_Z, 1)), 'b') + system_mapping._set('b', 77) + self.injector = Injector('gamepad', custom_mapping) + self.injector.start_injecting() + + # wait for the injector to start sending, at most 1s + uinput_write_history_pipe[0].poll(1) + time.sleep(0.2) + + # convert the write history to some easier to manage list + history = read_write_history_pipe() + + print(history) + self.assertEqual(history.count((EV_KEY, 77, 1)), 1) + self.assertEqual(history.count((EV_ABS, ABS_RZ, value)), 1) + def test_gamepad_to_mouse_event_producer(self): custom_mapping.set('gamepad.joystick.left_purpose', MOUSE) custom_mapping.set('gamepad.joystick.right_purpose', NONE) diff --git a/tests/testcases/test_key.py b/tests/testcases/test_key.py index 2cb64277..efad3edb 100644 --- a/tests/testcases/test_key.py +++ b/tests/testcases/test_key.py @@ -34,6 +34,7 @@ class TestKey(unittest.TestCase): self.assertEqual(len(key_1), 2) self.assertEqual(key_1[0], (1, 3, 1)) self.assertEqual(key_1[1], (1, 5, 1)) + self.assertEqual(hash(key_1), hash(((1, 3, 1), (1, 5, 1)))) key_2 = Key((1, 3, 1)) self.assertEqual(str(key_2), 'Key((1, 3, 1),)') diff --git a/tests/testcases/test_keycode_mapper.py b/tests/testcases/test_keycode_mapper.py index 2ae353a0..9cc2cd9f 100644 --- a/tests/testcases/test_keycode_mapper.py +++ b/tests/testcases/test_keycode_mapper.py @@ -24,17 +24,17 @@ import asyncio import time from evdev.ecodes import EV_KEY, EV_ABS, KEY_A, BTN_TL, \ - ABS_HAT0X, ABS_HAT0Y, ABS_HAT1X, ABS_HAT1Y + ABS_HAT0X, ABS_HAT0Y, ABS_HAT1X, ABS_HAT1Y, ABS_Y -from keymapper.dev.keycode_mapper import active_macros, handle_keycode,\ +from keymapper.dev.keycode_mapper import active_macros, KeycodeMapper, \ unreleased, subsets from keymapper.state import system_mapping from keymapper.dev.macros import parse -from keymapper.config import config +from keymapper.config import config, BUTTONS from keymapper.mapping import Mapping, DISABLE_CODE from tests.test import new_event, UInput, uinput_write_history, \ - cleanup + cleanup, InputDevice, MAX_ABS def wait(func, timeout=1.0): @@ -74,6 +74,7 @@ def calculate_event_number(holdtime, before, after): class TestKeycodeMapper(unittest.TestCase): def setUp(self): self.mapping = Mapping() + self.source = InputDevice('/dev/input/event11') def tearDown(self): # make sure all macros are stopped by tests @@ -117,9 +118,15 @@ class TestKeycodeMapper(unittest.TestCase): } uinput = UInput() + + keycode_mapper = KeycodeMapper( + self.source, self.mapping, uinput, + _key_to_code, {} + ) + # a bunch of d-pad key down events at once - handle_keycode(_key_to_code, {}, new_event(*ev_1), uinput) - handle_keycode(_key_to_code, {}, new_event(*ev_4), uinput) + keycode_mapper.handle_keycode(new_event(*ev_1)) + keycode_mapper.handle_keycode(new_event(*ev_4)) self.assertEqual(len(unreleased), 2) self.assertEqual(unreleased.get(ev_1[:2]).target_type_code, (EV_KEY, _key_to_code[(ev_1,)])) @@ -131,13 +138,13 @@ class TestKeycodeMapper(unittest.TestCase): self.assertEqual(unreleased.get(ev_4[:2]).key, (ev_4,)) # release all of them - handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput) - handle_keycode(_key_to_code, {}, new_event(*ev_6), uinput) + keycode_mapper.handle_keycode(new_event(*ev_3)) + keycode_mapper.handle_keycode(new_event(*ev_6)) self.assertEqual(len(unreleased), 0) # repeat with other values - handle_keycode(_key_to_code, {}, new_event(*ev_2), uinput) - handle_keycode(_key_to_code, {}, new_event(*ev_5), uinput) + keycode_mapper.handle_keycode(new_event(*ev_2)) + keycode_mapper.handle_keycode(new_event(*ev_5)) self.assertEqual(len(unreleased), 2) self.assertEqual(unreleased.get(ev_2[:2]).target_type_code, (EV_KEY, _key_to_code[(ev_2,)])) self.assertEqual(unreleased.get(ev_2[:2]).input_event_tuple, ev_2) @@ -145,8 +152,8 @@ class TestKeycodeMapper(unittest.TestCase): self.assertEqual(unreleased.get(ev_5[:2]).input_event_tuple, ev_5) # release all of them again - handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput) - handle_keycode(_key_to_code, {}, new_event(*ev_6), uinput) + keycode_mapper.handle_keycode(new_event(*ev_3)) + keycode_mapper.handle_keycode(new_event(*ev_6)) self.assertEqual(len(unreleased), 0) self.assertEqual(len(uinput_write_history), 8) @@ -168,16 +175,55 @@ class TestKeycodeMapper(unittest.TestCase): up = (EV_KEY, 91, 0) uinput = UInput() - handle_keycode({}, {}, new_event(*down), uinput, False) + keycode_mapper = KeycodeMapper( + self.source, self.mapping, uinput, + {}, {} + ) + + keycode_mapper.handle_keycode(new_event(*down), False) self.assertEqual(unreleased[(EV_KEY, 91)].input_event_tuple, down) self.assertEqual(unreleased[(EV_KEY, 91)].target_type_code, down[:2]) self.assertEqual(len(unreleased), 1) self.assertEqual(uinput.write_count, 0) - handle_keycode({}, {}, new_event(*up), uinput, False) + keycode_mapper.handle_keycode(new_event(*up), False) self.assertEqual(len(unreleased), 0) self.assertEqual(uinput.write_count, 0) + def test_release_joystick_button(self): + # with the left joystick mapped as button, it will release the mapped + # key when it goes back to close to its resting position + ev_1 = (3, 0, MAX_ABS // 10) # release + ev_3 = (3, 0, -MAX_ABS) # press + + uinput = UInput() + + _key_to_code = { + ((3, 0, -1),): 73 + } + + self.mapping.set('gamepad.joystick.left_purpose', BUTTONS) + + # something with gamepad capabilities + source = InputDevice('/dev/input/event30') + + keycode_mapper = KeycodeMapper( + source, self.mapping, uinput, + _key_to_code, {} + ) + + keycode_mapper.handle_keycode(new_event(*ev_3)) + keycode_mapper.handle_keycode(new_event(*ev_1)) + + # array of 3-tuples + history = [a.t for a in uinput_write_history] + + self.assertIn((EV_KEY, 73, 1), history) + self.assertEqual(history.count((EV_KEY, 73, 1)), 1) + + self.assertIn((EV_KEY, 73, 0), history) + self.assertEqual(history.count((EV_KEY, 73, 0)), 1) + def test_dont_filter_unmapped(self): # if an event is not used at all, it should be written into # unmapped but not furthermore modified. For example wheel events @@ -188,15 +234,20 @@ class TestKeycodeMapper(unittest.TestCase): up = (EV_KEY, 91, 0) uinput = UInput() + keycode_mapper = KeycodeMapper( + self.source, self.mapping, uinput, + {}, {} + ) + for _ in range(10): - handle_keycode({}, {}, new_event(*down), uinput) + keycode_mapper.handle_keycode(new_event(*down)) self.assertEqual(unreleased[(EV_KEY, 91)].input_event_tuple, down) self.assertEqual(unreleased[(EV_KEY, 91)].target_type_code, down[:2]) self.assertEqual(len(unreleased), 1) self.assertEqual(uinput.write_count, 10) - handle_keycode({}, {}, new_event(*up), uinput) + keycode_mapper.handle_keycode(new_event(*up)) self.assertEqual(len(unreleased), 0) self.assertEqual(uinput.write_count, 11) @@ -215,9 +266,14 @@ class TestKeycodeMapper(unittest.TestCase): (down_1, down_2): 71 } - handle_keycode(key_to_code, {}, new_event(*down_1), uinput) + keycode_mapper = KeycodeMapper( + self.source, self.mapping, uinput, + key_to_code, {} + ) + + keycode_mapper.handle_keycode(new_event(*down_1)) for _ in range(10): - handle_keycode(key_to_code, {}, new_event(*down_2), uinput) + keycode_mapper.handle_keycode(new_event(*down_2)) # all duplicate down events should have been ignored self.assertEqual(len(unreleased), 2) @@ -225,8 +281,8 @@ class TestKeycodeMapper(unittest.TestCase): self.assertEqual(uinput_write_history[0].t, down_1) self.assertEqual(uinput_write_history[1].t, (EV_KEY, output, 1)) - handle_keycode({}, {}, new_event(*up_1), uinput) - handle_keycode({}, {}, new_event(*up_2), uinput) + keycode_mapper.handle_keycode(new_event(*up_1)) + keycode_mapper.handle_keycode(new_event(*up_2)) self.assertEqual(len(unreleased), 0) self.assertEqual(uinput.write_count, 4) self.assertEqual(uinput_write_history[2].t, up_1) @@ -245,9 +301,15 @@ class TestKeycodeMapper(unittest.TestCase): } uinput = UInput() + + keycode_mapper = KeycodeMapper( + self.source, self.mapping, uinput, + _key_to_code, {} + ) + # a bunch of d-pad key down events at once - handle_keycode(_key_to_code, {}, new_event(*ev_1), uinput) - handle_keycode(_key_to_code, {}, new_event(*ev_2), uinput) + keycode_mapper.handle_keycode(new_event(*ev_1)) + keycode_mapper.handle_keycode(new_event(*ev_2)) # (what_will_be_released, what_caused_the_key_down) self.assertEqual(unreleased.get(ev_1[:2]).target_type_code, (EV_ABS, ABS_HAT0X)) self.assertEqual(unreleased.get(ev_1[:2]).input_event_tuple, ev_1) @@ -261,8 +323,8 @@ class TestKeycodeMapper(unittest.TestCase): self.assertEqual(uinput_write_history[1].t, (EV_KEY, 51, 1)) # release all of them - handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput) - handle_keycode(_key_to_code, {}, new_event(*ev_4), uinput) + keycode_mapper.handle_keycode(new_event(*ev_3)) + keycode_mapper.handle_keycode(new_event(*ev_4)) self.assertEqual(len(unreleased), 0) self.assertEqual(len(uinput_write_history), 4) @@ -276,9 +338,15 @@ class TestKeycodeMapper(unittest.TestCase): } uinput = UInput() - handle_keycode(_key_to_code, {}, new_event(EV_KEY, 1, 1), uinput) - handle_keycode(_key_to_code, {}, new_event(EV_KEY, 3, 1), uinput) - handle_keycode(_key_to_code, {}, new_event(EV_KEY, 2, 1), uinput) + + keycode_mapper = KeycodeMapper( + self.source, self.mapping, uinput, + _key_to_code, {} + ) + + keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1)) + keycode_mapper.handle_keycode(new_event(EV_KEY, 3, 1)) + keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 1)) self.assertEqual(len(uinput_write_history), 3) self.assertEqual(uinput_write_history[0].t, (EV_KEY, 101, 1)) @@ -292,8 +360,14 @@ class TestKeycodeMapper(unittest.TestCase): } uinput = UInput() - handle_keycode(_key_to_code, {}, new_event(*combination[0]), uinput) - handle_keycode(_key_to_code, {}, new_event(*combination[1]), uinput) + + keycode_mapper = KeycodeMapper( + self.source, self.mapping, uinput, + _key_to_code, {} + ) + + keycode_mapper.handle_keycode(new_event(*combination[0])) + keycode_mapper.handle_keycode(new_event(*combination[1])) self.assertEqual(len(uinput_write_history), 2) # the first event is written and then the triggered combination @@ -301,8 +375,8 @@ class TestKeycodeMapper(unittest.TestCase): self.assertEqual(uinput_write_history[1].t, (EV_KEY, 101, 1)) # release them - handle_keycode(_key_to_code, {}, new_event(*combination[0][:2], 0), uinput) - handle_keycode(_key_to_code, {}, new_event(*combination[1][:2], 0), uinput) + keycode_mapper.handle_keycode(new_event(*combination[0][:2], 0)) + keycode_mapper.handle_keycode(new_event(*combination[1][:2], 0)) # the first key writes its release event. The second key is hidden # behind the executed combination. The result of the combination is # also released, because it acts like a key. @@ -312,8 +386,8 @@ class TestKeycodeMapper(unittest.TestCase): # press them in the wrong order (the wrong key at the end, the order # of all other keys won't matter). no combination should be triggered - handle_keycode(_key_to_code, {}, new_event(*combination[1]), uinput) - handle_keycode(_key_to_code, {}, new_event(*combination[0]), uinput) + keycode_mapper.handle_keycode(new_event(*combination[1])) + keycode_mapper.handle_keycode(new_event(*combination[0])) self.assertEqual(len(uinput_write_history), 6) self.assertEqual(uinput_write_history[4].t, (EV_KEY, 2, 1)) self.assertEqual(uinput_write_history[5].t, (EV_KEY, 1, 1)) @@ -321,7 +395,7 @@ class TestKeycodeMapper(unittest.TestCase): def test_combination_keycode_2(self): combination_1 = ( (EV_KEY, 1, 1), - (EV_KEY, 2, 1), + (EV_ABS, ABS_Y, -MAX_ABS), (EV_KEY, 3, 1), (EV_KEY, 4, 1) ) @@ -337,30 +411,44 @@ class TestKeycodeMapper(unittest.TestCase): up_5 = (EV_KEY, 5, 0) up_4 = (EV_KEY, 4, 0) + def sign_value(key): + return key[0], key[1], key[2] / abs(key[2]) + _key_to_code = { - combination_1: 101, + # key_to_code is supposed to only contain normalized values + tuple([sign_value(a) for a in combination_1]): 101, combination_2: 102, (down_5,): 103 } uinput = UInput() - # 10 and 11: more key-down events than needed - handle_keycode(_key_to_code, {}, new_event(EV_KEY, 10, 1), uinput) - handle_keycode(_key_to_code, {}, new_event(*combination_1[0]), uinput) - handle_keycode(_key_to_code, {}, new_event(*combination_1[1]), uinput) - handle_keycode(_key_to_code, {}, new_event(*combination_1[2]), uinput) - handle_keycode(_key_to_code, {}, new_event(EV_KEY, 11, 1), uinput) - handle_keycode(_key_to_code, {}, new_event(*combination_1[3]), uinput) + + source = InputDevice('/dev/input/event30') + + keycode_mapper = KeycodeMapper( + source, self.mapping, uinput, + _key_to_code, {} + ) + + # 10 and 11: insert some more arbitrary key-down events, + # they should not break the combinations + keycode_mapper.handle_keycode(new_event(EV_KEY, 10, 1)) + keycode_mapper.handle_keycode(new_event(*combination_1[0])) + keycode_mapper.handle_keycode(new_event(*combination_1[1])) + keycode_mapper.handle_keycode(new_event(*combination_1[2])) + keycode_mapper.handle_keycode(new_event(EV_KEY, 11, 1)) + keycode_mapper.handle_keycode(new_event(*combination_1[3])) self.assertEqual(len(uinput_write_history), 6) - # the first event is written and then the triggered combination - self.assertEqual(uinput_write_history[1].t, (EV_KEY, 1, 1)) - self.assertEqual(uinput_write_history[2].t, (EV_KEY, 2, 1)) - self.assertEqual(uinput_write_history[3].t, (EV_KEY, 3, 1)) + # the first events are written and then the triggered combination, + # while the triggering event is the only one that is omitted + self.assertEqual(uinput_write_history[1].t, combination_1[0]) + self.assertEqual(uinput_write_history[2].t, combination_1[1]) + self.assertEqual(uinput_write_history[3].t, combination_1[2]) self.assertEqual(uinput_write_history[5].t, (EV_KEY, 101, 1)) # while the combination is down, another unrelated key can be used - handle_keycode(_key_to_code, {}, new_event(*down_5), uinput) + keycode_mapper.handle_keycode(new_event(*down_5)) # the keycode_mapper searches for subsets of the current held-down # keys to activate combinations, down_5 should not trigger them # again. @@ -369,8 +457,8 @@ class TestKeycodeMapper(unittest.TestCase): # release the combination by releasing the last key, and release # the unrelated key - handle_keycode(_key_to_code, {}, new_event(*up_4), uinput) - handle_keycode(_key_to_code, {}, new_event(*up_5), uinput) + keycode_mapper.handle_keycode(new_event(*up_4)) + keycode_mapper.handle_keycode(new_event(*up_5)) self.assertEqual(len(uinput_write_history), 9) self.assertEqual(uinput_write_history[7].t, (EV_KEY, 101, 0)) @@ -393,8 +481,13 @@ class TestKeycodeMapper(unittest.TestCase): macro_mapping[((EV_KEY, 1, 1),)].set_handler(lambda *args: history.append(args)) macro_mapping[((EV_KEY, 2, 1),)].set_handler(lambda *args: history.append(args)) - handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None) - handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 1), None) + keycode_mapper = KeycodeMapper( + self.source, self.mapping, None, + {}, macro_mapping + ) + + keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1)) + keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 1)) loop = asyncio.get_event_loop() @@ -413,8 +506,8 @@ class TestKeycodeMapper(unittest.TestCase): # releasing stuff self.assertIn((EV_KEY, 1), unreleased) self.assertIn((EV_KEY, 2), unreleased) - handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None) - handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 0), None) + keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 0)) + keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 0)) self.assertNotIn((EV_KEY, 1), unreleased) self.assertNotIn((EV_KEY, 2), unreleased) loop.run_until_complete(asyncio.sleep(0.1)) @@ -440,9 +533,14 @@ class TestKeycodeMapper(unittest.TestCase): macro_mapping[((EV_KEY, 1, 1),)].set_handler(handler) + keycode_mapper = KeycodeMapper( + self.source, self.mapping, None, + {}, macro_mapping + ) + """start macro""" - handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None) + keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1)) loop = asyncio.get_event_loop() @@ -456,7 +554,7 @@ class TestKeycodeMapper(unittest.TestCase): """stop macro""" - handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None) + keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 0)) loop.run_until_complete(asyncio.sleep(keystroke_sleep * 10 / 1000)) @@ -510,9 +608,14 @@ class TestKeycodeMapper(unittest.TestCase): macro_mapping[((EV_KEY, 2, 1),)].set_handler(handler) macro_mapping[((EV_KEY, 3, 1),)].set_handler(handler) + keycode_mapper = KeycodeMapper( + self.source, self.mapping, None, + {}, macro_mapping + ) + """start macro 2""" - handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 1), None) + keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 1)) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.sleep(0.1)) @@ -522,8 +625,8 @@ class TestKeycodeMapper(unittest.TestCase): # spam garbage events for _ in range(5): - handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None) - handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 1), None) + keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1)) + keycode_mapper.handle_keycode(new_event(EV_KEY, 3, 1)) loop.run_until_complete(asyncio.sleep(0.05)) self.assertTrue(active_macros[(EV_KEY, 1)].is_holding()) self.assertTrue(active_macros[(EV_KEY, 1)].running) @@ -541,7 +644,7 @@ class TestKeycodeMapper(unittest.TestCase): self.assertNotIn((code_d, 0), history) # stop macro 2 - handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 0), None) + keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 0)) loop.run_until_complete(asyncio.sleep(0.1)) # it stopped and didn't restart, so the count stays at 1 @@ -562,7 +665,7 @@ class TestKeycodeMapper(unittest.TestCase): history = [] - handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 1), None) + keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 1)) loop.run_until_complete(asyncio.sleep(0.1)) self.assertEqual(history.count((code_c, 1)), 1) self.assertEqual(history.count((code_c, 0)), 1) @@ -570,8 +673,8 @@ class TestKeycodeMapper(unittest.TestCase): # spam garbage events again, this time key-up events on all other # macros for _ in range(5): - handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None) - handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 0), None) + keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 0)) + keycode_mapper.handle_keycode(new_event(EV_KEY, 3, 0)) loop.run_until_complete(asyncio.sleep(0.05)) self.assertFalse(active_macros[(EV_KEY, 1)].is_holding()) self.assertFalse(active_macros[(EV_KEY, 1)].running) @@ -581,7 +684,7 @@ class TestKeycodeMapper(unittest.TestCase): self.assertFalse(active_macros[(EV_KEY, 3)].running) # stop macro 2 - handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 0), None) + keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 0)) loop.run_until_complete(asyncio.sleep(0.1)) # was started only once self.assertEqual(history.count((code_c, 1)), 1) @@ -591,8 +694,8 @@ class TestKeycodeMapper(unittest.TestCase): self.assertEqual(history.count((code_d, 0)), 1) # stop all macros - handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None) - handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 0), None) + keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 0)) + keycode_mapper.handle_keycode(new_event(EV_KEY, 3, 0)) loop.run_until_complete(asyncio.sleep(0.1)) # it's stopped and won't write stuff anymore @@ -629,14 +732,19 @@ class TestKeycodeMapper(unittest.TestCase): macro_mapping[((EV_KEY, 1, 1),)].set_handler(handler) - handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None) + keycode_mapper = KeycodeMapper( + self.source, self.mapping, None, + {}, macro_mapping + ) + + keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1)) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.sleep(0.1)) for _ in range(5): self.assertTrue(active_macros[(EV_KEY, 1)].is_holding()) self.assertTrue(active_macros[(EV_KEY, 1)].running) - handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None) + keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1)) loop.run_until_complete(asyncio.sleep(0.05)) # duplicate key down events don't do anything @@ -646,7 +754,7 @@ class TestKeycodeMapper(unittest.TestCase): self.assertEqual(history.count((code_c, 0)), 0) # stop - handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None) + keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 0)) loop.run_until_complete(asyncio.sleep(0.1)) self.assertEqual(history.count((code_a, 1)), 1) self.assertEqual(history.count((code_a, 0)), 1) @@ -706,19 +814,29 @@ class TestKeycodeMapper(unittest.TestCase): macros_uinput = UInput() keys_uinput = UInput() + keycode_mapper = KeycodeMapper( + self.source, self.mapping, macros_uinput, + {}, macro_mapping + ) + # key up won't do anything - handle_keycode({}, macro_mapping, new_event(*up_0), macros_uinput) - handle_keycode({}, macro_mapping, new_event(*up_1), macros_uinput) - handle_keycode({}, macro_mapping, new_event(*up_2), macros_uinput) + keycode_mapper.handle_keycode(new_event(*up_0)) + keycode_mapper.handle_keycode(new_event(*up_1)) + keycode_mapper.handle_keycode(new_event(*up_2)) loop.run_until_complete(asyncio.sleep(0.1)) self.assertEqual(len(active_macros), 0) """start macros""" - handle_keycode({}, macro_mapping, new_event(*down_0), keys_uinput) + keycode_mapper = KeycodeMapper( + self.source, self.mapping, keys_uinput, + {}, macro_mapping + ) + + keycode_mapper.handle_keycode(new_event(*down_0)) self.assertEqual(keys_uinput.write_count, 1) - handle_keycode({}, macro_mapping, new_event(*down_1), keys_uinput) - handle_keycode({}, macro_mapping, new_event(*down_2), keys_uinput) + keycode_mapper.handle_keycode(new_event(*down_1)) + keycode_mapper.handle_keycode(new_event(*down_2)) self.assertEqual(keys_uinput.write_count, 1) # let the mainloop run for some time so that the macro does its stuff @@ -738,9 +856,14 @@ class TestKeycodeMapper(unittest.TestCase): """stop macros""" + keycode_mapper = KeycodeMapper( + self.source, self.mapping, None, + {}, macro_mapping + ) + # releasing the last key of a combination releases the whole macro - handle_keycode({}, macro_mapping, new_event(*up_1), None) - handle_keycode({}, macro_mapping, new_event(*up_2), None) + keycode_mapper.handle_keycode(new_event(*up_1)) + keycode_mapper.handle_keycode(new_event(*up_2)) self.assertIn(down_0[:2], unreleased) self.assertNotIn(down_1[:2], unreleased) @@ -812,11 +935,16 @@ class TestKeycodeMapper(unittest.TestCase): macro_mapping[(right,)].set_handler(handler) macro_mapping[(left,)].set_handler(handler) - handle_keycode({}, macro_mapping, new_event(*right), None) + keycode_mapper = KeycodeMapper( + self.source, self.mapping, None, + {}, macro_mapping + ) + + keycode_mapper.handle_keycode(new_event(*right)) self.assertIn((EV_ABS, ABS_HAT0X), unreleased) - handle_keycode({}, macro_mapping, new_event(*release), None) + keycode_mapper.handle_keycode(new_event(*release)) self.assertNotIn((EV_ABS, ABS_HAT0X), unreleased) - handle_keycode({}, macro_mapping, new_event(*left), None) + keycode_mapper.handle_keycode(new_event(*left)) self.assertIn((EV_ABS, ABS_HAT0X), unreleased) loop = asyncio.get_event_loop() @@ -840,13 +968,18 @@ class TestKeycodeMapper(unittest.TestCase): uinput = UInput() + keycode_mapper = KeycodeMapper( + self.source, self.mapping, uinput, + _key_to_code, {} + ) + """positive""" for _ in range(1, 20): - handle_keycode(_key_to_code, {}, new_event(*trigger, 1), uinput) + keycode_mapper.handle_keycode(new_event(*trigger, 1)) self.assertIn(trigger, unreleased) - handle_keycode(_key_to_code, {}, new_event(*trigger, 0), uinput) + keycode_mapper.handle_keycode(new_event(*trigger, 0)) self.assertNotIn(trigger, unreleased) self.assertEqual(len(uinput_write_history), 2) @@ -854,10 +987,10 @@ class TestKeycodeMapper(unittest.TestCase): """negative""" for _ in range(1, 20): - handle_keycode(_key_to_code, {}, new_event(*trigger, -1), uinput) + keycode_mapper.handle_keycode(new_event(*trigger, -1)) self.assertIn(trigger, unreleased) - handle_keycode(_key_to_code, {}, new_event(*trigger, 0), uinput) + keycode_mapper.handle_keycode(new_event(*trigger, 0)) self.assertNotIn(trigger, unreleased) self.assertEqual(len(uinput_write_history), 4) @@ -880,13 +1013,19 @@ class TestKeycodeMapper(unittest.TestCase): } uinput = UInput() - handle_keycode(_key_to_code, {}, new_event(*ev_1), uinput) + + keycode_mapper = KeycodeMapper( + self.source, self.mapping, uinput, + _key_to_code, {} + ) + + keycode_mapper.handle_keycode(new_event(*ev_1)) for _ in range(10): - handle_keycode(_key_to_code, {}, new_event(*ev_2), uinput) + keycode_mapper.handle_keycode(new_event(*ev_2)) self.assertIn(key, unreleased) - handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput) + keycode_mapper.handle_keycode(new_event(*ev_3)) self.assertNotIn(key, unreleased) self.assertEqual(len(uinput_write_history), 2) @@ -915,16 +1054,21 @@ class TestKeycodeMapper(unittest.TestCase): uinput = UInput() + keycode_mapper = KeycodeMapper( + self.source, self.mapping, uinput, + _key_to_code, {} + ) + """single keys""" # down - handle_keycode(_key_to_code, {}, new_event(*ev_1), uinput) - handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput) + keycode_mapper.handle_keycode(new_event(*ev_1)) + keycode_mapper.handle_keycode(new_event(*ev_3)) self.assertIn(ev_1[:2], unreleased) self.assertIn(ev_3[:2], unreleased) # up - handle_keycode(_key_to_code, {}, new_event(*ev_2), uinput) - handle_keycode(_key_to_code, {}, new_event(*ev_4), uinput) + keycode_mapper.handle_keycode(new_event(*ev_2)) + keycode_mapper.handle_keycode(new_event(*ev_4)) self.assertNotIn(ev_1[:2], unreleased) self.assertNotIn(ev_3[:2], unreleased) @@ -935,8 +1079,8 @@ class TestKeycodeMapper(unittest.TestCase): """a combination that ends in a disabled key""" # ev_5 should be forwarded and the combination triggered - handle_keycode(_key_to_code, {}, new_event(*combi_1[0]), uinput) - handle_keycode(_key_to_code, {}, new_event(*combi_1[1]), uinput) + keycode_mapper.handle_keycode(new_event(*combi_1[0])) + keycode_mapper.handle_keycode(new_event(*combi_1[1])) self.assertEqual(len(uinput_write_history), 4) self.assertEqual(uinput_write_history[2].t, (EV_KEY, KEY_A, 1)) self.assertEqual(uinput_write_history[3].t, (EV_KEY, 62, 1)) @@ -950,14 +1094,14 @@ class TestKeycodeMapper(unittest.TestCase): # release the last key of the combi first, it should # release what the combination maps to event = new_event(combi_1[1][0], combi_1[1][1], 0) - handle_keycode(_key_to_code, {}, event, uinput) + keycode_mapper.handle_keycode(event) self.assertEqual(len(uinput_write_history), 5) self.assertEqual(uinput_write_history[-1].t, (EV_KEY, 62, 0)) self.assertIn(combi_1[0][:2], unreleased) self.assertNotIn(combi_1[1][:2], unreleased) event = new_event(combi_1[0][0], combi_1[0][1], 0) - handle_keycode(_key_to_code, {}, event, uinput) + keycode_mapper.handle_keycode(event) self.assertEqual(len(uinput_write_history), 6) self.assertEqual(uinput_write_history[-1].t, (EV_KEY, KEY_A, 0)) self.assertNotIn(combi_1[0][:2], unreleased) @@ -966,22 +1110,22 @@ class TestKeycodeMapper(unittest.TestCase): """a combination that starts with a disabled key""" # only the combination should get triggered - handle_keycode(_key_to_code, {}, new_event(*combi_2[0]), uinput) - handle_keycode(_key_to_code, {}, new_event(*combi_2[1]), uinput) + keycode_mapper.handle_keycode(new_event(*combi_2[0])) + keycode_mapper.handle_keycode(new_event(*combi_2[1])) self.assertEqual(len(uinput_write_history), 7) self.assertEqual(uinput_write_history[-1].t, (EV_KEY, 63, 1)) # release the last key of the combi first, it should # release what the combination maps to event = new_event(combi_2[1][0], combi_2[1][1], 0) - handle_keycode(_key_to_code, {}, event, uinput) + keycode_mapper.handle_keycode(event) self.assertEqual(len(uinput_write_history), 8) self.assertEqual(uinput_write_history[-1].t, (EV_KEY, 63, 0)) # the first key of combi_2 is disabled, so it won't write another # key-up event event = new_event(combi_2[0][0], combi_2[0][1], 0) - handle_keycode(_key_to_code, {}, event, uinput) + keycode_mapper.handle_keycode(event) self.assertEqual(len(uinput_write_history), 8) def test_combination_keycode_macro_mix(self): @@ -1007,8 +1151,13 @@ class TestKeycodeMapper(unittest.TestCase): loop = asyncio.get_event_loop() + keycode_mapper = KeycodeMapper( + self.source, self.mapping, uinput, + _key_to_code, macro_mapping + ) + # macro starts - handle_keycode(_key_to_code, macro_mapping, new_event(*down_1), uinput) + keycode_mapper.handle_keycode(new_event(*down_1)) loop.run_until_complete(asyncio.sleep(0.05)) self.assertEqual(len(uinput_write_history), 0) self.assertGreater(len(macro_history), 1) @@ -1016,7 +1165,7 @@ class TestKeycodeMapper(unittest.TestCase): self.assertIn((92, 1), macro_history) # combination triggered - handle_keycode(_key_to_code, macro_mapping, new_event(*down_2), uinput) + keycode_mapper.handle_keycode(new_event(*down_2)) self.assertIn(down_1[:2], unreleased) self.assertIn(down_2[:2], unreleased) self.assertEqual(uinput_write_history[0].t, (EV_KEY, 91, 1)) @@ -1028,7 +1177,7 @@ class TestKeycodeMapper(unittest.TestCase): self.assertGreater(len_b, len_a) # release - handle_keycode(_key_to_code, macro_mapping, new_event(*up_1), uinput) + keycode_mapper.handle_keycode(new_event(*up_1)) self.assertNotIn(down_1[:2], unreleased) self.assertIn(down_2[:2], unreleased) loop.run_until_complete(asyncio.sleep(0.05)) @@ -1038,7 +1187,7 @@ class TestKeycodeMapper(unittest.TestCase): # not running anymore self.assertEqual(len_c, len_d) - handle_keycode(_key_to_code, macro_mapping, new_event(*up_2), uinput) + keycode_mapper.handle_keycode(new_event(*up_2)) self.assertEqual(uinput_write_history[1].t, (EV_KEY, 91, 0)) self.assertEqual(len(uinput_write_history), 2) self.assertNotIn(down_1[:2], unreleased) @@ -1068,24 +1217,29 @@ class TestKeycodeMapper(unittest.TestCase): k2c = {combination: 30} uinput = UInput() + + keycode_mapper = KeycodeMapper( + self.source, self.mapping, + uinput, k2c, {} + ) - handle_keycode(k2c, {}, new_event(*btn_down), uinput) + keycode_mapper.handle_keycode(new_event(*btn_down)) # "forwarding" self.assertEqual(uinput_write_history[0].t, btn_down) - handle_keycode(k2c, {}, new_event(*scroll), uinput) + keycode_mapper.handle_keycode(new_event(*scroll)) # "maps to 30" self.assertEqual(uinput_write_history[1].t, (1, 30, 1)) for _ in range(5): # keep scrolling # "duplicate key down" - handle_keycode(k2c, {}, new_event(*scroll), uinput) + keycode_mapper.handle_keycode(new_event(*scroll)) # nothing new since all of them were duplicate key downs self.assertEqual(len(uinput_write_history), 2) - handle_keycode(k2c, {}, new_event(*btn_up), uinput) + keycode_mapper.handle_keycode(new_event(*btn_up)) # "forwarding release" self.assertEqual(uinput_write_history[2].t, btn_up) @@ -1093,14 +1247,14 @@ class TestKeycodeMapper(unittest.TestCase): # it should be ignored as duplicate key-down self.assertEqual(len(uinput_write_history), 3) # "forwarding" (should be "duplicate key down") - handle_keycode(k2c, {}, new_event(*scroll), uinput) + keycode_mapper.handle_keycode(new_event(*scroll)) self.assertEqual(len(uinput_write_history), 3) # the failure to release the mapped key # forward=False is what the debouncer uses, because a # "scroll release" doesn't actually exist so it is not actually # written if it doesn't release any mapping - handle_keycode(k2c, {}, new_event(*scroll_up), uinput, forward=False) + keycode_mapper.handle_keycode(new_event(*scroll_up), forward=False) # 30 should be released self.assertEqual(uinput_write_history[3].t, (1, 30, 0)) diff --git a/tests/testcases/test_reader.py b/tests/testcases/test_reader.py index 09cdb95a..a66c217e 100644 --- a/tests/testcases/test_reader.py +++ b/tests/testcases/test_reader.py @@ -25,7 +25,7 @@ import multiprocessing from evdev.ecodes import EV_KEY, EV_ABS, ABS_HAT0X, ABS_HAT0Y, KEY_COMMA, \ BTN_LEFT, BTN_TOOL_DOUBLETAP, ABS_Z, ABS_Y, ABS_MISC, KEY_A, \ - EV_REL, REL_WHEEL, REL_X + EV_REL, REL_WHEEL, REL_X, ABS_X, ABS_RZ from keymapper.dev.reader import keycode_reader, will_report_up, \ event_unix_time @@ -253,7 +253,10 @@ class TestReader(unittest.TestCase): # if their purpose is "buttons" custom_mapping.set('gamepad.joystick.left_purpose', BUTTONS) pending_events['gamepad'] = [ - new_event(EV_ABS, ABS_Y, MAX_ABS) + new_event(EV_ABS, ABS_Y, MAX_ABS), + # the value of that one is interpreted as release, because + # it is too small + new_event(EV_ABS, ABS_X, MAX_ABS // 10) ] keycode_reader.start_reading('gamepad') wait(keycode_reader._pipe[0].poll, 0.5) @@ -271,6 +274,36 @@ class TestReader(unittest.TestCase): self.assertEqual(keycode_reader.read(), None) self.assertEqual(len(keycode_reader._unreleased), 0) + def test_combine_triggers(self): + pipe = multiprocessing.Pipe() + keycode_reader._pipe = pipe + + i = 0 + def next_timestamp(): + nonlocal i + i += 1 + return 100 * i + + # based on an observed bug + pipe[1].send(new_event(3, 1, 0, next_timestamp())) + pipe[1].send(new_event(3, 0, 0, next_timestamp())) + pipe[1].send(new_event(3, 2, 1, next_timestamp())) + self.assertEqual(keycode_reader.read(), (EV_ABS, ABS_Z, 1)) + pipe[1].send(new_event(3, 0, 0, next_timestamp())) + pipe[1].send(new_event(3, 5, 1, next_timestamp())) + self.assertEqual(keycode_reader.read(), ((EV_ABS, ABS_Z, 1), (EV_ABS, ABS_RZ, 1))) + pipe[1].send(new_event(3, 5, 0, next_timestamp())) + pipe[1].send(new_event(3, 0, 0, next_timestamp())) + pipe[1].send(new_event(3, 1, 0, next_timestamp())) + self.assertEqual(keycode_reader.read(), None) + pipe[1].send(new_event(3, 2, 1, next_timestamp())) + pipe[1].send(new_event(3, 1, 0, next_timestamp())) + pipe[1].send(new_event(3, 0, 0, next_timestamp())) + # due to not properly handling the duplicate down event it cleared + # the combination and returned it. Instead it should report None + # and by doing that keep the previous combination. + self.assertEqual(keycode_reader.read(), None) + def test_ignore_btn_left(self): # click events are ignored because overwriting them would render the # mouse useless, but a mouse is needed to stop the injection @@ -456,6 +489,8 @@ class TestReader(unittest.TestCase): def test_prioritizing_3_normalize(self): # take the sign of -1234, just like in test_prioritizing_2_normalize pending_events['device 1'] = [ + # HAT0X usually reports only -1, 0 and 1, but that shouldn't + # matter. Everything is normalized. new_event(EV_ABS, ABS_HAT0X, -1234, 1234.0000), new_event(EV_ABS, ABS_HAT0Y, 0, 1234.0030) # ignored # this time don't release anything as well, but it's not