File: //usr/local/lib/python3.9/site-packages/kombu/transport/__pycache__/redis.cpython-39.pyc
Redis transport module for Kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: No

Connection String
=================
Connection string has the following format:

.. code-block::

    redis://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]
    rediss://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]
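As a rough illustration of how a URL in this format decomposes, the sketch below splits it with the standard library only. The helper name ``split_redis_url`` is hypothetical, and the fallback port ``6379`` and database ``0`` are assumptions made for the example; this is not Kombu's own parser.

```python
from urllib.parse import urlsplit, unquote

DEFAULT_PORT = 6379  # assumption: the usual Redis port
DEFAULT_DB = 0       # assumption: first database when no virtual host is given


def split_redis_url(url):
    """Split a redis:// or rediss:// URL into its connection parts."""
    parts = urlsplit(url)
    if parts.scheme not in ("redis", "rediss"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    path = parts.path.lstrip("/")
    return {
        "ssl": parts.scheme == "rediss",  # rediss:// selects TLS
        "username": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "host": parts.hostname,
        "port": parts.port or DEFAULT_PORT,
        # The virtual host is the numeric Redis database index.
        "virtual_host": int(path) if path else DEFAULT_DB,
    }


info = split_redis_url("rediss://user:secret@redis.example.com:6380/2")
plain = split_redis_url("redis://localhost")
```

Here ``info`` carries the TLS flag, credentials, host, port and database, while ``plain`` falls back to the assumed defaults.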

To use Sentinel for dynamic Redis discovery,
the connection string has the following format:

.. code-block::

    sentinel://[USER:PASSWORD@]SENTINEL_ADDRESS[:PORT]

Transport Options
=================
* ``sep``
* ``ack_emulation``: (bool) If set to True, the transport will
  simulate the acknowledgement behavior of the AMQP protocol.
* ``unacked_key``
* ``unacked_index_key``
* ``unacked_mutex_key``
* ``unacked_mutex_expire``
* ``visibility_timeout``
* ``unacked_restore_limit``
* ``fanout_prefix``
* ``fanout_patterns``
* ``global_keyprefix``: (str) The global key prefix to be prepended to all keys
  used by Kombu
* ``socket_timeout``
* ``socket_connect_timeout``
* ``socket_keepalive``
* ``socket_keepalive_options``
* ``queue_order_strategy``
* ``max_connections``
* ``health_check_interval``
* ``retry_on_timeout``
* ``priority_steps``
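
A minimal sketch of assembling such options follows. The option names are copied from the list above; the helper ``check_transport_options`` and the chosen values are hypothetical, and the strict rejection of unknown names is an assumption of this example rather than documented Kombu behavior.

```python
# Option names copied from the "Transport Options" list.
KNOWN_OPTIONS = frozenset({
    "sep", "ack_emulation", "unacked_key", "unacked_index_key",
    "unacked_mutex_key", "unacked_mutex_expire", "visibility_timeout",
    "unacked_restore_limit", "fanout_prefix", "fanout_patterns",
    "global_keyprefix", "socket_timeout", "socket_connect_timeout",
    "socket_keepalive", "socket_keepalive_options", "queue_order_strategy",
    "max_connections", "health_check_interval", "retry_on_timeout",
    "priority_steps",
})


def check_transport_options(options):
    """Return a copy of ``options``, rejecting names not in the list."""
    unknown = sorted(set(options) - KNOWN_OPTIONS)
    if unknown:
        raise KeyError(f"unknown transport options: {unknown}")
    return dict(options)


transport_options = check_transport_options({
    "visibility_timeout": 3600,    # illustrative value, in seconds
    "global_keyprefix": "myapp:",  # illustrative prefix
    "socket_keepalive": True,
    "retry_on_timeout": True,
})
```

A dictionary like this is what would be handed to ``kombu.Connection`` through its ``transport_options`` argument.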

Helpers
=======
* ``get_redis_error_classes()``: Return tuple of redis error classes.
* ``get_redis_ConnectionError()``: Return the redis ConnectionError
  exception class.
* ``MutexHeld``: Raised when another party holds the lock.
* ``Mutex(client, name, expire)``: Acquire redis lock in non blocking way.
  Raise MutexHeld if not successful.
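
The non-blocking acquire-or-raise pattern behind ``Mutex`` can be sketched without Redis. The example below is an illustration only: it substitutes a ``threading.Lock`` for the Redis lock, and the lowercase ``mutex`` helper is a hypothetical stand-in, not the module's implementation.

```python
import threading
from contextlib import contextmanager


class MutexHeld(Exception):
    """Raised when another party holds the lock."""


@contextmanager
def mutex(lock):
    """Acquire ``lock`` without blocking; raise MutexHeld if it is taken."""
    acquired = lock.acquire(blocking=False)
    if not acquired:
        raise MutexHeld()
    try:
        yield
    finally:
        # Only reached when the lock was acquired above.
        lock.release()


lock = threading.Lock()
results = []
with mutex(lock):
    results.append("outer acquired")
    try:
        with mutex(lock):  # second attempt while the lock is held
            results.append("inner acquired")
    except MutexHeld:
        results.append("inner refused")
```

The second attempt fails immediately instead of waiting, which lets a caller skip the protected work when someone else is already doing it.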

GlobalKeyPrefixMixin
====================
Mixin to provide common logic for global key prefixing.

Overriding all the methods used by Kombu with the same key prefixing logic
would be cumbersome and inefficient. Hence, we override the command
execution logic that is called by all commands.

* ``PREFIXED_SIMPLE_COMMANDS``: ``HDEL``, ``HGET``, ``HLEN``, ``HSET``,
  ``LLEN``, ``LPUSH``, ``PUBLISH``, ``RPUSH``, ``RPOP``, ``SADD``, ``SREM``,
  ``SET``, ``SMEMBERS``, ``ZADD``, ``ZREM``, ``ZREVRANGEBYSCORE``
* ``PREFIXED_COMPLEX_COMMANDS``: ``DEL``, ``BRPOP``, ``EVALSHA``, ``WATCH``,
  each with ``args_start`` and ``args_end`` positions
* ``parse_response()``: Parse a response from the Redis server. Method wraps
  ``redis.parse_response()`` to remove prefixes of keys returned by redis
  command.
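
The prefixing idea can be sketched as a standalone function. This is a simplified illustration, not the mixin itself: the command tables are a small subset, the slice positions for ``DEL`` and ``BRPOP`` are assumptions, and ``GLOBAL_KEYPREFIX``, ``prefix_args`` and ``strip_prefix`` are hypothetical names.

```python
GLOBAL_KEYPREFIX = "myapp:"  # illustrative prefix

# Commands whose first argument is the only key (subset for illustration).
SIMPLE_COMMANDS = {"HGET", "LLEN", "LPUSH", "RPOP", "SADD", "SET"}

# Commands with several keys: (start, end) slice of the key arguments.
# These positions are assumptions of this sketch.
COMPLEX_COMMANDS = {
    "DEL": (0, None),  # every argument is a key
    "BRPOP": (0, -1),  # the last argument is the timeout
}


def prefix_args(args):
    """Return the command with its key arguments prefixed."""
    command, *rest = args
    if command in SIMPLE_COMMANDS:
        rest[0] = GLOBAL_KEYPREFIX + str(rest[0])
    elif command in COMPLEX_COMMANDS:
        start, end = COMPLEX_COMMANDS[command]
        head = rest[:start]
        tail = rest[end:] if end is not None else []
        keys = [GLOBAL_KEYPREFIX + str(arg) for arg in rest[start:end]]
        rest = head + keys + tail
    return [command, *rest]


def strip_prefix(key):
    """Undo the prefix on a key returned by the server."""
    return key[len(GLOBAL_KEYPREFIX):]
```

Rewriting arguments at the point where every command is executed means individual command methods need no changes, which is the motivation the docstring gives.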

PrefixedStrictRedis
===================
Returns a ``StrictRedis`` client that prefixes the keys it uses.

PrefixedRedisPipeline
=====================
Custom Redis pipeline that takes global_keyprefix into consideration.

As the ``PrefixedStrictRedis`` client uses the `global_keyprefix` to prefix
the keys it uses, the pipeline called by the client must be able to prefix
the keys as well.

PrefixedRedisPubSub
===================
Redis pubsub client that takes global_keyprefix into consideration.

* ``PUBSUB_COMMANDS``: ``SUBSCRIBE``, ``UNSUBSCRIBE``, ``PSUBSCRIBE``,
  ``PUNSUBSCRIBE``
* ``parse_response()``: Parse a response from the Redis server. Method wraps
  ``PubSub.parse_response()`` to remove prefixes of keys returned by redis
  command.

QoS
===
Redis Ack Emulation.

MultiChannelPoller
==================
Async I/O poller for Redis transport.

* ``_register_BRPOP(channel)``: Enable BRPOP mode for channel.
* ``_register_LISTEN(channel)``: Enable LISTEN mode for channel.

		
Channel
=======
Redis Channel.

* ``_put()``: Deliver message.
* ``_put_fanout()``: Deliver fanout message.
* ``client``: Client used to publish messages, BRPOP etc.
* ``subclient``: Pub/Sub connection used to consume fanout queues.
* ``active_queues``: Set of queues being consumed from (excluding fanout
  queues).

The channel requires redis-py version 3.2.0 or later, and the virtual host
must be an integer database number.
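
Priority support in the channel maps a message priority onto one of a few per-priority queues. The sketch below shows one way such a mapping can work with ``bisect``. The step values ``[0, 3, 6, 9]``, the separator, and the function names are assumptions for illustration, and priorities are assumed to be non-negative.

```python
from bisect import bisect

PRIORITY_STEPS = [0, 3, 6, 9]  # assumption: illustrative step values
SEP = "\x06\x16"               # assumption: separator between name and step


def priority(n, steps=PRIORITY_STEPS):
    """Round a non-negative priority down to the nearest step."""
    return steps[bisect(steps, n) - 1]


def q_for_pri(queue, pri):
    """Return the queue name that holds messages of this priority."""
    step = priority(pri)
    # Step 0 uses the plain queue name; other steps get a suffix.
    return f"{queue}{SEP}{step}" if step else queue


names = [q_for_pri("celery", p) for p in (0, 4, 7, 9)]
```

Bucketing priorities into a handful of steps keeps the number of Redis lists per queue small while still giving coarse ordering.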





Transport
=========
Redis Transport.

* Raises ``ImportError`` ("Missing redis library (pip install redis)") when
  the ``redis`` package is not installed.
* ``on_readable(fileno)``: Handle AIO event for one of our file descriptors.

SentinelManagedSSLConnection
============================
Connect to a Redis server using Sentinel + TLS.

Use Sentinel to identify which Redis server is the current master
to connect to and when connecting to the Master server, use an
SSL Connection.
d	dd�Zd
dd�ZdS)�SentinelChannelaChannel with explicit Redis Sentinel knowledge.

    Broker url is supposed to look like:

    .. code-block::

        sentinel://0.0.0.0:26379;sentinel://0.0.0.0:26380/...

    where each sentinel is separated by a `;`.

    Other arguments for the sentinel should come from the transport options
    (see `transport_options` of :class:`~kombu.connection.Connection`).

    You must provide at least one option in Transport options:
     * `master_name` - name of the redis group to poll

    Example:
    -------
    .. code-block:: python

        >>> import kombu
        >>> c = kombu.Connection(
             'sentinel://sentinel1:26379;sentinel://sentinel2:26379',
             transport_options={'master_name': 'mymaster'}
        )
        >>> c.connect()
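
To make the ``;``-separated broker URL concrete, the sketch below splits such a string into ``(host, port)`` pairs using only the standard library. The helper ``sentinel_addresses`` is hypothetical, and the fallback port ``26379`` is an assumption; it is not the channel's own code.

```python
from urllib.parse import urlsplit

DEFAULT_SENTINEL_PORT = 26379  # assumption: the usual Sentinel port


def sentinel_addresses(broker_url):
    """Return (host, port) pairs for each sentinel in the broker URL."""
    addresses = []
    for url in broker_url.split(";"):
        parts = urlsplit(url)
        if parts.scheme == "sentinel":
            port = parts.port or DEFAULT_SENTINEL_PORT
            addresses.append((parts.hostname, port))
    return addresses


pairs = sentinel_addresses(
    "sentinel://sentinel1:26379;sentinel://sentinel2"
)
```

Each pair identifies one Sentinel instance that can be asked for the current master.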

``SentinelChannel`` also reads the ``master_name``, ``min_other_sentinels``
and ``sentinel_kwargs`` transport options. The ``master_name`` option must be
specified.

SentinelTransport
=================
Redis Sentinel Transport.