|
|
|
@ -39,9 +39,7 @@ from keymapper.dev.macros import parse
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DEV_NAME = 'key-mapper'
|
|
|
|
|
DEVICE_CREATED = 1
|
|
|
|
|
FAILED = 2
|
|
|
|
|
DEVICE_SKIPPED = 3
|
|
|
|
|
CLOSE = 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_numlock_on():
|
|
|
|
@ -67,8 +65,8 @@ def toggle_numlock():
|
|
|
|
|
logger.debug('numlockx not found, trying to inject a keycode')
|
|
|
|
|
# and this doesn't always work.
|
|
|
|
|
device = evdev.UInput(
|
|
|
|
|
name=f'key-mapper numlock-control',
|
|
|
|
|
phys='key-mapper',
|
|
|
|
|
name=f'{DEV_NAME} numlock-control',
|
|
|
|
|
phys=DEV_NAME,
|
|
|
|
|
)
|
|
|
|
|
device.write(EV_KEY, evdev.ecodes.KEY_NUMLOCK, 1)
|
|
|
|
|
device.syn()
|
|
|
|
@ -110,10 +108,7 @@ class KeycodeInjector:
|
|
|
|
|
self.device = device
|
|
|
|
|
self.mapping = mapping
|
|
|
|
|
self._process = None
|
|
|
|
|
|
|
|
|
|
def __del__(self):
|
|
|
|
|
if self._process is not None:
|
|
|
|
|
self._process.terminate()
|
|
|
|
|
self._msg_pipe = multiprocessing.Pipe()
|
|
|
|
|
|
|
|
|
|
def start_injecting(self):
|
|
|
|
|
"""Start injecting keycodes."""
|
|
|
|
@ -121,11 +116,16 @@ class KeycodeInjector:
|
|
|
|
|
self._process.start()
|
|
|
|
|
|
|
|
|
|
def _prepare_device(self, path):
|
|
|
|
|
"""Try to grab the device, return if not needed/possible."""
|
|
|
|
|
"""Try to grab the device, return if not needed/possible.
|
|
|
|
|
|
|
|
|
|
Also return if ABS events are changed to REL mouse movements,
|
|
|
|
|
because the capabilities of the returned device are changed
|
|
|
|
|
so this cannot be checked later anymore.
|
|
|
|
|
"""
|
|
|
|
|
device = evdev.InputDevice(path)
|
|
|
|
|
|
|
|
|
|
if device is None:
|
|
|
|
|
return None
|
|
|
|
|
return None, False
|
|
|
|
|
|
|
|
|
|
capabilities = device.capabilities(absinfo=False)
|
|
|
|
|
|
|
|
|
@ -136,15 +136,15 @@ class KeycodeInjector:
|
|
|
|
|
needed = True
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
can_do_abs = evdev.ecodes.ABS_X in capabilities.get(EV_ABS, [])
|
|
|
|
|
if self.map_abs_to_rel() and can_do_abs:
|
|
|
|
|
map_ABS = self.map_ABS(device)
|
|
|
|
|
if map_ABS:
|
|
|
|
|
needed = True
|
|
|
|
|
|
|
|
|
|
if not needed:
|
|
|
|
|
# skipping reading and checking on events from those devices
|
|
|
|
|
# may be beneficial for performance.
|
|
|
|
|
logger.debug('No need to grab %s', path)
|
|
|
|
|
return None
|
|
|
|
|
return None, False
|
|
|
|
|
|
|
|
|
|
attempts = 0
|
|
|
|
|
while True:
|
|
|
|
@ -164,22 +164,25 @@ class KeycodeInjector:
|
|
|
|
|
|
|
|
|
|
if attempts >= 4:
|
|
|
|
|
logger.error('Cannot grab %s, it is possibly in use', path)
|
|
|
|
|
return None
|
|
|
|
|
return None, False
|
|
|
|
|
|
|
|
|
|
time.sleep(0.15)
|
|
|
|
|
|
|
|
|
|
return device
|
|
|
|
|
return device, map_ABS
|
|
|
|
|
|
|
|
|
|
def map_abs_to_rel(self):
|
|
|
|
|
def map_ABS(self, device):
|
|
|
|
|
# TODO offer configuration via the UI if a gamepad is elected
|
|
|
|
|
return True
|
|
|
|
|
capabilities = device.capabilities(absinfo=False)
|
|
|
|
|
return evdev.ecodes.ABS_X in capabilities.get(EV_ABS, [])
|
|
|
|
|
|
|
|
|
|
def _modify_capabilities(self, input_device):
|
|
|
|
|
def _modify_capabilities(self, input_device, map_ABS):
|
|
|
|
|
"""Adds all keycode into a copy of a devices capabilities.
|
|
|
|
|
|
|
|
|
|
Prameters
|
|
|
|
|
---------
|
|
|
|
|
input_device : evdev.InputDevice
|
|
|
|
|
map_ABS : bool
|
|
|
|
|
if ABS capabilities should be removed in favor of REL
|
|
|
|
|
"""
|
|
|
|
|
ecodes = evdev.ecodes
|
|
|
|
|
|
|
|
|
@ -196,9 +199,8 @@ class KeycodeInjector:
|
|
|
|
|
if keycode is not None:
|
|
|
|
|
capabilities[ecodes.EV_KEY].append(keycode - KEYCODE_OFFSET)
|
|
|
|
|
|
|
|
|
|
if self.map_abs_to_rel():
|
|
|
|
|
if capabilities.get(ecodes.EV_ABS):
|
|
|
|
|
del capabilities[ecodes.EV_ABS]
|
|
|
|
|
if map_ABS:
|
|
|
|
|
del capabilities[ecodes.EV_ABS]
|
|
|
|
|
capabilities[ecodes.EV_REL] = [
|
|
|
|
|
evdev.ecodes.REL_X,
|
|
|
|
|
evdev.ecodes.REL_Y,
|
|
|
|
@ -214,23 +216,37 @@ class KeycodeInjector:
|
|
|
|
|
|
|
|
|
|
return capabilities
|
|
|
|
|
|
|
|
|
|
async def _msg_listener(self, loop):
|
|
|
|
|
"""Wait for messages from the main process to do special stuff."""
|
|
|
|
|
while True:
|
|
|
|
|
frame_available = asyncio.Event()
|
|
|
|
|
loop.add_reader(self._msg_pipe[0].fileno(), frame_available.set)
|
|
|
|
|
await frame_available.wait()
|
|
|
|
|
frame_available.clear()
|
|
|
|
|
msg = self._msg_pipe[0].recv()
|
|
|
|
|
if msg == CLOSE:
|
|
|
|
|
logger.debug('Received close signal')
|
|
|
|
|
# stop the event loop and cause the process to reach its end
|
|
|
|
|
# cleanly. Using .terminate prevents coverage from working.
|
|
|
|
|
loop.stop()
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
def _start_injecting(self):
|
|
|
|
|
"""The injection worker that keeps injecting until terminated.
|
|
|
|
|
|
|
|
|
|
Stuff is non-blocking by using asyncio in order to do multiple things
|
|
|
|
|
somewhat concurrently.
|
|
|
|
|
"""
|
|
|
|
|
# TODO do select.select insted of async_read_loop
|
|
|
|
|
loop = asyncio.get_event_loop()
|
|
|
|
|
coroutines = []
|
|
|
|
|
|
|
|
|
|
logger.info('Starting injecting the mapping for %s', self.device)
|
|
|
|
|
|
|
|
|
|
paths = get_devices()[self.device]['paths']
|
|
|
|
|
devices = [self._prepare_device(path) for path in paths]
|
|
|
|
|
|
|
|
|
|
# Watch over each one of the potentially multiple devices per hardware
|
|
|
|
|
for input_device in devices:
|
|
|
|
|
for path in paths:
|
|
|
|
|
input_device, map_ABS = self._prepare_device(path)
|
|
|
|
|
if input_device is None:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
@ -238,17 +254,19 @@ class KeycodeInjector:
|
|
|
|
|
# EV_ABS capability, EV_REL won't move the mouse pointer anymore.
|
|
|
|
|
# so don't merge all InputDevices into one UInput device.
|
|
|
|
|
uinput = evdev.UInput(
|
|
|
|
|
name=f'key-mapper {self.device}',
|
|
|
|
|
phys='key-mapper',
|
|
|
|
|
events=self._modify_capabilities(input_device)
|
|
|
|
|
name=f'{DEV_NAME} {self.device}',
|
|
|
|
|
phys=DEV_NAME,
|
|
|
|
|
events=self._modify_capabilities(input_device, map_ABS)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# TODO separate file
|
|
|
|
|
# keycode injection
|
|
|
|
|
coroutine = self._keycode_loop(input_device, uinput)
|
|
|
|
|
coroutine = self._keycode_loop(input_device, uinput, map_ABS)
|
|
|
|
|
coroutines.append(coroutine)
|
|
|
|
|
|
|
|
|
|
# TODO separate file
|
|
|
|
|
# mouse movement injection
|
|
|
|
|
if self.map_abs_to_rel():
|
|
|
|
|
if map_ABS:
|
|
|
|
|
self.abs_x = 0
|
|
|
|
|
self.abs_y = 0
|
|
|
|
|
# events only take ints, so a movement of 0.3 needs to add
|
|
|
|
@ -262,7 +280,13 @@ class KeycodeInjector:
|
|
|
|
|
logger.error('Did not grab any device')
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
loop.run_until_complete(asyncio.gather(*coroutines))
|
|
|
|
|
coroutines.append(self._msg_listener(loop))
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
loop.run_until_complete(asyncio.gather(*coroutines))
|
|
|
|
|
except RuntimeError:
|
|
|
|
|
# stopped event loop most likely
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
if len(coroutines) > 0:
|
|
|
|
|
logger.debug('asyncio coroutines ended')
|
|
|
|
@ -336,7 +360,7 @@ class KeycodeInjector:
|
|
|
|
|
rel_x
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
async def _keycode_loop(self, device, keymapper_device):
|
|
|
|
|
async def _keycode_loop(self, device, keymapper_device, map_ABS):
|
|
|
|
|
"""Inject keycodes for one of the virtual devices.
|
|
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
@ -345,6 +369,8 @@ class KeycodeInjector:
|
|
|
|
|
where to read keycodes from
|
|
|
|
|
keymapper_device : evdev.UInput
|
|
|
|
|
where to write keycodes to
|
|
|
|
|
map_ABS : bool
|
|
|
|
|
the value of map_ABS() for the original device
|
|
|
|
|
"""
|
|
|
|
|
# Parse all macros beforehand
|
|
|
|
|
logger.debug('Parsing macros')
|
|
|
|
@ -363,7 +389,7 @@ class KeycodeInjector:
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
async for event in device.async_read_loop():
|
|
|
|
|
if self.map_abs_to_rel() and event.type == EV_ABS:
|
|
|
|
|
if map_ABS and event.type == EV_ABS:
|
|
|
|
|
if event.code not in [evdev.ecodes.ABS_X, evdev.ecodes.ABS_Y]:
|
|
|
|
|
continue
|
|
|
|
|
if event.code == evdev.ecodes.ABS_X:
|
|
|
|
@ -438,5 +464,4 @@ class KeycodeInjector:
|
|
|
|
|
def stop_injecting(self):
|
|
|
|
|
"""Stop injecting keycodes."""
|
|
|
|
|
logger.info('Stopping injecting keycodes for device "%s"', self.device)
|
|
|
|
|
if self._process is not None and self._process.is_alive():
|
|
|
|
|
self._process.terminate()
|
|
|
|
|
self._msg_pipe[1].send(CLOSE)
|
|
|
|
|