Properly forwarding unmapped gamepad events

pull/31/head
sezanzeb 4 years ago
parent c33d8e8255
commit db047390d4

@ -99,7 +99,6 @@ class EventProducer:
except OverflowError:
# screwed up the calculation of mouse movements
logger.error('OverflowError (%s, %s, %s)', ev_type, keycode, value)
pass
def debounce(self, debounce_id, func, args, ticks):
"""Debounce a function call.

@ -33,7 +33,7 @@ from evdev.ecodes import EV_KEY, EV_REL
from keymapper.logger import logger
from keymapper.getdevices import get_devices, is_gamepad
from keymapper.dev.keycode_mapper import handle_keycode
from keymapper.dev.keycode_mapper import KeycodeMapper
from keymapper.dev import utils
from keymapper.dev.event_producer import EventProducer
from keymapper.dev.macros import parse, is_this_a_macro
@ -515,6 +515,11 @@ class Injector:
source.path, source.fd
)
keycode_handler = KeycodeMapper(
source, self.mapping, uinput,
self._key_to_code, macros
)
async for event in source.async_read_loop():
if self._event_producer.is_handled(event):
# the event_producer will take care of it
@ -522,14 +527,11 @@ class Injector:
continue
# for mapped stuff
if utils.should_map_event_as_btn(source, event, self.mapping):
if utils.should_map_event_as_btn(event, self.mapping):
will_report_key_up = utils.will_report_key_up(event)
handle_keycode(
self._key_to_code,
macros,
keycode_handler.handle_keycode(
event,
uinput,
)
if not will_report_key_up:
@ -538,11 +540,9 @@ class Injector:
release = evdev.InputEvent(0, 0, event.type, event.code, 0)
self._event_producer.debounce(
debounce_id=(event.type, event.code, event.value),
func=handle_keycode,
func=keycode_handler.handle_keycode,
args=(
self._key_to_code, macros,
release,
uinput,
False
),
ticks=3,
@ -551,7 +551,6 @@ class Injector:
continue
# forward the rest
# TODO triggers should retain their original value if not mapped
uinput.write(event.type, event.code, event.value)
# this already includes SYN events, so need to syn here again

@ -29,43 +29,46 @@ from evdev.ecodes import EV_KEY, EV_ABS
from keymapper.logger import logger
from keymapper.mapping import DISABLE_CODE
from keymapper.dev import utils
# maps mouse buttons to macro instances that have been executed. They may
# still be running or already be done. Just like unreleased, this is a
# mapping of (type, code). The value is not included in the key, because
# a key release event with a value of 0 needs to be able to find the
# running macro. The downside is that a d-pad cannot execute two macros at
# once, one for each direction. Only sequentially.W
# this state is shared by all KeycodeMappers of this process
# maps mouse buttons to macro instances that have been executed.
# They may still be running or already be done. Just like unreleased,
# this is a mapping of (type, code). The value is not included in the
# key, because a key release event with a value of 0 needs to be able
# to find the running macro. The downside is that a d-pad cannot
# execute two macros at once, one for each direction.
# Only sequentially.
active_macros = {}
# mapping of future release event (type, code) to (output, input event),
# with output being a tuple of (type, code) as well. All key-up events
# have a value of 0, so it is not added to the tuple.
# This is needed in order to release the correct event mapped on a
# D-Pad. Each direction on one D-Pad axis reports the same type and
# code, but different values. There cannot be both at the same time,
# as pressing one side of a D-Pad forces the other side to go up.
# If both sides of a D-Pad are mapped to different event-codes, this data
# structure helps to figure out which of those two to release on an event
# of value 0. Same goes for the Wheel.
# The input event is remembered to make sure no duplicate down-events are
# written. Since wheels report a lot of "down" events that don't serve
# any purpose when mapped to a key, those duplicate down events should be
# removed. If the same type and code arrives but with a different value
# (direction), there must be a way to check if the event is actually a
# duplicate and not a different event.
# mapping of future release event (type, code) to an Unreleased object,
# All key-up events have a value of 0, so it is not added to
# the tuple. This is needed in order to release the correct event
# mapped on a D-Pad. Each direction on one D-Pad axis reports the
# same type and code, but different values. There cannot be both at
# the same time, as pressing one side of a D-Pad forces the other
# side to go up. If both sides of a D-Pad are mapped to different
# event-codes, this data structure helps to figure out which of those
# two to release on an event of value 0. Same goes for the Wheel.
# The input event is remembered to make sure no duplicate down-events
# are written. Since wheels report a lot of "down" events that don't
# serve any purpose when mapped to a key, those duplicate down events
# should be removed. If the same type and code arrives but with a
# different value (direction), there must be a way to check if the
# event is actually a duplicate and not a different event.
unreleased = {}
def is_key_down(event):
"""Is this event a key press."""
return event.value != 0
def is_key_down(value):
"""Is this event value a key press."""
return value != 0
def is_key_up(event):
"""Is this event a key release."""
return event.value == 0
def is_key_up(value):
"""Is this event value a key release."""
return value == 0
def write(uinput, key):
@ -84,6 +87,8 @@ def subsets(combination):
If combination is only one element long it returns an empty list,
because it's not a combination and there is no reason to iterate.
Includes the complete input as well.
Parameters
-----------
combination : tuple
@ -121,27 +126,41 @@ class Unreleased:
self.target_type_code = target_type_code
self.input_event_tuple = input_event_tuple
self.key = key
if (
not isinstance(input_event_tuple[0], int) or
len(input_event_tuple) != 3
):
raise ValueError(
'Expected input_event_tuple to be a 3-tuple of ints, but '
f'got {input_event_tuple}'
)
unreleased[input_event_tuple[:2]] = self
def __str__(self):
return (
f'target{self.target_type_code} '
f'input{self.input_event_tuple} '
f'key{self.key}'
'Unreleased('
f'target{self.target_type_code},'
f'input{self.input_event_tuple},'
f'key{"(None)" if self.key is None else self.key}'
')'
)
def __repr__(self):
return self.__str__()
def find_by_event(event):
def find_by_event(key):
"""Find an unreleased entry by an event.
If such an entry exists, it was created by an event that is exactly like
the input parameter (except for the timestamp).
If such an entry exists, it was created by an event that is exactly
like the input parameter (except for the timestamp).
That doesn't mean it triggered something, only that it was seen before.
"""
unreleased_entry = unreleased.get((event.type, event.code))
event_tuple = (event.type, event.code, event.value)
if unreleased_entry and unreleased_entry.input_event_tuple == event_tuple:
unreleased_entry = unreleased.get(key[:2])
if unreleased_entry and unreleased_entry.input_event_tuple == key:
return unreleased_entry
return None
@ -150,9 +169,9 @@ def find_by_event(event):
def find_by_key(key):
"""Find an unreleased entry by a combination of keys.
If such an entry exist, it was created when a combination of keys (which
matches the parameter) (can also be of len 1 = single key) ended
up triggering something.
If such an entry exist, it was created when a combination of keys
(which matches the parameter) (can also be of len 1 = single key)
ended up triggering something.
Parameters
----------
@ -165,65 +184,6 @@ def find_by_key(key):
return None
def _get_key(event, key_to_code, macros):
"""If the event triggers stuff, get the key for that.
This key can be used to index `key_to_code` and `macros` and it might
be a combination of keys.
Otherwise, for unmapped events, returns the input.
The return format is always a tuple of 3-tuples, each 3-tuple being
type, code, value (int, int, int)
"""
# The key used to index the mappings `key_to_code` and `macros`.
# If the key triggers a combination, the returned key will be that one
# instead
key = ((event.type, event.code, event.value),)
unreleased_entry = find_by_event(event)
if unreleased_entry is not None and unreleased_entry.key is not None:
# seen before. If this key triggered a combination,
# use the combination that was triggered by this as key.
return unreleased_entry.key
if is_key_down(event):
# get the key/combination that the key-down would trigger
# the triggering key-down has to be the last element in combination,
# all others can have any arbitrary order. By checking all unreleased
# keys, a + b + c takes priority over b + c, if both mappings exist.
# WARNING! the combination-down triggers, but a single key-up releases.
# Do not check if key in macros and such, if it is an up event. It's
# going to be False.
combination = tuple([
value.input_event_tuple for value
in unreleased.values()
])
if key[0] not in combination: # might be a duplicate-down event
combination += key
# find any triggered combination. macros and key_to_code contain
# every possible equivalent permutation of possible macros. The last
# key in the combination needs to remain the newest key though.
for subset in subsets(combination):
if subset[-1] != key[0]:
# only combinations that are completed and triggered by the
# newest input are of interest
continue
if subset in macros or subset in key_to_code:
key = subset
break
else:
# no subset found, just use the key. all indices are tuples of
# tuples, both for combinations and single keys.
if event.value == 1 and len(combination) > 1:
logger.key_spam(combination, 'unknown combination')
return key
def print_unreleased():
"""For debugging purposes."""
logger.debug('unreleased:')
@ -232,139 +192,258 @@ def print_unreleased():
]))
def handle_keycode(key_to_code, macros, event, uinput, forward=True):
"""Write mapped keycodes, forward unmapped ones and manage macros.
class KeycodeMapper:
def __init__(self, source, mapping, uinput, key_to_code, macros):
"""Create a keycode mapper for one virtual device.
As long as the provided event is mapped it will handle it, it won't
check any type, code or capability anymore. Otherwise it forwards
it as it is.
There may be multiple KeycodeMappers for one hardware device. They
share some state (unreleased and active_macros) with each other.
Parameters
----------
key_to_code : dict
mapping of ((type, code, value),) to linux-keycode
or multiple of those like ((...), (...), ...) for combinations
combinations need to be present in every possible valid ordering.
e.g. shift + alt + a and alt + shift + a
macros : dict
mapping of ((type, code, value),) to _Macro objects.
Combinations work similar as in key_to_code
event : evdev.InputEvent
forward : bool
if False, will not forward the event if it didn't trigger any mapping
"""
if event.type == EV_KEY and event.value == 2:
# button-hold event. Linux creates them on its own for the
# injection-fake-device if the release event won't appear,
# no need to forward or map them.
return
# the tuple of the actual input event. Used to forward the event if it is
# not mapped, and to index unreleased and active_macros. stays constant
event_tuple = (event.type, event.code, event.value)
type_code = (event.type, event.code)
active_macro = active_macros.get(type_code)
key = _get_key(event, key_to_code, macros)
is_mapped = key in macros or key in key_to_code
"""Releasing keys and macros"""
if is_key_up(event):
if active_macro is not None and active_macro.is_holding():
# Tell the macro for that keycode that the key is released and
# let it decide what to do with that information.
active_macro.release_key()
logger.key_spam(key, 'releasing macro')
if type_code in unreleased:
# figure out what this release event was for
target_type, target_code = unreleased[type_code].target_type_code
del unreleased[type_code]
if target_code == DISABLE_CODE:
logger.key_spam(key, 'releasing disabled key')
elif target_code is None:
logger.key_spam(key, 'releasing key')
elif type_code != (target_type, target_code):
# release what the input is mapped to
logger.key_spam(key, 'releasing %s', target_code)
write(uinput, (target_type, target_code, 0))
elif forward:
# forward the release event
logger.key_spam(key, 'forwarding release')
write(uinput, (target_type, target_code, 0))
Parameters
----------
source : InputDevice
where events used in handle_keycode come from
mapping : Mapping
the mapping that is the source of key_to_code and macros,
only used to query config values.
uinput : UInput:
where to inject events to
key_to_code : dict
mapping of ((type, code, value),) to linux-keycode
or multiple of those like ((...), (...), ...) for combinations
combinations need to be present in every possible valid ordering.
e.g. shift + alt + a and alt + shift + a.
This is needed to query keycodes more efficiently without having
to search mapping each time.
macros : dict
mapping of ((type, code, value),) to _Macro objects.
Combinations work similar as in key_to_code
"""
self.source = source
self.max_abs = utils.get_max_abs(source)
self.mapping = mapping
self.uinput = uinput
# some type checking, prevents me from forgetting what that stuff
# is supposed to be when writing tests.
# TODO create that stuff (including macros) from mapping here instead
# of the injector
for key in key_to_code:
for sub_key in key:
if abs(sub_key[2]) > 1:
raise ValueError(
f'Expected values to be one of -1, 0 or 1, '
f'but got {key}'
)
self.key_to_code = key_to_code
self.macros = macros
def _get_key(self, key):
"""If the event triggers stuff, get the key for that.
This key can be used to index `key_to_code` and `macros` and it might
be a combination of keys.
Otherwise, for unmapped events, returns the input.
The return format is always a tuple of 3-tuples, each 3-tuple being
type, code, value (int, int, int)
Parameters
----------
key : int, int, int
3-tuple of type, code, value
Value should be one of -1, 0 or 1
"""
unreleased_entry = find_by_event(key)
# The key used to index the mappings `key_to_code` and `macros`.
# If the key triggers a combination, the returned key will be that one
# instead
value = key[2]
key = (key,)
if unreleased_entry is not None and unreleased_entry.key is not None:
# seen before. If this key triggered a combination,
# use the combination that was triggered by this as key.
return unreleased_entry.key
if is_key_down(value):
# get the key/combination that the key-down would trigger
# the triggering key-down has to be the last element in
# combination, all others can have any arbitrary order. By
# checking all unreleased keys, a + b + c takes priority over
# b + c, if both mappings exist.
# WARNING! the combination-down triggers, but a single key-up
# releases. Do not check if key in macros and such, if it is an
# up event. It's going to be False.
combination = tuple([
value.input_event_tuple for value
in unreleased.values()
])
if key[0] not in combination: # might be a duplicate-down event
combination += key
# find any triggered combination. macros and key_to_code contain
# every possible equivalent permutation of possible macros. The
# last key in the combination needs to remain the newest key
# though.
for subset in subsets(combination):
if subset[-1] != key[0]:
# only combinations that are completed and triggered by
# the newest input are of interest
continue
if subset in self.macros or subset in self.key_to_code:
key = subset
break
else:
logger.key_spam(key, 'not forwarding release')
elif event.type != EV_ABS:
# ABS events might be spammed like crazy every time the position
# slightly changes
logger.key_spam(key, 'unexpected key up')
# everything that can be released is released now
return
"""Filtering duplicate key downs"""
if is_mapped and is_key_down(event):
# unmapped keys should not be filtered here, they should just
# be forwarded to populate unreleased and then be written.
if find_by_key(key) is not None:
# this key/combination triggered stuff before.
# duplicate key-down. skip this event. Avoid writing millions of
# key-down events when a continuous value is reported, for example
# for gamepad triggers or mouse-wheel-side buttons
logger.key_spam(key, 'duplicate key down')
# no subset found, just use the key. all indices are tuples of
# tuples, both for combinations and single keys.
if value == 1 and len(combination) > 1:
logger.key_spam(combination, 'unknown combination')
return key
def handle_keycode(self, event, forward=True):
"""Write mapped keycodes, forward unmapped ones and manage macros.
As long as the provided event is mapped it will handle it, it won't
check any type, code or capability anymore. Otherwise it forwards
it as it is.
Parameters
----------
event : evdev.InputEvent
forward : bool
if False, will not forward the event if it didn't trigger any
mapping
"""
if event.type == EV_KEY and event.value == 2:
# button-hold event. Linux creates them on its own for the
# injection-fake-device if the release event won't appear,
# no need to forward or map them.
return
# it would start a macro usually
if key in macros and active_macro is not None and active_macro.running:
# for key-down events and running macros, don't do anything.
# This avoids spawning a second macro while the first one is not
# finished, especially since gamepad-triggers report a ton of
# events with a positive value.
logger.key_spam(key, 'macro already running')
# normalize event numbers to one of -1, 0, +1. Otherwise
# mapping trigger values that are between 1 and 255 is not
# possible, because they might skip the 1 when pressed fast
# enough.
original_tuple = (event.type, event.code, event.value)
event.value = utils.normalize_value(event, self.max_abs)
# the tuple of the actual input event. Used to forward the event if
# it is not mapped, and to index unreleased and active_macros. stays
# constant
event_tuple = (event.type, event.code, event.value)
type_code = (event.type, event.code)
active_macro = active_macros.get(type_code)
key = self._get_key(event_tuple)
is_mapped = key in self.macros or key in self.key_to_code
"""Releasing keys and macros"""
if is_key_up(event.value):
if active_macro is not None and active_macro.is_holding():
# Tell the macro for that keycode that the key is released and
# let it decide what to do with that information.
active_macro.release_key()
logger.key_spam(key, 'releasing macro')
if type_code in unreleased:
# figure out what this release event was for
target_type, target_code = (
unreleased[type_code].target_type_code
)
del unreleased[type_code]
if target_code == DISABLE_CODE:
logger.key_spam(key, 'releasing disabled key')
elif target_code is None:
logger.key_spam(key, 'releasing key')
elif type_code != (target_type, target_code):
# release what the input is mapped to
logger.key_spam(key, 'releasing %s', target_code)
write(self.uinput, (target_type, target_code, 0))
elif forward:
# forward the release event
logger.key_spam((original_tuple,), 'forwarding release')
write(self.uinput, original_tuple)
else:
logger.key_spam(key, 'not forwarding release')
elif event.type != EV_ABS:
# ABS events might be spammed like crazy every time the
# position slightly changes
logger.key_spam(key, 'unexpected key up')
# everything that can be released is released now
return
"""starting new macros or injecting new keys"""
"""Filtering duplicate key downs"""
if is_key_down(event):
# also enter this for unmapped keys, as they might end up triggering
# a combination, so they should be remembered in unreleased
if is_mapped and is_key_down(event.value):
# unmapped keys should not be filtered here, they should just
# be forwarded to populate unreleased and then be written.
if key in macros:
macro = macros[key]
active_macros[type_code] = macro
Unreleased((None, None), event_tuple, key)
macro.press_key()
logger.key_spam(key, 'maps to macro %s', macro.code)
asyncio.ensure_future(macro.run())
return
if find_by_key(key) is not None:
# this key/combination triggered stuff before.
# duplicate key-down. skip this event. Avoid writing millions
# of key-down events when a continuous value is reported, for
# example for gamepad triggers or mouse-wheel-side buttons
logger.key_spam(key, 'duplicate key down')
return
# it would start a macro usually
if key in self.macros and active_macro and active_macro.running:
# for key-down events and running macros, don't do anything.
# This avoids spawning a second macro while the first one is
# not finished, especially since gamepad-triggers report a ton
# of events with a positive value.
logger.key_spam(key, 'macro already running')
return
"""starting new macros or injecting new keys"""
if key in key_to_code:
target_code = key_to_code[key]
# remember the key that triggered this
# (this combination or this single key)
Unreleased((EV_KEY, target_code), event_tuple, key)
if is_key_down(event.value):
# also enter this for unmapped keys, as they might end up
# triggering a combination, so they should be remembered in
# unreleased
if target_code == DISABLE_CODE:
logger.key_spam(key, 'disabled')
if key in self.macros:
macro = self.macros[key]
active_macros[type_code] = macro
Unreleased((None, None), event_tuple, key)
macro.press_key()
logger.key_spam(key, 'maps to macro %s', macro.code)
asyncio.ensure_future(macro.run())
return
logger.key_spam(key, 'maps to %s', target_code)
write(uinput, (EV_KEY, target_code, 1))
return
if key in self.key_to_code:
target_code = self.key_to_code[key]
# remember the key that triggered this
# (this combination or this single key)
Unreleased((EV_KEY, target_code), event_tuple, key)
if forward:
logger.key_spam(key, 'forwarding')
write(uinput, event_tuple)
else:
logger.key_spam(key, 'not forwarding')
if target_code == DISABLE_CODE:
logger.key_spam(key, 'disabled')
return
# unhandled events may still be important for triggering combinations
# later, so remember them as well.
Unreleased((event_tuple[:2]), event_tuple, None)
return
logger.key_spam(key, 'maps to %s', target_code)
write(self.uinput, (EV_KEY, target_code, 1))
return
if forward:
logger.key_spam((original_tuple,), 'forwarding')
write(self.uinput, original_tuple)
else:
logger.key_spam((event_tuple,), 'not forwarding')
# unhandled events may still be important for triggering
# combinations later, so remember them as well.
Unreleased((event_tuple[:2]), event_tuple, None)
return
logger.error('%s unhandled', key)
logger.error('%s unhandled', key)

