input-remapper/keymapper/dev/reader.py
2021-01-07 17:15:12 +01:00

357 lines
12 KiB
Python

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# key-mapper - GUI for device specific keyboard mappings
# Copyright (C) 2021 sezanzeb <proxima@hip70890b.de>
#
# This file is part of key-mapper.
#
# key-mapper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# key-mapper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with key-mapper. If not, see <https://www.gnu.org/licenses/>.
"""Keeps reading keycodes in the background for the UI to use."""
import sys
import time
import select
import multiprocessing
import threading
import evdev
from evdev.ecodes import EV_KEY, EV_ABS, ABS_MISC, EV_REL
from keymapper.logger import logger
from keymapper.key import Key
from keymapper.state import custom_mapping
from keymapper.getdevices import get_devices
from keymapper.dev import utils
CLOSE = 1
PRIORITIES = {
EV_KEY: 100,
EV_ABS: 50,
}
FILTER_THRESHOLD = 0.01
DEBOUNCE_TICKS = 3
def prioritize(events):
"""Return the event that is most likely desired to be mapped.
KEY over ABS and everything over ABS_MISC.
"""
events = [
event for event in events
if event is not None
]
return sorted(events, key=lambda e: (
PRIORITIES.get(e.type, 0),
not (e.type == EV_ABS and e.code == ABS_MISC),
abs(e.value)
))[-1]
def will_report_up(ev_type):
"""Check if this event will ever report a key up (wheels)."""
return ev_type != EV_REL
def event_unix_time(event):
"""Get the unix timestamp of an event."""
if event is None:
return 0
return event.sec + event.usec / 1000000
class _KeycodeReader:
"""Keeps reading keycodes in the background for the UI to use.
Does not serve any purpose for the injection service.
When a button was pressed, the newest keycode can be obtained from this
object. GTK has get_key for keyboard keys, but KeycodeReader also
has knowledge of buttons like the middle-mouse button.
"""
def __init__(self):
self.virtual_devices = []
self._pipe = None
self._process = None
self.fail_counter = 0
self.previous_event = None
self.previous_result = None
self._unreleased = {}
self._debounce_remove = {}
def __del__(self):
self.stop_reading()
def stop_reading(self):
"""Stop reading keycodes."""
if self._pipe is not None:
logger.debug('Sending close msg to reader')
self._pipe[0].send(CLOSE)
self._pipe = None
def clear(self):
"""Next time when reading don't return the previous keycode."""
# just call read to clear the pipe
self.read()
self._unreleased = {}
self.previous_event = None
self.previous_result = None
def start_reading(self, device_name):
"""Tell the evdev lib to start looking for keycodes.
If read is called without prior start_reading, no keycodes
will be available.
"""
if self._pipe is not None:
self.stop_reading()
time.sleep(0.1)
self.virtual_devices = []
for name, group in get_devices().items():
if device_name not in name:
continue
# Watch over each one of the potentially multiple devices per
# hardware
for path in group['paths']:
try:
device = evdev.InputDevice(path)
except FileNotFoundError:
continue
if evdev.ecodes.EV_KEY in device.capabilities():
self.virtual_devices.append(device)
logger.debug(
'Starting reading keycodes from "%s"',
'", "'.join([device.name for device in self.virtual_devices])
)
pipe = multiprocessing.Pipe()
self._pipe = pipe
self._process = threading.Thread(target=self._read_worker)
self._process.start()
def _pipe_event(self, event, device):
"""Write the event into the pipe to the main process."""
# value: 1 for down, 0 for up, 2 for hold.
if self._pipe is None or self._pipe[1].closed:
logger.debug('Pipe closed, reader stops.')
sys.exit(0)
if event.type == EV_KEY and event.value == 2:
# ignore hold-down events
return
click_events = [
evdev.ecodes.BTN_LEFT,
evdev.ecodes.BTN_TOOL_DOUBLETAP
]
if event.type == EV_KEY and event.code in click_events:
# disable mapping the left mouse button because it would break
# the mouse. Also it is emitted right when focusing the row
# which breaks the current workflow.
return
if not utils.should_map_event_as_btn(device, event, custom_mapping):
return
self._pipe[1].send(event)
def _read_worker(self):
"""Thread that reads keycodes and buffers them into a pipe."""
# using a thread that blocks instead of read_one made it easier
# to debug via the logs, because the UI was not polling properly
# at some point which caused logs for events not to be written.
rlist = {device.fd: device for device in self.virtual_devices}
rlist[self._pipe[1]] = self._pipe[1]
while True:
ready = select.select(rlist, [], [])[0]
for fd in ready:
readable = rlist[fd] # a device or a pipe
if isinstance(readable, multiprocessing.connection.Connection):
msg = readable.recv()
if msg == CLOSE:
logger.debug('Reader stopped')
return
continue
try:
for event in rlist[fd].read():
self._pipe_event(event, readable)
except OSError:
logger.debug(
'Device "%s" disappeared from the reader',
rlist[fd].path
)
del rlist[fd]
def get_unreleased_keys(self):
"""Get a Key object of the current keyboard state."""
unreleased = list(self._unreleased.values())
if len(unreleased) == 0:
return None
return Key(*unreleased)
def _release(self, type_code):
"""Modify the state to recognize the releasing of the key."""
if type_code in self._unreleased:
del self._unreleased[type_code]
if type_code in self._debounce_remove:
del self._debounce_remove[type_code]
def _debounce_start(self, event_tuple):
"""Act like the key was released if no new event arrives in time."""
if not will_report_up(event_tuple[0]):
self._debounce_remove[event_tuple[:2]] = DEBOUNCE_TICKS
def _debounce_tick(self):
"""If the counter reaches 0, the key is not considered held down."""
for type_code in list(self._debounce_remove.keys()):
if type_code not in self._unreleased:
continue
# clear wheel events from unreleased after some time
if self._debounce_remove[type_code] == 0:
logger.key_spam(
self._unreleased[type_code],
'Considered as released'
)
self._release(type_code)
else:
self._debounce_remove[type_code] -= 1
def read(self):
"""Get the newest key/combination as Key object.
Only reports keys from down-events.
On key-down events the pipe returns changed combinations. Release
events won't cause that and the reader will return None as in
"nothing new to report". So In order to change a combination, one
of its keys has to be released and then a different one pressed.
Otherwise making combinations wouldn't be possible. Because at
some point the keys have to be released, and that shouldn't cause
the combination to get trimmed.
If the timing of two recent events is very close, prioritize
key events over abs events.
"""
# this is in some ways similar to the keycode_mapper and
# event_producer, but its much simpler because it doesn't
# have to trigger anything, manage any macros and only
# reports key-down events. This function is called periodically
# by the window.
if self._pipe is None:
self.fail_counter += 1
if self.fail_counter % 10 == 0: # spam less
logger.debug('No pipe available to read from')
return None
# remember the prevous down-event from the pipe in order to
# be able to prioritize events, and to be able to tell if the reader
# should return the updated combination
previous_event = self.previous_event
key_down_received = False
self._debounce_tick()
while self._pipe[0].poll():
# loop over all new and unhandled events
event = self._pipe[0].recv()
event_tuple = (event.type, event.code, event.value)
type_code = (event.type, event.code)
if event.value == 0:
logger.key_spam(event_tuple, 'release')
self._release(type_code)
continue
key_down_received = True
if self._unreleased.get(type_code) == event_tuple:
if event.type != EV_ABS: # spams a lot
logger.key_spam(event_tuple, 'duplicate key down')
self._debounce_start(event_tuple)
continue
delta = event_unix_time(event) - event_unix_time(previous_event)
if delta < FILTER_THRESHOLD:
if prioritize([previous_event, event]) == previous_event:
# two events happened very close, probably some weird
# spam from the device. The wacom intuos 5 adds an
# ABS_MISC event to every button press, filter that out
logger.key_spam(event_tuple, 'ignoring new event')
continue
# the previous event of the previous iteration is ignored.
# clean stuff up to remove its side effects
prev_tuple = (
previous_event.type,
previous_event.code,
previous_event.value
)
if prev_tuple[:2] in self._unreleased:
logger.key_spam(prev_tuple, 'ignoring previous event')
self._release(prev_tuple[:2])
# to keep track of combinations.
# "I have got this release event, what was this for?" A release
# event for a D-Pad axis might be any direction, hence this maps
# from release to input in order to remember it. Since all release
# events have value 0, the value is not used in the key.
logger.key_spam(event_tuple, 'down')
self._unreleased[type_code] = event_tuple
self._debounce_start(event_tuple)
previous_event = event
if not key_down_received:
# This prevents writing a subset of the combination into
# result after keys were released. In order to control the gui,
# they have to be released.
return None
self.previous_event = previous_event
if len(self._unreleased) > 0:
result = Key(*self._unreleased.values())
if result == self.previous_result:
# don't return the same stuff twice
return None
self.previous_result = result
logger.key_spam(result.keys, 'read result')
return result
return None
keycode_reader = _KeycodeReader()