!C99Shell v. 2.0 [PHP 7 Update] [25.02.2019]!

Software: Apache. PHP/5.6.40 

uname -a: Linux cpanel06wh.bkk1.cloud.z.com 2.6.32-954.3.5.lve1.4.80.el6.x86_64 #1 SMP Thu Sep 24
01:42:00 EDT 2020 x86_64
 

uid=851(cp949260) gid=853(cp949260) groups=853(cp949260) 

Safe-mode: OFF (not secure)

/opt/alt/python37/lib/python3.7/site-packages/clwpos/   drwxr-xr-x
Free 234.41 GB of 981.82 GB (23.87%)
Home    Back    Forward    UPDIR    Refresh    Search    Buffer    Encoder    Tools    Proc.    FTP brute    Sec.    SQL    PHP-code    Update    Feedback    Self remove    Logout    


Viewing file:     utils.py (33.13 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

# wpos_lib.py - helper functions for clwpos utility

from __future__ import absolute_import

import os
import re
import shutil
import struct
import sys
import time
import json
import pwd
import fcntl
import uuid
import subprocess
import psutil
from contextlib import contextmanager
from glob import iglob
from functools import wraps
from pathlib import Path
from socket import socket, AF_UNIX, SOCK_STREAM
from typing import Dict, List, Tuple, Optional, Set
import platform

from secureio import write_file_via_tempfile
from clcommon.cpapi.cpapiexceptions import NoDomain
from clcommon.clpwd import ClPwd
from clcommon.lib.cledition import (
    is_cl_solo_edition,
    is_cl_shared_pro_edition,
    is_cl_admin_edition,
    CLEditionDetectionError
)

from cllicenselib import check_license
from clcommon.cpapi import docroot, getCPName, CPANEL_NAME
from clcommon.utils import exec_utility, run_command, demote
from clwpos import gettext, wp_config
from clwpos.cl_wpos_exceptions import (
    WposError,
    WPOSLicenseMissing,
    WpCliUnsupportedException,
    WpNotExists,
    WpConfigWriteFailed,
    PhpBrokenException
)
from clcommon.ui_config import UIConfig
from clcommon.clcagefs import in_cagefs, _is_cagefs_enabled
from clcommon.const import Feature
from clcommon.cpapi import is_panel_feature_supported

from .logsetup import setup_logging

from clwpos.constants import (
    USER_WPOS_DIR,
    WPOS_DAEMON_SOCKET_FILE,
    CLCONFIG_UTILITY,
    RedisRequiredConstants,
    CAGEFS_ENTER_USER_BIN,
    CAGEFS_ENTER_UTIL,
    CLWPOS_OPT_DIR,
    ALT_PHP_PREFIX,
    EA_PHP_PREFIX,
    USER_CLWPOS_CONFIG,
    PUBLIC_OPTIONS,
)

from .socket_utils import pack_data_for_socket, read_unpack_response_from_socket_client
from .user.website_check.errors import RollbackException

logger = None


def catch_error(func):
    """
    Decorator for catching errors
    """

    def func_wrapper(self, *args, **kwargs):
        global logger
        if logger is None:
            logger = setup_logging(__name__)
        try:
            return func(self, *args, **kwargs)
        except RollbackException as e:
            error_and_exit(self._is_json, {
                'context': e.context,
                'result': e.message,
                'issues': e.errors
            })
        except WposError as e:
            if isinstance(e, WPOSLicenseMissing):
                logger.warning(e)
            else:
                logger.exception(e)
            response = {'context': e.context, 'result': e.message, 'warning': e.warning}
            if e.details:
                response['details'] = e.details
            error_and_exit(self._is_json, response)
        except Exception as e:
            logger.exception(e)
            error_and_exit(self._is_json, {'context': {}, 'result': str(e)})

    return func_wrapper


def _print_dictionary(data_dict, is_json: bool = False, is_pretty: bool = False):
    """
    Print specified dictionary
    :param data_dict: data dictionary to print
    :param is_json: True - print in JSON, False - in text
    :param is_pretty: True - pretty json print, False - none (default)
    :return: None
    """
    if is_json:
        # Print as JSON
        if is_pretty:
            print(json.dumps(data_dict, indent=4, sort_keys=True))
        else:
            print(json.dumps(data_dict, sort_keys=True))
    else:
        # Print as text
        print(data_dict)


def error_and_exit(is_json: bool, message: dict, error_code: int = 1):
    """
    Print error and exit
    :param is_json:
    :param message: Dictionary with keys "result" as string and optional "context" as dict
    :param error_code: Utility return code on error
    """
    if 'warning' in message.keys() and not message.get('warning'):
        message.pop('warning')

    if is_json:
        message.update({"timestamp": time.time()})
        _print_dictionary(message, is_json, is_pretty=True)
    else:
        try:
            print(str(message["result"]) % message.get("context", {}))
        except KeyError as e:
            print("Error: %s [%s]" % (str(e), message))
    sys.exit(error_code)


def print_data(is_json: bool, data: dict, result="success"):
    """
    Output data wrapper
    :param is_json:
    :param data: data for output to stdout
    :param result:
    """
    if isinstance(data, dict):
        data.update({"result": result, "timestamp": time.time()})
    _print_dictionary(data, is_json, is_pretty=True)


def is_run_under_user() -> bool:
    """
    Detects is we running under root
    :return: True - user, False - root
    """
    return os.geteuid() != 0


def is_shared_pro_safely(safely: bool):
    """
    Detecting of shared_pro edition depends on jwt token
    There are some cases when we do not fail if there are
    cases with decoding (e.g summary collection)
    """
    try:
        return is_cl_shared_pro_edition()
    except CLEditionDetectionError:
        if safely:
            return False
        else:
            raise


def is_wpos_supported(safely=False) -> bool:
    """
    Сheck if system environment is supported by WPOS
    :return:
        True - CPanel on Solo/ CL Shared Pro/ CL Admin
        False - else
    """
    is_cpanel = getCPName() == CPANEL_NAME
    return (is_cl_solo_edition(skip_jwt_check=True) or is_shared_pro_safely(safely)
            or is_cl_admin_edition(skip_jwt_check=True)) \
           and is_cpanel


def create_clwpos_dir_if_not_exists(username):
    """
    Creates {homedir}/.clwpos directory if it's not exists
    """
    clwpos_dir = os.path.join(home_dir(username), USER_WPOS_DIR)
    if not os.path.isdir(clwpos_dir):
        os.mkdir(clwpos_dir, mode=0o700)


def get_relative_docroot(domain, homedir):
    dr = docroot(domain)[0]
    if not dr.startswith(homedir):
        raise WposError(f"docroot {dr} for domain {domain} should start with {homedir}")
    return dr[len(homedir):].lstrip("/")


def home_dir(username: str = None) -> str:
    pw = get_pw(username=username)
    return pw.pw_dir


def user_name() -> str:
    return get_pw().pw_name


def user_uid(*, username: str = None) -> int:
    return get_pw(username=username).pw_uid


def get_pw(*, username: str = None):
    if username:
        return pwd.getpwnam(username)
    else:
        return pwd.getpwuid(os.geteuid())


class WposUser:
    """
    Helper class to construct paths to user's WPOS dir and files inside it.
    """

    def __init__(self, username: str, homedir: str = None) -> None:
        self.name = username
        self.home_dir = home_dir(username) if homedir is None else homedir
        self.wpos_dir = os.path.join(self.home_dir, USER_WPOS_DIR)
        self.wpos_config = os.path.join(self.wpos_dir, USER_CLWPOS_CONFIG)
        self.redis_conf = os.path.join(self.wpos_dir, 'redis.conf')
        self.redis_socket = os.path.join(self.wpos_dir, 'redis.sock')
        self.php_info = os.path.join(self.wpos_dir, '.php_info-{file_id}')

    def __eq__(self, other):
        return self.name == other.name

    def __hash__(self):
        return hash(self.name)


def daemon_communicate(cmd_dict: dict) -> Optional[dict]:
    """
    Send command to CLWPOS daemon via socket
    :param cmd_dict: Command dictionary
    :return: Daemon response as dictionary, None - daemon data/socket error
    """
    bytes_to_send = pack_data_for_socket(cmd_dict)
    with socket(AF_UNIX, SOCK_STREAM) as s:
        try:
            s.connect(WPOS_DAEMON_SOCKET_FILE)
            s.sendall(bytes_to_send)
            response_dict = read_unpack_response_from_socket_client(s)
            if response_dict is None or not isinstance(response_dict, dict):
                raise WposError(
                    message=gettext('Unexpected response from daemon. '
                                    'Report this issue to your system administrator.'),
                    details=str(response_dict),
                    context={})
            if response_dict['result'] != 'success':
                raise WposError(message=gettext('Daemon was unable to execute the requested command.'),
                                details=response_dict['result'],
                                context=response_dict.get('context'))
            return response_dict
        except FileNotFoundError:
            raise WposError(gettext('CloudLinux AccelerateWP daemon socket (%(filename)s) not found. '
                                    'Contact your system administrator.'),
                            {'filename': WPOS_DAEMON_SOCKET_FILE})
        except (ConnectionError, OSError, IOError, AttributeError, struct.error, KeyError) as e:
            raise WposError(gettext('Unexpected daemon communication error.'), details=str(e))


def redis_cache_config_section() -> List[str]:
    """
    Construct list of lines (configuration settings)
    that should be in Wordpress config file to enable redis.
    Please note that deleting of the plugin would flush all keys related to the plugin (site) from redis.
    REDIS_PREFIX and SELECTIVE_FLUSH in wp-config.php would guarantee that plugin will not flush keys unrelated
    to this plugin (site)
    """
    socket_path = os.path.join(home_dir(), USER_WPOS_DIR, 'redis.sock')
    prefix_uuid = uuid.uuid4()
    redis_prefix = RedisRequiredConstants.WP_REDIS_PREFIX
    redis_schema = RedisRequiredConstants.WP_REDIS_SCHEME
    redis_client = RedisRequiredConstants.WP_REDIS_CLIENT
    redis_flush = RedisRequiredConstants.WP_REDIS_SELECTIVE_FLUSH
    return ["// Start of CloudLinux generated section\n",
            f"define('{redis_schema.name}', '{redis_schema.value}');\n",
            f"define('{RedisRequiredConstants.WP_REDIS_PATH.name}', '{socket_path}');\n",
            f"define('{redis_client.name}', '{redis_client.value}');\n",
            f"define('{redis_prefix.name}', '{redis_prefix.value}{prefix_uuid}');\n",
            f"define('{redis_flush.name}', {redis_flush.value});\n",
            "// End of CloudLinux generated section\n"]


def check_wp_config_existance(wp_config_path: str) -> None:
    """
    Check that wp-config.php exists inside Wordpress directory.
    :param wp_config_path: absolute path to Wordpress config file
    :raises: WposError
    """
    wp_path = os.path.dirname(wp_config_path)
    if not os.path.exists(wp_path):
        raise WpNotExists(wp_path)

    if not os.path.isfile(wp_config_path):
        raise WposError(message=gettext("Wordpress config file %(file)s is missing"),
                        context={"file": wp_config_path})


def clear_redis_cache_config(abs_wp_path: str) -> None:
    """
    Clear cloudlinux section with redis object cach config from docroot's wp-config.php
    :param abs_wp_path: Absolute path to WordPress
    :raises: WposError
    """
    wp_config_path = str(wp_config.path(abs_wp_path))
    check_wp_config_existance(wp_config_path)
    lines_to_filter = redis_cache_config_section()

    def __config_filter(line: str) -> bool:
        """
        Filter function that should delete CL config options from the `redis_cache_config_section()`
        """
        return line not in lines_to_filter and 'WP_REDIS_PREFIX' not in line

    try:
        wp_config_lines = wp_config.read(abs_wp_path)
        cleared_wp_config = list(filter(__config_filter, wp_config_lines))
        write_file_via_tempfile("".join(cleared_wp_config), wp_config_path, 0o600)
    except (OSError, IOError) as e:
        raise WpConfigWriteFailed(wp_config_path, e)


def create_redis_cache_config(abs_wp_path: str) -> None:
    """
    Create config for redis-cache.
    We use manual copy cause we want to preserve file metadata
    and permissions and also we could add some custom config editing in the future.
    :param abs_wp_path: absolute path to WordPress
    :raises: WposError
    """
    wp_config_path = str(wp_config.path(abs_wp_path))
    check_wp_config_existance(wp_config_path)

    try:
        backup_wp_config = f"{wp_config_path}.backup"
        if not os.path.isfile(backup_wp_config):
            shutil.copy(wp_config_path, backup_wp_config)

        absent_constants = {constant.name: constant.value for constant in RedisRequiredConstants}

        wp_config_lines = wp_config.read(abs_wp_path)
        cleaned_lines = []
        for line in wp_config_lines:
            absent_constants = {k: v for k, v in absent_constants.items() if f"define('{k}'" not in line}
            # nothing to do, all constants are already in conf
            if not absent_constants:
                return

            # cleanup existing consts, to rewrite all
            if not any(f"define('{redis_constant.name}'" in line for redis_constant in RedisRequiredConstants):
                cleaned_lines.append(line)

        updated_config = [
            cleaned_lines[0],
            *redis_cache_config_section(),
            *cleaned_lines[1:],
        ]
        write_file_via_tempfile("".join(updated_config), wp_config_path, 0o600)

    except (OSError, IOError) as e:
        raise WpConfigWriteFailed(wp_config_path, e)


def check_license_decorator(func):
    """Decorator to check for license validity
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        """License check wrapper"""
        if not check_license():
            raise WPOSLicenseMissing()
        return func(*args, **kwargs)

    return wrapper


def check_domain(domain: str) -> Tuple[str, str]:
    """
    Validates domain, determines it's owner and docroot or exit with error
    :param domain: Domain name to check
    :return: Tuple (username, docroot)
    """
    try:
        document_root, owner = docroot(domain)
        return owner, document_root
    except NoDomain:
        # No such domain
        raise WposError(message=gettext("No such domain: %(domain)s."), context={"domain": domain})


def lock_file(path: str, attempts: Optional[int]):
    """
    Try to take lock on file with specified number of attempts.
    """
    lock_type = fcntl.LOCK_EX
    if attempts is not None:
        # avoid blocking on lock
        lock_type |= fcntl.LOCK_NB
    try:
        lock_fd = open(path, "a+")
        for _ in range(attempts or 1):  # if attempts is None do 1 attempt
            try:
                fcntl.flock(lock_fd.fileno(), lock_type)
                break
            except OSError:
                time.sleep(0.3)
        else:
            raise LockFailedException(gettext("Another utility instance is already running. "
                                              "Try again later or contact system administrator "
                                              "in case if issue persists."))
    except IOError:
        raise LockFailedException(gettext("IO error happened while getting lock."))
    return lock_fd


class LockFailedException(Exception):
    """
    Exception when failed to take lock
    """
    pass


@contextmanager
def acquire_lock(resource_path: str, attempts: Optional[int] = 10):
    """
    Lock a file, than do something.
    Make specified number of attempts to acquire the lock,
    if attempts is None, wait until the lock is released.
    Usage:
    with acquire_lock(path, attempts=1):
       ... do something with files ...
    """
    lock_fd = lock_file(resource_path + '.lock', attempts)
    yield
    release_lock(lock_fd)


def release_lock(descriptor):
    """
    Releases lock file
    """
    try:
        # lock released explicitly
        fcntl.flock(descriptor.fileno(), fcntl.LOCK_UN)
    except IOError:
        # we ignore this cause process will be closed soon anyway
        pass
    descriptor.close()


class PHP(str):
    """Class helper which hides differences of PHP behind abstract methods."""

    def __new__(cls, *args):
        if cls != PHP:
            return str.__new__(cls, *args)

        for tp in _AltPHP, _EaPHP:
            if args[0].startswith(tp.prefix()):
                return tp(*args)

        raise Exception(f"Unknown PHP: {args[0]}")

    @staticmethod
    def prefix() -> str:
        """Return prefix of PHP."""
        raise NotImplementedError

    def _dir_relative_path(self) -> str:
        """Return relative path to dir of PHP."""
        raise NotImplementedError

    def dir(self) -> Path:
        """Return path to dir of PHP."""
        return Path(f"""/opt/{self._dir_relative_path()}""")

    def _bin_relative_path(self) -> str:
        """Return relative path to bin of PHP."""
        raise NotImplementedError

    def bin(self) -> Path:
        """Return path to bin of PHP."""
        return self.dir().joinpath(self._bin_relative_path())

    def _ini_relative_path(self) -> str:
        """Return relative path to ini of PHP."""
        raise NotImplementedError

    def ini(self) -> Path:
        """Return path to ini of PHP."""
        return self.dir().joinpath(self._ini_relative_path())

    @property
    def digits(self):
        version = self.replace(self.prefix(), "").replace("php", "")
        return int(version)


class _AltPHP(PHP):
    """Implementation for alt-php"""

    @staticmethod
    def prefix():
        return "alt-"

    def _dir_relative_path(self):
        return f"alt/{self[len(self.prefix()):]}"

    def _bin_relative_path(self):
        return "usr/bin/php"

    def _ini_relative_path(self):
        return "link/conf/default.ini"


class _EaPHP(PHP):
    """Implementation for ea-php"""

    @staticmethod
    def prefix():
        return "ea-"

    def _dir_relative_path(self):
        return f"cpanel/{self}"

    def _bin_relative_path(self):
        return "root/usr/bin/php"

    def _ini_relative_path(self):
        return "root/etc/php.ini"


def is_conflict_modules_installed(php_version: PHP, module):
    """
    Checks <module> enabled
    """
    path = str(php_version.bin())
    result = run_in_cagefs_if_needed([path, '-m'], env={})

    if result.stderr and not result.stdout:
        raise PhpBrokenException(path, result.stderr)

    out = result.stdout
    if module in out.split('\n'):
        return True
    return False


def wp_cli_compatibility_check(php_path: str):
    """
    Ensures wp-cli is compatible, e.g some
    php modules may prevent stable work
    """
    dangerous_module = 'snuffleupagus'
    if 'ea-php74' in php_path and is_conflict_modules_installed(PHP("ea-php74"), dangerous_module):
        raise WpCliUnsupportedException(message=gettext('Seems like ea-php74 %(module)s module is '
                                                        'enabled. It may cause instabilities while managing '
                                                        'Object Caching. Disable it and try again'),
                                        context={'module': dangerous_module})


def supported_php_handlers() -> List[str]:
    """
    Return list of supported handlers according to edition
    """
    supported = ['php-fpm']
    if not is_cl_solo_edition(skip_jwt_check=True):
        supported.append('lsapi')
    return supported


def set_wpos_icon_visibility(hide: bool) -> Tuple[int, str]:
    """
    Call cloudlinux-config utility
    to hide/show WPOS icon in user's control panel interface.
    """
    params = [
        'set',
        '--data',
        json.dumps({'options': {'uiSettings': {'hideWPOSApp': hide}}}),
        '--json',
    ]
    returncode, stdout = exec_utility(CLCONFIG_UTILITY, params)
    return returncode, stdout


def is_ui_icon_hidden() -> bool:
    """
    Check the current state of WPOS icon in user's control panel interface
    """
    return UIConfig().get_param('hideWPOSApp', 'uiSettings')


def get_default_public_options() -> Dict[str, bool]:
    """
    Return default content of /opt/clwpos/public_config.json.
    """
    config_dict = {}
    is_icon_hidden = UIConfig().get_param('hideWPOSApp', 'uiSettings')
    config_dict['show_icon'] = not is_icon_hidden
    return config_dict


def get_admin_options():
    """
    Gets admin options by reading UI config
    or wpos PUBLIC_OPTIONS config
    """
    if not os.path.isfile(PUBLIC_OPTIONS):
        return get_default_public_options()
    else:
        with acquire_lock(PUBLIC_OPTIONS):
            with open(PUBLIC_OPTIONS, 'r') as f:
                content = f.read()
                return json.loads(content)


def run_in_cagefs_if_needed(command, **kwargs):
    """
    Wrapper for subprocess to enter cagefs
    do not enter cagefs if:
     - CloudLinux Solo
     - if process already started as user in cagefs
    """
    if in_cagefs() or not is_panel_feature_supported(Feature.CAGEFS):
        return subprocess.run(command,
                              text=True,
                              capture_output=True,
                              preexec_fn=demote(os.geteuid(), os.getegid()),
                              **kwargs)
    else:
        if os.geteuid() == 0:
            raise WposError(message=gettext(f'Internal error: command {command} must not be run as root. '
                                            'Please contact support if you have questions: '
                                            'https://cloudlinux.zendesk.com'))
        if isinstance(command, str):
            with_cagefs_enter = CAGEFS_ENTER_UTIL + ' ' + command
        else:
            with_cagefs_enter = [CAGEFS_ENTER_UTIL] + command
        return subprocess.run(with_cagefs_enter,
                              preexec_fn=demote(os.geteuid(), os.getegid()),
                              text=True,
                              capture_output=True,
                              **kwargs)


def uid_by_name(name):
    """
    Returns uid for user
    """
    try:
        return ClPwd().get_uid(name)
    except ClPwd.NoSuchUserException:
        return None


def get_alt_php_versions() -> List[PHP]:
    """
    Get list of installed alt-php versions.
    """
    alt_dir = '/opt/alt'
    pattern = re.compile(r'^php\d{2}$')
    alt_php_versions = [
        PHP(f'alt-{dirname}') for dirname in os.listdir(alt_dir)
        if pattern.match(dirname)
    ]
    return alt_php_versions


class PhpIniConfig:
    """
    Helper class to update extensions in php .ini files.
    """

    def __init__(self, php_version: PHP):
        self.php_version = php_version
        self.disabled_pattern = re.compile(r'^;\s*extension\s*=\s*(?P<module_name>\w+)\.so')
        self.enabled_pattern = re.compile(r'^\s*extension\s*=\s*(?P<module_name>\w+)\.so')

    def _enabled_modules(self, path: str) -> Set[str]:
        """
        Return enabled modules.
        :param path: full path to .ini file
        """
        with open(path, 'r') as f:
            return {self.enabled_pattern.match(line).group('module_name') for line in f
                    if self.enabled_pattern.match(line) is not None}

    def enable_modules(self, path: str, modules: List[str]) -> bool:
        """
        Enable specified modules in .ini php file.
        :param path: path to .ini file related to php directory
        :param modules: list of modules that should be enabled
        """
        full_path = os.path.join(self.php_version.dir(), path)
        if not os.path.exists(full_path):
            return False
        modules_to_enable = set(modules) - self._enabled_modules(full_path)
        if modules_to_enable:
            with open(full_path) as f:
                new_ini_lines = [self._enable_module(line, modules_to_enable)
                                 for line in f.readlines()]

                for module in sorted(modules_to_enable):
                    new_ini_lines.append('extension={}.so\n'.format(module))

            write_file_via_tempfile(''.join(new_ini_lines), full_path, 0o644)
        return True

    def disable_modules(self, path: str, modules: List[str]) -> bool:
        """
        Disable specified modules in .ini php file.
        :param path: path to .ini file related to php directory
        :param modules: list of modules that should be disabled
        """
        full_path = os.path.join(self.php_version.dir(), path)
        if not os.path.exists(full_path):
            return False
        modules_to_disable = set(modules) & self._enabled_modules(full_path)
        if modules_to_disable:
            with open(full_path) as f:
                new_ini_lines = [self._disable_module(line, modules_to_disable)
                                 for line in f.readlines()]
            write_file_via_tempfile(''.join(new_ini_lines), full_path, 0o644)
        return True

    def _enable_module(self, line: str, modules_to_enable: Set[str]) -> str:
        """
        Search for disabled module in line, uncomment line to enable module.
        """
        match = self.disabled_pattern.match(line)
        if match is not None:
            module_name = match.group('module_name')
            if module_name in modules_to_enable:
                modules_to_enable.remove(module_name)
                return line.lstrip(';').lstrip()
        return line

    def _disable_module(self, line: str, modules_to_disable: Set[str]) -> str:
        """
        Search for enabled module in line, comment line to disable module.
        """
        match = self.enabled_pattern.match(line)
        if match is not None:
            module_name = match.group('module_name')
            if module_name in modules_to_disable:
                return f';{line}'
        return line


def _run_clwpos_as_user_in_cagefs(user=None):
    """
    All user-related actions must run inside of cagefs for security reasons.
    If solo just return because cagefs is only for shared and shared pro
    If root executed, we enter into user cagefs if user is pointed
    If not in cagefs and cagefs is enabeled for user enter into cagefs
    """
    if not is_panel_feature_supported(Feature.CAGEFS):
        return

    if not is_run_under_user():
        if user is None:
            raise WposError(message=gettext(
                "Internal Error: root enters into CageFS without specifying username"
                "Please contact support if you have questions: "
                "https://cloudlinux.zendesk.com"
            )
            )
        cmd = [CAGEFS_ENTER_USER_BIN, user] + sys.argv[:1] + sys.argv[3:]
    elif not in_cagefs() and _is_cagefs_enabled(user=user_name()):
        cmd = [CAGEFS_ENTER_UTIL] + sys.argv

    else:
        return

    p = subprocess.Popen(cmd, stdout=sys.stdout, stdin=sys.stdin, env={})
    p.communicate()
    sys.exit(p.returncode)


class RedisConfigurePidFile:
    """
    Helper class that provides methods to work with
    pid files of php redis configuration processes.
    """

    def __init__(self, php_prefix: str) -> None:
        self._pid_file_name = f'{php_prefix}-cloudlinux.pid'
        self.path = Path(CLWPOS_OPT_DIR, self._pid_file_name)

    def create(self) -> None:
        with self.path.open('w') as f:
            f.write(str(os.getpid()))

    def remove(self) -> None:
        if self.path.is_file():
            self.path.unlink()

    def exists(self) -> bool:
        return self.path.is_file()

    @property
    def pid(self) -> int:
        if not self.exists():
            return -1
        with self.path.open() as f:
            try:
                return int(f.read().strip())
            except ValueError:
                pass
        return -1


@contextmanager
def create_pid_file(php_prefix: str):
    """
    Context manager for creating pid file of current process.
    Removes pid file on exit.
    """
    pid_file = RedisConfigurePidFile(php_prefix)
    try:
        pid_file.create()
        yield
    finally:
        pid_file.remove()


def is_php_redis_configuration_running(php_prefix: str) -> bool:
    """
    Find out if PHP redis configuration process is running.
    Based on looking for presence of pid files.
    For root also checks process existence.
    """
    pid_file = RedisConfigurePidFile(php_prefix)
    if os.geteuid() != 0:
        return pid_file.exists()
    try:
        process = psutil.Process(pid_file.pid)
        return 'enable_redis' in process.name()
    except (ValueError, psutil.NoSuchProcess):
        return False


def is_alt_php_redis_configuration_running() -> bool:
    """
    Find out if alt-PHP redis configuration process is running.
    """
    return is_php_redis_configuration_running(ALT_PHP_PREFIX)


def is_ea_php_redis_configuration_running() -> bool:
    """
    Find out if ea-PHP redis configuration process is running.
    """
    return is_php_redis_configuration_running(EA_PHP_PREFIX)


def is_redis_configuration_running() -> bool:
    """
    Find out if redis configuration process
    is running for any PHP (ea-php or alt-php).
    """
    return is_alt_php_redis_configuration_running() or \
           is_ea_php_redis_configuration_running()


def update_redis_conf(new_user: WposUser, old_user: WposUser) -> None:
    """
    Replace user's wpos directory path in redis.conf.
    """
    with open(new_user.redis_conf) as f:
        redis_conf_lines = f.readlines()

    updated_lines = [
        line.replace(old_user.wpos_dir, new_user.wpos_dir) for line in redis_conf_lines
    ]
    write_file_via_tempfile(''.join(updated_lines), new_user.redis_conf, 0o600)


def update_wp_config(abs_wp_path: str, new_user: WposUser, old_user: WposUser) -> None:
    """
    Replace user's redis socket path in wp-config.php.
    """
    try:
        wp_config_lines = wp_config.read(abs_wp_path)
    except OSError as e:
        print('Error occurred during opening wp-config.php '
              f'located in path "{abs_wp_path}": {e}', file=sys.stderr)
        return

    updated_lines = [
        line.replace(old_user.redis_socket, new_user.redis_socket)
        if old_user.redis_socket in line else line
        for line in wp_config_lines
    ]
    write_file_via_tempfile(''.join(updated_lines), wp_config.path(abs_wp_path), 0o600)


def get_parent_pid() -> int:
    """
    Get parent process PID.
    """
    proc = psutil.Process(os.getpid())
    return proc.ppid()


def _is_monitoring_daemon_exists() -> bool:
    """
    Detect CL WPOS daemon presence in system
    :return: True - daemon works / False - No
    """
    # /sbin/service clwpos_monitoring status
    # retcode != 0 - clwpos_monitoring not running/not installed
    #         == 0 - clwpos_monitoring running
    returncode, _, _ = run_command(['/sbin/service', 'clwpos_monitoring', 'status'], return_full_output=True)
    if returncode != 0:
        return False
    return True


def _update_clwpos_daemon_config_systemd(systemd_unit_file) -> Tuple[int, str, str]:
    """
    Update systemd unit file and reload systemd
    """
    shutil.copy('/usr/share/cloudlinux/clwpos_monitoring.service', systemd_unit_file)
    retcode, stdout, stderr = run_command(['/usr/bin/systemctl', 'enable', 'clwpos_monitoring.service'],
                                          return_full_output=True)
    if not retcode:
        retcode, stdout, stderr = run_command(['/usr/bin/systemctl', 'daemon-reload'], return_full_output=True)
    return retcode, stdout, stderr


def _install_daemon_internal(is_solo: bool, systemd_unit_file: str,
                             is_module_allowed_on_server: bool) -> Tuple[int, str, str]:
    """
    Install WPOS daemon to system and start it
    """
    retcode, stdout, stderr = 0, None, None
    if 'el6' in platform.release():
        retcode, stdout, stderr = run_command(['/sbin/chkconfig', '--add', 'clwpos_monitoring'],
                                              return_full_output=True)
    else:
        if not is_solo and is_module_allowed_on_server:
            # CL Shared Pro and module enabled
            # Update unit file and reload systemd - setup daemon
            retcode, stdout, stderr = _update_clwpos_daemon_config_systemd(systemd_unit_file)
    if not retcode:
        retcode, stdout, stderr = run_command(['/sbin/service', 'clwpos_monitoring', 'start'],
                                              return_full_output=True)
    return retcode, stdout, stderr


def install_monitoring_daemon(is_module_allowed_on_server: bool) -> Tuple[int, str, str]:
    """
    Install WPOS daemon to server if need:
        - if daemon already present - do nothing;
        - on CL Solo install daemon always;
        - on CL Shared Pro install daemon if module allowed
    On solo and if /etc/systemd/system/clwpos_monitoring.service present it will be updated always
    We do not need restart installed daemon here, it's done in rpm_posttrans.sh
    :param is_module_allowed_on_server: True/False
    """
    systemd_unit_file = '/etc/systemd/system/clwpos_monitoring.service'
    is_solo = is_cl_solo_edition()
    # if from rpm_posttrans
    if is_solo or os.path.exists(systemd_unit_file):
        # Update unit file and reload systemd
        _update_clwpos_daemon_config_systemd(systemd_unit_file)
    if _is_monitoring_daemon_exists():
        return 0, "", ""
    return _install_daemon_internal(is_solo, systemd_unit_file, is_module_allowed_on_server)


def get_status_from_daemon(service):
    command_get_service_status_dict = {"command": f"get-{service}-status"}
    try:
        daemon_result = daemon_communicate(command_get_service_status_dict)
    except WposError:
        return False
    return daemon_result.get('status')


def redis_is_running() -> bool:
    return get_status_from_daemon('redis')


def litespeed_is_running() -> bool:
    return get_status_from_daemon('litespeed')


def clean_crons(prefix: str):
    """Remove all cron files starting from prefix"""
    crondir = '/etc/cron.d'
    for _f in iglob(f'{crondir}/{prefix}*'):
        os.unlink(_f)


def clean_clwpos_crons():
    """Remove all WPOS cron files"""
    clean_crons(prefix='clwpos_')

:: Command execute ::

Enter:
 
Select:
 

:: Search ::
  - regexp 

:: Upload ::
 
[ Read-Only ]

:: Make Dir ::
 
[ Read-Only ]
:: Make File ::
 
[ Read-Only ]

:: Go Dir ::
 
:: Go File ::
 

--[ c99shell v. 2.0 [PHP 7 Update] [25.02.2019] maintained by KaizenLouie | C99Shell Github | Generation time: 0.0392 ]--