renamed injector class, fixed config path for injection

xkb
sezanzeb 4 years ago committed by sezanzeb
parent a84ef5c0cd
commit 4bad927ffc

@ -1,5 +1,5 @@
[run] [run]
branch = True branch = True
source = /usr/lib/python3.8/site-packages/keymapper source = /usr/lib/python3.9/site-packages/keymapper
concurrency = multiprocessing concurrency = multiprocessing
debug = multiproc debug = multiproc

@ -59,7 +59,7 @@ def group_exists(name):
return False return False
def main(options, daemon, xmodmap_path): def main(options, daemon, config_path):
"""Do the stuff that the executable is supposed to do.""" """Do the stuff that the executable is supposed to do."""
# Is a function so that I can import it and test it # Is a function so that I can import it and test it
if options.list_devices: if options.list_devices:
@ -85,7 +85,7 @@ def main(options, daemon, xmodmap_path):
mapping = Mapping() mapping = Mapping()
preset_path = get_preset_path(device, preset) preset_path = get_preset_path(device, preset)
mapping.load(preset_path) mapping.load(preset_path)
daemon.start_injecting(device, preset_path, xmodmap_path) daemon.start_injecting(device, preset_path, config_path)
if options.command == START: if options.command == START:
if options.device is None: if options.device is None:
@ -97,7 +97,7 @@ def main(options, daemon, xmodmap_path):
sys.exit(1) sys.exit(1)
preset_path = os.path.abspath(os.path.expanduser(options.preset)) preset_path = os.path.abspath(os.path.expanduser(options.preset))
daemon.start_injecting(options.device, preset_path, xmodmap_path) daemon.start_injecting(options.device, preset_path, config_path)
if options.command == STOP: if options.command == STOP:
if options.device is None: if options.device is None:
@ -149,6 +149,6 @@ if __name__ == '__main__':
if daemon is None: if daemon is None:
sys.exit(0) sys.exit(0)
xmodmap_path = get_config_path(XMODMAP_FILENAME) config_path = get_config_path()
main(options, daemon, xmodmap_path) main(options, daemon, config_path)

@ -214,8 +214,22 @@ class GlobalConfig(ConfigBase):
"""Should this preset be loaded automatically?""" """Should this preset be loaded automatically?"""
return self.get(['autoload', device], log_unknown=False) == preset return self.get(['autoload', device], log_unknown=False) == preset
def load_config(self): def load_config(self, path=None):
"""Load the config from the file system.""" """Load the config from the file system.
Parameters
----------
path : string or None
If set, will change the path to load from and save to.
"""
if path is not None:
# TODO Test
if not os.path.exists(path):
logger.error('Config at "%s" not found', path)
return
self.path = path
self.clear_config() self.clear_config()
if not os.path.exists(self.path): if not os.path.exists(self.path):

