# -*- coding: utf-8 -*-
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2024 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import json
import logging
import os
import time
import gzip
import base64
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any, Optional
import requests

from lve_utils import PKG_VERSION

from clcommon.cpapi import cpusers, userdomains

from cl_website_collector.apache_processor import ApacheProcessor
from cl_website_collector.docroot_processor import DocrootProcessor
from cl_website_collector.constants import (
    VERSION,
    STORAGE_DIR,
    OPT_OUT_FILE,
    DOCROOT_SCAN_TIMEOUT,
    DOCROOT_LIMIT_PER_RUN,
    SENDING_TIMEOUT,
    SENDING_COMPRESSION,
    MAX_RECORD_SIZE,
    FILE_SIZE_LIMIT,
)


class WebsiteCollector:
    """
    Website data collection.

    Collects and exports various Apache-related data:
    - Apache configuration files (.conf) from standard directories
    - Apache system information (version, loaded modules)
    - .htaccess files from website document roots
    - Maintains processing state to avoid duplicate scans
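
    Example usage (illustrative sketch; the identifier variables are
    placeholders supplied by the caller):

        collector = WebsiteCollector(logging.getLogger(__name__), max_sites=100)
        if not WebsiteCollector.is_collection_disabled():
            collector.send_data(system_id, client_id, platform, panel,
                                server, api_url, dry_run=True)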
    """

    def __init__(self, logger: logging.Logger, storage_dir: Optional[Path] = None, max_sites: Optional[int] = None):
        """
        Initialize website collector for on-demand data collection and sending.

        Args:
            storage_dir: Directory for storing website collector state
            max_sites: Maximum number of sites to process (with hash-based selection)
        """
        # Use default directories if not provided
        if storage_dir is None:
            storage_dir = STORAGE_DIR

        self.logger = logger
        self.max_sites = max_sites

        # Ensure directories exist
        self._ensure_directories(storage_dir=storage_dir)

        # Initialize components for on-demand data collection
        self.docroot_processor = DocrootProcessor(logger=self.logger)
        self.apache_processor = ApacheProcessor(logger=self.logger)

        # Initialize sending stats tracking
        self.stats_file = storage_dir / "website_collector_stats.json"
        self.dry_run_file = storage_dir / "dry-run.jsonl"
        self.sending_stats = {'global_apache_configs_sent': False, 'docroots': {}}  # default values

        # Load sending stats
        self._load_sending_stats()

    def _hash_and_sort_docroots(self, docroots_data: Dict) -> Dict:
        """
        Hash and sort docroots, then return top N based on max_sites limit.
        Takes into account already sent sites to respect the global limit.

        Args:
            docroots_data: Dict with docroot as key and data as value (unprocessed only)

        Returns:
            Dict with selected docroots (limited and sorted by hash)
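
        Example (illustrative):
            With max_sites=2 and nothing sent yet, three candidate docroots
            are reduced to the two whose SHA-256 path digests sort first.
            The digests depend only on the paths, so the same two docroots
            are selected on every run without persisting an explicit order.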
        """
        if self.max_sites is None:
            return docroots_data

        # Count how many sites were already sent from this server
        already_sent_count = len(self.sending_stats.get('docroots', {}))

        # Calculate how many more sites we can send
        remaining_limit = max(0, self.max_sites - already_sent_count)

        if remaining_limit <= 0:
            self.logger.debug("Max sites limit reached: %d already sent, limit is %d",
                              already_sent_count, self.max_sites)
            return {}

        if len(docroots_data) <= remaining_limit:
            return docroots_data

        # Create list of (hash, docroot, data) tuples for consistent selection
        hashed_docroots = []
        for docroot, data in docroots_data.items():
            # Hash the docroot path for consistent, random-like ordering
            docroot_hash = hashlib.sha256(docroot.encode()).hexdigest()
            hashed_docroots.append((docroot_hash, docroot, data))

        # Sort by hash to get consistent but random-like ordering
        hashed_docroots.sort(key=lambda x: x[0])

        # Take only the remaining limit
        selected_docroots = {}
        for _, docroot, data in hashed_docroots[:remaining_limit]:
            selected_docroots[docroot] = data

        self.logger.debug("Hash-based selection: %d already sent, %d remaining limit, "
                          "selected %d from %d candidates",
                          already_sent_count, remaining_limit,
                          len(selected_docroots), len(docroots_data))

        return selected_docroots

    def _get_docroots_for_sending(self) -> Dict:
        """
        Get docroots for sending, with optional filtering and limit.

        Returns:
            Dict of docroots ready for processing (limited and filtered by sent status)
        """
        selected_docroots = {}
        processed_count = 0

        try:
            # Use Panel API, but optimize by checking processed status early
            panel_users = cpusers()

            for username in panel_users:
                try:
                    user_domains = userdomains(username)
                    user_docroots_found = {}

                    # Collect ALL domains for ALL docroots for this user first
                    for domain, docroot in user_domains:
                        docroot_path = Path(docroot)
                        if docroot_path.exists() and os.access(str(docroot_path), os.R_OK):
                            # Check if already fully sent - skip processing
                            if self.is_docroot_sent(docroot):
                                processed_count += 1
                                continue

                            # Add to user's docroots (collect ALL domains per docroot)
                            if docroot not in user_docroots_found:
                                user_docroots_found[docroot] = {
                                    "docroot": docroot,
                                    "username": username,
                                    "domains": []
                                }
                            user_docroots_found[docroot]["domains"].append(domain)

                    # Add user's docroots to global selection (check limit at docroot level)
                    limit_reached = False
                    for docroot, docroot_data in user_docroots_found.items():
                        if len(selected_docroots) >= DOCROOT_LIMIT_PER_RUN:
                            limit_reached = True
                            break
                        selected_docroots[docroot] = docroot_data

                    # If we reached the limit, stop processing more users
                    if limit_reached:
                        break

                except Exception as e:
                    self.logger.debug("Failed to get domains for user %s: %s", username, e)

        except Exception as e:
            self.logger.error("[WEBSITE-COLLECTOR] Panel API failed: %s", e)

        self.logger.debug("Found %d unprocessed docroots (skipped %d already processed)",
                          len(selected_docroots), processed_count)

        return self._hash_and_sort_docroots(selected_docroots)

    def _load_sending_stats(self) -> None:
        """Load sending statistics from tracking file."""
        try:
            if self.stats_file.exists():
                with open(self.stats_file, 'r', encoding='utf-8') as f:
                    self.sending_stats = json.load(f)
                # Guard against older or hand-edited stats files missing keys.
                self.sending_stats.setdefault('global_apache_configs_sent', False)
                self.sending_stats.setdefault('docroots', {})
                self.logger.debug("Loaded sending stats")
        except Exception as e:
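            # On failure, keep the defaults assigned in __init__.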
            self.logger.debug("Failed to load sending stats: %s", e)

    def read_file_content(self, file_path: str) -> Optional[str]:
        """
        Read any file content on-demand for sending.
        Supports both Apache configs and .htaccess files.
        """
        try:
            p = Path(file_path)
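            # If the path is a symlink, resolve it and operate on the target;
            # the existence, type, and size checks below apply to the real file.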
            is_symlink = p.is_symlink()
            real_p = p.resolve(strict=False) if is_symlink else p

            if not (real_p.exists() and real_p.is_file() and os.access(str(real_p), os.R_OK)):
                self.logger.error("[WEBSITE-COLLECTOR] Cannot read file: %s", file_path)
                return None

            if real_p.stat().st_size > FILE_SIZE_LIMIT:
                self.logger.error("[WEBSITE-COLLECTOR] File too big: %s (size: %d bytes)", file_path,
                                  real_p.stat().st_size)
                return None

            return real_p.read_text(encoding='utf-8', errors='replace')

        except Exception as e:
            self.logger.error("[WEBSITE-COLLECTOR] Error reading file %s: %s", file_path, e)
            return None

    def _save_sending_stats(self) -> None:
        """Save sending statistics to tracking file."""
        try:
            self.sending_stats['last_updated'] = datetime.now(timezone.utc).isoformat()
            with open(self.stats_file, 'w', encoding='utf-8') as f:
                json.dump(self.sending_stats, f, indent=2, ensure_ascii=False)
            self.logger.debug("Saved sending stats")
        except Exception as e:
            self.logger.error("[WEBSITE-COLLECTOR] Failed to save sending stats: %s", e)

    def is_docroot_sent(self, docroot: str) -> bool:
        """Check if docroot data was fully sent (both htaccess and user apache configs)."""
        docroot_stats = self.sending_stats.get('docroots', {}).get(docroot, False)
        return bool(docroot_stats)

    def mark_docroot_sent(self, docroot: str, username: str) -> None:
        """Mark docroot data as sent."""
        if docroot not in self.sending_stats['docroots']:
            self.sending_stats['docroots'][docroot] = {
                'username': username
            }

        self.sending_stats['docroots'][docroot]['last_processed'] = datetime.now(timezone.utc).isoformat()

    def mark_global_apache_sent(self) -> None:
        """Mark global apache configs as sent."""
        self.sending_stats['global_apache_configs_sent'] = True

    @staticmethod
    def is_collection_disabled() -> bool:
        """
        Check if website collection is disabled via opt-out file.

        Returns:
            True if collection is disabled, False otherwise
        """
        return OPT_OUT_FILE.exists()

    def _ensure_directories(self, storage_dir: Path) -> None:
        """
        Create necessary directory structure.

        Args:
            storage_dir: Directory for storing website collector state
        """
        storage_dir.mkdir(mode=0o750, parents=True, exist_ok=True)

    def _create_base_record(self, client_id: str, system_id: str, platform: str,
                            panel: str, server: str) -> Dict[str, Any]:
        """Create base record with common fields."""
        return {
            'collection_timestamp': int(time.time()),
            'collection_version': VERSION,
            'client_id': client_id,
            'system_id': system_id,
            'platform': platform,
            'panel': panel,
            'server': server,
            'data': {},
            'meta_data': {}
        }

    def _serialize_record(self, record: Dict[str, Any]) -> str:
        """
        Serialize record to JSON.

        Returns:
            Serialized JSON string
        """
        return json.dumps(record, ensure_ascii=False, separators=(',', ':'))

    def _create_new_record_part(self, base_record: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create new record part with base data but empty data/meta_data sections.

        Args:
            base_record: Base record to copy from

        Returns:
            New record with empty data sections
        """
        new_record = base_record.copy()
        new_record['data'] = {}
        new_record['meta_data'] = {}
        return new_record

    def _add_file_to_records(self, file_path: str, file_content: str, is_symlink: bool,
                             current_record: Dict[str, Any], base_record: Dict[str, Any],
                             records_to_send: list) -> Dict[str, Any]:
        """
        Add file to current record, splitting if size limit exceeded.

        Args:
            file_path: Path of the file to add
            file_content: Content of the file
            is_symlink: Whether the file is a symlink
            current_record: Current record being built
            base_record: Base record template for new parts
            records_to_send: List to append completed records to

        Returns:
            Updated current record (may be new if split occurred)
        """
        # Create temporary copy to check size
        temp_record = current_record.copy()
        temp_record['data'] = current_record['data'].copy()
        temp_record['meta_data'] = current_record['meta_data'].copy()

        # Add new file to temporary copy
        temp_record['data'][file_path] = file_content
        if is_symlink:
            temp_record['meta_data'][file_path] = {'is_symlink': True}

        # Check size with actual serialization
        temp_serialized = self._serialize_record(temp_record)
        temp_size = len(temp_serialized.encode('utf-8'))

        if temp_size > MAX_RECORD_SIZE and current_record['data']:
            # Current record is full, save it and create new one
            # Store serialized data before appending
            current_serialized = self._serialize_record(current_record)
            records_to_send.append(current_serialized)
            current_record = self._create_new_record_part(base_record)

            # Add file to new record
            current_record['data'][file_path] = file_content
            if is_symlink:
                current_record['meta_data'][file_path] = {'is_symlink': True}
        else:
            # File fits, use temporary copy
            current_record = temp_record

        return current_record

    def _send_or_collect_record(self, session, api_url: str, dry_run: bool, serialized_data: str) -> bool:
        """Send record or save to JSONL file for dry run. Returns True if successful."""
        if dry_run:
            try:
                # Save to dry-run.jsonl file (append mode)
                with open(self.dry_run_file, 'a', encoding='utf-8') as f:
                    f.write(serialized_data + '\n')
                return True
            except Exception as e:
                self.logger.error("[WEBSITE-COLLECTOR] Error saving dry-run record: %s", e)
                return False
        else:
            headers = {
                "Content-Type": "text/plain",
                "X-Website-Collector-Version": PKG_VERSION,
            }
            data_to_send = serialized_data

            if SENDING_COMPRESSION:
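                # gzip-compress, then base64-encode, so the payload stays a
                # plain ASCII string that can be POSTed as text/plain.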
                data_to_send = serialized_data.encode('utf-8')
                compressed_data = gzip.compress(data_to_send, compresslevel=6)

                original_bytes = len(serialized_data.encode('utf-8'))
                compressed_bytes = len(compressed_data)
                reduction = (1 - compressed_bytes / original_bytes) * 100
                self.logger.debug("Applied gzip compression: %s -> %s bytes (%.1f%% reduction)",
                                  f"{original_bytes:,}", f"{compressed_bytes:,}", reduction)

                data_to_send = base64.b64encode(compressed_data).decode("ascii")
                base64_bytes = len(data_to_send)
                self.logger.debug("Base64 encoded: %s -> %s bytes",
                                  f"{compressed_bytes:,}", f"{base64_bytes:,}")

            # Try sending with retries
            max_retries = 3
            last_error = None
            last_status_code = None

            for attempt in range(max_retries):
                try:
                    response = session.post(api_url, data=data_to_send, headers=headers, timeout=30)

                    if response.status_code == 200:
                        return True

                    last_status_code = response.status_code
                    if attempt < max_retries - 1:
                        self.logger.debug("Non-200 response %s, attempt %d/%d",
                                          response.status_code, attempt + 1, max_retries)
                        # Exponential backoff before the next attempt: 1s, then 2s
                        time.sleep(2 ** attempt)
                    else:
                        self.logger.debug("Final attempt failed: Non-200 response %s",
                                          response.status_code)

                except Exception as e:
                    last_error = str(e)
                    if attempt < max_retries - 1:
                        self.logger.debug("HTTP error on attempt %d/%d: %s",
                                          attempt + 1, max_retries, e)
                        # Exponential backoff before the next attempt: 1s, then 2s
                        time.sleep(2 ** attempt)
                    else:
                        self.logger.error("[WEBSITE-COLLECTOR] Final HTTP error while sending record: %s", e)

            # All retries failed - determine failure reason
            if last_status_code:
                reason = f"HTTP {last_status_code}"
            elif last_error:
                reason = f"Exception: {last_error}"
            else:
                reason = "Unknown error"

            self.logger.error("[WEBSITE-COLLECTOR] Failed to send record after %d retries. Reason: %s",
                              max_retries, reason)
            return False

    def _send_global_configs(self, session, api_url: str, dry_run: bool,
                             client_id: str, system_id: str, platform: str, panel: str, server: str) -> bool:
        """
        Send global Apache configs and system info, splitting into multiple records if needed.

        Returns:
            True if all records sent successfully, False otherwise
        """

        base_record = self._create_base_record(client_id, system_id, platform, panel, server)
        base_record.update({
            'username': '',
            'domains': [],
            'document_root': '',
        })

        current_record = self._create_new_record_part(base_record)
        records_to_send = []

        # Collect global Apache config files
        global_configs = self.apache_processor.collect_global_config_paths()
        config_paths = global_configs.get('config_paths', [])

        self.logger.debug("Found %d global Apache config files", len(config_paths))
        for path_info in config_paths:
            self.logger.debug("Processing global config file: %s", path_info['file_path'])

            file_content = self.read_file_content(path_info['file_path'])
            if file_content is None:
                file_content = '# ERROR: File unreadable'

            current_record = self._add_file_to_records(
                path_info['file_path'], file_content, path_info.get('is_symlink', False),
                current_record, base_record, records_to_send
            )

        # Add system info to the last record
        system_info = self.apache_processor.get_apache_system_info()
        current_record['data']['modules'] = system_info.get('loaded_modules', [])
        current_record['data']['version'] = system_info.get('apache_version', '')

        # Add last record
        current_serialized = self._serialize_record(current_record)
        records_to_send.append(current_serialized)

        # Send all records
        for i, serialized_data in enumerate(records_to_send):
            self.logger.debug("Sending global config record %d/%d", i + 1, len(records_to_send))
            if not self._send_or_collect_record(session, api_url, dry_run, serialized_data):
                return False

        return True

    def _send_docroot_data(self, docroot: str, domains: list, username: str, session, api_url: str,
                           dry_run: bool, client_id: str, system_id: str,
                           platform: str, panel: str, server: str) -> bool:
        """
        Send all docroot data, splitting into multiple records if needed.

        Returns:
            True if all records sent successfully, False otherwise
        """

        base_record = self._create_base_record(client_id, system_id, platform, panel, server)
        base_record.update({
            'username': username,
            'domains': domains,
            'document_root': docroot,
        })

        current_record = self._create_new_record_part(base_record)
        records_to_send = []

        # Collect htaccess files
        htaccess_data = self.docroot_processor.collect_htaccess_paths(docroot, domains, username,
                                                                      timeout=DOCROOT_SCAN_TIMEOUT)
        if htaccess_data and htaccess_data.get('htaccess_file_paths'):
            for htaccess_path_info in htaccess_data['htaccess_file_paths']:
                content = self.read_file_content(htaccess_path_info['real_path'])
                if content is None:
                    content = '# ERROR: File unreadable'

                current_record = self._add_file_to_records(
                    htaccess_path_info['file_path'], content, htaccess_path_info.get('is_symlink', False),
                    current_record, base_record, records_to_send
                )

        # Collect user Apache configs
        user_configs = self.apache_processor.collect_user_specific_config_paths([username])
        if user_configs and user_configs.get('config_paths'):
            for path_info in user_configs['config_paths']:
                file_content = self.read_file_content(path_info['file_path'])
                if file_content is None:
                    file_content = '# ERROR: File unreadable'

                current_record = self._add_file_to_records(
                    path_info['file_path'], file_content, path_info.get('is_symlink', False),
                    current_record, base_record, records_to_send
                )

        # Add last record if it has data or if no records were created
        if current_record['data'] or not records_to_send:
            current_serialized = self._serialize_record(current_record)
            records_to_send.append(current_serialized)

        # Send all records
        for i, serialized_data in enumerate(records_to_send):
            self.logger.debug("Sending docroot %s record %d/%d", docroot, i + 1, len(records_to_send))
            if not self._send_or_collect_record(session, api_url, dry_run, serialized_data):
                return False

        return True

    def send_data(self, system_id: str, client_id: str, platform: str, panel: str, server: str, api_url: str,
                  dry_run: bool = False) -> None:
        """
        Send data directly or save to JSONL file in dry-run mode.
        Read htaccess and apache configs on-demand and track sending status.

        Args:
            dry_run: If True, saves data to dry-run.jsonl file instead of sending to API
        """

        start_time = time.time()

        session = requests.Session() if not dry_run else None

        try:
            # Step 1: Send global Apache configs if not sent yet
            if not self.sending_stats.get('global_apache_configs_sent', False):
                self.logger.debug("%s global Apache configs", "Collecting" if dry_run else "Sending")

                if self._send_global_configs(session, api_url, dry_run, client_id, system_id, platform, panel, server):
                    if not dry_run:
                        self.mark_global_apache_sent()
                        self._save_sending_stats()
                        self.logger.debug("Global Apache configs sent successfully")

            # Step 2: Process docroots
            unprocessed_docroots = self._get_docroots_for_sending()

            for docroot, domain_data in unprocessed_docroots.items():
                if time.time() - start_time > SENDING_TIMEOUT:
                    self.logger.error("[WEBSITE-COLLECTOR] Timeout reached during docroot processing")
                    break

                self.logger.debug("Processing docroot %s for user %s", docroot, domain_data['username'])

                if self._send_docroot_data(docroot, domain_data['domains'], domain_data['username'], session, api_url,
                                           dry_run, client_id, system_id, platform, panel, server):
                    if not dry_run:
                        self.mark_docroot_sent(docroot, domain_data['username'])
                        self._save_sending_stats()
                        self.logger.debug("Docroot %s sent successfully", docroot)

            execution_time = time.time() - start_time

            if dry_run:
                self.logger.info("DRY RUN completed: saved to %s in %.1f seconds", self.dry_run_file, execution_time)
            else:
                self.logger.debug("Sequential sending completed: %.1f seconds", execution_time)

        except Exception as e:
            self.logger.error("[WEBSITE-COLLECTOR] Failed during %s: %s",
                              "dry run" if dry_run else "sequential sending",
                              e)
        finally:
            if session:
                session.close()