wip reading keycodes even though the device is grabbed

xkb
sezanzeb 4 years ago committed by sezanzeb
parent 246c3a5ec3
commit 6f0e8d97fb

@ -64,7 +64,12 @@ class _Config:
self._config['map_EV_REL_devices'] = active
def may_modify_movement_devices(self):
"""Get if devices that control movements may be modified as well."""
"""Get if devices that control movements may be modified as well.
Since movement events happen quite often and fast, I'd like to
add the option to disabling mapping those if it affects their
performance. TODO figure out which devices to inject to instead?
"""
return self._config['map_EV_REL_devices']
def load_config(self):

@ -23,6 +23,8 @@
import multiprocessing
import time
import copy
import evdev
@ -62,10 +64,6 @@ class _GetDevicesProcess(multiprocessing.Process):
# "Logitech USB Keyboard" and "Logitech USB Keyboard Consumer Control"
grouped = {}
for device in devices:
if device.phys.startswith('key-mapper'):
# injector device, not really periphery
continue
# only keyboard devices
# https://www.kernel.org/doc/html/latest/input/event-codes.html
capabilities = device.capabilities().keys()
@ -76,8 +74,6 @@ class _GetDevicesProcess(multiprocessing.Process):
not config.may_modify_movement_devices()
and evdev.ecodes.EV_REL in capabilities
):
# skip devices that control movement to avoid affecting
# their performance due to the amount of their events.
# TODO add checkbox to automatically load
# a preset on login
logger.debug(
@ -90,7 +86,7 @@ class _GetDevicesProcess(multiprocessing.Process):
if grouped.get(usb) is None:
grouped[usb] = []
logger.debug('Found "%s", %s, %s', device.name, device.path, usb)
logger.spam('Found "%s", %s, %s', device.name, device.path, usb)
grouped[usb].append((device.name, device.path))
@ -109,13 +105,19 @@ class _GetDevicesProcess(multiprocessing.Process):
def refresh_devices():
"""Get new devices, e.g. new ones created by key-mapper."""
"""Get new devices, e.g. new ones created by key-mapper.
This should be called whenever devices in /dev are added or removed.
"""
# it may take a little bit of time until devices are visible after
# changes
time.sleep(0.1)
global _devices
_devices = None
return get_devices()
def get_devices():
def get_devices(include_keymapper=False):
"""Group devices and get relevant infos per group.
Returns a list containing mappings of
@ -138,4 +140,14 @@ def get_devices():
else:
names = [f'"{name}"' for name in _devices]
logger.info('Found %s', ', '.join(names))
return _devices
# filter the result
result = {}
for device in _devices.keys():
if include_keymapper and device.startswith('key-mapper'):
result[device] = _devices[device]
continue
result[device] = _devices[device]
return result

@ -86,8 +86,10 @@ class Row(Gtk.ListBoxRow):
self.keycode.set_label(str(new_keycode))
# switch to the character, don't require mouse input because
# that would overwrite the key with the mouse-button key if
# the current device is a mouse
self.window.window.set_focus(self.character_input)
# the current device is a mouse. idle_add this so that the
# keycode event won't write into the character input as well.
window = self.window.window
GLib.idle_add(lambda: window.set_focus(self.character_input))
self.highlight()
# the character is empty and therefore the mapping is not complete

@ -34,13 +34,14 @@ import multiprocessing
import evdev
from keymapper.logger import logger
from keymapper.getdevices import get_devices
from keymapper.getdevices import get_devices, refresh_devices
from keymapper.state import custom_mapping, system_mapping
DEV_NAME = 'key-mapper'
DEVICE_CREATED = 1
FAILED = 2
DEVICE_SKIPPED = 3
def _grab(path):
@ -88,6 +89,24 @@ def _modify_capabilities(device):
return capabilities
def _is_device_mapped(device, mapping):
"""Check if this device has capabilities that are being mapped.
Parameters
----------
device : evdev.InputDevice
mapping : Mapping
"""
capabilities = device.capabilities(absinfo=False)[evdev.ecodes.EV_KEY]
needed = False
for keycode, _ in custom_mapping:
if keycode in capabilities:
needed = True
if not needed:
logger.debug('No need to grab %s', device.path)
return needed
def _start_injecting_worker(path, pipe, mapping):
"""Inject keycodes for one of the virtual devices.
@ -108,19 +127,21 @@ def _start_injecting_worker(path, pipe, mapping):
pipe.send(FAILED)
return
capabilities = _modify_capabilities(device)
if not _is_device_mapped(device, mapping):
pipe.send(DEVICE_SKIPPED)
return
keymapper_device = evdev.UInput(
name=f'key-mapper {device.name}',
phys='key-mapper',
events=capabilities
events=_modify_capabilities(device)
)
pipe.send(DEVICE_CREATED)
logger.debug(
'Started injecting into %s, fd %s',
device.path, keymapper_device.fd
keymapper_device.device.path, keymapper_device.fd
)
for event in device.read_loop():
@ -222,11 +243,7 @@ class KeycodeInjector:
paths = get_devices()[self.device]['paths']
logger.info(
'Starting injecting the mapping for %s on %s',
self.device,
', '.join(paths)
)
logger.info('Starting injecting the mapping for %s', self.device)
# Watch over each one of the potentially multiple devices per hardware
for path in paths:
@ -247,6 +264,9 @@ class KeycodeInjector:
if len(self.processes) == 0:
raise OSError('Could not grab any device')
# key-mapper devices were added, freshly scan /dev/input
refresh_devices()
@ensure_numlock
def stop_injecting(self):
"""Stop injecting keycodes."""
@ -258,3 +278,5 @@ class KeycodeInjector:
if process.is_alive():
process.terminate()
self.processes[i] = None
refresh_devices()

@ -50,20 +50,31 @@ class _KeycodeReader:
If read is called without prior start_reading, no keycodes
will be available.
"""
paths = get_devices()[device]['paths']
logger.debug(
'Starting reading keycodes for %s on %s',
device,
', '.join(paths)
)
# Watch over each one of the potentially multiple devices per hardware
self.virtual_devices = [
evdev.InputDevice(path)
for path in paths
]
Parameters
----------
device : string
The name of the device.
"""
groups = get_devices(include_keymapper=True)
for name, group in groups.items():
# also find stuff like "key-mapper {device}"
if device not in name:
return
paths = group['paths']
# Watch over each one of the potentially multiple devices per
# hardware
self.virtual_devices = [
evdev.InputDevice(path)
for path in paths
]
logger.debug(
'Starting reading keycodes from "%s"',
'", "'.join([device.name for device in self.virtual_devices])
)
def read(self):
"""Get the newest keycode or None if none was pressed."""
@ -74,12 +85,11 @@ class _KeycodeReader:
if event is None:
break
logger.spam(
'got code:%s value:%s',
event.code + 8, event.value
)
if event.type == evdev.ecodes.EV_KEY and event.value == 1:
logger.spam(
'got code:%s value:%s',
event.code + 8, event.value
)
# value: 1 for down, 0 for up, 2 for hold.
# this happens to report key codes that are 8 lower
# than the ones reported by evtest and used in xkb files

@ -22,16 +22,18 @@
import unittest
from keymapper.getdevices import _GetDevicesProcess
from keymapper.config import config
class TestGetDevices(unittest.TestCase):
def test_get_devices(self):
class FakePipe:
devices = None
class FakePipe:
devices = None
def send(self, devices):
self.devices = devices
def send(self, devices):
self.devices = devices
class TestGetDevices(unittest.TestCase):
def test_get_devices(self):
# don't actually start the process, just use the `run` function.
# otherwise the coverage tool can't keep track.
pipe = FakePipe()
@ -41,7 +43,8 @@ class TestGetDevices(unittest.TestCase):
'paths': [
'/dev/input/event11',
'/dev/input/event10',
'/dev/input/event13'],
'/dev/input/event13'
],
'devices': [
'device 1 foo',
'device 1',
@ -54,6 +57,29 @@ class TestGetDevices(unittest.TestCase):
}
})
def test_map_movement_devices(self):
pipe = FakePipe()
config.set_modify_movement_devices(False)
_GetDevicesProcess(pipe).run()
self.assertDictEqual(pipe.devices, {
'device 1': {
'paths': [
'/dev/input/event11',
'/dev/input/event10',
'/dev/input/event13'
],
'devices': [
'device 1 foo',
'device 1',
'device 1'
]
},
'device 2': {
'paths': ['/dev/input/event20'],
'devices': ['device 2']
}
})
if __name__ == "__main__":
unittest.main()

Loading…
Cancel
Save