HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //usr/local/lib/python3.9/site-packages/celery/events/__pycache__/dispatcher.cpython-39.pyc
a

X>h#�@s�dZddlZddlZddlZddlmZmZddlmZddl	m
Z
ddlmZddl
mZdd	lmZmZmZd
ZGdd�d�ZdS)
zEvent dispatcher sends events.�N)�defaultdict�deque)�Producer)�app_or_default)�
anon_nodename)�	utcoffset�)�Event�get_exchange�
group_from)�EventDispatcherc@s�eZdZdZdhZdZdZdZd"dd�Zd	d
�Z	dd�Z
d
d�Zdd�Zde
fdd�Zddefdd�Zdedde
fdd�Zd#dd�Zdd�Zdd�Zdd�Zd d!�Zeee�ZdS)$ra0Dispatches event messages.

    Arguments:
        connection (kombu.Connection): Connection to the broker.

        hostname (str): Hostname to identify ourselves as,
            by default uses the hostname returned by
            :func:`~celery.utils.anon_nodename`.

        groups (Sequence[str]): List of groups to send events for.
            :meth:`send` will ignore send requests to groups not in this list.
            If this is :const:`None`, all events will be sent.
            Example groups include ``"task"`` and ``"worker"``.

        enabled (bool): Set to :const:`False` to not actually publish any
            events, making :meth:`send` a no-op.

        channel (kombu.Channel): Can be used instead of `connection` to specify
            an exact channel to use when sending events.

        buffer_while_offline (bool): If enabled events will be buffered
            while the connection is down. :meth:`flush` must be called
            as soon as the connection is re-established.

    Note:
        You need to :meth:`close` this after use.
    ZsqlNTr�c
Cs4t|p
|j�|_||_||_|p$t�|_||_|
p6t�|_||_	||_
tt�|_
t��|_d|_t�|_|pt|jjj|_t�|_t�|_t|p�g�|_tjtjg|_|jj|_|	|_ |s�|r�|jj!|_||_"|jp�|j�#�}
t$|
|jjj%d�|_&|
j'j(|j)v�r
d|_"|j"�r|�*�d|ji|_+t,�-�|_.dS)N)�nameF�hostname)/r�app�
connection�channelrr�buffer_while_offline�	frozenset�buffer_group�buffer_limit�on_send_bufferedr�list�
_group_buffer�	threading�Lock�mutex�producerr�_outbound_buffer�confZevent_serializer�
serializer�set�
on_enabled�on_disabled�groups�time�timezone�altzone�tzoffset�clock�
delivery_mode�client�enabledZconnection_for_writer
Zevent_exchange�exchange�	transportZdriver_type�DISABLED_TRANSPORTS�enable�headers�os�getpid�pid)�selfrrr,rrrr r$r*rrrZconninfo�r6�B/usr/local/lib/python3.9/site-packages/celery/events/dispatcher.py�__init__:s@



�zEventDispatcher.__init__cCs|S�Nr6�r5r6r6r7�	__enter__^szEventDispatcher.__enter__cGs|��dSr9)�close)r5�exc_infor6r6r7�__exit__aszEventDispatcher.__exit__cCs:t|jp|j|j|jdd�|_d|_|jD]
}|�q*dS)NF)r-r Zauto_declareT)rrrr-r rr,r"�r5�callbackr6r6r7r0ds�
zEventDispatcher.enablecCs*|jr&d|_|��|jD]
}|�qdS)NF)r,r<r#r?r6r6r7�disablems

zEventDispatcher.disableFc	Ks||rdn|j��}||f|jt�|j|d�|��}|j�0|j||fd|�dd�i|��Wd�S1sn0YdS)auPublish event using custom :class:`~kombu.Producer`.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
                fields: Dictionary of event fields, must be json serializable.
            producer (kombu.Producer): Producer instance to use:
                only the ``publish`` method will be called.
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event.
                Defaults to :func:`Event`.
            utcoffset (Callable): Function returning the current
                utc offset in hours.
        N�rrr4r)�routing_key�-�.)r)�forwardrrr4r�_publish�replace)	r5�type�fieldsr�blindr	�kwargsr)�eventr6r6r7�publishts��
��zEventDispatcher.publishc	Cst|j}z*|j|||j|||g|j|j|jd�	Wn>tyn}z&|jsH�|j�	|||f�WYd}~n
d}~00dS)N)rCr-�retry�retry_policyZdeclarer r1r*)
r-rNrr r1r*�	Exceptionrr�append)	r5rMrrCrOrPrr-�excr6r6r7rG�s"�
zEventDispatcher._publishc
	Ks�|jr�|jt|�}}	|r&|	|vr&dS|	|jvr�|j��}
||f|j|�|j|
d�|��}|j|	}|�	|�t
|�|jkr�|��q�|j
r�|�
�n|j|||j||||d�SdS)a�Send event.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event,
                defaults to :func:`Event`.
            utcoffset (Callable): unction returning the current utc offset
                in hours.
            **fields (Any): Event fields -- must be json serializable.
        NrB)rKr	rOrP)r,r$rrr)rFrr4rrR�lenr�flushrrNr)
r5rIrKrrOrPr	rJr$�groupr)rM�bufr6r6r7�send�s,


��



�zEventDispatcher.sendc	Cs�|rpt|j�}zT|j�0|D]\}}}|�||j|�qWd�n1sL0YW|j��n|j��0|r�|j�D|j��D](\}}|�||jd|�g|dd�<q�Wd�n1s�0YdS)zFlush the outbound buffer.Nz%s.multi)rrrrGr�clearr�items)	r5�errorsr$rWrMrC�_rV�eventsr6r6r7rU�s
2zEventDispatcher.flushcCs|j�|j�dS)z-Copy the outbound buffer of another instance.N)r�extend)r5�otherr6r6r7�
extend_buffer�szEventDispatcher.extend_buffercCs|j��o|j��d|_dS)zClose the event dispatcher.N)r�locked�releaserr:r6r6r7r<�szEventDispatcher.closecCs|jSr9�rr:r6r6r7�_get_publisher�szEventDispatcher._get_publishercCs
||_dSr9rc)r5rr6r6r7�_set_publisher�szEventDispatcher._set_publisher)NNTNTNNNrNr
N)TT)�__name__�
__module__�__qualname__�__doc__r/rr"r#r8r;r>r0rAr	rNrrGrXrUr`r<rdre�propertyZ	publisherr6r6r6r7rs8�
$	�
�
�
%
r)rir2rr%�collectionsrrZkomburZ
celery.apprZcelery.utils.nodenamesrZcelery.utils.timerrMr	r
r�__all__rr6r6r6r7�<module>s