|
|
@ -22,8 +22,10 @@
|
|
|
|
"""Device and evdev stuff that is independent from the display server."""
|
|
|
|
"""Device and evdev stuff that is independent from the display server."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import re
|
|
|
|
import asyncio
|
|
|
|
import asyncio
|
|
|
|
import time
|
|
|
|
import time
|
|
|
|
|
|
|
|
import subprocess
|
|
|
|
# By using processes instead of threads, the mappings are
|
|
|
|
# By using processes instead of threads, the mappings are
|
|
|
|
# automatically copied, so that they can be worked with in the ui
|
|
|
|
# automatically copied, so that they can be worked with in the ui
|
|
|
|
# without breaking the device. And it's possible to terminate processes.
|
|
|
|
# without breaking the device. And it's possible to terminate processes.
|
|
|
@ -63,7 +65,7 @@ def _grab(path):
|
|
|
|
|
|
|
|
|
|
|
|
# it might take a little time until the device is free if
|
|
|
|
# it might take a little time until the device is free if
|
|
|
|
# it was previously grabbed.
|
|
|
|
# it was previously grabbed.
|
|
|
|
time.sleep(0.1)
|
|
|
|
time.sleep(0.15)
|
|
|
|
return device
|
|
|
|
return device
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -126,7 +128,7 @@ def _start_injecting_worker(path, pipe):
|
|
|
|
event.type, event.code, event.value
|
|
|
|
event.type, event.code, event.value
|
|
|
|
)
|
|
|
|
)
|
|
|
|
keymapper_device.write(event.type, event.code, event.value)
|
|
|
|
keymapper_device.write(event.type, event.code, event.value)
|
|
|
|
keymapper_device.syn()
|
|
|
|
# this already includes SYN events, so need to syn here again
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
if event.value == 2:
|
|
|
|
if event.value == 2:
|
|
|
@ -164,8 +166,52 @@ def _start_injecting_worker(path, pipe):
|
|
|
|
keymapper_device.syn()
|
|
|
|
keymapper_device.syn()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_numlock_on():
|
|
|
|
|
|
|
|
"""Get the current state of the numlock."""
|
|
|
|
|
|
|
|
xset_q = subprocess.check_output(['xset', 'q']).decode()
|
|
|
|
|
|
|
|
num_lock_status = re.search(
|
|
|
|
|
|
|
|
r'Num Lock:\s+(.+?)\s',
|
|
|
|
|
|
|
|
xset_q
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
if num_lock_status is not None:
|
|
|
|
|
|
|
|
return num_lock_status[1] == 'on'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def toggle_numlock():
|
|
|
|
|
|
|
|
"""Turn the numlock on or off."""
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
subprocess.check_output(['numlockx', 'toggle'])
|
|
|
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
|
|
|
# doesn't seem to be installed everywhere
|
|
|
|
|
|
|
|
logger.debug('numlockx not found, trying to inject a keycode')
|
|
|
|
|
|
|
|
# and this doesn't always work.
|
|
|
|
|
|
|
|
device = evdev.UInput(
|
|
|
|
|
|
|
|
name=f'key-mapper numlock-control',
|
|
|
|
|
|
|
|
phys='key-mapper',
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
device.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_NUMLOCK, 1)
|
|
|
|
|
|
|
|
device.syn()
|
|
|
|
|
|
|
|
device.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_NUMLOCK, 0)
|
|
|
|
|
|
|
|
device.syn()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ensure_numlock(func):
|
|
|
|
|
|
|
|
def wrapped(*args, **kwargs):
|
|
|
|
|
|
|
|
# for some reason, grabbing a device can modify the num lock state.
|
|
|
|
|
|
|
|
# remember it and apply back later
|
|
|
|
|
|
|
|
numlock_before = is_numlock_on()
|
|
|
|
|
|
|
|
result = func(*args, **kwargs)
|
|
|
|
|
|
|
|
numlock_after = is_numlock_on()
|
|
|
|
|
|
|
|
if numlock_after != numlock_before:
|
|
|
|
|
|
|
|
logger.debug('Reverting numlock status to %s', numlock_before)
|
|
|
|
|
|
|
|
toggle_numlock()
|
|
|
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
return wrapped
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class KeycodeInjector:
|
|
|
|
class KeycodeInjector:
|
|
|
|
"""Keeps injecting keycodes in the background based on the mapping."""
|
|
|
|
"""Keeps injecting keycodes in the background based on the mapping."""
|
|
|
|
|
|
|
|
@ensure_numlock
|
|
|
|
def __init__(self, device):
|
|
|
|
def __init__(self, device):
|
|
|
|
"""Start injecting keycodes based on custom_mapping."""
|
|
|
|
"""Start injecting keycodes based on custom_mapping."""
|
|
|
|
self.device = device
|
|
|
|
self.device = device
|
|
|
@ -193,10 +239,13 @@ class KeycodeInjector:
|
|
|
|
status = pipe[0].recv()
|
|
|
|
status = pipe[0].recv()
|
|
|
|
if status != FAILED:
|
|
|
|
if status != FAILED:
|
|
|
|
self.processes.append(worker)
|
|
|
|
self.processes.append(worker)
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
worker.join()
|
|
|
|
|
|
|
|
|
|
|
|
if len(self.processes) == 0:
|
|
|
|
if len(self.processes) == 0:
|
|
|
|
raise OSError('Could not grab any device')
|
|
|
|
raise OSError('Could not grab any device')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ensure_numlock
|
|
|
|
def stop_injecting(self):
|
|
|
|
def stop_injecting(self):
|
|
|
|
"""Stop injecting keycodes."""
|
|
|
|
"""Stop injecting keycodes."""
|
|
|
|
logger.info('Stopping injecting keycodes')
|
|
|
|
logger.info('Stopping injecting keycodes')
|
|
|
|