Viewing file: config.py (9.94 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import absolute_import
import json import os import pwd import traceback from typing import Iterable, Optional
from clwpos.optimization_features import ALL_OPTIMIZATION_FEATURES from clwpos.logsetup import setup_logging from clwpos.utils import ( get_relative_docroot, create_clwpos_dir_if_not_exists, is_run_under_user ) from clcommon.clwpos_lib import is_wp_path from clwpos import constants from clwpos.cl_wpos_exceptions import WposError from clwpos import gettext as _
class ConfigError(WposError): """ Used for all exceptions during handling clwpos user config in UserConfig methods """ pass
class UserConfig(object): """ Class to manage clwpos user config - read, write, set params in config. """ CONFIG_PATH = os.path.join("{homedir}", constants.USER_WPOS_DIR, constants.USER_CLWPOS_CONFIG) DEFAULT_MAX_CACHE_MEMORY = f"{constants.DEFAULT_MAX_CACHE_MEMORY}mb" DEFAULT_CONFIG = {"docroots": {}, "max_cache_memory": DEFAULT_MAX_CACHE_MEMORY}
def __init__(self, username): if not is_run_under_user(): raise ConfigError(_("Trying to use UserConfig class as root"))
self.username = username self.homedir = pwd.getpwnam(username).pw_dir self.config_path = self.CONFIG_PATH.format(homedir=self.homedir) self._logger = setup_logging(__name__) create_clwpos_dir_if_not_exists(username)
def read_config(self): """ Reads config from self.config_path """ try: with open(self.config_path, "r") as f: return json.loads(f.read()) except Exception: exc_string = traceback.format_exc() raise ConfigError( message=_("Error while reading config %(config_path)s: %(exception_string)s"), context={"config_path": self.config_path, "exception_string": exc_string} )
def write_config(self, config: dict): """ Writes config (as json) to self.config_path """ try: config_json = json.dumps(config, indent=4, sort_keys=True) with open(self.config_path, "w") as f: f.write(config_json) except Exception as e: raise ConfigError( message=_("Attempt of writing to config file failed due to error:\n%(exception)s"), context={"exception": e} )
def is_default_config(self): """ Checks if user customized his config already. """ return not os.path.exists(self.config_path)
def get_config(self): """ Returns default config or config content from self.config_path """ # if config file is not exists, returns DEFAULT CONFIG if self.is_default_config(): return self.DEFAULT_CONFIG
# Otherwise, reads config from file # and returns it if it's not broken try: config = self.read_config() except ConfigError: return self.DEFAULT_CONFIG return config if isinstance(config, dict) else self.DEFAULT_CONFIG
def set_params(self, params: dict): """ Set outer (not "docroots") params in config. Example: Old config: { "docroots": ..., "max_cache_memory": "123mb", } Input params: { "max_cache_memory": "1024mb", "param": "value" } New config: { "docroots": ..., "max_cache_memory": "1024mb", "param": "value" } """ config = self.get_config() for key, value in params.items(): config[key] = value self.write_config(config)
def is_module_enabled( self, domain: str, wp_path: str, module: str, config: Optional[dict] = None) -> bool: config = config or self.get_config() try: docroot = get_relative_docroot(domain, self.homedir) except Exception as e: self._logger.warning(e, exc_info=True) raise ConfigError( message=_("Can't find docroot for domain '%(domain)s' and homedir '%(homedir)s'"), context={"domain": domain, "homedir": self.homedir} )
if not is_wp_path(os.path.join(self.homedir, docroot, wp_path)): raise ConfigError( message=_("Wrong wordpress path '%(wp_path)s' passed"), context={"wp_path": wp_path} )
if module not in ALL_OPTIMIZATION_FEATURES: raise ConfigError( message=_("Invalid feature %(feature)s, available choices: %(choices)s"), context={"feature": module, "choices": ALL_OPTIMIZATION_FEATURES} )
try: docroots = config["docroots"] module_info = docroots.get(docroot, {}).get(wp_path, []) return module in module_info except (KeyError, AttributeError, TypeError) as e: self._logger.warning(f"config {self.config_path} is broken: {e}", exc_info=True) raise ConfigError( message=_("Config is broken.\nRepair %(config_path)s or restore from backup."), context={"config_path": self.config_path} )
def disable_module(self, domain: str, wp_path: str, module: str) -> None: try: docroot = get_relative_docroot(domain, self.homedir) except Exception as e: self._logger.exception(e) raise ConfigError( message=_("Docroot for domain '%(domain)s' is not found"), context={"domain": domain} )
if not is_wp_path(os.path.join(self.homedir, docroot, wp_path)): raise ConfigError( message=_("Wrong wordpress path '%(wp_path)s' passed"), context={"wp_path": wp_path} )
if module not in ALL_OPTIMIZATION_FEATURES: raise ConfigError( message=_("Invalid feature %(feature)s, available choices: %(choices)s"), context={"feature": module, "choices": ALL_OPTIMIZATION_FEATURES} )
config = self.get_config() # check here as well that config has expected structure if not self.is_module_enabled(domain, wp_path, module, config): return
# remove module from the list config["docroots"][docroot][wp_path].remove(module)
# delete wp_path if all modules are disabled if not config["docroots"][docroot][wp_path]: del config["docroots"][docroot][wp_path]
# delete docroot in it doesn't have wordpresses if not config["docroots"][docroot]: del config["docroots"][docroot]
self.write_config(config)
def enable_module(self, domain: str, wp_path: str, feature: str) -> None: try: docroot = get_relative_docroot(domain, self.homedir) except Exception as e: self._logger.exception(e) raise ConfigError( message=_("Docroot for domain '%(domain)s' is not found"), context={"domain": domain} )
if not is_wp_path(os.path.join(self.homedir, docroot, wp_path)): raise ConfigError( message=_("Wrong wordpress path '%(wp_path)s' passed"), context={"wp_path": wp_path} )
if feature not in ALL_OPTIMIZATION_FEATURES: raise ConfigError( message=_("Invalid feature %(feature)s, available choices: %(choices)s"), context={"feature": feature, "choices": ALL_OPTIMIZATION_FEATURES} )
config = self.get_config() # check here as well that config has expected structure if self.is_module_enabled(domain, wp_path, feature, config): return
if "docroots" not in config: config["docroots"] = {}
if docroot not in config["docroots"]: config["docroots"][docroot] = {}
if wp_path not in config["docroots"][docroot]: config["docroots"][docroot][wp_path] = []
config["docroots"][docroot][wp_path].append(feature) self.write_config(config)
def enabled_modules(self): for doc_root, doc_root_info in self.get_config()["docroots"].items(): for wp_path, module_names in doc_root_info.items(): for name in module_names: yield doc_root, wp_path, name
def wp_paths_with_enabled_module(self, module_name: str) -> Iterable[str]: """ Return absolute WP paths with specified module enabled. """ for doc_root, wp_path, name in self.enabled_modules(): if name == module_name: yield os.path.join(self.homedir, doc_root, wp_path)
def wp_paths_with_active_suite_features(self, features_set: set): """ Unique set of sites with active features from feature set SET is used here, because one site may have several features activated from one set e.g: site1 with activated object_cache, shortcodes = 1 path """ sites = set() for feature in features_set: sites_with_enabled_feature = self.wp_paths_with_enabled_module(feature) for site in sites_with_enabled_feature: sites.add(site) return sites
def get_enabled_sites_count_by_modules(self, checked_module_names): """ Returns count of sites with enabled module """ sites_count = 0 for _, doc_root_info in self.get_config().get('docroots', {}).items(): for _, module_names in doc_root_info.items(): sites_count += any(checked_module_name in module_names for checked_module_name in checked_module_names) return sites_count
|