@ -25,6 +25,7 @@ https://github.com/LEW21/pydbus/tree/cc407c8b1d25b7e28a6d661a29f9e661b1c9b964/ex
""" """
import os
import subprocess import subprocess
import json import json
@ -32,7 +33,7 @@ from pydbus import SystemBus
from gi.repository import GLib from gi.repository import GLib
from keymapper.logger import logger from keymapper.logger import logger
from keymapper.dev.injector import KeycodeInjector from keymapper.dev.injector import Injector
from keymapper.mapping import Mapping from keymapper.mapping import Mapping
from keymapper.config import config from keymapper.config import config
from keymapper.state import system_mapping from keymapper.state import system_mapping
@ -148,7 +149,7 @@ class Daemon:
"""Is this device being mapped?""" """Is this device being mapped?"""
return device in self.injectors return device in self.injectors
def start_injecting(self, device, path, xmodmap_path=None): def start_injecting(self, device, preset_path, config_dir=None):
"""Start injecting the preset for the device. """Start injecting the preset for the device.
Returns True on success. Returns True on success.
@ -157,33 +158,38 @@ class Daemon:
---------- ----------
device : string device : string
The name of the device The name of the device
path : string preset_path : string
Path to the preset. The daemon, if started via systemctl, has no Path to the preset. The daemon, if started via systemctl, has no
knowledge of the user and their home path, so the complete knowledge of the user and their home path, so the complete
absolute path needs to be provided here. absolute path needs to be provided here.
xmodmap_path : string, None config_dir : string
Path to a dump of the xkb mappings, to provide more human Contains xmodmap.json and config.json of the current users session
readable keys in the correct keyboard layout to the service.
The service cannot use `xmodmap -pke` because it's running via
systemd.
""" """
# reload the config, since it may have been changed
if config_dir is not None:
config_path = os.path.join(config_dir, 'config.json')
config.load_config(config_path)
if device not in get_devices(): if device not in get_devices():
logger.debug('Devices possibly outdated, refreshing') logger.debug('Devices possibly outdated, refreshing')
refresh_devices() refresh_devices()
# reload the config, since it may have been changed
config.load_config()
if self.injectors.get(device) is not None: if self.injectors.get(device) is not None:
self.injectors[device].stop_injecting() self.injectors[device].stop_injecting()
mapping = Mapping() mapping = Mapping()
try: try:
mapping.load(path) mapping.load(preset_path)
except FileNotFoundError as error: except FileNotFoundError as error:
logger.error(str(error)) logger.error(str(error))
return False return False
if xmodmap_path is not None: # Path to a dump of the xkb mappings, to provide more human
# readable keys in the correct keyboard layout to the service.
# The service cannot use `xmodmap -pke` because it's running via
# systemd.
if config_dir is not None:
xmodmap_path = os.path.join(config_dir, 'xmodmap.json')
try: try:
with open(xmodmap_path, 'r') as file: with open(xmodmap_path, 'r') as file:
xmodmap = json.load(file) xmodmap = json.load(file)
@ -195,7 +201,7 @@ class Daemon:
logger.error('Could not find "%s"', xmodmap_path) logger.error('Could not find "%s"', xmodmap_path)
try: try:
injector = KeycodeInjector(device, mapping) injector = Injector(device, mapping)
injector.start_injecting() injector.start_injecting()
self.injectors[device] = injector self.injectors[device] = injector
except OSError: except OSError:

@ -112,11 +112,11 @@ def is_in_capabilities(key, capabilities):
return False return False
class KeycodeInjector: class Injector:
"""Keeps injecting keycodes in the background based on the mapping. """Keeps injecting events in the background based on mapping and config.
Is a process to make it non-blocking for the rest of the code and to Is a process to make it non-blocking for the rest of the code and to
make running multiple injector easier. There is one procss per make running multiple injector easier. There is one process per
hardware-device that is being mapped. hardware-device that is being mapped.
""" """
regrab_timeout = 0.5 regrab_timeout = 0.5

@ -467,8 +467,7 @@ class Window:
self.show_status(CTX_APPLY, f'Applied preset "{preset}"') self.show_status(CTX_APPLY, f'Applied preset "{preset}"')
path = get_preset_path(device, preset) path = get_preset_path(device, preset)
xmodmap = get_config_path(XMODMAP_FILENAME) success = self.dbus.start_injecting(device, path, get_config_path())
success = self.dbus.start_injecting(device, path, xmodmap)
if not success: if not success:
self.show_status(CTX_ERROR, 'Error: Could not grab devices!') self.show_status(CTX_ERROR, 'Error: Could not grab devices!')

@ -30,6 +30,9 @@ import pwd
from keymapper.logger import logger from keymapper.logger import logger
# TODO unittest everything in here
def get_user(): def get_user():
"""Try to find the user who called sudo/pkexec.""" """Try to find the user who called sudo/pkexec."""
try: try:

@ -30,12 +30,13 @@ requests.
- [x] mapping joystick directions as buttons, making it act like a D-Pad - [x] mapping joystick directions as buttons, making it act like a D-Pad
- [ ] mapping mouse wheel events to buttons - [ ] mapping mouse wheel events to buttons
- [ ] automatically load presets when devices get plugged in after login (udev) - [ ] automatically load presets when devices get plugged in after login (udev)
- [ ] configure locale for preset to provide a different set of possible keys - [ ] using keys that aren't available in the systems keyboard layout
- [ ] user-friendly way to map btn_left - [ ] user-friendly way to map the left mouse button
## Tests ## Tests
```bash ```bash
sudo pip install coverage
pylint keymapper --extension-pkg-whitelist=evdev pylint keymapper --extension-pkg-whitelist=evdev
sudo pip install . && coverage run tests/test.py sudo pip install . && coverage run tests/test.py
coverage combine && coverage report -m coverage combine && coverage report -m

@ -380,7 +380,7 @@ patch_unsaved()
patch_select() patch_select()
from keymapper.logger import update_verbosity from keymapper.logger import update_verbosity
from keymapper.dev.injector import KeycodeInjector from keymapper.dev.injector import Injector
from keymapper.config import config from keymapper.config import config
from keymapper.dev.reader import keycode_reader from keymapper.dev.reader import keycode_reader
from keymapper.getdevices import refresh_devices from keymapper.getdevices import refresh_devices
@ -388,7 +388,7 @@ from keymapper.state import system_mapping, custom_mapping
from keymapper.dev.keycode_mapper import active_macros, unreleased from keymapper.dev.keycode_mapper import active_macros, unreleased
# no need for a high number in tests # no need for a high number in tests
KeycodeInjector.regrab_timeout = 0.15 Injector.regrab_timeout = 0.15
_fixture_copy = copy.deepcopy(fixtures) _fixture_copy = copy.deepcopy(fixtures)
@ -401,7 +401,8 @@ def cleanup():
keycode_reader.newest_event = None keycode_reader.newest_event = None
keycode_reader._unreleased = {} keycode_reader._unreleased = {}
for task in asyncio.Task.all_tasks(): if asyncio.get_event_loop().is_running():
for task in asyncio.all_tasks():
task.cancel() task.cancel()
os.system('pkill -f key-mapper-service') os.system('pkill -f key-mapper-service')

@ -71,7 +71,7 @@ class TestControl(unittest.TestCase):
get_preset_path(devices[0], presets[0]), get_preset_path(devices[0], presets[0]),
get_preset_path(devices[1], presets[1]) get_preset_path(devices[1], presets[1])
] ]
xmodmap = 'a/xmodmap.json' config_dir = '/foo/bar'
Mapping().save(paths[0]) Mapping().save(paths[0])
Mapping().save(paths[1]) Mapping().save(paths[1])
@ -86,17 +86,17 @@ class TestControl(unittest.TestCase):
config.set_autoload_preset(devices[0], presets[0]) config.set_autoload_preset(devices[0], presets[0])
config.set_autoload_preset(devices[1], presets[1]) config.set_autoload_preset(devices[1], presets[1])
control(options('autoload', None, None, False, False), daemon, xmodmap) control(options('autoload', None, None, False, False), daemon, config_dir)
self.assertEqual(len(start_history), 2) self.assertEqual(len(start_history), 2)
self.assertEqual(len(stop_history), 1) self.assertEqual(len(stop_history), 1)
self.assertEqual(start_history[0], (devices[0], os.path.expanduser(paths[0]), xmodmap)) self.assertEqual(start_history[0], (devices[0], os.path.expanduser(paths[0]), config_dir))
self.assertEqual(start_history[1], (devices[1], os.path.abspath(paths[1]), xmodmap)) self.assertEqual(start_history[1], (devices[1], os.path.abspath(paths[1]), config_dir))
def test_start_stop(self): def test_start_stop(self):
device = 'device 1234' device = 'device 1234'
path = '~/a/preset.json' path = '~/a/preset.json'
xmodmap = 'a/xmodmap.json' config_dir = '/foo/bar'
daemon = Daemon() daemon = Daemon()
@ -105,12 +105,12 @@ class TestControl(unittest.TestCase):
daemon.start_injecting = lambda *args: start_history.append(args) daemon.start_injecting = lambda *args: start_history.append(args)
daemon.stop_injecting = lambda *args: stop_history.append(args) daemon.stop_injecting = lambda *args: stop_history.append(args)
control(options('start', path, device, False, False), daemon, xmodmap) control(options('start', path, device, False, False), daemon, config_dir)
control(options('stop', None, device, False, False), daemon, None) control(options('stop', None, device, False, False), daemon, None)
self.assertEqual(len(start_history), 1) self.assertEqual(len(start_history), 1)
self.assertEqual(len(stop_history), 1) self.assertEqual(len(stop_history), 1)
self.assertEqual(start_history[0], (device, os.path.expanduser(path), xmodmap)) self.assertEqual(start_history[0], (device, os.path.expanduser(path), config_dir))
self.assertEqual(stop_history[0], (device,)) self.assertEqual(stop_history[0], (device,))

