#197 Lazy loading groups and system_mapping

This commit is contained in:
sezanzeb 2021-11-21 21:45:02 +01:00
parent d3c955fe04
commit 0017531b22
12 changed files with 69 additions and 44 deletions

View File

@ -216,7 +216,7 @@ if __name__ == '__main__':
default=False
)
parser.add_argument(
'-n', '--symbol-names', action='store_true', dest='key_names',
'--symbol-names', action='store_true', dest='key_names',
help='Print all available names for the mapping',
default=False
)
@ -233,8 +233,8 @@ if __name__ == '__main__':
options = parser.parse_args(sys.argv[1:])
if options.debug:
add_filehandler('/var/log/key-mapper-control')
update_verbosity(True)
add_filehandler('/var/log/key-mapper-control')
if options.version:
log_info()
@ -253,4 +253,5 @@ if __name__ == '__main__':
else:
utils(options)
logger.info('Finished')
if options.command:
logger.info('Finished')

View File

@ -22,6 +22,7 @@
"""Starts injecting keycodes based on the configuration."""
import os
import sys
from argparse import ArgumentParser

View File

@ -44,6 +44,7 @@ from keymapper.config import config
from keymapper.system_mapping import system_mapping
from keymapper.groups import groups
from keymapper.paths import get_config_path, USER
from keymapper.injection.macros.macro import macro_variables
BUS_NAME = "keymapper.Control"
@ -169,11 +170,18 @@ class Daemon:
if USER != "root":
self.set_config_dir(get_config_path())
# check privileges
if os.getuid() != 0:
logger.warn("The service usually needs elevated privileges")
self.autoload_history = AutoloadHistory()
self.refreshed_devices_at = 0
atexit.register(self.stop_all)
# initialize stuff that is needed alongside the daemon process
macro_variables.start()
@classmethod
def connect(cls, fallback=True):
"""Get an interface to start and stop injecting keystrokes.
@ -257,12 +265,16 @@ class Daemon:
now = time.time()
if now - 10 > self.refreshed_devices_at:
logger.debug("Refreshing because last info is too old")
# it may take a little bit of time until devices are visible after
# changes
time.sleep(0.1)
groups.refresh()
self.refreshed_devices_at = now
return
if not groups.find(key=group_key):
logger.debug('Refreshing because "%s" is unknown', group_key)
time.sleep(0.1)
groups.refresh()
self.refreshed_devices_at = now

View File

@ -405,21 +405,21 @@ class _Groups:
"""Contains and manages all groups."""
def __init__(self):
self._groups = {}
self._find_groups()
self._groups = None
def __getattribute__(self, key):
"""To lazy load group info only when needed.
For example, this helps to keep of key-mapper-control clear when it doesnt
need it the information.
"""
if key == "_groups" and object.__getattribute__(self, "_groups") is None:
object.__setattr__(self, "_groups", {})
object.__getattribute__(self, "refresh")()
return object.__getattribute__(self, key)
def refresh(self):
"""This can be called to discover new devices.
Only call this if appropriate permissions are available, otherwise
the object may be empty afterwards.
"""
# it may take a little bit of time until devices are visible after
# changes
time.sleep(0.1)
return self._find_groups()
def _find_groups(self):
"""Look for devices and group them together.
Since this needs to do some stuff with /dev and spawn processes the

0
keymapper/gui/window.py Executable file → Normal file
View File

View File

@ -35,7 +35,6 @@ from keymapper.mapping import DISABLE_CODE
from keymapper.injection.context import Context
from keymapper.injection.numlock import set_numlock, is_numlock_on, ensure_numlock
from keymapper.injection.consumer_control import ConsumerControl
from keymapper.injection.macros.macro import macro_variables
DEV_NAME = "key-mapper"
@ -89,7 +88,7 @@ class Injector(multiprocessing.Process):
regrab_timeout = 0.2
def __init__(self, group, mapping):
"""Setup a process to start injecting keycodes based on custom_mapping.
"""
Parameters
----------
@ -109,8 +108,6 @@ class Injector(multiprocessing.Process):
self._consumer_controls = []
macro_variables.start()
super().__init__()
"""Functions to interact with the running process"""

View File

@ -223,7 +223,7 @@ def add_filehandler(log_path=LOG_PATH):
file_handler = logging.FileHandler(log_path)
file_handler.setFormatter(Formatter())
logger.info('Logging to "%s"', log_path)
logger.info('%d Logging to "%s"', os.getpid(), log_path)
logger.addHandler(file_handler)
except PermissionError:

View File

