mirror of
https://github.com/sezanzeb/input-remapper
synced 2024-11-04 12:00:16 +00:00
#197 Improved logs
This commit is contained in:
parent
a150b06748
commit
320fd37ca7
@ -75,12 +75,10 @@ def utils(options):
|
||||
from keymapper.groups import groups
|
||||
for group in groups:
|
||||
print(group.key)
|
||||
sys.exit(0)
|
||||
|
||||
if options.key_names:
|
||||
from keymapper.system_mapping import system_mapping
|
||||
print('\n'.join(system_mapping.list_names()))
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def communicate(options, daemon):
|
||||
@ -101,13 +99,15 @@ def communicate(options, daemon):
|
||||
group = groups.find(key=options.device)
|
||||
|
||||
if group is None:
|
||||
logger.error('unknown device "%s"', options.device)
|
||||
logger.error('Unknown device "%s"', options.device)
|
||||
sys.exit(1)
|
||||
|
||||
return group
|
||||
|
||||
if daemon is None:
|
||||
sys.exit(0)
|
||||
# probably broken tests
|
||||
logger.error('Daemon missing', options.device)
|
||||
sys.exit(1)
|
||||
|
||||
if options.config_dir is not None:
|
||||
path = os.path.abspath(os.path.expanduser(os.path.join(
|
||||
@ -232,24 +232,25 @@ if __name__ == '__main__':
|
||||
|
||||
options = parser.parse_args(sys.argv[1:])
|
||||
|
||||
if options.version:
|
||||
log_info()
|
||||
sys.exit(0)
|
||||
|
||||
if options.debug:
|
||||
add_filehandler('/var/log/key-mapper-control')
|
||||
update_verbosity(True)
|
||||
|
||||
logger.debug('Call for "%s"', sys.argv)
|
||||
|
||||
if options.command is not None:
|
||||
if options.command in INTERNALS:
|
||||
internals(options)
|
||||
elif options.command in COMMANDS:
|
||||
from keymapper.daemon import Daemon
|
||||
daemon = Daemon.connect(fallback=False)
|
||||
communicate(options, daemon)
|
||||
else:
|
||||
logger.error('Unknown command "%s"', options.command)
|
||||
if options.version:
|
||||
log_info()
|
||||
else:
|
||||
utils(options)
|
||||
logger.debug('Call for "%s"', sys.argv)
|
||||
|
||||
if options.command is not None:
|
||||
if options.command in INTERNALS:
|
||||
internals(options)
|
||||
elif options.command in COMMANDS:
|
||||
from keymapper.daemon import Daemon
|
||||
daemon = Daemon.connect(fallback=False)
|
||||
communicate(options, daemon)
|
||||
else:
|
||||
logger.error('Unknown command "%s"', options.command)
|
||||
else:
|
||||
utils(options)
|
||||
|
||||
logger.info('Finished')
|
||||
|
@ -212,10 +212,9 @@ def add_filehandler(log_path=LOG_PATH):
|
||||
shutil.rmtree(log_path)
|
||||
|
||||
if os.path.exists(log_path):
|
||||
# the logfile should not be longer than 1000 lines to avoid overflowing
|
||||
# the storage
|
||||
# the logfile should not be too long to avoid overflowing the storage
|
||||
with open(log_path, "r") as file:
|
||||
content = file.readlines()[-200:] + ["---\n"]
|
||||
content = file.readlines()[-1000:] + ["---\n"]
|
||||
|
||||
with open(log_path, "w") as file:
|
||||
file.truncate(0)
|
||||
|
@ -83,6 +83,13 @@ from tests.test import (
|
||||
)
|
||||
|
||||
|
||||
def wait_for_uinput_write():
|
||||
start = time.time()
|
||||
if not uinput_write_history_pipe[0].poll(timeout=10):
|
||||
raise AssertionError('No event written within 10 seconds')
|
||||
return float(time.time() - start)
|
||||
|
||||
|
||||
class TestInjector(unittest.IsolatedAsyncioTestCase):
|
||||
new_gamepad_path = "/dev/input/event100"
|
||||
|
||||
@ -794,20 +801,21 @@ class TestInjector(unittest.IsolatedAsyncioTestCase):
|
||||
event = uinput_write_history_pipe[0].recv()
|
||||
self.assertEqual(event.t, (EV_KEY, code_c, 1))
|
||||
|
||||
time.sleep(EVENT_READ_TIMEOUT * 5)
|
||||
# in 5 more read-loop ticks, nothing new should have happened
|
||||
self.assertFalse(uinput_write_history_pipe[0].poll())
|
||||
# in 5 more read-loop ticks, nothing new should have happened.
|
||||
# add a bit of a head-start of one EVENT_READ_TIMEOUT to avoid race-conditions
|
||||
# in tests
|
||||
self.assertFalse(uinput_write_history_pipe[0].poll(timeout=EVENT_READ_TIMEOUT * 6))
|
||||
|
||||
time.sleep(EVENT_READ_TIMEOUT * 6)
|
||||
# 5 more and it should be within the second phase in which
|
||||
# the horizontal wheel is used. add some tolerance
|
||||
self.assertTrue(uinput_write_history_pipe[0].poll())
|
||||
self.assertAlmostEqual(wait_for_uinput_write(), EVENT_READ_TIMEOUT * 5, delta=EVENT_READ_TIMEOUT)
|
||||
event = uinput_write_history_pipe[0].recv()
|
||||
self.assertEqual(event.t, (EV_KEY, code_b, 1))
|
||||
|
||||
time.sleep(EVENT_READ_TIMEOUT * 10 + 5 / 60)
|
||||
# after 21 read-loop ticks all events should be consumed, wait for
|
||||
# at least 3 (=5) producer-ticks so that the debouncers are triggered.
|
||||
# at least 3 (lets use 5 so that the test passes even if it lags)
|
||||
# ticks so that the debouncers are triggered.
|
||||
# Key-up events for both wheel events should be written now that no
|
||||
# new key-down event arrived.
|
||||
events = read_write_history_pipe()
|
||||
|
@ -77,13 +77,13 @@ class TestLogger(unittest.TestCase):
|
||||
os.mknod(path)
|
||||
|
||||
with open(path, "w") as f:
|
||||
f.write("line\n" * 1000 + "end")
|
||||
f.write("line\n" * 2000 + "end")
|
||||
|
||||
add_filehandler(os.path.join(tmp, "logger-test"))
|
||||
with open(path, "r") as f:
|
||||
# it only keeps the newest information
|
||||
content = f.readlines()
|
||||
self.assertLess(len(content), 500)
|
||||
self.assertLess(len(content), 1100)
|
||||
self.assertEqual(content[-1], "end---\n")
|
||||
# after "---" new log will appear
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user