|
|
|
@ -40,7 +40,7 @@ from keymapper.injection.macros.parse import (
|
|
|
|
|
_parse_recurse,
|
|
|
|
|
handle_plus_syntax,
|
|
|
|
|
_count_brackets,
|
|
|
|
|
_split_keyword_arg
|
|
|
|
|
_split_keyword_arg,
|
|
|
|
|
)
|
|
|
|
|
from keymapper.injection.context import Context
|
|
|
|
|
from keymapper.config import config
|
|
|
|
@ -51,7 +51,7 @@ from keymapper.utils import PRESS, RELEASE
|
|
|
|
|
from tests.test import quick_cleanup, new_event
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestMacros(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
class MacroTestBase(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
def setUp(self):
|
|
|
|
|
self.result = []
|
|
|
|
|
|
|
|
|
@ -75,6 +75,8 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
print(f"\033[90mmacro wrote{(ev_type, code, value)}\033[0m")
|
|
|
|
|
self.result.append((ev_type, code, value))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestMacros(MacroTestBase):
|
|
|
|
|
async def test_named_parameter(self):
|
|
|
|
|
result = []
|
|
|
|
|
|
|
|
|
@ -399,11 +401,11 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
self.assertIsNotNone(error)
|
|
|
|
|
error = parse("set(a(), 2)", self.context, True)
|
|
|
|
|
self.assertIsNotNone(error)
|
|
|
|
|
error = parse("set('b,c'), 2)", self.context, True)
|
|
|
|
|
error = parse("set('b,c', 2)", self.context, True)
|
|
|
|
|
self.assertIsNotNone(error)
|
|
|
|
|
error = parse('set("b,c"), 2)', self.context, True)
|
|
|
|
|
error = parse('set("b,c", 2)', self.context, True)
|
|
|
|
|
self.assertIsNotNone(error)
|
|
|
|
|
error = parse('set(A, 2)', self.context, True)
|
|
|
|
|
error = parse("set(A, 2)", self.context, True)
|
|
|
|
|
self.assertIsNone(error)
|
|
|
|
|
|
|
|
|
|
async def test_hold(self):
|
|
|
|
@ -777,9 +779,55 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
self.assertListEqual(self.result, [(5421, code, 154)])
|
|
|
|
|
self.assertEqual(len(macro.child_macros), 1)
|
|
|
|
|
|
|
|
|
|
"""ifeq"""
|
|
|
|
|
async def test_wait_for_event(self):
|
|
|
|
|
macro = parse("h(a)", self.context)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# should timeout, no event known
|
|
|
|
|
await asyncio.wait_for(macro.wait_for_event(), 0.1)
|
|
|
|
|
raise AssertionError("Expected asyncio.TimeoutError")
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
# should not timeout because a new event arrived
|
|
|
|
|
macro.notify(new_event(EV_KEY, 1, 1), PRESS)
|
|
|
|
|
await asyncio.wait_for(macro.wait_for_event(), 0.1)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# should timeout, because the previous event doesn't match the filter
|
|
|
|
|
await asyncio.wait_for(macro.wait_for_event(lambda e, a: e.value == 3), 0.1)
|
|
|
|
|
raise AssertionError("Expected asyncio.TimeoutError")
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
# should not timeout because a new event arrived
|
|
|
|
|
macro.notify(new_event(EV_KEY, 1, 3), RELEASE)
|
|
|
|
|
await asyncio.wait_for(macro.wait_for_event(), 0.1)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# should timeout, because the previous event doesn't match the filter
|
|
|
|
|
await asyncio.wait_for(macro.wait_for_event(lambda _, a: a == PRESS), 0.1)
|
|
|
|
|
raise AssertionError("Expected asyncio.TimeoutError")
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
async def test_macro_breaks(self):
|
|
|
|
|
# the first parameter for `repeat` requires an integer, not "foo",
|
|
|
|
|
# which makes `repeat` throw
|
|
|
|
|
macro = parse('set(a, "foo").r($a, k(KEY_A)).k(KEY_B)', self.context)
|
|
|
|
|
await macro.run(self.handler)
|
|
|
|
|
|
|
|
|
|
# .run() it will not throw because r() breaks, and it will properly set
|
|
|
|
|
# it to stopped
|
|
|
|
|
self.assertFalse(macro.running)
|
|
|
|
|
|
|
|
|
|
# k(KEY_B) is not executed, the macro stops
|
|
|
|
|
self.assertListEqual(self.result, [])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestIfEq(MacroTestBase):
|
|
|
|
|
async def test_ifeq_runs(self):
|
|
|
|
|
# deprecated ifeq function, but kept for compatibility reasons
|
|
|
|
|
macro = parse("set(foo, 2).ifeq(foo, 2, k(a), k(b))", self.context)
|
|
|
|
|
code_a = system_mapping.get("a")
|
|
|
|
|
code_b = system_mapping.get("b")
|
|
|
|
@ -820,9 +868,41 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
self.assertListEqual(self.result, [(EV_KEY, code_b, 1), (EV_KEY, code_b, 0)])
|
|
|
|
|
self.assertEqual(len(macro.child_macros), 2)
|
|
|
|
|
|
|
|
|
|
"""if_eq"""
|
|
|
|
|
async def test_if_eq(self):
|
|
|
|
|
"""new version of ifeq"""
|
|
|
|
|
code_a = system_mapping.get("a")
|
|
|
|
|
code_b = system_mapping.get("b")
|
|
|
|
|
a_press = [(EV_KEY, code_a, 1), (EV_KEY, code_a, 0)]
|
|
|
|
|
b_press = [(EV_KEY, code_b, 1), (EV_KEY, code_b, 0)]
|
|
|
|
|
|
|
|
|
|
async def test(macro, expected):
|
|
|
|
|
# cleanup
|
|
|
|
|
macro_variables._clear()
|
|
|
|
|
self.assertIsNone(macro_variables.get("a"))
|
|
|
|
|
self.result.clear()
|
|
|
|
|
|
|
|
|
|
async def test_ifeq_runs_multiprocessed(self):
|
|
|
|
|
# test
|
|
|
|
|
macro = parse(macro, self.context)
|
|
|
|
|
await macro.run(self.handler)
|
|
|
|
|
self.assertListEqual(self.result, expected)
|
|
|
|
|
|
|
|
|
|
await test("if_eq(1, 1, k(a), k(b))", a_press)
|
|
|
|
|
await test("if_eq(1, 2, k(a), k(b))", b_press)
|
|
|
|
|
await test('set(a, "foo").if_eq($a, "foo", k(a), k(b))', a_press)
|
|
|
|
|
await test('set(a, "foo").if_eq("foo", $a, k(a), k(b))', a_press)
|
|
|
|
|
await test('set(a, "foo").if_eq("foo", $a, , k(b))', [])
|
|
|
|
|
await test('set(a, "qux").if_eq("foo", $a, k(a), k(b))', b_press)
|
|
|
|
|
await test('set(a, "qux").if_eq($a, "foo", k(a), k(b))', b_press)
|
|
|
|
|
await test('set(a, "qux").if_eq($a, "foo", k(a), )', [])
|
|
|
|
|
await test('set(a, "x").set(b, "y").if_eq($b, $a, k(a), k(b))', b_press)
|
|
|
|
|
await test('set(a, "x").set(b, "y").if_eq($b, $a, k(a), )', [])
|
|
|
|
|
await test('set(a, "x").set(b, "x").if_eq($b, $a, k(a), k(b))', a_press)
|
|
|
|
|
await test('set(a, "x").set(b, "x").if_eq($b, $a, , k(b))', [])
|
|
|
|
|
await test("if_eq($q, $w, k(a), else=k(b))", a_press) # both None
|
|
|
|
|
await test("set(q, 1).if_eq($q, $w, k(a), else=k(b))", b_press)
|
|
|
|
|
await test("set(q, 1).set(w, 1).if_eq($q, $w, k(a), else=k(b))", a_press)
|
|
|
|
|
|
|
|
|
|
async def test_if_eq_runs_multiprocessed(self):
|
|
|
|
|
"""ifeq on variables that have been set in other processes works."""
|
|
|
|
|
macro = parse("if_eq($foo, 3, k(a), k(b))", self.context)
|
|
|
|
|
code_a = system_mapping.get("a")
|
|
|
|
@ -862,41 +942,8 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
async def test_if_eq(self):
|
|
|
|
|
code_a = system_mapping.get("a")
|
|
|
|
|
code_b = system_mapping.get("b")
|
|
|
|
|
a_press = [(EV_KEY, code_a, 1), (EV_KEY, code_a, 0)]
|
|
|
|
|
b_press = [(EV_KEY, code_b, 1), (EV_KEY, code_b, 0)]
|
|
|
|
|
|
|
|
|
|
async def test(macro, expected):
|
|
|
|
|
# cleanup
|
|
|
|
|
macro_variables._clear()
|
|
|
|
|
self.assertIsNone(macro_variables.get("a"))
|
|
|
|
|
self.result.clear()
|
|
|
|
|
|
|
|
|
|
# test
|
|
|
|
|
macro = parse(macro, self.context)
|
|
|
|
|
await macro.run(self.handler)
|
|
|
|
|
self.assertListEqual(self.result, expected)
|
|
|
|
|
|
|
|
|
|
await test("if_eq(1, 1, k(a), k(b))", a_press)
|
|
|
|
|
await test("if_eq(1, 2, k(a), k(b))", b_press)
|
|
|
|
|
await test('set(a, "foo").if_eq($a, "foo", k(a), k(b))', a_press)
|
|
|
|
|
await test('set(a, "foo").if_eq("foo", $a, k(a), k(b))', a_press)
|
|
|
|
|
await test('set(a, "foo").if_eq("foo", $a, , k(b))', [])
|
|
|
|
|
await test('set(a, "qux").if_eq("foo", $a, k(a), k(b))', b_press)
|
|
|
|
|
await test('set(a, "qux").if_eq($a, "foo", k(a), k(b))', b_press)
|
|
|
|
|
await test('set(a, "qux").if_eq($a, "foo", k(a), )', [])
|
|
|
|
|
await test('set(a, "x").set(b, "y").if_eq($b, $a, k(a), k(b))', b_press)
|
|
|
|
|
await test('set(a, "x").set(b, "y").if_eq($b, $a, k(a), )', [])
|
|
|
|
|
await test('set(a, "x").set(b, "x").if_eq($b, $a, k(a), k(b))', a_press)
|
|
|
|
|
await test('set(a, "x").set(b, "x").if_eq($b, $a, , k(b))', [])
|
|
|
|
|
await test("if_eq($q, $w, k(a), else=k(b))", a_press) # both None
|
|
|
|
|
await test("set(q, 1).if_eq($q, $w, k(a), else=k(b))", b_press)
|
|
|
|
|
await test("set(q, 1).set(w, 1).if_eq($q, $w, k(a), else=k(b))", a_press)
|
|
|
|
|
|
|
|
|
|
"""if_single"""
|
|
|
|
|
|
|
|
|
|
class TestIfSingle(MacroTestBase):
|
|
|
|
|
async def test_if_single(self):
|
|
|
|
|
macro = parse("if_single(k(x), k(y))", self.context)
|
|
|
|
|
self.assertEqual(len(macro.child_macros), 2)
|
|
|
|
@ -915,9 +962,12 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
await asyncio.sleep(0.1)
|
|
|
|
|
|
|
|
|
|
self.assertListEqual(self.result, [(EV_KEY, x, 1), (EV_KEY, x, 0)])
|
|
|
|
|
self.assertFalse(macro.running)
|
|
|
|
|
|
|
|
|
|
async def test_if_single_ignores_releases(self):
|
|
|
|
|
macro = parse("if_single(k(x), k(y))", self.context)
|
|
|
|
|
# the timeout won't break the macro, everything happens well within that
|
|
|
|
|
# timeframe.
|
|
|
|
|
macro = parse("if_single(k(x), k(y), timeout=100000)", self.context)
|
|
|
|
|
self.assertEqual(len(macro.child_macros), 2)
|
|
|
|
|
|
|
|
|
|
a = system_mapping.get("a")
|
|
|
|
@ -944,9 +994,12 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
macro.notify(new_event(EV_KEY, a, 1), PRESS)
|
|
|
|
|
await asyncio.sleep(0.05)
|
|
|
|
|
self.assertListEqual(self.result, [(EV_KEY, x, 1), (EV_KEY, x, 0)])
|
|
|
|
|
self.assertFalse(macro.running)
|
|
|
|
|
|
|
|
|
|
async def test_if_not_single(self):
|
|
|
|
|
# also works if if_single is a child macro
|
|
|
|
|
# Will run the `else` macro if another key is pressed.
|
|
|
|
|
# Also works if if_single is a child macro, i.e. the event is passed to it
|
|
|
|
|
# from the outside macro correctly.
|
|
|
|
|
macro = parse("r(1, if_single(k(x), k(y)))", self.context)
|
|
|
|
|
self.assertEqual(len(macro.child_macros), 1)
|
|
|
|
|
self.assertEqual(len(macro.child_macros[0].child_macros), 2)
|
|
|
|
@ -965,6 +1018,7 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
await asyncio.sleep(0.1)
|
|
|
|
|
|
|
|
|
|
self.assertListEqual(self.result, [(EV_KEY, y, 1), (EV_KEY, y, 0)])
|
|
|
|
|
self.assertFalse(macro.running)
|
|
|
|
|
|
|
|
|
|
async def test_if_not_single_none(self):
|
|
|
|
|
macro = parse("if_single(k(x),)", self.context)
|
|
|
|
@ -983,9 +1037,30 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
await asyncio.sleep(0.1)
|
|
|
|
|
|
|
|
|
|
self.assertListEqual(self.result, [])
|
|
|
|
|
self.assertFalse(macro.running)
|
|
|
|
|
|
|
|
|
|
async def test_if_single_times_out(self):
|
|
|
|
|
macro = parse("set(t, 300).if_single(k(x), k(y), timeout=$t)", self.context)
|
|
|
|
|
self.assertEqual(len(macro.child_macros), 2)
|
|
|
|
|
|
|
|
|
|
a = system_mapping.get("a")
|
|
|
|
|
y = system_mapping.get("y")
|
|
|
|
|
|
|
|
|
|
macro.notify(new_event(EV_KEY, a, 1), PRESS)
|
|
|
|
|
asyncio.ensure_future(macro.run(self.handler))
|
|
|
|
|
|
|
|
|
|
# no timeout yet
|
|
|
|
|
await asyncio.sleep(0.2)
|
|
|
|
|
self.assertListEqual(self.result, [])
|
|
|
|
|
self.assertTrue(macro.running)
|
|
|
|
|
|
|
|
|
|
"""if_tap"""
|
|
|
|
|
# times out now
|
|
|
|
|
await asyncio.sleep(0.2)
|
|
|
|
|
self.assertListEqual(self.result, [(EV_KEY, y, 1), (EV_KEY, y, 0)])
|
|
|
|
|
self.assertFalse(macro.running)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestIfTap(MacroTestBase):
|
|
|
|
|
async def test_if_tap(self):
|
|
|
|
|
macro = parse("if_tap(k(x), k(y), 100)", self.context)
|
|
|
|
|
self.assertEqual(len(macro.child_macros), 2)
|
|
|
|
@ -1001,6 +1076,7 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
await asyncio.sleep(0.05)
|
|
|
|
|
|
|
|
|
|
self.assertListEqual(self.result, [(EV_KEY, x, 1), (EV_KEY, x, 0)])
|
|
|
|
|
self.assertFalse(macro.running)
|
|
|
|
|
|
|
|
|
|
async def test_if_tap_none(self):
|
|
|
|
|
# first param none
|
|
|
|
@ -1027,6 +1103,8 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
await asyncio.sleep(0.05)
|
|
|
|
|
self.assertListEqual(self.result, [])
|
|
|
|
|
|
|
|
|
|
self.assertFalse(macro.running)
|
|
|
|
|
|
|
|
|
|
async def test_if_not_tap(self):
|
|
|
|
|
macro = parse("if_tap(k(x), k(y), 50)", self.context)
|
|
|
|
|
self.assertEqual(len(macro.child_macros), 2)
|
|
|
|
@ -1042,6 +1120,7 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
await asyncio.sleep(0.05)
|
|
|
|
|
|
|
|
|
|
self.assertListEqual(self.result, [(EV_KEY, y, 1), (EV_KEY, y, 0)])
|
|
|
|
|
self.assertFalse(macro.running)
|
|
|
|
|
|
|
|
|
|
async def test_if_not_tap_named(self):
|
|
|
|
|
macro = parse("if_tap(k(x), k(y), timeout=50)", self.context)
|
|
|
|
@ -1058,52 +1137,8 @@ class TestMacros(unittest.IsolatedAsyncioTestCase):
|
|
|
|
|
await asyncio.sleep(0.05)
|
|
|
|
|
|
|
|
|
|
self.assertListEqual(self.result, [(EV_KEY, y, 1), (EV_KEY, y, 0)])
|
|
|
|
|
|
|
|
|
|
async def test_wait_for_event(self):
|
|
|
|
|
macro = parse("h(a)", self.context)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# should timeout, no event known
|
|
|
|
|
await asyncio.wait_for(macro.wait_for_event(), 0.1)
|
|
|
|
|
raise AssertionError("Expected asyncio.TimeoutError")
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
# should not timeout because a new event arrived
|
|
|
|
|
macro.notify(new_event(EV_KEY, 1, 1), PRESS)
|
|
|
|
|
await asyncio.wait_for(macro.wait_for_event(), 0.1)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# should timeout, because the previous event doesn't match the filter
|
|
|
|
|
await asyncio.wait_for(macro.wait_for_event(lambda e, a: e.value == 3), 0.1)
|
|
|
|
|
raise AssertionError("Expected asyncio.TimeoutError")
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
# should not timeout because a new event arrived
|
|
|
|
|
macro.notify(new_event(EV_KEY, 1, 3), RELEASE)
|
|
|
|
|
await asyncio.wait_for(macro.wait_for_event(), 0.1)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# should timeout, because the previous event doesn't match the filter
|
|
|
|
|
await asyncio.wait_for(macro.wait_for_event(lambda _, a: a == PRESS), 0.1)
|
|
|
|
|
raise AssertionError("Expected asyncio.TimeoutError")
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
async def test_macro_breaks(self):
|
|
|
|
|
# the first parameter for `repeat` requires an integer, not "foo",
|
|
|
|
|
# which makes `repeat` throw
|
|
|
|
|
macro = parse('set(a, "foo").r($a, k(KEY_A)).k(KEY_B)', self.context)
|
|
|
|
|
await macro.run(self.handler)
|
|
|
|
|
|
|
|
|
|
# .run() it will not throw because r() breaks, and it will properly set
|
|
|
|
|
# it to stopped
|
|
|
|
|
self.assertFalse(macro.running)
|
|
|
|
|
|
|
|
|
|
# k(KEY_B) is not executed, the macro stops
|
|
|
|
|
self.assertListEqual(self.result, [])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
unittest.main()
|
|
|
|
|