HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //usr/local/lib/python3.9/site-packages/kombu/transport/virtual/__pycache__/base.cpython-39.pyc
a

X>h���@s�dZddlmZddlZddlZddlZddlZddlmZddlm	Z	m
Z
mZddlm
Z
ddlmZddlmZdd	lmZmZdd
lmZddlmZddlmZmZdd
lmZddlmZddl m!Z!ddl"m#Z#m$Z$ddl%m&Z&ddl'm(Z(ddl)m*Z*e�rddl+m,Z,dZ-dZ.dZ/dZ0dZ1dZ2ee3�Z4edd�Z5edd�Z6Gd d!�d!�Z7Gd"d#�d#e8�Z9Gd$d%�d%e:�Z;Gd&d'�d'�Z<Gd(d)�d)�Z=Gd*d+�d+ej>�Z>Gd,d-�d-�Z?Gd.d/�d/e?ej@�ZAGd0d1�d1ejB�ZBGd2d3�d3ejC�ZCdS)4zPVirtual transport implementation.

Emulates the AMQ API for non-AMQ transports.
�)�annotationsN)�array)�OrderedDict�defaultdict�
namedtuple)�count)�Finalize)�Empty)�	monotonic�sleep)�
TYPE_CHECKING)�queue_declare_ok_t)�ChannelError�
ResourceError)�
get_logger)�base)�emergency_dump_state)�bytes_to_str�str_to_bytes)�	FairCycle��uuid�)�STANDARD_EXCHANGE_TYPES)�
TracebackType�HzlMessage could not be delivered: No queues bound to exchange {exchange!r} using binding key {routing_key!r}.
zkCannot redeclare exchange {0!r} in vhost {1!r} with different type, durable, autodelete or arguments value.z;Requeuing undeliverable message for queue %r: No consumers.z)Restoring {0!r} unacknowledged message(s)z#UNABLE TO RESTORE {0} MESSAGES: {1}�
binding_key_t)�queue�exchange�routing_key�queue_binding_t)rr�	argumentsc@s eZdZdZdd�Zdd�ZdS)�Base64z
Base64 codec.cCstt�t|���S�N)r�base64�	b64encoder��self�s�r)�F/usr/local/lib/python3.9/site-packages/kombu/transport/virtual/base.py�encodeFsz
Base64.encodecCst�t|��Sr#)r$�	b64decoderr&r)r)r*�decodeIsz
Base64.decodeN)�__name__�
__module__�__qualname__�__doc__r+r-r)r)r)r*r"Csr"c@seZdZdZdS)�NotEquivalentErrorzAEntity declaration is not equivalent to the previous declaration.N�r.r/r0r1r)r)r)r*r2Msr2c@seZdZdZdS)�UndeliverableWarningz.The message could not be delivered to a queue.Nr3r)r)r)r*r4Qsr4c@sVeZdZdZdZdZdZddd�Zdd�Zdd�Z	d	d
�Z
dd�Zd
d�Zdd�Z
dS)�BrokerStatez2Broker state holds exchanges, queues and bindings.NcCs&|durin||_i|_tt�|_dSr#)�	exchanges�bindingsr�set�queue_index)r'r6r)r)r*�__init__rszBrokerState.__init__cCs"|j��|j��|j��dSr#)r6�clearr7r9�r'r)r)r*r;ws

zBrokerState.clearcCs|||f|jvSr#)r7)r'rrrr)r)r*�has_binding|szBrokerState.has_bindingcCs.t|||�}|j�||�|j|�|�dSr#)rr7�
setdefaultr9�add)r'rrrr!�keyr)r)r*�binding_declareszBrokerState.binding_declarecCs@t|||�}z|j|=Wnty*Yn0|j|�|�dSr#)rr7�KeyErrorr9�remove)r'rrrr@r)r)r*�binding_delete�szBrokerState.binding_deletecs:z�j�|�}Wnty"Yn0�fdd�|D�dS)Ncsg|]}�j�|d��qSr#)r7�pop)�.0Zbindingr<r)r*�
<listcomp>��z5BrokerState.queue_bindings_delete.<locals>.<listcomp>)r9rErB)r'rr7r)r<r*�queue_bindings_delete�s
z!BrokerState.queue_bindings_deletecs�fdd��j|D�S)Nc3s$|]}t|j|j�j|�VqdSr#)r rrr7)rFr@r<r)r*�	<genexpr>�s�z-BrokerState.queue_bindings.<locals>.<genexpr>)r9�r'rr)r<r*�queue_bindings�s
�zBrokerState.queue_bindings)N)r.r/r0r1r6r7r9r:r;r=rArDrIrLr)r)r)r*r5Us

	r5c@s~eZdZdZdZdZdZdZddd�Zdd�Z	d	d
