HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //usr/local/lib/python3.9/site-packages/celery/app/__pycache__/amqp.cpython-39.pyc
a

X>h\�@s
dZddlZddlmZddlmZddlmZddlm	Z	ddl
mZmZm
Z
mZmZmZddlmZdd	lmZdd
lmZddlmZddlmZdd
lmZddlmZddl m!Z!ddl"m#Z$dZ%dZ&dZ'edd�Z(ddd�Z)Gdd�de*�Z+Gdd�d�Z,dS)z/Sending/Receiving Messages (Kombu integration).�N)�
namedtuple)�Mapping)�	timedelta)�WeakValueDictionary)�
Connection�Consumer�Exchange�Producer�Queue�pools)�	Broadcast)�
maybe_list)�cached_property)�signals)�
anon_nodename)�saferepr)�indent)�maybe_make_aware�)�routes)�AMQP�Queues�task_messagei�zS
.> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) key={0.routing_key}
r��headers�
properties�body�
sent_event�utf-8cs�fdd�|��D�S)Ncs*i|]"\}}t|t�r |���n||�qS�)�
isinstance�bytes�decode)�.0�k�v��encodingr�9/usr/local/lib/python3.9/site-packages/celery/app/amqp.py�
<dictcomp>%s�zutf8dict.<locals>.<dictcomp>)�items)�dr'rr&r(�utf8dict$s
�r,cs�eZdZdZdZd!�fdd�	Z�fdd�Z�fdd	�Zd
d�Zdd
�Z	dd�Z
dd�Zdd�Zd"dd�Z
dd�Zdd�Zdd�Zdd�Zedd ��Z�ZS)#ru�Queue name⇒ declaration mapping.

    Arguments:
        queues (Iterable): Initial list/tuple or dict of queues.
        create_missing (bool): By default any unknown queues will be
            added automatically, but if this flag is disabled the occurrence
            of unknown queues in `wanted` will raise :exc:`KeyError`.
        max_priority (int): Default x-max-priority for queues with none set.
    NTc	s�t���t�|_||_||_||_|dur0tn||_||_	|dur\t
|t�s\dd�|D�}|pbi}|��D].\}}t
|t
�r�|�|�n|j|fi|��qldS)NcSsi|]}|j|�qSr)�name)r#�qrrr(r)C�z#Queues.__init__.<locals>.<dictcomp>)�super�__init__r�aliases�default_exchange�default_routing_key�create_missingr�autoexchange�max_priorityr rr*r
�add�
add_compat)	�self�queuesr3r5r6r7r4r-r.��	__class__rr(r18s
zQueues.__init__cs0z|j|WSty*t��|�YS0dS�N)r2�KeyErrorr0�__getitem__�r:r-r<rr(r@HszQueues.__getitem__cs8|jr|js|j|_t��||�|jr4||j|j<dSr>)r3�exchanger0�__setitem__�aliasr2)r:r-�queuer<rr(rCNs
zQueues.__setitem__cCs"|jr|�|�|��St|��dSr>)r5r8�new_missingr?rArrr(�__missing__UszQueues.__missing__cKs&t|t�s|j|fi|��S|�|�S)a�Add new queue.

        The first argument can either be a :class:`kombu.Queue` instance,
        or the name of a queue.  If the former the rest of the keyword
        arguments are ignored, and options are simply taken from the queue
        instance.

        Arguments:
            queue (kombu.Queue, str): Queue to add.
            exchange (kombu.Exchange, str):
                if queue is str, specifies exchange name.
            routing_key (str): if queue is str, specifies binding key.
            exchange_type (str): if queue is str, specifies type of exchange.
            **options (Any): Additional declaration options used when
                queue is a str.
        )r r
