ifeq-and-returning
sezanzeb 3 years ago
parent 7220ad7277
commit 8ac1d978ef

File diff suppressed because it is too large Load Diff

@ -30,7 +30,7 @@ import evdev
from evdev.ecodes import EV_KEY, EV_REL
from keymapper.logger import logger
from keymapper.groups import classify, GAMEPAD, groups
from keymapper.groups import classify, GAMEPAD
from keymapper import utils
from keymapper.mapping import DISABLE_CODE
from keymapper.injection.keycode_mapper import KeycodeMapper

@ -21,7 +21,7 @@
"""Executes more complex patterns of keystrokes.
To keep it short on the UI, the available functions are one-letter long.
To keep it short on the UI, basic functions are one letter long.
The outermost macro (in the examples below the one created by 'r',
'r' and 'w') will be started, which triggers a chain reaction to execute
@ -37,7 +37,10 @@ w(1000).m(Shift_L, r(2, k(a))).w(10).k(b): <1s> A A <10ms> b
import asyncio
import re
import traceback
import copy
import multiprocessing
import atexit
from evdev.ecodes import ecodes, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL, \
REL_HWHEEL
@ -46,8 +49,65 @@ from keymapper.logger import logger
from keymapper.state import system_mapping
# process wide information about variables set by macros
macro_variables = {}
class SharedDict:
"""Share a dictionary across processes."""
# because unittests terminate all child processes in cleanup I can't use
# multiprocessing.Manager
def __init__(self):
"""Create a shared dictionary."""
super().__init__()
self.pipe = multiprocessing.Pipe()
self.process = None
atexit.register(self._stop)
def _start(self):
"""Ensure the process to manage the dictionary is running."""
if self.process is not None and self.process.is_alive():
return
# if the manager has already been running in the past but stopped
# for some reason, the dictionary contents are lost
self.process = multiprocessing.Process(target=self.manage)
self.process.start()
def manage(self):
"""Manage the dictionary, handle read and write requests."""
shared_dict = dict()
while True:
message = self.pipe[0].recv()
if message[0] == 'stop':
return
if message[0] == 'set':
shared_dict[message[1]] = message[2]
if message[0] == 'get':
self.pipe[0].send(shared_dict.get(message[1]))
def _stop(self):
"""Stop the managing process."""
self.pipe[1].send(('stop',))
def get(self, key):
"""Get a value from the dictionary."""
return self.__getitem__(key)
def __setitem__(self, key, value):
self._start()
self.pipe[1].send(('set', key, value))
def __getitem__(self, key):
self._start()
self.pipe[1].send(('get', key))
value = self.pipe[1].recv()
return value
def __del__(self):
self._stop()
macro_variables = SharedDict()
def is_this_a_macro(output):
@ -65,8 +125,14 @@ def is_this_a_macro(output):
class _Macro:
"""Supports chaining and preparing actions.
Calling functions on _Macro does not inject anything yet, it means that
once .run is used it will be executed along with all other queued tasks.
Calling functions like keycode on _Macro doesn't inject any events yet,
it means that once .run is used it will be executed along with all other
queued tasks.
Those functions need to construct an asyncio coroutine and append it to
self.tasks. This makes parameter checking during compile time possible.
Coroutines receive a handler as argument, which is a function that can be
used to inject input events into the system.
"""
def __init__(self, code, mapping):
"""Create a macro instance that can be populated with tasks.
@ -78,10 +144,13 @@ class _Macro:
mapping : Mapping
The preset object, needed for some config stuff
"""
self.tasks = []
self.code = code
self.mapping = mapping
# List of coroutines that will be called sequentially.
# This is the compiled code
self.tasks = []
# is a lock so that h() can be realized
self._holding_lock = asyncio.Lock()
@ -94,6 +163,8 @@ class _Macro:
}
self.child_macros = []
self.keystroke_sleep_ms = None
def is_holding(self):
"""Check if the macro is waiting for a key to be released."""
@ -124,19 +195,24 @@ class _Macro:
if self.running:
logger.error('Tried to run already running macro "%s"', self.code)
return
self.keystroke_sleep_ms = self.mapping.get('macros.keystroke_sleep_ms')
self.running = True
return_value = None
for task in self.tasks:
# one could call tasks the compiled macros. it's lambda functions
# that receive the handler as an argument, so that they know
# where to send the event to.
coroutine = task(handler)
if asyncio.iscoroutine(coroutine):
await coroutine
return_value = await coroutine
# done
self.running = False
return return_value # TODO test
def press_key(self):
"""The user pressed the key down."""
if self.is_holding():
@ -156,6 +232,38 @@ class _Macro:
for macro in self.child_macros:
macro.release_key()
async def _resolve(self, handler, cast, macro, default):
"""Get the return value of a macro.
Parameters
----------
handler : function
Handler passed to `run`
cast : type
Of which type the return value should be
macro : _Macro
The macro that is expected to return something
default : any
The value to return if the macro didn't return anything or if
the returned type is incompatible
"""
if isinstance(macro, _Macro):
result = await macro.run(handler)
if result is None:
return default
try:
return cast(result)
except ValueError:
logger.error(
f'Macro "%s" returned a non-%s "%s"',
result.code, cast.__name__, result
)
return default
return default
def hold(self, macro=None):
"""Loops the execution until key release."""
if macro is None:
@ -191,8 +299,6 @@ class _Macro:
self.tasks.append(task)
self.child_macros.append(macro)
return self
def modify(self, modifier, macro):
"""Do stuff while a modifier is activated.
@ -218,19 +324,18 @@ class _Macro:
self.child_macros.append(macro)
self.tasks.append(lambda handler: handler(EV_KEY, code, 1))
self.add_keycode_pause()
self.tasks.append(self._keycode_pause)
self.tasks.append(macro.run)
self.add_keycode_pause()
self.tasks.append(self._keycode_pause)
self.tasks.append(lambda handler: handler(EV_KEY, code, 0))
self.add_keycode_pause()
return self
self.tasks.append(self._keycode_pause)
def repeat(self, repeats, macro):
"""Repeat actions.
Parameters
----------
repeats : int
repeats : int or _Macro
macro : _Macro
"""
if not isinstance(macro, _Macro):
@ -239,32 +344,26 @@ class _Macro:
f'a macro (like k(a)), but got "{macro}"'
)
try:
repeats = int(repeats)
except ValueError as error:
raise ValueError(
'Expected the first param for r (repeat) to be '
f'a number, but got "{repeats}"'
) from error
for _ in range(repeats):
self.tasks.append(macro.run)
async def repeat(handler):
nonlocal repeats
return_value = None
repeats = await self._resolve(handler, int, repeats, 1)
for _ in range(repeats):
return_value = await macro.run(handler)
return return_value # TODO test
self.tasks.append(repeat)
self.child_macros.append(macro)
return self
def add_keycode_pause(self):
async def _keycode_pause(self):
"""To add a pause between keystrokes."""
sleeptime = self.mapping.get('macros.keystroke_sleep_ms') / 1000
async def sleep(_):
await asyncio.sleep(sleeptime)
self.tasks.append(sleep)
await asyncio.sleep(self.keystroke_sleep_ms / 1000)
def keycode(self, symbol):
"""Write the symbol."""
# TODO something handy for all macro functions that
# if the parameter is a macro tells it to run it and then
# use the return value
symbol = str(symbol)
code = system_mapping.get(symbol)
@ -273,11 +372,14 @@ class _Macro:
self.capabilities[EV_KEY].add(code)
self.tasks.append(lambda handler: handler(EV_KEY, code, 1))
self.add_keycode_pause()
self.tasks.append(lambda handler: handler(EV_KEY, code, 0))
self.add_keycode_pause()
return self
async def keycode(handler):
# TODO _resolve
handler(EV_KEY, code, 1)
await self._keycode_pause()
handler(EV_KEY, code, 0)
await self._keycode_pause()
self.tasks.append(keycode)
def event(self, ev_type, code, value):
"""Write any event.
@ -308,9 +410,7 @@ class _Macro:
self.capabilities[ev_type].add(code)
self.tasks.append(lambda handler: handler(ev_type, code, value))
self.add_keycode_pause()
return self
self.tasks.append(self._keycode_pause)
def mouse(self, direction, speed):
"""Shortcut for h(e(...))."""
@ -325,8 +425,6 @@ class _Macro:
child_macro.event(EV_REL, code, value)
self.hold(child_macro)
return self # TODO test
def wheel(self, direction, speed):
"""Shortcut for h(e(...))."""
code, value = {
@ -340,8 +438,6 @@ class _Macro:
child_macro.wait(100 / speed)
self.hold(child_macro)
return self # TODO test
def wait(self, sleeptime):
"""Wait time in milliseconds."""
try:
@ -358,7 +454,6 @@ class _Macro:
await asyncio.sleep(sleeptime)
self.tasks.append(sleep)
return self
def set(self, variable, value):
"""Set a variable to a certain value."""
@ -368,7 +463,16 @@ class _Macro:
macro_variables[variable] = value
self.tasks.append(set)
return self
def get(self, variable, default=None):
"""Resolve a variable from the shared value and return it."""
# TODO test
async def get(_):
value = macro_variables.get(variable)
logger.debug('"%s" is "%s"', variable, value)
return value or default
self.tasks.append(get)
def ifeq(self, variable, value, then, otherwise=None):
"""Perform an equality check.
@ -377,25 +481,34 @@ class _Macro:
----------
variable : string
value : string | number
then : _Macro
otherwise : _Macro
then : any
otherwise : any
"""
if isinstance(then, _Macro):
self.child_macros.append(then)
elif system_mapping.get(then) is not None:
self.capabilities[EV_KEY].add(then)
if isinstance(otherwise, _Macro):
self.child_macros.append(otherwise)
elif system_mapping.get(otherwise) is not None:
self.capabilities[EV_KEY].add(otherwise)
# TODO test
async def ifeq(handler):
logger.debug('"%s" is "%s"', variable, macro_variables[variable])
if macro_variables[variable] == value:
await then.run(handler)
set_value = macro_variables.get(variable) # TODO test .get instead of []
logger.debug('"%s" is "%s"', variable, set_value)
# TODO test returns stuff
if set_value == value:
if not isinstance(then, _Macro):
return then
return await then.run(handler)
elif otherwise is not None:
await otherwise.run(handler)
if not isinstance(otherwise, _Macro):
return otherwise
return await otherwise.run(handler)
self.tasks.append(ifeq)
self.child_macros.append(then)
if otherwise is not None:
self.child_macros.append(otherwise)
return self
def _extract_params(inner):
"""Extract parameters from the inner contents of a call.
@ -467,12 +580,7 @@ def _parse_recurse(macro, mapping, macro_instance=None, depth=0):
A macro instance to add tasks to
depth : int
"""
# to anyone who knows better about compilers and thinks this is horrible:
# please make a pull request. Because it probably is.
# not using eval for security reasons ofc. And this syntax doesn't need
# string quotes for its params.
# If this gets more complicated than that I'd rather make a macro
# editor GUI and store them as json.
# not using eval for security reasons
assert isinstance(macro, str)
assert isinstance(depth, int)
@ -504,6 +612,7 @@ def _parse_recurse(macro, mapping, macro_instance=None, depth=0):
'wheel': (macro_instance.wheel, 2, 2),
'ifeq': (macro_instance.ifeq, 3, 4),
'set': (macro_instance.set, 2, 2),
'get': (macro_instance.get, 1, 2),
}
function = functions.get(call)
@ -628,4 +737,6 @@ def parse(macro, mapping, return_errors=False):
return macro_object if not return_errors else None
except Exception as error:
logger.error('Failed to parse macro "%s": %s', macro, error.__repr__())
# print the traceback in case this is a bug of key-mapper
logger.debug(''.join(traceback.format_tb(error.__traceback__)).strip())
return str(error) if return_errors else None

@ -0,0 +1,77 @@
# Macros
This document contains examples for macros with explanations. You are very
welcome to contribute your examples as well if you have a special use-case
via a pull-request.
## The syntax
The system is very trivial and basic, lots of features known from other
scripting languages are missing.
Multiple functions are chained using `.`.
There are three datatypes for function parameters: Macro, string and number.
Unlike other programming languages, `qux(bar())` would not run `bar` and then
`qux`. Instead, `bar()` is an rvalue of type macro and only when `qux` is
called, the implementation of `qux` might decide to run `bar()`. That means
that reading a macro from left to right always yields the correct order of
operations. This is comparable to using lambda functions in python.
Strings don't need quotes. This makes macros look simpler, and I hope
this decision won't cause problems later when the macro system keeps advancing.
Keywords/names/strings available are either:
- variable names (used in `set`, `get` and `ifeq`)
- funcion names (like `r` or `mouse`)
- key names (like `a` or `BTN_LEFT`)
Depending on how names are used they have a different meaning. If you were
to use `set(BTN_LEFT, 1)`, `k(BTN_KEFT)` would still write a left mouse click,
but `get(BTN_LEFT, 2)` yields `1` now. `get(BTN_RIGHT, 2)` would yield 2.
Whitespaces, newlines and tabs don't have any meaning and are removed
when the macro gets compiled.
## Combinations spanning multiple devices
**Keyboard:**
`space` -> `set(foo, bar).m(space, h()).set(foo, 0)`
`space` -> `idk1(foo, bar, space, 0)`
**Mouse:**
`middle` -> `ifeq(foo, bar, m(a, h()), m(BTN_MIDDLE, h()))`
`middle` -> `idk2(foo, bar, a, BTN_MIDDLE)`
# TODO capabilities in this case?
`middle` -> `m(ifeq(foo, bar, a, BTN_MIDDLE), h())`
Apply both presets.
If you press space on your keyboard, it will write a space exactly like
it used to.
If you hold down space and press the middle button of your mouse, it will
write "a" instead.
If you just press the middle button of your mouse it behaves like a regular
middle mouse button.
**Explanation:**
`m(space, h())` makes your key work exactly like it was mapped to "space".
It will inject a key-down event if you press it, and a key-up event after
releasing. Because `m` waits for `h()` to finish, which will block as long
as your key is held down.
`set(foo, 1).h().set(foo, 0)` sets "foo" to 1, waits until your key is
released and then sets "foo" to 0. `set` and `ifeq` work on shared memory,
so all injections will see your variables.
Combine both to get a key that works like a normal key, but that also
works as a modifier for other keys of other devices
`ifeq(foo, 1, a, BTN_MIDDLE)` returns either "a" or "BTN_MIDDLE" depending
on the value of foo. # TODO

@ -68,6 +68,9 @@ names can be chained using ` + `.
## Macros
For more advanced examples and more explanation of the syntax,
see [readme/macros.md](readme/macros.md)
It is possible to write timed macros into the center column:
- `r` repeats the execution of the second parameter
- `w` waits in milliseconds
@ -77,6 +80,12 @@ It is possible to write timed macros into the center column:
- `h` executes the parameter as long as the key is pressed down
- `.` executes two actions behind each other
- `mouse` and `wheel` take a direction like "up" and speed as parameters
- `set` set a variable to a value, visible to all injection processes
- `get` gets a variable to insert its value into a macro parameter
- `ifeq` if that variable is a certain value do something
The names for the most common functions are kept short, to make it easy to
write them into the constrained space.
Examples:
- `k(1).k(2)` 1, 2
@ -87,6 +96,9 @@ Examples:
- `mouse(right, 4)` which keeps moving the mouse while pressed.
Made out of `h(e(...))` internally
- `wheel(down, 1)` keeps scrolling down while held
- `set(foo, 1)` set "foo" to 1
- `ifeq(foo, 1, k(x), k(y))` if "foo" is 1, write x, otherwise y
- `r(get(foo), k(a))` repeat writing "a" depending on the value of "foo"
Syntax errors are shown in the UI on save. Each `k` function adds a short
delay of 10ms between key-down, key-up and at the end. See
@ -132,9 +144,6 @@ Wayland than with X11 for me.
# Advanced
If you don't have a graphical user interface, you'll need to edit the
configuration files.
## How to use unavailable symbols
For example Japanese letters. Only works in X11.
@ -165,9 +174,13 @@ to write "ヤ" now when pressing the key.
## Configuration Files
The default configuration is stored at `~/.config/key-mapper/config.json`.
The current default configuration as of 0.8.1 looks like, with
an example autoload entry:
If you don't have a graphical user interface, you'll need to edit the
configuration files.
The default configuration is stored at `~/.config/key-mapper/config.json`,
which doesn't include any mappings, but rather other parameters that
are interesting for injections. The current default configuration as of 0.8.1
looks like, with an example autoload entry:
```json
{

@ -518,6 +518,65 @@ class TestMacros(unittest.TestCase):
self.assertListEqual(self.result, [(5421, code, 154)])
self.assertEqual(len(macro.child_macros), 1)
def test_set_get_returns(self):
macro = parse('set(foo, 2).r(get(foo), k(a))', self.mapping)
code = system_mapping.get('a')
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {code})
self.assertSetEqual(macro.get_capabilities()[EV_REL], set())
self.loop.run_until_complete(macro.run(self.handler))
self.assertListEqual(self.result, [
(EV_KEY, code, 1),
(EV_KEY, code, 0)
] * 2)
self.assertEqual(len(macro.child_macros), 1)
def test_ifeq_runs(self):
macro = parse('set(foo, 2).ifeq(foo, 3, k(b), k(a))', self.mapping)
code_a = system_mapping.get('a')
code_b = system_mapping.get('b')
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {code_a, code_b})
self.assertSetEqual(macro.get_capabilities()[EV_REL], set())
self.loop.run_until_complete(macro.run(self.handler))
self.assertListEqual(self.result, [
(EV_KEY, code_a, 1),
(EV_KEY, code_a, 0)
])
self.assertEqual(len(macro.child_macros), 2)
def test_ifeq_adds_capabilities(self):
macro = parse('set(foo, 2).k(ifeq(foo, 3, b, a))', self.mapping)
code_a = system_mapping.get('a')
code_b = system_mapping.get('b')
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {code_a, code_b})
self.assertSetEqual(macro.get_capabilities()[EV_REL], set())
self.loop.run_until_complete(macro.run(self.handler))
self.assertListEqual(self.result, [
(EV_KEY, code_a, 1),
(EV_KEY, code_a, 0)
])
self.assertEqual(len(macro.child_macros), 2)
def test_set_get_multiprocessed(self):
macro = parse('set(foo, a)', self.mapping)
def process():
macro_2 = parse('k(get(foo))', self.mapping)
code_a = system_mapping.get('a')
code_b = system_mapping.get('b')
self.assertSetEqual(macro.get_capabilities()[EV_KEY], {code_a, code_b})
self.assertSetEqual(macro.get_capabilities()[EV_REL], set())
self.loop.run_until_complete(macro.run(self.handler))
self.assertListEqual(self.result, [
(EV_KEY, code_a, 1),
(EV_KEY, code_a, 0)
])
self.assertEqual(len(macro.child_macros), 2)
def test_count_brackets(self):
self.assertEqual(_count_brackets(''), 0)
self.assertEqual(_count_brackets('()'), 2)

Loading…
Cancel
Save