�Z
dd�Zd
d�Zdd�Z
dd�Zddd�Zdd�Zddd�Zdd�ZdS)�QoSz�Quality of Service guarantees.

    Only supports `prefetch_count` at this point.

    Arguments:
    ---------
        channel (ChannelT): Connection channel.
        prefetch_count (int): Initial prefetch count (defaults to 0).
    rNTcCsR||_|pd|_t�|_d|j_t�|_|jj|_|jj	|_
t||jdd�|_
dS)NrFr)Zexitpriority)�channel�prefetch_countr�
_delivered�restoredr8�_dirtyr?�
_quick_ack�__setitem__�
_quick_appendr�restore_unacked_once�_on_collect)r'rNrOr)r)r*r:�s


�zQoS.__init__cCs$|j}|p"t|j�t|j�|kS)z�Return true if the channel can be consumed from.

        Used to ensure the client adhers to currently active
        prefetch limits.
        )rO�lenrPrR�r'Zpcountr)r)r*�can_consume�szQoS.can_consumecCs,|j}|r(t|t|j�t|j�d�SdS)a�Return the maximum number of messages allowed to be returned.

        Returns an estimated number of messages that a consumer may be allowed
        to consume at once from the broker.  This is used for services where
        bulk 'get message' calls are preferred to many individual 'get message'
        calls - like SQS.

        Returns
        -------
            int: greater than zero.
        rN)rO�maxrXrPrRrYr)r)r*�can_consume_max_estimate�szQoS.can_consume_max_estimatecCs|jr|��|�||�dS)z&Append message to transactional state.N)rR�_flushrU)r'�message�delivery_tagr)r)r*�append�sz
QoS.appendcCs
|j|Sr#)rP�r'r_r)r)r*�get�szQoS.getcCsB|j}|j}z|��}Wnty.Yq>Yn0|�|d�qdS)z'Flush dirty (acked/rejected) tags from.N)rRrPrErB)r'Zdirty�	deliveredZ	dirty_tagr)r)r*r]�s
z
QoS._flushcCs|�|�dS)z8Acknowledge message and remove from transactional state.N)rSrar)r)r*�ack�szQoS.ackFcCs$|r|j�|j|�|�|�dS)z4Remove from transactional state and requeue message.N)rN�_restore_at_beginningrPrS�r'r_�requeuer)r)r*�reject�sz
QoS.rejectc
Cs�|��|j}g}|jj}|j}|r�z|�\}}WntyHYq�Yn0z||�Wq ty�}z|�||f�WYd}~q d}~00q |��|S)z$Restore all unacknowledged messages.N)	r]rPrN�_restore�popitemrB�
BaseExceptionr`r;)r'rc�errors�restoreZpop_message�_r^�excr)r)r*�restore_unacked�s 
&zQoS.restore_unackedcCs�|j��|��|dur tjn|}|j}|jr8|jjs<dSt	|dd�rT|rPJ�dSzf|r�t
t�t
|j��|d�|��}|r�tt|��\}}t
t�t
|�|�|d�t||d�Wd|_nd|_0dS)z�Restore all unacknowledged messages at shutdown/gc collect.

        Note:
        ----
            Can only be called once for each instance, subsequent
            calls will be ignored.
        NrQ)�file)�stderrT)rW�cancelr]�sysrrrP�restore_at_shutdownrN�
do_restore�getattr�print�
RESTORING_FMT�formatrXrp�list�zip�RESTORE_PANIC_FMTrrQ)r'rr�stateZ
unrestoredrl�messagesr)r)r*rVs,
��zQoS.restore_unacked_oncecOsdS)aRestore any pending unacknowledged messages.

        To be filled in for visibility_timeout style implementations.

        Note:
        ----
            This is implementation optional, and currently only
            used by the Redis transport.
        Nr))r'�args�kwargsr)r)r*�restore_visible2szQoS.restore_visible)r)F)N)r.r/r0r1rOrPrRrur:rZr\r`rbr]rdrhrprVr�r)r)r)r*rM�s 
	

 rMcs*eZdZdZd�fdd�	Zdd�Z�ZS)�MessagezMessage object.Ncst||_|d}|�d�}|r.|�||�d��}t�jf|||d|�d�|�d�|�d�||�d�d	d
