mapping wheel buttons

xkb
sezanzeb 4 years ago committed by sezanzeb
parent 4bad927ffc
commit d5b6188aff

@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Generated with glade 3.38.1 -->
<!-- Generated with glade 3.38.2 -->
<interface>
<requires lib="gtk+" version="3.22"/>
<object class="GtkImage" id="check-icon">
@ -339,11 +339,14 @@
<property name="visible">True</property>
<property name="can-focus">True</property>
<property name="receives-default">True</property>
<property name="tooltip-text" translatable="yes">To give your keys back their original mapping.</property>
<property name="tooltip-text" translatable="yes">Shortcut: shift + del
To give your keys back their original mapping.</property>
<property name="halign">end</property>
<property name="image">gtk-redo-icon</property>
<property name="always-show-image">True</property>
<signal name="clicked" handler="on_apply_system_layout_clicked" swapped="no"/>
<accelerator key="Delete" signal="activate" modifiers="GDK_SHIFT_MASK"/>
</object>
<packing>
<property name="expand">False</property>
@ -411,7 +414,7 @@
<property name="visible">True</property>
<property name="can-focus">True</property>
<property name="receives-default">True</property>
<property name="tooltip-text" translatable="yes">Presets need to be saved before they can be applied</property>
<property name="tooltip-text" translatable="yes">Presets need to be saved before they can be applied. Don't hold down any keys while the preset gets applied.</property>
<property name="image">check-icon</property>
<property name="always-show-image">True</property>
<signal name="clicked" handler="on_apply_preset_clicked" swapped="no"/>

@ -1,202 +0,0 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# key-mapper - GUI for device specific keyboard mappings
# Copyright (C) 2021 sezanzeb <proxima@hip70890b.de>
#
# This file is part of key-mapper.
#
# key-mapper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# key-mapper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with key-mapper. If not, see <https://www.gnu.org/licenses/>.
"""Keeps mapping joystick to mouse movements."""
import asyncio
import time
from evdev.ecodes import EV_REL, REL_X, REL_Y, REL_WHEEL, REL_HWHEEL
from keymapper.logger import logger
from keymapper.config import MOUSE, WHEEL
from keymapper.dev.utils import get_max_abs
# miniscule movements on the joystick should not trigger a mouse wheel event
WHEEL_THRESHOLD = 0.15
def _write(device, ev_type, keycode, value):
"""Inject."""
# if the mouse won't move even though correct stuff is written here, the
# capabilities are probably wrong
try:
device.write(ev_type, keycode, value)
device.syn()
except OverflowError:
logger.error('OverflowError (%s, %s, %s)', ev_type, keycode, value)
pass
def accumulate(pending, current):
"""Since devices can't do float values, stuff has to be accumulated.
If pending is 0.6 and current is 0.5, return 0.1 and 1.
Because it should move 1px, and 0.1px is rememberd for the next value in
pending.
"""
pending += current
current = int(pending)
pending -= current
return pending, current
def abs_max(value_1, value_2):
"""Get the value with the higher abs value."""
if abs(value_1) > abs(value_2):
return value_1
return value_2
def get_values(abs_state, left_purpose, right_purpose):
"""Get the raw values for wheel and mouse movement.
If two joysticks have the same purpose, the one that reports higher
absolute values takes over the control.
"""
mouse_x = 0
mouse_y = 0
wheel_x = 0
wheel_y = 0
if left_purpose == MOUSE:
mouse_x = abs_max(mouse_x, abs_state[0])
mouse_y = abs_max(mouse_y, abs_state[1])
if left_purpose == WHEEL:
wheel_x = abs_max(wheel_x, abs_state[0])
wheel_y = abs_max(wheel_y, abs_state[1])
if right_purpose == MOUSE:
mouse_x = abs_max(mouse_x, abs_state[2])
mouse_y = abs_max(mouse_y, abs_state[3])
if right_purpose == WHEEL:
wheel_x = abs_max(wheel_x, abs_state[2])
wheel_y = abs_max(wheel_y, abs_state[3])
return mouse_x, mouse_y, wheel_x, wheel_y
async def ev_abs_mapper(abs_state, input_device, keymapper_device, mapping):
"""Keep writing mouse movements based on the gamepad stick position.
Even if no new input event arrived because the joystick remained at
its position, this will keep injecting the mouse movement events.
Parameters
----------
abs_state : [int, int. int, int]
array to read the current abs values from for events of codes
ABS_X, ABS_Y, ABS_RX and ABS_RY
Its contents will change while this function executes its loop from
the outside.
input_device : evdev.InputDevice
keymapper_device : evdev.UInput
mapping : Mapping
the mapping object that configures the current injection
"""
max_value = get_max_abs(input_device)
if max_value in [0, 1, None]:
# not something that was intended for this
return
logger.debug('Max abs of "%s": %s', input_device.name, max_value)
max_speed = ((max_value ** 2) * 2) ** 0.5
# events only take ints, so a movement of 0.3 needs to add
# up to 1.2 to affect the cursor.
pending_x_rel = 0
pending_y_rel = 0
pending_rx_rel = 0
pending_ry_rel = 0
pointer_speed = mapping.get('gamepad.joystick.pointer_speed')
non_linearity = mapping.get('gamepad.joystick.non_linearity')
left_purpose = mapping.get('gamepad.joystick.left_purpose')
right_purpose = mapping.get('gamepad.joystick.right_purpose')
x_scroll_speed = mapping.get('gamepad.joystick.x_scroll_speed')
y_scroll_speed = mapping.get('gamepad.joystick.y_scroll_speed')
logger.info(
'Left joystick as %s, right joystick as %s',
left_purpose,
right_purpose
)
while True:
start = time.time()
mouse_x, mouse_y, wheel_x, wheel_y = get_values(
abs_state,
left_purpose,
right_purpose
)
out_of_bounds = [
val for val in [mouse_x, mouse_y, wheel_x, wheel_y]
if val > max_value
]
if len(out_of_bounds) > 0:
logger.error(
'Encountered inconsistent values: %s, max abs: %s',
out_of_bounds,
max_value
)
return
# mouse movements
if abs(mouse_x) > 0 or abs(mouse_y) > 0:
if non_linearity != 1:
# to make small movements smaller for more precision
speed = (mouse_x ** 2 + mouse_y ** 2) ** 0.5
factor = (speed / max_speed) ** non_linearity
else:
factor = 1
rel_x = (mouse_x / max_value) * factor * pointer_speed
rel_y = (mouse_y / max_value) * factor * pointer_speed
pending_x_rel, rel_x = accumulate(pending_x_rel, rel_x)
pending_y_rel, rel_y = accumulate(pending_y_rel, rel_y)
if rel_x != 0:
_write(keymapper_device, EV_REL, REL_X, rel_x)
if rel_y != 0:
_write(keymapper_device, EV_REL, REL_Y, rel_y)
# wheel movements
if abs(wheel_x) > 0:
float_rel_rx = wheel_x * x_scroll_speed / max_value
pending_rx_rel, rel_rx = accumulate(pending_rx_rel, float_rel_rx)
if abs(float_rel_rx) > WHEEL_THRESHOLD * x_scroll_speed:
_write(keymapper_device, EV_REL, REL_HWHEEL, rel_rx)
if abs(wheel_y) > 0:
float_rel_ry = wheel_y * y_scroll_speed / max_value
pending_ry_rel, rel_ry = accumulate(pending_ry_rel, float_rel_ry)
if abs(float_rel_ry) > WHEEL_THRESHOLD * y_scroll_speed:
_write(keymapper_device, EV_REL, REL_WHEEL, -rel_ry)
# try to do this as close to 60hz as possible
time_taken = time.time() - start
await asyncio.sleep(max(0.0, (1 / 60) - time_taken))

@ -0,0 +1,263 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# key-mapper - GUI for device specific keyboard mappings
# Copyright (C) 2020 sezanzeb <proxima@hip70890b.de>
#
# This file is part of key-mapper.
#
# key-mapper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# key-mapper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with key-mapper. If not, see <https://www.gnu.org/licenses/>.
"""Keeps mapping joystick to mouse movements."""
import asyncio
import time
from evdev.ecodes import EV_REL, REL_X, REL_Y, REL_WHEEL, REL_HWHEEL, \
EV_ABS, ABS_X, ABS_Y, ABS_RX, ABS_RY
from keymapper.logger import logger
from keymapper.config import MOUSE, WHEEL
from keymapper.dev import utils
# miniscule movements on the joystick should not trigger a mouse wheel event
WHEEL_THRESHOLD = 0.15
def abs_max(value_1, value_2):
"""Get the value with the higher abs value."""
if abs(value_1) > abs(value_2):
return value_1
return value_2
class EventProducer:
"""Keeps producing events at 60hz if needed.
Can debounce writes or map joysticks to mouse movements.
This class does not handle injecting macro stuff over time, that is done
by the keycode_mapper.
"""
def __init__(self, mapping):
"""Construct the event producer without it doing anything yet.
Parameters
----------
mapping : Mapping
the mapping object that configures the current injection
"""
self.mapping = mapping
self.mouse_uinput = None
self.max_abs = None
# events only take ints, so a movement of 0.3 needs to add
# up to 1.2 to affect the cursor, with 0.2 remaining
self.pending_rel = {REL_X: 0, REL_Y: 0, REL_WHEEL: 0, REL_HWHEEL: 0}
# the last known position of the joystick
self.abs_state = {ABS_X: 0, ABS_Y: 0, ABS_RX: 0, ABS_RY: 0}
self.debounces = {}
def notify(self, event):
"""Tell the EventProducer about the newest ABS event.
Afterwards, it can continue moving the mouse pointer in the
correct direction.
"""
if event.type == EV_ABS and event.code in self.abs_state:
self.abs_state[event.code] = event.value
def _write(self, device, ev_type, keycode, value):
"""Inject."""
# if the mouse won't move even though correct stuff is written here,
# the capabilities are probably wrong
try:
device.write(ev_type, keycode, value)
device.syn()
except OverflowError:
# screwed up the calculation of mouse movements
logger.error('OverflowError (%s, %s, %s)', ev_type, keycode, value)
pass
def debounce(self, debounce_id, func, args, ticks):
"""Debounce a function call.
Parameters
----------
debounce_id : hashable
If this function is called with the same debounce_id again,
the previous debouncing is overwritten, and there fore restarted.
func : function
args : tuple
ticks : int
After ticks * 1 / 60 seconds the function will be executed,
unless debounce is called again with the same debounce_id
"""
self.debounces[debounce_id] = [func, args, ticks]
def accumulate(self, code, input_value):
"""Since devices can't do float values, stuff has to be accumulated.
If pending is 0.6 and input_value is 0.5, return 0.1 and 1.
Because it should move 1px, and 0.1px is rememberd for the next value
in pending.
"""
self.pending_rel[code] += input_value
output_value = int(self.pending_rel[code])
self.pending_rel[code] -= output_value
return output_value
def set_mouse_uinput(self, uinput):
"""Set where to write mouse movements to."""
logger.debug('Going to inject mouse movements to "%s"', uinput.name)
self.mouse_uinput = uinput
def set_max_abs_from(self, device):
"""Update the maximum value joysticks will report.
This information is needed for abs -> rel mapping.
"""
if device is None:
return
max_abs = utils.get_max_abs(device)
if max_abs in [0, 1, None]:
# max_abs of joysticks is usually a much higher number
return
self.max_abs = max_abs
logger.debug('Max abs of "%s": %s', device.name, max_abs)
def get_abs_values(self, left_purpose, right_purpose):
"""Get the raw values for wheel and mouse movement.
If two joysticks have the same purpose, the one that reports higher
absolute values takes over the control.
"""
mouse_x, mouse_y, wheel_x, wheel_y = 0, 0, 0, 0
if left_purpose == MOUSE:
mouse_x = abs_max(mouse_x, self.abs_state[ABS_X])
mouse_y = abs_max(mouse_y, self.abs_state[ABS_Y])
if left_purpose == WHEEL:
wheel_x = abs_max(wheel_x, self.abs_state[ABS_X])
wheel_y = abs_max(wheel_y, self.abs_state[ABS_Y])
if right_purpose == MOUSE:
mouse_x = abs_max(mouse_x, self.abs_state[ABS_RX])
mouse_y = abs_max(mouse_y, self.abs_state[ABS_RY])
if right_purpose == WHEEL:
wheel_x = abs_max(wheel_x, self.abs_state[ABS_RX])
wheel_y = abs_max(wheel_y, self.abs_state[ABS_RY])
return mouse_x, mouse_y, wheel_x, wheel_y
def is_handled(self, event):
"""Check if the event is something ev_abs will take care of."""
is_joystick = event.type == EV_ABS and event.code in utils.JOYSTICK
return is_joystick and self.max_abs is not None
async def run(self):
"""Keep writing mouse movements based on the gamepad stick position.
Even if no new input event arrived because the joystick remained at
its position, this will keep injecting the mouse movement events.
"""
max_abs = self.max_abs
mapping = self.mapping
pointer_speed = mapping.get('gamepad.joystick.pointer_speed')
non_linearity = mapping.get('gamepad.joystick.non_linearity')
left_purpose = mapping.get('gamepad.joystick.left_purpose')
right_purpose = mapping.get('gamepad.joystick.right_purpose')
x_scroll_speed = mapping.get('gamepad.joystick.x_scroll_speed')
y_scroll_speed = mapping.get('gamepad.joystick.y_scroll_speed')
logger.info(
'Left joystick as %s, right joystick as %s',
left_purpose,
right_purpose
)
start = time.time()
while True:
# production loop. try to do this as close to 60hz as possible
time_taken = time.time() - start
await asyncio.sleep(max(0.0, (1 / 60) - time_taken))
start = time.time()
"""handling debounces"""
for debounce in self.debounces.values():
if debounce[2] == -1:
# has already been triggered
continue
if debounce[2] == 0:
debounce[0](*debounce[1])
debounce[2] = -1
else:
debounce[2] -= 1
"""mouse movement production"""
if max_abs is None:
# no ev_abs events will be mapped to ev_rel
continue
max_speed = ((max_abs ** 2) * 2) ** 0.5
abs_values = self.get_abs_values(left_purpose, right_purpose)
if len([val for val in abs_values if val > max_abs]) > 0:
logger.error(
'Inconsistent values: %s, max_abs: %s',
abs_values, max_abs
)
return
mouse_x, mouse_y, wheel_x, wheel_y = abs_values
# mouse movements
if abs(mouse_x) > 0 or abs(mouse_y) > 0:
if non_linearity != 1:
# to make small movements smaller for more precision
speed = (mouse_x ** 2 + mouse_y ** 2) ** 0.5
factor = (speed / max_speed) ** non_linearity
else:
factor = 1
rel_x = (mouse_x / max_abs) * factor * pointer_speed
rel_y = (mouse_y / max_abs) * factor * pointer_speed
rel_x = self.accumulate(REL_X, rel_x)
rel_y = self.accumulate(REL_Y, rel_y)
if rel_x != 0:
self._write(self.mouse_uinput, EV_REL, REL_X, rel_x)
if rel_y != 0:
self._write(self.mouse_uinput, EV_REL, REL_Y, rel_y)
# wheel movements
if abs(wheel_x) > 0:
change = wheel_x * x_scroll_speed / max_abs
value = self.accumulate(REL_WHEEL, change)
if abs(change) > WHEEL_THRESHOLD * x_scroll_speed:
self._write(self.mouse_uinput, EV_REL, REL_HWHEEL, value)
if abs(wheel_y) > 0:
change = wheel_y * y_scroll_speed / max_abs
value = self.accumulate(REL_HWHEEL, change)
if abs(change) > WHEEL_THRESHOLD * y_scroll_speed:
self._write(self.mouse_uinput, EV_REL, REL_WHEEL, -value)

