Refactored Migrations (#232)
* refactored migrations moved migration functions to a common migrations file * moved migration tests to test_migrations.py * Simplify all_presets() in keymapper/migrations.py Co-authored-by: Tobi <to.213692@protonmail.ch> * simplifications/style improvements in migrations.py Co-authored-by: Tobi <to.213692@protonmail.ch> * migrations now use version number and we have some new tests for migrations * Added docstrings and updated migrations() * badges, 1.2.2 Co-authored-by: Tobi <to.213692@protonmail.ch> Co-authored-by: sezanzeb <proxima@sezanzeb.de>pull/239/head
parent
b23e6d5340
commit
4713511035
@ -0,0 +1,148 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# key-mapper - GUI for device specific keyboard mappings
|
||||||
|
# Copyright (C) 2021 sezanzeb <proxima@sezanzeb.de>
|
||||||
|
#
|
||||||
|
# This file is part of key-mapper.
|
||||||
|
#
|
||||||
|
# key-mapper is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# key-mapper is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with key-mapper. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
|
||||||
|
"""Migration functions"""
|
||||||
|
|
||||||
|
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import copy
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
from packaging import version
|
||||||
|
|
||||||
|
from keymapper.logger import logger, VERSION
|
||||||
|
from keymapper.paths import get_preset_path, mkdir, CONFIG_PATH
|
||||||
|
|
||||||
|
|
||||||
|
def all_presets():
|
||||||
|
"""All presets for all groups as list"""
|
||||||
|
preset_path = Path(get_preset_path())
|
||||||
|
presets = []
|
||||||
|
for folder in preset_path.iterdir():
|
||||||
|
if not folder.is_dir():
|
||||||
|
continue
|
||||||
|
|
||||||
|
for preset in folder.iterdir():
|
||||||
|
if preset.suffix == ".json":
|
||||||
|
presets.append(preset)
|
||||||
|
return presets
|
||||||
|
|
||||||
|
|
||||||
|
def config_version():
|
||||||
|
"""Version string in the config.json as packaging.Version"""
|
||||||
|
config_path = os.path.join(CONFIG_PATH, "config.json")
|
||||||
|
config = {}
|
||||||
|
|
||||||
|
if not os.path.exists(config_path):
|
||||||
|
return version.parse("0.0.0")
|
||||||
|
|
||||||
|
with open(config_path, "r") as file:
|
||||||
|
config = json.load(file)
|
||||||
|
|
||||||
|
if "version" in config.keys():
|
||||||
|
return version.parse(config["version"])
|
||||||
|
|
||||||
|
return version.parse("0.0.0")
|
||||||
|
|
||||||
|
|
||||||
|
def _config_suffix():
|
||||||
|
"""append .json suffix to config file"""
|
||||||
|
deprecated_path = os.path.join(CONFIG_PATH, "config")
|
||||||
|
config_path = os.path.join(CONFIG_PATH, "config.json")
|
||||||
|
if os.path.exists(deprecated_path) and not os.path.exists(config_path):
|
||||||
|
logger.info('Moving "%s" to "%s"', deprecated_path, config_path)
|
||||||
|
os.rename(deprecated_path, config_path)
|
||||||
|
|
||||||
|
|
||||||
|
def _preset_path():
|
||||||
|
"""Migrate the folder structure from < 0.4.0.
|
||||||
|
|
||||||
|
Move existing presets into the new subfolder "presets"
|
||||||
|
"""
|
||||||
|
new_preset_folder = os.path.join(CONFIG_PATH, "presets")
|
||||||
|
if os.path.exists(get_preset_path()) or not os.path.exists(CONFIG_PATH):
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info("Migrating presets from < 0.4.0...")
|
||||||
|
groups = os.listdir(CONFIG_PATH)
|
||||||
|
mkdir(get_preset_path())
|
||||||
|
for group in groups:
|
||||||
|
path = os.path.join(CONFIG_PATH, group)
|
||||||
|
if os.path.isdir(path):
|
||||||
|
target = path.replace(CONFIG_PATH, new_preset_folder)
|
||||||
|
logger.info('Moving "%s" to "%s"', path, target)
|
||||||
|
os.rename(path, target)
|
||||||
|
|
||||||
|
logger.info("done")
|
||||||
|
|
||||||
|
|
||||||
|
def _mapping_keys():
|
||||||
|
"""update all preset mappings
|
||||||
|
|
||||||
|
Update all keys in mapping to include value e.g.: "1,5"->"1,5,1"
|
||||||
|
"""
|
||||||
|
if not os.path.exists(get_preset_path()):
|
||||||
|
return # don't execute if there are no presets
|
||||||
|
for preset in all_presets():
|
||||||
|
preset_dict = {}
|
||||||
|
with open(preset, "r") as file:
|
||||||
|
preset_dict = json.load(file)
|
||||||
|
if "mapping" in preset_dict.keys():
|
||||||
|
mapping = copy.deepcopy(preset_dict["mapping"])
|
||||||
|
for key in mapping.keys():
|
||||||
|
if key.count(",") == 1:
|
||||||
|
preset_dict["mapping"][f"{key},1"] = preset_dict["mapping"].pop(key)
|
||||||
|
|
||||||
|
with open(preset, "w") as file:
|
||||||
|
json.dump(preset_dict, file, indent=4)
|
||||||
|
file.write("\n")
|
||||||
|
|
||||||
|
|
||||||
|
def _update_version():
|
||||||
|
"""Write current version string to the config file"""
|
||||||
|
config_file = os.path.join(CONFIG_PATH, "config.json")
|
||||||
|
if not os.path.exists(config_file):
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info("version in config file to %s", VERSION)
|
||||||
|
with open(config_file, "r") as file:
|
||||||
|
config = json.load(file)
|
||||||
|
|
||||||
|
config["version"] = VERSION
|
||||||
|
with open(config_file, "w") as file:
|
||||||
|
json.dump(config, file, indent=4)
|
||||||
|
|
||||||
|
|
||||||
|
def migrate():
|
||||||
|
"""Migrate config files to the current release"""
|
||||||
|
v = config_version()
|
||||||
|
if v < version.parse("0.4.0"):
|
||||||
|
_config_suffix()
|
||||||
|
_preset_path()
|
||||||
|
|
||||||
|
if v < version.parse("1.2.2"):
|
||||||
|
_mapping_keys()
|
||||||
|
|
||||||
|
# add new migrations here
|
||||||
|
|
||||||
|
if v < version.parse(VERSION):
|
||||||
|
_update_version()
|
Before Width: | Height: | Size: 1.0 KiB After Width: | Height: | Size: 1.0 KiB |
@ -0,0 +1,191 @@
|
|||||||
|
#
|
||||||
|
# key-mapper is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# key-mapper is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with key-mapper. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
|
||||||
|
import os
|
||||||
|
import unittest
|
||||||
|
import shutil
|
||||||
|
import json
|
||||||
|
|
||||||
|
from evdev.ecodes import EV_KEY, EV_ABS, ABS_HAT0X, KEY_A
|
||||||
|
|
||||||
|
from keymapper.migrations import migrate, config_version
|
||||||
|
from keymapper.mapping import Mapping, split_key
|
||||||
|
from keymapper.config import config, GlobalConfig
|
||||||
|
from keymapper.paths import touch, CONFIG_PATH, mkdir, get_preset_path
|
||||||
|
from keymapper.key import Key
|
||||||
|
|
||||||
|
from keymapper.logger import logger, VERSION
|
||||||
|
|
||||||
|
from tests.test import quick_cleanup, tmp
|
||||||
|
|
||||||
|
|
||||||
|
class TestMigrations(unittest.TestCase):
|
||||||
|
def tearDown(self):
|
||||||
|
quick_cleanup()
|
||||||
|
self.assertEqual(len(config.iterate_autoload_presets()), 0)
|
||||||
|
|
||||||
|
def test_migrate_suffix(self):
|
||||||
|
old = os.path.join(CONFIG_PATH, "config")
|
||||||
|
new = os.path.join(CONFIG_PATH, "config.json")
|
||||||
|
try:
|
||||||
|
os.remove(new)
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
touch(old)
|
||||||
|
with open(old, "w") as f:
|
||||||
|
f.write("{}")
|
||||||
|
migrate()
|
||||||
|
self.assertTrue(os.path.exists(new))
|
||||||
|
self.assertFalse(os.path.exists(old))
|
||||||
|
|
||||||
|
def test_wont_migrate_suffix(self):
|
||||||
|
old = os.path.join(CONFIG_PATH, "config")
|
||||||
|
new = os.path.join(CONFIG_PATH, "config.json")
|
||||||
|
|
||||||
|
touch(new)
|
||||||
|
with open(new, "w") as f:
|
||||||
|
f.write("{}")
|
||||||
|
|
||||||
|
touch(old)
|
||||||
|
with open(old, "w") as f:
|
||||||
|
f.write("{}")
|
||||||
|
|
||||||
|
migrate()
|
||||||
|
self.assertTrue(os.path.exists(new))
|
||||||
|
self.assertTrue(os.path.exists(old))
|
||||||
|
|
||||||
|
def test_migrate_preset(self):
|
||||||
|
if os.path.exists(tmp):
|
||||||
|
shutil.rmtree(tmp)
|
||||||
|
|
||||||
|
p1 = os.path.join(tmp, "foo1", "bar1.json")
|
||||||
|
p2 = os.path.join(tmp, "foo2", "bar2.json")
|
||||||
|
touch(p1)
|
||||||
|
touch(p2)
|
||||||
|
|
||||||
|
with open(p1, "w") as f:
|
||||||
|
f.write("{}")
|
||||||
|
|
||||||
|
with open(p2, "w") as f:
|
||||||
|
f.write("{}")
|
||||||
|
|
||||||
|
migrate()
|
||||||
|
|
||||||
|
self.assertFalse(os.path.exists(os.path.join(tmp, "foo1", "bar1.json")))
|
||||||
|
self.assertFalse(os.path.exists(os.path.join(tmp, "foo2", "bar2.json")))
|
||||||
|
|
||||||
|
self.assertTrue(
|
||||||
|
os.path.exists(os.path.join(tmp, "presets", "foo1", "bar1.json"))
|
||||||
|
)
|
||||||
|
self.assertTrue(
|
||||||
|
os.path.exists(os.path.join(tmp, "presets", "foo2", "bar2.json"))
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_wont_migrate_preset(self):
|
||||||
|
if os.path.exists(tmp):
|
||||||
|
shutil.rmtree(tmp)
|
||||||
|
|
||||||
|
p1 = os.path.join(tmp, "foo1", "bar1.json")
|
||||||
|
p2 = os.path.join(tmp, "foo2", "bar2.json")
|
||||||
|
touch(p1)
|
||||||
|
touch(p2)
|
||||||
|
|
||||||
|
with open(p1, "w") as f:
|
||||||
|
f.write("{}")
|
||||||
|
|
||||||
|
with open(p2, "w") as f:
|
||||||
|
f.write("{}")
|
||||||
|
|
||||||
|
# already migrated
|
||||||
|
mkdir(os.path.join(tmp, "presets"))
|
||||||
|
|
||||||
|
migrate()
|
||||||
|
|
||||||
|
self.assertTrue(os.path.exists(os.path.join(tmp, "foo1", "bar1.json")))
|
||||||
|
self.assertTrue(os.path.exists(os.path.join(tmp, "foo2", "bar2.json")))
|
||||||
|
|
||||||
|
self.assertFalse(
|
||||||
|
os.path.exists(os.path.join(tmp, "presets", "foo1", "bar1.json"))
|
||||||
|
)
|
||||||
|
self.assertFalse(
|
||||||
|
os.path.exists(os.path.join(tmp, "presets", "foo2", "bar2.json"))
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_migrate_mapping_keys(self):
|
||||||
|
# migrates mappings with only (type, code)
|
||||||
|
path = os.path.join(tmp, "presets", "Foo Device", "test.json")
|
||||||
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||||
|
with open(path, "w") as file:
|
||||||
|
json.dump(
|
||||||
|
{
|
||||||
|
"mapping": {
|
||||||
|
f"{EV_KEY},3": "a",
|
||||||
|
f"{EV_ABS},{ABS_HAT0X},-1": "b",
|
||||||
|
f"{EV_ABS},1,1+{EV_ABS},2,-1+{EV_ABS},3,1": "c",
|
||||||
|
# ignored because broken
|
||||||
|
f"3,1,1,2": "e",
|
||||||
|
f"3": "e",
|
||||||
|
f",,+3,1,2": "g",
|
||||||
|
f"": "h",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
file,
|
||||||
|
)
|
||||||
|
migrate()
|
||||||
|
loaded = Mapping()
|
||||||
|
self.assertEqual(loaded.num_saved_keys, 0)
|
||||||
|
loaded.load(get_preset_path("Foo Device", "test"))
|
||||||
|
self.assertEqual(len(loaded), 3)
|
||||||
|
self.assertEqual(loaded.num_saved_keys, 3)
|
||||||
|
self.assertEqual(loaded.get_symbol(Key(EV_KEY, 3, 1)), "a")
|
||||||
|
self.assertEqual(loaded.get_symbol(Key(EV_ABS, ABS_HAT0X, -1)), "b")
|
||||||
|
self.assertEqual(
|
||||||
|
loaded.get_symbol(Key((EV_ABS, 1, 1), (EV_ABS, 2, -1), Key(EV_ABS, 3, 1))),
|
||||||
|
"c",
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_add_version(self):
|
||||||
|
path = os.path.join(CONFIG_PATH, "config.json")
|
||||||
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||||
|
with open(path, "w") as file:
|
||||||
|
file.write("{}")
|
||||||
|
|
||||||
|
migrate()
|
||||||
|
self.assertEqual(VERSION, config_version().public)
|
||||||
|
|
||||||
|
def test_update_version(self):
|
||||||
|
path = os.path.join(CONFIG_PATH, "config.json")
|
||||||
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||||
|
with open(path, "w") as file:
|
||||||
|
json.dump({"version": "0.1.0"}, file)
|
||||||
|
|
||||||
|
migrate()
|
||||||
|
self.assertEqual(VERSION, config_version().public)
|
||||||
|
|
||||||
|
def test_config_version(self):
|
||||||
|
path = os.path.join(CONFIG_PATH, "config.json")
|
||||||
|
with open(path, "w") as file:
|
||||||
|
file.write("{}")
|
||||||
|
self.assertEqual("0.0.0", config_version().public)
|
||||||
|
|
||||||
|
try:
|
||||||
|
os.remove(path)
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
self.assertEqual("0.0.0", config_version().public)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
Loading…
Reference in New Issue