#91 #104 added set and ifeq macros

xkb
sezanzeb 3 years ago committed by sezanzeb
parent 8daf55c196
commit 99bc679b82

@ -30,7 +30,7 @@ import evdev
from evdev.ecodes import EV_KEY, EV_REL
from keymapper.logger import logger
from keymapper.groups import classify, GAMEPAD, groups
from keymapper.groups import classify, GAMEPAD
from keymapper import utils
from keymapper.mapping import DISABLE_CODE
from keymapper.injection.keycode_mapper import KeycodeMapper

@ -21,7 +21,7 @@
"""Executes more complex patterns of keystrokes.
To keep it short on the UI, the available functions are one-letter long.
To keep it short on the UI, basic functions are one letter long.
The outermost macro (in the examples below the one created by 'r',
'r' and 'w') will be started, which triggers a chain reaction to execute
@ -37,7 +37,11 @@ w(1000).m(Shift_L, r(2, k(a))).w(10).k(b): <1s> A A <10ms> b
import asyncio
import re
import traceback
import copy
import multiprocessing
import atexit
import select
from evdev.ecodes import ecodes, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL, \
REL_HWHEEL
@ -46,6 +50,72 @@ from keymapper.logger import logger
from keymapper.state import system_mapping
class SharedDict:
"""Share a dictionary across processes."""
# because unittests terminate all child processes in cleanup I can't use
# multiprocessing.Manager
def __init__(self):
"""Create a shared dictionary."""
super().__init__()
self.pipe = multiprocessing.Pipe()
self.process = None
atexit.register(self._stop)
self._start()
def _start(self):
"""Ensure the process to manage the dictionary is running."""
if self.process is not None and self.process.is_alive():
return
# if the manager has already been running in the past but stopped
# for some reason, the dictionary contents are lost
self.process = multiprocessing.Process(target=self.manage)
self.process.start()
def manage(self):
"""Manage the dictionary, handle read and write requests."""
shared_dict = dict()
while True:
message = self.pipe[0].recv()
logger.spam('SharedDict got %s', message)
if message[0] == 'stop':
return
if message[0] == 'set':
shared_dict[message[1]] = message[2]
if message[0] == 'get':
self.pipe[0].send(shared_dict.get(message[1]))
def _stop(self):
"""Stop the managing process."""
self.pipe[1].send(('stop',))
def get(self, key):
"""Get a value from the dictionary."""
return self.__getitem__(key)
def __setitem__(self, key, value):
self.pipe[1].send(('set', key, value))
def __getitem__(self, key):
self.pipe[1].send(('get', key))
# to avoid blocking forever if something goes wrong
select.select([self.pipe[1]], [], [], 0.1)
if self.pipe[1].poll():
return self.pipe[1].recv()
return None
def __del__(self):
self._stop()
macro_variables = SharedDict()
def is_this_a_macro(output):
"""Figure out if this is a macro."""
if not isinstance(output, str):
@ -61,8 +131,14 @@ def is_this_a_macro(output):
class _Macro:
"""Supports chaining and preparing actions.
Calling functions on _Macro does not inject anything yet, it means that
once .run is used it will be executed along with all other queued tasks.
Calling functions like keycode on _Macro doesn't inject any events yet,
it means that once .run is used it will be executed along with all other
queued tasks.
Those functions need to construct an asyncio coroutine and append it to
self.tasks. This makes parameter checking during compile time possible.
Coroutines receive a handler as argument, which is a function that can be
used to inject input events into the system.
"""
def __init__(self, code, mapping):
"""Create a macro instance that can be populated with tasks.
@ -74,10 +150,13 @@ class _Macro:
mapping : Mapping
The preset object, needed for some config stuff
"""
self.tasks = []
self.code = code
self.mapping = mapping
# List of coroutines that will be called sequentially.
# This is the compiled code
self.tasks = []
# is a lock so that h() can be realized
self._holding_lock = asyncio.Lock()
@ -90,6 +169,8 @@ class _Macro:
}
self.child_macros = []
self.keystroke_sleep_ms = None
def is_holding(self):
"""Check if the macro is waiting for a key to be released."""
@ -120,6 +201,8 @@ class _Macro:
if self.running:
logger.error('Tried to run already running macro "%s"', self.code)
return
self.keystroke_sleep_ms = self.mapping.get('macros.keystroke_sleep_ms')
self.running = True
for task in self.tasks:
@ -154,30 +237,42 @@ class _Macro:
def hold(self, macro=None):
"""Loops the execution until key release."""
async def hold_block(_):
# wait until the key is released. Only then it will be
# able to acquire the lock. Release it right after so that
# it can be acquired by press_key again.
try:
await self._holding_lock.acquire()
self._holding_lock.release()
except RuntimeError as error:
# The specific bug in question has been fixed already,
# but lets keep this check here for the future. Not
# catching errors here causes the macro to never be
# released
logger.error('Failed h(): %s', error)
if macro is None:
# no parameters: block until released
async def task(_):
# wait until the key is released. Only then it will be
# able to acquire the lock. Release it right after so that
# it can be acquired by press_key again.
try:
await self._holding_lock.acquire()
self._holding_lock.release()
except RuntimeError as error:
# The specific bug in question has been fixed already,
# but lets keep this check here for the future. Not
# catching errors here causes the macro to never be
# released
logger.error('Failed h(): %s', error)
self.tasks.append(hold_block)
return
self.tasks.append(task)
else:
if not isinstance(macro, _Macro):
raise ValueError(
'Expected the param for h (hold) to be '
f'a macro (like k(a)), but got "{macro}"'
)
if not isinstance(macro, _Macro):
# if macro is a key name, hold down the key while the actual
# keyboard key is held down
symbol = str(macro)
code = system_mapping.get(symbol)
if code is None:
raise KeyError(f'Unknown key "{symbol}"')
self.capabilities[EV_KEY].add(code)
self.tasks.append(lambda handler: handler(EV_KEY, code, 1))
self.tasks.append(hold_block)
self.tasks.append(lambda handler: handler(EV_KEY, code, 0))
return
if isinstance(macro, _Macro):
# repeat the macro forever while the key is held down
async def task(handler):
while self.is_holding():
# run the child macro completely to avoid
@ -187,8 +282,6 @@ class _Macro:
self.tasks.append(task)
self.child_macros.append(macro)
return self
def modify(self, modifier, macro):
"""Do stuff while a modifier is activated.
@ -214,19 +307,18 @@ class _Macro:
self.child_macros.append(macro)
self.tasks.append(lambda handler: handler(EV_KEY, code, 1))
self.add_keycode_pause()
self.tasks.append(self._keycode_pause)
self.tasks.append(macro.run)
self.add_keycode_pause()
self.tasks.append(self._keycode_pause)
self.tasks.append(lambda handler: handler(EV_KEY, code, 0))
self.add_keycode_pause()
return self
self.tasks.append(self._keycode_pause)
def repeat(self, repeats, macro):
"""Repeat actions.
Parameters
----------
repeats : int
repeats : int or _Macro
macro : _Macro
"""
if not isinstance(macro, _Macro):
@ -243,21 +335,16 @@ class _Macro:
f'a number, but got "{repeats}"'
) from error
for _ in range(repeats):
self.tasks.append(macro.run)
async def repeat(handler):
for _ in range(repeats):
await macro.run(handler)
self.tasks.append(repeat)
self.child_macros.append(macro)
return self
def add_keycode_pause(self):
async def _keycode_pause(self, _=None):
"""To add a pause between keystrokes."""
sleeptime = self.mapping.get('macros.keystroke_sleep_ms') / 1000
async def sleep(_):
await asyncio.sleep(sleeptime)
self.tasks.append(sleep)
await asyncio.sleep(self.keystroke_sleep_ms / 1000)
def keycode(self, symbol):
"""Write the symbol."""
@ -269,11 +356,13 @@ class _Macro:
self.capabilities[EV_KEY].add(code)
self.tasks.append(lambda handler: handler(EV_KEY, code, 1))
self.add_keycode_pause()
self.tasks.append(lambda handler: handler(EV_KEY, code, 0))
self.add_keycode_pause()
return self
async def keycode(handler):
handler(EV_KEY, code, 1)
await self._keycode_pause()
handler(EV_KEY, code, 0)
await self._keycode_pause()
self.tasks.append(keycode)
def event(self, ev_type, code, value):
"""Write any event.
@ -304,9 +393,7 @@ class _Macro:
self.capabilities[ev_type].add(code)
self.tasks.append(lambda handler: handler(ev_type, code, value))
self.add_keycode_pause()
return self
self.tasks.append(self._keycode_pause)
def mouse(self, direction, speed):
"""Shortcut for h(e(...))."""
@ -350,7 +437,50 @@ class _Macro:
await asyncio.sleep(sleeptime)
self.tasks.append(sleep)
return self
def set(self, variable, value):
"""Set a variable to a certain value."""
async def set(_):
logger.debug('"%s" set to "%s"', variable, value)
macro_variables[variable] = value
self.tasks.append(set)
def ifeq(self, variable, value, then, otherwise=None):
"""Perform an equality check.
Parameters
----------
variable : string
value : string | number
then : any
otherwise : any
"""
if not isinstance(then, _Macro):
raise ValueError(
'Expected the third param for ifeq to be '
f'a macro (like k(a)), but got "{then}"'
)
if otherwise and not isinstance(otherwise, _Macro):
raise ValueError(
'Expected the fourth param for ifeq to be '
f'a macro (like k(a)), but got "{otherwise}"'
)
async def ifeq(handler):
set_value = macro_variables.get(variable)
logger.debug('"%s" is "%s"', variable, set_value)
if set_value == value:
await then.run(handler)
elif otherwise is not None:
await otherwise.run(handler)
self.child_macros.append(then)
if isinstance(otherwise, _Macro):
self.child_macros.append(otherwise)
self.tasks.append(ifeq)
def _extract_params(inner):
@ -423,12 +553,7 @@ def _parse_recurse(macro, mapping, macro_instance=None, depth=0):
A macro instance to add tasks to
depth : int
"""
# to anyone who knows better about compilers and thinks this is horrible:
# please make a pull request. Because it probably is.
# not using eval for security reasons ofc. And this syntax doesn't need
# string quotes for its params.
# If this gets more complicated than that I'd rather make a macro
# editor GUI and store them as json.
# not using eval for security reasons
assert isinstance(macro, str)
assert isinstance(depth, int)
@ -457,7 +582,9 @@ def _parse_recurse(macro, mapping, macro_instance=None, depth=0):
'w': (macro_instance.wait, 1, 1),
'h': (macro_instance.hold, 0, 1),
'mouse': (macro_instance.mouse, 2, 2),
'wheel': (macro_instance.wheel, 2, 2)
'wheel': (macro_instance.wheel, 2, 2),
'ifeq': (macro_instance.ifeq, 3, 4),
'set': (macro_instance.set, 2, 2),
}
function = functions.get(call)
@ -582,4 +709,6 @@ def parse(macro, mapping, return_errors=False):
return macro_object if not return_errors else None
except Exception as error:
logger.error('Failed to parse macro "%s": %s', macro, error.__repr__())
# print the traceback in case this is a bug of key-mapper
logger.debug(''.join(traceback.format_tb(error.__traceback__)).strip())
return str(error) if return_errors else None