@ -29,13 +29,13 @@ import subprocess
import multiprocessing
import evdev
from evdev.ecodes import EV_KEY, EV_ABS, EV_REL
from evdev.ecodes import EV_KEY, EV_REL
from keymapper.logger import logger
from keymapper.getdevices import get_devices, map_abs_to_rel
from keymapper.dev.keycode_mapper import handle_keycode
from keymapper.dev import utils
from keymapper.dev.ev_abs_mapper import ev_abs_mapper
from keymapper.dev.event_producer import EventProducer
from keymapper.dev.macros import parse, is_this_a_macro
from keymapper.state import system_mapping
from keymapper.mapping import DISABLE_CODE
@ -45,6 +45,9 @@ DEV_NAME = 'key-mapper'
CLOSE = 0
# TODO joystick unmodified? forward abs_rel and abs_rel in capabilities
def is_numlock_on():
"""Get the current state of the numlock."""
try:
@ -136,12 +139,7 @@ class Injector:
self._msg_pipe = multiprocessing.Pipe()
self._key_to_code = self._map_keys_to_codes()
self.stopped = False
# when moving the joystick and then staying at a position, no
# events will be written anymore. Remember the last value the
# joystick reported, because it is still remaining at that
# position.
self.abs_state = [0, 0, 0, 0]
self._event_producer = None
def _map_keys_to_codes(self):
"""To quickly get target keycodes during operation.
@ -331,7 +329,7 @@ class Injector:
the loops needed to read and map events and keeps running them.
"""
# create a new event loop, because somehow running an infinite loop
# that sleeps on iterations (ev_abs_mapper) in one process causes
# that sleeps on iterations (event_producer) in one process causes
# another injection process to screw up reading from the grabbed
# device.
loop = asyncio.new_event_loop()
@ -346,10 +344,14 @@ class Injector:
paths = get_devices()[self.device]['paths']
self._event_producer = EventProducer(self.mapping)
# Watch over each one of the potentially multiple devices per hardware
for path in paths:
source, abs_to_rel = self._prepare_device(path)
if source is None:
# this path doesn't need to be grabbed for injection, because
# it doesn't provide the events needed to execute the mapping
continue
# each device needs own macro instances to add a custom handler
@ -393,21 +395,15 @@ class Injector:
coroutines.append(self._event_consumer(
macros,
source,
uinput,
abs_to_rel
uinput
))
# mouse movement injection based on the results of the
# event consumer
# The event source of the current iteration will deliver events
# that are needed for this. It is that one that will be mapped
# to a mouse-like devnode.
if abs_to_rel:
self.abs_state[0] = 0
self.abs_state[1] = 0
coroutines.append(ev_abs_mapper(
self.abs_state,
source,
uinput,
self.mapping
))
self._event_producer.set_max_abs_from(source)
self._event_producer.set_mouse_uinput(uinput)
if len(coroutines) == 0:
logger.error('Did not grab any device')
@ -419,6 +415,9 @@ class Injector:
# grabbing devices screws this up
set_numlock(numlock_state)
# run besides this stuff
coroutines.append(self._event_producer.run())
try:
loop.run_until_complete(asyncio.gather(*coroutines))
except RuntimeError:
@ -436,10 +435,13 @@ class Injector:
uinput.write(EV_KEY, code, value)
uinput.syn()
async def _event_consumer(self, macros, source, uinput, abs_to_rel):
"""Reads input events to inject keycodes or talk to the ev_abs_mapper.
async def _event_consumer(self, macros, source, uinput):
"""Reads input events to inject keycodes or talk to the event_producer.
Can be stopped by stopping the asyncio loop.
Can be stopped by stopping the asyncio loop. This loop
reads events from a single device only. Other devnodes may be
present for the hardware device, in which case this needs to be
started multiple times.
Parameters
----------
@ -449,35 +451,45 @@ class Injector:
where to read keycodes from
uinput : evdev.UInput
where to write keycodes to
abs_to_rel : bool
if joystick events should be mapped to mouse movements
"""
logger.debug(
'Started injecting into %s, fd %s',
uinput.device.path, uinput.fd
'Started consumer to inject to %s, fd %s',
source.path, source.fd
)
async for event in source.async_read_loop():
if self._event_producer.is_handled(event):
# the event_producer will take care of it
self._event_producer.notify(event)
continue
# for mapped stuff
if utils.should_map_event_as_btn(source, event, self.mapping):
will_report_key_up = utils.will_report_key_up(event)
handle_keycode(
self._key_to_code,
macros,
event,
uinput
uinput,
)
continue
is_joystick = event.type == EV_ABS and event.code in utils.JOYSTICK
if abs_to_rel and is_joystick:
# talks to the ev_abs_mapper via the abs_state array
if event.code == evdev.ecodes.ABS_X:
self.abs_state[0] = event.value
elif event.code == evdev.ecodes.ABS_Y:
self.abs_state[1] = event.value
elif event.code == evdev.ecodes.ABS_RX:
self.abs_state[2] = event.value
elif event.code == evdev.ecodes.ABS_RY:
self.abs_state[3] = event.value
if not will_report_key_up:
# simulate a key-up event if no down event arrives anymore.
# this may release macros, combinations or keycodes.
release = evdev.InputEvent(0, 0, event.type, event.code, 0)
self._event_producer.debounce(
debounce_id=(event.type, event.code, event.value),
func=handle_keycode,
args=(
self._key_to_code, macros,
release,
uinput,
False
),
ticks=3,
)
continue
# forward the rest
@ -485,8 +497,8 @@ class Injector:
# this already includes SYN events, so need to syn here again
logger.error(
'The injector for "%s" stopped early',
uinput.device.path
'The consumer for "%s" stopped early',
source.path
)
@ensure_numlock

@ -27,7 +27,7 @@ import asyncio
from evdev.ecodes import EV_KEY, EV_ABS
from keymapper.logger import logger, is_debug
from keymapper.logger import logger
from keymapper.mapping import DISABLE_CODE
@ -36,10 +36,12 @@ from keymapper.mapping import DISABLE_CODE
# mapping of (type, code). The value is not included in the key, because
# a key release event with a value of 0 needs to be able to find the
# running macro. The downside is that a d-pad cannot execute two macros at
# once, one for each direction. Only sequentially.
# once, one for each direction. Only sequentially.W
active_macros = {}
# mapping of future up event (type, code) to (output code, input event)
# mapping of future up event (type, code) to (output, input event),
# with output being a tuple of (type, code) as well. All key-up events
# have a value of 0, so it is not added to the tuple.
# This is needed in order to release the correct event mapped on a
# D-Pad. Each direction on one D-Pad axis reports the same type and
# code, but different values. There cannot be both at the same time,
@ -93,26 +95,7 @@ def subsets(combination):
))
def log(key, msg, *args):
"""Function that logs nicely formatted spams."""
if not is_debug():
return
msg = msg % args
str_key = str(key)
str_key = str_key.replace(',)', ')')
spacing = ' ' + '-' * max(0, 30 - len(str_key))
if len(spacing) == 1:
spacing = ''
msg = f'{str_key}{spacing} {msg}'
logger.spam(msg)
return msg
def handle_keycode(key_to_code, macros, event, uinput):
def handle_keycode(key_to_code, macros, event, uinput, forward=True):
"""Write mapped keycodes, forward unmapped ones and manage macros.
As long as the provided event is mapped it will handle it, it won't
@ -130,6 +113,8 @@ def handle_keycode(key_to_code, macros, event, uinput):
mapping of (type, code, value) to _Macro objects.
Combinations work similar as in key_to_code
event : evdev.InputEvent
forward : bool
if False, will not forward the event if it didn't trigger any mapping
"""
if event.type == EV_KEY and event.value == 2:
# button-hold event. Linux creates them on its own for the
@ -151,7 +136,12 @@ def handle_keycode(key_to_code, macros, event, uinput):
# WARNING! the combination-down triggers, but a single key-up releases.
# Do not check if key in macros and such, if it is an up event. It's
# going to be False.
combination = tuple([value[1] for value in unreleased.values()] + [key])
combination = tuple([value[1] for value in unreleased.values()])
if key not in combination: # might be a duplicate-down event
combination += (key,)
mapped = False # only down events are usually mapped
# find any triggered combination. macros and key_to_code contain
# every possible equivalent permutation of possible macros. The last
# key in the combination needs to remain the newest key though.
@ -163,14 +153,16 @@ def handle_keycode(key_to_code, macros, event, uinput):
if subset in macros or subset in key_to_code:
key = subset
mapped = True
break
else:
# no subset found, just use the key. all indices are tuples of tuples,
# both for combinations and single keys.
if event.value == 1 and len(combination) > 1:
log(combination, 'unknown combination')
logger.key_spam(combination, 'unknown combination')
key = (key,)
mapped = key in macros or key in key_to_code
active_macro = active_macros.get(type_code)
@ -181,71 +173,93 @@ def handle_keycode(key_to_code, macros, event, uinput):
# Tell the macro for that keycode that the key is released and
# let it decide what to do with that information.
active_macro.release_key()
log(key, 'releasing macro')
logger.key_spam(key, 'releasing macro')
if type_code in unreleased:
target_type, target_code = unreleased[type_code][0]
del unreleased[type_code]
if target_code == DISABLE_CODE:
log(key, 'releasing disabled key')
else:
log(key, 'releasing %s', target_code)
logger.key_spam(key, 'releasing disabled key')
elif target_code is None:
logger.key_spam(key, 'releasing key')
elif type_code != (target_type, target_code):
# release what the input is mapped to
logger.key_spam(key, 'releasing %s', target_code)
write(uinput, (target_type, target_code, 0))
elif forward:
# forward the release event
logger.key_spam(key, 'forwarding release')
write(uinput, (target_type, target_code, 0))
else:
logger.key_spam(key, 'not forwarding release')
elif event.type != EV_ABS:
# ABS events might be spammed like crazy every time the position
# slightly changes
log(key, 'unexpected key up')
logger.key_spam(key, 'unexpected key up')
# everything that can be released is released now
return
"""Filtering duplicate key downs"""
if is_key_down(event):
if mapped and is_key_down(event):
# unmapped keys should not be filtered here, they should just
# be forwarded to populate unreleased and then be written.
if unreleased.get(type_code, (None, None))[1] == event_tuple:
# duplicate key-down. skip this event. Avoid writing millions of
# key-down events when a continuous value is reported, for example
# for gamepad triggers or mouse-wheel-side buttons
logger.key_spam(key, 'duplicate key down')
return
# it would start a macro usually
if key in macros and active_macro is not None and active_macro.running:
# for key-down events and running macros, don't do anything.
# This avoids spawning a second macro while the first one is not
# finished, especially since gamepad-triggers report a ton of
# events with a positive value.
log(key, 'macro already running')
return
# it would write a key usually
if key in key_to_code and type_code in unreleased:
# duplicate key-down. skip this event. Avoid writing millions of
# key-down events when a continuous value is reported, for example
# for gamepad triggers
log(key, 'duplicate key down')
logger.key_spam(key, 'macro already running')
return
"""starting new macros or injecting new keys"""
if is_key_down(event):
# also enter this for unmapped keys, as they might end up triggering
# a combination, so they should be remembered in unreleased
if key in macros:
macro = macros[key]
active_macros[type_code] = macro
unreleased[type_code] = ((EV_KEY, None), event_tuple)
macro.press_key()
log(key, 'maps to macro %s', macro.code)
logger.key_spam(key, 'maps to macro %s', macro.code)
asyncio.ensure_future(macro.run())
return
if key in key_to_code:
target_code = key_to_code[key]
# remember the key that triggered this (combination or single key)
unreleased[type_code] = ((EV_KEY, target_code), event_tuple)
if target_code == DISABLE_CODE:
log(key, 'disabled')
logger.key_spam(key, 'disabled')
return
log(key, 'maps to %s', target_code)
logger.key_spam(key, 'maps to %s', target_code)
write(uinput, (EV_KEY, target_code, 1))
return
log(key, 'forwarding')
if forward:
logger.key_spam(key, 'forwarding')
write(uinput, event_tuple)
else:
logger.key_spam(key, 'not forwarding')
# unhandled events may still be important for triggering combinations
# later, so remember them as well.
unreleased[type_code] = ((event_tuple[:2]), event_tuple)
write(uinput, event_tuple)
return
logger.error(key, '%s unhandled. %s %s', unreleased, active_macros)

@ -57,7 +57,7 @@ def prioritize(events):
if event is not None
]
return sorted(events, key=lambda e: (
PRIORITIES[e.type],
PRIORITIES.get(e.type, 0),
not (e.type == EV_ABS and e.code == ABS_MISC),
abs(e.value)
))[-1]
@ -228,6 +228,7 @@ class _KeycodeReader:
while self._pipe[0].poll():
event = self._pipe[0].recv()
event_tuple = (event.type, event.code, event.value)
without_value = (event.type, event.code)
if event.value == 0:
@ -235,7 +236,7 @@ class _KeycodeReader:
del self._unreleased[without_value]
continue
if without_value in self._unreleased:
if self._unreleased.get(without_value) == event_tuple:
# no duplicate down events (gamepad triggers)
continue

@ -56,6 +56,16 @@ def sign(value):
return 0
def is_wheel(event):
"""Check if this is a wheel event."""
return event.type == EV_REL and event.code in [REL_WHEEL, REL_HWHEEL]
def will_report_key_up(event):
"""Check if the key is expected to report a down event as well."""
return not is_wheel(event)
def should_map_event_as_btn(device, event, mapping):
"""Does this event describe a button.
@ -75,6 +85,9 @@ def should_map_event_as_btn(device, event, mapping):
if is_mousepad:
return False
if is_wheel(event):
return True
if event.type == EV_ABS:
if event.code in JOYSTICK:
l_purpose = mapping.get('gamepad.joystick.left_purpose')