@ -174,9 +174,12 @@ class _KeycodeReader:
# which breaks the current workflow.
return
if not utils.should_map_event_as_btn(device, event, custom_mapping):
if not utils.should_map_event_as_btn(event, custom_mapping):
return
max_abs = utils.get_max_abs(device)
event.value = utils.normalize_value(event, max_abs)
self._pipe[1].send(event)
def _read_worker(self):
@ -267,7 +270,6 @@ class _KeycodeReader:
# have to trigger anything, manage any macros and only
# reports key-down events. This function is called periodically
# by the window.
if self._pipe is None:
self.fail_counter += 1
if self.fail_counter % 10 == 0: # spam less
@ -293,11 +295,8 @@ class _KeycodeReader:
self._release(type_code)
continue
key_down_received = True
if self._unreleased.get(type_code) == event_tuple:
if event.type != EV_ABS: # spams a lot
logger.key_spam(event_tuple, 'duplicate key down')
logger.key_spam(event_tuple, 'duplicate key down')
self._debounce_start(event_tuple)
continue
@ -318,7 +317,10 @@ class _KeycodeReader:
previous_event.value
)
if prev_tuple[:2] in self._unreleased:
logger.key_spam(prev_tuple, 'ignoring previous event')
logger.key_spam(
event_tuple,
'ignoring previous event %s', prev_tuple
)
self._release(prev_tuple[:2])
# to keep track of combinations.
@ -326,6 +328,7 @@ class _KeycodeReader:
# event for a D-Pad axis might be any direction, hence this maps
# from release to input in order to remember it. Since all release
# events have value 0, the value is not used in the key.
key_down_received = True
logger.key_spam(event_tuple, 'down')
self._unreleased[type_code] = event_tuple
self._debounce_start(event_tuple)