@ -0,0 +1,67 @@
# Macros
This document contains examples for macros with explanations. You are very
welcome to contribute your examples as well if you have a special use-case
via a pull-request.
## The syntax
The system is very trivial and basic, lots of features known from other
scripting languages are missing.
Multiple functions are chained using `.`.
There are three datatypes for function parameters: Macro, string and number.
Unlike other programming languages, `qux(bar())` would not run `bar` and then
`qux`. Instead, `bar()` is an rvalue of type macro and only when `qux` is
called, the implementation of `qux` might decide to run `bar()`. That means
that reading a macro from left to right always yields the correct order of
operations. This is comparable to using lambda functions in python.
Strings don't need quotes. This makes macros look simpler, and I hope
this decision won't cause problems later when the macro system keeps advancing.
Keywords/names/strings available are either:
- variable names (used in `set` and `ifeq`)
- funcion names (like `r` or `mouse`)
- key names (like `a` or `BTN_LEFT`)
Whitespaces, newlines and tabs don't have any meaning and are removed
when the macro gets compiled.
## Combinations spanning multiple devices
**Keyboard:**
`space` -> `set(foo, bar).h(space).set(foo, 0)`
**Mouse:**
`middle` -> `ifeq(foo, bar, h(a), h(BTN_MIDDLE))`
Apply both presets.
If you press space on your keyboard, it will write a space exactly like
it used to.
If you hold down space and press the middle button of your mouse, it will
write "a" instead.
If you just press the middle button of your mouse it behaves like a regular
middle mouse button.
**Explanation:**
`h(space)` makes your key work exactly like it was mapped to "space".
It will inject a key-down event if you press it, does nothing as long your
hold your key down, and inject a key-up event after releasing.
`set(foo, 1).set(foo, 0)` sets "foo" to 1 and then sets "foo" to 0.
`set` and `ifeq` work on shared memory, so all injections will see your
variables.
Combine both to get a key that works like a normal key, but that also
works as a modifier for other keys of other devices
`ifeq(foo, bar, ..., ...)` runs the first param if foo is "bar", or the second
one if foo is not "bar".

