mirror of
https://github.com/sezanzeb/input-remapper
synced 2024-11-13 19:10:50 +00:00
wip reading keycodes even though the device is grabbed
This commit is contained in:
parent
c7736b15ed
commit
2afde0039f
@ -64,7 +64,12 @@ class _Config:
|
||||
self._config['map_EV_REL_devices'] = active
|
||||
|
||||
def may_modify_movement_devices(self):
|
||||
"""Get if devices that control movements may be modified as well."""
|
||||
"""Get if devices that control movements may be modified as well.
|
||||
|
||||
Since movement events happen quite often and fast, I'd like to
|
||||
add the option to disabling mapping those if it affects their
|
||||
performance. TODO figure out which devices to inject to instead?
|
||||
"""
|
||||
return self._config['map_EV_REL_devices']
|
||||
|
||||
def load_config(self):
|
||||
|
@ -23,6 +23,8 @@
|
||||
|
||||
|
||||
import multiprocessing
|
||||
import time
|
||||
import copy
|
||||
|
||||
import evdev
|
||||
|
||||
@ -62,10 +64,6 @@ class _GetDevicesProcess(multiprocessing.Process):
|
||||
# "Logitech USB Keyboard" and "Logitech USB Keyboard Consumer Control"
|
||||
grouped = {}
|
||||
for device in devices:
|
||||
if device.phys.startswith('key-mapper'):
|
||||
# injector device, not really periphery
|
||||
continue
|
||||
|
||||
# only keyboard devices
|
||||
# https://www.kernel.org/doc/html/latest/input/event-codes.html
|
||||
capabilities = device.capabilities().keys()
|
||||
@ -76,8 +74,6 @@ class _GetDevicesProcess(multiprocessing.Process):
|
||||
not config.may_modify_movement_devices()
|
||||
and evdev.ecodes.EV_REL in capabilities
|
||||
):
|
||||
# skip devices that control movement to avoid affecting
|
||||
# their performance due to the amount of their events.
|
||||
# TODO add checkbox to automatically load
|
||||
# a preset on login
|
||||
logger.debug(
|
||||
@ -90,7 +86,7 @@ class _GetDevicesProcess(multiprocessing.Process):
|
||||
if grouped.get(usb) is None:
|
||||
grouped[usb] = []
|
||||
|
||||
logger.debug('Found "%s", %s, %s', device.name, device.path, usb)
|
||||
logger.spam('Found "%s", %s, %s', device.name, device.path, usb)
|
||||
|
||||
grouped[usb].append((device.name, device.path))
|
||||
|
||||
@ -109,13 +105,19 @@ class _GetDevicesProcess(multiprocessing.Process):
|
||||
|
||||
|
||||
def refresh_devices():
|
||||
"""Get new devices, e.g. new ones created by key-mapper."""
|
||||
"""Get new devices, e.g. new ones created by key-mapper.
|
||||
|
||||
This should be called whenever devices in /dev are added or removed.
|
||||
"""
|
||||
# it may take a little bit of time until devices are visible after
|
||||
# changes
|
||||
time.sleep(0.1)
|
||||
global _devices
|
||||
_devices = None
|
||||
return get_devices()
|
||||
|
||||
|
||||
def get_devices():
|
||||
def get_devices(include_keymapper=False):
|
||||
"""Group devices and get relevant infos per group.
|
||||
|
||||
Returns a list containing mappings of
|
||||
@ -138,4 +140,14 @@ def get_devices():
|
||||
else:
|
||||
names = [f'"{name}"' for name in _devices]
|
||||
logger.info('Found %s', ', '.join(names))
|
||||
return _devices
|
||||
|
||||
# filter the result
|
||||
result = {}
|
||||
for device in _devices.keys():
|
||||
if include_keymapper and device.startswith('key-mapper'):
|
||||
result[device] = _devices[device]
|
||||
continue
|
||||
|
||||
result[device] = _devices[device]
|
||||
|
||||
return result
|
||||
|
@ -86,8 +86,10 @@ class Row(Gtk.ListBoxRow):
|
||||
self.keycode.set_label(str(new_keycode))
|
||||
# switch to the character, don't require mouse input because
|
||||
# that would overwrite the key with the mouse-button key if
|
||||
# the current device is a mouse
|
||||
self.window.window.set_focus(self.character_input)
|
||||
# the current device is a mouse. idle_add this so that the
|
||||
# keycode event won't write into the character input as well.
|
||||
window = self.window.window
|
||||
GLib.idle_add(lambda: window.set_focus(self.character_input))
|
||||
self.highlight()
|
||||
|
||||
# the character is empty and therefore the mapping is not complete
|
||||
|
@ -34,13 +34,14 @@ import multiprocessing
|
||||
import evdev
|
||||
|
||||
from keymapper.logger import logger
|
||||
from keymapper.getdevices import get_devices
|
||||
from keymapper.getdevices import get_devices, refresh_devices
|
||||
from keymapper.state import custom_mapping, system_mapping
|
||||
|
||||
|
||||
DEV_NAME = 'key-mapper'
|
||||
DEVICE_CREATED = 1
|
||||
FAILED = 2
|
||||
DEVICE_SKIPPED = 3
|
||||
|
||||
|
||||
def _grab(path):
|
||||
@ -88,6 +89,24 @@ def _modify_capabilities(device):
|
||||
return capabilities
|
||||
|
||||
|
||||
def _is_device_mapped(device, mapping):
|
||||
"""Check if this device has capabilities that are being mapped.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
device : evdev.InputDevice
|
||||
mapping : Mapping
|
||||
"""
|
||||
capabilities = device.capabilities(absinfo=False)[evdev.ecodes.EV_KEY]
|
||||
needed = False
|
||||
for keycode, _ in custom_mapping:
|
||||
if keycode in capabilities:
|
||||
needed = True
|
||||
if not needed:
|
||||
logger.debug('No need to grab %s', device.path)
|
||||
return needed
|
||||
|
||||
|
||||
def _start_injecting_worker(path, pipe, mapping):
|
||||
"""Inject keycodes for one of the virtual devices.
|
||||
|
||||
@ -108,19 +127,21 @@ def _start_injecting_worker(path, pipe, mapping):
|
||||
pipe.send(FAILED)
|
||||
return
|
||||
|
||||
capabilities = _modify_capabilities(device)
|
||||
if not _is_device_mapped(device, mapping):
|
||||
pipe.send(DEVICE_SKIPPED)
|
||||
return
|
||||
|
||||
keymapper_device = evdev.UInput(
|
||||
name=f'key-mapper {device.name}',
|
||||
phys='key-mapper',
|
||||
events=capabilities
|
||||
events=_modify_capabilities(device)
|
||||
)
|
||||
|
||||
pipe.send(DEVICE_CREATED)
|
||||
|
||||
logger.debug(
|
||||
'Started injecting into %s, fd %s',
|
||||
device.path, keymapper_device.fd
|
||||
keymapper_device.device.path, keymapper_device.fd
|
||||
)
|
||||
|
||||
for event in device.read_loop():
|
||||
@ -222,11 +243,7 @@ class KeycodeInjector:
|
||||
|
||||
paths = get_devices()[self.device]['paths']
|
||||
|
||||
logger.info(
|
||||
'Starting injecting the mapping for %s on %s',
|
||||
self.device,
|
||||
', '.join(paths)
|
||||
)
|
||||
logger.info('Starting injecting the mapping for %s', self.device)
|
||||
|
||||
# Watch over each one of the potentially multiple devices per hardware
|
||||
for path in paths:
|
||||
@ -247,6 +264,9 @@ class KeycodeInjector:
|
||||
if len(self.processes) == 0:
|
||||
raise OSError('Could not grab any device')
|
||||
|
||||
# key-mapper devices were added, freshly scan /dev/input
|
||||
refresh_devices()
|
||||
|
||||
@ensure_numlock
|
||||
def stop_injecting(self):
|
||||
"""Stop injecting keycodes."""
|
||||
@ -258,3 +278,5 @@ class KeycodeInjector:
|
||||
if process.is_alive():
|
||||
process.terminate()
|
||||
self.processes[i] = None
|
||||
|
||||
refresh_devices()
|
||||
|
@ -50,20 +50,31 @@ class _KeycodeReader:
|
||||
|
||||
If read is called without prior start_reading, no keycodes
|
||||
will be available.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
device : string
|
||||
The name of the device.
|
||||
"""
|
||||
paths = get_devices()[device]['paths']
|
||||
groups = get_devices(include_keymapper=True)
|
||||
for name, group in groups.items():
|
||||
# also find stuff like "key-mapper {device}"
|
||||
if device not in name:
|
||||
return
|
||||
|
||||
logger.debug(
|
||||
'Starting reading keycodes for %s on %s',
|
||||
device,
|
||||
', '.join(paths)
|
||||
)
|
||||
paths = group['paths']
|
||||
|
||||
# Watch over each one of the potentially multiple devices per hardware
|
||||
self.virtual_devices = [
|
||||
evdev.InputDevice(path)
|
||||
for path in paths
|
||||
]
|
||||
# Watch over each one of the potentially multiple devices per
|
||||
# hardware
|
||||
self.virtual_devices = [
|
||||
evdev.InputDevice(path)
|
||||
for path in paths
|
||||
]
|
||||
|
||||
logger.debug(
|
||||
'Starting reading keycodes from "%s"',
|
||||
'", "'.join([device.name for device in self.virtual_devices])
|
||||
)
|
||||
|
||||
def read(self):
|
||||
"""Get the newest keycode or None if none was pressed."""
|
||||
@ -74,12 +85,11 @@ class _KeycodeReader:
|
||||
if event is None:
|
||||
break
|
||||
|
||||
logger.spam(
|
||||
'got code:%s value:%s',
|
||||
event.code + 8, event.value
|
||||
)
|
||||
|
||||
if event.type == evdev.ecodes.EV_KEY and event.value == 1:
|
||||
logger.spam(
|
||||
'got code:%s value:%s',
|
||||
event.code + 8, event.value
|
||||
)
|
||||
# value: 1 for down, 0 for up, 2 for hold.
|
||||
# this happens to report key codes that are 8 lower
|
||||
# than the ones reported by evtest and used in xkb files
|
||||
|
@ -22,16 +22,18 @@
|
||||
import unittest
|
||||
|
||||
from keymapper.getdevices import _GetDevicesProcess
|
||||
from keymapper.config import config
|
||||
|
||||
|
||||
class FakePipe:
|
||||
devices = None
|
||||
|
||||
def send(self, devices):
|
||||
self.devices = devices
|
||||
|
||||
|
||||
class TestGetDevices(unittest.TestCase):
|
||||
def test_get_devices(self):
|
||||
class FakePipe:
|
||||
devices = None
|
||||
|
||||
def send(self, devices):
|
||||
self.devices = devices
|
||||
|
||||
# don't actually start the process, just use the `run` function.
|
||||
# otherwise the coverage tool can't keep track.
|
||||
pipe = FakePipe()
|
||||
@ -41,7 +43,8 @@ class TestGetDevices(unittest.TestCase):
|
||||
'paths': [
|
||||
'/dev/input/event11',
|
||||
'/dev/input/event10',
|
||||
'/dev/input/event13'],
|
||||
'/dev/input/event13'
|
||||
],
|
||||
'devices': [
|
||||
'device 1 foo',
|
||||
'device 1',
|
||||
@ -54,6 +57,29 @@ class TestGetDevices(unittest.TestCase):
|
||||
}
|
||||
})
|
||||
|
||||
def test_map_movement_devices(self):
|
||||
pipe = FakePipe()
|
||||
config.set_modify_movement_devices(False)
|
||||
_GetDevicesProcess(pipe).run()
|
||||
self.assertDictEqual(pipe.devices, {
|
||||
'device 1': {
|
||||
'paths': [
|
||||
'/dev/input/event11',
|
||||
'/dev/input/event10',
|
||||
'/dev/input/event13'
|
||||
],
|
||||
'devices': [
|
||||
'device 1 foo',
|
||||
'device 1',
|
||||
'device 1'
|
||||
]
|
||||
},
|
||||
'device 2': {
|
||||
'paths': ['/dev/input/event20'],
|
||||
'devices': ['device 2']
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
Loading…
Reference in New Issue
Block a user