@ -41,12 +41,13 @@ JOYSTICK = [
]
# a third of a quarter circle
# a third of a quarter circle, so that each quarter is divided in 3 areas:
# up, left and up-left. That makes up/down/left/right larger than the
# overlapping sections though, maybe it should be 8 equal areas though, idk
JOYSTICK_BUTTON_THRESHOLD = math.sin((math.pi / 2) / 3 * 1)
def sign(value):
"""Get the sign of the value, or 0 if 0."""
if value > 0:
return 1
@ -56,6 +57,23 @@ def sign(value):
return 0
def normalize_value(event, max_abs):
"""Fit the event value to one of 0, 1 or -1."""
if event.type == EV_ABS and event.code in JOYSTICK:
if max_abs is None:
logger.error(
'Got %s, but max_abs is %s',
(event.type, event.code, event.value), max_abs
)
return event.value
threshold = max_abs * JOYSTICK_BUTTON_THRESHOLD
triggered = abs(event.value) > threshold
return sign(event.value) if triggered else 0
return sign(event.value)
def is_wheel(event):
"""Check if this is a wheel event."""
return event.type == EV_REL and event.code in [REL_WHEEL, REL_HWHEEL]
@ -66,7 +84,7 @@ def will_report_key_up(event):
return not is_wheel(event)
def should_map_event_as_btn(device, event, mapping):
def should_map_event_as_btn(event, mapping):
"""Does this event describe a button.
If it does, this function will make sure its value is one of [-1, 0, 1],
@ -93,30 +111,12 @@ def should_map_event_as_btn(device, event, mapping):
l_purpose = mapping.get('gamepad.joystick.left_purpose')
r_purpose = mapping.get('gamepad.joystick.right_purpose')
max_abs = get_max_abs(device)
if max_abs is None:
logger.error(
'Got %s, but max_abs is %s',
(event.type, event.code, event.value), max_abs
)
return False
threshold = max_abs * JOYSTICK_BUTTON_THRESHOLD
triggered = abs(event.value) > threshold
if event.code in [ABS_X, ABS_Y] and l_purpose == BUTTONS:
event.value = sign(event.value) if triggered else 0
return True
if event.code in [ABS_RX, ABS_RY] and r_purpose == BUTTONS:
event.value = sign(event.value) if triggered else 0
return True
else:
# normalize event numbers to one of -1, 0, +1. Otherwise mapping
# trigger values that are between 1 and 255 is not possible,
# because they might skip the 1 when pressed fast enough.
event.value = sign(event.value)
return True
return False

@ -100,6 +100,10 @@ class Key:
def __str__(self):
return f'Key{str(self.keys)}'
def __repr__(self):
# used in the AssertionError output of tests
return self.__str__()
def __hash__(self):
if len(self.keys) == 1:
return hash(self.keys[0])

@ -32,6 +32,8 @@ SPAM = 5
start = time.time()
previous_key_spam = None
def spam(self, message, *args, **kwargs):
"""Log a more-verbose message than debug."""
@ -52,6 +54,9 @@ def key_spam(self, key, msg, *args):
"""
if not self.isEnabledFor(SPAM):
return
global previous_key_spam
msg = msg % args
str_key = str(key)
str_key = str_key.replace(',)', ')')
@ -59,6 +64,13 @@ def key_spam(self, key, msg, *args):
if len(spacing) == 1:
spacing = ''
msg = f'{str_key}{spacing} {msg}'
if msg == previous_key_spam:
# avoid some super spam from EV_ABS events
return
previous_key_spam = msg
self._log(SPAM, msg, args=None)