r9�_add)r:rE�kwargsrrr(r8Zs
z
Queues.addcKs>|�d|�d��|ddur&||d<|�tj|fi|���S)N�routing_keyZbinding_key)�
setdefault�getrHr
�	from_dict)r:r-�optionsrrr(r9oszQueues.add_compatcCs`|jdus|jjdkr|j|_|js,|j|_|jdurR|jdurFi|_|�|j�|||j<|S)N�)rBr-r3rJr4r7�queue_arguments�_set_max_priority)r:rErrr(rHvs


zQueues._addcCs&d|vr"|jdur"|�d|ji�SdS)Nzx-max-priority)r7�update)r:�argsrrr(rQ�szQueues._set_max_priorityrcCs\|j}|sdSdd�t|���D�}|r8td�|�|�S|ddtd�|dd��|�S)z/Format routing table into string for log dumps.rOcSsg|]\}}t���|��qSr)�QUEUE_FORMAT�strip�format)r#�_r.rrr(�
<listcomp>�s�z!Queues.format.<locals>.<listcomp>�
rrN)�consume_from�sortedr*�
textindent�join)r:r�indent_first�active�inforrr(rV�s
�z
Queues.formatcKs,|j|fi|��}|jdur(||j|j<|S)z�Add new task queue that'll be consumed from.

        The queue will be active even when a subset has been selected
        using the :option:`celery worker -Q` option.
        N)r8�
_consume_fromr-)r:rErIr.rrr(�
select_add�s
zQueues.select_addcs |r�fdd�t|�D��_dS)z�Select a subset of currently defined queues to consume from.

        Arguments:
            include (Sequence[str], str): Names of queues to consume from.
        csi|]}|�|�qSrr)r#r-�r:rr(r)�sz!Queues.select.<locals>.<dictcomp>N)r
ra)r:�includerrcr(�select�s
�z
Queues.selectcsJ�rFt���|jdur.|��fdd�|D��S�D]}|j�|d�q2dS)z�Deselect queues so that they won't be consumed from.

        Arguments:
            exclude (Sequence[str], str): Names of queues to avoid
                consuming from.
        Nc3s|]}|�vr|VqdSr>r)r#r$��excluderr(�	<genexpr>�r/z"Queues.deselect.<locals>.<genexpr>)r
rare�pop)r:rgrErrfr(�deselect�s
zQueues.deselectcCst||�|�|�Sr>)r
r6rArrr(rF�szQueues.new_missingcCs|jdur|jS|Sr>)rarcrrr(rZ�s
zQueues.consume_from)NNTNNN)rT)�__name__�
__module__�__qualname__�__doc__rar1r@rCrGr8r9rHrQrVrbrerjrF�propertyrZ�
__classcell__rrr<r(r)s(�
rc@s*eZdZdZeZeZeZeZeZ	dZ
dZdZdZ
dZdd�Zedd��Zedd	��Zd0d
d�Zd1dd
�Zdd�Zd2dd�Zd3dd�Zd4dd�Zdd�Zdd�Zedd��Zedd��Zejd d��Zed!d"��Zed#d$��Zejd%d$��Zed&d'��Z e Z!ed(d)��Z"ed*d+��Z#ed,d-��Z$d.d/�Z%dS)5rzApp AMQP API: app.amqp.NicCs*||_|j|jd�|_|jj�|j�dS)N)r�)�app�
as_task_v1�
as_task_v2�task_protocolsZ_confZbind_to�_handle_conf_update)r:rrrrr(r1�s
�z
AMQP.__init__cCs|j|jjjSr>)rurr�confZ
task_protocolrcrrr(�create_task_message�szAMQP.create_task_messagecCs|��Sr>)�_create_task_senderrcrrr(�send_task_message�szAMQP.send_task_messagecCs�|jj}|j}|dur|j}|dur*|j}|s`|jr`d}|jdkrJddi}t|j|j||d�f}|durn|j	n|}|�
||j||||�S)NZquorumzx-queue-type)rBrJrP)rrrw�task_default_routing_key�task_create_missing_queuesZtask_queue_max_priority�task_default_queueZtask_default_queue_typer
r3r6�
queues_cls)r:r;r5r6r7rwr4rPrrr(r�s,

���zAMQP.QueuescCs&tj|j|p|j|j�d|�|jd�S)zReturn the current task router.r|)rr)�_routes�Routerrr;rrZeither)r:r;r5rrr(r�
s��zAMQP.RoutercCst�|jjj�|_dSr>)r�preparerrrw�task_routes�_rtablercrrr(�flush_routesszAMQP.flush_routescKs:|dur|jjj}|j|f||p.t|jj���d�|��S)N)�acceptr;)rrrwZaccept_contentr�listr;rZ�values)r:Zchannelr;r��kwrrr(�TaskConsumers
���zAMQP.TaskConsumerrFc!s�|pd}|pi}t|ttf�s&td��t|t�s8td��|rx|�|d�|pT|j��}|p`|jj}t	|t
|d�|d�}t|	tj�r�|�|	d�|p�|j��}|p�|jj}t	|t
|	d�|d�}	t|t
�s�|o�|��}t|	t
�s�|	o�|	��}	|du�rt||j�}|du�rt||j�}|�s"|}�fdd	�|�p4gD�}d
|||||	|||
||g|||||�pdt�||||d�} t| ||�p�dd
�||||
||d�f|�r�|||||||
||	d�	ndd�S)Nr�!task args must be a list or tuple�(task keyword arguments must be a mapping�	countdown��seconds)�tz�expirescsi|]}|�|�qSrr)r#�header�rNrr(r)Hr/z#AMQP.as_task_v2.<locals>.<dictcomp>�py)�lang�task�id�shadow�etar��group�group_index�retries�	timelimit�root_id�	parent_id�argsrepr�
kwargsrepr�origin�
ignore_result�replaced_task_nesting�stamped_headers�stampsrO�Zcorrelation_id�reply_to)�	callbacks�errbacks�chain�chord)	�uuidr�r�r-rSrIr�r�r�r)r r��tuple�	TypeErrorr�_verify_secondsrr�now�timezonerr�numbers�Real�str�	isoformatr�argsrepr_maxsize�kwargsrepr_maxsizerr)!r:�task_idr-rSrIr�r��group_idr�r�r�r�r�r�r��
time_limit�soft_time_limit�create_sent_eventr�r�r�r�r�r�r�r�r�r�r�r�rNr�rrr�r(rts�

��




������
�zAMQP.as_task_v2cKs|pd}|pi}|j}t|ttf�s,td��t|t�s>td��|rj|�|d�|pZ|j��}|t	|d�}t|	t
j�r�|�|	d�|p�|j��}|t	|	d�}	|o�|��}|	o�|	��}	t
i||p�dd�|||||||
||	|||
||f||d	�|�r||t|�t|�|
||	d
�ndd�S)Nrr�r�r�r�r�rOr�)r�r�rSrIr�r�r�r�r��utcr�r�r��tasksetr�)r�r-rSrIr�r�r�r)r�r r�r�r�rr�rrr�rr�r�r�rr)r:r�r-rSrIr�r�r�r�r�r�r�r�r�r�r�r�r�r�r�r�r�r�Z
compat_kwargsr�rrr(rszsd
�����zAMQP.as_task_v1cCs|tkrt|�d|����|S)Nz is out of range: )�INT_MIN�
ValueError)r:�s�whatrrr(r��szAMQP._verify_secondscs�|jjj�|jjj�|jjj�|j�|j�tjj	�
tjj
�tjj	�tjj
�tjj	�tjj
�|j
�|j�|jjj�	|jjj�
|jjj�d����������	�
���
��fdd�	}|S)Ncs�|dur�n|}|\}}}}|
r*|�|
�|r8|�|�|}|durP|durP�}|durxt|t�rr|�|}}n|j}|
dur�z|jj}
Wnty�Yn0|
p��}
|dur�z|jj}Wnty�d}Yn0|r�|s�|dkr�d|}}n*|du�r|jj�p�}|�p|j�p�	}|du�r<|�r<t|t	��s<|g}|du�rJ�n|}|�rdt
�fi|��n�}��r��
||||||||d�|j|f|||	�p��
|�p��|||
||||d�|��}��rԈ|||||d���r<t|t��r�||d||d|d|d	|d
d�n*�||d||d|d
|d	|dd�|�r�|�pJ�}|}t|t
��rb|j}|�|||d��|jd||||d�|S)N�directrO)�senderrrBrJ�declarerr�retry_policy)rBrJ�
serializer�compression�retryr��
delivery_moder�r�timeout�confirm_timeout)r�rrrBrJr�rrr�r�)r�r�r�rSrIr�r�rSrIr�)rErBrJz	task-sent)r�r�)rRr r�r-rBr��AttributeError�typerJr�dict�publishr�r)Zproducerr-�messagerBrJrEZevent_dispatcherr�r�r�r�r�r�rZ
exchange_typer�r�rIZheaders2rrr�qnameZ_rp�retZevdZexname�Zafter_receiversZbefore_receiversZdefault_compressorZdefault_delivery_modeZdefault_evdr3Zdefault_policy�
default_queueZ
default_retryZdefault_rkeyZdefault_serializerr;Zsend_after_publishZsend_before_publishZsend_task_sentZsent_receiversrr(rz�s�




����
��
�
�
��z3AMQP._create_task_sender.<locals>.send_task_message)NNNNNNNNNNNNNN)rrrwZtask_publish_retryZtask_publish_retry_policyZtask_default_delivery_moder�r;rZbefore_task_publish�sendZ	receiversZafter_task_publishZ	task_sent�_event_dispatcherr3r{Ztask_serializerZtask_compression)r:rzrr�r(ry�s2





�,dzAMQP._create_task_sendercCs|j|jjjSr>)r;rrrwr}rcrrr(r�6szAMQP.default_queuecCs|�|jjj�S)u"Queue name⇒ declaration mapping.)rrrrwZtask_queuesrcrrr(r;:szAMQP.queuescCs
|�|�Sr>)r)r:r;rrr(r;?scCs|jdur|��|jSr>)r�r�rcrrr(rCs
zAMQP.routescCs|��Sr>)r�rcrrr(�routerIszAMQP.routercCs|Sr>r)r:�valuerrr(r�MscCs0|jdur*tj|j��|_|jjj|j_|jSr>)�_producer_poolrZ	producersrrZconnection_for_write�pool�limitrcrrr(�
producer_poolQs
�zAMQP.producer_poolcCst|jjj|jjj�Sr>)rrrrwZtask_default_exchangeZtask_default_exchange_typercrrr(r3Zs
�zAMQP.default_exchangecCs
|jjjSr>)rrrwZ
enable_utcrcrrr(r�_szAMQP.utccCs|jjjdd�S)NF)�enabled)rr�eventsZ
Dispatcherrcrrr(r�cszAMQP._event_dispatchercOs&d|vsd|vr"|��|��|_dS)Nr�)r�r�r�)r:rSrIrrr(rvis
zAMQP._handle_conf_update)NNN)NN)NN)NNNNNNNrNNNNNNFNNNNNNNFNNNr)NNNNNNNrNNNNNNFNNNNN)&rkrlrmrnrrr	ZBrokerConnectionrr~r�r�r6r�r�r1rrxrzr�r�r�rtrsr�ryr�r;�setterrorr�r�Zpublisher_poolr3r�r�rvrrrr(r�s�

�


	�
^�
<{









r)r)-rnr��collectionsr�collections.abcr�datetimer�weakrefrZkomburrrr	r
rZkombu.commonrZkombu.utils.functionalr
Zkombu.utils.objectsrZceleryrZcelery.utils.nodenamesrZcelery.utils.safereprrZcelery.utils.textrr\Zcelery.utils.timerrOrr�__all__r�rTrr,r�rrrrrr(�<module>s2 