gamepad buttons as keyboard keys

pull/14/head
sezanzeb 4 years ago
parent 59a6c6e166
commit 4f87ef0b0c

@ -277,7 +277,6 @@
<property name="default_height">350</property>
<property name="icon_name">mouse</property>
<signal name="delete-event" handler="on_close" swapped="no"/>
<signal name="event" handler="on_window_event" swapped="no"/>
<child>
<object class="GtkBox" id="vertical-wrapper">
<property name="visible">True</property>

@ -23,6 +23,8 @@
import evdev
import select
import multiprocessing
from keymapper.logger import logger
from keymapper.getdevices import get_devices, refresh_devices
@ -41,20 +43,37 @@ class _KeycodeReader:
"""
def __init__(self):
self.virtual_devices = []
self._pipe = None
self._process = None
def __del__(self):
self.stop_reading()
def stop_reading(self):
# TODO something like this for the injector?
if self._process is not None:
logger.debug('Terminating reader process')
self._process.terminate()
self._process = None
if self._pipe is not None:
logger.debug('Closing reader pipe')
self._pipe.close()
self._pipe = None
def clear(self):
"""Next time when reading don't return the previous keycode."""
# read all of them to clear the buffer or whatever
for virtual_device in self.virtual_devices:
while virtual_device.read_one():
pass
# just call read to clear the pipe
self.read()
def start_reading(self, device):
def start_reading(self, device_name):
"""Tell the evdev lib to start looking for keycodes.
If read is called without prior start_reading, no keycodes
will be available.
"""
self.stop_reading()
# make sure this sees up to date devices, including those created
# by key-mapper
refresh_devices()
@ -63,48 +82,72 @@ class _KeycodeReader:
for name, group in get_devices(include_keymapper=True).items():
# also find stuff like "key-mapper {device}"
if device not in name:
if device_name not in name:
continue
# Watch over each one of the potentially multiple devices per
# hardware
for path in group['paths']:
try:
self.virtual_devices.append(evdev.InputDevice(path))
device = evdev.InputDevice(path)
except FileNotFoundError:
continue
if evdev.ecodes.EV_KEY in device.capabilities():
self.virtual_devices.append(device)
logger.debug(
'Starting reading keycodes from "%s"',
'", "'.join([device.name for device in self.virtual_devices])
)
def read(self):
"""Get the newest keycode or None if none was pressed."""
newest_keycode = None
for virtual_device in self.virtual_devices:
while True:
pipe = multiprocessing.Pipe()
self._process = multiprocessing.Process(
target=self._read_worker,
args=(pipe[1],)
)
self._process.start()
self._pipe = pipe[0]
def _consume_event(self, event, pipe):
"""Write the event code into the pipe if it is a key-down press."""
# value: 1 for down, 0 for up, 2 for hold.
if event.type == evdev.ecodes.EV_KEY and event.value == 1:
logger.spam(
'got code:%s value:%s',
event.code + KEYCODE_OFFSET, event.value
)
pipe.send(event.code + KEYCODE_OFFSET)
def _read_worker(self, pipe):
"""Process that reads keycodes and buffers them into a pipe."""
# using a process that blocks instead of read_one made it easier
# to debug via the logs, because the UI was not polling properly
# at some point which caused logs for events not to be written.
rlist = {device.fd: device for device in self.virtual_devices}
while True:
ready = select.select(rlist, [], [])[0]
for fd in ready:
try:
event = virtual_device.read_one()
for event in rlist[fd].read():
self._consume_event(event, pipe)
except OSError:
# can happen if a device disappears
logger.debug(
'%s cannot be read anymore',
virtual_device.name
'Device "%s" disappeared from the reader',
rlist[fd].path
)
self.virtual_devices.remove(virtual_device)
break
del rlist[fd]
if event is None:
break
def read(self):
"""Get the newest keycode or None if none was pressed."""
if self._pipe is None:
logger.debug('No pipe available to read from')
return None
newest_keycode = None
while self._pipe.poll():
newest_keycode = self._pipe.recv()
if event.type == evdev.ecodes.EV_KEY and event.value == 1:
logger.spam(
'got code:%s value:%s',
event.code + KEYCODE_OFFSET, event.value
)
# value: 1 for down, 0 for up, 2 for hold.
newest_keycode = event.code + KEYCODE_OFFSET
return newest_keycode