@ -28,8 +28,9 @@ import subprocess
import evdev
from keymapper.logger import logger
from keymapper.mapping import Mapping, DISABLE_NAME, DISABLE_CODE
from keymapper.paths import get_config_path, touch, USER
from keymapper.mapping import DISABLE_NAME, DISABLE_CODE
from keymapper.paths import get_config_path, touch
from keymapper.utils import is_service
# xkb uses keycodes that are 8 higher than those from evdev
@ -43,10 +44,21 @@ class SystemMapping:
def __init__(self):
"""Construct the system_mapping."""
self._mapping = {}
self._mapping = None
self._xmodmap = {}
self._case_insensitive_mapping = {}
self.populate()
def __getattribute__(self, key):
"""To lazy load system_mapping info only when needed.
For example, this helps to keep of key-mapper-control clear when it doesnt
need it the information.
"""
if key == "_mapping" and object.__getattribute__(self, "_mapping") is None:
object.__setattr__(self, "_mapping", {})
object.__getattribute__(self, "populate")()
return object.__getattribute__(self, key)
def list_names(self):
"""Return an array of all possible names in the mapping."""
@ -77,9 +89,10 @@ class SystemMapping:
# might be within a tty
logger.info("Optional `xmodmap` command not found. This is not critical.")
if USER != "root":
# write this stuff into the key-mapper config directory, because
# the systemd service won't know the user sessions xmodmap
if not is_service():
# Clients usually take care of that, don't let the service do funny things.
# Write this stuff into the key-mapper config directory, because
# the systemd service won't know the user sessions xmodmap.
path = get_config_path(XMODMAP_FILENAME)
touch(path)
with open(path, "w") as file:
@ -103,8 +116,12 @@ class SystemMapping:
mapping : dict
maps from name to code. Make sure your keys are lowercase.
"""
len_before = len(self._mapping)
for name, code in mapping.items():
self._set(name, code)
logger.debug(
"Updated keycodes with %d new ones", len(self._mapping) - len_before
)
def _set(self, name, code):
"""Map name to code."""

View File

@ -23,6 +23,7 @@
import math
import sys
import evdev
from evdev.ecodes import (
@ -218,7 +219,7 @@ def get_abs_range(device, code=ABS_X):
]
if len(absinfo) == 0:
logger.error(
logger.warn(
'Failed to get ABS info of "%s" for key %d: %s', device, code, capabilities
)
return None
@ -234,3 +235,7 @@ def get_max_abs(device, code=ABS_X):
"""
abs_range = get_abs_range(device, code)
return abs_range and abs_range[1]
def is_service():
return sys.argv[0].endswith("key-mapper-service")

View File

@ -17,7 +17,7 @@
<text x="22.0" y="14">pylint</text>
</g>
<g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="11">
<text x="63.0" y="15" fill="#010101" fill-opacity=".3">9.61</text>
<text x="62.0" y="14">9.61</text>
<text x="63.0" y="15" fill="#010101" fill-opacity=".3">9.54</text>
<text x="62.0" y="14">9.54</text>
</g>
</svg>

Before

Width:  |  Height:  |  Size: 1.0 KiB

After

Width:  |  Height:  |  Size: 1.0 KiB

View File

@ -27,7 +27,7 @@ import subprocess
import json
import evdev
from evdev.ecodes import EV_KEY, EV_ABS
from evdev.ecodes import EV_KEY, EV_ABS, KEY_B, KEY_A
from gi.repository import Gtk
from pydbus import SystemBus
@ -151,8 +151,6 @@ class TestDaemon(unittest.TestCase):
ev_1 = (EV_KEY, 9)
ev_2 = (EV_ABS, 12)
keycode_to_1 = 100
keycode_to_2 = 101
group = groups.find(name="Bar Device")
@ -162,12 +160,6 @@ class TestDaemon(unittest.TestCase):
custom_mapping.change(Key(*ev_1, 1), "a")
custom_mapping.change(Key(*ev_2, -1), "b")
system_mapping.clear()
# since this is in the same memory as the daemon, there is no need
# to save it to disk
system_mapping._set("a", keycode_to_1)
system_mapping._set("b", keycode_to_2)
preset = "foo"
custom_mapping.save(group.get_preset_path(preset))
@ -218,7 +210,7 @@ class TestDaemon(unittest.TestCase):
event = uinput_write_history_pipe[0].recv()
self.assertEqual(event.type, EV_KEY)
self.assertEqual(event.code, keycode_to_2)
self.assertEqual(event.code, KEY_B)
self.assertEqual(event.value, 1)
def test_config_dir(self):
@ -237,7 +229,6 @@ class TestDaemon(unittest.TestCase):
os.remove(get_config_path("xmodmap.json"))
ev = (EV_KEY, 9)
keycode_to = 100
group_name = "9876 name"
# expected key of the group
@ -248,7 +239,7 @@ class TestDaemon(unittest.TestCase):
self.assertIsNone(group)
custom_mapping.change(Key(*ev, 1), "a")
system_mapping.clear()
system_mapping._set("a", keycode_to)
system_mapping._set("a", KEY_A)
# make the daemon load the file instead
with open(get_config_path("xmodmap.json"), "w") as file:
@ -283,7 +274,7 @@ class TestDaemon(unittest.TestCase):
self.assertTrue(uinput_write_history_pipe[0].poll())
event = uinput_write_history_pipe[0].recv()
self.assertEqual(event.t, (EV_KEY, keycode_to, 1))
self.assertEqual(event.t, (EV_KEY, KEY_A, 1))
self.daemon.stop_injecting(group_key)
self.assertEqual(self.daemon.get_state(group_key), STOPPED)

View File

@ -87,6 +87,7 @@ class TestSystemMapping(unittest.TestCase):
def test_system_mapping(self):
system_mapping = SystemMapping()
system_mapping.populate()
self.assertGreater(len(system_mapping._mapping), 100)
# this is case-insensitive