multiple attempts at grabbing the device

xkb
sezanzeb 4 years ago committed by sezanzeb
parent d59ad26823
commit 855778c52c

@ -22,11 +22,12 @@
"""Device and evdev stuff that is independent from the display server."""
import asyncio
import time
# By using processes instead of threads, the mappings are
# automatically copied, so that they can be worked with in the ui
# without breaking the device. And it's possible to terminate processes.
import multiprocessing
import asyncio
import evdev
@ -37,6 +38,130 @@ from keymapper.state import custom_mapping, system_mapping
DEV_NAME = 'key-mapper'
DEVICE_CREATED = 1
FAILED = 2
def _grab(path):
"""Try to grab, repeat a few times with time inbetween on failure."""
attempts = 0
while True:
device = evdev.InputDevice(path)
try:
# grab to avoid e.g. the disabled keycode of 10 to confuse
# X, especially when one of the buttons of your mouse also
# uses 10. This also avoids having to load an empty xkb
# symbols file to prevent writing any unwanted keys.
device.grab()
break
except IOError:
attempts += 1
logger.debug('Failed attemt to grab %s %d', path, attempts)
if attempts >= 4:
logger.error('Cannot grab %s', path)
return None
# it might take a little time until the device is free if
# it was previously grabbed.
time.sleep(0.1)
return device
def _modify_capabilities(device):
"""Adds all keycode into a copy of a devices capabilities."""
# copy the capabilities because the keymapper_device is going
# to act like the device.
capabilities = device.capabilities(absinfo=False)
# However, make sure that it supports all keycodes, not just some
# random ones. That's why I avoid from_device for this
capabilities[evdev.ecodes.EV_KEY] = evdev.ecodes.keys.keys()
# just like what python-evdev does in from_device
if evdev.ecodes.EV_SYN in capabilities:
del capabilities[evdev.ecodes.EV_SYN]
if evdev.ecodes.EV_FF in capabilities:
del capabilities[evdev.ecodes.EV_FF]
return capabilities
def _start_injecting_worker(path, pipe):
"""Inject keycodes for one of the virtual devices.
Parameters
----------
path : string
path in /dev to read keycodes from
pipe : multiprocessing.Pipe
pipe to send status codes over
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
device = _grab(path)
if device is None:
pipe.send(FAILED)
return
capabilities = _modify_capabilities(device)
keymapper_device = evdev.UInput(
name=f'key-mapper {device.name}',
phys='key-mapper',
events=capabilities
)
pipe.send(DEVICE_CREATED)
logger.debug(
'Started injecting into %s, fd %s',
device.path, keymapper_device.fd
)
for event in device.read_loop():
if event.type != evdev.ecodes.EV_KEY:
logger.spam(
'got type:%s code:%s value:%s, forward',
event.type, event.code, event.value
)
keymapper_device.write(event.type, event.code, event.value)
keymapper_device.syn()
continue
if event.value == 2:
# linux does them itself, no need to trigger them
continue
# this happens to report key codes that are 8 lower
# than the ones reported by xev and that X expects
input_keycode = event.code + 8
character = custom_mapping.get_character(input_keycode)
if character is None:
# unknown keycode, forward it
target_keycode = input_keycode
else:
target_keycode = system_mapping.get_keycode(character)
if target_keycode is None:
logger.error(
'Cannot find character %s in the internal mapping',
character
)
continue
logger.spam(
'got code:%s value:%s, maps to code:%s char:%s',
event.code + 8, event.value, target_keycode, character
)
keymapper_device.write(
evdev.ecodes.EV_KEY,
target_keycode - 8,
event.value
)
keymapper_device.syn()
class KeycodeInjector:
@ -59,7 +184,7 @@ class KeycodeInjector:
for path in paths:
pipe = multiprocessing.Pipe()
worker = multiprocessing.Process(
target=self._start_injecting_worker,
target=_start_injecting_worker,
args=(path, pipe[1])
)
worker.start()
@ -78,88 +203,3 @@ class KeycodeInjector:
if process.is_alive():
process.terminate()
self.processes[i] = None
def _start_injecting_worker(self, path, pipe):
"""Inject keycodes for one of the virtual devices."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
device = evdev.InputDevice(path)
try:
# grab to avoid e.g. the disabled keycode of 10 to confuse X,
# especially when one of the buttons of your mouse also uses 10.
# This also avoids having to load an empty xkb symbols file
# to prevent writing any unwanted keys.
device.grab()
except IOError:
logger.error('Cannot grab %s', path)
# copy the capabilities because the keymapper_device is going
# to act like the device.
capabilities = device.capabilities(absinfo=False)
# However, make sure that it supports all keycodes, not just some
# random ones. That's why I avoid from_device for this
capabilities[evdev.ecodes.EV_KEY] = evdev.ecodes.keys.keys()
# just like what python-evdev does in from_device
if evdev.ecodes.EV_SYN in capabilities:
del capabilities[evdev.ecodes.EV_SYN]
if evdev.ecodes.EV_FF in capabilities:
del capabilities[evdev.ecodes.EV_FF]
keymapper_device = evdev.UInput(
name=f'key-mapper {device.name}',
phys='key-mapper',
events=capabilities
)
pipe.send(DEVICE_CREATED)
logger.debug(
'Started injecting into %s, fd %s',
device.path, keymapper_device.fd
)
for event in device.read_loop():
if event.type != evdev.ecodes.EV_KEY:
logger.spam(
'got type:%s code:%s value:%s, forward',
event.type, event.code, event.value
)
keymapper_device.write(event.type, event.code, event.value)
keymapper_device.syn()
continue
if event.value == 2:
# linux does them itself, no need to trigger them
continue
# this happens to report key codes that are 8 lower
# than the ones reported by xev and that X expects
input_keycode = event.code + 8
character = custom_mapping.get_character(input_keycode)
if character is None:
# unknown keycode, forward it
target_keycode = input_keycode
else:
target_keycode = system_mapping.get_keycode(character)
if target_keycode is None:
logger.error(
'Cannot find character %s in the internal mapping',
character
)
continue
logger.spam(
'got code:%s value:%s, maps to code:%s char:%s',
event.code + 8, event.value, target_keycode, character
)
keymapper_device.write(
evdev.ecodes.EV_KEY,
target_keycode - 8,
event.value
)
keymapper_device.syn()

@ -23,7 +23,7 @@ import unittest
import evdev
from keymapper.injector import KeycodeInjector
from keymapper.injector import _start_injecting_worker
from keymapper.getdevices import get_devices
from keymapper.state import custom_mapping, system_mapping
@ -31,12 +31,14 @@ from test import uinput_write_history, Event, pending_events
class TestInjector(unittest.TestCase):
def setUp(self):
self.injector = None
@classmethod
def setUpClass(cls):
cls.injector = None
def tearDown(self):
if self.injector is not None:
self.injector.stop_injecting()
self.injector = None
def test_injector(self):
device = get_devices()['device 2']
@ -64,8 +66,7 @@ class TestInjector(unittest.TestCase):
def send(self, message):
pass
self.injector = KeycodeInjector('device 2')
self.injector._start_injecting_worker(
_start_injecting_worker(
path=device['paths'][0],
pipe=FakePipe()
)

Loading…
Cancel
Save