@ -28,6 +28,7 @@ from gi.repository import Gtk, GLib, Gdk
from keymapper.state import custom_mapping, system_mapping
from keymapper.logger import logger
from keymapper.key import Key
from keymapper.dev.reader import keycode_reader
CTX_KEYCODE = 2
@ -84,6 +85,11 @@ def to_string(key):
(evdev.ecodes.ABS_RX, -1): 'L',
(evdev.ecodes.ABS_RY, 1): 'D',
(evdev.ecodes.ABS_RY, -1): 'U',
# wheel
(evdev.ecodes.REL_WHEEL, -1): 'D',
(evdev.ecodes.REL_WHEEL, 1): 'U',
(evdev.ecodes.REL_HWHEEL, -1): 'L',
(evdev.ecodes.REL_HWHEEL, 1): 'R',
}.get((code, value))
if direction is not None:
key_name += f' {direction}'
@ -258,6 +264,7 @@ class Row(Gtk.ListBoxRow):
self.show_click_here()
self.keycode_input.set_active(False)
self.state = IDLE
keycode_reader.clear()
def set_keycode_input_label(self, label):
"""Set the label of the keycode input."""

@ -188,7 +188,8 @@ class Window:
This has nothing to do with the keycode reader.
"""
gdk_keycode = event.get_keyval()[1]
if gdk_keycode == Gdk.KEY_Control_L:
if gdk_keycode in [Gdk.KEY_Control_L, Gdk.KEY_Control_R]:
self.ctrl = True
if gdk_keycode == Gdk.KEY_q and self.ctrl:
@ -200,7 +201,8 @@ class Window:
This has nothing to do with the keycode reader.
"""
gdk_keycode = event.get_keyval()[1]
if gdk_keycode == Gdk.KEY_Control_L:
if gdk_keycode in [Gdk.KEY_Control_L, Gdk.KEY_Control_R]:
self.ctrl = False
def initialize_gamepad_config(self):
@ -361,7 +363,7 @@ class Window:
if key is None:
return True
if key.is_problematic():
if key.is_problematic() and isinstance(focused, Gtk.ToggleButton):
self.show_status(
CTX_WARNING,
'ctrl, alt and shift may not combine properly',
@ -398,8 +400,8 @@ class Window:
if context_id == CTX_WARNING:
self.get('warning_status_icon').show()
if len(message) > 48:
message = message[:50] + '...'
if len(message) > 55:
message = message[:52] + '...'
status_bar = self.get('status_bar')
status_bar.push(context_id, message)
@ -460,11 +462,14 @@ class Window:
if custom_mapping.changed:
self.show_status(
CTX_WARNING,
f'Applied outdated preset "{preset}"',
f'"{preset}" is outdated. shift + del to stop.',
'Click "Save" first for changes to take effect'
)
else:
self.show_status(CTX_APPLY, f'Applied preset "{preset}"')
self.show_status(
CTX_APPLY,
f'Applied preset "{preset}". shift + del to stop'
)
path = get_preset_path(device, preset)
success = self.dbus.start_injecting(device, path, get_config_path())

@ -30,6 +30,8 @@ import pkg_resources
SPAM = 5
start = time.time()
def spam(self, message, *args, **kwargs):
"""Log a more-verbose message than debug."""
@ -39,8 +41,23 @@ def spam(self, message, *args, **kwargs):
self._log(SPAM, message, args, **kwargs)
def key_spam(self, key, msg, *args):
"""Log a spam message custom tailored to keycode_mapper."""
if not self.isEnabledFor(SPAM):
return
msg = msg % args
str_key = str(key)
str_key = str_key.replace(',)', ')')
spacing = ' ' + '-' * max(0, 30 - len(str_key))
if len(spacing) == 1:
spacing = ''
msg = f'{str_key}{spacing} {msg}'
self._log(SPAM, msg, args=None)
logging.addLevelName(SPAM, "SPAM")
logging.Logger.spam = spam
logging.Logger.key_spam = key_spam
start = time.time()
@ -75,13 +92,14 @@ class Formatter(logging.Formatter):
pid = f'pid {os.getpid()}, '
if debug:
delta = f' {str(time.time() - start)[:7]}, '
self._style._fmt = ( # noqa
'\033[1m' # bold
f'\033[{color}m' # color
f'%(levelname)s'
'\033[0m' # end style
f'\033[{color}m' # color
f': {pid}%(filename)s, line %(lineno)d, %(message)s'
f':{delta}{pid}%(filename)s, line %(lineno)d, %(message)s'
'\033[0m' # end style
)
else:

@ -240,7 +240,9 @@ class Mapping(ConfigBase):
Parameters
----------
key : Key
key : Key or InputEvent
If an InputEvent, will test if that event is mapped
and take the sign of the value.
"""
if not isinstance(key, Key):
raise TypeError('Expected key to be a Key object')

@ -17,7 +17,7 @@
<text x="32.5" y="14">coverage</text>
</g>
<g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="11">
<text x="83.0" y="15" fill="#010101" fill-opacity=".3">91%</text>
<text x="82.0" y="14">91%</text>
<text x="83.0" y="15" fill="#010101" fill-opacity=".3">92%</text>
<text x="82.0" y="14">92%</text>
</g>
</svg>

Before

Width:  |  Height:  |  Size: 1.0 KiB

After

Width:  |  Height:  |  Size: 1.0 KiB

@ -28,7 +28,7 @@ requests.
- [x] mapping a combined button press to a key
- [x] add "disable" as mapping option
- [x] mapping joystick directions as buttons, making it act like a D-Pad
- [ ] mapping mouse wheel events to buttons
- [x] mapping mouse wheel events to buttons
- [ ] automatically load presets when devices get plugged in after login (udev)
- [ ] using keys that aren't available in the systems keyboard layout
- [ ] user-friendly way to map the left mouse button
@ -47,7 +47,7 @@ to get debug output.
## Releasing
Install dpkg or ssh/login into a debian/ubuntu environment
ssh/login into a debian/ubuntu environment
```bash
./scripts/build.sh

@ -17,7 +17,7 @@
<text x="22.0" y="14">pylint</text>
</g>
<g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="11">
<text x="63.0" y="15" fill="#010101" fill-opacity=".3">9.76</text>
<text x="62.0" y="14">9.76</text>
<text x="63.0" y="15" fill="#010101" fill-opacity=".3">9.72</text>
<text x="62.0" y="14">9.72</text>
</g>
</svg>

Before

Width:  |  Height:  |  Size: 1.0 KiB

After

Width:  |  Height:  |  Size: 1.0 KiB

@ -89,28 +89,35 @@ info_1 = 'bus: 0001, vendor 0001, product 0001, version 0001'
fixtures = {
# device 1
'/dev/input/event11': {
'capabilities': {evdev.ecodes.EV_KEY: [], evdev.ecodes.EV_REL: []},
'capabilities': {evdev.ecodes.EV_KEY: [], evdev.ecodes.EV_REL: [
evdev.ecodes.REL_WHEEL,
evdev.ecodes.REL_HWHEEL
]},
'phys': f'{phys_1}/input2',
'info': info_1,
'name': 'device 1 foo'
'name': 'device 1 foo',
'group': 'device 1'
},
'/dev/input/event10': {
'capabilities': {evdev.ecodes.EV_KEY: list(evdev.ecodes.keys.keys())},
'phys': f'{phys_1}/input3',
'info': info_1,
'name': 'device 1'
'name': 'device 1',
'group': 'device 1'
},
'/dev/input/event13': {
'capabilities': {evdev.ecodes.EV_KEY: [], evdev.ecodes.EV_SYN: []},
'phys': f'{phys_1}/input1',
'info': info_1,
'name': 'device 1'
'name': 'device 1',
'group': 'device 1'
},
'/dev/input/event14': {
'capabilities': {evdev.ecodes.EV_SYN: []},
'phys': f'{phys_1}/input0',
'info': info_1,
'name': 'device 1 qux'
'name': 'device 1 qux',
'group': 'device 1'
},
# device 2
@ -178,39 +185,15 @@ def push_event(device, event):
pending_events[device].append(event)
class InputEvent:
"""Event to put into the injector for tests.
def new_event(type, code, value, timestamp=None):
"""Create a new input_event."""
if timestamp is None:
timestamp = time.time()
fakes evdev.InputEvent
"""
def __init__(self, type, code, value, timestamp=None):
"""
Paramaters
----------
type : int
one of evdev.ecodes.EV_*
code : int
keyboard event code as known to linux. E.g. 2 for the '1' button
value : int
1 for down, 0 for up, 2 for hold
"""
self.type = type
self.code = code
self.value = value
if timestamp is None:
timestamp = time.time()
self.sec = int(timestamp)
self.usec = timestamp % 1 * 1000000
@property
def t(self):
# tuple shorthand
return self.type, self.code, self.value
def __str__(self):
return f'InputEvent{self.t}'
sec = int(timestamp)
usec = timestamp % 1 * 1000000
event = evdev.InputEvent(sec, usec, type, code, value)
return event
def patch_paths():
@ -251,19 +234,24 @@ class InputDevice:
path = None
def __init__(self, path):
if path not in fixtures:
if path != 'justdoit' and path not in fixtures:
raise FileNotFoundError()
self.path = path
self.phys = fixtures[path]['phys']
self.info = fixtures[path]['info']
self.name = fixtures[path]['name']
fixture = fixtures.get(path, {})
self.phys = fixture.get('phys', 'unset')
self.info = fixture.get('info', 'unset')
self.name = fixture.get('name', 'unset')
self.fd = self.name
# properties that exists for test purposes and are not part of
# the original object
self.group = fixture.get('group', self.name)
def log(self, key, msg):
print(
f'\033[90m' # color
f'{msg} "{self.name}" "{self.phys}" {key}'
f'{msg} "{self.name}" "{self.path}" {key}'
'\033[0m' # end style
)
@ -274,42 +262,45 @@ class InputDevice:
pass
def read(self):
ret = pending_events.get(self.name, [])
# the patched fake InputDevice objects read anything pending from
# that group, to be realistic it would have to check if the provided
# element is in its capabilities.
ret = pending_events.get(self.group, [])
if ret is not None:
# consume all of them
pending_events[self.name] = []
pending_events[self.group] = []
return ret
def read_one(self):
if pending_events.get(self.name) is None:
if pending_events.get(self.group) is None:
return None
if len(pending_events[self.name]) == 0:
if len(pending_events[self.group]) == 0:
return None
event = pending_events[self.name].pop(0)
event = pending_events[self.group].pop(0)
self.log(event, 'read_one')
return event
def read_loop(self):
"""Read all prepared events at once."""
if pending_events.get(self.name) is None:
if pending_events.get(self.group) is None:
return
while len(pending_events[self.name]) > 0:
result = pending_events[self.name].pop(0)
while len(pending_events[self.group]) > 0:
result = pending_events[self.group].pop(0)
self.log(result, 'read_loop')
yield result
time.sleep(EVENT_READ_TIMEOUT)
async def async_read_loop(self):
"""Read all prepared events at once."""
if pending_events.get(self.name) is None:
if pending_events.get(self.group) is None:
return
while len(pending_events[self.name]) > 0:
result = pending_events[self.name].pop(0)
while len(pending_events[self.group]) > 0:
result = pending_events[self.group].pop(0)
self.log(result, 'async_read_loop')
yield result
await asyncio.sleep(0.01)
@ -330,25 +321,38 @@ class InputDevice:
class UInput:
def __init__(self, *args, **kwargs):
def __init__(self, events=None, name='unnamed', *args, **kwargs):
self.fd = 0
self.write_count = 0
self.device = InputDevice('/dev/input/event40')
self.device = InputDevice('justdoit')
self.name = name
self.events = events
pass
def capabilities(self, *args, **kwargs):
return []
return self.events
def write(self, type, code, value):
self.write_count += 1
event = InputEvent(type, code, value)
event = new_event(type, code, value)
uinput_write_history.append(event)
uinput_write_history_pipe[1].send(event)
print(
f'\033[90m' # color
f'{(type, code, value)} written'
'\033[0m' # end style
)
def syn(self):
pass
class InputEvent(evdev.InputEvent):
def __init__(self, sec, usec, type, code, value):
self.t = (type, code, value)
super().__init__(sec, usec, type, code, value)
def patch_evdev():
def list_devices():
return fixtures.keys()
@ -356,6 +360,7 @@ def patch_evdev():
evdev.list_devices = list_devices
evdev.InputDevice = InputDevice
evdev.UInput = UInput
evdev.InputEvent = InputEvent
def patch_unsaved():
@ -364,6 +369,13 @@ def patch_unsaved():
unsaved.unsaved_changes_dialog = lambda: unsaved.CONTINUE
def patch_events():
# improve logging of stuff
evdev.InputEvent.__str__ = lambda self: (
f'InputEvent{(self.type, self.code, self.value)}'
)
def clear_write_history():
"""Empty the history in preparation for the next test."""
while len(uinput_write_history) > 0:
@ -378,6 +390,7 @@ patch_paths()
patch_evdev()
patch_unsaved()
patch_select()
patch_events()
from keymapper.logger import update_verbosity
from keymapper.dev.injector import Injector
@ -396,6 +409,11 @@ _fixture_copy = copy.deepcopy(fixtures)
def cleanup():
"""Reset the applications state."""
print(
f'\033[90m' # color
f'cleanup'
'\033[0m' # end style
)
keycode_reader.stop_reading()
keycode_reader.clear()
keycode_reader.newest_event = None

@ -37,7 +37,7 @@ from keymapper.paths import get_preset_path
from keymapper.key import Key
from keymapper.daemon import Daemon, get_dbus_interface, BUS_NAME
from tests.test import cleanup, uinput_write_history_pipe, InputEvent, \
from tests.test import cleanup, uinput_write_history_pipe, new_event, \
pending_events, is_service_running, fixtures, tmp
@ -142,7 +142,7 @@ class TestDaemon(unittest.TestCase):
# should forward the event unchanged
pending_events[device] = [
InputEvent(EV_KEY, 13, 1)
new_event(EV_KEY, 13, 1)
]
self.daemon = Daemon()
@ -166,21 +166,22 @@ class TestDaemon(unittest.TestCase):
try:
self.assertFalse(uinput_write_history_pipe[0].poll())
except AssertionError:
print(uinput_write_history_pipe[0].recv())
print('Unexpected', uinput_write_history_pipe[0].recv())
# possibly a duplicate write!
raise
"""injection 2"""
# -1234 will be normalized to -1 by the injector
pending_events[device] = [
InputEvent(*ev_2, -1234)
new_event(*ev_2, -1234)
]
path = get_preset_path(device, preset)
self.daemon.start_injecting(device, path)
# the written key is a key-down event, not the original
# event value of -5678
# event value of -1234
event = uinput_write_history_pipe[0].recv()
self.assertEqual(event.type, EV_KEY)
self.assertEqual(event.code, keycode_to_2)
@ -199,7 +200,7 @@ class TestDaemon(unittest.TestCase):
custom_mapping.save(get_preset_path(device, preset))
config.set_autoload_preset(device, preset)
pending_events[device] = [
InputEvent(*ev, 1)
new_event(*ev, 1)
]
self.daemon = Daemon()
preset_path = get_preset_path(device, preset)
@ -245,7 +246,7 @@ class TestDaemon(unittest.TestCase):
config.set_autoload_preset(device, preset)
pending_events[device] = [
InputEvent(*event)
new_event(*event)
]
config_dir = os.path.join(tmp, 'foo')

@ -22,14 +22,15 @@
import unittest
from evdev import ecodes
from evdev.ecodes import EV_KEY, EV_ABS, ABS_HAT0X, KEY_A, ABS_X, \
from evdev.ecodes import EV_KEY, EV_ABS, ABS_HAT0X, KEY_A, \
EV_REL, REL_X, REL_WHEEL, REL_HWHEEL
from keymapper.config import config, BUTTONS
from keymapper.mapping import Mapping
from keymapper.dev import utils
from keymapper.key import Key
from tests.test import InputEvent, InputDevice, MAX_ABS
from tests.test import new_event, InputDevice, MAX_ABS
class TestDevUtils(unittest.TestCase):
@ -37,62 +38,77 @@ class TestDevUtils(unittest.TestCase):
self.assertEqual(utils.get_max_abs(InputDevice('/dev/input/event30')), MAX_ABS)
self.assertIsNone(utils.get_max_abs(InputDevice('/dev/input/event10')))
def test_will_report_key_up(self):
self.assertFalse(utils.will_report_key_up(new_event(EV_REL, REL_WHEEL, 1)))
self.assertFalse(utils.will_report_key_up(new_event(EV_REL, REL_HWHEEL, -1)))
self.assertTrue(utils.will_report_key_up(new_event(EV_KEY, KEY_A, 1)))
self.assertTrue(utils.will_report_key_up(new_event(EV_ABS, ABS_HAT0X, -1)))
def test_is_wheel(self):
self.assertTrue(utils.is_wheel(new_event(EV_REL, REL_WHEEL, 1)))
self.assertTrue(utils.is_wheel(new_event(EV_REL, REL_HWHEEL, -1)))
self.assertFalse(utils.is_wheel(new_event(EV_KEY, KEY_A, 1)))
self.assertFalse(utils.is_wheel(new_event(EV_ABS, ABS_HAT0X, -1)))
def test_should_map_event_as_btn(self):
device = InputDevice('/dev/input/event30')
mapping = Mapping()
# the function name is so horribly long
do = utils.should_map_event_as_btn
def do(event):
return utils.should_map_event_as_btn(device, event, mapping)
"""D-Pad"""
self.assertTrue(do(device, InputEvent(EV_ABS, ABS_HAT0X, 1), mapping))
self.assertTrue(do(device, InputEvent(EV_ABS, ABS_HAT0X, -1), mapping))
self.assertTrue(do(new_event(EV_ABS, ABS_HAT0X, 1)))
self.assertTrue(do(new_event(EV_ABS, ABS_HAT0X, -1)))
"""Mouse movements"""
self.assertFalse(do(device, InputEvent(EV_REL, REL_WHEEL, 1), mapping))
self.assertFalse(do(device, InputEvent(EV_REL, REL_WHEEL, -1), mapping))
self.assertFalse(do(device, InputEvent(EV_REL, REL_HWHEEL, 1), mapping))
self.assertFalse(do(device, InputEvent(EV_REL, REL_HWHEEL, -1), mapping))
self.assertFalse(do(device, InputEvent(EV_REL, REL_X, -1), mapping))
self.assertTrue(do(new_event(EV_REL, REL_WHEEL, 1)))
self.assertTrue(do(new_event(EV_REL, REL_WHEEL, -1)))
self.assertTrue(do(new_event(EV_REL, REL_HWHEEL, 1)))
self.assertTrue(do(new_event(EV_REL, REL_HWHEEL, -1)))
self.assertFalse(do(new_event(EV_REL, REL_X, -1)))
"""regular keys and buttons"""
self.assertTrue(do(device, InputEvent(EV_KEY, KEY_A, 1), mapping))
self.assertTrue(do(device, InputEvent(EV_ABS, ABS_HAT0X, -1), mapping))
self.assertTrue(do(new_event(EV_KEY, KEY_A, 1)))
self.assertTrue(do(new_event(EV_ABS, ABS_HAT0X, -1)))
"""mousepad events"""
self.assertFalse(do(device, InputEvent(EV_ABS, ecodes.ABS_MT_SLOT, 1), mapping))
self.assertFalse(do(device, InputEvent(EV_ABS, ecodes.ABS_MT_TOOL_Y, 1), mapping))
self.assertFalse(do(device, InputEvent(EV_ABS, ecodes.ABS_MT_POSITION_X, 1), mapping))
self.assertFalse(do(new_event(EV_ABS, ecodes.ABS_MT_SLOT, 1)))
self.assertFalse(do(new_event(EV_ABS, ecodes.ABS_MT_TOOL_Y, 1)))
self.assertFalse(do(new_event(EV_ABS, ecodes.ABS_MT_POSITION_X, 1)))
"""joysticks"""
self.assertFalse(do(device, InputEvent(EV_ABS, ecodes.ABS_RX, 1234), mapping))
self.assertFalse(do(device, InputEvent(EV_ABS, ecodes.ABS_Y, -1), mapping))
self.assertFalse(do(new_event(EV_ABS, ecodes.ABS_RX, 1234)))
self.assertFalse(do(new_event(EV_ABS, ecodes.ABS_Y, -1)))
mapping.set('gamepad.joystick.left_purpose', BUTTONS)
event = InputEvent(EV_ABS, ecodes.ABS_RX, MAX_ABS)
self.assertFalse(do(device, event, mapping))
# the event.value should be modified for the left joystick
# to one of 0, -1 or 1
event = new_event(EV_ABS, ecodes.ABS_RX, MAX_ABS)
self.assertFalse(do(event))
self.assertEqual(event.value, MAX_ABS)
event = InputEvent(EV_ABS, ecodes.ABS_Y, -MAX_ABS)
self.assertTrue(do(device, event, mapping))
event = new_event(EV_ABS, ecodes.ABS_Y, -MAX_ABS)
self.assertTrue(do(event))
self.assertEqual(event.value, -1)
event = InputEvent(EV_ABS, ecodes.ABS_X, -MAX_ABS / 4)
self.assertTrue(do(device, event, mapping))
event = new_event(EV_ABS, ecodes.ABS_X, -MAX_ABS // 4)
self.assertTrue(do(event))
self.assertEqual(event.value, 0)
config.set('gamepad.joystick.right_purpose', BUTTONS)
event = InputEvent(EV_ABS, ecodes.ABS_RX, MAX_ABS)
self.assertTrue(do(device, event, mapping))
event = new_event(EV_ABS, ecodes.ABS_RX, MAX_ABS)
self.assertTrue(do(event))
self.assertEqual(event.value, 1)
event = InputEvent(EV_ABS, ecodes.ABS_Y, MAX_ABS)
self.assertTrue(do(device, event, mapping))
event = new_event(EV_ABS, ecodes.ABS_Y, MAX_ABS)
self.assertTrue(do(event))
self.assertEqual(event.value, 1)
event = InputEvent(EV_ABS, ecodes.ABS_X, MAX_ABS / 4)
self.assertTrue(do(device, event, mapping))
event = new_event(EV_ABS, ecodes.ABS_X, MAX_ABS // 4)
self.assertTrue(do(event))
self.assertEqual(event.value, 0)

@ -22,21 +22,21 @@
import unittest
import asyncio
from evdev.ecodes import EV_REL, REL_X, REL_Y, REL_WHEEL, REL_HWHEEL
from evdev.ecodes import EV_REL, REL_X, REL_Y, REL_WHEEL, REL_HWHEEL, \
EV_ABS, ABS_X, ABS_Y, ABS_RX, ABS_RY
from keymapper.dev.ev_abs_mapper import ev_abs_mapper
from keymapper.config import config
from keymapper.mapping import Mapping
from keymapper.dev.ev_abs_mapper import MOUSE, WHEEL
from keymapper.dev.event_producer import EventProducer, MOUSE, WHEEL
from tests.test import InputDevice, UInput, MAX_ABS, clear_write_history, \
uinput_write_history, cleanup
uinput_write_history, cleanup, new_event
abs_state = [0, 0, 0, 0]
class TestEvAbsMapper(unittest.TestCase):
class TestEventProducer(unittest.TestCase):
# there is also `test_abs_to_rel` in test_injector.py
def setUp(self):
loop = asyncio.new_event_loop()
@ -46,12 +46,10 @@ class TestEvAbsMapper(unittest.TestCase):
device = InputDevice('/dev/input/event30')
uinput = UInput()
asyncio.ensure_future(ev_abs_mapper(
abs_state,
device,
uinput,
self.mapping
))
self.event_producer = EventProducer(self.mapping)
self.event_producer.set_max_abs_from(device)
self.event_producer.set_mouse_uinput(uinput)
asyncio.ensure_future(self.event_producer.run())
config.set('gamepad.joystick.x_scroll_speed', 1)
config.set('gamepad.joystick.y_scroll_speed', 1)
@ -59,13 +57,66 @@ class TestEvAbsMapper(unittest.TestCase):
def tearDown(self):
cleanup()
def test_debounce_1(self):
loop = asyncio.get_event_loop()
tick_time = 1 / 60
history = []
self.event_producer.debounce(1234, history.append, (1,), 10)
asyncio.ensure_future(self.event_producer.run())
loop.run_until_complete(asyncio.sleep(6 * tick_time))
self.assertEqual(len(history), 0)
loop.run_until_complete(asyncio.sleep(6 * tick_time))
self.assertEqual(len(history), 1)
# won't get called a second time
loop.run_until_complete(asyncio.sleep(11 * tick_time))
self.assertEqual(len(history), 1)
self.assertEqual(history[0], 1)
def test_debounce_2(self):
loop = asyncio.get_event_loop()
tick_time = 1 / 60
history = []
self.event_producer.debounce(1234, history.append, (1,), 10)
asyncio.ensure_future(self.event_producer.run())
loop.run_until_complete(asyncio.sleep(6 * tick_time))
self.assertEqual(len(history), 0)
# replaces
self.event_producer.debounce(1234, history.append, (2,), 20)
loop.run_until_complete(asyncio.sleep(6 * tick_time))
self.assertEqual(len(history), 0)
loop.run_until_complete(asyncio.sleep(11 * tick_time))
self.assertEqual(len(history), 1)
# won't get called a second time
loop.run_until_complete(asyncio.sleep(21 * tick_time))
self.assertEqual(len(history), 1)
self.assertEqual(history[0], 2)
def test_debounce_3(self):
loop = asyncio.get_event_loop()
tick_time = 1 / 60
history = []
self.event_producer.debounce(1234, history.append, (1,), 10)
self.event_producer.debounce(5678, history.append, (2,), 20)
asyncio.ensure_future(self.event_producer.run())
loop.run_until_complete(asyncio.sleep(11 * tick_time))
self.assertEqual(len(history), 1)
loop.run_until_complete(asyncio.sleep(11 * tick_time))
self.assertEqual(len(history), 2)
loop.run_until_complete(asyncio.sleep(21 * tick_time))
self.assertEqual(len(history), 2)
self.assertEqual(history[0], 1)
self.assertEqual(history[1], 2)
def do(self, a, b, c, d, expectation):
"""Present fake values to the loop and observe the outcome."""
clear_write_history()
abs_state[0] = a
abs_state[1] = b
abs_state[2] = c
abs_state[3] = d
self.event_producer.notify(new_event(EV_ABS, ABS_X, a))
self.event_producer.notify(new_event(EV_ABS, ABS_Y, b))
self.event_producer.notify(new_event(EV_ABS, ABS_RX, c))
self.event_producer.notify(new_event(EV_ABS, ABS_RY, d))
# 3 frames
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.sleep(3 / 60))

@ -24,7 +24,8 @@ import time
import copy
import evdev
from evdev.ecodes import EV_REL, EV_KEY, EV_ABS, ABS_HAT0X, BTN_LEFT, KEY_A
from evdev.ecodes import EV_REL, EV_KEY, EV_ABS, ABS_HAT0X, BTN_LEFT, KEY_A, \
REL_X, REL_Y, REL_WHEEL, REL_HWHEEL
from keymapper.dev.injector import is_numlock_on, set_numlock, \
ensure_numlock, Injector, is_in_capabilities
@ -34,8 +35,10 @@ from keymapper.config import config
from keymapper.key import Key
from keymapper.dev.macros import parse
from keymapper.dev import utils
from keymapper.logger import logger
from keymapper.getdevices import get_devices
from tests.test import InputEvent, pending_events, fixtures, \
from tests.test import new_event, pending_events, fixtures, \
EVENT_READ_TIMEOUT, uinput_write_history_pipe, \
MAX_ABS, cleanup, read_write_history_pipe, InputDevice
@ -315,18 +318,15 @@ class TestInjector(unittest.TestCase):
pointer_speed = 80
config.set('gamepad.joystick.pointer_speed', pointer_speed)
rel_x = evdev.ecodes.REL_X
rel_y = evdev.ecodes.REL_Y
# they need to sum up before something is written
divisor = 10
x = MAX_ABS / pointer_speed / divisor
y = MAX_ABS / pointer_speed / divisor
pending_events['gamepad'] = [
InputEvent(EV_ABS, rel_x, x),
InputEvent(EV_ABS, rel_y, y),
InputEvent(EV_ABS, rel_x, -x),
InputEvent(EV_ABS, rel_y, -y),
new_event(EV_ABS, REL_X, x),
new_event(EV_ABS, REL_Y, y),
new_event(EV_ABS, REL_X, -x),
new_event(EV_ABS, REL_Y, -y),
]
self.injector = Injector('gamepad', custom_mapping)
@ -348,6 +348,7 @@ class TestInjector(unittest.TestCase):
if history[0][0] == EV_ABS:
raise AssertionError(
'The injector probably just forwarded them unchanged'
# possibly in addition to writing mouse events
)
# movement is written at 60hz and it takes `divisor` steps to
@ -355,14 +356,13 @@ class TestInjector(unittest.TestCase):
self.assertGreater(len(history), 60 * sleep * 0.9 * 2 / divisor)
self.assertLess(len(history), 60 * sleep * 1.1 * 2 / divisor)
# those may be in arbitrary order, the injector happens to write
# y first
self.assertEqual(history[-1][0], EV_REL)
self.assertEqual(history[-1][1], rel_x)
self.assertAlmostEqual(history[-1][2], -1)
self.assertEqual(history[-2][0], EV_REL)
self.assertEqual(history[-2][1], rel_y)
self.assertAlmostEqual(history[-2][2], -1)
# those may be in arbitrary order
count_x = history.count((EV_REL, REL_X, -1))
count_y = history.count((EV_REL, REL_Y, -1))
self.assertGreater(count_x, 1)
self.assertGreater(count_y, 1)
# only those two types of events were written
self.assertEqual(len(history), count_x + count_y)
def test_injector(self):
# the tests in test_keycode_mapper.py test this stuff in detail
@ -387,17 +387,17 @@ class TestInjector(unittest.TestCase):
pending_events['device 2'] = [
# should execute a macro...
InputEvent(EV_KEY, 8, 1),
InputEvent(EV_KEY, 9, 1), # ...now
InputEvent(EV_KEY, 8, 0),
InputEvent(EV_KEY, 9, 0),
new_event(EV_KEY, 8, 1),
new_event(EV_KEY, 9, 1), # ...now
new_event(EV_KEY, 8, 0),
new_event(EV_KEY, 9, 0),
# gamepad stuff. trigger a combination
InputEvent(EV_ABS, ABS_HAT0X, -1),
InputEvent(EV_ABS, ABS_HAT0X, 0),
new_event(EV_ABS, ABS_HAT0X, -1),
new_event(EV_ABS, ABS_HAT0X, 0),
# just pass those over without modifying
InputEvent(EV_KEY, 10, 1),
InputEvent(EV_KEY, 10, 0),
InputEvent(3124, 3564, 6542),
new_event(EV_KEY, 10, 1),
new_event(EV_KEY, 10, 0),
new_event(3124, 3564, 6542),
]
self.injector = Injector('device 2', custom_mapping)
@ -494,10 +494,10 @@ class TestInjector(unittest.TestCase):
uinput_write_history_pipe[0].recv()
pending_events['gamepad'] = [
InputEvent(*w_down),
InputEvent(*d_down),
InputEvent(*w_up),
InputEvent(*d_up),
new_event(*w_down),
new_event(*d_down),
new_event(*w_up),
new_event(*d_up),
]
self.injector = Injector('gamepad', custom_mapping)
@ -529,6 +529,81 @@ class TestInjector(unittest.TestCase):
self.assertEqual(history.count((EV_KEY, code_w, 0)), 1)
self.assertEqual(history.count((EV_KEY, code_d, 0)), 1)
def test_wheel(self):
# wheel release events are made up with a debouncer
# map those two to stuff
w_up = (EV_REL, REL_WHEEL, -1)
hw_right = (EV_REL, REL_HWHEEL, 1)
# should be forwarded and present in the capabilities
hw_left = (EV_REL, REL_HWHEEL, -1)
custom_mapping.change(Key(*hw_right), 'k(b)')
custom_mapping.change(Key(*w_up), 'c')
system_mapping.clear()
code_b = 91
code_c = 92
system_mapping._set('b', code_b)
system_mapping._set('c', code_c)
device_name = 'device 1'
pending_events[device_name] = [
new_event(*w_up),
] * 10 + [
new_event(*hw_right),
new_event(*w_up),
] * 5 + [
new_event(*hw_left)
]
self.injector = Injector(device_name, custom_mapping)
device = InputDevice('/dev/input/event11')
# make sure this test uses a device that has the needed capabilities
# for the injector to grab it
self.assertIn(EV_REL, device.capabilities())
self.assertIn(REL_WHEEL, device.capabilities()[EV_REL])
self.assertIn(REL_HWHEEL, device.capabilities()[EV_REL])
self.assertIn(device.path, get_devices()[device_name]['paths'])
self.injector.start_injecting()
# wait for the first injected key down event
uinput_write_history_pipe[0].poll(timeout=1)
self.assertTrue(uinput_write_history_pipe[0].poll())
event = uinput_write_history_pipe[0].recv()
self.assertEqual(event.t, (EV_KEY, code_c, 1))
time.sleep(EVENT_READ_TIMEOUT * 5)
# in 5 more read-loop ticks, nothing new should have happened
self.assertFalse(uinput_write_history_pipe[0].poll())
time.sleep(EVENT_READ_TIMEOUT * 6)
# 5 more and it should be within the second phase in which
# the horizontal wheel is used. add some tolerance
self.assertTrue(uinput_write_history_pipe[0].poll())
event = uinput_write_history_pipe[0].recv()
self.assertEqual(event.t, (EV_KEY, code_b, 1))
time.sleep(EVENT_READ_TIMEOUT * 10 + 5 / 60)
# after 21 read-loop ticks all events should be consumed, wait for
# at least 3 (=5) producer-ticks so that the debouncers are triggered.
# Key-up events for both wheel events should be written now that no
# new key-down event arrived.
events = read_write_history_pipe()
self.assertEqual(events.count((EV_KEY, code_b, 0)), 1)
self.assertEqual(events.count((EV_KEY, code_c, 0)), 1)
self.assertEqual(events.count(hw_left), 1) # the unmapped wheel
# the unmapped wheel won't get a debounced release command, it's
# forwarded as is
self.assertNotIn((EV_REL, REL_HWHEEL, 0), events)
print(events)
self.assertEqual(len(events), 3)
def test_store_permutations_for_macros(self):
mapping = Mapping()
ev_1 = (EV_KEY, 41, 1)

@ -44,7 +44,7 @@ from keymapper.gtk.row import to_string, HOLDING, IDLE
from keymapper.dev import permissions
from keymapper.key import Key
from tests.test import tmp, pending_events, InputEvent, \
from tests.test import tmp, pending_events, new_event, \
uinput_write_history_pipe, cleanup
@ -333,7 +333,7 @@ class TestIntegration(unittest.TestCase):
# per second, so sleep a bit more than 0.033ms each time
# press down all the keys of a combination
for sub_key in key:
keycode_reader._pipe[1].send(InputEvent(*sub_key))
keycode_reader._pipe[1].send(new_event(*sub_key))
time.sleep(FILTER_THRESHOLD * 2)
# make the window consume the keycode
@ -347,7 +347,7 @@ class TestIntegration(unittest.TestCase):
# release all the keys
for sub_key in key:
keycode_reader._pipe[1].send(InputEvent(*sub_key[:2], 0))
keycode_reader._pipe[1].send(new_event(*sub_key[:2], 0))
# make the window consume the keycode
time.sleep(0.06)
@ -363,6 +363,7 @@ class TestIntegration(unittest.TestCase):
self.assertIn('changed', css_classes)
self.assertEqual(row.keycode_input.get_label(), to_string(key))
self.assertFalse(row.keycode_input.is_focus())
self.assertEqual(len(keycode_reader._unreleased), 0)
if not expect_success:
self.assertIsNone(row.get_key())
@ -733,8 +734,8 @@ class TestIntegration(unittest.TestCase):
system_mapping._set('a', keycode_to)
pending_events['device 2'] = [
InputEvent(evdev.events.EV_KEY, keycode_from, 1),
InputEvent(evdev.events.EV_KEY, keycode_from, 0)
new_event(evdev.events.EV_KEY, keycode_from, 1),
new_event(evdev.events.EV_KEY, keycode_from, 0)
]
custom_mapping.save(get_preset_path('device 2', 'foo preset'))
@ -774,7 +775,7 @@ class TestIntegration(unittest.TestCase):
# not all of those events should be processed, since that takes some
# time due to time.sleep in the fakes and the injection is stopped.
pending_events['device 2'] = [InputEvent(1, keycode_from, 1)] * 100
pending_events['device 2'] = [new_event(1, keycode_from, 1)] * 100
custom_mapping.save(get_preset_path('device 2', 'foo preset'))

@ -23,16 +23,17 @@ import unittest
import asyncio
import time
from evdev.ecodes import EV_KEY, EV_ABS, ABS_HAT0X, ABS_HAT0Y, KEY_A, BTN_TL
from evdev.ecodes import EV_KEY, EV_ABS, KEY_A, BTN_TL, \
ABS_HAT0X, ABS_HAT0Y, ABS_HAT1X, ABS_HAT1Y
from keymapper.dev.keycode_mapper import active_macros, handle_keycode,\
unreleased, subsets, log
unreleased, subsets
from keymapper.state import system_mapping
from keymapper.dev.macros import parse
from keymapper.config import config
from keymapper.mapping import Mapping, DISABLE_CODE
from tests.test import InputEvent, UInput, uinput_write_history, \
from tests.test import new_event, UInput, uinput_write_history, \
cleanup
@ -117,27 +118,27 @@ class TestKeycodeMapper(unittest.TestCase):
uinput = UInput()
# a bunch of d-pad key down events at once
handle_keycode(_key_to_code, {}, InputEvent(*ev_1), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*ev_4), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_1), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_4), uinput)
self.assertEqual(len(unreleased), 2)
self.assertEqual(unreleased.get(ev_1[:2]), ((EV_KEY, _key_to_code[(ev_1,)]), ev_1))
self.assertEqual(unreleased.get(ev_4[:2]), ((EV_KEY, _key_to_code[(ev_4,)]), ev_4))
# release all of them
handle_keycode(_key_to_code, {}, InputEvent(*ev_3), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*ev_6), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_6), uinput)
self.assertEqual(len(unreleased), 0)
# repeat with other values
handle_keycode(_key_to_code, {}, InputEvent(*ev_2), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*ev_5), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_2), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_5), uinput)
self.assertEqual(len(unreleased), 2)
self.assertEqual(unreleased.get(ev_2[:2]), ((EV_KEY, _key_to_code[(ev_2,)]), ev_2))
self.assertEqual(unreleased.get(ev_5[:2]), ((EV_KEY, _key_to_code[(ev_5,)]), ev_5))
# release all of them again
handle_keycode(_key_to_code, {}, InputEvent(*ev_3), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*ev_6), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_6), uinput)
self.assertEqual(len(unreleased), 0)
self.assertEqual(len(uinput_write_history), 8)
@ -154,6 +155,70 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(uinput_write_history[6].t, (EV_KEY, 52, 0))
self.assertEqual(uinput_write_history[7].t, (EV_KEY, 55, 0))
def test_not_forward(self):
down = (EV_KEY, 91, 1)
up = (EV_KEY, 91, 0)
uinput = UInput()
handle_keycode({}, {}, new_event(*down), uinput, False)
self.assertEqual(unreleased[(EV_KEY, 91)], (down[:2], down))
self.assertEqual(len(unreleased), 1)
self.assertEqual(uinput.write_count, 0)
handle_keycode({}, {}, new_event(*up), uinput, False)
self.assertEqual(len(unreleased), 0)
self.assertEqual(uinput.write_count, 0)
def test_dont_filter_unmapped(self):
# if an event is not used at all, it should be written into
# unmapped but not furthermore modified
down = (EV_KEY, 91, 1)
up = (EV_KEY, 91, 0)
uinput = UInput()
for _ in range(10):
handle_keycode({}, {}, new_event(*down), uinput)
self.assertEqual(unreleased[(EV_KEY, 91)], (down[:2], down))
self.assertEqual(len(unreleased), 1)
self.assertEqual(uinput.write_count, 10)
handle_keycode({}, {}, new_event(*up), uinput)
self.assertEqual(len(unreleased), 0)
self.assertEqual(uinput.write_count, 11)
def test_filter_combi_mapped_duplicate_down(self):
# the opposite of the other test, but don't map the key directly
# but rather as the trigger for a combination
down_1 = (EV_KEY, 91, 1)
down_2 = (EV_KEY, 92, 1)
up_1 = (EV_KEY, 91, 0)
up_2 = (EV_KEY, 92, 0)
uinput = UInput()
output = 71
key_to_code = {
(down_1, down_2): 71
}
handle_keycode(key_to_code, {}, new_event(*down_1), uinput)
for _ in range(10):
handle_keycode(key_to_code, {}, new_event(*down_2), uinput)
# all duplicate down events should have been ignored
self.assertEqual(len(unreleased), 2)
self.assertEqual(uinput.write_count, 2)
self.assertEqual(uinput_write_history[0].t, down_1)
self.assertEqual(uinput_write_history[1].t, (EV_KEY, output, 1))
handle_keycode({}, {}, new_event(*up_1), uinput)
handle_keycode({}, {}, new_event(*up_2), uinput)
self.assertEqual(len(unreleased), 0)
self.assertEqual(uinput.write_count, 4)
self.assertEqual(uinput_write_history[2].t, up_1)
self.assertEqual(uinput_write_history[3].t, (EV_KEY, output, 0))
def test_d_pad_combination(self):
ev_1 = (EV_ABS, ABS_HAT0X, 1)
ev_2 = (EV_ABS, ABS_HAT0Y, -1)
@ -168,8 +233,8 @@ class TestKeycodeMapper(unittest.TestCase):
uinput = UInput()
# a bunch of d-pad key down events at once
handle_keycode(_key_to_code, {}, InputEvent(*ev_1), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*ev_2), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_1), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_2), uinput)
# (what_will_be_released, what_caused_the_key_down)
self.assertEqual(unreleased.get(ev_1[:2]), ((EV_ABS, ABS_HAT0X), ev_1))
self.assertEqual(unreleased.get(ev_2[:2]), ((EV_KEY, 51), ev_2))
@ -181,8 +246,8 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(uinput_write_history[1].t, (EV_KEY, 51, 1))
# release all of them
handle_keycode(_key_to_code, {}, InputEvent(*ev_3), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*ev_4), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_4), uinput)
self.assertEqual(len(unreleased), 0)
self.assertEqual(len(uinput_write_history), 4)
@ -196,9 +261,9 @@ class TestKeycodeMapper(unittest.TestCase):
}
uinput = UInput()
handle_keycode(_key_to_code, {}, InputEvent(EV_KEY, 1, 1), uinput)
handle_keycode(_key_to_code, {}, InputEvent(EV_KEY, 3, 1), uinput)
handle_keycode(_key_to_code, {}, InputEvent(EV_KEY, 2, 1), uinput)
handle_keycode(_key_to_code, {}, new_event(EV_KEY, 1, 1), uinput)
handle_keycode(_key_to_code, {}, new_event(EV_KEY, 3, 1), uinput)
handle_keycode(_key_to_code, {}, new_event(EV_KEY, 2, 1), uinput)
self.assertEqual(len(uinput_write_history), 3)
self.assertEqual(uinput_write_history[0].t, (EV_KEY, 101, 1))
@ -212,8 +277,8 @@ class TestKeycodeMapper(unittest.TestCase):
}
uinput = UInput()
handle_keycode(_key_to_code, {}, InputEvent(*combination[0]), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*combination[1]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination[0]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination[1]), uinput)
self.assertEqual(len(uinput_write_history), 2)
# the first event is written and then the triggered combination
@ -221,8 +286,8 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(uinput_write_history[1].t, (EV_KEY, 101, 1))
# release them
handle_keycode(_key_to_code, {}, InputEvent(*combination[0][:2], 0), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*combination[1][:2], 0), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination[0][:2], 0), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination[1][:2], 0), uinput)
# the first key writes its release event. The second key is hidden
# behind the executed combination. The result of the combination is
# also released, because it acts like a key.
@ -232,8 +297,8 @@ class TestKeycodeMapper(unittest.TestCase):
# press them in the wrong order (the wrong key at the end, the order
# of all other keys won't matter). no combination should be triggered
handle_keycode(_key_to_code, {}, InputEvent(*combination[1]), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*combination[0]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination[1]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination[0]), uinput)
self.assertEqual(len(uinput_write_history), 6)
self.assertEqual(uinput_write_history[4].t, (EV_KEY, 2, 1))
self.assertEqual(uinput_write_history[5].t, (EV_KEY, 1, 1))
@ -265,12 +330,12 @@ class TestKeycodeMapper(unittest.TestCase):
uinput = UInput()
# 10 and 11: more key-down events than needed
handle_keycode(_key_to_code, {}, InputEvent(EV_KEY, 10, 1), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*combination_1[0]), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*combination_1[1]), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*combination_1[2]), uinput)
handle_keycode(_key_to_code, {}, InputEvent(EV_KEY, 11, 1), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*combination_1[3]), uinput)
handle_keycode(_key_to_code, {}, new_event(EV_KEY, 10, 1), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination_1[0]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination_1[1]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination_1[2]), uinput)
handle_keycode(_key_to_code, {}, new_event(EV_KEY, 11, 1), uinput)
handle_keycode(_key_to_code, {}, new_event(*combination_1[3]), uinput)
self.assertEqual(len(uinput_write_history), 6)
# the first event is written and then the triggered combination
@ -280,7 +345,7 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(uinput_write_history[5].t, (EV_KEY, 101, 1))
# while the combination is down, another unrelated key can be used
handle_keycode(_key_to_code, {}, InputEvent(*down_5), uinput)
handle_keycode(_key_to_code, {}, new_event(*down_5), uinput)
# the keycode_mapper searches for subsets of the current held-down
# keys to activate combinations, down_5 should not trigger them
# again.
@ -289,8 +354,8 @@ class TestKeycodeMapper(unittest.TestCase):
# release the combination by releasing the last key, and release
# the unrelated key
handle_keycode(_key_to_code, {}, InputEvent(*up_4), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*up_5), uinput)
handle_keycode(_key_to_code, {}, new_event(*up_4), uinput)
handle_keycode(_key_to_code, {}, new_event(*up_5), uinput)
self.assertEqual(len(uinput_write_history), 9)
self.assertEqual(uinput_write_history[7].t, (EV_KEY, 101, 0))
@ -313,8 +378,8 @@ class TestKeycodeMapper(unittest.TestCase):
macro_mapping[((EV_KEY, 1, 1),)].set_handler(lambda *args: history.append(args))
macro_mapping[((EV_KEY, 2, 1),)].set_handler(lambda *args: history.append(args))
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 1, 1), None)
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 2, 1), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 1), None)
loop = asyncio.get_event_loop()
@ -330,6 +395,16 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertIn((code_b, 1), history)
self.assertIn((code_b, 0), history)
# releasing stuff
self.assertIn((EV_KEY, 1), unreleased)
self.assertIn((EV_KEY, 2), unreleased)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 0), None)
self.assertNotIn((EV_KEY, 1), unreleased)
self.assertNotIn((EV_KEY, 2), unreleased)
loop.run_until_complete(asyncio.sleep(0.1))
self.assertEqual(len(history), 12)
def test_hold(self):
history = []
@ -352,7 +427,7 @@ class TestKeycodeMapper(unittest.TestCase):
"""start macro"""
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 1, 1), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None)
loop = asyncio.get_event_loop()
@ -366,7 +441,7 @@ class TestKeycodeMapper(unittest.TestCase):
"""stop macro"""
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 1, 0), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None)
loop.run_until_complete(asyncio.sleep(keystroke_sleep * 10 / 1000))
@ -422,7 +497,7 @@ class TestKeycodeMapper(unittest.TestCase):
"""start macro 2"""
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 2, 1), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 1), None)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.sleep(0.1))
@ -432,8 +507,8 @@ class TestKeycodeMapper(unittest.TestCase):
# spam garbage events
for _ in range(5):
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 1, 1), None)
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 3, 1), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 1), None)
loop.run_until_complete(asyncio.sleep(0.05))
self.assertTrue(active_macros[(EV_KEY, 1)].holding)
self.assertTrue(active_macros[(EV_KEY, 1)].running)
@ -451,7 +526,7 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertNotIn((code_d, 0), history)
# stop macro 2
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 2, 0), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 0), None)
loop.run_until_complete(asyncio.sleep(0.1))
# it stopped and didn't restart, so the count stays at 1
@ -472,7 +547,7 @@ class TestKeycodeMapper(unittest.TestCase):
history = []
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 2, 1), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 1), None)
loop.run_until_complete(asyncio.sleep(0.1))
self.assertEqual(history.count((code_c, 1)), 1)
self.assertEqual(history.count((code_c, 0)), 1)
@ -480,8 +555,8 @@ class TestKeycodeMapper(unittest.TestCase):
# spam garbage events again, this time key-up events on all other
# macros
for _ in range(5):
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 1, 0), None)
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 3, 0), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 0), None)
loop.run_until_complete(asyncio.sleep(0.05))
self.assertFalse(active_macros[(EV_KEY, 1)].holding)
self.assertFalse(active_macros[(EV_KEY, 1)].running)
@ -491,7 +566,7 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertFalse(active_macros[(EV_KEY, 3)].running)
# stop macro 2
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 2, 0), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 2, 0), None)
loop.run_until_complete(asyncio.sleep(0.1))
# was started only once
self.assertEqual(history.count((code_c, 1)), 1)
@ -501,8 +576,8 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(history.count((code_d, 0)), 1)
# stop all macros
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 1, 0), None)
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 3, 0), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 3, 0), None)
loop.run_until_complete(asyncio.sleep(0.1))
# it's stopped and won't write stuff anymore
@ -539,14 +614,14 @@ class TestKeycodeMapper(unittest.TestCase):
macro_mapping[((EV_KEY, 1, 1),)].set_handler(handler)
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 1, 1), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.sleep(0.1))
for _ in range(5):
self.assertTrue(active_macros[(EV_KEY, 1)].holding)
self.assertTrue(active_macros[(EV_KEY, 1)].running)
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 1, 1), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 1), None)
loop.run_until_complete(asyncio.sleep(0.05))
# duplicate key down events don't do anything
@ -556,7 +631,7 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertEqual(history.count((code_c, 0)), 0)
# stop
handle_keycode({}, macro_mapping, InputEvent(EV_KEY, 1, 0), None)
handle_keycode({}, macro_mapping, new_event(EV_KEY, 1, 0), None)
loop.run_until_complete(asyncio.sleep(0.1))
self.assertEqual(history.count((code_a, 1)), 1)
self.assertEqual(history.count((code_a, 0)), 1)
@ -617,18 +692,18 @@ class TestKeycodeMapper(unittest.TestCase):
keys_uinput = UInput()
# key up won't do anything
handle_keycode({}, macro_mapping, InputEvent(*up_0), macros_uinput)
handle_keycode({}, macro_mapping, InputEvent(*up_1), macros_uinput)
handle_keycode({}, macro_mapping, InputEvent(*up_2), macros_uinput)
handle_keycode({}, macro_mapping, new_event(*up_0), macros_uinput)
handle_keycode({}, macro_mapping, new_event(*up_1), macros_uinput)
handle_keycode({}, macro_mapping, new_event(*up_2), macros_uinput)
loop.run_until_complete(asyncio.sleep(0.1))
self.assertEqual(len(active_macros), 0)
"""start macros"""
handle_keycode({}, macro_mapping, InputEvent(*down_0), keys_uinput)
handle_keycode({}, macro_mapping, new_event(*down_0), keys_uinput)
self.assertEqual(keys_uinput.write_count, 1)
handle_keycode({}, macro_mapping, InputEvent(*down_1), keys_uinput)
handle_keycode({}, macro_mapping, InputEvent(*down_2), keys_uinput)
handle_keycode({}, macro_mapping, new_event(*down_1), keys_uinput)
handle_keycode({}, macro_mapping, new_event(*down_2), keys_uinput)
self.assertEqual(keys_uinput.write_count, 1)
# let the mainloop run for some time so that the macro does its stuff
@ -642,11 +717,19 @@ class TestKeycodeMapper(unittest.TestCase):
self.assertTrue(active_macros[key_2].holding)
self.assertTrue(active_macros[key_2].running)
self.assertIn(down_0[:2], unreleased)
self.assertIn(down_1[:2], unreleased)
self.assertIn(down_2[:2], unreleased)
"""stop macros"""
# releasing the last key of a combination releases the whole macro
handle_keycode({}, macro_mapping, InputEvent(*up_1), None)
handle_keycode({}, macro_mapping, InputEvent(*up_2), None)
handle_keycode({}, macro_mapping, new_event(*up_1), None)
handle_keycode({}, macro_mapping, new_event(*up_2), None)
self.assertIn(down_0[:2], unreleased)
self.assertNotIn(down_1[:2], unreleased)
self.assertNotIn(down_2[:2], unreleased)
loop.run_until_complete(asyncio.sleep(keystroke_sleep * 10 / 1000))
@ -695,14 +778,15 @@ class TestKeycodeMapper(unittest.TestCase):
# try two concurrent macros with D-Pad events because they are
# more difficult to manage, since their only difference is their
# value, and one of them is negative.
down_1 = (EV_ABS, ABS_HAT0X, 1)
down_2 = (EV_ABS, ABS_HAT0X, -1)
right = (EV_ABS, ABS_HAT0X, 1)
release = (EV_ABS, ABS_HAT0X, 0)
left = (EV_ABS, ABS_HAT0X, -1)
repeats = 10
macro_mapping = {
(down_1,): parse(f'r({repeats}, k(1))', self.mapping),
(down_2,): parse(f'r({repeats}, k(2))', self.mapping)
(right,): parse(f'r({repeats}, k(1))', self.mapping),
(left,): parse(f'r({repeats}, k(2))', self.mapping)
}
history = []
@ -710,22 +794,25 @@ class TestKeycodeMapper(unittest.TestCase):
def handler(*args):
history.append(args)
macro_mapping[(down_1,)].set_handler(handler)
macro_mapping[(down_2,)].set_handler(handler)
macro_mapping[(right,)].set_handler(handler)
macro_mapping[(left,)].set_handler(handler)
handle_keycode({}, macro_mapping, InputEvent(*down_1), None)
handle_keycode({}, macro_mapping, InputEvent(*down_2), None)
handle_keycode({}, macro_mapping, new_event(*right), None)
self.assertIn((EV_ABS, ABS_HAT0X), unreleased)
handle_keycode({}, macro_mapping, new_event(*release), None)
self.assertNotIn((EV_ABS, ABS_HAT0X), unreleased)
handle_keycode({}, macro_mapping, new_event(*left), None)
self.assertIn((EV_ABS, ABS_HAT0X), unreleased)
loop = asyncio.get_event_loop()
sleeptime = config.get('macros.keystroke_sleep_ms') / 1000
loop.run_until_complete(asyncio.sleep(1.1 * repeats * 2 * sleeptime))
self.assertEqual(len(history), repeats * 4)
self.assertEqual(history.count((code_1, 1)), 10)
self.assertEqual(history.count((code_1, 0)), 10)
self.assertEqual(history.count((code_2, 1)), 10)
self.assertEqual(history.count((code_2, 0)), 10)
self.assertEqual(len(history), repeats * 4)
def test_filter_trigger_spam(self):
# test_filter_duplicates
@ -741,16 +828,22 @@ class TestKeycodeMapper(unittest.TestCase):
"""positive"""
for _ in range(1, 20):
handle_keycode(_key_to_code, {}, InputEvent(*trigger, 1), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*trigger, 0), uinput)
handle_keycode(_key_to_code, {}, new_event(*trigger, 1), uinput)
self.assertIn(trigger, unreleased)
handle_keycode(_key_to_code, {}, new_event(*trigger, 0), uinput)
self.assertNotIn(trigger, unreleased)
self.assertEqual(len(uinput_write_history), 2)
"""negative"""
for _ in range(1, 20):
handle_keycode(_key_to_code, {}, InputEvent(*trigger, -1), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*trigger, 0), uinput)
handle_keycode(_key_to_code, {}, new_event(*trigger, -1), uinput)
self.assertIn(trigger, unreleased)
handle_keycode(_key_to_code, {}, new_event(*trigger, 0), uinput)
self.assertNotIn(trigger, unreleased)
self.assertEqual(len(uinput_write_history), 4)
self.assertEqual(uinput_write_history[0].t, (EV_KEY, 51, 1))
@ -772,10 +865,14 @@ class TestKeycodeMapper(unittest.TestCase):
}
uinput = UInput()
handle_keycode(_key_to_code, {}, InputEvent(*ev_1), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_1), uinput)
for _ in range(10):
handle_keycode(_key_to_code, {}, InputEvent(*ev_2), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*ev_3), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_2), uinput)
self.assertIn(key, unreleased)
handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput)
self.assertNotIn(key, unreleased)
self.assertEqual(len(uinput_write_history), 2)
self.assertEqual(uinput_write_history[0].t, (EV_KEY, 21, 1))
@ -785,7 +882,7 @@ class TestKeycodeMapper(unittest.TestCase):
ev_1 = (EV_ABS, ABS_HAT0Y, 1)
ev_2 = (EV_ABS, ABS_HAT0Y, 0)
ev_3 = (EV_ABS, ABS_HAT0X, 1)
ev_3 = (EV_ABS, ABS_HAT0X, 1) # disabled
ev_4 = (EV_ABS, ABS_HAT0X, 0)
ev_5 = (EV_KEY, KEY_A, 1)
@ -806,11 +903,15 @@ class TestKeycodeMapper(unittest.TestCase):
"""single keys"""
# down
handle_keycode(_key_to_code, {}, InputEvent(*ev_1), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*ev_3), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_1), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_3), uinput)
self.assertIn(ev_1[:2], unreleased)
self.assertIn(ev_3[:2], unreleased)
# up
handle_keycode(_key_to_code, {}, InputEvent(*ev_2), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*ev_4), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_2), uinput)
handle_keycode(_key_to_code, {}, new_event(*ev_4), uinput)
self.assertNotIn(ev_1[:2], unreleased)
self.assertNotIn(ev_3[:2], unreleased)
self.assertEqual(len(uinput_write_history), 2)
self.assertEqual(uinput_write_history[0].t, (EV_KEY, 61, 1))
@ -819,51 +920,110 @@ class TestKeycodeMapper(unittest.TestCase):
"""a combination that ends in a disabled key"""
# ev_5 should be forwarded and the combination triggered
handle_keycode(_key_to_code, {}, InputEvent(*combi_1[0]), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*combi_1[1]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combi_1[0]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combi_1[1]), uinput)
self.assertEqual(len(uinput_write_history), 4)
self.assertEqual(uinput_write_history[2].t, (EV_KEY, KEY_A, 1))
self.assertEqual(uinput_write_history[3].t, (EV_KEY, 62, 1))
self.assertIn(combi_1[0][:2], unreleased)
self.assertIn(combi_1[1][:2], unreleased)
# release the last key of the combi first, it should
# release what the combination maps to
event = InputEvent(combi_1[1][0], combi_1[1][1], 0)
event = new_event(combi_1[1][0], combi_1[1][1], 0)
handle_keycode(_key_to_code, {}, event, uinput)
self.assertEqual(len(uinput_write_history), 5)
self.assertEqual(uinput_write_history[-1].t, (EV_KEY, 62, 0))
self.assertIn(combi_1[0][:2], unreleased)
self.assertNotIn(combi_1[1][:2], unreleased)
event = InputEvent(combi_1[0][0], combi_1[0][1], 0)
event = new_event(combi_1[0][0], combi_1[0][1], 0)
handle_keycode(_key_to_code, {}, event, uinput)
self.assertEqual(len(uinput_write_history), 6)
self.assertEqual(uinput_write_history[-1].t, (EV_KEY, KEY_A, 0))
self.assertNotIn(combi_1[0][:2], unreleased)
self.assertNotIn(combi_1[1][:2], unreleased)
"""a combination that starts with a disabled key"""
# only the combination should get triggered
handle_keycode(_key_to_code, {}, InputEvent(*combi_2[0]), uinput)
handle_keycode(_key_to_code, {}, InputEvent(*combi_2[1]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combi_2[0]), uinput)
handle_keycode(_key_to_code, {}, new_event(*combi_2[1]), uinput)
self.assertEqual(len(uinput_write_history), 7)
self.assertEqual(uinput_write_history[-1].t, (EV_KEY, 63, 1))
# release the last key of the combi first, it should
# release what the combination maps to
event = InputEvent(combi_2[1][0], combi_2[1][1], 0)
event = new_event(combi_2[1][0], combi_2[1][1], 0)
handle_keycode(_key_to_code, {}, event, uinput)
self.assertEqual(len(uinput_write_history), 8)
self.assertEqual(uinput_write_history[-1].t, (EV_KEY, 63, 0))
# the first key of combi_2 is disabled, so it won't write another
# key-up event
event = InputEvent(combi_2[0][0], combi_2[0][1], 0)
event = new_event(combi_2[0][0], combi_2[0][1], 0)
handle_keycode(_key_to_code, {}, event, uinput)
self.assertEqual(len(uinput_write_history), 8)
def test_log(self):
msg1 = log(((1, 2, 1),), 'foo %s bar', 1234)
self.assertEqual(msg1, '((1, 2, 1)) ------------------- foo 1234 bar')
def test_combination_keycode_macro_mix(self):
# ev_1 triggers macro, ev_1 + ev_2 triggers key while the macro is
# still running
system_mapping.clear()
system_mapping._set('a', 92)
down_1 = (EV_ABS, ABS_HAT1X, 1)
down_2 = (EV_ABS, ABS_HAT1Y, -1)
up_1 = (EV_ABS, ABS_HAT1X, 0)
up_2 = (EV_ABS, ABS_HAT1Y, 0)
macro_mapping = {(down_1,): parse('h(k(a))', self.mapping)}
_key_to_code = {(down_1, down_2): 91}
macro_history = []
def handler(*args):
macro_history.append(args)
macro_mapping[(down_1,)].set_handler(handler)
msg2 = log(((1, 200, -1), (1, 5, 1)), 'foo %s', (1, 2))
self.assertEqual(msg2, '((1, 200, -1), (1, 5, 1)) ----- foo (1, 2)')
uinput = UInput()
loop = asyncio.get_event_loop()
# macro starts
handle_keycode(_key_to_code, macro_mapping, new_event(*down_1), uinput)
loop.run_until_complete(asyncio.sleep(0.05))
self.assertEqual(len(uinput_write_history), 0)
self.assertGreater(len(macro_history), 1)
self.assertIn(down_1[:2], unreleased)
self.assertIn((92, 1), macro_history)
# combination triggered
handle_keycode(_key_to_code, macro_mapping, new_event(*down_2), uinput)
self.assertIn(down_1[:2], unreleased)
self.assertIn(down_2[:2], unreleased)
self.assertEqual(uinput_write_history[0].t, (EV_KEY, 91, 1))
len_a = len(macro_history)
loop.run_until_complete(asyncio.sleep(0.05))
len_b = len(macro_history)
# still running
self.assertGreater(len_b, len_a)
# release
handle_keycode(_key_to_code, macro_mapping, new_event(*up_1), uinput)
self.assertNotIn(down_1[:2], unreleased)
self.assertIn(down_2[:2], unreleased)
loop.run_until_complete(asyncio.sleep(0.05))
len_c = len(macro_history)
loop.run_until_complete(asyncio.sleep(0.05))
len_d = len(macro_history)
# not running anymore
self.assertEqual(len_c, len_d)
handle_keycode(_key_to_code, macro_mapping, new_event(*up_2), uinput)
self.assertEqual(uinput_write_history[1].t, (EV_KEY, 91, 0))
self.assertEqual(len(uinput_write_history), 2)
self.assertNotIn(down_1[:2], unreleased)
self.assertNotIn(down_2[:2], unreleased)
if __name__ == "__main__":

@ -43,6 +43,15 @@ class TestLogger(unittest.TestCase):
update_verbosity(debug=True)
def test_key_spam(self):
path = add_filehandler(os.path.join(tmp, 'logger-test'))
logger.key_spam(((1, 2, 1),), 'foo %s bar', 1234)
logger.key_spam(((1, 200, -1), (1, 5, 1)), 'foo %s', (1, 2))
with open(path, 'r') as f:
content = f.read().lower()
self.assertIn('((1, 2, 1)) ------------------- foo 1234 bar', content)
self.assertIn('((1, 200, -1), (1, 5, 1)) ----- foo (1, 2)', content)
def test_log_info(self):
update_verbosity(debug=False)
path = add_filehandler(os.path.join(tmp, 'logger-test'))

@ -22,6 +22,7 @@
import os
import unittest
import json
from evdev.ecodes import EV_KEY, EV_ABS, ABS_HAT0X, KEY_A
from keymapper.mapping import Mapping

@ -30,7 +30,7 @@ from keymapper.dev.reader import keycode_reader
from keymapper.state import custom_mapping
from keymapper.config import BUTTONS, MOUSE
from tests.test import InputEvent, pending_events, EVENT_READ_TIMEOUT, \
from tests.test import new_event, pending_events, EVENT_READ_TIMEOUT, \
cleanup, MAX_ABS
@ -61,7 +61,7 @@ class TestReader(unittest.TestCase):
def test_reading_1(self):
# a single event
pending_events['device 1'] = [
InputEvent(EV_ABS, ABS_HAT0X, 1)
new_event(EV_ABS, ABS_HAT0X, 1)
]
keycode_reader.start_reading('device 1')
wait(keycode_reader._pipe[0].poll, 0.5)
@ -69,12 +69,28 @@ class TestReader(unittest.TestCase):
self.assertEqual(keycode_reader.read(), None)
self.assertEqual(len(keycode_reader._unreleased), 1)
def test_change_wheel_direction(self):
keycode_reader.start_reading('device 1')
keycode_reader._pipe[1].send(new_event(1234, 2345, 1))
self.assertEqual(keycode_reader.read(), (1234, 2345, 1))
self.assertEqual(len(keycode_reader._unreleased), 1)
self.assertEqual(keycode_reader.read(), None)
keycode_reader._pipe[1].send(new_event(1234, 2345, -1))
self.assertEqual(keycode_reader.read(), (1234, 2345, -1))
# notice that this is no combination of two sides, the previous
# entry in unreleased has to get overwritten. So there is still only
# one element in it.
self.assertEqual(len(keycode_reader._unreleased), 1)
self.assertEqual(keycode_reader.read(), None)
def test_reading_2(self):
# a combination of events
pending_events['device 1'] = [
InputEvent(EV_KEY, CODE_1, 1, 10000.1234),
InputEvent(EV_KEY, CODE_3, 1, 10001.1234),
InputEvent(EV_ABS, ABS_HAT0X, -1, 10002.1234)
new_event(EV_KEY, CODE_1, 1, 10000.1234),
new_event(EV_KEY, CODE_3, 1, 10001.1234),
new_event(EV_ABS, ABS_HAT0X, -1, 10002.1234)
]
keycode_reader.start_reading('device 1')
@ -95,7 +111,7 @@ class TestReader(unittest.TestCase):
# if their purpose is "buttons"
custom_mapping.set('gamepad.joystick.left_purpose', BUTTONS)
pending_events['gamepad'] = [
InputEvent(EV_ABS, ABS_Y, MAX_ABS)
new_event(EV_ABS, ABS_Y, MAX_ABS)
]
keycode_reader.start_reading('gamepad')
wait(keycode_reader._pipe[0].poll, 0.5)
@ -106,7 +122,7 @@ class TestReader(unittest.TestCase):
keycode_reader._unreleased = {}
custom_mapping.set('gamepad.joystick.left_purpose', MOUSE)
pending_events['gamepad'] = [
InputEvent(EV_ABS, ABS_Y, MAX_ABS)
new_event(EV_ABS, ABS_Y, MAX_ABS)
]
keycode_reader.start_reading('gamepad')
time.sleep(0.1)
@ -119,9 +135,9 @@ class TestReader(unittest.TestCase):
# comfortably. Furthermore, reading mouse events breaks clicking
# around in the table. It can still be changed in the config files.
pending_events['device 1'] = [
InputEvent(EV_KEY, BTN_LEFT, 1),
InputEvent(EV_KEY, CODE_2, 1),
InputEvent(EV_KEY, BTN_TOOL_DOUBLETAP, 1),
new_event(EV_KEY, BTN_LEFT, 1),
new_event(EV_KEY, CODE_2, 1),
new_event(EV_KEY, BTN_TOOL_DOUBLETAP, 1),
]
keycode_reader.start_reading('device 1')
time.sleep(0.1)
@ -132,8 +148,8 @@ class TestReader(unittest.TestCase):
def test_ignore_value_2(self):
# this is not a combination, because (EV_KEY CODE_3, 2) is ignored
pending_events['device 1'] = [
InputEvent(EV_ABS, ABS_HAT0X, 1),
InputEvent(EV_KEY, CODE_3, 2)
new_event(EV_ABS, ABS_HAT0X, 1),
new_event(EV_KEY, CODE_3, 2)
]
keycode_reader.start_reading('device 1')
wait(keycode_reader._pipe[0].poll, 0.5)
@ -143,9 +159,9 @@ class TestReader(unittest.TestCase):
def test_reading_ignore_up(self):
pending_events['device 1'] = [
InputEvent(EV_KEY, CODE_1, 0, 10),
InputEvent(EV_KEY, CODE_2, 1, 11),
InputEvent(EV_KEY, CODE_3, 0, 12),
new_event(EV_KEY, CODE_1, 0, 10),
new_event(EV_KEY, CODE_2, 1, 11),
new_event(EV_KEY, CODE_3, 0, 12),
]
keycode_reader.start_reading('device 1')
time.sleep(0.1)
@ -155,13 +171,13 @@ class TestReader(unittest.TestCase):
def test_reading_ignore_duplicate_down(self):
pipe = multiprocessing.Pipe()
pipe[1].send(InputEvent(EV_ABS, ABS_Z, 1, 10))
pipe[1].send(new_event(EV_ABS, ABS_Z, 1, 10))
keycode_reader._pipe = pipe
self.assertEqual(keycode_reader.read(), (EV_ABS, ABS_Z, 1))
self.assertEqual(keycode_reader.read(), None)
pipe[1].send(InputEvent(EV_ABS, ABS_Z, 1, 10))
pipe[1].send(new_event(EV_ABS, ABS_Z, 1, 10))
# still none
self.assertEqual(keycode_reader.read(), None)
@ -169,9 +185,9 @@ class TestReader(unittest.TestCase):
def test_wrong_device(self):
pending_events['device 1'] = [
InputEvent(EV_KEY, CODE_1, 1),
InputEvent(EV_KEY, CODE_2, 1),
InputEvent(EV_KEY, CODE_3, 1)
new_event(EV_KEY, CODE_1, 1),
new_event(EV_KEY, CODE_2, 1),
new_event(EV_KEY, CODE_3, 1)
]
keycode_reader.start_reading('device 2')
time.sleep(EVENT_READ_TIMEOUT * 5)
@ -184,9 +200,9 @@ class TestReader(unittest.TestCase):
# intentionally programmed it won't even do that. But it was at some
# point.
pending_events['key-mapper device 2'] = [
InputEvent(EV_KEY, CODE_1, 1),
InputEvent(EV_KEY, CODE_2, 1),
InputEvent(EV_KEY, CODE_3, 1)
new_event(EV_KEY, CODE_1, 1),
new_event(EV_KEY, CODE_2, 1),
new_event(EV_KEY, CODE_3, 1)
]
keycode_reader.start_reading('device 2')
time.sleep(EVENT_READ_TIMEOUT * 5)
@ -195,9 +211,9 @@ class TestReader(unittest.TestCase):
def test_clear(self):
pending_events['device 1'] = [
InputEvent(EV_KEY, CODE_1, 1),
InputEvent(EV_KEY, CODE_2, 1),
InputEvent(EV_KEY, CODE_3, 1)
new_event(EV_KEY, CODE_1, 1),
new_event(EV_KEY, CODE_2, 1),
new_event(EV_KEY, CODE_3, 1)
]
keycode_reader.start_reading('device 1')
time.sleep(EVENT_READ_TIMEOUT * 5)
@ -206,8 +222,8 @@ class TestReader(unittest.TestCase):
self.assertEqual(len(keycode_reader._unreleased), 0)
def test_switch_device(self):
pending_events['device 2'] = [InputEvent(EV_KEY, CODE_1, 1)]
pending_events['device 1'] = [InputEvent(EV_KEY, CODE_3, 1)]
pending_events['device 2'] = [new_event(EV_KEY, CODE_1, 1)]
pending_events['device 1'] = [new_event(EV_KEY, CODE_3, 1)]
keycode_reader.start_reading('device 2')
time.sleep(EVENT_READ_TIMEOUT * 5)
@ -223,19 +239,19 @@ class TestReader(unittest.TestCase):
# with every button press. Or more general, prioritize them
# based on the event type
pending_events['device 1'] = [
InputEvent(EV_ABS, ABS_HAT0X, 1, 1234.0000),
InputEvent(EV_ABS, ABS_HAT0X, 0, 1234.0001),
new_event(EV_ABS, ABS_HAT0X, 1, 1234.0000),
new_event(EV_ABS, ABS_HAT0X, 0, 1234.0001),
InputEvent(EV_ABS, ABS_HAT0X, 1, 1235.0000), # ignored
InputEvent(EV_ABS, ABS_HAT0X, 0, 1235.0001),
new_event(EV_ABS, ABS_HAT0X, 1, 1235.0000), # ignored
new_event(EV_ABS, ABS_HAT0X, 0, 1235.0001),
InputEvent(EV_KEY, KEY_COMMA, 1, 1235.0010),
InputEvent(EV_KEY, KEY_COMMA, 0, 1235.0011),
new_event(EV_KEY, KEY_COMMA, 1, 1235.0010),
new_event(EV_KEY, KEY_COMMA, 0, 1235.0011),
InputEvent(EV_ABS, ABS_HAT0X, 1, 1235.0020), # ignored
InputEvent(EV_ABS, ABS_HAT0X, 0, 1235.0021), # ignored
new_event(EV_ABS, ABS_HAT0X, 1, 1235.0020), # ignored
new_event(EV_ABS, ABS_HAT0X, 0, 1235.0021), # ignored
InputEvent(EV_ABS, ABS_HAT0X, 1, 1236.0000)
new_event(EV_ABS, ABS_HAT0X, 1, 1236.0000)
]
keycode_reader.start_reading('device 1')
wait(keycode_reader._pipe[0].poll, 0.5)
@ -249,11 +265,11 @@ class TestReader(unittest.TestCase):
# value and normal for some ev_abs events.
custom_mapping.set('gamepad.joystick.left_purpose', BUTTONS)
pending_events['gamepad'] = [
InputEvent(EV_ABS, ABS_HAT0X, 1, 1234.0000),
InputEvent(EV_ABS, ABS_MISC, 1, 1235.0000), # ignored
InputEvent(EV_ABS, ABS_Y, MAX_ABS, 1235.0010),
InputEvent(EV_ABS, ABS_MISC, 1, 1235.0020), # ignored
InputEvent(EV_ABS, ABS_MISC, 1, 1235.0030) # ignored
new_event(EV_ABS, ABS_HAT0X, 1, 1234.0000),
new_event(EV_ABS, ABS_MISC, 1, 1235.0000), # ignored
new_event(EV_ABS, ABS_Y, MAX_ABS, 1235.0010),
new_event(EV_ABS, ABS_MISC, 1, 1235.0020), # ignored
new_event(EV_ABS, ABS_MISC, 1, 1235.0030) # ignored
# this time, don't release anything. the combination should
# ignore stuff as well.
]
@ -270,8 +286,8 @@ class TestReader(unittest.TestCase):
def test_prioritizing_3_normalize(self):
# take the sign of -1234, just like in test_prioritizing_2_normalize
pending_events['device 1'] = [
InputEvent(EV_ABS, ABS_HAT0X, -1234, 1234.0000),
InputEvent(EV_ABS, ABS_HAT0Y, 0, 1234.0030) # ignored
new_event(EV_ABS, ABS_HAT0X, -1234, 1234.0000),
new_event(EV_ABS, ABS_HAT0Y, 0, 1234.0030) # ignored
# this time don't release anything as well, but it's not
# a combination because only one event is accepted
]

Loading…
Cancel
Save