@ -17,7 +17,7 @@
<text x="22.0" y="14">pylint</text>
</g>
<g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="11">
<text x="63.0" y="15" fill="#010101" fill-opacity=".3">9.73</text>
<text x="62.0" y="14">9.73</text>
<text x="63.0" y="15" fill="#010101" fill-opacity=".3">9.72</text>
<text x="62.0" y="14">9.72</text>
</g>
</svg>

Before

Width:  |  Height:  |  Size: 1.0 KiB

After

Width:  |  Height:  |  Size: 1.0 KiB

@ -28,7 +28,6 @@ from evdev.ecodes import EV_KEY, EV_ABS, ABS_HAT0X, KEY_A, \
from keymapper.config import config, BUTTONS
from keymapper.mapping import Mapping
from keymapper.dev import utils
from keymapper.key import Key
from tests.test import new_event, InputDevice, MAX_ABS
@ -51,12 +50,11 @@ class TestDevUtils(unittest.TestCase):
self.assertFalse(utils.is_wheel(new_event(EV_ABS, ABS_HAT0X, -1)))
def test_should_map_event_as_btn(self):
device = InputDevice('/dev/input/event30')
mapping = Mapping()
# the function name is so horribly long
def do(event):
return utils.should_map_event_as_btn(device, event, mapping)
return utils.should_map_event_as_btn(event, mapping)
"""D-Pad"""
@ -86,29 +84,31 @@ class TestDevUtils(unittest.TestCase):
self.assertFalse(do(new_event(EV_ABS, ecodes.ABS_RX, 1234)))
self.assertFalse(do(new_event(EV_ABS, ecodes.ABS_Y, -1)))
self.assertFalse(do(new_event(EV_ABS, ecodes.ABS_RY, -1)))
mapping.set('gamepad.joystick.left_purpose', BUTTONS)
mapping.set('gamepad.joystick.right_purpose', BUTTONS)
config.set('gamepad.joystick.left_purpose', BUTTONS)
self.assertTrue(do(new_event(EV_ABS, ecodes.ABS_Y, -1)))
self.assertTrue(do(new_event(EV_ABS, ecodes.ABS_RY, -1)))
def test_normalize_value(self):
def do(event):
return utils.normalize_value(event, MAX_ABS)
# the event.value should be modified for the left joystick
# to one of 0, -1 or 1
event = new_event(EV_ABS, ecodes.ABS_RX, MAX_ABS)
self.assertFalse(do(event))
self.assertEqual(event.value, MAX_ABS)
self.assertEqual(do(event), 1)
event = new_event(EV_ABS, ecodes.ABS_Y, -MAX_ABS)
self.assertTrue(do(event))
self.assertEqual(event.value, -1)
self.assertEqual(do(event), -1)
event = new_event(EV_ABS, ecodes.ABS_X, -MAX_ABS // 4)
self.assertTrue(do(event))
self.assertEqual(event.value, 0)
config.set('gamepad.joystick.right_purpose', BUTTONS)
self.assertEqual(do(event), 0)
event = new_event(EV_ABS, ecodes.ABS_RX, MAX_ABS)
self.assertTrue(do(event))
self.assertEqual(event.value, 1)
self.assertEqual(do(event), 1)
event = new_event(EV_ABS, ecodes.ABS_Y, MAX_ABS)
self.assertTrue(do(event))
self.assertEqual(event.value, 1)
self.assertEqual(do(event), 1)
event = new_event(EV_ABS, ecodes.ABS_X, MAX_ABS // 4)
self.assertTrue(do(event))
self.assertEqual(event.value, 0)
self.assertEqual(do(event), 0)
# if none, it just forwards the value
event = new_event(EV_ABS, ecodes.ABS_RX, MAX_ABS)
self.assertEqual(utils.normalize_value(event, None), MAX_ABS)

@ -25,7 +25,8 @@ import copy
import evdev
from evdev.ecodes import EV_REL, EV_KEY, EV_ABS, ABS_HAT0X, BTN_LEFT, KEY_A, \
REL_X, REL_Y, REL_WHEEL, REL_HWHEEL, BTN_A, ABS_X, ABS_Y
REL_X, REL_Y, REL_WHEEL, REL_HWHEEL, BTN_A, ABS_X, ABS_Y, BTN_NORTH,\
ABS_Z, ABS_RZ
from keymapper.dev.injector import is_numlock_on, set_numlock, \
ensure_numlock, Injector, is_in_capabilities, \
@ -396,6 +397,33 @@ class TestInjector(unittest.TestCase):
self.assertEqual(history.count((EV_KEY, 77, 1)), 2)
self.assertEqual(history.count((EV_KEY, 77, 0)), 2)
def test_gamepad_trigger(self):
# map one of the triggers to BTN_NORTH, while the other one
# should be forwarded unchanged
value = MAX_ABS // 2
pending_events['gamepad'] = [
new_event(EV_ABS, ABS_Z, value),
new_event(EV_ABS, ABS_RZ, value),
]
# ABS_Z -> 77
# ABS_RZ is not mapped
custom_mapping.change(Key((EV_ABS, ABS_Z, 1)), 'b')
system_mapping._set('b', 77)
self.injector = Injector('gamepad', custom_mapping)
self.injector.start_injecting()
# wait for the injector to start sending, at most 1s
uinput_write_history_pipe[0].poll(1)
time.sleep(0.2)
# convert the write history to some easier to manage list
history = read_write_history_pipe()
print(history)
self.assertEqual(history.count((EV_KEY, 77, 1)), 1)
self.assertEqual(history.count((EV_ABS, ABS_RZ, value)), 1)
def test_gamepad_to_mouse_event_producer(self):
custom_mapping.set('gamepad.joystick.left_purpose', MOUSE)
custom_mapping.set('gamepad.joystick.right_purpose', NONE)

@ -34,6 +34,7 @@ class TestKey(unittest.TestCase):
self.assertEqual(len(key_1), 2)
self.assertEqual(key_1[0], (1, 3, 1))
self.assertEqual(key_1[1], (1, 5, 1))
self.assertEqual(hash(key_1), hash(((1, 3, 1), (1, 5, 1))))
key_2 = Key((1, 3, 1))
self.assertEqual(str(key_2), 'Key((1, 3, 1),)')

@ -24,17 +24,17 @@ import asyncio
import time
from evdev.ecodes import EV_KEY, EV_ABS, KEY_A, BTN_TL, \
ABS_HAT0X, ABS_HAT0Y, ABS_HAT1X, ABS_HAT1Y
ABS_HAT0X, ABS_HAT0Y, ABS_HAT1X, ABS_HAT1Y, ABS_Y
from keymapper.dev.keycode_mapper import active_macros, handle_keycode,\
from keymapper.dev.keycode_mapper import active_macros, KeycodeMapper, \
unreleased, subsets
from keymapper.state import system_mapping
from keymapper.dev.macros import parse
from keymapper.config import config
from keymapper.config import config, BUTTONS
from keymapper.mapping import Mapping, DISABLE_CODE
from tests.test import new_event, UInput, uinput_write_history, \
cleanup
cleanup, InputDevice, MAX_ABS
def wait(func, timeout=1.0):
@ -74,6 +74,7 @@ def calculate_event_number(holdtime, before, after):
class TestKeycodeMapper(unittest.TestCase):
def setUp(self):
self.mapping = Mapping()
self.source = InputDevice('/dev/input/event11')
def tearDown(self):
# make sure all macros are stopped by tests
@ -117,9 +118,15 @@ class TestKeycodeMapper(unittest.TestCase):
}
uinput = UInput()
keycode_mapper = KeycodeMapper(
self.source, self.mapping, uinput,
_key_to_code, {}
)
# a bunch of d-pad key down events at once
handle_keycode(_key_to_code, {}, new_event(*ev_1), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_4), uinput)
keycode_mapper.handle_keycode(new_event(*ev_1))
keycode_mapper.handle_keycode(new_event(*ev_4))
self.assertEqual(len(unreleased), 2)
self.assertEqual(unreleased.get(ev_1[:2]).target_type_code, (EV_KEY, _key_to_code[(ev_1,)]))
@ -131,13 +138,13 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(unreleased.get(ev_4[:2]).key, (ev_4,))
# release all of them
handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_6), uinput)
keycode_mapper.handle_keycode(new_event(*ev_3))
keycode_mapper.handle_keycode(new_event(*ev_6))
self.assertEqual(len(unreleased), 0)
# repeat with other values
handle_keycode(_key_to_code, {}, new_event(*ev_2), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_5), uinput)
keycode_mapper.handle_keycode(new_event(*ev_2))
keycode_mapper.handle_keycode(new_event(*ev_5))
self.assertEqual(len(unreleased), 2)
self.assertEqual(unreleased.get(ev_2[:2]).target_type_code, (EV_KEY, _key_to_code[(ev_2,)]))
self.assertEqual(unreleased.get(ev_2[:2]).input_event_tuple, ev_2)
@ -145,8 +152,8 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(unreleased.get(ev_5[:2]).input_event_tuple, ev_5)
# release all of them again
handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_6), uinput)
keycode_mapper.handle_keycode(new_event(*ev_3))
keycode_mapper.handle_keycode(new_event(*ev_6))
self.assertEqual(len(unreleased), 0)
self.assertEqual(len(uinput_write_history), 8)
@ -168,16 +175,55 @@ class TestKeycodeMapper(unittest.TestCase):
up = (EV_KEY, 91, 0)
uinput = UInput()
handle_keycode({}, {}, new_event(*down), uinput, False)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, uinput,
{}, {}
)
keycode_mapper.handle_keycode(new_event(*down), False)
self.assertEqual(unreleased[(EV_KEY, 91)].input_event_tuple, down)
self.assertEqual(unreleased[(EV_KEY, 91)].target_type_code, down[:2])
self.assertEqual(len(unreleased), 1)
self.assertEqual(uinput.write_count, 0)
handle_keycode({}, {}, new_event(*up), uinput, False)
keycode_mapper.handle_keycode(new_event(*up), False)
self.assertEqual(len(unreleased), 0)
self.assertEqual(uinput.write_count, 0)
def test_release_joystick_button(self):
# with the left joystick mapped as button, it will release the mapped
# key when it goes back to close to its resting position
ev_1 = (3, 0, MAX_ABS // 10) # release
ev_3 = (3, 0, -MAX_ABS) # press
uinput = UInput()
_key_to_code = {
((3, 0, -1),): 73
}
self.mapping.set('gamepad.joystick.left_purpose', BUTTONS)
# something with gamepad capabilities
source = InputDevice('/dev/input/event30')
keycode_mapper = KeycodeMapper(
source, self.mapping, uinput,
_key_to_code, {}
)
keycode_mapper.handle_keycode(new_event(*ev_3))
keycode_mapper.handle_keycode(new_event(*ev_1))
# array of 3-tuples
history = [a.t for a in uinput_write_history]
self.assertIn((EV_KEY, 73, 1), history)
self.assertEqual(history.count((EV_KEY, 73, 1)), 1)
self.assertIn((EV_KEY, 73, 0), history)
self.assertEqual(history.count((EV_KEY, 73, 0)), 1)
def test_dont_filter_unmapped(self):
# if an event is not used at all, it should be written into
# unmapped but not furthermore modified. For example wheel events
@ -188,15 +234,20 @@ class TestKeycodeMapper(unittest.TestCase):
up = (EV_KEY, 91, 0)
uinput = UInput()
keycode_mapper = KeycodeMapper(
self.source, self.mapping, uinput,
{}, {}
)
for _ in range(10):
handle_keycode({}, {}, new_event(*down), uinput)
keycode_mapper.handle_keycode(new_event(*down))
self.assertEqual(unreleased[(EV_KEY, 91)].input_event_tuple, down)
self.assertEqual(unreleased[(EV_KEY, 91)].target_type_code, down[:2])
self.assertEqual(len(unreleased), 1)
self.assertEqual(uinput.write_count, 10)
handle_keycode({}, {}, new_event(*up), uinput)
keycode_mapper.handle_keycode(new_event(*up))
self.assertEqual(len(unreleased), 0)
self.assertEqual(uinput.write_count, 11)
@ -215,9 +266,14 @@ class TestKeycodeMapper(unittest.TestCase):
(down_1, down_2): 71
}
handle_keycode(key_to_code, {}, new_event(*down_1), uinput)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, uinput,
key_to_code, {}
)
keycode_mapper.handle_keycode(new_event(*down_1))
for _ in range(10):
handle_keycode(key_to_code, {}, new_event(*down_2), uinput)
keycode_mapper.handle_keycode(new_event(*down_2))
# all duplicate down events should have been ignored
self.assertEqual(len(unreleased), 2)
@ -225,8 +281,8 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(uinput_write_history[0].t, down_1)
self.assertEqual(uinput_write_history[1].t, (EV_KEY, output, 1))
handle_keycode({}, {}, new_event(*up_1), uinput)
handle_keycode({}, {}, new_event(*up_2), uinput)
keycode_mapper.handle_keycode(new_event(*up_1))
keycode_mapper.handle_keycode(new_event(*up_2))
self.assertEqual(len(unreleased), 0)
self.assertEqual(uinput.write_count, 4)
self.assertEqual(uinput_write_history[2].t, up_1)
@ -245,9 +301,15 @@ class TestKeycodeMapper(unittest.TestCase):
}
uinput = UInput()
keycode_mapper = KeycodeMapper(
self.source, self.mapping, uinput,
_key_to_code, {}
)
# a bunch of d-pad key down events at once
handle_keycode(_key_to_code, {}, new_event(*ev_1), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_2), uinput)
keycode_mapper.handle_keycode(new_event(*ev_1))
keycode_mapper.handle_keycode(new_event(*ev_2))
# (what_will_be_released, what_caused_the_key_down)
self.assertEqual(unreleased.get(ev_1[:2]).target_type_code, (EV_ABS, ABS_HAT0X))
self.assertEqual(unreleased.get(ev_1[:2]).input_event_tuple, ev_1)
@ -261,8 +323,8 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(uinput_write_history[1].t, (EV_KEY, 51, 1))
# release all of them
handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_4), uinput)
keycode_mapper.handle_keycode(new_event(*ev_3))
keycode_mapper.handle_keycode(new_event(*ev_4))
self.assertEqual(len(unreleased), 0)
self.assertEqual(len(uinput_write_history), 4)
@ -276,9 +338,15 @@ class TestKeycodeMapper(unittest.TestCase):
}
uinput = UInput()
handle_keycode(_key_to_code, {}, new_event(EV_KEY, 1, 1), uinput)
handle_keycode(_key_to_code, {}, new_event(EV_KEY, 3, 1), uinput)
handle_keycode(_key_to_code, {}, new_event(EV_KEY, 2, 1), uinput)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, uinput,
_key_to_code, {}
)
keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1))
keycode_mapper.handle_keycode(new_event(EV_KEY, 3, 1))
keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 1))
self.assertEqual(len(uinput_write_history), 3)
self.assertEqual(uinput_write_history[0].t, (EV_KEY, 101, 1))
@ -292,8 +360,14 @@ class TestKeycodeMapper(unittest.TestCase):
}
uinput = UInput()
handle_keycode(_key_to_code, {}, new_event(*combination[0]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination[1]), uinput)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, uinput,
_key_to_code, {}
)
keycode_mapper.handle_keycode(new_event(*combination[0]))
keycode_mapper.handle_keycode(new_event(*combination[1]))
self.assertEqual(len(uinput_write_history), 2)
# the first event is written and then the triggered combination
@ -301,8 +375,8 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(uinput_write_history[1].t, (EV_KEY, 101, 1))
# release them
handle_keycode(_key_to_code, {}, new_event(*combination[0][:2], 0), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination[1][:2], 0), uinput)
keycode_mapper.handle_keycode(new_event(*combination[0][:2], 0))
keycode_mapper.handle_keycode(new_event(*combination[1][:2], 0))
# the first key writes its release event. The second key is hidden
# behind the executed combination. The result of the combination is
# also released, because it acts like a key.
@ -312,8 +386,8 @@ class TestKeycodeMapper(unittest.TestCase):
# press them in the wrong order (the wrong key at the end, the order
# of all other keys won't matter). no combination should be triggered
handle_keycode(_key_to_code, {}, new_event(*combination[1]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination[0]), uinput)
keycode_mapper.handle_keycode(new_event(*combination[1]))
keycode_mapper.handle_keycode(new_event(*combination[0]))
self.assertEqual(len(uinput_write_history), 6)
self.assertEqual(uinput_write_history[4].t, (EV_KEY, 2, 1))
self.assertEqual(uinput_write_history[5].t, (EV_KEY, 1, 1))
@ -321,7 +395,7 @@ class TestKeycodeMapper(unittest.TestCase):
def test_combination_keycode_2(self):
combination_1 = (
(EV_KEY, 1, 1),
(EV_KEY, 2, 1),
(EV_ABS, ABS_Y, -MAX_ABS),
(EV_KEY, 3, 1),
(EV_KEY, 4, 1)
)
@ -337,30 +411,44 @@ class TestKeycodeMapper(unittest.TestCase):
up_5 = (EV_KEY, 5, 0)
up_4 = (EV_KEY, 4, 0)
def sign_value(key):
return key[0], key[1], key[2] / abs(key[2])
_key_to_code = {
combination_1: 101,
# key_to_code is supposed to only contain normalized values
tuple([sign_value(a) for a in combination_1]): 101,
combination_2: 102,
(down_5,): 103
}
uinput = UInput()
# 10 and 11: more key-down events than needed
handle_keycode(_key_to_code, {}, new_event(EV_KEY, 10, 1), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination_1[0]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination_1[1]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination_1[2]), uinput)
handle_keycode(_key_to_code, {}, new_event(EV_KEY, 11, 1), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination_1[3]), uinput)
source = InputDevice('/dev/input/event30')
keycode_mapper = KeycodeMapper(
source, self.mapping, uinput,
_key_to_code, {}
)
# 10 and 11: insert some more arbitrary key-down events,
# they should not break the combinations
keycode_mapper.handle_keycode(new_event(EV_KEY, 10, 1))
keycode_mapper.handle_keycode(new_event(*combination_1[0]))
keycode_mapper.handle_keycode(new_event(*combination_1[1]))
keycode_mapper.handle_keycode(new_event(*combination_1[2]))
keycode_mapper.handle_keycode(new_event(EV_KEY, 11, 1))
keycode_mapper.handle_keycode(new_event(*combination_1[3]))
self.assertEqual(len(uinput_write_history), 6)
# the first event is written and then the triggered combination
self.assertEqual(uinput_write_history[1].t, (EV_KEY, 1, 1))
self.assertEqual(uinput_write_history[2].t, (EV_KEY, 2, 1))
self.assertEqual(uinput_write_history[3].t, (EV_KEY, 3, 1))
# the first events are written and then the triggered combination,
# while the triggering event is the only one that is omitted
self.assertEqual(uinput_write_history[1].t, combination_1[0])
self.assertEqual(uinput_write_history[2].t, combination_1[1])
self.assertEqual(uinput_write_history[3].t, combination_1[2])
self.assertEqual(uinput_write_history[5].t, (EV_KEY, 101, 1))
# while the combination is down, another unrelated key can be used
handle_keycode(_key_to_code, {}, new_event(*down_5), uinput)
keycode_mapper.handle_keycode(new_event(*down_5))
# the keycode_mapper searches for subsets of the current held-down
# keys to activate combinations, down_5 should not trigger them
# again.
@ -369,8 +457,8 @@ class TestKeycodeMapper(unittest.TestCase):
# release the combination by releasing the last key, and release
# the unrelated key
handle_keycode(_key_to_code, {}, new_event(*up_4), uinput)
handle_keycode(_key_to_code, {}, new_event(*up_5), uinput)
keycode_mapper.handle_keycode(new_event(*up_4))
keycode_mapper.handle_keycode(new_event(*up_5))
self.assertEqual(len(uinput_write_history), 9)
self.assertEqual(uinput_write_history[7].t, (EV_KEY, 101, 0))
@ -393,8 +481,13 @@ class TestKeycodeMapper(unittest.TestCase):
macro_mapping[((EV_KEY, 1, 1),)].set_handler(lambda *args: history.append(args))
macro_mapping[((EV_KEY, 2, 1),)].set_handler(lambda *args: history.append(args))
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 1), None)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, None,
{}, macro_mapping
)
keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1))
keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 1))
loop = asyncio.get_event_loop()
@ -413,8 +506,8 @@ class TestKeycodeMapper(unittest.TestCase):
# releasing stuff
self.assertIn((EV_KEY, 1), unreleased)
self.assertIn((EV_KEY, 2), unreleased)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 0), None)
keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 0))
keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 0))
self.assertNotIn((EV_KEY, 1), unreleased)
self.assertNotIn((EV_KEY, 2), unreleased)
loop.run_until_complete(asyncio.sleep(0.1))
@ -440,9 +533,14 @@ class TestKeycodeMapper(unittest.TestCase):
macro_mapping[((EV_KEY, 1, 1),)].set_handler(handler)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, None,
{}, macro_mapping
)
"""start macro"""
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None)
keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1))
loop = asyncio.get_event_loop()
@ -456,7 +554,7 @@ class TestKeycodeMapper(unittest.TestCase):
"""stop macro"""
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None)
keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 0))
loop.run_until_complete(asyncio.sleep(keystroke_sleep * 10 / 1000))
@ -510,9 +608,14 @@ class TestKeycodeMapper(unittest.TestCase):
macro_mapping[((EV_KEY, 2, 1),)].set_handler(handler)
macro_mapping[((EV_KEY, 3, 1),)].set_handler(handler)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, None,
{}, macro_mapping
)
"""start macro 2"""
handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 1), None)
keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 1))
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.sleep(0.1))
@ -522,8 +625,8 @@ class TestKeycodeMapper(unittest.TestCase):
# spam garbage events
for _ in range(5):
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 1), None)
keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1))
keycode_mapper.handle_keycode(new_event(EV_KEY, 3, 1))
loop.run_until_complete(asyncio.sleep(0.05))
self.assertTrue(active_macros[(EV_KEY, 1)].is_holding())
self.assertTrue(active_macros[(EV_KEY, 1)].running)
@ -541,7 +644,7 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertNotIn((code_d, 0), history)
# stop macro 2
handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 0), None)
keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 0))
loop.run_until_complete(asyncio.sleep(0.1))
# it stopped and didn't restart, so the count stays at 1
@ -562,7 +665,7 @@ class TestKeycodeMapper(unittest.TestCase):
history = []
handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 1), None)
keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 1))
loop.run_until_complete(asyncio.sleep(0.1))
self.assertEqual(history.count((code_c, 1)), 1)
self.assertEqual(history.count((code_c, 0)), 1)
@ -570,8 +673,8 @@ class TestKeycodeMapper(unittest.TestCase):
# spam garbage events again, this time key-up events on all other
# macros
for _ in range(5):
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 0), None)
keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 0))
keycode_mapper.handle_keycode(new_event(EV_KEY, 3, 0))
loop.run_until_complete(asyncio.sleep(0.05))
self.assertFalse(active_macros[(EV_KEY, 1)].is_holding())
self.assertFalse(active_macros[(EV_KEY, 1)].running)
@ -581,7 +684,7 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertFalse(active_macros[(EV_KEY, 3)].running)
# stop macro 2
handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 0), None)
keycode_mapper.handle_keycode(new_event(EV_KEY, 2, 0))
loop.run_until_complete(asyncio.sleep(0.1))
# was started only once
self.assertEqual(history.count((code_c, 1)), 1)
@ -591,8 +694,8 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(history.count((code_d, 0)), 1)
# stop all macros
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 0), None)
keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 0))
keycode_mapper.handle_keycode(new_event(EV_KEY, 3, 0))
loop.run_until_complete(asyncio.sleep(0.1))
# it's stopped and won't write stuff anymore
@ -629,14 +732,19 @@ class TestKeycodeMapper(unittest.TestCase):
macro_mapping[((EV_KEY, 1, 1),)].set_handler(handler)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, None,
{}, macro_mapping
)
keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1))
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.sleep(0.1))
for _ in range(5):
self.assertTrue(active_macros[(EV_KEY, 1)].is_holding())
self.assertTrue(active_macros[(EV_KEY, 1)].running)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None)
keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 1))
loop.run_until_complete(asyncio.sleep(0.05))
# duplicate key down events don't do anything
@ -646,7 +754,7 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(history.count((code_c, 0)), 0)
# stop
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None)
keycode_mapper.handle_keycode(new_event(EV_KEY, 1, 0))
loop.run_until_complete(asyncio.sleep(0.1))
self.assertEqual(history.count((code_a, 1)), 1)
self.assertEqual(history.count((code_a, 0)), 1)
@ -706,19 +814,29 @@ class TestKeycodeMapper(unittest.TestCase):
macros_uinput = UInput()
keys_uinput = UInput()
keycode_mapper = KeycodeMapper(
self.source, self.mapping, macros_uinput,
{}, macro_mapping
)
# key up won't do anything
handle_keycode({}, macro_mapping, new_event(*up_0), macros_uinput)
handle_keycode({}, macro_mapping, new_event(*up_1), macros_uinput)
handle_keycode({}, macro_mapping, new_event(*up_2), macros_uinput)
keycode_mapper.handle_keycode(new_event(*up_0))
keycode_mapper.handle_keycode(new_event(*up_1))
keycode_mapper.handle_keycode(new_event(*up_2))
loop.run_until_complete(asyncio.sleep(0.1))
self.assertEqual(len(active_macros), 0)
"""start macros"""
handle_keycode({}, macro_mapping, new_event(*down_0), keys_uinput)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, keys_uinput,
{}, macro_mapping
)
keycode_mapper.handle_keycode(new_event(*down_0))
self.assertEqual(keys_uinput.write_count, 1)
handle_keycode({}, macro_mapping, new_event(*down_1), keys_uinput)
handle_keycode({}, macro_mapping, new_event(*down_2), keys_uinput)
keycode_mapper.handle_keycode(new_event(*down_1))
keycode_mapper.handle_keycode(new_event(*down_2))
self.assertEqual(keys_uinput.write_count, 1)
# let the mainloop run for some time so that the macro does its stuff
@ -738,9 +856,14 @@ class TestKeycodeMapper(unittest.TestCase):
"""stop macros"""
keycode_mapper = KeycodeMapper(
self.source, self.mapping, None,
{}, macro_mapping
)
# releasing the last key of a combination releases the whole macro
handle_keycode({}, macro_mapping, new_event(*up_1), None)
handle_keycode({}, macro_mapping, new_event(*up_2), None)
keycode_mapper.handle_keycode(new_event(*up_1))
keycode_mapper.handle_keycode(new_event(*up_2))
self.assertIn(down_0[:2], unreleased)
self.assertNotIn(down_1[:2], unreleased)
@ -812,11 +935,16 @@ class TestKeycodeMapper(unittest.TestCase):
macro_mapping[(right,)].set_handler(handler)
macro_mapping[(left,)].set_handler(handler)
handle_keycode({}, macro_mapping, new_event(*right), None)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, None,
{}, macro_mapping
)
keycode_mapper.handle_keycode(new_event(*right))
self.assertIn((EV_ABS, ABS_HAT0X), unreleased)
handle_keycode({}, macro_mapping, new_event(*release), None)
keycode_mapper.handle_keycode(new_event(*release))
self.assertNotIn((EV_ABS, ABS_HAT0X), unreleased)
handle_keycode({}, macro_mapping, new_event(*left), None)
keycode_mapper.handle_keycode(new_event(*left))
self.assertIn((EV_ABS, ABS_HAT0X), unreleased)
loop = asyncio.get_event_loop()
@ -840,13 +968,18 @@ class TestKeycodeMapper(unittest.TestCase):
uinput = UInput()
keycode_mapper = KeycodeMapper(
self.source, self.mapping, uinput,
_key_to_code, {}
)
"""positive"""
for _ in range(1, 20):
handle_keycode(_key_to_code, {}, new_event(*trigger, 1), uinput)
keycode_mapper.handle_keycode(new_event(*trigger, 1))
self.assertIn(trigger, unreleased)
handle_keycode(_key_to_code, {}, new_event(*trigger, 0), uinput)
keycode_mapper.handle_keycode(new_event(*trigger, 0))
self.assertNotIn(trigger, unreleased)
self.assertEqual(len(uinput_write_history), 2)
@ -854,10 +987,10 @@ class TestKeycodeMapper(unittest.TestCase):
"""negative"""
for _ in range(1, 20):
handle_keycode(_key_to_code, {}, new_event(*trigger, -1), uinput)
keycode_mapper.handle_keycode(new_event(*trigger, -1))
self.assertIn(trigger, unreleased)
handle_keycode(_key_to_code, {}, new_event(*trigger, 0), uinput)
keycode_mapper.handle_keycode(new_event(*trigger, 0))
self.assertNotIn(trigger, unreleased)
self.assertEqual(len(uinput_write_history), 4)
@ -880,13 +1013,19 @@ class TestKeycodeMapper(unittest.TestCase):
}
uinput = UInput()
handle_keycode(_key_to_code, {}, new_event(*ev_1), uinput)
keycode_mapper = KeycodeMapper(
self.source, self.mapping, uinput,
_key_to_code, {}
)
keycode_mapper.handle_keycode(new_event(*ev_1))
for _ in range(10):
handle_keycode(_key_to_code, {}, new_event(*ev_2), uinput)
keycode_mapper.handle_keycode(new_event(*ev_2))
self.assertIn(key, unreleased)
handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput)
keycode_mapper.handle_keycode(new_event(*ev_3))
self.assertNotIn(key, unreleased)
self.assertEqual(len(uinput_write_history), 2)
@ -915,16 +1054,21 @@ class TestKeycodeMapper(unittest.TestCase):
uinput = UInput()
keycode_mapper = KeycodeMapper(
self.source, self.mapping, uinput,
_key_to_code, {}
)
"""single keys"""
# down
handle_keycode(_key_to_code, {}, new_event(*ev_1), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput)
keycode_mapper.handle_keycode(new_event(*ev_1))
keycode_mapper.handle_keycode(new_event(*ev_3))
self.assertIn(ev_1[:2], unreleased)
self.assertIn(ev_3[:2], unreleased)
# up
handle_keycode(_key_to_code, {}, new_event(*ev_2), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_4), uinput)
keycode_mapper.handle_keycode(new_event(*ev_2))
keycode_mapper.handle_keycode(new_event(*ev_4))
self.assertNotIn(ev_1[:2], unreleased)
self.assertNotIn(ev_3[:2], unreleased)
@ -935,8 +1079,8 @@ class TestKeycodeMapper(unittest.TestCase):
"""a combination that ends in a disabled key"""
# ev_5 should be forwarded and the combination triggered
handle_keycode(_key_to_code, {}, new_event(*combi_1[0]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combi_1[1]), uinput)
keycode_mapper.handle_keycode(new_event(*combi_1[0]))
keycode_mapper.handle_keycode(new_event(*combi_1[1]))
self.assertEqual(len(uinput_write_history), 4)
self.assertEqual(uinput_write_history[2].t, (EV_KEY, KEY_A, 1))
self.assertEqual(uinput_write_history[3].t, (EV_KEY, 62, 1))
@ -950,14 +1094,14 @@ class TestKeycodeMapper(unittest.TestCase):
# release the last key of the combi first, it should
# release what the combination maps to
event = new_event(combi_1[1][0], combi_1[1][1], 0)
handle_keycode(_key_to_code, {}, event, uinput)
keycode_mapper.handle_keycode(event)
self.assertEqual(len(uinput_write_history), 5)
self.assertEqual(uinput_write_history[-1].t, (EV_KEY, 62, 0))
self.assertIn(combi_1[0][:2], unreleased)
self.assertNotIn(combi_1[1][:2], unreleased)
event = new_event(combi_1[0][0], combi_1[0][1], 0)
handle_keycode(_key_to_code, {}, event, uinput)
keycode_mapper.handle_keycode(event)
self.assertEqual(len(uinput_write_history), 6)
self.assertEqual(uinput_write_history[-1].t, (EV_KEY, KEY_A, 0))
self.assertNotIn(combi_1[0][:2], unreleased)
@ -966,22 +1110,22 @@ class TestKeycodeMapper(unittest.TestCase):
"""a combination that starts with a disabled key"""
# only the combination should get triggered
handle_keycode(_key_to_code, {}, new_event(*combi_2[0]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combi_2[1]), uinput)
keycode_mapper.handle_keycode(new_event(*combi_2[0]))
keycode_mapper.handle_keycode(new_event(*combi_2[1]))
self.assertEqual(len(uinput_write_history), 7)
self.assertEqual(uinput_write_history[-1].t, (EV_KEY, 63, 1))
# release the last key of the combi first, it should
# release what the combination maps to
event = new_event(combi_2[1][0], combi_2[1][1], 0)
handle_keycode(_key_to_code, {}, event, uinput)
keycode_mapper.handle_keycode(event)
self.assertEqual(len(uinput_write_history), 8)
self.assertEqual(uinput_write_history[-1].t, (EV_KEY, 63, 0))
# the first key of combi_2 is disabled, so it won't write another
# key-up event
event = new_event(combi_2[0][0], combi_2[0][1], 0)
handle_keycode(_key_to_code, {}, event, uinput)
keycode_mapper.handle_keycode(event)
self.assertEqual(len(uinput_write_history), 8)
def test_combination_keycode_macro_mix(self):
@ -1007,8 +1151,13 @@ class TestKeycodeMapper(unittest.TestCase):
loop = asyncio.get_event_loop()
keycode_mapper = KeycodeMapper(
self.source, self.mapping, uinput,
_key_to_code, macro_mapping
)
# macro starts
handle_keycode(_key_to_code, macro_mapping, new_event(*down_1), uinput)
keycode_mapper.handle_keycode(new_event(*down_1))
loop.run_until_complete(asyncio.sleep(0.05))
self.assertEqual(len(uinput_write_history), 0)
self.assertGreater(len(macro_history), 1)
@ -1016,7 +1165,7 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertIn((92, 1), macro_history)
# combination triggered
handle_keycode(_key_to_code, macro_mapping, new_event(*down_2), uinput)
keycode_mapper.handle_keycode(new_event(*down_2))
self.assertIn(down_1[:2], unreleased)
self.assertIn(down_2[:2], unreleased)
self.assertEqual(uinput_write_history[0].t, (EV_KEY, 91, 1))
@ -1028,7 +1177,7 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertGreater(len_b, len_a)
# release
handle_keycode(_key_to_code, macro_mapping, new_event(*up_1), uinput)
keycode_mapper.handle_keycode(new_event(*up_1))
self.assertNotIn(down_1[:2], unreleased)
self.assertIn(down_2[:2], unreleased)
loop.run_until_complete(asyncio.sleep(0.05))
@ -1038,7 +1187,7 @@ class TestKeycodeMapper(unittest.TestCase):
# not running anymore
self.assertEqual(len_c, len_d)
handle_keycode(_key_to_code, macro_mapping, new_event(*up_2), uinput)
keycode_mapper.handle_keycode(new_event(*up_2))
self.assertEqual(uinput_write_history[1].t, (EV_KEY, 91, 0))
self.assertEqual(len(uinput_write_history), 2)
self.assertNotIn(down_1[:2], unreleased)
@ -1068,24 +1217,29 @@ class TestKeycodeMapper(unittest.TestCase):
k2c = {combination: 30}
uinput = UInput()
keycode_mapper = KeycodeMapper(
self.source, self.mapping,
uinput, k2c, {}
)
handle_keycode(k2c, {}, new_event(*btn_down), uinput)
keycode_mapper.handle_keycode(new_event(*btn_down))
# "forwarding"
self.assertEqual(uinput_write_history[0].t, btn_down)
handle_keycode(k2c, {}, new_event(*scroll), uinput)
keycode_mapper.handle_keycode(new_event(*scroll))
# "maps to 30"
self.assertEqual(uinput_write_history[1].t, (1, 30, 1))
for _ in range(5):
# keep scrolling
# "duplicate key down"
handle_keycode(k2c, {}, new_event(*scroll), uinput)
keycode_mapper.handle_keycode(new_event(*scroll))
# nothing new since all of them were duplicate key downs
self.assertEqual(len(uinput_write_history), 2)
handle_keycode(k2c, {}, new_event(*btn_up), uinput)
keycode_mapper.handle_keycode(new_event(*btn_up))
# "forwarding release"
self.assertEqual(uinput_write_history[2].t, btn_up)
@ -1093,14 +1247,14 @@ class TestKeycodeMapper(unittest.TestCase):
# it should be ignored as duplicate key-down
self.assertEqual(len(uinput_write_history), 3)
# "forwarding" (should be "duplicate key down")
handle_keycode(k2c, {}, new_event(*scroll), uinput)
keycode_mapper.handle_keycode(new_event(*scroll))
self.assertEqual(len(uinput_write_history), 3)
# the failure to release the mapped key
# forward=False is what the debouncer uses, because a
# "scroll release" doesn't actually exist so it is not actually
# written if it doesn't release any mapping
handle_keycode(k2c, {}, new_event(*scroll_up), uinput, forward=False)
keycode_mapper.handle_keycode(new_event(*scroll_up), forward=False)
# 30 should be released
self.assertEqual(uinput_write_history[3].t, (1, 30, 0))

