HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //usr/local/lib/python3.9/site-packages/celery/worker/consumer/events.py
"""Worker Event Dispatcher Bootstep.

``Events`` -> :class:`celery.events.EventDispatcher`.
"""
from kombu.common import ignore_errors

from celery import bootsteps

from .connection import Connection

__all__ = ('Events',)


class Events(bootsteps.StartStopStep):
    """Service used for sending monitoring events."""

    requires = (Connection,)

    def __init__(self, c,
                 task_events=True,
                 without_heartbeat=False,
                 without_gossip=False,
                 **kwargs):
        self.groups = None if task_events else ['worker']
        self.send_events = (
            task_events or
            not without_gossip or
            not without_heartbeat
        )
        self.enabled = self.send_events
        c.event_dispatcher = None
        super().__init__(c, **kwargs)

    def start(self, c):
        # flush events sent while connection was down.
        prev = self._close(c)
        dis = c.event_dispatcher = c.app.events.Dispatcher(
            c.connection_for_write(),
            hostname=c.hostname,
            enabled=self.send_events,
            groups=self.groups,
            # we currently only buffer events when the event loop is enabled
            # XXX This excludes eventlet/gevent, which should actually buffer.
            buffer_group=['task'] if c.hub else None,
            on_send_buffered=c.on_send_event_buffered if c.hub else None,
        )
        if prev:
            dis.extend_buffer(prev)
            dis.flush()

    def stop(self, c):
        pass

    def _close(self, c):
        if c.event_dispatcher:
            dispatcher = c.event_dispatcher
            # remember changes from remote control commands:
            self.groups = dispatcher.groups

            # close custom connection
            if dispatcher.connection:
                ignore_errors(c, dispatcher.connection.close)
            ignore_errors(c, dispatcher.close)
            c.event_dispatcher = None
            return dispatcher

    def shutdown(self, c):
        self._close(c)