�	|��dS)N�
properties�body�
body_encodingr_�content-type�content-encoding�headers�
delivery_info�utf-8)	r�rNr_�content_type�content_encodingr�r�r�Z
postencode)�_rawrb�decode_body�superr:)r'�payloadrNr�r�r���	__class__r)r*r:As$
�
�zMessage.__init__cCsJ|j}|j�|j|�d��\}}t|j�}|�dd�|||j|j	|d�S)Nr��compression)r�r�r�r�r�)
r�rN�encode_bodyr�rb�dictr�rEr�r�)r'�propsr�rnr�r)r)r*�serializableSs
�
�zMessage.serializable)N)r.r/r0r1r:r��
__classcell__r)r)r�r*r�>sr�c@s\eZdZdZddd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	dd�Z
ddd�Zdd�ZdS)�AbstractChannelz�Abstract channel interface.

    This is an abstract class defining the channel methods
    you'd usually want to implement in a virtual channel.

    Note:
    ----
        Do not subclass directly, but rather inherit
        from :class:`Channel`.
    NcCstd��dS)zGet next message from `queue`.z$Virtual channels must implement _getN��NotImplementedError)r'r�timeoutr)r)r*�_getoszAbstractChannel._getcCstd��dS)zPut `message` onto `queue`.z$Virtual channels must implement _putNr�)r'rr^r)r)r*�_putsszAbstractChannel._putcCstd��dS)z!Remove all messages from `queue`.z&Virtual channels must implement _purgeNr�rKr)r)r*�_purgewszAbstractChannel._purgecCsdS)z<Return the number of messages in `queue` as an :class:`int`.rr)rKr)r)r*�_size{szAbstractChannel._sizecOs|�|�dS)z�Delete `queue`.

        Note:
        ----
            This just purges the queue, if you need to do more you can
            override this method.
        N�r�)r'rr�r�r)r)r*�_deleteszAbstractChannel._deletecKsdS)z�Create new queue.

        Note:
        ----
            Your transport can override this method if it needs
            to do something whenever a new queue is declared.
        Nr)�r'rr�r)r)r*�
_new_queue�szAbstractChannel._new_queuecKsdS)z�Verify that queue exists.

        Returns
        -------
            bool: Should return :const:`True` if the queue exists
                or :const:`False` otherwise.
        Tr)r�r)r)r*�
_has_queue�szAbstractChannel._has_queuecCs
|�|�S)z-Poll a list of queues for available messages.)rb)r'�cycle�callbackr�r)r)r*�_poll�szAbstractChannel._pollcCs|�|�}|||�dSr#)r�)r'rr�r^r)r)r*�_get_and_deliver�s
z AbstractChannel._get_and_deliver)N)N)
r.r/r0r1r�r�r�r�r�r�r�r�r�r)r)r)r*r�cs

	