@ -248,18 +248,30 @@ class TestDaemon(unittest.TestCase):
InputEvent(*event) InputEvent(*event)
] ]
xmodmap_path = os.path.join(tmp, 'foobar.json') config_dir = os.path.join(tmp, 'foo')
os.makedirs(config_dir, exist_ok=True)
config_path = os.path.join(config_dir, 'config.json')
with open(config_path, 'w') as file:
file.write('{"bar":1234}')
xmodmap_path = os.path.join(config_dir, 'xmodmap.json')
with open(xmodmap_path, 'w') as file: with open(xmodmap_path, 'w') as file:
file.write(f'{{"{to_name}":{to_keycode}}}') file.write(f'{{"{to_name}":{to_keycode}}}')
self.daemon = Daemon() self.daemon = Daemon()
self.daemon.start_injecting(device, path, xmodmap_path)
self.daemon.start_injecting(device, path, config_dir)
event = uinput_write_history_pipe[0].recv() event = uinput_write_history_pipe[0].recv()
self.assertEqual(event.type, EV_KEY) self.assertEqual(event.type, EV_KEY)
self.assertEqual(event.code, to_keycode) self.assertEqual(event.code, to_keycode)
self.assertEqual(event.value, 1) self.assertEqual(event.value, 1)
# since the daemon is running in the same process, the config
# that the test knows will be overwritten
self.assertEqual(config.get('bar'), 1234)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

