started working on the daemon

pull/14/head
sezanzeb 4 years ago
parent 437cb533aa
commit 7f1d26aee7

@ -25,21 +25,16 @@
import sys
import atexit
import getpass
import dbus.mainloop.glib
from dbus.mainloop.glib import DBusGMainLoop
from argparse import ArgumentParser
from keymapper.logger import logger, update_verbosity, log_info
from keymapper.config import iterate_autoload_presets
from keymapper.injector import KeycodeInjector
from keymapper.mapping import Mapping
injectors = {}
import gi
gi.require_version('GLib', '2.0')
from gi.repository import GLib
def stop():
"""Properly stop the daemon."""
for injector in injectors:
injector.stop_injecting()
from keymapper.logger import logger, update_verbosity, log_info
from keymapper.daemon import Daemon
if __name__ == '__main__':
@ -54,18 +49,15 @@ if __name__ == '__main__':
update_verbosity(options.debug)
log_info()
atexit.register(stop)
if getpass.getuser() != 'root' and 'unittest' not in sys.modules.keys():
logger.warn('Without sudo, your devices may not be visible')
logger.warning('Without sudo, your devices may not be visible')
session_bus = dbus.SessionBus(mainloop=DBusGMainLoop())
name = dbus.service.BusName('com.keymapper.control', session_bus)
daemon = Daemon(session_bus, '/')
for device, preset in iterate_autoload_presets():
mapping = Mapping()
mapping.load(device, preset)
# TODO keycode injector needs a mapping param,
# TODO the single custom_mapping only for the UI, because the
# service has multiple
injectors[device] = KeycodeInjector(device, mapping)
atexit.register(daemon.stop)
# TODO Dbus server wait for the UI to request applying a different preset
# or disabling the injection for a specific device.
logger.info('Daemon running, waiting for dbus messages')
mainloop = GLib.MainLoop()
mainloop.run()

@ -0,0 +1,78 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# key-mapper - GUI for device specific keyboard mappings
# Copyright (C) 2020 sezanzeb <proxima@hip70890b.de>
#
# This file is part of key-mapper.
#
# key-mapper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# key-mapper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with key-mapper. If not, see <https://www.gnu.org/licenses/>.
"""Starts injecting keycodes based on the configuration."""
from dbus import service
import dbus.mainloop.glib
from keymapper.logger import logger
from keymapper.config import config
from keymapper.injector import KeycodeInjector
from keymapper.mapping import Mapping
class Daemon(service.Object):
def __init__(self, *args, **kwargs):
"""Constructs the daemon. You still need to run the GLib mainloop."""
self.injectors = {}
for device, preset in config.iterate_autoload_presets():
mapping = Mapping()
mapping.load(device, preset)
self.injectors[device] = KeycodeInjector(device, mapping)
super().__init__(*args, **kwargs)
@dbus.service.method(
'com.keymapper.control',
in_signature='s'
)
def stop_injecting(self, device):
"""Stop injecting the mapping for a single device."""
if self.injectors.get(device) is None:
logger.error('No injector running for device %s', device)
return
self.injectors[device].stop_injecting()
# TODO if ss is the correct signature for multiple parameters, add an
# example to https://gitlab.freedesktop.org/dbus/dbus-python/-/blob/master/doc/tutorial.txt # noqa
@dbus.service.method(
'com.keymapper.control',
in_signature='ss'
)
def start_injecting(self, device, preset):
"""Start injecting the preset for the device."""
if self.injectors.get(device) is not None:
self.injectors[device].stop_injecting()
mapping = Mapping()
mapping.load(device, preset)
self.injectors[device] = KeycodeInjector(device, mapping)
@dbus.service.method(
'com.keymapper.control'
)
def stop(self):
"""Properly stop the daemon."""
for injector in self.injectors.values():
injector.stop_injecting()