r�c@s�eZdZdZeZeZdZee�Z	dZ
de�iZdZ
ed�ZdZdZdZdZd	Zd
d�Zdgd
d�Zdhdd�Zdidd�Zdjdd�Zdd�Zdkdd�Zdldd�Zdmdd�Zdndd�Zd d!�Zd"d#�Z d$d%�Z!d&d'�Z"d(d)�Z#d*d+�Z$d,d-�Z%dod.d/�Z&dpd0d1�Z'dqd2d3�Z(drd4d5�Z)dsd6d7�Z*d8d9�Z+d:d;�Z,dtd<d=�Z-dud>d?�Z.d@dA�Z/dBdC�Z0dvdDdE�Z1dFdG�Z2dwdHdI�Z3dxdJdK�Z4dLdM�Z5dydNdO�Z6dzdPdQ�Z7dRdS�Z8dTdU�Z9dVdWdXdYdZ�d[d\�Z:e;d]d^��Z<e;d_d`��Z=e;dadb��Z>d{dcdd�Z?dedf�Z@dS)|�Channelz�Virtual channel.

    Arguments:
    ---------
        connection (ConnectionT): The transport instance this
            channel is part of.
    TFr$rN)r��deadletter_queuer�	c	s�|�_t��_d�_i�_g�_d�_d�_�fdd��j�	�D��_��
��_�jjj
}�jD],}zt�|||�Wq`ty�Yq`0q`dS)NFcsi|]\}}||���qSr)r))rF�typ�clsr<r)r*�
<dictcomp>�sz$Channel.__init__.<locals>.<dictcomp>)�
connectionr8�
_consumers�_cycle�
_tag_to_queue�_active_queues�_qos�closed�exchange_types�items�_get_free_channel_id�
channel_id�client�transport_options�from_transport_options�setattrrB)r'r�r�Ztopts�opt_namer)r<r*r:�s"
�


zChannel.__init__�directc		Cs�|pd}|pd|}|rH||jjvrDtd�||jjjp8d�ddd��dSzD|jj|}|�|��||||||�s�t	t
�||jjjp�d���Wn.ty�||||p�igd	�|jj|<Yn0dS)
zDeclare exchange.r�zamq.%sz*NOT_FOUND - no exchange {!r} in vhost {!r}�/��2�
�Channel.exchange_declare�404N)�type�durable�auto_deleter!�table)r~r6rrzr�r��virtual_host�typeofZ
equivalentr2�NOT_EQUIVALENT_FMTrB)	r'rr�r�r�r!�nowait�passive�prevr)r)r*�exchange_declare�s6����
�r�cCs:|�|�D]\}}}|j|ddd�q
|jj�|d�dS)z'Delete `exchange` and all its bindings.T)�	if_unused�if_emptyN)�	get_table�queue_deleter~r6rE)r'rr�r��rkeyrnrr)r)r*�exchange_delete	szChannel.exchange_deletecKsj|pdt�}|rF|j|fi|��sFtd�||jjjp8d�ddd��n|j|fi|��t||�	|�d�S)zDeclare queue.z
amq.gen-%sz'NOT_FOUND - no queue {!r} in vhost {!r}r�r��Channel.queue_declarer�r)
rr�rrzr�r�r�r�r
r�)r'rr�r�r)r)r*�
queue_declares��r�c	Ksj|r|�|�rdS|j�|�D]:\}}}|�|��||||�}|j||g|�Ri|��q|j�|�dS)z
Delete queue.N)r�r~rLr��prepare_bindr�rI)	r'rr�r�r�rrr��metar)r)r*r�s
�zChannel.queue_deletecCs|�|�dSr#)r�rKr)r)r*�after_reply_message_received'sz$Channel.after_reply_message_received�cCstd��dS)Nz(transport does not support exchange_bindr��r'�destination�sourcerr�r!r)r)r*�
exchange_bind*szChannel.exchange_bindcCstd��dS)Nz*transport does not support exchange_unbindr�r�r)r)r*�exchange_unbind.szChannel.exchange_unbindcKs~|pd}|j�|||�rdS|j�||||�|jj|�dg�}|�|��||||�}|�|�|jrz|j	|g|�R�dS)z.Bind `queue` to `exchange` with `routing key`.z
amq.directNr�)
r~r=rAr6r>r�r�r`�supports_fanoutZ_queue_bind)r'rrrr!r�r�r�r)r)r*�
queue_bind2s
�
zChannel.queue_bindcsh|j�|||�z|�|�}Wnty2YdS0|�|��||||���fdd�|D�|dd�<dS)Ncsg|]}|�kr|�qSr)r))rFr��Zbinding_metar)r*rGPrHz(Channel.queue_unbind.<locals>.<listcomp>)r~rDr�rBr�r�)r'rrrr!r�r�r)r�r*�queue_unbindCs
�zChannel.queue_unbindcs�fdd��jjD�S)Nc3s.|]&}��|�D]\}}}|||fVqqdSr#)r�)rFrr��patternrr<r)r*rJSs�z(Channel.list_bindings.<locals>.<genexpr>�r~r6r<r)r<r*�
list_bindingsRs
�zChannel.list_bindingscKs
|�|�S)z%Remove all ready messages from queue.r�r�r)r)r*�queue_purgeWszChannel.queue_purgecCst�Sr#rr<r)r)r*�_next_delivery_tag[szChannel._next_delivery_tagcKsB|�|||�|r.|�|�j|||fi|��S|j||fi|��S)zPublish message.)�_inplace_augment_messager�Zdeliverr�)r'r^rrr�r)r)r*�
basic_publish^s
��zChannel.basic_publishcCsJ|�|d|j�\|d<}|d}|j||��d�|dj||d�dS)Nr�r�)r�r_r��rr)r�r��updater�)r'r^rrr�r�r)r)r*r�hs
���z Channel._inplace_augment_messagecsJ|�j|<�j�|����fdd�}|�jj|<�j�|����dS)zConsume from `queue`.cs*�j|�d�}�s"�j�||j��|�S)N�rN)r��qosr`r_)�raw_messager^�r��no_ackr'r)r*�	_callback{sz(Channel.basic_consume.<locals>._callbackN)r�r�r`r��
_callbacksr�r?�_reset_cycle)r'rr�r��consumer_tagr�r�r)r�r*�
basic_consumevs
zChannel.basic_consumecCsd||jvr`|j�|�|��|j�|d�}z|j�|�WntyNYn0|jj�|d�dS)z Cancel consumer by consumer tag.N)	r�rCr�r�rEr��
ValueErrorr�r�)r'r�rr)r)r*�basic_cancel�s
zChannel.basic_cancelcKsFz.|j|�|�|d�}|s*|j�||j�|WSty@Yn0dS)z+Get message by direct access (synchronous).r�N)r�r�r�r`r_r	)r'rr�r�r^r)r)r*�	basic_get�szChannel.basic_getcCs|j�|�dS)zAcknowledge message.N)r�rd)r'r_�multipler)r)r*�	basic_ack�szChannel.basic_ackcCs|r|j��Std��dS)zRecover unacked messages.z'Does not support recover(requeue=False)N)r�rpr�)r'rgr)r)r*�
basic_recover�s
zChannel.basic_recovercCs|jj||d�dS)zReject message.�rgN)r�rhrfr)r)r*�basic_reject�szChannel.basic_rejectcCs||j_dS)zzChange QoS settings for this channel.

        Note:
        ----
            Only `prefetch_count` is supported.
        N)r�rO)r'Z
