asyncio macros

This commit is contained in:
sezanzeb 2020-11-28 20:59:24 +01:00
parent 1e8f965394
commit f920c2d031
2 changed files with 43 additions and 28 deletions

View File

@ -23,22 +23,19 @@
To keep it short on the UI, the available functions are one-letter long. To keep it short on the UI, the available functions are one-letter long.
The global functions actually perform the stuff and always return macro The outermost macro (in the examples below the one created by 'r',
instances that can then be further chained.
the outermost macro (in the examples below the one created by 'r',
'r' and 'w') will be started, which triggers a chain reaction to execute 'r' and 'w') will be started, which triggers a chain reaction to execute
all of the configured stuff. all of the configured stuff.
Examples Examples
-------- --------
r(3, k('a').w(10)): 'a' <10ms> 'a' <10ms> 'a' r(3, k(a).w(10)): a <10ms> a <10ms> a
r(2, k('a').k('-')).k('b'): 'a' '-' 'a' '-' 'b' r(2, k(a).k(-)).k(b): a - a - b
w(1000).m('SHIFT_L', r(2, k('a'))).w(10).k('b'): <1s> 'A' 'A' <10ms> 'b' w(1000).m(SHIFT_L, r(2, k(a))).w(10).k(b): <1s> A A <10ms> b
""" """
import time import asyncio
import re import re
import random import random
@ -60,11 +57,12 @@ class _Macro:
self.tasks = [] self.tasks = []
self.handler = handler self.handler = handler
def run(self): async def run(self):
"""Run the macro.""" """Run the macro."""
for task in self.tasks: for task in self.tasks:
# TODO async, don't block the rest of the application coroutine = task()
task() if asyncio.iscoroutine(coroutine):
await coroutine
def stop(self): def stop(self):
"""Stop the macro.""" """Stop the macro."""
@ -103,12 +101,15 @@ class _Macro:
def wait(self, min_time, max_time=None): def wait(self, min_time, max_time=None):
"""Wait a random time in milliseconds""" """Wait a random time in milliseconds"""
async def sleep():
if max_time is None: if max_time is None:
sleeptime = min_time sleeptime = min_time
else: else:
sleeptime = random.random() * (max_time - min_time) + min_time sleeptime = random.random() * (max_time - min_time) + min_time
self.tasks.append(lambda: time.sleep(sleeptime / 1000)) await asyncio.sleep(sleeptime / 1000)
self.tasks.append(sleep)
return self return self

View File

@ -19,25 +19,29 @@
# along with key-mapper. If not, see <https://www.gnu.org/licenses/>. # along with key-mapper. If not, see <https://www.gnu.org/licenses/>.
import time
import unittest import unittest
import asyncio
from keymapper.dev.macros import parse from keymapper.dev.macros import parse, _Macro
class TestMacros(unittest.TestCase): class TestMacros(unittest.TestCase):
def setUp(self): def setUp(self):
self.result = [] self.result = []
self.handler = lambda char, value: self.result.append((char, value)) self.handler = lambda char, value: self.result.append((char, value))
self.loop = asyncio.get_event_loop()
def tearDown(self): def tearDown(self):
self.result = [] self.result = []
def test_0(self): def test_0(self):
parse('k(1)', self.handler).run() self.loop.run_until_complete(parse('k(1)', self.handler).run())
self.assertListEqual(self.result, [(1, 1), (1, 0)]) self.assertListEqual(self.result, [(1, 1), (1, 0)])
def test_1(self): def test_1(self):
parse('k(1 2).k(a).k(3)', self.handler).run() macro = 'k(1 2).k(a).k(3)'
self.loop.run_until_complete(parse(macro, self.handler).run())
self.assertListEqual(self.result, [ self.assertListEqual(self.result, [
('1 2', 1), ('1 2', 0), ('1 2', 1), ('1 2', 0),
('a', 1), ('a', 0), ('a', 1), ('a', 0),
@ -45,14 +49,20 @@ class TestMacros(unittest.TestCase):
]) ])
def test_2(self): def test_2(self):
parse('r(1, k(k))', self.handler).run() start = time.time()
macro = 'r(1, k(k))'
self.loop.run_until_complete(parse(macro, self.handler).run())
self.assertLess(time.time() - start, 0.1)
self.assertListEqual(self.result, [ self.assertListEqual(self.result, [
('k', 1), ('k', 0), ('k', 1), ('k', 0),
]) ])
def test_3(self): def test_3(self):
parse('r(3, k(m).w(200, 400))', self.handler).run() start = time.time()
# TODO test passed time macro = 'r(3, k(m).w(100, 200))'
self.loop.run_until_complete(parse(macro, self.handler).run())
self.assertGreater(time.time() - start, 0.1 * 3)
self.assertLess(time.time() - start, 0.21 * 3)
self.assertListEqual(self.result, [ self.assertListEqual(self.result, [
('m', 1), ('m', 0), ('m', 1), ('m', 0),
('m', 1), ('m', 0), ('m', 1), ('m', 0),
@ -60,7 +70,8 @@ class TestMacros(unittest.TestCase):
]) ])
def test_4(self): def test_4(self):
parse(' r(2,\nk(\rr ).k(-\n )).k(m) ', self.handler).run() macro = ' r(2,\nk(\rr ).k(-\n )).k(m) '
self.loop.run_until_complete(parse(macro, self.handler).run())
self.assertListEqual(self.result, [ self.assertListEqual(self.result, [
('r', 1), ('r', 0), ('r', 1), ('r', 0),
('-', 1), ('-', 0), ('-', 1), ('-', 0),
@ -70,19 +81,22 @@ class TestMacros(unittest.TestCase):
]) ])
def test_5(self): def test_5(self):
parse('w(400).r(2,m(w,\rr(2,\tk(r))).w(10).k(k))', self.handler).run() start = time.time()
macro = 'w(200).r(2,m(w,\rr(2,\tk(r))).w(10).k(k))'
self.loop.run_until_complete(parse(macro, self.handler).run())
self.assertLess(time.time() - start, 0.23)
self.assertGreater(time.time() - start, 0.21)
expected = [('w', 1)] expected = [('w', 1)]
expected += [('r', 1), ('r', 0)] * 2 expected += [('r', 1), ('r', 0)] * 2
expected += [('w', 0)] expected += [('w', 0)]
expected += [('k', 1), ('k', 0)] expected += [('k', 1), ('k', 0)]
expected *= 2 expected *= 2
# TODO test passed time
# TODO test asyncio
self.assertListEqual(self.result, expected) self.assertListEqual(self.result, expected)
def test_6(self): def test_6(self):
# prints nothing without .run # does nothing without .run
parse('k(a).r(3, k(b))', self.handler) ret = parse('k(a).r(3, k(b))', self.handler)
self.assertIsInstance(ret, _Macro)
self.assertListEqual(self.result, []) self.assertListEqual(self.result, [])