# -*- coding: utf-8 -*-
# Licensed under the EUPL v1.2
# © 2019-2021 bicobus <bicobus@keemail.me>
import atexit
import gzip
import json
import logging
import os
import shutil
import tempfile
from collections.abc import MutableMapping
from PyQt5.QtCore import QTimer
import appdirs
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# appdirs helper for the "qmm" application; ``user_config_dir`` is read from it
# by get_config_dir(). ``appauthor=False`` is passed so that appdirs does not
# add an author-named directory level (relevant on Windows, per appdirs docs).
dirs = appdirs.AppDirs(appname="qmm", appauthor=False)
class SettingsNotSetError(Exception):
    """Exception type signalling that a setting has not been set.

    The places that raise it are not part of this module's visible code.
    """
def sanitize_value_for_json(value):
    """Convert path-like objects to plain strings, leaving other values alone.

    Args:
        value: Any object.

    Returns:
        ``str(value)`` when *value* implements :class:`os.PathLike`, otherwise
        *value* itself (the same object, not a copy).
    """
    return str(value) if isinstance(value, os.PathLike) else value
def get_config_dir(filename=None, extra_directories=None) -> str:
    """Build a path rooted at the user's configuration directory.

    Args:
        filename: Optional final path component.
        extra_directories: Optional ``list`` of intermediate directory names,
            inserted between the config directory and *filename*. Values that
            are not a ``list`` are ignored.

    Returns:
        ``dirs.user_config_dir`` joined with the extra directories and the
        file name, in that order.
    """
    # Only genuine lists contribute path segments; anything else is dropped.
    segments = list(extra_directories) if isinstance(extra_directories, list) else []
    if filename:
        segments.append(filename)
    return os.path.join(dirs.user_config_dir, *segments)
