fixed autoloading via udev

pull/45/head
sezanzeb 4 years ago
parent 42ffdf8d1c
commit d4f0bf8d5c

@ -26,6 +26,7 @@ import os
import sys
import json
import shutil
import copy
from keymapper.paths import CONFIG_PATH, USER, touch
from keymapper.logger import logger
@ -162,7 +163,8 @@ class ConfigBase:
if resolved is None and log_unknown:
logger.error('Unknown config key "%s"', path)
return resolved
# modifications are only allowed via set
return copy.deepcopy(resolved)
def clear_config(self):
"""Remove all configurations in memory."""
@ -236,7 +238,7 @@ class GlobalConfig(ConfigBase):
# treated like an empty config
logger.debug('Config "%s" doesn\'t exist yet', self.path)
self.clear_config()
self._config = INITIAL_CONFIG
self._config = copy.deepcopy(INITIAL_CONFIG)
self.save_config()
return

@ -30,6 +30,7 @@ import subprocess
import json
import time
import evdev
from pydbus import SystemBus
from gi.repository import GLib
@ -215,6 +216,33 @@ class Daemon:
self.config_dir = None
self.autoload_history = AutoloadHistory()
self.refreshed_devices_at = 0
def refresh_devices(self, device=None):
"""Keep the devices up to date."""
now = time.time()
if now - 10 > self.refreshed_devices_at:
logger.debug('Refreshing because last info is too old')
refresh_devices()
self.refreshed_devices_at = now
return
if device is not None:
if device.startswith('/dev/input/'):
for group in get_devices().values():
if device in group['paths']:
break
else:
logger.debug('Refreshing because path unknown')
refresh_devices()
self.refreshed_devices_at = now
return
else:
if device not in get_devices():
logger.debug('Refreshing because name unknown')
refresh_devices()
self.refreshed_devices_at = now
return
def stop_injecting(self, device):
"""Stop injecting the mapping for a single device."""
@ -262,7 +290,15 @@ class Daemon:
Device name. Expects a key that is present in get_devices().
Can also be a path starting with /dev/input/
"""
self.refresh_devices(device)
device = path_to_device_name(device)
if device not in get_devices():
# even after refresh_devices, the device is not in
# get_devices(), so it's either not relevant for key-mapper,
# or not connected yet
return
preset = config.get(['autoload', device], log_unknown=False)
if preset is None:
@ -297,6 +333,19 @@ class Daemon:
device : str
The name of the device as indexed in get_devices()
"""
if device.startswith('/dev/input/'):
# this is only here to avoid confusing console output,
# block invalid requests before any logs are written.
# Those requests are rejected later anyway.
try:
name = evdev.InputDevice(device).name
if 'key-mapper' in name:
return
except OSError:
return
logger.info('Request to autoload for "%s"', device)
if self.config_dir is None:
logger.error(
'Tried to autoload %s without configuring the daemon first '
@ -343,6 +392,8 @@ class Daemon:
preset : string
The name of the preset
"""
self.refresh_devices(device)
device = path_to_device_name(device)
if self.config_dir is None:
@ -352,10 +403,6 @@ class Daemon:
)
return
if device not in get_devices():
logger.debug('Devices possibly outdated, refreshing')
refresh_devices()
if device not in get_devices():
logger.error('Could not find device "%s"', device)
return

@ -102,12 +102,13 @@ class _GetDevices(threading.Thread):
asyncio.set_event_loop(loop)
logger.debug('Discovering device paths')
devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
# group them together by usb device because there could be stuff like
# "Logitech USB Keyboard" and "Logitech USB Keyboard Consumer Control"
grouped = {}
for device in devices:
for path in evdev.list_devices():
device = evdev.InputDevice(path)
if device.name == 'Power Button':
continue

@ -17,7 +17,7 @@
<text x="22.0" y="14">pylint</text>
</g>
<g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="11">
<text x="63.0" y="15" fill="#010101" fill-opacity=".3">9.73</text>
<text x="62.0" y="14">9.73</text>
<text x="63.0" y="15" fill="#010101" fill-opacity=".3">9.74</text>
<text x="62.0" y="14">9.74</text>
</g>
</svg>

Before

Width:  |  Height:  |  Size: 1.0 KiB

After

Width:  |  Height:  |  Size: 1.0 KiB

@ -268,6 +268,7 @@ class InputDevice:
ret = [e.copy() for e in pending_events.get(self.group, [])]
if ret is not None:
# consume all of them
self.log('read all', self.group)
pending_events[self.group] = []
return ret

@ -66,8 +66,8 @@ class TestControl(unittest.TestCase):
cleanup()
def test_autoload(self):
devices = ['device 1234', 'device 2345']
presets = ['preset', 'bar', 'bar2']
devices = ['device 1', 'device 2']
presets = ['bar0', 'bar', 'bar2']
paths = [
get_preset_path(devices[0], presets[0]),
get_preset_path(devices[1], presets[1]),
@ -152,9 +152,9 @@ class TestControl(unittest.TestCase):
self.assertTrue(daemon.autoload_history.may_autoload(devices[1], presets[2]))
def test_autoload_other_path(self):
devices = ['device 1234', 'device 2345']
presets = ['preset', 'bar']
config_dir = os.path.join(tmp, 'foo', 'bar')
devices = ['device 1', 'device 2']
presets = ['bar123', 'bar2']
config_dir = os.path.join(tmp, 'qux', 'quux')
paths = [
os.path.join(config_dir, 'presets', devices[0], presets[0] + '.json'),
os.path.join(config_dir, 'presets', devices[1], presets[1] + '.json')
@ -182,7 +182,7 @@ class TestControl(unittest.TestCase):
def test_start_stop(self):
device = 'device 1234'
preset = 'preset'
preset = 'preset9'
daemon = Daemon()

@ -255,6 +255,30 @@ class TestDaemon(unittest.TestCase):
self.daemon.stop_injecting(device)
self.assertEqual(self.daemon.get_state(device), STOPPED)
def test_refresh_devices_for_unknown_paths(self):
device = '9876 name'
# this test only makes sense if this device is unknown yet
self.assertIsNone(get_devices().get(device))
self.daemon = Daemon()
# make sure the devices are populated
get_devices()
self.daemon.refresh_devices()
fixtures[self.new_fixture] = {
'capabilities': {evdev.ecodes.EV_KEY: [evdev.ecodes.KEY_A]},
'phys': '9876 phys',
'info': evdev.device.DeviceInfo(4, 5, 6, 7),
'name': device
}
self.daemon._autoload(self.new_fixture)
# test if the injector called refresh_devices successfully
self.assertIsNotNone(get_devices().get(device))
def test_xmodmap_file(self):
from_keycode = evdev.ecodes.KEY_A
to_name = 'qux'
@ -302,7 +326,7 @@ class TestDaemon(unittest.TestCase):
def test_start_stop(self):
device = 'device 1'
preset = 'preset'
preset = 'preset8'
path = '/dev/input/event11'
daemon = Daemon()
@ -369,7 +393,7 @@ class TestDaemon(unittest.TestCase):
def test_autoload(self):
device = 'device 1'
preset = 'preset'
preset = 'preset7'
path = '/dev/input/event11'
daemon = Daemon()
@ -389,10 +413,13 @@ class TestDaemon(unittest.TestCase):
config.set_autoload_preset(device, preset)
config.save_config()
self.daemon.set_config_dir(get_config_path())
len_before = len(self.daemon.autoload_history._autoload_history)
self.daemon._autoload(path)
len_after = len(self.daemon.autoload_history._autoload_history)
self.assertEqual(daemon.autoload_history._autoload_history[device][1], preset)
self.assertFalse(daemon.autoload_history.may_autoload(device, preset))
injector = daemon.injectors[device]
self.assertEqual(len_before + 1, len_after)
# calling duplicate _autoload does nothing
self.daemon._autoload(path)
@ -404,6 +431,18 @@ class TestDaemon(unittest.TestCase):
self.daemon.start_injecting(device, preset)
self.assertTrue(daemon.autoload_history.may_autoload(device, preset))
# calling autoload for (yet) unknown devices does nothing
len_before = len(self.daemon.autoload_history._autoload_history)
self.daemon._autoload('/dev/input/qux')
len_after = len(self.daemon.autoload_history._autoload_history)
self.assertEqual(len_before, len_after)
# autoloading key-mapper devices does nothing
len_before = len(self.daemon.autoload_history._autoload_history)
self.daemon.autoload_single('/dev/input/event40')
len_after = len(self.daemon.autoload_history._autoload_history)
self.assertEqual(len_before, len_after)
if __name__ == "__main__":
unittest.main()

@ -420,7 +420,6 @@ class TestInjector(unittest.TestCase):
# convert the write history to some easier to manage list
history = read_write_history_pipe()
print(history)
self.assertEqual(history.count((EV_KEY, 77, 1)), 1)
self.assertEqual(history.count((EV_ABS, ABS_RZ, value)), 1)

@ -951,6 +951,11 @@ class TestIntegration(unittest.TestCase):
custom_mapping.set('gamepad.joystick.non_linearity', 1)
self.assertEqual(speed, 2 ** 6)
# don't consume the events in the reader, they are used to test
# the injection
keycode_reader.stop_reading()
time.sleep(0.1)
pending_events['gamepad'] = [
new_event(EV_ABS, ABS_RX, -MAX_ABS),
new_event(EV_ABS, ABS_X, MAX_ABS)

Loading…
Cancel
Save