2021-03-21 18:15:20 +00:00
|
|
|
#!/usr/bin/python3
|
|
|
|
# -*- coding: utf-8 -*-
|
2022-01-01 12:00:49 +00:00
|
|
|
# input-remapper - GUI for device specific keyboard mappings
|
2023-02-27 16:07:42 +00:00
|
|
|
# Copyright (C) 2023 sezanzeb <proxima@sezanzeb.de>
|
2021-03-21 18:15:20 +00:00
|
|
|
#
|
2022-01-01 12:00:49 +00:00
|
|
|
# This file is part of input-remapper.
|
2021-03-21 18:15:20 +00:00
|
|
|
#
|
2022-01-01 12:00:49 +00:00
|
|
|
# input-remapper is free software: you can redistribute it and/or modify
|
2021-03-21 18:15:20 +00:00
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
|
|
# (at your option) any later version.
|
|
|
|
#
|
2022-01-01 12:00:49 +00:00
|
|
|
# input-remapper is distributed in the hope that it will be useful,
|
2021-03-21 18:15:20 +00:00
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU General Public License for more details.
|
|
|
|
#
|
|
|
|
# You should have received a copy of the GNU General Public License
|
2022-01-01 12:00:49 +00:00
|
|
|
# along with input-remapper. If not, see <https://www.gnu.org/licenses/>.
|
2022-07-23 08:53:41 +00:00
|
|
|
import asyncio
|
|
|
|
import multiprocessing
|
2021-03-21 18:15:20 +00:00
|
|
|
|
2022-11-23 21:55:28 +00:00
|
|
|
from tests.lib.cleanup import quick_cleanup
|
|
|
|
from tests.lib.tmp import tmp
|
2022-03-03 22:42:37 +00:00
|
|
|
|
2021-03-21 18:15:20 +00:00
|
|
|
import unittest
|
|
|
|
import select
|
2021-10-20 21:05:50 +00:00
|
|
|
import time
|
2022-01-17 23:32:13 +00:00
|
|
|
import os
|
2021-03-21 18:15:20 +00:00
|
|
|
|
2022-01-01 12:00:49 +00:00
|
|
|
from inputremapper.ipc.pipe import Pipe
|
|
|
|
from inputremapper.ipc.shared_dict import SharedDict
|
|
|
|
from inputremapper.ipc.socket import Server, Client, Base
|
2021-03-21 18:15:20 +00:00
|
|
|
|
2021-10-02 18:16:17 +00:00
|
|
|
|
|
|
|
class TestSharedDict(unittest.TestCase):
|
2021-10-20 21:05:50 +00:00
|
|
|
def setUp(self):
|
|
|
|
self.shared_dict = SharedDict()
|
2021-11-20 12:41:34 +00:00
|
|
|
self.shared_dict.start()
|
2021-10-20 21:05:50 +00:00
|
|
|
time.sleep(0.02)
|
|
|
|
|
2021-10-02 18:16:17 +00:00
|
|
|
def tearDown(self):
|
|
|
|
quick_cleanup()
|
|
|
|
|
|
|
|
def test_returns_none(self):
|
2021-10-20 21:05:50 +00:00
|
|
|
self.assertIsNone(self.shared_dict.get("a"))
|
|
|
|
self.assertIsNone(self.shared_dict["a"])
|
2021-10-02 18:16:17 +00:00
|
|
|
|
|
|
|
def test_set_get(self):
|
2021-10-20 21:05:50 +00:00
|
|
|
self.shared_dict["a"] = 3
|
|
|
|
self.assertEqual(self.shared_dict.get("a"), 3)
|
|
|
|
self.assertEqual(self.shared_dict["a"], 3)
|
2021-10-02 18:16:17 +00:00
|
|
|
|
2021-03-21 18:15:20 +00:00
|
|
|
|
|
|
|
class TestSocket(unittest.TestCase):
|
|
|
|
def test_socket(self):
|
|
|
|
def test(s1, s2):
|
|
|
|
self.assertEqual(s2.recv(), None)
|
|
|
|
|
|
|
|
s1.send(1)
|
|
|
|
self.assertTrue(s2.poll())
|
|
|
|
self.assertEqual(s2.recv(), 1)
|
|
|
|
self.assertFalse(s2.poll())
|
|
|
|
self.assertEqual(s2.recv(), None)
|
|
|
|
|
|
|
|
s1.send(2)
|
|
|
|
self.assertTrue(s2.poll())
|
|
|
|
s1.send(3)
|
|
|
|
self.assertTrue(s2.poll())
|
|
|
|
self.assertEqual(s2.recv(), 2)
|
|
|
|
self.assertTrue(s2.poll())
|
|
|
|
self.assertEqual(s2.recv(), 3)
|
|
|
|
self.assertFalse(s2.poll())
|
|
|
|
self.assertEqual(s2.recv(), None)
|
|
|
|
|
2022-01-17 23:32:13 +00:00
|
|
|
server = Server(os.path.join(tmp, "socket1"))
|
|
|
|
client = Client(os.path.join(tmp, "socket1"))
|
2021-03-21 18:15:20 +00:00
|
|
|
test(server, client)
|
|
|
|
|
2022-01-17 23:32:13 +00:00
|
|
|
client = Client(os.path.join(tmp, "socket2"))
|
|
|
|
server = Server(os.path.join(tmp, "socket2"))
|
2021-03-21 18:15:20 +00:00
|
|
|
test(client, server)
|
|
|
|
|
|
|
|
def test_not_connected_1(self):
|
|
|
|
# client discards old message, because it might have had a purpose
|
|
|
|
# for a different client and not for the current one
|
2022-01-17 23:32:13 +00:00
|
|
|
server = Server(os.path.join(tmp, "socket3"))
|
2021-03-21 18:15:20 +00:00
|
|
|
server.send(1)
|
|
|
|
|
2022-01-17 23:32:13 +00:00
|
|
|
client = Client(os.path.join(tmp, "socket3"))
|
2021-03-21 18:15:20 +00:00
|
|
|
server.send(2)
|
|
|
|
|
|
|
|
self.assertTrue(client.poll())
|
|
|
|
self.assertEqual(client.recv(), 2)
|
|
|
|
self.assertFalse(client.poll())
|
|
|
|
self.assertEqual(client.recv(), None)
|
|
|
|
|
|
|
|
def test_not_connected_2(self):
|
2022-01-17 23:32:13 +00:00
|
|
|
client = Client(os.path.join(tmp, "socket4"))
|
2021-03-21 18:15:20 +00:00
|
|
|
client.send(1)
|
|
|
|
|
2022-01-17 23:32:13 +00:00
|
|
|
server = Server(os.path.join(tmp, "socket4"))
|
2021-03-21 18:15:20 +00:00
|
|
|
client.send(2)
|
|
|
|
|
|
|
|
self.assertTrue(server.poll())
|
|
|
|
self.assertEqual(server.recv(), 2)
|
|
|
|
self.assertFalse(server.poll())
|
|
|
|
self.assertEqual(server.recv(), None)
|
|
|
|
|
|
|
|
def test_select(self):
|
2022-04-18 11:52:59 +00:00
|
|
|
"""Is compatible to select.select."""
|
2022-01-17 23:32:13 +00:00
|
|
|
server = Server(os.path.join(tmp, "socket6"))
|
|
|
|
client = Client(os.path.join(tmp, "socket6"))
|
2021-03-21 18:15:20 +00:00
|
|
|
|
|
|
|
server.send(1)
|
|
|
|
ready = select.select([client], [], [], 0)[0][0]
|
|
|
|
self.assertEqual(ready, client)
|
|
|
|
|
|
|
|
client.send(2)
|
|
|
|
ready = select.select([server], [], [], 0)[0][0]
|
|
|
|
self.assertEqual(ready, server)
|
|
|
|
|
|
|
|
def test_base_abstract(self):
|
2021-09-26 10:44:56 +00:00
|
|
|
self.assertRaises(NotImplementedError, lambda: Base("foo"))
|
2021-03-21 18:15:20 +00:00
|
|
|
self.assertRaises(NotImplementedError, lambda: Base.connect(None))
|
|
|
|
self.assertRaises(NotImplementedError, lambda: Base.reconnect(None))
|
|
|
|
self.assertRaises(NotImplementedError, lambda: Base.fileno(None))
|
|
|
|
|
|
|
|
|
2022-07-23 08:53:41 +00:00
|
|
|
class TestPipe(unittest.IsolatedAsyncioTestCase):
|
2021-03-21 18:15:20 +00:00
|
|
|
def test_pipe_single(self):
|
2022-01-17 23:32:13 +00:00
|
|
|
p1 = Pipe(os.path.join(tmp, "pipe"))
|
2021-03-21 18:15:20 +00:00
|
|
|
self.assertEqual(p1.recv(), None)
|
|
|
|
|
|
|
|
p1.send(1)
|
|
|
|
self.assertTrue(p1.poll())
|
|
|
|
self.assertEqual(p1.recv(), 1)
|
|
|
|
self.assertFalse(p1.poll())
|
|
|
|
self.assertEqual(p1.recv(), None)
|
|
|
|
|
|
|
|
p1.send(2)
|
|
|
|
self.assertTrue(p1.poll())
|
|
|
|
p1.send(3)
|
|
|
|
self.assertTrue(p1.poll())
|
|
|
|
self.assertEqual(p1.recv(), 2)
|
|
|
|
self.assertTrue(p1.poll())
|
|
|
|
self.assertEqual(p1.recv(), 3)
|
|
|
|
self.assertFalse(p1.poll())
|
|
|
|
self.assertEqual(p1.recv(), None)
|
|
|
|
|
|
|
|
def test_pipe_duo(self):
|
2022-01-17 23:32:13 +00:00
|
|
|
p1 = Pipe(os.path.join(tmp, "pipe"))
|
|
|
|
p2 = Pipe(os.path.join(tmp, "pipe"))
|
2021-03-21 18:15:20 +00:00
|
|
|
self.assertEqual(p2.recv(), None)
|
|
|
|
|
|
|
|
p1.send(1)
|
|
|
|
self.assertEqual(p2.recv(), 1)
|
|
|
|
self.assertEqual(p2.recv(), None)
|
|
|
|
|
|
|
|
p1.send(2)
|
|
|
|
p1.send(3)
|
|
|
|
self.assertEqual(p2.recv(), 2)
|
|
|
|
self.assertEqual(p2.recv(), 3)
|
|
|
|
self.assertEqual(p2.recv(), None)
|
|
|
|
|
2022-07-23 08:53:41 +00:00
|
|
|
async def test_async_for_loop(self):
|
|
|
|
p1 = Pipe(os.path.join(tmp, "pipe"))
|
|
|
|
iterator = p1.__aiter__()
|
|
|
|
p1.send(1)
|
|
|
|
|
|
|
|
self.assertEqual(await iterator.__anext__(), 1)
|
|
|
|
|
|
|
|
read_task = asyncio.Task(iterator.__anext__())
|
|
|
|
timeout_task = asyncio.Task(asyncio.sleep(1))
|
|
|
|
|
|
|
|
done, pending = await asyncio.wait(
|
|
|
|
(read_task, timeout_task), return_when=asyncio.FIRST_COMPLETED
|
|
|
|
)
|
|
|
|
self.assertIn(timeout_task, done)
|
|
|
|
self.assertIn(read_task, pending)
|
|
|
|
read_task.cancel()
|
|
|
|
|
|
|
|
async def test_async_for_loop_duo(self):
|
|
|
|
def writer():
|
|
|
|
p = Pipe(os.path.join(tmp, "pipe"))
|
|
|
|
for i in range(3):
|
|
|
|
p.send(i)
|
|
|
|
time.sleep(0.5)
|
|
|
|
for i in range(3):
|
|
|
|
p.send(i)
|
|
|
|
time.sleep(0.1)
|
|
|
|
p.send("stop now")
|
|
|
|
|
|
|
|
p1 = Pipe(os.path.join(tmp, "pipe"))
|
|
|
|
|
|
|
|
w_process = multiprocessing.Process(target=writer)
|
|
|
|
w_process.start()
|
|
|
|
|
|
|
|
messages = []
|
|
|
|
async for msg in p1:
|
|
|
|
messages.append(msg)
|
|
|
|
if msg == "stop now":
|
|
|
|
break
|
|
|
|
|
|
|
|
self.assertEqual(messages, [0, 1, 2, 0, 1, 2, "stop now"])
|
|
|
|
|
2021-03-21 18:15:20 +00:00
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
unittest.main()
|