prefetch_sizerOZapply_globalr)r)r*�	basic_qos�szChannel.basic_qoscCst|jj�Sr#)r{r~r6r<r)r)r*�
get_exchanges�szChannel.get_exchangescCs|jj|dS)z%Get table of bindings for `exchange`.r�r�)r'rr)r)r*r��szChannel.get_tablecCs6z|jj|d}Wnty*|}Yn0|j|S)z.Get the exchange type instance for `exchange`.r�)r~r6rBr�)r'r�defaultr�r)r)r*r��s

zChannel.typeofcCs�|dur|j}|s|p|gSz |�|��|�|�|||�}WntyRg}Yn0|s�|dur�t�ttj	||d���|�
|�|g}|S)z�Find all queues matching `routing_key` for the given `exchange`.

        Returns
        -------
            list[str]: queue names -- must return `[default]`
                if default is set and no queues matched.
        Nr�)r�r��lookupr�rB�warnings�warnr4�UNDELIVERABLE_FMTrzr�)r'rrr�Rr)r)r*�_lookup�s$

�

�

zChannel._lookupcCs@|j}|��}d|d<|�|d|d�D]}|�||�q*dS)z.Redeliver message to its original destination.TZredeliveredrrN)r�r�rr�)r'r^r�rr)r)r*ri�s�zChannel._restorecCs
|�|�Sr#)ri)r'r^r)r)r*re�szChannel._restore_at_beginningcCsR|p
|jj}|jrH|j��rHt|d�r6|j|j|d�S|j|j	||d�St
��dS)N�	_get_many�r�)r��_deliverr�r�rZ�hasattrrr�r�r�r	)r'r�r�r)r)r*�drain_events�s
zChannel.drain_eventscCst||j�s|j||d�S|S)z1Convert raw message to :class:`Message` instance.)r�rN)�
isinstancer�)r'r�r)r)r*�message_to_python�szChannel.message_to_pythoncCs>|pi}|�di�|�d|p"|j�||||p2i|p8id�S)zPrepare message data.r��priority)r�r�r�r�r�)r>�default_priority)r'r�rr�r�r�r�r)r)r*�prepare_message�s�zChannel.prepare_messagecCstd��dS)z�Enable/disable message flow.

        Raises
        ------
            NotImplementedError: as flow
                is not implemented by the base virtual implementation.
        z%virtual channels do not support flow.Nr�)r'�activer)r)r*�flowszChannel.flowcCsp|jsfd|_t|j�D]}|�|�q|jr6|j��|jdurP|j��d|_|jdurf|j�	|�d|_
