File: //opt/cloudlinux/venv/lib64/python3.11/site-packages/clwpos/papi.py
"""
This module is the only one that is allowed to be
imported from other tools. Please, keep list of "external"
methods here, so we can keep them backwards-compatible.
The purpose is to reduce number of places in code which randomly
are dependencies of other tools (e.g. x-ray).
Don't user this module in clwpos code.
Maybe one day we will make proper api :)
"""
import json
import logging
import os
import pwd
import subprocess
from typing import List, Optional
from typing_extensions import TypedDict
from clwpos.billing import get_or_create_unique_identifier
from clwpos.cl_wpos_exceptions import WposError
from clwpos.constants import SMART_ADVISE_USER_UTILITY, WPOS_DAEMON_SOCKET_FILE
from clwpos.daemon import WposDaemon
from clwpos.data_collector_utils import php_info as _php_info
from clwpos.feature_suites import PremiumSuite, CDNSuitePro
from clwpos.feature_suites.configurations import get_visible_modules, get_allowed_modules, get_allowed_features_dict
from clwpos.optimization_features import Feature, OBJECT_CACHE_FEATURE, CDN_FEATURE, convert_feature_list_to_interface
from clwpos.user.config import LicenseApproveStatus, UserConfig
from clwpos.utils import (
is_wpos_supported as _is_wpos_supported,
daemon_communicate,
drop_permissions_if_needed,
get_subscription_status,
run_in_cagefs_if_needed,
get_server_wide_options
)
class HostInfo(TypedDict):
vhost: str
account: str
version: str
handler: str
documentroot: str
def php_get_vhost_versions(user: str) -> List[HostInfo]:
"""
Get information about vhosts in the following format:
[
{
'vhost': 'username.zone',
'account': 'username',
'version': 'ea-php80',
'handler': 'php-fpm',
'documentroot': '/home/username/public_html'
}
]
"""
return WposDaemon._php_get_vhost_versions(user)
class HostInfoExtended(HostInfo):
php_binary: str
def php_get_vhost_versions_user() -> List[HostInfoExtended]:
"""
Get information about vhosts in the following format:
[
{
'vhost': 'username.zone',
'account': 'username',
'version': 'ea-php80',
'php_binary': '/opt/php/php',
'handler': 'php-fpm',
'documentroot': '/home/username/public_html'
}
]
Unlike the php_get_vhost_versions, it automatically detects
current user and also performs php version change inside the cagefs,
providing additional information about php binary.
"""
result = _php_info()
return [
HostInfoExtended(
vhost=php_data['vhost'],
account=php_data['account'],
version=php_data['version'].identifier,
php_binary=php_data['version'].bin,
handler=php_data['handler'],
documentroot=php_data['documentroot'],
) for php_data in result
]
def is_wpos_supported() -> bool:
"""
Determines whether wpos is supported in current environment
"""
return _is_wpos_supported()
def is_feature_visible(feature: str, username: str) -> Optional[bool]:
"""
Determines whether feature is visible for username.
Return False in case if feature is unknown.
Return None in case if user is missing.
"""
try:
pw = pwd.getpwnam(username)
except KeyError:
return None
return Feature(feature).NAME in (
item.NAME for item in get_visible_modules(uid=pw.pw_uid)
)
def is_feature_hidden_server_wide(feature: str):
return Feature(feature).optimization_feature() in get_server_wide_options().hidden_features
def is_feature_allowed(feature: str, username: str) -> Optional[bool]:
"""
Determines whether feature is allowed for username to be activated.
Return False in case if feature is unknown.
Return None in case if user is missing.
"""
try:
pw = pwd.getpwnam(username)
except KeyError:
return None
return Feature(feature).NAME in (
item.NAME for item in get_allowed_modules(uid=pw.pw_uid)
)
def is_subscription_pending(feature: str, username: str) -> Optional[bool]:
"""
Determines whether feature is allowed for username to be activated.
"""
try:
pw = pwd.getpwnam(username)
except KeyError:
return None
try:
is_pending = daemon_communicate({
"command": WposDaemon.DAEMON_GET_UPGRADE_ATTEMPT_STATUS,
"feature": feature,
"uid": pw.pw_uid
})["pending"]
except WposError:
return False
return is_pending
def get_subscription_upgrade_url(feature: str, username: str) -> Optional[str]:
"""
Determines whether feature is allowed for username to be activated.
"""
try:
pw = pwd.getpwnam(username)
except KeyError:
return None
try:
upgrade_url = daemon_communicate({
"command": WposDaemon.DAEMON_GET_UPGRADE_LINK_COMMAND,
"uid": pw.pw_uid,
"feature": feature
})["upgrade_url"]
except WposError:
return None
return upgrade_url
def _get_cdn_usage_statistics(username):
"""Wrapper for easy mocking"""
with drop_permissions_if_needed(username):
get_usage_command = [SMART_ADVISE_USER_UTILITY, 'get-cdn-usage']
output = run_in_cagefs_if_needed(get_usage_command, check=True).stdout
try:
return json.loads(output)['data']
except KeyError:
raise subprocess.CalledProcessError(
returncode=0, cmd=get_usage_command,
output=output)
def get_subscriptions_info(username: str):
"""
Backwards compatibility for the third party tools.
Use get_subscriptions_info_2 instead.
"""
try:
user_pw = pwd.getpwnam(username)
except Exception:
logging.error('Cannot get uid for user: %s, subscription info will be empty', username)
return {}
return get_subscriptions_by_pw(user_pw)
def get_subscriptions_by_pw(user_pw: pwd.struct_passwd):
subscriptions = {}
allowed_features_dict = get_allowed_features_dict(user_pw.pw_uid)
converted_allowed_features = {feature_set: convert_feature_list_to_interface(features)
for feature_set, features in allowed_features_dict.items()}
for feature, suite_name in (
(OBJECT_CACHE_FEATURE.NAME, PremiumSuite.name),
(CDN_FEATURE.NAME, CDNSuitePro.name)
):
subscriptions[feature.lower()] = subscription = dict(
status=get_subscription_status(
converted_allowed_features, suite_name, feature.lower())
)
if feature == CDN_FEATURE.NAME:
usage = None
if is_feature_allowed(feature.lower(), user_pw.pw_name):
try:
info = _get_cdn_usage_statistics(user_pw.pw_name)
except Exception as e:
logging.warning("Error during obtaining usage, error: %s", str(e))
usage = None
else:
usage = calculate_cdn_usage(info)
subscription['usage'] = usage
return subscriptions
def calculate_cdn_usage(info):
warning = f"Content Delivery Network: You have reached your " \
f"{info['limit_bytes'] // 1024 ** 3}GB limit. " \
f"Please upgrade your subscription." \
f"" if info['total_bytes_used'] >= info['limit_bytes'] else None
bytes_used = info['total_bytes_used']
bytes_limit = info['limit_bytes']
# no usage ---> no warning
if bytes_used == 0 or bytes_limit == 0:
warning = None
usage = {
"bandwidth": {
"usage": bytes_used,
"limit": bytes_limit
},
"warning": warning
}
return usage
def get_user_auth_key(username: str):
"""
Reads configuration and gets identifier of user that
he can use to auth on provisioning server
"""
return get_or_create_unique_identifier(username)
def get_license_approve_status(feature: str, username: str) -> LicenseApproveStatus:
"""
Provides easy way for 3rd party tools to get information about license approve status
"""
with drop_permissions_if_needed(username):
uc = UserConfig(username)
feature = Feature(feature)
return uc.get_license_approve_status(feature)
def approve_license_agreement(feature: str, username: str):
"""
Provides easy way for 3rd party tools to approve license
"""
with drop_permissions_if_needed(username):
uc = UserConfig(username)
feature = Feature(feature)
uc.approve_license_agreement(feature)
def get_license_agreement_text(feature: str, username: str):
"""
Returns text of the license agreement for the given feature or None
if feature is not required to approve agreement
"""
with drop_permissions_if_needed(username):
feature = Feature(feature)
if not feature.HAS_LICENSE_TERMS:
return None
return open(feature.LICENSE_TERMS_PATH).read()
def is_wpos_visible(username: str) -> Optional[bool]:
"""
Determines whether at least one feature is
allowed for username to be activated.
"""
try:
pw = pwd.getpwnam(username)
except KeyError:
return None
return bool(get_visible_modules(uid=pw.pw_uid))
def is_smart_advice_notifications_disabled_server_wide() -> Optional[bool]:
"""
Returns whether WordPress SmartAdvice notifications are disabled.
"""
return get_server_wide_options().disable_smart_advice_notifications
def is_smart_advice_wordpress_plugin_disabled_server_wide() -> Optional[bool]:
"""
Returns whether WordPress SmartAdvice plugin is disabled on this server.
"""
return get_server_wide_options().disable_smart_advice_wordpress_plugin
def is_smart_advice_reminders_disabled_server_wide() -> Optional[bool]:
return get_server_wide_options().disable_smart_advice_reminders