HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //usr/local/lib/python3.9/site-packages/celery/utils/nodenames.py
"""Worker name utilities."""
from __future__ import annotations

import os
import socket
from functools import partial

from kombu.entity import Exchange, Queue

from .functional import memoize
from .text import simple_format

#: Exchange for worker direct queues.
WORKER_DIRECT_EXCHANGE = Exchange('C.dq2')

#: Format for worker direct queue names.
WORKER_DIRECT_QUEUE_FORMAT = '{hostname}.dq2'

#: Separator for worker node name and hostname.
NODENAME_SEP = '@'

NODENAME_DEFAULT = 'celery'

gethostname = memoize(1, Cache=dict)(socket.gethostname)

__all__ = (
    'worker_direct',
    'gethostname',
    'nodename',
    'anon_nodename',
    'nodesplit',
    'default_nodename',
    'node_format',
    'host_format',
)


def worker_direct(hostname: str | Queue) -> Queue:
    """Return the :class:`kombu.Queue` being a direct route to a worker.

    Arguments:
        hostname (str, ~kombu.Queue): The fully qualified node name of
            a worker (e.g., ``[email protected]``).  If passed a
            :class:`kombu.Queue` instance it will simply return
            that instead.
    """
    if isinstance(hostname, Queue):
        return hostname
    return Queue(
        WORKER_DIRECT_QUEUE_FORMAT.format(hostname=hostname),
        WORKER_DIRECT_EXCHANGE,
        hostname,
    )


def nodename(name: str, hostname: str) -> str:
    """Create node name from name/hostname pair."""
    return NODENAME_SEP.join((name, hostname))


def anon_nodename(hostname: str | None = None, prefix: str = 'gen') -> str:
    """Return the nodename for this process (not a worker).

    This is used for e.g. the origin task message field.
    """
    return nodename(''.join([prefix, str(os.getpid())]), hostname or gethostname())


def nodesplit(name: str) -> tuple[None, str] | list[str]:
    """Split node name into tuple of name/hostname."""
    parts = name.split(NODENAME_SEP, 1)
    if len(parts) == 1:
        return None, parts[0]
    return parts


def default_nodename(hostname: str) -> str:
    """Return the default nodename for this process."""
    name, host = nodesplit(hostname or '')
    return nodename(name or NODENAME_DEFAULT, host or gethostname())


def node_format(s: str, name: str, **extra: dict) -> str:
    """Format worker node name ([email protected])."""
    shortname, host = nodesplit(name)
    return host_format(s, host, shortname or NODENAME_DEFAULT, p=name, **extra)


def _fmt_process_index(prefix: str = '', default: str = '0') -> str:
    from .log import current_process_index

    index = current_process_index()
    return f'{prefix}{index}' if index else default


_fmt_process_index_with_prefix = partial(_fmt_process_index, '-', '')


def host_format(s: str, host: str | None = None, name: str | None = None, **extra: dict) -> str:
    """Format host %x abbreviations."""
    host = host or gethostname()
    hname, _, domain = host.partition('.')
    name = name or hname
    keys = dict(
        {
            'h': host,
            'n': name,
            'd': domain,
            'i': _fmt_process_index,
            'I': _fmt_process_index_with_prefix,
        },
        **extra,
    )
    return simple_format(s, keys)