HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //opt/cloudlinux/venv/lib64/python3.11/site-packages/xray/manager/plesk.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains classes implementing X-Ray Manager behaviour
for Plesk
"""

import os
import subprocess
import xml.etree.ElementTree as ET
from collections import ChainMap

from xray.internal import phpinfo_utils

from .base import BaseManager
from ..internal.exceptions import XRayManagerError, XRayMissingDomain
from ..internal.types import DomainInfo
from ..internal.user_plugin_utils import (
    user_mode_verification,
    with_fpm_reload_restricted,
)


class PleskManager(BaseManager):
    """
    Class implementing an X-Ray manager behaviour for Plesk
    """

    VERSIONS_Plesk = {
        "plesk-php54": "/opt/plesk/php/5.4/etc/php.d",
        "plesk-php55": "/opt/plesk/php/5.5/etc/php.d",
        "plesk-php56": "/opt/plesk/php/5.6/etc/php.d",
        "plesk-php70": "/opt/plesk/php/7.0/etc/php.d",
        "plesk-php71": "/opt/plesk/php/7.1/etc/php.d",
        "plesk-php72": "/opt/plesk/php/7.2/etc/php.d",
        "plesk-php73": "/opt/plesk/php/7.3/etc/php.d",
        "plesk-php74": "/opt/plesk/php/7.4/etc/php.d",
        "plesk-php80": "/opt/plesk/php/8.0/etc/php.d",
        "plesk-php81": "/opt/plesk/php/8.1/etc/php.d",
        "plesk-php82": "/opt/plesk/php/8.2/etc/php.d",
        "plesk-php83": "/opt/plesk/php/8.3/etc/php.d",
        "plesk-php84": "/opt/plesk/php/8.4/etc/php.d",
    }

    def supported_versions(self) -> ChainMap:
        """
        Get supported PHP versions
        :return: a chained map with basic supported versions
                and Plesk supported versions
        """
        return ChainMap(self.VERSIONS, self.VERSIONS_Plesk)

    @user_mode_verification
    @with_fpm_reload_restricted
    def get_domain_info(self, domain_name: str) -> DomainInfo:
        """
        Retrieve information about given domain from control panel environment:
        PHP version, user of domain, fpm status
        :param domain_name: name of domain
        :return: a DomainInfo object
        """

        def resolve_lsphp_version(h):
            """
            Resolve version for lsphp handlers
            :param h: original Plesk php handler id
            :return: resolved alt-php* version
            """
            if "lsphp-custom" in h:
                ver = "alt-php56"
            elif "lsphp" in h:
                ver = f"alt-php{h.split('-')[-1]}"
            else:
                ver = "-".join(h.split("-")[:2])
            return ver

        domain_data = next(
            (
                item
                for item in self.query_db()
                if item[0] == domain_name or item[0] == f"www.{domain_name}"
            ),
            None,
        )
        if domain_data is None:
            self.logger.warning(
                "Domain does not exist on the server",
                extra={"domain_name": domain_name},
            )
            raise XRayMissingDomain(domain_name)

        domain, user, handler = domain_data
        self.logger.info(
            "Retrieved domain info: domain %s owned by %s uses php version %s",
            domain_name,
            user,
            handler,
        )

        if self.phpinfo_mode:
            config = phpinfo_utils.get_php_configuration(user, domain=domain_name)
            return DomainInfo(
                name=domain_name,
                panel_php_version=config.get_full_php_version("plesk-php"),
                php_ini_scan_dir=config.absolute_ini_scan_dir,
                # indicates that there is no need to apply selector
                # and try to resolve php version, the one given in
                # php_version is final one
                is_selector_applied=True,
                user=user,
                panel_fpm=config.is_php_fpm,
                handler=handler,
            )
        else:
            return DomainInfo(
                name=domain_name,
                panel_php_version=resolve_lsphp_version(handler),
                user=user,
                panel_fpm="fpm" in handler,
                handler=handler,
            )

    @staticmethod
    def query_db() -> tuple:
        """
        Query Plesk database through plesk db utility
        and yeild parsed xml result
        :return: tuple(domain_name, domain_user, domain_handler)
        """

        def check_path_env():
            """
            plesk db utility needs to be able to find mysql executable,
            which resides in /usr/bin.
            If we do not have it in PATH, the error will be thrown:
            'exec: "mysql": executable file not found in $PATH'
            """
            if "/usr/bin" not in os.environ.get("PATH", ""):
                return {"PATH": "/usr/bin"}
            else:
                return None

        query = """select d.name,s.login,h.php_handler_id from (select id, name from domains union select dom_id, name from domain_aliases) d join hosting h on d.id=h.dom_id join sys_users s on h.sys_user_id=s.id"""
        result = subprocess.run(
            ["/usr/sbin/plesk", "db", query, "--xml"],
            capture_output=True,
            text=True,
            env=check_path_env(),
        )
        try:
            root = ET.fromstring("".join(result.stdout))
            for row in root.iter("row"):
                domain_name = row.find("./field[@name='name']").text
                user_name = row.find("./field[@name='login']").text
                handler = row.find("./field[@name='php_handler_id']").text
                yield domain_name, user_name, handler
        except ET.ParseError as e:
            raise XRayManagerError(
                _("Failed to parse XML from plesk db output: %s") % str(result.stdout)
            ) from e

    def panel_specific_selector_enabled(self, domain_info: DomainInfo) -> bool:
        """
        Check if selector is enabled specifically for Plesk panel
        :param domain_info: a DomainInfo object
        :return: True if yes, False otherwise
        """

        def same_php_in_both_selectors():
            """
            Checks if php selector and cloudlinux selector have
            the same php version.
            :param domain_info: a DomainInfo object
            :return: digits as string or None
            """
            if domain_info.selector_php_version:
                return domain_info.selector_php_version[-2:] in domain_info.handler
            return False

        if "lsphp" in domain_info.handler:
            return "custom" in domain_info.handler or same_php_in_both_selectors()
        return not domain_info.panel_fpm

    def fpm_service_name(self, dom_info: DomainInfo) -> str:
        """
        Get Plesk FPM service name
        :param dom_info: a DomainInfo object
        :return: FPM service name
        """
        return dom_info.handler