dS)zTClose channel.

        Cancel all consumers, and requeue unacked messages.
        TN)r�r{r�r�r�rVr��closer��
close_channelr�)r'Zconsumerr)r)r*rs



z
Channel.closecCs.|r&|��dkr&|j�|��|�|fS||fS�Nr�)�lower�codecsrbr+�r'r��encodingr)r)r*r�$szChannel.encode_bodycCs&|r"|��dkr"|j�|��|�S|Sr)rrrbr-rr)r)r*r�)szChannel.decode_bodycCst|j|jt�|_dSr#)rr�r�r	r�r<r)r)r*r�.s
�zChannel._reset_cyclecCs|Sr#r)r<r)r)r*�	__enter__2szChannel.__enter__ztype[BaseException] | NonezBaseException | NonezTracebackType | None�None)�exc_type�exc_val�exc_tb�returncCs|��dSr#)r)r'r!r"r#r)r)r*�__exit__5szChannel.__exit__cCs|jjS)z/Broker state containing exchanges and bindings.)r�r~r<r)r)r*r~=sz
Channel.statecCs|jdur|�|�|_|jS)z&:class:`QoS` manager for this channel.N)r�rMr<r)r)r*r�Bs
zChannel.qoscCs|jdur|��|jSr#)r�r�r<r)r)r*r�Is
z
Channel.cyclec
CsVz$ttt|dd�|j�|j�}WntttfyB|j}Yn0|rR|j|S|S)z�Get priority from message.

        The value is limited to within a boundary of 0 to 9.

        Note:
        ----
            Higher value has more priority.
        r�r)	r[�min�int�max_priority�min_priority�	TypeErrorr�rBr)r'r^�reverserr)r)r*�_get_message_priorityOs	��zChannel._get_message_prioritycCsdt|jj�}td|jjd�D]"}||vr|jj�|�|Sqtd�t|jj	�|jj�d��dS)Nrz/No free channel ids, current={}, channel_max={})�r�)
