fixed all tests

xkb
sezanzeb 4 years ago committed by sezanzeb
parent 661f56d043
commit 3e86e5c1b8

@ -39,6 +39,9 @@ INITIAL_CONFIG = {
# some time between keystrokes might be required for them to be
# detected properly in software.
'keystroke_sleep_ms': 10
},
'gamepad': {
'non_linearity': 4
}
}
@ -48,20 +51,74 @@ class _Config:
self._config = {}
self.load_config()
def set_autoload_preset(self, device, preset, load=True):
"""Set a preset to be automatically applied on start."""
if self._config.get('autoload') is None:
self._config['autoload'] = {}
if load:
self._config['autoload'][device] = preset
elif self._config['autoload'].get(device) is not None:
del self._config['autoload'][device]
def get_keystroke_sleep(self):
"""Get the seconds of sleep between key down and up events."""
macros = self._config.get('macros', {})
return macros.get('keystroke_sleep_ms', 10)
def _resolve(self, path, func):
"""Call func for the given config value."""
chunks = path.split('.')
child = self._config
while True:
chunk = chunks.pop(0)
parent = child
child = child.get(chunk)
if len(chunks) == 0:
# child is the value _resolve is looking for
return func(parent, child, chunk)
else:
# child is another object
if child is None:
parent[chunk] = {}
child = parent[chunk]
def remove(self, path):
"""Remove a config key.
Parameters
----------
path : string
For example 'macros.keystroke_sleep_ms'
"""
def do(parent, child, chunk):
if child is not None:
del parent[chunk]
self._resolve(path, do)
def set(self, path, value):
"""Set a config key.
Parameters
----------
path : string
For example 'macros.keystroke_sleep_ms'
value : any
"""
def do(parent, child, chunk):
parent[chunk] = value
self._resolve(path, do)
def get(self, path, default=None):
"""Get a config value.
Parameters
----------
path : string
For example 'macros.keystroke_sleep_ms'
"""
return self._resolve(path, lambda parent, child, chunk: child)
def set_autoload_preset(self, device, preset):
"""Set a preset to be automatically applied on start.
Parameters
----------
device : string
preset : string or None
if None, don't autoload something for this device
"""
if preset is not None:
self.set(f'autoload.{device}', preset)
else:
self.remove(f'autoload.{device}')
def iterate_autoload_presets(self):
"""Get tuples of (device, preset)."""
@ -69,15 +126,7 @@ class _Config:
def is_autoloaded(self, device, preset):
"""Should this preset be loaded automatically?"""
autoload_map = self._config.get('autoload')
if autoload_map is None:
return False
autoload_preset = autoload_map.get(device)
if autoload_preset is None:
return False
return autoload_preset == preset
return self.get(f'autoload.{device}') == preset
def load_config(self):
"""Load the config from the file system."""