@ -25,7 +25,7 @@ import multiprocessing
from evdev.ecodes import EV_KEY, EV_ABS, ABS_HAT0X, ABS_HAT0Y, KEY_COMMA, \
BTN_LEFT, BTN_TOOL_DOUBLETAP, ABS_Z, ABS_Y, ABS_MISC, KEY_A, \
EV_REL, REL_WHEEL, REL_X
EV_REL, REL_WHEEL, REL_X, ABS_X, ABS_RZ
from keymapper.dev.reader import keycode_reader, will_report_up, \
event_unix_time
@ -253,7 +253,10 @@ class TestReader(unittest.TestCase):
# if their purpose is "buttons"
custom_mapping.set('gamepad.joystick.left_purpose', BUTTONS)
pending_events['gamepad'] = [
new_event(EV_ABS, ABS_Y, MAX_ABS)
new_event(EV_ABS, ABS_Y, MAX_ABS),
# the value of that one is interpreted as release, because
# it is too small
new_event(EV_ABS, ABS_X, MAX_ABS // 10)
]
keycode_reader.start_reading('gamepad')
wait(keycode_reader._pipe[0].poll, 0.5)
@ -271,6 +274,36 @@ class TestReader(unittest.TestCase):
self.assertEqual(keycode_reader.read(), None)
self.assertEqual(len(keycode_reader._unreleased), 0)
def test_combine_triggers(self):
pipe = multiprocessing.Pipe()
keycode_reader._pipe = pipe
i = 0
def next_timestamp():
nonlocal i
i += 1
return 100 * i
# based on an observed bug
pipe[1].send(new_event(3, 1, 0, next_timestamp()))
pipe[1].send(new_event(3, 0, 0, next_timestamp()))
pipe[1].send(new_event(3, 2, 1, next_timestamp()))
self.assertEqual(keycode_reader.read(), (EV_ABS, ABS_Z, 1))
pipe[1].send(new_event(3, 0, 0, next_timestamp()))
pipe[1].send(new_event(3, 5, 1, next_timestamp()))
self.assertEqual(keycode_reader.read(), ((EV_ABS, ABS_Z, 1), (EV_ABS, ABS_RZ, 1)))
pipe[1].send(new_event(3, 5, 0, next_timestamp()))
pipe[1].send(new_event(3, 0, 0, next_timestamp()))
pipe[1].send(new_event(3, 1, 0, next_timestamp()))
self.assertEqual(keycode_reader.read(), None)
pipe[1].send(new_event(3, 2, 1, next_timestamp()))
pipe[1].send(new_event(3, 1, 0, next_timestamp()))
pipe[1].send(new_event(3, 0, 0, next_timestamp()))
# due to not properly handling the duplicate down event it cleared
# the combination and returned it. Instead it should report None
# and by doing that keep the previous combination.
self.assertEqual(keycode_reader.read(), None)
def test_ignore_btn_left(self):
# click events are ignored because overwriting them would render the
# mouse useless, but a mouse is needed to stop the injection
@ -456,6 +489,8 @@ class TestReader(unittest.TestCase):
def test_prioritizing_3_normalize(self):
# take the sign of -1234, just like in test_prioritizing_2_normalize
pending_events['device 1'] = [
# HAT0X usually reports only -1, 0 and 1, but that shouldn't
# matter. Everything is normalized.
new_event(EV_ABS, ABS_HAT0X, -1234, 1234.0000),
new_event(EV_ABS, ABS_HAT0Y, 0, 1234.0030) # ignored
# this time don't release anything as well, but it's not

Loading…
Cancel
Save