r8r��_used_channel_ids�range�channel_maxr`rrzrX�channels)r'Zused_channel_idsr�r)r)r*r�cs

��zChannel._get_free_channel_id)Nr�FFNFF)FF)NF)FF)r�r�FN)r�r�FN)Nr�N)Nr�N)F)F)F)F)rrF)r�)N)NN)NNNNN)T)N)N)F)Ar.r/r0r1r�rMrvr�rr�r�r"rr�rZ_delivery_tagsr�r�rr)r(r:r�r�r�r�r�r�r�r�r�r�r�r�r�r�r�r�r�r�rrrrr�r�rrirerrrrrr�r�r�rr%�propertyr~r�r�r,r�r)r)r)r*r��s�	
�



�
�
�
�






�





�








r�cs0eZdZdZ�fdd�Zdd�Zdd�Z�ZS)�
Managementz'Base class for the AMQP management API.cst��|�|j��|_dSr#)r�r:r�rN)r'�	transportr�r)r*r:wszManagement.__init__cCsdd�|j��D�S)NcSsg|]\}}}|||d��qS))r�r�rr))rF�q�e�rr)r)r*rG|s�z+Management.get_bindings.<locals>.<listcomp>)rNr�r<r)r)r*�get_bindings{s�zManagement.get_bindingscCs|j��dSr#)rNrr<r)r)r*rszManagement.close)r.r/r0r1r:r8rr�r)r)r�r*r3tsr3c@s�eZdZdZeZeZeZdZdZ	dZ
dZdZdZ
ejjjdeddg�dd�Zd	d
�Zdd�Zd
d�Zdd�Zdd�Zddd�Zdd�Zdd�Zdd�Zd dd�Zedd��ZdS)!�	Transportz|Virtual transport.

    Arguments:
    ---------
        client (kombu.Connection): The client this is a transport for.
    Ng�?i��Fr�Ztopic)ZasynchronousZ
exchange_typeZ
heartbeatscKs\||_t�|_g|_g|_i|_|�|j|jt�|_	|j
�d�}|durN||_t
t�|_dS)N�polling_interval)r�r5r~r1�_avail_channelsr��Cycle�_drain_channelr	r�r�rbr:r�ARRAY_TYPE_Hr.)r'r�r�r:r)r)r*r:�szTransport.__init__cCs>z|j��WSty8|�|�}|j�|�|YS0dSr#)r;rE�
IndexErrorr�r1r`)r'r�rNr)r)r*�create_channel�s
zTransport.create_channelcCsbzTz|j�|j�Wnty&Yn0z|j�|�WntyJYn0Wd|_nd|_0dSr#)r.rCr�r�r1r�)r'rNr)r)r*r�szTransport.close_channelcCs|j�|�|��|Sr#)r;r`r@r<r)r)r*�establish_connection�szTransport.establish_connectionc	CsN|j��|j|jfD]2}|rz|��}Wnty<Yq0|��qqdSr#)r�rr;r1rE�LookupError)r'r�Z	chan_listrNr)r)r*�close_connection�s
zTransport.close_connectioncCs�t�}|jj}|j}|r(|r(||kr(|}z||j|d�Wq�tyz|durft�||krft���|durvt|�Yq(0q�q(dS)Nr
)	r
r�rbr:rr	�socketr�r)r'r�r�Z
time_startrbr:r)r)r*r�szTransport.drain_eventscCsV|std�|���z|j|}Wn(tyHt�t|�|�|�Yn
0||�dS)Nz.Received message without destination queue: {})rBrzr��logger�warning�W_NO_CONSUMERS�_reject_inbound_message)r'r^rr�r)r)r*r�s��zTransport._delivercCsF|jD]:}|r|j||d�}|j�||j�|j|jdd�qBqdS)Nr�Tr)r1r�r�r`r_r)r'r�rNr^r)r)r*rH�s
z!Transport._reject_inbound_messagecCs0|r||jvrtd�||���|j||�dS)Nz,Message for queue {!r} without consumers: {})r�rBrz)r'rNr^rr)r)r*�on_message_readys��zTransport.on_message_readycCs|j||d�S)N)r�r�)r)r'rNr�r�r)r)r*r=
szTransport._drain_channelcCs|jdd�S)N�	localhost)�port�hostname)�default_portr<r)r)r*�default_connection_params
sz#Transport.default_connection_params)N)N)r.r/r0r1r�rr<r3r�rMr1r�r:r0rr9Z
implements�extend�	frozensetr:r@rrArCrrrHrIr=r2rNr)r)r)r*r9�s6
�



r9)Dr1�
__future__rr$rDrtrr�collectionsrrr�	itertoolsrZmultiprocessing.utilrrr	�timer
r�typingrZ
amqp.protocolr
Zkombu.exceptionsrrZ	kombu.logrZkombu.transportrZkombu.utils.divrZkombu.utils.encodingrrZkombu.utils.schedulingrZkombu.utils.uuidrrr�typesrr>r	r�rGryr}r.rErr r"�	Exceptionr2�UserWarningr4r5rMr�r�Z
StdChannelr�r3r9r)r)r)r*�<module>s\


G#%BR