From 61d591978ddf756ddd385017e0b4c14a05314f5e Mon Sep 17 00:00:00 2001 From: sezanzeb Date: Thu, 19 Nov 2020 11:12:28 +0100 Subject: [PATCH] ensuring numlock stays the same on grab --- keymapper/injector.py | 53 +++++++++++++++++++++++++++++++++++-- tests/testcases/injector.py | 21 ++++++++++++++- 2 files changed, 71 insertions(+), 3 deletions(-) diff --git a/keymapper/injector.py b/keymapper/injector.py index 5390665b..6578da6a 100644 --- a/keymapper/injector.py +++ b/keymapper/injector.py @@ -22,8 +22,10 @@ """Device and evdev stuff that is independent from the display server.""" +import re import asyncio import time +import subprocess # By using processes instead of threads, the mappings are # automatically copied, so that they can be worked with in the ui # without breaking the device. And it's possible to terminate processes. @@ -63,7 +65,7 @@ def _grab(path): # it might take a little time until the device is free if # it was previously grabbed. - time.sleep(0.1) + time.sleep(0.15) return device @@ -126,7 +128,7 @@ def _start_injecting_worker(path, pipe): event.type, event.code, event.value ) keymapper_device.write(event.type, event.code, event.value) - keymapper_device.syn() + # this already includes SYN events, so need to syn here again continue if event.value == 2: @@ -164,8 +166,52 @@ def _start_injecting_worker(path, pipe): keymapper_device.syn() +def is_numlock_on(): + """Get the current state of the numlock.""" + xset_q = subprocess.check_output(['xset', 'q']).decode() + num_lock_status = re.search( + r'Num Lock:\s+(.+?)\s', + xset_q + ) + if num_lock_status is not None: + return num_lock_status[1] == 'on' + + +def toggle_numlock(): + """Turn the numlock on or off.""" + try: + subprocess.check_output(['numlockx', 'toggle']) + except FileNotFoundError: + # doesn't seem to be installed everywhere + logger.debug('numlockx not found, trying to inject a keycode') + # and this doesn't always work. + device = evdev.UInput( + name=f'key-mapper numlock-control', + phys='key-mapper', + ) + device.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_NUMLOCK, 1) + device.syn() + device.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_NUMLOCK, 0) + device.syn() + + +def ensure_numlock(func): + def wrapped(*args, **kwargs): + # for some reason, grabbing a device can modify the num lock state. + # remember it and apply back later + numlock_before = is_numlock_on() + result = func(*args, **kwargs) + numlock_after = is_numlock_on() + if numlock_after != numlock_before: + logger.debug('Reverting numlock status to %s', numlock_before) + toggle_numlock() + return result + return wrapped + + class KeycodeInjector: """Keeps injecting keycodes in the background based on the mapping.""" + @ensure_numlock def __init__(self, device): """Start injecting keycodes based on custom_mapping.""" self.device = device @@ -193,10 +239,13 @@ class KeycodeInjector: status = pipe[0].recv() if status != FAILED: self.processes.append(worker) + else: + worker.join() if len(self.processes) == 0: raise OSError('Could not grab any device') + @ensure_numlock def stop_injecting(self): """Stop injecting keycodes.""" logger.info('Stopping injecting keycodes') diff --git a/tests/testcases/injector.py b/tests/testcases/injector.py index 468042a5..898d6fdd 100644 --- a/tests/testcases/injector.py +++ b/tests/testcases/injector.py @@ -23,7 +23,8 @@ import unittest import evdev -from keymapper.injector import _start_injecting_worker +from keymapper.injector import _start_injecting_worker, \ + is_numlock_on, toggle_numlock, ensure_numlock from keymapper.getdevices import get_devices from keymapper.state import custom_mapping, system_mapping @@ -40,6 +41,24 @@ class TestInjector(unittest.TestCase): self.injector.stop_injecting() self.injector = None + def test_numlock(self): + before = is_numlock_on() + + toggle_numlock() # should change + self.assertEqual(not before, is_numlock_on()) + + @ensure_numlock + def wrapped(): + toggle_numlock() + return 1234 + + wrapped() # should not change + self.assertEqual(not before, is_numlock_on()) + + # toggle one more time to restore the previous configuration + toggle_numlock() + self.assertEqual(before, is_numlock_on()) + def test_injector(self): device = get_devices()['device 2']