From f920c2d03125cb9cd29f66a7ad163ee079f625e2 Mon Sep 17 00:00:00 2001 From: sezanzeb Date: Sat, 28 Nov 2020 20:59:24 +0100 Subject: [PATCH] asyncio macros --- keymapper/dev/macros.py | 33 +++++++++++++++++---------------- tests/testcases/macros.py | 38 ++++++++++++++++++++++++++------------ 2 files changed, 43 insertions(+), 28 deletions(-) diff --git a/keymapper/dev/macros.py b/keymapper/dev/macros.py index 4f96f236..c8b17eb1 100644 --- a/keymapper/dev/macros.py +++ b/keymapper/dev/macros.py @@ -23,22 +23,19 @@ To keep it short on the UI, the available functions are one-letter long. -The global functions actually perform the stuff and always return macro -instances that can then be further chained. - -the outermost macro (in the examples below the one created by 'r', +The outermost macro (in the examples below the one created by 'r', 'r' and 'w') will be started, which triggers a chain reaction to execute all of the configured stuff. Examples -------- -r(3, k('a').w(10)): 'a' <10ms> 'a' <10ms> 'a' -r(2, k('a').k('-')).k('b'): 'a' '-' 'a' '-' 'b' -w(1000).m('SHIFT_L', r(2, k('a'))).w(10).k('b'): <1s> 'A' 'A' <10ms> 'b' +r(3, k(a).w(10)): a <10ms> a <10ms> a +r(2, k(a).k(-)).k(b): a - a - b +w(1000).m(SHIFT_L, r(2, k(a))).w(10).k(b): <1s> A A <10ms> b """ -import time +import asyncio import re import random @@ -60,11 +57,12 @@ class _Macro: self.tasks = [] self.handler = handler - def run(self): + async def run(self): """Run the macro.""" for task in self.tasks: - # TODO async, don't block the rest of the application - task() + coroutine = task() + if asyncio.iscoroutine(coroutine): + await coroutine def stop(self): """Stop the macro.""" @@ -103,12 +101,15 @@ class _Macro: def wait(self, min_time, max_time=None): """Wait a random time in milliseconds""" - if max_time is None: - sleeptime = min_time - else: - sleeptime = random.random() * (max_time - min_time) + min_time + async def sleep(): + if max_time is None: + sleeptime = min_time + else: + sleeptime = random.random() * (max_time - min_time) + min_time + + await asyncio.sleep(sleeptime / 1000) - self.tasks.append(lambda: time.sleep(sleeptime / 1000)) + self.tasks.append(sleep) return self diff --git a/tests/testcases/macros.py b/tests/testcases/macros.py index de2def5f..bbf35b8e 100644 --- a/tests/testcases/macros.py +++ b/tests/testcases/macros.py @@ -19,25 +19,29 @@ # along with key-mapper. If not, see . +import time import unittest +import asyncio -from keymapper.dev.macros import parse +from keymapper.dev.macros import parse, _Macro class TestMacros(unittest.TestCase): def setUp(self): self.result = [] self.handler = lambda char, value: self.result.append((char, value)) + self.loop = asyncio.get_event_loop() def tearDown(self): self.result = [] def test_0(self): - parse('k(1)', self.handler).run() + self.loop.run_until_complete(parse('k(1)', self.handler).run()) self.assertListEqual(self.result, [(1, 1), (1, 0)]) def test_1(self): - parse('k(1 2).k(a).k(3)', self.handler).run() + macro = 'k(1 2).k(a).k(3)' + self.loop.run_until_complete(parse(macro, self.handler).run()) self.assertListEqual(self.result, [ ('1 2', 1), ('1 2', 0), ('a', 1), ('a', 0), @@ -45,14 +49,20 @@ class TestMacros(unittest.TestCase): ]) def test_2(self): - parse('r(1, k(k))', self.handler).run() + start = time.time() + macro = 'r(1, k(k))' + self.loop.run_until_complete(parse(macro, self.handler).run()) + self.assertLess(time.time() - start, 0.1) self.assertListEqual(self.result, [ ('k', 1), ('k', 0), ]) def test_3(self): - parse('r(3, k(m).w(200, 400))', self.handler).run() - # TODO test passed time + start = time.time() + macro = 'r(3, k(m).w(100, 200))' + self.loop.run_until_complete(parse(macro, self.handler).run()) + self.assertGreater(time.time() - start, 0.1 * 3) + self.assertLess(time.time() - start, 0.21 * 3) self.assertListEqual(self.result, [ ('m', 1), ('m', 0), ('m', 1), ('m', 0), @@ -60,7 +70,8 @@ class TestMacros(unittest.TestCase): ]) def test_4(self): - parse(' r(2,\nk(\rr ).k(-\n )).k(m) ', self.handler).run() + macro = ' r(2,\nk(\rr ).k(-\n )).k(m) ' + self.loop.run_until_complete(parse(macro, self.handler).run()) self.assertListEqual(self.result, [ ('r', 1), ('r', 0), ('-', 1), ('-', 0), @@ -70,19 +81,22 @@ class TestMacros(unittest.TestCase): ]) def test_5(self): - parse('w(400).r(2,m(w,\rr(2,\tk(r))).w(10).k(k))', self.handler).run() + start = time.time() + macro = 'w(200).r(2,m(w,\rr(2,\tk(r))).w(10).k(k))' + self.loop.run_until_complete(parse(macro, self.handler).run()) + self.assertLess(time.time() - start, 0.23) + self.assertGreater(time.time() - start, 0.21) expected = [('w', 1)] expected += [('r', 1), ('r', 0)] * 2 expected += [('w', 0)] expected += [('k', 1), ('k', 0)] expected *= 2 - # TODO test passed time - # TODO test asyncio self.assertListEqual(self.result, expected) def test_6(self): - # prints nothing without .run - parse('k(a).r(3, k(b))', self.handler) + # does nothing without .run + ret = parse('k(a).r(3, k(b))', self.handler) + self.assertIsInstance(ret, _Macro) self.assertListEqual(self.result, [])