Viewing file: utils.py (33.13 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT
# wpos_lib.py - helper functions for clwpos utility
from __future__ import absolute_import
import os import re import shutil import struct import sys import time import json import pwd import fcntl import uuid import subprocess import psutil from contextlib import contextmanager from glob import iglob from functools import wraps from pathlib import Path from socket import socket, AF_UNIX, SOCK_STREAM from typing import Dict, List, Tuple, Optional, Set import platform
from secureio import write_file_via_tempfile from clcommon.cpapi.cpapiexceptions import NoDomain from clcommon.clpwd import ClPwd from clcommon.lib.cledition import ( is_cl_solo_edition, is_cl_shared_pro_edition, is_cl_admin_edition, CLEditionDetectionError )
from cllicenselib import check_license from clcommon.cpapi import docroot, getCPName, CPANEL_NAME from clcommon.utils import exec_utility, run_command, demote from clwpos import gettext, wp_config from clwpos.cl_wpos_exceptions import ( WposError, WPOSLicenseMissing, WpCliUnsupportedException, WpNotExists, WpConfigWriteFailed, PhpBrokenException ) from clcommon.ui_config import UIConfig from clcommon.clcagefs import in_cagefs, _is_cagefs_enabled from clcommon.const import Feature from clcommon.cpapi import is_panel_feature_supported
from .logsetup import setup_logging
from clwpos.constants import ( USER_WPOS_DIR, WPOS_DAEMON_SOCKET_FILE, CLCONFIG_UTILITY, RedisRequiredConstants, CAGEFS_ENTER_USER_BIN, CAGEFS_ENTER_UTIL, CLWPOS_OPT_DIR, ALT_PHP_PREFIX, EA_PHP_PREFIX, USER_CLWPOS_CONFIG, PUBLIC_OPTIONS, )
from .socket_utils import pack_data_for_socket, read_unpack_response_from_socket_client from .user.website_check.errors import RollbackException
logger = None
def catch_error(func): """ Decorator for catching errors """
def func_wrapper(self, *args, **kwargs): global logger if logger is None: logger = setup_logging(__name__) try: return func(self, *args, **kwargs) except RollbackException as e: error_and_exit(self._is_json, { 'context': e.context, 'result': e.message, 'issues': e.errors }) except WposError as e: if isinstance(e, WPOSLicenseMissing): logger.warning(e) else: logger.exception(e) response = {'context': e.context, 'result': e.message, 'warning': e.warning} if e.details: response['details'] = e.details error_and_exit(self._is_json, response) except Exception as e: logger.exception(e) error_and_exit(self._is_json, {'context': {}, 'result': str(e)})
return func_wrapper
def _print_dictionary(data_dict, is_json: bool = False, is_pretty: bool = False): """ Print specified dictionary :param data_dict: data dictionary to print :param is_json: True - print in JSON, False - in text :param is_pretty: True - pretty json print, False - none (default) :return: None """ if is_json: # Print as JSON if is_pretty: print(json.dumps(data_dict, indent=4, sort_keys=True)) else: print(json.dumps(data_dict, sort_keys=True)) else: # Print as text print(data_dict)
def error_and_exit(is_json: bool, message: dict, error_code: int = 1): """ Print error and exit :param is_json: :param message: Dictionary with keys "result" as string and optional "context" as dict :param error_code: Utility return code on error """ if 'warning' in message.keys() and not message.get('warning'): message.pop('warning')
if is_json: message.update({"timestamp": time.time()}) _print_dictionary(message, is_json, is_pretty=True) else: try: print(str(message["result"]) % message.get("context", {})) except KeyError as e: print("Error: %s [%s]" % (str(e), message)) sys.exit(error_code)
def print_data(is_json: bool, data: dict, result="success"): """ Output data wrapper :param is_json: :param data: data for output to stdout :param result: """ if isinstance(data, dict): data.update({"result": result, "timestamp": time.time()}) _print_dictionary(data, is_json, is_pretty=True)
def is_run_under_user() -> bool: """ Detects is we running under root :return: True - user, False - root """ return os.geteuid() != 0
def is_shared_pro_safely(safely: bool): """ Detecting of shared_pro edition depends on jwt token There are some cases when we do not fail if there are cases with decoding (e.g summary collection) """ try: return is_cl_shared_pro_edition() except CLEditionDetectionError: if safely: return False else: raise
def is_wpos_supported(safely=False) -> bool: """ Сheck if system environment is supported by WPOS :return: True - CPanel on Solo/ CL Shared Pro/ CL Admin False - else """ is_cpanel = getCPName() == CPANEL_NAME return (is_cl_solo_edition(skip_jwt_check=True) or is_shared_pro_safely(safely) or is_cl_admin_edition(skip_jwt_check=True)) \ and is_cpanel
def create_clwpos_dir_if_not_exists(username): """ Creates {homedir}/.clwpos directory if it's not exists """ clwpos_dir = os.path.join(home_dir(username), USER_WPOS_DIR) if not os.path.isdir(clwpos_dir): os.mkdir(clwpos_dir, mode=0o700)
def get_relative_docroot(domain, homedir): dr = docroot(domain)[0] if not dr.startswith(homedir): raise WposError(f"docroot {dr} for domain {domain} should start with {homedir}") return dr[len(homedir):].lstrip("/")
def home_dir(username: str = None) -> str: pw = get_pw(username=username) return pw.pw_dir
def user_name() -> str: return get_pw().pw_name
def user_uid(*, username: str = None) -> int: return get_pw(username=username).pw_uid
def get_pw(*, username: str = None): if username: return pwd.getpwnam(username) else: return pwd.getpwuid(os.geteuid())
class WposUser: """ Helper class to construct paths to user's WPOS dir and files inside it. """
def __init__(self, username: str, homedir: str = None) -> None: self.name = username self.home_dir = home_dir(username) if homedir is None else homedir self.wpos_dir = os.path.join(self.home_dir, USER_WPOS_DIR) self.wpos_config = os.path.join(self.wpos_dir, USER_CLWPOS_CONFIG) self.redis_conf = os.path.join(self.wpos_dir, 'redis.conf') self.redis_socket = os.path.join(self.wpos_dir, 'redis.sock') self.php_info = os.path.join(self.wpos_dir, '.php_info-{file_id}')
def __eq__(self, other): return self.name == other.name
def __hash__(self): return hash(self.name)
def daemon_communicate(cmd_dict: dict) -> Optional[dict]: """ Send command to CLWPOS daemon via socket :param cmd_dict: Command dictionary :return: Daemon response as dictionary, None - daemon data/socket error """ bytes_to_send = pack_data_for_socket(cmd_dict) with socket(AF_UNIX, SOCK_STREAM) as s: try: s.connect(WPOS_DAEMON_SOCKET_FILE) s.sendall(bytes_to_send) response_dict = read_unpack_response_from_socket_client(s) if response_dict is None or not isinstance(response_dict, dict): raise WposError( message=gettext('Unexpected response from daemon. ' 'Report this issue to your system administrator.'), details=str(response_dict), context={}) if response_dict['result'] != 'success': raise WposError(message=gettext('Daemon was unable to execute the requested command.'), details=response_dict['result'], context=response_dict.get('context')) return response_dict except FileNotFoundError: raise WposError(gettext('CloudLinux AccelerateWP daemon socket (%(filename)s) not found. ' 'Contact your system administrator.'), {'filename': WPOS_DAEMON_SOCKET_FILE}) except (ConnectionError, OSError, IOError, AttributeError, struct.error, KeyError) as e: raise WposError(gettext('Unexpected daemon communication error.'), details=str(e))
def redis_cache_config_section() -> List[str]: """ Construct list of lines (configuration settings) that should be in Wordpress config file to enable redis. Please note that deleting of the plugin would flush all keys related to the plugin (site) from redis. REDIS_PREFIX and SELECTIVE_FLUSH in wp-config.php would guarantee that plugin will not flush keys unrelated to this plugin (site) """ socket_path = os.path.join(home_dir(), USER_WPOS_DIR, 'redis.sock') prefix_uuid = uuid.uuid4() redis_prefix = RedisRequiredConstants.WP_REDIS_PREFIX redis_schema = RedisRequiredConstants.WP_REDIS_SCHEME redis_client = RedisRequiredConstants.WP_REDIS_CLIENT redis_flush = RedisRequiredConstants.WP_REDIS_SELECTIVE_FLUSH return ["// Start of CloudLinux generated section\n", f"define('{redis_schema.name}', '{redis_schema.value}');\n", f"define('{RedisRequiredConstants.WP_REDIS_PATH.name}', '{socket_path}');\n", f"define('{redis_client.name}', '{redis_client.value}');\n", f"define('{redis_prefix.name}', '{redis_prefix.value}{prefix_uuid}');\n", f"define('{redis_flush.name}', {redis_flush.value});\n", "// End of CloudLinux generated section\n"]
def check_wp_config_existance(wp_config_path: str) -> None: """ Check that wp-config.php exists inside Wordpress directory. :param wp_config_path: absolute path to Wordpress config file :raises: WposError """ wp_path = os.path.dirname(wp_config_path) if not os.path.exists(wp_path): raise WpNotExists(wp_path)
if not os.path.isfile(wp_config_path): raise WposError(message=gettext("Wordpress config file %(file)s is missing"), context={"file": wp_config_path})
def clear_redis_cache_config(abs_wp_path: str) -> None: """ Clear cloudlinux section with redis object cach config from docroot's wp-config.php :param abs_wp_path: Absolute path to WordPress :raises: WposError """ wp_config_path = str(wp_config.path(abs_wp_path)) check_wp_config_existance(wp_config_path) lines_to_filter = redis_cache_config_section()
def __config_filter(line: str) -> bool: """ Filter function that should delete CL config options from the `redis_cache_config_section()` """ return line not in lines_to_filter and 'WP_REDIS_PREFIX' not in line
try: wp_config_lines = wp_config.read(abs_wp_path) cleared_wp_config = list(filter(__config_filter, wp_config_lines)) write_file_via_tempfile("".join(cleared_wp_config), wp_config_path, 0o600) except (OSError, IOError) as e: raise WpConfigWriteFailed(wp_config_path, e)
def create_redis_cache_config(abs_wp_path: str) -> None: """ Create config for redis-cache. We use manual copy cause we want to preserve file metadata and permissions and also we could add some custom config editing in the future. :param abs_wp_path: absolute path to WordPress :raises: WposError """ wp_config_path = str(wp_config.path(abs_wp_path)) check_wp_config_existance(wp_config_path)
try: backup_wp_config = f"{wp_config_path}.backup" if not os.path.isfile(backup_wp_config): shutil.copy(wp_config_path, backup_wp_config)
absent_constants = {constant.name: constant.value for constant in RedisRequiredConstants}
wp_config_lines = wp_config.read(abs_wp_path) cleaned_lines = [] for line in wp_config_lines: absent_constants = {k: v for k, v in absent_constants.items() if f"define('{k}'" not in line} # nothing to do, all constants are already in conf if not absent_constants: return
# cleanup existing consts, to rewrite all if not any(f"define('{redis_constant.name}'" in line for redis_constant in RedisRequiredConstants): cleaned_lines.append(line)
updated_config = [ cleaned_lines[0], *redis_cache_config_section(), *cleaned_lines[1:], ] write_file_via_tempfile("".join(updated_config), wp_config_path, 0o600)
except (OSError, IOError) as e: raise WpConfigWriteFailed(wp_config_path, e)
def check_license_decorator(func): """Decorator to check for license validity """
@wraps(func) def wrapper(*args, **kwargs): """License check wrapper""" if not check_license(): raise WPOSLicenseMissing() return func(*args, **kwargs)
return wrapper
def check_domain(domain: str) -> Tuple[str, str]: """ Validates domain, determines it's owner and docroot or exit with error :param domain: Domain name to check :return: Tuple (username, docroot) """ try: document_root, owner = docroot(domain) return owner, document_root except NoDomain: # No such domain raise WposError(message=gettext("No such domain: %(domain)s."), context={"domain": domain})
def lock_file(path: str, attempts: Optional[int]): """ Try to take lock on file with specified number of attempts. """ lock_type = fcntl.LOCK_EX if attempts is not None: # avoid blocking on lock lock_type |= fcntl.LOCK_NB try: lock_fd = open(path, "a+") for _ in range(attempts or 1): # if attempts is None do 1 attempt try: fcntl.flock(lock_fd.fileno(), lock_type) break except OSError: time.sleep(0.3) else: raise LockFailedException(gettext("Another utility instance is already running. " "Try again later or contact system administrator " "in case if issue persists.")) except IOError: raise LockFailedException(gettext("IO error happened while getting lock.")) return lock_fd
class LockFailedException(Exception): """ Exception when failed to take lock """ pass
@contextmanager def acquire_lock(resource_path: str, attempts: Optional[int] = 10): """ Lock a file, than do something. Make specified number of attempts to acquire the lock, if attempts is None, wait until the lock is released. Usage: with acquire_lock(path, attempts=1): ... do something with files ... """ lock_fd = lock_file(resource_path + '.lock', attempts) yield release_lock(lock_fd)
def release_lock(descriptor): """ Releases lock file """ try: # lock released explicitly fcntl.flock(descriptor.fileno(), fcntl.LOCK_UN) except IOError: # we ignore this cause process will be closed soon anyway pass descriptor.close()
class PHP(str): """Class helper which hides differences of PHP behind abstract methods."""
def __new__(cls, *args): if cls != PHP: return str.__new__(cls, *args)
for tp in _AltPHP, _EaPHP: if args[0].startswith(tp.prefix()): return tp(*args)
raise Exception(f"Unknown PHP: {args[0]}")
@staticmethod def prefix() -> str: """Return prefix of PHP.""" raise NotImplementedError
def _dir_relative_path(self) -> str: """Return relative path to dir of PHP.""" raise NotImplementedError
def dir(self) -> Path: """Return path to dir of PHP.""" return Path(f"""/opt/{self._dir_relative_path()}""")
def _bin_relative_path(self) -> str: """Return relative path to bin of PHP.""" raise NotImplementedError
def bin(self) -> Path: """Return path to bin of PHP.""" return self.dir().joinpath(self._bin_relative_path())
def _ini_relative_path(self) -> str: """Return relative path to ini of PHP.""" raise NotImplementedError
def ini(self) -> Path: """Return path to ini of PHP.""" return self.dir().joinpath(self._ini_relative_path())
@property def digits(self): version = self.replace(self.prefix(), "").replace("php", "") return int(version)
class _AltPHP(PHP): """Implementation for alt-php"""
@staticmethod def prefix(): return "alt-"
def _dir_relative_path(self): return f"alt/{self[len(self.prefix()):]}"
def _bin_relative_path(self): return "usr/bin/php"
def _ini_relative_path(self): return "link/conf/default.ini"
class _EaPHP(PHP): """Implementation for ea-php"""
@staticmethod def prefix(): return "ea-"
def _dir_relative_path(self): return f"cpanel/{self}"
def _bin_relative_path(self): return "root/usr/bin/php"
def _ini_relative_path(self): return "root/etc/php.ini"
def is_conflict_modules_installed(php_version: PHP, module): """ Checks <module> enabled """ path = str(php_version.bin()) result = run_in_cagefs_if_needed([path, '-m'], env={})
if result.stderr and not result.stdout: raise PhpBrokenException(path, result.stderr)
out = result.stdout if module in out.split('\n'): return True return False
def wp_cli_compatibility_check(php_path: str): """ Ensures wp-cli is compatible, e.g some php modules may prevent stable work """ dangerous_module = 'snuffleupagus' if 'ea-php74' in php_path and is_conflict_modules_installed(PHP("ea-php74"), dangerous_module): raise WpCliUnsupportedException(message=gettext('Seems like ea-php74 %(module)s module is ' 'enabled. It may cause instabilities while managing ' 'Object Caching. Disable it and try again'), context={'module': dangerous_module})
def supported_php_handlers() -> List[str]: """ Return list of supported handlers according to edition """ supported = ['php-fpm'] if not is_cl_solo_edition(skip_jwt_check=True): supported.append('lsapi') return supported
def set_wpos_icon_visibility(hide: bool) -> Tuple[int, str]: """ Call cloudlinux-config utility to hide/show WPOS icon in user's control panel interface. """ params = [ 'set', '--data', json.dumps({'options': {'uiSettings': {'hideWPOSApp': hide}}}), '--json', ] returncode, stdout = exec_utility(CLCONFIG_UTILITY, params) return returncode, stdout
def is_ui_icon_hidden() -> bool: """ Check the current state of WPOS icon in user's control panel interface """ return UIConfig().get_param('hideWPOSApp', 'uiSettings')
def get_default_public_options() -> Dict[str, bool]: """ Return default content of /opt/clwpos/public_config.json. """ config_dict = {} is_icon_hidden = UIConfig().get_param('hideWPOSApp', 'uiSettings') config_dict['show_icon'] = not is_icon_hidden return config_dict
def get_admin_options(): """ Gets admin options by reading UI config or wpos PUBLIC_OPTIONS config """ if not os.path.isfile(PUBLIC_OPTIONS): return get_default_public_options() else: with acquire_lock(PUBLIC_OPTIONS): with open(PUBLIC_OPTIONS, 'r') as f: content = f.read() return json.loads(content)
def run_in_cagefs_if_needed(command, **kwargs): """ Wrapper for subprocess to enter cagefs do not enter cagefs if: - CloudLinux Solo - if process already started as user in cagefs """ if in_cagefs() or not is_panel_feature_supported(Feature.CAGEFS): return subprocess.run(command, text=True, capture_output=True, preexec_fn=demote(os.geteuid(), os.getegid()), **kwargs) else: if os.geteuid() == 0: raise WposError(message=gettext(f'Internal error: command {command} must not be run as root. ' 'Please contact support if you have questions: ' 'https://cloudlinux.zendesk.com')) if isinstance(command, str): with_cagefs_enter = CAGEFS_ENTER_UTIL + ' ' + command else: with_cagefs_enter = [CAGEFS_ENTER_UTIL] + command return subprocess.run(with_cagefs_enter, preexec_fn=demote(os.geteuid(), os.getegid()), text=True, capture_output=True, **kwargs)
def uid_by_name(name): """ Returns uid for user """ try: return ClPwd().get_uid(name) except ClPwd.NoSuchUserException: return None
def get_alt_php_versions() -> List[PHP]: """ Get list of installed alt-php versions. """ alt_dir = '/opt/alt' pattern = re.compile(r'^php\d{2}$') alt_php_versions = [ PHP(f'alt-{dirname}') for dirname in os.listdir(alt_dir) if pattern.match(dirname) ] return alt_php_versions
class PhpIniConfig: """ Helper class to update extensions in php .ini files. """
def __init__(self, php_version: PHP): self.php_version = php_version self.disabled_pattern = re.compile(r'^;\s*extension\s*=\s*(?P<module_name>\w+)\.so') self.enabled_pattern = re.compile(r'^\s*extension\s*=\s*(?P<module_name>\w+)\.so')
def _enabled_modules(self, path: str) -> Set[str]: """ Return enabled modules. :param path: full path to .ini file """ with open(path, 'r') as f: return {self.enabled_pattern.match(line).group('module_name') for line in f if self.enabled_pattern.match(line) is not None}
def enable_modules(self, path: str, modules: List[str]) -> bool: """ Enable specified modules in .ini php file. :param path: path to .ini file related to php directory :param modules: list of modules that should be enabled """ full_path = os.path.join(self.php_version.dir(), path) if not os.path.exists(full_path): return False modules_to_enable = set(modules) - self._enabled_modules(full_path) if modules_to_enable: with open(full_path) as f: new_ini_lines = [self._enable_module(line, modules_to_enable) for line in f.readlines()]
for module in sorted(modules_to_enable): new_ini_lines.append('extension={}.so\n'.format(module))
write_file_via_tempfile(''.join(new_ini_lines), full_path, 0o644) return True
def disable_modules(self, path: str, modules: List[str]) -> bool: """ Disable specified modules in .ini php file. :param path: path to .ini file related to php directory :param modules: list of modules that should be disabled """ full_path = os.path.join(self.php_version.dir(), path) if not os.path.exists(full_path): return False modules_to_disable = set(modules) & self._enabled_modules(full_path) if modules_to_disable: with open(full_path) as f: new_ini_lines = [self._disable_module(line, modules_to_disable) for line in f.readlines()] write_file_via_tempfile(''.join(new_ini_lines), full_path, 0o644) return True
def _enable_module(self, line: str, modules_to_enable: Set[str]) -> str: """ Search for disabled module in line, uncomment line to enable module. """ match = self.disabled_pattern.match(line) if match is not None: module_name = match.group('module_name') if module_name in modules_to_enable: modules_to_enable.remove(module_name) return line.lstrip(';').lstrip() return line
def _disable_module(self, line: str, modules_to_disable: Set[str]) -> str: """ Search for enabled module in line, comment line to disable module. """ match = self.enabled_pattern.match(line) if match is not None: module_name = match.group('module_name') if module_name in modules_to_disable: return f';{line}' return line
def _run_clwpos_as_user_in_cagefs(user=None): """ All user-related actions must run inside of cagefs for security reasons. If solo just return because cagefs is only for shared and shared pro If root executed, we enter into user cagefs if user is pointed If not in cagefs and cagefs is enabeled for user enter into cagefs """ if not is_panel_feature_supported(Feature.CAGEFS): return
if not is_run_under_user(): if user is None: raise WposError(message=gettext( "Internal Error: root enters into CageFS without specifying username" "Please contact support if you have questions: " "https://cloudlinux.zendesk.com" ) ) cmd = [CAGEFS_ENTER_USER_BIN, user] + sys.argv[:1] + sys.argv[3:] elif not in_cagefs() and _is_cagefs_enabled(user=user_name()): cmd = [CAGEFS_ENTER_UTIL] + sys.argv
else: return
p = subprocess.Popen(cmd, stdout=sys.stdout, stdin=sys.stdin, env={}) p.communicate() sys.exit(p.returncode)
class RedisConfigurePidFile: """ Helper class that provides methods to work with pid files of php redis configuration processes. """
def __init__(self, php_prefix: str) -> None: self._pid_file_name = f'{php_prefix}-cloudlinux.pid' self.path = Path(CLWPOS_OPT_DIR, self._pid_file_name)
def create(self) -> None: with self.path.open('w') as f: f.write(str(os.getpid()))
def remove(self) -> None: if self.path.is_file(): self.path.unlink()
def exists(self) -> bool: return self.path.is_file()
@property def pid(self) -> int: if not self.exists(): return -1 with self.path.open() as f: try: return int(f.read().strip()) except ValueError: pass return -1
@contextmanager def create_pid_file(php_prefix: str): """ Context manager for creating pid file of current process. Removes pid file on exit. """ pid_file = RedisConfigurePidFile(php_prefix) try: pid_file.create() yield finally: pid_file.remove()
def is_php_redis_configuration_running(php_prefix: str) -> bool: """ Find out if PHP redis configuration process is running. Based on looking for presence of pid files. For root also checks process existence. """ pid_file = RedisConfigurePidFile(php_prefix) if os.geteuid() != 0: return pid_file.exists() try: process = psutil.Process(pid_file.pid) return 'enable_redis' in process.name() except (ValueError, psutil.NoSuchProcess): return False
def is_alt_php_redis_configuration_running() -> bool: """ Find out if alt-PHP redis configuration process is running. """ return is_php_redis_configuration_running(ALT_PHP_PREFIX)
def is_ea_php_redis_configuration_running() -> bool: """ Find out if ea-PHP redis configuration process is running. """ return is_php_redis_configuration_running(EA_PHP_PREFIX)
def is_redis_configuration_running() -> bool: """ Find out if redis configuration process is running for any PHP (ea-php or alt-php). """ return is_alt_php_redis_configuration_running() or \ is_ea_php_redis_configuration_running()
def update_redis_conf(new_user: WposUser, old_user: WposUser) -> None: """ Replace user's wpos directory path in redis.conf. """ with open(new_user.redis_conf) as f: redis_conf_lines = f.readlines()
updated_lines = [ line.replace(old_user.wpos_dir, new_user.wpos_dir) for line in redis_conf_lines ] write_file_via_tempfile(''.join(updated_lines), new_user.redis_conf, 0o600)
def update_wp_config(abs_wp_path: str, new_user: WposUser, old_user: WposUser) -> None: """ Replace user's redis socket path in wp-config.php. """ try: wp_config_lines = wp_config.read(abs_wp_path) except OSError as e: print('Error occurred during opening wp-config.php ' f'located in path "{abs_wp_path}": {e}', file=sys.stderr) return
updated_lines = [ line.replace(old_user.redis_socket, new_user.redis_socket) if old_user.redis_socket in line else line for line in wp_config_lines ] write_file_via_tempfile(''.join(updated_lines), wp_config.path(abs_wp_path), 0o600)
def get_parent_pid() -> int: """ Get parent process PID. """ proc = psutil.Process(os.getpid()) return proc.ppid()
def _is_monitoring_daemon_exists() -> bool: """ Detect CL WPOS daemon presence in system :return: True - daemon works / False - No """ # /sbin/service clwpos_monitoring status # retcode != 0 - clwpos_monitoring not running/not installed # == 0 - clwpos_monitoring running returncode, _, _ = run_command(['/sbin/service', 'clwpos_monitoring', 'status'], return_full_output=True) if returncode != 0: return False return True
def _update_clwpos_daemon_config_systemd(systemd_unit_file) -> Tuple[int, str, str]: """ Update systemd unit file and reload systemd """ shutil.copy('/usr/share/cloudlinux/clwpos_monitoring.service', systemd_unit_file) retcode, stdout, stderr = run_command(['/usr/bin/systemctl', 'enable', 'clwpos_monitoring.service'], return_full_output=True) if not retcode: retcode, stdout, stderr = run_command(['/usr/bin/systemctl', 'daemon-reload'], return_full_output=True) return retcode, stdout, stderr
def _install_daemon_internal(is_solo: bool, systemd_unit_file: str, is_module_allowed_on_server: bool) -> Tuple[int, str, str]: """ Install WPOS daemon to system and start it """ retcode, stdout, stderr = 0, None, None if 'el6' in platform.release(): retcode, stdout, stderr = run_command(['/sbin/chkconfig', '--add', 'clwpos_monitoring'], return_full_output=True) else: if not is_solo and is_module_allowed_on_server: # CL Shared Pro and module enabled # Update unit file and reload systemd - setup daemon retcode, stdout, stderr = _update_clwpos_daemon_config_systemd(systemd_unit_file) if not retcode: retcode, stdout, stderr = run_command(['/sbin/service', 'clwpos_monitoring', 'start'], return_full_output=True) return retcode, stdout, stderr
def install_monitoring_daemon(is_module_allowed_on_server: bool) -> Tuple[int, str, str]: """ Install WPOS daemon to server if need: - if daemon already present - do nothing; - on CL Solo install daemon always; - on CL Shared Pro install daemon if module allowed On solo and if /etc/systemd/system/clwpos_monitoring.service present it will be updated always We do not need restart installed daemon here, it's done in rpm_posttrans.sh :param is_module_allowed_on_server: True/False """ systemd_unit_file = '/etc/systemd/system/clwpos_monitoring.service' is_solo = is_cl_solo_edition() # if from rpm_posttrans if is_solo or os.path.exists(systemd_unit_file): # Update unit file and reload systemd _update_clwpos_daemon_config_systemd(systemd_unit_file) if _is_monitoring_daemon_exists(): return 0, "", "" return _install_daemon_internal(is_solo, systemd_unit_file, is_module_allowed_on_server)
def get_status_from_daemon(service): command_get_service_status_dict = {"command": f"get-{service}-status"} try: daemon_result = daemon_communicate(command_get_service_status_dict) except WposError: return False return daemon_result.get('status')
def redis_is_running() -> bool: return get_status_from_daemon('redis')
def litespeed_is_running() -> bool: return get_status_from_daemon('litespeed')
def clean_crons(prefix: str): """Remove all cron files starting from prefix""" crondir = '/etc/cron.d' for _f in iglob(f'{crondir}/{prefix}*'): os.unlink(_f)
def clean_clwpos_crons(): """Remove all WPOS cron files""" clean_crons(prefix='clwpos_')
|