Viewing file: cpanel.py (28.99 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import absolute_import
import json import os import pwd import re import subprocess from dataclasses import dataclass from functools import lru_cache from pathlib import Path from typing import Optional, Dict, Tuple, Union, List from pkg_resources import parse_version
from clcommon.cpapi import userdomains from clcommon.clwpos_lib import find_wp_paths, get_wp_cache_plugin from clcommon.lib.cledition import is_cl_solo_edition from secureio import write_file_via_tempfile
from clwpos.constants import ( WP_CLI_EXTENSIONS, WP_CLI_ENABLE_ALL_FUNCTIONS, RedisRequiredConstants, EA_PHP_PREFIX, CAGEFSCTL, CLSOP_ZIP_PATH ) from clwpos.cl_wpos_exceptions import WposError, PhpBrokenException, WpCliCommandError from clwpos.daemon import WposDaemon from clwpos.logsetup import setup_logging, ADMIN_LOGFILE_PATH, USER_LOGFILE_PATH from clwpos import gettext as _
from clwpos.utils import ( WposUser, is_run_under_user, check_domain, home_dir, clear_redis_cache_config, create_redis_cache_config, daemon_communicate, PHP, wp_cli_compatibility_check, run_in_cagefs_if_needed, create_pid_file, user_name, user_uid, litespeed_is_running )
_logger = setup_logging(__name__)
BASE_CPANEL_EA_PHP_DIR = '/opt/cpanel'
def _get_php_handler(vhost: str) -> str: result = uapi("php_get_domain_handler", {"type": "vhost", "vhost": vhost}) return result["php_handler"]
def _get_doc_roots_info() -> dict: user = pwd.getpwuid(os.geteuid()).pw_name result = {} for domain, doc_root in userdomains(user): result.setdefault(doc_root, []).append(domain)
return result
def _add_wp_path_info(user_info: dict) -> dict: wp_paths = {} for doc_root, domains in user_info.items(): # excludes only affects subpaths of doc_root excludes = list(user_info) item = { "domains": domains, "wp_paths": list(find_wp_paths(doc_root, excludes=excludes)) } wp_paths[doc_root] = item return wp_paths
def _wp_info(doc_root: str, wp_path: str) -> dict: """Convert WP path to {"path": str, "version": str}""" absolute_wp_path = Path(doc_root, wp_path) version_file = list(absolute_wp_path.glob("wp-includes/version.php"))[0] result = subprocess.run(["/bin/grep", "-Po", "(?<=wp_version = ')[^']+", version_file], capture_output=True) wp_version = result.stdout.strip().decode() return { "path": wp_path, "version": wp_version, }
def _add_wp_info(user_info: dict) -> dict: for doc_root, doc_root_info in user_info.items(): wp_paths = doc_root_info.pop("wp_paths") doc_root_info["wps"] = [_wp_info(doc_root, wp_path) for wp_path in wp_paths] return user_info
def _get_data_from_info_json(attribute: str) -> List: """ Return attribute's value from info.json file. """ from clwpos.feature_suites import get_admin_config_directory
admin_config_dir = get_admin_config_directory(user_uid()) info_json = os.path.join(admin_config_dir, "info.json")
try: with open(info_json) as f: return json.load(f)[attribute] except (OSError, KeyError, json.JSONDecodeError) as e: _logger.exception("Error during reading of \"info.json\" file: %s", e) return []
def _php_get_vhost_versions(): """ @return: [ { "account": "rm3", "account_owner": "root", "documentroot": "/home/example/public_html", "homedir": "/home/rm3", "is_suspended": 0, "main_domain": 1, "php_fpm": 1, "php_fpm_pool_parms": { "pm_max_children": 5, "pm_max_requests": 20, "pm_process_idle_timeout": 10 }, "phpversion_source": [ { "domain": "example.com", "system_default": 1 } ], "version": "ea-php72", "vhost": "otherchars.rm3.tld" } ] """ try: return daemon_communicate({"command": WposDaemon.DAEMON_PHP_GET_VHOST_VERSIONS_COMMAND})["data"] except WposError: return _get_data_from_info_json("vhost_versions")
def _php_get_installed_versions(): """ @return: [ "ea-php74" ] """ try: return daemon_communicate({"command": WposDaemon.DAEMON_PHP_GET_INSTALLED_VERSIONS_COMMAND})["data"] except WposError: return _get_data_from_info_json("installed_versions")
def php_info(): """ Returns php info, example: [{'vhost': 'sub.wposuser.com', 'account': 'stackoverflow', 'phpversion_source': {'domain': 'sub.wposuser.com'}, 'version': 'ea-php80', 'account_owner': 'root', 'php_fpm': 1, 'php_fpm_pool_parms': {'pm_process_idle_timeout': 10, 'pm_max_requests': 20, 'pm_max_children': 5}, 'main_domain': 0, 'documentroot': '/home/stackoverflow/public_html', 'homedir': '/home/stackoverflow'}, ...................................................................] """ result = _php_get_vhost_versions() for elem in result: elem["version"] = _normalized_php_version(PHP(elem["version"])) return result
def ea_php_ini_file_path(ini_name: str, php_version: str): """ Builds path to <ini_name>.ini file """ return Path(PHP(php_version).dir()).joinpath(f'root/etc/php.d/{ini_name}')
def get_supported_ea_php(): """ Looks through /opt/cpanel and gets installed phps """ base_dir = Path(BASE_CPANEL_EA_PHP_DIR) minimal_supported = parse_version('ea-php74') supported = [] for item in os.listdir(base_dir): if item.startswith('ea-php') and parse_version(item) >= minimal_supported: supported.append(item) return supported
def configure_redis_extension_for_ea(): """ Sets up redis if needed: - installing package - enables in .ini file """ need_cagefs_update = False wait_child_process = bool(os.environ.get('CL_WPOS_WAIT_CHILD_PROCESS')) php_versions_redis_data = { php: _redis_extension_info(PHP(php)) for php in get_supported_ea_php() } php_versions_to_enable_redis = [ php for php, redis_data in php_versions_redis_data.items() if not redis_data.get('is_present') or not redis_data.get('is_loaded') ] if not php_versions_to_enable_redis: return
with create_pid_file(EA_PHP_PREFIX): for php in php_versions_to_enable_redis: redis_data = php_versions_redis_data.get(php) if not redis_data.get('is_present'): redis_package = f'{php}-php-redis' result = subprocess.run(['yum', '-y', 'install', redis_package], capture_output=True, text=True) if result.returncode != 0 and 'Nothing to do' not in result.stdout: _logger.error('Failed to install package %s, due to reason: %s', redis_package, f'{result.stdout}\n{result.stderr}') continue enable_redis_extension_for_ea(php) need_cagefs_update = True elif not redis_data.get('is_loaded'): enable_redis_extension_for_ea(php) need_cagefs_update = True
if need_cagefs_update and wait_child_process and os.path.isfile(CAGEFSCTL): try: subprocess.run([CAGEFSCTL, '--check-cagefs-initialized'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) except subprocess.CalledProcessError: _logger.info('CageFS in unintialized, skipping force-update') else: subprocess.run([CAGEFSCTL, '--wait-lock', '--force-update'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def enable_redis_extension_for_ea(php_version): """ Enables (if needed) redis extension in .ini config """ path = ea_php_ini_file_path('50-redis.ini', php_version) keyword = 'redis.so' if not os.path.exists(path): _logger.error('Redis extension config: %s is not found, ensure corresponding rpm package installed: %s', str(path), f'{php_version}-php-redis') return with open(path) as f: extension_data = f.readlines()
uncommented_pattern = re.compile(fr'^\s*extension\s*=\s*{keyword}') commented_pattern = re.compile(fr'^\s*;\s*extension\s*=\s*{keyword}') enabled_line = f'extension = {keyword}\n' was_enabled = False lines = []
for line in extension_data: if uncommented_pattern.match(line): return if not was_enabled and commented_pattern.match(line): lines.append(enabled_line) was_enabled = True else: lines.append(line) if not was_enabled: lines.append(enabled_line) write_file_via_tempfile(''.join(lines), path, 0o644)
@lru_cache() def _redis_extension_info(version: PHP) -> dict: is_present = bool(list(version.dir().glob("**/redis.so"))) php_bin_path = version.bin() if os.geteuid() == 0: exec_func = subprocess.run else: exec_func = run_in_cagefs_if_needed
is_loaded = exec_func( f'{php_bin_path} -m | /bin/grep redis', shell=True, executable='/bin/bash', env={} ).returncode == 0 if is_present else False
return { "is_present": is_present, "is_loaded": is_loaded }
def _add_php(user_info: dict) -> dict: """ Updates user_info dict with php data """ result = php_info() for item in result: user_info[item["documentroot"]]["php"] = { "version": item["version"], "fpm": bool(item["php_fpm"]), "handler": _get_php_handler(item["vhost"]) }
return user_info
def _add_object_cache_info(user_info: dict) -> dict: """ Search for 'object-cache.php' files in 'wp-content/plugins' directory in order to find what plugin is being used for object caching. """ for doc_root, doc_root_info in user_info.items(): for wp in doc_root_info["wps"]: plugin = get_wp_cache_plugin(Path(doc_root).joinpath(wp["path"]), "object-cache") wp["object_cache"] = plugin
return user_info
def get_user_info() -> dict: """ Collect info about user. @return { '/home/user/public_html': { 'domains': ['domain.com'], 'wps': [ { 'path': 'wp_path_1', 'version': '5.7.2', 'object_cache': 'redis-cache' } ], 'php': { 'version': 'ea-php74', 'handler': 'cgi', 'redis_extension': False, 'fpm': True } } } """ user_info = _get_doc_roots_info()
for func in (_add_wp_path_info, _add_wp_info, _add_php): user_info = func(user_info)
return user_info
def _get_php_version(abs_wp_path: str) -> PHP: """Return PHP version.""" result = php_info()
items = [] for item in result: if abs_wp_path.startswith(item["documentroot"]): items.append((item["documentroot"], item["version"])) items.sort(reverse=True) return items[0][1]
def _normalized_php_version(version: PHP) -> PHP: """ PHP selector can replace path with symlink. It's a reason why we need normalization. """ if not is_cl_solo_edition(skip_jwt_check=True): command = f"{version.bin()} -i " \ f" | /bin/grep 'Loaded Configuration File'" \ f" | /bin/grep -oE \"(alt|ea).*php[^/]*/\"" result = run_in_cagefs_if_needed(command, shell=True, executable='/bin/bash', env={})
if result.stderr and not result.stdout: raise PhpBrokenException(str(version.bin()), result.stderr)
return PHP(result.stdout.strip().strip("/").replace("/", "-"))
return version
def filter_php_versions_with_not_loaded_redis(php_versions: List[PHP]) -> List[PHP]: """ Filter list of given php versions to find out for which redis extension is presented but not loaded. """ php_versions_with_not_loaded_redis = [] for version in php_versions: php_redis_info = _redis_extension_info(version) if not php_redis_info['is_loaded'] and php_redis_info['is_present']: php_versions_with_not_loaded_redis.append(version) return php_versions_with_not_loaded_redis
@lru_cache(maxsize=None) def get_cached_php_installed_versions() -> List[PHP]: """ List all installed php version on the system :return: installed php version """ result = _php_get_installed_versions() return [PHP(version) for version in result]
@lru_cache(maxsize=None) def get_cached_php_versions_with_redis_loaded() -> set: """ List all installed php version on the system which has redis-extension enabled :return: installed php versions which has redis-extension """ versions = get_cached_php_installed_versions() return {version for version in versions if _redis_extension_info(version)["is_loaded"]}
@lru_cache(maxsize=None) def get_cached_php_versions_with_redis_present() -> set: """ List all installed php version on the system which has redis-extension installed :return: installed php versions which has redis-extension installed """ versions = get_cached_php_installed_versions() return {version for version in versions if _redis_extension_info(version)["is_present"]}
def uapi(function: str, input_parameters: Optional[Dict[str, str]] = None): input_parameters_as_list = [f"{key}={value}" for key, value in input_parameters.items()] if input_parameters else [] result = run_in_cagefs_if_needed( ["/usr/bin/uapi", "--output=json", "LangPHP", function, *input_parameters_as_list], env={} )
return json.loads(result.stdout)["result"]["data"]
def is_multisite(path: str) -> bool: marker = 'cl_multisite_detected' command = 'if ( is_multisite() ) { echo "%s"; }' % marker result = wordpress(path, 'eval', command) if isinstance(result, WordpressError): raise WposError(message=result.message, context=result.context) return marker in result
def wp_get_constant(wp_path: str, constant: str, raise_exception=False) -> Optional[str]: """ Get: - defined constant value - None in case of error - empty string if no such constant found """ command = "if (defined('%(const)s')) { echo %(const)s; }" % {'const': constant} result = wordpress(wp_path, 'eval', command) if isinstance(result, WordpressError): if raise_exception: raise WpCliCommandError(message=result.message, context=result.context) _logger.error('Error during get WP constant: %s', result) return None return result
def diagnose_redis_connection_constants(docroot: str, wordpress_path: str): """ Check required constants for redis connection establishment """ redis_schema = wp_get_constant(os.path.join(docroot, wordpress_path), RedisRequiredConstants.WP_REDIS_SCHEME.name, raise_exception=True) if not redis_schema and redis_schema != RedisRequiredConstants.WP_REDIS_SCHEME.value: raise WposError('WordPress constant "%(constant)s" is not defined or defined with wrong value %(value)s', context={'constant': RedisRequiredConstants.WP_REDIS_SCHEME.name, 'value': redis_schema}) socket = wp_get_constant(os.path.join(docroot, wordpress_path), RedisRequiredConstants.WP_REDIS_PATH.name, raise_exception=True) if not socket: raise WposError('WordPress constant "%(constant)s" is not defined', context={'constant': RedisRequiredConstants.WP_REDIS_PATH.name}) if not os.path.exists(socket): raise WposError('Redis socket %(socket)s does not exist in the system', context={'socket': socket})
def _get_saved_php_version(abs_wp_path: str) -> Optional[PHP]: """ Get domain's php version from a previously saved file. """ if not is_run_under_user(): raise WposError('Internal Error. Contact CloudLinux support') php_file_id = os.environ.get("CLWPOS_PHP_FILE_ID") php_info_file = WposUser(user_name()).php_info.format(file_id=php_file_id) if not os.path.exists(php_info_file): return None try: with open(php_info_file) as f: _php_info = json.load(f) except (OSError, json.decoder.JSONDecodeError) as e: _logger.exception("Error during reading of \".php_info\" file: %s", e) return None
php_versions = [] for vhost_info in _php_info: if abs_wp_path.startswith(vhost_info["documentroot"]): php_versions.append((vhost_info["documentroot"], vhost_info["version"]))
if not php_versions: return None
return PHP(sorted(php_versions, reverse=True)[0][1])
def get_php_version(abs_wp_path: str) -> PHP: """ Return php_version that will be used for calling wp-cli commands. If 'CLWPOS_USE_SAVED_PHP_VERSION' envar is defined, try to get this version from a previously saved file. """ use_saved_php_version = bool(os.environ.get("CLWPOS_USE_SAVED_PHP_VERSION")) if use_saved_php_version: php_version = _get_saved_php_version(abs_wp_path) or _get_php_version(abs_wp_path) else: php_version = _get_php_version(abs_wp_path) return php_version
@dataclass class WordpressError: message: str context: dict
def wordpress(path: str, command: str, subcommand: str, *args, env=None) -> Union[str, WordpressError]: """ Helper to execute wp commands, for example wp --path=<path> plugin install redis-cache wp --path=<path> plugin activate redis-cache wp --path=<path> redis enable wp --path=<path> plugin deactivate redis-cache wp --path=<path> plugin uninstall redis-cache @return: stderr if error was happened. """ php_version = get_php_version(path) php_bin_path = str(php_version.bin()) if not os.path.exists(php_bin_path): _logger.exception("Error during wp-cli command execution \"%s\": " "invalid path to binary file \"%s\"", command, php_bin_path) return WordpressError( message=_("Error during resolving path to php binary file:\n" "got non-existent path \"%(path)s\"."), context={"path": php_bin_path} ) # [attention] compatibility check may raise WpCliUnsupportedException exception wp_cli_compatibility_check(php_bin_path) command_part = ["--path={}".format(path), command, subcommand, *args] full_command = [ php_bin_path, *WP_CLI_EXTENSIONS, # explicitly drop PHP disable_functions directive # in order to avoid errors like # 'Error: Cannot do 'launch': The PHP functions `proc_open()` and/or `proc_close()` are disabled' # during plugin manipulations WP_CLI_ENABLE_ALL_FUNCTIONS, "/opt/clwpos/wp-cli", *command_part ] environment = env or {} try: output = run_in_cagefs_if_needed(full_command, check=True, env=environment) except subprocess.CalledProcessError as error: command = ' '.join(full_command) _logger.exception("Error during command execution: \n%s\n" "stdout=%s\n" "stderr=%s", command, error.stdout, error.stderr) logger_path = ADMIN_LOGFILE_PATH if not os.getuid() else USER_LOGFILE_PATH.format(homedir='HOME') return WordpressError( message=_("Unexpected error happened during command execution: '%(command)s'.\n" "Event is logged to file: '%(logger_path)s' with stdout and stderr recorded."), context={ "command": command, "logger_path": logger_path } )
return output.stdout
class DocRootPath(str): """This class represent path to doc_root.""" pass
class DomainName(str): """This class represent domain name.""" pass
def disable_without_config_affecting( arg: Union[DocRootPath, DomainName], wp_path: str, *, module: str, ) -> Optional[WordpressError]: """ Deactivate and delete specified wordpress module. :param arg: user's docroot or domain :param wp_path: path to user's wordpress directory :param module: module on which to perform disable operations :return: error if error was happened else None """ if isinstance(arg, DomainName): doc_root = check_domain(arg)[-1] elif isinstance(arg, DocRootPath): doc_root = Path(home_dir(), arg) else: raise ValueError("Invalid argument format")
abs_wp_path = str(Path(doc_root).joinpath(wp_path).absolute()) last_error = None
errors = disable_module(module, abs_wp_path) if errors: last_error = errors[-1]
return last_error
def enable_without_config_affecting( arg: Union[DocRootPath, DomainName], wp_path: str, *, module: str, ) -> Tuple[bool, Dict[str, Union[str, dict]]]: """ Install and activate specified wordpress module. :param arg: user's docroot or domain :param wp_path: path to user's wordpress directory :param module: module on which to perform enable operations :return: tuple that consists of enabling status and details """ if isinstance(arg, DomainName): __, doc_root = check_domain(arg) elif isinstance(arg, DocRootPath): doc_root = Path(home_dir(), arg) else: raise ValueError("Invalid argument format")
wp_path = wp_path.lstrip("/") abs_wp_path = str(Path(doc_root).joinpath(wp_path).absolute())
# try to install plugin try: install_module(module, abs_wp_path) except WposError as e: return False, dict( message=_("WordPress plugin installation failed. " "Try again and contact your system administrator if issue persists."), details=e.message, context=e.context )
# try to activate plugin try: enable_module(module, abs_wp_path) except WposError as e: disable_module(module, abs_wp_path) return False, dict( message=_("WordPress plugin activation failed. Changes were reverted and caching module is now disabled. " "Try again and contact your system administrator if issue persists."), details=e.message, context=e.context )
return True, {}
def reload_redis(uid: int = None): """ Make redis reload via CLWPOS daemon :param uid: User uid (optional) """ cmd_dict = {"command": "reload"} if uid: cmd_dict['uid'] = uid daemon_communicate(cmd_dict)
def enable_module(module: str, abs_wp_path: str): """ Enable specified WP optimization feature. """ from clwpos.optimization_features import OBJECT_CACHE_FEATURE, SITE_OPTIMIZATION_FEATURE
module_func_map = { OBJECT_CACHE_FEATURE: enable_redis_object_cache, SITE_OPTIMIZATION_FEATURE: enable_site_optimization_plugin, } module_func_map[module](abs_wp_path)
def enable_redis_object_cache(abs_wp_path: str): """ Enable redis-cache plugin for user. :param abs_wp_path: absolute path to wp site :return: """ res = wordpress(abs_wp_path, "plugin", "activate", "redis-cache") if isinstance(res, WordpressError): raise WposError(message=res.message, context=res.context)
res = wordpress(abs_wp_path, "redis", "enable") if isinstance(res, WordpressError): raise WposError(message=res.message, context=res.context) create_redis_cache_config(abs_wp_path)
def obtain_wp_cli_env(abs_wp_path: str): """ Returns needed envars for wp-cli """ env = None if os.path.exists(os.path.join(abs_wp_path, '.htaccess')): server_software = 'LiteSpeed' if litespeed_is_running() else 'Apache' env = {'SERVER_SOFTWARE': server_software} return env
def enable_site_optimization_plugin(abs_wp_path: str): """ Enable cloudlinux-site-optimization plugin for user. :param abs_wp_path: absolute path to wp site :return: """ env = obtain_wp_cli_env(abs_wp_path) res = wordpress(abs_wp_path, "plugin", "activate", "clsop", env=env) if isinstance(res, WordpressError): raise WposError(message=res.message, context=res.context)
def install_module(module: str, abs_wp_path: str): """ Install specified WP optimization feature. """ from clwpos.optimization_features import OBJECT_CACHE_FEATURE, SITE_OPTIMIZATION_FEATURE
module_func_map = { OBJECT_CACHE_FEATURE: install_redis_cache, SITE_OPTIMIZATION_FEATURE: install_site_optimization_plugin, } module_func_map[module](abs_wp_path)
def install_redis_cache(abs_wp_path: str): """ Install redis-cache plugin for user. :param abs_wp_path: absolute path to wp site :return: """ res = wordpress(abs_wp_path, "plugin", "install", "redis-cache") if isinstance(res, WordpressError): raise WposError(message=res.message, context=res.context)
def install_site_optimization_plugin(abs_wp_path: str): """ Install cloudlinux-site-optimization plugin for user. :param abs_wp_path: absolute path to wp site :return: """ env = obtain_wp_cli_env(abs_wp_path) res = wordpress(abs_wp_path, "plugin", "install", CLSOP_ZIP_PATH, env=env) if isinstance(res, WordpressError): raise WposError(message=res.message, context=res.context)
def disable_module(module: str, abs_wp_path: str) -> List[WordpressError]: """ Deactivate and delete specified WP optimization feature. """ from clwpos.optimization_features import OBJECT_CACHE_FEATURE, SITE_OPTIMIZATION_FEATURE
module_func_map = { OBJECT_CACHE_FEATURE: disable_object_cache, SITE_OPTIMIZATION_FEATURE: disable_site_optimization_plugin, } return module_func_map[module](abs_wp_path)
def disable_object_cache(abs_wp_path: str) -> List[WordpressError]: """ Delete cloudlinux info from wp-config.php, deactivate and delete redis-cache plugin for user. :param abs_wp_path: absolute path to wp site :return: list of errors that occurred during command execution """ errors = [] if is_plugin_activated(abs_wp_path, 'redis-cache'): res = wordpress(abs_wp_path, "plugin", "deactivate", "redis-cache") if isinstance(res, WordpressError): errors.append(res)
if not errors and is_plugin_installed(abs_wp_path, 'redis-cache'): # continue procedure further only if previous step succeeded res = wordpress(abs_wp_path, "plugin", "delete", "redis-cache") if isinstance(res, WordpressError): errors.append(res)
if not errors: # cleanup constants in the end only if deactivation/deletion succeeded, # because it may impact on deactivating/deleting plugin try: clear_redis_cache_config(abs_wp_path) except WposError as err: _logger.exception(err) errors.append(WordpressError(err.message, err.context)) except Exception as e: _logger.exception(e) errors.append( WordpressError( message=_('Unexpected error happened while clearing cache: %(error)s'), context=dict(error=str(e))) )
return errors
def is_plugin_activated(abs_wp_path: str, plugin_name: str, wp_cli_env=None) -> bool: result = wordpress(abs_wp_path, "plugin", "is-active", plugin_name, env=wp_cli_env) return not isinstance(result, WordpressError)
def is_plugin_installed(abs_wp_path: str, plugin_name: str, wp_cli_env=None) -> bool: result = wordpress(abs_wp_path, "plugin", "is-installed", plugin_name, env=wp_cli_env) return not isinstance(result, WordpressError)
def disable_site_optimization_plugin(abs_wp_path: str) -> List[WordpressError]: """ Deactivate and delete cloudlinux-site-optimization plugin for user. :param abs_wp_path: absolute path to wp site :return: list of errors that occurred during command execution """ errors = []
env = obtain_wp_cli_env(abs_wp_path)
if is_plugin_activated(abs_wp_path, 'clsop', env):
res = wordpress(abs_wp_path, "plugin", "deactivate", "clsop", env=env) if isinstance(res, WordpressError): errors.append(res)
if not errors and is_plugin_installed(abs_wp_path, 'clsop', env): # continue procedure further only if previous step succeeded res = wordpress(abs_wp_path, "plugin", "uninstall", "clsop", env=env) if isinstance(res, WordpressError): errors.append(res)
return errors
|