mirror of
https://github.com/sezanzeb/input-remapper
synced 2024-11-12 01:10:38 +00:00
no .terminate anymore, proper multiprocessing coverage, some minor cleanup in injector
This commit is contained in:
parent
f4ca07490d
commit
17445558fb
@ -1,4 +1,5 @@
|
||||
[run]
|
||||
concurrency = multiprocessing
|
||||
branch = True
|
||||
source = /usr/lib/python3.8/site-packages/keymapper
|
||||
source = keymapper
|
||||
concurrency = multiprocessing
|
||||
debug = multiproc
|
||||
|
@ -59,7 +59,8 @@ groups
|
||||
##### Git/pip
|
||||
|
||||
```bash
|
||||
sudo pip install git+https://github.com/sezanzeb/key-mapper.git
|
||||
git clone https://github.com/sezanzeb/key-mapper.git
|
||||
cd key-mapper && sudo python3 setup.py install
|
||||
```
|
||||
|
||||
##### Manjaro/Arch
|
||||
@ -93,12 +94,12 @@ sudo dpkg -i python3-key-mapper_0.1.0-1_all.deb
|
||||
- [x] support timed macros, maybe using some sort of syntax
|
||||
- [x] add to the AUR, provide .deb file
|
||||
- [ ] automatically load presets when devices get plugged in after login
|
||||
- [ ] support gamepads as keyboard and mouse combi
|
||||
- [ ] support gamepads as keyboard and mouse combi (partially done)
|
||||
|
||||
## Tests
|
||||
|
||||
```bash
|
||||
pylint keymapper --extension-pkg-whitelist=evdev
|
||||
sudo pip install . && coverage run tests/test.py
|
||||
sudo pip install -e . && coverage run tests/test.py
|
||||
coverage combine && coverage report -m
|
||||
```
|
||||
|
@ -27,7 +27,7 @@ import json
|
||||
import shutil
|
||||
import copy
|
||||
|
||||
from keymapper.paths import CONFIG, touch
|
||||
from keymapper.paths import CONFIG, USER, touch
|
||||
from keymapper.logger import logger
|
||||
|
||||
|
||||
|
@ -138,8 +138,14 @@ class Daemon(service.Object):
|
||||
|
||||
return True
|
||||
|
||||
@dbus.service.method('keymapper.Interface')
|
||||
def stop(self):
|
||||
"""Stop all mapping injections."""
|
||||
@dbus.service.method('keymapper.Interface', in_signature='b')
|
||||
def stop(self, terminate=False):
|
||||
"""Stop all injections and end the service.
|
||||
|
||||
Raises dbus.exceptions.DBusException in your main process.
|
||||
"""
|
||||
for injector in self.injectors.values():
|
||||
injector.stop_injecting()
|
||||
|
||||
if terminate:
|
||||
exit(0)
|
||||
|
@ -39,9 +39,7 @@ from keymapper.dev.macros import parse
|
||||
|
||||
|
||||
DEV_NAME = 'key-mapper'
|
||||
DEVICE_CREATED = 1
|
||||
FAILED = 2
|
||||
DEVICE_SKIPPED = 3
|
||||
CLOSE = 0
|
||||
|
||||
|
||||
def is_numlock_on():
|
||||
@ -67,8 +65,8 @@ def toggle_numlock():
|
||||
logger.debug('numlockx not found, trying to inject a keycode')
|
||||
# and this doesn't always work.
|
||||
device = evdev.UInput(
|
||||
name=f'key-mapper numlock-control',
|
||||
phys='key-mapper',
|
||||
name=f'{DEV_NAME} numlock-control',
|
||||
phys=DEV_NAME,
|
||||
)
|
||||
device.write(EV_KEY, evdev.ecodes.KEY_NUMLOCK, 1)
|
||||
device.syn()
|
||||
@ -110,10 +108,7 @@ class KeycodeInjector:
|
||||
self.device = device
|
||||
self.mapping = mapping
|
||||
self._process = None
|
||||
|
||||
def __del__(self):
|
||||
if self._process is not None:
|
||||
self._process.terminate()
|
||||
self._msg_pipe = multiprocessing.Pipe()
|
||||
|
||||
def start_injecting(self):
|
||||
"""Start injecting keycodes."""
|
||||
@ -121,11 +116,16 @@ class KeycodeInjector:
|
||||
self._process.start()
|
||||
|
||||
def _prepare_device(self, path):
|
||||
"""Try to grab the device, return if not needed/possible."""
|
||||
"""Try to grab the device, return if not needed/possible.
|
||||
|
||||
Also return if ABS events are changed to REL mouse movements,
|
||||
because the capabilities of the returned device are changed
|
||||
so this cannot be checked later anymore.
|
||||
"""
|
||||
device = evdev.InputDevice(path)
|
||||
|
||||
if device is None:
|
||||
return None
|
||||
return None, False
|
||||
|
||||
capabilities = device.capabilities(absinfo=False)
|
||||
|
||||
@ -136,15 +136,15 @@ class KeycodeInjector:
|
||||
needed = True
|
||||
break
|
||||
|
||||
can_do_abs = evdev.ecodes.ABS_X in capabilities.get(EV_ABS, [])
|
||||
if self.map_abs_to_rel() and can_do_abs:
|
||||
map_ABS = self.map_ABS(device)
|
||||
if map_ABS:
|
||||
needed = True
|
||||
|
||||
if not needed:
|
||||
# skipping reading and checking on events from those devices
|
||||
# may be beneficial for performance.
|
||||
logger.debug('No need to grab %s', path)
|
||||
return None
|
||||
return None, False
|
||||
|
||||
attempts = 0
|
||||
while True:
|
||||
@ -164,22 +164,25 @@ class KeycodeInjector:
|
||||
|
||||
if attempts >= 4:
|
||||
logger.error('Cannot grab %s, it is possibly in use', path)
|
||||
return None
|
||||
return None, False
|
||||
|
||||
time.sleep(0.15)
|
||||
|
||||
return device
|
||||
return device, map_ABS
|
||||
|
||||
def map_abs_to_rel(self):
|
||||
def map_ABS(self, device):
|
||||
# TODO offer configuration via the UI if a gamepad is elected
|
||||
return True
|
||||
capabilities = device.capabilities(absinfo=False)
|
||||
return evdev.ecodes.ABS_X in capabilities.get(EV_ABS, [])
|
||||
|
||||
def _modify_capabilities(self, input_device):
|
||||
def _modify_capabilities(self, input_device, map_ABS):
|
||||
"""Adds all keycode into a copy of a devices capabilities.
|
||||
|
||||
Prameters
|
||||
---------
|
||||
input_device : evdev.InputDevice
|
||||
map_ABS : bool
|
||||
if ABS capabilities should be removed in favor of REL
|
||||
"""
|
||||
ecodes = evdev.ecodes
|
||||
|
||||
@ -196,9 +199,8 @@ class KeycodeInjector:
|
||||
if keycode is not None:
|
||||
capabilities[ecodes.EV_KEY].append(keycode - KEYCODE_OFFSET)
|
||||
|
||||
if self.map_abs_to_rel():
|
||||
if capabilities.get(ecodes.EV_ABS):
|
||||
del capabilities[ecodes.EV_ABS]
|
||||
if map_ABS:
|
||||
del capabilities[ecodes.EV_ABS]
|
||||
capabilities[ecodes.EV_REL] = [
|
||||
evdev.ecodes.REL_X,
|
||||
evdev.ecodes.REL_Y,
|
||||
@ -214,23 +216,37 @@ class KeycodeInjector:
|
||||
|
||||
return capabilities
|
||||
|
||||
async def _msg_listener(self, loop):
|
||||
"""Wait for messages from the main process to do special stuff."""
|
||||
while True:
|
||||
frame_available = asyncio.Event()
|
||||
loop.add_reader(self._msg_pipe[0].fileno(), frame_available.set)
|
||||
await frame_available.wait()
|
||||
frame_available.clear()
|
||||
msg = self._msg_pipe[0].recv()
|
||||
if msg == CLOSE:
|
||||
logger.debug('Received close signal')
|
||||
# stop the event loop and cause the process to reach its end
|
||||
# cleanly. Using .terminate prevents coverage from working.
|
||||
loop.stop()
|
||||
return
|
||||
|
||||
def _start_injecting(self):
|
||||
"""The injection worker that keeps injecting until terminated.
|
||||
|
||||
Stuff is non-blocking by using asyncio in order to do multiple things
|
||||
somewhat concurrently.
|
||||
"""
|
||||
# TODO do select.select insted of async_read_loop
|
||||
loop = asyncio.get_event_loop()
|
||||
coroutines = []
|
||||
|
||||
logger.info('Starting injecting the mapping for %s', self.device)
|
||||
|
||||
paths = get_devices()[self.device]['paths']
|
||||
devices = [self._prepare_device(path) for path in paths]
|
||||
|
||||
# Watch over each one of the potentially multiple devices per hardware
|
||||
for input_device in devices:
|
||||
for path in paths:
|
||||
input_device, map_ABS = self._prepare_device(path)
|
||||
if input_device is None:
|
||||
continue
|
||||
|
||||
@ -238,17 +254,19 @@ class KeycodeInjector:
|
||||
# EV_ABS capability, EV_REL won't move the mouse pointer anymore.
|
||||
# so don't merge all InputDevices into one UInput device.
|
||||
uinput = evdev.UInput(
|
||||
name=f'key-mapper {self.device}',
|
||||
phys='key-mapper',
|
||||
events=self._modify_capabilities(input_device)
|
||||
name=f'{DEV_NAME} {self.device}',
|
||||
phys=DEV_NAME,
|
||||
events=self._modify_capabilities(input_device, map_ABS)
|
||||
)
|
||||
|
||||
# TODO separate file
|
||||
# keycode injection
|
||||
coroutine = self._keycode_loop(input_device, uinput)
|
||||
coroutine = self._keycode_loop(input_device, uinput, map_ABS)
|
||||
coroutines.append(coroutine)
|
||||
|
||||
# TODO separate file
|
||||
# mouse movement injection
|
||||
if self.map_abs_to_rel():
|
||||
if map_ABS:
|
||||
self.abs_x = 0
|
||||
self.abs_y = 0
|
||||
# events only take ints, so a movement of 0.3 needs to add
|
||||
@ -262,7 +280,13 @@ class KeycodeInjector:
|
||||
logger.error('Did not grab any device')
|
||||
return
|
||||
|
||||
loop.run_until_complete(asyncio.gather(*coroutines))
|
||||
coroutines.append(self._msg_listener(loop))
|
||||
|
||||
try:
|
||||
loop.run_until_complete(asyncio.gather(*coroutines))
|
||||
except RuntimeError:
|
||||
# stopped event loop most likely
|
||||
pass
|
||||
|
||||
if len(coroutines) > 0:
|
||||
logger.debug('asyncio coroutines ended')
|
||||
@ -336,7 +360,7 @@ class KeycodeInjector:
|
||||
rel_x
|
||||
)
|
||||
|
||||
async def _keycode_loop(self, device, keymapper_device):
|
||||
async def _keycode_loop(self, device, keymapper_device, map_ABS):
|
||||
"""Inject keycodes for one of the virtual devices.
|
||||
|
||||
Parameters
|
||||
@ -345,6 +369,8 @@ class KeycodeInjector:
|
||||
where to read keycodes from
|
||||
keymapper_device : evdev.UInput
|
||||
where to write keycodes to
|
||||
map_ABS : bool
|
||||
the value of map_ABS() for the original device
|
||||
"""
|
||||
# Parse all macros beforehand
|
||||
logger.debug('Parsing macros')
|
||||
@ -363,7 +389,7 @@ class KeycodeInjector:
|
||||
)
|
||||
|
||||
async for event in device.async_read_loop():
|
||||
if self.map_abs_to_rel() and event.type == EV_ABS:
|
||||
if map_ABS and event.type == EV_ABS:
|
||||
if event.code not in [evdev.ecodes.ABS_X, evdev.ecodes.ABS_Y]:
|
||||
continue
|
||||
if event.code == evdev.ecodes.ABS_X:
|
||||
@ -438,5 +464,4 @@ class KeycodeInjector:
|
||||
def stop_injecting(self):
|
||||
"""Stop injecting keycodes."""
|
||||
logger.info('Stopping injecting keycodes for device "%s"', self.device)
|
||||
if self._process is not None and self._process.is_alive():
|
||||
self._process.terminate()
|
||||
self._msg_pipe[1].send(CLOSE)
|
||||
|
@ -31,6 +31,9 @@ from keymapper.getdevices import get_devices, refresh_devices
|
||||
from keymapper.state import KEYCODE_OFFSET
|
||||
|
||||
|
||||
CLOSE = 1
|
||||
|
||||
|
||||
class _KeycodeReader:
|
||||
"""Keeps reading keycodes in the background for the UI to use.
|
||||
|
||||
@ -47,15 +50,9 @@ class _KeycodeReader:
|
||||
self.stop_reading()
|
||||
|
||||
def stop_reading(self):
|
||||
# TODO something like this for the injector?
|
||||
if self._process is not None:
|
||||
logger.debug('Terminating reader process')
|
||||
self._process.terminate()
|
||||
self._process = None
|
||||
|
||||
if self._pipe is not None:
|
||||
logger.debug('Closing reader pipe')
|
||||
self._pipe[0].close()
|
||||
logger.debug('Sending close msg to reader')
|
||||
self._pipe[0].send(CLOSE)
|
||||
self._pipe = None
|
||||
|
||||
def clear(self):
|
||||
@ -77,8 +74,7 @@ class _KeycodeReader:
|
||||
|
||||
self.virtual_devices = []
|
||||
|
||||
for name, group in get_devices(include_keymapper=True).items():
|
||||
# also find stuff like "key-mapper {device}"
|
||||
for name, group in get_devices().items():
|
||||
if device_name not in name:
|
||||
continue
|
||||
|
||||
@ -99,35 +95,46 @@ class _KeycodeReader:
|
||||
)
|
||||
|
||||
pipe = multiprocessing.Pipe()
|
||||
self._process = multiprocessing.Process(
|
||||
target=self._read_worker,
|
||||
args=(pipe[1],)
|
||||
)
|
||||
self._process.start()
|
||||
self._pipe = pipe
|
||||
self._process = multiprocessing.Process(target=self._read_worker)
|
||||
self._process.start()
|
||||
|
||||
def _consume_event(self, event, pipe):
|
||||
def _consume_event(self, event):
|
||||
"""Write the event code into the pipe if it is a key-down press."""
|
||||
# value: 1 for down, 0 for up, 2 for hold.
|
||||
if self._pipe[1].closed:
|
||||
logger.debug('Pipe closed, reader stops.')
|
||||
exit(0)
|
||||
|
||||
if event.type == evdev.ecodes.EV_KEY and event.value == 1:
|
||||
logger.spam(
|
||||
'got code:%s value:%s',
|
||||
event.code + KEYCODE_OFFSET, event.value
|
||||
)
|
||||
pipe.send(event.code + KEYCODE_OFFSET)
|
||||
self._pipe[1].send(event.code + KEYCODE_OFFSET)
|
||||
|
||||
def _read_worker(self, pipe):
|
||||
def _read_worker(self):
|
||||
"""Process that reads keycodes and buffers them into a pipe."""
|
||||
# using a process that blocks instead of read_one made it easier
|
||||
# to debug via the logs, because the UI was not polling properly
|
||||
# at some point which caused logs for events not to be written.
|
||||
rlist = {device.fd: device for device in self.virtual_devices}
|
||||
rlist[self._pipe[1]] = self._pipe[1]
|
||||
|
||||
while True:
|
||||
ready = select.select(rlist, [], [])[0]
|
||||
for fd in ready:
|
||||
readable = rlist[fd]
|
||||
if isinstance(readable, multiprocessing.connection.Connection):
|
||||
msg = readable.recv()
|
||||
if msg == CLOSE:
|
||||
logger.debug('Reader stopped')
|
||||
return
|
||||
continue
|
||||
|
||||
try:
|
||||
for event in rlist[fd].read():
|
||||
self._consume_event(event, pipe)
|
||||
self._consume_event(event)
|
||||
except OSError:
|
||||
logger.debug(
|
||||
'Device "%s" disappeared from the reader',
|
||||
|
@ -69,6 +69,9 @@ def get_selected_row_bg():
|
||||
return color.to_string()
|
||||
|
||||
|
||||
# TODO show if the preset is being injected
|
||||
|
||||
|
||||
class Window:
|
||||
"""User Interface."""
|
||||
def __init__(self):
|
||||
|
@ -62,6 +62,13 @@ class Formatter(logging.Formatter):
|
||||
SPAM: 34,
|
||||
logging.INFO: 32,
|
||||
}.get(record.levelno, 0)
|
||||
|
||||
# if this runs in a separate process, write down the pid
|
||||
# to debug exit codes and such
|
||||
pid = ''
|
||||
if os.getpid() != logger.main_pid:
|
||||
pid = f'pid {os.getpid()}, '
|
||||
|
||||
if debug:
|
||||
self._style._fmt = ( # noqa
|
||||
'\033[1m' # bold
|
||||
@ -69,7 +76,7 @@ class Formatter(logging.Formatter):
|
||||
f'%(levelname)s'
|
||||
'\033[0m' # end style
|
||||
f'\033[{color}m' # color
|
||||
': %(filename)s, line %(lineno)d, %(message)s'
|
||||
f': {pid}%(filename)s, line %(lineno)d, %(message)s'
|
||||
'\033[0m' # end style
|
||||
)
|
||||
else:
|
||||
@ -85,6 +92,7 @@ handler.setFormatter(Formatter())
|
||||
logger.addHandler(handler)
|
||||
logger.setLevel(logging.INFO)
|
||||
logging.getLogger('asyncio').setLevel(logging.WARNING)
|
||||
logger.main_pid = os.getpid()
|
||||
|
||||
|
||||
def is_debug():
|
||||
|
13
scripts/install.sh
Executable file
13
scripts/install.sh
Executable file
@ -0,0 +1,13 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# in case setup.py does nothing instead of something.
|
||||
# call via `./scripts/build.sh`
|
||||
|
||||
# try both ways of installation
|
||||
sudo pip3 install .
|
||||
sudo python3 setup.py install
|
||||
|
||||
# copy crucial files
|
||||
sudo cp bin/* /usr/bin/ -r
|
||||
sudo mkdir /usr/share/key-mapper
|
||||
sudo cp data/* /usr/share/key-mapper -r
|
@ -158,10 +158,18 @@ def patch_select():
|
||||
import select
|
||||
|
||||
def new_select(rlist, *args):
|
||||
return ([
|
||||
device for device in rlist
|
||||
if len(pending_events.get(device, [])) > 0
|
||||
],)
|
||||
ret = []
|
||||
for thing in rlist:
|
||||
if hasattr(thing, 'poll') and thing.poll():
|
||||
# the reader receives msgs through pipes. If there is one
|
||||
# ready, provide the pipe
|
||||
ret.append(thing)
|
||||
continue
|
||||
|
||||
if len(pending_events.get(thing, [])) > 0:
|
||||
ret.append(thing)
|
||||
|
||||
return [ret, [], []]
|
||||
|
||||
select.select = new_select
|
||||
|
||||
|
@ -20,13 +20,9 @@
|
||||
|
||||
|
||||
import os
|
||||
import sys
|
||||
import multiprocessing
|
||||
import unittest
|
||||
import time
|
||||
from unittest.mock import patch
|
||||
from importlib.util import spec_from_loader, module_from_spec
|
||||
from importlib.machinery import SourceFileLoader
|
||||
|
||||
import dbus
|
||||
import evdev
|
||||
@ -61,12 +57,10 @@ class TestDBusDaemon(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.interface.stop()
|
||||
time.sleep(0.1)
|
||||
cls.process.terminate()
|
||||
time.sleep(0.1)
|
||||
os.system('pkill -f key-mapper-service')
|
||||
time.sleep(0.1)
|
||||
try:
|
||||
cls.interface.stop(True)
|
||||
except dbus.exceptions.DBusException:
|
||||
pass
|
||||
|
||||
def test_can_connect(self):
|
||||
self.assertIsInstance(self.interface, dbus.Interface)
|
||||
|
@ -58,8 +58,9 @@ class TestInjector(unittest.TestCase):
|
||||
self.injector.stop_injecting()
|
||||
self.injector = None
|
||||
evdev.InputDevice.grab = self.grab
|
||||
if pending_events.get('device 2') is not None:
|
||||
del pending_events['device 2']
|
||||
keys = list(pending_events.keys())
|
||||
for key in keys:
|
||||
del pending_events[key]
|
||||
clear_write_history()
|
||||
|
||||
def test_modify_capabilities(self):
|
||||
@ -80,7 +81,11 @@ class TestInjector(unittest.TestCase):
|
||||
maps_to = system_mapping['a'] - KEYCODE_OFFSET
|
||||
|
||||
self.injector = KeycodeInjector('foo', mapping)
|
||||
capabilities = self.injector._modify_capabilities(FakeDevice())
|
||||
fake_device = FakeDevice()
|
||||
capabilities = self.injector._modify_capabilities(
|
||||
fake_device,
|
||||
map_ABS=False
|
||||
)
|
||||
|
||||
self.assertIn(EV_KEY, capabilities)
|
||||
keys = capabilities[EV_KEY]
|
||||
@ -97,17 +102,30 @@ class TestInjector(unittest.TestCase):
|
||||
path = '/dev/input/event10'
|
||||
# this test needs to pass around all other constraints of
|
||||
# _prepare_device
|
||||
device = self.injector._prepare_device(path)
|
||||
device, map_ABS = self.injector._prepare_device(path)
|
||||
self.assertFalse(map_ABS)
|
||||
self.assertEqual(self.failed, 2)
|
||||
# success on the third try
|
||||
device.name = fixtures[path]['name']
|
||||
|
||||
def test_gamepad_capabilities(self):
|
||||
self.injector = KeycodeInjector('gamepad', custom_mapping)
|
||||
|
||||
path = '/dev/input/event30'
|
||||
device, map_ABS = self.injector._prepare_device(path)
|
||||
self.assertTrue(map_ABS)
|
||||
|
||||
capabilities = self.injector._modify_capabilities(device, map_ABS)
|
||||
self.assertNotIn(evdev.ecodes.EV_ABS, capabilities)
|
||||
self.assertIn(evdev.ecodes.EV_REL, capabilities)
|
||||
|
||||
def test_skip_unused_device(self):
|
||||
# skips a device because its capabilities are not used in the mapping
|
||||
custom_mapping.change(10, 'a')
|
||||
self.injector = KeycodeInjector('device 1', custom_mapping)
|
||||
path = '/dev/input/event11'
|
||||
device = self.injector._prepare_device(path)
|
||||
device, map_ABS = self.injector._prepare_device(path)
|
||||
self.assertFalse(map_ABS)
|
||||
self.assertEqual(self.failed, 0)
|
||||
self.assertIsNone(device)
|
||||
|
||||
@ -115,7 +133,7 @@ class TestInjector(unittest.TestCase):
|
||||
# skips a device because its capabilities are not used in the mapping
|
||||
self.injector = KeycodeInjector('device 1', custom_mapping)
|
||||
path = '/dev/input/event11'
|
||||
device = self.injector._prepare_device(path)
|
||||
device, _ = self.injector._prepare_device(path)
|
||||
|
||||
# make sure the test uses a fixture without interesting capabilities
|
||||
capabilities = evdev.InputDevice(path).capabilities()
|
||||
@ -145,7 +163,7 @@ class TestInjector(unittest.TestCase):
|
||||
|
||||
def test_abs_to_rel(self):
|
||||
# maps gamepad joystick events to mouse events
|
||||
# TODO enable this somewhere so that map_abs_to_rel returns true
|
||||
# TODO enable this somewhere so that map_ABS returns true
|
||||
# in the .json file of the mapping.
|
||||
config.set('gamepad.non_linearity', 1)
|
||||
pointer_speed = 80
|
||||
@ -182,6 +200,11 @@ class TestInjector(unittest.TestCase):
|
||||
event = uinput_write_history_pipe[0].recv()
|
||||
history.append((event.type, event.code, event.value))
|
||||
|
||||
if history[0][0] == EV_ABS:
|
||||
raise AssertionError(
|
||||
'The injector probably just forwarded them unchanged'
|
||||
)
|
||||
|
||||
# movement is written at 60hz and it takes `divisor` steps to
|
||||
# move 1px. take it times 2 for both x and y events.
|
||||
self.assertGreater(len(history), 60 * sleep * 0.9 * 2 / divisor)
|
||||
|
@ -50,8 +50,10 @@ def gtk_iteration():
|
||||
Gtk.main_iteration()
|
||||
|
||||
|
||||
def launch(argv=None, bin_path='/bin/key-mapper-gtk'):
|
||||
def launch(argv=None):
|
||||
"""Start key-mapper-gtk with the command line argument array argv."""
|
||||
bin_path = os.path.join(os.getcwd(), 'bin', 'key-mapper-gtk')
|
||||
|
||||
if not argv:
|
||||
argv = ['-d']
|
||||
|
||||
|
@ -52,6 +52,10 @@ class TestReader(unittest.TestCase):
|
||||
Event(evdev.events.EV_KEY, CODE_3, 1)
|
||||
]
|
||||
keycode_reader.start_reading('device 1')
|
||||
|
||||
# sending anything arbitrary does not stop the pipe
|
||||
keycode_reader._pipe[0].send(1234)
|
||||
|
||||
time.sleep(EVENT_READ_TIMEOUT * 5)
|
||||
self.assertEqual(keycode_reader.read(), CODE_3 + 8)
|
||||
self.assertIsNone(keycode_reader.read())
|
||||
@ -67,8 +71,10 @@ class TestReader(unittest.TestCase):
|
||||
self.assertIsNone(keycode_reader.read())
|
||||
|
||||
def test_keymapper_devices(self):
|
||||
# In order to show pressed keycodes on the ui while the device is
|
||||
# grabbed, read from that as well.
|
||||
# Don't read from keymapper devices, their keycodes are not
|
||||
# representative for the original key. As long as this is not
|
||||
# intentionally programmed it won't even do that. But it was at some
|
||||
# point.
|
||||
pending_events['key-mapper device 2'] = [
|
||||
Event(evdev.events.EV_KEY, CODE_1, 1),
|
||||
Event(evdev.events.EV_KEY, CODE_2, 1),
|
||||
@ -76,7 +82,6 @@ class TestReader(unittest.TestCase):
|
||||
]
|
||||
keycode_reader.start_reading('device 2')
|
||||
time.sleep(EVENT_READ_TIMEOUT * 5)
|
||||
self.assertEqual(keycode_reader.read(), CODE_3 + 8)
|
||||
self.assertIsNone(keycode_reader.read())
|
||||
|
||||
def test_clear(self):
|
||||
|
Loading…
Reference in New Issue
Block a user