From 79f11b4e346640728a44738c75412e7048ff7e5f Mon Sep 17 00:00:00 2001 From: sezanzeb Date: Sat, 2 Oct 2021 22:40:05 +0200 Subject: [PATCH] #183 timeout for if_single, TestMacros split --- keymapper/injection/macros/macro.py | 31 ++-- readme/macros.md | 6 +- readme/pylint.svg | 4 +- tests/testcases/test_macros.py | 213 ++++++++++++++++------------ 4 files changed, 152 insertions(+), 102 deletions(-) diff --git a/keymapper/injection/macros/macro.py b/keymapper/injection/macros/macro.py index 5702ba9c..b7b9cb3b 100644 --- a/keymapper/injection/macros/macro.py +++ b/keymapper/injection/macros/macro.py @@ -494,7 +494,7 @@ class Macro: def add_set(self, variable, value): """Set a variable to a certain value.""" - _type_check_variablename(variable) # TODO test + _type_check_variablename(variable) async def task(_): # can also copy with set(a, $b) @@ -563,9 +563,9 @@ class Macro: self.child_macros.append(_else) async def task(handler): + coroutine = self._holding_event.wait() + resolved_timeout = _resolve(timeout, [int, float]) / 1000 try: - coroutine = self._holding_event.wait() - resolved_timeout = _resolve(timeout, [int, float]) / 1000 await asyncio.wait_for(coroutine, resolved_timeout) if then: await then.run(handler) @@ -575,7 +575,7 @@ class Macro: self.tasks.append(task) - def add_if_single(self, then, otherwise): + def add_if_single(self, then, otherwise, timeout=None): """If a key was pressed without combining it.""" _type_check(then, [Macro, None], "if_single", 1) _type_check(otherwise, [Macro, None], "if_single", 2) @@ -598,14 +598,25 @@ class Macro: if action in (PRESS, PRESS_NEGATIVE): return True - await self.wait_for_event(event_filter) + coroutine = self.wait_for_event(event_filter) + resolved_timeout = _resolve(timeout, allowed_types=[int, float, None]) + try: + if resolved_timeout is not None: + await asyncio.wait_for(coroutine, resolved_timeout / 1000) + else: + await coroutine - mappable_event_2 = (self._newest_event.type, self._newest_event.code) + mappable_event_2 = (self._newest_event.type, self._newest_event.code) + combined = mappable_event_1 != mappable_event_2 + if not combined: + # no timeout and not combined + if then: + await then.run(handler) + return + except asyncio.TimeoutError: + pass - combined = mappable_event_1 != mappable_event_2 - if then and not combined: - await then.run(handler) - elif otherwise: + if otherwise: await otherwise.run(handler) self.tasks.append(task) diff --git a/readme/macros.md b/readme/macros.md index 8432c25f..6191e3c4 100644 --- a/readme/macros.md +++ b/readme/macros.md @@ -208,9 +208,12 @@ Bear in mind that anti-cheat software might detect macros in games. > If the key that is mapped to the macro is pressed and released, run the `then` macro. > > If another key is pressed while the triggering key is held down, run the `else` macro. +> +> If a timeout number is provided, the macro will run `else` if no event arrives for +> more than the configured number in milliseconds. > > ```c# -> if_single(then: Macro | None, else: Macro | None) +> if_single(then: Macro | None, else: Macro | None, timeout: int | None) > ``` > > Examples: @@ -218,6 +221,7 @@ Bear in mind that anti-cheat software might detect macros in games. > ```c# > if_single(key(KEY_A), key(KEY_B)) > if_single(then=key(KEY_A), else=key(KEY_B)) +> if_single(key(KEY_A), key(KEY_B), timeout=1000) > ``` ## Syntax diff --git a/readme/pylint.svg b/readme/pylint.svg index 923dddf5..2733c52e 100644 --- a/readme/pylint.svg +++ b/readme/pylint.svg @@ -17,7 +17,7 @@ pylint - 9.64 - 9.64 + 9.65 + 9.65 \ No newline at end of file diff --git a/tests/testcases/test_macros.py b/tests/testcases/test_macros.py index 7c192978..7f6a0a0d 100644 --- a/tests/testcases/test_macros.py +++ b/tests/testcases/test_macros.py @@ -40,7 +40,7 @@ from keymapper.injection.macros.parse import ( _parse_recurse, handle_plus_syntax, _count_brackets, - _split_keyword_arg + _split_keyword_arg, ) from keymapper.injection.context import Context from keymapper.config import config @@ -51,7 +51,7 @@ from keymapper.utils import PRESS, RELEASE from tests.test import quick_cleanup, new_event -class TestMacros(unittest.IsolatedAsyncioTestCase): +class MacroTestBase(unittest.IsolatedAsyncioTestCase): def setUp(self): self.result = [] @@ -75,6 +75,8 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): print(f"\033[90mmacro wrote{(ev_type, code, value)}\033[0m") self.result.append((ev_type, code, value)) + +class TestMacros(MacroTestBase): async def test_named_parameter(self): result = [] @@ -399,11 +401,11 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): self.assertIsNotNone(error) error = parse("set(a(), 2)", self.context, True) self.assertIsNotNone(error) - error = parse("set('b,c'), 2)", self.context, True) + error = parse("set('b,c', 2)", self.context, True) self.assertIsNotNone(error) - error = parse('set("b,c"), 2)', self.context, True) + error = parse('set("b,c", 2)', self.context, True) self.assertIsNotNone(error) - error = parse('set(A, 2)', self.context, True) + error = parse("set(A, 2)", self.context, True) self.assertIsNone(error) async def test_hold(self): @@ -777,9 +779,55 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): self.assertListEqual(self.result, [(5421, code, 154)]) self.assertEqual(len(macro.child_macros), 1) - """ifeq""" + async def test_wait_for_event(self): + macro = parse("h(a)", self.context) + + try: + # should timeout, no event known + await asyncio.wait_for(macro.wait_for_event(), 0.1) + raise AssertionError("Expected asyncio.TimeoutError") + except asyncio.TimeoutError: + pass + + # should not timeout because a new event arrived + macro.notify(new_event(EV_KEY, 1, 1), PRESS) + await asyncio.wait_for(macro.wait_for_event(), 0.1) + + try: + # should timeout, because the previous event doesn't match the filter + await asyncio.wait_for(macro.wait_for_event(lambda e, a: e.value == 3), 0.1) + raise AssertionError("Expected asyncio.TimeoutError") + except asyncio.TimeoutError: + pass + + # should not timeout because a new event arrived + macro.notify(new_event(EV_KEY, 1, 3), RELEASE) + await asyncio.wait_for(macro.wait_for_event(), 0.1) + + try: + # should timeout, because the previous event doesn't match the filter + await asyncio.wait_for(macro.wait_for_event(lambda _, a: a == PRESS), 0.1) + raise AssertionError("Expected asyncio.TimeoutError") + except asyncio.TimeoutError: + pass + + async def test_macro_breaks(self): + # the first parameter for `repeat` requires an integer, not "foo", + # which makes `repeat` throw + macro = parse('set(a, "foo").r($a, k(KEY_A)).k(KEY_B)', self.context) + await macro.run(self.handler) + + # .run() it will not throw because r() breaks, and it will properly set + # it to stopped + self.assertFalse(macro.running) + + # k(KEY_B) is not executed, the macro stops + self.assertListEqual(self.result, []) + +class TestIfEq(MacroTestBase): async def test_ifeq_runs(self): + # deprecated ifeq function, but kept for compatibility reasons macro = parse("set(foo, 2).ifeq(foo, 2, k(a), k(b))", self.context) code_a = system_mapping.get("a") code_b = system_mapping.get("b") @@ -820,9 +868,41 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): self.assertListEqual(self.result, [(EV_KEY, code_b, 1), (EV_KEY, code_b, 0)]) self.assertEqual(len(macro.child_macros), 2) - """if_eq""" + async def test_if_eq(self): + """new version of ifeq""" + code_a = system_mapping.get("a") + code_b = system_mapping.get("b") + a_press = [(EV_KEY, code_a, 1), (EV_KEY, code_a, 0)] + b_press = [(EV_KEY, code_b, 1), (EV_KEY, code_b, 0)] + + async def test(macro, expected): + # cleanup + macro_variables._clear() + self.assertIsNone(macro_variables.get("a")) + self.result.clear() - async def test_ifeq_runs_multiprocessed(self): + # test + macro = parse(macro, self.context) + await macro.run(self.handler) + self.assertListEqual(self.result, expected) + + await test("if_eq(1, 1, k(a), k(b))", a_press) + await test("if_eq(1, 2, k(a), k(b))", b_press) + await test('set(a, "foo").if_eq($a, "foo", k(a), k(b))', a_press) + await test('set(a, "foo").if_eq("foo", $a, k(a), k(b))', a_press) + await test('set(a, "foo").if_eq("foo", $a, , k(b))', []) + await test('set(a, "qux").if_eq("foo", $a, k(a), k(b))', b_press) + await test('set(a, "qux").if_eq($a, "foo", k(a), k(b))', b_press) + await test('set(a, "qux").if_eq($a, "foo", k(a), )', []) + await test('set(a, "x").set(b, "y").if_eq($b, $a, k(a), k(b))', b_press) + await test('set(a, "x").set(b, "y").if_eq($b, $a, k(a), )', []) + await test('set(a, "x").set(b, "x").if_eq($b, $a, k(a), k(b))', a_press) + await test('set(a, "x").set(b, "x").if_eq($b, $a, , k(b))', []) + await test("if_eq($q, $w, k(a), else=k(b))", a_press) # both None + await test("set(q, 1).if_eq($q, $w, k(a), else=k(b))", b_press) + await test("set(q, 1).set(w, 1).if_eq($q, $w, k(a), else=k(b))", a_press) + + async def test_if_eq_runs_multiprocessed(self): """ifeq on variables that have been set in other processes works.""" macro = parse("if_eq($foo, 3, k(a), k(b))", self.context) code_a = system_mapping.get("a") @@ -862,41 +942,8 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): ], ) - async def test_if_eq(self): - code_a = system_mapping.get("a") - code_b = system_mapping.get("b") - a_press = [(EV_KEY, code_a, 1), (EV_KEY, code_a, 0)] - b_press = [(EV_KEY, code_b, 1), (EV_KEY, code_b, 0)] - - async def test(macro, expected): - # cleanup - macro_variables._clear() - self.assertIsNone(macro_variables.get("a")) - self.result.clear() - - # test - macro = parse(macro, self.context) - await macro.run(self.handler) - self.assertListEqual(self.result, expected) - - await test("if_eq(1, 1, k(a), k(b))", a_press) - await test("if_eq(1, 2, k(a), k(b))", b_press) - await test('set(a, "foo").if_eq($a, "foo", k(a), k(b))', a_press) - await test('set(a, "foo").if_eq("foo", $a, k(a), k(b))', a_press) - await test('set(a, "foo").if_eq("foo", $a, , k(b))', []) - await test('set(a, "qux").if_eq("foo", $a, k(a), k(b))', b_press) - await test('set(a, "qux").if_eq($a, "foo", k(a), k(b))', b_press) - await test('set(a, "qux").if_eq($a, "foo", k(a), )', []) - await test('set(a, "x").set(b, "y").if_eq($b, $a, k(a), k(b))', b_press) - await test('set(a, "x").set(b, "y").if_eq($b, $a, k(a), )', []) - await test('set(a, "x").set(b, "x").if_eq($b, $a, k(a), k(b))', a_press) - await test('set(a, "x").set(b, "x").if_eq($b, $a, , k(b))', []) - await test("if_eq($q, $w, k(a), else=k(b))", a_press) # both None - await test("set(q, 1).if_eq($q, $w, k(a), else=k(b))", b_press) - await test("set(q, 1).set(w, 1).if_eq($q, $w, k(a), else=k(b))", a_press) - - """if_single""" +class TestIfSingle(MacroTestBase): async def test_if_single(self): macro = parse("if_single(k(x), k(y))", self.context) self.assertEqual(len(macro.child_macros), 2) @@ -915,9 +962,12 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): await asyncio.sleep(0.1) self.assertListEqual(self.result, [(EV_KEY, x, 1), (EV_KEY, x, 0)]) + self.assertFalse(macro.running) async def test_if_single_ignores_releases(self): - macro = parse("if_single(k(x), k(y))", self.context) + # the timeout won't break the macro, everything happens well within that + # timeframe. + macro = parse("if_single(k(x), k(y), timeout=100000)", self.context) self.assertEqual(len(macro.child_macros), 2) a = system_mapping.get("a") @@ -944,9 +994,12 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): macro.notify(new_event(EV_KEY, a, 1), PRESS) await asyncio.sleep(0.05) self.assertListEqual(self.result, [(EV_KEY, x, 1), (EV_KEY, x, 0)]) + self.assertFalse(macro.running) async def test_if_not_single(self): - # also works if if_single is a child macro + # Will run the `else` macro if another key is pressed. + # Also works if if_single is a child macro, i.e. the event is passed to it + # from the outside macro correctly. macro = parse("r(1, if_single(k(x), k(y)))", self.context) self.assertEqual(len(macro.child_macros), 1) self.assertEqual(len(macro.child_macros[0].child_macros), 2) @@ -965,6 +1018,7 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): await asyncio.sleep(0.1) self.assertListEqual(self.result, [(EV_KEY, y, 1), (EV_KEY, y, 0)]) + self.assertFalse(macro.running) async def test_if_not_single_none(self): macro = parse("if_single(k(x),)", self.context) @@ -983,9 +1037,30 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): await asyncio.sleep(0.1) self.assertListEqual(self.result, []) + self.assertFalse(macro.running) + + async def test_if_single_times_out(self): + macro = parse("set(t, 300).if_single(k(x), k(y), timeout=$t)", self.context) + self.assertEqual(len(macro.child_macros), 2) + + a = system_mapping.get("a") + y = system_mapping.get("y") + + macro.notify(new_event(EV_KEY, a, 1), PRESS) + asyncio.ensure_future(macro.run(self.handler)) + + # no timeout yet + await asyncio.sleep(0.2) + self.assertListEqual(self.result, []) + self.assertTrue(macro.running) - """if_tap""" + # times out now + await asyncio.sleep(0.2) + self.assertListEqual(self.result, [(EV_KEY, y, 1), (EV_KEY, y, 0)]) + self.assertFalse(macro.running) + +class TestIfTap(MacroTestBase): async def test_if_tap(self): macro = parse("if_tap(k(x), k(y), 100)", self.context) self.assertEqual(len(macro.child_macros), 2) @@ -1001,6 +1076,7 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): await asyncio.sleep(0.05) self.assertListEqual(self.result, [(EV_KEY, x, 1), (EV_KEY, x, 0)]) + self.assertFalse(macro.running) async def test_if_tap_none(self): # first param none @@ -1027,6 +1103,8 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): await asyncio.sleep(0.05) self.assertListEqual(self.result, []) + self.assertFalse(macro.running) + async def test_if_not_tap(self): macro = parse("if_tap(k(x), k(y), 50)", self.context) self.assertEqual(len(macro.child_macros), 2) @@ -1042,6 +1120,7 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): await asyncio.sleep(0.05) self.assertListEqual(self.result, [(EV_KEY, y, 1), (EV_KEY, y, 0)]) + self.assertFalse(macro.running) async def test_if_not_tap_named(self): macro = parse("if_tap(k(x), k(y), timeout=50)", self.context) @@ -1058,52 +1137,8 @@ class TestMacros(unittest.IsolatedAsyncioTestCase): await asyncio.sleep(0.05) self.assertListEqual(self.result, [(EV_KEY, y, 1), (EV_KEY, y, 0)]) - - async def test_wait_for_event(self): - macro = parse("h(a)", self.context) - - try: - # should timeout, no event known - await asyncio.wait_for(macro.wait_for_event(), 0.1) - raise AssertionError("Expected asyncio.TimeoutError") - except asyncio.TimeoutError: - pass - - # should not timeout because a new event arrived - macro.notify(new_event(EV_KEY, 1, 1), PRESS) - await asyncio.wait_for(macro.wait_for_event(), 0.1) - - try: - # should timeout, because the previous event doesn't match the filter - await asyncio.wait_for(macro.wait_for_event(lambda e, a: e.value == 3), 0.1) - raise AssertionError("Expected asyncio.TimeoutError") - except asyncio.TimeoutError: - pass - - # should not timeout because a new event arrived - macro.notify(new_event(EV_KEY, 1, 3), RELEASE) - await asyncio.wait_for(macro.wait_for_event(), 0.1) - - try: - # should timeout, because the previous event doesn't match the filter - await asyncio.wait_for(macro.wait_for_event(lambda _, a: a == PRESS), 0.1) - raise AssertionError("Expected asyncio.TimeoutError") - except asyncio.TimeoutError: - pass - - async def test_macro_breaks(self): - # the first parameter for `repeat` requires an integer, not "foo", - # which makes `repeat` throw - macro = parse('set(a, "foo").r($a, k(KEY_A)).k(KEY_B)', self.context) - await macro.run(self.handler) - - # .run() it will not throw because r() breaks, and it will properly set - # it to stopped self.assertFalse(macro.running) - # k(KEY_B) is not executed, the macro stops - self.assertListEqual(self.result, []) - if __name__ == "__main__": unittest.main()