|
|
@ -24,13 +24,13 @@
|
|
|
|
|
|
|
|
|
|
|
|
import subprocess
|
|
|
|
import subprocess
|
|
|
|
import time
|
|
|
|
import time
|
|
|
|
import threading
|
|
|
|
import multiprocessing
|
|
|
|
import asyncio
|
|
|
|
import asyncio
|
|
|
|
|
|
|
|
|
|
|
|
import evdev
|
|
|
|
import evdev
|
|
|
|
|
|
|
|
|
|
|
|
from keymapper.logger import logger
|
|
|
|
from keymapper.logger import logger
|
|
|
|
from keymapper.cli import apply_empty_symbols
|
|
|
|
from keymapper.cli import apply_empty_symbols, setxkbmap
|
|
|
|
from keymapper.getdevices import get_devices
|
|
|
|
from keymapper.getdevices import get_devices
|
|
|
|
from keymapper.mapping import custom_mapping, system_mapping
|
|
|
|
from keymapper.mapping import custom_mapping, system_mapping
|
|
|
|
|
|
|
|
|
|
|
@ -47,47 +47,47 @@ def can_grab(path):
|
|
|
|
return p.returncode == 1
|
|
|
|
return p.returncode == 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class KeycodeReader:
|
|
|
|
class KeycodeInjector:
|
|
|
|
"""Keeps reading keycodes in the background for the UI to use.
|
|
|
|
"""Keeps injecting keycodes in the background based on the mapping."""
|
|
|
|
|
|
|
|
|
|
|
|
When a button was pressed, the newest keycode can be obtained from this
|
|
|
|
|
|
|
|
object.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
def __init__(self, device):
|
|
|
|
def __init__(self, device):
|
|
|
|
self.device = device
|
|
|
|
self.device = device
|
|
|
|
self.virtual_devices = []
|
|
|
|
self.virtual_devices = []
|
|
|
|
|
|
|
|
self.processes = []
|
|
|
|
self.start_injecting()
|
|
|
|
self.start_injecting()
|
|
|
|
|
|
|
|
|
|
|
|
def clear(self):
|
|
|
|
def _start_injecting_worker(self, path, mapping):
|
|
|
|
"""Next time when reading don't return the previous keycode."""
|
|
|
|
|
|
|
|
# read all of them to clear the buffer or whatever
|
|
|
|
|
|
|
|
for virtual_device in self.virtual_devices:
|
|
|
|
|
|
|
|
while virtual_device.read_one():
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def start_injecting_worker(self, path):
|
|
|
|
|
|
|
|
"""Inject keycodes for one of the virtual devices."""
|
|
|
|
"""Inject keycodes for one of the virtual devices."""
|
|
|
|
|
|
|
|
# TODO test
|
|
|
|
loop = asyncio.new_event_loop()
|
|
|
|
loop = asyncio.new_event_loop()
|
|
|
|
asyncio.set_event_loop(loop)
|
|
|
|
asyncio.set_event_loop(loop)
|
|
|
|
device = evdev.InputDevice(path)
|
|
|
|
device = evdev.InputDevice(path)
|
|
|
|
|
|
|
|
# foo = evdev.InputDevice('/dev/input/event2')
|
|
|
|
keymapper_device = evdev.UInput(
|
|
|
|
keymapper_device = evdev.UInput(
|
|
|
|
name='key-mapper',
|
|
|
|
name='key-mapper',
|
|
|
|
phys='key-mapper-uinput'
|
|
|
|
phys='key-mapper-uinput'
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger.debug(
|
|
|
|
|
|
|
|
'Started injecting into %s, fd %s',
|
|
|
|
|
|
|
|
device.path, keymapper_device.fd
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
for event in device.read_loop():
|
|
|
|
for event in device.read_loop():
|
|
|
|
if event.type != evdev.ecodes.EV_KEY:
|
|
|
|
if event.type != evdev.ecodes.EV_KEY:
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print('got', event.code, event.value, 'from device')
|
|
|
|
|
|
|
|
|
|
|
|
# this happens to report key codes that are 8 lower
|
|
|
|
# this happens to report key codes that are 8 lower
|
|
|
|
# than the ones reported by xev and that X expects
|
|
|
|
# than the ones reported by xev and that X expects
|
|
|
|
input_keycode = event.code + 8
|
|
|
|
input_keycode = event.code + 8
|
|
|
|
|
|
|
|
|
|
|
|
character = custom_mapping.get_character(input_keycode)
|
|
|
|
character = mapping.get_character(input_keycode)
|
|
|
|
|
|
|
|
|
|
|
|
if character is None:
|
|
|
|
if character is None:
|
|
|
|
# unknown keycode, forward it
|
|
|
|
# unknown keycode, forward it
|
|
|
|
target_keycode = input_keycode
|
|
|
|
target_keycode = input_keycode
|
|
|
|
|
|
|
|
continue
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
target_keycode = system_mapping.get_keycode(character)
|
|
|
|
target_keycode = system_mapping.get_keycode(character)
|
|
|
|
if target_keycode is None:
|
|
|
|
if target_keycode is None:
|
|
|
@ -99,24 +99,60 @@ class KeycodeReader:
|
|
|
|
# turns out, if I don't sleep here X/Linux gets confused. Lets
|
|
|
|
# turns out, if I don't sleep here X/Linux gets confused. Lets
|
|
|
|
# assume a mapping of 10 to z. Without sleep it would always
|
|
|
|
# assume a mapping of 10 to z. Without sleep it would always
|
|
|
|
# result in 1z 1z 1z. Even though the empty xkb symbols file
|
|
|
|
# result in 1z 1z 1z. Even though the empty xkb symbols file
|
|
|
|
# was applied on the mouse! And I really made sure .write was
|
|
|
|
# was applied on the mouse! And I really made sure `write` was
|
|
|
|
# not called twice. 1 just somewhow sneaks past the symbols.
|
|
|
|
# not called twice. '1' just somewhow sneaks past the symbols.
|
|
|
|
# 0.0005 has many errors. 0.001 has them super rare.
|
|
|
|
# 0.0005 has many errors. 0.001 has them super rare.
|
|
|
|
# 5ms is still faster than anything on the planet so that's.
|
|
|
|
# 5ms is still faster than anything on the planet so that's.
|
|
|
|
# fine. I came up with that after randomly poking around in,
|
|
|
|
# fine. I came up with that after randomly poking around in,
|
|
|
|
# frustration. I don't know of any helpful resource that
|
|
|
|
# frustration. I don't know of any helpful resource that
|
|
|
|
# explains this
|
|
|
|
# explains this
|
|
|
|
time.sleep(0.005)
|
|
|
|
time.sleep(0.01)
|
|
|
|
|
|
|
|
"""if event.value == 2:
|
|
|
|
|
|
|
|
print('device simulated up', event.value, 0)
|
|
|
|
|
|
|
|
device.write(
|
|
|
|
|
|
|
|
evdev.ecodes.EV_KEY,
|
|
|
|
|
|
|
|
event.code,
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
device.write(evdev.ecodes.EV_SYN, evdev.ecodes.SYN_REPORT, 0)"""
|
|
|
|
|
|
|
|
|
|
|
|
# TODO test for the stuff put into write
|
|
|
|
# TODO test for the stuff put into write
|
|
|
|
keymapper_device.write(evdev.ecodes.EV_KEY, target_keycode - 8, event.value)
|
|
|
|
"""logger.debug(
|
|
|
|
|
|
|
|
'Injecting %s -> %s -> %s',
|
|
|
|
|
|
|
|
input_keycode,
|
|
|
|
|
|
|
|
character,
|
|
|
|
|
|
|
|
target_keycode,
|
|
|
|
|
|
|
|
)"""
|
|
|
|
|
|
|
|
print('km write', target_keycode - 8, event.value)
|
|
|
|
|
|
|
|
keymapper_device.write(
|
|
|
|
|
|
|
|
evdev.ecodes.EV_KEY,
|
|
|
|
|
|
|
|
target_keycode - 8,
|
|
|
|
|
|
|
|
event.value
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# the second device that starts writing an event.value of 2 will
|
|
|
|
|
|
|
|
# take ownership of what is happening. Following example:
|
|
|
|
|
|
|
|
# (KB = keyboard, example devices)
|
|
|
|
|
|
|
|
# hold a on KB1:
|
|
|
|
|
|
|
|
# a-1, a-2, a-2, a-2, ...
|
|
|
|
|
|
|
|
# hold shift on KB2:
|
|
|
|
|
|
|
|
# shift-2, shift-2, shift-2, ...
|
|
|
|
|
|
|
|
# No a-2 on KB1 happening anymore. The xkb symbols of KB2 will
|
|
|
|
|
|
|
|
# be used! So if KB2 maps shift+a to b, it will write b, even
|
|
|
|
|
|
|
|
# though KB1 maps shift+a to c! And if you reverse this, hold
|
|
|
|
|
|
|
|
# shift on KB2 first and then a on KB1, the xkb mapping of KB1
|
|
|
|
|
|
|
|
# will take effect and write c!
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# foo.write(evdev.ecodes.EV_SYN, evdev.ecodes.SYN_REPORT, 0)
|
|
|
|
keymapper_device.syn()
|
|
|
|
keymapper_device.syn()
|
|
|
|
|
|
|
|
|
|
|
|
def start_injecting(self):
|
|
|
|
def start_injecting(self):
|
|
|
|
"""Read keycodes and inject the mapped character forever."""
|
|
|
|
"""Read keycodes and inject the mapped character forever."""
|
|
|
|
|
|
|
|
self.stop_injecting()
|
|
|
|
|
|
|
|
|
|
|
|
paths = get_devices()[self.device]['paths']
|
|
|
|
paths = get_devices()[self.device]['paths']
|
|
|
|
|
|
|
|
|
|
|
|
logger.debug(
|
|
|
|
logger.info(
|
|
|
|
'Starting injecting the mapping for %s on %s',
|
|
|
|
'Starting injecting the mapping for %s on %s',
|
|
|
|
self.device,
|
|
|
|
self.device,
|
|
|
|
', '.join(paths)
|
|
|
|
', '.join(paths)
|
|
|
@ -126,22 +162,24 @@ class KeycodeReader:
|
|
|
|
|
|
|
|
|
|
|
|
# Watch over each one of the potentially multiple devices per hardware
|
|
|
|
# Watch over each one of the potentially multiple devices per hardware
|
|
|
|
for path in paths:
|
|
|
|
for path in paths:
|
|
|
|
threading.Thread(
|
|
|
|
worker = multiprocessing.Process(
|
|
|
|
target=self.start_injecting_worker,
|
|
|
|
target=self._start_injecting_worker,
|
|
|
|
args=(path,)
|
|
|
|
args=(path, custom_mapping)
|
|
|
|
).start()
|
|
|
|
)
|
|
|
|
|
|
|
|
worker.start()
|
|
|
|
def read(self):
|
|
|
|
self.processes.append(worker)
|
|
|
|
"""Get the newest key or None if none was pressed."""
|
|
|
|
|
|
|
|
newest_keycode = None
|
|
|
|
def stop_injecting(self):
|
|
|
|
for virtual_device in self.virtual_devices:
|
|
|
|
"""Stop injecting keycodes."""
|
|
|
|
while True:
|
|
|
|
# TODO test
|
|
|
|
event = virtual_device.read_one()
|
|
|
|
logger.info('Stopping injecting keycodes')
|
|
|
|
if event is None:
|
|
|
|
for i, process in enumerate(self.processes):
|
|
|
|
break
|
|
|
|
if process is None:
|
|
|
|
if event.type == evdev.ecodes.EV_KEY and event.value == 1:
|
|
|
|
continue
|
|
|
|
# value: 1 for down, 0 for up, 2 for hold.
|
|
|
|
|
|
|
|
# this happens to report key codes that are 8 lower
|
|
|
|
if process.is_alive():
|
|
|
|
# than the ones reported by xev
|
|
|
|
process.terminate()
|
|
|
|
newest_keycode = event.code + 8
|
|
|
|
self.processes[i] = None
|
|
|
|
return newest_keycode
|
|
|
|
|
|
|
|
|
|
|
|
# apply the default layout back
|
|
|
|
|
|
|
|
setxkbmap(self.device)
|
|
|
|