@ -68,6 +68,9 @@ names can be chained using ` + `.
## Macros
For more advanced examples and more explanation of the syntax,
see [readme/macros.md](readme/macros.md)
It is possible to write timed macros into the center column:
- `r` repeats the execution of the second parameter
- `w` waits in milliseconds
@ -77,6 +80,11 @@ It is possible to write timed macros into the center column:
- `h` executes the parameter as long as the key is pressed down
- `.` executes two actions behind each other
- `mouse` and `wheel` take a direction like "up" and speed as parameters
- `set` set a variable to a value, visible to all injection processes
- `ifeq` if that variable is a certain value do something
The names for the most common functions are kept short, to make it easy to
write them into the constrained space.
Examples:
- `k(1).k(2)` 1, 2
@ -87,6 +95,8 @@ Examples:
- `mouse(right, 4)` which keeps moving the mouse while pressed.
Made out of `h(e(...))` internally
- `wheel(down, 1)` keeps scrolling down while held
- `set(foo, 1)` set "foo" to 1
- `ifeq(foo, 1, k(x), k(y))` if "foo" is 1, write x, otherwise y
Syntax errors are shown in the UI on save. Each `k` function adds a short
delay of 10ms between key-down, key-up and at the end. See
@ -132,9 +142,6 @@ Wayland than with X11 for me.
# Advanced
If you don't have a graphical user interface, you'll need to edit the
configuration files.
## How to use unavailable symbols
For example Japanese letters. Only works in X11.
@ -165,9 +172,13 @@ to write "ヤ" now when pressing the key.
## Configuration Files
The default configuration is stored at `~/.config/key-mapper/config.json`.
The current default configuration as of 0.8.1 looks like, with
an example autoload entry:
If you don't have a graphical user interface, you'll need to edit the
configuration files.
The default configuration is stored at `~/.config/key-mapper/config.json`,
which doesn't include any mappings, but rather other parameters that
are interesting for injections. The current default configuration as of 0.8.1
looks like, with an example autoload entry:
```json
{

@ -517,6 +517,7 @@ from keymapper.gui.reader import reader
from keymapper.groups import groups
from keymapper.state import system_mapping, custom_mapping
from keymapper.paths import get_config_path
from keymapper.injection.macros import macro_variables
from keymapper.injection.keycode_mapper import active_macros, unreleased
# no need for a high number in tests
@ -596,8 +597,15 @@ def quick_cleanup(log=True):
if device not in environ_copy:
del os.environ[device]
if not macro_variables.process.is_alive():
raise AssertionError('the SharedDict manager is not running anymore')
macro_variables._stop()
join_children()
macro_variables._start()
reader.clear()
for _, pipe in pending_events.values():

@ -22,6 +22,8 @@
import time
import unittest
import asyncio
import multiprocessing
import sys
from evdev.ecodes import EV_REL, EV_KEY, REL_Y, REL_X, REL_WHEEL, REL_HWHEEL
@ -202,14 +204,12 @@ class TestMacros(unittest.TestCase):
self.assertIsNone(error)
error = parse('m(asdf, k(a))', self.mapping, return_errors=True)
self.assertIsNotNone(error)
error = parse('h(a)', self.mapping, return_errors=True)
self.assertIn('macro', error)
self.assertIn('a', error)
error = parse('foo(a)', self.mapping, return_errors=True)
self.assertIn('unknown', error.lower())
self.assertIn('foo', error)
def test_hold(self):
# repeats k(a) as long as the key is held down
macro = parse('k(1).h(k(a)).k(3)', self.mapping)
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {
system_mapping.get('1'),
@ -217,6 +217,8 @@ class TestMacros(unittest.TestCase):
system_mapping.get('3')
})
"""down"""
macro.press_key()
self.loop.run_until_complete(asyncio.sleep(0.05))
self.assertTrue(macro.is_holding())
@ -227,6 +229,8 @@ class TestMacros(unittest.TestCase):
self.assertTrue(macro.is_holding())
self.assertGreater(len(self.result), 2)
"""up"""
macro.release_key()
self.loop.run_until_complete(asyncio.sleep(0.05))
self.assertFalse(macro.is_holding())
@ -266,6 +270,8 @@ class TestMacros(unittest.TestCase):
system_mapping.get('3')
})
"""down"""
macro.press_key()
asyncio.ensure_future(macro.run(self.handler))
self.loop.run_until_complete(asyncio.sleep(0.1))
@ -275,6 +281,8 @@ class TestMacros(unittest.TestCase):
# doesn't do fancy stuff, is blocking until the release
self.assertEqual(len(self.result), 2)
"""up"""
macro.release_key()
self.loop.run_until_complete(asyncio.sleep(0.05))
self.assertFalse(macro.is_holding())
@ -304,6 +312,37 @@ class TestMacros(unittest.TestCase):
self.assertEqual(len(macro.child_macros), 0)
def test_hold_down(self):
# writes down and waits for the up event until the key is released
macro = parse('h(a)', self.mapping)
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {
system_mapping.get('a'),
})
self.assertEqual(len(macro.child_macros), 0)
"""down"""
macro.press_key()
self.loop.run_until_complete(asyncio.sleep(0.05))
self.assertTrue(macro.is_holding())
asyncio.ensure_future(macro.run(self.handler))
macro.press_key() # redundantly calling doesn't break anything
self.loop.run_until_complete(asyncio.sleep(0.2))
self.assertTrue(macro.is_holding())
self.assertEqual(len(self.result), 1)
self.assertEqual(self.result[0], (EV_KEY, system_mapping.get('a'), 1))
"""up"""
macro.release_key()
self.loop.run_until_complete(asyncio.sleep(0.05))
self.assertFalse(macro.is_holding())
self.assertEqual(len(self.result), 2)
self.assertEqual(self.result[0], (EV_KEY, system_mapping.get('a'), 1))
self.assertEqual(self.result[1], (EV_KEY, system_mapping.get('a'), 0))
def test_2(self):
start = time.time()
repeats = 20
@ -518,6 +557,73 @@ class TestMacros(unittest.TestCase):
self.assertListEqual(self.result, [(5421, code, 154)])
self.assertEqual(len(macro.child_macros), 1)
def test_ifeq_runs(self):
macro = parse('set(foo, 2).ifeq(foo, 2, k(a), k(b))', self.mapping)
code_a = system_mapping.get('a')
code_b = system_mapping.get('b')
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {code_a, code_b})
self.assertSetEqual(macro.get_capabilities()[EV_REL], set())
self.loop.run_until_complete(macro.run(self.handler))
self.assertListEqual(self.result, [
(EV_KEY, code_a, 1),
(EV_KEY, code_a, 0)
])
self.assertEqual(len(macro.child_macros), 2)
def test_ifeq_unknown_key(self):
macro = parse('ifeq(qux, 2, k(a), k(b))', self.mapping)
code_a = system_mapping.get('a')
code_b = system_mapping.get('b')
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {code_a, code_b})
self.assertSetEqual(macro.get_capabilities()[EV_REL], set())
self.loop.run_until_complete(macro.run(self.handler))
self.assertListEqual(self.result, [
(EV_KEY, code_b, 1),
(EV_KEY, code_b, 0)
])
self.assertEqual(len(macro.child_macros), 2)
def test_ifeq_runs_multiprocessed(self):
macro = parse('ifeq(foo, 3, k(a), k(b))', self.mapping)
code_a = system_mapping.get('a')
code_b = system_mapping.get('b')
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {code_a, code_b})
self.assertSetEqual(macro.get_capabilities()[EV_REL], set())
self.assertEqual(len(macro.child_macros), 2)
def set_foo(value):
# will write foo = 2 into the shared dictionary of macros
macro_2 = parse(f'set(foo, {value})', self.mapping)
loop = asyncio.new_event_loop()
loop.run_until_complete(macro_2.run(lambda: None))
"""foo is not 3"""
process = multiprocessing.Process(target=set_foo, args=(2,))
process.start()
process.join()
self.loop.run_until_complete(macro.run(self.handler))
self.assertListEqual(self.result, [
(EV_KEY, code_b, 1),
(EV_KEY, code_b, 0)
])
"""foo is 3"""
process = multiprocessing.Process(target=set_foo, args=(3,))
process.start()
process.join()
self.loop.run_until_complete(macro.run(self.handler))
self.assertListEqual(self.result, [
(EV_KEY, code_b, 1),
(EV_KEY, code_b, 0),
(EV_KEY, code_a, 1),
(EV_KEY, code_a, 0)
])
def test_count_brackets(self):
self.assertEqual(_count_brackets(''), 0)
self.assertEqual(_count_brackets('()'), 2)

Loading…
Cancel
Save