@ -27,7 +27,7 @@ import evdev
from evdev.ecodes import EV_REL, EV_KEY, EV_ABS, ABS_HAT0X, BTN_LEFT, KEY_A from evdev.ecodes import EV_REL, EV_KEY, EV_ABS, ABS_HAT0X, BTN_LEFT, KEY_A
from keymapper.dev.injector import is_numlock_on, set_numlock, \ from keymapper.dev.injector import is_numlock_on, set_numlock, \
ensure_numlock, KeycodeInjector, is_in_capabilities ensure_numlock, Injector, is_in_capabilities
from keymapper.state import custom_mapping, system_mapping from keymapper.state import custom_mapping, system_mapping
from keymapper.mapping import Mapping, DISABLE_CODE, DISABLE_NAME from keymapper.mapping import Mapping, DISABLE_CODE, DISABLE_NAME
from keymapper.config import config from keymapper.config import config
@ -101,7 +101,7 @@ class TestInjector(unittest.TestCase):
two = system_mapping.get('2') two = system_mapping.get('2')
btn_left = system_mapping.get('BtN_lEfT') btn_left = system_mapping.get('BtN_lEfT')
self.injector = KeycodeInjector('foo', mapping) self.injector = Injector('foo', mapping)
fake_device = FakeDevice() fake_device = FakeDevice()
capabilities_1 = self.injector._modify_capabilities( capabilities_1 = self.injector._modify_capabilities(
{60: macro}, {60: macro},
@ -141,7 +141,7 @@ class TestInjector(unittest.TestCase):
# path is from the fixtures # path is from the fixtures
custom_mapping.change(Key(EV_KEY, 10, 1), 'a') custom_mapping.change(Key(EV_KEY, 10, 1), 'a')
self.injector = KeycodeInjector('device 1', custom_mapping) self.injector = Injector('device 1', custom_mapping)
path = '/dev/input/event10' path = '/dev/input/event10'
# this test needs to pass around all other constraints of # this test needs to pass around all other constraints of
# _prepare_device # _prepare_device
@ -155,7 +155,7 @@ class TestInjector(unittest.TestCase):
self.make_it_fail = 10 self.make_it_fail = 10
custom_mapping.change(Key(EV_KEY, 10, 1), 'a') custom_mapping.change(Key(EV_KEY, 10, 1), 'a')
self.injector = KeycodeInjector('device 1', custom_mapping) self.injector = Injector('device 1', custom_mapping)
path = '/dev/input/event10' path = '/dev/input/event10'
device, abs_to_rel = self.injector._prepare_device(path) device, abs_to_rel = self.injector._prepare_device(path)
self.assertFalse(abs_to_rel) self.assertFalse(abs_to_rel)
@ -171,7 +171,7 @@ class TestInjector(unittest.TestCase):
def test_prepare_device_1(self): def test_prepare_device_1(self):
# according to the fixtures, /dev/input/event30 can do ABS_HAT0X # according to the fixtures, /dev/input/event30 can do ABS_HAT0X
custom_mapping.change(Key(EV_ABS, ABS_HAT0X, 1), 'a') custom_mapping.change(Key(EV_ABS, ABS_HAT0X, 1), 'a')
self.injector = KeycodeInjector('foobar', custom_mapping) self.injector = Injector('foobar', custom_mapping)
_prepare_device = self.injector._prepare_device _prepare_device = self.injector._prepare_device
self.assertIsNone(_prepare_device('/dev/input/event10')[0]) self.assertIsNone(_prepare_device('/dev/input/event10')[0])
@ -179,13 +179,13 @@ class TestInjector(unittest.TestCase):
def test_prepare_device_non_existing(self): def test_prepare_device_non_existing(self):
custom_mapping.change(Key(EV_ABS, ABS_HAT0X, 1), 'a') custom_mapping.change(Key(EV_ABS, ABS_HAT0X, 1), 'a')
self.injector = KeycodeInjector('foobar', custom_mapping) self.injector = Injector('foobar', custom_mapping)
_prepare_device = self.injector._prepare_device _prepare_device = self.injector._prepare_device
self.assertIsNone(_prepare_device('/dev/input/event1234')[0]) self.assertIsNone(_prepare_device('/dev/input/event1234')[0])
def test_gamepad_capabilities(self): def test_gamepad_capabilities(self):
self.injector = KeycodeInjector('gamepad', custom_mapping) self.injector = Injector('gamepad', custom_mapping)
path = '/dev/input/event30' path = '/dev/input/event30'
device, abs_to_rel = self.injector._prepare_device(path) device, abs_to_rel = self.injector._prepare_device(path)
@ -210,7 +210,7 @@ class TestInjector(unittest.TestCase):
def test_adds_ev_key(self): def test_adds_ev_key(self):
# for some reason, having any EV_KEY capability is needed to # for some reason, having any EV_KEY capability is needed to
# be able to control the mouse. it probably wants the mouse click. # be able to control the mouse. it probably wants the mouse click.
self.injector = KeycodeInjector('gamepad 2', custom_mapping) self.injector = Injector('gamepad 2', custom_mapping)
"""gamepad without any existing key capability""" """gamepad without any existing key capability"""
@ -263,7 +263,7 @@ class TestInjector(unittest.TestCase):
def test_skip_unused_device(self): def test_skip_unused_device(self):
# skips a device because its capabilities are not used in the mapping # skips a device because its capabilities are not used in the mapping
custom_mapping.change(Key(EV_KEY, 10, 1), 'a') custom_mapping.change(Key(EV_KEY, 10, 1), 'a')
self.injector = KeycodeInjector('device 1', custom_mapping) self.injector = Injector('device 1', custom_mapping)
path = '/dev/input/event11' path = '/dev/input/event11'
device, abs_to_rel = self.injector._prepare_device(path) device, abs_to_rel = self.injector._prepare_device(path)
self.assertFalse(abs_to_rel) self.assertFalse(abs_to_rel)
@ -272,7 +272,7 @@ class TestInjector(unittest.TestCase):
def test_skip_unknown_device(self): def test_skip_unknown_device(self):
# skips a device because its capabilities are not used in the mapping # skips a device because its capabilities are not used in the mapping
self.injector = KeycodeInjector('device 1', custom_mapping) self.injector = Injector('device 1', custom_mapping)
path = '/dev/input/event11' path = '/dev/input/event11'
device, _ = self.injector._prepare_device(path) device, _ = self.injector._prepare_device(path)
@ -329,7 +329,7 @@ class TestInjector(unittest.TestCase):
InputEvent(EV_ABS, rel_y, -y), InputEvent(EV_ABS, rel_y, -y),
] ]
self.injector = KeycodeInjector('gamepad', custom_mapping) self.injector = Injector('gamepad', custom_mapping)
self.injector.start_injecting() self.injector.start_injecting()
# wait for the injector to start sending, at most 1s # wait for the injector to start sending, at most 1s
@ -400,7 +400,7 @@ class TestInjector(unittest.TestCase):
InputEvent(3124, 3564, 6542), InputEvent(3124, 3564, 6542),
] ]
self.injector = KeycodeInjector('device 2', custom_mapping) self.injector = Injector('device 2', custom_mapping)
self.injector.start_injecting() self.injector.start_injecting()
uinput_write_history_pipe[0].poll(timeout=1) uinput_write_history_pipe[0].poll(timeout=1)
@ -500,7 +500,7 @@ class TestInjector(unittest.TestCase):
InputEvent(*d_up), InputEvent(*d_up),
] ]
self.injector = KeycodeInjector('gamepad', custom_mapping) self.injector = Injector('gamepad', custom_mapping)
# the injector will otherwise skip the device because # the injector will otherwise skip the device because
# the capabilities don't contain EV_TYPE # the capabilities don't contain EV_TYPE
@ -536,7 +536,7 @@ class TestInjector(unittest.TestCase):
ev_3 = (EV_KEY, 43, 1) ev_3 = (EV_KEY, 43, 1)
# a combination # a combination
mapping.change(Key(ev_1, ev_2, ev_3), 'k(a)') mapping.change(Key(ev_1, ev_2, ev_3), 'k(a)')
self.injector = KeycodeInjector('device 1', mapping) self.injector = Injector('device 1', mapping)
history = [] history = []
@ -576,7 +576,7 @@ class TestInjector(unittest.TestCase):
system_mapping._set('a', 51) system_mapping._set('a', 51)
system_mapping._set('b', 52) system_mapping._set('b', 52)
injector = KeycodeInjector('device 1', mapping) injector = Injector('device 1', mapping)
self.assertEqual(injector._key_to_code.get((ev_1,)), 51) self.assertEqual(injector._key_to_code.get((ev_1,)), 51)
# permutations to make matching combinations easier # permutations to make matching combinations easier
self.assertEqual(injector._key_to_code.get((ev_2, ev_3, ev_4)), 52) self.assertEqual(injector._key_to_code.get((ev_2, ev_3, ev_4)), 52)

Loading…
Cancel
Save