@ -88,7 +88,7 @@ def _modify_capabilities(device):
return capabilities
def _start_injecting_worker(path, pipe):
def _start_injecting_worker(path, pipe, mapping):
"""Inject keycodes for one of the virtual devices.
Parameters
@ -140,7 +140,7 @@ def _start_injecting_worker(path, pipe):
# than the ones reported by xev and that X expects
input_keycode = event.code + 8
character = custom_mapping.get_character(input_keycode)
character = mapping.get_character(input_keycode)
if character is None:
# unknown keycode, forward it
@ -213,7 +213,7 @@ def ensure_numlock(func):
class KeycodeInjector:
"""Keeps injecting keycodes in the background based on the mapping."""
@ensure_numlock
def __init__(self, device):
def __init__(self, device, mapping=custom_mapping):
"""Start injecting keycodes based on custom_mapping."""
self.device = device
self.virtual_devices = []
@ -232,7 +232,7 @@ class KeycodeInjector:
pipe = multiprocessing.Pipe()
worker = multiprocessing.Process(
target=_start_injecting_worker,
args=(path, pipe[1])
args=(path, pipe[1], mapping)
)
worker.start()
# wait for the process to notify creation of the new injection
@ -249,7 +249,7 @@ class KeycodeInjector:
@ensure_numlock
def stop_injecting(self):
"""Stop injecting keycodes."""
logger.info('Stopping injecting keycodes')
logger.info('Stopping injecting keycodes for device %s', self.device)
for i, process in enumerate(self.processes):
if process is None:
continue

@ -25,6 +25,7 @@
import sys
import time
import unittest
import multiprocessing
import evdev
@ -33,6 +34,8 @@ from keymapper.logger import update_verbosity
tmp = '/tmp/key-mapper-test'
uinput_write_history = []
# for tests that makes the injector create its processes
uinput_write_history_pipe = multiprocessing.Pipe()
pending_events = {}
# key-mapper is only interested in devices that have EV_KEY, add some
@ -159,7 +162,9 @@ def patch_evdev():
pass
def write(self, type, code, value):
uinput_write_history.append(Event(type, code, value))
event = Event(type, code, value)
uinput_write_history.append(event)
uinput_write_history_pipe[1].send(event)
def syn(self):
pass

@ -0,0 +1,91 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# key-mapper - GUI for device specific keyboard mappings
# Copyright (C) 2020 sezanzeb <proxima@hip70890b.de>
#
# This file is part of key-mapper.
#
# key-mapper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# key-mapper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with key-mapper. If not, see <https://www.gnu.org/licenses/>.
import unittest
import time
import evdev
from keymapper.state import custom_mapping, system_mapping
from keymapper.config import config
from keymapper.daemon import Daemon
from test import uinput_write_history_pipe, Event, pending_events
class TestDaemon(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.grab = evdev.InputDevice.grab
cls.daemon = None
def tearDown(self):
# avoid race conditions with other tests, daemon may run processes
if self.daemon is not None:
self.daemon.stop()
self.daemon = None
evdev.InputDevice.grab = self.grab
def test_daemon(self):
custom_mapping.change(9, 'a')
# one mapping that is unknown in the system_mapping on purpose
custom_mapping.change(10, 'b')
system_mapping.empty()
system_mapping.change(100, 'a')
custom_mapping.save('device 2', 'foo')
config.set_autoload_preset('device 2', 'foo')
pending_events['device 2'] = [
Event(evdev.events.EV_KEY, 1, 0),
Event(evdev.events.EV_KEY, 1, 1),
# ignored because unknown to the system
Event(evdev.events.EV_KEY, 2, 0),
Event(evdev.events.EV_KEY, 2, 1),
# just pass those over without modifying
Event(3124, 3564, 6542),
]
self.daemon = Daemon()
time.sleep(0.5)
write_history = []
pipe = uinput_write_history_pipe[0]
while pipe.poll():
write_history.append(pipe.recv())
self.assertEqual(write_history[0].type, evdev.events.EV_KEY)
self.assertEqual(write_history[0].code, 92)
self.assertEqual(write_history[0].value, 0)
self.assertEqual(write_history[1].type, evdev.events.EV_KEY)
self.assertEqual(write_history[1].code, 92)
self.assertEqual(write_history[1].value, 1)
self.assertEqual(write_history[2].type, 3124)
self.assertEqual(write_history[2].code, 3564)
self.assertEqual(write_history[2].value, 6542)
if __name__ == "__main__":
unittest.main()

@ -146,7 +146,8 @@ class TestInjector(unittest.TestCase):
_start_injecting_worker(
path=device['paths'][0],
pipe=FakePipe()
pipe=FakePipe(),
mapping=custom_mapping
)
self.assertEqual(uinput_write_history[0].type, evdev.events.EV_KEY)

Loading…
Cancel
Save