ensuring numlock stays the same on grab

pull/14/head
sezanzeb 4 years ago
parent bf8d4d56b3
commit 46eafc48da

@ -22,8 +22,10 @@
"""Device and evdev stuff that is independent from the display server.""" """Device and evdev stuff that is independent from the display server."""
import re
import asyncio import asyncio
import time import time
import subprocess
# By using processes instead of threads, the mappings are # By using processes instead of threads, the mappings are
# automatically copied, so that they can be worked with in the ui # automatically copied, so that they can be worked with in the ui
# without breaking the device. And it's possible to terminate processes. # without breaking the device. And it's possible to terminate processes.
@ -63,7 +65,7 @@ def _grab(path):
# it might take a little time until the device is free if # it might take a little time until the device is free if
# it was previously grabbed. # it was previously grabbed.
time.sleep(0.1) time.sleep(0.15)
return device return device
@ -126,7 +128,7 @@ def _start_injecting_worker(path, pipe):
event.type, event.code, event.value event.type, event.code, event.value
) )
keymapper_device.write(event.type, event.code, event.value) keymapper_device.write(event.type, event.code, event.value)
keymapper_device.syn() # this already includes SYN events, so need to syn here again
continue continue
if event.value == 2: if event.value == 2:
@ -164,8 +166,52 @@ def _start_injecting_worker(path, pipe):
keymapper_device.syn() keymapper_device.syn()
def is_numlock_on():
"""Get the current state of the numlock."""
xset_q = subprocess.check_output(['xset', 'q']).decode()
num_lock_status = re.search(
r'Num Lock:\s+(.+?)\s',
xset_q
)
if num_lock_status is not None:
return num_lock_status[1] == 'on'
def toggle_numlock():
"""Turn the numlock on or off."""
try:
subprocess.check_output(['numlockx', 'toggle'])
except FileNotFoundError:
# doesn't seem to be installed everywhere
logger.debug('numlockx not found, trying to inject a keycode')
# and this doesn't always work.
device = evdev.UInput(
name=f'key-mapper numlock-control',
phys='key-mapper',
)
device.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_NUMLOCK, 1)
device.syn()
device.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_NUMLOCK, 0)
device.syn()
def ensure_numlock(func):
def wrapped(*args, **kwargs):
# for some reason, grabbing a device can modify the num lock state.
# remember it and apply back later
numlock_before = is_numlock_on()
result = func(*args, **kwargs)
numlock_after = is_numlock_on()
if numlock_after != numlock_before:
logger.debug('Reverting numlock status to %s', numlock_before)
toggle_numlock()
return result
return wrapped
class KeycodeInjector: class KeycodeInjector:
"""Keeps injecting keycodes in the background based on the mapping.""" """Keeps injecting keycodes in the background based on the mapping."""
@ensure_numlock
def __init__(self, device): def __init__(self, device):
"""Start injecting keycodes based on custom_mapping.""" """Start injecting keycodes based on custom_mapping."""
self.device = device self.device = device
@ -193,10 +239,13 @@ class KeycodeInjector:
status = pipe[0].recv() status = pipe[0].recv()
if status != FAILED: if status != FAILED:
self.processes.append(worker) self.processes.append(worker)
else:
worker.join()
if len(self.processes) == 0: if len(self.processes) == 0:
raise OSError('Could not grab any device') raise OSError('Could not grab any device')
@ensure_numlock
def stop_injecting(self): def stop_injecting(self):
"""Stop injecting keycodes.""" """Stop injecting keycodes."""
logger.info('Stopping injecting keycodes') logger.info('Stopping injecting keycodes')

@ -23,7 +23,8 @@ import unittest
import evdev import evdev
from keymapper.injector import _start_injecting_worker from keymapper.injector import _start_injecting_worker, \
is_numlock_on, toggle_numlock, ensure_numlock
from keymapper.getdevices import get_devices from keymapper.getdevices import get_devices
from keymapper.state import custom_mapping, system_mapping from keymapper.state import custom_mapping, system_mapping
@ -40,6 +41,24 @@ class TestInjector(unittest.TestCase):
self.injector.stop_injecting() self.injector.stop_injecting()
self.injector = None self.injector = None
def test_numlock(self):
before = is_numlock_on()
toggle_numlock() # should change
self.assertEqual(not before, is_numlock_on())
@ensure_numlock
def wrapped():
toggle_numlock()
return 1234
wrapped() # should not change
self.assertEqual(not before, is_numlock_on())
# toggle one more time to restore the previous configuration
toggle_numlock()
self.assertEqual(before, is_numlock_on())
def test_injector(self): def test_injector(self):
device = get_devices()['device 2'] device = get_devices()['device 2']

Loading…
Cancel
Save