@ -113,7 +113,10 @@ class Window:
self.select_newest_preset()
self.timeout = GLib.timeout_add(100, self.check_add_row)
self.timeouts = [
GLib.timeout_add(100, self.check_add_row),
GLib.timeout_add(1000 / 30, self.consume_newest_keycode)
]
# now show the proper finished content of the window
self.get('vertical-wrapper').set_opacity(1)
@ -124,7 +127,9 @@ class Window:
def on_close(self, *_):
"""Safely close the application."""
GLib.source_remove(self.timeout)
for timeout in self.timeouts:
GLib.source_remove(timeout)
keycode_reader.stop_reading()
Gtk.main_quit()
def check_add_row(self):
@ -159,7 +164,10 @@ class Window:
device_selection.append(device, device)
def populate_presets(self):
"""Show the available presets for the selected device."""
"""Show the available presets for the selected device.
This will destroy unsaved changes in the custom_mapping.
"""
self.get('preset_name_input').set_text('')
device = self.selected_device
@ -167,6 +175,7 @@ class Window:
if len(presets) == 0:
new_preset = get_available_preset_name(self.selected_device)
custom_mapping.empty()
custom_mapping.save(self.selected_device, new_preset)
presets = [new_preset]
else:
@ -195,22 +204,20 @@ class Window:
key_list = self.get('key_list')
key_list.forall(lambda row: row.unhighlight())
def on_window_event(self, *args):
"""Write down the pressed key in the UI.
Triggered from any mouse and keyboard event.
"""
# to capture regular keyboard keys or extra-mouse keys
def consume_newest_keycode(self):
"""To capture events from keyboard, mice and gamepads."""
# the "event" event of Gtk.Window wouldn't trigger on gamepad
# events, so it became a GLib timeout
keycode = keycode_reader.read()
if keycode is None:
return
return True
if keycode in [280, 333]:
# disable mapping the left mouse button because it would break
# the mouse. Also it is emitted right when focusing the row
# which breaks the current workflow.
return
return True
self.get('keycode').set_text(str(keycode))
@ -220,6 +227,8 @@ class Window:
if isinstance(focused, Gtk.ToggleButton) and isinstance(row, Row):
row.set_new_keycode(keycode)
return True
def on_apply_system_layout_clicked(self, _):
"""Load the mapping."""
self.dbus.stop_injecting(self.selected_device)

@ -148,6 +148,10 @@ def patch_evdev():
return fixtures.keys()
class InputDevice:
# expose as existing attribute, otherwise the patch for
# evdev < 1.0.0 will crash the test
path = None
def __init__(self, path):
self.path = path
self.phys = fixtures[path]['phys']

@ -24,6 +24,7 @@ import time
import os
import unittest
import evdev
import json
from unittest.mock import patch
from importlib.util import spec_from_loader, module_from_spec
from importlib.machinery import SourceFileLoader
@ -34,7 +35,7 @@ gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from keymapper.state import custom_mapping, system_mapping
from keymapper.paths import CONFIG
from keymapper.paths import CONFIG, get_config_path
from keymapper.config import config
from test import tmp, pending_events, Event, uinput_write_history_pipe, \
@ -143,6 +144,24 @@ class TestIntegration(unittest.TestCase):
[('device 1', 'new preset')]
)
def test_select_device(self):
# creates a new empty preset when no preset exists for the device
self.window.on_select_device(FakeDropdown('device 1'))
custom_mapping.change(50, 'q')
custom_mapping.change(51, 'u')
custom_mapping.change(52, 'x')
self.assertEqual(len(custom_mapping), 3)
self.window.on_select_device(FakeDropdown('device 2'))
self.assertEqual(len(custom_mapping), 0)
# it creates the file for that right away. It may have been possible
# to write it such that it doesn't (its empty anyway), but it does,
# so use that to test it in more detail.
path = get_config_path('device 2', 'new preset')
self.assertTrue(os.path.exists(path))
with open(path, 'r') as file:
preset = json.load(file)
self.assertEqual(len(preset['mapping']), 0)
def test_can_start(self):
self.assertIsNotNone(self.window)
self.assertTrue(self.window.window.get_visible())

Loading…
Cancel
Save