class Config(MutableMapping):
    """Dictionary-like settings store persisted to a JSON file.

    Influenced by deluge's config object.

    Values are kept in memory and written back to disk either by a delayed
    save (scheduled through ``QTimer.singleShot`` when an existing key changes
    or a key is deleted) or when the interpreter exits (``atexit``).

    Args:
        filename: Name of the settings file.
        config_dir: Directory holding the file. When falsy, the path is
            obtained from :func:`get_config_dir`.
        defaults: Optional mapping of initial key/value pairs, applied before
            the file is loaded.
        compress: When true the file is gzip-compressed and ``.gz`` is
            appended to its name.
        on_load_validators: Optional mapping ``key -> callable``. During
            :meth:`load` the callable is invoked with the stored value; the
            ``data`` attribute of what it returns replaces the value. If it
            raises :class:`ValueError`, the value becomes ``None``.
    """

    def __init__(
        self, filename, config_dir=None, defaults=None, compress=False, on_load_validators=None
    ):
        self._data = {}
        # True while a delayed save has been scheduled and has not run yet.
        self._save_timer = False
        self._compress = compress
        self._validators = {}

        if defaults:
            for key, val in defaults.items():
                self[key] = val

        if config_dir:
            self._filename = os.path.join(config_dir, filename)
        else:
            self._filename = get_config_dir(filename)

        if self._compress:
            self._filename = "{}.gz".format(self._filename)

        if on_load_validators:
            self._validators.update(on_load_validators.items())

        # Force saving config file at termination of the software
        atexit.register(self.save)
        self.load(self._filename)

    def __len__(self):
        return len(self._data)

    def __iter__(self):
        return iter(self._data)

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        """Store *value*; schedule a save only when an existing key changes."""
        # Keys that did not exist yet are stored without scheduling a save.
        if key not in self._data:
            self._data[key] = value
            return
        if self._data[key] == value:
            return
        self._data[key] = value
        logger.debug("Config key state changed, save timer state is: %s", self._save_timer)
        if not self._save_timer:
            self.delayed_save()

    def __delitem__(self, key):
        """Remove *key* and schedule a save if none is pending."""
        del self._data[key]
        logger.debug("Deleting config key, save timer state is: %s", self._save_timer)
        if not self._save_timer:
            self.delayed_save()

    def _resolve_filename(self, filename=None):
        """Return the path to use, defaulting to ours and enforcing ``.gz``."""
        if not filename:
            filename = self._filename
        # splitext() returns (root, ext): compare the extension only.
        if self._compress and os.path.splitext(filename)[1] != ".gz":
            filename = "{}.gz".format(filename)
        return filename

    def _get_data_from_file(self, filename=None):
        """Read and decode the settings file.

        Args:
            filename: Path to read; defaults to the instance's own file.

        Returns:
            The decoded JSON object, or an empty dict when the file cannot be
            read, cannot be decoded, or does not contain a JSON object.
        """
        import zlib  # local import: only needed to name zlib.error below

        if not filename:
            filename = self._filename
        try:
            if self._compress:
                with gzip.open(filename, "rb") as fp:
                    data = json.loads(fp.read().decode("utf-8"))
            else:
                with open(filename, "r", encoding="utf-8") as fp:
                    data = json.load(fp)
        except (OSError, EOFError, ValueError, zlib.error) as e:
            # OSError: missing/unreadable file or bad gzip header.
            # EOFError / zlib.error: truncated or corrupt gzip stream.
            # ValueError: invalid JSON or invalid UTF-8.
            logger.warning("Unable to load config file %s: %s", filename, e)
            return {}
        if not isinstance(data, dict):
            # Callers iterate over .items(); anything else is unusable.
            logger.warning(
                "Unable to load config file %s: %s", filename, "top-level JSON value is not an object"
            )
            return {}
        return data

    def load(self, filename=None):
        """Load settings from *filename* (default: our own file) into memory.

        Loaded values overwrite keys already present. Truthy values whose key
        has a validator are passed through it; see the class docstring.
        """
        filename = self._resolve_filename(filename)
        logger.debug("Loading information from settings file: %s", filename)
        data = self._get_data_from_file(filename)
        for key, val in data.items():
            validator = self._validators.get(key)
            if val and validator:
                try:
                    validated = validator(val)
                except ValueError as e:
                    # A ValueError may be raised without arguments.
                    logger.error("%s: %s", e.args[0] if e.args else e, val)
                    value = None
                else:
                    value = validated.data
            else:
                value = val
            self._data[key] = value

    def delayed_save(self, msec=5000):
        """Schedule a save in the future if one isn't already planned."""
        if not self._save_timer:
            QTimer.singleShot(msec, self.save)  # noqa
            self._save_timer = True
            logger.debug("Initializing delayed save.")

    def _disable_save_timer(self):
        """Mark that no delayed save is pending any more."""
        self._save_timer = False

    def _get_data_for_json(self):
        """Return a copy of the data with values made JSON-friendly."""
        return {k: sanitize_value_for_json(v) for k, v in self._data.items()}

    def save(self, filename=None):
        """Write the settings to *filename* (default: our own file).

        Nothing is written when the file already holds the same content.
        Otherwise the data is written to a temporary file, any existing file
        is moved to ``<filename>.bak`` and the temporary file is moved into
        place.

        Returns:
            ``True`` when the file is up to date afterwards, ``False`` when
            serialising or writing failed (the error is logged).
        """
        filename = self._resolve_filename(filename)
        logger.debug("Saving file %s", filename)

        # Compare what would be written, not the raw values: path-like values
        # are stored as strings on disk and would otherwise never match.
        payload = self._get_data_for_json()

        # Do not save anything if the contents are the same.
        if payload == self._get_data_from_file(filename):
            logger.debug("Save triggered but data is unchanged: doing nothing.")
            self._disable_save_timer()
            return True

        try:
            jdump = json.dumps(payload, indent=4).encode("utf-8")
        except (TypeError, ValueError) as e:
            logger.error("An error occured while saving the settings:\n%s", e)
            # The scheduled save has run; allow later changes to schedule anew.
            self._disable_save_timer()
            return False
        if self._compress:
            jdump = gzip.compress(jdump)

        filename_tmp = None
        try:
            with tempfile.NamedTemporaryFile(delete=False) as fp:
                filename_tmp = fp.name
                fp.write(jdump)
                fp.flush()
                os.fsync(fp.fileno())
            filename = os.path.realpath(filename)
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            if os.path.exists(filename):
                shutil.move(filename, "{}.bak".format(filename))
            logger.debug("Saving new config to %s", filename)
            shutil.move(filename_tmp, filename)
        except OSError as e:
            logger.error("An error occured while saving the settings:\n%s", e)
            # The scheduled save has run; allow later changes to schedule anew.
            self._disable_save_timer()
            return False
        else:
            logger.debug(
                "Check save timer at end of save method. Auto save state: %s", self._save_timer,
            )
            self._disable_save_timer()
            return True
        finally:
            # The temporary file only still exists if it was never moved.
            if filename_tmp is not None and os.path.exists(filename_tmp):
                try:
                    os.remove(filename_tmp)
                except OSError as e:
                    logger.debug("Could not remove temporary file %s: %s", filename_tmp, e)