@ -31,6 +31,7 @@ import multiprocessing
import evdev
from keymapper.logger import logger
from keymapper.config import config
from keymapper.getdevices import get_devices
from keymapper.state import system_mapping, KEYCODE_OFFSET
from keymapper.dev.macros import parse
@ -266,6 +267,51 @@ class KeycodeInjector:
value
)
async def spam_mouse_movements(self, keymapper_device):
"""Keep writing mouse movements based on the gamepad stick position."""
# TODO get absinfo beforehand
max_value = 32767
max_speed = ((max_value ** 2) * 2) ** 0.5
while True:
await asyncio.sleep(1 / 60)
abs_y = self.abs_y
abs_x = self.abs_x
non_linearity = config.get('gamepad.non_linearity', 4)
if non_linearity != 1:
# to make small movements smaller for more precision
speed = (abs_x ** 2 + abs_y ** 2) ** 0.5
factor = (speed / max_speed) ** non_linearity
else:
factor = 1
rel_x = abs_x * factor * 80 / max_value
rel_y = abs_y * factor * 80 / max_value
self.pending_x_rel += rel_x
self.pending_y_rel += rel_y
rel_x = int(self.pending_x_rel)
rel_y = int(self.pending_y_rel)
self.pending_x_rel -= rel_x
self.pending_y_rel -= rel_y
if rel_y != 0:
self._write(
keymapper_device,
evdev.ecodes.EV_REL,
evdev.ecodes.ABS_Y,
rel_y
)
if rel_x != 0:
self._write(
keymapper_device,
evdev.ecodes.EV_REL,
evdev.ecodes.ABS_X,
rel_x
)
async def _injection_loop(self, device, keymapper_device):
"""Inject keycodes for one of the virtual devices.
@ -301,46 +347,7 @@ class KeycodeInjector:
self.pending_x_rel = 0
self.pending_y_rel = 0
async def spam_mouse_movements():
# TODO get absinfo beforehand
max_value = 32767
max_speed = ((max_value ** 2) * 2) ** 0.5
while True:
await asyncio.sleep(1 / 60)
abs_y = self.abs_y
abs_x = self.abs_x
# to make small movements smaller for more precision
speed = (abs_x ** 2 + abs_y ** 2) ** 0.5
non_linearity = 4
factor = (speed / max_speed) ** non_linearity
rel_x = abs_x * factor * 80 / max_value
rel_y = abs_y * factor * 80 / max_value
self.pending_x_rel += rel_x
self.pending_y_rel += rel_y
rel_x = int(self.pending_x_rel)
rel_y = int(self.pending_y_rel)
self.pending_x_rel -= rel_x
self.pending_y_rel -= rel_y
self._write(
keymapper_device,
evdev.ecodes.EV_REL,
evdev.ecodes.ABS_Y,
rel_y
)
self._write(
keymapper_device,
evdev.ecodes.EV_REL,
evdev.ecodes.ABS_X,
rel_x
)
asyncio.ensure_future(spam_mouse_movements())
asyncio.ensure_future(self.spam_mouse_movements(keymapper_device))
async for event in device.async_read_loop():
if self.map_abs_to_rel() and event.type == evdev.ecodes.EV_ABS:

@ -111,7 +111,7 @@ class _Macro:
def add_keycode_pause(self):
"""To add a pause between keystrokes."""
sleeptime = config.get_keystroke_sleep() / 1000
sleeptime = config.get('macros.keystroke_sleep_ms', 10) / 1000
async def sleep():
await asyncio.sleep(sleeptime)

@ -129,6 +129,7 @@ class Window:
"""Safely close the application."""
for timeout in self.timeouts:
GLib.source_remove(timeout)
self.timeouts = []
keycode_reader.stop_reading()
Gtk.main_quit()
@ -301,7 +302,7 @@ class Window:
"""Load the preset automatically next time the user logs in."""
device = self.selected_device
preset = self.selected_preset
config.set_autoload_preset(device, preset, active)
config.set_autoload_preset(device, preset if active else None)
config.save_config()
def on_select_device(self, dropdown):

@ -30,6 +30,20 @@ class TestConfig(unittest.TestCase):
self.assertEqual(len(config.iterate_autoload_presets()), 0)
config.save_config()
def test_basic(self):
config.set('a', 1)
self.assertEqual(config.get('a'), 1)
config.remove('a')
config.set('a.b', 2)
self.assertEqual(config.get('a.b'), 2)
self.assertEqual(config._config['a']['b'], 2)
config.remove('a.b')
config.set('a.b.c', 3)
self.assertEqual(config.get('a.b.c'), 3)
self.assertEqual(config._config['a']['b']['c'], 3)
def test_autoload(self):
del config._config['autoload']
self.assertEqual(len(config.iterate_autoload_presets()), 0)
@ -41,7 +55,7 @@ class TestConfig(unittest.TestCase):
self.assertTrue(config.is_autoloaded('d1', 'a'))
self.assertFalse(config.is_autoloaded('d2', 'b'))
config.set_autoload_preset('d2', 'b', True)
config.set_autoload_preset('d2', 'b')
self.assertEqual(len(config.iterate_autoload_presets()), 2)
self.assertTrue(config.is_autoloaded('d1', 'a'))
self.assertTrue(config.is_autoloaded('d2', 'b'))
@ -56,7 +70,7 @@ class TestConfig(unittest.TestCase):
[('d1', 'a'), ('d2', 'c')]
)
config.set_autoload_preset('d2', 'foo', False)
config.set_autoload_preset('d2', None)
self.assertTrue(config.is_autoloaded('d1', 'a'))
self.assertFalse(config.is_autoloaded('d2', 'b'))
self.assertFalse(config.is_autoloaded('d2', 'c'))

@ -180,6 +180,8 @@ class TestInjector(unittest.TestCase):
event = uinput_write_history_pipe[0].recv()
history.append((event.type, event.code, event.value))
# 4 events for the macro
# 3 for non-macros
self.assertEqual(len(history), 7)
# since the macro takes a little bit of time to execute, its

@ -57,7 +57,8 @@ class TestMacros(unittest.TestCase):
repeats = 20
macro = f'r({repeats}, k(k))'
self.loop.run_until_complete(parse(macro, self.handler).run())
sleep_time = 2 * repeats * config.get_keystroke_sleep() / 1000
keystroke_sleep = config.get('macros.keystroke_sleep_ms', 10)
sleep_time = 2 * repeats * keystroke_sleep / 1000
self.assertGreater(time.time() - start, sleep_time * 0.9)
self.assertLess(time.time() - start, sleep_time * 1.1)
self.assertListEqual(self.result, [('k', 1), ('k', 0)] * repeats)
@ -67,7 +68,7 @@ class TestMacros(unittest.TestCase):
macro = 'r(3, k(m).w(100))'
self.loop.run_until_complete(parse(macro, self.handler).run())
keystroke_time = 6 * config.get_keystroke_sleep()
keystroke_time = 6 * config.get('macros.keystroke_sleep_ms', 10)
total_time = keystroke_time + 300
total_time /= 1000
@ -96,7 +97,7 @@ class TestMacros(unittest.TestCase):
self.loop.run_until_complete(parse(macro, self.handler).run())
num_pauses = 8 + 6 + 4
keystroke_time = num_pauses * config.get_keystroke_sleep()
keystroke_time = num_pauses * config.get('macros.keystroke_sleep_ms', 10)
wait_time = 220
total_time = (keystroke_time + wait_time) / 1000

Loading…
Cancel
Save